Partial merge join: INNER JOIN support and non-ANY strictness handling.

What this patch does (MergeJoin.cpp / MergeJoin.h):
  1. Removes the disabled (#if 0) "ANY JOIN variant only" check from the constructor.
  2. Replaces makeRightColumns(rows) with makeMutableColumns(block), which returns
     empty clones (cloneEmpty) of the columns of whatever Block it is given.
  3. Implements the INNER branch of joinBlock(): result rows for the left and the
     right side are accumulated by innerJoin() / innerJoinEquals() over right_blocks.
  4. Renames anyLeftJoinEquals() to leftJoinEquals(). When strictness is not Any,
     both *JoinEquals() helpers emit every right row of the equal range instead of
     only the first one.

Review fix applied in this version of the patch:
  The INNER branch used to call block.clear() followed by
  appendRightColumns(block, std::move(left_columns)). appendRightColumns() only
  receives the (now empty) block and the columns, so the names and types of the
  left columns cannot be restored from its arguments. The left columns are now
  replaced in place with block.setColumns(), which keeps the block structure.

NOTE(review): leftJoinEquals() appends left_length * right_length rows for
non-ANY strictness, but leftJoin() has no left_columns to expand. Please confirm
that the LEFT branch of joinBlock() cannot end up with more right rows than
left rows.

NOTE(review): line breaks and indentation were restored from a collapsed copy of
the patch; blank context lines were inferred from the hunk header counts. Verify
against the target tree before applying.

diff --git a/dbms/src/Interpreters/MergeJoin.cpp b/dbms/src/Interpreters/MergeJoin.cpp
index 55a33a350fe..ea512697ddb 100644
--- a/dbms/src/Interpreters/MergeJoin.cpp
+++ b/dbms/src/Interpreters/MergeJoin.cpp
@@ -122,10 +122,6 @@ MergeJoin::MergeJoin(const AnalyzedJoin & table_join_, const Block & right_sampl
 {
     if (!isLeft(table_join.kind()) && !isInner(table_join.kind()))
         throw Exception("Partial merge supported for LEFT and INNER JOINs only", ErrorCodes::NOT_IMPLEMENTED);
-#if 0
-    if (table_join.strictness() != ASTTableJoin::Strictness::Any)
-        throw Exception("Partial merge supported for ANY JOIN variant only", ErrorCodes::NOT_IMPLEMENTED);
-#endif
 
     JoinCommon::extractKeysForJoin(table_join.keyNamesRight(), right_sample_block, right_table_keys, right_columns_to_add);
 
@@ -189,7 +185,7 @@ void MergeJoin::joinBlock(Block & block)
 
     if (isLeft(table_join.kind()))
     {
-        MutableColumns right_columns = makeRightColumns(0);
+        MutableColumns right_columns = makeMutableColumns(right_columns_to_add);
 
         MergeJoinCursor left_cursor(block, left_merge_description);
         for (auto it = right_blocks.begin(); it != right_blocks.end(); ++it)
@@ -203,8 +199,19 @@ void MergeJoin::joinBlock(Block & block)
     }
     else if (isInner(table_join.kind()))
     {
-        /// TODO
-        MutableColumns right_columns = makeRightColumns(block.rows());
+        MutableColumns left_columns = makeMutableColumns(block);
+        MutableColumns right_columns = makeMutableColumns(right_columns_to_add);
+
+        MergeJoinCursor left_cursor(block, left_merge_description);
+        for (auto it = right_blocks.begin(); it != right_blocks.end(); ++it)
+        {
+            if (left_cursor.atEnd())
+                break;
+            innerJoin(left_cursor, block, *it, left_columns, right_columns);
+        }
+
+        /// Replace the left columns in place so that their names and types are kept.
+        block.setColumns(std::move(left_columns));
         appendRightColumns(block, std::move(right_columns));
     }
 }
@@ -224,7 +231,7 @@ void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & right_bloc
         if (range.empty())
             break;
 
-        anyLeftJoinEquals(right_block, right_columns, range);
+        leftJoinEquals(right_block, right_columns, range);
         right_cursor.nextN(range.right_length);
 
         /// TODO: Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
@@ -233,13 +240,33 @@ void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & right_bloc
     }
 }
 
-MutableColumns MergeJoin::makeRightColumns(size_t rows)
+void MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
+                          MutableColumns & left_columns, MutableColumns & right_columns)
+{
+    MergeJoinCursor right_cursor(right_block, right_merge_description);
+
+    while (!left_cursor.atEnd() && !right_cursor.atEnd())
+    {
+        Range range = left_cursor.getNextEqualRange(right_cursor);
+        if (range.empty())
+            break;
+
+        innerJoinEquals(left_block, right_block, left_columns, right_columns, range);
+        right_cursor.nextN(range.right_length);
+
+        /// TODO: Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
+        //if (!right_cursor.atEnd())
+            left_cursor.nextN(range.left_length);
+    }
+}
+
+MutableColumns MergeJoin::makeMutableColumns(const Block & block)
 {
     MutableColumns columns;
-    columns.reserve(right_columns_to_add.columns());
+    columns.reserve(block.columns());
 
-    for (const auto & src_column : right_columns_to_add)
-        columns.push_back(src_column.column->cloneResized(rows));
+    for (const auto & src_column : block)
+        columns.push_back(src_column.column->cloneEmpty());
 
     return columns;
 }
@@ -259,18 +286,56 @@ void MergeJoin::appendRightNulls(MutableColumns & right_columns, size_t rows_to_
         column->insertDefault();
 }
 
-void MergeJoin::anyLeftJoinEquals(const Block & right_block, MutableColumns & right_columns, const Range & range)
+void MergeJoin::leftJoinEquals(const Block & right_block, MutableColumns & right_columns, const Range & range)
 {
-    size_t rows_to_insert = range.left_length;
-    size_t any_row_position = range.right_start;
+    bool any = table_join.strictness() == ASTTableJoin::Strictness::Any;
 
-    for (size_t i = 0; i < right_columns_to_add.columns(); ++i)
+    size_t left_rows_to_insert = range.left_length;
+    size_t right_rows_to_insert = any ? 1 : range.right_length;
+
+    size_t row_position = range.right_start;
+    for (size_t right_row = 0; right_row < right_rows_to_insert; ++right_row, ++row_position)
     {
-        const auto & src_column = right_block.getByName(right_columns_to_add.getByPosition(i).name);
-        auto & dst_column = right_columns[i];
+        for (size_t i = 0; i < right_columns_to_add.columns(); ++i)
+        {
+            const auto & src_column = right_block.getByName(right_columns_to_add.getByPosition(i).name);
+            auto & dst_column = right_columns[i];
 
-        for (size_t row = 0; row < rows_to_insert; ++row)
-            dst_column->insertFrom(*src_column.column, any_row_position);
+            for (size_t left_row = 0; left_row < left_rows_to_insert; ++left_row)
+                dst_column->insertFrom(*src_column.column, row_position);
+        }
+    }
+}
+
+void MergeJoin::innerJoinEquals(const Block & left_block, const Block & right_block,
+                                MutableColumns & left_columns, MutableColumns & right_columns, const Range & range)
+{
+    bool any = table_join.strictness() == ASTTableJoin::Strictness::Any;
+
+    size_t left_rows_to_insert = range.left_length;
+    size_t right_rows_to_insert = any ? 1 : range.right_length;
+
+    size_t row_position = range.right_start;
+    for (size_t right_row = 0; right_row < right_rows_to_insert; ++right_row, ++row_position)
+    {
+        for (size_t i = 0; i < left_block.columns(); ++i)
+        {
+            const auto & src_column = left_block.getByPosition(i);
+            auto & dst_column = left_columns[i];
+
+            size_t row_pos = range.left_start;
+            for (size_t row = 0; row < left_rows_to_insert; ++row, ++row_pos)
+                dst_column->insertFrom(*src_column.column, row_pos);
+        }
+
+        for (size_t i = 0; i < right_columns_to_add.columns(); ++i)
+        {
+            const auto & src_column = right_block.getByName(right_columns_to_add.getByPosition(i).name);
+            auto & dst_column = right_columns[i];
+
+            for (size_t row = 0; row < left_rows_to_insert; ++row)
+                dst_column->insertFrom(*src_column.column, row_position);
+        }
     }
 }
 
diff --git a/dbms/src/Interpreters/MergeJoin.h b/dbms/src/Interpreters/MergeJoin.h
index 55b30f04947..26423395421 100644
--- a/dbms/src/Interpreters/MergeJoin.h
+++ b/dbms/src/Interpreters/MergeJoin.h
@@ -41,14 +41,18 @@ private:
     size_t right_blocks_row_count = 0;
     size_t right_blocks_bytes = 0;
 
-    MutableColumns makeRightColumns(size_t rows);
+    MutableColumns makeMutableColumns(const Block & block);
     void appendRightColumns(Block & block, MutableColumns && right_columns);
 
     void mergeRightBlocks();
     void leftJoin(MergeJoinCursor & left_cursor, const Block & right_block, MutableColumns & right_columns);
+    void innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
+                   MutableColumns & left_columns, MutableColumns & right_columns);
 
     void appendRightNulls(MutableColumns & right_columns, size_t rows_to_add);
-    void anyLeftJoinEquals(const Block & right_block, MutableColumns & right_columns, const MergeJoinEqualRange & range);
+    void leftJoinEquals(const Block & right_block, MutableColumns & right_columns, const MergeJoinEqualRange & range);
+    void innerJoinEquals(const Block & left_block, const Block & right_block,
+                         MutableColumns & left_columns, MutableColumns & right_columns, const MergeJoinEqualRange & range);
 };
 
 }