Merge pull request #13290 from ClickHouse/fix-totals-for-delayed-replica

Fix totals for delayed replica
This commit is contained in:
Nikolai Kochetov 2020-08-04 11:26:09 +03:00 committed by GitHub
commit 389f895b43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 24 deletions

View File

@ -285,7 +285,7 @@ void SelectStreamFactory::createForShard(
}
};
res.emplace_back(createDelayedPipe(header, lazily_create_stream));
res.emplace_back(createDelayedPipe(header, lazily_create_stream, add_totals, add_extremes));
}
else
emplace_remote_stream();

View File

@ -1,13 +1,30 @@
#include <Processors/Sources/DelayedSource.h>
#include "NullSource.h"
#include <Processors/Sources/NullSource.h>
#include <Processors/NullSink.h>
namespace DB
{
DelayedSource::DelayedSource(const Block & header, Creator processors_creator)
: IProcessor({}, OutputPorts(3, header))
DelayedSource::DelayedSource(const Block & header, Creator processors_creator, bool add_totals_port, bool add_extremes_port)
: IProcessor({}, OutputPorts(1 + (add_totals_port ? 1 : 0) + (add_extremes_port ? 1 : 0), header))
, creator(std::move(processors_creator))
{
auto output = outputs.begin();
main = &*output;
++output;
if (add_totals_port)
{
totals = &*output;
++output;
}
if (add_extremes_port)
{
extremes = &*output;
++output;
}
}
IProcessor::Status DelayedSource::prepare()
@ -66,6 +83,31 @@ IProcessor::Status DelayedSource::prepare()
return Status::Finished;
}
/// Fix port from returned pipe. Create source_port if created or drop if source_port is null.
void synchronizePorts(OutputPort *& pipe_port, OutputPort * source_port, const Block & header, Processors & processors)
{
if (source_port)
{
/// Need port in DelayedSource. Create NullSource.
if (!pipe_port)
{
processors.emplace_back(std::make_shared<NullSource>(header));
pipe_port = &processors.back()->getOutputs().back();
}
}
else
{
/// Has port in pipe, but don't need it. Create NullSink.
if (pipe_port)
{
auto sink = std::make_shared<NullSink>(header);
connect(*pipe_port, sink->getPort());
processors.emplace_back(std::move(sink));
pipe_port = nullptr;
}
}
}
void DelayedSource::work()
{
auto pipe = creator();
@ -76,17 +118,8 @@ void DelayedSource::work()
processors = std::move(pipe).detachProcessors();
if (!totals_output)
{
processors.emplace_back(std::make_shared<NullSource>(main_output->getHeader()));
totals_output = &processors.back()->getOutputs().back();
}
if (!extremes_output)
{
processors.emplace_back(std::make_shared<NullSource>(main_output->getHeader()));
extremes_output = &processors.back()->getOutputs().back();
}
synchronizePorts(totals_output, totals, main->getHeader(), processors);
synchronizePorts(extremes_output, extremes, main->getHeader(), processors);
}
Processors DelayedSource::expandPipeline()
@ -94,6 +127,9 @@ Processors DelayedSource::expandPipeline()
/// Add new inputs. They must have the same header as output.
for (const auto & output : {main_output, totals_output, extremes_output})
{
if (!output)
continue;
inputs.emplace_back(outputs.front().getHeader(), this);
/// Connect checks that header is same for ports.
connect(*output, inputs.back());
@ -104,13 +140,13 @@ Processors DelayedSource::expandPipeline()
return std::move(processors);
}
Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator)
Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator, bool add_totals_port, bool add_extremes_port)
{
auto source = std::make_shared<DelayedSource>(header, std::move(processors_creator));
auto source = std::make_shared<DelayedSource>(header, std::move(processors_creator), add_totals_port, add_extremes_port);
Pipe pipe(&source->getPort(DelayedSource::Main));
pipe.setTotalsPort(&source->getPort(DelayedSource::Totals));
pipe.setExtremesPort(&source->getPort(DelayedSource::Extremes));
Pipe pipe(&source->getPort());
pipe.setTotalsPort(source->getTotalsPort());
pipe.setExtremesPort(source->getExtremesPort());
pipe.addProcessors({std::move(source)});
return pipe;

View File

@ -19,20 +19,26 @@ class DelayedSource : public IProcessor
public:
using Creator = std::function<Pipe()>;
DelayedSource(const Block & header, Creator processors_creator);
DelayedSource(const Block & header, Creator processors_creator, bool add_totals_port, bool add_extremes_port);
String getName() const override { return "Delayed"; }
Status prepare() override;
void work() override;
Processors expandPipeline() override;
enum PortKind { Main = 0, Totals = 1, Extremes = 2 };
OutputPort & getPort(PortKind kind) { return *std::next(outputs.begin(), kind); }
OutputPort & getPort() { return *main; }
OutputPort * getTotalsPort() { return totals; }
OutputPort * getExtremesPort() { return extremes; }
private:
Creator creator;
Processors processors;
/// Outputs for DelayedSource.
OutputPort * main = nullptr;
OutputPort * totals = nullptr;
OutputPort * extremes = nullptr;
/// Outputs from returned pipe.
OutputPort * main_output = nullptr;
OutputPort * totals_output = nullptr;
@ -40,6 +46,6 @@ private:
};
/// Creates pipe from DelayedSource.
Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator);
Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator, bool add_totals_port, bool add_extremes_port);
}

View File

@ -69,6 +69,12 @@ SELECT sum(x) FROM distributed SETTINGS
max_replica_delay_for_distributed_queries=1
''').strip() == '3'
assert instance_with_dist_table.query('''
SELECT sum(x) FROM distributed WITH TOTALS SETTINGS
load_balancing='in_order',
max_replica_delay_for_distributed_queries=1
''').strip() == '3\n\n3'
pm.drop_instance_zk_connections(node_1_2)
pm.drop_instance_zk_connections(node_2_2)