ClickHouse/dbms/src/Storages/StorageLog.cpp
2014-10-03 19:30:10 +04:00

707 lines
23 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <map>
#include <Poco/Path.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnNested.h>
#include <DB/Storages/StorageLog.h>
#include <DB/DataTypes/DataTypeString.h>
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
#define DBMS_STORAGE_LOG_MARKS_FILE_EXTENSION ".mrk"
#define DBMS_STORAGE_LOG_MARKS_FILE_NAME "__marks.mrk"
namespace DB
{
using Poco::SharedPtr;
LogBlockInputStream::LogBlockInputStream(size_t block_size_, const Names & column_names_, StorageLog & storage_, size_t mark_number_, size_t rows_limit_)
: block_size(block_size_), column_names(column_names_), storage(storage_),
mark_number(mark_number_), rows_limit(rows_limit_), rows_read(0), current_mark(mark_number_)
{
}
String LogBlockInputStream::getID() const
{
std::stringstream res;
res << "Log(" << storage.getTableName() << ", " << &storage << ", " << mark_number << ", " << rows_limit;
for (size_t i = 0; i < column_names.size(); ++i)
res << ", " << column_names[i];
res << ")";
return res.str();
}
Block LogBlockInputStream::readImpl()
{
Block res;
if (rows_read == rows_limit)
return res;
/// Если файлы не открыты, то открываем их.
if (streams.empty())
{
Poco::ScopedReadRWLock lock(storage.rwlock);
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
if (*it != storage._table_column_name) /// Для виртуального столбца не надо ничего открывать
addStream(*it, *storage.getDataTypeByName(*it));
}
bool has_virtual_column_table = false;
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
if (*it == storage._table_column_name)
has_virtual_column_table = true;
/// Сколько строк читать для следующего блока.
size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);
const Marks & marks = storage.getMarksWithRealRowCount();
std::pair<String, size_t> current_table;
/// Отдельно обрабатываем виртуальный столбец
if (has_virtual_column_table)
{
size_t current_row = rows_read;
if (mark_number > 0)
current_row += marks[mark_number-1].rows;
while (current_mark < marks.size() && marks[current_mark].rows <= current_row)
++current_mark;
current_table = storage.getTableFromMark(current_mark);
current_table.second = std::min(current_table.second, marks.size() - 1);
max_rows_to_read = std::min(max_rows_to_read, marks[current_table.second].rows - current_row);
}
/// Указатели на столбцы смещений, общие для столбцов из вложенных структур данных
typedef std::map<std::string, ColumnPtr> OffsetColumns;
OffsetColumns offset_columns;
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
{
/// Виртуальный столбец не надо считывать с жесткого диска
if (*it == storage._table_column_name)
continue;
ColumnWithNameAndType column;
column.name = *it;
column.type = storage.getDataTypeByName(*it);
bool read_offsets = true;
/// Для вложенных структур запоминаем указатели на столбцы со смещениями
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&*column.type))
{
String name = DataTypeNested::extractNestedTableName(column.name);
if (offset_columns.count(name) == 0)
offset_columns[name] = new ColumnArray::ColumnOffsets_t;
else
read_offsets = false; /// на предыдущих итерациях смещения уже считали вызовом readData
column.column = new ColumnArray(type_arr->getNestedType()->createColumn(), offset_columns[name]);
}
else
column.column = column.type->createColumn();
readData(*it, *column.type, *column.column, max_rows_to_read, 0, read_offsets);
if (column.column->size())
res.insert(column);
}
/// Отдельно обрабатываем виртуальный столбец
if (has_virtual_column_table)
{
size_t rows = max_rows_to_read;
if (res.columns() > 0)
rows = res.rows();
if (rows > 0)
{
ColumnPtr column_ptr = ColumnConst<String> (rows, current_table.first, new DataTypeString).convertToFullColumn();
ColumnWithNameAndType column(column_ptr, new DataTypeString, storage._table_column_name);
res.insert(column);
}
}
if (res)
rows_read += res.rows();
if (!res || rows_read == rows_limit)
{
/** Закрываем файлы (ещё до уничтожения объекта).
* Чтобы при создании многих источников, но одновременном чтении только из нескольких,
* буферы не висели в памяти.
*/
streams.clear();
}
return res;
}
void LogBlockInputStream::addStream(const String & name, const IDataType & type, size_t level)
{
/// Для массивов используются отдельные потоки для размеров.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!streams.count(size_name))
streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(
storage.files[size_name].data_file.path(),
mark_number
? storage.files[size_name].marks[mark_number].offset
: 0)));
addStream(name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams[size_name].reset(new Stream(
storage.files[size_name].data_file.path(),
mark_number
? storage.files[size_name].marks[mark_number].offset
: 0));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->name), *it->type, level + 1);
}
else
streams[name].reset(new Stream(
storage.files[name].data_file.path(),
mark_number
? storage.files[name].marks[mark_number].offset
: 0));
}
void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read,
size_t level, bool read_offsets)
{
/// Для массивов требуется сначала десериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
if (read_offsets)
{
type_arr->deserializeOffsets(
column,
streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
max_rows_to_read);
}
if (column.size())
readData(
name,
*type_arr->getNestedType(),
typeid_cast<ColumnArray &>(column).getData(),
typeid_cast<const ColumnArray &>(column).getOffsets()[column.size() - 1],
level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
type_nested->deserializeOffsets(
column,
streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
max_rows_to_read);
if (column.size())
{
ColumnNested & column_nested = typeid_cast<ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
readData(
DataTypeNested::concatenateNestedName(name, it->name),
*it->type,
*column_nested.getData()[i],
column_nested.getOffsets()[column.size() - 1],
level + 1);
}
}
}
else
type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read);
}
LogBlockOutputStream::LogBlockOutputStream(StorageLog & storage_)
: storage(storage_),
lock(storage.rwlock), marks_stream(storage.marks_file.path(), 4096, O_APPEND | O_CREAT | O_WRONLY)
{
for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
addStream(it->name, *it->type);
}
void LogBlockOutputStream::write(const Block & block)
{
storage.check(block, true);
/// Множество записанных столбцов со смещениями, чтобы не писать общие для вложенных структур столбцы несколько раз
OffsetColumns offset_columns;
MarksForColumns marks;
marks.reserve(storage.files.size());
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithNameAndType & column = block.getByPosition(i);
writeData(column.name, *column.type, *column.column, marks, offset_columns);
}
writeMarks(marks);
}
void LogBlockOutputStream::writeSuffix()
{
/// Заканчиваем запись.
marks_stream.next();
for (FileStreams::iterator it = streams.begin(); it != streams.end(); ++it)
it->second->finalize();
std::vector<Poco::File> column_files;
for (auto & pair : streams)
column_files.push_back(storage.files[pair.first].data_file);
column_files.push_back(storage.marks_file);
storage.file_checker.update(column_files.begin(), column_files.end());
streams.clear();
}
void LogBlockOutputStream::addStream(const String & name, const IDataType & type, size_t level)
{
/// Для массивов используются отдельные потоки для размеров.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!streams.count(size_name))
streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(
storage.files[size_name].data_file.path(), storage.max_compress_block_size)));
addStream(name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams[size_name].reset(new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->name), *it->type, level + 1);
}
else
streams[name].reset(new Stream(storage.files[name].data_file.path(), storage.max_compress_block_size));
}
void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, MarksForColumns & out_marks,
OffsetColumns & offset_columns, size_t level)
{
/// Для массивов требуется сначала сериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (offset_columns.count(size_name) == 0)
{
offset_columns.insert(size_name);
Mark mark;
mark.rows = (storage.files[size_name].marks.empty() ? 0 : storage.files[size_name].marks.back().rows) + column.size();
mark.offset = streams[size_name]->plain_offset + streams[size_name]->plain.count();
out_marks.push_back(std::make_pair(storage.files[size_name].column_index, mark));
type_arr->serializeOffsets(column, streams[size_name]->compressed);
streams[size_name]->compressed.next();
}
writeData(name, *type_arr->getNestedType(), typeid_cast<const ColumnArray &>(column).getData(), out_marks, offset_columns, level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
Mark mark;
mark.rows = (storage.files[size_name].marks.empty() ? 0 : storage.files[size_name].marks.back().rows) + column.size();
mark.offset = streams[size_name]->plain_offset + streams[size_name]->plain.count();
out_marks.push_back(std::make_pair(storage.files[size_name].column_index, mark));
type_nested->serializeOffsets(column, streams[size_name]->compressed);
streams[size_name]->compressed.next();
const ColumnNested & column_nested = typeid_cast<const ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
writeData(
DataTypeNested::concatenateNestedName(name, it->name),
*it->type,
*column_nested.getData()[i],
out_marks,
offset_columns,
level + 1);
}
}
else
{
Mark mark;
mark.rows = (storage.files[name].marks.empty() ? 0 : storage.files[name].marks.back().rows) + column.size();
mark.offset = streams[name]->plain_offset + streams[name]->plain.count();
out_marks.push_back(std::make_pair(storage.files[name].column_index, mark));
type.serializeBinary(column, streams[name]->compressed);
streams[name]->compressed.next();
}
}
static bool ColumnIndexLess(const std::pair<size_t, Mark> & a, const std::pair<size_t, Mark> & b)
{
return a.first < b.first;
}
void LogBlockOutputStream::writeMarks(MarksForColumns marks)
{
if (marks.size() != storage.files.size())
throw Exception("Wrong number of marks generated from block. Makes no sense.", ErrorCodes::LOGICAL_ERROR);
sort(marks.begin(), marks.end(), ColumnIndexLess);
for (size_t i = 0; i < marks.size(); ++i)
{
if (marks[i].first != i)
throw Exception("Invalid marks generated from block. Makes no sense.", ErrorCodes::LOGICAL_ERROR);
Mark mark = marks[i].second;
writeIntBinary(mark.rows, marks_stream);
writeIntBinary(mark.offset, marks_stream);
storage.files[storage.column_names[i]].marks.push_back(mark);
}
}
StorageLog::StorageLog(
const std::string & path_,
const std::string & name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
size_t max_compress_block_size_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), name(name_), columns(columns_),
loaded_marks(false), max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(name) + '/' + "sizes.json", *this)
{
if (columns->empty())
throw Exception("Empty list of columns passed to StorageLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
/// создаём файлы, если их нет
Poco::File(path + escapeForFileName(name) + '/').createDirectories();
for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
addFile(it->name, *it->type);
marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
}
StoragePtr StorageLog::create(
const std::string & path_,
const std::string & name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
size_t max_compress_block_size_)
{
return (new StorageLog{
path_, name_, columns_,
materialized_columns_, alias_columns_, column_defaults_,
max_compress_block_size_
})->thisPtr();
}
void StorageLog::addFile(const String & column_name, const IDataType & type, size_t level)
{
if (files.end() != files.find(column_name))
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageLog.",
ErrorCodes::DUPLICATE_COLUMN);
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String size_name = DataTypeNested::extractNestedTableName(column_name) + size_column_suffix;
if (files.end() == files.find(size_name))
{
ColumnData & column_data = files.insert(std::make_pair(size_name, ColumnData())).first->second;
column_data.column_index = column_names.size();
column_data.data_file = Poco::File(
path + escapeForFileName(name) + '/' + escapeForFileName(DataTypeNested::extractNestedTableName(column_name)) + size_column_suffix + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
column_names.push_back(size_name);
}
addFile(column_name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
ColumnData & column_data = files.insert(std::make_pair(column_name + size_column_suffix, ColumnData())).first->second;
column_data.column_index = column_names.size();
column_data.data_file = Poco::File(
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + size_column_suffix + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
column_names.push_back(column_name + size_column_suffix);
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addFile(DataTypeNested::concatenateNestedName(name, it->name), *it->type, level + 1);
}
else
{
ColumnData & column_data = files.insert(std::make_pair(column_name, ColumnData())).first->second;
column_data.column_index = column_names.size();
column_data.data_file = Poco::File(
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
column_names.push_back(column_name);
}
}
void StorageLog::loadMarks()
{
Poco::ScopedWriteRWLock lock(rwlock);
if (loaded_marks)
return;
typedef std::vector<Files_t::iterator> FilesByIndex;
FilesByIndex files_by_index(files.size());
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
{
files_by_index[it->second.column_index] = it;
}
if (marks_file.exists())
{
size_t file_size = marks_file.getSize();
if (file_size % (files.size() * sizeof(Mark)) != 0)
throw Exception("Size of marks file is inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
int marks_count = file_size / (files.size() * sizeof(Mark));
for (size_t i = 0; i < files_by_index.size(); ++i)
{
files_by_index[i]->second.marks.reserve(marks_count);
}
ReadBufferFromFile marks_rb(marks_file.path(), 32768);
while (!marks_rb.eof())
{
for (size_t i = 0; i < files_by_index.size(); ++i)
{
Mark mark;
readIntBinary(mark.rows, marks_rb);
readIntBinary(mark.offset, marks_rb);
files_by_index[i]->second.marks.push_back(mark);
}
}
}
loaded_marks = true;
}
size_t StorageLog::marksCount()
{
return files.begin()->second.marks.size();
}
void StorageLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
Poco::ScopedWriteRWLock lock(rwlock);
/// Переименовываем директорию с данными.
Poco::File(path + escapeForFileName(name)).renameTo(new_path_to_db + escapeForFileName(new_table_name));
path = new_path_to_db;
name = new_table_name;
file_checker.setPath(path + escapeForFileName(name) + '/' + "sizes.json");
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
{
it->second.data_file = Poco::File(path + escapeForFileName(name) + '/' + Poco::Path(it->second.data_file.path()).getFileName());
}
marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
}
const Marks & StorageLog::getMarksWithRealRowCount() const
{
const String & column_name = columns->front().name;
const IDataType & column_type = *columns->front().type;
String file_name;
/** Засечки достаём из первого столбца.
* Если это - массив, то берём засечки, соответствующие размерам, а не внутренностям массивов.
*/
if (typeid_cast<const DataTypeArray *>(&column_type))
{
file_name = DataTypeNested::extractNestedTableName(column_name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX "0";
}
else if (typeid_cast<const DataTypeNested *>(&column_type))
{
file_name = column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX "0";
}
else
{
file_name = column_name;
}
Files_t::const_iterator it = files.find(file_name);
if (files.end() == it)
throw Exception("Cannot find file " + file_name, ErrorCodes::LOGICAL_ERROR);
return it->second.marks;
}
BlockInputStreams StorageLog::read(
size_t from_mark,
size_t to_mark,
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
{
/** Если читаем все данные в один поток, то засечки не требуются.
* Отсутствие необходимости загружать засечки позволяет уменьшить потребление памяти при использовании таблицы типа ChunkMerger.
*/
bool read_all_data_in_one_thread = (threads == 1 && from_mark == 0 && to_mark == std::numeric_limits<size_t>::max());
if (!read_all_data_in_one_thread)
loadMarks();
bool has_virtual_column = false;
Names real_column_names;
for (const auto & column : column_names)
if (column != _table_column_name)
real_column_names.push_back(column);
else
has_virtual_column = true;
/// Если есть виртуальный столбец и нет остальных, то ничего проверять не надо
if (!(has_virtual_column && real_column_names.size() == 0))
check(real_column_names);
processed_stage = QueryProcessingStage::FetchColumns;
Poco::ScopedReadRWLock lock(rwlock);
BlockInputStreams res;
if (read_all_data_in_one_thread)
{
res.push_back(new LogBlockInputStream(
max_block_size,
column_names,
*this,
0, std::numeric_limits<size_t>::max()));
}
else
{
const Marks & marks = getMarksWithRealRowCount();
size_t marks_size = marks.size();
if (to_mark == std::numeric_limits<size_t>::max())
to_mark = marks_size;
if (to_mark > marks_size || to_mark < from_mark)
throw Exception("Marks out of range in StorageLog::read", ErrorCodes::LOGICAL_ERROR);
if (threads > to_mark - from_mark)
threads = to_mark - from_mark;
for (size_t thread = 0; thread < threads; ++thread)
{
res.push_back(new LogBlockInputStream(
max_block_size,
column_names,
*this,
from_mark + thread * (to_mark - from_mark) / threads,
marks[from_mark + (thread + 1) * (to_mark - from_mark) / threads - 1].rows -
((thread == 0 && from_mark == 0)
? 0
: marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows)));
}
}
return res;
}
BlockInputStreams StorageLog::read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
{
return read(0, std::numeric_limits<size_t>::max(), column_names, query, settings, processed_stage, max_block_size, threads);
}
BlockOutputStreamPtr StorageLog::write(
ASTPtr query)
{
loadMarks();
return new LogBlockOutputStream(*this);
}
bool StorageLog::checkData() const
{
Poco::ScopedReadRWLock lock(const_cast<Poco::RWLock &>(rwlock));
return file_checker.check();
}
}