Add support for connection of partial result processors with multiple ports

This commit is contained in:
alexX512 2023-05-02 07:26:36 +00:00
parent c0c8fa7b8e
commit 2cd1b37f9b
8 changed files with 82 additions and 21 deletions

View File

@ -41,12 +41,15 @@ struct PullingAsyncPipelineExecutor::Data
}
};
PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_, bool /*has_partial_result_setting*/) : pipeline(pipeline_)
{
if (!pipeline.pulling())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PullingAsyncPipelineExecutor must be pulling");
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.output->getHeader());
// if (has_partial_result_setting)
// lazy_format->activatePartialResultProtocol();
pipeline.complete(lazy_format);
}

View File

@ -21,7 +21,7 @@ struct ProfileInfo;
class PullingAsyncPipelineExecutor
{
public:
explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_);
explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_, bool has_partial_result_setting = false);
~PullingAsyncPipelineExecutor();
/// Get structure of returned block or chunk.

View File

@ -129,7 +129,7 @@ void IOutputFormat::work()
case Main:
result_rows += current_chunk.getNumRows();
result_bytes += current_chunk.allocatedBytes();
if (!was_main_input && current_chunk.hasRows())
if (is_partial_result_protocol_active && !was_main_input && current_chunk.hasRows())
{
consume(Chunk(current_chunk.cloneEmptyColumns(), 0));
was_main_input = true;

View File

@ -105,6 +105,8 @@ public:
void clearLastLines(size_t lines_number);
void activatePartialResultProtocol() { is_partial_result_protocol_active = true; }
protected:
friend class ParallelFormattingOutputFormat;
@ -192,6 +194,8 @@ private:
size_t rows_read_before = 0;
bool are_totals_written = false;
bool is_partial_result_protocol_active = false;
/// Counters for consumed chunks. Are used for QueryLog.
size_t result_rows = 0;
size_t result_bytes = 0;

View File

@ -5,6 +5,7 @@
namespace DB
{
/// Currently support only single thread implementation with one input and one output ports
class LimitPartialResultTransform : public PartialResultTransform
{
public:

View File

@ -483,8 +483,7 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort
if (extremes)
extremes_port = extremes;
/// TODO: Add support for partial result in multithreading mode
dropPartialResult();
addPartialResultTransform(transform);
size_t next_output = 0;
for (auto & input : inputs)
@ -573,8 +572,7 @@ void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort *
extremes_port = nullptr;
}
/// TODO: Add support for partial result in multithreading mode
dropPartialResult();
addPartialResultTransform(transform);
bool found_totals = false;
bool found_extremes = false;
@ -627,11 +625,10 @@ void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort *
addProcessor(std::move(transform));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
void Pipe::addPartialResultTransformIfNeeded(ProcessorPtr transform, size_t partial_result_port_id)
void Pipe::addPartialResultSimpleTransform(ProcessorPtr transform, size_t partial_result_port_id)
{
if (isPartialResultActive())
{
@ -643,23 +640,75 @@ void Pipe::addPartialResultTransformIfNeeded(ProcessorPtr transform, size_t part
return;
}
if (partial_result_port == nullptr)
{
auto source = std::make_shared<NullSource>(getHeader());
partial_result_port = &source->getPort();
addProcessor(std::move(source));
}
auto partial_result_transform = transform->getPartialResultProcessor(std::move(transform), partial_result_limit, partial_result_duration_ms);
connect(*partial_result_port, partial_result_transform->getInputs().front());
connectPartialResultPort(partial_result_port, partial_result_transform->getInputs().front());
partial_result_port = &partial_result_transform->getOutputs().front();
addProcessor(std::move(partial_result_transform));
}
}
void Pipe::addPartialResultTransform(ProcessorPtr transform)
{
if (isPartialResultActive())
{
size_t new_outputs_size = transform->getOutputs().size();
if (!transform->supportPartialResultProcessor())
{
for (auto & partial_result_port : partial_result_ports)
dropPort(partial_result_port, *processors, collected_processors);
partial_result_ports.assign(new_outputs_size, nullptr);
return;
}
auto partial_result_transform = transform->getPartialResultProcessor(std::move(transform), partial_result_limit, partial_result_duration_ms);
auto & inputs = partial_result_transform->getInputs();
if (inputs.size() != partial_result_ports.size())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipe because it has {} input ports, but {} expected",
partial_result_transform->getName(),
inputs.size(),
partial_result_ports.size());
size_t next_port = 0;
for (auto & input : inputs)
{
connectPartialResultPort(partial_result_ports[next_port], input);
++next_port;
}
partial_result_ports.assign(new_outputs_size, nullptr);
next_port = 0;
for (auto & new_partial_result_port : partial_result_transform->getOutputs())
{
partial_result_ports[next_port] = &new_partial_result_port;
++next_port;
}
addProcessor(std::move(partial_result_transform));
}
}
void Pipe::connectPartialResultPort(OutputPort * partial_result_port, InputPort & partial_result_transform_port)
{
if (partial_result_port == nullptr)
{
auto source = std::make_shared<NullSource>(getHeader());
partial_result_port = &source->getPort();
addProcessor(std::move(source));
}
connect(*partial_result_port, partial_result_transform_port);
}
void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
{
if (output_ports.empty())
@ -704,7 +753,7 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
connect(*port, transform->getInputs().front());
port = &transform->getOutputs().front();
if (stream_type == StreamType::Main)
addPartialResultTransformIfNeeded(transform, partial_result_port_id);
addPartialResultSimpleTransform(transform, partial_result_port_id);
addProcessor(std::move(transform));
}
@ -832,6 +881,7 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
add_transform(totals_port, StreamType::Totals);
add_transform(extremes_port, StreamType::Extremes);
dropPartialResult();
output_ports.clear();
header.clear();

View File

@ -76,7 +76,9 @@ public:
void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes);
void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes);
void addPartialResultTransformIfNeeded(ProcessorPtr transform, size_t partial_result_port_id);
void addPartialResultTransform(ProcessorPtr transform);
void addPartialResultSimpleTransform(ProcessorPtr transform, size_t partial_result_port_id);
void connectPartialResultPort(OutputPort * partial_result_port, InputPort & partial_result_transform_port);
enum class StreamType
{

View File

@ -815,7 +815,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
std::unique_lock progress_lock(task_callback_mutex, std::defer_lock);
{
PullingAsyncPipelineExecutor executor(pipeline);
bool has_partial_result_setting = query_context->getSettingsRef().partial_result_update_duration_ms.totalMilliseconds() > 0;
PullingAsyncPipelineExecutor executor(pipeline, has_partial_result_setting);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
Block block;