From 68f8b9d235e7417537e4066fb864a71dd8149fd0 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 29 Sep 2021 20:45:01 +0300 Subject: [PATCH] Update ColumnGathererStream --- src/DataStreams/ColumnGathererStream.cpp | 164 ++++++++++++------ src/DataStreams/ColumnGathererStream.h | 98 ++++++----- .../MergeTree/IMergedBlockOutputStream.h | 4 +- src/Storages/MergeTree/MergeTask.cpp | 5 - .../MergeTree/MergedBlockOutputStream.cpp | 5 - .../MergeTree/MergedBlockOutputStream.h | 7 +- .../MergedColumnOnlyOutputStream.cpp | 5 - .../MergeTree/MergedColumnOnlyOutputStream.h | 5 +- 8 files changed, 168 insertions(+), 125 deletions(-) diff --git a/src/DataStreams/ColumnGathererStream.cpp b/src/DataStreams/ColumnGathererStream.cpp index 683b8012efe..90da7792c21 100644 --- a/src/DataStreams/ColumnGathererStream.cpp +++ b/src/DataStreams/ColumnGathererStream.cpp @@ -18,97 +18,151 @@ namespace ErrorCodes } ColumnGathererStream::ColumnGathererStream( - const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_, - size_t block_preferred_size_) - : column_name(column_name_), sources(source_streams.size()), row_sources_buf(row_sources_buf_) - , block_preferred_size(block_preferred_size_), log(&Poco::Logger::get("ColumnGathererStream")) + size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_) + : sources(num_inputs), row_sources_buf(row_sources_buf_) + , block_preferred_size(block_preferred_size_) { - if (source_streams.empty()) + if (num_inputs == 0) throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED); +} - children.assign(source_streams.begin(), source_streams.end()); - - for (size_t i = 0; i < children.size(); ++i) +void ColumnGathererStream::initialize(Inputs inputs) +{ + for (size_t i = 0; i < inputs.size(); ++i) { - const Block & header = children[i]->getHeader(); - - /// Sometimes MergeTreeReader injects additional column with partitioning key - if (header.columns() > 2) - throw 
Exception( - "Block should have 1 or 2 columns, but contains " + toString(header.columns()), - ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS); - - if (i == 0) + if (inputs[i].chunk) { - column.name = column_name; - column.type = header.getByName(column_name).type; - column.column = column.type->createColumn(); + sources[i].update(inputs[i].chunk.detachColumns().at(0)); + if (!result_column) + result_column = sources[i].column->cloneEmpty(); } - else if (header.getByName(column_name).column->getName() != column.column->getName()) - throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS); } } - -Block ColumnGathererStream::readImpl() +IMergingAlgorithm::Status ColumnGathererStream::merge() { + /// Nothing to read after initialize. + if (!result_column) + return Status(Chunk(), true); + + if (source_to_fully_copy) /// Was set on a previous iteration + { + Chunk res; + res.addColumn(source_to_fully_copy->column); + merged_rows += source_to_fully_copy->size; + source_to_fully_copy->pos = source_to_fully_copy->size; + source_to_fully_copy = nullptr; + return Status(std::move(res)); + } + /// Special case: single source and there are no skipped rows - if (children.size() == 1 && row_sources_buf.eof() && !source_to_fully_copy) - return children[0]->read(); + /// Note: looks like this should never happen because row_sources_buf cannot just skip row info. 
+ if (sources.size() == 1 && row_sources_buf.eof()) + { + if (sources.front().pos < sources.front().size) + { + next_required_source = 0; + Chunk res; + merged_rows += sources.front().column->size(); + merged_bytes += sources.front().column->allocatedBytes(); + res.addColumn(std::move(sources.front().column)); + sources.front().pos = sources.front().size = 0; + return Status(std::move(res)); + } - if (!source_to_fully_copy && row_sources_buf.eof()) - return Block(); + if (next_required_source == -1) + return Status(Chunk(), true); - MutableColumnPtr output_column = column.column->cloneEmpty(); - output_block = Block{column.cloneEmpty()}; - /// Surprisingly this call may directly change output_block, bypassing + next_required_source = 0; + return Status(next_required_source); + } + + if (next_required_source != -1 && sources[next_required_source].size == 0) + throw Exception("Cannot fetch required block. Source " + toString(next_required_source), ErrorCodes::RECEIVED_EMPTY_DATA); + + /// Surprisingly this call may directly change some internal state of ColumnGathererStream. /// output_column. See ColumnGathererStream::gather. 
- output_column->gather(*this); - if (!output_column->empty()) - output_block.getByPosition(0).column = std::move(output_column); + result_column->gather(*this); - return output_block; + if (next_required_source != -1) + return Status(next_required_source); + + if (source_to_fully_copy && result_column->empty()) + { + Chunk res; + merged_rows += source_to_fully_copy->column->size(); + merged_bytes += source_to_fully_copy->column->allocatedBytes(); + res.addColumn(source_to_fully_copy->column); + source_to_fully_copy->pos = source_to_fully_copy->size; + source_to_fully_copy = nullptr; + return Status(std::move(res)); + } + + auto col = result_column->cloneEmpty(); + result_column.swap(col); + + Chunk res; + merged_rows += col->size(); + merged_bytes += col->allocatedBytes(); + res.addColumn(std::move(col)); + return Status(std::move(res), row_sources_buf.eof()); } -void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num) +void ColumnGathererStream::consume(Input & input, size_t source_num) { - try - { - source.block = children[source_num]->read(); - source.update(column_name); - } - catch (Exception & e) - { - e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getName() + ", part " + toString(source_num)); - throw; - } + auto & source = sources[source_num]; + if (input.chunk) + source.update(input.chunk.getColumns().at(0)); if (0 == source.size) { - throw Exception("Fetched block is empty. Stream " + children[source_num]->getName() + ", part " + toString(source_num), + throw Exception("Fetched block is empty. 
Source " + toString(source_num), ErrorCodes::RECEIVED_EMPTY_DATA); } } - -void ColumnGathererStream::readSuffixImpl() +ColumnGathererTransform::ColumnGathererTransform( + const Block & header, + size_t num_inputs, + ReadBuffer & row_sources_buf_, + size_t block_preferred_size_) + : IMergingTransform( + num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, + num_inputs, row_sources_buf_, block_preferred_size_) + , log(&Poco::Logger::get("ColumnGathererStream")) { - const BlockStreamProfileInfo & profile_info = getProfileInfo(); + if (header.columns() != 1) + throw Exception( + "Header should have 1 column, but contains " + toString(header.columns()), + ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS); +} +void ColumnGathererTransform::work() +{ + Stopwatch stopwatch; + IMergingTransform::work(); + elapsed_ns += stopwatch.elapsedNanoseconds(); +} + +void ColumnGathererTransform::onFinish() +{ + auto merged_rows = algorithm.getMergedRows(); + auto merged_bytes = algorithm.getMergedBytes(); /// Don't print info for small parts (< 10M rows) - if (profile_info.rows < 10000000) + if (merged_rows < 10000000) return; - double seconds = profile_info.total_stopwatch.elapsedSeconds(); + double seconds = static_cast(elapsed_ns) / 1000000000ULL; + const auto & column_name = getOutputPort().getHeader().getByPosition(0).name; if (!seconds) LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in 0 sec.", - column_name, static_cast(profile_info.bytes) / profile_info.rows); + column_name, static_cast(merged_bytes) / merged_rows); else LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) 
in {} sec., {} rows/sec., {}/sec.", - column_name, static_cast(profile_info.bytes) / profile_info.rows, seconds, - profile_info.rows / seconds, ReadableSize(profile_info.bytes / seconds)); + column_name, static_cast(merged_bytes) / merged_rows, seconds, + merged_rows / seconds, ReadableSize(merged_bytes / seconds)); } } diff --git a/src/DataStreams/ColumnGathererStream.h b/src/DataStreams/ColumnGathererStream.h index 05665ab3f42..43cbf7094d8 100644 --- a/src/DataStreams/ColumnGathererStream.h +++ b/src/DataStreams/ColumnGathererStream.h @@ -1,8 +1,9 @@ #pragma once -#include #include #include +#include +#include namespace Poco { class Logger; } @@ -53,77 +54,91 @@ using MergedRowSources = PODArray; * Stream mask maps row number to index of source stream. * Streams should contain exactly one column. */ -class ColumnGathererStream : public IBlockInputStream +class ColumnGathererStream final : public IMergingAlgorithm { public: - ColumnGathererStream( - const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_, - size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE); + ColumnGathererStream(size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE); - String getName() const override { return "ColumnGatherer"; } - - Block readImpl() override; - - void readSuffixImpl() override; - - Block getHeader() const override { return children.at(0)->getHeader(); } + void initialize(Inputs inputs) override; + void consume(Input & input, size_t source_num) override; + Status merge() override; /// for use in implementations of IColumn::gather() template void gather(Column & column_res); + UInt64 getMergedRows() const { return merged_rows; } + UInt64 getMergedBytes() const { return merged_bytes; } + private: /// Cache required fields struct Source { - const IColumn * column = nullptr; + ColumnPtr column; size_t pos = 0; size_t size = 0; - Block block; - void update(const String & name) + void 
update(ColumnPtr column_) { - column = block.getByName(name).column.get(); - size = block.rows(); + column = std::move(column_); + size = column->size(); pos = 0; } }; - void fetchNewBlock(Source & source, size_t source_num); - - String column_name; - ColumnWithTypeAndName column; + MutableColumnPtr result_column; std::vector sources; ReadBuffer & row_sources_buf; - size_t block_preferred_size; + const size_t block_preferred_size; Source * source_to_fully_copy = nullptr; - Block output_block; + + ssize_t next_required_source = -1; + size_t cur_block_preferred_size = 0; + + UInt64 merged_rows = 0; + UInt64 merged_bytes = 0; +}; + +class ColumnGathererTransform final : public IMergingTransform +{ +public: + ColumnGathererTransform( + const Block & header, + size_t num_inputs, + ReadBuffer & row_sources_buf_, + size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE); + + String getName() const override { return "ColumnGathererTransform"; } + + void work() override; + +protected: + void onFinish() override; + UInt64 elapsed_ns = 0; Poco::Logger * log; }; + template void ColumnGathererStream::gather(Column & column_res) { - if (source_to_fully_copy) /// Was set on a previous iteration - { - output_block.getByPosition(0).column = source_to_fully_copy->block.getByName(column_name).column; - source_to_fully_copy->pos = source_to_fully_copy->size; - source_to_fully_copy = nullptr; - return; - } - row_sources_buf.nextIfAtEnd(); RowSourcePart * row_source_pos = reinterpret_cast(row_sources_buf.position()); RowSourcePart * row_sources_end = reinterpret_cast(row_sources_buf.buffer().end()); - size_t cur_block_preferred_size = std::min(static_cast(row_sources_end - row_source_pos), block_preferred_size); - column_res.reserve(cur_block_preferred_size); + if (next_required_source == -1) + { + /// Start new column. 
+ cur_block_preferred_size = std::min(static_cast(row_sources_end - row_source_pos), block_preferred_size); + column_res.reserve(cur_block_preferred_size); + } - size_t cur_size = 0; + size_t cur_size = column_res.size(); + next_required_source = -1; while (row_source_pos < row_sources_end && cur_size < cur_block_preferred_size) { @@ -131,13 +146,15 @@ void ColumnGathererStream::gather(Column & column_res) size_t source_num = row_source.getSourceNum(); Source & source = sources[source_num]; bool source_skip = row_source.getSkipFlag(); - ++row_source_pos; if (source.pos >= source.size) /// Fetch new block from source_num part { - fetchNewBlock(source, source_num); + next_required_source = source_num; + return; } + ++row_source_pos; + /// Consecutive optimization. TODO: precompute lengths size_t len = 1; size_t max_len = std::min(static_cast(row_sources_end - row_source_pos), source.size - source.pos); // interval should be in the same block @@ -156,14 +173,7 @@ void ColumnGathererStream::gather(Column & column_res) { /// If current block already contains data, return it. /// Whole column from current source will be returned on next read() iteration. 
- if (cur_size > 0) - { - source_to_fully_copy = &source; - return; - } - - output_block.getByPosition(0).column = source.block.getByName(column_name).column; - source.pos += len; + source_to_fully_copy = &source; return; } else if (len == 1) diff --git a/src/Storages/MergeTree/IMergedBlockOutputStream.h b/src/Storages/MergeTree/IMergedBlockOutputStream.h index 0e689b7c84c..133f0804838 100644 --- a/src/Storages/MergeTree/IMergedBlockOutputStream.h +++ b/src/Storages/MergeTree/IMergedBlockOutputStream.h @@ -2,14 +2,13 @@ #include #include -#include #include #include namespace DB { -class IMergedBlockOutputStream : public IBlockOutputStream +class IMergedBlockOutputStream { public: IMergedBlockOutputStream( @@ -35,7 +34,6 @@ protected: NamesAndTypesList & columns, MergeTreeData::DataPart::Checksums & checksums); -protected: const MergeTreeData & storage; StorageMetadataPtr metadata_snapshot; diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index e6b37d0657c..54230f56a6c 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -238,9 +238,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() global_ctx->merged_stream->readPrefix(); - /// TODO: const - const_cast(*global_ctx->to).writePrefix(); - global_ctx->rows_written = 0; ctx->initial_reservation = global_ctx->space_reservation ? 
global_ctx->space_reservation->getSize() : 0; @@ -421,8 +418,6 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const global_ctx->to->getIndexGranularity()); ctx->column_elems_written = 0; - - ctx->column_to->writePrefix(); } diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 31675789257..495ce5ee933 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -51,11 +51,6 @@ void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IC writeImpl(block, permutation); } -void MergedBlockOutputStream::writeSuffix() -{ - throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED); -} - void MergedBlockOutputStream::writeSuffixAndFinalizePart( MergeTreeData::MutableDataPartPtr & new_part, bool sync, diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.h b/src/Storages/MergeTree/MergedBlockOutputStream.h index 95cc91a8ebc..3586ac17298 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -21,18 +21,16 @@ public: CompressionCodecPtr default_codec_, bool blocks_are_granules_size = false); - Block getHeader() const override { return metadata_snapshot->getSampleBlock(); } + Block getHeader() const { return metadata_snapshot->getSampleBlock(); } /// If the data is pre-sorted. - void write(const Block & block) override; + void write(const Block & block); /** If the data is not sorted, but we have previously calculated the permutation, that will sort it. * This method is used to save RAM, since you do not need to keep two blocks at once - the original one and the sorted one. 
*/ void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation); - void writeSuffix() override; - /// Finalize writing part and fill inner structures /// If part is new and contains projections, they should be added before invoking this method. void writeSuffixAndFinalizePart( @@ -53,7 +51,6 @@ private: MergeTreeData::DataPart::Checksums & checksums, bool sync); -private: NamesAndTypesList columns_list; IMergeTreeDataPart::MinMaxIndex minmax_idx; size_t rows_count = 0; diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp index 3638212b320..4b760103750 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp @@ -53,11 +53,6 @@ void MergedColumnOnlyOutputStream::write(const Block & block) writer->write(block, nullptr); } -void MergedColumnOnlyOutputStream::writeSuffix() -{ - throw Exception("Method writeSuffix is not supported by MergedColumnOnlyOutputStream", ErrorCodes::NOT_IMPLEMENTED); -} - MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums( MergeTreeData::MutableDataPartPtr & new_part, diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h index c82357dfb1d..7a146a91331 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h @@ -23,9 +23,8 @@ public: const MergeTreeIndexGranularity & index_granularity = {}, const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr); - Block getHeader() const override { return header; } - void write(const Block & block) override; - void writeSuffix() override; + Block getHeader() const { return header; } + void write(const Block & block); MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums(MergeTreeData::MutableDataPartPtr & new_part, 
MergeTreeData::DataPart::Checksums & all_checksums, bool sync = false);