Merge pull request #12089 from ClickHouse/fix-11595

Fix result_rows and result_bytes metrics for selects.
This commit is contained in:
alexey-milovidov 2020-07-04 23:41:51 +03:00 committed by GitHub
commit cf1967bb29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 38 additions and 4 deletions

View File

@ -31,8 +31,8 @@ struct BlockIO
QueryPipeline pipeline;
/// Callbacks for query logging could be set here.
std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
std::function<void()> exception_callback;
std::function<void(IBlockInputStream *, IBlockOutputStream *, QueryPipeline *)> finish_callback;
std::function<void()> exception_callback;
/// When it is true, don't bother sending any non-empty blocks to the out stream
bool null_format = false;
@ -41,7 +41,13 @@ struct BlockIO
void onFinish()
{
if (finish_callback)
finish_callback(in.get(), out.get());
{
QueryPipeline * pipeline_ptr = nullptr;
if (pipeline.initialized())
pipeline_ptr = &pipeline;
finish_callback(in.get(), out.get(), pipeline_ptr);
}
}
void onException()

View File

@ -479,7 +479,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
/// Also make possible for caller to log successful query finish and exception during execution.
auto finish_callback = [elem, &context, log_queries, log_queries_min_type = settings.log_queries_min_type] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable
auto finish_callback = [elem, &context, log_queries, log_queries_min_type = settings.log_queries_min_type]
(IBlockInputStream * stream_in, IBlockOutputStream * stream_out, QueryPipeline * query_pipeline) mutable
{
QueryStatus * process_list_elem = context.getProcessListElement();
@ -528,6 +529,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.result_bytes = counting_stream->getProgress().read_bytes;
}
}
else if (query_pipeline)
{
if (const auto * output_format = query_pipeline->getOutputFormat())
{
elem.result_rows = output_format->getResultRows();
elem.result_bytes = output_format->getResultBytes();
}
}
if (elem.read_rows != 0)
{

View File

@ -59,6 +59,8 @@ void IOutputFormat::work()
switch (current_block_kind)
{
case Main:
result_rows += current_chunk.getNumRows();
result_bytes += current_chunk.allocatedBytes();
consume(std::move(current_chunk));
break;
case Totals:

View File

@ -79,6 +79,14 @@ public:
void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); }
void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); }
size_t getResultRows() const { return result_rows; }
size_t getResultBytes() const { return result_bytes; }
private:
/// Counters for consumed chunks. Are used for QueryLog.
size_t result_rows = 0;
size_t result_bytes = 0;
};
}

View File

@ -110,6 +110,8 @@ public:
void addCreatingSetsTransform(ProcessorPtr transform);
/// Resize pipeline to single output and add IOutputFormat. Pipeline will be completed after this transformation.
void setOutputFormat(ProcessorPtr output);
/// Get current OutputFormat.
IOutputFormat * getOutputFormat() const { return output_format; }
/// Sink is a processor with single input port and no output ports. Creates sink for each output port.
/// Pipeline will be completed after this transformation.
void setSinks(const ProcessorGetterWithStreamKind & getter);

View File

@ -0,0 +1,2 @@
1
1 1

View File

@ -0,0 +1,5 @@
set log_queries = 1;
select count() > 0 from system.settings;
system flush logs;
select result_rows, result_bytes >= 8 from system.query_log where event_date >= today() - 1 and lower(query) like '%select count() > 0 from system.settings%' and type = 'QueryFinish' order by query_start_time desc limit 1;