mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #11440 from ClickHouse/add-remote-source
Add remote source
This commit is contained in:
commit
595fc9eff6
@ -61,8 +61,8 @@ public:
|
|||||||
void cancel();
|
void cancel();
|
||||||
|
|
||||||
/// Get totals and extremes if any.
|
/// Get totals and extremes if any.
|
||||||
Block getTotals() const { return totals; }
|
Block getTotals() { return std::move(totals); }
|
||||||
Block getExtremes() const { return extremes; }
|
Block getExtremes() { return std::move(extremes); }
|
||||||
|
|
||||||
/// Set callback for progress. It will be called on Progress packet.
|
/// Set callback for progress. It will be called on Progress packet.
|
||||||
void setProgressCallback(ProgressCallback callback) { progress_callback = std::move(callback); }
|
void setProgressCallback(ProgressCallback callback) { progress_callback = std::move(callback); }
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
|
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
|
||||||
#include <Interpreters/InterpreterSelectQuery.h>
|
#include <Interpreters/InterpreterSelectQuery.h>
|
||||||
#include <DataStreams/RemoteBlockInputStream.h>
|
#include <DataStreams/RemoteBlockInputStream.h>
|
||||||
#include <DataStreams/MaterializingBlockInputStream.h>
|
|
||||||
#include <DataStreams/LazyBlockInputStream.h>
|
|
||||||
#include <Storages/StorageReplicatedMergeTree.h>
|
#include <Storages/StorageReplicatedMergeTree.h>
|
||||||
#include <Storages/VirtualColumnUtils.h>
|
#include <Storages/VirtualColumnUtils.h>
|
||||||
#include <Common/Exception.h>
|
#include <Common/Exception.h>
|
||||||
@ -13,9 +11,8 @@
|
|||||||
#include <common/logger_useful.h>
|
#include <common/logger_useful.h>
|
||||||
#include <Processors/Pipe.h>
|
#include <Processors/Pipe.h>
|
||||||
#include <Processors/Transforms/ConvertingTransform.h>
|
#include <Processors/Transforms/ConvertingTransform.h>
|
||||||
#include <Processors/Sources/SourceFromInputStream.h>
|
#include <Processors/Sources/RemoteSource.h>
|
||||||
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
|
#include <Processors/Sources/DelayedSource.h>
|
||||||
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
|
|
||||||
|
|
||||||
namespace ProfileEvents
|
namespace ProfileEvents
|
||||||
{
|
{
|
||||||
@ -118,13 +115,13 @@ void SelectStreamFactory::createForShard(
|
|||||||
const SelectQueryInfo &,
|
const SelectQueryInfo &,
|
||||||
Pipes & res)
|
Pipes & res)
|
||||||
{
|
{
|
||||||
bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
|
bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
|
||||||
bool add_totals_port = false;
|
bool add_totals = false;
|
||||||
bool add_extremes_port = false;
|
bool add_extremes = false;
|
||||||
if (processed_stage == QueryProcessingStage::Complete)
|
if (processed_stage == QueryProcessingStage::Complete)
|
||||||
{
|
{
|
||||||
add_totals_port = query_ast->as<ASTSelectQuery &>().group_by_with_totals;
|
add_totals = query_ast->as<ASTSelectQuery &>().group_by_with_totals;
|
||||||
add_extremes_port = context.getSettingsRef().extremes;
|
add_extremes = context.getSettingsRef().extremes;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto modified_query_ast = query_ast->clone();
|
auto modified_query_ast = query_ast->clone();
|
||||||
@ -140,20 +137,13 @@ void SelectStreamFactory::createForShard(
|
|||||||
|
|
||||||
auto emplace_remote_stream = [&]()
|
auto emplace_remote_stream = [&]()
|
||||||
{
|
{
|
||||||
auto stream = std::make_shared<RemoteBlockInputStream>(
|
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||||
shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
|
shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
|
||||||
stream->setPoolMode(PoolMode::GET_MANY);
|
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
|
||||||
if (!table_func_ptr)
|
if (!table_func_ptr)
|
||||||
stream->setMainTable(main_table);
|
remote_query_executor->setMainTable(main_table);
|
||||||
|
|
||||||
auto source = std::make_shared<SourceFromInputStream>(std::move(stream), force_add_agg_info);
|
res.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes));
|
||||||
|
|
||||||
if (add_totals_port)
|
|
||||||
source->addTotalsPort();
|
|
||||||
if (add_extremes_port)
|
|
||||||
source->addExtremesPort();
|
|
||||||
|
|
||||||
res.emplace_back(std::move(source));
|
|
||||||
};
|
};
|
||||||
|
|
||||||
const auto & settings = context.getSettingsRef();
|
const auto & settings = context.getSettingsRef();
|
||||||
@ -246,8 +236,8 @@ void SelectStreamFactory::createForShard(
|
|||||||
auto lazily_create_stream = [
|
auto lazily_create_stream = [
|
||||||
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler,
|
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler,
|
||||||
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
|
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
|
||||||
stage = processed_stage, local_delay]()
|
stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes]()
|
||||||
-> BlockInputStreamPtr
|
-> Pipe
|
||||||
{
|
{
|
||||||
auto current_settings = context.getSettingsRef();
|
auto current_settings = context.getSettingsRef();
|
||||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
|
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
|
||||||
@ -277,8 +267,7 @@ void SelectStreamFactory::createForShard(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (try_results.empty() || local_delay < max_remote_delay)
|
if (try_results.empty() || local_delay < max_remote_delay)
|
||||||
return std::make_shared<PipelineExecutingBlockInputStream>(
|
return createLocalStream(modified_query_ast, header, context, stage).getPipe();
|
||||||
createLocalStream(modified_query_ast, header, context, stage));
|
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
std::vector<IConnectionPool::Entry> connections;
|
std::vector<IConnectionPool::Entry> connections;
|
||||||
@ -286,20 +275,14 @@ void SelectStreamFactory::createForShard(
|
|||||||
for (auto & try_result : try_results)
|
for (auto & try_result : try_results)
|
||||||
connections.emplace_back(std::move(try_result.entry));
|
connections.emplace_back(std::move(try_result.entry));
|
||||||
|
|
||||||
return std::make_shared<RemoteBlockInputStream>(
|
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||||
std::move(connections), modified_query, header, context, nullptr, throttler, scalars, external_tables, stage);
|
std::move(connections), modified_query, header, context, nullptr, throttler, scalars, external_tables, stage);
|
||||||
|
|
||||||
|
return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
auto lazy_stream = std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream);
|
res.emplace_back(createDelayedPipe(header, lazily_create_stream));
|
||||||
auto source = std::make_shared<SourceFromInputStream>(std::move(lazy_stream), force_add_agg_info);
|
|
||||||
|
|
||||||
if (add_totals_port)
|
|
||||||
source->addTotalsPort();
|
|
||||||
if (add_extremes_port)
|
|
||||||
source->addExtremesPort();
|
|
||||||
|
|
||||||
res.emplace_back(std::move(source));
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
emplace_remote_stream();
|
emplace_remote_stream();
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
#include <Common/CurrentThread.h>
|
#include <Common/CurrentThread.h>
|
||||||
#include <Processors/DelayedPortsProcessor.h>
|
#include <Processors/DelayedPortsProcessor.h>
|
||||||
#include <Processors/RowsBeforeLimitCounter.h>
|
#include <Processors/RowsBeforeLimitCounter.h>
|
||||||
|
#include <Processors/Sources/RemoteSource.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -673,8 +674,10 @@ void QueryPipeline::initRowsBeforeLimit()
|
|||||||
{
|
{
|
||||||
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
|
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
|
||||||
|
|
||||||
|
/// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor.
|
||||||
std::vector<LimitTransform *> limits;
|
std::vector<LimitTransform *> limits;
|
||||||
std::vector<SourceFromInputStream *> sources;
|
std::vector<SourceFromInputStream *> sources;
|
||||||
|
std::vector<RemoteSource *> remote_sources;
|
||||||
|
|
||||||
std::unordered_set<IProcessor *> visited;
|
std::unordered_set<IProcessor *> visited;
|
||||||
|
|
||||||
@ -705,6 +708,9 @@ void QueryPipeline::initRowsBeforeLimit()
|
|||||||
|
|
||||||
if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
|
if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
|
||||||
sources.emplace_back(source);
|
sources.emplace_back(source);
|
||||||
|
|
||||||
|
if (auto * source = typeid_cast<RemoteSource *>(processor))
|
||||||
|
remote_sources.emplace_back(source);
|
||||||
}
|
}
|
||||||
else if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
|
else if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
|
||||||
{
|
{
|
||||||
@ -735,7 +741,7 @@ void QueryPipeline::initRowsBeforeLimit()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!rows_before_limit_at_least && (!limits.empty() || !sources.empty()))
|
if (!rows_before_limit_at_least && (!limits.empty() || !sources.empty() || !remote_sources.empty()))
|
||||||
{
|
{
|
||||||
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
|
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
|
||||||
|
|
||||||
@ -744,6 +750,9 @@ void QueryPipeline::initRowsBeforeLimit()
|
|||||||
|
|
||||||
for (auto & source : sources)
|
for (auto & source : sources)
|
||||||
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
||||||
|
|
||||||
|
for (auto & source : remote_sources)
|
||||||
|
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If there is a limit, then enable rows_before_limit_at_least
|
/// If there is a limit, then enable rows_before_limit_at_least
|
||||||
|
@ -15,6 +15,12 @@ public:
|
|||||||
rows_before_limit.fetch_add(rows, std::memory_order_release);
|
rows_before_limit.fetch_add(rows, std::memory_order_release);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void set(uint64_t rows)
|
||||||
|
{
|
||||||
|
setAppliedLimit();
|
||||||
|
rows_before_limit.store(rows, std::memory_order_release);
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t get() const { return rows_before_limit.load(std::memory_order_acquire); }
|
uint64_t get() const { return rows_before_limit.load(std::memory_order_acquire); }
|
||||||
|
|
||||||
void setAppliedLimit() { has_applied_limit.store(true, std::memory_order_release); }
|
void setAppliedLimit() { has_applied_limit.store(true, std::memory_order_release); }
|
||||||
|
119
src/Processors/Sources/DelayedSource.cpp
Normal file
119
src/Processors/Sources/DelayedSource.cpp
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
#include <Processors/Sources/DelayedSource.h>
|
||||||
|
#include "NullSource.h"
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
DelayedSource::DelayedSource(const Block & header, Creator processors_creator)
|
||||||
|
: IProcessor({}, OutputPorts(3, header))
|
||||||
|
, creator(std::move(processors_creator))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
IProcessor::Status DelayedSource::prepare()
|
||||||
|
{
|
||||||
|
/// At first, wait for main input is needed and expand pipeline.
|
||||||
|
if (inputs.empty())
|
||||||
|
{
|
||||||
|
auto & first_output = outputs.front();
|
||||||
|
|
||||||
|
/// If main port was finished before callback was called, stop execution.
|
||||||
|
if (first_output.isFinished())
|
||||||
|
{
|
||||||
|
for (auto & output : outputs)
|
||||||
|
output.finish();
|
||||||
|
|
||||||
|
return Status::Finished;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!first_output.isNeeded())
|
||||||
|
return Status::PortFull;
|
||||||
|
|
||||||
|
/// Call creator callback to get processors.
|
||||||
|
if (processors.empty())
|
||||||
|
return Status::Ready;
|
||||||
|
|
||||||
|
return Status::ExpandPipeline;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process ports in order: main, totals, extremes
|
||||||
|
auto output = outputs.begin();
|
||||||
|
for (auto input = inputs.begin(); input != inputs.end(); ++input, ++output)
|
||||||
|
{
|
||||||
|
if (output->isFinished())
|
||||||
|
{
|
||||||
|
input->close();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!output->isNeeded())
|
||||||
|
return Status::PortFull;
|
||||||
|
|
||||||
|
if (input->isFinished())
|
||||||
|
{
|
||||||
|
output->finish();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
input->setNeeded();
|
||||||
|
if (!input->hasData())
|
||||||
|
return Status::PortFull;
|
||||||
|
|
||||||
|
output->pushData(input->pullData(true));
|
||||||
|
return Status::PortFull;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Status::Finished;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DelayedSource::work()
|
||||||
|
{
|
||||||
|
auto pipe = creator();
|
||||||
|
|
||||||
|
main_output = &pipe.getPort();
|
||||||
|
totals_output = pipe.getTotalsPort();
|
||||||
|
extremes_output = pipe.getExtremesPort();
|
||||||
|
|
||||||
|
processors = std::move(pipe).detachProcessors();
|
||||||
|
|
||||||
|
if (!totals_output)
|
||||||
|
{
|
||||||
|
processors.emplace_back(std::make_shared<NullSource>(main_output->getHeader()));
|
||||||
|
totals_output = &processors.back()->getOutputs().back();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!extremes_output)
|
||||||
|
{
|
||||||
|
processors.emplace_back(std::make_shared<NullSource>(main_output->getHeader()));
|
||||||
|
extremes_output = &processors.back()->getOutputs().back();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Processors DelayedSource::expandPipeline()
|
||||||
|
{
|
||||||
|
/// Add new inputs. They must have the same header as output.
|
||||||
|
for (const auto & output : {main_output, totals_output, extremes_output})
|
||||||
|
{
|
||||||
|
inputs.emplace_back(outputs.front().getHeader(), this);
|
||||||
|
/// Connect checks that header is same for ports.
|
||||||
|
connect(*output, inputs.back());
|
||||||
|
inputs.back().setNeeded();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Executor will check that all processors are connected.
|
||||||
|
return std::move(processors);
|
||||||
|
}
|
||||||
|
|
||||||
|
Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator)
|
||||||
|
{
|
||||||
|
auto source = std::make_shared<DelayedSource>(header, std::move(processors_creator));
|
||||||
|
|
||||||
|
Pipe pipe(&source->getPort(DelayedSource::Main));
|
||||||
|
pipe.setTotalsPort(&source->getPort(DelayedSource::Totals));
|
||||||
|
pipe.setExtremesPort(&source->getPort(DelayedSource::Extremes));
|
||||||
|
|
||||||
|
pipe.addProcessors({std::move(source)});
|
||||||
|
return pipe;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
45
src/Processors/Sources/DelayedSource.h
Normal file
45
src/Processors/Sources/DelayedSource.h
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <Processors/IProcessor.h>
|
||||||
|
#include <Processors/Pipe.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
/// DelayedSource delays pipeline calculation until it starts execution.
|
||||||
|
/// It accepts callback which creates a new pipe.
|
||||||
|
///
|
||||||
|
/// First time when DelayedSource's main output port needs data, callback is called.
|
||||||
|
/// Then, DelayedSource expands pipeline: adds new inputs and connects pipe with it.
|
||||||
|
/// Then, DelayedSource just move data from inputs to outputs until finished.
|
||||||
|
///
|
||||||
|
/// It main output port of DelayedSource is never needed, callback won't be called.
|
||||||
|
class DelayedSource : public IProcessor
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
using Creator = std::function<Pipe()>;
|
||||||
|
|
||||||
|
DelayedSource(const Block & header, Creator processors_creator);
|
||||||
|
String getName() const override { return "Delayed"; }
|
||||||
|
|
||||||
|
Status prepare() override;
|
||||||
|
void work() override;
|
||||||
|
Processors expandPipeline() override;
|
||||||
|
|
||||||
|
enum PortKind { Main = 0, Totals = 1, Extremes = 2 };
|
||||||
|
OutputPort & getPort(PortKind kind) { return *std::next(outputs.begin(), kind); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
Creator creator;
|
||||||
|
Processors processors;
|
||||||
|
|
||||||
|
/// Outputs from returned pipe.
|
||||||
|
OutputPort * main_output = nullptr;
|
||||||
|
OutputPort * totals_output = nullptr;
|
||||||
|
OutputPort * extremes_output = nullptr;
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Creates pipe from DelayedSource.
|
||||||
|
Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator);
|
||||||
|
|
||||||
|
}
|
132
src/Processors/Sources/RemoteSource.cpp
Normal file
132
src/Processors/Sources/RemoteSource.cpp
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
#include <Processors/Sources/RemoteSource.h>
|
||||||
|
#include <DataStreams/RemoteQueryExecutor.h>
|
||||||
|
#include <Processors/Transforms/AggregatingTransform.h>
|
||||||
|
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_)
|
||||||
|
: SourceWithProgress(executor->getHeader(), false)
|
||||||
|
, add_aggregation_info(add_aggregation_info_), query_executor(std::move(executor))
|
||||||
|
{
|
||||||
|
/// Add AggregatedChunkInfo if we expect DataTypeAggregateFunction as a result.
|
||||||
|
const auto & sample = getPort().getHeader();
|
||||||
|
for (auto & type : sample.getDataTypes())
|
||||||
|
if (typeid_cast<const DataTypeAggregateFunction *>(type.get()))
|
||||||
|
add_aggregation_info = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
RemoteSource::~RemoteSource() = default;
|
||||||
|
|
||||||
|
Chunk RemoteSource::generate()
|
||||||
|
{
|
||||||
|
if (!was_query_sent)
|
||||||
|
{
|
||||||
|
/// Progress method will be called on Progress packet.
|
||||||
|
query_executor->setProgressCallback([this](const Progress & value) { progress(value); });
|
||||||
|
|
||||||
|
/// Get rows_before_limit result for remote query from ProfileInfo packet.
|
||||||
|
query_executor->setProfileInfoCallback([this](const BlockStreamProfileInfo & info)
|
||||||
|
{
|
||||||
|
if (rows_before_limit && info.hasAppliedLimit())
|
||||||
|
rows_before_limit->set(info.getRowsBeforeLimit());
|
||||||
|
});
|
||||||
|
|
||||||
|
query_executor->sendQuery();
|
||||||
|
|
||||||
|
was_query_sent = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto block = query_executor->read();
|
||||||
|
|
||||||
|
if (!block)
|
||||||
|
{
|
||||||
|
query_executor->finish();
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
UInt64 num_rows = block.rows();
|
||||||
|
Chunk chunk(block.getColumns(), num_rows);
|
||||||
|
|
||||||
|
if (add_aggregation_info)
|
||||||
|
{
|
||||||
|
auto info = std::make_shared<AggregatedChunkInfo>();
|
||||||
|
info->bucket_num = block.info.bucket_num;
|
||||||
|
info->is_overflows = block.info.is_overflows;
|
||||||
|
chunk.setChunkInfo(std::move(info));
|
||||||
|
}
|
||||||
|
|
||||||
|
return chunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
void RemoteSource::onCancel()
|
||||||
|
{
|
||||||
|
query_executor->cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
RemoteTotalsSource::RemoteTotalsSource(RemoteQueryExecutorPtr executor)
|
||||||
|
: ISource(executor->getHeader())
|
||||||
|
, query_executor(std::move(executor))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
RemoteTotalsSource::~RemoteTotalsSource() = default;
|
||||||
|
|
||||||
|
Chunk RemoteTotalsSource::generate()
|
||||||
|
{
|
||||||
|
if (auto block = query_executor->getTotals())
|
||||||
|
{
|
||||||
|
UInt64 num_rows = block.rows();
|
||||||
|
return Chunk(block.getColumns(), num_rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
RemoteExtremesSource::RemoteExtremesSource(RemoteQueryExecutorPtr executor)
|
||||||
|
: ISource(executor->getHeader())
|
||||||
|
, query_executor(std::move(executor))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
RemoteExtremesSource::~RemoteExtremesSource() = default;
|
||||||
|
|
||||||
|
Chunk RemoteExtremesSource::generate()
|
||||||
|
{
|
||||||
|
if (auto block = query_executor->getExtremes())
|
||||||
|
{
|
||||||
|
UInt64 num_rows = block.rows();
|
||||||
|
return Chunk(block.getColumns(), num_rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Pipe createRemoteSourcePipe(
|
||||||
|
RemoteQueryExecutorPtr query_executor,
|
||||||
|
bool add_aggregation_info, bool add_totals, bool add_extremes)
|
||||||
|
{
|
||||||
|
Pipe pipe(std::make_shared<RemoteSource>(query_executor, add_aggregation_info));
|
||||||
|
|
||||||
|
if (add_totals)
|
||||||
|
{
|
||||||
|
auto totals_source = std::make_shared<RemoteTotalsSource>(query_executor);
|
||||||
|
pipe.setTotalsPort(&totals_source->getPort());
|
||||||
|
pipe.addProcessors({std::move(totals_source)});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (add_extremes)
|
||||||
|
{
|
||||||
|
auto extremes_source = std::make_shared<RemoteExtremesSource>(query_executor);
|
||||||
|
pipe.setExtremesPort(&extremes_source->getPort());
|
||||||
|
pipe.addProcessors({std::move(extremes_source)});
|
||||||
|
}
|
||||||
|
|
||||||
|
return pipe;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
82
src/Processors/Sources/RemoteSource.h
Normal file
82
src/Processors/Sources/RemoteSource.h
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <Processors/Sources/SourceWithProgress.h>
|
||||||
|
#include <Processors/RowsBeforeLimitCounter.h>
|
||||||
|
#include <Processors/Pipe.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
class RemoteQueryExecutor;
|
||||||
|
using RemoteQueryExecutorPtr = std::shared_ptr<RemoteQueryExecutor>;
|
||||||
|
|
||||||
|
/// Source from RemoteQueryExecutor. Executes remote query and returns query result chunks.
|
||||||
|
class RemoteSource : public SourceWithProgress
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
/// Flag add_aggregation_info tells if AggregatedChunkInfo should be added to result chunk.
|
||||||
|
/// AggregatedChunkInfo stores the bucket number used for two-level aggregation.
|
||||||
|
/// This flag should be typically enabled for queries with GROUP BY which are executed till WithMergeableState.
|
||||||
|
RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_);
|
||||||
|
~RemoteSource() override;
|
||||||
|
|
||||||
|
String getName() const override { return "Remote"; }
|
||||||
|
|
||||||
|
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); }
|
||||||
|
|
||||||
|
/// Stop reading from stream if output port is finished.
|
||||||
|
void onUpdatePorts() override
|
||||||
|
{
|
||||||
|
if (getPort().isFinished())
|
||||||
|
cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
Chunk generate() override;
|
||||||
|
void onCancel() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
bool was_query_sent = false;
|
||||||
|
bool add_aggregation_info = false;
|
||||||
|
RemoteQueryExecutorPtr query_executor;
|
||||||
|
RowsBeforeLimitCounterPtr rows_before_limit;
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Totals source from RemoteQueryExecutor.
|
||||||
|
class RemoteTotalsSource : public ISource
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit RemoteTotalsSource(RemoteQueryExecutorPtr executor);
|
||||||
|
~RemoteTotalsSource() override;
|
||||||
|
|
||||||
|
String getName() const override { return "RemoteTotals"; }
|
||||||
|
|
||||||
|
protected:
|
||||||
|
Chunk generate() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
RemoteQueryExecutorPtr query_executor;
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Extremes source from RemoteQueryExecutor.
|
||||||
|
class RemoteExtremesSource : public ISource
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit RemoteExtremesSource(RemoteQueryExecutorPtr executor);
|
||||||
|
~RemoteExtremesSource() override;
|
||||||
|
|
||||||
|
String getName() const override { return "RemoteExtremes"; }
|
||||||
|
|
||||||
|
protected:
|
||||||
|
Chunk generate() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
RemoteQueryExecutorPtr query_executor;
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Create pipe with remote sources.
|
||||||
|
Pipe createRemoteSourcePipe(
|
||||||
|
RemoteQueryExecutorPtr query_executor,
|
||||||
|
bool add_aggregation_info, bool add_totals, bool add_extremes);
|
||||||
|
|
||||||
|
}
|
@ -12,6 +12,11 @@ namespace ErrorCodes
|
|||||||
extern const int TOO_MANY_BYTES;
|
extern const int TOO_MANY_BYTES;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SourceWithProgress::SourceWithProgress(Block header, bool enable_auto_progress)
|
||||||
|
: ISourceWithProgress(header), auto_progress(enable_auto_progress)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
void SourceWithProgress::work()
|
void SourceWithProgress::work()
|
||||||
{
|
{
|
||||||
if (!limits.speed_limits.checkTimeLimit(total_stopwatch.elapsed(), limits.timeout_overflow_mode))
|
if (!limits.speed_limits.checkTimeLimit(total_stopwatch.elapsed(), limits.timeout_overflow_mode))
|
||||||
@ -24,7 +29,7 @@ void SourceWithProgress::work()
|
|||||||
|
|
||||||
ISourceWithProgress::work();
|
ISourceWithProgress::work();
|
||||||
|
|
||||||
if (!was_progress_called && has_input)
|
if (auto_progress && !was_progress_called && has_input)
|
||||||
progress({ current_chunk.chunk.getNumRows(), current_chunk.chunk.bytes() });
|
progress({ current_chunk.chunk.getNumRows(), current_chunk.chunk.bytes() });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -44,6 +44,8 @@ class SourceWithProgress : public ISourceWithProgress
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
using ISourceWithProgress::ISourceWithProgress;
|
using ISourceWithProgress::ISourceWithProgress;
|
||||||
|
/// If enable_auto_progress flag is set, progress() will be automatically called on each generated chunk.
|
||||||
|
SourceWithProgress(Block header, bool enable_auto_progress);
|
||||||
|
|
||||||
using LocalLimits = IBlockInputStream::LocalLimits;
|
using LocalLimits = IBlockInputStream::LocalLimits;
|
||||||
using LimitsMode = IBlockInputStream::LimitsMode;
|
using LimitsMode = IBlockInputStream::LimitsMode;
|
||||||
@ -76,6 +78,9 @@ private:
|
|||||||
/// This flag checks if progress() was manually called at generate() call.
|
/// This flag checks if progress() was manually called at generate() call.
|
||||||
/// If not, it will be called for chunk after generate() was finished.
|
/// If not, it will be called for chunk after generate() was finished.
|
||||||
bool was_progress_called = false;
|
bool was_progress_called = false;
|
||||||
|
|
||||||
|
/// If enabled, progress() will be automatically called on each generated chunk.
|
||||||
|
bool auto_progress = true;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -106,9 +106,11 @@ SRCS(
|
|||||||
Port.cpp
|
Port.cpp
|
||||||
QueryPipeline.cpp
|
QueryPipeline.cpp
|
||||||
ResizeProcessor.cpp
|
ResizeProcessor.cpp
|
||||||
|
Sources/DelayedSource.cpp
|
||||||
Sources/SinkToOutputStream.cpp
|
Sources/SinkToOutputStream.cpp
|
||||||
Sources/SourceFromInputStream.cpp
|
Sources/SourceFromInputStream.cpp
|
||||||
Sources/SourceWithProgress.cpp
|
Sources/SourceWithProgress.cpp
|
||||||
|
Sources/RemoteSource.cpp
|
||||||
Transforms/AddingMissedTransform.cpp
|
Transforms/AddingMissedTransform.cpp
|
||||||
Transforms/AddingSelectorTransform.cpp
|
Transforms/AddingSelectorTransform.cpp
|
||||||
Transforms/AggregatingTransform.cpp
|
Transforms/AggregatingTransform.cpp
|
||||||
|
Loading…
Reference in New Issue
Block a user