partial merge join (minimal tested version)

This commit is contained in:
chertus 2019-09-16 22:31:22 +03:00
parent cbd96af079
commit da5d35b34e
4 changed files with 380 additions and 103 deletions

View File

@ -36,6 +36,7 @@ public:
{}
size_t position() const { return impl.pos; }
size_t end() const { return impl.rows; }
bool atEnd() const { return impl.pos >= impl.rows; }
void nextN(size_t num) { impl.pos += num; }
@ -100,7 +101,20 @@ private:
SortCursorImpl impl;
};
static void makeSortAndMerge(const Names & keys, SortDescription & sort, SortDescription & merge)
namespace
{
/// Creates one mutable column per column of `block`, each obtained with cloneEmpty()
/// from the corresponding source column. The result has the same column order as `block`.
MutableColumns makeMutableColumns(const Block & block)
{
    const size_t num_columns = block.columns();

    MutableColumns result;
    result.reserve(num_columns);
    for (size_t pos = 0; pos < num_columns; ++pos)
        result.emplace_back(block.getByPosition(pos).column->cloneEmpty());

    return result;
}
void makeSortAndMerge(const Names & keys, SortDescription & sort, SortDescription & merge)
{
NameSet unique_keys;
for (auto & key_name : keys)
@ -115,10 +129,80 @@ static void makeSortAndMerge(const Names & keys, SortDescription & sort, SortDes
}
}
/// Appends rows [start, start + rows_to_add) of every column of `block`
/// to the column with the same position in `columns`.
/// `columns` must have at least block.columns() entries.
void copyLeftRange(const Block & block, MutableColumns & columns, size_t start, size_t rows_to_add)
{
    /// Nothing to copy: do not touch the columns at all.
    if (rows_to_add == 0)
        return;

    for (size_t i = 0; i < block.columns(); ++i)
    {
        const auto & src_column = block.getByPosition(i);
        /// Bulk copy of a contiguous range: one virtual call per column instead of one per row.
        columns[i]->insertRangeFrom(*src_column.column, start, rows_to_add);
    }
}
/// For every column listed in `right_columns_to_add`, looks the column up by name in `right_block`
/// and appends the single row `row_position` to `columns[i]` exactly `rows_to_add` times.
/// `columns` is indexed by position in `right_columns_to_add`.
void copyRightRange(const Block & right_block, const Block & right_columns_to_add, MutableColumns & columns,
                    size_t row_position, size_t rows_to_add)
{
    for (size_t col = 0; col < right_columns_to_add.columns(); ++col)
    {
        const auto & column_name = right_columns_to_add.getByPosition(col).name;
        const auto & source = right_block.getByName(column_name);
        auto & destination = columns[col];

        /// The same right row is repeated once per requested output row.
        for (size_t remaining = rows_to_add; remaining != 0; --remaining)
            destination->insertFrom(*source.column, row_position);
    }
}
/// Handles an equal-key range when only right-side columns have to be produced:
/// the first right row of the range is repeated once for each left row of the range.
void joinEqualsAnyLeft(const Block & right_block, const Block & right_columns_to_add, MutableColumns & right_columns, const Range & range)
{
    const size_t first_right_row = range.right_start;
    const size_t left_rows = range.left_length;

    copyRightRange(right_block, right_columns_to_add, right_columns, first_right_row, left_rows);
}
/// Emits the joined rows for one equal-key range.
/// For each right row taken from the range (all of them when `is_all`, otherwise only the first one)
/// the whole left part of the range is copied to `left_columns`, and that right row is repeated
/// the same number of times in `right_columns`.
void joinEquals(const Block & left_block, const Block & right_block, const Block & right_columns_to_add,
                MutableColumns & left_columns, MutableColumns & right_columns, const Range & range, bool is_all)
{
    const size_t left_count = range.left_length;
    const size_t right_count = is_all ? range.right_length : 1;
    const size_t right_end = range.right_start + right_count;

    for (size_t right_row = range.right_start; right_row != right_end; ++right_row)
    {
        copyLeftRange(left_block, left_columns, range.left_start, left_count);
        copyRightRange(right_block, right_columns_to_add, right_columns, right_row, left_count);
    }
}
/// Appends `rows_to_add` default values (insertDefault()) to every column of `right_columns`.
void appendNulls(MutableColumns & right_columns, size_t rows_to_add)
{
    for (auto it = right_columns.begin(); it != right_columns.end(); ++it)
    {
        auto & column = *it;
        for (size_t remaining = rows_to_add; remaining > 0; --remaining)
            column->insertDefault();
    }
}
/// Emits left rows [start, end) without a right-side match: the right columns get default values
/// for each of these rows and, when `copy_left` is set, the left rows themselves are copied to `left_columns`.
/// Does nothing for an empty or inverted interval (end <= start).
void joinInequalsLeft(const Block & left_block, MutableColumns & left_columns, MutableColumns & right_columns,
                      size_t start, size_t end, bool copy_left)
{
    if (start < end)
    {
        const size_t unmatched_rows = end - start;

        if (copy_left)
            copyLeftRange(left_block, left_columns, start, unmatched_rows);

        appendNulls(right_columns, unmatched_rows);
    }
}
}
MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block)
: table_join(table_join_)
, nullable_right_side(table_join->forceNullabelRight())
, is_all(table_join->strictness() == ASTTableJoin::Strictness::All)
, is_inner(isInner(table_join->kind()))
, is_left(isLeft(table_join->kind()))
{
if (!isLeft(table_join->kind()) && !isInner(table_join->kind()))
throw Exception("Partial merge supported for LEFT and INNER JOINs only", ErrorCodes::NOT_IMPLEMENTED);
@ -149,6 +233,9 @@ void MergeJoin::mergeRightBlocks()
{
const size_t max_merged_block_size = 128 * 1024 * 1024;
if (right_blocks.empty())
return;
Blocks unsorted_blocks;
unsorted_blocks.reserve(right_blocks.size());
for (const auto & block : right_blocks)
@ -183,26 +270,27 @@ void MergeJoin::joinBlock(Block & block)
std::shared_lock lock(rwlock);
if (isLeft(table_join->kind()))
{
MutableColumns right_columns = makeMutableColumns(right_columns_to_add);
MutableColumns left_columns = makeMutableColumns(block);
MutableColumns right_columns = makeMutableColumns(right_columns_to_add);
MergeJoinCursor left_cursor(block, left_merge_description);
MergeJoinCursor left_cursor(block, left_merge_description);
if (is_left)
{
for (auto it = right_blocks.begin(); it != right_blocks.end(); ++it)
{
if (left_cursor.atEnd())
break;
leftJoin(left_cursor, *it, right_columns);
leftJoin(left_cursor, block, *it, left_columns, right_columns);
}
appendRightColumns(block, std::move(right_columns));
}
else if (isInner(table_join->kind()))
{
MutableColumns left_columns = makeMutableColumns(block);
MutableColumns right_columns = makeMutableColumns(right_columns_to_add);
joinInequalsLeft(block, left_columns, right_columns, left_cursor.position(), left_cursor.end(), is_all);
//left_cursor.nextN(left_cursor.end() - left_cursor.position());
MergeJoinCursor left_cursor(block, left_merge_description);
changeLeftColumns(block, std::move(left_columns));
addRightColumns(block, std::move(right_columns));
}
else if (is_inner)
{
for (auto it = right_blocks.begin(); it != right_blocks.end(); ++it)
{
if (left_cursor.atEnd())
@ -210,32 +298,36 @@ void MergeJoin::joinBlock(Block & block)
innerJoin(left_cursor, block, *it, left_columns, right_columns);
}
block.clear();
appendRightColumns(block, std::move(left_columns));
appendRightColumns(block, std::move(right_columns));
changeLeftColumns(block, std::move(left_columns));
addRightColumns(block, std::move(right_columns));
}
}
/// NOTE(review): this span is a rendered diff with the +/- markers stripped. Several statements appear twice
/// (previous version followed by the new version, e.g. the two signatures below), so it is not compilable as shown.
///
/// Joins the rows remaining under `left_cursor` against one right block: a cursor over `right_block` is created
/// and both cursors are advanced over equal-key ranges until one of them reaches its end.
void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & right_block, MutableColumns & right_columns)
void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns)
{
MergeJoinCursor right_cursor(right_block, right_merge_description);
while (!left_cursor.atEnd() && !right_cursor.atEnd())
{
size_t left_position = left_cursor.position();
size_t left_position = left_cursor.position(); /// save inequal position
Range range = left_cursor.getNextEqualRange(right_cursor);
/// Left rows between the saved position and the start of the equal range get default values on the right side
/// (presumably these are the rows that found no equal key in this right block - confirm against getNextEqualRange).
if (left_position < range.left_start)
appendRightNulls(right_columns, range.left_start - left_position);
joinInequalsLeft(left_block, left_columns, right_columns, left_position, range.left_start, is_all);
/// No equal range was found: stop processing this right block.
if (range.empty())
break;
leftJoinEquals(right_block, right_columns, range);
/// ALL strictness writes both left and right columns; otherwise only the right columns are filled.
if (is_all)
joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
else
joinEqualsAnyLeft(right_block, right_columns_to_add, right_columns, range);
right_cursor.nextN(range.right_length);
/// TODO: Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
//if (!right_cursor.atEnd())
/// Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
/// NOTE(review): on this break the left cursor still points at rows that were already emitted above;
/// verify that processing the next right block cannot emit them a second time as unmatched rows.
if (is_all && right_cursor.atEnd())
break;
left_cursor.nextN(range.left_length);
}
}
@ -251,26 +343,24 @@ void MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_bloc
if (range.empty())
break;
innerJoinEquals(left_block, right_block, left_columns, right_columns, range);
joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
right_cursor.nextN(range.right_length);
/// TODO: Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
//if (!right_cursor.atEnd())
/// Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
if (is_all && right_cursor.atEnd())
break;
left_cursor.nextN(range.left_length);
}
}
/// NOTE(review): this span is a rendered diff with the +/- markers stripped. The first signature and the
/// statements up to `return columns;` are the body of makeMutableColumns (one cloneEmpty() column per block column);
/// the second signature and the statements after `return columns;` form changeLeftColumns.
///
/// changeLeftColumns: replaces the columns of `block` with `columns`, except when the join is LEFT and
/// the strictness is not ALL (is_left && !is_all) - in that case `block` is left untouched.
MutableColumns MergeJoin::makeMutableColumns(const Block & block)
void MergeJoin::changeLeftColumns(Block & block, MutableColumns && columns)
{
MutableColumns columns;
columns.reserve(block.columns());
for (const auto & src_column : block)
columns.push_back(src_column.column->cloneEmpty());
return columns;
/// Left columns are kept as is for LEFT JOIN without ALL strictness.
if (is_left && !is_all)
return;
block.setColumns(std::move(columns));
}
void MergeJoin::appendRightColumns(Block & block, MutableColumns && right_columns)
void MergeJoin::addRightColumns(Block & block, MutableColumns && right_columns)
{
for (size_t i = 0; i < right_columns_to_add.columns(); ++i)
{
@ -279,64 +369,4 @@ void MergeJoin::appendRightColumns(Block & block, MutableColumns && right_column
}
}
/// Appends `rows_to_add` default values (insertDefault()) to every column of `right_columns`.
void MergeJoin::appendRightNulls(MutableColumns & right_columns, size_t rows_to_add)
{
    for (auto it = right_columns.begin(); it != right_columns.end(); ++it)
    {
        auto & column = *it;
        for (size_t remaining = rows_to_add; remaining > 0; --remaining)
            column->insertDefault();
    }
}
/// Fills the right-side columns for one equal-key range.
/// For each right row taken from the range (only the first one for ANY strictness, all of them otherwise)
/// the row is appended to every added right column once per left row of the range.
void MergeJoin::leftJoinEquals(const Block & right_block, MutableColumns & right_columns, const Range & range)
{
    const bool is_any = table_join->strictness() == ASTTableJoin::Strictness::Any;

    const size_t copies_per_right_row = range.left_length;
    const size_t right_rows = is_any ? 1 : range.right_length;
    const size_t right_end = range.right_start + right_rows;

    for (size_t right_row = range.right_start; right_row != right_end; ++right_row)
    {
        for (size_t col = 0; col < right_columns_to_add.columns(); ++col)
        {
            const auto & column_name = right_columns_to_add.getByPosition(col).name;
            const auto & source = right_block.getByName(column_name);
            auto & destination = right_columns[col];

            for (size_t remaining = copies_per_right_row; remaining != 0; --remaining)
                destination->insertFrom(*source.column, right_row);
        }
    }
}
/// Emits the joined rows for one equal-key range, writing both sides.
/// For each right row taken from the range (only the first one for ANY strictness, all of them otherwise):
/// every left column receives the left rows of the range, and every added right column receives
/// that right row repeated once per left row.
void MergeJoin::innerJoinEquals(const Block & left_block, const Block & right_block,
                                MutableColumns & left_columns, MutableColumns & right_columns, const Range & range)
{
    const bool is_any = table_join->strictness() == ASTTableJoin::Strictness::Any;

    const size_t left_rows = range.left_length;
    const size_t right_rows = is_any ? 1 : range.right_length;
    const size_t right_end = range.right_start + right_rows;

    for (size_t right_row = range.right_start; right_row != right_end; ++right_row)
    {
        /// Left side: copy the rows of the range from each left column.
        for (size_t col = 0; col < left_block.columns(); ++col)
        {
            const auto & source = left_block.getByPosition(col);
            auto & destination = left_columns[col];

            size_t left_row = range.left_start;
            for (size_t copied = 0; copied != left_rows; ++copied, ++left_row)
                destination->insertFrom(*source.column, left_row);
        }

        /// Right side: repeat the current right row for each copied left row.
        for (size_t col = 0; col < right_columns_to_add.columns(); ++col)
        {
            const auto & column_name = right_columns_to_add.getByPosition(col).name;
            const auto & source = right_block.getByName(column_name);
            auto & destination = right_columns[col];

            for (size_t remaining = left_rows; remaining != 0; --remaining)
                destination->insertFrom(*source.column, right_row);
        }
    }
}
}

View File

@ -37,22 +37,21 @@ private:
Block right_columns_to_add;
BlocksList right_blocks;
Block totals;
bool nullable_right_side;
size_t right_blocks_row_count = 0;
size_t right_blocks_bytes = 0;
const bool nullable_right_side;
const bool is_all;
const bool is_inner;
const bool is_left;
MutableColumns makeMutableColumns(const Block & block);
void appendRightColumns(Block & block, MutableColumns && right_columns);
void changeLeftColumns(Block & block, MutableColumns && columns);
void addRightColumns(Block & block, MutableColumns && columns);
void mergeRightBlocks();
void leftJoin(MergeJoinCursor & left_cursor, const Block & right_block, MutableColumns & right_columns);
void leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns);
void innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns);
void appendRightNulls(MutableColumns & right_columns, size_t rows_to_add);
void leftJoinEquals(const Block & right_block, MutableColumns & right_columns, const MergeJoinEqualRange & range);
void innerJoinEquals(const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, const MergeJoinEqualRange & range);
};
}

View File

@ -0,0 +1,124 @@
t join none using
0 0 0
-
0 0 0
-
-
t join none on
0 0 0 0
-
0 0 0 0
-
-
none join t using
none join t on
/none
t join none using
0 0 \N
-
0 0 \N
-
-
t join none on
0 0 \N \N
-
0 0 \N \N
-
-
none join t using
none join t on
/none
any left
0 0 0
1 10 0
2 20 2
3 30 0
4 40 4
-
0 0 0
1 10 0
2 20 0
3 30 0
4 40 0
-
0 0 0
1 10 0
2 20 2
3 30 0
4 40 4
-
0 0 0
1 10 0
2 20 0
3 30 0
4 40 0
all left
0 0 0 0
1 10 0 0
2 20 2 21
2 20 2 22
3 30 0 0
4 40 4 41
4 40 4 42
-
0 0 0 0
1 10 0 0
2 20 0 0
3 30 0 0
4 40 0 0
-
0 0 0 0
1 10 0 0
2 20 0 0
3 30 0 0
4 40 0 0
-
0 0 0 0
1 10 0 0
2 20 2 21
2 20 2 22
3 30 0 0
4 40 4 41
4 40 4 42
-
0 0 0 0
1 10 0 0
2 20 2 21
2 20 2 22
3 30 0 0
4 40 4 41
4 40 4 42
any inner
0 0 0
2 20 2
4 40 4
-
0 0 0
-
0 0 0
2 20 2
4 40 4
-
0 0 0
all inner
0 0 0 0
2 20 2 21
2 20 2 22
4 40 4 41
4 40 4 42
-
0 0 0 0
-
0 0 0 0
-
0 0 0 0
2 20 2 21
2 20 2 22
4 40 4 41
4 40 4 42
-
0 0 0 0
2 20 2 21
2 20 2 22
4 40 4 41
4 40 4 42

View File

@ -0,0 +1,124 @@
-- Functional test for partial merge join: ANY/ALL strictness combined with LEFT/INNER joins,
-- first against an empty table, then against tables filled by several INSERTs.
DROP TABLE IF EXISTS t0;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
-- t0 is never filled in this script; t1 starts with the single row (0, 0); t2 is filled further below.
CREATE TABLE t0 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t1 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
INSERT INTO t1 (x, y) VALUES (0, 0);
-- Run every join below with the partial merge join setting switched on.
SET partial_merge_join = 1;
SET any_join_distinct_right_table_keys = 1;
-- Part 1 (join_use_nulls at its current value): one-row table joined with the empty table, USING syntax.
SELECT 't join none using';
SELECT * FROM t1 ANY LEFT JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 LEFT JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 ANY INNER JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 INNER JOIN t0 USING (x) ORDER BY x;
-- Same four joins with ON syntax.
SELECT 't join none on';
SELECT * FROM t1 ANY LEFT JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 LEFT JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 ANY INNER JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 INNER JOIN t0 ON t1.x = t0.x ORDER BY x;
-- Empty table on the left side: no '-' separators are printed between these queries.
SELECT 'none join t using';
SELECT * FROM t0 ANY LEFT JOIN t1 USING (x);
SELECT * FROM t0 LEFT JOIN t1 USING (x);
SELECT * FROM t0 ANY INNER JOIN t1 USING (x);
SELECT * FROM t0 INNER JOIN t1 USING (x);
SELECT 'none join t on';
SELECT * FROM t0 ANY LEFT JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 LEFT JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 ANY INNER JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 INNER JOIN t1 ON t1.x = t0.x;
SELECT '/none';
-- Part 2: the same empty-table queries repeated with join_use_nulls = 1.
SET join_use_nulls = 1;
SELECT 't join none using';
SELECT * FROM t1 ANY LEFT JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 LEFT JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 ANY INNER JOIN t0 USING (x) ORDER BY x;
SELECT '-';
SELECT * FROM t1 INNER JOIN t0 USING (x) ORDER BY x;
SELECT 't join none on';
SELECT * FROM t1 ANY LEFT JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 LEFT JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 ANY INNER JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT '-';
SELECT * FROM t1 INNER JOIN t0 ON t1.x = t0.x ORDER BY x;
SELECT 'none join t using';
SELECT * FROM t0 ANY LEFT JOIN t1 USING (x);
SELECT * FROM t0 LEFT JOIN t1 USING (x);
SELECT * FROM t0 ANY INNER JOIN t1 USING (x);
SELECT * FROM t0 INNER JOIN t1 USING (x);
SELECT 'none join t on';
SELECT * FROM t0 ANY LEFT JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 LEFT JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 ANY INNER JOIN t1 ON t1.x = t0.x;
SELECT * FROM t0 INNER JOIN t1 ON t1.x = t0.x;
SELECT '/none';
-- Part 3: fill t1 and t2 with separate INSERT statements; t2 gets two rows for x = 2 and for x = 4,
-- and a key (5) that has no counterpart in t1.
INSERT INTO t1 (x, y) VALUES (1, 10) (2, 20);
INSERT INTO t1 (x, y) VALUES (4, 40) (3, 30);
INSERT INTO t2 (x, y) VALUES (4, 41) (2, 21) (2, 22);
INSERT INTO t2 (x, y) VALUES (0, 0) (5, 50) (4, 42);
SET join_use_nulls = 0;
-- NOTE(review): in this section the 1st/3rd and the 2nd/4th queries are textually identical -
-- confirm whether a different variant (e.g. ON instead of USING) was intended.
SELECT 'any left';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x,y) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY LEFT JOIN t2 USING (x,y) ORDER BY x;
-- ALL strictness (no ANY keyword): single-key, two-key and expression-key ON conditions.
SELECT 'all left';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x AND t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x AND toUInt32(intDiv(t1.y,10)) = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.x = t2.x AND toUInt64(t1.x) = intDiv(t2.y,10) ORDER BY x, t2.y;
-- NOTE(review): as in 'any left', the 1st/3rd and the 2nd/4th queries of this section are identical.
SELECT 'any inner';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x,y) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x) ORDER BY x;
SELECT '-';
SELECT t1.*, t2.x FROM t1 ANY INNER JOIN t2 USING (x,y) ORDER BY x;
-- Same ON conditions as in 'all left', with INNER JOIN.
SELECT 'all inner';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x AND t1.y = t2.y ORDER BY x;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x AND toUInt32(intDiv(t1.y,10)) = t2.x ORDER BY x, t2.y;
SELECT '-';
SELECT t1.*, t2.* FROM t1 INNER JOIN t2 ON t1.x = t2.x AND toUInt64(t1.x) = intDiv(t2.y,10) ORDER BY x, t2.y;
-- The non-empty-table sections above are not yet repeated with join_use_nulls = 1.
-- TODO: SET join_use_nulls = 1;
DROP TABLE t0;
DROP TABLE t1;
DROP TABLE t2;