diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataWriter.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataWriter.h index 6e720bfd791..2220456ac33 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataWriter.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataWriter.h @@ -34,12 +34,12 @@ public: MergeTreeDataWriter(MergeTreeData & data_) : data(data_), log(&Logger::get("MergeTreeDataWriter")), flags(O_TRUNC | O_CREAT | O_WRONLY) {} /** Разбивает блок на блоки, каждый из которых нужно записать в отдельный кусок. - * (читай: разбивает строки по месяцам) + * (читай: разбивает строки по месяцам) * Работает детерминированно: если отдать на вход такой же блок, на выходе получатся такие же блоки в таком же порядке. */ BlocksWithDateIntervals splitBlockIntoParts(const Block & block); - /** Все строки должны относиться к одному месяцу. Возвращает название временного куска. + /** Все строки должны относиться к одному месяцу. * temp_index - значение left и right для нового куска. Можно будет изменить при переименовании. * Возвращает кусок с именем, начинающимся с tmp_, еще не добавленный в MergeTreeData. */ @@ -56,7 +56,7 @@ private: /// Записать данные одного столбца. void writeData(const String & path, const String & name, const IDataType & type, const IColumn & column, - OffsetColumns & offset_columns, size_t level = 0); + OffsetColumns & offset_columns, MergeTreeData::DataPart::Checksums & checksums, size_t level = 0); }; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index a7c1550c7ee..e2ebfaab235 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace DB { @@ -104,12 +105,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa /// Наконец-то можно писать данные на диск. LOG_TRACE(log, "Writing index."); - /// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки. + MergeTreeData::DataPart::Checksums checksums; MergeTreeData::DataPart::Index index_vec; + + /// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки. index_vec.reserve(part_size * sort_descr.size()); { - WriteBufferFromFile index(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags); + WriteBufferFromFile index_file(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags); + HashingWriteBuffer index(index_file); typedef std::vector PrimaryColumns; PrimaryColumns primary_columns; @@ -130,6 +134,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa } index.next(); + checksums.file_checksums["primary.idx"].size = index.count(); + checksums.file_checksums["primary.idx"].hash = index.getHash(); } LOG_TRACE(log, "Writing data."); @@ -140,7 +146,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa for (size_t i = 0; i < columns; ++i) { const ColumnWithNameAndType & column = block.getByPosition(i); - writeData(part_tmp_path, column.name, *column.type, *column.column, offset_columns); + writeData(part_tmp_path, column.name, *column.type, *column.column, offset_columns, checksums); + } + + /// Запишем файл с чексуммами. + { + WriteBufferFromFile checksums_file(part_tmp_path + "checksums.txt", 1024, flags); + checksums.writeText(checksums_file); + checksums_file.next(); } MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); @@ -155,12 +168,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date); new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date); new_data_part->index.swap(index_vec); + new_data_part->checksums = checksums; return new_data_part; } void MergeTreeDataWriter::writeData(const String & path, const String & name, const IDataType & type, const IColumn & column, - OffsetColumns & offset_columns, size_t level) + OffsetColumns & offset_columns, MergeTreeData::DataPart::Checksums & checksums, size_t level) { String escaped_column_name = escapeForFileName(name); size_t size = column.size(); @@ -174,15 +188,17 @@ void MergeTreeDataWriter::writeData(const String & path, const String & name, co { offset_columns.insert(size_name); - WriteBufferFromFile plain(path + size_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags); - WriteBufferFromFile marks(path + size_name + ".mrk", 4096, flags); - CompressedWriteBuffer compressed(plain); + WriteBufferFromFile plain_file(path + size_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags); + WriteBufferFromFile marks_file(path + size_name + ".mrk", 4096, flags); + CompressedWriteBuffer compressed_file(plain_file); + HashingWriteBuffer marks(marks_file); + HashingWriteBuffer compressed(compressed_file); size_t prev_mark = 0; while (prev_mark < size) { /// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока) - writeIntBinary(plain.count(), marks); + writeIntBinary(plain_file.count(), marks); writeIntBinary(compressed.offset(), marks); type_arr->serializeOffsets(column, compressed, prev_mark, data.index_granularity); @@ -192,45 +208,26 @@ void MergeTreeDataWriter::writeData(const String & path, const String & name, co } compressed.next(); - plain.next(); + plain_file.next(); marks.next(); + checksums.file_checksums[size_name + ".bin"].size = compressed.count(); + checksums.file_checksums[size_name + ".bin"].hash = compressed.getHash(); + checksums.file_checksums[size_name + ".mrk"].size = marks.count(); + checksums.file_checksums[size_name + ".mrk"].hash = marks.getHash(); } } - if (const DataTypeNested * type_nested = dynamic_cast(&type)) - { - String size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); - WriteBufferFromFile plain(path + size_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags); - WriteBufferFromFile marks(path + size_name + ".mrk", 4096, flags); - CompressedWriteBuffer compressed(plain); + { + WriteBufferFromFile plain_file(path + escaped_column_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags); + WriteBufferFromFile marks_file(path + escaped_column_name + ".mrk", 4096, flags); + CompressedWriteBuffer compressed_file(plain_file); + HashingWriteBuffer marks(marks_file); + HashingWriteBuffer compressed(compressed_file); size_t prev_mark = 0; while (prev_mark < size) { - /// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока) - writeIntBinary(plain.count(), marks); - writeIntBinary(compressed.offset(), marks); - - type_nested->serializeOffsets(column, compressed, prev_mark, data.index_granularity); - prev_mark += data.index_granularity; - - compressed.nextIfAtEnd(); /// Чтобы вместо засечек, указывающих на конец сжатого блока, были засечки, указывающие на начало следующего. - } - - compressed.next(); - plain.next(); - marks.next(); - } - - { - WriteBufferFromFile plain(path + escaped_column_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags); - WriteBufferFromFile marks(path + escaped_column_name + ".mrk", 4096, flags); - CompressedWriteBuffer compressed(plain); - - size_t prev_mark = 0; - while (prev_mark < size) - { - writeIntBinary(plain.count(), marks); + writeIntBinary(plain_file.count(), marks); writeIntBinary(compressed.offset(), marks); type.serializeBinary(column, compressed, prev_mark, data.index_granularity); @@ -240,8 +237,12 @@ void MergeTreeDataWriter::writeData(const String & path, const String & name, co } compressed.next(); - plain.next(); + plain_file.next(); marks.next(); + checksums.file_checksums[escaped_column_name + ".bin"].size = compressed.count(); + checksums.file_checksums[escaped_column_name + ".bin"].hash = compressed.getHash(); + checksums.file_checksums[escaped_column_name + ".mrk"].size = marks.count(); + checksums.file_checksums[escaped_column_name + ".mrk"].hash = marks.getHash(); } }