From 8ab6564538a06d45545ade8c7f37da39587012ac Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 19 Dec 2023 13:30:59 +0100 Subject: [PATCH] Revert "Fix bug window functions: revert #39631" --- src/Interpreters/InterpreterSelectQuery.cpp | 1 + src/Planner/Planner.cpp | 1 + src/Processors/QueryPlan/SortingStep.cpp | 78 ++++++- src/Processors/QueryPlan/SortingStep.h | 30 ++- src/Processors/QueryPlan/WindowStep.cpp | 3 +- .../ScatterByPartitionTransform.cpp | 129 +++++++++++ .../Transforms/ScatterByPartitionTransform.h | 34 +++ ...568_window_functions_distributed.reference | 29 +++ .../01568_window_functions_distributed.sql | 4 + .../02884_parallel_window_functions.reference | 100 ++++++++ .../02884_parallel_window_functions.sql | 119 ++++++++++ ...2_window_functions_logical_error.reference | 216 ------------------ .../02942_window_functions_logical_error.sql | 158 ------------- 13 files changed, 521 insertions(+), 381 deletions(-) create mode 100644 src/Processors/Transforms/ScatterByPartitionTransform.cpp create mode 100644 src/Processors/Transforms/ScatterByPartitionTransform.h create mode 100644 tests/queries/0_stateless/02884_parallel_window_functions.reference create mode 100644 tests/queries/0_stateless/02884_parallel_window_functions.sql delete mode 100644 tests/queries/0_stateless/02942_window_functions_logical_error.reference delete mode 100644 tests/queries/0_stateless/02942_window_functions_logical_error.sql diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 67245438156..4f4e96a9be7 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -2942,6 +2942,7 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan) auto sorting_step = std::make_unique( query_plan.getCurrentDataStream(), window.full_sort_description, + window.partition_by, 0 /* LIMIT */, sort_settings, settings.optimize_sorting_by_input_stream_properties); diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index 2ab88491357..95c61f8d011 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -915,6 +915,7 @@ void addWindowSteps(QueryPlan & query_plan, auto sorting_step = std::make_unique( query_plan.getCurrentDataStream(), window_description.full_sort_description, + window_description.partition_by, 0 /*limit*/, sort_settings, settings.optimize_sorting_by_input_stream_properties); diff --git a/src/Processors/QueryPlan/SortingStep.cpp b/src/Processors/QueryPlan/SortingStep.cpp index 55ce763575e..641b9036d4c 100644 --- a/src/Processors/QueryPlan/SortingStep.cpp +++ b/src/Processors/QueryPlan/SortingStep.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -9,6 +10,8 @@ #include #include +#include +#include namespace CurrentMetrics { @@ -76,6 +79,21 @@ SortingStep::SortingStep( output_stream->sort_scope = DataStream::SortScope::Global; } +SortingStep::SortingStep( + const DataStream & input_stream, + const SortDescription & description_, + const SortDescription & partition_by_description_, + UInt64 limit_, + const Settings & settings_, + bool optimize_sorting_by_input_stream_properties_) + : SortingStep(input_stream, description_, limit_, settings_, optimize_sorting_by_input_stream_properties_) +{ + partition_by_description = partition_by_description_; + + output_stream->sort_description = result_description; + output_stream->sort_scope = DataStream::SortScope::Stream; +} + SortingStep::SortingStep( const DataStream & input_stream_, SortDescription prefix_description_, @@ -117,7 +135,11 @@ void SortingStep::updateOutputStream() { output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits()); output_stream->sort_description = result_description; - output_stream->sort_scope = DataStream::SortScope::Global; + + if (partition_by_description.empty()) + output_stream->sort_scope = DataStream::SortScope::Global; + else + output_stream->sort_scope = DataStream::SortScope::Stream; } void SortingStep::updateLimit(size_t limit_) @@ -135,6 +157,55 @@ void SortingStep::convertToFinishSorting(SortDescription prefix_description_) prefix_description = std::move(prefix_description_); } +void SortingStep::scatterByPartitionIfNeeded(QueryPipelineBuilder& pipeline) +{ + size_t threads = pipeline.getNumThreads(); + size_t streams = pipeline.getNumStreams(); + + if (!partition_by_description.empty() && threads > 1) + { + Block stream_header = pipeline.getHeader(); + + ColumnNumbers key_columns; + key_columns.reserve(partition_by_description.size()); + for (auto & col : partition_by_description) + { + key_columns.push_back(stream_header.getPositionByName(col.column_name)); + } + + pipeline.transform([&](OutputPortRawPtrs ports) + { + Processors processors; + for (auto * port : ports) + { + auto scatter = std::make_shared(stream_header, threads, key_columns); + connect(*port, scatter->getInputs().front()); + processors.push_back(scatter); + } + return processors; + }); + + if (streams > 1) + { + pipeline.transform([&](OutputPortRawPtrs ports) + { + Processors processors; + for (size_t i = 0; i < threads; ++i) + { + size_t output_it = i; + auto resize = std::make_shared(stream_header, streams, 1); + auto & inputs = resize->getInputs(); + + for (auto input_it = inputs.begin(); input_it != inputs.end(); output_it += threads, ++input_it) + connect(*ports[output_it], *input_it); + processors.push_back(resize); + } + return processors; + }); + } + } +} + void SortingStep::finishSorting( QueryPipelineBuilder & pipeline, const SortDescription & input_sort_desc, const SortDescription & result_sort_desc, const UInt64 limit_) { @@ -260,10 +331,12 @@ void SortingStep::fullSortStreams( void SortingStep::fullSort( QueryPipelineBuilder & pipeline, const SortDescription & result_sort_desc, const UInt64 limit_, const bool skip_partial_sort) { + scatterByPartitionIfNeeded(pipeline); + fullSortStreams(pipeline, sort_settings, result_sort_desc, limit_, skip_partial_sort); /// If there are several streams, then we merge them into one - if (pipeline.getNumStreams() > 1) + if (pipeline.getNumStreams() > 1 && (partition_by_description.empty() || pipeline.getNumThreads() == 1)) { auto transform = std::make_shared( pipeline.getHeader(), @@ -295,6 +368,7 @@ void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build { bool need_finish_sorting = (prefix_description.size() < result_description.size()); mergingSorted(pipeline, prefix_description, (need_finish_sorting ? 0 : limit)); + if (need_finish_sorting) { finishSorting(pipeline, prefix_description, result_description, limit); diff --git a/src/Processors/QueryPlan/SortingStep.h b/src/Processors/QueryPlan/SortingStep.h index 371a24ac6f2..52f48f66a32 100644 --- a/src/Processors/QueryPlan/SortingStep.h +++ b/src/Processors/QueryPlan/SortingStep.h @@ -40,6 +40,15 @@ public: const Settings & settings_, bool optimize_sorting_by_input_stream_properties_); + /// Full with partitioning + SortingStep( + const DataStream & input_stream, + const SortDescription & description_, + const SortDescription & partition_by_description_, + UInt64 limit_, + const Settings & settings_, + bool optimize_sorting_by_input_stream_properties_); + /// FinishSorting SortingStep( const DataStream & input_stream_, @@ -83,14 +92,24 @@ public: bool skip_partial_sort = false); private: + void scatterByPartitionIfNeeded(QueryPipelineBuilder& pipeline); void updateOutputStream() override; - static void - mergeSorting(QueryPipelineBuilder & pipeline, const Settings & sort_settings, const SortDescription & result_sort_desc, UInt64 limit_); + static void mergeSorting( + QueryPipelineBuilder & pipeline, + const Settings & sort_settings, + const SortDescription & result_sort_desc, + UInt64 limit_); - void mergingSorted(QueryPipelineBuilder & pipeline, const SortDescription & result_sort_desc, UInt64 limit_); + void mergingSorted( + QueryPipelineBuilder & pipeline, + const SortDescription & result_sort_desc, + UInt64 limit_); void finishSorting( - QueryPipelineBuilder & pipeline, const SortDescription & input_sort_desc, const SortDescription & result_sort_desc, UInt64 limit_); + QueryPipelineBuilder & pipeline, + const SortDescription & input_sort_desc, + const SortDescription & result_sort_desc, + UInt64 limit_); void fullSort( QueryPipelineBuilder & pipeline, const SortDescription & result_sort_desc, @@ -101,6 +120,9 @@ private: SortDescription prefix_description; const SortDescription result_description; + + SortDescription partition_by_description; + UInt64 limit; bool always_read_till_end = false; diff --git a/src/Processors/QueryPlan/WindowStep.cpp b/src/Processors/QueryPlan/WindowStep.cpp index 9c68a4b73d1..bb4f429d626 100644 --- a/src/Processors/QueryPlan/WindowStep.cpp +++ b/src/Processors/QueryPlan/WindowStep.cpp @@ -67,7 +67,8 @@ void WindowStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ // This resize is needed for cases such as `over ()` when we don't have a // sort node, and the input might have multiple streams. The sort node would // have resized it. - pipeline.resize(1); + if (window_description.full_sort_description.empty()) + pipeline.resize(1); pipeline.addSimpleTransform( [&](const Block & /*header*/) diff --git a/src/Processors/Transforms/ScatterByPartitionTransform.cpp b/src/Processors/Transforms/ScatterByPartitionTransform.cpp new file mode 100644 index 00000000000..6e3cdc0fda1 --- /dev/null +++ b/src/Processors/Transforms/ScatterByPartitionTransform.cpp @@ -0,0 +1,129 @@ +#include + +#include +#include + +namespace DB +{ +ScatterByPartitionTransform::ScatterByPartitionTransform(Block header, size_t output_size_, ColumnNumbers key_columns_) + : IProcessor(InputPorts{header}, OutputPorts{output_size_, header}) + , output_size(output_size_) + , key_columns(std::move(key_columns_)) + , hash(0) +{} + +IProcessor::Status ScatterByPartitionTransform::prepare() +{ + auto & input = getInputs().front(); + + /// Check all outputs are finished or ready to get data. + + bool all_finished = true; + for (auto & output : outputs) + { + if (output.isFinished()) + continue; + + all_finished = false; + } + + if (all_finished) + { + input.close(); + return Status::Finished; + } + + if (!all_outputs_processed) + { + auto output_it = outputs.begin(); + bool can_push = false; + for (size_t i = 0; i < output_size; ++i, ++output_it) + if (!was_output_processed[i] && output_it->canPush()) + can_push = true; + if (!can_push) + return Status::PortFull; + return Status::Ready; + } + /// Try get chunk from input. + + if (input.isFinished()) + { + for (auto & output : outputs) + output.finish(); + + return Status::Finished; + } + + input.setNeeded(); + if (!input.hasData()) + return Status::NeedData; + + chunk = input.pull(); + has_data = true; + was_output_processed.assign(outputs.size(), false); + + return Status::Ready; +} + +void ScatterByPartitionTransform::work() +{ + if (all_outputs_processed) + generateOutputChunks(); + all_outputs_processed = true; + + size_t chunk_number = 0; + for (auto & output : outputs) + { + auto & was_processed = was_output_processed[chunk_number]; + auto & output_chunk = output_chunks[chunk_number]; + ++chunk_number; + + if (was_processed) + continue; + + if (output.isFinished()) + continue; + + if (!output.canPush()) + { + all_outputs_processed = false; + continue; + } + + output.push(std::move(output_chunk)); + was_processed = true; + } + + if (all_outputs_processed) + { + has_data = false; + output_chunks.clear(); + } +} + +void ScatterByPartitionTransform::generateOutputChunks() +{ + auto num_rows = chunk.getNumRows(); + const auto & columns = chunk.getColumns(); + + hash.reset(num_rows); + + for (const auto & column_number : key_columns) + columns[column_number]->updateWeakHash32(hash); + + const auto & hash_data = hash.getData(); + IColumn::Selector selector(num_rows); + + for (size_t row = 0; row < num_rows; ++row) + selector[row] = hash_data[row] % output_size; + + output_chunks.resize(output_size); + for (const auto & column : columns) + { + auto filtered_columns = column->scatter(output_size, selector); + for (size_t i = 0; i < output_size; ++i) + output_chunks[i].addColumn(std::move(filtered_columns[i])); + } +} + +} diff --git a/src/Processors/Transforms/ScatterByPartitionTransform.h b/src/Processors/Transforms/ScatterByPartitionTransform.h new file mode 100644 index 00000000000..327f6dd62b4 --- /dev/null +++ b/src/Processors/Transforms/ScatterByPartitionTransform.h @@ -0,0 +1,34 @@ +#pragma once +#include +#include +#include + +namespace DB +{ + +struct ScatterByPartitionTransform : IProcessor +{ + ScatterByPartitionTransform(Block header, size_t output_size_, ColumnNumbers key_columns_); + + String getName() const override { return "ScatterByPartitionTransform"; } + + Status prepare() override; + void work() override; + +private: + + void generateOutputChunks(); + + size_t output_size; + ColumnNumbers key_columns; + + bool has_data = false; + bool all_outputs_processed = true; + std::vector was_output_processed; + Chunk chunk; + + WeakHash32 hash; + Chunks output_chunks; +}; + +} diff --git a/tests/queries/0_stateless/01568_window_functions_distributed.reference b/tests/queries/0_stateless/01568_window_functions_distributed.reference index 13ac0769a24..29ff2e7133c 100644 --- a/tests/queries/0_stateless/01568_window_functions_distributed.reference +++ b/tests/queries/0_stateless/01568_window_functions_distributed.reference @@ -22,6 +22,16 @@ select sum(number) over w as x, max(number) over w as y from t_01568 window w as 21 8 21 8 21 8 +select sum(number) over w, max(number) over w from t_01568 window w as (partition by p) order by p; +3 2 +3 2 +3 2 +12 5 +12 5 +12 5 +21 8 +21 8 +21 8 select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y; 6 2 6 2 @@ -41,6 +51,25 @@ select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1, 42 8 42 8 42 8 +select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y SETTINGS max_threads = 1; +6 2 +6 2 +6 2 +6 2 +6 2 +6 2 +24 5 +24 5 +24 5 +24 5 +24 5 +24 5 +42 8 +42 8 +42 8 +42 8 +42 8 +42 8 select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y; 6 2 24 5 diff --git a/tests/queries/0_stateless/01568_window_functions_distributed.sql b/tests/queries/0_stateless/01568_window_functions_distributed.sql index 95072d6460f..ecce7b412ba 100644 --- a/tests/queries/0_stateless/01568_window_functions_distributed.sql +++ b/tests/queries/0_stateless/01568_window_functions_distributed.sql @@ -15,8 +15,12 @@ from numbers(9); select sum(number) over w as x, max(number) over w as y from t_01568 window w as (partition by p) order by x, y; +select sum(number) over w, max(number) over w from t_01568 window w as (partition by p) order by p; + select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y; +select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y SETTINGS max_threads = 1; + select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y; -- window functions + aggregation w/shards diff --git a/tests/queries/0_stateless/02884_parallel_window_functions.reference b/tests/queries/0_stateless/02884_parallel_window_functions.reference new file mode 100644 index 00000000000..bac15838dc2 --- /dev/null +++ b/tests/queries/0_stateless/02884_parallel_window_functions.reference @@ -0,0 +1,100 @@ +1 +-- { echoOn } + +SELECT + nw, + sum(WR) AS R, + sumIf(WR, uniq_rows = 1) AS UNR +FROM +( + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + GROUP BY ac, nw +) +GROUP BY nw +ORDER BY nw ASC, R DESC +LIMIT 10; +0 2 0 +1 2 0 +2 2 0 +SELECT + nw, + sum(WR) AS R, + sumIf(WR, uniq_rows = 1) AS UNR +FROM +( + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + GROUP BY ac, nw +) +GROUP BY nw +ORDER BY nw ASC, R DESC +LIMIT 10 +SETTINGS max_threads = 1; +0 2 0 +1 2 0 +2 2 0 +SELECT + nw, + sum(WR) AS R, + sumIf(WR, uniq_rows = 1) AS UNR +FROM +( + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + WHERE (ac % 4) = 0 + GROUP BY + ac, + nw + UNION ALL + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + WHERE (ac % 4) = 1 + GROUP BY + ac, + nw + UNION ALL + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + WHERE (ac % 4) = 2 + GROUP BY + ac, + nw + UNION ALL + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + WHERE (ac % 4) = 3 + GROUP BY + ac, + nw +) +GROUP BY nw +ORDER BY nw ASC, R DESC +LIMIT 10; +0 2 0 +1 2 0 +2 2 0 diff --git a/tests/queries/0_stateless/02884_parallel_window_functions.sql b/tests/queries/0_stateless/02884_parallel_window_functions.sql new file mode 100644 index 00000000000..3151b42f896 --- /dev/null +++ b/tests/queries/0_stateless/02884_parallel_window_functions.sql @@ -0,0 +1,119 @@ +CREATE TABLE window_funtion_threading +Engine = MergeTree +ORDER BY (ac, nw) +AS SELECT + toUInt64(toFloat32(number % 2) % 20000000) as ac, + toFloat32(1) as wg, + toUInt16(toFloat32(number % 3) % 400) as nw +FROM numbers_mt(10000000); + +SELECT count() FROM (EXPLAIN PIPELINE SELECT + nw, + sum(WR) AS R, + sumIf(WR, uniq_rows = 1) AS UNR +FROM +( + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + GROUP BY ac, nw +) +GROUP BY nw +ORDER BY nw ASC, R DESC +LIMIT 10) where explain ilike '%ScatterByPartitionTransform%' SETTINGS max_threads = 4; + +-- { echoOn } + +SELECT + nw, + sum(WR) AS R, + sumIf(WR, uniq_rows = 1) AS UNR +FROM +( + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + GROUP BY ac, nw +) +GROUP BY nw +ORDER BY nw ASC, R DESC +LIMIT 10; + +SELECT + nw, + sum(WR) AS R, + sumIf(WR, uniq_rows = 1) AS UNR +FROM +( + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + GROUP BY ac, nw +) +GROUP BY nw +ORDER BY nw ASC, R DESC +LIMIT 10 +SETTINGS max_threads = 1; + +SELECT + nw, + sum(WR) AS R, + sumIf(WR, uniq_rows = 1) AS UNR +FROM +( + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + WHERE (ac % 4) = 0 + GROUP BY + ac, + nw + UNION ALL + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + WHERE (ac % 4) = 1 + GROUP BY + ac, + nw + UNION ALL + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + WHERE (ac % 4) = 2 + GROUP BY + ac, + nw + UNION ALL + SELECT + uniq(nw) OVER (PARTITION BY ac) AS uniq_rows, + AVG(wg) AS WR, + ac, + nw + FROM window_funtion_threading + WHERE (ac % 4) = 3 + GROUP BY + ac, + nw +) +GROUP BY nw +ORDER BY nw ASC, R DESC +LIMIT 10; diff --git a/tests/queries/0_stateless/02942_window_functions_logical_error.reference b/tests/queries/0_stateless/02942_window_functions_logical_error.reference deleted file mode 100644 index 73f8351d9df..00000000000 --- a/tests/queries/0_stateless/02942_window_functions_logical_error.reference +++ /dev/null @@ -1,216 +0,0 @@ -1 901 19 -1 911 19 -1 921 19 -1 931 19 -1 941 19 -1 951 20 -1 961 20 -1 971 20 -1 981 20 -1 991 20 -2 902 19 -2 912 19 -2 922 19 -2 932 19 -2 942 19 -2 952 20 -2 962 20 -2 972 20 -2 982 20 -2 992 20 -3 903 19 -3 913 19 -3 923 19 -3 933 19 -3 943 19 -3 953 20 -3 963 20 -3 973 20 -3 983 20 -3 993 20 -4 904 19 -4 914 19 -4 924 19 -4 934 19 -4 944 19 -4 954 20 -4 964 20 -4 974 20 -4 984 20 -4 994 20 -5 905 19 -5 915 19 -5 925 19 -5 935 19 -5 945 19 -5 955 20 -5 965 20 -5 975 20 -5 985 20 -5 995 20 -6 906 19 -6 916 19 -6 926 19 -6 936 19 -6 946 19 -6 956 20 -6 966 20 -6 976 20 -6 986 20 -6 996 20 -7 907 19 -7 917 19 -7 927 19 -7 937 19 -7 947 19 -7 957 20 -7 967 20 -7 977 20 -7 987 20 -7 997 20 -8 908 19 -8 918 19 -8 928 19 -8 938 19 -8 948 19 -8 958 20 -8 968 20 -8 978 20 -8 988 20 -8 998 20 -9 909 19 -9 919 19 -9 929 19 -9 939 19 -9 949 19 -9 959 20 -9 969 20 -9 979 20 -9 989 20 -9 999 20 -1 1301 19 -1 1311 19 -1 1321 19 -1 1331 19 -1 1341 19 -1 1351 19 -1 1361 19 -1 1371 20 -1 1381 20 -1 1391 20 -1 1401 20 -1 1411 20 -1 1421 20 -1 1431 20 -2 1302 19 -2 1312 19 -2 1322 19 -2 1332 19 -2 1342 19 -2 1352 19 -2 1362 19 -2 1372 20 -2 1382 20 -2 1392 20 -2 1402 20 -2 1412 20 -2 1422 20 -2 1432 20 -3 1303 19 -3 1313 19 -3 1323 19 -3 1333 19 -3 1343 19 -3 1353 19 -3 1363 19 -3 1373 20 -3 1383 20 -3 1393 20 -3 1403 20 -3 1413 20 -3 1423 20 -3 1433 20 -4 1304 19 -4 1314 19 -4 1324 19 -4 1334 19 -4 1344 19 -4 1354 19 -4 1364 19 -4 1374 20 -4 1384 20 -4 1394 20 -4 1404 20 -4 1414 20 -4 1424 20 -4 1434 20 -5 1305 19 -5 1315 19 -5 1325 19 -5 1335 19 -5 1345 19 -5 1355 19 -5 1365 19 -5 1375 20 -5 1385 20 -5 1395 20 -5 1405 20 -5 1415 20 -5 1425 20 -5 1435 20 -6 1306 19 -6 1316 19 -6 1326 19 -6 1336 19 -6 1346 19 -6 1356 19 -6 1366 19 -6 1376 20 -6 1386 20 -6 1396 20 -6 1406 20 -6 1416 20 -6 1426 20 -6 1436 20 -7 1307 19 -7 1317 19 -7 1327 19 -7 1337 19 -7 1347 19 -7 1357 19 -7 1367 19 -7 1377 20 -7 1387 20 -7 1397 20 -7 1407 20 -7 1417 20 -7 1427 20 -7 1437 20 -8 1308 19 -8 1318 19 -8 1328 19 -8 1338 19 -8 1348 19 -8 1358 19 -8 1368 19 -8 1378 20 -8 1388 20 -8 1398 20 -8 1408 20 -8 1418 20 -8 1428 20 -8 1438 20 -9 1309 19 -9 1319 19 -9 1329 19 -9 1339 19 -9 1349 19 -9 1359 19 -9 1369 19 -9 1379 20 -9 1389 20 -9 1399 20 -9 1409 20 -9 1419 20 -9 1429 20 -9 1439 20 diff --git a/tests/queries/0_stateless/02942_window_functions_logical_error.sql b/tests/queries/0_stateless/02942_window_functions_logical_error.sql deleted file mode 100644 index 1e4371a134f..00000000000 --- a/tests/queries/0_stateless/02942_window_functions_logical_error.sql +++ /dev/null @@ -1,158 +0,0 @@ -DROP TABLE IF EXISTS posts; -DROP TABLE IF EXISTS post_metrics; - -CREATE TABLE IF NOT EXISTS posts -( - `page_id` LowCardinality(String), - `post_id` String CODEC(LZ4), - `host_id` UInt32 CODEC(T64, LZ4), - `path_id` UInt32, - `created` DateTime CODEC(T64, LZ4), - `as_of` DateTime CODEC(T64, LZ4) -) -ENGINE = ReplacingMergeTree(as_of) -PARTITION BY toStartOfMonth(created) -ORDER BY (page_id, post_id) -TTL created + toIntervalMonth(26); - - -INSERT INTO posts SELECT - repeat('a', (number % 10) + 1), - toString(number), - number % 10, - number, - now() - toIntervalMinute(number), - now() -FROM numbers(1000); - - -CREATE TABLE IF NOT EXISTS post_metrics -( - `page_id` LowCardinality(String), - `post_id` String CODEC(LZ4), - `created` DateTime CODEC(T64, LZ4), - `impressions` UInt32 CODEC(T64, LZ4), - `clicks` UInt32 CODEC(T64, LZ4), - `as_of` DateTime CODEC(T64, LZ4) -) -ENGINE = ReplacingMergeTree(as_of) -PARTITION BY toStartOfMonth(created) -ORDER BY (page_id, post_id) -TTL created + toIntervalMonth(26); - - -INSERT INTO post_metrics SELECT - repeat('a', (number % 10) + 1), - toString(number), - now() - toIntervalMinute(number), - number * 100, - number * 10, - now() -FROM numbers(1000); - - -SELECT - host_id, - path_id, - max(rank) AS rank -FROM -( - WITH - as_of_posts AS - ( - SELECT - *, - row_number() OVER (PARTITION BY (page_id, post_id) ORDER BY as_of DESC) AS row_num - FROM posts - WHERE (created >= subtractHours(now(), 24)) AND (host_id > 0) - ), - as_of_post_metrics AS - ( - SELECT - *, - row_number() OVER (PARTITION BY (page_id, post_id) ORDER BY as_of DESC) AS row_num - FROM post_metrics - WHERE created >= subtractHours(now(), 24) - ) - SELECT - page_id, - post_id, - host_id, - path_id, - impressions, - clicks, - ntile(20) OVER (PARTITION BY page_id ORDER BY clicks ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS rank - FROM as_of_posts - GLOBAL LEFT JOIN as_of_post_metrics USING (page_id, post_id, row_num) - WHERE (row_num = 1) AND (impressions > 0) -) AS t -WHERE t.rank > 18 -GROUP BY - host_id, - path_id -ORDER BY host_id, path_id; - - -INSERT INTO posts SELECT - repeat('a', (number % 10) + 1), - toString(number), - number % 10, - number, - now() - toIntervalMinute(number), - now() -FROM numbers(100000); - - -INSERT INTO post_metrics SELECT - repeat('a', (number % 10) + 1), - toString(number), - now() - toIntervalMinute(number), - number * 100, - number * 10, - now() -FROM numbers(100000); - - -SELECT - host_id, - path_id, - max(rank) AS rank -FROM -( - WITH - as_of_posts AS - ( - SELECT - *, - row_number() OVER (PARTITION BY (page_id, post_id) ORDER BY as_of DESC) AS row_num - FROM posts - WHERE (created >= subtractHours(now(), 24)) AND (host_id > 0) - ), - as_of_post_metrics AS - ( - SELECT - *, - row_number() OVER (PARTITION BY (page_id, post_id) ORDER BY as_of DESC) AS row_num - FROM post_metrics - WHERE created >= subtractHours(now(), 24) - ) - SELECT - page_id, - post_id, - host_id, - path_id, - impressions, - clicks, - ntile(20) OVER (PARTITION BY page_id ORDER BY clicks ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS rank - FROM as_of_posts - GLOBAL LEFT JOIN as_of_post_metrics USING (page_id, post_id, row_num) - WHERE (row_num = 1) AND (impressions > 0) -) AS t -WHERE t.rank > 18 -GROUP BY - host_id, - path_id -ORDER BY host_id, path_id; - -DROP TABLE posts; -DROP TABLE post_metrics;