mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 18:42:26 +00:00
Added DelayedSource.
This commit is contained in:
parent
83b6467308
commit
f54f948162
@ -1,8 +1,6 @@
|
||||
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <DataStreams/RemoteBlockInputStream.h>
|
||||
#include <DataStreams/MaterializingBlockInputStream.h>
|
||||
#include <DataStreams/LazyBlockInputStream.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Common/Exception.h>
|
||||
@ -13,9 +11,8 @@
|
||||
#include <common/logger_useful.h>
|
||||
#include <Processors/Pipe.h>
|
||||
#include <Processors/Transforms/ConvertingTransform.h>
|
||||
#include <Processors/Sources/SourceFromInputStream.h>
|
||||
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
|
||||
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
|
||||
#include <Processors/Sources/RemoteSource.h>
|
||||
#include <Processors/Sources/DelayedSource.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
@ -118,13 +115,13 @@ void SelectStreamFactory::createForShard(
|
||||
const SelectQueryInfo &,
|
||||
Pipes & res)
|
||||
{
|
||||
bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
|
||||
bool add_totals_port = false;
|
||||
bool add_extremes_port = false;
|
||||
bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
|
||||
bool add_totals = false;
|
||||
bool add_extremes = false;
|
||||
if (processed_stage == QueryProcessingStage::Complete)
|
||||
{
|
||||
add_totals_port = query_ast->as<ASTSelectQuery &>().group_by_with_totals;
|
||||
add_extremes_port = context.getSettingsRef().extremes;
|
||||
add_totals = query_ast->as<ASTSelectQuery &>().group_by_with_totals;
|
||||
add_extremes = context.getSettingsRef().extremes;
|
||||
}
|
||||
|
||||
auto modified_query_ast = query_ast->clone();
|
||||
@ -140,20 +137,13 @@ void SelectStreamFactory::createForShard(
|
||||
|
||||
auto emplace_remote_stream = [&]()
|
||||
{
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(
|
||||
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||
shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
|
||||
stream->setPoolMode(PoolMode::GET_MANY);
|
||||
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
|
||||
if (!table_func_ptr)
|
||||
stream->setMainTable(main_table);
|
||||
remote_query_executor->setMainTable(main_table);
|
||||
|
||||
auto source = std::make_shared<SourceFromInputStream>(std::move(stream), force_add_agg_info);
|
||||
|
||||
if (add_totals_port)
|
||||
source->addTotalsPort();
|
||||
if (add_extremes_port)
|
||||
source->addExtremesPort();
|
||||
|
||||
res.emplace_back(std::move(source));
|
||||
res.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes));
|
||||
};
|
||||
|
||||
const auto & settings = context.getSettingsRef();
|
||||
@ -246,8 +236,8 @@ void SelectStreamFactory::createForShard(
|
||||
auto lazily_create_stream = [
|
||||
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler,
|
||||
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
|
||||
stage = processed_stage, local_delay]()
|
||||
-> BlockInputStreamPtr
|
||||
stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes]()
|
||||
-> Pipe
|
||||
{
|
||||
auto current_settings = context.getSettingsRef();
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
|
||||
@ -277,8 +267,7 @@ void SelectStreamFactory::createForShard(
|
||||
}
|
||||
|
||||
if (try_results.empty() || local_delay < max_remote_delay)
|
||||
return std::make_shared<PipelineExecutingBlockInputStream>(
|
||||
createLocalStream(modified_query_ast, header, context, stage));
|
||||
return createLocalStream(modified_query_ast, header, context, stage).getPipe();
|
||||
else
|
||||
{
|
||||
std::vector<IConnectionPool::Entry> connections;
|
||||
@ -286,20 +275,14 @@ void SelectStreamFactory::createForShard(
|
||||
for (auto & try_result : try_results)
|
||||
connections.emplace_back(std::move(try_result.entry));
|
||||
|
||||
return std::make_shared<RemoteBlockInputStream>(
|
||||
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||
std::move(connections), modified_query, header, context, nullptr, throttler, scalars, external_tables, stage);
|
||||
|
||||
return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes);
|
||||
}
|
||||
};
|
||||
|
||||
auto lazy_stream = std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream);
|
||||
auto source = std::make_shared<SourceFromInputStream>(std::move(lazy_stream), force_add_agg_info);
|
||||
|
||||
if (add_totals_port)
|
||||
source->addTotalsPort();
|
||||
if (add_extremes_port)
|
||||
source->addExtremesPort();
|
||||
|
||||
res.emplace_back(std::move(source));
|
||||
res.emplace_back(createDelayedPipe(header, lazily_create_stream));
|
||||
}
|
||||
else
|
||||
emplace_remote_stream();
|
||||
|
@ -1,6 +1,119 @@
|
||||
#include <Processors/Sources/DelayedSource.h>
|
||||
#include "NullSource.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Creates a source with no inputs and three output ports that all use `header`.
/// `processors_creator` is stored and invoked later, from work().
DelayedSource::DelayedSource(const Block & header, Creator processors_creator)
    : IProcessor({}, OutputPorts(3, header)), creator(std::move(processors_creator))
{
}
|
||||
|
||||
/// Decides what the executor should do next with this processor.
///
/// Before the pipeline is expanded (there are no inputs yet):
///  * Finished       - the main output was finished before the creator was called;
///                     all outputs are finished as well.
///  * PortFull       - the main output is not needed yet, so the creator is not called.
///  * Ready          - no processors have been created yet; work() has to be called.
///  * ExpandPipeline - processors were created; expandPipeline() has to be called.
///
/// After expansion the i-th input is forwarded to the i-th output
/// (in order: main, totals, extremes):
///  * PortFull - the current output is not needed yet, or a chunk was just pushed into it.
///  * NeedData - the current input has not received data yet.
///  * Finished - every input/output pair is done.
IProcessor::Status DelayedSource::prepare()
{
    /// At first, wait until the main output is needed, then expand the pipeline.
    if (inputs.empty())
    {
        auto & first_output = outputs.front();

        /// If main port was finished before callback was called, stop execution.
        if (first_output.isFinished())
        {
            for (auto & output : outputs)
                output.finish();

            return Status::Finished;
        }

        if (!first_output.isNeeded())
            return Status::PortFull;

        /// Call creator callback to get processors.
        if (processors.empty())
            return Status::Ready;

        return Status::ExpandPipeline;
    }

    /// Process ports in order: main, totals, extremes.
    /// Inputs were added by expandPipeline() in the same order as outputs,
    /// so both iterators must advance together to keep each input paired with its own output.
    auto output = outputs.begin();
    for (auto input = inputs.begin(); input != inputs.end(); ++input, ++output)
    {
        if (output->isFinished())
        {
            input->close();
            continue;
        }

        if (!output->isNeeded())
            return Status::PortFull;

        if (input->isFinished())
        {
            output->finish();
            continue;
        }

        input->setNeeded();

        /// Nothing to forward yet: we are waiting on the input side, not on the output side.
        if (!input->hasData())
            return Status::NeedData;

        output->pushData(input->pullData(true));
        return Status::PortFull;
    }

    return Status::Finished;
}
|
||||
|
||||
void DelayedSource::work()
|
||||
{
|
||||
auto pipe = creator();
|
||||
|
||||
main_output = &pipe.getPort();
|
||||
totals_output = pipe.getTotalsPort();
|
||||
extremes_output = pipe.getExtremesPort();
|
||||
|
||||
processors = std::move(pipe).detachProcessors();
|
||||
|
||||
if (!totals_output)
|
||||
{
|
||||
processors.emplace_back(std::make_shared<NullSource>(main_output->getHeader()));
|
||||
totals_output = &processors.back()->getOutputs().back();
|
||||
}
|
||||
|
||||
if (!extremes_output)
|
||||
{
|
||||
processors.emplace_back(std::make_shared<NullSource>(main_output->getHeader()));
|
||||
extremes_output = &processors.back()->getOutputs().back();
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds one input per port remembered by work() (main, totals, extremes, in that order),
/// connects each of those ports to its new input and marks the input as needed.
/// Returns the processors detached from the created pipe.
Processors DelayedSource::expandPipeline()
{
    OutputPort * const pipe_ports[] = {main_output, totals_output, extremes_output};

    for (OutputPort * pipe_port : pipe_ports)
    {
        /// New inputs must have the same header as the outputs of this processor.
        inputs.emplace_back(outputs.front().getHeader(), this);
        auto & new_input = inputs.back();

        /// Connect checks that header is same for ports.
        connect(*pipe_port, new_input);
        new_input.setNeeded();
    }

    /// Executor will check that all processors are connected.
    /// `processors` is a member, so it has to be moved out explicitly.
    return std::move(processors);
}
|
||||
|
||||
/// Wraps a new DelayedSource into a Pipe.
/// The pipe's main, totals and extremes ports are the corresponding output ports
/// of the source, and the source itself is added to the pipe's processors.
Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator)
{
    auto delayed = std::make_shared<DelayedSource>(header, std::move(processors_creator));

    /// Take all port addresses while we still hold the typed pointer to the source.
    auto * main_port = &delayed->getPort(DelayedSource::Main);
    auto * totals_port = &delayed->getPort(DelayedSource::Totals);
    auto * extremes_port = &delayed->getPort(DelayedSource::Extremes);

    Pipe result(main_port);
    result.setTotalsPort(totals_port);
    result.setExtremesPort(extremes_port);

    result.addProcessors({std::move(delayed)});
    return result;
}
|
||||
|
||||
}
|
||||
|
@ -1,23 +1,45 @@
|
||||
#pragma once
|
||||
|
||||
#include <Processors/IProcessor.h>
|
||||
#include <Processors/Pipe.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// DelayedSource delays pipeline calculation until it starts execution.
|
||||
/// It accepts callback which creates a new pipe.
|
||||
///
|
||||
/// First time when DelayedSource's main output port needs data, callback is called.
|
||||
/// Then, DelayedSource expands pipeline: adds new inputs and connects pipe with it.
|
||||
/// Then, DelayedSource just moves data from inputs to outputs until finished.
|
||||
///
|
||||
/// If the main output port of DelayedSource is never needed, callback won't be called.
|
||||
class DelayedSource : public IProcessor
|
||||
{
|
||||
public:
|
||||
/// NOTE(review): two `Creator` aliases and two constructors are listed below.
/// This looks like the removed and the added side of a diff shown together - confirm which is current.
using Creator = std::function<Processors()>;
|
||||
/// Callback invoked from work(); the Pipe it returns supplies the processors and ports to forward from.
using Creator = std::function<Pipe()>;
|
||||
|
||||
DelayedSource(Block header, Creator processors_creator);
|
||||
/// `header` is used for every output port; `processors_creator` is stored and called later.
DelayedSource(const Block & header, Creator processors_creator);
|
||||
String getName() const override { return "Delayed"; }
|
||||
|
||||
/// Returns the next action for the executor: call work(), expand the pipeline, wait, or finish.
Status prepare() override;
|
||||
/// Calls the creator and stores the resulting processors and output ports.
void work() override;
|
||||
/// Adds inputs connected to the stored ports and returns the stored processors.
Processors expandPipeline() override;
|
||||
|
||||
/// Position of each output port inside `outputs`; used as the offset in getPort().
enum PortKind { Main = 0, Totals = 1, Extremes = 2 };
|
||||
/// Returns the output port at position `kind`.
OutputPort & getPort(PortKind kind) { return *std::next(outputs.begin(), kind); }
|
||||
|
||||
private:
|
||||
/// Callback which creates the pipe.
Creator creator;
|
||||
/// Processors detached from the created pipe; moved out by expandPipeline().
Processors processors;
|
||||
|
||||
/// Outputs from returned pipe.
|
||||
OutputPort * main_output = nullptr;
|
||||
OutputPort * totals_output = nullptr;
|
||||
OutputPort * extremes_output = nullptr;
|
||||
};
|
||||
|
||||
/// Creates pipe from DelayedSource.
|
||||
Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator);
|
||||
|
||||
}
|
||||
|
@ -66,42 +66,67 @@ void RemoteSource::onCancel()
|
||||
}
|
||||
|
||||
|
||||
RemoteTotalsSource::RemoteTotalsSource(Block header) : ISource(std::move(header)) {}
|
||||
/// Creates a source whose header is taken from `executor`.
/// The header is read in the ISource base initializer, which runs before
/// the member initializer moves `executor` into `query_executor`.
RemoteTotalsSource::RemoteTotalsSource(RemoteQueryExecutorPtr executor)
    : ISource(executor->getHeader()), query_executor(std::move(executor))
{
}
|
||||
|
||||
RemoteTotalsSource::~RemoteTotalsSource() = default;
|
||||
|
||||
/// Asks the query executor for totals. If a block is returned, it is converted
/// to a Chunk (its columns plus row count); otherwise a default-constructed Chunk is returned.
/// NOTE(review): the body below appears to interleave the removed and the added lines
/// of a diff (duplicated statements, unbalanced braces) - confirm against the real file.
Chunk RemoteTotalsSource::generate()
|
||||
{
|
||||
/// Check use_count instead of comparing with nullptr just in case.
|
||||
/// setQueryExecutor() may be called from another thread, but there shouldn't be any race,
|
||||
/// because totals and extremes are always read after main data.
|
||||
if (query_executor.use_count())
|
||||
if (auto block = query_executor->getTotals())
|
||||
{
|
||||
if (auto block = query_executor->getTotals())
|
||||
{
|
||||
UInt64 num_rows = block.rows();
|
||||
return Chunk(block.getColumns(), num_rows);
|
||||
}
|
||||
UInt64 num_rows = block.rows();
|
||||
return Chunk(block.getColumns(), num_rows);
|
||||
}
|
||||
|
||||
/// No totals available.
return {};
|
||||
}
|
||||
|
||||
|
||||
RemoteExtremesSource::RemoteExtremesSource(Block header) : ISource(std::move(header)) {}
|
||||
/// Creates a source whose header is taken from `executor`.
/// The header is read in the ISource base initializer, which runs before
/// the member initializer moves `executor` into `query_executor`.
RemoteExtremesSource::RemoteExtremesSource(RemoteQueryExecutorPtr executor)
    : ISource(executor->getHeader()), query_executor(std::move(executor))
{
}
|
||||
|
||||
RemoteExtremesSource::~RemoteExtremesSource() = default;
|
||||
|
||||
/// Asks the query executor for extremes. If a block is returned, it is converted
/// to a Chunk (its columns plus row count); otherwise a default-constructed Chunk is returned.
/// NOTE(review): the body below appears to interleave the removed and the added lines
/// of a diff (duplicated statements, unbalanced braces) - confirm against the real file.
Chunk RemoteExtremesSource::generate()
|
||||
{
|
||||
if (query_executor.use_count())
|
||||
if (auto block = query_executor->getExtremes())
|
||||
{
|
||||
if (auto block = query_executor->getExtremes())
|
||||
{
|
||||
UInt64 num_rows = block.rows();
|
||||
return Chunk(block.getColumns(), num_rows);
|
||||
}
|
||||
UInt64 num_rows = block.rows();
|
||||
return Chunk(block.getColumns(), num_rows);
|
||||
}
|
||||
|
||||
/// No extremes available.
return {};
|
||||
}
|
||||
|
||||
|
||||
/// Builds a pipe around a RemoteSource that reads from `query_executor`.
/// `add_aggregation_info` is passed through to the RemoteSource.
/// When `add_totals` / `add_extremes` is set, a RemoteTotalsSource / RemoteExtremesSource
/// over the same executor is added to the pipe and its port becomes
/// the pipe's totals / extremes port.
Pipe createRemoteSourcePipe(
    RemoteQueryExecutorPtr query_executor,
    bool add_aggregation_info, bool add_totals, bool add_extremes)
{
    auto main_source = std::make_shared<RemoteSource>(query_executor, add_aggregation_info);
    Pipe result(std::move(main_source));

    if (add_totals)
    {
        auto totals = std::make_shared<RemoteTotalsSource>(query_executor);

        /// Take the port address before ownership of the source moves into the pipe.
        auto * totals_port = &totals->getPort();
        result.setTotalsPort(totals_port);
        result.addProcessors({std::move(totals)});
    }

    if (add_extremes)
    {
        auto extremes = std::make_shared<RemoteExtremesSource>(query_executor);

        auto * extremes_port = &extremes->getPort();
        result.setExtremesPort(extremes_port);
        result.addProcessors({std::move(extremes)});
    }

    return result;
}
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Processors/Sources/SourceWithProgress.h>
|
||||
#include <Processors/RowsBeforeLimitCounter.h>
|
||||
#include <Processors/Pipe.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -45,13 +46,11 @@ private:
|
||||
class RemoteTotalsSource : public ISource
|
||||
{
|
||||
public:
|
||||
explicit RemoteTotalsSource(Block header);
|
||||
explicit RemoteTotalsSource(RemoteQueryExecutorPtr executor);
|
||||
~RemoteTotalsSource();
|
||||
|
||||
String getName() const override { return "RemoteTotals"; }
|
||||
|
||||
void setQueryExecutor(RemoteQueryExecutorPtr executor) { query_executor.swap(executor); }
|
||||
|
||||
protected:
|
||||
Chunk generate() override;
|
||||
|
||||
@ -63,13 +62,11 @@ private:
|
||||
class RemoteExtremesSource : public ISource
|
||||
{
|
||||
public:
|
||||
explicit RemoteExtremesSource(Block header);
|
||||
explicit RemoteExtremesSource(RemoteQueryExecutorPtr executor);
|
||||
~RemoteExtremesSource();
|
||||
|
||||
String getName() const override { return "RemoteExtremes"; }
|
||||
|
||||
void setQueryExecutor(RemoteQueryExecutorPtr executor) { query_executor.swap(executor); }
|
||||
|
||||
protected:
|
||||
Chunk generate() override;
|
||||
|
||||
@ -77,4 +74,9 @@ private:
|
||||
RemoteQueryExecutorPtr query_executor;
|
||||
};
|
||||
|
||||
/// Create pipe with remote sources.
|
||||
Pipe createRemoteSourcePipe(
|
||||
RemoteQueryExecutorPtr query_executor,
|
||||
bool add_aggregation_info, bool add_totals, bool add_extremes);
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user