diff --git a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h index dccb52246dd..d8815986d49 100644 --- a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -35,7 +35,7 @@ struct ColumnSize; class MergeTreeData; class IMergeTreeReader; -class IMergeTreeWriter; +class IMergeTreeDataPartWriter; namespace ErrorCodes { @@ -50,7 +50,7 @@ public: using ValueSizeMap = std::map; using MergeTreeReaderPtr = std::unique_ptr; - using MergeTreeWriterPtr = std::unique_ptr; + using MergeTreeWriterPtr = std::unique_ptr; // virtual BlockInputStreamPtr readAll() = 0; // virtual BlockInputStreamPtr read() = 0; @@ -68,7 +68,6 @@ public: virtual MergeTreeWriterPtr getWriter( const NamesAndTypesList & columns_list, - const IColumn::Permutation * permutation, const CompressionCodecPtr & default_codec_, const WriterSettings & writer_settings) const = 0; diff --git a/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp b/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp index b4a8feb135e..f1bd137be6b 100644 --- a/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp +++ b/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp @@ -7,14 +7,12 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter( const String & part_path_, const MergeTreeData & storage_, const NamesAndTypesList & columns_list_, - const IColumn::Permutation * permutation_, const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const WriterSettings & settings_) : part_path(part_path_) , storage(storage_) , columns_list(columns_list_) -, permutation(permutation_) , marks_file_extension(marks_file_extension_) , default_codec(default_codec_) , settings(settings_) {} diff --git a/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.h index 3adf125a18a..f39399a2818 100644 --- a/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.h +++ b/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.h @@ -61,17 +61,17 @@ public: const String & part_path, const MergeTreeData & storage, const NamesAndTypesList & columns_list, - const IColumn::Permutation * permutation, const String & marks_file_extension, const CompressionCodecPtr & default_codec, const WriterSettings & settings); virtual size_t write( - const Block & block, size_t from_mark, size_t offset, const MergeTreeIndexGranularity & index_granularity, + const Block & block, const IColumn::Permutation * permutation, + size_t from_mark, size_t offset, const MergeTreeIndexGranularity & index_granularity, /* Blocks with already sorted index columns */ const Block & primary_key_block = {}, const Block & skip_indexes_block = {}) = 0; - - // virtual void writeFinalMarks() = 0; + + virtual void finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark) = 0; virtual ~IMergeTreeDataPartWriter(); @@ -82,7 +82,6 @@ protected: String part_path; const MergeTreeData & storage; NamesAndTypesList columns_list; - const IColumn::Permutation * permutation; const String marks_file_extension; CompressionCodecPtr default_codec; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp index 251779f2d9f..dba86814959 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp @@ -87,12 +87,10 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader( IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter( const NamesAndTypesList & columns_list, - const IColumn::Permutation * permutation, const CompressionCodecPtr & default_codec, const WriterSettings & writer_settings) const { UNUSED(columns_list); - UNUSED(permutation); UNUSED(default_codec); UNUSED(writer_settings); return {}; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.h b/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.h index 4a233843a1b..7b9e9631a49 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.h @@ -60,7 +60,6 @@ public: MergeTreeWriterPtr getWriter( const NamesAndTypesList & columns_list, - const IColumn::Permutation * permutation, const CompressionCodecPtr & default_codec, const WriterSettings & writer_settings) const override; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index aeb18c08f44..8d55ba399b9 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -24,6 +24,8 @@ #include #include +#include +#include namespace DB @@ -87,15 +89,13 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader( IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter( const NamesAndTypesList & columns_list, - const IColumn::Permutation * permutation, const CompressionCodecPtr & default_codec, const WriterSettings & writer_settings) const { - UNUSED(columns_list); - UNUSED(permutation); - UNUSED(default_codec); - UNUSED(writer_settings); - return {}; + return std::make_unique( + getFullPath(), storage, columns_list, + index_granularity_info.marks_file_extension, + default_codec, writer_settings); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.h b/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.h index 5f9672ad015..81c3bb776a5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartWide.h @@ -60,7 +60,6 @@ public: MergeTreeWriterPtr getWriter( const NamesAndTypesList & columns_list, - const IColumn::Permutation * permutation, const CompressionCodecPtr & default_codec_, const WriterSettings & writer_settings) const override; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index 81271badb27..3fbd61b8dc5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -3,7 +3,9 @@ namespace DB { -size_t MergeTreeDataPartWriterCompact::write(const Block & block, size_t from_mark, size_t index_offset, +size_t MergeTreeDataPartWriterCompact::write( + const Block & block, const IColumn::Permutation * permutation, + size_t from_mark, size_t index_offset, const MergeTreeIndexGranularity & index_granularity, const Block & primary_key_block, const Block & skip_indexes_block) { @@ -83,4 +85,21 @@ size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWith return from_row + number_of_rows; } +void MergeTreeDataPartWriterCompact::finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark) +{ + if (write_final_mark) + { + writeIntBinary(0, stream->marks); + for (size_t i = 0; i < columns_list.size(); ++i) + { + writeIntBinary(stream->plain_hashing.count(), stream->marks); + writeIntBinary(stream->compressed.offset(), stream->marks); + } + } + + stream->finalize(); + stream->addToChecksums(checksums); + stream.reset(); +} + } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h index 6fb641d69ce..514dc8ddff4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h @@ -6,16 +6,19 @@ namespace DB class MergeTreeDataPartWriterCompact : IMergeTreeDataPartWriter { public: - size_t write(const Block & block, size_t from_mark, size_t index_offset, const MergeTreeIndexGranularity & index_granularity, + size_t write(const Block & block, const IColumn::Permutation * permutation, + size_t from_mark, size_t index_offset, const MergeTreeIndexGranularity & index_granularity, const Block & primary_key_block, const Block & skip_indexes_block) override; + void finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark) override; + +private: /// Write single granule of one column (rows between 2 marks) size_t writeColumnSingleGranule( const ColumnWithTypeAndName & column, size_t from_row, size_t number_of_rows); - -private: + ColumnStreamPtr stream; }; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 69ec99fcf74..a2fa713f0f5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -12,13 +12,13 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide( const String & part_path_, const MergeTreeData & storage_, const NamesAndTypesList & columns_list_, - const IColumn::Permutation * permutation_, const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const WriterSettings & settings_, const ColumnToSize & merged_column_to_size) - : IMergeTreeDataPartWriter(part_path_, storage_, columns_list_, - permutation_, marks_file_extension_, + : IMergeTreeDataPartWriter(part_path_, + storage_, columns_list_, + marks_file_extension_, default_codec_, settings_) { size_t total_size = 0; @@ -89,9 +89,10 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter( }; } -size_t MergeTreeDataPartWriterWide::write(const Block & block, size_t from_mark, size_t index_offset, - const MergeTreeIndexGranularity & index_granularity, - const Block & primary_key_block, const Block & skip_indexes_block) +size_t MergeTreeDataPartWriterWide::write(const Block & block, + const IColumn::Permutation * permutation, size_t from_mark, size_t index_offset, + const MergeTreeIndexGranularity & index_granularity, + const Block & primary_key_block, const Block & skip_indexes_block) { if (serialization_states.empty()) { @@ -286,4 +287,56 @@ std::pair MergeTreeDataPartWriterWide::writeColumn( return std::make_pair(current_column_mark, current_row - total_rows); } +void MergeTreeDataPartWriterWide::finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark) +{ + const auto & settings = storage.global_context.getSettingsRef(); + IDataType::SerializeBinaryBulkSettings serialize_settings; + serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size; + serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0; + WrittenOffsetColumns offset_columns; + + { + auto it = columns_list.begin(); + for (size_t i = 0; i < columns_list.size(); ++i, ++it) + { + if (!serialization_states.empty()) + { + serialize_settings.getter = createStreamGetter(it->name, offset_columns, false); + it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]); + } + + if (write_final_mark) + writeFinalMark(it->name, it->type, offset_columns, false, serialize_settings.path); + } + } + + for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it) + { + it->second->finalize(); + it->second->addToChecksums(checksums); + } + + column_streams.clear(); +} + +void MergeTreeDataPartWriterWide::writeFinalMark( + const std::string & column_name, + const DataTypePtr column_type, + WrittenOffsetColumns & offset_columns, + bool skip_offsets, + DB::IDataType::SubstreamPath & path) +{ + writeSingleMark(column_name, *column_type, offset_columns, skip_offsets, 0, path); + /// Memoize information about offsets + column_type->enumerateStreams([&] (const IDataType::SubstreamPath & substream_path) + { + bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes; + if (is_offsets) + { + String stream_name = IDataType::getFileNameForStream(column_name, substream_path); + offset_columns.insert(stream_name); + } + }, path); +} + } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index 5b9d89e8d4e..fd1887741d4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -3,7 +3,7 @@ namespace DB { -class MergeTreeDataPartWriterWide : IMergeTreeDataPartWriter +class MergeTreeDataPartWriterWide : public IMergeTreeDataPartWriter { public: @@ -13,16 +13,18 @@ public: const String & part_path, const MergeTreeData & storage, const NamesAndTypesList & columns_list, - const IColumn::Permutation * permutation, const String & marks_file_extension, const CompressionCodecPtr & default_codec, const WriterSettings & settings, const ColumnToSize & merged_column_to_size = {}); - size_t write(const Block & block, size_t from_mark, size_t index_offset, + size_t write(const Block & block, const IColumn::Permutation * permutation, + size_t from_mark, size_t index_offset, const MergeTreeIndexGranularity & index_granularity, const Block & primary_key_block, const Block & skip_indexes_block) override; + void finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark) override; + IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets); /// Write data of one column. @@ -61,6 +63,13 @@ private: bool skip_offsets, size_t number_of_rows, DB::IDataType::SubstreamPath & path); + + void writeFinalMark( + const std::string & column_name, + const DataTypePtr column_type, + WrittenOffsetColumns & offset_columns, + bool skip_offsets, + DB::IDataType::SubstreamPath & path); void addStreams( const String & name, diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 670e7f2b644..bb1ddc2400f 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -37,6 +37,7 @@ MergedBlockOutputStream::MergedBlockOutputStream( , columns_list(columns_list_) { init(); + writer = data_part_->getWriter(columns_list_, default_codec_, writer_settings); } MergedBlockOutputStream::MergedBlockOutputStream( @@ -58,6 +59,7 @@ MergedBlockOutputStream::MergedBlockOutputStream( , columns_list(columns_list_) { init(); + writer = data_part_->getWriter(columns_list_, default_codec_, writer_settings); } std::string MergedBlockOutputStream::getPartPath() const @@ -89,37 +91,19 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart( const NamesAndTypesList * total_column_list, MergeTreeData::DataPart::Checksums * additional_column_checksums) { + /// Finish write and get checksums. + MergeTreeData::DataPart::Checksums checksums; + /// Finish columns serialization. - { - /// FIXME - // const auto & settings = storage.global_context.getSettingsRef(); - // IDataType::SerializeBinaryBulkSettings serialize_settings; - // serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size; - // serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0; - // WrittenOffsetColumns offset_columns; - // auto it = columns_list.begin(); - // for (size_t i = 0; i < columns_list.size(); ++i, ++it) - // { - // if (!serialization_states.empty()) - // { - // serialize_settings.getter = createStreamGetter(it->name, offset_columns, false); - // it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]); - // } + bool write_final_mark = (with_final_mark && rows_count != 0); + writer->finalize(checksums, write_final_mark); - // if (with_final_mark && rows_count != 0) - // writeFinalMark(it->name, it->type, offset_columns, false, serialize_settings.path); - // } - } - - if (with_final_mark && rows_count != 0) + if (write_final_mark) index_granularity.appendMark(0); /// last mark if (!total_column_list) total_column_list = &columns_list; - /// Finish write and get checksums. - MergeTreeData::DataPart::Checksums checksums; - if (additional_column_checksums) checksums = std::move(*additional_column_checksums); @@ -142,17 +126,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart( index_stream = nullptr; } - /// FIXME - // for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it) - // { - // it->second->finalize(); - // it->second->addToChecksums(checksums); - // } - finishSkipIndicesSerialization(checksums); - // column_streams.clear(); - if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) { new_part->partition.store(storage, part_path, checksums); @@ -278,7 +253,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm } } - size_t new_index_offset = writer->write(block, current_mark, index_offset, index_granularity, primary_key_block, skip_indexes_block); + size_t new_index_offset = writer->write(block, permutation, current_mark, index_offset, index_granularity, primary_key_block, skip_indexes_block); rows_count += rows; /// Should be written before index offset update, because we calculate, @@ -299,9 +274,11 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm { if (storage.hasPrimaryKey()) { - for (size_t j = 0, size = primary_key_block.rows(); j < size; ++j) + for (size_t j = 0, size = primary_key_block.columns(); j < size; ++j) { const auto & primary_column = primary_key_block.getByPosition(j); + std::cerr << "(writeImpl) primary_column: " << !!primary_column.column << "\n"; + std::cerr << "(writeImpl) index_column: " << !!index_columns[j] << "\n"; index_columns[j]->insertFrom(*primary_column.column, i); primary_column.type->serializeBinary(*primary_column.column, i, *index_stream); } @@ -316,7 +293,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm } /// store last index row to write final mark at the end of column - for (size_t j = 0, size = primary_key_block.rows(); j < size; ++j) + for (size_t j = 0, size = primary_key_block.columns(); j < size; ++j) { const IColumn & primary_column = *primary_key_block.getByPosition(j).column.get(); auto mutable_column = std::move(*last_index_row[j].column).mutate();