Merge pull request #54476 from ClickHouse/revert-54470-revert-48607-master

Revert "Revert "Add settings for real-time updates during query execution""
Nikolai Kochetov 2023-09-11 14:59:13 +02:00, committed by GitHub
commit 59108f3d96
67 changed files with 1791 additions and 774 deletions


@ -4644,6 +4644,14 @@ SELECT toFloat64('1.7091'), toFloat64('1.5008753E7') SETTINGS precise_float_pars
└─────────────────────┴──────────────────────────┘
```
## partial_result_update_duration_ms
Interval (in milliseconds) for sending updates with partial data about the result table to the client (in interactive mode) during query execution. Setting it to 0 disables partial results. Partial results are only supported for single-threaded GROUP BY without key, ORDER BY, LIMIT and OFFSET.
## max_rows_in_partial_result
Maximum number of rows to show in the partial result after every real-time update while the query runs. If the query contains OFFSET, use the partial result limit plus OFFSET as the value (for example, 13 for a partial result limit of 10 and OFFSET 3).
## validate_tcp_client_information {#validate-tcp-client-information}
Determines whether validation of client information is enabled when a query packet is received from a client over a TCP connection.


@ -441,7 +441,20 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
if (!block)
return;
processed_rows += block.rows();
if (block.rows() == 0 && partial_result_mode == PartialResultMode::Active)
{
partial_result_mode = PartialResultMode::Inactive;
if (is_interactive)
{
progress_indication.clearProgressOutput(*tty_buf);
std::cout << "Full result:" << std::endl;
progress_indication.writeProgress(*tty_buf);
}
}
if (partial_result_mode == PartialResultMode::Inactive)
processed_rows += block.rows();
/// Even if all blocks are empty, we still need to initialize the output stream to write an empty result set.
initOutputFormat(block, parsed_query);
@ -451,13 +464,20 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
if (block.rows() == 0 || (query_fuzzer_runs != 0 && processed_rows >= 100))
return;
if (!is_interactive && partial_result_mode == PartialResultMode::Active)
return;
/// If results are written INTO OUTFILE, we can skip clearing the progress output to avoid flicker.
if (need_render_progress && tty_buf && (!select_into_file || select_into_file_and_stdout))
progress_indication.clearProgressOutput(*tty_buf);
try
{
output_format->write(materializeBlock(block));
if (partial_result_mode == PartialResultMode::Active)
output_format->writePartialResult(materializeBlock(block));
else
output_format->write(materializeBlock(block));
written_first_block = true;
}
catch (const Exception &)
@ -521,6 +541,9 @@ void ClientBase::onProfileInfo(const ProfileInfo & profile_info)
void ClientBase::initOutputFormat(const Block & block, ASTPtr parsed_query)
try
{
if (partial_result_mode == PartialResultMode::NotInit)
partial_result_mode = PartialResultMode::Active;
if (!output_format)
{
/// Ignore all results when fuzzing as they can be huge.
@ -931,6 +954,14 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
const auto & settings = global_context->getSettingsRef();
const Int32 signals_before_stop = settings.partial_result_on_first_cancel ? 2 : 1;
bool has_partial_result_setting = settings.partial_result_update_duration_ms.totalMilliseconds() > 0;
if (has_partial_result_setting)
{
partial_result_mode = PartialResultMode::NotInit;
if (is_interactive)
std::cout << "Partial result:" << std::endl;
}
int retries_left = 10;
while (retries_left)
@ -1736,6 +1767,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
}
processed_rows = 0;
partial_result_mode = PartialResultMode::Inactive;
written_first_block = false;
progress_indication.resetProgress();
profile_events.watch.restart();


@ -272,6 +272,21 @@ protected:
size_t processed_rows = 0; /// How many rows have been read or written.
bool print_num_processed_rows = false; /// Whether to print the number of processed rows at the end of the query.
enum class PartialResultMode: UInt8
{
/// The query doesn't show a partial result before the first block with 0 rows.
/// The first block with 0 rows initializes the output table format using its header.
NotInit,
/// The query shows a partial result after the first and before the second block with 0 rows.
/// The second block with 0 rows indicates that receiving blocks with the partial result has completed and the next blocks will contain the full result.
Active,
/// The query doesn't show a partial result at all.
Inactive,
};
PartialResultMode partial_result_mode = PartialResultMode::Inactive;
bool print_stack_trace = false;
/// The last exception that was received from the server. Is used for the
/// return code in batch mode.
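Taken together, the three modes form a small client-side state machine whose transitions are spread across `processOrdinaryQuery`, `initOutputFormat`, and `onData` in the diff above. A condensed, self-contained sketch of those transitions (my simplification, not code from the patch):

```cpp
#include <cstddef>

enum class PartialResultMode { NotInit, Active, Inactive };

/// NotInit is entered when the query starts with the partial result setting
/// enabled; initializing the output format activates partial results; an
/// empty block received while Active means the full result follows.
void onBlockSketch(PartialResultMode & mode, std::size_t rows)
{
    if (mode == PartialResultMode::NotInit)
        mode = PartialResultMode::Active;    // output format initialized
    else if (mode == PartialResultMode::Active && rows == 0)
        mode = PartialResultMode::Inactive;  // "Full result:" from here on
}
```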


@ -309,6 +309,9 @@ class IColumn;
\
M(Bool, partial_result_on_first_cancel, false, "Allows query to return a partial result after cancel.", 0) \
\
M(Milliseconds, partial_result_update_duration_ms, 0, "Interval (in milliseconds) for sending updates with partial data about the result table to the client (in interactive mode) during query execution. Setting it to 0 disables partial results. Partial results are only supported for single-threaded GROUP BY without key, ORDER BY, LIMIT and OFFSET.", 0) \
M(UInt64, max_rows_in_partial_result, 10, "Maximum number of rows to show in the partial result after every real-time update while the query runs (if the query contains OFFSET, use the partial result limit plus OFFSET as the value).", 0) \
\
M(Bool, ignore_on_cluster_for_replicated_udf_queries, false, "Ignore ON CLUSTER clause for replicated UDF management queries.", 0) \
M(Bool, ignore_on_cluster_for_replicated_access_entities_queries, false, "Ignore ON CLUSTER clause for replicated access entities management queries.", 0) \
/** Settings for testing hedged requests */ \


@ -2272,6 +2272,29 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
return block;
}
Block Aggregator::prepareBlockAndFillWithoutKeySnapshot(AggregatedDataVariants & data_variants) const
{
size_t rows = 1;
bool final = true;
auto && out_cols
= prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), data_variants.aggregates_pools, final, rows);
auto && [key_columns, raw_key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;
AggregatedDataWithoutKey & data = data_variants.without_key;
/// Always single-thread. It's safe to pass current arena from 'aggregates_pool'.
for (size_t insert_i = 0; insert_i < params.aggregates_size; ++insert_i)
aggregate_functions[insert_i]->insertResultInto(
data + offsets_of_aggregate_states[insert_i],
*final_aggregate_columns[insert_i],
data_variants.aggregates_pool);
Block block = finalizeBlock(params, getHeader(final), std::move(out_cols), final, rows);
return block;
}
template <bool return_single_block>
Aggregator::ConvertToBlockRes<return_single_block>
Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const


@ -1213,6 +1213,7 @@ private:
friend class ConvertingAggregatedToChunksSource;
friend class ConvertingAggregatedToChunksWithMergingSource;
friend class AggregatingInOrderTransform;
friend class AggregatingPartialResultTransform;
/// Data structure of source blocks.
Block header;
@ -1394,6 +1395,7 @@ private:
std::atomic<bool> * is_cancelled = nullptr) const;
Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
Block prepareBlockAndFillWithoutKeySnapshot(AggregatedDataVariants & data_variants) const;
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const;
template <bool return_single_block>


@ -14,7 +14,8 @@ namespace ErrorCodes
extern const int POSITION_OUT_OF_BOUND;
}
Chunk::Chunk(DB::Columns columns_, UInt64 num_rows_) : columns(std::move(columns_)), num_rows(num_rows_)
Chunk::Chunk(DB::Columns columns_, UInt64 num_rows_)
: columns(std::move(columns_)), num_rows(num_rows_)
{
checkNumRowsIsConsistent();
}


@ -75,7 +75,7 @@ void CompletedPipelineExecutor::execute()
if (interactive_timeout_ms)
{
data = std::make_unique<Data>();
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
data->executor->setReadProgressCallback(pipeline.getReadProgressCallback());
/// Avoid passing this to lambda, copy ptr to data instead.
@ -105,7 +105,7 @@ void CompletedPipelineExecutor::execute()
}
else
{
PipelineExecutor executor(pipeline.processors, pipeline.process_list_element);
PipelineExecutor executor(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
executor.setReadProgressCallback(pipeline.getReadProgressCallback());
executor.execute(pipeline.getNumThreads(), pipeline.getConcurrencyControl());
}


@ -260,7 +260,6 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
{
pid = updated_processors.top();
updated_processors.pop();
/// In this method we have ownership on node.
auto & node = *nodes[pid];


@ -30,6 +30,12 @@ private:
/// Callback for read progress.
ReadProgressCallback * read_progress_callback = nullptr;
/// Timer that periodically stops the optimization of running local tasks directly instead of queuing them.
/// It provides local progress for each IProcessor task, allowing the partial result of the query to always be sent to the user.
Stopwatch watch;
/// Time period that limits the maximum allowed duration for optimizing the scheduling of local tasks within the executor
const UInt64 partial_result_duration_ms;
public:
#ifndef NDEBUG
/// Time for different processing stages.
@ -62,8 +68,13 @@ public:
void setException(std::exception_ptr exception_) { exception = exception_; }
void rethrowExceptionIfHas();
explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_, bool trace_processors_, ReadProgressCallback * callback)
bool needWatchRestartForPartialResultProgress() { return partial_result_duration_ms != 0 && partial_result_duration_ms < watch.elapsedMilliseconds(); }
void restartWatch() { watch.restart(); }
explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_, bool trace_processors_, ReadProgressCallback * callback, UInt64 partial_result_duration_ms_)
: read_progress_callback(callback)
, watch(CLOCK_MONOTONIC)
, partial_result_duration_ms(partial_result_duration_ms_)
, thread_number(thread_number_)
, profile_processors(profile_processors_)
, trace_processors(trace_processors_)


@ -108,8 +108,15 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea
{
context.setTask(nullptr);
/// Take local task from queue if has one.
if (!queue.empty() && !context.hasAsyncTasks())
/// If sending partial results is allowed and the local task scheduling optimization has been running longer than the limit,
/// or the new task needs to send a partial result later, skip the optimization for this iteration.
/// Otherwise, take a local task from the queue if there is one.
if ((!queue.empty() && queue.front()->processor->isPartialResultProcessor())
|| context.needWatchRestartForPartialResultProgress())
{
context.restartWatch();
}
else if (!queue.empty() && !context.hasAsyncTasks())
{
context.setTask(queue.front());
queue.pop();
@ -139,7 +146,7 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea
}
}
void ExecutorTasks::init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback)
void ExecutorTasks::init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback, UInt64 partial_result_duration_ms)
{
num_threads = num_threads_;
use_threads = use_threads_;
@ -151,7 +158,7 @@ void ExecutorTasks::init(size_t num_threads_, size_t use_threads_, bool profile_
executor_contexts.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
executor_contexts.emplace_back(std::make_unique<ExecutionThreadContext>(i, profile_processors, trace_processors, callback));
executor_contexts.emplace_back(std::make_unique<ExecutionThreadContext>(i, profile_processors, trace_processors, callback, partial_result_duration_ms));
}
}


@ -58,7 +58,7 @@ public:
void tryGetTask(ExecutionThreadContext & context);
void pushTasks(Queue & queue, Queue & async_queue, ExecutionThreadContext & context);
void init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback);
void init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback, UInt64 partial_result_duration_ms);
void fill(Queue & queue);
void upscale(size_t use_threads_);


@ -33,8 +33,9 @@ namespace ErrorCodes
}
PipelineExecutor::PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem)
PipelineExecutor::PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem, UInt64 partial_result_duration_ms_)
: process_list_element(std::move(elem))
, partial_result_duration_ms(partial_result_duration_ms_)
{
if (process_list_element)
{
@ -328,7 +329,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
Queue queue;
graph->initializeExecution(queue);
tasks.init(num_threads, use_threads, profile_processors, trace_processors, read_progress_callback.get());
tasks.init(num_threads, use_threads, profile_processors, trace_processors, read_progress_callback.get(), partial_result_duration_ms);
tasks.fill(queue);
if (num_threads > 1)


@ -33,7 +33,7 @@ public:
/// During pipeline execution new processors can appear. They will be added to existing set.
///
/// Explicit graph representation is built in constructor. Throws if graph is not correct.
explicit PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem);
explicit PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem, UInt64 partial_result_duration_ms_ = 0);
~PipelineExecutor();
/// Execute pipeline in multiple threads. Must be called once.
@ -90,6 +90,9 @@ private:
ReadProgressCallbackPtr read_progress_callback;
/// Duration between sending partial results through the pipeline
const UInt64 partial_result_duration_ms;
using Queue = std::queue<ExecutingGraph::Node *>;
void initializeExecution(size_t num_threads, bool concurrency_control); /// Initialize executor contexts and task_queue.


@ -41,12 +41,13 @@ struct PullingAsyncPipelineExecutor::Data
}
};
PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_, bool has_partial_result_setting) : pipeline(pipeline_)
{
if (!pipeline.pulling())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PullingAsyncPipelineExecutor must be pulling");
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.output->getHeader());
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.output->getHeader(), /*is_partial_result_protocol_active*/ has_partial_result_setting);
pipeline.complete(lazy_format);
}
@ -103,7 +104,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
if (!data)
{
data = std::make_unique<Data>();
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
data->executor->setReadProgressCallback(pipeline.getReadProgressCallback());
data->lazy_format = lazy_format.get();


@ -21,7 +21,7 @@ struct ProfileInfo;
class PullingAsyncPipelineExecutor
{
public:
explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_);
explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_, bool has_partial_result_setting = false);
~PullingAsyncPipelineExecutor();
/// Get structure of returned block or chunk.


@ -44,7 +44,7 @@ bool PullingPipelineExecutor::pull(Chunk & chunk)
{
if (!executor)
{
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
executor->setReadProgressCallback(pipeline.getReadProgressCallback());
}


@ -167,7 +167,7 @@ void PushingAsyncPipelineExecutor::start()
started = true;
data = std::make_unique<Data>();
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
data->executor->setReadProgressCallback(pipeline.getReadProgressCallback());
data->source = pushing_source.get();


@ -87,7 +87,7 @@ void PushingPipelineExecutor::start()
return;
started = true;
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms);
executor->setReadProgressCallback(pipeline.getReadProgressCallback());
if (!executor->executeStep(&input_wait_flag))


@ -1,40 +1,89 @@
#include <Processors/Formats/IOutputFormat.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
namespace DB
{
IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_)
: IProcessor({header_, header_, header_}, {}), out(out_)
IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_, bool is_partial_result_protocol_active_)
: IProcessor({header_, header_, header_, header_}, {})
, out(out_)
, is_partial_result_protocol_active(is_partial_result_protocol_active_)
{
}
void IOutputFormat::setCurrentChunk(InputPort & input, PortKind kind)
{
current_chunk = input.pull(true);
current_block_kind = kind;
has_input = true;
}
IOutputFormat::Status IOutputFormat::prepareMainAndPartialResult()
{
bool need_data = false;
for (auto kind : {Main, PartialResult})
{
auto & input = getPort(kind);
if (input.isFinished())
continue;
if (kind == PartialResult && main_input_activated)
{
input.close();
continue;
}
input.setNeeded();
need_data = true;
if (!input.hasData())
continue;
setCurrentChunk(input, kind);
return Status::Ready;
}
if (need_data)
return Status::NeedData;
return Status::Finished;
}
IOutputFormat::Status IOutputFormat::prepareTotalsAndExtremes()
{
for (auto kind : {Totals, Extremes})
{
auto & input = getPort(kind);
if (!input.isConnected() || input.isFinished())
continue;
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
setCurrentChunk(input, kind);
return Status::Ready;
}
return Status::Finished;
}
IOutputFormat::Status IOutputFormat::prepare()
{
if (has_input)
return Status::Ready;
for (auto kind : {Main, Totals, Extremes})
{
auto & input = getPort(kind);
auto status = prepareMainAndPartialResult();
if (status != Status::Finished)
return status;
if (kind != Main && !input.isConnected())
continue;
if (input.isFinished())
continue;
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
current_chunk = input.pull(true);
current_block_kind = kind;
has_input = true;
return Status::Ready;
}
status = prepareTotalsAndExtremes();
if (status != Status::Finished)
return status;
finished = true;
@ -83,8 +132,18 @@ void IOutputFormat::work()
case Main:
result_rows += current_chunk.getNumRows();
result_bytes += current_chunk.allocatedBytes();
if (is_partial_result_protocol_active && !main_input_activated && current_chunk.hasRows())
{
/// Sending an empty block signals to the client that partial results are terminated,
/// and only data from the main pipeline will be forwarded.
consume(Chunk(current_chunk.cloneEmptyColumns(), 0));
main_input_activated = true;
}
consume(std::move(current_chunk));
break;
case PartialResult:
consumePartialResult(std::move(current_chunk));
break;
case Totals:
writeSuffixIfNeeded();
if (auto totals = prepareTotals(std::move(current_chunk)))
@ -119,6 +178,15 @@ void IOutputFormat::write(const Block & block)
flush();
}
void IOutputFormat::writePartialResult(const Block & block)
{
writePrefixIfNeeded();
consumePartialResult(Chunk(block.getColumns(), block.rows()));
if (auto_flush)
flush();
}
void IOutputFormat::finalize()
{
if (finalized)


@ -23,9 +23,9 @@ class WriteBuffer;
class IOutputFormat : public IProcessor
{
public:
enum PortKind { Main = 0, Totals = 1, Extremes = 2 };
enum PortKind { Main = 0, Totals = 1, Extremes = 2, PartialResult = 3 };
IOutputFormat(const Block & header_, WriteBuffer & out_);
IOutputFormat(const Block & header_, WriteBuffer & out_, bool is_partial_result_protocol_active_ = false);
Status prepare() override;
void work() override;
@ -54,6 +54,7 @@ public:
/// TODO: separate formats and processors.
void write(const Block & block);
void writePartialResult(const Block & block);
void finalize();
@ -118,6 +119,7 @@ protected:
virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
virtual void consumePartialResult(Chunk) {}
virtual void finalizeImpl() {}
virtual void finalizeBuffers() {}
virtual void writePrefix() {}
@ -166,6 +168,7 @@ protected:
Chunk current_chunk;
PortKind current_block_kind = PortKind::Main;
bool main_input_activated = false;
bool has_input = false;
bool finished = false;
bool finalized = false;
@ -180,9 +183,15 @@ protected:
Statistics statistics;
private:
void setCurrentChunk(InputPort & input, PortKind kind);
IOutputFormat::Status prepareMainAndPartialResult();
IOutputFormat::Status prepareTotalsAndExtremes();
size_t rows_read_before = 0;
bool are_totals_written = false;
bool is_partial_result_protocol_active = false;
/// Counters for consumed chunks. Are used for QueryLog.
size_t result_rows = 0;
size_t result_bytes = 0;


@ -134,7 +134,8 @@ void PrettyBlockOutputFormat::write(Chunk chunk, PortKind port_kind)
{
if (total_rows >= format_settings.pretty.max_rows)
{
total_rows += chunk.getNumRows();
if (port_kind != PortKind::PartialResult)
total_rows += chunk.getNumRows();
return;
}
if (mono_block)
@ -315,7 +316,8 @@ void PrettyBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind port_kind
}
writeString(bottom_separator_s, out);
total_rows += num_rows;
if (port_kind != PortKind::PartialResult)
total_rows += num_rows;
}
@ -388,6 +390,34 @@ void PrettyBlockOutputFormat::consumeExtremes(Chunk chunk)
write(std::move(chunk), PortKind::Extremes);
}
void PrettyBlockOutputFormat::clearLastLines(size_t lines_number)
{
/// http://en.wikipedia.org/wiki/ANSI_escape_code
#define MOVE_TO_PREV_LINE "\033[A"
#define CLEAR_TO_END_OF_LINE "\033[K"
static const char * clear_prev_line = MOVE_TO_PREV_LINE \
CLEAR_TO_END_OF_LINE;
/// Move cursor to the beginning of line
writeCString("\r", out);
for (size_t line = 0; line < lines_number; ++line)
{
writeCString(clear_prev_line, out);
}
}
void PrettyBlockOutputFormat::consumePartialResult(Chunk chunk)
{
if (prev_partial_block_rows > 0)
/// number of rows + header line + footer line
clearLastLines(prev_partial_block_rows + 2);
prev_partial_block_rows = chunk.getNumRows();
write(std::move(chunk), PortKind::PartialResult);
}
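The in-place redraw relies only on standard ANSI escape sequences, so the technique is easy to try outside ClickHouse. A minimal stand-alone demo (mine, not from the patch) that overwrites its previous line on every update, the same way consumePartialResult clears the rows of the previous partial block:

```cpp
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <thread>

// Return to the start of the current line, then move the cursor up one line
// and clear it, `n` times.
static void clear_last_lines(std::size_t n)
{
    std::fputs("\r", stdout);
    for (std::size_t i = 0; i < n; ++i)
        std::fputs("\033[A\033[K", stdout);
}

int main()
{
    for (int update = 1; update <= 3; ++update)
    {
        if (update > 1)
            clear_last_lines(1); // erase the previously printed update
        std::printf("partial result #%d\n", update);
        std::fflush(stdout);
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
    return 0;
}
```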
void PrettyBlockOutputFormat::writeMonoChunkIfNeeded()
{


@ -28,7 +28,12 @@ protected:
void consumeTotals(Chunk) override;
void consumeExtremes(Chunk) override;
void clearLastLines(size_t lines_number);
void consumePartialResult(Chunk) override;
size_t total_rows = 0;
size_t prev_partial_block_rows = 0;
size_t row_number_width = 7; // "10000. "
const FormatSettings format_settings;
@ -55,6 +60,7 @@ protected:
void resetFormatterImpl() override
{
total_rows = 0;
prev_partial_block_rows = 0;
}
private:


@ -194,7 +194,8 @@ void PrettyCompactBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind po
writeBottom(max_widths);
total_rows += num_rows;
if (port_kind != PortKind::PartialResult)
total_rows += num_rows;
}


@ -14,8 +14,8 @@ class LazyOutputFormat : public IOutputFormat
{
public:
explicit LazyOutputFormat(const Block & header)
: IOutputFormat(header, out), queue(2) {}
explicit LazyOutputFormat(const Block & header, bool is_partial_result_protocol_active = false)
: IOutputFormat(header, out, is_partial_result_protocol_active), queue(2) {}
String getName() const override { return "LazyOutputFormat"; }
@ -49,6 +49,7 @@ protected:
void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
void consumePartialResult(Chunk chunk) override { consume(std::move(chunk)); }
private:


@ -40,5 +40,10 @@ std::string IProcessor::statusToName(Status status)
UNREACHABLE();
}
ProcessorPtr IProcessor::getPartialResultProcessorPtr(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms)
{
return current_processor->getPartialResultProcessor(current_processor, partial_result_limit, partial_result_duration_ms);
}
}


@ -164,6 +164,8 @@ public:
static std::string statusToName(Status status);
static ProcessorPtr getPartialResultProcessorPtr(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms);
/** Method 'prepare' is responsible for all cheap ("instantaneous": O(1) of data volume, no wait) calculations.
*
* It may access input and output ports,
@ -235,6 +237,22 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'expandPipeline' is not implemented for {} processor", getName());
}
enum class PartialResultStatus
{
/// The processor currently doesn't support working with the partial result pipeline.
NotSupported,
/// The processor can be skipped in the partial result pipeline.
SkipSupported,
/// The processor creates a light-weight copy of itself in the partial result pipeline.
/// The copy can create snapshots of the original processor or transform small blocks of data in the same way as the original processor does.
FullSupported,
};
virtual bool isPartialResultProcessor() const { return false; }
virtual PartialResultStatus getPartialResultProcessorSupportStatus() const { return PartialResultStatus::NotSupported; }
/// In case if query was cancelled executor will wait till all processors finish their jobs.
/// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o).
bool isCancelled() const { return is_cancelled.load(std::memory_order_acquire); }
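How a pipeline builder consumes these statuses is shown later in `Pipe::addPartialResultTransform`; as a compact preview, a self-contained simplification (my sketch, not the patch's code) of the per-processor decision:

```cpp
#include <cstdio>

enum class PartialResultStatus { NotSupported, SkipSupported, FullSupported };

// Simplified mirror-building decision for one processor of the main pipeline.
void mirrorIntoPartialResultPipeline(PartialResultStatus status)
{
    switch (status)
    {
        case PartialResultStatus::NotSupported:
            std::puts("drop the partial result port: no snapshots past this point");
            break;
        case PartialResultStatus::SkipSupported:
            std::puts("keep the port as is: the processor is bypassed");
            break;
        case PartialResultStatus::FullSupported:
            std::puts("connect the processor's light-weight partial result copy");
            break;
    }
}
```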
@ -369,6 +387,11 @@ public:
protected:
virtual void onCancel() {}
virtual ProcessorPtr getPartialResultProcessor(const ProcessorPtr & /*current_processor*/, UInt64 /*partial_result_limit*/, UInt64 /*partial_result_duration_ms*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'getPartialResultProcessor' is not implemented for {} processor", getName());
}
private:
/// For:
/// - elapsed_us


@ -1,5 +1,5 @@
#include <Processors/LimitTransform.h>
#include <Processors/Transforms/LimitPartialResultTransform.h>
namespace DB
{
@ -180,7 +180,6 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data)
return Status::NeedData;
data.current_chunk = input.pull(true);
auto rows = data.current_chunk.getNumRows();
if (rows_before_limit_at_least && !data.input_port_has_counter)
@ -367,5 +366,11 @@ bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort
return true;
}
ProcessorPtr LimitTransform::getPartialResultProcessor(const ProcessorPtr & /*current_processor*/, UInt64 partial_result_limit, UInt64 partial_result_duration_ms)
{
const auto & header = inputs.front().getHeader();
return std::make_shared<LimitPartialResultTransform>(header, partial_result_limit, partial_result_duration_ms, limit, offset);
}
}


@ -55,6 +55,8 @@ private:
ColumnRawPtrs extractSortColumns(const Columns & columns) const;
bool sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, UInt64 current_chunk_row_num) const;
ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override;
public:
LimitTransform(
const Block & header_, UInt64 limit_, UInt64 offset_, size_t num_streams = 1,
@ -73,6 +75,14 @@ public:
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); }
void setInputPortHasCounter(size_t pos) { ports_data[pos].input_port_has_counter = true; }
PartialResultStatus getPartialResultProcessorSupportStatus() const override
{
/// Currently LimitPartialResultTransform supports only single-threaded work.
bool is_partial_result_supported = inputs.size() == 1 && outputs.size() == 1;
return is_partial_result_supported ? PartialResultStatus::FullSupported : PartialResultStatus::NotSupported;
}
};
}


@ -9,7 +9,12 @@ namespace DB
BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr from)
{
BuildQueryPipelineSettings settings;
settings.actions_settings = ExpressionActionsSettings::fromSettings(from->getSettingsRef(), CompileExpressions::yes);
const auto & context_settings = from->getSettingsRef();
settings.partial_result_limit = context_settings.max_rows_in_partial_result;
settings.partial_result_duration_ms = context_settings.partial_result_update_duration_ms.totalMilliseconds();
settings.actions_settings = ExpressionActionsSettings::fromSettings(context_settings, CompileExpressions::yes);
settings.process_list_element = from->getProcessListElement();
settings.progress_callback = from->getProgressCallback();
return settings;


@ -19,6 +19,9 @@ struct BuildQueryPipelineSettings
QueryStatusPtr process_list_element;
ProgressCallback progress_callback = nullptr;
UInt64 partial_result_limit = 0;
UInt64 partial_result_duration_ms = 0;
const ExpressionActionsSettings & getActionsSettings() const { return actions_settings; }
static BuildQueryPipelineSettings fromContext(ContextPtr from);
};


@ -168,6 +168,8 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
QueryPipelineBuilderPtr last_pipeline;
bool has_partial_result_setting = build_pipeline_settings.partial_result_duration_ms > 0;
std::stack<Frame> stack;
stack.push(Frame{.node = root});
@ -194,6 +196,9 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
}
else
stack.push(Frame{.node = frame.node->children[next_child]});
if (has_partial_result_setting && last_pipeline && !last_pipeline->isPartialResultActive())
last_pipeline->activatePartialResult(build_pipeline_settings.partial_result_limit, build_pipeline_settings.partial_result_duration_ms);
}
last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);


@ -27,6 +27,8 @@ public:
size_t max_bytes_before_external_sort = 0;
TemporaryDataOnDiskScopePtr tmp_data = nullptr;
size_t min_free_disk_space = 0;
UInt64 partial_result_limit = 0;
UInt64 partial_result_duration_ms = 0;
explicit Settings(const Context & context);
explicit Settings(size_t max_block_size_);


@ -0,0 +1,47 @@
#include <Processors/Transforms/AggregatingPartialResultTransform.h>
namespace DB
{
AggregatingPartialResultTransform::AggregatingPartialResultTransform(
const Block & input_header, const Block & output_header, AggregatingTransformPtr aggregating_transform_,
UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_)
: PartialResultTransform(input_header, output_header, partial_result_limit_, partial_result_duration_ms_)
, aggregating_transform(std::move(aggregating_transform_))
, transform_aggregator(input_header, aggregating_transform->params->params)
{}
void AggregatingPartialResultTransform::transformPartialResult(Chunk & chunk)
{
auto & params = aggregating_transform->params->params;
bool no_more_keys = false;
AggregatedDataVariants variants;
ColumnRawPtrs key_columns(params.keys_size);
Aggregator::AggregateColumns aggregate_columns(params.aggregates_size);
const UInt64 num_rows = chunk.getNumRows();
transform_aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys);
auto transformed_block = transform_aggregator.convertToBlocks(variants, /*final*/ true, /*max_threads*/ 1).front();
chunk = convertToChunk(transformed_block);
}
PartialResultTransform::ShaphotResult AggregatingPartialResultTransform::getRealProcessorSnapshot()
{
std::lock_guard lock(aggregating_transform->snapshot_mutex);
if (aggregating_transform->is_generate_initialized)
return {{}, SnaphotStatus::Stopped};
if (aggregating_transform->variants.empty())
return {{}, SnaphotStatus::NotReady};
auto & snapshot_aggregator = aggregating_transform->params->aggregator;
auto & snapshot_variants = aggregating_transform->many_data->variants;
auto block = snapshot_aggregator.prepareBlockAndFillWithoutKeySnapshot(*snapshot_variants.at(0));
return {convertToChunk(block), SnaphotStatus::Ready};
}
}


@ -0,0 +1,29 @@
#pragma once
#include <Interpreters/Aggregator.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/PartialResultTransform.h>
namespace DB
{
class AggregatingPartialResultTransform : public PartialResultTransform
{
public:
using AggregatingTransformPtr = std::shared_ptr<AggregatingTransform>;
AggregatingPartialResultTransform(
const Block & input_header, const Block & output_header, AggregatingTransformPtr aggregating_transform_,
UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_);
String getName() const override { return "AggregatingPartialResultTransform"; }
void transformPartialResult(Chunk & chunk) override;
ShaphotResult getRealProcessorSnapshot() override;
private:
AggregatingTransformPtr aggregating_transform;
Aggregator transform_aggregator;
};
}


@ -1,3 +1,4 @@
#include <Processors/Transforms/AggregatingPartialResultTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Formats/NativeReader.h>
@ -657,6 +658,8 @@ void AggregatingTransform::consume(Chunk chunk)
src_rows += num_rows;
src_bytes += chunk.bytes();
std::lock_guard lock(snapshot_mutex);
if (params->params.only_merge)
{
auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
@ -676,6 +679,7 @@ void AggregatingTransform::initGenerate()
if (is_generate_initialized)
return;
std::lock_guard lock(snapshot_mutex);
is_generate_initialized = true;
/// If there was no data, and we aggregate without keys, we must return a single row with the result of the empty aggregation.
@ -806,4 +810,12 @@ void AggregatingTransform::initGenerate()
}
}
ProcessorPtr AggregatingTransform::getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms)
{
const auto & input_header = inputs.front().getHeader();
const auto & output_header = outputs.front().getHeader();
auto aggregating_processor = std::dynamic_pointer_cast<AggregatingTransform>(current_processor);
return std::make_shared<AggregatingPartialResultTransform>(input_header, output_header, std::move(aggregating_processor), partial_result_limit, partial_result_duration_ms);
}
}


@ -170,9 +170,23 @@ public:
void work() override;
Processors expandPipeline() override;
PartialResultStatus getPartialResultProcessorSupportStatus() const override
{
/// Currently AggregatingPartialResultTransform supports only single-threaded aggregation without keys.
/// TODO: check that aggregator.prepareBlockAndFillWithoutKey inserts result values without
/// changing the aggregator state once aggregation with keys is supported in AggregatingPartialResultTransform.
bool is_partial_result_supported = params->params.keys_size == 0 /// Aggregation without key.
&& many_data->variants.size() == 1; /// Use only one stream for aggregation.
return is_partial_result_supported ? PartialResultStatus::FullSupported : PartialResultStatus::NotSupported;
}
protected:
void consume(Chunk chunk);
ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override;
private:
/// To read the data that was flushed into the temporary data file.
Processors processors;
@ -212,6 +226,13 @@ private:
bool is_consume_started = false;
friend class AggregatingPartialResultTransform;
/// The mutex protects variables that are used for creating a snapshot of the current processor.
/// The current implementation of AggregatingPartialResultTransform uses the 'is_generate_initialized' variable to check
/// whether the processor has started sending data through the main pipeline, in which case the corresponding partial result processor should stop creating snapshots.
/// Additionally, the mutex protects the 'params->aggregator' and 'many_data->variants' variables, which are used to get data for a snapshot.
std::mutex snapshot_mutex;
void initGenerate();
};


@ -25,6 +25,12 @@ void ExpressionTransform::transform(Chunk & chunk)
chunk.setColumns(block.getColumns(), num_rows);
}
ProcessorPtr ExpressionTransform::getPartialResultProcessor(const ProcessorPtr & /*current_processor*/, UInt64 /*partial_result_limit*/, UInt64 /*partial_result_duration_ms*/)
{
const auto & header = getInputPort().getHeader();
return std::make_shared<ExpressionTransform>(header, expression);
}
ConvertingTransform::ConvertingTransform(const Block & header_, ExpressionActionsPtr expression_)
: ExceptionKeepingTransform(header_, ExpressionTransform::transformHeader(header_, expression_->getActionsDAG()))
, expression(std::move(expression_))


@ -26,10 +26,15 @@ public:
static Block transformHeader(Block header, const ActionsDAG & expression);
PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; }
protected:
void transform(Chunk & chunk) override;
ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override;
private:
ExpressionActionsPtr expression;
};


@ -0,0 +1,42 @@
#include <Processors/LimitTransform.h>
#include <Processors/Transforms/LimitPartialResultTransform.h>
namespace DB
{
LimitPartialResultTransform::LimitPartialResultTransform(
const Block & header,
UInt64 partial_result_limit_,
UInt64 partial_result_duration_ms_,
UInt64 limit_,
UInt64 offset_)
: PartialResultTransform(header, partial_result_limit_, partial_result_duration_ms_)
, limit(limit_)
, offset(offset_)
{}
void LimitPartialResultTransform::transformPartialResult(Chunk & chunk)
{
UInt64 num_rows = chunk.getNumRows();
if (num_rows < offset || limit == 0)
{
chunk = {};
return;
}
UInt64 length = std::min(limit, num_rows - offset);
/// Check if some rows should be removed
if (length < num_rows)
{
UInt64 num_columns = chunk.getNumColumns();
auto columns = chunk.detachColumns();
for (UInt64 i = 0; i < num_columns; ++i)
columns[i] = columns[i]->cut(offset, length);
chunk.setColumns(std::move(columns), length);
}
}
}


@ -0,0 +1,36 @@
#pragma once
#include <Processors/Transforms/PartialResultTransform.h>
namespace DB
{
class LimitTransform;
/// Currently supports only a single-threaded implementation with one input and one output port
class LimitPartialResultTransform : public PartialResultTransform
{
public:
using LimitTransformPtr = std::shared_ptr<LimitTransform>;
LimitPartialResultTransform(
const Block & header,
UInt64 partial_result_limit_,
UInt64 partial_result_duration_ms_,
UInt64 limit_,
UInt64 offset_);
String getName() const override { return "LimitPartialResultTransform"; }
void transformPartialResult(Chunk & chunk) override;
/// LimitTransform doesn't have a state that can be snapshotted
ShaphotResult getRealProcessorSnapshot() override { return {{}, SnaphotStatus::Stopped}; }
private:
UInt64 limit;
UInt64 offset;
LimitTransformPtr limit_transform;
};
}


@ -1,4 +1,5 @@
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/PartialResultTransform.h>
#include <Access/EnabledQuota.h>
namespace DB


@ -33,6 +33,8 @@ public:
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) { quota = quota_; }
PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::SkipSupported; }
protected:
void transform(Chunk & chunk) override;


@ -0,0 +1,48 @@
#include <Processors/Transforms/MergeSortingPartialResultTransform.h>
namespace DB
{
MergeSortingPartialResultTransform::MergeSortingPartialResultTransform(
const Block & header, MergeSortingTransformPtr merge_sorting_transform_,
UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_)
: PartialResultTransform(header, partial_result_limit_, partial_result_duration_ms_)
, merge_sorting_transform(std::move(merge_sorting_transform_))
{}
PartialResultTransform::ShaphotResult MergeSortingPartialResultTransform::getRealProcessorSnapshot()
{
std::lock_guard lock(merge_sorting_transform->snapshot_mutex);
if (merge_sorting_transform->generated_prefix)
return {{}, SnaphotStatus::Stopped};
if (merge_sorting_transform->chunks.empty())
return {{}, SnaphotStatus::NotReady};
/// Sort all input data
merge_sorting_transform->remerge();
/// Add a copy of the first `partial_result_limit` rows to a generated_chunk
/// to send it later as a partial result in the next prepare stage of the current processor
auto generated_columns = merge_sorting_transform->chunks[0].cloneEmptyColumns();
size_t total_rows = 0;
for (const auto & merged_chunk : merge_sorting_transform->chunks)
{
size_t rows = std::min(merged_chunk.getNumRows(), partial_result_limit - total_rows);
if (rows == 0)
break;
for (size_t position = 0; position < generated_columns.size(); ++position)
{
auto column = merged_chunk.getColumns()[position];
generated_columns[position]->insertRangeFrom(*column, 0, rows);
}
total_rows += rows;
}
auto partial_result = Chunk(std::move(generated_columns), total_rows, merge_sorting_transform->chunks[0].getChunkInfo());
merge_sorting_transform->enrichChunkWithConstants(partial_result);
return {std::move(partial_result), SnaphotStatus::Ready};
}
}


@ -0,0 +1,28 @@
#pragma once
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Transforms/PartialResultTransform.h>
namespace DB
{
class MergeSortingPartialResultTransform : public PartialResultTransform
{
public:
using MergeSortingTransformPtr = std::shared_ptr<MergeSortingTransform>;
MergeSortingPartialResultTransform(
const Block & header, MergeSortingTransformPtr merge_sorting_transform_,
UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_);
String getName() const override { return "MergeSortingPartialResultTransform"; }
/// MergeSortingTransform always receives chunks in a sorted state, so transformation is not needed
void transformPartialResult(Chunk & /*chunk*/) override {}
ShaphotResult getRealProcessorSnapshot() override;
private:
MergeSortingTransformPtr merge_sorting_transform;
};
}


@ -1,4 +1,5 @@
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Transforms/MergeSortingPartialResultTransform.h>
#include <Processors/IAccumulatingTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Common/ProfileEvents.h>
@ -136,6 +137,8 @@ void MergeSortingTransform::consume(Chunk chunk)
/// If there were only const columns in sort description, then there is no need to sort.
/// Return the chunk as is.
std::lock_guard lock(snapshot_mutex);
if (description.empty())
{
generated_chunk = std::move(chunk);
@ -213,6 +216,8 @@ void MergeSortingTransform::serialize()
void MergeSortingTransform::generate()
{
std::lock_guard lock(snapshot_mutex);
if (!generated_prefix)
{
size_t num_tmp_files = tmp_data ? tmp_data->getStreams().size() : 0;
@ -273,4 +278,11 @@ void MergeSortingTransform::remerge()
sum_bytes_in_blocks = new_sum_bytes_in_blocks;
}
ProcessorPtr MergeSortingTransform::getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms)
{
const auto & header = inputs.front().getHeader();
auto merge_sorting_processor = std::dynamic_pointer_cast<MergeSortingTransform>(current_processor);
return std::make_shared<MergeSortingPartialResultTransform>(header, std::move(merge_sorting_processor), partial_result_limit, partial_result_duration_ms);
}
}


@ -33,6 +33,8 @@ public:
String getName() const override { return "MergeSortingTransform"; }
PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; }
protected:
void consume(Chunk chunk) override;
void serialize() override;
@ -40,6 +42,8 @@ protected:
Processors expandPipeline() override;
ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override;
private:
size_t max_bytes_before_remerge;
double remerge_lowered_memory_bytes_ratio;
@ -59,6 +63,13 @@ private:
void remerge();
ProcessorPtr external_merging_sorted;
friend class MergeSortingPartialResultTransform;
/// The mutex protects variables that are used for creating a snapshot of the current processor.
/// The current implementation of MergeSortingPartialResultTransform uses the 'generated_prefix' variable to check
/// whether the processor has started sending data through the main pipeline, in which case the corresponding partial result processor should stop creating snapshots.
/// Additionally, the mutex protects the 'chunks' variable and all variables in the 'remerge' function, which is used to transition 'chunks' to a sorted state.
std::mutex snapshot_mutex;
};
}


@ -0,0 +1,80 @@
#include <Processors/Transforms/PartialResultTransform.h>
namespace DB
{
PartialResultTransform::PartialResultTransform(const Block & header, UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_)
: PartialResultTransform(header, header, partial_result_limit_, partial_result_duration_ms_) {}
PartialResultTransform::PartialResultTransform(const Block & input_header, const Block & output_header, UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_)
: IProcessor({input_header}, {output_header})
, input(inputs.front())
, output(outputs.front())
, partial_result_limit(partial_result_limit_)
, partial_result_duration_ms(partial_result_duration_ms_)
, watch(CLOCK_MONOTONIC)
{}
IProcessor::Status PartialResultTransform::prepare()
{
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (finished_getting_snapshots)
{
output.finish();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
/// If the input data from the previous partial result processor is finished, then
/// PartialResultTransform is ready to create snapshots and send them as a partial result
if (input.isFinished())
{
if (partial_result.snapshot_status == SnaphotStatus::Ready)
{
partial_result.snapshot_status = SnaphotStatus::NotReady;
output.push(std::move(partial_result.chunk));
return Status::PortFull;
}
return Status::Ready;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
partial_result.chunk = input.pull();
transformPartialResult(partial_result.chunk);
if (partial_result.chunk.getNumRows() > 0)
{
output.push(std::move(partial_result.chunk));
return Status::PortFull;
}
return Status::NeedData;
}
void PartialResultTransform::work()
{
if (partial_result_duration_ms < watch.elapsedMilliseconds())
{
partial_result = getRealProcessorSnapshot();
if (partial_result.snapshot_status == SnaphotStatus::Stopped)
finished_getting_snapshots = true;
watch.restart();
}
}
}
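The timing contract of work() is a plain periodic throttle: a snapshot is attempted at most once per partial_result_duration_ms, and the watch restarts after each attempt. The same idea in isolation (a self-contained sketch with invented names, not code from the patch):

```cpp
#include <chrono>
#include <cstdint>

/// Produce a snapshot at most once per `period_ms`, mirroring the check
/// `partial_result_duration_ms < watch.elapsedMilliseconds()` in work().
class SnapshotThrottle
{
public:
    explicit SnapshotThrottle(std::uint64_t period_ms_)
        : period_ms(period_ms_), last(Clock::now()) {}

    bool shouldSnapshot()
    {
        const auto now = Clock::now();
        const auto elapsed
            = std::chrono::duration_cast<std::chrono::milliseconds>(now - last).count();
        if (elapsed < static_cast<std::int64_t>(period_ms))
            return false;
        last = now; // analogous to watch.restart()
        return true;
    }

private:
    using Clock = std::chrono::steady_clock;
    std::uint64_t period_ms;
    Clock::time_point last;
};
```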


@ -0,0 +1,57 @@
#pragma once
#include <Processors/IProcessor.h>
namespace DB
{
/// Processors of this type are used to construct an auxiliary pipeline with processors corresponding to those in the main pipeline.
/// These processors work in two modes:
/// 1) Creating a snapshot of the corresponding processor from the main pipeline once per partial_result_duration_ms (period in milliseconds), and then sending the snapshot through the partial result pipeline.
/// 2) Transforming small blocks of data in the same way as the original processor and sending the transformed data through the partial result pipeline.
/// All processors of this type rely on the invariant that a new block from the preceding processor in the partial result pipeline overwrites the information from that same processor's previous block.
class PartialResultTransform : public IProcessor
{
public:
PartialResultTransform(const Block & header, UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_);
PartialResultTransform(const Block & input_header, const Block & output_header, UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_);
String getName() const override { return "PartialResultTransform"; }
Status prepare() override;
void work() override;
bool isPartialResultProcessor() const override { return true; }
protected:
enum class SnaphotStatus
{
NotReady, // Waiting for data from the previous partial result processor or awaiting a timer before creating the snapshot.
Ready, // Current partial result processor has received a snapshot from the processor in the main pipeline.
Stopped, // The processor from the main pipeline has started sending data, and the pipeline for partial results should use data from the next processors of the main pipeline.
};
struct ShaphotResult
{
Chunk chunk;
SnaphotStatus snapshot_status;
};
InputPort & input;
OutputPort & output;
UInt64 partial_result_limit;
UInt64 partial_result_duration_ms;
ShaphotResult partial_result = {{}, SnaphotStatus::NotReady};
bool finished_getting_snapshots = false;
virtual void transformPartialResult(Chunk & /*chunk*/) = 0;
virtual ShaphotResult getRealProcessorSnapshot() = 0; // { return {{}, SnaphotStatus::Stopped}; }
private:
Stopwatch watch;
};
}


@ -12,6 +12,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/ReadProgressCallback.h>
#include <Columns/ColumnConst.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -167,12 +168,9 @@ Pipe::Pipe(ProcessorPtr source)
{
checkSource(*source);
if (collected_processors)
collected_processors->emplace_back(source);
output_ports.push_back(&source->getOutputs().front());
header = output_ports.front()->getHeader();
processors->emplace_back(std::move(source));
addProcessor(std::move(source));
max_parallel_streams = 1;
}
@ -319,6 +317,16 @@ Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors, bool allow
res.processors->insert(res.processors->end(), pipe.processors->begin(), pipe.processors->end());
res.output_ports.insert(res.output_ports.end(), pipe.output_ports.begin(), pipe.output_ports.end());
if (res.isPartialResultActive() && pipe.isPartialResultActive())
{
res.partial_result_ports.insert(
res.partial_result_ports.end(),
pipe.partial_result_ports.begin(),
pipe.partial_result_ports.end());
}
else
res.dropPartialResult();
res.max_parallel_streams += pipe.max_parallel_streams;
if (pipe.totals_port)
@ -352,11 +360,11 @@ void Pipe::addSource(ProcessorPtr source)
else
assertBlocksHaveEqualStructure(header, source_header, "Pipes");
if (collected_processors)
collected_processors->emplace_back(source);
output_ports.push_back(&source->getOutputs().front());
processors->emplace_back(std::move(source));
if (isPartialResultActive())
partial_result_ports.push_back(nullptr);
addProcessor(std::move(source));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
@ -374,11 +382,9 @@ void Pipe::addTotalsSource(ProcessorPtr source)
assertBlocksHaveEqualStructure(header, source_header, "Pipes");
if (collected_processors)
collected_processors->emplace_back(source);
totals_port = &source->getOutputs().front();
processors->emplace_back(std::move(source));
addProcessor(std::move(source));
}
void Pipe::addExtremesSource(ProcessorPtr source)
@ -394,11 +400,20 @@ void Pipe::addExtremesSource(ProcessorPtr source)
assertBlocksHaveEqualStructure(header, source_header, "Pipes");
if (collected_processors)
collected_processors->emplace_back(source);
extremes_port = &source->getOutputs().front();
processors->emplace_back(std::move(source));
addProcessor(std::move(source));
}
void Pipe::activatePartialResult(UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_)
{
if (is_partial_result_active)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Partial result for Pipe should be initialized only once");
is_partial_result_active = true;
partial_result_limit = partial_result_limit_;
partial_result_duration_ms = partial_result_duration_ms_;
partial_result_ports.assign(output_ports.size(), nullptr);
}
static void dropPort(OutputPort *& port, Processors & processors, Processors * collected_processors)
@ -426,6 +441,15 @@ void Pipe::dropExtremes()
dropPort(extremes_port, *processors, collected_processors);
}
void Pipe::dropPartialResult()
{
for (auto & port : partial_result_ports)
dropPort(port, *processors, collected_processors);
is_partial_result_active = false;
partial_result_ports.clear();
}
void Pipe::addTransform(ProcessorPtr transform)
{
addTransform(std::move(transform), static_cast<OutputPort *>(nullptr), static_cast<OutputPort *>(nullptr));
@ -456,6 +480,8 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort
if (extremes)
extremes_port = extremes;
addPartialResultTransform(transform);
size_t next_output = 0;
for (auto & input : inputs)
{
@ -506,10 +532,7 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
if (collected_processors)
collected_processors->emplace_back(transform);
processors->emplace_back(std::move(transform));
addProcessor(std::move(transform));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
@ -546,6 +569,8 @@ void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort *
extremes_port = nullptr;
}
addPartialResultTransform(transform);
bool found_totals = false;
bool found_extremes = false;
@ -595,14 +620,104 @@ void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort *
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
if (collected_processors)
collected_processors->emplace_back(transform);
processors->emplace_back(std::move(transform));
addProcessor(std::move(transform));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
void Pipe::addPartialResultSimpleTransform(const ProcessorPtr & transform, size_t partial_result_port_id)
{
if (isPartialResultActive())
{
auto & partial_result_port = partial_result_ports[partial_result_port_id];
auto partial_result_status = transform->getPartialResultProcessorSupportStatus();
if (partial_result_status == IProcessor::PartialResultStatus::NotSupported)
dropPort(partial_result_port, *processors, collected_processors);
if (partial_result_status != IProcessor::PartialResultStatus::FullSupported)
return;
auto partial_result_transform = IProcessor::getPartialResultProcessorPtr(transform, partial_result_limit, partial_result_duration_ms);
connectPartialResultPort(partial_result_port, partial_result_transform->getInputs().front());
partial_result_port = &partial_result_transform->getOutputs().front();
addProcessor(std::move(partial_result_transform));
}
}
void Pipe::addPartialResultTransform(const ProcessorPtr & transform)
{
if (isPartialResultActive())
{
size_t new_outputs_size = transform->getOutputs().size();
auto partial_result_status = transform->getPartialResultProcessorSupportStatus();
if (partial_result_status == IProcessor::PartialResultStatus::SkipSupported && new_outputs_size != partial_result_ports.size())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot skip transform {} in the partial result part of the Pipe because it has {} output ports, but the partial result part expects {} output ports",
transform->getName(),
new_outputs_size,
partial_result_ports.size());
if (partial_result_status == IProcessor::PartialResultStatus::NotSupported)
{
for (auto & partial_result_port : partial_result_ports)
dropPort(partial_result_port, *processors, collected_processors);
partial_result_ports.assign(new_outputs_size, nullptr);
}
if (partial_result_status != IProcessor::PartialResultStatus::FullSupported)
return;
auto partial_result_transform = IProcessor::getPartialResultProcessorPtr(transform, partial_result_limit, partial_result_duration_ms);
auto & inputs = partial_result_transform->getInputs();
if (inputs.size() != partial_result_ports.size())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add partial result transform {} to Pipe because it has {} input ports, but {} expected",
partial_result_transform->getName(),
inputs.size(),
partial_result_ports.size());
size_t next_port = 0;
for (auto & input : inputs)
{
connectPartialResultPort(partial_result_ports[next_port], input);
++next_port;
}
partial_result_ports.assign(new_outputs_size, nullptr);
next_port = 0;
for (auto & new_partial_result_port : partial_result_transform->getOutputs())
{
partial_result_ports[next_port] = &new_partial_result_port;
++next_port;
}
addProcessor(std::move(partial_result_transform));
}
}
void Pipe::connectPartialResultPort(OutputPort * partial_result_port, InputPort & partial_result_transform_port)
{
if (partial_result_port == nullptr)
{
auto source = std::make_shared<NullSource>(getHeader());
partial_result_port = &source->getPort();
addProcessor(std::move(source));
}
connect(*partial_result_port, partial_result_transform_port);
}
void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
{
if (output_ports.empty())
@ -610,7 +725,7 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
Block new_header;
auto add_transform = [&](OutputPort *& port, StreamType stream_type)
auto add_transform = [&](OutputPort *& port, size_t partial_result_port_id, StreamType stream_type)
{
if (!port)
return;
@ -646,19 +761,22 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
{
connect(*port, transform->getInputs().front());
port = &transform->getOutputs().front();
if (stream_type == StreamType::Main)
addPartialResultSimpleTransform(transform, partial_result_port_id);
if (collected_processors)
collected_processors->emplace_back(transform);
processors->emplace_back(std::move(transform));
addProcessor(std::move(transform));
}
};
size_t partial_result_port_id = 0;
for (auto & port : output_ports)
add_transform(port, StreamType::Main);
{
add_transform(port, partial_result_port_id, StreamType::Main);
++partial_result_port_id;
}
add_transform(totals_port, StreamType::Totals);
add_transform(extremes_port, StreamType::Extremes);
add_transform(totals_port, 0, StreamType::Totals);
add_transform(extremes_port, 0, StreamType::Extremes);
header = std::move(new_header);
}
@ -679,6 +797,7 @@ void Pipe::addChains(std::vector<Chain> chains)
dropTotals();
dropExtremes();
dropPartialResult();
size_t max_parallel_streams_for_chains = 0;
@ -697,18 +816,21 @@ void Pipe::addChains(std::vector<Chain> chains)
auto added_processors = Chain::getProcessors(std::move(chains[i]));
for (auto & transform : added_processors)
{
if (collected_processors)
collected_processors->emplace_back(transform);
processors->emplace_back(std::move(transform));
}
addProcessor(std::move(transform));
}
header = std::move(new_header);
max_parallel_streams = std::max(max_parallel_streams, max_parallel_streams_for_chains);
}
void Pipe::addProcessor(ProcessorPtr processor)
{
if (collected_processors)
collected_processors->emplace_back(processor);
processors->emplace_back(std::move(processor));
}
void Pipe::resize(size_t num_streams, bool force, bool strict)
{
if (output_ports.empty())
@ -769,6 +891,9 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
add_transform(totals_port, StreamType::Totals);
add_transform(extremes_port, StreamType::Extremes);
for (auto & port : partial_result_ports)
add_transform(port, StreamType::PartialResult);
output_ports.clear();
header.clear();
}
@ -778,6 +903,9 @@ void Pipe::transform(const Transformer & transformer, bool check_ports)
if (output_ports.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot transform empty Pipe");
/// TODO: Add functionality to work with partial result ports in transformer.
dropPartialResult();
auto new_processors = transformer(output_ports);
/// Create hash table with new processors.

View File

@ -48,6 +48,9 @@ public:
OutputPort * getOutputPort(size_t pos) const { return output_ports[pos]; }
OutputPort * getTotalsPort() const { return totals_port; }
OutputPort * getExtremesPort() const { return extremes_port; }
OutputPort * getPartialResultPort(size_t pos) const { return partial_result_ports.empty() ? nullptr : partial_result_ports[pos]; }
bool isPartialResultActive() { return is_partial_result_active; }
/// Add processor to list, add its output ports to output_ports.
/// Processor shouldn't have input ports, output ports shouldn't be connected.
@ -58,9 +61,13 @@ public:
void addTotalsSource(ProcessorPtr source);
void addExtremesSource(ProcessorPtr source);
/// Drop totals and extremes (create NullSink for them).
/// Activate sending of partial results during main pipeline execution.
void activatePartialResult(UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_);
/// Drop totals, extremes and partial result (create NullSink for them).
void dropTotals();
void dropExtremes();
void dropPartialResult();
/// Add processor to list. It should have size() input ports with compatible header.
/// Output ports should have same headers.
@ -69,11 +76,16 @@ public:
void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes);
void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes);
void addPartialResultTransform(const ProcessorPtr & transform);
void addPartialResultSimpleTransform(const ProcessorPtr & transform, size_t partial_result_port_id);
void connectPartialResultPort(OutputPort * partial_result_port, InputPort & partial_result_transform_port);
enum class StreamType
{
Main = 0, /// Stream for query data. There may be several streams of this type.
Totals, /// Stream for totals. No more than one.
Extremes, /// Stream for extremes. No more than one.
PartialResult, /// Stream for partial result data. There may be several streams of this type.
};
using ProcessorGetter = std::function<ProcessorPtr(const Block & header)>;
@ -109,10 +121,17 @@ private:
Block header;
std::shared_ptr<Processors> processors;
/// Output ports. Totals and extremes are allowed to be empty.
/// If true, then each time a processor is added, the pipe will try to add
/// a processor that sends a partial result from the original processor.
bool is_partial_result_active = false;
UInt64 partial_result_limit = 0;
UInt64 partial_result_duration_ms = 0;
/// Output ports. Totals, extremes and partial results are allowed to be empty.
OutputPortRawPtrs output_ports;
OutputPort * totals_port = nullptr;
OutputPort * extremes_port = nullptr;
OutputPortRawPtrs partial_result_ports;
/// It is the max number of processors which can be executed in parallel for each step.
/// Usually, it's the same as the number of output ports.
@ -128,6 +147,8 @@ private:
static Pipe unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header);
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
void addProcessor(ProcessorPtr processor);
friend class QueryPipelineBuilder;
friend class QueryPipeline;
};

View File

@ -66,7 +66,8 @@ static void checkPulling(
Processors & processors,
OutputPort * output,
OutputPort * totals,
OutputPort * extremes)
OutputPort * extremes,
OutputPort * partial_result)
{
if (!output || output->isConnected())
throw Exception(
@ -83,9 +84,15 @@ static void checkPulling(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pulling QueryPipeline because its extremes port is connected");
if (partial_result && partial_result->isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pulling QueryPipeline because its partial_result port is connected");
bool found_output = false;
bool found_totals = false;
bool found_extremes = false;
bool found_partial_result = false;
for (const auto & processor : processors)
{
for (const auto & in : processor->getInputs())
@ -99,6 +106,8 @@ static void checkPulling(
found_totals = true;
else if (extremes && &out == extremes)
found_extremes = true;
else if (partial_result && &out == partial_result)
found_partial_result = true;
else
checkOutput(out, processor);
}
@ -116,6 +125,10 @@ static void checkPulling(
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pulling QueryPipeline because its extremes port does not belong to any processor");
if (partial_result && !found_partial_result)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create pulling QueryPipeline because its partial result port does not belong to any processor");
}
static void checkCompleted(Processors & processors)
@ -318,17 +331,20 @@ QueryPipeline::QueryPipeline(
std::shared_ptr<Processors> processors_,
OutputPort * output_,
OutputPort * totals_,
OutputPort * extremes_)
OutputPort * extremes_,
OutputPort * partial_result_)
: resources(std::move(resources_))
, processors(std::move(processors_))
, output(output_)
, totals(totals_)
, extremes(extremes_)
, partial_result(partial_result_)
{
checkPulling(*processors, output, totals, extremes);
checkPulling(*processors, output, totals, extremes, partial_result);
}
QueryPipeline::QueryPipeline(Pipe pipe)
: partial_result_duration_ms(pipe.partial_result_duration_ms)
{
if (pipe.numOutputPorts() > 0)
{
@ -336,9 +352,10 @@ QueryPipeline::QueryPipeline(Pipe pipe)
output = pipe.getOutputPort(0);
totals = pipe.getTotalsPort();
extremes = pipe.getExtremesPort();
partial_result = pipe.getPartialResultPort(0);
processors = std::move(pipe.processors);
checkPulling(*processors, output, totals, extremes);
checkPulling(*processors, output, totals, extremes, partial_result);
}
else
{
@ -370,6 +387,7 @@ QueryPipeline::QueryPipeline(std::shared_ptr<IOutputFormat> format)
auto & format_main = format->getPort(IOutputFormat::PortKind::Main);
auto & format_totals = format->getPort(IOutputFormat::PortKind::Totals);
auto & format_extremes = format->getPort(IOutputFormat::PortKind::Extremes);
auto & format_partial_result = format->getPort(IOutputFormat::PortKind::PartialResult);
if (!totals)
{
@ -385,12 +403,21 @@ QueryPipeline::QueryPipeline(std::shared_ptr<IOutputFormat> format)
processors->emplace_back(std::move(source));
}
if (!partial_result)
{
auto source = std::make_shared<NullSource>(format_partial_result.getHeader());
partial_result = &source->getPort();
processors->emplace_back(std::move(source));
}
connect(*totals, format_totals);
connect(*extremes, format_extremes);
connect(*partial_result, format_partial_result);
input = &format_main;
totals = nullptr;
extremes = nullptr;
partial_result = nullptr;
output_format = format.get();
@ -418,6 +445,7 @@ void QueryPipeline::complete(std::shared_ptr<ISink> sink)
drop(totals, *processors);
drop(extremes, *processors);
drop(partial_result, *processors);
connect(*output, sink->getPort());
processors->emplace_back(std::move(sink));
@ -433,6 +461,7 @@ void QueryPipeline::complete(Chain chain)
drop(totals, *processors);
drop(extremes, *processors);
drop(partial_result, *processors);
processors->reserve(processors->size() + chain.getProcessors().size() + 1);
for (auto processor : chain.getProcessors())
@ -458,6 +487,7 @@ void QueryPipeline::complete(Pipe pipe)
pipe.resize(1);
pipe.dropExtremes();
pipe.dropTotals();
pipe.dropPartialResult();
connect(*pipe.getOutputPort(0), *input);
input = nullptr;
@ -486,11 +516,13 @@ void QueryPipeline::complete(std::shared_ptr<IOutputFormat> format)
addMaterializing(output, *processors);
addMaterializing(totals, *processors);
addMaterializing(extremes, *processors);
addMaterializing(partial_result, *processors);
}
auto & format_main = format->getPort(IOutputFormat::PortKind::Main);
auto & format_totals = format->getPort(IOutputFormat::PortKind::Totals);
auto & format_extremes = format->getPort(IOutputFormat::PortKind::Extremes);
auto & format_partial_result = format->getPort(IOutputFormat::PortKind::PartialResult);
if (!totals)
{
@ -506,13 +538,22 @@ void QueryPipeline::complete(std::shared_ptr<IOutputFormat> format)
processors->emplace_back(std::move(source));
}
if (!partial_result)
{
auto source = std::make_shared<NullSource>(format_partial_result.getHeader());
partial_result = &source->getPort();
processors->emplace_back(std::move(source));
}
connect(*output, format_main);
connect(*totals, format_totals);
connect(*extremes, format_extremes);
connect(*partial_result, format_partial_result);
output = nullptr;
totals = nullptr;
extremes = nullptr;
partial_result = nullptr;
initRowsBeforeLimit(format.get());
output_format = format.get();
@ -684,6 +725,7 @@ void QueryPipeline::convertStructureTo(const ColumnsWithTypeAndName & columns)
addExpression(output, actions, *processors);
addExpression(totals, actions, *processors);
addExpression(extremes, actions, *processors);
addExpression(partial_result, actions, *processors);
}
std::unique_ptr<ReadProgressCallback> QueryPipeline::getReadProgressCallback() const

View File

@ -75,7 +75,8 @@ public:
std::shared_ptr<Processors> processors_,
OutputPort * output_,
OutputPort * totals_ = nullptr,
OutputPort * extremes_ = nullptr);
OutputPort * extremes_ = nullptr,
OutputPort * partial_result_ = nullptr);
bool initialized() const { return !processors->empty(); }
/// When initialized, exactly one of the following is true.
@ -154,6 +155,7 @@ private:
OutputPort * output = nullptr;
OutputPort * totals = nullptr;
OutputPort * extremes = nullptr;
OutputPort * partial_result = nullptr;
QueryStatusPtr process_list_element;
@ -162,6 +164,9 @@ private:
size_t num_threads = 0;
bool concurrency_control = false;
UInt64 partial_result_limit = 0;
UInt64 partial_result_duration_ms = 0;
friend class PushingPipelineExecutor;
friend class PullingPipelineExecutor;
friend class PushingAsyncPipelineExecutor;

View File

@ -110,6 +110,16 @@ void QueryPipelineBuilder::init(QueryPipeline & pipeline)
pipe.header = {};
}
if (pipeline.partial_result)
{
/// Set partial result ports only after activation, because activation resets them to nullptr
pipe.activatePartialResult(pipeline.partial_result_limit, pipeline.partial_result_duration_ms);
pipe.partial_result_ports = {pipeline.partial_result};
}
if (!pipeline.partial_result)
pipe.dropPartialResult();
pipe.totals_port = pipeline.totals;
pipe.extremes_port = pipeline.extremes;
pipe.max_parallel_streams = pipeline.num_threads;
@ -352,6 +362,10 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesYShaped
left->checkInitializedAndNotCompleted();
right->checkInitializedAndNotCompleted();
/// TODO: Support joining of partial results from different pipelines.
left->pipe.dropPartialResult();
right->pipe.dropPartialResult();
left->pipe.dropExtremes();
right->pipe.dropExtremes();
if (left->getNumStreams() != 1 || right->getNumStreams() != 1)
@ -364,6 +378,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesYShaped
auto joining = std::make_shared<MergeJoinTransform>(join, inputs, out_header, max_block_size);
/// TODO: Support partial results in merge pipelines once the joining above supports them.
return mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors);
}
@ -384,6 +399,10 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
left->pipe.dropExtremes();
right->pipe.dropExtremes();
/// TODO: Support joining of partial results from different pipelines.
left->pipe.dropPartialResult();
right->pipe.dropPartialResult();
left->pipe.collected_processors = collected_processors;
/// Collect the NEW processors for the right pipeline.
@ -634,7 +653,7 @@ PipelineExecutorPtr QueryPipelineBuilder::execute()
if (!isCompleted())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot execute pipeline because it is not completed");
return std::make_shared<PipelineExecutor>(pipe.processors, process_list_element);
return std::make_shared<PipelineExecutor>(pipe.processors, process_list_element, pipe.partial_result_duration_ms);
}
Pipe QueryPipelineBuilder::getPipe(QueryPipelineBuilder pipeline, QueryPlanResourceHolder & resources)

View File

@ -85,6 +85,12 @@ public:
/// Pipeline will be completed after this transformation.
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
/// Activate building a separate pipeline for sending partial results.
void activatePartialResult(UInt64 partial_result_limit, UInt64 partial_result_duration_ms) { pipe.activatePartialResult(partial_result_limit, partial_result_duration_ms); }
/// Check if building of a pipeline for sending partial results is active.
bool isPartialResultActive() { return pipe.isPartialResultActive(); }
/// Add totals which returns one chunk with single row with defaults.
void addDefaultTotals();

View File

@ -885,7 +885,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
std::unique_lock progress_lock(task_callback_mutex, std::defer_lock);
{
PullingAsyncPipelineExecutor executor(pipeline);
bool has_partial_result_setting = query_context->getSettingsRef().partial_result_update_duration_ms.totalMilliseconds() > 0;
PullingAsyncPipelineExecutor executor(pipeline, has_partial_result_setting);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
Block block;

View File

@ -1,227 +1,33 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import os
import uuid
import sys
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "127.0.0.1")
CLICKHOUSE_PORT = int(os.environ.get("CLICKHOUSE_PORT_TCP", "900000"))
CLICKHOUSE_DATABASE = os.environ.get("CLICKHOUSE_DATABASE", "default")
CLIENT_NAME = "simple native protocol"
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))
def writeVarUInt(x, ba):
for _ in range(0, 9):
byte = x & 0x7F
if x > 0x7F:
byte |= 0x80
ba.append(byte)
x >>= 7
if x == 0:
return
def writeStringBinary(s, ba):
b = bytes(s, "utf-8")
writeVarUInt(len(s), ba)
ba.extend(b)
def readStrict(s, size=1):
res = bytearray()
while size:
cur = s.recv(size)
# if not res:
# raise "Socket is closed"
size -= len(cur)
res.extend(cur)
return res
def readUInt(s, size=1):
res = readStrict(s, size)
val = 0
for i in range(len(res)):
val += res[i] << (i * 8)
return val
def readUInt8(s):
return readUInt(s)
def readUInt16(s):
return readUInt(s, 2)
def readUInt32(s):
return readUInt(s, 4)
def readUInt64(s):
return readUInt(s, 8)
def readVarUInt(s):
x = 0
for i in range(9):
byte = readStrict(s)[0]
x |= (byte & 0x7F) << (7 * i)
if not byte & 0x80:
return x
return x
def readStringBinary(s):
size = readVarUInt(s)
s = readStrict(s, size)
return s.decode("utf-8")
def sendHello(s):
ba = bytearray()
writeVarUInt(0, ba) # Hello
writeStringBinary(CLIENT_NAME, ba)
writeVarUInt(21, ba)
writeVarUInt(9, ba)
writeVarUInt(54449, ba)
writeStringBinary("default", ba) # database
writeStringBinary("default", ba) # user
writeStringBinary("", ba) # pwd
s.sendall(ba)
def receiveHello(s):
p_type = readVarUInt(s)
assert p_type == 0 # Hello
server_name = readStringBinary(s)
# print("Server name: ", server_name)
server_version_major = readVarUInt(s)
# print("Major: ", server_version_major)
server_version_minor = readVarUInt(s)
# print("Minor: ", server_version_minor)
server_revision = readVarUInt(s)
# print("Revision: ", server_revision)
server_timezone = readStringBinary(s)
# print("Timezone: ", server_timezone)
server_display_name = readStringBinary(s)
# print("Display name: ", server_display_name)
server_version_patch = readVarUInt(s)
# print("Version patch: ", server_version_patch)
def serializeClientInfo(ba, query_id):
writeStringBinary("default", ba) # initial_user
writeStringBinary(query_id, ba) # initial_query_id
writeStringBinary("127.0.0.1:9000", ba) # initial_address
ba.extend([0] * 8) # initial_query_start_time_microseconds
ba.append(1) # TCP
writeStringBinary("os_user", ba) # os_user
writeStringBinary("client_hostname", ba) # client_hostname
writeStringBinary(CLIENT_NAME, ba) # client_name
writeVarUInt(21, ba)
writeVarUInt(9, ba)
writeVarUInt(54449, ba)
writeStringBinary("", ba) # quota_key
writeVarUInt(0, ba) # distributed_depth
writeVarUInt(1, ba) # client_version_patch
ba.append(0) # No telemetry
def sendQuery(s, query):
ba = bytearray()
query_id = uuid.uuid4().hex
writeVarUInt(1, ba) # query
writeStringBinary(query_id, ba)
ba.append(1) # INITIAL_QUERY
# client info
serializeClientInfo(ba, query_id)
writeStringBinary("", ba) # No settings
writeStringBinary("", ba) # No interserver secret
writeVarUInt(2, ba) # Stage - Complete
ba.append(0) # No compression
writeStringBinary(query, ba) # query, finally
s.sendall(ba)
def serializeBlockInfo(ba):
writeVarUInt(1, ba) # 1
ba.append(0) # is_overflows
writeVarUInt(2, ba) # 2
writeVarUInt(0, ba) # 0
ba.extend([0] * 4) # bucket_num
def sendEmptyBlock(s):
ba = bytearray()
writeVarUInt(2, ba) # Data
writeStringBinary("", ba)
serializeBlockInfo(ba)
writeVarUInt(0, ba) # rows
writeVarUInt(0, ba) # columns
s.sendall(ba)
def assertPacket(packet, expected):
assert packet == expected, packet
def readHeader(s):
packet_type = readVarUInt(s)
if packet_type == 2: # Exception
raise RuntimeError(readException(s))
assertPacket(packet_type, 1) # Data
readStringBinary(s) # external table name
# BlockInfo
assertPacket(readVarUInt(s), 1) # 1
assertPacket(readUInt8(s), 0) # is_overflows
assertPacket(readVarUInt(s), 2) # 2
assertPacket(readUInt32(s), 4294967295) # bucket_num
assertPacket(readVarUInt(s), 0) # 0
columns = readVarUInt(s) # rows
rows = readVarUInt(s) # columns
print("Rows {} Columns {}".format(rows, columns))
for _ in range(columns):
col_name = readStringBinary(s)
type_name = readStringBinary(s)
print("Column {} type {}".format(col_name, type_name))
def readException(s):
code = readUInt32(s)
name = readStringBinary(s)
text = readStringBinary(s)
readStringBinary(s) # trace
assertPacket(readUInt8(s), 0) # has_nested
return "code {}: {}".format(code, text.replace("DB::Exception:", ""))
from tcp_client import (
TCPClient,
CLICKHOUSE_DATABASE,
writeVarUInt,
writeStringBinary,
serializeBlockInfo,
assertPacket,
)
def insertValidLowCardinalityRow():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(30)
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
sendHello(s)
receiveHello(s)
sendQuery(
s,
with TCPClient() as client:
client.sendQuery(
"insert into {}.tab settings input_format_defaults_for_omitted_fields=0 format TSV".format(
CLICKHOUSE_DATABASE
),
)
# external tables
sendEmptyBlock(s)
readHeader(s)
client.sendEmptyBlock()
client.readHeader()
# Data
ba = bytearray()
@ -240,31 +46,25 @@ def insertValidLowCardinalityRow():
writeStringBinary("hello", ba) # key
ba.extend([1] + [0] * 7) # num_indexes
ba.extend([0] * 8) # UInt64 index (0 for 'hello')
s.sendall(ba)
client.send(ba)
# Fin block
sendEmptyBlock(s)
client.sendEmptyBlock()
assertPacket(readVarUInt(s), 5) # End of stream
s.close()
assertPacket(client.readVarUInt(), 5) # End of stream
def insertLowCardinalityRowWithIndexOverflow():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(30)
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
sendHello(s)
receiveHello(s)
sendQuery(
s,
with TCPClient() as client:
client.sendQuery(
"insert into {}.tab settings input_format_defaults_for_omitted_fields=0 format TSV".format(
CLICKHOUSE_DATABASE
),
)
# external tables
sendEmptyBlock(s)
readHeader(s)
client.sendEmptyBlock()
client.readHeader()
# Data
ba = bytearray()
@ -283,29 +83,23 @@ def insertLowCardinalityRowWithIndexOverflow():
writeStringBinary("hello", ba) # key
ba.extend([1] + [0] * 7) # num_indexes
ba.extend([0] * 7 + [1]) # UInt64 index (overflow)
s.sendall(ba)
client.send(ba)
assertPacket(readVarUInt(s), 2)
print(readException(s))
s.close()
assertPacket(client.readVarUInt(), 2) # Exception
print(client.readException())
def insertLowCardinalityRowWithIncorrectDictType():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(30)
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
sendHello(s)
receiveHello(s)
sendQuery(
s,
with TCPClient() as client:
client.sendQuery(
"insert into {}.tab settings input_format_defaults_for_omitted_fields=0 format TSV".format(
CLICKHOUSE_DATABASE
),
)
# external tables
sendEmptyBlock(s)
readHeader(s)
client.sendEmptyBlock()
client.readHeader()
# Data
ba = bytearray()
@ -324,29 +118,23 @@ def insertLowCardinalityRowWithIncorrectDictType():
writeStringBinary("hello", ba) # key
ba.extend([1] + [0] * 7) # num_indexes
ba.extend([0] * 8) # UInt64 index (overflow)
s.sendall(ba)
client.send(ba)
assertPacket(readVarUInt(s), 2)
print(readException(s))
s.close()
assertPacket(client.readVarUInt(), 2) # Exception
print(client.readException())
def insertLowCardinalityRowWithIncorrectAdditionalKeys():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(30)
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
sendHello(s)
receiveHello(s)
sendQuery(
s,
with TCPClient() as client:
client.sendQuery(
"insert into {}.tab settings input_format_defaults_for_omitted_fields=0 format TSV".format(
CLICKHOUSE_DATABASE
),
)
# external tables
sendEmptyBlock(s)
readHeader(s)
client.sendEmptyBlock()
client.readHeader()
# Data
ba = bytearray()
@ -365,11 +153,10 @@ def insertLowCardinalityRowWithIncorrectAdditionalKeys():
writeStringBinary("hello", ba) # key
ba.extend([1] + [0] * 7) # num_indexes
ba.extend([0] * 8) # UInt64 index (0 for 'hello')
s.sendall(ba)
client.send(ba)
assertPacket(readVarUInt(s), 2)
print(readException(s))
s.close()
assertPacket(client.readVarUInt(), 2) # Exception
print(client.readException())
def main():

View File

@ -38,4 +38,5 @@ LazyOutputFormat 1 1 1 0 0
LimitsCheckingTransform 1 1 1 1 1
NullSource 1 0 0 0 0
NullSource 1 0 0 0 0
NullSource 0 0 0 0 0
SourceFromSingleChunk 1 0 0 1 1

View File

@ -1,188 +1,30 @@
#!/usr/bin/env python3
import socket
import os
import uuid
import json
import os
import sys
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "127.0.0.1")
CLICKHOUSE_PORT = int(os.environ.get("CLICKHOUSE_PORT_TCP", "900000"))
CLICKHOUSE_DATABASE = os.environ.get("CLICKHOUSE_DATABASE", "default")
CLIENT_NAME = "simple native protocol"
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))
def writeVarUInt(x, ba):
for _ in range(0, 9):
byte = x & 0x7F
if x > 0x7F:
byte |= 0x80
ba.append(byte)
x >>= 7
if x == 0:
return
def writeStringBinary(s, ba):
b = bytes(s, "utf-8")
writeVarUInt(len(s), ba)
ba.extend(b)
def readStrict(s, size=1):
res = bytearray()
while size:
cur = s.recv(size)
# if not res:
# raise "Socket is closed"
size -= len(cur)
res.extend(cur)
return res
def readUInt(s, size=1):
res = readStrict(s, size)
val = 0
for i in range(len(res)):
val += res[i] << (i * 8)
return val
def readUInt8(s):
return readUInt(s)
def readUInt16(s):
return readUInt(s, 2)
def readUInt32(s):
return readUInt(s, 4)
def readUInt64(s):
return readUInt(s, 8)
def readVarUInt(s):
x = 0
for i in range(9):
byte = readStrict(s)[0]
x |= (byte & 0x7F) << (7 * i)
if not byte & 0x80:
return x
return x
def readStringBinary(s):
size = readVarUInt(s)
s = readStrict(s, size)
return s.decode("utf-8")
def sendHello(s):
ba = bytearray()
writeVarUInt(0, ba) # Hello
writeStringBinary(CLIENT_NAME, ba)
writeVarUInt(21, ba)
writeVarUInt(9, ba)
writeVarUInt(54449, ba)
writeStringBinary(CLICKHOUSE_DATABASE, ba) # database
writeStringBinary("default", ba) # user
writeStringBinary("", ba) # pwd
s.sendall(ba)
def receiveHello(s):
p_type = readVarUInt(s)
assert p_type == 0 # Hello
server_name = readStringBinary(s)
# print("Server name: ", server_name)
server_version_major = readVarUInt(s)
# print("Major: ", server_version_major)
server_version_minor = readVarUInt(s)
# print("Minor: ", server_version_minor)
server_revision = readVarUInt(s)
# print("Revision: ", server_revision)
server_timezone = readStringBinary(s)
# print("Timezone: ", server_timezone)
server_display_name = readStringBinary(s)
# print("Display name: ", server_display_name)
server_version_patch = readVarUInt(s)
# print("Version patch: ", server_version_patch)
def serializeClientInfo(ba, query_id):
writeStringBinary("default", ba) # initial_user
writeStringBinary(query_id, ba) # initial_query_id
writeStringBinary("127.0.0.1:9000", ba) # initial_address
ba.extend([0] * 8) # initial_query_start_time_microseconds
ba.append(1) # TCP
writeStringBinary("os_user", ba) # os_user
writeStringBinary("client_hostname", ba) # client_hostname
writeStringBinary(CLIENT_NAME, ba) # client_name
writeVarUInt(21, ba)
writeVarUInt(9, ba)
writeVarUInt(54449, ba)
writeStringBinary("", ba) # quota_key
writeVarUInt(0, ba) # distributed_depth
writeVarUInt(1, ba) # client_version_patch
ba.append(0) # No telemetry
def sendQuery(s, query):
ba = bytearray()
query_id = uuid.uuid4().hex
writeVarUInt(1, ba) # query
writeStringBinary(query_id, ba)
ba.append(1) # INITIAL_QUERY
# client info
serializeClientInfo(ba, query_id)
writeStringBinary("", ba) # No settings
writeStringBinary("", ba) # No interserver secret
writeVarUInt(2, ba) # Stage - Complete
ba.append(0) # No compression
writeStringBinary(query, ba) # query, finally
s.sendall(ba)
def serializeBlockInfo(ba):
writeVarUInt(1, ba) # 1
ba.append(0) # is_overflows
writeVarUInt(2, ba) # 2
writeVarUInt(0, ba) # 0
ba.extend([0] * 4) # bucket_num
def sendEmptyBlock(s):
ba = bytearray()
writeVarUInt(2, ba) # Data
writeStringBinary("", ba)
serializeBlockInfo(ba)
writeVarUInt(0, ba) # rows
writeVarUInt(0, ba) # columns
s.sendall(ba)
def assertPacket(packet, expected):
assert packet == expected, packet
from tcp_client import TCPClient
class Progress:
def __init__(self):
def __init__(
self,
read_rows=0,
read_bytes=0,
total_rows_to_read=0,
written_rows=0,
written_bytes=0,
):
# NOTE: this is done in ctor to initialize __dict__
self.read_rows = 0
self.read_bytes = 0
self.total_rows_to_read = 0
self.written_rows = 0
self.written_bytes = 0
self.read_rows = read_rows
self.read_bytes = read_bytes
self.total_rows_to_read = total_rows_to_read
self.written_rows = written_rows
self.written_bytes = written_bytes
def __str__(self):
return json.dumps(self.__dict__)
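The NOTE in the constructor is load-bearing: because every counter is assigned in __init__, self.__dict__ is fully populated before __str__ serializes it. A standalone illustration of the same pattern (Counters is a hypothetical stand-in, not part of the test):

import json

class Counters:
    def __init__(self, read_rows=0, read_bytes=0):
        # Assigning in the ctor guarantees the keys exist in __dict__,
        # so json.dumps below always sees a complete dict.
        self.read_rows = read_rows
        self.read_bytes = read_bytes

    def __iadd__(self, other):
        self.read_rows += other.read_rows
        self.read_bytes += other.read_bytes
        return self

    def __str__(self):
        return json.dumps(self.__dict__)

total = Counters()
total += Counters(read_rows=10, read_bytes=80)
print(total)  # {"read_rows": 10, "read_bytes": 80}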
@ -195,13 +37,6 @@ class Progress:
self.written_bytes += b.written_bytes
return self
def readPacket(self, s):
self.read_rows += readVarUInt(s)
self.read_bytes += readVarUInt(s)
self.total_rows_to_read += readVarUInt(s)
self.written_rows += readVarUInt(s)
self.written_bytes += readVarUInt(s)
def __bool__(self):
return (
self.read_rows > 0
@ -212,52 +47,25 @@ class Progress:
)
def readProgress(s):
packet_type = readVarUInt(s)
if packet_type == 2: # Exception
raise RuntimeError(readException(s))
if packet_type == 5: # End stream
return None
assertPacket(packet_type, 3) # Progress
progress = Progress()
progress.readPacket(s)
return progress
def readException(s):
code = readUInt32(s)
name = readStringBinary(s)
text = readStringBinary(s)
readStringBinary(s) # trace
assertPacket(readUInt8(s), 0) # has_nested
return "code {}: {}".format(code, text.replace("DB::Exception:", ""))
def main():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(30)
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
sendHello(s)
receiveHello(s)
with TCPClient() as client:
# For a 1 second sleep and 1000ms of interactive_delay we should definitely get a non-zero progress packet.
# NOTE: interactive_delay=0 cannot be used since in this case CompletedPipelineExecutor will not call the cancelled callback.
sendQuery(
s,
client.sendQuery(
"insert into function null('_ Int') select sleep(1) from numbers(2) settings max_block_size=1, interactive_delay=1000",
)
# external tables
sendEmptyBlock(s)
client.sendEmptyBlock()
summary_progress = Progress()
non_empty_progress_packets = 0
while True:
progress = readProgress(s)
if progress is None:
progress_info = client.readProgress()
if progress_info is None:
break
progress = Progress(*progress_info)
summary_progress += progress
if progress:
non_empty_progress_packets += 1
@ -268,8 +76,6 @@ def main():
# - 1 or 2 for each SELECT block
assert non_empty_progress_packets in (3, 4), f"{non_empty_progress_packets=:}"
s.close()
if __name__ == "__main__":
main()

View File

@ -1,217 +1,23 @@
#!/usr/bin/env python3
import socket
import os
import uuid
import json
import sys
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "127.0.0.1")
CLICKHOUSE_PORT = int(os.environ.get("CLICKHOUSE_PORT_TCP", "900000"))
CLICKHOUSE_DATABASE = os.environ.get("CLICKHOUSE_DATABASE", "default")
CLIENT_NAME = "simple native protocol"
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))
def writeVarUInt(x, ba):
for _ in range(0, 9):
byte = x & 0x7F
if x > 0x7F:
byte |= 0x80
ba.append(byte)
x >>= 7
if x == 0:
return
def writeStringBinary(s, ba):
b = bytes(s, "utf-8")
writeVarUInt(len(s), ba)
ba.extend(b)
def readStrict(s, size=1):
res = bytearray()
while size:
cur = s.recv(size)
# if not res:
# raise "Socket is closed"
size -= len(cur)
res.extend(cur)
return res
def readUInt(s, size=1):
res = readStrict(s, size)
val = 0
for i in range(len(res)):
val += res[i] << (i * 8)
return val
def readUInt8(s):
return readUInt(s)
def readUInt16(s):
return readUInt(s, 2)
def readUInt32(s):
return readUInt(s, 4)
def readUInt64(s):
return readUInt(s, 8)
def readVarUInt(s):
x = 0
for i in range(9):
byte = readStrict(s)[0]
x |= (byte & 0x7F) << (7 * i)
if not byte & 0x80:
return x
return x
def readStringBinary(s):
size = readVarUInt(s)
s = readStrict(s, size)
return s.decode("utf-8")
def sendHello(s):
ba = bytearray()
writeVarUInt(0, ba) # Hello
writeStringBinary(CLIENT_NAME, ba)
writeVarUInt(21, ba)
writeVarUInt(9, ba)
writeVarUInt(54449, ba)
writeStringBinary(CLICKHOUSE_DATABASE, ba) # database
writeStringBinary("default", ba) # user
writeStringBinary("", ba) # pwd
s.sendall(ba)
def receiveHello(s):
p_type = readVarUInt(s)
assert p_type == 0 # Hello
_server_name = readStringBinary(s)
_server_version_major = readVarUInt(s)
_server_version_minor = readVarUInt(s)
_server_revision = readVarUInt(s)
_server_timezone = readStringBinary(s)
_server_display_name = readStringBinary(s)
_server_version_patch = readVarUInt(s)
def serializeClientInfo(ba, query_id):
writeStringBinary("default", ba) # initial_user
writeStringBinary(query_id, ba) # initial_query_id
writeStringBinary("127.0.0.1:9000", ba) # initial_address
ba.extend([0] * 8) # initial_query_start_time_microseconds
ba.append(1) # TCP
writeStringBinary("os_user", ba) # os_user
writeStringBinary("client_hostname", ba) # client_hostname
writeStringBinary(CLIENT_NAME, ba) # client_name
writeVarUInt(21, ba)
writeVarUInt(9, ba)
writeVarUInt(54449, ba)
writeStringBinary("", ba) # quota_key
writeVarUInt(0, ba) # distributed_depth
writeVarUInt(1, ba) # client_version_patch
ba.append(0) # No telemetry
def sendQuery(s, query, settings):
ba = bytearray()
query_id = uuid.uuid4().hex
writeVarUInt(1, ba) # query
writeStringBinary(query_id, ba)
ba.append(1) # INITIAL_QUERY
# client info
serializeClientInfo(ba, query_id)
# Settings
for key, value in settings.items():
writeStringBinary(key, ba)
writeVarUInt(1, ba) # is_important
writeStringBinary(str(value), ba)
writeStringBinary("", ba) # End of settings
writeStringBinary("", ba) # No interserver secret
writeVarUInt(2, ba) # Stage - Complete
ba.append(0) # No compression
writeStringBinary(query, ba) # query, finally
s.sendall(ba)
def serializeBlockInfo(ba):
writeVarUInt(1, ba) # 1
ba.append(0) # is_overflows
writeVarUInt(2, ba) # 2
writeVarUInt(0, ba) # 0
ba.extend([0] * 4) # bucket_num
def sendEmptyBlock(s):
ba = bytearray()
writeVarUInt(2, ba) # Data
writeStringBinary("", ba)
serializeBlockInfo(ba)
writeVarUInt(0, ba) # rows
writeVarUInt(0, ba) # columns
s.sendall(ba)
def assertPacket(packet, expected):
assert packet == expected, "Got: {}, expected: {}".format(packet, expected)
def readResponse(s):
packet_type = readVarUInt(s)
if packet_type == 2: # Exception
raise RuntimeError(readException(s))
if packet_type == 1: # Data
return None
if packet_type == 3: # Progress
return None
if packet_type == 5: # End stream
return None
raise RuntimeError("Unexpected packet: {}".format(packet_type))
def readException(s):
code = readUInt32(s)
_name = readStringBinary(s)
text = readStringBinary(s)
readStringBinary(s) # trace
assertPacket(readUInt8(s), 0) # has_nested
return "code {}: {}".format(code, text.replace("DB::Exception:", ""))
from tcp_client import TCPClient
def main():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(30)
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
sendHello(s)
receiveHello(s)
sendQuery(s, "select 1", {"replication_alter_partitions_sync": 1})
with TCPClient() as client:
client.sendQuery("select 1", {"replication_alter_partitions_sync": 1})
# external tables
sendEmptyBlock(s)
client.sendEmptyBlock()
while readResponse(s) is not None:
while client.readResponse() is not None:
pass
s.close()
print("OK")

View File

@ -0,0 +1,95 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))
from tcp_client import TCPClient
def run_query_without_errors(query, support_partial_result):
with TCPClient() as client:
client.sendQuery(query)
# external tables
client.sendEmptyBlock()
client.readHeader()
# Partial result
partial_result = client.readDataWithoutProgress()[0]
if support_partial_result:
assert (
len(partial_result.value) > 0
), "Expected at least one block with a non-empty partial result before getting the full result"
while True:
assert all(
a >= b
for a, b in zip(partial_result.value, partial_result.value[1:])
), "Partial result always should be sorted for this test"
new_partial_result = client.readDataWithoutProgress(
need_print_info=False
)[0]
if len(new_partial_result.value) == 0:
break
data_size = len(partial_result.value)
assert all(
partial_result.value[i] <= new_partial_result.value[i]
for i in range(data_size)
), f"New partial result values should always be greater then old one because a new block contains more information about the full data. New result {new_partial_result}. Previous result {partial_result}"
partial_result = new_partial_result
else:
block_rows = len(partial_result.value)
assert (
block_rows == 0
), f"Expected only empty partial result block before getting the full result, but block has {block_rows} rows"
# Full result
full_result = client.readDataWithoutProgress()[0]
data_size = len(partial_result.value)
assert all(
partial_result.value[i] <= full_result.value[i] for i in range(data_size)
), f"Full result values should always be greater then partial result values. Full result {full_result}. Partial result {partial_result}"
for result in full_result.value:
print(result)
def main():
rows_number = 2e7 + 1
# Request with a partial result limit smaller than the full limit
run_query_without_errors(
f"SELECT number FROM numbers_mt({rows_number}) ORDER BY -number LIMIT 5 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 3",
support_partial_result=True,
)
# Request with a partial result limit greater than the full limit
run_query_without_errors(
f"SELECT number FROM numbers_mt({rows_number}) ORDER BY -number LIMIT 3 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5",
support_partial_result=True,
)
# Request with OFFSET
run_query_without_errors(
f"SELECT number FROM numbers_mt({rows_number}) ORDER BY -number LIMIT 3 OFFSET 1 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5",
support_partial_result=True,
)
# Request with OFFSET greater than the partial result limit (the partial result pipeline uses blocks with fewer rows than OFFSET, so there will be no elements in the block after LimitPartialResultTransform)
run_query_without_errors(
f"SELECT number FROM numbers_mt({rows_number}) ORDER BY -number LIMIT 3 OFFSET 15 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5",
support_partial_result=False,
)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,38 @@
Rows 0 Columns 1
Column number type UInt64
Rows 3 Columns 1
Column number type UInt64
Rows 5 Columns 1
Column number type UInt64
20000000
19999999
19999998
19999997
19999996
Rows 0 Columns 1
Column number type UInt64
Rows 3 Columns 1
Column number type UInt64
Rows 3 Columns 1
Column number type UInt64
20000000
19999999
19999998
Rows 0 Columns 1
Column number type UInt64
Rows 3 Columns 1
Column number type UInt64
Rows 3 Columns 1
Column number type UInt64
19999999
19999998
19999997
Rows 0 Columns 1
Column number type UInt64
Rows 0 Columns 1
Column number type UInt64
Rows 3 Columns 1
Column number type UInt64
19999985
19999984
19999983

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python3 "$CURDIR"/02833_partial_sorting_result_during_query_execution.python

View File

@ -0,0 +1,129 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))
from tcp_client import TCPClient
def get_keys(results):
return [key for key, _ in results]
def check_new_result(new_results, old_results, invariants, rows_limit):
if rows_limit is not None:
assert (
len(new_results[0].value) <= rows_limit
), f"Result should have no more then {rows_limit} rows. But it has {len(new_results[0].value)} rows"
for new_result, old_result in zip(new_results, old_results):
assert (
new_result.key == old_result.key
), f"Keys in blocks should be in the same order. Full results keys {get_keys(full_results)}. Partial results keys {get_keys(partial_results)}"
key = new_result.key
if key in invariants:
new_value = new_result.value
old_value = old_result.value
assert invariants[key](
old_value, new_value
), f"Problem with the invariant between new and old result for key: {key}. New value {new_value}. Old value {old_value}"
def run_query_without_errors(
query, support_partial_result, invariants=None, rows_limit=None
):
if invariants is None:
invariants = {}
with TCPClient() as client:
client.sendQuery(query)
# external tables
client.sendEmptyBlock()
client.readHeader()
# Partial result
partial_results = client.readDataWithoutProgress()
if support_partial_result:
assert (
len(partial_results) > 0 and len(partial_results[0].value) > 0
), "Expected at least one block with a non-empty partial result before getting the full result"
while True:
new_partial_results = client.readDataWithoutProgress(
need_print_info=False
)
if len(new_partial_results[0].value) == 0:
break
check_new_result(
new_partial_results, partial_results, invariants, rows_limit
)
partial_results = new_partial_results
else:
block_rows = len(partial_results[0].value)
assert (
block_rows == 0
), f"Expected only empty partial result block before getting the full result, but block has {block_rows} rows"
# Full result
full_results = client.readDataWithoutProgress()
if support_partial_result:
check_new_result(full_results, partial_results, invariants, rows_limit)
for data in full_results:
if isinstance(data.value[0], int):
print(data.key, data.value)
def supported_scenarios_without_key():
rows_number = 2e7 + 1
# Simple aggregation query
query = f"select median(number), stddevSamp(number), stddevPop(number), max(number), min(number), any(number), count(number), avg(number), sum(number) from numbers_mt({rows_number}) settings max_threads = 1, partial_result_update_duration_ms = 1"
invariants = {
"median(number)": lambda old_value, new_value: old_value <= new_value,
"max(number)": lambda old_value, new_value: old_value <= new_value,
"min(number)": lambda old_value, new_value: old_value >= new_value,
"count(number)": lambda old_value, new_value: old_value <= new_value,
"avg(number)": lambda old_value, new_value: old_value <= new_value,
"sum(number)": lambda old_value, new_value: old_value <= new_value,
}
run_query_without_errors(
query, support_partial_result=True, invariants=invariants, rows_limit=1
)
# Aggregation query with a nested ORDER BY subquery
query = f"select median(number), stddevSamp(number), stddevPop(number), max(number), min(number), any(number), count(number), avg(number), sum(number) FROM (SELECT number FROM numbers_mt({rows_number}) ORDER BY -number LIMIT 3) settings max_threads = 1, partial_result_update_duration_ms=1"
# Aggregation receives small partial result blocks from ORDER BY, which always sends blocks with larger values than the previous ones
invariants["min(number)"] = lambda old_value, new_value: old_value <= new_value
run_query_without_errors(
query, support_partial_result=True, invariants=invariants, rows_limit=1
)
def unsupported_scenarios():
rows_number = 2e7 + 1
# Currently the aggregator for partial results supports only single-threaded aggregation without a key
# Update the test when multithreading or aggregation with GROUP BY is supported for partial result updates
multithread_query = f"select sum(number) from numbers_mt({rows_number}) settings max_threads = 2, partial_result_update_duration_ms = 100"
run_query_without_errors(multithread_query, support_partial_result=False)
group_with_key_query = f"select mod2, sum(number) from numbers_mt({rows_number}) group by number % 2 as mod2 settings max_threads = 1, partial_result_update_duration_ms = 100"
run_query_without_errors(group_with_key_query, support_partial_result=False)
def main():
supported_scenarios_without_key()
unsupported_scenarios()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,88 @@
Rows 0 Columns 9
Column median(number) type Float64
Column stddevSamp(number) type Float64
Column stddevPop(number) type Float64
Column max(number) type UInt64
Column min(number) type UInt64
Column any(number) type UInt64
Column count(number) type UInt64
Column avg(number) type Float64
Column sum(number) type UInt64
Rows 1 Columns 9
Column median(number) type Float64
Column stddevSamp(number) type Float64
Column stddevPop(number) type Float64
Column max(number) type UInt64
Column min(number) type UInt64
Column any(number) type UInt64
Column count(number) type UInt64
Column avg(number) type Float64
Column sum(number) type UInt64
Rows 1 Columns 9
Column median(number) type Float64
Column stddevSamp(number) type Float64
Column stddevPop(number) type Float64
Column max(number) type UInt64
Column min(number) type UInt64
Column any(number) type UInt64
Column count(number) type UInt64
Column avg(number) type Float64
Column sum(number) type UInt64
max(number) [20000000]
min(number) [0]
any(number) [0]
count(number) [20000001]
sum(number) [200000010000000]
Rows 0 Columns 9
Column median(number) type Float64
Column stddevSamp(number) type Float64
Column stddevPop(number) type Float64
Column max(number) type UInt64
Column min(number) type UInt64
Column any(number) type UInt64
Column count(number) type UInt64
Column avg(number) type Float64
Column sum(number) type UInt64
Rows 1 Columns 9
Column median(number) type Float64
Column stddevSamp(number) type Float64
Column stddevPop(number) type Float64
Column max(number) type UInt64
Column min(number) type UInt64
Column any(number) type UInt64
Column count(number) type UInt64
Column avg(number) type Float64
Column sum(number) type UInt64
Rows 1 Columns 9
Column median(number) type Float64
Column stddevSamp(number) type Float64
Column stddevPop(number) type Float64
Column max(number) type UInt64
Column min(number) type UInt64
Column any(number) type UInt64
Column count(number) type UInt64
Column avg(number) type Float64
Column sum(number) type UInt64
max(number) [20000000]
min(number) [19999998]
any(number) [20000000]
count(number) [3]
sum(number) [59999997]
Rows 0 Columns 1
Column sum(number) type UInt64
Rows 0 Columns 1
Column sum(number) type UInt64
Rows 1 Columns 1
Column sum(number) type UInt64
sum(number) [200000010000000]
Rows 0 Columns 2
Column mod2 type UInt8
Column sum(number) type UInt64
Rows 0 Columns 2
Column mod2 type UInt8
Column sum(number) type UInt64
Rows 2 Columns 2
Column mod2 type UInt8
Column sum(number) type UInt64
mod2 [0, 1]
sum(number) [100000010000000, 100000000000000]

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python3 "$CURDIR"/02834_partial_aggregating_result_during_query_execution.python

View File

@ -0,0 +1,313 @@
import socket
import os
import uuid
import struct
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "127.0.0.1")
CLICKHOUSE_PORT = int(os.environ.get("CLICKHOUSE_PORT_TCP", "900000"))
CLICKHOUSE_DATABASE = os.environ.get("CLICKHOUSE_DATABASE", "default")
CLIENT_NAME = "simple native protocol"
def writeVarUInt(x, ba):
for _ in range(0, 9):
byte = x & 0x7F
if x > 0x7F:
byte |= 0x80
ba.append(byte)
x >>= 7
if x == 0:
return
def writeStringBinary(s, ba):
b = bytes(s, "utf-8")
writeVarUInt(len(s), ba)
ba.extend(b)
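# The two helpers above implement the LEB128-style varint used by the native
# protocol: each byte carries 7 payload bits and the high bit flags that more
# bytes follow. A quick self-check of the encoding (illustrative only):
#
#   ba = bytearray()
#   writeVarUInt(300, ba)  # 300 = 0b10_0101100
#   assert ba == bytearray([0xAC, 0x02])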
def serializeClientInfo(ba, query_id):
writeStringBinary("default", ba) # initial_user
writeStringBinary(query_id, ba) # initial_query_id
writeStringBinary("127.0.0.1:9000", ba) # initial_address
ba.extend([0] * 8) # initial_query_start_time_microseconds
ba.append(1) # TCP
writeStringBinary("os_user", ba) # os_user
writeStringBinary("client_hostname", ba) # client_hostname
writeStringBinary(CLIENT_NAME, ba) # client_name
writeVarUInt(21, ba) # client_version_major
writeVarUInt(9, ba) # client_version_minor
writeVarUInt(54449, ba) # client_tcp_protocol_version
writeStringBinary("", ba) # quota_key
writeVarUInt(0, ba) # distributed_depth
writeVarUInt(1, ba) # client_version_patch
ba.append(0) # No telemetry
def serializeBlockInfo(ba):
writeVarUInt(1, ba) # 1
ba.append(0) # is_overflows
writeVarUInt(2, ba) # 2
writeVarUInt(0, ba) # 0
ba.extend([0] * 4) # bucket_num
def assertPacket(packet, expected):
assert packet == expected, "Got: {}, expected: {}".format(packet, expected)
class Data(object):
def __init__(self, key, value):
self.key = key
self.value = value
class TCPClient(object):
def __init__(self, timeout=30):
self.timeout = timeout
self.socket = None
def __enter__(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(self.timeout)
self.socket.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
self.sendHello()
self.receiveHello()
return self
def __exit__(self, exc_type, exc_value, traceback):
if self.socket:
self.socket.close()
def readStrict(self, size=1):
res = bytearray()
while size:
cur = self.socket.recv(size)
if not cur:
    raise RuntimeError("Socket is closed")
size -= len(cur)
res.extend(cur)
return res
def readUInt(self, size=1):
res = self.readStrict(size)
val = 0
for i in range(len(res)):
val += res[i] << (i * 8)
return val
def readUInt8(self):
return self.readUInt()
def readUInt16(self):
return self.readUInt(2)
def readUInt32(self):
return self.readUInt(4)
def readUInt64(self):
return self.readUInt(8)
def readFloat16(self):
return struct.unpack("e", self.readStrict(2))[0] # unpack returns a 1-tuple; take the scalar
def readFloat32(self):
return struct.unpack("f", self.readStrict(4))[0]
def readFloat64(self):
return struct.unpack("d", self.readStrict(8))[0]
def readVarUInt(self):
x = 0
for i in range(9):
byte = self.readStrict()[0]
x |= (byte & 0x7F) << (7 * i)
if not byte & 0x80:
return x
return x
def readStringBinary(self):
size = self.readVarUInt()
s = self.readStrict(size)
return s.decode("utf-8")
def send(self, byte_array):
self.socket.sendall(byte_array)
def sendHello(self):
ba = bytearray()
writeVarUInt(0, ba) # Hello
writeStringBinary(CLIENT_NAME, ba)
writeVarUInt(21, ba) # client_version_major
writeVarUInt(9, ba) # client_version_minor
writeVarUInt(54449, ba) # client_tcp_protocol_version
writeStringBinary(CLICKHOUSE_DATABASE, ba) # database
writeStringBinary("default", ba) # user
writeStringBinary("", ba) # pwd
self.send(ba)
def receiveHello(self):
p_type = self.readVarUInt()
assert p_type == 0 # Hello
_server_name = self.readStringBinary()
_server_version_major = self.readVarUInt()
_server_version_minor = self.readVarUInt()
_server_revision = self.readVarUInt()
_server_timezone = self.readStringBinary()
_server_display_name = self.readStringBinary()
_server_version_patch = self.readVarUInt()
def sendQuery(self, query, settings=None):
if settings is None:
settings = {} # No settings
ba = bytearray()
query_id = uuid.uuid4().hex
writeVarUInt(1, ba) # query
writeStringBinary(query_id, ba)
ba.append(1) # INITIAL_QUERY
# client info
serializeClientInfo(ba, query_id)
# Settings
for key, value in settings.items():
writeStringBinary(key, ba)
writeVarUInt(1, ba) # is_important
writeStringBinary(str(value), ba)
writeStringBinary("", ba) # End of settings
writeStringBinary("", ba) # No interserver secret
writeVarUInt(2, ba) # Stage - Complete
ba.append(0) # No compression
writeStringBinary(query, ba) # query, finally
self.send(ba)
def sendEmptyBlock(self):
ba = bytearray()
writeVarUInt(2, ba) # Data
writeStringBinary("", ba)
serializeBlockInfo(ba)
writeVarUInt(0, ba) # rows
writeVarUInt(0, ba) # columns
self.send(ba)
def readException(self):
code = self.readUInt32()
_name = self.readStringBinary()
text = self.readStringBinary()
self.readStringBinary() # trace
assertPacket(self.readUInt8(), 0) # has_nested
return "code {}: {}".format(code, text.replace("DB::Exception:", ""))
def readPacketType(self):
packet_type = self.readVarUInt()
if packet_type == 2: # Exception
raise RuntimeError(self.readException())
return packet_type
def readResponse(self):
packet_type = self.readPacketType()
if packet_type == 1: # Data
return None
if packet_type == 3: # Progress
return None
if packet_type == 5: # End stream
return None
raise RuntimeError("Unexpected packet: {}".format(packet_type))
def readProgressData(self):
read_rows = self.readVarUInt()
read_bytes = self.readVarUInt()
total_rows_to_read = self.readVarUInt()
written_rows = self.readVarUInt()
written_bytes = self.readVarUInt()
return read_rows, read_bytes, total_rows_to_read, written_rows, written_bytes
def readProgress(self):
packet_type = self.readPacketType()
if packet_type == 5: # End stream
return None
assertPacket(packet_type, 3) # Progress
return self.readProgressData()
def readHeaderInfo(self):
self.readStringBinary() # external table name
# BlockInfo
assertPacket(self.readVarUInt(), 1) # field number 1
assertPacket(self.readUInt8(), 0) # is_overflows
assertPacket(self.readVarUInt(), 2) # field number 2
assertPacket(self.readUInt32(), 4294967295) # bucket_num
assertPacket(self.readVarUInt(), 0) # end of BlockInfo fields
columns = self.readVarUInt() # columns count
rows = self.readVarUInt() # rows count
return columns, rows
def readHeader(self):
packet_type = self.readPacketType()
assertPacket(packet_type, 1) # Data
columns, rows = self.readHeaderInfo()
print("Rows {} Columns {}".format(rows, columns))
for _ in range(columns):
col_name = self.readStringBinary()
type_name = self.readStringBinary()
print("Column {} type {}".format(col_name, type_name))
def readRow(self, row_type, rows):
supported_row_types = {
"UInt8": self.readUInt8,
"UInt16": self.readUInt16,
"UInt32": self.readUInt32,
"UInt64": self.readUInt64,
"Float16": self.readFloat16,
"Float32": self.readFloat32,
"Float64": self.readFloat64,
}
if row_type in supported_row_types:
read_type = supported_row_types[row_type]
row = [read_type() for _ in range(rows)]
return row
else:
raise RuntimeError(
"Current python version of tcp client doesn't support the following type of row: {}".format(
row_type
)
)
def readDataWithoutProgress(self, need_print_info=True):
packet_type = self.readPacketType()
while packet_type == 3: # Progress
self.readProgressData()
packet_type = self.readPacketType()
if packet_type == 5: # End stream
return None
assertPacket(packet_type, 1) # Data
columns, rows = self.readHeaderInfo()
data = []
if need_print_info:
print("Rows {} Columns {}".format(rows, columns))
for _ in range(columns):
col_name = self.readStringBinary()
type_name = self.readStringBinary()
if need_print_info:
print("Column {} type {}".format(col_name, type_name))
data.append(Data(col_name, self.readRow(type_name, rows)))
return data
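Taken together, these methods give the tests a compact driver loop: send the query, flush the empty external-tables block, read the header, then drain data blocks. A hedged usage sketch (it assumes a reachable server via CLICKHOUSE_HOST/CLICKHOUSE_PORT_TCP and this module on sys.path; the empty block separating partial results from the full result matches the tests above):

from tcp_client import TCPClient

with TCPClient() as client:
    client.sendQuery(
        "SELECT max(number) FROM numbers_mt(10000000) "
        "SETTINGS max_threads = 1, partial_result_update_duration_ms = 1"
    )
    client.sendEmptyBlock()  # no external tables
    client.readHeader()
    blocks = client.readDataWithoutProgress(need_print_info=False)
    while blocks and len(blocks[0].value) > 0:  # partial result updates
        print("partial:", blocks[0].value)
        blocks = client.readDataWithoutProgress(need_print_info=False)
    full = client.readDataWithoutProgress(need_print_info=False)
    print("full:", full[0].value)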