diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index b0faf0f96d4..b4d2785bed2 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -664,13 +664,11 @@ void AggregatingTransform::consume(Chunk chunk) { auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns()); block = materializeBlock(block); - LOG_DEBUG(log, "AggregatingTransform::consume. Merge Block columns {}", block.dumpNames()); if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys)) is_consume_finished = true; } else { - LOG_DEBUG(log, "AggregatingTransform::consume. Execute Block"); if (!params->aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys)) is_consume_finished = true; } diff --git a/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.python b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.python index 1fadddb0871..ec13a41d07b 100755 --- a/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.python +++ b/tests/queries/0_stateless/02833_partial_sorting_result_during_query_execution.python @@ -28,7 +28,8 @@ def run_query_without_errors(query, support_partial_result): while True: assert all( - a >= b for a, b in zip(partial_result.value, partial_result.value[1:]) + a >= b + for a, b in zip(partial_result.value, partial_result.value[1:]) ), "Partial result always should be sorted for this test" new_partial_result = client.readDataWithoutProgress( @@ -39,7 +40,8 @@ def run_query_without_errors(query, support_partial_result): data_size = len(partial_result.value) assert all( - partial_result.value[i] <= new_partial_result.value[i] for i in range(data_size) + partial_result.value[i] <= new_partial_result.value[i] + for i in range(data_size) ), f"New partial result values should always be greater then old one because a new block contains more information about the full data. New result {new_partial_result}. Previous result {partial_result}" partial_result = new_partial_result @@ -49,7 +51,6 @@ def run_query_without_errors(query, support_partial_result): block_rows == 0 ), f"Expected only empty partial result block before getting the full result, but block has {block_rows} rows" - # Full result full_result = client.readDataWithoutProgress()[0] @@ -64,16 +65,28 @@ def run_query_without_errors(query, support_partial_result): def main(): # Request with partial result limit less then full limit - run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 5 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 3", True) + run_query_without_errors( + "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 5 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 3", + support_partial_result=True, + ) # Request with partial result limit greater then full limit - run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", True) + run_query_without_errors( + "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", + support_partial_result=True, + ) # Request with OFFSET - run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 1 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", True) + run_query_without_errors( + "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 1 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", + support_partial_result=True, + ) # Request with OFFSET greater then partial result limit (partial result pipeline use blocks with less then OFFSET, so there will be no elements in block after LimitPartialResultTransform) - run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 15 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", False) + run_query_without_errors( + "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 15 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", + support_partial_result=False, + ) if __name__ == "__main__": main() diff --git a/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.python b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.python index 4306ae577d0..4d869b05580 100644 --- a/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.python +++ b/tests/queries/0_stateless/02834_partial_aggregating_result_during_query_execution.python @@ -35,7 +35,9 @@ def check_new_result(new_results, old_results, invariants, rows_limit): ), f"Problem with the invariant between new and old result for key: {key}. New value {new_value}. Old value {old_value}" -def run_query_without_errors(query, support_partial_result, invariants=None, rows_limit=None): +def run_query_without_errors( + query, support_partial_result, invariants=None, rows_limit=None +): if invariants is None: invariants = {} @@ -59,7 +61,9 @@ def run_query_without_errors(query, support_partial_result, invariants=None, row if len(new_partial_results[0].value) == 0: break - check_new_result(new_partial_results, partial_results, invariants, rows_limit) + check_new_result( + new_partial_results, partial_results, invariants, rows_limit + ) partial_results = new_partial_results else: block_rows = len(partial_results[0].value) @@ -88,14 +92,18 @@ def supported_scenarios_without_key(): "avg(number)": lambda old_value, new_value: old_value <= new_value, "sum(number)": lambda old_value, new_value: old_value <= new_value, } - run_query_without_errors(query, support_partial_result=True, invariants=invariants, rows_limit=1) + run_query_without_errors( + query, support_partial_result=True, invariants=invariants, rows_limit=1 + ) # Aggregation query with a nested ORDER BY subquery query = "select median(number), stddevSamp(number), stddevPop(number), max(number), min(number), any(number), count(number), avg(number), sum(number) FROM (SELECT number FROM numbers_mt(1e7) ORDER BY -number LIMIT 3) settings max_threads = 1, partial_result_update_duration_ms=1" # Aggregation receives small partial result blocks from ORDER BY which always sends blocks with bigger values - invariants["min(number)"] = lambda old_value, new_value: old_value <= new_value - run_query_without_errors(query, support_partial_result=True, invariants=invariants, rows_limit=1) + invariants["min(number)"] = lambda old_value, new_value: old_value <= new_value + run_query_without_errors( + query, support_partial_result=True, invariants=invariants, rows_limit=1 + ) def unsupported_scenarios(): diff --git a/tests/queries/0_stateless/helpers/tcp_client.py b/tests/queries/0_stateless/helpers/tcp_client.py index e9ce01f45ff..fdc4ab28e04 100644 --- a/tests/queries/0_stateless/helpers/tcp_client.py +++ b/tests/queries/0_stateless/helpers/tcp_client.py @@ -63,6 +63,7 @@ class Data(object): self.key = key self.value = value + class TCPClient(object): def __init__(self, timeout=30): self.timeout = timeout