Collect the number of parts, rows and marks while building the query pipeline

Peng Jian 2021-07-13 22:50:47 +08:00
parent 9c0097cd9b
commit 8522589483
4 changed files with 15 additions and 15 deletions

InterpreterExplainQuery.cpp

@@ -88,7 +88,6 @@ Block InterpreterExplainQuery::getSampleBlock(const ASTExplainQuery::ExplainKind
             {"parts", std::make_shared<DataTypeUInt64>()},
             {"rows", std::make_shared<DataTypeUInt64>()},
             {"marks", std::make_shared<DataTypeUInt64>()},
-            {"bytes", std::make_shared<DataTypeUInt64>()}
         };
         return Block({
             {cols[0].type->createColumn(), cols[0].type, cols[0].name},
@@ -96,7 +95,6 @@ Block InterpreterExplainQuery::getSampleBlock(const ASTExplainQuery::ExplainKind
             {cols[2].type->createColumn(), cols[2].type, cols[2].name},
             {cols[3].type->createColumn(), cols[3].type, cols[3].name},
             {cols[4].type->createColumn(), cols[4].type, cols[4].name},
-            {cols[5].type->createColumn(), cols[5].type, cols[5].name},
         });
     }
     else
@@ -343,6 +341,10 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
         InterpreterSelectWithUnionQuery interpreter(ast.getExplainedQuery(), getContext(), SelectQueryOptions());
         interpreter.buildQueryPlan(plan);
+        // collect the selected marks, rows, parts during build query pipeline.
+        plan.buildQueryPipeline(
+            QueryPlanOptimizationSettings::fromContext(getContext()),
+            BuildQueryPipelineSettings::fromContext(getContext()));
         if (settings.optimize)
             plan.optimize(QueryPlanOptimizationSettings::fromContext(getContext()));
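
The newly added plan.buildQueryPipeline() call is what makes these counters available to EXPLAIN ESTIMATE in the first place: as the ReadFromMergeTree hunks below show, selected_parts, selected_rows and selected_marks are only assigned inside initializePipeline(), which runs while the pipeline is being built. A rough sketch of the resulting call order in executeImpl(), assembled from the hunks in this commit rather than quoted verbatim (the columns variable passed to explainEstimate() is a stand-in name, not copied from the file):

    QueryPlan plan;
    InterpreterSelectWithUnionQuery interpreter(ast.getExplainedQuery(), getContext(), SelectQueryOptions());
    interpreter.buildQueryPlan(plan);

    // Building the pipeline triggers ReadFromMergeTree::initializePipeline(),
    // which fills selected_parts / selected_rows / selected_marks.
    plan.buildQueryPipeline(
        QueryPlanOptimizationSettings::fromContext(getContext()),
        BuildQueryPipelineSettings::fromContext(getContext()));

    if (settings.optimize)
        plan.optimize(QueryPlanOptimizationSettings::fromContext(getContext()));

    plan.explainEstimate(columns);  // per-table aggregation, see QueryPlan.cpp below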

QueryPlan.cpp

@@ -446,7 +446,6 @@ void QueryPlan::explainEstimate(MutableColumns & columns)
         UInt64 parts = 0;
         UInt64 rows = 0;
         UInt64 marks = 0;
-        UInt64 bytes = 0;
         EstimateCounters(const std::string & database, const std::string & table) : database_name(database), table_name(table)
         {
@@ -472,7 +471,6 @@ void QueryPlan::explainEstimate(MutableColumns & columns)
             it->second->parts += step->getSelectedParts();
             it->second->rows += step->getSelectedRows();
             it->second->marks += step->getSelectedMarks();
-            it->second->bytes += step->getSelectedBytes();
         }
         for (const auto * child : node->children)
             process_node(child);
@@ -489,7 +487,6 @@ void QueryPlan::explainEstimate(MutableColumns & columns)
         columns[index++]->insert(counter.second->parts);
         columns[index++]->insert(counter.second->rows);
         columns[index++]->insert(counter.second->marks);
-        columns[index++]->insert(counter.second->bytes);
     }
 }

ReadFromMergeTree.cpp

@@ -47,6 +47,9 @@ struct ReadFromMergeTree::AnalysisResult
     IndexStats index_stats;
     Names column_names_to_read;
     ReadFromMergeTree::ReadType read_type = ReadFromMergeTree::ReadType::Default;
+    UInt64 selected_rows = 0;
+    UInt64 selected_marks = 0;
+    UInt64 selected_parts = 0;
 };
 static MergeTreeReaderSettings getMergeTreeReaderSettings(const ContextPtr & context)
@@ -113,13 +116,6 @@ ReadFromMergeTree::ReadFromMergeTree(
         auto type = std::make_shared<DataTypeFloat64>();
         output_stream->header.insert({type->createColumn(), type, "_sample_factor"});
     }
-    selected_parts = prepared_parts.size();
-    for (auto & p : prepared_parts)
-    {
-        selected_marks += p->getMarksCount();
-        selected_rows += p->rows_count;
-        selected_bytes += p->getBytesOnDisk();
-    }
 }
 Pipe ReadFromMergeTree::readFromPool(
@@ -845,13 +841,17 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
     size_t sum_marks = 0;
     size_t sum_ranges = 0;
+    size_t sum_rows = 0;
     for (const auto & part : result.parts_with_ranges)
     {
         sum_ranges += part.ranges.size();
         sum_marks += part.getMarksCount();
+        sum_rows += part.getRowsCount();
     }
+    result.selected_parts = result.parts_with_ranges.size();
+    result.selected_marks = sum_marks;
+    result.selected_rows = sum_rows;
     LOG_DEBUG(
         log,
         "Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
@@ -889,6 +889,9 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
         return;
     }
+    selected_marks = result.selected_marks;
+    selected_rows = result.selected_rows;
+    selected_parts = result.selected_parts;
     /// Projection, that needed to drop columns, which have appeared by execution
     /// of some extra expressions, and to allow execute the same expressions later.
     /// NOTE: It may lead to double computation of expressions.

ReadFromMergeTree.h

@@ -84,7 +84,6 @@ public:
     UInt64 getSelectedParts() const { return selected_parts; }
     UInt64 getSelectedRows() const { return selected_rows; }
     UInt64 getSelectedMarks() const { return selected_marks; }
-    UInt64 getSelectedBytes() const { return selected_bytes; }
 private:
     const MergeTreeReaderSettings reader_settings;
@@ -114,7 +113,6 @@ private:
     UInt64 selected_parts = 0;
     UInt64 selected_rows = 0;
     UInt64 selected_marks = 0;
-    UInt64 selected_bytes = 0;
     Pipe read(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
     Pipe readFromPool(RangesInDataParts parts_with_ranges, Names required_columns, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
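
Taken together, the four files move the source of the estimate counters: instead of being filled eagerly in the ReadFromMergeTree constructor from all prepared parts (and including a bytes figure, which is dropped along with the bytes column of the estimate block), they are now taken from the AnalysisResult produced by selectRangesToRead() and copied onto the step in initializePipeline(), so the estimate reflects only the parts, marks and rows that survive range selection. A self-contained toy program illustrating that pattern with stand-in types (this is not ClickHouse code, only a sketch of the flow):

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <utility>
    #include <vector>

    struct PartStub { uint64_t marks = 0; uint64_t rows = 0; };

    struct AnalysisResult              // counterpart of ReadFromMergeTree::AnalysisResult
    {
        uint64_t selected_parts = 0;
        uint64_t selected_marks = 0;
        uint64_t selected_rows = 0;
    };

    class ReadStepStub
    {
    public:
        explicit ReadStepStub(std::vector<PartStub> parts_) : parts(std::move(parts_)) {}

        // Counterpart of selectRangesToRead(): count only the parts that survive
        // pruning (here, arbitrarily, every other part) instead of all prepared parts.
        AnalysisResult analyze() const
        {
            AnalysisResult res;
            for (std::size_t i = 0; i < parts.size(); i += 2)
            {
                ++res.selected_parts;
                res.selected_marks += parts[i].marks;
                res.selected_rows += parts[i].rows;
            }
            return res;
        }

        // Counterpart of initializePipeline(): the step-level counters are filled
        // here, which is why the interpreter now builds the pipeline before estimating.
        void initializePipeline()
        {
            AnalysisResult res = analyze();
            selected_parts = res.selected_parts;
            selected_marks = res.selected_marks;
            selected_rows = res.selected_rows;
        }

        uint64_t getSelectedParts() const { return selected_parts; }
        uint64_t getSelectedMarks() const { return selected_marks; }
        uint64_t getSelectedRows() const { return selected_rows; }

    private:
        std::vector<PartStub> parts;
        uint64_t selected_parts = 0;
        uint64_t selected_marks = 0;
        uint64_t selected_rows = 0;
    };

    int main()
    {
        ReadStepStub step({{10, 1000}, {20, 2000}, {30, 3000}});
        step.initializePipeline();     // counterpart of building the query pipeline
        std::cout << "parts=" << step.getSelectedParts()
                  << " rows=" << step.getSelectedRows()
                  << " marks=" << step.getSelectedMarks() << '\n';   // parts=2 rows=4000 marks=40
    }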