diff --git a/src/Dictionaries/DirectDictionary.cpp b/src/Dictionaries/DirectDictionary.cpp index 3fc41b72d5e..6ecc216e370 100644 --- a/src/Dictionaries/DirectDictionary.cpp +++ b/src/Dictionaries/DirectDictionary.cpp @@ -274,7 +274,7 @@ public: Chunk generate() override { Chunk chunk; - while (!executor.pull(chunk)) + while (executor.pull(chunk)) { if (chunk) return chunk; diff --git a/src/Processors/ISource.cpp b/src/Processors/ISource.cpp index a36059b76a3..9e81c916118 100644 --- a/src/Processors/ISource.cpp +++ b/src/Processors/ISource.cpp @@ -50,6 +50,7 @@ ISource::Status ISource::prepare() void ISource::progress(size_t read_rows, size_t read_bytes) { + //std::cerr << "========= Progress " << read_rows << " from " << getName() << std::endl << StackTrace().toString() << std::endl; read_progress_was_set = true; read_progress.read_rows += read_rows; read_progress.read_bytes += read_bytes; diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index d640ca9e401..33d2ad6e1cf 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -80,8 +80,6 @@ void ExpressionStep::describeActions(FormatSettings & settings) const String prefix(settings.offset, ' '); bool first = true; - std::cerr << actions_dag->dumpDAG() << std::endl; - auto expression = std::make_shared(actions_dag); for (const auto & action : expression->getActions()) { diff --git a/src/QueryPipeline/ReadProgressCallback.cpp b/src/QueryPipeline/ReadProgressCallback.cpp index efa46a57731..2e5d19c0cf0 100644 --- a/src/QueryPipeline/ReadProgressCallback.cpp +++ b/src/QueryPipeline/ReadProgressCallback.cpp @@ -32,14 +32,13 @@ void ReadProgressCallback::setProcessListElement(QueryStatus * elem) /// /// NOTE: This can be done only if progress callback already set, since /// otherwise total_rows_approx will lost. - if (total_rows_approx != 0 && progress_callback) + size_t rows_approx = 0; + if (progress_callback && (rows_approx = total_rows_approx.exchange(0)) != 0) { - Progress total_rows_progress = {0, 0, total_rows_approx}; + Progress total_rows_progress = {0, 0, rows_approx}; progress_callback(total_rows_progress); process_list_elem->updateProgressIn(total_rows_progress); - - total_rows_approx = 0; } } @@ -48,22 +47,21 @@ bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes) if (!limits.speed_limits.checkTimeLimit(total_stopwatch, limits.timeout_overflow_mode)) return false; - if (total_rows_approx != 0) + size_t rows_approx = 0; + if ((rows_approx = total_rows_approx.exchange(0)) != 0) { - Progress total_rows_progress = {0, 0, total_rows_approx}; + Progress total_rows_progress = {0, 0, rows_approx}; if (progress_callback) progress_callback(total_rows_progress); if (process_list_elem) process_list_elem->updateProgressIn(total_rows_progress); - - total_rows_approx = 0; } Progress value {read_rows, read_bytes}; - if (progress_callback) + if (progress_callback && (read_rows || read_bytes)) progress_callback(value); if (process_list_elem) @@ -108,11 +106,14 @@ bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes) constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds UInt64 total_elapsed_microseconds = total_stopwatch.elapsedMicroseconds(); - if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds) + std::lock_guard lock(last_profile_events_update_time_mutex); { - /// TODO: Should be done in PipelineExecutor. - CurrentThread::updatePerformanceCounters(); - last_profile_events_update_time = total_elapsed_microseconds; + if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds) + { + /// TODO: Should be done in PipelineExecutor. + CurrentThread::updatePerformanceCounters(); + last_profile_events_update_time = total_elapsed_microseconds; + } } /// TODO: Should be done in PipelineExecutor. diff --git a/src/QueryPipeline/ReadProgressCallback.h b/src/QueryPipeline/ReadProgressCallback.h index 5e825301b4c..9dc57fa1695 100644 --- a/src/QueryPipeline/ReadProgressCallback.h +++ b/src/QueryPipeline/ReadProgressCallback.h @@ -2,6 +2,7 @@ #include #include #include +#include namespace DB { @@ -33,11 +34,12 @@ private: QueryStatus * process_list_elem = nullptr; /// The approximate total number of rows to read. For progress bar. - size_t total_rows_approx = 0; + std::atomic_size_t total_rows_approx = 0; Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time. /// According to total_stopwatch in microseconds. UInt64 last_profile_events_update_time = 0; + std::mutex last_profile_events_update_time_mutex; bool update_profile_events = true; }; diff --git a/tests/queries/0_stateless/01508_explain_header.reference b/tests/queries/0_stateless/01508_explain_header.reference index 4bfbe1c818b..7510e67c643 100644 --- a/tests/queries/0_stateless/01508_explain_header.reference +++ b/tests/queries/0_stateless/01508_explain_header.reference @@ -1,6 +1,4 @@ Expression ((Projection + Before ORDER BY)) Header: x UInt8 - SettingQuotaAndLimits (Set limits and quota after reading from storage) + ReadFromStorage (SystemOne) Header: dummy UInt8 - ReadFromStorage (SystemOne) - Header: dummy UInt8 diff --git a/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference b/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference index b6b8b04907c..1d76d9bd631 100644 --- a/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference +++ b/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference @@ -7,12 +7,11 @@ ExpressionTransform × 3 AggregatingInOrderTransform × 3 (Expression) ExpressionTransform × 3 - (SettingQuotaAndLimits) - (ReadFromMergeTree) - ExpressionTransform × 4 - MergeTreeInOrder 0 → 1 - MergingSortedTransform 2 → 1 - ExpressionTransform × 2 - MergeTreeInOrder × 2 0 → 1 - ExpressionTransform - MergeTreeInOrder 0 → 1 + (ReadFromMergeTree) + ExpressionTransform × 4 + MergeTreeInOrder 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + ExpressionTransform + MergeTreeInOrder 0 → 1 diff --git a/tests/queries/0_stateless/01786_explain_merge_tree.reference b/tests/queries/0_stateless/01786_explain_merge_tree.reference index 25c7c37beca..7e0a91b203f 100644 --- a/tests/queries/0_stateless/01786_explain_merge_tree.reference +++ b/tests/queries/0_stateless/01786_explain_merge_tree.reference @@ -1,85 +1,83 @@ - ReadFromMergeTree (default.test_index) - Indexes: - MinMax - Keys: - y - Condition: (y in [1, +Inf)) - Parts: 4/5 - Granules: 11/12 - Partition - Keys: - y - bitAnd(z, 3) - Condition: and((bitAnd(z, 3) not in [1, 1]), and((y in [1, +Inf)), (bitAnd(z, 3) not in [1, 1]))) - Parts: 3/4 - Granules: 10/11 - PrimaryKey - Keys: - x - y - Condition: and((x in [11, +Inf)), (y in [1, +Inf))) - Parts: 2/3 - Granules: 6/10 - Skip - Name: t_minmax - Description: minmax GRANULARITY 2 - Parts: 1/2 - Granules: 2/6 - Skip - Name: t_set - Description: set GRANULARITY 2 - Parts: 1/1 - Granules: 1/2 + ReadFromMergeTree (default.test_index) + Indexes: + MinMax + Keys: + y + Condition: (y in [1, +Inf)) + Parts: 4/5 + Granules: 11/12 + Partition + Keys: + y + bitAnd(z, 3) + Condition: and((bitAnd(z, 3) not in [1, 1]), and((y in [1, +Inf)), (bitAnd(z, 3) not in [1, 1]))) + Parts: 3/4 + Granules: 10/11 + PrimaryKey + Keys: + x + y + Condition: and((x in [11, +Inf)), (y in [1, +Inf))) + Parts: 2/3 + Granules: 6/10 + Skip + Name: t_minmax + Description: minmax GRANULARITY 2 + Parts: 1/2 + Granules: 2/6 + Skip + Name: t_set + Description: set GRANULARITY 2 + Parts: 1/1 + Granules: 1/2 ----------------- - "Node Type": "ReadFromMergeTree", - "Description": "default.test_index", - "Indexes": [ - { - "Type": "MinMax", - "Keys": ["y"], - "Condition": "(y in [1, +Inf))", - "Initial Parts": 5, - "Selected Parts": 4, - "Initial Granules": 12, - "Selected Granules": 11 - }, - { - "Type": "Partition", - "Keys": ["y", "bitAnd(z, 3)"], - "Condition": "and((bitAnd(z, 3) not in [1, 1]), and((y in [1, +Inf)), (bitAnd(z, 3) not in [1, 1])))", - "Initial Parts": 4, - "Selected Parts": 3, - "Initial Granules": 11, - "Selected Granules": 10 - }, - { - "Type": "PrimaryKey", - "Keys": ["x", "y"], - "Condition": "and((x in [11, +Inf)), (y in [1, +Inf)))", - "Initial Parts": 3, - "Selected Parts": 2, - "Initial Granules": 10, - "Selected Granules": 6 - }, - { - "Type": "Skip", - "Name": "t_minmax", - "Description": "minmax GRANULARITY 2", - "Initial Parts": 2, - "Selected Parts": 1, - "Initial Granules": 6, - "Selected Granules": 2 - }, - { - "Type": "Skip", - "Name": "t_set", - "Description": "set GRANULARITY 2", - "Initial Parts": 1, - "Selected Parts": 1, - "Initial Granules": 2, - "Selected Granules": 1 - } - ] + "Node Type": "ReadFromMergeTree", + "Description": "default.test_index", + "Indexes": [ + { + "Type": "MinMax", + "Keys": ["y"], + "Condition": "(y in [1, +Inf))", + "Initial Parts": 5, + "Selected Parts": 4, + "Initial Granules": 12, + "Selected Granules": 11 + }, + { + "Type": "Partition", + "Keys": ["y", "bitAnd(z, 3)"], + "Condition": "and((bitAnd(z, 3) not in [1, 1]), and((y in [1, +Inf)), (bitAnd(z, 3) not in [1, 1])))", + "Initial Parts": 4, + "Selected Parts": 3, + "Initial Granules": 11, + "Selected Granules": 10 + }, + { + "Type": "PrimaryKey", + "Keys": ["x", "y"], + "Condition": "and((x in [11, +Inf)), (y in [1, +Inf)))", + "Initial Parts": 3, + "Selected Parts": 2, + "Initial Granules": 10, + "Selected Granules": 6 + }, + { + "Type": "Skip", + "Name": "t_minmax", + "Description": "minmax GRANULARITY 2", + "Initial Parts": 2, + "Selected Parts": 1, + "Initial Granules": 6, + "Selected Granules": 2 + }, + { + "Type": "Skip", + "Name": "t_set", + "Description": "set GRANULARITY 2", + "Initial Parts": 1, + "Selected Parts": 1, + "Initial Granules": 2, + "Selected Granules": 1 } ] } @@ -90,21 +88,21 @@ } ] ----------------- - ReadFromMergeTree (default.test_index) - ReadType: InOrder - Parts: 1 - Granules: 3 + ReadFromMergeTree (default.test_index) + ReadType: InOrder + Parts: 1 + Granules: 3 ----------------- - ReadFromMergeTree (default.test_index) - ReadType: InReverseOrder - Parts: 1 - Granules: 3 - ReadFromMergeTree (default.idx) - Indexes: - PrimaryKey - Keys: - x - plus(x, y) - Condition: or((x in 2-element set), (plus(plus(x, y), 1) in (-Inf, 2])) - Parts: 1/1 - Granules: 1/1 + ReadFromMergeTree (default.test_index) + ReadType: InReverseOrder + Parts: 1 + Granules: 3 + ReadFromMergeTree (default.idx) + Indexes: + PrimaryKey + Keys: + x + plus(x, y) + Condition: or((x in 2-element set), (plus(plus(x, y), 1) in (-Inf, 2])) + Parts: 1/1 + Granules: 1/1 diff --git a/tests/queries/0_stateless/01861_explain_pipeline.reference b/tests/queries/0_stateless/01861_explain_pipeline.reference index 63ba55f5a04..2ba294d7e4d 100644 --- a/tests/queries/0_stateless/01861_explain_pipeline.reference +++ b/tests/queries/0_stateless/01861_explain_pipeline.reference @@ -1,11 +1,10 @@ (Expression) ExpressionTransform - (SettingQuotaAndLimits) - (ReadFromMergeTree) - ExpressionTransform - ReplacingSorted 2 → 1 - ExpressionTransform × 2 - MergeTreeInOrder × 2 0 → 1 + (ReadFromMergeTree) + ExpressionTransform + ReplacingSorted 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 0 0 1 1 2 2 @@ -15,11 +14,10 @@ ExpressionTransform 6 6 (Expression) ExpressionTransform × 2 - (SettingQuotaAndLimits) - (ReadFromMergeTree) - ExpressionTransform × 2 - ReplacingSorted × 2 2 → 1 - Copy × 2 1 → 2 - AddingSelector × 2 - ExpressionTransform × 2 - MergeTreeInOrder × 2 0 → 1 + (ReadFromMergeTree) + ExpressionTransform × 2 + ReplacingSorted × 2 2 → 1 + Copy × 2 1 → 2 + AddingSelector × 2 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 diff --git a/tests/queries/0_stateless/01883_with_grouping_sets.reference b/tests/queries/0_stateless/01883_with_grouping_sets.reference index 83fda9556e7..8fae10a05a4 100644 --- a/tests/queries/0_stateless/01883_with_grouping_sets.reference +++ b/tests/queries/0_stateless/01883_with_grouping_sets.reference @@ -13,9 +13,8 @@ ExpressionTransform Copy 1 → 2 (Expression) ExpressionTransform - (SettingQuotaAndLimits) - (ReadFromStorage) - Memory 0 → 1 + (ReadFromStorage) + Memory 0 → 1 1 0 1 4500 1 0 3 4700 1 0 5 4900 @@ -106,9 +105,8 @@ ExpressionTransform Copy × 3 1 → 2 (Expression) ExpressionTransform × 3 - (SettingQuotaAndLimits) - (ReadFromStorage) - NumbersMt × 3 0 → 1 + (ReadFromStorage) + NumbersMt × 3 0 → 1 4999500000 10000 4999510000 10000 4999520000 10000 diff --git a/tests/queries/0_stateless/01951_distributed_push_down_limit.reference b/tests/queries/0_stateless/01951_distributed_push_down_limit.reference index d0e7a9ef15b..7f73a8c6554 100644 --- a/tests/queries/0_stateless/01951_distributed_push_down_limit.reference +++ b/tests/queries/0_stateless/01951_distributed_push_down_limit.reference @@ -3,26 +3,18 @@ explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) orde Expression (Projection) Limit (preliminary LIMIT (without OFFSET)) Sorting (Merge sorted streams after aggregation stage for ORDER BY) - SettingQuotaAndLimits (Set limits and quota after reading from storage) - Union - Sorting (Sorting for ORDER BY) - Expression (Before ORDER BY) - SettingQuotaAndLimits (Set limits and quota after reading from storage) - Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) - SettingQuotaAndLimits (Set limits and quota after reading from storage) - ReadFromStorage (SystemNumbers) - ReadFromRemote (Read from remote replica) + Union + Sorting (Sorting for ORDER BY) + Expression ((Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))) + ReadFromStorage (SystemNumbers) + ReadFromRemote (Read from remote replica) explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=1; Expression (Projection) Limit (preliminary LIMIT (without OFFSET)) Sorting (Merge sorted streams after aggregation stage for ORDER BY) - SettingQuotaAndLimits (Set limits and quota after reading from storage) - Union - Limit (preliminary LIMIT (with OFFSET)) - Sorting (Sorting for ORDER BY) - Expression (Before ORDER BY) - SettingQuotaAndLimits (Set limits and quota after reading from storage) - Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) - SettingQuotaAndLimits (Set limits and quota after reading from storage) - ReadFromStorage (SystemNumbers) - ReadFromRemote (Read from remote replica) + Union + Limit (preliminary LIMIT (with OFFSET)) + Sorting (Sorting for ORDER BY) + Expression ((Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))) + ReadFromStorage (SystemNumbers) + ReadFromRemote (Read from remote replica) diff --git a/tests/queries/0_stateless/02013_lc_nullable_and_infinity.sql b/tests/queries/0_stateless/02013_lc_nullable_and_infinity.sql index c1c8a9c00b1..8cca4aa4e59 100644 --- a/tests/queries/0_stateless/02013_lc_nullable_and_infinity.sql +++ b/tests/queries/0_stateless/02013_lc_nullable_and_infinity.sql @@ -1,3 +1,3 @@ -set receive_timeout = '10', receive_data_timeout_ms = '10000', extremes = '1', allow_suspicious_low_cardinality_types = '1', force_primary_key = '1', join_use_nulls = '1', max_rows_to_read = '1', join_algorithm = 'partial_merge'; +set receive_timeout = '10', receive_data_timeout_ms = '10000', extremes = '1', allow_suspicious_low_cardinality_types = '1', force_primary_key = '1', join_use_nulls = '1', max_rows_to_read = '2', join_algorithm = 'partial_merge'; SELECT * FROM (SELECT dummy AS val FROM system.one) AS s1 ANY LEFT JOIN (SELECT toLowCardinality(dummy) AS rval FROM system.one) AS s2 ON (val + 9223372036854775806) = (rval * 1); diff --git a/tests/queries/0_stateless/02210_processors_profile_log.reference b/tests/queries/0_stateless/02210_processors_profile_log.reference index a056b445bbd..1a7dd64d657 100644 --- a/tests/queries/0_stateless/02210_processors_profile_log.reference +++ b/tests/queries/0_stateless/02210_processors_profile_log.reference @@ -2,9 +2,8 @@ EXPLAIN PIPELINE SELECT sleep(1); (Expression) ExpressionTransform - (SettingQuotaAndLimits) - (ReadFromStorage) - SourceFromSingleChunk 0 → 1 + (ReadFromStorage) + SourceFromSingleChunk 0 → 1 SELECT sleep(1) SETTINGS log_processors_profiles=true, log_queries=1, log_queries_min_type='QUERY_FINISH'; 0 SYSTEM FLUSH LOGS; diff --git a/tests/queries/0_stateless/02236_explain_pipeline_join.reference b/tests/queries/0_stateless/02236_explain_pipeline_join.reference index ed993e2a1e7..5d7a7bfc488 100644 --- a/tests/queries/0_stateless/02236_explain_pipeline_join.reference +++ b/tests/queries/0_stateless/02236_explain_pipeline_join.reference @@ -4,16 +4,14 @@ ExpressionTransform JoiningTransform 2 → 1 (Expression) ExpressionTransform - (SettingQuotaAndLimits) + (Limit) + Limit + (ReadFromStorage) + Numbers 0 → 1 + (Expression) + FillingRightJoinSide + ExpressionTransform (Limit) Limit (ReadFromStorage) Numbers 0 → 1 - (Expression) - FillingRightJoinSide - ExpressionTransform - (SettingQuotaAndLimits) - (Limit) - Limit - (ReadFromStorage) - Numbers 0 → 1