From 57d727d0784fad8d4cddd4d1c734a5626c3fbb65 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 2 Jul 2020 17:51:10 +0300 Subject: [PATCH] Fix result_rows and result_bytes metrics for selects. --- src/DataStreams/BlockIO.h | 12 +++++++++--- src/Interpreters/executeQuery.cpp | 11 ++++++++++- src/Processors/Formats/IOutputFormat.cpp | 2 ++ src/Processors/Formats/IOutputFormat.h | 8 ++++++++ src/Processors/QueryPipeline.h | 2 ++ 5 files changed, 31 insertions(+), 4 deletions(-) diff --git a/src/DataStreams/BlockIO.h b/src/DataStreams/BlockIO.h index d4733e6aebe..91d7efac8d1 100644 --- a/src/DataStreams/BlockIO.h +++ b/src/DataStreams/BlockIO.h @@ -31,8 +31,8 @@ struct BlockIO QueryPipeline pipeline; /// Callbacks for query logging could be set here. - std::function finish_callback; - std::function exception_callback; + std::function finish_callback; + std::function exception_callback; /// When it is true, don't bother sending any non-empty blocks to the out stream bool null_format = false; @@ -41,7 +41,13 @@ struct BlockIO void onFinish() { if (finish_callback) - finish_callback(in.get(), out.get()); + { + QueryPipeline * pipeline_ptr = nullptr; + if (pipeline.initialized()) + pipeline_ptr = &pipeline; + + finish_callback(in.get(), out.get(), pipeline_ptr); + } } void onException() diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 41fa60324ec..860c56b1052 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -479,7 +479,8 @@ static std::tuple executeQueryImpl( } /// Also make possible for caller to log successful query finish and exception during execution. - auto finish_callback = [elem, &context, log_queries, log_queries_min_type = settings.log_queries_min_type] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable + auto finish_callback = [elem, &context, log_queries, log_queries_min_type = settings.log_queries_min_type] + (IBlockInputStream * stream_in, IBlockOutputStream * stream_out, QueryPipeline * query_pipeline) mutable { QueryStatus * process_list_elem = context.getProcessListElement(); @@ -528,6 +529,14 @@ static std::tuple executeQueryImpl( elem.result_bytes = counting_stream->getProgress().read_bytes; } } + else if (query_pipeline) + { + if (const auto * output_format = query_pipeline->getOutputFormat()) + { + elem.result_rows = output_format->getResultRows(); + elem.result_bytes = output_format->getResultBytes(); + } + } if (elem.read_rows != 0) { diff --git a/src/Processors/Formats/IOutputFormat.cpp b/src/Processors/Formats/IOutputFormat.cpp index 334843036dc..f7fc6170cad 100644 --- a/src/Processors/Formats/IOutputFormat.cpp +++ b/src/Processors/Formats/IOutputFormat.cpp @@ -59,6 +59,8 @@ void IOutputFormat::work() switch (current_block_kind) { case Main: + result_rows += current_chunk.getNumRows(); + result_bytes += current_chunk.allocatedBytes(); consume(std::move(current_chunk)); break; case Totals: diff --git a/src/Processors/Formats/IOutputFormat.h b/src/Processors/Formats/IOutputFormat.h index 2e3db50ee6e..5b8f664aa23 100644 --- a/src/Processors/Formats/IOutputFormat.h +++ b/src/Processors/Formats/IOutputFormat.h @@ -79,6 +79,14 @@ public: void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); } void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); } + + size_t getResultRows() const { return result_rows; } + size_t getResultBytes() const { return result_rows; } + +private: + /// Counters for consumed chunks. Are used for QueryLog. + size_t result_rows = 0; + size_t result_bytes = 0; }; } diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index 7dddb2526e5..9a33b549ab5 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -110,6 +110,8 @@ public: void addCreatingSetsTransform(ProcessorPtr transform); /// Resize pipeline to single output and add IOutputFormat. Pipeline will be completed after this transformation. void setOutputFormat(ProcessorPtr output); + /// Get current OutputFormat. + IOutputFormat * getOutputFormat() const { return output_format; } /// Sink is a processor with single input port and no output ports. Creates sink for each output port. /// Pipeline will be completed after this transformation. void setSinks(const ProcessorGetterWithStreamKind & getter);