From d0c6f11fcb203b50e32fbab6b9488c6ffa87fcde Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 6 Oct 2021 20:59:27 +0300 Subject: [PATCH] More. --- .../DistinctSortedBlockInputStream.cpp | 78 +++++++++--------- .../DistinctSortedBlockInputStream.h | 22 +++-- src/DataStreams/TTLBlockInputStream.cpp | 40 ++++++--- src/DataStreams/TTLBlockInputStream.h | 16 ++-- src/DataStreams/TTLCalcInputStream.cpp | 34 +++++--- src/DataStreams/TTLCalcInputStream.h | 15 ++-- src/Interpreters/MutationsInterpreter.cpp | 7 +- src/Interpreters/MutationsInterpreter.h | 2 +- src/Storages/MergeTree/MergeTask.cpp | 37 +++++---- src/Storages/MergeTree/MergeTask.h | 3 +- src/Storages/MergeTree/MutateTask.cpp | 82 ++++++++++++------- src/Storages/StorageJoin.cpp | 10 +-- src/Storages/StorageMemory.cpp | 9 +- 13 files changed, 197 insertions(+), 158 deletions(-) diff --git a/src/DataStreams/DistinctSortedBlockInputStream.cpp b/src/DataStreams/DistinctSortedBlockInputStream.cpp index eab706924c1..a3105d6330c 100644 --- a/src/DataStreams/DistinctSortedBlockInputStream.cpp +++ b/src/DataStreams/DistinctSortedBlockInputStream.cpp @@ -8,40 +8,28 @@ namespace ErrorCodes extern const int SET_SIZE_LIMIT_EXCEEDED; } -DistinctSortedBlockInputStream::DistinctSortedBlockInputStream( - const BlockInputStreamPtr & input, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns) - : description(std::move(sort_description)) +DistinctSortedTransform::DistinctSortedTransform( + const Block & header, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns) + : ISimpleTransform(header, header, true) + , description(std::move(sort_description)) , columns_names(columns) , limit_hint(limit_hint_) , set_size_limits(set_size_limits_) { - children.push_back(input); } -Block DistinctSortedBlockInputStream::readImpl() +void DistinctSortedTransform::transform(Chunk & chunk) { - /// Execute until end of stream or until - /// a block with some new records will be gotten. - for (;;) - { - /// Stop reading if we already reached the limit. - if (limit_hint && data.getTotalRowCount() >= limit_hint) - return Block(); - - Block block = children.back()->read(); - if (!block) - return Block(); - - const ColumnRawPtrs column_ptrs(getKeyColumns(block)); + const ColumnRawPtrs column_ptrs(getKeyColumns(chunk)); if (column_ptrs.empty()) - return block; + return; - const ColumnRawPtrs clearing_hint_columns(getClearingColumns(block, column_ptrs)); + const ColumnRawPtrs clearing_hint_columns(getClearingColumns(chunk, column_ptrs)); if (data.type == ClearableSetVariants::Type::EMPTY) data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes)); - const size_t rows = block.rows(); + const size_t rows = chunk.getNumRows(); IColumn::Filter filter(rows); bool has_new_data = false; @@ -59,25 +47,33 @@ Block DistinctSortedBlockInputStream::readImpl() /// Just go to the next block if there isn't any new record in the current one. if (!has_new_data) - continue; + return; if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED)) - return {}; + { + stopReading(); + chunk.clear(); + return; + } - prev_block.block = block; - prev_block.clearing_hint_columns = std::move(clearing_hint_columns); + /// Stop reading if we already reached the limit. 
+ if (limit_hint && data.getTotalRowCount() >= limit_hint) + stopReading(); - size_t all_columns = block.columns(); + prev_chunk.chunk = std::move(chunk); + prev_chunk.clearing_hint_columns = std::move(clearing_hint_columns); + + size_t all_columns = prev_chunk.chunk.getNumColumns(); + Chunk res_chunk; for (size_t i = 0; i < all_columns; ++i) - block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, -1); + res_chunk.addColumn(prev_chunk.chunk.getColumns().at(i)->filter(filter, -1)); - return block; - } + chunk = std::move(res_chunk); } template -bool DistinctSortedBlockInputStream::buildFilter( +bool DistinctSortedTransform::buildFilter( Method & method, const ColumnRawPtrs & columns, const ColumnRawPtrs & clearing_hint_columns, @@ -90,8 +86,8 @@ bool DistinctSortedBlockInputStream::buildFilter( /// Compare last row of previous block and first row of current block, /// If rows not equal, we can clear HashSet, /// If clearing_hint_columns is empty, we CAN'T clear HashSet. - if (!clearing_hint_columns.empty() && !prev_block.clearing_hint_columns.empty() - && !rowsEqual(clearing_hint_columns, 0, prev_block.clearing_hint_columns, prev_block.block.rows() - 1)) + if (!clearing_hint_columns.empty() && !prev_chunk.clearing_hint_columns.empty() + && !rowsEqual(clearing_hint_columns, 0, prev_chunk.clearing_hint_columns, prev_chunk.chunk.getNumRows() - 1)) { method.data.clear(); } @@ -117,18 +113,20 @@ bool DistinctSortedBlockInputStream::buildFilter( return has_new_data; } -ColumnRawPtrs DistinctSortedBlockInputStream::getKeyColumns(const Block & block) const +ColumnRawPtrs DistinctSortedTransform::getKeyColumns(const Chunk & chunk) const { - size_t columns = columns_names.empty() ? block.columns() : columns_names.size(); + size_t columns = columns_names.empty() ? chunk.getNumColumns() : columns_names.size(); ColumnRawPtrs column_ptrs; column_ptrs.reserve(columns); for (size_t i = 0; i < columns; ++i) { - const auto & column = columns_names.empty() - ? block.safeGetByPosition(i).column - : block.getByName(columns_names[i]).column; + auto pos = i; + if (!columns_names.empty()) + pos = input.getHeader().getPositionByName(columns_names[i]); + + const auto & column = chunk.getColumns()[pos]; /// Ignore all constant columns. 
if (!isColumnConst(*column)) @@ -138,13 +136,13 @@ ColumnRawPtrs DistinctSortedBlockInputStream::getKeyColumns(const Block & block) return column_ptrs; } -ColumnRawPtrs DistinctSortedBlockInputStream::getClearingColumns(const Block & block, const ColumnRawPtrs & key_columns) const +ColumnRawPtrs DistinctSortedTransform::getClearingColumns(const Chunk & chunk, const ColumnRawPtrs & key_columns) const { ColumnRawPtrs clearing_hint_columns; clearing_hint_columns.reserve(description.size()); for (const auto & sort_column_description : description) { - const auto * sort_column_ptr = block.safeGetByPosition(sort_column_description.column_number).column.get(); + const auto * sort_column_ptr = chunk.getColumns().at(sort_column_description.column_number).get(); const auto it = std::find(key_columns.cbegin(), key_columns.cend(), sort_column_ptr); if (it != key_columns.cend()) /// if found in key_columns clearing_hint_columns.emplace_back(sort_column_ptr); @@ -154,7 +152,7 @@ ColumnRawPtrs DistinctSortedBlockInputStream::getClearingColumns(const Block & b return clearing_hint_columns; } -bool DistinctSortedBlockInputStream::rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m) +bool DistinctSortedTransform::rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m) { for (size_t column_index = 0, num_columns = lhs.size(); column_index < num_columns; ++column_index) { diff --git a/src/DataStreams/DistinctSortedBlockInputStream.h b/src/DataStreams/DistinctSortedBlockInputStream.h index 146c9326e5d..ddac6c18a64 100644 --- a/src/DataStreams/DistinctSortedBlockInputStream.h +++ b/src/DataStreams/DistinctSortedBlockInputStream.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -18,24 +18,22 @@ namespace DB * set limit_hint to non zero value. So we stop emitting new rows after * count of already emitted rows will reach the limit_hint. */ -class DistinctSortedBlockInputStream : public IBlockInputStream +class DistinctSortedTransform : public ISimpleTransform { public: /// Empty columns_ means all columns. 
- DistinctSortedBlockInputStream(const BlockInputStreamPtr & input, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns); + DistinctSortedTransform(const Block & header, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns); - String getName() const override { return "DistinctSorted"; } - - Block getHeader() const override { return children.at(0)->getHeader(); } + String getName() const override { return "DistinctSortedTransform"; } protected: - Block readImpl() override; + void transform(Chunk & chunk) override; private: - ColumnRawPtrs getKeyColumns(const Block & block) const; + ColumnRawPtrs getKeyColumns(const Chunk & chunk) const; /// When clearing_columns changed, we can clean HashSet to memory optimization /// clearing_columns is a left-prefix of SortDescription exists in key_columns - ColumnRawPtrs getClearingColumns(const Block & block, const ColumnRawPtrs & key_columns) const; + ColumnRawPtrs getClearingColumns(const Chunk & chunk, const ColumnRawPtrs & key_columns) const; static bool rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m); /// return true if has new data @@ -50,12 +48,12 @@ private: SortDescription description; - struct PreviousBlock + struct PreviousChunk { - Block block; + Chunk chunk; ColumnRawPtrs clearing_hint_columns; }; - PreviousBlock prev_block; + PreviousChunk prev_chunk; Names columns_names; ClearableSetVariants data; diff --git a/src/DataStreams/TTLBlockInputStream.cpp b/src/DataStreams/TTLBlockInputStream.cpp index 05d4ba0a395..1a1484fc08e 100644 --- a/src/DataStreams/TTLBlockInputStream.cpp +++ b/src/DataStreams/TTLBlockInputStream.cpp @@ -16,18 +16,17 @@ namespace DB { -TTLBlockInputStream::TTLBlockInputStream( - const BlockInputStreamPtr & input_, +TTLTransform::TTLTransform( + const Block & header_, const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, const MergeTreeData::MutableDataPartPtr & data_part_, time_t current_time_, bool force_) - : data_part(data_part_) - , log(&Poco::Logger::get(storage_.getLogName() + " (TTLBlockInputStream)")) + : ISimpleTransform(header_, header_, false) + , data_part(data_part_) + , log(&Poco::Logger::get(storage_.getLogName() + " (TTLTransform)")) { - children.push_back(input_); - header = children.at(0)->getHeader(); auto old_ttl_infos = data_part->ttl_infos; if (metadata_snapshot_->hasRowsTTL()) @@ -50,7 +49,7 @@ TTLBlockInputStream::TTLBlockInputStream( for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs()) algorithms.emplace_back(std::make_unique( - group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, header, storage_)); + group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, getInputPort().getHeader(), storage_)); if (metadata_snapshot_->hasAnyColumnTTL()) { @@ -98,22 +97,28 @@ Block reorderColumns(Block block, const Block & header) return res; } -Block TTLBlockInputStream::readImpl() +void TTLTransform::transform(Chunk & chunk) { if (all_data_dropped) - return {}; + { + stopReading(); + chunk.clear(); + return; + } - auto block = children.at(0)->read(); + auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); for (const auto & algorithm : algorithms) algorithm->execute(block); if (!block) - return block; + return; - return reorderColumns(std::move(block), header); + size_t num_rows = block.rows(); + + chunk = 
Chunk(reorderColumns(std::move(block), getOutputPort().getHeader()).getColumns(), num_rows); } -void TTLBlockInputStream::readSuffixImpl() +void TTLTransform::finalize() { data_part->ttl_infos = {}; for (const auto & algorithm : algorithms) @@ -126,4 +131,13 @@ void TTLBlockInputStream::readSuffixImpl() } } +IProcessor::Status TTLTransform::prepare() +{ + auto status = ISimpleTransform::prepare(); + if (status == Status::Finished) + finalize(); + + return status; +} + } diff --git a/src/DataStreams/TTLBlockInputStream.h b/src/DataStreams/TTLBlockInputStream.h index bf854d9cc9c..986181df652 100644 --- a/src/DataStreams/TTLBlockInputStream.h +++ b/src/DataStreams/TTLBlockInputStream.h @@ -1,5 +1,5 @@ #pragma once -#include +#include #include #include #include @@ -12,11 +12,11 @@ namespace DB { -class TTLBlockInputStream : public IBlockInputStream +class TTLTransform : public ISimpleTransform { public: - TTLBlockInputStream( - const BlockInputStreamPtr & input_, + TTLTransform( + const Block & header_, const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, const MergeTreeData::MutableDataPartPtr & data_part_, @@ -25,13 +25,14 @@ public: ); String getName() const override { return "TTL"; } - Block getHeader() const override { return header; } + + Status prepare() override; protected: - Block readImpl() override; + void transform(Chunk & chunk) override; /// Finalizes ttl infos and updates data part - void readSuffixImpl() override; + void finalize(); private: std::vector algorithms; @@ -41,7 +42,6 @@ private: /// ttl_infos and empty_columns are updating while reading const MergeTreeData::MutableDataPartPtr & data_part; Poco::Logger * log; - Block header; }; } diff --git a/src/DataStreams/TTLCalcInputStream.cpp b/src/DataStreams/TTLCalcInputStream.cpp index 2353e9ec259..c156b31428a 100644 --- a/src/DataStreams/TTLCalcInputStream.cpp +++ b/src/DataStreams/TTLCalcInputStream.cpp @@ -4,18 +4,17 @@ namespace DB { -TTLCalcInputStream::TTLCalcInputStream( - const BlockInputStreamPtr & input_, +TTLCalcTransform::TTLCalcTransform( + const Block & header_, const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, const MergeTreeData::MutableDataPartPtr & data_part_, time_t current_time_, bool force_) - : data_part(data_part_) + : ISimpleTransform(header_, header_, true) + , data_part(data_part_) , log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcInputStream)")) { - children.push_back(input_); - header = children.at(0)->getHeader(); auto old_ttl_infos = data_part->ttl_infos; if (metadata_snapshot_->hasRowsTTL()) @@ -51,27 +50,36 @@ TTLCalcInputStream::TTLCalcInputStream( recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_)); } -Block TTLCalcInputStream::readImpl() +void TTLCalcTransform::transform(Chunk & chunk) { - auto block = children.at(0)->read(); + auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); for (const auto & algorithm : algorithms) algorithm->execute(block); if (!block) - return block; + return; - Block res; - for (const auto & col : header) - res.insert(block.getByName(col.name)); + Chunk res; + for (const auto & col : getOutputPort().getHeader()) + res.addColumn(block.getByName(col.name).column); - return res; + chunk = std::move(res); } -void TTLCalcInputStream::readSuffixImpl() +void TTLCalcTransform::finalize() { data_part->ttl_infos = {}; for (const auto & algorithm : algorithms) 
algorithm->finalize(data_part); } +IProcessor::Status TTLCalcTransform::prepare() +{ + auto status = ISimpleTransform::prepare(); + if (status == Status::Finished) + finalize(); + + return status; +} + } diff --git a/src/DataStreams/TTLCalcInputStream.h b/src/DataStreams/TTLCalcInputStream.h index 20148eadfc2..d0e7b0055f2 100644 --- a/src/DataStreams/TTLCalcInputStream.h +++ b/src/DataStreams/TTLCalcInputStream.h @@ -1,5 +1,5 @@ #pragma once -#include +#include #include #include #include @@ -11,11 +11,11 @@ namespace DB { -class TTLCalcInputStream : public IBlockInputStream +class TTLCalcTransform : public ISimpleTransform { public: - TTLCalcInputStream( - const BlockInputStreamPtr & input_, + TTLCalcTransform( + const Block & header_, const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, const MergeTreeData::MutableDataPartPtr & data_part_, @@ -24,13 +24,13 @@ public: ); String getName() const override { return "TTL_CALC"; } - Block getHeader() const override { return header; } + Status prepare() override; protected: - Block readImpl() override; + void transform(Chunk & chunk) override; /// Finalizes ttl infos and updates data part - void readSuffixImpl() override; + void finalize(); private: std::vector algorithms; @@ -38,7 +38,6 @@ private: /// ttl_infos and empty_columns are updating while reading const MergeTreeData::MutableDataPartPtr & data_part; Poco::Logger * log; - Block header; }; } diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 540d5c76c97..e5a129cbe12 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -932,7 +932,7 @@ void MutationsInterpreter::validate() auto pipeline = addStreamsForLaterStages(stages, plan); } -BlockInputStreamPtr MutationsInterpreter::execute() +QueryPipeline MutationsInterpreter::execute() { if (!can_execute) throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR); @@ -956,12 +956,11 @@ BlockInputStreamPtr MutationsInterpreter::execute() } auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); - BlockInputStreamPtr result_stream = std::make_shared(std::move(pipeline)); if (!updated_header) - updated_header = std::make_unique(result_stream->getHeader()); + updated_header = std::make_unique(pipeline.getHeader()); - return result_stream; + return pipeline; } Block MutationsInterpreter::getUpdatedHeader() const diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index b0540f7d2ed..7b0ccb3bae5 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -50,7 +50,7 @@ public: size_t evaluateCommandsSize(); /// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices. - BlockInputStreamPtr execute(); + QueryPipeline execute(); /// Only changed columns. 
Block getUpdatedHeader() const; diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index e5fcaae3417..0810d45a805 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -11,6 +11,7 @@ #include "Storages/MergeTree/MergeTreeSequentialSource.h" #include "Storages/MergeTree/FutureMergedMutatedPart.h" #include "Processors/Transforms/ExpressionTransform.h" +#include "Processors/Transforms/MaterializingTransform.h" #include "Processors/Merges/MergingSortedTransform.h" #include "Processors/Merges/CollapsingSortedTransform.h" #include "Processors/Merges/SummingSortedTransform.h" @@ -236,8 +237,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() ctx->compression_codec, ctx->blocks_are_granules_size); - global_ctx->merged_stream->readPrefix(); - global_ctx->rows_written = 0; ctx->initial_reservation = global_ctx->space_reservation ? global_ctx->space_reservation->getSize() : 0; @@ -298,14 +297,17 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute() bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() { Block block; - if (!ctx->is_cancelled() && (block = global_ctx->merged_stream->read())) + if (!ctx->is_cancelled() && (global_ctx->merging_executor->pull(block))) { global_ctx->rows_written += block.rows(); const_cast(*global_ctx->to).write(block); - global_ctx->merge_list_element_ptr->rows_written = global_ctx->merged_stream->getProfileInfo().rows; - global_ctx->merge_list_element_ptr->bytes_written_uncompressed = global_ctx->merged_stream->getProfileInfo().bytes; + UInt64 result_rows = 0; + UInt64 result_bytes = 0; + global_ctx->merged_pipeline.tryGetResultRowsAndBytes(result_rows, result_bytes); + global_ctx->merge_list_element_ptr->rows_written = result_rows; + global_ctx->merge_list_element_ptr->bytes_written_uncompressed = result_bytes; /// Reservation updates is not performed yet, during the merge it may lead to higher free space requirements if (global_ctx->space_reservation && ctx->sum_input_rows_upper_bound) @@ -323,8 +325,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() return true; } - global_ctx->merged_stream->readSuffix(); - global_ctx->merged_stream.reset(); + global_ctx->merging_executor.reset(); + global_ctx->merged_pipeline.reset(); if (global_ctx->merges_blocker->isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); @@ -799,26 +801,25 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() auto res_pipe = Pipe::unitePipes(std::move(pipes)); res_pipe.addTransform(std::move(merged_transform)); - QueryPipeline pipeline(std::move(res_pipe)); - pipeline.setNumThreads(1); - - global_ctx->merged_stream = std::make_shared(std::move(pipeline)); if (global_ctx->deduplicate) - global_ctx->merged_stream = std::make_shared( - global_ctx->merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns); + res_pipe.addTransform(std::make_shared( + res_pipe.getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns)); if (ctx->need_remove_expired_values) - global_ctx->merged_stream = std::make_shared( - global_ctx->merged_stream, *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl); + res_pipe.addTransform(std::make_shared( + res_pipe.getHeader(), *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl)); if 
(global_ctx->metadata_snapshot->hasSecondaryIndices()) { const auto & indices = global_ctx->metadata_snapshot->getSecondaryIndices(); - global_ctx->merged_stream = std::make_shared( - global_ctx->merged_stream, indices.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())); - global_ctx->merged_stream = std::make_shared(global_ctx->merged_stream); + res_pipe.addTransform(std::make_shared( + res_pipe.getHeader(), indices.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext()))); + res_pipe.addTransform(std::make_shared(res_pipe.getHeader())); } + + global_ctx->merged_pipeline = QueryPipeline(std::move(res_pipe)); + global_ctx->merging_executor = std::make_unique(global_ctx->merged_pipeline); } diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index ce628d831ae..aceca912cea 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -148,7 +148,8 @@ private: std::unique_ptr column_progress{nullptr}; std::shared_ptr to{nullptr}; - BlockInputStreamPtr merged_stream{nullptr}; + QueryPipeline merged_pipeline; + std::unique_ptr merging_executor; SyncGuardPtr sync_guard{nullptr}; MergeTreeData::MutableDataPartPtr new_data_part{nullptr}; diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 746f0c879d3..e38342e21dd 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -11,6 +11,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -182,7 +185,7 @@ static std::vector getProjectionsForNewDataPart( /// Return set of indices which should be recalculated during mutation also /// wraps input stream into additional expression stream static std::set getIndicesToRecalculate( - BlockInputStreamPtr & input_stream, + QueryPipeline & pipeline, const NameSet & updated_columns, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, @@ -234,9 +237,9 @@ static std::set getIndicesToRecalculate( } } - if (!indices_to_recalc.empty() && input_stream) + if (!indices_to_recalc.empty() && pipeline.initialized()) { - auto indices_recalc_syntax = TreeRewriter(context).analyze(indices_recalc_expr_list, input_stream->getHeader().getNamesAndTypesList()); + auto indices_recalc_syntax = TreeRewriter(context).analyze(indices_recalc_expr_list, pipeline.getHeader().getNamesAndTypesList()); auto indices_recalc_expr = ExpressionAnalyzer( indices_recalc_expr_list, indices_recalc_syntax, context).getActions(false); @@ -246,8 +249,11 @@ static std::set getIndicesToRecalculate( /// MutationsInterpreter which knows about skip indices and stream 'in' already has /// all required columns. /// TODO move this logic to single place. 
-        input_stream = std::make_shared<MaterializingBlockInputStream>(
-            std::make_shared<ExpressionBlockInputStream>(input_stream, indices_recalc_expr));
+        QueryPipelineBuilder builder;
+        builder.init(std::move(pipeline));
+        builder.addTransform(std::make_shared<ExpressionTransform>(builder.getHeader(), indices_recalc_expr));
+        builder.addTransform(std::make_shared<MaterializingTransform>(builder.getHeader()));
+        pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
     }
     return indices_to_recalc;
 }
@@ -500,7 +506,8 @@ struct MutationContext

     std::unique_ptr<CurrentMetrics::Increment> num_mutations;

-    BlockInputStreamPtr mutating_stream{nullptr}; // in
+    QueryPipeline mutating_pipeline; // in
+    std::unique_ptr<PullingPipelineExecutor> mutating_executor;
     Block updated_header;
     std::unique_ptr<MutationsInterpreter> interpreter;

@@ -795,24 +802,25 @@ void PartMergerWriter::prepare()

 bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
 {
-    if (MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry) && (block = ctx->mutating_stream->read()))
+    Block cur_block;
+    if (MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry) && ctx->mutating_executor->pull(cur_block))
     {
         if (ctx->minmax_idx)
-            ctx->minmax_idx->update(block, ctx->data->getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey()));
+            ctx->minmax_idx->update(cur_block, ctx->data->getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey()));

-        ctx->out->write(block);
+        ctx->out->write(cur_block);

         for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
         {
             const auto & projection = *ctx->projections_to_build[i];
-            auto projection_block = projection_squashes[i].add(projection.calculate(block, ctx->context));
+            auto projection_block = projection_squashes[i].add(projection.calculate(cur_block, ctx->context));
             if (projection_block)
                 projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart(
                     *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num));
         }

-        (*ctx->mutate_entry)->rows_written += block.rows();
-        (*ctx->mutate_entry)->bytes_written_uncompressed += block.bytes();
+        (*ctx->mutate_entry)->rows_written += cur_block.rows();
+        (*ctx->mutate_entry)->bytes_written_uncompressed += cur_block.bytes();

         /// Need execute again
         return true;
@@ -937,18 +945,25 @@ private:
         auto skip_part_indices = MutationHelpers::getIndicesForNewDataPart(ctx->metadata_snapshot->getSecondaryIndices(), ctx->for_file_renames);
         ctx->projections_to_build = MutationHelpers::getProjectionsForNewDataPart(ctx->metadata_snapshot->getProjections(), ctx->for_file_renames);

-        if (ctx->mutating_stream == nullptr)
+        if (!ctx->mutating_pipeline.initialized())
             throw Exception("Cannot mutate part columns with uninitialized mutations stream. 
It's a bug", ErrorCodes::LOGICAL_ERROR); + QueryPipelineBuilder builder; + builder.init(std::move(ctx->mutating_pipeline)); + if (ctx->metadata_snapshot->hasPrimaryKey() || ctx->metadata_snapshot->hasSecondaryIndices()) - ctx->mutating_stream = std::make_shared( - std::make_shared(ctx->mutating_stream, ctx->data->getPrimaryKeyAndSkipIndicesExpression(ctx->metadata_snapshot))); + { + builder.addTransform( + std::make_shared(builder.getHeader(), ctx->data->getPrimaryKeyAndSkipIndicesExpression(ctx->metadata_snapshot))); + + builder.addTransform(std::make_shared(builder.getHeader())); + } if (ctx->execute_ttl_type == ExecuteTTLType::NORMAL) - ctx->mutating_stream = std::make_shared(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true); + builder.addTransform(std::make_shared(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true)); if (ctx->execute_ttl_type == ExecuteTTLType::RECALCULATE) - ctx->mutating_stream = std::make_shared(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true); + builder.addTransform(std::make_shared(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true)); ctx->minmax_idx = std::make_shared(); @@ -959,7 +974,8 @@ private: skip_part_indices, ctx->compression_codec); - ctx->mutating_stream->readPrefix(); + ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); + ctx->mutating_executor = std::make_unique(ctx->mutating_pipeline); part_merger_writer_task = std::make_unique(ctx); } @@ -968,7 +984,8 @@ private: void finalize() { ctx->new_data_part->minmax_idx = std::move(ctx->minmax_idx); - ctx->mutating_stream->readSuffix(); + ctx->mutating_executor.reset(); + ctx->mutating_pipeline.reset(); static_pointer_cast(ctx->out)->writeSuffixAndFinalizePart(ctx->new_data_part, ctx->need_sync); } @@ -1087,16 +1104,16 @@ private: ctx->compression_codec = ctx->source_part->default_codec; - if (ctx->mutating_stream) + if (ctx->mutating_pipeline.initialized()) { - if (ctx->mutating_stream == nullptr) - throw Exception("Cannot mutate part columns with uninitialized mutations stream. 
It's a bug", ErrorCodes::LOGICAL_ERROR); + QueryPipelineBuilder builder; + builder.init(std::move(ctx->mutating_pipeline)); if (ctx->execute_ttl_type == ExecuteTTLType::NORMAL) - ctx->mutating_stream = std::make_shared(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true); + builder.addTransform(std::make_shared(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true)); if (ctx->execute_ttl_type == ExecuteTTLType::RECALCULATE) - ctx->mutating_stream = std::make_shared(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true); + builder.addTransform(std::make_shared(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true)); ctx->out = std::make_shared( ctx->new_data_part, @@ -1109,7 +1126,9 @@ private: &ctx->source_part->index_granularity_info ); - ctx->mutating_stream->readPrefix(); + ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); + ctx->mutating_executor = std::make_unique(ctx->mutating_pipeline); + ctx->projections_to_build = std::vector{ctx->projections_to_recalc.begin(), ctx->projections_to_recalc.end()}; part_merger_writer_task = std::make_unique(ctx); @@ -1119,9 +1138,10 @@ private: void finalize() { - if (ctx->mutating_stream) + if (ctx->mutating_executor) { - ctx->mutating_stream->readSuffix(); + ctx->mutating_executor.reset(); + ctx->mutating_pipeline.reset(); auto changed_checksums = static_pointer_cast(ctx->out)->writeSuffixAndGetChecksums( @@ -1267,9 +1287,9 @@ bool MutateTask::prepare() ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices(); ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections(); ctx->mutation_kind = ctx->interpreter->getMutationKind(); - ctx->mutating_stream = ctx->interpreter->execute(); + ctx->mutating_pipeline = ctx->interpreter->execute(); ctx->updated_header = ctx->interpreter->getUpdatedHeader(); - ctx->mutating_stream->setProgressCallback(MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress)); + ctx->mutating_pipeline.setProgressCallback(MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress)); } ctx->single_disk_volume = std::make_shared("volume_" + ctx->future_part->name, ctx->space_reservation->getDisk(), 0); @@ -1299,7 +1319,7 @@ bool MutateTask::prepare() ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings); ctx->execute_ttl_type = ExecuteTTLType::NONE; - if (ctx->mutating_stream) + if (ctx->mutating_pipeline.initialized()) ctx->execute_ttl_type = MergeTreeDataMergerMutator::shouldExecuteTTL(ctx->metadata_snapshot, ctx->interpreter->getColumnDependencies()); @@ -1318,7 +1338,7 @@ bool MutateTask::prepare() ctx->updated_columns.emplace(name_type.name); ctx->indices_to_recalc = MutationHelpers::getIndicesToRecalculate( - ctx->mutating_stream, ctx->updated_columns, ctx->metadata_snapshot, ctx->context, ctx->materialized_indices, ctx->source_part); + ctx->mutating_pipeline, ctx->updated_columns, ctx->metadata_snapshot, ctx->context, ctx->materialized_indices, ctx->source_part); ctx->projections_to_recalc = MutationHelpers::getProjectionsToRecalculate( ctx->updated_columns, ctx->metadata_snapshot, ctx->materialized_projections, ctx->source_part); diff --git a/src/Storages/StorageJoin.cpp b/src/Storages/StorageJoin.cpp index 
e45183591f2..2acdba18c2d 100644 --- a/src/Storages/StorageJoin.cpp +++ b/src/Storages/StorageJoin.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include /// toLower @@ -114,17 +115,16 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context) { auto storage_ptr = DatabaseCatalog::instance().getTable(getStorageID(), context); auto interpreter = std::make_unique(storage_ptr, metadata_snapshot, commands, context, true); - auto in = interpreter->execute(); - in->readPrefix(); + auto pipeline = interpreter->execute(); + PullingPipelineExecutor executor(pipeline); - while (const Block & block = in->read()) + Block block; + while (executor.pull(block)) { new_data->addJoinedBlock(block, true); if (persistent) backup_stream.write(block); } - - in->readSuffix(); } /// Now acquire exclusive lock and modify storage. diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 3fe6083ab13..299e39a3836 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace DB @@ -263,11 +264,12 @@ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context new_context->setSetting("max_threads", 1); auto interpreter = std::make_unique(storage_ptr, metadata_snapshot, commands, new_context, true); - auto in = interpreter->execute(); + auto pipeline = interpreter->execute(); + PullingPipelineExecutor executor(pipeline); - in->readPrefix(); Blocks out; - while (Block block = in->read()) + Block block; + while (executor.pull(block)) { if (compress) for (auto & elem : block) @@ -275,7 +277,6 @@ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context out.push_back(block); } - in->readSuffix(); std::unique_ptr new_data;
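
Throughout this patch the old pull interface (readPrefix() / read() / readSuffix() on an IBlockInputStream) is replaced by draining a QueryPipeline through a PullingPipelineExecutor, exactly as the StorageJoin::mutate and StorageMemory::mutate hunks above do. The helper below is a minimal sketch of that consumption pattern and is not part of the patch: drainPipeline and process_block are hypothetical names, and the header path is assumed to match the source tree at the time of this change.

#include <Core/Block.h>
#include <Processors/Executors/PullingPipelineExecutor.h>   /// assumed header location

namespace DB
{

/// Drain a QueryPipeline block by block, as the mutate() implementations above do.
/// The pipeline must outlive the executor; no readPrefix()/readSuffix() calls are
/// needed any more -- the executor initializes the pipeline on first pull() and
/// finalizes it when destroyed.
template <typename ProcessBlock>
void drainPipeline(QueryPipeline & pipeline, ProcessBlock && process_block)
{
    PullingPipelineExecutor executor(pipeline);

    Block block;
    while (executor.pull(block))   /// returns false once the pipeline is exhausted
        process_block(block);
}

}

With such a helper, the loop in StorageMemory::mutate above would reduce to building the pipeline with interpreter->execute() and passing a lambda that compresses and collects each block. PartMergerWriter::mutateOriginalPartAndPrepareProjections follows the same shape but deliberately pulls a single block per call, so the mutation task can be rescheduled between blocks.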