support JOIN ON expression

This commit is contained in:
chertus 2019-11-05 23:22:20 +03:00
parent 6a871f579f
commit 950e95c178
5 changed files with 107 additions and 31 deletions

View File

@ -15,6 +15,7 @@
#include <DataStreams/DistinctBlockInputStream.h> #include <DataStreams/DistinctBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h> #include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/TotalsHavingBlockInputStream.h> #include <DataStreams/TotalsHavingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/copyData.h> #include <DataStreams/copyData.h>
#include <DataStreams/CreatingSetsBlockInputStream.h> #include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h> #include <DataStreams/MaterializingBlockInputStream.h>
@ -1179,7 +1180,10 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (JoinPtr join = expressions.before_join->getTableJoinAlgo()) if (JoinPtr join = expressions.before_join->getTableJoinAlgo())
{ {
if (auto stream = join->createStreamWithNonJoinedRows(header_before_join, settings.max_block_size)) Block join_result_sample = ExpressionBlockInputStream(
std::make_shared<OneBlockInputStream>(header_before_join), expressions.before_join).getHeader();
if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size))
{ {
if constexpr (pipeline_with_processors) if constexpr (pipeline_with_processors)
{ {

View File

@ -1066,9 +1066,10 @@ struct AdderNonJoined<ASTTableJoin::Strictness::Asof, Mapped>
class NonJoinedBlockInputStream : public IBlockInputStream class NonJoinedBlockInputStream : public IBlockInputStream
{ {
public: public:
NonJoinedBlockInputStream(const Join & parent_, const Block & left_sample_block, UInt64 max_block_size_) NonJoinedBlockInputStream(const Join & parent_, const Block & result_sample_block_, UInt64 max_block_size_)
: parent(parent_) : parent(parent_)
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, result_sample_block(materializeBlock(result_sample_block_))
{ {
bool remap_keys = parent.table_join->hasUsing(); bool remap_keys = parent.table_join->hasUsing();
std::unordered_map<size_t, size_t> left_to_right_key_remap; std::unordered_map<size_t, size_t> left_to_right_key_remap;
@ -1078,16 +1079,18 @@ public:
const String & left_key_name = parent.table_join->keyNamesLeft()[i]; const String & left_key_name = parent.table_join->keyNamesLeft()[i];
const String & right_key_name = parent.table_join->keyNamesRight()[i]; const String & right_key_name = parent.table_join->keyNamesRight()[i];
size_t left_key_pos = left_sample_block.getPositionByName(left_key_name); size_t left_key_pos = result_sample_block.getPositionByName(left_key_name);
size_t right_key_pos = parent.saved_block_sample.getPositionByName(right_key_name); size_t right_key_pos = parent.saved_block_sample.getPositionByName(right_key_name);
if (remap_keys && !parent.required_right_keys.has(right_key_name)) if (remap_keys && !parent.required_right_keys.has(right_key_name))
left_to_right_key_remap[left_key_pos] = right_key_pos; left_to_right_key_remap[left_key_pos] = right_key_pos;
} }
makeResultSampleBlock(left_sample_block); /// result_sample_block: left_sample_block + left expressions, right not key columns, required right keys
size_t left_columns_count = result_sample_block.columns() -
parent.sample_block_with_columns_to_add.columns() - parent.required_right_keys.columns();
for (size_t left_pos = 0; left_pos < left_sample_block.columns(); ++left_pos) for (size_t left_pos = 0; left_pos < left_columns_count; ++left_pos)
{ {
/// We need right 'x' for 'RIGHT JOIN ... USING(x)'. /// We need right 'x' for 'RIGHT JOIN ... USING(x)'.
if (left_to_right_key_remap.count(left_pos)) if (left_to_right_key_remap.count(left_pos))
@ -1108,7 +1111,7 @@ public:
size_t result_position = result_sample_block.getPositionByName(name); size_t result_position = result_sample_block.getPositionByName(name);
/// Don't remap left keys twice. We need only qualified right keys here /// Don't remap left keys twice. We need only qualified right keys here
if (result_position < left_sample_block.columns()) if (result_position < left_columns_count)
continue; continue;
setRightIndex(right_pos, result_position); setRightIndex(right_pos, result_position);
@ -1140,7 +1143,7 @@ private:
UInt64 max_block_size; UInt64 max_block_size;
Block result_sample_block; Block result_sample_block;
/// Indices of columns in result_sample_block that come from the left-side table: left_pos == result_pos /// Indices of columns in result_sample_block that should be generated
std::vector<size_t> column_indices_left; std::vector<size_t> column_indices_left;
/// Indices of columns that come from the right-side table: right_pos -> result_pos /// Indices of columns that come from the right-side table: right_pos -> result_pos
std::unordered_map<size_t, size_t> column_indices_right; std::unordered_map<size_t, size_t> column_indices_right;
@ -1152,27 +1155,6 @@ private:
std::any position; std::any position;
std::optional<Join::BlockNullmapList::const_iterator> nulls_position; std::optional<Join::BlockNullmapList::const_iterator> nulls_position;
/// "left" columns, "right" not key columns, some "right keys"
void makeResultSampleBlock(const Block & left_sample_block)
{
result_sample_block = materializeBlock(left_sample_block);
if (parent.nullable_left_side)
JoinCommon::convertColumnsToNullable(result_sample_block);
for (const ColumnWithTypeAndName & column : parent.sample_block_with_columns_to_add)
{
bool is_nullable = parent.nullable_right_side || column.column->isNullable();
result_sample_block.insert(correctNullability({column.column, column.type, column.name}, is_nullable));
}
for (const ColumnWithTypeAndName & right_key : parent.required_right_keys)
{
bool is_nullable = parent.nullable_right_side || right_key.column->isNullable();
result_sample_block.insert(correctNullability({right_key.column, right_key.type, right_key.name}, is_nullable));
}
}
void setRightIndex(size_t right_pos, size_t result_position) void setRightIndex(size_t right_pos, size_t result_position)
{ {
if (!column_indices_right.count(right_pos)) if (!column_indices_right.count(right_pos))
@ -1328,10 +1310,10 @@ private:
}; };
BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & left_sample_block, UInt64 max_block_size) const BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const
{ {
if (isRightOrFull(table_join->kind())) if (isRightOrFull(table_join->kind()))
return std::make_shared<NonJoinedBlockInputStream>(*this, left_sample_block, max_block_size); return std::make_shared<NonJoinedBlockInputStream>(*this, result_sample_block, max_block_size);
return {}; return {};
} }

View File

@ -156,7 +156,7 @@ public:
* Use only after all calls to joinBlock was done. * Use only after all calls to joinBlock was done.
* left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside). * result_sample_block is the header of the JOIN result: left columns and left expressions, right non-key columns, required right keys (it is materialized inside).
*/ */
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & left_sample_block, UInt64 max_block_size) const override; BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
/// Number of keys in all built JOIN maps. /// Number of keys in all built JOIN maps.
size_t getTotalRowCount() const final; size_t getTotalRowCount() const final;

View File

@ -0,0 +1,36 @@
0 2
-
0 2
1 0
-
1 2
-
1 2
-
1 2
-
1 2
-
0 2
-
0 2
1 0
----
\N 2
-
1 \N
\N 2
-
1 2
-
1 2
-
1 2
-
1 2
-
\N 2
-
1 \N
\N 2
-

View File

@ -0,0 +1,54 @@
-- Test for RIGHT/FULL JOIN with expressions in the ON clause.
-- Each query is separated by a '-' marker row so the blocks of the reference output can be told apart;
-- '----' separates the join_use_nulls = 0 section from the join_use_nulls = 1 section.

-- Setup: two single-row tables whose ids differ, X = {1} and Y = {2}.
drop table if exists X;
drop table if exists Y;
create table X (id Int64) Engine = Memory;
create table Y (id Int64) Engine = Memory;
insert into X (id) values (1);
insert into Y (id) values (2);
-- Section 1: join_use_nulls = 0, so the reference output shows 0 for the missing side of a non-joined row.
set join_use_nulls = 0;
-- Plain key equality: 1 != 2, so no rows match and only non-joined rows are produced.
select X.id, Y.id from X right join Y on X.id = Y.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on Y.id = X.id order by X.id, Y.id;
select '-';
-- Expression on the right key: Y.id - 1 = 1 equals X.id, so the rows match.
select X.id, Y.id from X right join Y on X.id = (Y.id - 1) order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on (Y.id - 1) = X.id order by X.id, Y.id;
select '-';
-- Expression on the left key: X.id + 1 = 2 equals Y.id, so the rows match.
select X.id, Y.id from X right join Y on (X.id + 1) = Y.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on Y.id = (X.id + 1) order by X.id, Y.id;
select '-';
-- Expressions on both keys: 2 != 3, so again no rows match.
select X.id, Y.id from X right join Y on (X.id + 1) = (Y.id + 1) order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on (Y.id + 1) = (X.id + 1) order by X.id, Y.id;
select '----';
-- Section 2: the same eight queries with join_use_nulls = 1; the reference output shows \N for the missing side.
set join_use_nulls = 1;
-- Plain key equality: no match.
select X.id, Y.id from X right join Y on X.id = Y.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on Y.id = X.id order by X.id, Y.id;
select '-';
-- Expression on the right key: match.
select X.id, Y.id from X right join Y on X.id = (Y.id - 1) order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on (Y.id - 1) = X.id order by X.id, Y.id;
select '-';
-- Expression on the left key: match.
select X.id, Y.id from X right join Y on (X.id + 1) = Y.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on Y.id = (X.id + 1) order by X.id, Y.id;
select '-';
-- Expressions on both keys: no match.
select X.id, Y.id from X right join Y on (X.id + 1) = (Y.id + 1) order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on (Y.id + 1) = (X.id + 1) order by X.id, Y.id;
select '-';
-- Cleanup.
drop table X;
drop table Y;