diff --git a/src/DataStreams/RemoteQueryExecutor.h b/src/DataStreams/RemoteQueryExecutor.h index e39a7ccc94b..0db0e0218be 100644 --- a/src/DataStreams/RemoteQueryExecutor.h +++ b/src/DataStreams/RemoteQueryExecutor.h @@ -61,8 +61,8 @@ public: void cancel(); /// Get totals and extremes if any. - Block getTotals() const { return totals; } - Block getExtremes() const { return extremes; } + Block getTotals() { return std::move(totals); } + Block getExtremes() { return std::move(extremes); } /// Set callback for progress. It will be called on Progress packet. void setProgressCallback(ProgressCallback callback) { progress_callback = std::move(callback); } diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index 5d41b0e87ce..bfa6fae0977 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -1,8 +1,6 @@ #include #include #include -#include -#include #include #include #include @@ -13,9 +11,8 @@ #include #include #include -#include -#include -#include +#include +#include namespace ProfileEvents { @@ -118,13 +115,13 @@ void SelectStreamFactory::createForShard( const SelectQueryInfo &, Pipes & res) { - bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; - bool add_totals_port = false; - bool add_extremes_port = false; + bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; + bool add_totals = false; + bool add_extremes = false; if (processed_stage == QueryProcessingStage::Complete) { - add_totals_port = query_ast->as().group_by_with_totals; - add_extremes_port = context.getSettingsRef().extremes; + add_totals = query_ast->as().group_by_with_totals; + add_extremes = context.getSettingsRef().extremes; } auto modified_query_ast = query_ast->clone(); @@ -140,20 +137,13 @@ void SelectStreamFactory::createForShard( auto emplace_remote_stream = [&]() { - auto stream = std::make_shared( + auto remote_query_executor = std::make_shared( shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage); - stream->setPoolMode(PoolMode::GET_MANY); + remote_query_executor->setPoolMode(PoolMode::GET_MANY); if (!table_func_ptr) - stream->setMainTable(main_table); + remote_query_executor->setMainTable(main_table); - auto source = std::make_shared(std::move(stream), force_add_agg_info); - - if (add_totals_port) - source->addTotalsPort(); - if (add_extremes_port) - source->addExtremesPort(); - - res.emplace_back(std::move(source)); + res.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes)); }; const auto & settings = context.getSettingsRef(); @@ -246,8 +236,8 @@ void SelectStreamFactory::createForShard( auto lazily_create_stream = [ pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler, main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables, - stage = processed_stage, local_delay]() - -> BlockInputStreamPtr + stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes]() + -> Pipe { auto current_settings = context.getSettingsRef(); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover( @@ -277,8 +267,7 @@ void SelectStreamFactory::createForShard( } if (try_results.empty() || local_delay < max_remote_delay) - return std::make_shared( - createLocalStream(modified_query_ast, header, context, stage)); + return createLocalStream(modified_query_ast, header, context, stage).getPipe(); else { std::vector connections; @@ -286,20 +275,14 @@ void SelectStreamFactory::createForShard( for (auto & try_result : try_results) connections.emplace_back(std::move(try_result.entry)); - return std::make_shared( + auto remote_query_executor = std::make_shared( std::move(connections), modified_query, header, context, nullptr, throttler, scalars, external_tables, stage); + + return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes); } }; - auto lazy_stream = std::make_shared("LazyShardWithLocalReplica", header, lazily_create_stream); - auto source = std::make_shared(std::move(lazy_stream), force_add_agg_info); - - if (add_totals_port) - source->addTotalsPort(); - if (add_extremes_port) - source->addExtremesPort(); - - res.emplace_back(std::move(source)); + res.emplace_back(createDelayedPipe(header, lazily_create_stream)); } else emplace_remote_stream(); diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp index 92c91a81b8a..5b6109440d5 100644 --- a/src/Processors/QueryPipeline.cpp +++ b/src/Processors/QueryPipeline.cpp @@ -20,6 +20,7 @@ #include #include #include +#include namespace DB { @@ -673,8 +674,10 @@ void QueryPipeline::initRowsBeforeLimit() { RowsBeforeLimitCounterPtr rows_before_limit_at_least; + /// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor. std::vector limits; std::vector sources; + std::vector remote_sources; std::unordered_set visited; @@ -705,6 +708,9 @@ void QueryPipeline::initRowsBeforeLimit() if (auto * source = typeid_cast(processor)) sources.emplace_back(source); + + if (auto * source = typeid_cast(processor)) + remote_sources.emplace_back(source); } else if (auto * sorting = typeid_cast(processor)) { @@ -735,7 +741,7 @@ void QueryPipeline::initRowsBeforeLimit() } } - if (!rows_before_limit_at_least && (!limits.empty() || !sources.empty())) + if (!rows_before_limit_at_least && (!limits.empty() || !sources.empty() || !remote_sources.empty())) { rows_before_limit_at_least = std::make_shared(); @@ -744,6 +750,9 @@ void QueryPipeline::initRowsBeforeLimit() for (auto & source : sources) source->setRowsBeforeLimitCounter(rows_before_limit_at_least); + + for (auto & source : remote_sources) + source->setRowsBeforeLimitCounter(rows_before_limit_at_least); } /// If there is a limit, then enable rows_before_limit_at_least diff --git a/src/Processors/RowsBeforeLimitCounter.h b/src/Processors/RowsBeforeLimitCounter.h index 36ea4a557a8..f5eb40ff84a 100644 --- a/src/Processors/RowsBeforeLimitCounter.h +++ b/src/Processors/RowsBeforeLimitCounter.h @@ -15,6 +15,12 @@ public: rows_before_limit.fetch_add(rows, std::memory_order_release); } + void set(uint64_t rows) + { + setAppliedLimit(); + rows_before_limit.store(rows, std::memory_order_release); + } + uint64_t get() const { return rows_before_limit.load(std::memory_order_acquire); } void setAppliedLimit() { has_applied_limit.store(true, std::memory_order_release); } diff --git a/src/Processors/Sources/DelayedSource.cpp b/src/Processors/Sources/DelayedSource.cpp new file mode 100644 index 00000000000..42a33d00196 --- /dev/null +++ b/src/Processors/Sources/DelayedSource.cpp @@ -0,0 +1,119 @@ +#include +#include "NullSource.h" + +namespace DB +{ + +DelayedSource::DelayedSource(const Block & header, Creator processors_creator) + : IProcessor({}, OutputPorts(3, header)) + , creator(std::move(processors_creator)) +{ +} + +IProcessor::Status DelayedSource::prepare() +{ + /// At first, wait for main input is needed and expand pipeline. + if (inputs.empty()) + { + auto & first_output = outputs.front(); + + /// If main port was finished before callback was called, stop execution. + if (first_output.isFinished()) + { + for (auto & output : outputs) + output.finish(); + + return Status::Finished; + } + + if (!first_output.isNeeded()) + return Status::PortFull; + + /// Call creator callback to get processors. + if (processors.empty()) + return Status::Ready; + + return Status::ExpandPipeline; + } + + /// Process ports in order: main, totals, extremes + auto output = outputs.begin(); + for (auto input = inputs.begin(); input != inputs.end(); ++input, ++output) + { + if (output->isFinished()) + { + input->close(); + continue; + } + + if (!output->isNeeded()) + return Status::PortFull; + + if (input->isFinished()) + { + output->finish(); + continue; + } + + input->setNeeded(); + if (!input->hasData()) + return Status::PortFull; + + output->pushData(input->pullData(true)); + return Status::PortFull; + } + + return Status::Finished; +} + +void DelayedSource::work() +{ + auto pipe = creator(); + + main_output = &pipe.getPort(); + totals_output = pipe.getTotalsPort(); + extremes_output = pipe.getExtremesPort(); + + processors = std::move(pipe).detachProcessors(); + + if (!totals_output) + { + processors.emplace_back(std::make_shared(main_output->getHeader())); + totals_output = &processors.back()->getOutputs().back(); + } + + if (!extremes_output) + { + processors.emplace_back(std::make_shared(main_output->getHeader())); + extremes_output = &processors.back()->getOutputs().back(); + } +} + +Processors DelayedSource::expandPipeline() +{ + /// Add new inputs. They must have the same header as output. + for (const auto & output : {main_output, totals_output, extremes_output}) + { + inputs.emplace_back(outputs.front().getHeader(), this); + /// Connect checks that header is same for ports. + connect(*output, inputs.back()); + inputs.back().setNeeded(); + } + + /// Executor will check that all processors are connected. + return std::move(processors); +} + +Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator) +{ + auto source = std::make_shared(header, std::move(processors_creator)); + + Pipe pipe(&source->getPort(DelayedSource::Main)); + pipe.setTotalsPort(&source->getPort(DelayedSource::Totals)); + pipe.setExtremesPort(&source->getPort(DelayedSource::Extremes)); + + pipe.addProcessors({std::move(source)}); + return pipe; +} + +} diff --git a/src/Processors/Sources/DelayedSource.h b/src/Processors/Sources/DelayedSource.h new file mode 100644 index 00000000000..31ec1e054fe --- /dev/null +++ b/src/Processors/Sources/DelayedSource.h @@ -0,0 +1,45 @@ +#pragma once + +#include +#include + +namespace DB +{ + +/// DelayedSource delays pipeline calculation until it starts execution. +/// It accepts callback which creates a new pipe. +/// +/// First time when DelayedSource's main output port needs data, callback is called. +/// Then, DelayedSource expands pipeline: adds new inputs and connects pipe with it. +/// Then, DelayedSource just move data from inputs to outputs until finished. +/// +/// It main output port of DelayedSource is never needed, callback won't be called. +class DelayedSource : public IProcessor +{ +public: + using Creator = std::function; + + DelayedSource(const Block & header, Creator processors_creator); + String getName() const override { return "Delayed"; } + + Status prepare() override; + void work() override; + Processors expandPipeline() override; + + enum PortKind { Main = 0, Totals = 1, Extremes = 2 }; + OutputPort & getPort(PortKind kind) { return *std::next(outputs.begin(), kind); } + +private: + Creator creator; + Processors processors; + + /// Outputs from returned pipe. + OutputPort * main_output = nullptr; + OutputPort * totals_output = nullptr; + OutputPort * extremes_output = nullptr; +}; + +/// Creates pipe from DelayedSource. +Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator); + +} diff --git a/src/Processors/Sources/RemoteSource.cpp b/src/Processors/Sources/RemoteSource.cpp new file mode 100644 index 00000000000..2f76e0c87d4 --- /dev/null +++ b/src/Processors/Sources/RemoteSource.cpp @@ -0,0 +1,132 @@ +#include +#include +#include +#include + +namespace DB +{ + +RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_) + : SourceWithProgress(executor->getHeader(), false) + , add_aggregation_info(add_aggregation_info_), query_executor(std::move(executor)) +{ + /// Add AggregatedChunkInfo if we expect DataTypeAggregateFunction as a result. + const auto & sample = getPort().getHeader(); + for (auto & type : sample.getDataTypes()) + if (typeid_cast(type.get())) + add_aggregation_info = true; +} + +RemoteSource::~RemoteSource() = default; + +Chunk RemoteSource::generate() +{ + if (!was_query_sent) + { + /// Progress method will be called on Progress packet. + query_executor->setProgressCallback([this](const Progress & value) { progress(value); }); + + /// Get rows_before_limit result for remote query from ProfileInfo packet. + query_executor->setProfileInfoCallback([this](const BlockStreamProfileInfo & info) + { + if (rows_before_limit && info.hasAppliedLimit()) + rows_before_limit->set(info.getRowsBeforeLimit()); + }); + + query_executor->sendQuery(); + + was_query_sent = true; + } + + auto block = query_executor->read(); + + if (!block) + { + query_executor->finish(); + return {}; + } + + UInt64 num_rows = block.rows(); + Chunk chunk(block.getColumns(), num_rows); + + if (add_aggregation_info) + { + auto info = std::make_shared(); + info->bucket_num = block.info.bucket_num; + info->is_overflows = block.info.is_overflows; + chunk.setChunkInfo(std::move(info)); + } + + return chunk; +} + +void RemoteSource::onCancel() +{ + query_executor->cancel(); +} + + +RemoteTotalsSource::RemoteTotalsSource(RemoteQueryExecutorPtr executor) + : ISource(executor->getHeader()) + , query_executor(std::move(executor)) +{ +} + +RemoteTotalsSource::~RemoteTotalsSource() = default; + +Chunk RemoteTotalsSource::generate() +{ + if (auto block = query_executor->getTotals()) + { + UInt64 num_rows = block.rows(); + return Chunk(block.getColumns(), num_rows); + } + + return {}; +} + + +RemoteExtremesSource::RemoteExtremesSource(RemoteQueryExecutorPtr executor) + : ISource(executor->getHeader()) + , query_executor(std::move(executor)) +{ +} + +RemoteExtremesSource::~RemoteExtremesSource() = default; + +Chunk RemoteExtremesSource::generate() +{ + if (auto block = query_executor->getExtremes()) + { + UInt64 num_rows = block.rows(); + return Chunk(block.getColumns(), num_rows); + } + + return {}; +} + + +Pipe createRemoteSourcePipe( + RemoteQueryExecutorPtr query_executor, + bool add_aggregation_info, bool add_totals, bool add_extremes) +{ + Pipe pipe(std::make_shared(query_executor, add_aggregation_info)); + + if (add_totals) + { + auto totals_source = std::make_shared(query_executor); + pipe.setTotalsPort(&totals_source->getPort()); + pipe.addProcessors({std::move(totals_source)}); + } + + if (add_extremes) + { + auto extremes_source = std::make_shared(query_executor); + pipe.setExtremesPort(&extremes_source->getPort()); + pipe.addProcessors({std::move(extremes_source)}); + } + + return pipe; +} + +} diff --git a/src/Processors/Sources/RemoteSource.h b/src/Processors/Sources/RemoteSource.h new file mode 100644 index 00000000000..0b4405a0905 --- /dev/null +++ b/src/Processors/Sources/RemoteSource.h @@ -0,0 +1,82 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +class RemoteQueryExecutor; +using RemoteQueryExecutorPtr = std::shared_ptr; + +/// Source from RemoteQueryExecutor. Executes remote query and returns query result chunks. +class RemoteSource : public SourceWithProgress +{ +public: + /// Flag add_aggregation_info tells if AggregatedChunkInfo should be added to result chunk. + /// AggregatedChunkInfo stores the bucket number used for two-level aggregation. + /// This flag should be typically enabled for queries with GROUP BY which are executed till WithMergeableState. + RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_); + ~RemoteSource() override; + + String getName() const override { return "Remote"; } + + void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); } + + /// Stop reading from stream if output port is finished. + void onUpdatePorts() override + { + if (getPort().isFinished()) + cancel(); + } + +protected: + Chunk generate() override; + void onCancel() override; + +private: + bool was_query_sent = false; + bool add_aggregation_info = false; + RemoteQueryExecutorPtr query_executor; + RowsBeforeLimitCounterPtr rows_before_limit; +}; + +/// Totals source from RemoteQueryExecutor. +class RemoteTotalsSource : public ISource +{ +public: + explicit RemoteTotalsSource(RemoteQueryExecutorPtr executor); + ~RemoteTotalsSource() override; + + String getName() const override { return "RemoteTotals"; } + +protected: + Chunk generate() override; + +private: + RemoteQueryExecutorPtr query_executor; +}; + +/// Extremes source from RemoteQueryExecutor. +class RemoteExtremesSource : public ISource +{ +public: + explicit RemoteExtremesSource(RemoteQueryExecutorPtr executor); + ~RemoteExtremesSource() override; + + String getName() const override { return "RemoteExtremes"; } + +protected: + Chunk generate() override; + +private: + RemoteQueryExecutorPtr query_executor; +}; + +/// Create pipe with remote sources. +Pipe createRemoteSourcePipe( + RemoteQueryExecutorPtr query_executor, + bool add_aggregation_info, bool add_totals, bool add_extremes); + +} diff --git a/src/Processors/Sources/SourceWithProgress.cpp b/src/Processors/Sources/SourceWithProgress.cpp index 8d7a0a3d946..6488289d5ce 100644 --- a/src/Processors/Sources/SourceWithProgress.cpp +++ b/src/Processors/Sources/SourceWithProgress.cpp @@ -12,6 +12,11 @@ namespace ErrorCodes extern const int TOO_MANY_BYTES; } +SourceWithProgress::SourceWithProgress(Block header, bool enable_auto_progress) + : ISourceWithProgress(header), auto_progress(enable_auto_progress) +{ +} + void SourceWithProgress::work() { if (!limits.speed_limits.checkTimeLimit(total_stopwatch.elapsed(), limits.timeout_overflow_mode)) @@ -24,7 +29,7 @@ void SourceWithProgress::work() ISourceWithProgress::work(); - if (!was_progress_called && has_input) + if (auto_progress && !was_progress_called && has_input) progress({ current_chunk.chunk.getNumRows(), current_chunk.chunk.bytes() }); } } diff --git a/src/Processors/Sources/SourceWithProgress.h b/src/Processors/Sources/SourceWithProgress.h index 4778c50e49d..34810045143 100644 --- a/src/Processors/Sources/SourceWithProgress.h +++ b/src/Processors/Sources/SourceWithProgress.h @@ -44,6 +44,8 @@ class SourceWithProgress : public ISourceWithProgress { public: using ISourceWithProgress::ISourceWithProgress; + /// If enable_auto_progress flag is set, progress() will be automatically called on each generated chunk. + SourceWithProgress(Block header, bool enable_auto_progress); using LocalLimits = IBlockInputStream::LocalLimits; using LimitsMode = IBlockInputStream::LimitsMode; @@ -76,6 +78,9 @@ private: /// This flag checks if progress() was manually called at generate() call. /// If not, it will be called for chunk after generate() was finished. bool was_progress_called = false; + + /// If enabled, progress() will be automatically called on each generated chunk. + bool auto_progress = true; }; } diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 064505673d1..ccc48047763 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -106,9 +106,11 @@ SRCS( Port.cpp QueryPipeline.cpp ResizeProcessor.cpp + Sources/DelayedSource.cpp Sources/SinkToOutputStream.cpp Sources/SourceFromInputStream.cpp Sources/SourceWithProgress.cpp + Sources/RemoteSource.cpp Transforms/AddingMissedTransform.cpp Transforms/AddingSelectorTransform.cpp Transforms/AggregatingTransform.cpp