Merge pull request #54564 from vitlibar/fix-sorting-of-union-of-sorted

Fix sorting of UNION ALL of already sorted results
Igor Nikonov 2023-09-21 22:49:53 +02:00, committed by GitHub
commit b1cc698477
9 changed files with 142 additions and 44 deletions

View File

@@ -16,6 +16,11 @@ using Processors = std::vector<ProcessorPtr>;
 namespace JSONBuilder { class JSONMap; }

+namespace ErrorCodes
+{
+    extern const int NOT_IMPLEMENTED;
+}
+
 /// Description of data stream.
 /// Single logical data stream may relate to many ports of pipeline.
 class DataStream
@@ -107,7 +112,30 @@ public:
     /// Append extra processors for this step.
     void appendExtraProcessors(const Processors & extra_processors);

+    /// Updates the input streams of the given step. Used during query plan optimizations.
+    /// It won't do any validation of new streams, so it is your responsibility to ensure that this update doesn't break anything
+    /// (e.g. you update data stream traits or correctly remove / add columns).
+    void updateInputStreams(DataStreams input_streams_)
+    {
+        chassert(canUpdateInputStream());
+        input_streams = std::move(input_streams_);
+        updateOutputStream();
+    }
+
+    void updateInputStream(DataStream input_stream) { updateInputStreams(DataStreams{input_stream}); }
+
+    void updateInputStream(DataStream input_stream, size_t idx)
+    {
+        chassert(canUpdateInputStream() && idx < input_streams.size());
+        input_streams[idx] = input_stream;
+        updateOutputStream();
+    }
+
+    virtual bool canUpdateInputStream() const { return false; }
+
 protected:
+    virtual void updateOutputStream() { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); }
+
     DataStreams input_streams;
     std::optional<DataStream> output_stream;
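
The two hunks above are the core of the change: IQueryPlanStep itself now owns the update protocol. A step opts in by overriding canUpdateInputStream() to return true and implementing updateOutputStream(); updateInputStreams() then swaps the inputs and asks the step to recompute its cached output stream. Below is a minimal, self-contained sketch of that protocol with toy stand-in types (ToyStream, ToyStep, ConcatStep and their members are hypothetical names, not the ClickHouse classes):

#include <cassert>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

struct ToyStream { std::string header; };
using ToyStreams = std::vector<ToyStream>;

class ToyStep
{
public:
    virtual ~ToyStep() = default;

    /// Steps that cannot be updated keep the default.
    virtual bool canUpdateInputStream() const { return false; }

    /// Mirrors IQueryPlanStep::updateInputStreams(): replace the inputs,
    /// then let the concrete step recompute its output stream.
    void updateInputStreams(ToyStreams new_inputs)
    {
        assert(canUpdateInputStream());
        input_streams = std::move(new_inputs);
        updateOutputStream();
    }

    const ToyStream & getOutputStream() const { return *output_stream; }

protected:
    virtual void updateOutputStream() { throw std::runtime_error("not implemented"); }

    ToyStreams input_streams;
    std::optional<ToyStream> output_stream;
};

/// A step that "concatenates" its inputs and opts in to updates.
class ConcatStep : public ToyStep
{
public:
    explicit ConcatStep(ToyStreams inputs) { updateInputStreams(std::move(inputs)); }

    bool canUpdateInputStream() const override { return true; }

protected:
    void updateOutputStream() override
    {
        std::string header;
        for (const auto & in : input_streams)
            header += (header.empty() ? "" : " + ") + in.header;
        output_stream = ToyStream{header};
    }
};

int main()
{
    ConcatStep step({{"a"}, {"b"}});
    std::cout << step.getOutputStream().header << '\n';   // a + b

    step.updateInputStreams({{"x"}, {"y"}, {"z"}});        // optimizer-style update
    std::cout << step.getOutputStream().header << '\n';   // x + y + z
}

As in the real JoinStep and UnionStep constructors further down, the toy constructor simply routes through updateInputStreams(), so construction and later plan-optimization updates share one code path.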

View File

@@ -55,17 +55,6 @@ public:
     const TransformTraits & getTransformTraits() const { return transform_traits; }
     const DataStreamTraits & getDataStreamTraits() const { return data_stream_traits; }

-    /// Updates the input stream of the given step. Used during query plan optimizations.
-    /// It won't do any validation of a new stream, so it is your responsibility to ensure that this update doesn't break anything
-    /// (e.g. you update data stream traits or correctly remove / add columns).
-    void updateInputStream(DataStream input_stream)
-    {
-        input_streams.clear();
-        input_streams.emplace_back(std::move(input_stream));
-
-        updateOutputStream();
-    }
-
     void describePipeline(FormatSettings & settings) const override;

     /// Enforcement is supposed to be done through the special settings that will be taken into account by remote nodes during query planning (e.g. force_aggregation_in_order).
@@ -75,6 +64,8 @@ public:
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
     }

+    bool canUpdateInputStream() const override { return true; }
+
 protected:
     /// Create output stream from header and traits.
     static DataStream createOutputStream(
@@ -85,8 +76,6 @@ protected:
     TransformTraits transform_traits;

 private:
-    virtual void updateOutputStream() = 0;
-
     /// If we should collect processors got after pipeline transformation.
     bool collect_processors;

View File

@@ -24,11 +24,7 @@ JoinStep::JoinStep(
     bool keep_left_read_in_order_)
     : join(std::move(join_)), max_block_size(max_block_size_), max_streams(max_streams_), keep_left_read_in_order(keep_left_read_in_order_)
 {
-    input_streams = {left_stream_, right_stream_};
-    output_stream = DataStream
-    {
-        .header = JoiningTransform::transformHeader(left_stream_.header, join),
-    };
+    updateInputStreams(DataStreams{left_stream_, right_stream_});
 }

 QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
@@ -95,21 +91,13 @@ void JoinStep::describeActions(JSONBuilder::JSONMap & map) const
     map.add("Clauses", table_join.formatClauses(table_join.getClauses(), true /*short_format*/));
 }

-void JoinStep::updateInputStream(const DataStream & new_input_stream_, size_t idx)
+void JoinStep::updateOutputStream()
 {
-    if (idx == 0)
-    {
-        input_streams = {new_input_stream_, input_streams.at(1)};
     output_stream = DataStream
     {
-        .header = JoiningTransform::transformHeader(new_input_stream_.header, join),
+        .header = JoiningTransform::transformHeader(input_streams[0].header, join),
     };
-    }
-    else
-    {
-        input_streams = {input_streams.at(0), new_input_stream_};
-    }
 }

 static ITransformingStep::Traits getStorageJoinTraits()
 {

View File

@@ -33,9 +33,11 @@ public:
     const JoinPtr & getJoin() const { return join; }
     bool allowPushDownToRight() const;

-    void updateInputStream(const DataStream & new_input_stream_, size_t idx);
+    bool canUpdateInputStream() const override { return true; }

 private:
+    void updateOutputStream() override;
+
     JoinPtr join;
     size_t max_block_size;
     size_t max_streams;
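
JoinStep's bookkeeping becomes much simpler: instead of the removed updateInputStream(stream, idx), which patched output_stream only for idx == 0, the step now implements updateOutputStream(), which always derives the output header from the current left input (input_streams[0]) via JoiningTransform::transformHeader. A rough, self-contained model of that invariant (ToyJoin, Stream and the string "headers" are hypothetical simplifications, not the real classes):

#include <cassert>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct Stream { std::string header; };

class ToyJoin
{
public:
    ToyJoin(Stream left, Stream right) : inputs{std::move(left), std::move(right)} { updateOutputStream(); }

    /// Mirrors updateInputStream(stream, idx) in the base class: replace one input,
    /// then recompute the output.
    void updateInput(Stream stream, size_t idx)
    {
        assert(idx < inputs.size());
        inputs[idx] = std::move(stream);
        updateOutputStream();
    }

    const std::string & outputHeader() const { return output.header; }

private:
    void updateOutputStream()
    {
        /// Stand-in for JoiningTransform::transformHeader(input_streams[0].header, join):
        /// in this toy, only the left input determines the output header.
        output = Stream{"joined(" + inputs[0].header + ")"};
    }

    std::vector<Stream> inputs;
    Stream output;
};

int main()
{
    ToyJoin join(Stream{"t1 columns"}, Stream{"t2 columns"});
    std::cout << join.outputHeader() << '\n';              // joined(t1 columns)

    join.updateInput(Stream{"t1 columns, filtered"}, 0);   // left side changed -> header refreshed
    std::cout << join.outputHeader() << '\n';              // joined(t1 columns, filtered)
}

In the real code the join object also contributes columns to the header; the point of the sketch is only that the output is recomputed from the current inputs rather than patched in place.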

View File

@@ -460,16 +460,24 @@ static void updateDataStreams(QueryPlan::Node & root)
     static bool visitTopDownImpl(QueryPlan::Node * /*current_node*/, QueryPlan::Node * /*parent_node*/) { return true; }

-    static void visitBottomUpImpl(QueryPlan::Node * current_node, QueryPlan::Node * parent_node)
+    static void visitBottomUpImpl(QueryPlan::Node * current_node, QueryPlan::Node * /*parent_node*/)
     {
-        if (!parent_node || parent_node->children.size() != 1)
+        auto & current_step = *current_node->step;
+        if (!current_step.canUpdateInputStream() || current_node->children.empty())
             return;

-        if (!current_node->step->hasOutputStream())
+        for (const auto * child : current_node->children)
+        {
+            if (!child->step->hasOutputStream())
                 return;
+        }

-        if (auto * parent_transform_step = dynamic_cast<ITransformingStep *>(parent_node->step.get()); parent_transform_step)
-            parent_transform_step->updateInputStream(current_node->step->getOutputStream());
+        DataStreams streams;
+        streams.reserve(current_node->children.size());
+        for (const auto * child : current_node->children)
+            streams.emplace_back(child->step->getOutputStream());
+
+        current_step.updateInputStreams(std::move(streams));
     }
 };
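
The rewritten visitBottomUpImpl is what propagates the fix through the plan: it now runs on the current node rather than its parent, accepts any number of children (so steps like UNION and JOIN qualify, not only single-input transforming steps), and only updates a step once every child already has an output stream. A self-contained toy version of that traversal (Node and visitBottomUp are hypothetical stand-ins, not the real QueryPlan types):

#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <vector>

struct Node
{
    std::string name;
    bool can_update = false;
    std::optional<std::string> output;               // stands in for the step's output DataStream
    std::vector<std::unique_ptr<Node>> children;
};

static void visitBottomUp(Node & node)
{
    for (auto & child : node.children)
        visitBottomUp(*child);                        // children first, exactly like the pass above

    if (!node.can_update || node.children.empty())
        return;

    for (const auto & child : node.children)
        if (!child->output)
            return;                                   // same guard as the hasOutputStream() check

    // "updateInputStreams": recompute this node's output from all of its children.
    std::string combined;
    for (const auto & child : node.children)
        combined += (combined.empty() ? "" : " | ") + *child->output;
    node.output = node.name + "(" + combined + ")";
}

int main()
{
    auto left = std::make_unique<Node>();
    left->name = "sort";
    left->output = "t1 sorted by number";

    auto right = std::make_unique<Node>();
    right->name = "sort";
    right->output = "t2 sorted by number";

    Node union_node;
    union_node.name = "union";
    union_node.can_update = true;
    union_node.children.push_back(std::move(left));
    union_node.children.push_back(std::move(right));

    visitBottomUp(union_node);
    std::cout << *union_node.output << '\n';          // union(t1 sorted by number | t2 sorted by number)
}

Because children are visited first, a union over sorted subplans sees up-to-date output streams from its children before recomputing its own output.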

View File

@@ -30,18 +30,16 @@ UnionStep::UnionStep(DataStreams input_streams_, size_t max_threads_)
     : header(checkHeaders(input_streams_))
     , max_threads(max_threads_)
 {
-    input_streams = std::move(input_streams_);
+    updateInputStreams(std::move(input_streams_));
+}

+void UnionStep::updateOutputStream()
+{
     if (input_streams.size() == 1)
         output_stream = input_streams.front();
     else
         output_stream = DataStream{.header = header};

     updateOutputSortDescription();
 }

 void UnionStep::updateOutputSortDescription()
 {
     SortDescription common_sort_description = input_streams.front().sort_description;
     DataStream::SortScope sort_scope = input_streams.front().sort_scope;
     for (const auto & input_stream : input_streams)

View File

@@ -19,9 +19,11 @@ public:
     size_t getMaxThreads() const { return max_threads; }

     void updateOutputSortDescription();
+    bool canUpdateInputStream() const override { return true; }

 private:
+    void updateOutputStream() override;

     Block header;
     size_t max_threads;
 };
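
UnionStep now flows through the same protocol: the constructor routes through updateInputStreams(), and updateOutputStream() rebuilds the output stream and calls updateOutputSortDescription(), which derives the output sort description from the actual input streams. A rough, self-contained illustration of the "common sort description" idea (commonSortPrefix and SortDescription-as-a-list-of-column-names are simplifications; the real method also reconciles the sort scope, which is not modeled here):

#include <iostream>
#include <string>
#include <vector>

using SortDescription = std::vector<std::string>;   // column names, in sort order

/// Longest sort prefix shared by all inputs, by analogy with the loop in
/// UnionStep::updateOutputSortDescription.
static SortDescription commonSortPrefix(const std::vector<SortDescription> & inputs)
{
    SortDescription common = inputs.front();
    for (const auto & input : inputs)
    {
        size_t matched = 0;
        while (matched < common.size() && matched < input.size() && common[matched] == input[matched])
            ++matched;
        common.resize(matched);
    }
    return common;
}

int main()
{
    // (SELECT ... ORDER BY number) UNION ALL (SELECT ... ORDER BY number)
    std::vector<SortDescription> sorted_inputs{{"number"}, {"number"}};
    std::cout << commonSortPrefix(sorted_inputs).size() << '\n';   // 1: both inputs report the same order

    // If one input promises no order, the union's output cannot promise one either.
    std::vector<SortDescription> mixed_inputs{{"number"}, {}};
    std::cout << commonSortPrefix(mixed_inputs).size() << '\n';    // 0
}

The new test below exercises exactly this shape: sorted subselects combined with UNION ALL and then reordered by an outer ORDER BY.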

View File

@@ -0,0 +1,63 @@
1..20:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
20..1:
20
19
18
17
16
15
14
13
12
11
10
9
8
7
6
5
4
3
2
1
20..1:
20
19
18
17
16
15
14
13
12
11
10
9
8
7
6
5
4
3
2
1

View File

@@ -0,0 +1,20 @@
DROP TABLE IF EXISTS table1;
DROP TABLE IF EXISTS table2;
CREATE TABLE table1 (number UInt64) ENGINE=MergeTree ORDER BY tuple();
CREATE TABLE table2 (number UInt64) ENGINE=MergeTree ORDER BY tuple();
INSERT INTO table1 SELECT number FROM numbers_mt(1, 10);
INSERT INTO table2 SELECT number FROM numbers_mt(11, 10);
SELECT '1..20:';
SELECT * FROM ((SELECT * FROM table1 ORDER BY number) UNION ALL (SELECT * FROM table2 ORDER BY number)) ORDER BY number;
SELECT '20..1:';
SELECT * FROM ((SELECT * FROM table1 ORDER BY number) UNION ALL (SELECT * FROM table2 ORDER BY number)) ORDER BY number DESC;
SELECT '20..1:';
SELECT * FROM ((SELECT * FROM table1 ORDER BY number DESC) UNION ALL (SELECT * FROM table2 ORDER BY number DESC)) ORDER BY number DESC;
DROP TABLE table1;
DROP TABLE table2;