Rename need_data_elapsed_us/port_full_elapsed_us to input_wait_us/output_wait_us

$ gg -e need_data_ -e port_full_  | cut -d: -f1 | sort -u | xargs sed -i -e s/port_full_/output_wait_/g -e s/need_data_/input_wait_/g -e s/getPortFull/getOutputWait/g -e s/getNeedData/getInputWait/g

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-02-27 13:52:27 +03:00
parent 0d06dab362
commit 99528e296c
10 changed files with 47 additions and 47 deletions

View File

@ -9,8 +9,8 @@ Columns:
- `query_id` ([String](../../sql-reference/data-types/string.md)) — ID of the query - `query_id` ([String](../../sql-reference/data-types/string.md)) — ID of the query
- `name` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — Name of the processor. - `name` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — Name of the processor.
- `elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was executed. - `elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was executed.
- `need_data_elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was waiting for data (from other processor). - `input_wait_elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was waiting for data (from other processor).
- `port_full_elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was waiting because output port was full. - `output_wait_elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was waiting because output port was full.
**Example** **Example**
@ -42,8 +42,8 @@ Query id: feb5ed16-1c24-4227-aa54-78c02b3b27d4
SELECT SELECT
name, name,
elapsed_us, elapsed_us,
need_data_elapsed_us, input_wait_elapsed_us,
port_full_elapsed_us output_wait_elapsed_us
FROM system.processors_profile_log FROM system.processors_profile_log
WHERE query_id = 'feb5ed16-1c24-4227-aa54-78c02b3b27d4' WHERE query_id = 'feb5ed16-1c24-4227-aa54-78c02b3b27d4'
ORDER BY name ASC ORDER BY name ASC
@ -52,21 +52,21 @@ ORDER BY name ASC
Result: Result:
``` text ``` text
┌─name────────────────────┬─elapsed_us─┬─need_data_elapsed_us─┬─port_full_elapsed_us─┐ ┌─name────────────────────┬─elapsed_us─┬─input_wait_elapsed_us─┬─output_wait_elapsed_us─┐
│ ExpressionTransform │ 1000497 │ 2823 │ 197 │ │ ExpressionTransform │ 1000497 │ 2823 │ 197 │
│ LazyOutputFormat │ 36 │ 1002188 │ 0 │ │ LazyOutputFormat │ 36 │ 1002188 │ 0 │
│ LimitsCheckingTransform │ 10 │ 1002994 │ 106 │ │ LimitsCheckingTransform │ 10 │ 1002994 │ 106 │
│ NullSource │ 5 │ 1002074 │ 0 │ │ NullSource │ 5 │ 1002074 │ 0 │
│ NullSource │ 1 │ 1002084 │ 0 │ │ NullSource │ 1 │ 1002084 │ 0 │
│ SourceFromSingleChunk │ 45 │ 4736 │ 1000819 │ │ SourceFromSingleChunk │ 45 │ 4736 │ 1000819 │
└─────────────────────────┴────────────┴──────────────────────┴──────────────────────┘ └─────────────────────────┴────────────┴──────────────────────────────────────────────┘
``` ```
Here you can see: Here you can see:
- `ExpressionTransform` was executing `sleep(1)` function, so it `work` will takes 1e6, and so `elapsed_us` > 1e6. - `ExpressionTransform` was executing `sleep(1)` function, so it `work` will takes 1e6, and so `elapsed_us` > 1e6.
- `SourceFromSingleChunk` need to wait, because `ExpressionTransform` does not accept any data during execution of `sleep(1)`, so it will be in `PortFull` state for 1e6 us, and so `port_full_elapsed_us` > 1e6. - `SourceFromSingleChunk` need to wait, because `ExpressionTransform` does not accept any data during execution of `sleep(1)`, so it will be in `PortFull` state for 1e6 us, and so `output_wait_elapsed_us` > 1e6.
- `LimitsCheckingTransform`/`NullSource`/`LazyOutputFormat` need to wait until `ExpressionTransform` will execute `sleep(1)` to process the result, so `need_data_elapsed_us` > 1e6. - `LimitsCheckingTransform`/`NullSource`/`LazyOutputFormat` need to wait until `ExpressionTransform` will execute `sleep(1)` to process the result, so `input_wait_elapsed_us` > 1e6.
**See Also** **See Also**

View File

@ -26,8 +26,8 @@ NamesAndTypesList ProcessorProfileLogElement::getNamesAndTypes()
{"query_id", std::make_shared<DataTypeString>()}, {"query_id", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}, {"name", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"elapsed_us", std::make_shared<DataTypeUInt64>()}, {"elapsed_us", std::make_shared<DataTypeUInt64>()},
{"need_data_elapsed_us", std::make_shared<DataTypeUInt64>()}, {"input_wait_elapsed_us", std::make_shared<DataTypeUInt64>()},
{"port_full_elapsed_us", std::make_shared<DataTypeUInt64>()}, {"output_wait_elapsed_us", std::make_shared<DataTypeUInt64>()},
}; };
} }
@ -42,8 +42,8 @@ void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insertData(query_id.data(), query_id.size()); columns[i++]->insertData(query_id.data(), query_id.size());
columns[i++]->insertData(processor_name.data(), processor_name.size()); columns[i++]->insertData(processor_name.data(), processor_name.size());
columns[i++]->insert(elapsed_us); columns[i++]->insert(elapsed_us);
columns[i++]->insert(need_data_elapsed_us); columns[i++]->insert(input_wait_elapsed_us);
columns[i++]->insert(port_full_elapsed_us); columns[i++]->insert(output_wait_elapsed_us);
} }
ProcessorsProfileLog::ProcessorsProfileLog(ContextPtr context_, const String & database_name_, ProcessorsProfileLog::ProcessorsProfileLog(ContextPtr context_, const String & database_name_,

View File

@ -18,9 +18,9 @@ struct ProcessorProfileLogElement
/// Milliseconds spend in IProcessor::work() /// Milliseconds spend in IProcessor::work()
UInt32 elapsed_us{}; UInt32 elapsed_us{};
/// IProcessor::NeedData /// IProcessor::NeedData
UInt32 need_data_elapsed_us{}; UInt32 input_wait_elapsed_us{};
/// IProcessor::PortFull /// IProcessor::PortFull
UInt32 port_full_elapsed_us{}; UInt32 output_wait_elapsed_us{};
static std::string name() { return "ProcessorsProfileLog"; } static std::string name() { return "ProcessorsProfileLog"; }
static NamesAndTypesList getNamesAndTypes(); static NamesAndTypesList getNamesAndTypes();

View File

@ -881,8 +881,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{ {
processor_elem.processor_name = processor->getName(); processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = processor->getElapsedUs(); processor_elem.elapsed_us = processor->getElapsedUs();
processor_elem.need_data_elapsed_us = processor->getNeedDataElapsedUs(); processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs();
processor_elem.port_full_elapsed_us = processor->getPortFullElapsedUs(); processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs();
processors_profile_log->add(processor_elem); processors_profile_log->add(processor_elem);
} }
} }

View File

@ -271,21 +271,21 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
/// NeedData /// NeedData
if (last_status != IProcessor::Status::NeedData && status == IProcessor::Status::NeedData) if (last_status != IProcessor::Status::NeedData && status == IProcessor::Status::NeedData)
{ {
processor.need_data_watch.restart(); processor.input_wait_watch.restart();
} }
else if (last_status == IProcessor::Status::NeedData && status != IProcessor::Status::NeedData) else if (last_status == IProcessor::Status::NeedData && status != IProcessor::Status::NeedData)
{ {
processor.need_data_elapsed_us += processor.need_data_watch.elapsedMicroseconds(); processor.input_wait_elapsed_us += processor.input_wait_watch.elapsedMicroseconds();
} }
/// PortFull /// PortFull
if (last_status != IProcessor::Status::PortFull && status == IProcessor::Status::PortFull) if (last_status != IProcessor::Status::PortFull && status == IProcessor::Status::PortFull)
{ {
processor.port_full_watch.restart(); processor.output_wait_watch.restart();
} }
else if (last_status == IProcessor::Status::PortFull && status != IProcessor::Status::PortFull) else if (last_status == IProcessor::Status::PortFull && status != IProcessor::Status::PortFull)
{ {
processor.port_full_elapsed_us += processor.port_full_watch.elapsedMicroseconds(); processor.output_wait_elapsed_us += processor.output_wait_watch.elapsedMicroseconds();
} }
} }
catch (...) catch (...)

View File

@ -15,16 +15,16 @@ namespace ErrorCodes
class PushingSource : public ISource class PushingSource : public ISource
{ {
public: public:
explicit PushingSource(const Block & header, std::atomic_bool & need_data_flag_) explicit PushingSource(const Block & header, std::atomic_bool & input_wait_flag_)
: ISource(header) : ISource(header)
, need_data_flag(need_data_flag_) , input_wait_flag(input_wait_flag_)
{} {}
String getName() const override { return "PushingSource"; } String getName() const override { return "PushingSource"; }
void setData(Chunk chunk) void setData(Chunk chunk)
{ {
need_data_flag = false; input_wait_flag = false;
data = std::move(chunk); data = std::move(chunk);
} }
@ -34,7 +34,7 @@ protected:
{ {
auto status = ISource::prepare(); auto status = ISource::prepare();
if (status == Status::Ready) if (status == Status::Ready)
need_data_flag = true; input_wait_flag = true;
return status; return status;
} }
@ -46,7 +46,7 @@ protected:
private: private:
Chunk data; Chunk data;
std::atomic_bool & need_data_flag; std::atomic_bool & input_wait_flag;
}; };
@ -55,7 +55,7 @@ PushingPipelineExecutor::PushingPipelineExecutor(QueryPipeline & pipeline_) : pi
if (!pipeline.pushing()) if (!pipeline.pushing())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PushingPipelineExecutor must be pushing"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PushingPipelineExecutor must be pushing");
pushing_source = std::make_shared<PushingSource>(pipeline.input->getHeader(), need_data_flag); pushing_source = std::make_shared<PushingSource>(pipeline.input->getHeader(), input_wait_flag);
connect(pushing_source->getPort(), *pipeline.input); connect(pushing_source->getPort(), *pipeline.input);
pipeline.processors.emplace_back(pushing_source); pipeline.processors.emplace_back(pushing_source);
} }
@ -86,7 +86,7 @@ void PushingPipelineExecutor::start()
started = true; started = true;
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element); executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
if (!executor->executeStep(&need_data_flag)) if (!executor->executeStep(&input_wait_flag))
throw Exception(ErrorCodes::LOGICAL_ERROR, throw Exception(ErrorCodes::LOGICAL_ERROR,
"Pipeline for PushingPipelineExecutor was finished before all data was inserted"); "Pipeline for PushingPipelineExecutor was finished before all data was inserted");
} }
@ -98,7 +98,7 @@ void PushingPipelineExecutor::push(Chunk chunk)
pushing_source->setData(std::move(chunk)); pushing_source->setData(std::move(chunk));
if (!executor->executeStep(&need_data_flag)) if (!executor->executeStep(&input_wait_flag))
throw Exception(ErrorCodes::LOGICAL_ERROR, throw Exception(ErrorCodes::LOGICAL_ERROR,
"Pipeline for PushingPipelineExecutor was finished before all data was inserted"); "Pipeline for PushingPipelineExecutor was finished before all data was inserted");
} }

View File

@ -47,7 +47,7 @@ public:
private: private:
QueryPipeline & pipeline; QueryPipeline & pipeline;
std::atomic_bool need_data_flag = false; std::atomic_bool input_wait_flag = false;
std::shared_ptr<PushingSource> pushing_source; std::shared_ptr<PushingSource> pushing_source;
PipelineExecutorPtr executor; PipelineExecutorPtr executor;

View File

@ -301,8 +301,8 @@ public:
size_t getQueryPlanStepGroup() const { return query_plan_step_group; } size_t getQueryPlanStepGroup() const { return query_plan_step_group; }
uint64_t getElapsedUs() const { return elapsed_us; } uint64_t getElapsedUs() const { return elapsed_us; }
uint64_t getNeedDataElapsedUs() const { return need_data_elapsed_us; } uint64_t getInputWaitElapsedUs() const { return input_wait_elapsed_us; }
uint64_t getPortFullElapsedUs() const { return port_full_elapsed_us; } uint64_t getOutputWaitElapsedUs() const { return output_wait_elapsed_us; }
protected: protected:
virtual void onCancel() {} virtual void onCancel() {}
@ -312,8 +312,8 @@ private:
/// - elapsed_us /// - elapsed_us
friend class ExecutionThreadContext; friend class ExecutionThreadContext;
/// For /// For
/// - need_data_elapsed_us /// - input_wait_elapsed_us
/// - port_full_elapsed_us /// - output_wait_elapsed_us
friend class ExecutingGraph; friend class ExecutingGraph;
std::atomic<bool> is_cancelled{false}; std::atomic<bool> is_cancelled{false};
@ -322,10 +322,10 @@ private:
/// For processors_profile_log /// For processors_profile_log
uint64_t elapsed_us = 0; uint64_t elapsed_us = 0;
Stopwatch need_data_watch; Stopwatch input_wait_watch;
uint64_t need_data_elapsed_us = 0; uint64_t input_wait_elapsed_us = 0;
Stopwatch port_full_watch; Stopwatch output_wait_watch;
uint64_t port_full_elapsed_us = 0; uint64_t output_wait_elapsed_us = 0;
size_t stream_number = NO_STREAM; size_t stream_number = NO_STREAM;

View File

@ -22,10 +22,10 @@ SELECT
name = 'ExpressionTransform', elapsed_us>1e6, name = 'ExpressionTransform', elapsed_us>1e6,
-- SourceFromSingleChunk, that feed data to ExpressionTransform, -- SourceFromSingleChunk, that feed data to ExpressionTransform,
-- will feed first block and then wait in PortFull. -- will feed first block and then wait in PortFull.
name = 'SourceFromSingleChunk', port_full_elapsed_us>1e6, name = 'SourceFromSingleChunk', output_wait_elapsed_us>1e6,
-- NullSource/LazyOutputFormatLazyOutputFormat are the outputs -- NullSource/LazyOutputFormatLazyOutputFormat are the outputs
-- so they cannot starts to execute before sleep(1) will be executed. -- so they cannot starts to execute before sleep(1) will be executed.
need_data_elapsed_us>1e6) input_wait_elapsed_us>1e6)
elapsed elapsed
FROM system.processors_profile_log FROM system.processors_profile_log
WHERE query_id = query_id_ WHERE query_id = query_id_

View File

@ -18,10 +18,10 @@ SELECT
name = 'ExpressionTransform', elapsed_us>1e6, name = 'ExpressionTransform', elapsed_us>1e6,
-- SourceFromSingleChunk, that feed data to ExpressionTransform, -- SourceFromSingleChunk, that feed data to ExpressionTransform,
-- will feed first block and then wait in PortFull. -- will feed first block and then wait in PortFull.
name = 'SourceFromSingleChunk', port_full_elapsed_us>1e6, name = 'SourceFromSingleChunk', output_wait_elapsed_us>1e6,
-- NullSource/LazyOutputFormatLazyOutputFormat are the outputs -- NullSource/LazyOutputFormatLazyOutputFormat are the outputs
-- so they cannot starts to execute before sleep(1) will be executed. -- so they cannot starts to execute before sleep(1) will be executed.
need_data_elapsed_us>1e6) input_wait_elapsed_us>1e6)
elapsed elapsed
FROM system.processors_profile_log FROM system.processors_profile_log
WHERE query_id = query_id_ WHERE query_id = query_id_