mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 02:52:13 +00:00
Revert "Fix bug window functions: revert #39631"
This commit is contained in:
parent
91609a104c
commit
8ab6564538
@ -2942,6 +2942,7 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
|
|||||||
auto sorting_step = std::make_unique<SortingStep>(
|
auto sorting_step = std::make_unique<SortingStep>(
|
||||||
query_plan.getCurrentDataStream(),
|
query_plan.getCurrentDataStream(),
|
||||||
window.full_sort_description,
|
window.full_sort_description,
|
||||||
|
window.partition_by,
|
||||||
0 /* LIMIT */,
|
0 /* LIMIT */,
|
||||||
sort_settings,
|
sort_settings,
|
||||||
settings.optimize_sorting_by_input_stream_properties);
|
settings.optimize_sorting_by_input_stream_properties);
|
||||||
|
@ -915,6 +915,7 @@ void addWindowSteps(QueryPlan & query_plan,
|
|||||||
auto sorting_step = std::make_unique<SortingStep>(
|
auto sorting_step = std::make_unique<SortingStep>(
|
||||||
query_plan.getCurrentDataStream(),
|
query_plan.getCurrentDataStream(),
|
||||||
window_description.full_sort_description,
|
window_description.full_sort_description,
|
||||||
|
window_description.partition_by,
|
||||||
0 /*limit*/,
|
0 /*limit*/,
|
||||||
sort_settings,
|
sort_settings,
|
||||||
settings.optimize_sorting_by_input_stream_properties);
|
settings.optimize_sorting_by_input_stream_properties);
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#include <memory>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <IO/Operators.h>
|
#include <IO/Operators.h>
|
||||||
#include <Processors/Merges/MergingSortedTransform.h>
|
#include <Processors/Merges/MergingSortedTransform.h>
|
||||||
@ -9,6 +10,8 @@
|
|||||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||||
#include <Common/JSONBuilder.h>
|
#include <Common/JSONBuilder.h>
|
||||||
|
|
||||||
|
#include <Processors/ResizeProcessor.h>
|
||||||
|
#include <Processors/Transforms/ScatterByPartitionTransform.h>
|
||||||
|
|
||||||
namespace CurrentMetrics
|
namespace CurrentMetrics
|
||||||
{
|
{
|
||||||
@ -76,6 +79,21 @@ SortingStep::SortingStep(
|
|||||||
output_stream->sort_scope = DataStream::SortScope::Global;
|
output_stream->sort_scope = DataStream::SortScope::Global;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SortingStep::SortingStep(
|
||||||
|
const DataStream & input_stream,
|
||||||
|
const SortDescription & description_,
|
||||||
|
const SortDescription & partition_by_description_,
|
||||||
|
UInt64 limit_,
|
||||||
|
const Settings & settings_,
|
||||||
|
bool optimize_sorting_by_input_stream_properties_)
|
||||||
|
: SortingStep(input_stream, description_, limit_, settings_, optimize_sorting_by_input_stream_properties_)
|
||||||
|
{
|
||||||
|
partition_by_description = partition_by_description_;
|
||||||
|
|
||||||
|
output_stream->sort_description = result_description;
|
||||||
|
output_stream->sort_scope = DataStream::SortScope::Stream;
|
||||||
|
}
|
||||||
|
|
||||||
SortingStep::SortingStep(
|
SortingStep::SortingStep(
|
||||||
const DataStream & input_stream_,
|
const DataStream & input_stream_,
|
||||||
SortDescription prefix_description_,
|
SortDescription prefix_description_,
|
||||||
@ -117,7 +135,11 @@ void SortingStep::updateOutputStream()
|
|||||||
{
|
{
|
||||||
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
|
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
|
||||||
output_stream->sort_description = result_description;
|
output_stream->sort_description = result_description;
|
||||||
|
|
||||||
|
if (partition_by_description.empty())
|
||||||
output_stream->sort_scope = DataStream::SortScope::Global;
|
output_stream->sort_scope = DataStream::SortScope::Global;
|
||||||
|
else
|
||||||
|
output_stream->sort_scope = DataStream::SortScope::Stream;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SortingStep::updateLimit(size_t limit_)
|
void SortingStep::updateLimit(size_t limit_)
|
||||||
@ -135,6 +157,55 @@ void SortingStep::convertToFinishSorting(SortDescription prefix_description_)
|
|||||||
prefix_description = std::move(prefix_description_);
|
prefix_description = std::move(prefix_description_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SortingStep::scatterByPartitionIfNeeded(QueryPipelineBuilder& pipeline)
|
||||||
|
{
|
||||||
|
size_t threads = pipeline.getNumThreads();
|
||||||
|
size_t streams = pipeline.getNumStreams();
|
||||||
|
|
||||||
|
if (!partition_by_description.empty() && threads > 1)
|
||||||
|
{
|
||||||
|
Block stream_header = pipeline.getHeader();
|
||||||
|
|
||||||
|
ColumnNumbers key_columns;
|
||||||
|
key_columns.reserve(partition_by_description.size());
|
||||||
|
for (auto & col : partition_by_description)
|
||||||
|
{
|
||||||
|
key_columns.push_back(stream_header.getPositionByName(col.column_name));
|
||||||
|
}
|
||||||
|
|
||||||
|
pipeline.transform([&](OutputPortRawPtrs ports)
|
||||||
|
{
|
||||||
|
Processors processors;
|
||||||
|
for (auto * port : ports)
|
||||||
|
{
|
||||||
|
auto scatter = std::make_shared<ScatterByPartitionTransform>(stream_header, threads, key_columns);
|
||||||
|
connect(*port, scatter->getInputs().front());
|
||||||
|
processors.push_back(scatter);
|
||||||
|
}
|
||||||
|
return processors;
|
||||||
|
});
|
||||||
|
|
||||||
|
if (streams > 1)
|
||||||
|
{
|
||||||
|
pipeline.transform([&](OutputPortRawPtrs ports)
|
||||||
|
{
|
||||||
|
Processors processors;
|
||||||
|
for (size_t i = 0; i < threads; ++i)
|
||||||
|
{
|
||||||
|
size_t output_it = i;
|
||||||
|
auto resize = std::make_shared<ResizeProcessor>(stream_header, streams, 1);
|
||||||
|
auto & inputs = resize->getInputs();
|
||||||
|
|
||||||
|
for (auto input_it = inputs.begin(); input_it != inputs.end(); output_it += threads, ++input_it)
|
||||||
|
connect(*ports[output_it], *input_it);
|
||||||
|
processors.push_back(resize);
|
||||||
|
}
|
||||||
|
return processors;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void SortingStep::finishSorting(
|
void SortingStep::finishSorting(
|
||||||
QueryPipelineBuilder & pipeline, const SortDescription & input_sort_desc, const SortDescription & result_sort_desc, const UInt64 limit_)
|
QueryPipelineBuilder & pipeline, const SortDescription & input_sort_desc, const SortDescription & result_sort_desc, const UInt64 limit_)
|
||||||
{
|
{
|
||||||
@ -260,10 +331,12 @@ void SortingStep::fullSortStreams(
|
|||||||
void SortingStep::fullSort(
|
void SortingStep::fullSort(
|
||||||
QueryPipelineBuilder & pipeline, const SortDescription & result_sort_desc, const UInt64 limit_, const bool skip_partial_sort)
|
QueryPipelineBuilder & pipeline, const SortDescription & result_sort_desc, const UInt64 limit_, const bool skip_partial_sort)
|
||||||
{
|
{
|
||||||
|
scatterByPartitionIfNeeded(pipeline);
|
||||||
|
|
||||||
fullSortStreams(pipeline, sort_settings, result_sort_desc, limit_, skip_partial_sort);
|
fullSortStreams(pipeline, sort_settings, result_sort_desc, limit_, skip_partial_sort);
|
||||||
|
|
||||||
/// If there are several streams, then we merge them into one
|
/// If there are several streams, then we merge them into one
|
||||||
if (pipeline.getNumStreams() > 1)
|
if (pipeline.getNumStreams() > 1 && (partition_by_description.empty() || pipeline.getNumThreads() == 1))
|
||||||
{
|
{
|
||||||
auto transform = std::make_shared<MergingSortedTransform>(
|
auto transform = std::make_shared<MergingSortedTransform>(
|
||||||
pipeline.getHeader(),
|
pipeline.getHeader(),
|
||||||
@ -295,6 +368,7 @@ void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
|
|||||||
{
|
{
|
||||||
bool need_finish_sorting = (prefix_description.size() < result_description.size());
|
bool need_finish_sorting = (prefix_description.size() < result_description.size());
|
||||||
mergingSorted(pipeline, prefix_description, (need_finish_sorting ? 0 : limit));
|
mergingSorted(pipeline, prefix_description, (need_finish_sorting ? 0 : limit));
|
||||||
|
|
||||||
if (need_finish_sorting)
|
if (need_finish_sorting)
|
||||||
{
|
{
|
||||||
finishSorting(pipeline, prefix_description, result_description, limit);
|
finishSorting(pipeline, prefix_description, result_description, limit);
|
||||||
|
@ -40,6 +40,15 @@ public:
|
|||||||
const Settings & settings_,
|
const Settings & settings_,
|
||||||
bool optimize_sorting_by_input_stream_properties_);
|
bool optimize_sorting_by_input_stream_properties_);
|
||||||
|
|
||||||
|
/// Full with partitioning
|
||||||
|
SortingStep(
|
||||||
|
const DataStream & input_stream,
|
||||||
|
const SortDescription & description_,
|
||||||
|
const SortDescription & partition_by_description_,
|
||||||
|
UInt64 limit_,
|
||||||
|
const Settings & settings_,
|
||||||
|
bool optimize_sorting_by_input_stream_properties_);
|
||||||
|
|
||||||
/// FinishSorting
|
/// FinishSorting
|
||||||
SortingStep(
|
SortingStep(
|
||||||
const DataStream & input_stream_,
|
const DataStream & input_stream_,
|
||||||
@ -83,14 +92,24 @@ public:
|
|||||||
bool skip_partial_sort = false);
|
bool skip_partial_sort = false);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
void scatterByPartitionIfNeeded(QueryPipelineBuilder& pipeline);
|
||||||
void updateOutputStream() override;
|
void updateOutputStream() override;
|
||||||
|
|
||||||
static void
|
static void mergeSorting(
|
||||||
mergeSorting(QueryPipelineBuilder & pipeline, const Settings & sort_settings, const SortDescription & result_sort_desc, UInt64 limit_);
|
QueryPipelineBuilder & pipeline,
|
||||||
|
const Settings & sort_settings,
|
||||||
|
const SortDescription & result_sort_desc,
|
||||||
|
UInt64 limit_);
|
||||||
|
|
||||||
void mergingSorted(QueryPipelineBuilder & pipeline, const SortDescription & result_sort_desc, UInt64 limit_);
|
void mergingSorted(
|
||||||
|
QueryPipelineBuilder & pipeline,
|
||||||
|
const SortDescription & result_sort_desc,
|
||||||
|
UInt64 limit_);
|
||||||
void finishSorting(
|
void finishSorting(
|
||||||
QueryPipelineBuilder & pipeline, const SortDescription & input_sort_desc, const SortDescription & result_sort_desc, UInt64 limit_);
|
QueryPipelineBuilder & pipeline,
|
||||||
|
const SortDescription & input_sort_desc,
|
||||||
|
const SortDescription & result_sort_desc,
|
||||||
|
UInt64 limit_);
|
||||||
void fullSort(
|
void fullSort(
|
||||||
QueryPipelineBuilder & pipeline,
|
QueryPipelineBuilder & pipeline,
|
||||||
const SortDescription & result_sort_desc,
|
const SortDescription & result_sort_desc,
|
||||||
@ -101,6 +120,9 @@ private:
|
|||||||
|
|
||||||
SortDescription prefix_description;
|
SortDescription prefix_description;
|
||||||
const SortDescription result_description;
|
const SortDescription result_description;
|
||||||
|
|
||||||
|
SortDescription partition_by_description;
|
||||||
|
|
||||||
UInt64 limit;
|
UInt64 limit;
|
||||||
bool always_read_till_end = false;
|
bool always_read_till_end = false;
|
||||||
|
|
||||||
|
@ -67,6 +67,7 @@ void WindowStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
|
|||||||
// This resize is needed for cases such as `over ()` when we don't have a
|
// This resize is needed for cases such as `over ()` when we don't have a
|
||||||
// sort node, and the input might have multiple streams. The sort node would
|
// sort node, and the input might have multiple streams. The sort node would
|
||||||
// have resized it.
|
// have resized it.
|
||||||
|
if (window_description.full_sort_description.empty())
|
||||||
pipeline.resize(1);
|
pipeline.resize(1);
|
||||||
|
|
||||||
pipeline.addSimpleTransform(
|
pipeline.addSimpleTransform(
|
||||||
|
129
src/Processors/Transforms/ScatterByPartitionTransform.cpp
Normal file
129
src/Processors/Transforms/ScatterByPartitionTransform.cpp
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
#include <Processors/Transforms/ScatterByPartitionTransform.h>
|
||||||
|
|
||||||
|
#include <Common/PODArray.h>
|
||||||
|
#include <Core/ColumnNumbers.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
ScatterByPartitionTransform::ScatterByPartitionTransform(Block header, size_t output_size_, ColumnNumbers key_columns_)
|
||||||
|
: IProcessor(InputPorts{header}, OutputPorts{output_size_, header})
|
||||||
|
, output_size(output_size_)
|
||||||
|
, key_columns(std::move(key_columns_))
|
||||||
|
, hash(0)
|
||||||
|
{}
|
||||||
|
|
||||||
|
IProcessor::Status ScatterByPartitionTransform::prepare()
|
||||||
|
{
|
||||||
|
auto & input = getInputs().front();
|
||||||
|
|
||||||
|
/// Check all outputs are finished or ready to get data.
|
||||||
|
|
||||||
|
bool all_finished = true;
|
||||||
|
for (auto & output : outputs)
|
||||||
|
{
|
||||||
|
if (output.isFinished())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
all_finished = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (all_finished)
|
||||||
|
{
|
||||||
|
input.close();
|
||||||
|
return Status::Finished;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!all_outputs_processed)
|
||||||
|
{
|
||||||
|
auto output_it = outputs.begin();
|
||||||
|
bool can_push = false;
|
||||||
|
for (size_t i = 0; i < output_size; ++i, ++output_it)
|
||||||
|
if (!was_output_processed[i] && output_it->canPush())
|
||||||
|
can_push = true;
|
||||||
|
if (!can_push)
|
||||||
|
return Status::PortFull;
|
||||||
|
return Status::Ready;
|
||||||
|
}
|
||||||
|
/// Try get chunk from input.
|
||||||
|
|
||||||
|
if (input.isFinished())
|
||||||
|
{
|
||||||
|
for (auto & output : outputs)
|
||||||
|
output.finish();
|
||||||
|
|
||||||
|
return Status::Finished;
|
||||||
|
}
|
||||||
|
|
||||||
|
input.setNeeded();
|
||||||
|
if (!input.hasData())
|
||||||
|
return Status::NeedData;
|
||||||
|
|
||||||
|
chunk = input.pull();
|
||||||
|
has_data = true;
|
||||||
|
was_output_processed.assign(outputs.size(), false);
|
||||||
|
|
||||||
|
return Status::Ready;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ScatterByPartitionTransform::work()
|
||||||
|
{
|
||||||
|
if (all_outputs_processed)
|
||||||
|
generateOutputChunks();
|
||||||
|
all_outputs_processed = true;
|
||||||
|
|
||||||
|
size_t chunk_number = 0;
|
||||||
|
for (auto & output : outputs)
|
||||||
|
{
|
||||||
|
auto & was_processed = was_output_processed[chunk_number];
|
||||||
|
auto & output_chunk = output_chunks[chunk_number];
|
||||||
|
++chunk_number;
|
||||||
|
|
||||||
|
if (was_processed)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (output.isFinished())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (!output.canPush())
|
||||||
|
{
|
||||||
|
all_outputs_processed = false;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
output.push(std::move(output_chunk));
|
||||||
|
was_processed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (all_outputs_processed)
|
||||||
|
{
|
||||||
|
has_data = false;
|
||||||
|
output_chunks.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ScatterByPartitionTransform::generateOutputChunks()
|
||||||
|
{
|
||||||
|
auto num_rows = chunk.getNumRows();
|
||||||
|
const auto & columns = chunk.getColumns();
|
||||||
|
|
||||||
|
hash.reset(num_rows);
|
||||||
|
|
||||||
|
for (const auto & column_number : key_columns)
|
||||||
|
columns[column_number]->updateWeakHash32(hash);
|
||||||
|
|
||||||
|
const auto & hash_data = hash.getData();
|
||||||
|
IColumn::Selector selector(num_rows);
|
||||||
|
|
||||||
|
for (size_t row = 0; row < num_rows; ++row)
|
||||||
|
selector[row] = hash_data[row] % output_size;
|
||||||
|
|
||||||
|
output_chunks.resize(output_size);
|
||||||
|
for (const auto & column : columns)
|
||||||
|
{
|
||||||
|
auto filtered_columns = column->scatter(output_size, selector);
|
||||||
|
for (size_t i = 0; i < output_size; ++i)
|
||||||
|
output_chunks[i].addColumn(std::move(filtered_columns[i]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
34
src/Processors/Transforms/ScatterByPartitionTransform.h
Normal file
34
src/Processors/Transforms/ScatterByPartitionTransform.h
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <Common/WeakHash.h>
|
||||||
|
#include <Core/ColumnNumbers.h>
|
||||||
|
#include <Processors/IProcessor.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
struct ScatterByPartitionTransform : IProcessor
|
||||||
|
{
|
||||||
|
ScatterByPartitionTransform(Block header, size_t output_size_, ColumnNumbers key_columns_);
|
||||||
|
|
||||||
|
String getName() const override { return "ScatterByPartitionTransform"; }
|
||||||
|
|
||||||
|
Status prepare() override;
|
||||||
|
void work() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
void generateOutputChunks();
|
||||||
|
|
||||||
|
size_t output_size;
|
||||||
|
ColumnNumbers key_columns;
|
||||||
|
|
||||||
|
bool has_data = false;
|
||||||
|
bool all_outputs_processed = true;
|
||||||
|
std::vector<char> was_output_processed;
|
||||||
|
Chunk chunk;
|
||||||
|
|
||||||
|
WeakHash32 hash;
|
||||||
|
Chunks output_chunks;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
@ -22,6 +22,16 @@ select sum(number) over w as x, max(number) over w as y from t_01568 window w as
|
|||||||
21 8
|
21 8
|
||||||
21 8
|
21 8
|
||||||
21 8
|
21 8
|
||||||
|
select sum(number) over w, max(number) over w from t_01568 window w as (partition by p) order by p;
|
||||||
|
3 2
|
||||||
|
3 2
|
||||||
|
3 2
|
||||||
|
12 5
|
||||||
|
12 5
|
||||||
|
12 5
|
||||||
|
21 8
|
||||||
|
21 8
|
||||||
|
21 8
|
||||||
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
|
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
|
||||||
6 2
|
6 2
|
||||||
6 2
|
6 2
|
||||||
@ -41,6 +51,25 @@ select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,
|
|||||||
42 8
|
42 8
|
||||||
42 8
|
42 8
|
||||||
42 8
|
42 8
|
||||||
|
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y SETTINGS max_threads = 1;
|
||||||
|
6 2
|
||||||
|
6 2
|
||||||
|
6 2
|
||||||
|
6 2
|
||||||
|
6 2
|
||||||
|
6 2
|
||||||
|
24 5
|
||||||
|
24 5
|
||||||
|
24 5
|
||||||
|
24 5
|
||||||
|
24 5
|
||||||
|
24 5
|
||||||
|
42 8
|
||||||
|
42 8
|
||||||
|
42 8
|
||||||
|
42 8
|
||||||
|
42 8
|
||||||
|
42 8
|
||||||
select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
|
select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
|
||||||
6 2
|
6 2
|
||||||
24 5
|
24 5
|
||||||
|
@ -15,8 +15,12 @@ from numbers(9);
|
|||||||
|
|
||||||
select sum(number) over w as x, max(number) over w as y from t_01568 window w as (partition by p) order by x, y;
|
select sum(number) over w as x, max(number) over w as y from t_01568 window w as (partition by p) order by x, y;
|
||||||
|
|
||||||
|
select sum(number) over w, max(number) over w from t_01568 window w as (partition by p) order by p;
|
||||||
|
|
||||||
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
|
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
|
||||||
|
|
||||||
|
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y SETTINGS max_threads = 1;
|
||||||
|
|
||||||
select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
|
select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
|
||||||
|
|
||||||
-- window functions + aggregation w/shards
|
-- window functions + aggregation w/shards
|
||||||
|
@ -0,0 +1,100 @@
|
|||||||
|
1
|
||||||
|
-- { echoOn }
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
nw,
|
||||||
|
sum(WR) AS R,
|
||||||
|
sumIf(WR, uniq_rows = 1) AS UNR
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
GROUP BY ac, nw
|
||||||
|
)
|
||||||
|
GROUP BY nw
|
||||||
|
ORDER BY nw ASC, R DESC
|
||||||
|
LIMIT 10;
|
||||||
|
0 2 0
|
||||||
|
1 2 0
|
||||||
|
2 2 0
|
||||||
|
SELECT
|
||||||
|
nw,
|
||||||
|
sum(WR) AS R,
|
||||||
|
sumIf(WR, uniq_rows = 1) AS UNR
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
GROUP BY ac, nw
|
||||||
|
)
|
||||||
|
GROUP BY nw
|
||||||
|
ORDER BY nw ASC, R DESC
|
||||||
|
LIMIT 10
|
||||||
|
SETTINGS max_threads = 1;
|
||||||
|
0 2 0
|
||||||
|
1 2 0
|
||||||
|
2 2 0
|
||||||
|
SELECT
|
||||||
|
nw,
|
||||||
|
sum(WR) AS R,
|
||||||
|
sumIf(WR, uniq_rows = 1) AS UNR
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
WHERE (ac % 4) = 0
|
||||||
|
GROUP BY
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
WHERE (ac % 4) = 1
|
||||||
|
GROUP BY
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
WHERE (ac % 4) = 2
|
||||||
|
GROUP BY
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
WHERE (ac % 4) = 3
|
||||||
|
GROUP BY
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
)
|
||||||
|
GROUP BY nw
|
||||||
|
ORDER BY nw ASC, R DESC
|
||||||
|
LIMIT 10;
|
||||||
|
0 2 0
|
||||||
|
1 2 0
|
||||||
|
2 2 0
|
119
tests/queries/0_stateless/02884_parallel_window_functions.sql
Normal file
119
tests/queries/0_stateless/02884_parallel_window_functions.sql
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
CREATE TABLE window_funtion_threading
|
||||||
|
Engine = MergeTree
|
||||||
|
ORDER BY (ac, nw)
|
||||||
|
AS SELECT
|
||||||
|
toUInt64(toFloat32(number % 2) % 20000000) as ac,
|
||||||
|
toFloat32(1) as wg,
|
||||||
|
toUInt16(toFloat32(number % 3) % 400) as nw
|
||||||
|
FROM numbers_mt(10000000);
|
||||||
|
|
||||||
|
SELECT count() FROM (EXPLAIN PIPELINE SELECT
|
||||||
|
nw,
|
||||||
|
sum(WR) AS R,
|
||||||
|
sumIf(WR, uniq_rows = 1) AS UNR
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
GROUP BY ac, nw
|
||||||
|
)
|
||||||
|
GROUP BY nw
|
||||||
|
ORDER BY nw ASC, R DESC
|
||||||
|
LIMIT 10) where explain ilike '%ScatterByPartitionTransform%' SETTINGS max_threads = 4;
|
||||||
|
|
||||||
|
-- { echoOn }
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
nw,
|
||||||
|
sum(WR) AS R,
|
||||||
|
sumIf(WR, uniq_rows = 1) AS UNR
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
GROUP BY ac, nw
|
||||||
|
)
|
||||||
|
GROUP BY nw
|
||||||
|
ORDER BY nw ASC, R DESC
|
||||||
|
LIMIT 10;
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
nw,
|
||||||
|
sum(WR) AS R,
|
||||||
|
sumIf(WR, uniq_rows = 1) AS UNR
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
GROUP BY ac, nw
|
||||||
|
)
|
||||||
|
GROUP BY nw
|
||||||
|
ORDER BY nw ASC, R DESC
|
||||||
|
LIMIT 10
|
||||||
|
SETTINGS max_threads = 1;
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
nw,
|
||||||
|
sum(WR) AS R,
|
||||||
|
sumIf(WR, uniq_rows = 1) AS UNR
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
WHERE (ac % 4) = 0
|
||||||
|
GROUP BY
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
WHERE (ac % 4) = 1
|
||||||
|
GROUP BY
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
WHERE (ac % 4) = 2
|
||||||
|
GROUP BY
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
uniq(nw) OVER (PARTITION BY ac) AS uniq_rows,
|
||||||
|
AVG(wg) AS WR,
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
FROM window_funtion_threading
|
||||||
|
WHERE (ac % 4) = 3
|
||||||
|
GROUP BY
|
||||||
|
ac,
|
||||||
|
nw
|
||||||
|
)
|
||||||
|
GROUP BY nw
|
||||||
|
ORDER BY nw ASC, R DESC
|
||||||
|
LIMIT 10;
|
@ -1,216 +0,0 @@
|
|||||||
1 901 19
|
|
||||||
1 911 19
|
|
||||||
1 921 19
|
|
||||||
1 931 19
|
|
||||||
1 941 19
|
|
||||||
1 951 20
|
|
||||||
1 961 20
|
|
||||||
1 971 20
|
|
||||||
1 981 20
|
|
||||||
1 991 20
|
|
||||||
2 902 19
|
|
||||||
2 912 19
|
|
||||||
2 922 19
|
|
||||||
2 932 19
|
|
||||||
2 942 19
|
|
||||||
2 952 20
|
|
||||||
2 962 20
|
|
||||||
2 972 20
|
|
||||||
2 982 20
|
|
||||||
2 992 20
|
|
||||||
3 903 19
|
|
||||||
3 913 19
|
|
||||||
3 923 19
|
|
||||||
3 933 19
|
|
||||||
3 943 19
|
|
||||||
3 953 20
|
|
||||||
3 963 20
|
|
||||||
3 973 20
|
|
||||||
3 983 20
|
|
||||||
3 993 20
|
|
||||||
4 904 19
|
|
||||||
4 914 19
|
|
||||||
4 924 19
|
|
||||||
4 934 19
|
|
||||||
4 944 19
|
|
||||||
4 954 20
|
|
||||||
4 964 20
|
|
||||||
4 974 20
|
|
||||||
4 984 20
|
|
||||||
4 994 20
|
|
||||||
5 905 19
|
|
||||||
5 915 19
|
|
||||||
5 925 19
|
|
||||||
5 935 19
|
|
||||||
5 945 19
|
|
||||||
5 955 20
|
|
||||||
5 965 20
|
|
||||||
5 975 20
|
|
||||||
5 985 20
|
|
||||||
5 995 20
|
|
||||||
6 906 19
|
|
||||||
6 916 19
|
|
||||||
6 926 19
|
|
||||||
6 936 19
|
|
||||||
6 946 19
|
|
||||||
6 956 20
|
|
||||||
6 966 20
|
|
||||||
6 976 20
|
|
||||||
6 986 20
|
|
||||||
6 996 20
|
|
||||||
7 907 19
|
|
||||||
7 917 19
|
|
||||||
7 927 19
|
|
||||||
7 937 19
|
|
||||||
7 947 19
|
|
||||||
7 957 20
|
|
||||||
7 967 20
|
|
||||||
7 977 20
|
|
||||||
7 987 20
|
|
||||||
7 997 20
|
|
||||||
8 908 19
|
|
||||||
8 918 19
|
|
||||||
8 928 19
|
|
||||||
8 938 19
|
|
||||||
8 948 19
|
|
||||||
8 958 20
|
|
||||||
8 968 20
|
|
||||||
8 978 20
|
|
||||||
8 988 20
|
|
||||||
8 998 20
|
|
||||||
9 909 19
|
|
||||||
9 919 19
|
|
||||||
9 929 19
|
|
||||||
9 939 19
|
|
||||||
9 949 19
|
|
||||||
9 959 20
|
|
||||||
9 969 20
|
|
||||||
9 979 20
|
|
||||||
9 989 20
|
|
||||||
9 999 20
|
|
||||||
1 1301 19
|
|
||||||
1 1311 19
|
|
||||||
1 1321 19
|
|
||||||
1 1331 19
|
|
||||||
1 1341 19
|
|
||||||
1 1351 19
|
|
||||||
1 1361 19
|
|
||||||
1 1371 20
|
|
||||||
1 1381 20
|
|
||||||
1 1391 20
|
|
||||||
1 1401 20
|
|
||||||
1 1411 20
|
|
||||||
1 1421 20
|
|
||||||
1 1431 20
|
|
||||||
2 1302 19
|
|
||||||
2 1312 19
|
|
||||||
2 1322 19
|
|
||||||
2 1332 19
|
|
||||||
2 1342 19
|
|
||||||
2 1352 19
|
|
||||||
2 1362 19
|
|
||||||
2 1372 20
|
|
||||||
2 1382 20
|
|
||||||
2 1392 20
|
|
||||||
2 1402 20
|
|
||||||
2 1412 20
|
|
||||||
2 1422 20
|
|
||||||
2 1432 20
|
|
||||||
3 1303 19
|
|
||||||
3 1313 19
|
|
||||||
3 1323 19
|
|
||||||
3 1333 19
|
|
||||||
3 1343 19
|
|
||||||
3 1353 19
|
|
||||||
3 1363 19
|
|
||||||
3 1373 20
|
|
||||||
3 1383 20
|
|
||||||
3 1393 20
|
|
||||||
3 1403 20
|
|
||||||
3 1413 20
|
|
||||||
3 1423 20
|
|
||||||
3 1433 20
|
|
||||||
4 1304 19
|
|
||||||
4 1314 19
|
|
||||||
4 1324 19
|
|
||||||
4 1334 19
|
|
||||||
4 1344 19
|
|
||||||
4 1354 19
|
|
||||||
4 1364 19
|
|
||||||
4 1374 20
|
|
||||||
4 1384 20
|
|
||||||
4 1394 20
|
|
||||||
4 1404 20
|
|
||||||
4 1414 20
|
|
||||||
4 1424 20
|
|
||||||
4 1434 20
|
|
||||||
5 1305 19
|
|
||||||
5 1315 19
|
|
||||||
5 1325 19
|
|
||||||
5 1335 19
|
|
||||||
5 1345 19
|
|
||||||
5 1355 19
|
|
||||||
5 1365 19
|
|
||||||
5 1375 20
|
|
||||||
5 1385 20
|
|
||||||
5 1395 20
|
|
||||||
5 1405 20
|
|
||||||
5 1415 20
|
|
||||||
5 1425 20
|
|
||||||
5 1435 20
|
|
||||||
6 1306 19
|
|
||||||
6 1316 19
|
|
||||||
6 1326 19
|
|
||||||
6 1336 19
|
|
||||||
6 1346 19
|
|
||||||
6 1356 19
|
|
||||||
6 1366 19
|
|
||||||
6 1376 20
|
|
||||||
6 1386 20
|
|
||||||
6 1396 20
|
|
||||||
6 1406 20
|
|
||||||
6 1416 20
|
|
||||||
6 1426 20
|
|
||||||
6 1436 20
|
|
||||||
7 1307 19
|
|
||||||
7 1317 19
|
|
||||||
7 1327 19
|
|
||||||
7 1337 19
|
|
||||||
7 1347 19
|
|
||||||
7 1357 19
|
|
||||||
7 1367 19
|
|
||||||
7 1377 20
|
|
||||||
7 1387 20
|
|
||||||
7 1397 20
|
|
||||||
7 1407 20
|
|
||||||
7 1417 20
|
|
||||||
7 1427 20
|
|
||||||
7 1437 20
|
|
||||||
8 1308 19
|
|
||||||
8 1318 19
|
|
||||||
8 1328 19
|
|
||||||
8 1338 19
|
|
||||||
8 1348 19
|
|
||||||
8 1358 19
|
|
||||||
8 1368 19
|
|
||||||
8 1378 20
|
|
||||||
8 1388 20
|
|
||||||
8 1398 20
|
|
||||||
8 1408 20
|
|
||||||
8 1418 20
|
|
||||||
8 1428 20
|
|
||||||
8 1438 20
|
|
||||||
9 1309 19
|
|
||||||
9 1319 19
|
|
||||||
9 1329 19
|
|
||||||
9 1339 19
|
|
||||||
9 1349 19
|
|
||||||
9 1359 19
|
|
||||||
9 1369 19
|
|
||||||
9 1379 20
|
|
||||||
9 1389 20
|
|
||||||
9 1399 20
|
|
||||||
9 1409 20
|
|
||||||
9 1419 20
|
|
||||||
9 1429 20
|
|
||||||
9 1439 20
|
|
@ -1,158 +0,0 @@
|
|||||||
DROP TABLE IF EXISTS posts;
|
|
||||||
DROP TABLE IF EXISTS post_metrics;
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS posts
|
|
||||||
(
|
|
||||||
`page_id` LowCardinality(String),
|
|
||||||
`post_id` String CODEC(LZ4),
|
|
||||||
`host_id` UInt32 CODEC(T64, LZ4),
|
|
||||||
`path_id` UInt32,
|
|
||||||
`created` DateTime CODEC(T64, LZ4),
|
|
||||||
`as_of` DateTime CODEC(T64, LZ4)
|
|
||||||
)
|
|
||||||
ENGINE = ReplacingMergeTree(as_of)
|
|
||||||
PARTITION BY toStartOfMonth(created)
|
|
||||||
ORDER BY (page_id, post_id)
|
|
||||||
TTL created + toIntervalMonth(26);
|
|
||||||
|
|
||||||
|
|
||||||
INSERT INTO posts SELECT
|
|
||||||
repeat('a', (number % 10) + 1),
|
|
||||||
toString(number),
|
|
||||||
number % 10,
|
|
||||||
number,
|
|
||||||
now() - toIntervalMinute(number),
|
|
||||||
now()
|
|
||||||
FROM numbers(1000);
|
|
||||||
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS post_metrics
|
|
||||||
(
|
|
||||||
`page_id` LowCardinality(String),
|
|
||||||
`post_id` String CODEC(LZ4),
|
|
||||||
`created` DateTime CODEC(T64, LZ4),
|
|
||||||
`impressions` UInt32 CODEC(T64, LZ4),
|
|
||||||
`clicks` UInt32 CODEC(T64, LZ4),
|
|
||||||
`as_of` DateTime CODEC(T64, LZ4)
|
|
||||||
)
|
|
||||||
ENGINE = ReplacingMergeTree(as_of)
|
|
||||||
PARTITION BY toStartOfMonth(created)
|
|
||||||
ORDER BY (page_id, post_id)
|
|
||||||
TTL created + toIntervalMonth(26);
|
|
||||||
|
|
||||||
|
|
||||||
INSERT INTO post_metrics SELECT
|
|
||||||
repeat('a', (number % 10) + 1),
|
|
||||||
toString(number),
|
|
||||||
now() - toIntervalMinute(number),
|
|
||||||
number * 100,
|
|
||||||
number * 10,
|
|
||||||
now()
|
|
||||||
FROM numbers(1000);
|
|
||||||
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
host_id,
|
|
||||||
path_id,
|
|
||||||
max(rank) AS rank
|
|
||||||
FROM
|
|
||||||
(
|
|
||||||
WITH
|
|
||||||
as_of_posts AS
|
|
||||||
(
|
|
||||||
SELECT
|
|
||||||
*,
|
|
||||||
row_number() OVER (PARTITION BY (page_id, post_id) ORDER BY as_of DESC) AS row_num
|
|
||||||
FROM posts
|
|
||||||
WHERE (created >= subtractHours(now(), 24)) AND (host_id > 0)
|
|
||||||
),
|
|
||||||
as_of_post_metrics AS
|
|
||||||
(
|
|
||||||
SELECT
|
|
||||||
*,
|
|
||||||
row_number() OVER (PARTITION BY (page_id, post_id) ORDER BY as_of DESC) AS row_num
|
|
||||||
FROM post_metrics
|
|
||||||
WHERE created >= subtractHours(now(), 24)
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
page_id,
|
|
||||||
post_id,
|
|
||||||
host_id,
|
|
||||||
path_id,
|
|
||||||
impressions,
|
|
||||||
clicks,
|
|
||||||
ntile(20) OVER (PARTITION BY page_id ORDER BY clicks ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS rank
|
|
||||||
FROM as_of_posts
|
|
||||||
GLOBAL LEFT JOIN as_of_post_metrics USING (page_id, post_id, row_num)
|
|
||||||
WHERE (row_num = 1) AND (impressions > 0)
|
|
||||||
) AS t
|
|
||||||
WHERE t.rank > 18
|
|
||||||
GROUP BY
|
|
||||||
host_id,
|
|
||||||
path_id
|
|
||||||
ORDER BY host_id, path_id;
|
|
||||||
|
|
||||||
|
|
||||||
INSERT INTO posts SELECT
|
|
||||||
repeat('a', (number % 10) + 1),
|
|
||||||
toString(number),
|
|
||||||
number % 10,
|
|
||||||
number,
|
|
||||||
now() - toIntervalMinute(number),
|
|
||||||
now()
|
|
||||||
FROM numbers(100000);
|
|
||||||
|
|
||||||
|
|
||||||
INSERT INTO post_metrics SELECT
|
|
||||||
repeat('a', (number % 10) + 1),
|
|
||||||
toString(number),
|
|
||||||
now() - toIntervalMinute(number),
|
|
||||||
number * 100,
|
|
||||||
number * 10,
|
|
||||||
now()
|
|
||||||
FROM numbers(100000);
|
|
||||||
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
host_id,
|
|
||||||
path_id,
|
|
||||||
max(rank) AS rank
|
|
||||||
FROM
|
|
||||||
(
|
|
||||||
WITH
|
|
||||||
as_of_posts AS
|
|
||||||
(
|
|
||||||
SELECT
|
|
||||||
*,
|
|
||||||
row_number() OVER (PARTITION BY (page_id, post_id) ORDER BY as_of DESC) AS row_num
|
|
||||||
FROM posts
|
|
||||||
WHERE (created >= subtractHours(now(), 24)) AND (host_id > 0)
|
|
||||||
),
|
|
||||||
as_of_post_metrics AS
|
|
||||||
(
|
|
||||||
SELECT
|
|
||||||
*,
|
|
||||||
row_number() OVER (PARTITION BY (page_id, post_id) ORDER BY as_of DESC) AS row_num
|
|
||||||
FROM post_metrics
|
|
||||||
WHERE created >= subtractHours(now(), 24)
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
page_id,
|
|
||||||
post_id,
|
|
||||||
host_id,
|
|
||||||
path_id,
|
|
||||||
impressions,
|
|
||||||
clicks,
|
|
||||||
ntile(20) OVER (PARTITION BY page_id ORDER BY clicks ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS rank
|
|
||||||
FROM as_of_posts
|
|
||||||
GLOBAL LEFT JOIN as_of_post_metrics USING (page_id, post_id, row_num)
|
|
||||||
WHERE (row_num = 1) AND (impressions > 0)
|
|
||||||
) AS t
|
|
||||||
WHERE t.rank > 18
|
|
||||||
GROUP BY
|
|
||||||
host_id,
|
|
||||||
path_id
|
|
||||||
ORDER BY host_id, path_id;
|
|
||||||
|
|
||||||
DROP TABLE posts;
|
|
||||||
DROP TABLE post_metrics;
|
|
Loading…
Reference in New Issue
Block a user