fix style

alexX512 2023-08-29 13:15:38 +00:00
parent b3ba00f443
commit 238f6e8327
4 changed files with 34 additions and 14 deletions

View File

@@ -664,13 +664,11 @@ void AggregatingTransform::consume(Chunk chunk)
     {
         auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
         block = materializeBlock(block);
-        LOG_DEBUG(log, "AggregatingTransform::consume. Merge Block columns {}", block.dumpNames());
         if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys))
             is_consume_finished = true;
     }
     else
     {
-        LOG_DEBUG(log, "AggregatingTransform::consume. Execute Block");
         if (!params->aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys))
             is_consume_finished = true;
     }

View File

@@ -28,7 +28,8 @@ def run_query_without_errors(query, support_partial_result):

         while True:
             assert all(
-                a >= b for a, b in zip(partial_result.value, partial_result.value[1:])
+                a >= b
+                for a, b in zip(partial_result.value, partial_result.value[1:])
             ), "Partial result always should be sorted for this test"

             new_partial_result = client.readDataWithoutProgress(
@@ -39,7 +40,8 @@ def run_query_without_errors(query, support_partial_result):

             data_size = len(partial_result.value)
             assert all(
-                partial_result.value[i] <= new_partial_result.value[i] for i in range(data_size)
+                partial_result.value[i] <= new_partial_result.value[i]
+                for i in range(data_size)
             ), f"New partial result values should always be greater than the old ones because a new block contains more information about the full data. New result {new_partial_result}. Previous result {partial_result}"

             partial_result = new_partial_result
@@ -49,7 +51,6 @@ def run_query_without_errors(query, support_partial_result):
                 block_rows == 0
             ), f"Expected only empty partial result block before getting the full result, but block has {block_rows} rows"

-
         # Full result
         full_result = client.readDataWithoutProgress()[0]

@@ -64,16 +65,28 @@ def run_query_without_errors(query, support_partial_result):
 def main():
     # Request with partial result limit less than the full limit
-    run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 5 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 3", True)
+    run_query_without_errors(
+        "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 5 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 3",
+        support_partial_result=True,
+    )

     # Request with partial result limit greater than the full limit
-    run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", True)
+    run_query_without_errors(
+        "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5",
+        support_partial_result=True,
+    )

     # Request with OFFSET
-    run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 1 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", True)
+    run_query_without_errors(
+        "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 1 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5",
+        support_partial_result=True,
+    )

     # Request with OFFSET greater than the partial result limit (the partial result pipeline uses blocks with fewer rows than OFFSET, so there will be no elements in the block after LimitPartialResultTransform)
-    run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 15 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", False)
+    run_query_without_errors(
+        "SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 15 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5",
+        support_partial_result=False,
+    )


 if __name__ == "__main__":
     main()
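
Aside: the two assertions adjusted above encode the core invariants of this test: every partial result block is sorted in descending order (the query orders by -number), and each new partial block dominates the previous one element-wise, because it is computed from more of the input. A minimal standalone sketch of those checks, using plain Python lists and made-up sample values rather than the test's Data objects:

    def is_sorted_desc(values):
        # Each element must be >= its successor, matching ORDER BY -number.
        return all(a >= b for a, b in zip(values, values[1:]))

    def dominates(old_values, new_values):
        # A newer partial result may only grow element-wise, because every
        # new block is built from more of the full data.
        return all(o <= n for o, n in zip(old_values, new_values))

    # Hypothetical successive partial results for LIMIT 5 with max_rows_in_partial_result = 3.
    old = [4999997, 4999996, 4999995]
    new = [5000000, 4999999, 4999998]
    assert is_sorted_desc(old) and is_sorted_desc(new)
    assert dominates(old, new)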

View File

@@ -35,7 +35,9 @@ def check_new_result(new_results, old_results, invariants, rows_limit):
     ), f"Problem with the invariant between new and old result for key: {key}. New value {new_value}. Old value {old_value}"


-def run_query_without_errors(query, support_partial_result, invariants=None, rows_limit=None):
+def run_query_without_errors(
+    query, support_partial_result, invariants=None, rows_limit=None
+):
     if invariants is None:
         invariants = {}

@@ -59,7 +61,9 @@ def run_query_without_errors(query, support_partial_result, invariants=None, row
                 if len(new_partial_results[0].value) == 0:
                     break

-                check_new_result(new_partial_results, partial_results, invariants, rows_limit)
+                check_new_result(
+                    new_partial_results, partial_results, invariants, rows_limit
+                )
                 partial_results = new_partial_results
         else:
             block_rows = len(partial_results[0].value)
@@ -88,14 +92,18 @@ def supported_scenarios_without_key():
         "avg(number)": lambda old_value, new_value: old_value <= new_value,
         "sum(number)": lambda old_value, new_value: old_value <= new_value,
     }
-    run_query_without_errors(query, support_partial_result=True, invariants=invariants, rows_limit=1)
+    run_query_without_errors(
+        query, support_partial_result=True, invariants=invariants, rows_limit=1
+    )

     # Aggregation query with a nested ORDER BY subquery
     query = "select median(number), stddevSamp(number), stddevPop(number), max(number), min(number), any(number), count(number), avg(number), sum(number) FROM (SELECT number FROM numbers_mt(1e7) ORDER BY -number LIMIT 3) settings max_threads = 1, partial_result_update_duration_ms=1"

     # Aggregation receives small partial result blocks from ORDER BY which always sends blocks with bigger values
-    invariants["min(number)"] = lambda old_value, new_value: old_value <= new_value
-    run_query_without_errors(query, support_partial_result=True, invariants=invariants, rows_limit=1)
+    invariants["min(number)"] = lambda old_value, new_value: old_value <= new_value
+    run_query_without_errors(
+        query, support_partial_result=True, invariants=invariants, rows_limit=1
+    )


 def unsupported_scenarios():
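
Aside: the calls reformatted above hand run_query_without_errors a dict of per-key invariant lambdas, and check_new_result applies each lambda to the old and new value of the corresponding aggregate. A simplified stand-in for that per-key check, with illustrative names and values (the real helper also enforces rows_limit and operates on Data objects):

    invariants = {
        "max(number)": lambda old_value, new_value: old_value <= new_value,
        "count(number)": lambda old_value, new_value: old_value <= new_value,
    }

    def check_invariants(old_results, new_results, invariants):
        # Apply each per-key invariant to the old and new aggregate values.
        for key, check in invariants.items():
            old_value, new_value = old_results[key], new_results[key]
            assert check(
                old_value, new_value
            ), f"Problem with the invariant for key: {key}. New value {new_value}. Old value {old_value}"

    check_invariants(
        {"max(number)": 9999998.0, "count(number)": 4000000},
        {"max(number)": 9999999.0, "count(number)": 10000000},
        invariants,
    )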

View File

@@ -63,6 +63,7 @@ class Data(object):
         self.key = key
         self.value = value

+
 class TCPClient(object):
     def __init__(self, timeout=30):
         self.timeout = timeout
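
Aside: both tests above drive the server through this helper and share one read pattern: keep reading partial result blocks until an empty block arrives, then read the full result. A sketch of that loop, assuming only what the diffs show, namely a connected client whose readDataWithoutProgress() returns a list of column objects with a .value list; how the query is sent and the connection is opened is not part of this commit, so it is omitted here:

    def read_partial_then_full(client):
        # Drain partial result blocks; an empty block signals that the
        # full result comes next.
        partial = client.readDataWithoutProgress()[0]
        while len(partial.value) > 0:
            partial = client.readDataWithoutProgress()[0]
        # The next block carries the full result.
        return client.readDataWithoutProgress()[0]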