all|any left|inner, not tested

This commit is contained in:
chertus 2019-09-13 20:23:32 +03:00
parent bd957168d2
commit eb3d87032c
2 changed files with 91 additions and 22 deletions

View File

@ -122,10 +122,6 @@ MergeJoin::MergeJoin(const AnalyzedJoin & table_join_, const Block & right_sampl
{
if (!isLeft(table_join.kind()) && !isInner(table_join.kind()))
throw Exception("Partial merge supported for LEFT and INNER JOINs only", ErrorCodes::NOT_IMPLEMENTED);
#if 0
if (table_join.strictness() != ASTTableJoin::Strictness::Any)
throw Exception("Partial merge supported for ANY JOIN variant only", ErrorCodes::NOT_IMPLEMENTED);
#endif
JoinCommon::extractKeysForJoin(table_join.keyNamesRight(), right_sample_block, right_table_keys, right_columns_to_add);
@ -189,7 +185,7 @@ void MergeJoin::joinBlock(Block & block)
if (isLeft(table_join.kind()))
{
MutableColumns right_columns = makeRightColumns(0);
MutableColumns right_columns = makeMutableColumns(right_columns_to_add);
MergeJoinCursor left_cursor(block, left_merge_description);
for (auto it = right_blocks.begin(); it != right_blocks.end(); ++it)
@ -203,8 +199,19 @@ void MergeJoin::joinBlock(Block & block)
}
else if (isInner(table_join.kind()))
{
/// TODO
MutableColumns right_columns = makeRightColumns(block.rows());
MutableColumns left_columns = makeMutableColumns(block);
MutableColumns right_columns = makeMutableColumns(right_columns_to_add);
MergeJoinCursor left_cursor(block, left_merge_description);
for (auto it = right_blocks.begin(); it != right_blocks.end(); ++it)
{
if (left_cursor.atEnd())
break;
innerJoin(left_cursor, block, *it, left_columns, right_columns);
}
block.clear();
appendRightColumns(block, std::move(left_columns));
appendRightColumns(block, std::move(right_columns));
}
}
@ -224,7 +231,7 @@ void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & right_bloc
if (range.empty())
break;
anyLeftJoinEquals(right_block, right_columns, range);
leftJoinEquals(right_block, right_columns, range);
right_cursor.nextN(range.right_length);
/// TODO: Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
@ -233,13 +240,33 @@ void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & right_bloc
}
}
MutableColumns MergeJoin::makeRightColumns(size_t rows)
void MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns)
{
MergeJoinCursor right_cursor(right_block, right_merge_description);
while (!left_cursor.atEnd() && !right_cursor.atEnd())
{
Range range = left_cursor.getNextEqualRange(right_cursor);
if (range.empty())
break;
innerJoinEquals(left_block, right_block, left_columns, right_columns, range);
right_cursor.nextN(range.right_length);
/// TODO: Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
//if (!right_cursor.atEnd())
left_cursor.nextN(range.left_length);
}
}
MutableColumns MergeJoin::makeMutableColumns(const Block & block)
{
MutableColumns columns;
columns.reserve(right_columns_to_add.columns());
columns.reserve(block.columns());
for (const auto & src_column : right_columns_to_add)
columns.push_back(src_column.column->cloneResized(rows));
for (const auto & src_column : block)
columns.push_back(src_column.column->cloneEmpty());
return columns;
}
@ -259,18 +286,56 @@ void MergeJoin::appendRightNulls(MutableColumns & right_columns, size_t rows_to_
column->insertDefault();
}
void MergeJoin::anyLeftJoinEquals(const Block & right_block, MutableColumns & right_columns, const Range & range)
void MergeJoin::leftJoinEquals(const Block & right_block, MutableColumns & right_columns, const Range & range)
{
size_t rows_to_insert = range.left_length;
size_t any_row_position = range.right_start;
bool any = table_join.strictness() == ASTTableJoin::Strictness::Any;
for (size_t i = 0; i < right_columns_to_add.columns(); ++i)
size_t left_rows_to_insert = range.left_length;
size_t right_rows_to_insert = any ? 1 : range.right_length;
size_t row_position = range.right_start;
for (size_t right_row = 0; right_row < right_rows_to_insert; ++right_row, ++row_position)
{
const auto & src_column = right_block.getByName(right_columns_to_add.getByPosition(i).name);
auto & dst_column = right_columns[i];
for (size_t i = 0; i < right_columns_to_add.columns(); ++i)
{
const auto & src_column = right_block.getByName(right_columns_to_add.getByPosition(i).name);
auto & dst_column = right_columns[i];
for (size_t row = 0; row < rows_to_insert; ++row)
dst_column->insertFrom(*src_column.column, any_row_position);
for (size_t left_row = 0; left_row < left_rows_to_insert; ++left_row)
dst_column->insertFrom(*src_column.column, row_position);
}
}
}
void MergeJoin::innerJoinEquals(const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, const Range & range)
{
bool any = table_join.strictness() == ASTTableJoin::Strictness::Any;
size_t left_rows_to_insert = range.left_length;
size_t right_rows_to_insert = any ? 1 : range.right_length;
size_t row_position = range.right_start;
for (size_t right_row = 0; right_row < right_rows_to_insert; ++right_row, ++row_position)
{
for (size_t i = 0; i < left_block.columns(); ++i)
{
const auto & src_column = left_block.getByPosition(i);
auto & dst_column = left_columns[i];
size_t row_pos = range.left_start;
for (size_t row = 0; row < left_rows_to_insert; ++row, ++row_pos)
dst_column->insertFrom(*src_column.column, row_pos);
}
for (size_t i = 0; i < right_columns_to_add.columns(); ++i)
{
const auto & src_column = right_block.getByName(right_columns_to_add.getByPosition(i).name);
auto & dst_column = right_columns[i];
for (size_t row = 0; row < left_rows_to_insert; ++row)
dst_column->insertFrom(*src_column.column, row_position);
}
}
}

View File

@ -41,14 +41,18 @@ private:
size_t right_blocks_row_count = 0;
size_t right_blocks_bytes = 0;
MutableColumns makeRightColumns(size_t rows);
MutableColumns makeMutableColumns(const Block & block);
void appendRightColumns(Block & block, MutableColumns && right_columns);
void mergeRightBlocks();
void leftJoin(MergeJoinCursor & left_cursor, const Block & right_block, MutableColumns & right_columns);
void innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns);
void appendRightNulls(MutableColumns & right_columns, size_t rows_to_add);
void anyLeftJoinEquals(const Block & right_block, MutableColumns & right_columns, const MergeJoinEqualRange & range);
void leftJoinEquals(const Block & right_block, MutableColumns & right_columns, const MergeJoinEqualRange & range);
void innerJoinEquals(const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, const MergeJoinEqualRange & range);
};
}