diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index c5e40186962..8aa9fc42de2 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -271,9 +271,16 @@ protected: enum class PartialResultMode: UInt8 { - NotInit, /// Query doesn't show partial result before the first block with 0 rows. - Active, /// Query shows partial result after the first and before the second block with 0 rows. - Inactive /// Query doesn't show partial result at all. + /// Query doesn't show partial result before the first block with 0 rows. + /// The first block with 0 rows initializes the output table format using its header. + NotInit, + + /// Query shows partial result after the first and before the second block with 0 rows. + /// The second block with 0 rows indicates that that receiving blocks with partial result has been completed and next blocks will be with the full result. + Active, + + /// Query doesn't show partial result at all. + Inactive, }; PartialResultMode partial_result_mode = PartialResultMode::Inactive; diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 08292034e01..dc69831533d 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -307,8 +307,8 @@ class IColumn; \ M(Bool, partial_result_on_first_cancel, false, "Allows query to return a partial result after cancel.", 0) \ \ - M(Milliseconds, partial_result_update_duration_ms, 0, "Duration of time in milliseconds between real-time updates of result table sent to the client during query execution.", 0) \ - M(UInt64, max_rows_in_partial_result, 10, "Max rows displayed to user after each real-time update of output table during query execution.", 0) \ + M(Milliseconds, partial_result_update_duration_ms, 0, "Interval (in milliseconds) for sending updates with partial data about the result table to the client (in interactive mode) during query execution. Setting to 0 disables partial results. Only supported for single-threaded GROUP BY without key, ORDER BY, LIMIT and OFFSET.", 0) \ + M(UInt64, max_rows_in_partial_result, 10, "Maximum rows to show in the partial result after every real-time update while the query runs (use partial result limit + OFFSET as a value in case of OFFSET in the query).", 0) \ /** Settings for testing hedged requests */ \ M(Milliseconds, sleep_in_send_tables_status_ms, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \ M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \ diff --git a/src/Processors/Formats/IOutputFormat.cpp b/src/Processors/Formats/IOutputFormat.cpp index 6104037a72b..e691e32a7bc 100644 --- a/src/Processors/Formats/IOutputFormat.cpp +++ b/src/Processors/Formats/IOutputFormat.cpp @@ -198,22 +198,4 @@ void IOutputFormat::finalize() finalized = true; } -void IOutputFormat::clearLastLines(size_t lines_number) -{ - /// http://en.wikipedia.org/wiki/ANSI_escape_code - #define MOVE_TO_PREV_LINE "\033[A" - #define CLEAR_TO_END_OF_LINE "\033[K" - - static const char * clear_prev_line = MOVE_TO_PREV_LINE \ - CLEAR_TO_END_OF_LINE; - - /// Move cursor to the beginning of line - writeCString("\r", out); - - for (size_t line = 0; line < lines_number; ++line) - { - writeCString(clear_prev_line, out); - } -} - } diff --git a/src/Processors/Formats/IOutputFormat.h b/src/Processors/Formats/IOutputFormat.h index fc85b0d063b..470d24e9a22 100644 --- a/src/Processors/Formats/IOutputFormat.h +++ b/src/Processors/Formats/IOutputFormat.h @@ -103,8 +103,6 @@ public: } } - void clearLastLines(size_t lines_number); - protected: friend class ParallelFormattingOutputFormat; diff --git a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp index 6f8461232b1..6fa891297f6 100644 --- a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp @@ -390,9 +390,28 @@ void PrettyBlockOutputFormat::consumeExtremes(Chunk chunk) write(std::move(chunk), PortKind::Extremes); } +void PrettyBlockOutputFormat::clearLastLines(size_t lines_number) +{ + /// http://en.wikipedia.org/wiki/ANSI_escape_code + #define MOVE_TO_PREV_LINE "\033[A" + #define CLEAR_TO_END_OF_LINE "\033[K" + + static const char * clear_prev_line = MOVE_TO_PREV_LINE \ + CLEAR_TO_END_OF_LINE; + + /// Move cursor to the beginning of line + writeCString("\r", out); + + for (size_t line = 0; line < lines_number; ++line) + { + writeCString(clear_prev_line, out); + } +} + void PrettyBlockOutputFormat::consumePartialResult(Chunk chunk) { if (prev_partial_block_rows > 0) + /// number of rows + header line + footer line clearLastLines(prev_partial_block_rows + 2); prev_partial_block_rows = chunk.getNumRows(); diff --git a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h index a916a66a620..92466dce3ff 100644 --- a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h +++ b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h @@ -27,6 +27,8 @@ protected: void consume(Chunk) override; void consumeTotals(Chunk) override; void consumeExtremes(Chunk) override; + + void clearLastLines(size_t lines_number); void consumePartialResult(Chunk) override; size_t total_rows = 0; diff --git a/src/Processors/IProcessor.h b/src/Processors/IProcessor.h index a156da7bad2..51a0bb1c121 100644 --- a/src/Processors/IProcessor.h +++ b/src/Processors/IProcessor.h @@ -237,8 +237,21 @@ public: throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'expandPipeline' is not implemented for {} processor", getName()); } + enum class PartialResultStatus + { + /// Processor currently doesn't support work with the partial result pipeline. + NotSupported, + + /// Processor can be skipped in the partial result pipeline. + SkipSupported, + + /// Processor creates a light-weight copy of itself in the partial result pipeline. + /// The copy can create snapshots of the original processor or transform small blocks of data in the same way as the original processor + FullSupported, + }; + virtual bool isPartialResultProcessor() const { return false; } - virtual bool supportPartialResultProcessor() const { return false; } + virtual PartialResultStatus getPartialResultProcessorSupportStatus() const { return PartialResultStatus::NotSupported; } /// In case if query was cancelled executor will wait till all processors finish their jobs. /// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o). diff --git a/src/Processors/LimitTransform.h b/src/Processors/LimitTransform.h index ebdcbe49670..eac5f9e8d6d 100644 --- a/src/Processors/LimitTransform.h +++ b/src/Processors/LimitTransform.h @@ -76,7 +76,7 @@ public: void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); } void setInputPortHasCounter(size_t pos) { ports_data[pos].input_port_has_counter = true; } - bool supportPartialResultProcessor() const override { return true; } + PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; } }; } diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index a0588455690..824c29cb158 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -197,7 +197,7 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline( else stack.push(Frame{.node = frame.node->children[next_child]}); - if (last_pipeline && has_partial_result_setting) + if (has_partial_result_setting && last_pipeline && !last_pipeline->isPartialResultActive()) last_pipeline->activatePartialResult(build_pipeline_settings.partial_result_limit, build_pipeline_settings.partial_result_duration_ms); } diff --git a/src/Processors/Transforms/AggregatingPartialResultTransform.cpp b/src/Processors/Transforms/AggregatingPartialResultTransform.cpp index fbfb9be572b..cf8ce72e096 100644 --- a/src/Processors/Transforms/AggregatingPartialResultTransform.cpp +++ b/src/Processors/Transforms/AggregatingPartialResultTransform.cpp @@ -8,33 +8,38 @@ AggregatingPartialResultTransform::AggregatingPartialResultTransform( UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_) : PartialResultTransform(input_header, output_header, partial_result_limit_, partial_result_duration_ms_) , aggregating_transform(std::move(aggregating_transform_)) + , transform_aggregator(input_header, aggregating_transform->params->params) {} +void AggregatingPartialResultTransform::transformPartialResult(Chunk & chunk) +{ + auto & params = aggregating_transform->params->params; + + bool no_more_keys = false; + AggregatedDataVariants variants; + ColumnRawPtrs key_columns(params.keys_size); + Aggregator::AggregateColumns aggregate_columns(params.aggregates_size); + + const UInt64 num_rows = chunk.getNumRows(); + transform_aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys); + + auto transformed_block = transform_aggregator.convertToBlocks(variants, /*final*/ true, /*max_threads*/ 1).front(); + + chunk = convertToChunk(transformed_block); +} + PartialResultTransform::ShaphotResult AggregatingPartialResultTransform::getRealProcessorSnapshot() { std::lock_guard lock(aggregating_transform->snapshot_mutex); - - auto & params = aggregating_transform->params; - /// Currently not supported cases - /// TODO: check that insert results from prepareBlockAndFillWithoutKey return values without changing of the aggregator state - if (params->params.keys_size != 0 /// has at least one key for aggregation - || params->aggregator.hasTemporaryData() /// use external storage for aggregation - || aggregating_transform->many_data->variants.size() > 1) /// use more then one stream for aggregation - return {{}, SnaphotStatus::Stopped}; - if (aggregating_transform->is_generate_initialized) return {{}, SnaphotStatus::Stopped}; if (aggregating_transform->variants.empty()) return {{}, SnaphotStatus::NotReady}; - auto & aggregator = params->aggregator; - - auto prepared_data = aggregator.prepareVariantsToMerge(aggregating_transform->many_data->variants); - AggregatedDataVariantsPtr & first = prepared_data.at(0); - - aggregator.mergeWithoutKeyDataImpl(prepared_data); - auto block = aggregator.prepareBlockAndFillWithoutKeySnapshot(*first); + auto & snapshot_aggregator = aggregating_transform->params->aggregator; + auto & snapshot_variants = aggregating_transform->many_data->variants; + auto block = snapshot_aggregator.prepareBlockAndFillWithoutKeySnapshot(*snapshot_variants.at(0)); return {convertToChunk(block), SnaphotStatus::Ready}; } diff --git a/src/Processors/Transforms/AggregatingPartialResultTransform.h b/src/Processors/Transforms/AggregatingPartialResultTransform.h index e5e28dc7f7a..f7bac3a5394 100644 --- a/src/Processors/Transforms/AggregatingPartialResultTransform.h +++ b/src/Processors/Transforms/AggregatingPartialResultTransform.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -17,10 +18,12 @@ public: String getName() const override { return "AggregatingPartialResultTransform"; } + void transformPartialResult(Chunk & chunk) override; ShaphotResult getRealProcessorSnapshot() override; private: AggregatingTransformPtr aggregating_transform; + Aggregator transform_aggregator; }; } diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index b4d2785bed2..b0faf0f96d4 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -664,11 +664,13 @@ void AggregatingTransform::consume(Chunk chunk) { auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns()); block = materializeBlock(block); + LOG_DEBUG(log, "AggregatingTransform::consume. Merge Block columns {}", block.dumpNames()); if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys)) is_consume_finished = true; } else { + LOG_DEBUG(log, "AggregatingTransform::consume. Execute Block"); if (!params->aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys)) is_consume_finished = true; } diff --git a/src/Processors/Transforms/AggregatingTransform.h b/src/Processors/Transforms/AggregatingTransform.h index 649b90ae74a..b6f32bde389 100644 --- a/src/Processors/Transforms/AggregatingTransform.h +++ b/src/Processors/Transforms/AggregatingTransform.h @@ -170,7 +170,16 @@ public: void work() override; Processors expandPipeline() override; - bool supportPartialResultProcessor() const override { return true; } + PartialResultStatus getPartialResultProcessorSupportStatus() const override { + /// Currently AggregatingPartialResultTransform support only single-thread aggregation without key. + + /// TODO: check that insert results from aggregator.prepareBlockAndFillWithoutKey return values without + /// changing of the aggregator state when aggregation with keys will be supported in AggregatingPartialResultTransform. + bool is_partial_result_supported = params->params.keys_size == 0 /// Aggregation without key. + && many_data->variants.size() == 1; /// Use only one stream for aggregation. + + return is_partial_result_supported ? PartialResultStatus::FullSupported : PartialResultStatus::NotSupported; + } protected: void consume(Chunk chunk); @@ -217,6 +226,10 @@ private: bool is_consume_started = false; friend class AggregatingPartialResultTransform; + /// The mutex protects variables that are used for creating a snapshot of the current processor. + /// The current implementation of AggregatingPartialResultTransform uses the 'is_generate_initialized' variable to check + /// whether the processor has started sending data through the main pipeline, and the corresponding partial result processor should stop creating snapshots. + /// Additionally, the mutex protects the 'params->aggregator' and 'many_data->variants' variables, which are used to get data from them for a snapshot. std::mutex snapshot_mutex; void initGenerate(); diff --git a/src/Processors/Transforms/ExpressionTransform.h b/src/Processors/Transforms/ExpressionTransform.h index a7b5ef4f51b..8250f25f0f8 100644 --- a/src/Processors/Transforms/ExpressionTransform.h +++ b/src/Processors/Transforms/ExpressionTransform.h @@ -26,7 +26,7 @@ public: static Block transformHeader(Block header, const ActionsDAG & expression); - bool supportPartialResultProcessor() const override { return true; } + PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; } protected: void transform(Chunk & chunk) override; diff --git a/src/Processors/Transforms/LimitPartialResultTransform.cpp b/src/Processors/Transforms/LimitPartialResultTransform.cpp index fdaa95aac6c..c9eaa9dc7dd 100644 --- a/src/Processors/Transforms/LimitPartialResultTransform.cpp +++ b/src/Processors/Transforms/LimitPartialResultTransform.cpp @@ -29,11 +29,11 @@ void LimitPartialResultTransform::transformPartialResult(Chunk & chunk) /// Check if some rows should be removed if (length < num_rows) { - auto columns = chunk.detachColumns(); UInt64 num_columns = chunk.getNumColumns(); + auto columns = chunk.detachColumns(); for (UInt64 i = 0; i < num_columns; ++i) - columns[i] = columns[i]->cut(offset, limit); + columns[i] = columns[i]->cut(offset, length); chunk.setColumns(std::move(columns), length); } diff --git a/src/Processors/Transforms/LimitPartialResultTransform.h b/src/Processors/Transforms/LimitPartialResultTransform.h index 0f5549323c0..3a0116b624d 100644 --- a/src/Processors/Transforms/LimitPartialResultTransform.h +++ b/src/Processors/Transforms/LimitPartialResultTransform.h @@ -23,6 +23,8 @@ public: String getName() const override { return "LimitPartialResultTransform"; } void transformPartialResult(Chunk & chunk) override; + /// LimitsTransform doesn't have a state which can be snapshoted + ShaphotResult getRealProcessorSnapshot() override { return {{}, SnaphotStatus::Stopped}; } private: UInt64 limit; diff --git a/src/Processors/Transforms/LimitsCheckingTransform.cpp b/src/Processors/Transforms/LimitsCheckingTransform.cpp index 487592f9809..0557f3f291e 100644 --- a/src/Processors/Transforms/LimitsCheckingTransform.cpp +++ b/src/Processors/Transforms/LimitsCheckingTransform.cpp @@ -75,10 +75,4 @@ void LimitsCheckingTransform::checkQuota(Chunk & chunk) } } -ProcessorPtr LimitsCheckingTransform::getPartialResultProcessor(const ProcessorPtr & /*current_processor*/, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) -{ - const auto & header = inputs.front().getHeader(); - return std::make_shared(header, partial_result_limit, partial_result_duration_ms); -} - } diff --git a/src/Processors/Transforms/LimitsCheckingTransform.h b/src/Processors/Transforms/LimitsCheckingTransform.h index e8635384129..eabb988dab6 100644 --- a/src/Processors/Transforms/LimitsCheckingTransform.h +++ b/src/Processors/Transforms/LimitsCheckingTransform.h @@ -33,13 +33,11 @@ public: void setQuota(const std::shared_ptr & quota_) { quota = quota_; } - bool supportPartialResultProcessor() const override { return true; } + PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::SkipSupported; } protected: void transform(Chunk & chunk) override; - ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override; - private: StreamLocalLimits limits; diff --git a/src/Processors/Transforms/MergeSortingPartialResultTransform.h b/src/Processors/Transforms/MergeSortingPartialResultTransform.h index 1eaebf9d902..781aa8e1265 100644 --- a/src/Processors/Transforms/MergeSortingPartialResultTransform.h +++ b/src/Processors/Transforms/MergeSortingPartialResultTransform.h @@ -17,6 +17,8 @@ public: String getName() const override { return "MergeSortingPartialResultTransform"; } + /// MergeSortingTransform always receives chunks in a sorted state, so transformation is not needed + void transformPartialResult(Chunk & /*chunk*/) override {} ShaphotResult getRealProcessorSnapshot() override; private: diff --git a/src/Processors/Transforms/MergeSortingTransform.h b/src/Processors/Transforms/MergeSortingTransform.h index e8a1f7a5853..67f098b4362 100644 --- a/src/Processors/Transforms/MergeSortingTransform.h +++ b/src/Processors/Transforms/MergeSortingTransform.h @@ -33,7 +33,7 @@ public: String getName() const override { return "MergeSortingTransform"; } - bool supportPartialResultProcessor() const override { return true; } + PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; } protected: void consume(Chunk chunk) override; @@ -65,6 +65,10 @@ private: ProcessorPtr external_merging_sorted; friend class MergeSortingPartialResultTransform; + /// The mutex protects variables that are used for creating a snapshot of the current processor. + /// The current implementation of MergeSortingPartialResultTransform uses the 'generated_prefix' variable to check + /// whether the processor has started sending data through the main pipeline, and the corresponding partial result processor should stop creating snapshots. + /// Additionally, the mutex protects the 'chunks' variable and all variables in the 'remerge' function, which is used to transition 'chunks' to a sorted state. std::mutex snapshot_mutex; }; diff --git a/src/Processors/Transforms/PartialResultTransform.h b/src/Processors/Transforms/PartialResultTransform.h index 2bcf494400d..4fe87638f38 100644 --- a/src/Processors/Transforms/PartialResultTransform.h +++ b/src/Processors/Transforms/PartialResultTransform.h @@ -5,6 +5,11 @@ namespace DB { +/// Processors of this type are used to construct an auxiliary pipeline with processors corresponding to those in the main pipeline. +/// These processors work in two modes: +/// 1) Creating a snapshot of the corresponding processor from the main pipeline once per partial_result_duration_ms (period in milliseconds), and then sending the snapshot through the partial result pipeline. +/// 2) Transforming small blocks of data in the same way as the original processor and sending the transformed data through the partial result pipeline. +/// All processors of this type rely on the invariant that a new block from the previous processor of the partial result pipeline overwrites information about the previous block of the same previous processor. class PartialResultTransform : public IProcessor { public: @@ -42,8 +47,8 @@ protected: bool finished_getting_snapshots = false; - virtual void transformPartialResult(Chunk & /*chunk*/) {} - virtual ShaphotResult getRealProcessorSnapshot() { return {{}, SnaphotStatus::Stopped}; } + virtual void transformPartialResult(Chunk & /*chunk*/) = 0; + virtual ShaphotResult getRealProcessorSnapshot() = 0; // { return {{}, SnaphotStatus::Stopped}; } private: Stopwatch watch; diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp index 15f6bea70a7..293d152ea65 100644 --- a/src/QueryPipeline/Pipe.cpp +++ b/src/QueryPipeline/Pipe.cpp @@ -407,12 +407,13 @@ void Pipe::addExtremesSource(ProcessorPtr source) void Pipe::activatePartialResult(UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_) { - if (!is_partial_result_active) - partial_result_ports.assign(output_ports.size(), nullptr); + if (is_partial_result_active) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Partial result for Pipe should be initialized only once"); is_partial_result_active = true; partial_result_limit = partial_result_limit_; partial_result_duration_ms = partial_result_duration_ms_; + partial_result_ports.assign(output_ports.size(), nullptr); } static void dropPort(OutputPort *& port, Processors & processors, Processors * collected_processors) @@ -629,12 +630,13 @@ void Pipe::addPartialResultSimpleTransform(const ProcessorPtr & transform, size_ if (isPartialResultActive()) { auto & partial_result_port = partial_result_ports[partial_result_port_id]; + auto partial_result_status = transform->getPartialResultProcessorSupportStatus(); - if (!transform->supportPartialResultProcessor()) - { + if (partial_result_status == IProcessor::PartialResultStatus::NotSupported) dropPort(partial_result_port, *processors, collected_processors); + + if (partial_result_status != IProcessor::PartialResultStatus::FullSupported) return; - } auto partial_result_transform = IProcessor::getPartialResultProcessorPtr(transform, partial_result_limit, partial_result_duration_ms); @@ -651,23 +653,34 @@ void Pipe::addPartialResultTransform(const ProcessorPtr & transform) if (isPartialResultActive()) { size_t new_outputs_size = transform->getOutputs().size(); + auto partial_result_status = transform->getPartialResultProcessorSupportStatus(); - if (!transform->supportPartialResultProcessor()) + if (partial_result_status == IProcessor::PartialResultStatus::SkipSupported && new_outputs_size != partial_result_ports.size()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot skip transform {} in the partial result part of the Pipe because it has {} output ports, but the partial result part expects {} output ports", + transform->getName(), + new_outputs_size, + partial_result_ports.size()); + + if (partial_result_status == IProcessor::PartialResultStatus::NotSupported) { for (auto & partial_result_port : partial_result_ports) dropPort(partial_result_port, *processors, collected_processors); partial_result_ports.assign(new_outputs_size, nullptr); - return; } + if (partial_result_status != IProcessor::PartialResultStatus::FullSupported) + return; + auto partial_result_transform = IProcessor::getPartialResultProcessorPtr(transform, partial_result_limit, partial_result_duration_ms); auto & inputs = partial_result_transform->getInputs(); if (inputs.size() != partial_result_ports.size()) throw Exception( ErrorCodes::LOGICAL_ERROR, - "Cannot add transform {} to Pipe because it has {} input ports, but {} expected", + "Cannot add partial result transform {} to Pipe because it has {} input ports, but {} expected", partial_result_transform->getName(), inputs.size(), partial_result_ports.size()); diff --git a/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.python b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.python index 38a9d82d8b6..1fadddb0871 100755 --- a/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.python +++ b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.python @@ -11,51 +11,69 @@ sys.path.insert(0, os.path.join(CURDIR, "helpers")) from tcp_client import TCPClient -def main(): +def run_query_without_errors(query, support_partial_result): with TCPClient() as client: - client.sendQuery( - "SELECT number FROM numbers_mt(1e7+1) ORDER BY -number LIMIT 15 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 10" - ) + client.sendQuery(query) # external tables client.sendEmptyBlock() client.readHeader() # Partial result - _, partial_result = client.readDataWithoutProgress()[0] - assert ( - len(partial_result) > 0 - ), "Expected at least one block with a non-empty partial result before getting the full result" + partial_result = client.readDataWithoutProgress()[0] + if support_partial_result: + assert ( + len(partial_result.value) > 0 + ), "Expected at least one block with a non-empty partial result before getting the full result" - while True: - assert all( - a >= b for a, b in zip(partial_result, partial_result[1:]) - ), "Partial result always should be sorted for this test" + while True: + assert all( + a >= b for a, b in zip(partial_result.value, partial_result.value[1:]) + ), "Partial result always should be sorted for this test" - _, new_partial_result = client.readDataWithoutProgress( - need_print_info=False - )[0] - if len(new_partial_result) == 0: - break + new_partial_result = client.readDataWithoutProgress( + need_print_info=False + )[0] + if len(new_partial_result.value) == 0: + break - data_size = len(partial_result) - assert all( - partial_result[i] <= new_partial_result[i] for i in range(data_size) - ), f"New partial result values should always be greater then old one because a new block contains more information about the full data. New result {new_partial_result}. Previous result {partial_result}" + data_size = len(partial_result.value) + assert all( + partial_result.value[i] <= new_partial_result.value[i] for i in range(data_size) + ), f"New partial result values should always be greater then old one because a new block contains more information about the full data. New result {new_partial_result}. Previous result {partial_result}" + + partial_result = new_partial_result + else: + block_rows = len(partial_result.value) + assert ( + block_rows == 0 + ), f"Expected only empty partial result block before getting the full result, but block has {block_rows} rows" - partial_result = new_partial_result # Full result - _, full_result = client.readDataWithoutProgress()[0] + full_result = client.readDataWithoutProgress()[0] - data_size = len(partial_result) + data_size = len(partial_result.value) assert all( - partial_result[i] <= full_result[i] for i in range(data_size) + partial_result.value[i] <= full_result.value[i] for i in range(data_size) ), f"Full result values should always be greater then partial result values. Full result {full_result}. Partial result {partial_result}" - for result in full_result: + for result in full_result.value: print(result) +def main(): + # Request with partial result limit less then full limit + run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 5 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 3", True) + + # Request with partial result limit greater then full limit + run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", True) + + # Request with OFFSET + run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 1 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", True) + + # Request with OFFSET greater then partial result limit (partial result pipeline use blocks with less then OFFSET, so there will be no elements in block after LimitPartialResultTransform) + run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 15 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", False) + if __name__ == "__main__": main() diff --git a/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.reference b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.reference index 4d706474b22..211a193940a 100644 --- a/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.reference +++ b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.reference @@ -1,21 +1,38 @@ Rows 0 Columns 1 Column number type UInt64 -Rows 10 Columns 1 +Rows 3 Columns 1 Column number type UInt64 -Rows 15 Columns 1 +Rows 5 Columns 1 Column number type UInt64 -10000000 -9999999 -9999998 -9999997 -9999996 -9999995 -9999994 -9999993 -9999992 -9999991 -9999990 -9999989 -9999988 -9999987 -9999986 +5000000 +4999999 +4999998 +4999997 +4999996 +Rows 0 Columns 1 +Column number type UInt64 +Rows 3 Columns 1 +Column number type UInt64 +Rows 3 Columns 1 +Column number type UInt64 +5000000 +4999999 +4999998 +Rows 0 Columns 1 +Column number type UInt64 +Rows 3 Columns 1 +Column number type UInt64 +Rows 3 Columns 1 +Column number type UInt64 +4999999 +4999998 +4999997 +Rows 0 Columns 1 +Column number type UInt64 +Rows 0 Columns 1 +Column number type UInt64 +Rows 3 Columns 1 +Column number type UInt64 +4999985 +4999984 +4999983 diff --git a/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.python b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.python index 25e9e6b73c9..4306ae577d0 100644 --- a/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.python +++ b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.python @@ -11,11 +11,31 @@ sys.path.insert(0, os.path.join(CURDIR, "helpers")) from tcp_client import TCPClient -def get_keys(result): - return [key for key, _ in rasults] +def get_keys(results): + return [key for key, _ in results] -def run_query_without_errors(query, support_partial_result, invariants=None): +def check_new_result(new_results, old_results, invariants, rows_limit): + if rows_limit is not None: + assert ( + len(new_results[0].value) <= rows_limit + ), f"Result should have no more then {rows_limit} rows. But it has {len(new_results[0].value)} rows" + + for new_result, old_result in zip(new_results, old_results): + assert ( + new_result.key == old_result.key + ), f"Keys in blocks should be in the same order. Full results keys {get_keys(full_results)}. Partial results keys {get_keys(partial_results)}" + + key = new_result.key + if key in invariants: + new_value = new_result.value + old_value = old_result.value + assert invariants[key]( + old_value, new_value + ), f"Problem with the invariant between new and old result for key: {key}. New value {new_value}. Old value {old_value}" + + +def run_query_without_errors(query, support_partial_result, invariants=None, rows_limit=None): if invariants is None: invariants = {} @@ -30,54 +50,35 @@ def run_query_without_errors(query, support_partial_result, invariants=None): partial_results = client.readDataWithoutProgress() if support_partial_result: assert ( - len(partial_results[0][1]) > 0 + len(partial_results) > 0 and len(partial_results[0].value) > 0 ), "Expected at least one block with a non-empty partial result before getting the full result" while True: new_partial_results = client.readDataWithoutProgress( need_print_info=False ) - if len(new_partial_results[0][1]) == 0: + if len(new_partial_results[0].value) == 0: break - for new_result, old_result in zip(new_partial_results, partial_results): - assert ( - new_result[0] == old_result[0] - ), "Keys in blocks should be in the same order" - - key = new_result[0] - if key in invariants: - old_value = old_result[1] - new_value = new_result[1] - assert invariants[key]( - old_value, new_value - ), f"Problem with the invariant between old and new versions of a partial result for key: {key}. Old value {old_value}, new value {new_value}" + check_new_result(new_partial_results, partial_results, invariants, rows_limit) + partial_results = new_partial_results else: + block_rows = len(partial_results[0].value) assert ( - len(partial_results[0][1]) == 0 - ), "Expected no non-empty partial result blocks before getting the full result" + block_rows == 0 + ), f"Expected only empty partial result block before getting the full result, but block has {block_rows} rows" # Full result full_results = client.readDataWithoutProgress() if support_partial_result: - for full_result, partial_result in zip(full_results, partial_results): - assert ( - full_result[0] == partial_result[0] - ), f"Keys in blocks should be in the same order. Full results keys {get_keys(full_results)}. Partial results keys {get_keys(partial_results)}" + check_new_result(full_results, partial_results, invariants, rows_limit) - key = full_result[0] - if key in invariants: - full_value = full_result[1] - partial_value = partial_result[1] - assert invariants[key]( - partial_value, full_value - ), f"Problem with the invariant between full and partial result for key: {key}. Partial value {partial_value}. Full value {full_value}" - - for key, value in full_results: - if isinstance(value[0], int): - print(key, value) + for data in full_results: + if isinstance(data.value[0], int): + print(data.key, data.value) -def supported_scenarios(): +def supported_scenarios_without_key(): + # Simple aggregation query query = "select median(number), stddevSamp(number), stddevPop(number), max(number), min(number), any(number), count(number), avg(number), sum(number) from numbers_mt(1e7+1) settings max_threads = 1, partial_result_update_duration_ms = 1" invariants = { "median(number)": lambda old_value, new_value: old_value <= new_value, @@ -87,21 +88,28 @@ def supported_scenarios(): "avg(number)": lambda old_value, new_value: old_value <= new_value, "sum(number)": lambda old_value, new_value: old_value <= new_value, } - run_query_without_errors(query, support_partial_result=True, invariants=invariants) + run_query_without_errors(query, support_partial_result=True, invariants=invariants, rows_limit=1) + + # Aggregation query with a nested ORDER BY subquery + query = "select median(number), stddevSamp(number), stddevPop(number), max(number), min(number), any(number), count(number), avg(number), sum(number) FROM (SELECT number FROM numbers_mt(1e7) ORDER BY -number LIMIT 3) settings max_threads = 1, partial_result_update_duration_ms=1" + + # Aggregation receives small partial result blocks from ORDER BY which always sends blocks with bigger values + invariants["min(number)"] = lambda old_value, new_value: old_value <= new_value + run_query_without_errors(query, support_partial_result=True, invariants=invariants, rows_limit=1) def unsupported_scenarios(): # Currently aggregator for partial result supports only single thread aggregation without key # Update test when multithreading or aggregation with GROUP BY will be supported for partial result updates - multithread_query = "select sum(number) from numbers_mt(1e7+1) settings max_threads = 2, partial_result_update_duration_ms = 1" + multithread_query = "select sum(number) from numbers_mt(1e7+1) settings max_threads = 2, partial_result_update_duration_ms = 100" run_query_without_errors(multithread_query, support_partial_result=False) - group_with_key_query = "select mod2, sum(number) from numbers_mt(1e7+1) group by number % 2 as mod2 settings max_threads = 1, partial_result_update_duration_ms = 1" + group_with_key_query = "select mod2, sum(number) from numbers_mt(1e7+1) group by number % 2 as mod2 settings max_threads = 1, partial_result_update_duration_ms = 100" run_query_without_errors(group_with_key_query, support_partial_result=False) def main(): - supported_scenarios() + supported_scenarios_without_key() unsupported_scenarios() diff --git a/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.reference b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.reference index a97ee3f8d79..a813b18f24f 100644 --- a/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.reference +++ b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.reference @@ -33,6 +33,41 @@ min(number) [0] any(number) [0] count(number) [10000001] sum(number) [50000005000000] +Rows 0 Columns 9 +Column median(number) type Float64 +Column stddevSamp(number) type Float64 +Column stddevPop(number) type Float64 +Column max(number) type UInt64 +Column min(number) type UInt64 +Column any(number) type UInt64 +Column count(number) type UInt64 +Column avg(number) type Float64 +Column sum(number) type UInt64 +Rows 1 Columns 9 +Column median(number) type Float64 +Column stddevSamp(number) type Float64 +Column stddevPop(number) type Float64 +Column max(number) type UInt64 +Column min(number) type UInt64 +Column any(number) type UInt64 +Column count(number) type UInt64 +Column avg(number) type Float64 +Column sum(number) type UInt64 +Rows 1 Columns 9 +Column median(number) type Float64 +Column stddevSamp(number) type Float64 +Column stddevPop(number) type Float64 +Column max(number) type UInt64 +Column min(number) type UInt64 +Column any(number) type UInt64 +Column count(number) type UInt64 +Column avg(number) type Float64 +Column sum(number) type UInt64 +max(number) [9999999] +min(number) [9999997] +any(number) [9999999] +count(number) [3] +sum(number) [29999994] Rows 0 Columns 1 Column sum(number) type UInt64 Rows 0 Columns 1 diff --git a/tests/queries/0_stateless/helpers/tcp_client.py b/tests/queries/0_stateless/helpers/tcp_client.py index bab449d71fe..e0989a000f4 100644 --- a/tests/queries/0_stateless/helpers/tcp_client.py +++ b/tests/queries/0_stateless/helpers/tcp_client.py @@ -57,6 +57,11 @@ def assertPacket(packet, expected): assert packet == expected, "Got: {}, expected: {}".format(packet, expected) +class Data(object): + def __init__(self, key, value): + self.key = key + self.value = value + class TCPClient(object): def __init__(self, timeout=30): self.timeout = timeout @@ -301,6 +306,6 @@ class TCPClient(object): if need_print_info: print("Column {} type {}".format(col_name, type_name)) - data.append((col_name, self.readRow(type_name, rows))) + data.append(Data(col_name, self.readRow(type_name, rows))) return data