diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 66b8c923d5c..a1989ab3d13 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -4644,6 +4644,14 @@ SELECT toFloat64('1.7091'), toFloat64('1.5008753E7') SETTINGS precise_float_pars └─────────────────────┴──────────────────────────┘ ``` +## partial_result_update_duration_ms + +Interval (in milliseconds) for sending updates with partial data about the result table to the client (in interactive mode) during query execution. Setting to 0 disables partial results. Only supported for single-threaded GROUP BY without key, ORDER BY, LIMIT and OFFSET. + +## max_rows_in_partial_result + +Maximum rows to show in the partial result after every real-time update while the query runs (use partial result limit + OFFSET as a value in case of OFFSET in the query). + ## validate_tcp_client_information {#validate-tcp-client-information} Determines whether validation of client information enabled when query packet is received from a client using a TCP connection. diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 859665c64b3..cc60095884e 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -441,7 +441,20 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query) if (!block) return; - processed_rows += block.rows(); + if (block.rows() == 0 && partial_result_mode == PartialResultMode::Active) + { + partial_result_mode = PartialResultMode::Inactive; + if (is_interactive) + { + progress_indication.clearProgressOutput(*tty_buf); + std::cout << "Full result:" << std::endl; + progress_indication.writeProgress(*tty_buf); + } + } + + if (partial_result_mode == PartialResultMode::Inactive) + processed_rows += block.rows(); + /// Even if all blocks are empty, we still need to initialize the output stream to write empty resultset. initOutputFormat(block, parsed_query); @@ -451,13 +464,20 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query) if (block.rows() == 0 || (query_fuzzer_runs != 0 && processed_rows >= 100)) return; + if (!is_interactive && partial_result_mode == PartialResultMode::Active) + return; + /// If results are written INTO OUTFILE, we can avoid clearing progress to avoid flicker. if (need_render_progress && tty_buf && (!select_into_file || select_into_file_and_stdout)) progress_indication.clearProgressOutput(*tty_buf); try { - output_format->write(materializeBlock(block)); + if (partial_result_mode == PartialResultMode::Active) + output_format->writePartialResult(materializeBlock(block)); + else + output_format->write(materializeBlock(block)); + written_first_block = true; } catch (const Exception &) @@ -521,6 +541,9 @@ void ClientBase::onProfileInfo(const ProfileInfo & profile_info) void ClientBase::initOutputFormat(const Block & block, ASTPtr parsed_query) try { + if (partial_result_mode == PartialResultMode::NotInit) + partial_result_mode = PartialResultMode::Active; + if (!output_format) { /// Ignore all results when fuzzing as they can be huge. @@ -931,6 +954,14 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa const auto & settings = global_context->getSettingsRef(); const Int32 signals_before_stop = settings.partial_result_on_first_cancel ? 2 : 1; + bool has_partial_result_setting = settings.partial_result_update_duration_ms.totalMilliseconds() > 0; + + if (has_partial_result_setting) + { + partial_result_mode = PartialResultMode::NotInit; + if (is_interactive) + std::cout << "Partial result:" << std::endl; + } int retries_left = 10; while (retries_left) @@ -1736,6 +1767,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin } processed_rows = 0; + partial_result_mode = PartialResultMode::Inactive; written_first_block = false; progress_indication.resetProgress(); profile_events.watch.restart(); diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index eabc79b7432..e2394259f85 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -272,6 +272,21 @@ protected: size_t processed_rows = 0; /// How many rows have been read or written. bool print_num_processed_rows = false; /// Whether to print the number of processed rows at + enum class PartialResultMode: UInt8 + { + /// Query doesn't show partial result before the first block with 0 rows. + /// The first block with 0 rows initializes the output table format using its header. + NotInit, + + /// Query shows partial result after the first and before the second block with 0 rows. + /// The second block with 0 rows indicates that that receiving blocks with partial result has been completed and next blocks will be with the full result. + Active, + + /// Query doesn't show partial result at all. + Inactive, + }; + PartialResultMode partial_result_mode = PartialResultMode::Inactive; + bool print_stack_trace = false; /// The last exception that was received from the server. Is used for the /// return code in batch mode. diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 620cc8fd67f..719def2a649 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -309,6 +309,9 @@ class IColumn; \ M(Bool, partial_result_on_first_cancel, false, "Allows query to return a partial result after cancel.", 0) \ \ + M(Milliseconds, partial_result_update_duration_ms, 0, "Interval (in milliseconds) for sending updates with partial data about the result table to the client (in interactive mode) during query execution. Setting to 0 disables partial results. Only supported for single-threaded GROUP BY without key, ORDER BY, LIMIT and OFFSET.", 0) \ + M(UInt64, max_rows_in_partial_result, 10, "Maximum rows to show in the partial result after every real-time update while the query runs (use partial result limit + OFFSET as a value in case of OFFSET in the query).", 0) \ + \ M(Bool, ignore_on_cluster_for_replicated_udf_queries, false, "Ignore ON CLUSTER clause for replicated UDF management queries.", 0) \ M(Bool, ignore_on_cluster_for_replicated_access_entities_queries, false, "Ignore ON CLUSTER clause for replicated access entities management queries.", 0) \ /** Settings for testing hedged requests */ \ diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 9249c3ce4ce..23ee097ebff 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -2272,6 +2272,29 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va return block; } +Block Aggregator::prepareBlockAndFillWithoutKeySnapshot(AggregatedDataVariants & data_variants) const +{ + size_t rows = 1; + bool final = true; + + auto && out_cols + = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), data_variants.aggregates_pools, final, rows); + auto && [key_columns, raw_key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols; + + AggregatedDataWithoutKey & data = data_variants.without_key; + + /// Always single-thread. It's safe to pass current arena from 'aggregates_pool'. + for (size_t insert_i = 0; insert_i < params.aggregates_size; ++insert_i) + aggregate_functions[insert_i]->insertResultInto( + data + offsets_of_aggregate_states[insert_i], + *final_aggregate_columns[insert_i], + data_variants.aggregates_pool); + + Block block = finalizeBlock(params, getHeader(final), std::move(out_cols), final, rows); + + return block; +} + template Aggregator::ConvertToBlockRes Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index 4f2c86606c5..6bfaa76f9b3 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1210,6 +1210,7 @@ private: friend class ConvertingAggregatedToChunksSource; friend class ConvertingAggregatedToChunksWithMergingSource; friend class AggregatingInOrderTransform; + friend class AggregatingPartialResultTransform; /// Data structure of source blocks. Block header; @@ -1391,6 +1392,7 @@ private: std::atomic * is_cancelled = nullptr) const; Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const; + Block prepareBlockAndFillWithoutKeySnapshot(AggregatedDataVariants & data_variants) const; BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const; template diff --git a/src/Processors/Chunk.cpp b/src/Processors/Chunk.cpp index 3839a8963b2..cd442085eca 100644 --- a/src/Processors/Chunk.cpp +++ b/src/Processors/Chunk.cpp @@ -14,7 +14,8 @@ namespace ErrorCodes extern const int POSITION_OUT_OF_BOUND; } -Chunk::Chunk(DB::Columns columns_, UInt64 num_rows_) : columns(std::move(columns_)), num_rows(num_rows_) +Chunk::Chunk(DB::Columns columns_, UInt64 num_rows_) + : columns(std::move(columns_)), num_rows(num_rows_) { checkNumRowsIsConsistent(); } diff --git a/src/Processors/Executors/CompletedPipelineExecutor.cpp b/src/Processors/Executors/CompletedPipelineExecutor.cpp index 598a51bf0c7..c30586e194e 100644 --- a/src/Processors/Executors/CompletedPipelineExecutor.cpp +++ b/src/Processors/Executors/CompletedPipelineExecutor.cpp @@ -75,7 +75,7 @@ void CompletedPipelineExecutor::execute() if (interactive_timeout_ms) { data = std::make_unique(); - data->executor = std::make_shared(pipeline.processors, pipeline.process_list_element); + data->executor = std::make_shared(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms); data->executor->setReadProgressCallback(pipeline.getReadProgressCallback()); /// Avoid passing this to lambda, copy ptr to data instead. @@ -105,7 +105,7 @@ void CompletedPipelineExecutor::execute() } else { - PipelineExecutor executor(pipeline.processors, pipeline.process_list_element); + PipelineExecutor executor(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms); executor.setReadProgressCallback(pipeline.getReadProgressCallback()); executor.execute(pipeline.getNumThreads(), pipeline.getConcurrencyControl()); } diff --git a/src/Processors/Executors/ExecutingGraph.cpp b/src/Processors/Executors/ExecutingGraph.cpp index 27f6a454b24..6a946b4a4b9 100644 --- a/src/Processors/Executors/ExecutingGraph.cpp +++ b/src/Processors/Executors/ExecutingGraph.cpp @@ -260,7 +260,6 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue { pid = updated_processors.top(); updated_processors.pop(); - /// In this method we have ownership on node. auto & node = *nodes[pid]; diff --git a/src/Processors/Executors/ExecutionThreadContext.h b/src/Processors/Executors/ExecutionThreadContext.h index eb048f8ab09..85788a70771 100644 --- a/src/Processors/Executors/ExecutionThreadContext.h +++ b/src/Processors/Executors/ExecutionThreadContext.h @@ -30,6 +30,12 @@ private: /// Callback for read progress. ReadProgressCallback * read_progress_callback = nullptr; + /// Timer that stops optimization of running local tasks instead of queuing them. + /// It provides local progress for each IProcessor task, allowing the partial result of the request to be always sended to the user. + Stopwatch watch; + /// Time period that limits the maximum allowed duration for optimizing the scheduling of local tasks within the executor + const UInt64 partial_result_duration_ms; + public: #ifndef NDEBUG /// Time for different processing stages. @@ -62,8 +68,13 @@ public: void setException(std::exception_ptr exception_) { exception = exception_; } void rethrowExceptionIfHas(); - explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_, bool trace_processors_, ReadProgressCallback * callback) + bool needWatchRestartForPartialResultProgress() { return partial_result_duration_ms != 0 && partial_result_duration_ms < watch.elapsedMilliseconds(); } + void restartWatch() { watch.restart(); } + + explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_, bool trace_processors_, ReadProgressCallback * callback, UInt64 partial_result_duration_ms_) : read_progress_callback(callback) + , watch(CLOCK_MONOTONIC) + , partial_result_duration_ms(partial_result_duration_ms_) , thread_number(thread_number_) , profile_processors(profile_processors_) , trace_processors(trace_processors_) diff --git a/src/Processors/Executors/ExecutorTasks.cpp b/src/Processors/Executors/ExecutorTasks.cpp index e61d225a968..08920592391 100644 --- a/src/Processors/Executors/ExecutorTasks.cpp +++ b/src/Processors/Executors/ExecutorTasks.cpp @@ -108,8 +108,15 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea { context.setTask(nullptr); - /// Take local task from queue if has one. - if (!queue.empty() && !context.hasAsyncTasks()) + /// If sending partial results is allowed and local tasks scheduling optimization is repeated longer than the limit + /// or new task need to send partial result later, skip optimization for this iteration. + /// Otherwise take local task from queue if has one. + if ((!queue.empty() && queue.front()->processor->isPartialResultProcessor()) + || context.needWatchRestartForPartialResultProgress()) + { + context.restartWatch(); + } + else if (!queue.empty() && !context.hasAsyncTasks()) { context.setTask(queue.front()); queue.pop(); @@ -139,7 +146,7 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea } } -void ExecutorTasks::init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback) +void ExecutorTasks::init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback, UInt64 partial_result_duration_ms) { num_threads = num_threads_; use_threads = use_threads_; @@ -151,7 +158,7 @@ void ExecutorTasks::init(size_t num_threads_, size_t use_threads_, bool profile_ executor_contexts.reserve(num_threads); for (size_t i = 0; i < num_threads; ++i) - executor_contexts.emplace_back(std::make_unique(i, profile_processors, trace_processors, callback)); + executor_contexts.emplace_back(std::make_unique(i, profile_processors, trace_processors, callback, partial_result_duration_ms)); } } diff --git a/src/Processors/Executors/ExecutorTasks.h b/src/Processors/Executors/ExecutorTasks.h index d35f8de94d1..ab6d5e91411 100644 --- a/src/Processors/Executors/ExecutorTasks.h +++ b/src/Processors/Executors/ExecutorTasks.h @@ -58,7 +58,7 @@ public: void tryGetTask(ExecutionThreadContext & context); void pushTasks(Queue & queue, Queue & async_queue, ExecutionThreadContext & context); - void init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback); + void init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback, UInt64 partial_result_duration_ms); void fill(Queue & queue); void upscale(size_t use_threads_); diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp index 37af391fba3..77779e2cec2 100644 --- a/src/Processors/Executors/PipelineExecutor.cpp +++ b/src/Processors/Executors/PipelineExecutor.cpp @@ -33,8 +33,9 @@ namespace ErrorCodes } -PipelineExecutor::PipelineExecutor(std::shared_ptr & processors, QueryStatusPtr elem) +PipelineExecutor::PipelineExecutor(std::shared_ptr & processors, QueryStatusPtr elem, UInt64 partial_result_duration_ms_) : process_list_element(std::move(elem)) + , partial_result_duration_ms(partial_result_duration_ms_) { if (process_list_element) { @@ -328,7 +329,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_ Queue queue; graph->initializeExecution(queue); - tasks.init(num_threads, use_threads, profile_processors, trace_processors, read_progress_callback.get()); + tasks.init(num_threads, use_threads, profile_processors, trace_processors, read_progress_callback.get(), partial_result_duration_ms); tasks.fill(queue); if (num_threads > 1) diff --git a/src/Processors/Executors/PipelineExecutor.h b/src/Processors/Executors/PipelineExecutor.h index dee12dad282..6cb0e6c4ac1 100644 --- a/src/Processors/Executors/PipelineExecutor.h +++ b/src/Processors/Executors/PipelineExecutor.h @@ -33,7 +33,7 @@ public: /// During pipeline execution new processors can appear. They will be added to existing set. /// /// Explicit graph representation is built in constructor. Throws if graph is not correct. - explicit PipelineExecutor(std::shared_ptr & processors, QueryStatusPtr elem); + explicit PipelineExecutor(std::shared_ptr & processors, QueryStatusPtr elem, UInt64 partial_result_duration_ms_ = 0); ~PipelineExecutor(); /// Execute pipeline in multiple threads. Must be called once. @@ -90,6 +90,9 @@ private: ReadProgressCallbackPtr read_progress_callback; + /// Duration between sending partial result through the pipeline + const UInt64 partial_result_duration_ms; + using Queue = std::queue; void initializeExecution(size_t num_threads, bool concurrency_control); /// Initialize executor contexts and task_queue. diff --git a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp index 345bec395b2..95a2022bf93 100644 --- a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp +++ b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp @@ -41,12 +41,13 @@ struct PullingAsyncPipelineExecutor::Data } }; -PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_) +PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_, bool has_partial_result_setting) : pipeline(pipeline_) { if (!pipeline.pulling()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PullingAsyncPipelineExecutor must be pulling"); - lazy_format = std::make_shared(pipeline.output->getHeader()); + lazy_format = std::make_shared(pipeline.output->getHeader(), /*is_partial_result_protocol_active*/ has_partial_result_setting); + pipeline.complete(lazy_format); } @@ -103,7 +104,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds) if (!data) { data = std::make_unique(); - data->executor = std::make_shared(pipeline.processors, pipeline.process_list_element); + data->executor = std::make_shared(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms); data->executor->setReadProgressCallback(pipeline.getReadProgressCallback()); data->lazy_format = lazy_format.get(); diff --git a/src/Processors/Executors/PullingAsyncPipelineExecutor.h b/src/Processors/Executors/PullingAsyncPipelineExecutor.h index 361bcc0155c..202ecbf281b 100644 --- a/src/Processors/Executors/PullingAsyncPipelineExecutor.h +++ b/src/Processors/Executors/PullingAsyncPipelineExecutor.h @@ -21,7 +21,7 @@ struct ProfileInfo; class PullingAsyncPipelineExecutor { public: - explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_); + explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_, bool has_partial_result_setting = false); ~PullingAsyncPipelineExecutor(); /// Get structure of returned block or chunk. diff --git a/src/Processors/Executors/PullingPipelineExecutor.cpp b/src/Processors/Executors/PullingPipelineExecutor.cpp index cbf73c5cb07..f79f15c19bf 100644 --- a/src/Processors/Executors/PullingPipelineExecutor.cpp +++ b/src/Processors/Executors/PullingPipelineExecutor.cpp @@ -44,7 +44,7 @@ bool PullingPipelineExecutor::pull(Chunk & chunk) { if (!executor) { - executor = std::make_shared(pipeline.processors, pipeline.process_list_element); + executor = std::make_shared(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms); executor->setReadProgressCallback(pipeline.getReadProgressCallback()); } diff --git a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp index a816ab9ca7f..f3ed24e7e96 100644 --- a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp +++ b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp @@ -167,7 +167,7 @@ void PushingAsyncPipelineExecutor::start() started = true; data = std::make_unique(); - data->executor = std::make_shared(pipeline.processors, pipeline.process_list_element); + data->executor = std::make_shared(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms); data->executor->setReadProgressCallback(pipeline.getReadProgressCallback()); data->source = pushing_source.get(); diff --git a/src/Processors/Executors/PushingPipelineExecutor.cpp b/src/Processors/Executors/PushingPipelineExecutor.cpp index 696932932df..f2b018792c7 100644 --- a/src/Processors/Executors/PushingPipelineExecutor.cpp +++ b/src/Processors/Executors/PushingPipelineExecutor.cpp @@ -87,7 +87,7 @@ void PushingPipelineExecutor::start() return; started = true; - executor = std::make_shared(pipeline.processors, pipeline.process_list_element); + executor = std::make_shared(pipeline.processors, pipeline.process_list_element, pipeline.partial_result_duration_ms); executor->setReadProgressCallback(pipeline.getReadProgressCallback()); if (!executor->executeStep(&input_wait_flag)) diff --git a/src/Processors/Formats/IOutputFormat.cpp b/src/Processors/Formats/IOutputFormat.cpp index 88a6fb1e92f..e691e32a7bc 100644 --- a/src/Processors/Formats/IOutputFormat.cpp +++ b/src/Processors/Formats/IOutputFormat.cpp @@ -1,40 +1,89 @@ #include #include +#include namespace DB { -IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_) - : IProcessor({header_, header_, header_}, {}), out(out_) +IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_, bool is_partial_result_protocol_active_) + : IProcessor({header_, header_, header_, header_}, {}) + , out(out_) + , is_partial_result_protocol_active(is_partial_result_protocol_active_) { } +void IOutputFormat::setCurrentChunk(InputPort & input, PortKind kind) +{ + current_chunk = input.pull(true); + current_block_kind = kind; + has_input = true; +} + +IOutputFormat::Status IOutputFormat::prepareMainAndPartialResult() +{ + bool need_data = false; + for (auto kind : {Main, PartialResult}) + { + auto & input = getPort(kind); + + if (input.isFinished()) + continue; + + if (kind == PartialResult && main_input_activated) + { + input.close(); + continue; + } + + input.setNeeded(); + need_data = true; + + if (!input.hasData()) + continue; + + setCurrentChunk(input, kind); + return Status::Ready; + } + + if (need_data) + return Status::NeedData; + + return Status::Finished; +} + +IOutputFormat::Status IOutputFormat::prepareTotalsAndExtremes() +{ + for (auto kind : {Totals, Extremes}) + { + auto & input = getPort(kind); + + if (!input.isConnected() || input.isFinished()) + continue; + + input.setNeeded(); + if (!input.hasData()) + return Status::NeedData; + + setCurrentChunk(input, kind); + return Status::Ready; + } + + return Status::Finished; +} + IOutputFormat::Status IOutputFormat::prepare() { if (has_input) return Status::Ready; - for (auto kind : {Main, Totals, Extremes}) - { - auto & input = getPort(kind); + auto status = prepareMainAndPartialResult(); + if (status != Status::Finished) + return status; - if (kind != Main && !input.isConnected()) - continue; - - if (input.isFinished()) - continue; - - input.setNeeded(); - - if (!input.hasData()) - return Status::NeedData; - - current_chunk = input.pull(true); - current_block_kind = kind; - has_input = true; - return Status::Ready; - } + status = prepareTotalsAndExtremes(); + if (status != Status::Finished) + return status; finished = true; @@ -83,8 +132,18 @@ void IOutputFormat::work() case Main: result_rows += current_chunk.getNumRows(); result_bytes += current_chunk.allocatedBytes(); + if (is_partial_result_protocol_active && !main_input_activated && current_chunk.hasRows()) + { + /// Sending an empty block signals to the client that partial results are terminated, + /// and only data from the main pipeline will be forwarded. + consume(Chunk(current_chunk.cloneEmptyColumns(), 0)); + main_input_activated = true; + } consume(std::move(current_chunk)); break; + case PartialResult: + consumePartialResult(std::move(current_chunk)); + break; case Totals: writeSuffixIfNeeded(); if (auto totals = prepareTotals(std::move(current_chunk))) @@ -119,6 +178,15 @@ void IOutputFormat::write(const Block & block) flush(); } +void IOutputFormat::writePartialResult(const Block & block) +{ + writePrefixIfNeeded(); + consumePartialResult(Chunk(block.getColumns(), block.rows())); + + if (auto_flush) + flush(); +} + void IOutputFormat::finalize() { if (finalized) diff --git a/src/Processors/Formats/IOutputFormat.h b/src/Processors/Formats/IOutputFormat.h index 58700a978ff..470d24e9a22 100644 --- a/src/Processors/Formats/IOutputFormat.h +++ b/src/Processors/Formats/IOutputFormat.h @@ -23,9 +23,9 @@ class WriteBuffer; class IOutputFormat : public IProcessor { public: - enum PortKind { Main = 0, Totals = 1, Extremes = 2 }; + enum PortKind { Main = 0, Totals = 1, Extremes = 2, PartialResult = 3 }; - IOutputFormat(const Block & header_, WriteBuffer & out_); + IOutputFormat(const Block & header_, WriteBuffer & out_, bool is_partial_result_protocol_active_ = false); Status prepare() override; void work() override; @@ -54,6 +54,7 @@ public: /// TODO: separate formats and processors. void write(const Block & block); + void writePartialResult(const Block & block); void finalize(); @@ -118,6 +119,7 @@ protected: virtual void consume(Chunk) = 0; virtual void consumeTotals(Chunk) {} virtual void consumeExtremes(Chunk) {} + virtual void consumePartialResult(Chunk) {} virtual void finalizeImpl() {} virtual void finalizeBuffers() {} virtual void writePrefix() {} @@ -166,6 +168,7 @@ protected: Chunk current_chunk; PortKind current_block_kind = PortKind::Main; + bool main_input_activated = false; bool has_input = false; bool finished = false; bool finalized = false; @@ -180,9 +183,15 @@ protected: Statistics statistics; private: + void setCurrentChunk(InputPort & input, PortKind kind); + IOutputFormat::Status prepareMainAndPartialResult(); + IOutputFormat::Status prepareTotalsAndExtremes(); + size_t rows_read_before = 0; bool are_totals_written = false; + bool is_partial_result_protocol_active = false; + /// Counters for consumed chunks. Are used for QueryLog. size_t result_rows = 0; size_t result_bytes = 0; diff --git a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp index 14648e68f94..6fa891297f6 100644 --- a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp @@ -134,7 +134,8 @@ void PrettyBlockOutputFormat::write(Chunk chunk, PortKind port_kind) { if (total_rows >= format_settings.pretty.max_rows) { - total_rows += chunk.getNumRows(); + if (port_kind != PortKind::PartialResult) + total_rows += chunk.getNumRows(); return; } if (mono_block) @@ -315,7 +316,8 @@ void PrettyBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind port_kind } writeString(bottom_separator_s, out); - total_rows += num_rows; + if (port_kind != PortKind::PartialResult) + total_rows += num_rows; } @@ -388,6 +390,34 @@ void PrettyBlockOutputFormat::consumeExtremes(Chunk chunk) write(std::move(chunk), PortKind::Extremes); } +void PrettyBlockOutputFormat::clearLastLines(size_t lines_number) +{ + /// http://en.wikipedia.org/wiki/ANSI_escape_code + #define MOVE_TO_PREV_LINE "\033[A" + #define CLEAR_TO_END_OF_LINE "\033[K" + + static const char * clear_prev_line = MOVE_TO_PREV_LINE \ + CLEAR_TO_END_OF_LINE; + + /// Move cursor to the beginning of line + writeCString("\r", out); + + for (size_t line = 0; line < lines_number; ++line) + { + writeCString(clear_prev_line, out); + } +} + +void PrettyBlockOutputFormat::consumePartialResult(Chunk chunk) +{ + if (prev_partial_block_rows > 0) + /// number of rows + header line + footer line + clearLastLines(prev_partial_block_rows + 2); + + prev_partial_block_rows = chunk.getNumRows(); + write(std::move(chunk), PortKind::PartialResult); +} + void PrettyBlockOutputFormat::writeMonoChunkIfNeeded() { diff --git a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h index dfb23ac63f9..92466dce3ff 100644 --- a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h +++ b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h @@ -28,7 +28,12 @@ protected: void consumeTotals(Chunk) override; void consumeExtremes(Chunk) override; + void clearLastLines(size_t lines_number); + void consumePartialResult(Chunk) override; + size_t total_rows = 0; + size_t prev_partial_block_rows = 0; + size_t row_number_width = 7; // "10000. " const FormatSettings format_settings; @@ -55,6 +60,7 @@ protected: void resetFormatterImpl() override { total_rows = 0; + prev_partial_block_rows = 0; } private: diff --git a/src/Processors/Formats/Impl/PrettyCompactBlockOutputFormat.cpp b/src/Processors/Formats/Impl/PrettyCompactBlockOutputFormat.cpp index 2ba9ec725e2..3a04d86b1ad 100644 --- a/src/Processors/Formats/Impl/PrettyCompactBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/PrettyCompactBlockOutputFormat.cpp @@ -194,7 +194,8 @@ void PrettyCompactBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind po writeBottom(max_widths); - total_rows += num_rows; + if (port_kind != PortKind::PartialResult) + total_rows += num_rows; } diff --git a/src/Processors/Formats/LazyOutputFormat.h b/src/Processors/Formats/LazyOutputFormat.h index 9cf609ed2d7..bbcfdbb7193 100644 --- a/src/Processors/Formats/LazyOutputFormat.h +++ b/src/Processors/Formats/LazyOutputFormat.h @@ -14,8 +14,8 @@ class LazyOutputFormat : public IOutputFormat { public: - explicit LazyOutputFormat(const Block & header) - : IOutputFormat(header, out), queue(2) {} + explicit LazyOutputFormat(const Block & header, bool is_partial_result_protocol_active = false) + : IOutputFormat(header, out, is_partial_result_protocol_active), queue(2) {} String getName() const override { return "LazyOutputFormat"; } @@ -49,6 +49,7 @@ protected: void consumeTotals(Chunk chunk) override { totals = std::move(chunk); } void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); } + void consumePartialResult(Chunk chunk) override { consume(std::move(chunk)); } private: diff --git a/src/Processors/IProcessor.cpp b/src/Processors/IProcessor.cpp index 8b160153733..2f294a32531 100644 --- a/src/Processors/IProcessor.cpp +++ b/src/Processors/IProcessor.cpp @@ -40,5 +40,10 @@ std::string IProcessor::statusToName(Status status) UNREACHABLE(); } +ProcessorPtr IProcessor::getPartialResultProcessorPtr(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) +{ + return current_processor->getPartialResultProcessor(current_processor, partial_result_limit, partial_result_duration_ms); +} + } diff --git a/src/Processors/IProcessor.h b/src/Processors/IProcessor.h index c6bef186877..51a0bb1c121 100644 --- a/src/Processors/IProcessor.h +++ b/src/Processors/IProcessor.h @@ -164,6 +164,8 @@ public: static std::string statusToName(Status status); + static ProcessorPtr getPartialResultProcessorPtr(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms); + /** Method 'prepare' is responsible for all cheap ("instantaneous": O(1) of data volume, no wait) calculations. * * It may access input and output ports, @@ -235,6 +237,22 @@ public: throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'expandPipeline' is not implemented for {} processor", getName()); } + enum class PartialResultStatus + { + /// Processor currently doesn't support work with the partial result pipeline. + NotSupported, + + /// Processor can be skipped in the partial result pipeline. + SkipSupported, + + /// Processor creates a light-weight copy of itself in the partial result pipeline. + /// The copy can create snapshots of the original processor or transform small blocks of data in the same way as the original processor + FullSupported, + }; + + virtual bool isPartialResultProcessor() const { return false; } + virtual PartialResultStatus getPartialResultProcessorSupportStatus() const { return PartialResultStatus::NotSupported; } + /// In case if query was cancelled executor will wait till all processors finish their jobs. /// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o). bool isCancelled() const { return is_cancelled.load(std::memory_order_acquire); } @@ -369,6 +387,11 @@ public: protected: virtual void onCancel() {} + virtual ProcessorPtr getPartialResultProcessor(const ProcessorPtr & /*current_processor*/, UInt64 /*partial_result_limit*/, UInt64 /*partial_result_duration_ms*/) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'getPartialResultProcessor' is not implemented for {} processor", getName()); + } + private: /// For: /// - elapsed_us diff --git a/src/Processors/LimitTransform.cpp b/src/Processors/LimitTransform.cpp index 5e24062d67a..b2bf3c28eee 100644 --- a/src/Processors/LimitTransform.cpp +++ b/src/Processors/LimitTransform.cpp @@ -1,5 +1,5 @@ #include - +#include namespace DB { @@ -180,7 +180,6 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data) return Status::NeedData; data.current_chunk = input.pull(true); - auto rows = data.current_chunk.getNumRows(); if (rows_before_limit_at_least && !data.input_port_has_counter) @@ -367,5 +366,11 @@ bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort return true; } +ProcessorPtr LimitTransform::getPartialResultProcessor(const ProcessorPtr & /*current_processor*/, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) +{ + const auto & header = inputs.front().getHeader(); + return std::make_shared(header, partial_result_limit, partial_result_duration_ms, limit, offset); +} + } diff --git a/src/Processors/LimitTransform.h b/src/Processors/LimitTransform.h index 33ff968985f..eac5f9e8d6d 100644 --- a/src/Processors/LimitTransform.h +++ b/src/Processors/LimitTransform.h @@ -55,6 +55,8 @@ private: ColumnRawPtrs extractSortColumns(const Columns & columns) const; bool sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, UInt64 current_chunk_row_num) const; + ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override; + public: LimitTransform( const Block & header_, UInt64 limit_, UInt64 offset_, size_t num_streams = 1, @@ -73,6 +75,8 @@ public: void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); } void setInputPortHasCounter(size_t pos) { ports_data[pos].input_port_has_counter = true; } + + PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; } }; } diff --git a/src/Processors/QueryPlan/BuildQueryPipelineSettings.cpp b/src/Processors/QueryPlan/BuildQueryPipelineSettings.cpp index fb3ed7f80fc..60ac30389a1 100644 --- a/src/Processors/QueryPlan/BuildQueryPipelineSettings.cpp +++ b/src/Processors/QueryPlan/BuildQueryPipelineSettings.cpp @@ -9,7 +9,12 @@ namespace DB BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr from) { BuildQueryPipelineSettings settings; - settings.actions_settings = ExpressionActionsSettings::fromSettings(from->getSettingsRef(), CompileExpressions::yes); + + const auto & context_settings = from->getSettingsRef(); + settings.partial_result_limit = context_settings.max_rows_in_partial_result; + settings.partial_result_duration_ms = context_settings.partial_result_update_duration_ms.totalMilliseconds(); + + settings.actions_settings = ExpressionActionsSettings::fromSettings(context_settings, CompileExpressions::yes); settings.process_list_element = from->getProcessListElement(); settings.progress_callback = from->getProgressCallback(); return settings; diff --git a/src/Processors/QueryPlan/BuildQueryPipelineSettings.h b/src/Processors/QueryPlan/BuildQueryPipelineSettings.h index 3b5e4e06953..0410bf925d1 100644 --- a/src/Processors/QueryPlan/BuildQueryPipelineSettings.h +++ b/src/Processors/QueryPlan/BuildQueryPipelineSettings.h @@ -19,6 +19,9 @@ struct BuildQueryPipelineSettings QueryStatusPtr process_list_element; ProgressCallback progress_callback = nullptr; + UInt64 partial_result_limit = 0; + UInt64 partial_result_duration_ms = 0; + const ExpressionActionsSettings & getActionsSettings() const { return actions_settings; } static BuildQueryPipelineSettings fromContext(ContextPtr from); }; diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index 8054209c1c3..5d38bfb42c4 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -168,6 +168,8 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline( QueryPipelineBuilderPtr last_pipeline; + bool has_partial_result_setting = build_pipeline_settings.partial_result_duration_ms > 0; + std::stack stack; stack.push(Frame{.node = root}); @@ -194,6 +196,9 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline( } else stack.push(Frame{.node = frame.node->children[next_child]}); + + if (has_partial_result_setting && last_pipeline && !last_pipeline->isPartialResultActive()) + last_pipeline->activatePartialResult(build_pipeline_settings.partial_result_limit, build_pipeline_settings.partial_result_duration_ms); } last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback); diff --git a/src/Processors/QueryPlan/SortingStep.h b/src/Processors/QueryPlan/SortingStep.h index 371a24ac6f2..a72cab05754 100644 --- a/src/Processors/QueryPlan/SortingStep.h +++ b/src/Processors/QueryPlan/SortingStep.h @@ -27,6 +27,8 @@ public: size_t max_bytes_before_external_sort = 0; TemporaryDataOnDiskScopePtr tmp_data = nullptr; size_t min_free_disk_space = 0; + UInt64 partial_result_limit = 0; + UInt64 partial_result_duration_ms = 0; explicit Settings(const Context & context); explicit Settings(size_t max_block_size_); diff --git a/src/Processors/Transforms/AggregatingPartialResultTransform.cpp b/src/Processors/Transforms/AggregatingPartialResultTransform.cpp new file mode 100644 index 00000000000..cf8ce72e096 --- /dev/null +++ b/src/Processors/Transforms/AggregatingPartialResultTransform.cpp @@ -0,0 +1,47 @@ +#include + +namespace DB +{ + +AggregatingPartialResultTransform::AggregatingPartialResultTransform( + const Block & input_header, const Block & output_header, AggregatingTransformPtr aggregating_transform_, + UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_) + : PartialResultTransform(input_header, output_header, partial_result_limit_, partial_result_duration_ms_) + , aggregating_transform(std::move(aggregating_transform_)) + , transform_aggregator(input_header, aggregating_transform->params->params) + {} + +void AggregatingPartialResultTransform::transformPartialResult(Chunk & chunk) +{ + auto & params = aggregating_transform->params->params; + + bool no_more_keys = false; + AggregatedDataVariants variants; + ColumnRawPtrs key_columns(params.keys_size); + Aggregator::AggregateColumns aggregate_columns(params.aggregates_size); + + const UInt64 num_rows = chunk.getNumRows(); + transform_aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys); + + auto transformed_block = transform_aggregator.convertToBlocks(variants, /*final*/ true, /*max_threads*/ 1).front(); + + chunk = convertToChunk(transformed_block); +} + +PartialResultTransform::ShaphotResult AggregatingPartialResultTransform::getRealProcessorSnapshot() +{ + std::lock_guard lock(aggregating_transform->snapshot_mutex); + if (aggregating_transform->is_generate_initialized) + return {{}, SnaphotStatus::Stopped}; + + if (aggregating_transform->variants.empty()) + return {{}, SnaphotStatus::NotReady}; + + auto & snapshot_aggregator = aggregating_transform->params->aggregator; + auto & snapshot_variants = aggregating_transform->many_data->variants; + auto block = snapshot_aggregator.prepareBlockAndFillWithoutKeySnapshot(*snapshot_variants.at(0)); + + return {convertToChunk(block), SnaphotStatus::Ready}; +} + +} diff --git a/src/Processors/Transforms/AggregatingPartialResultTransform.h b/src/Processors/Transforms/AggregatingPartialResultTransform.h new file mode 100644 index 00000000000..f7bac3a5394 --- /dev/null +++ b/src/Processors/Transforms/AggregatingPartialResultTransform.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +class AggregatingPartialResultTransform : public PartialResultTransform +{ +public: + using AggregatingTransformPtr = std::shared_ptr; + + AggregatingPartialResultTransform( + const Block & input_header, const Block & output_header, AggregatingTransformPtr aggregating_transform_, + UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_); + + String getName() const override { return "AggregatingPartialResultTransform"; } + + void transformPartialResult(Chunk & chunk) override; + ShaphotResult getRealProcessorSnapshot() override; + +private: + AggregatingTransformPtr aggregating_transform; + Aggregator transform_aggregator; +}; + +} diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index 4bd000797a6..b4d2785bed2 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -657,6 +658,8 @@ void AggregatingTransform::consume(Chunk chunk) src_rows += num_rows; src_bytes += chunk.bytes(); + std::lock_guard lock(snapshot_mutex); + if (params->params.only_merge) { auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns()); @@ -676,6 +679,7 @@ void AggregatingTransform::initGenerate() if (is_generate_initialized) return; + std::lock_guard lock(snapshot_mutex); is_generate_initialized = true; /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation. @@ -806,4 +810,12 @@ void AggregatingTransform::initGenerate() } } +ProcessorPtr AggregatingTransform::getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) +{ + const auto & input_header = inputs.front().getHeader(); + const auto & output_header = outputs.front().getHeader(); + auto aggregating_processor = std::dynamic_pointer_cast(current_processor); + return std::make_shared(input_header, output_header, std::move(aggregating_processor), partial_result_limit, partial_result_duration_ms); +} + } diff --git a/src/Processors/Transforms/AggregatingTransform.h b/src/Processors/Transforms/AggregatingTransform.h index 38baa4d0394..791cd12326f 100644 --- a/src/Processors/Transforms/AggregatingTransform.h +++ b/src/Processors/Transforms/AggregatingTransform.h @@ -170,9 +170,23 @@ public: void work() override; Processors expandPipeline() override; + PartialResultStatus getPartialResultProcessorSupportStatus() const override + { + /// Currently AggregatingPartialResultTransform support only single-thread aggregation without key. + + /// TODO: check that insert results from aggregator.prepareBlockAndFillWithoutKey return values without + /// changing of the aggregator state when aggregation with keys will be supported in AggregatingPartialResultTransform. + bool is_partial_result_supported = params->params.keys_size == 0 /// Aggregation without key. + && many_data->variants.size() == 1; /// Use only one stream for aggregation. + + return is_partial_result_supported ? PartialResultStatus::FullSupported : PartialResultStatus::NotSupported; + } + protected: void consume(Chunk chunk); + ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override; + private: /// To read the data that was flushed into the temporary data file. Processors processors; @@ -212,6 +226,13 @@ private: bool is_consume_started = false; + friend class AggregatingPartialResultTransform; + /// The mutex protects variables that are used for creating a snapshot of the current processor. + /// The current implementation of AggregatingPartialResultTransform uses the 'is_generate_initialized' variable to check + /// whether the processor has started sending data through the main pipeline, and the corresponding partial result processor should stop creating snapshots. + /// Additionally, the mutex protects the 'params->aggregator' and 'many_data->variants' variables, which are used to get data from them for a snapshot. + std::mutex snapshot_mutex; + void initGenerate(); }; diff --git a/src/Processors/Transforms/ExpressionTransform.cpp b/src/Processors/Transforms/ExpressionTransform.cpp index 0d3341b000c..78dace56e4e 100644 --- a/src/Processors/Transforms/ExpressionTransform.cpp +++ b/src/Processors/Transforms/ExpressionTransform.cpp @@ -25,6 +25,12 @@ void ExpressionTransform::transform(Chunk & chunk) chunk.setColumns(block.getColumns(), num_rows); } +ProcessorPtr ExpressionTransform::getPartialResultProcessor(const ProcessorPtr & /*current_processor*/, UInt64 /*partial_result_limit*/, UInt64 /*partial_result_duration_ms*/) +{ + const auto & header = getInputPort().getHeader(); + return std::make_shared(header, expression); +} + ConvertingTransform::ConvertingTransform(const Block & header_, ExpressionActionsPtr expression_) : ExceptionKeepingTransform(header_, ExpressionTransform::transformHeader(header_, expression_->getActionsDAG())) , expression(std::move(expression_)) diff --git a/src/Processors/Transforms/ExpressionTransform.h b/src/Processors/Transforms/ExpressionTransform.h index 791c7d7ba73..8250f25f0f8 100644 --- a/src/Processors/Transforms/ExpressionTransform.h +++ b/src/Processors/Transforms/ExpressionTransform.h @@ -26,10 +26,15 @@ public: static Block transformHeader(Block header, const ActionsDAG & expression); + PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; } + protected: void transform(Chunk & chunk) override; + ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override; + private: + ExpressionActionsPtr expression; }; diff --git a/src/Processors/Transforms/LimitPartialResultTransform.cpp b/src/Processors/Transforms/LimitPartialResultTransform.cpp new file mode 100644 index 00000000000..c9eaa9dc7dd --- /dev/null +++ b/src/Processors/Transforms/LimitPartialResultTransform.cpp @@ -0,0 +1,42 @@ +#include +#include + +namespace DB +{ + +LimitPartialResultTransform::LimitPartialResultTransform( + const Block & header, + UInt64 partial_result_limit_, + UInt64 partial_result_duration_ms_, + UInt64 limit_, + UInt64 offset_) + : PartialResultTransform(header, partial_result_limit_, partial_result_duration_ms_) + , limit(limit_) + , offset(offset_) + {} + +void LimitPartialResultTransform::transformPartialResult(Chunk & chunk) +{ + UInt64 num_rows = chunk.getNumRows(); + if (num_rows < offset || limit == 0) + { + chunk = {}; + return; + } + + UInt64 length = std::min(limit, num_rows - offset); + + /// Check if some rows should be removed + if (length < num_rows) + { + UInt64 num_columns = chunk.getNumColumns(); + auto columns = chunk.detachColumns(); + + for (UInt64 i = 0; i < num_columns; ++i) + columns[i] = columns[i]->cut(offset, length); + + chunk.setColumns(std::move(columns), length); + } +} + +} diff --git a/src/Processors/Transforms/LimitPartialResultTransform.h b/src/Processors/Transforms/LimitPartialResultTransform.h new file mode 100644 index 00000000000..3a0116b624d --- /dev/null +++ b/src/Processors/Transforms/LimitPartialResultTransform.h @@ -0,0 +1,36 @@ +#pragma once + +#include + +namespace DB +{ + +class LimitTransform; + +/// Currently support only single thread implementation with one input and one output ports +class LimitPartialResultTransform : public PartialResultTransform +{ +public: + using LimitTransformPtr = std::shared_ptr; + + LimitPartialResultTransform( + const Block & header, + UInt64 partial_result_limit_, + UInt64 partial_result_duration_ms_, + UInt64 limit_, + UInt64 offset_); + + String getName() const override { return "LimitPartialResultTransform"; } + + void transformPartialResult(Chunk & chunk) override; + /// LimitsTransform doesn't have a state which can be snapshoted + ShaphotResult getRealProcessorSnapshot() override { return {{}, SnaphotStatus::Stopped}; } + +private: + UInt64 limit; + UInt64 offset; + + LimitTransformPtr limit_transform; +}; + +} diff --git a/src/Processors/Transforms/LimitsCheckingTransform.cpp b/src/Processors/Transforms/LimitsCheckingTransform.cpp index 02d2fef808c..0557f3f291e 100644 --- a/src/Processors/Transforms/LimitsCheckingTransform.cpp +++ b/src/Processors/Transforms/LimitsCheckingTransform.cpp @@ -1,4 +1,5 @@ #include +#include #include namespace DB diff --git a/src/Processors/Transforms/LimitsCheckingTransform.h b/src/Processors/Transforms/LimitsCheckingTransform.h index 2f96a17c17b..eabb988dab6 100644 --- a/src/Processors/Transforms/LimitsCheckingTransform.h +++ b/src/Processors/Transforms/LimitsCheckingTransform.h @@ -33,6 +33,8 @@ public: void setQuota(const std::shared_ptr & quota_) { quota = quota_; } + PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::SkipSupported; } + protected: void transform(Chunk & chunk) override; diff --git a/src/Processors/Transforms/MergeSortingPartialResultTransform.cpp b/src/Processors/Transforms/MergeSortingPartialResultTransform.cpp new file mode 100644 index 00000000000..e4a2af2cdd8 --- /dev/null +++ b/src/Processors/Transforms/MergeSortingPartialResultTransform.cpp @@ -0,0 +1,48 @@ +#include + +namespace DB +{ + +MergeSortingPartialResultTransform::MergeSortingPartialResultTransform( + const Block & header, MergeSortingTransformPtr merge_sorting_transform_, + UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_) + : PartialResultTransform(header, partial_result_limit_, partial_result_duration_ms_) + , merge_sorting_transform(std::move(merge_sorting_transform_)) + {} + +PartialResultTransform::ShaphotResult MergeSortingPartialResultTransform::getRealProcessorSnapshot() +{ + std::lock_guard lock(merge_sorting_transform->snapshot_mutex); + if (merge_sorting_transform->generated_prefix) + return {{}, SnaphotStatus::Stopped}; + + if (merge_sorting_transform->chunks.empty()) + return {{}, SnaphotStatus::NotReady}; + + /// Sort all input data + merge_sorting_transform->remerge(); + /// Add a copy of the first `partial_result_limit` rows to a generated_chunk + /// to send it later as a partial result in the next prepare stage of the current processor + auto generated_columns = merge_sorting_transform->chunks[0].cloneEmptyColumns(); + size_t total_rows = 0; + for (const auto & merged_chunk : merge_sorting_transform->chunks) + { + size_t rows = std::min(merged_chunk.getNumRows(), partial_result_limit - total_rows); + if (rows == 0) + break; + + for (size_t position = 0; position < generated_columns.size(); ++position) + { + auto column = merged_chunk.getColumns()[position]; + generated_columns[position]->insertRangeFrom(*column, 0, rows); + } + + total_rows += rows; + } + + auto partial_result = Chunk(std::move(generated_columns), total_rows, merge_sorting_transform->chunks[0].getChunkInfo()); + merge_sorting_transform->enrichChunkWithConstants(partial_result); + return {std::move(partial_result), SnaphotStatus::Ready}; +} + +} diff --git a/src/Processors/Transforms/MergeSortingPartialResultTransform.h b/src/Processors/Transforms/MergeSortingPartialResultTransform.h new file mode 100644 index 00000000000..781aa8e1265 --- /dev/null +++ b/src/Processors/Transforms/MergeSortingPartialResultTransform.h @@ -0,0 +1,28 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class MergeSortingPartialResultTransform : public PartialResultTransform +{ +public: + using MergeSortingTransformPtr = std::shared_ptr; + + MergeSortingPartialResultTransform( + const Block & header, MergeSortingTransformPtr merge_sorting_transform_, + UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_); + + String getName() const override { return "MergeSortingPartialResultTransform"; } + + /// MergeSortingTransform always receives chunks in a sorted state, so transformation is not needed + void transformPartialResult(Chunk & /*chunk*/) override {} + ShaphotResult getRealProcessorSnapshot() override; + +private: + MergeSortingTransformPtr merge_sorting_transform; +}; + +} diff --git a/src/Processors/Transforms/MergeSortingTransform.cpp b/src/Processors/Transforms/MergeSortingTransform.cpp index de77711d129..e801e5e16d5 100644 --- a/src/Processors/Transforms/MergeSortingTransform.cpp +++ b/src/Processors/Transforms/MergeSortingTransform.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -136,6 +137,8 @@ void MergeSortingTransform::consume(Chunk chunk) /// If there were only const columns in sort description, then there is no need to sort. /// Return the chunk as is. + std::lock_guard lock(snapshot_mutex); + if (description.empty()) { generated_chunk = std::move(chunk); @@ -213,6 +216,8 @@ void MergeSortingTransform::serialize() void MergeSortingTransform::generate() { + std::lock_guard lock(snapshot_mutex); + if (!generated_prefix) { size_t num_tmp_files = tmp_data ? tmp_data->getStreams().size() : 0; @@ -273,4 +278,11 @@ void MergeSortingTransform::remerge() sum_bytes_in_blocks = new_sum_bytes_in_blocks; } +ProcessorPtr MergeSortingTransform::getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) +{ + const auto & header = inputs.front().getHeader(); + auto merge_sorting_processor = std::dynamic_pointer_cast(current_processor); + return std::make_shared(header, std::move(merge_sorting_processor), partial_result_limit, partial_result_duration_ms); +} + } diff --git a/src/Processors/Transforms/MergeSortingTransform.h b/src/Processors/Transforms/MergeSortingTransform.h index e8c180b6903..67f098b4362 100644 --- a/src/Processors/Transforms/MergeSortingTransform.h +++ b/src/Processors/Transforms/MergeSortingTransform.h @@ -33,6 +33,8 @@ public: String getName() const override { return "MergeSortingTransform"; } + PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; } + protected: void consume(Chunk chunk) override; void serialize() override; @@ -40,6 +42,8 @@ protected: Processors expandPipeline() override; + ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override; + private: size_t max_bytes_before_remerge; double remerge_lowered_memory_bytes_ratio; @@ -59,6 +63,13 @@ private: void remerge(); ProcessorPtr external_merging_sorted; + + friend class MergeSortingPartialResultTransform; + /// The mutex protects variables that are used for creating a snapshot of the current processor. + /// The current implementation of MergeSortingPartialResultTransform uses the 'generated_prefix' variable to check + /// whether the processor has started sending data through the main pipeline, and the corresponding partial result processor should stop creating snapshots. + /// Additionally, the mutex protects the 'chunks' variable and all variables in the 'remerge' function, which is used to transition 'chunks' to a sorted state. + std::mutex snapshot_mutex; }; } diff --git a/src/Processors/Transforms/PartialResultTransform.cpp b/src/Processors/Transforms/PartialResultTransform.cpp new file mode 100644 index 00000000000..97ff79dee54 --- /dev/null +++ b/src/Processors/Transforms/PartialResultTransform.cpp @@ -0,0 +1,80 @@ +#include + +namespace DB +{ + + +PartialResultTransform::PartialResultTransform(const Block & header, UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_) + : PartialResultTransform(header, header, partial_result_limit_, partial_result_duration_ms_) {} + +PartialResultTransform::PartialResultTransform(const Block & input_header, const Block & output_header, UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_) + : IProcessor({input_header}, {output_header}) + , input(inputs.front()) + , output(outputs.front()) + , partial_result_limit(partial_result_limit_) + , partial_result_duration_ms(partial_result_duration_ms_) + , watch(CLOCK_MONOTONIC) + {} + +IProcessor::Status PartialResultTransform::prepare() +{ + if (output.isFinished()) + { + input.close(); + return Status::Finished; + } + + if (finished_getting_snapshots) + { + output.finish(); + return Status::Finished; + } + + if (!output.canPush()) + { + input.setNotNeeded(); + return Status::PortFull; + } + + /// If input data from previous partial result processor is finished then + /// PartialResultTransform ready to create snapshots and send them as a partial result + if (input.isFinished()) + { + if (partial_result.snapshot_status == SnaphotStatus::Ready) + { + partial_result.snapshot_status = SnaphotStatus::NotReady; + output.push(std::move(partial_result.chunk)); + return Status::PortFull; + } + + return Status::Ready; + } + + input.setNeeded(); + if (!input.hasData()) + return Status::NeedData; + + partial_result.chunk = input.pull(); + transformPartialResult(partial_result.chunk); + if (partial_result.chunk.getNumRows() > 0) + { + output.push(std::move(partial_result.chunk)); + return Status::PortFull; + } + + return Status::NeedData; +} + +void PartialResultTransform::work() +{ + if (partial_result_duration_ms < watch.elapsedMilliseconds()) + { + partial_result = getRealProcessorSnapshot(); + if (partial_result.snapshot_status == SnaphotStatus::Stopped) + finished_getting_snapshots = true; + + watch.restart(); + } +} + +} diff --git a/src/Processors/Transforms/PartialResultTransform.h b/src/Processors/Transforms/PartialResultTransform.h new file mode 100644 index 00000000000..4fe87638f38 --- /dev/null +++ b/src/Processors/Transforms/PartialResultTransform.h @@ -0,0 +1,57 @@ +#pragma once + +#include + +namespace DB +{ + +/// Processors of this type are used to construct an auxiliary pipeline with processors corresponding to those in the main pipeline. +/// These processors work in two modes: +/// 1) Creating a snapshot of the corresponding processor from the main pipeline once per partial_result_duration_ms (period in milliseconds), and then sending the snapshot through the partial result pipeline. +/// 2) Transforming small blocks of data in the same way as the original processor and sending the transformed data through the partial result pipeline. +/// All processors of this type rely on the invariant that a new block from the previous processor of the partial result pipeline overwrites information about the previous block of the same previous processor. +class PartialResultTransform : public IProcessor +{ +public: + PartialResultTransform(const Block & header, UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_); + PartialResultTransform(const Block & input_header, const Block & output_header, UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_); + + String getName() const override { return "PartialResultTransform"; } + + Status prepare() override; + void work() override; + + bool isPartialResultProcessor() const override { return true; } + +protected: + enum class SnaphotStatus + { + NotReady, // Waiting for data from the previous partial result processor or awaiting a timer before creating the snapshot. + Ready, // Current partial result processor has received a snapshot from the processor in the main pipeline. + Stopped, // The processor from the main pipeline has started sending data, and the pipeline for partial results should use data from the next processors of the main pipeline. + }; + + struct ShaphotResult + { + Chunk chunk; + SnaphotStatus snapshot_status; + }; + + InputPort & input; + OutputPort & output; + + UInt64 partial_result_limit; + UInt64 partial_result_duration_ms; + + ShaphotResult partial_result = {{}, SnaphotStatus::NotReady}; + + bool finished_getting_snapshots = false; + + virtual void transformPartialResult(Chunk & /*chunk*/) = 0; + virtual ShaphotResult getRealProcessorSnapshot() = 0; // { return {{}, SnaphotStatus::Stopped}; } + +private: + Stopwatch watch; +}; + +} diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp index 91ba01c479f..293d152ea65 100644 --- a/src/QueryPipeline/Pipe.cpp +++ b/src/QueryPipeline/Pipe.cpp @@ -12,6 +12,7 @@ #include #include #include +#include namespace DB { @@ -167,12 +168,9 @@ Pipe::Pipe(ProcessorPtr source) { checkSource(*source); - if (collected_processors) - collected_processors->emplace_back(source); - output_ports.push_back(&source->getOutputs().front()); header = output_ports.front()->getHeader(); - processors->emplace_back(std::move(source)); + addProcessor(std::move(source)); max_parallel_streams = 1; } @@ -319,6 +317,16 @@ Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors, bool allow res.processors->insert(res.processors->end(), pipe.processors->begin(), pipe.processors->end()); res.output_ports.insert(res.output_ports.end(), pipe.output_ports.begin(), pipe.output_ports.end()); + if (res.isPartialResultActive() && pipe.isPartialResultActive()) + { + res.partial_result_ports.insert( + res.partial_result_ports.end(), + pipe.partial_result_ports.begin(), + pipe.partial_result_ports.end()); + } + else + res.dropPartialResult(); + res.max_parallel_streams += pipe.max_parallel_streams; if (pipe.totals_port) @@ -352,11 +360,11 @@ void Pipe::addSource(ProcessorPtr source) else assertBlocksHaveEqualStructure(header, source_header, "Pipes"); - if (collected_processors) - collected_processors->emplace_back(source); - output_ports.push_back(&source->getOutputs().front()); - processors->emplace_back(std::move(source)); + if (isPartialResultActive()) + partial_result_ports.push_back(nullptr); + + addProcessor(std::move(source)); max_parallel_streams = std::max(max_parallel_streams, output_ports.size()); } @@ -374,11 +382,9 @@ void Pipe::addTotalsSource(ProcessorPtr source) assertBlocksHaveEqualStructure(header, source_header, "Pipes"); - if (collected_processors) - collected_processors->emplace_back(source); - totals_port = &source->getOutputs().front(); - processors->emplace_back(std::move(source)); + + addProcessor(std::move(source)); } void Pipe::addExtremesSource(ProcessorPtr source) @@ -394,11 +400,20 @@ void Pipe::addExtremesSource(ProcessorPtr source) assertBlocksHaveEqualStructure(header, source_header, "Pipes"); - if (collected_processors) - collected_processors->emplace_back(source); - extremes_port = &source->getOutputs().front(); - processors->emplace_back(std::move(source)); + + addProcessor(std::move(source)); +} + +void Pipe::activatePartialResult(UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_) +{ + if (is_partial_result_active) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Partial result for Pipe should be initialized only once"); + + is_partial_result_active = true; + partial_result_limit = partial_result_limit_; + partial_result_duration_ms = partial_result_duration_ms_; + partial_result_ports.assign(output_ports.size(), nullptr); } static void dropPort(OutputPort *& port, Processors & processors, Processors * collected_processors) @@ -426,6 +441,15 @@ void Pipe::dropExtremes() dropPort(extremes_port, *processors, collected_processors); } +void Pipe::dropPartialResult() +{ + for (auto & port : partial_result_ports) + dropPort(port, *processors, collected_processors); + + is_partial_result_active = false; + partial_result_ports.clear(); +} + void Pipe::addTransform(ProcessorPtr transform) { addTransform(std::move(transform), static_cast(nullptr), static_cast(nullptr)); @@ -456,6 +480,8 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort if (extremes) extremes_port = extremes; + addPartialResultTransform(transform); + size_t next_output = 0; for (auto & input : inputs) { @@ -506,10 +532,7 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort if (extremes_port) assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes"); - if (collected_processors) - collected_processors->emplace_back(transform); - - processors->emplace_back(std::move(transform)); + addProcessor(std::move(transform)); max_parallel_streams = std::max(max_parallel_streams, output_ports.size()); } @@ -546,6 +569,8 @@ void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes_port = nullptr; } + addPartialResultTransform(transform); + bool found_totals = false; bool found_extremes = false; @@ -595,14 +620,104 @@ void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * if (extremes_port) assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes"); - if (collected_processors) - collected_processors->emplace_back(transform); - - processors->emplace_back(std::move(transform)); + addProcessor(std::move(transform)); max_parallel_streams = std::max(max_parallel_streams, output_ports.size()); } +void Pipe::addPartialResultSimpleTransform(const ProcessorPtr & transform, size_t partial_result_port_id) +{ + if (isPartialResultActive()) + { + auto & partial_result_port = partial_result_ports[partial_result_port_id]; + auto partial_result_status = transform->getPartialResultProcessorSupportStatus(); + + if (partial_result_status == IProcessor::PartialResultStatus::NotSupported) + dropPort(partial_result_port, *processors, collected_processors); + + if (partial_result_status != IProcessor::PartialResultStatus::FullSupported) + return; + + auto partial_result_transform = IProcessor::getPartialResultProcessorPtr(transform, partial_result_limit, partial_result_duration_ms); + + connectPartialResultPort(partial_result_port, partial_result_transform->getInputs().front()); + + partial_result_port = &partial_result_transform->getOutputs().front(); + + addProcessor(std::move(partial_result_transform)); + } +} + +void Pipe::addPartialResultTransform(const ProcessorPtr & transform) +{ + if (isPartialResultActive()) + { + size_t new_outputs_size = transform->getOutputs().size(); + auto partial_result_status = transform->getPartialResultProcessorSupportStatus(); + + if (partial_result_status == IProcessor::PartialResultStatus::SkipSupported && new_outputs_size != partial_result_ports.size()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot skip transform {} in the partial result part of the Pipe because it has {} output ports, but the partial result part expects {} output ports", + transform->getName(), + new_outputs_size, + partial_result_ports.size()); + + if (partial_result_status == IProcessor::PartialResultStatus::NotSupported) + { + for (auto & partial_result_port : partial_result_ports) + dropPort(partial_result_port, *processors, collected_processors); + + partial_result_ports.assign(new_outputs_size, nullptr); + } + + if (partial_result_status != IProcessor::PartialResultStatus::FullSupported) + return; + + auto partial_result_transform = IProcessor::getPartialResultProcessorPtr(transform, partial_result_limit, partial_result_duration_ms); + auto & inputs = partial_result_transform->getInputs(); + + if (inputs.size() != partial_result_ports.size()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot add partial result transform {} to Pipe because it has {} input ports, but {} expected", + partial_result_transform->getName(), + inputs.size(), + partial_result_ports.size()); + + size_t next_port = 0; + for (auto & input : inputs) + { + connectPartialResultPort(partial_result_ports[next_port], input); + ++next_port; + } + + partial_result_ports.assign(new_outputs_size, nullptr); + + next_port = 0; + for (auto & new_partial_result_port : partial_result_transform->getOutputs()) + { + partial_result_ports[next_port] = &new_partial_result_port; + ++next_port; + } + + addProcessor(std::move(partial_result_transform)); + } +} + +void Pipe::connectPartialResultPort(OutputPort * partial_result_port, InputPort & partial_result_transform_port) +{ + if (partial_result_port == nullptr) + { + auto source = std::make_shared(getHeader()); + partial_result_port = &source->getPort(); + + addProcessor(std::move(source)); + } + + connect(*partial_result_port, partial_result_transform_port); +} + void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter) { if (output_ports.empty()) @@ -610,7 +725,7 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter) Block new_header; - auto add_transform = [&](OutputPort *& port, StreamType stream_type) + auto add_transform = [&](OutputPort *& port, size_t partial_result_port_id, StreamType stream_type) { if (!port) return; @@ -646,19 +761,22 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter) { connect(*port, transform->getInputs().front()); port = &transform->getOutputs().front(); + if (stream_type == StreamType::Main) + addPartialResultSimpleTransform(transform, partial_result_port_id); - if (collected_processors) - collected_processors->emplace_back(transform); - - processors->emplace_back(std::move(transform)); + addProcessor(std::move(transform)); } }; + size_t partial_result_port_id = 0; for (auto & port : output_ports) - add_transform(port, StreamType::Main); + { + add_transform(port, partial_result_port_id, StreamType::Main); + ++partial_result_port_id; + } - add_transform(totals_port, StreamType::Totals); - add_transform(extremes_port, StreamType::Extremes); + add_transform(totals_port, 0, StreamType::Totals); + add_transform(extremes_port, 0, StreamType::Extremes); header = std::move(new_header); } @@ -679,6 +797,7 @@ void Pipe::addChains(std::vector chains) dropTotals(); dropExtremes(); + dropPartialResult(); size_t max_parallel_streams_for_chains = 0; @@ -697,18 +816,21 @@ void Pipe::addChains(std::vector chains) auto added_processors = Chain::getProcessors(std::move(chains[i])); for (auto & transform : added_processors) - { - if (collected_processors) - collected_processors->emplace_back(transform); - - processors->emplace_back(std::move(transform)); - } + addProcessor(std::move(transform)); } header = std::move(new_header); max_parallel_streams = std::max(max_parallel_streams, max_parallel_streams_for_chains); } +void Pipe::addProcessor(ProcessorPtr processor) +{ + if (collected_processors) + collected_processors->emplace_back(processor); + + processors->emplace_back(std::move(processor)); +} + void Pipe::resize(size_t num_streams, bool force, bool strict) { if (output_ports.empty()) @@ -769,6 +891,9 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter) add_transform(totals_port, StreamType::Totals); add_transform(extremes_port, StreamType::Extremes); + for (auto & port : partial_result_ports) + add_transform(port, StreamType::PartialResult); + output_ports.clear(); header.clear(); } @@ -778,6 +903,9 @@ void Pipe::transform(const Transformer & transformer, bool check_ports) if (output_ports.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot transform empty Pipe"); + /// TODO: Add functionality to work with partial result ports in transformer. + dropPartialResult(); + auto new_processors = transformer(output_ports); /// Create hash table with new processors. diff --git a/src/QueryPipeline/Pipe.h b/src/QueryPipeline/Pipe.h index 09931e38578..70e933bcfd2 100644 --- a/src/QueryPipeline/Pipe.h +++ b/src/QueryPipeline/Pipe.h @@ -48,6 +48,9 @@ public: OutputPort * getOutputPort(size_t pos) const { return output_ports[pos]; } OutputPort * getTotalsPort() const { return totals_port; } OutputPort * getExtremesPort() const { return extremes_port; } + OutputPort * getPartialResultPort(size_t pos) const { return partial_result_ports.empty() ? nullptr : partial_result_ports[pos]; } + + bool isPartialResultActive() { return is_partial_result_active; } /// Add processor to list, add it output ports to output_ports. /// Processor shouldn't have input ports, output ports shouldn't be connected. @@ -58,9 +61,13 @@ public: void addTotalsSource(ProcessorPtr source); void addExtremesSource(ProcessorPtr source); - /// Drop totals and extremes (create NullSink for them). + /// Activate sending partial result during main pipeline execution + void activatePartialResult(UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_); + + /// Drop totals, extremes and partial result (create NullSink for them). void dropTotals(); void dropExtremes(); + void dropPartialResult(); /// Add processor to list. It should have size() input ports with compatible header. /// Output ports should have same headers. @@ -69,11 +76,16 @@ public: void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes); void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes); + void addPartialResultTransform(const ProcessorPtr & transform); + void addPartialResultSimpleTransform(const ProcessorPtr & transform, size_t partial_result_port_id); + void connectPartialResultPort(OutputPort * partial_result_port, InputPort & partial_result_transform_port); + enum class StreamType { Main = 0, /// Stream for query data. There may be several streams of this type. Totals, /// Stream for totals. No more than one. Extremes, /// Stream for extremes. No more than one. + PartialResult, /// Stream for partial result data. There may be several streams of this type. }; using ProcessorGetter = std::function; @@ -109,10 +121,17 @@ private: Block header; std::shared_ptr processors; - /// Output ports. Totals and extremes are allowed to be empty. + /// If the variable is true, then each time a processor is added pipe will try + /// to add processor which will send partial result from original processor + bool is_partial_result_active = false; + UInt64 partial_result_limit = 0; + UInt64 partial_result_duration_ms = 0; + + /// Output ports. Totals, extremes and partial results are allowed to be empty. OutputPortRawPtrs output_ports; OutputPort * totals_port = nullptr; OutputPort * extremes_port = nullptr; + OutputPortRawPtrs partial_result_ports; /// It is the max number of processors which can be executed in parallel for each step. /// Usually, it's the same as the number of output ports. @@ -128,6 +147,8 @@ private: static Pipe unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header); void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter); + void addProcessor(ProcessorPtr processor); + friend class QueryPipelineBuilder; friend class QueryPipeline; }; diff --git a/src/QueryPipeline/QueryPipeline.cpp b/src/QueryPipeline/QueryPipeline.cpp index 128972b8ff0..9a836f68da1 100644 --- a/src/QueryPipeline/QueryPipeline.cpp +++ b/src/QueryPipeline/QueryPipeline.cpp @@ -66,7 +66,8 @@ static void checkPulling( Processors & processors, OutputPort * output, OutputPort * totals, - OutputPort * extremes) + OutputPort * extremes, + OutputPort * partial_result) { if (!output || output->isConnected()) throw Exception( @@ -83,9 +84,15 @@ static void checkPulling( ErrorCodes::LOGICAL_ERROR, "Cannot create pulling QueryPipeline because its extremes port is connected"); + if (partial_result && partial_result->isConnected()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot create pulling QueryPipeline because its partial_result port is connected"); + bool found_output = false; bool found_totals = false; bool found_extremes = false; + bool found_partial_result = false; for (const auto & processor : processors) { for (const auto & in : processor->getInputs()) @@ -99,6 +106,8 @@ static void checkPulling( found_totals = true; else if (extremes && &out == extremes) found_extremes = true; + else if (partial_result && &out == partial_result) + found_partial_result = true; else checkOutput(out, processor); } @@ -116,6 +125,10 @@ static void checkPulling( throw Exception( ErrorCodes::LOGICAL_ERROR, "Cannot create pulling QueryPipeline because its extremes port does not belong to any processor"); + if (partial_result && !found_partial_result) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot create pulling QueryPipeline because its partial result port does not belong to any processor"); } static void checkCompleted(Processors & processors) @@ -318,17 +331,20 @@ QueryPipeline::QueryPipeline( std::shared_ptr processors_, OutputPort * output_, OutputPort * totals_, - OutputPort * extremes_) + OutputPort * extremes_, + OutputPort * partial_result_) : resources(std::move(resources_)) , processors(std::move(processors_)) , output(output_) , totals(totals_) , extremes(extremes_) + , partial_result(partial_result_) { - checkPulling(*processors, output, totals, extremes); + checkPulling(*processors, output, totals, extremes, partial_result); } QueryPipeline::QueryPipeline(Pipe pipe) + : partial_result_duration_ms(pipe.partial_result_duration_ms) { if (pipe.numOutputPorts() > 0) { @@ -336,9 +352,10 @@ QueryPipeline::QueryPipeline(Pipe pipe) output = pipe.getOutputPort(0); totals = pipe.getTotalsPort(); extremes = pipe.getExtremesPort(); + partial_result = pipe.getPartialResultPort(0); processors = std::move(pipe.processors); - checkPulling(*processors, output, totals, extremes); + checkPulling(*processors, output, totals, extremes, partial_result); } else { @@ -370,6 +387,7 @@ QueryPipeline::QueryPipeline(std::shared_ptr format) auto & format_main = format->getPort(IOutputFormat::PortKind::Main); auto & format_totals = format->getPort(IOutputFormat::PortKind::Totals); auto & format_extremes = format->getPort(IOutputFormat::PortKind::Extremes); + auto & format_partial_result = format->getPort(IOutputFormat::PortKind::PartialResult); if (!totals) { @@ -385,12 +403,21 @@ QueryPipeline::QueryPipeline(std::shared_ptr format) processors->emplace_back(std::move(source)); } + if (!partial_result) + { + auto source = std::make_shared(format_partial_result.getHeader()); + partial_result = &source->getPort(); + processors->emplace_back(std::move(source)); + } + connect(*totals, format_totals); connect(*extremes, format_extremes); + connect(*partial_result, format_partial_result); input = &format_main; totals = nullptr; extremes = nullptr; + partial_result = nullptr; output_format = format.get(); @@ -418,6 +445,7 @@ void QueryPipeline::complete(std::shared_ptr sink) drop(totals, *processors); drop(extremes, *processors); + drop(partial_result, *processors); connect(*output, sink->getPort()); processors->emplace_back(std::move(sink)); @@ -433,6 +461,7 @@ void QueryPipeline::complete(Chain chain) drop(totals, *processors); drop(extremes, *processors); + drop(partial_result, *processors); processors->reserve(processors->size() + chain.getProcessors().size() + 1); for (auto processor : chain.getProcessors()) @@ -458,6 +487,7 @@ void QueryPipeline::complete(Pipe pipe) pipe.resize(1); pipe.dropExtremes(); pipe.dropTotals(); + pipe.dropPartialResult(); connect(*pipe.getOutputPort(0), *input); input = nullptr; @@ -486,11 +516,13 @@ void QueryPipeline::complete(std::shared_ptr format) addMaterializing(output, *processors); addMaterializing(totals, *processors); addMaterializing(extremes, *processors); + addMaterializing(partial_result, *processors); } auto & format_main = format->getPort(IOutputFormat::PortKind::Main); auto & format_totals = format->getPort(IOutputFormat::PortKind::Totals); auto & format_extremes = format->getPort(IOutputFormat::PortKind::Extremes); + auto & format_partial_result = format->getPort(IOutputFormat::PortKind::PartialResult); if (!totals) { @@ -506,13 +538,22 @@ void QueryPipeline::complete(std::shared_ptr format) processors->emplace_back(std::move(source)); } + if (!partial_result) + { + auto source = std::make_shared(format_partial_result.getHeader()); + partial_result = &source->getPort(); + processors->emplace_back(std::move(source)); + } + connect(*output, format_main); connect(*totals, format_totals); connect(*extremes, format_extremes); + connect(*partial_result, format_partial_result); output = nullptr; totals = nullptr; extremes = nullptr; + partial_result = nullptr; initRowsBeforeLimit(format.get()); output_format = format.get(); @@ -684,6 +725,7 @@ void QueryPipeline::convertStructureTo(const ColumnsWithTypeAndName & columns) addExpression(output, actions, *processors); addExpression(totals, actions, *processors); addExpression(extremes, actions, *processors); + addExpression(partial_result, actions, *processors); } std::unique_ptr QueryPipeline::getReadProgressCallback() const diff --git a/src/QueryPipeline/QueryPipeline.h b/src/QueryPipeline/QueryPipeline.h index f14cf61aac2..20e58bc0f59 100644 --- a/src/QueryPipeline/QueryPipeline.h +++ b/src/QueryPipeline/QueryPipeline.h @@ -75,7 +75,8 @@ public: std::shared_ptr processors_, OutputPort * output_, OutputPort * totals_ = nullptr, - OutputPort * extremes_ = nullptr); + OutputPort * extremes_ = nullptr, + OutputPort * partial_result_ = nullptr); bool initialized() const { return !processors->empty(); } /// When initialized, exactly one of the following is true. @@ -154,6 +155,7 @@ private: OutputPort * output = nullptr; OutputPort * totals = nullptr; OutputPort * extremes = nullptr; + OutputPort * partial_result = nullptr; QueryStatusPtr process_list_element; @@ -162,6 +164,9 @@ private: size_t num_threads = 0; bool concurrency_control = false; + UInt64 partial_result_limit = 0; + UInt64 partial_result_duration_ms = 0; + friend class PushingPipelineExecutor; friend class PullingPipelineExecutor; friend class PushingAsyncPipelineExecutor; diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index f9726339872..90f5ee364f3 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -110,6 +110,16 @@ void QueryPipelineBuilder::init(QueryPipeline & pipeline) pipe.header = {}; } + if (pipeline.partial_result) + { + /// Set partial result ports only after activation because when activated, it is set to nullptr + pipe.activatePartialResult(pipeline.partial_result_limit, pipeline.partial_result_duration_ms); + pipe.partial_result_ports = {pipeline.partial_result}; + } + + if (!pipeline.partial_result) + pipe.dropPartialResult(); + pipe.totals_port = pipeline.totals; pipe.extremes_port = pipeline.extremes; pipe.max_parallel_streams = pipeline.num_threads; @@ -352,6 +362,10 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesYShaped left->checkInitializedAndNotCompleted(); right->checkInitializedAndNotCompleted(); + /// TODO: Support joining of partial results from different pipelines. + left->pipe.dropPartialResult(); + right->pipe.dropPartialResult(); + left->pipe.dropExtremes(); right->pipe.dropExtremes(); if (left->getNumStreams() != 1 || right->getNumStreams() != 1) @@ -364,6 +378,7 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesYShaped auto joining = std::make_shared(join, inputs, out_header, max_block_size); + /// TODO: Support partial results in merge pipelines after joining support above. return mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors); } @@ -384,6 +399,10 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesRightLe left->pipe.dropExtremes(); right->pipe.dropExtremes(); + /// TODO: Support joining of partial results from different pipelines. + left->pipe.dropPartialResult(); + right->pipe.dropPartialResult(); + left->pipe.collected_processors = collected_processors; /// Collect the NEW processors for the right pipeline. @@ -634,7 +653,7 @@ PipelineExecutorPtr QueryPipelineBuilder::execute() if (!isCompleted()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot execute pipeline because it is not completed"); - return std::make_shared(pipe.processors, process_list_element); + return std::make_shared(pipe.processors, process_list_element, pipe.partial_result_duration_ms); } Pipe QueryPipelineBuilder::getPipe(QueryPipelineBuilder pipeline, QueryPlanResourceHolder & resources) diff --git a/src/QueryPipeline/QueryPipelineBuilder.h b/src/QueryPipeline/QueryPipelineBuilder.h index 5d273df7068..612e7b1652f 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.h +++ b/src/QueryPipeline/QueryPipelineBuilder.h @@ -85,6 +85,12 @@ public: /// Pipeline will be completed after this transformation. void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter); + /// Activate building separate pipeline for sending partial result. + void activatePartialResult(UInt64 partial_result_limit, UInt64 partial_result_duration_ms) { pipe.activatePartialResult(partial_result_limit, partial_result_duration_ms); } + + /// Check if building of a pipeline for sending partial result active. + bool isPartialResultActive() { return pipe.isPartialResultActive(); } + /// Add totals which returns one chunk with single row with defaults. void addDefaultTotals(); diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 96c449b7e17..c687a6064b4 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -885,7 +885,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors() std::unique_lock progress_lock(task_callback_mutex, std::defer_lock); { - PullingAsyncPipelineExecutor executor(pipeline); + bool has_partial_result_setting = query_context->getSettingsRef().partial_result_update_duration_ms.totalMilliseconds() > 0; + PullingAsyncPipelineExecutor executor(pipeline, has_partial_result_setting); CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread}; Block block; diff --git a/tests/queries/0_stateless/02010_lc_native.python b/tests/queries/0_stateless/02010_lc_native.python index 6c4220855c8..219fdf04472 100755 --- a/tests/queries/0_stateless/02010_lc_native.python +++ b/tests/queries/0_stateless/02010_lc_native.python @@ -1,227 +1,33 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import socket import os -import uuid +import sys -CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "127.0.0.1") -CLICKHOUSE_PORT = int(os.environ.get("CLICKHOUSE_PORT_TCP", "900000")) -CLICKHOUSE_DATABASE = os.environ.get("CLICKHOUSE_DATABASE", "default") -CLIENT_NAME = "simple native protocol" +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, "helpers")) - -def writeVarUInt(x, ba): - for _ in range(0, 9): - byte = x & 0x7F - if x > 0x7F: - byte |= 0x80 - - ba.append(byte) - - x >>= 7 - if x == 0: - return - - -def writeStringBinary(s, ba): - b = bytes(s, "utf-8") - writeVarUInt(len(s), ba) - ba.extend(b) - - -def readStrict(s, size=1): - res = bytearray() - while size: - cur = s.recv(size) - # if not res: - # raise "Socket is closed" - size -= len(cur) - res.extend(cur) - - return res - - -def readUInt(s, size=1): - res = readStrict(s, size) - val = 0 - for i in range(len(res)): - val += res[i] << (i * 8) - return val - - -def readUInt8(s): - return readUInt(s) - - -def readUInt16(s): - return readUInt(s, 2) - - -def readUInt32(s): - return readUInt(s, 4) - - -def readUInt64(s): - return readUInt(s, 8) - - -def readVarUInt(s): - x = 0 - for i in range(9): - byte = readStrict(s)[0] - x |= (byte & 0x7F) << (7 * i) - - if not byte & 0x80: - return x - - return x - - -def readStringBinary(s): - size = readVarUInt(s) - s = readStrict(s, size) - return s.decode("utf-8") - - -def sendHello(s): - ba = bytearray() - writeVarUInt(0, ba) # Hello - writeStringBinary(CLIENT_NAME, ba) - writeVarUInt(21, ba) - writeVarUInt(9, ba) - writeVarUInt(54449, ba) - writeStringBinary("default", ba) # database - writeStringBinary("default", ba) # user - writeStringBinary("", ba) # pwd - s.sendall(ba) - - -def receiveHello(s): - p_type = readVarUInt(s) - assert p_type == 0 # Hello - server_name = readStringBinary(s) - # print("Server name: ", server_name) - server_version_major = readVarUInt(s) - # print("Major: ", server_version_major) - server_version_minor = readVarUInt(s) - # print("Minor: ", server_version_minor) - server_revision = readVarUInt(s) - # print("Revision: ", server_revision) - server_timezone = readStringBinary(s) - # print("Timezone: ", server_timezone) - server_display_name = readStringBinary(s) - # print("Display name: ", server_display_name) - server_version_patch = readVarUInt(s) - # print("Version patch: ", server_version_patch) - - -def serializeClientInfo(ba, query_id): - writeStringBinary("default", ba) # initial_user - writeStringBinary(query_id, ba) # initial_query_id - writeStringBinary("127.0.0.1:9000", ba) # initial_address - ba.extend([0] * 8) # initial_query_start_time_microseconds - ba.append(1) # TCP - writeStringBinary("os_user", ba) # os_user - writeStringBinary("client_hostname", ba) # client_hostname - writeStringBinary(CLIENT_NAME, ba) # client_name - writeVarUInt(21, ba) - writeVarUInt(9, ba) - writeVarUInt(54449, ba) - writeStringBinary("", ba) # quota_key - writeVarUInt(0, ba) # distributed_depth - writeVarUInt(1, ba) # client_version_patch - ba.append(0) # No telemetry - - -def sendQuery(s, query): - ba = bytearray() - query_id = uuid.uuid4().hex - writeVarUInt(1, ba) # query - writeStringBinary(query_id, ba) - - ba.append(1) # INITIAL_QUERY - - # client info - serializeClientInfo(ba, query_id) - - writeStringBinary("", ba) # No settings - writeStringBinary("", ba) # No interserver secret - writeVarUInt(2, ba) # Stage - Complete - ba.append(0) # No compression - writeStringBinary(query, ba) # query, finally - s.sendall(ba) - - -def serializeBlockInfo(ba): - writeVarUInt(1, ba) # 1 - ba.append(0) # is_overflows - writeVarUInt(2, ba) # 2 - writeVarUInt(0, ba) # 0 - ba.extend([0] * 4) # bucket_num - - -def sendEmptyBlock(s): - ba = bytearray() - writeVarUInt(2, ba) # Data - writeStringBinary("", ba) - serializeBlockInfo(ba) - writeVarUInt(0, ba) # rows - writeVarUInt(0, ba) # columns - s.sendall(ba) - - -def assertPacket(packet, expected): - assert packet == expected, packet - - -def readHeader(s): - packet_type = readVarUInt(s) - if packet_type == 2: # Exception - raise RuntimeError(readException(s)) - assertPacket(packet_type, 1) # Data - - readStringBinary(s) # external table name - # BlockInfo - assertPacket(readVarUInt(s), 1) # 1 - assertPacket(readUInt8(s), 0) # is_overflows - assertPacket(readVarUInt(s), 2) # 2 - assertPacket(readUInt32(s), 4294967295) # bucket_num - assertPacket(readVarUInt(s), 0) # 0 - columns = readVarUInt(s) # rows - rows = readVarUInt(s) # columns - print("Rows {} Columns {}".format(rows, columns)) - for _ in range(columns): - col_name = readStringBinary(s) - type_name = readStringBinary(s) - print("Column {} type {}".format(col_name, type_name)) - - -def readException(s): - code = readUInt32(s) - name = readStringBinary(s) - text = readStringBinary(s) - readStringBinary(s) # trace - assertPacket(readUInt8(s), 0) # has_nested - return "code {}: {}".format(code, text.replace("DB::Exception:", "")) +from tcp_client import ( + TCPClient, + CLICKHOUSE_DATABASE, + writeVarUInt, + writeStringBinary, + serializeBlockInfo, + assertPacket, +) def insertValidLowCardinalityRow(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.settimeout(30) - s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT)) - sendHello(s) - receiveHello(s) - sendQuery( - s, + with TCPClient() as client: + client.sendQuery( "insert into {}.tab settings input_format_defaults_for_omitted_fields=0 format TSV".format( CLICKHOUSE_DATABASE ), ) # external tables - sendEmptyBlock(s) - readHeader(s) + client.sendEmptyBlock() + client.readHeader() # Data ba = bytearray() @@ -240,31 +46,25 @@ def insertValidLowCardinalityRow(): writeStringBinary("hello", ba) # key ba.extend([1] + [0] * 7) # num_indexes ba.extend([0] * 8) # UInt64 index (0 for 'hello') - s.sendall(ba) + client.send(ba) # Fin block - sendEmptyBlock(s) + client.sendEmptyBlock() - assertPacket(readVarUInt(s), 5) # End of stream - s.close() + assertPacket(client.readVarUInt(), 5) # End of stream def insertLowCardinalityRowWithIndexOverflow(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.settimeout(30) - s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT)) - sendHello(s) - receiveHello(s) - sendQuery( - s, + with TCPClient() as client: + client.sendQuery( "insert into {}.tab settings input_format_defaults_for_omitted_fields=0 format TSV".format( CLICKHOUSE_DATABASE ), ) # external tables - sendEmptyBlock(s) - readHeader(s) + client.sendEmptyBlock() + client.readHeader() # Data ba = bytearray() @@ -283,29 +83,23 @@ def insertLowCardinalityRowWithIndexOverflow(): writeStringBinary("hello", ba) # key ba.extend([1] + [0] * 7) # num_indexes ba.extend([0] * 7 + [1]) # UInt64 index (overflow) - s.sendall(ba) + client.send(ba) - assertPacket(readVarUInt(s), 2) - print(readException(s)) - s.close() + assertPacket(client.readVarUInt(), 2) # Exception + print(client.readException()) def insertLowCardinalityRowWithIncorrectDictType(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.settimeout(30) - s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT)) - sendHello(s) - receiveHello(s) - sendQuery( - s, + with TCPClient() as client: + client.sendQuery( "insert into {}.tab settings input_format_defaults_for_omitted_fields=0 format TSV".format( CLICKHOUSE_DATABASE ), ) # external tables - sendEmptyBlock(s) - readHeader(s) + client.sendEmptyBlock() + client.readHeader() # Data ba = bytearray() @@ -324,29 +118,23 @@ def insertLowCardinalityRowWithIncorrectDictType(): writeStringBinary("hello", ba) # key ba.extend([1] + [0] * 7) # num_indexes ba.extend([0] * 8) # UInt64 index (overflow) - s.sendall(ba) + client.send(ba) - assertPacket(readVarUInt(s), 2) - print(readException(s)) - s.close() + assertPacket(client.readVarUInt(), 2) # Exception + print(client.readException()) def insertLowCardinalityRowWithIncorrectAdditionalKeys(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.settimeout(30) - s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT)) - sendHello(s) - receiveHello(s) - sendQuery( - s, + with TCPClient() as client: + client.sendQuery( "insert into {}.tab settings input_format_defaults_for_omitted_fields=0 format TSV".format( CLICKHOUSE_DATABASE ), ) # external tables - sendEmptyBlock(s) - readHeader(s) + client.sendEmptyBlock() + client.readHeader() # Data ba = bytearray() @@ -365,11 +153,10 @@ def insertLowCardinalityRowWithIncorrectAdditionalKeys(): writeStringBinary("hello", ba) # key ba.extend([1] + [0] * 7) # num_indexes ba.extend([0] * 8) # UInt64 index (0 for 'hello') - s.sendall(ba) + client.send(ba) - assertPacket(readVarUInt(s), 2) - print(readException(s)) - s.close() + assertPacket(client.readVarUInt(), 2) # Exception + print(client.readException()) def main(): diff --git a/tests/queries/0_stateless/02210_processors_profile_log.reference b/tests/queries/0_stateless/02210_processors_profile_log.reference index 41543d0706a..f480236111f 100644 --- a/tests/queries/0_stateless/02210_processors_profile_log.reference +++ b/tests/queries/0_stateless/02210_processors_profile_log.reference @@ -38,4 +38,5 @@ LazyOutputFormat 1 1 1 0 0 LimitsCheckingTransform 1 1 1 1 1 NullSource 1 0 0 0 0 NullSource 1 0 0 0 0 +NullSource 0 0 0 0 0 SourceFromSingleChunk 1 0 0 1 1 diff --git a/tests/queries/0_stateless/02458_insert_select_progress_tcp.python b/tests/queries/0_stateless/02458_insert_select_progress_tcp.python index 92240e109c1..fdc64a8dba8 100644 --- a/tests/queries/0_stateless/02458_insert_select_progress_tcp.python +++ b/tests/queries/0_stateless/02458_insert_select_progress_tcp.python @@ -1,188 +1,30 @@ #!/usr/bin/env python3 -import socket -import os -import uuid import json +import os +import sys -CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "127.0.0.1") -CLICKHOUSE_PORT = int(os.environ.get("CLICKHOUSE_PORT_TCP", "900000")) -CLICKHOUSE_DATABASE = os.environ.get("CLICKHOUSE_DATABASE", "default") -CLIENT_NAME = "simple native protocol" +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, "helpers")) - -def writeVarUInt(x, ba): - for _ in range(0, 9): - byte = x & 0x7F - if x > 0x7F: - byte |= 0x80 - - ba.append(byte) - - x >>= 7 - if x == 0: - return - - -def writeStringBinary(s, ba): - b = bytes(s, "utf-8") - writeVarUInt(len(s), ba) - ba.extend(b) - - -def readStrict(s, size=1): - res = bytearray() - while size: - cur = s.recv(size) - # if not res: - # raise "Socket is closed" - size -= len(cur) - res.extend(cur) - - return res - - -def readUInt(s, size=1): - res = readStrict(s, size) - val = 0 - for i in range(len(res)): - val += res[i] << (i * 8) - return val - - -def readUInt8(s): - return readUInt(s) - - -def readUInt16(s): - return readUInt(s, 2) - - -def readUInt32(s): - return readUInt(s, 4) - - -def readUInt64(s): - return readUInt(s, 8) - - -def readVarUInt(s): - x = 0 - for i in range(9): - byte = readStrict(s)[0] - x |= (byte & 0x7F) << (7 * i) - - if not byte & 0x80: - return x - - return x - - -def readStringBinary(s): - size = readVarUInt(s) - s = readStrict(s, size) - return s.decode("utf-8") - - -def sendHello(s): - ba = bytearray() - writeVarUInt(0, ba) # Hello - writeStringBinary(CLIENT_NAME, ba) - writeVarUInt(21, ba) - writeVarUInt(9, ba) - writeVarUInt(54449, ba) - writeStringBinary(CLICKHOUSE_DATABASE, ba) # database - writeStringBinary("default", ba) # user - writeStringBinary("", ba) # pwd - s.sendall(ba) - - -def receiveHello(s): - p_type = readVarUInt(s) - assert p_type == 0 # Hello - server_name = readStringBinary(s) - # print("Server name: ", server_name) - server_version_major = readVarUInt(s) - # print("Major: ", server_version_major) - server_version_minor = readVarUInt(s) - # print("Minor: ", server_version_minor) - server_revision = readVarUInt(s) - # print("Revision: ", server_revision) - server_timezone = readStringBinary(s) - # print("Timezone: ", server_timezone) - server_display_name = readStringBinary(s) - # print("Display name: ", server_display_name) - server_version_patch = readVarUInt(s) - # print("Version patch: ", server_version_patch) - - -def serializeClientInfo(ba, query_id): - writeStringBinary("default", ba) # initial_user - writeStringBinary(query_id, ba) # initial_query_id - writeStringBinary("127.0.0.1:9000", ba) # initial_address - ba.extend([0] * 8) # initial_query_start_time_microseconds - ba.append(1) # TCP - writeStringBinary("os_user", ba) # os_user - writeStringBinary("client_hostname", ba) # client_hostname - writeStringBinary(CLIENT_NAME, ba) # client_name - writeVarUInt(21, ba) - writeVarUInt(9, ba) - writeVarUInt(54449, ba) - writeStringBinary("", ba) # quota_key - writeVarUInt(0, ba) # distributed_depth - writeVarUInt(1, ba) # client_version_patch - ba.append(0) # No telemetry - - -def sendQuery(s, query): - ba = bytearray() - query_id = uuid.uuid4().hex - writeVarUInt(1, ba) # query - writeStringBinary(query_id, ba) - - ba.append(1) # INITIAL_QUERY - - # client info - serializeClientInfo(ba, query_id) - - writeStringBinary("", ba) # No settings - writeStringBinary("", ba) # No interserver secret - writeVarUInt(2, ba) # Stage - Complete - ba.append(0) # No compression - writeStringBinary(query, ba) # query, finally - s.sendall(ba) - - -def serializeBlockInfo(ba): - writeVarUInt(1, ba) # 1 - ba.append(0) # is_overflows - writeVarUInt(2, ba) # 2 - writeVarUInt(0, ba) # 0 - ba.extend([0] * 4) # bucket_num - - -def sendEmptyBlock(s): - ba = bytearray() - writeVarUInt(2, ba) # Data - writeStringBinary("", ba) - serializeBlockInfo(ba) - writeVarUInt(0, ba) # rows - writeVarUInt(0, ba) # columns - s.sendall(ba) - - -def assertPacket(packet, expected): - assert packet == expected, packet +from tcp_client import TCPClient class Progress: - def __init__(self): + def __init__( + self, + read_rows=0, + read_bytes=0, + total_rows_to_read=0, + written_rows=0, + written_bytes=0, + ): # NOTE: this is done in ctor to initialize __dict__ - self.read_rows = 0 - self.read_bytes = 0 - self.total_rows_to_read = 0 - self.written_rows = 0 - self.written_bytes = 0 + self.read_rows = read_rows + self.read_bytes = read_bytes + self.total_rows_to_read = total_rows_to_read + self.written_rows = written_rows + self.written_bytes = written_bytes def __str__(self): return json.dumps(self.__dict__) @@ -195,13 +37,6 @@ class Progress: self.written_bytes += b.written_bytes return self - def readPacket(self, s): - self.read_rows += readVarUInt(s) - self.read_bytes += readVarUInt(s) - self.total_rows_to_read += readVarUInt(s) - self.written_rows += readVarUInt(s) - self.written_bytes += readVarUInt(s) - def __bool__(self): return ( self.read_rows > 0 @@ -212,52 +47,25 @@ class Progress: ) -def readProgress(s): - packet_type = readVarUInt(s) - if packet_type == 2: # Exception - raise RuntimeError(readException(s)) - - if packet_type == 5: # End stream - return None - - assertPacket(packet_type, 3) # Progress - - progress = Progress() - progress.readPacket(s) - return progress - - -def readException(s): - code = readUInt32(s) - name = readStringBinary(s) - text = readStringBinary(s) - readStringBinary(s) # trace - assertPacket(readUInt8(s), 0) # has_nested - return "code {}: {}".format(code, text.replace("DB::Exception:", "")) - - def main(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.settimeout(30) - s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT)) - sendHello(s) - receiveHello(s) + with TCPClient() as client: # For 1 second sleep and 1000ms of interactive_delay we definitelly should have non zero progress packet. # NOTE: interactive_delay=0 cannot be used since in this case CompletedPipelineExecutor will not call cancelled callback. - sendQuery( - s, + client.sendQuery( "insert into function null('_ Int') select sleep(1) from numbers(2) settings max_block_size=1, interactive_delay=1000", ) # external tables - sendEmptyBlock(s) + client.sendEmptyBlock() summary_progress = Progress() non_empty_progress_packets = 0 while True: - progress = readProgress(s) - if progress is None: + progress_info = client.readProgress() + if progress_info is None: break + + progress = Progress(*progress_info) summary_progress += progress if progress: non_empty_progress_packets += 1 @@ -268,8 +76,6 @@ def main(): # - 1 or 2 for each SELECT block assert non_empty_progress_packets in (3, 4), f"{non_empty_progress_packets=:}" - s.close() - if __name__ == "__main__": main() diff --git a/tests/queries/0_stateless/02750_settings_alias_tcp_protocol.python b/tests/queries/0_stateless/02750_settings_alias_tcp_protocol.python index 48b27d434ec..1736807410f 100644 --- a/tests/queries/0_stateless/02750_settings_alias_tcp_protocol.python +++ b/tests/queries/0_stateless/02750_settings_alias_tcp_protocol.python @@ -1,217 +1,23 @@ #!/usr/bin/env python3 -import socket + import os -import uuid -import json +import sys -CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "127.0.0.1") -CLICKHOUSE_PORT = int(os.environ.get("CLICKHOUSE_PORT_TCP", "900000")) -CLICKHOUSE_DATABASE = os.environ.get("CLICKHOUSE_DATABASE", "default") -CLIENT_NAME = "simple native protocol" +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, "helpers")) - -def writeVarUInt(x, ba): - for _ in range(0, 9): - byte = x & 0x7F - if x > 0x7F: - byte |= 0x80 - - ba.append(byte) - - x >>= 7 - if x == 0: - return - - -def writeStringBinary(s, ba): - b = bytes(s, "utf-8") - writeVarUInt(len(s), ba) - ba.extend(b) - - -def readStrict(s, size=1): - res = bytearray() - while size: - cur = s.recv(size) - # if not res: - # raise "Socket is closed" - size -= len(cur) - res.extend(cur) - - return res - - -def readUInt(s, size=1): - res = readStrict(s, size) - val = 0 - for i in range(len(res)): - val += res[i] << (i * 8) - return val - - -def readUInt8(s): - return readUInt(s) - - -def readUInt16(s): - return readUInt(s, 2) - - -def readUInt32(s): - return readUInt(s, 4) - - -def readUInt64(s): - return readUInt(s, 8) - - -def readVarUInt(s): - x = 0 - for i in range(9): - byte = readStrict(s)[0] - x |= (byte & 0x7F) << (7 * i) - - if not byte & 0x80: - return x - - return x - - -def readStringBinary(s): - size = readVarUInt(s) - s = readStrict(s, size) - return s.decode("utf-8") - - -def sendHello(s): - ba = bytearray() - writeVarUInt(0, ba) # Hello - writeStringBinary(CLIENT_NAME, ba) - writeVarUInt(21, ba) - writeVarUInt(9, ba) - writeVarUInt(54449, ba) - writeStringBinary(CLICKHOUSE_DATABASE, ba) # database - writeStringBinary("default", ba) # user - writeStringBinary("", ba) # pwd - s.sendall(ba) - - -def receiveHello(s): - p_type = readVarUInt(s) - assert p_type == 0 # Hello - _server_name = readStringBinary(s) - _server_version_major = readVarUInt(s) - _server_version_minor = readVarUInt(s) - _server_revision = readVarUInt(s) - _server_timezone = readStringBinary(s) - _server_display_name = readStringBinary(s) - _server_version_patch = readVarUInt(s) - - -def serializeClientInfo(ba, query_id): - writeStringBinary("default", ba) # initial_user - writeStringBinary(query_id, ba) # initial_query_id - writeStringBinary("127.0.0.1:9000", ba) # initial_address - ba.extend([0] * 8) # initial_query_start_time_microseconds - ba.append(1) # TCP - writeStringBinary("os_user", ba) # os_user - writeStringBinary("client_hostname", ba) # client_hostname - writeStringBinary(CLIENT_NAME, ba) # client_name - writeVarUInt(21, ba) - writeVarUInt(9, ba) - writeVarUInt(54449, ba) - writeStringBinary("", ba) # quota_key - writeVarUInt(0, ba) # distributed_depth - writeVarUInt(1, ba) # client_version_patch - ba.append(0) # No telemetry - - -def sendQuery(s, query, settings): - ba = bytearray() - query_id = uuid.uuid4().hex - writeVarUInt(1, ba) # query - writeStringBinary(query_id, ba) - - ba.append(1) # INITIAL_QUERY - - # client info - serializeClientInfo(ba, query_id) - - # Settings - for key, value in settings.items(): - writeStringBinary(key, ba) - writeVarUInt(1, ba) # is_important - writeStringBinary(str(value), ba) - writeStringBinary("", ba) # End of settings - - writeStringBinary("", ba) # No interserver secret - writeVarUInt(2, ba) # Stage - Complete - ba.append(0) # No compression - writeStringBinary(query, ba) # query, finally - s.sendall(ba) - - -def serializeBlockInfo(ba): - writeVarUInt(1, ba) # 1 - ba.append(0) # is_overflows - writeVarUInt(2, ba) # 2 - writeVarUInt(0, ba) # 0 - ba.extend([0] * 4) # bucket_num - - -def sendEmptyBlock(s): - ba = bytearray() - writeVarUInt(2, ba) # Data - writeStringBinary("", ba) - serializeBlockInfo(ba) - writeVarUInt(0, ba) # rows - writeVarUInt(0, ba) # columns - s.sendall(ba) - - -def assertPacket(packet, expected): - assert packet == expected, "Got: {}, expected: {}".format(packet, expected) - - -def readResponse(s): - packet_type = readVarUInt(s) - if packet_type == 2: # Exception - raise RuntimeError(readException(s)) - - if packet_type == 1: # Data - return None - if packet_type == 3: # Progress - return None - if packet_type == 5: # End stream - return None - - raise RuntimeError("Unexpected packet: {}".format(packet_type)) - - -def readException(s): - code = readUInt32(s) - _name = readStringBinary(s) - text = readStringBinary(s) - readStringBinary(s) # trace - assertPacket(readUInt8(s), 0) # has_nested - return "code {}: {}".format(code, text.replace("DB::Exception:", "")) +from tcp_client import TCPClient def main(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.settimeout(30) - s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT)) - sendHello(s) - receiveHello(s) - sendQuery(s, "select 1", {"replication_alter_partitions_sync": 1}) + with TCPClient() as client: + client.sendQuery("select 1", {"replication_alter_partitions_sync": 1}) # external tables - sendEmptyBlock(s) + client.sendEmptyBlock() - while readResponse(s) is not None: + while client.readResponse() is not None: pass - - s.close() print("OK") diff --git a/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.python b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.python new file mode 100755 index 00000000000..b3d04d263dc --- /dev/null +++ b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.python @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + + +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, "helpers")) + +from tcp_client import TCPClient + + +def run_query_without_errors(query, support_partial_result): + with TCPClient() as client: + client.sendQuery(query) + + # external tables + client.sendEmptyBlock() + client.readHeader() + + # Partial result + partial_result = client.readDataWithoutProgress()[0] + if support_partial_result: + assert ( + len(partial_result.value) > 0 + ), "Expected at least one block with a non-empty partial result before getting the full result" + + while True: + assert all( + a >= b + for a, b in zip(partial_result.value, partial_result.value[1:]) + ), "Partial result always should be sorted for this test" + + new_partial_result = client.readDataWithoutProgress( + need_print_info=False + )[0] + if len(new_partial_result.value) == 0: + break + + data_size = len(partial_result.value) + assert all( + partial_result.value[i] <= new_partial_result.value[i] + for i in range(data_size) + ), f"New partial result values should always be greater then old one because a new block contains more information about the full data. New result {new_partial_result}. Previous result {partial_result}" + + partial_result = new_partial_result + else: + block_rows = len(partial_result.value) + assert ( + block_rows == 0 + ), f"Expected only empty partial result block before getting the full result, but block has {block_rows} rows" + + # Full result + full_result = client.readDataWithoutProgress()[0] + + data_size = len(partial_result.value) + assert all( + partial_result.value[i] <= full_result.value[i] for i in range(data_size) + ), f"Full result values should always be greater then partial result values. Full result {full_result}. Partial result {partial_result}" + + for result in full_result.value: + print(result) + + +def main(): + # Request with partial result limit less then full limit + run_query_without_errors( + "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 5 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 3", + support_partial_result=True, + ) + + # Request with partial result limit greater then full limit + run_query_without_errors( + "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", + support_partial_result=True, + ) + + # Request with OFFSET + run_query_without_errors( + "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 1 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", + support_partial_result=True, + ) + + # Request with OFFSET greater then partial result limit (partial result pipeline use blocks with less then OFFSET, so there will be no elements in block after LimitPartialResultTransform) + run_query_without_errors( + "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 15 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", + support_partial_result=False, + ) + + +if __name__ == "__main__": + main() diff --git a/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.reference b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.reference new file mode 100644 index 00000000000..211a193940a --- /dev/null +++ b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.reference @@ -0,0 +1,38 @@ +Rows 0 Columns 1 +Column number type UInt64 +Rows 3 Columns 1 +Column number type UInt64 +Rows 5 Columns 1 +Column number type UInt64 +5000000 +4999999 +4999998 +4999997 +4999996 +Rows 0 Columns 1 +Column number type UInt64 +Rows 3 Columns 1 +Column number type UInt64 +Rows 3 Columns 1 +Column number type UInt64 +5000000 +4999999 +4999998 +Rows 0 Columns 1 +Column number type UInt64 +Rows 3 Columns 1 +Column number type UInt64 +Rows 3 Columns 1 +Column number type UInt64 +4999999 +4999998 +4999997 +Rows 0 Columns 1 +Column number type UInt64 +Rows 0 Columns 1 +Column number type UInt64 +Rows 3 Columns 1 +Column number type UInt64 +4999985 +4999984 +4999983 diff --git a/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.sh b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.sh new file mode 100755 index 00000000000..1ed15197dbf --- /dev/null +++ b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +# We should have correct env vars from shell_config.sh to run this test +python3 "$CURDIR"/02833_partial_sorting_result_during_query_execution.python diff --git a/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.python b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.python new file mode 100644 index 00000000000..4d869b05580 --- /dev/null +++ b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.python @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + + +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, "helpers")) + +from tcp_client import TCPClient + + +def get_keys(results): + return [key for key, _ in results] + + +def check_new_result(new_results, old_results, invariants, rows_limit): + if rows_limit is not None: + assert ( + len(new_results[0].value) <= rows_limit + ), f"Result should have no more then {rows_limit} rows. But it has {len(new_results[0].value)} rows" + + for new_result, old_result in zip(new_results, old_results): + assert ( + new_result.key == old_result.key + ), f"Keys in blocks should be in the same order. Full results keys {get_keys(full_results)}. Partial results keys {get_keys(partial_results)}" + + key = new_result.key + if key in invariants: + new_value = new_result.value + old_value = old_result.value + assert invariants[key]( + old_value, new_value + ), f"Problem with the invariant between new and old result for key: {key}. New value {new_value}. Old value {old_value}" + + +def run_query_without_errors( + query, support_partial_result, invariants=None, rows_limit=None +): + if invariants is None: + invariants = {} + + with TCPClient() as client: + client.sendQuery(query) + + # external tables + client.sendEmptyBlock() + client.readHeader() + + # Partial result + partial_results = client.readDataWithoutProgress() + if support_partial_result: + assert ( + len(partial_results) > 0 and len(partial_results[0].value) > 0 + ), "Expected at least one block with a non-empty partial result before getting the full result" + while True: + new_partial_results = client.readDataWithoutProgress( + need_print_info=False + ) + if len(new_partial_results[0].value) == 0: + break + + check_new_result( + new_partial_results, partial_results, invariants, rows_limit + ) + partial_results = new_partial_results + else: + block_rows = len(partial_results[0].value) + assert ( + block_rows == 0 + ), f"Expected only empty partial result block before getting the full result, but block has {block_rows} rows" + + # Full result + full_results = client.readDataWithoutProgress() + if support_partial_result: + check_new_result(full_results, partial_results, invariants, rows_limit) + + for data in full_results: + if isinstance(data.value[0], int): + print(data.key, data.value) + + +def supported_scenarios_without_key(): + # Simple aggregation query + query = "select median(number), stddevSamp(number), stddevPop(number), max(number), min(number), any(number), count(number), avg(number), sum(number) from numbers_mt(1e7+1) settings max_threads = 1, partial_result_update_duration_ms = 1" + invariants = { + "median(number)": lambda old_value, new_value: old_value <= new_value, + "max(number)": lambda old_value, new_value: old_value <= new_value, + "min(number)": lambda old_value, new_value: old_value >= new_value, + "count(number)": lambda old_value, new_value: old_value <= new_value, + "avg(number)": lambda old_value, new_value: old_value <= new_value, + "sum(number)": lambda old_value, new_value: old_value <= new_value, + } + run_query_without_errors( + query, support_partial_result=True, invariants=invariants, rows_limit=1 + ) + + # Aggregation query with a nested ORDER BY subquery + query = "select median(number), stddevSamp(number), stddevPop(number), max(number), min(number), any(number), count(number), avg(number), sum(number) FROM (SELECT number FROM numbers_mt(1e7) ORDER BY -number LIMIT 3) settings max_threads = 1, partial_result_update_duration_ms=1" + + # Aggregation receives small partial result blocks from ORDER BY which always sends blocks with bigger values + invariants["min(number)"] = lambda old_value, new_value: old_value <= new_value + run_query_without_errors( + query, support_partial_result=True, invariants=invariants, rows_limit=1 + ) + + +def unsupported_scenarios(): + # Currently aggregator for partial result supports only single thread aggregation without key + # Update test when multithreading or aggregation with GROUP BY will be supported for partial result updates + multithread_query = "select sum(number) from numbers_mt(1e7+1) settings max_threads = 2, partial_result_update_duration_ms = 100" + run_query_without_errors(multithread_query, support_partial_result=False) + + group_with_key_query = "select mod2, sum(number) from numbers_mt(1e7+1) group by number % 2 as mod2 settings max_threads = 1, partial_result_update_duration_ms = 100" + run_query_without_errors(group_with_key_query, support_partial_result=False) + + +def main(): + supported_scenarios_without_key() + unsupported_scenarios() + + +if __name__ == "__main__": + main() diff --git a/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.reference b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.reference new file mode 100644 index 00000000000..a813b18f24f --- /dev/null +++ b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.reference @@ -0,0 +1,88 @@ +Rows 0 Columns 9 +Column median(number) type Float64 +Column stddevSamp(number) type Float64 +Column stddevPop(number) type Float64 +Column max(number) type UInt64 +Column min(number) type UInt64 +Column any(number) type UInt64 +Column count(number) type UInt64 +Column avg(number) type Float64 +Column sum(number) type UInt64 +Rows 1 Columns 9 +Column median(number) type Float64 +Column stddevSamp(number) type Float64 +Column stddevPop(number) type Float64 +Column max(number) type UInt64 +Column min(number) type UInt64 +Column any(number) type UInt64 +Column count(number) type UInt64 +Column avg(number) type Float64 +Column sum(number) type UInt64 +Rows 1 Columns 9 +Column median(number) type Float64 +Column stddevSamp(number) type Float64 +Column stddevPop(number) type Float64 +Column max(number) type UInt64 +Column min(number) type UInt64 +Column any(number) type UInt64 +Column count(number) type UInt64 +Column avg(number) type Float64 +Column sum(number) type UInt64 +max(number) [10000000] +min(number) [0] +any(number) [0] +count(number) [10000001] +sum(number) [50000005000000] +Rows 0 Columns 9 +Column median(number) type Float64 +Column stddevSamp(number) type Float64 +Column stddevPop(number) type Float64 +Column max(number) type UInt64 +Column min(number) type UInt64 +Column any(number) type UInt64 +Column count(number) type UInt64 +Column avg(number) type Float64 +Column sum(number) type UInt64 +Rows 1 Columns 9 +Column median(number) type Float64 +Column stddevSamp(number) type Float64 +Column stddevPop(number) type Float64 +Column max(number) type UInt64 +Column min(number) type UInt64 +Column any(number) type UInt64 +Column count(number) type UInt64 +Column avg(number) type Float64 +Column sum(number) type UInt64 +Rows 1 Columns 9 +Column median(number) type Float64 +Column stddevSamp(number) type Float64 +Column stddevPop(number) type Float64 +Column max(number) type UInt64 +Column min(number) type UInt64 +Column any(number) type UInt64 +Column count(number) type UInt64 +Column avg(number) type Float64 +Column sum(number) type UInt64 +max(number) [9999999] +min(number) [9999997] +any(number) [9999999] +count(number) [3] +sum(number) [29999994] +Rows 0 Columns 1 +Column sum(number) type UInt64 +Rows 0 Columns 1 +Column sum(number) type UInt64 +Rows 1 Columns 1 +Column sum(number) type UInt64 +sum(number) [50000005000000] +Rows 0 Columns 2 +Column mod2 type UInt8 +Column sum(number) type UInt64 +Rows 0 Columns 2 +Column mod2 type UInt8 +Column sum(number) type UInt64 +Rows 2 Columns 2 +Column mod2 type UInt8 +Column sum(number) type UInt64 +mod2 [0, 1] +sum(number) [25000005000000, 25000000000000] diff --git a/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.sh b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.sh new file mode 100755 index 00000000000..e70a3c53ec4 --- /dev/null +++ b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +# We should have correct env vars from shell_config.sh to run this test +python3 "$CURDIR"/02834_partial_aggregating_result_during_query_execution.python diff --git a/tests/queries/0_stateless/helpers/tcp_client.py b/tests/queries/0_stateless/helpers/tcp_client.py new file mode 100644 index 00000000000..fdc4ab28e04 --- /dev/null +++ b/tests/queries/0_stateless/helpers/tcp_client.py @@ -0,0 +1,313 @@ +import socket +import os +import uuid +import struct + +CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "127.0.0.1") +CLICKHOUSE_PORT = int(os.environ.get("CLICKHOUSE_PORT_TCP", "900000")) +CLICKHOUSE_DATABASE = os.environ.get("CLICKHOUSE_DATABASE", "default") +CLIENT_NAME = "simple native protocol" + + +def writeVarUInt(x, ba): + for _ in range(0, 9): + byte = x & 0x7F + if x > 0x7F: + byte |= 0x80 + + ba.append(byte) + + x >>= 7 + if x == 0: + return + + +def writeStringBinary(s, ba): + b = bytes(s, "utf-8") + writeVarUInt(len(s), ba) + ba.extend(b) + + +def serializeClientInfo(ba, query_id): + writeStringBinary("default", ba) # initial_user + writeStringBinary(query_id, ba) # initial_query_id + writeStringBinary("127.0.0.1:9000", ba) # initial_address + ba.extend([0] * 8) # initial_query_start_time_microseconds + ba.append(1) # TCP + writeStringBinary("os_user", ba) # os_user + writeStringBinary("client_hostname", ba) # client_hostname + writeStringBinary(CLIENT_NAME, ba) # client_name + writeVarUInt(21, ba) + writeVarUInt(9, ba) + writeVarUInt(54449, ba) + writeStringBinary("", ba) # quota_key + writeVarUInt(0, ba) # distributed_depth + writeVarUInt(1, ba) # client_version_patch + ba.append(0) # No telemetry + + +def serializeBlockInfo(ba): + writeVarUInt(1, ba) # 1 + ba.append(0) # is_overflows + writeVarUInt(2, ba) # 2 + writeVarUInt(0, ba) # 0 + ba.extend([0] * 4) # bucket_num + + +def assertPacket(packet, expected): + assert packet == expected, "Got: {}, expected: {}".format(packet, expected) + + +class Data(object): + def __init__(self, key, value): + self.key = key + self.value = value + + +class TCPClient(object): + def __init__(self, timeout=30): + self.timeout = timeout + self.socket = None + + def __enter__(self): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.settimeout(self.timeout) + self.socket.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT)) + + self.sendHello() + self.receiveHello() + + return self + + def __exit__(self, exc_type, exc_value, traceback): + if self.socket: + self.socket.close() + + def readStrict(self, size=1): + res = bytearray() + while size: + cur = self.socket.recv(size) + # if not res: + # raise "Socket is closed" + size -= len(cur) + res.extend(cur) + + return res + + def readUInt(self, size=1): + res = self.readStrict(size) + val = 0 + for i in range(len(res)): + val += res[i] << (i * 8) + return val + + def readUInt8(self): + return self.readUInt() + + def readUInt16(self): + return self.readUInt(2) + + def readUInt32(self): + return self.readUInt(4) + + def readUInt64(self): + return self.readUInt(8) + + def readFloat16(self): + return struct.unpack("e", self.readStrict(2)) + + def readFloat32(self): + return struct.unpack("f", self.readStrict(4)) + + def readFloat64(self): + return struct.unpack("d", self.readStrict(8)) + + def readVarUInt(self): + x = 0 + for i in range(9): + byte = self.readStrict()[0] + x |= (byte & 0x7F) << (7 * i) + + if not byte & 0x80: + return x + + return x + + def readStringBinary(self): + size = self.readVarUInt() + s = self.readStrict(size) + return s.decode("utf-8") + + def send(self, byte_array): + self.socket.sendall(byte_array) + + def sendHello(self): + ba = bytearray() + writeVarUInt(0, ba) # Hello + writeStringBinary(CLIENT_NAME, ba) + writeVarUInt(21, ba) + writeVarUInt(9, ba) + writeVarUInt(54449, ba) + writeStringBinary(CLICKHOUSE_DATABASE, ba) # database + writeStringBinary("default", ba) # user + writeStringBinary("", ba) # pwd + self.send(ba) + + def receiveHello(self): + p_type = self.readVarUInt() + assert p_type == 0 # Hello + _server_name = self.readStringBinary() + _server_version_major = self.readVarUInt() + _server_version_minor = self.readVarUInt() + _server_revision = self.readVarUInt() + _server_timezone = self.readStringBinary() + _server_display_name = self.readStringBinary() + _server_version_patch = self.readVarUInt() + + def sendQuery(self, query, settings=None): + if settings == None: + settings = {} # No settings + + ba = bytearray() + query_id = uuid.uuid4().hex + writeVarUInt(1, ba) # query + writeStringBinary(query_id, ba) + + ba.append(1) # INITIAL_QUERY + + # client info + serializeClientInfo(ba, query_id) + + # Settings + for key, value in settings.items(): + writeStringBinary(key, ba) + writeVarUInt(1, ba) # is_important + writeStringBinary(str(value), ba) + writeStringBinary("", ba) # End of settings + + writeStringBinary("", ba) # No interserver secret + writeVarUInt(2, ba) # Stage - Complete + ba.append(0) # No compression + writeStringBinary(query, ba) # query, finally + self.send(ba) + + def sendEmptyBlock(self): + ba = bytearray() + writeVarUInt(2, ba) # Data + writeStringBinary("", ba) + serializeBlockInfo(ba) + writeVarUInt(0, ba) # rows + writeVarUInt(0, ba) # columns + self.send(ba) + + def readException(self): + code = self.readUInt32() + _name = self.readStringBinary() + text = self.readStringBinary() + self.readStringBinary() # trace + assertPacket(self.readUInt8(), 0) # has_nested + return "code {}: {}".format(code, text.replace("DB::Exception:", "")) + + def readPacketType(self): + packet_type = self.readVarUInt() + if packet_type == 2: # Exception + raise RuntimeError(self.readException()) + + return packet_type + + def readResponse(self): + packet_type = self.readPacketType() + if packet_type == 1: # Data + return None + if packet_type == 3: # Progress + return None + if packet_type == 5: # End stream + return None + + raise RuntimeError("Unexpected packet: {}".format(packet_type)) + + def readProgressData(self): + read_rows = self.readVarUInt() + read_bytes = self.readVarUInt() + total_rows_to_read = self.readVarUInt() + written_rows = self.readVarUInt() + written_bytes = self.readVarUInt() + + return read_rows, read_bytes, total_rows_to_read, written_rows, written_bytes + + def readProgress(self): + packet_type = self.readPacketType() + if packet_type == 5: # End stream + return None + assertPacket(packet_type, 3) # Progress + return self.readProgressData() + + def readHeaderInfo(self): + self.readStringBinary() # external table name + # BlockInfo + assertPacket(self.readVarUInt(), 1) # field number 1 + assertPacket(self.readUInt8(), 0) # is_overflows + assertPacket(self.readVarUInt(), 2) # field number 2 + assertPacket(self.readUInt32(), 4294967295) # bucket_num + assertPacket(self.readVarUInt(), 0) # 0 + columns = self.readVarUInt() # rows + rows = self.readVarUInt() # columns + + return columns, rows + + def readHeader(self): + packet_type = self.readPacketType() + assertPacket(packet_type, 1) # Data + + columns, rows = self.readHeaderInfo() + print("Rows {} Columns {}".format(rows, columns)) + for _ in range(columns): + col_name = self.readStringBinary() + type_name = self.readStringBinary() + print("Column {} type {}".format(col_name, type_name)) + + def readRow(self, row_type, rows): + supported_row_types = { + "UInt8": self.readUInt8, + "UInt16": self.readUInt16, + "UInt32": self.readUInt32, + "UInt64": self.readUInt64, + "Float16": self.readFloat16, + "Float32": self.readFloat32, + "Float64": self.readFloat64, + } + if row_type in supported_row_types: + read_type = supported_row_types[row_type] + row = [read_type() for _ in range(rows)] + return row + else: + raise RuntimeError( + "Current python version of tcp client doesn't support the following type of row: {}".format( + row_type + ) + ) + + def readDataWithoutProgress(self, need_print_info=True): + packet_type = self.readPacketType() + while packet_type == 3: # Progress + self.readProgressData() + packet_type = self.readPacketType() + + if packet_type == 5: # End stream + return None + assertPacket(packet_type, 1) # Data + + columns, rows = self.readHeaderInfo() + data = [] + if need_print_info: + print("Rows {} Columns {}".format(rows, columns)) + + for _ in range(columns): + col_name = self.readStringBinary() + type_name = self.readStringBinary() + if need_print_info: + print("Column {} type {}".format(col_name, type_name)) + + data.append(Data(col_name, self.readRow(type_name, rows))) + + return data