mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
processor profile
This commit is contained in:
parent
aff8149f5c
commit
99a38e41aa
@ -26,12 +26,18 @@ NamesAndTypesList ProcessorProfileLogElement::getNamesAndTypes()
|
||||
|
||||
{"id", std::make_shared<DataTypeUInt64>()},
|
||||
{"parent_ids", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
|
||||
{"plan_step", std::make_shared<DataTypeUInt64>()},
|
||||
{"plan_group", std::make_shared<DataTypeUInt64>()},
|
||||
|
||||
{"query_id", std::make_shared<DataTypeString>()},
|
||||
{"name", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
|
||||
{"elapsed_us", std::make_shared<DataTypeUInt64>()},
|
||||
{"input_wait_elapsed_us", std::make_shared<DataTypeUInt64>()},
|
||||
{"output_wait_elapsed_us", std::make_shared<DataTypeUInt64>()},
|
||||
{"input_rows", std::make_shared<DataTypeUInt64>()},
|
||||
{"input_bytes", std::make_shared<DataTypeUInt64>()},
|
||||
{"output_rows", std::make_shared<DataTypeUInt64>()},
|
||||
{"output_bytes", std::make_shared<DataTypeUInt64>()},
|
||||
};
|
||||
}
|
||||
|
||||
@ -52,11 +58,17 @@ void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const
|
||||
columns[i++]->insert(parent_ids_array);
|
||||
}
|
||||
|
||||
columns[i++]->insert(plan_step);
|
||||
columns[i++]->insert(plan_group);
|
||||
columns[i++]->insertData(query_id.data(), query_id.size());
|
||||
columns[i++]->insertData(processor_name.data(), processor_name.size());
|
||||
columns[i++]->insert(elapsed_us);
|
||||
columns[i++]->insert(input_wait_elapsed_us);
|
||||
columns[i++]->insert(output_wait_elapsed_us);
|
||||
columns[i++]->insert(input_rows);
|
||||
columns[i++]->insert(input_bytes);
|
||||
columns[i++]->insert(output_rows);
|
||||
columns[i++]->insert(output_bytes);
|
||||
}
|
||||
|
||||
ProcessorsProfileLog::ProcessorsProfileLog(ContextPtr context_, const String & database_name_,
|
||||
|
@ -13,9 +13,12 @@ struct ProcessorProfileLogElement
|
||||
time_t event_time{};
|
||||
Decimal64 event_time_microseconds{};
|
||||
|
||||
UInt64 id;
|
||||
UInt64 id{};
|
||||
std::vector<UInt64> parent_ids;
|
||||
|
||||
UInt64 plan_step{};
|
||||
UInt64 plan_group{};
|
||||
|
||||
String query_id;
|
||||
String processor_name;
|
||||
|
||||
@ -26,6 +29,11 @@ struct ProcessorProfileLogElement
|
||||
/// IProcessor::PortFull
|
||||
UInt32 output_wait_elapsed_us{};
|
||||
|
||||
size_t input_rows{};
|
||||
size_t input_bytes{};
|
||||
size_t output_rows{};
|
||||
size_t output_bytes{};
|
||||
|
||||
static std::string name() { return "ProcessorsProfileLog"; }
|
||||
static NamesAndTypesList getNamesAndTypes();
|
||||
static NamesAndAliases getNamesAndAliases() { return {}; }
|
||||
|
@ -924,12 +924,21 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
processor_elem.id = get_proc_id(*processor);
|
||||
processor_elem.parent_ids = std::move(parents);
|
||||
|
||||
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
|
||||
processor_elem.plan_group = processor->getQueryPlanStepGroup();
|
||||
|
||||
processor_elem.processor_name = processor->getName();
|
||||
|
||||
processor_elem.elapsed_us = processor->getElapsedUs();
|
||||
processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs();
|
||||
processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs();
|
||||
|
||||
auto stats = processor->getProcessorDataStats();
|
||||
processor_elem.input_rows = stats.input_rows;
|
||||
processor_elem.input_bytes = stats.input_bytes;
|
||||
processor_elem.output_rows = stats.output_rows;
|
||||
processor_elem.output_bytes = stats.output_bytes;
|
||||
|
||||
processors_profile_log->add(processor_elem);
|
||||
}
|
||||
}
|
||||
|
@ -307,6 +307,33 @@ public:
|
||||
uint64_t getInputWaitElapsedUs() const { return input_wait_elapsed_us; }
|
||||
uint64_t getOutputWaitElapsedUs() const { return output_wait_elapsed_us; }
|
||||
|
||||
/// Row and byte totals of one processor, split by direction.
/// Every counter starts at zero.
struct ProcessorDataStats
{
    /// Totals over the input ports.
    size_t input_rows{};
    size_t input_bytes{};

    /// Totals over the output ports.
    size_t output_rows{};
    size_t output_bytes{};
};
|
||||
|
||||
/// Adds up the `rows` and `bytes` counters of every port of this processor:
/// all entries of `inputs` go into the input_* totals, all entries of `outputs`
/// into the output_* totals. A processor without ports yields all zeros.
ProcessorDataStats getProcessorDataStats() const
{
    ProcessorDataStats totals;

    /// The two accumulations are independent of each other.
    for (const auto & out_port : outputs)
    {
        totals.output_bytes += out_port.bytes;
        totals.output_rows += out_port.rows;
    }

    for (const auto & in_port : inputs)
    {
        totals.input_bytes += in_port.bytes;
        totals.input_rows += in_port.rows;
    }

    return totals;
}
|
||||
|
||||
struct ReadProgressCounters
|
||||
{
|
||||
uint64_t read_rows = 0;
|
||||
|
@ -254,6 +254,10 @@ protected:
|
||||
if (likely(update_info))
|
||||
update_info->update();
|
||||
}
|
||||
|
||||
/// For processors_profile_log
|
||||
size_t rows = 0;
|
||||
size_t bytes = 0;
|
||||
};
|
||||
|
||||
/// Invariants:
|
||||
@ -300,6 +304,9 @@ public:
|
||||
chunk.dumpStructure());
|
||||
}
|
||||
|
||||
rows += data->chunk.getNumRows();
|
||||
bytes += data->chunk.bytes();
|
||||
|
||||
return std::move(*data);
|
||||
}
|
||||
|
||||
@ -422,6 +429,10 @@ public:
|
||||
|
||||
std::uintptr_t flags = 0;
|
||||
*data = std::move(data_);
|
||||
|
||||
rows += data->chunk.getNumRows();
|
||||
bytes += data->chunk.bytes();
|
||||
|
||||
state->push(data, flags);
|
||||
}
|
||||
|
||||
|
@ -25,13 +25,17 @@ SELECT
|
||||
-- NullSource/LazyOutputFormatLazyOutputFormat are the outputs
|
||||
-- so they cannot starts to execute before sleep(1) will be executed.
|
||||
input_wait_elapsed_us>1e6)
|
||||
elapsed
|
||||
elapsed,
|
||||
input_rows,
|
||||
input_bytes,
|
||||
output_rows,
|
||||
output_bytes
|
||||
FROM system.processors_profile_log
|
||||
WHERE query_id = query_id_
|
||||
ORDER BY name;
|
||||
ExpressionTransform 1
|
||||
LazyOutputFormat 1
|
||||
LimitsCheckingTransform 1
|
||||
NullSource 1
|
||||
NullSource 1
|
||||
SourceFromSingleChunk 1
|
||||
ExpressionTransform 1 1 1 1 1
|
||||
LazyOutputFormat 1 1 1 0 0
|
||||
LimitsCheckingTransform 1 1 1 1 1
|
||||
NullSource 1 0 0 0 0
|
||||
NullSource 1 0 0 0 0
|
||||
SourceFromSingleChunk 1 0 0 1 1
|
||||
|
@ -22,7 +22,11 @@ SELECT
|
||||
-- NullSource/LazyOutputFormatLazyOutputFormat are the outputs
|
||||
-- so they cannot starts to execute before sleep(1) will be executed.
|
||||
input_wait_elapsed_us>1e6)
|
||||
elapsed
|
||||
elapsed,
|
||||
input_rows,
|
||||
input_bytes,
|
||||
output_rows,
|
||||
output_bytes
|
||||
FROM system.processors_profile_log
|
||||
WHERE query_id = query_id_
|
||||
ORDER BY name;
|
||||
|
@ -0,0 +1,5 @@
|
||||
499999500000
|
||||
AggregatingTransform 1000002 8000016 2 16
|
||||
ExpressionTransform 1 8 1 8
|
||||
ExpressionTransform 1000000 8000000 1000000 8000000
|
||||
NumbersMt 3 24 1000003 8000024
|
19
tests/queries/0_stateless/02210_processors_profile_log_2.sh
Executable file
19
tests/queries/0_stateless/02210_processors_profile_log_2.sh
Executable file
@ -0,0 +1,19 @@
|
||||
#!/usr/bin/env bash

# Runs a simple aggregation with processor profiling enabled and prints the
# input/output row and byte counters from system.processors_profile_log,
# summed per (plan_step, plan_group).

set -eo pipefail

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Unique identifier so that only the profile rows of this run are selected below.
QUERY_ID=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(reverse(reinterpretAsString(generateUUIDv4()))))")

# The first output line of the test is the result of this query.
${CLICKHOUSE_CLIENT} --query_id "${QUERY_ID}" <<EOF
SELECT sum(number) FROM numbers_mt(1000000)
SETTINGS log_processors_profiles=true, log_queries=1, log_queries_min_type='QUERY_FINISH';
EOF

# Make sure the profile rows have reached the system table before reading it.
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"

# Different plan steps can share a processor name (e.g. two ExpressionTransform
# groups), so "order by name" alone leaves their relative order unspecified.
# The counters are added as tie-breakers to keep the output deterministic.
${CLICKHOUSE_CLIENT} -q "select any(name) name, sum(input_rows), sum(input_bytes), sum(output_rows), sum(output_bytes) from system.processors_profile_log where query_id = '${QUERY_ID}' group by plan_step, plan_group order by name, sum(input_rows), sum(output_rows)"
|
Loading…
Reference in New Issue
Block a user