diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 37a1ffa4781..5cfb80e5564 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -602,8 +602,6 @@ static void logQueryViews(std::list & views, ContextPtr context try { - //std::cerr << "============ Logging for " << static_cast(view.runtime_stats.thread_status.get()) << ' ' << view.table_id.getNameForLogs() << "\n"; - if (view.runtime_stats.thread_status) view.runtime_stats.thread_status->logToQueryViewsLog(view); } diff --git a/src/Interpreters/executeDDLQueryOnCluster.cpp b/src/Interpreters/executeDDLQueryOnCluster.cpp index f32bf301070..576c1f3ffdd 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.cpp +++ b/src/Interpreters/executeDDLQueryOnCluster.cpp @@ -217,7 +217,7 @@ BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & en io.pipeline = QueryPipeline(std::move(source)); if (context->getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NONE) - io.pipeline.complete(Pipe(std::make_shared(io.pipeline.getHeader()))); + io.pipeline.complete(std::make_shared(io.pipeline.getHeader())); return io; } diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 8d750e57278..d98f3f4bd00 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -657,13 +657,13 @@ static std::tuple executeQueryImpl( /// Hold element of process list till end of query execution. res.process_list_entry = process_list_entry; - if (pipeline.pulling()) + if (pipeline.pulling() || pipeline.completed()) { /// Limits on the result, the quota on the result, and also callback for progress. /// Limits apply only to the final result. pipeline.setProgressCallback(context->getProgressCallback()); pipeline.setProcessListElement(context->getProcessListElement()); - if (stage == QueryProcessingStage::Complete) + if (stage == QueryProcessingStage::Complete && pipeline.pulling()) pipeline.setLimitsAndQuota(limits, quota); } else if (pipeline.pushing()) diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp index c1600dc1c31..0b4b2c308fe 100644 --- a/src/Processors/QueryPipeline.cpp +++ b/src/Processors/QueryPipeline.cpp @@ -322,6 +322,19 @@ static void drop(OutputPort *& port, Processors & processors) QueryPipeline::QueryPipeline(std::shared_ptr sink) : QueryPipeline(Chain(std::move(sink))) {} +void QueryPipeline::complete(std::shared_ptr sink) +{ + if (!pulling()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline must be pulling to be completed with chain"); + + drop(totals, processors); + drop(extremes, processors); + + connect(*output, sink->getPort()); + processors.emplace_back(std::move(sink)); + output = nullptr; +} + void QueryPipeline::complete(Chain chain) { if (!pulling()) @@ -444,7 +457,7 @@ void QueryPipeline::setProcessListElement(QueryStatus * elem) { process_list_element = elem; - if (pulling()) + if (pulling() || completed()) { for (auto & processor : processors) { @@ -467,7 +480,7 @@ void QueryPipeline::setLimitsAndQuota(const StreamLocalLimits & limits, std::sha if (!pulling()) throw Exception( ErrorCodes::LOGICAL_ERROR, - "It is possible to set limits and quota only to pullint QueryPipeline"); + "It is possible to set limits and quota only to pulling QueryPipeline"); auto transform = std::make_shared(output->getHeader(), limits); transform->setQuota(quota); diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index de1396e9565..ff4c80a55c0 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -25,6 +25,7 @@ class Chain; class IOutputFormat; class SinkToStorage; class ISource; +class ISink; class QueryPipeline { @@ -79,6 +80,7 @@ public: void complete(std::shared_ptr format); void complete(Chain chain); void complete(std::shared_ptr sink); + void complete(std::shared_ptr sink); /// Only for pushing and pulling. Block getHeader() const;