mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
fix crash in RIGHT or FULL JOIN switch
This commit is contained in:
parent
e38b537017
commit
28afbafa08
@ -1142,8 +1142,8 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
||||
if (hasJoin())
|
||||
{
|
||||
/// You may find it strange but we support read_in_order for HashJoin and do not support for MergeJoin.
|
||||
bool has_delayed_stream = query_analyzer.analyzedJoin().needStreamWithNonJoinedRows();
|
||||
join_allow_read_in_order = typeid_cast<HashJoin *>(join.get()) && !has_delayed_stream;
|
||||
join_has_delayed_stream = query_analyzer.analyzedJoin().needStreamWithNonJoinedRows();
|
||||
join_allow_read_in_order = typeid_cast<HashJoin *>(join.get()) && !join_has_delayed_stream;
|
||||
}
|
||||
|
||||
optimize_read_in_order =
|
||||
|
@ -176,6 +176,7 @@ struct ExpressionAnalysisResult
|
||||
bool remove_where_filter = false;
|
||||
bool optimize_read_in_order = false;
|
||||
bool optimize_aggregation_in_order = false;
|
||||
bool join_has_delayed_stream = false;
|
||||
|
||||
ExpressionActionsPtr before_array_join;
|
||||
ArrayJoinActionPtr array_join;
|
||||
|
@ -25,7 +25,7 @@
|
||||
#include <Interpreters/JoinToSubqueryTransformVisitor.h>
|
||||
#include <Interpreters/CrossToInnerJoinVisitor.h>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
#include <Interpreters/HashJoin.h>
|
||||
#include <Interpreters/JoinSwitcher.h>
|
||||
#include <Interpreters/JoinedTables.h>
|
||||
#include <Interpreters/QueryAliasesVisitor.h>
|
||||
|
||||
@ -925,8 +925,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
|
||||
join_step->setStepDescription("JOIN");
|
||||
query_plan.addStep(std::move(join_step));
|
||||
|
||||
if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size))
|
||||
if (expressions.join_has_delayed_stream)
|
||||
{
|
||||
auto stream = std::make_shared<LazyNonJoinedBlockInputStream>(*join, join_result_sample, settings.max_block_size);
|
||||
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
|
||||
auto add_non_joined_rows_step = std::make_unique<AddingDelayedSourceStep>(
|
||||
query_plan.getCurrentDataStream(), std::move(source));
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Core/Block.h>
|
||||
#include <Interpreters/IJoin.h>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
#include <DataStreams/OneBlockInputStream.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -75,4 +76,38 @@ private:
|
||||
void switchJoin();
|
||||
};
|
||||
|
||||
|
||||
/// Creates NonJoinedBlockInputStream on the first read. Allows to swap join algo before it.
|
||||
class LazyNonJoinedBlockInputStream : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
LazyNonJoinedBlockInputStream(const IJoin & join_, const Block & block, UInt64 max_block_size_)
|
||||
: join(join_)
|
||||
, result_sample_block(block)
|
||||
, max_block_size(max_block_size_)
|
||||
{}
|
||||
|
||||
String getName() const override { return "LazyNonMergeJoined"; }
|
||||
Block getHeader() const override { return result_sample_block; }
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
{
|
||||
if (!stream)
|
||||
{
|
||||
stream = join.createStreamWithNonJoinedRows(result_sample_block, max_block_size);
|
||||
if (!stream)
|
||||
return {};
|
||||
}
|
||||
|
||||
return stream->read();
|
||||
}
|
||||
|
||||
private:
|
||||
BlockInputStreamPtr stream;
|
||||
const IJoin & join;
|
||||
Block result_sample_block;
|
||||
UInt64 max_block_size;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,9 @@
|
||||
1 l \N LowCardinality(String) Nullable(String)
|
||||
2 \N LowCardinality(String) Nullable(String)
|
||||
1 l \N LowCardinality(String) Nullable(String)
|
||||
2 \N LowCardinality(String) Nullable(String)
|
||||
-
|
||||
0 \N Nullable(String) LowCardinality(String)
|
||||
1 \N l Nullable(String) LowCardinality(String)
|
||||
0 \N Nullable(String) LowCardinality(String)
|
||||
1 \N l Nullable(String) LowCardinality(String)
|
26
tests/queries/0_stateless/01476_right_full_join_switch.sql
Normal file
26
tests/queries/0_stateless/01476_right_full_join_switch.sql
Normal file
@ -0,0 +1,26 @@
|
||||
SET join_algorithm = 'auto';
|
||||
SET max_bytes_in_join = 100;
|
||||
|
||||
DROP TABLE IF EXISTS t;
|
||||
DROP TABLE IF EXISTS nr;
|
||||
|
||||
CREATE TABLE t (`x` UInt32, `s` LowCardinality(String)) ENGINE = Memory;
|
||||
CREATE TABLE nr (`x` Nullable(UInt32), `s` Nullable(String)) ENGINE = Memory;
|
||||
|
||||
INSERT INTO t VALUES (1, 'l');
|
||||
INSERT INTO nr VALUES (2, NULL);
|
||||
|
||||
SET join_use_nulls = 0;
|
||||
|
||||
SELECT t.x, l.s, r.s, toTypeName(l.s), toTypeName(r.s) FROM t AS l LEFT JOIN nr AS r USING (x) ORDER BY t.x;
|
||||
SELECT t.x, l.s, r.s, toTypeName(l.s), toTypeName(r.s) FROM t AS l RIGHT JOIN nr AS r USING (x) ORDER BY t.x;
|
||||
SELECT t.x, l.s, r.s, toTypeName(l.s), toTypeName(r.s) FROM t AS l FULL JOIN nr AS r USING (x) ORDER BY t.x;
|
||||
|
||||
SELECT '-';
|
||||
|
||||
SELECT t.x, l.s, r.s, toTypeName(l.s), toTypeName(r.s) FROM nr AS l LEFT JOIN t AS r USING (x) ORDER BY t.x;
|
||||
SELECT t.x, l.s, r.s, toTypeName(l.s), toTypeName(r.s) FROM nr AS l RIGHT JOIN t AS r USING (x) ORDER BY t.x;
|
||||
SELECT t.x, l.s, r.s, toTypeName(l.s), toTypeName(r.s) FROM nr AS l FULL JOIN t AS r USING (x) ORDER BY t.x;
|
||||
|
||||
DROP TABLE t;
|
||||
DROP TABLE nr;
|
Loading…
Reference in New Issue
Block a user