This commit is contained in:
Nikita Taranov 2024-07-01 16:38:39 +01:00
parent 31eeeae4fd
commit adcaf117a1
3 changed files with 9 additions and 9 deletions

View File

@ -382,7 +382,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
pool, std::move(algorithm), prewhere_info,
actions_settings, block_size_copy, reader_settings);
auto source = std::make_shared<MergeTreeSource>(std::move(processor));
auto source = std::make_shared<MergeTreeSource>(std::move(processor), data.getLogName());
pipes.emplace_back(std::move(source));
}
@ -481,7 +481,7 @@ Pipe ReadFromMergeTree::readFromPool(
pool, std::move(algorithm), prewhere_info,
actions_settings, block_size_copy, reader_settings);
auto source = std::make_shared<MergeTreeSource>(std::move(processor));
auto source = std::make_shared<MergeTreeSource>(std::move(processor), data.getLogName());
if (i == 0)
source->addTotalRowsApprox(total_rows);
@ -593,7 +593,7 @@ Pipe ReadFromMergeTree::readInOrder(
processor->addPartLevelToChunk(isQueryWithFinal());
auto source = std::make_shared<MergeTreeSource>(std::move(processor));
auto source = std::make_shared<MergeTreeSource>(std::move(processor), data.getLogName());
if (set_rows_approx)
source->addTotalRowsApprox(total_rows);

View File

@ -133,9 +133,8 @@ private:
};
#endif
MergeTreeSource::MergeTreeSource(MergeTreeSelectProcessorPtr processor_)
: ISource(processor_->getHeader())
, processor(std::move(processor_))
MergeTreeSource::MergeTreeSource(MergeTreeSelectProcessorPtr processor_, const std::string & log_name_)
: ISource(processor_->getHeader()), processor(std::move(processor_)), log_name(log_name_)
{
#if defined(OS_LINUX)
if (processor->getSettings().use_asynchronous_read_from_pool)
@ -207,7 +206,7 @@ std::optional<Chunk> MergeTreeSource::tryGenerate()
try
{
OpenTelemetry::SpanHolder span{"MergeTreeSource::tryGenerate()"};
OpenTelemetry::SpanHolder span{fmt::format("MergeTreeSource({})::tryGenerate", log_name)};
holder->setResult(processor->read());
}
catch (...)
@ -222,7 +221,7 @@ std::optional<Chunk> MergeTreeSource::tryGenerate()
}
#endif
OpenTelemetry::SpanHolder span{"MergeTreeSource::tryGenerate()"};
OpenTelemetry::SpanHolder span{fmt::format("MergeTreeSource({})::tryGenerate", log_name)};
return processReadResult(processor->read());
}

View File

@ -12,7 +12,7 @@ struct ChunkAndProgress;
class MergeTreeSource final : public ISource
{
public:
explicit MergeTreeSource(MergeTreeSelectProcessorPtr processor_);
explicit MergeTreeSource(MergeTreeSelectProcessorPtr processor_, const std::string & log_name_);
~MergeTreeSource() override;
std::string getName() const override;
@ -30,6 +30,7 @@ protected:
private:
MergeTreeSelectProcessorPtr processor;
const std::string log_name;
#if defined(OS_LINUX)
struct AsyncReadingState;