2022-04-03 15:54:22 +00:00
|
|
|
#include <cstddef>
|
2022-04-05 10:12:42 +00:00
|
|
|
#include <memory>
|
2022-04-05 10:14:42 +00:00
|
|
|
#include <type_traits>
|
2022-04-03 15:54:22 +00:00
|
|
|
#include <vector>
|
2022-03-30 10:07:09 +00:00
|
|
|
#include <Processors/Transforms/MergeJoinTransform.h>
|
|
|
|
#include <base/logger_useful.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
2022-04-03 15:54:22 +00:00
|
|
|
#include <Interpreters/TableJoin.h>
|
|
|
|
#include <Core/SortDescription.h>
|
2022-04-05 10:12:42 +00:00
|
|
|
#include <boost/core/noncopyable.hpp>
|
2022-04-05 10:14:42 +00:00
|
|
|
#include <Columns/ColumnsNumber.h>
|
|
|
|
#include <Columns/IColumn.h>
|
|
|
|
#include <Columns/ColumnNullable.h>
|
|
|
|
#include <Core/SortCursor.h>
|
|
|
|
#include <Parsers/ASTTablesInSelectQuery.h>
|
|
|
|
#include <base/defines.h>
|
|
|
|
#include <base/types.h>
|
2022-03-30 10:07:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes thrown from this translation unit.
/// LOGICAL_ERROR is thrown by MergeJoinAlgorithm's constructor and initialize() when the
/// number of inputs is not two, so it must be declared here alongside the others.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int TOO_MANY_ROWS;
}
|
|
|
|
|
2022-04-03 15:54:22 +00:00
|
|
|
namespace
|
2022-04-01 18:20:58 +00:00
|
|
|
{
|
2022-04-03 15:54:22 +00:00
|
|
|
|
2022-04-07 16:14:56 +00:00
|
|
|
/// Largest representable size_t. consume() rejects chunks whose row count reaches this value,
/// presumably so it can never collide with a real row index — confirm intended use.
constexpr auto EMPTY_VALUE_IDX = std::numeric_limits<size_t>::max();
|
2022-04-03 15:54:22 +00:00
|
|
|
|
2022-04-05 10:12:42 +00:00
|
|
|
/// Builds a FullMergeJoinCursor over `block` whose sort description contains one entry per
/// name in `columns`, in the same order, each constructed from the column name alone.
FullMergeJoinCursor createCursor(const Block & block, const Names & columns)
{
    SortDescription sort_description;
    sort_description.reserve(columns.size());

    for (const auto & column_name : columns)
        sort_description.emplace_back(column_name);

    return FullMergeJoinCursor(block, sort_description);
}
|
|
|
|
|
2022-04-05 10:14:42 +00:00
|
|
|
template <bool has_left_nulls, bool has_right_nulls>
|
|
|
|
int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos, int null_direction_hint = 1)
|
|
|
|
{
|
|
|
|
if constexpr (has_left_nulls && has_right_nulls)
|
|
|
|
{
|
|
|
|
const auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column);
|
|
|
|
const auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column);
|
|
|
|
|
|
|
|
if (left_nullable && right_nullable)
|
|
|
|
{
|
|
|
|
int res = left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
|
|
|
|
if (res)
|
|
|
|
return res;
|
|
|
|
|
|
|
|
/// NULL != NULL case
|
|
|
|
if (left_column.isNullAt(lhs_pos))
|
|
|
|
return null_direction_hint;
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if constexpr (has_left_nulls)
|
|
|
|
{
|
|
|
|
if (const auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column))
|
|
|
|
{
|
|
|
|
if (left_column.isNullAt(lhs_pos))
|
|
|
|
return null_direction_hint;
|
|
|
|
return left_nullable->getNestedColumn().compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if constexpr (has_right_nulls)
|
|
|
|
{
|
|
|
|
if (const auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column))
|
|
|
|
{
|
|
|
|
if (right_column.isNullAt(rhs_pos))
|
|
|
|
return -null_direction_hint;
|
|
|
|
return left_column.compareAt(lhs_pos, rhs_pos, right_nullable->getNestedColumn(), null_direction_hint);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// If on_pos == true, compare two columns at specified positions.
|
|
|
|
/// Otherwise, compare two columns at the current positions, `lpos` and `rpos` are ignored.
|
|
|
|
template <typename Cursor, bool on_pos = false>
|
|
|
|
int ALWAYS_INLINE compareCursors(const Cursor & lhs, const Cursor & rhs,
|
|
|
|
[[ maybe_unused ]] size_t lpos = 0,
|
|
|
|
[[ maybe_unused ]] size_t rpos = 0)
|
2022-04-01 18:20:58 +00:00
|
|
|
{
|
2022-04-03 15:54:22 +00:00
|
|
|
for (size_t i = 0; i < lhs->sort_columns_size; ++i)
|
2022-04-01 18:20:58 +00:00
|
|
|
{
|
2022-04-03 15:54:22 +00:00
|
|
|
const auto & desc = lhs->desc[i];
|
|
|
|
int direction = desc.direction;
|
|
|
|
int nulls_direction = desc.nulls_direction;
|
2022-04-05 10:14:42 +00:00
|
|
|
|
|
|
|
int cmp = direction * nullableCompareAt<true, true>(
|
|
|
|
*lhs->sort_columns[i],
|
|
|
|
*rhs->sort_columns[i],
|
|
|
|
on_pos ? lpos : lhs->getRow(),
|
|
|
|
on_pos ? rpos : rhs->getRow(),
|
|
|
|
nulls_direction);
|
|
|
|
if (cmp != 0)
|
|
|
|
return cmp;
|
2022-04-01 18:20:58 +00:00
|
|
|
}
|
2022-04-03 15:54:22 +00:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2022-04-05 10:14:42 +00:00
|
|
|
/// Returns true when the last row of `lhs` sorts strictly before the current (first unprocessed)
/// row of `rhs`. In that case no remaining row of `lhs` can match any remaining row of `rhs`.
///
/// The comparison must be strict: an equal key on the boundary may still produce a match.
/// Returns false if either cursor has no rows or is not positioned on a valid row.
bool ALWAYS_INLINE totallyLess(const FullMergeJoinCursor & lhs, const FullMergeJoinCursor & rhs)
{
    if (lhs->rows == 0 || rhs->rows == 0)
        return false;

    if (!lhs->isValid() || !rhs->isValid())
        return false;

    /// Compare against rhs's current row rather than row 0: rows before the current position
    /// have already been consumed by the merge. Comparing with row 0 gives the same answer when
    /// it says "less", but misses the fast path once rhs is partially consumed.
    int cmp = compareCursors<FullMergeJoinCursor, true>(lhs, rhs, lhs->rows - 1, rhs->getRow());
    return cmp < 0;
}
|
|
|
|
|
2022-04-06 14:36:51 +00:00
|
|
|
/// Whole-range comparison of two cursors built on totallyLess:
/// -1 when totallyLess(lhs, rhs), 1 when totallyLess(rhs, lhs), and 0 otherwise
/// (for example when the key ranges overlap or a cursor has no valid row).
int ALWAYS_INLINE totallyCompare(const FullMergeJoinCursor & lhs, const FullMergeJoinCursor & rhs)
{
    if (totallyLess(lhs, rhs))
        return -1;

    return totallyLess(rhs, lhs) ? 1 : 0;
}
|
|
|
|
|
2022-04-07 16:14:56 +00:00
|
|
|
void addIndexColumn(const Columns & columns, ColumnUInt64 & indices, Chunk & result, size_t start, size_t limit)
|
2022-04-05 10:12:42 +00:00
|
|
|
{
|
|
|
|
for (const auto & col : columns)
|
|
|
|
{
|
|
|
|
if (indices.empty())
|
|
|
|
{
|
2022-04-07 16:14:56 +00:00
|
|
|
result.addColumn(col->cut(start, limit));
|
2022-04-05 10:12:42 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2022-04-07 16:14:56 +00:00
|
|
|
if (limit == 0)
|
|
|
|
limit = indices.size();
|
|
|
|
|
|
|
|
assert(limit == indices.size());
|
|
|
|
|
2022-04-05 10:12:42 +00:00
|
|
|
auto tmp_col = col->cloneResized(col->size() + 1);
|
2022-04-07 16:14:56 +00:00
|
|
|
ColumnPtr new_col = tmp_col->index(indices, limit);
|
2022-04-05 10:12:42 +00:00
|
|
|
result.addColumn(std::move(new_col));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-03 15:54:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Creates the ANY-join merge algorithm over two inputs.
/// input_headers[0] is the left input and input_headers[1] the right one. Each gets a cursor keyed by
/// the corresponding key names of the join's single ON clause (getOnlyClause()).
///
/// Throws LOGICAL_ERROR unless exactly two headers are given, and NOT_IMPLEMENTED unless the
/// join strictness is ANY.
MergeJoinAlgorithm::MergeJoinAlgorithm(
    JoinPtr table_join_,
    const Blocks & input_headers)
    /// The parameter is a by-value sink: move it instead of copying (avoids an extra refcount bump).
    : table_join(std::move(table_join_))
    , log(&Poco::Logger::get("MergeJoinAlgorithm"))
{
    if (input_headers.size() != 2)
        throw Exception("MergeJoinAlgorithm requires exactly two inputs", ErrorCodes::LOGICAL_ERROR);

    if (table_join->getTableJoin().strictness() != ASTTableJoin::Strictness::Any)
        throw Exception("MergeJoinAlgorithm is not implemented for strictness != ANY", ErrorCodes::NOT_IMPLEMENTED);

    const auto & join_on = table_join->getTableJoin().getOnlyClause();

    cursors.push_back(createCursor(input_headers[0], join_on.key_names_left));
    cursors.push_back(createCursor(input_headers[1], join_on.key_names_right));
}
|
|
|
|
|
2022-04-05 10:12:42 +00:00
|
|
|
|
2022-04-07 16:14:56 +00:00
|
|
|
static void copyColumnsResized(const Chunk & chunk, size_t start, size_t size, Chunk & result_chunk)
|
2022-04-05 10:12:42 +00:00
|
|
|
{
|
|
|
|
const auto & cols = chunk.getColumns();
|
|
|
|
for (const auto & col : cols)
|
|
|
|
{
|
2022-04-07 16:14:56 +00:00
|
|
|
if (!start || start > col->size())
|
|
|
|
result_chunk.addColumn(col->cloneResized(size));
|
|
|
|
else
|
|
|
|
{
|
|
|
|
assert(size <= col->size());
|
|
|
|
result_chunk.addColumn(col->cut(start, size));
|
|
|
|
}
|
2022-04-05 10:12:42 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-03 15:54:22 +00:00
|
|
|
/// Initializes the algorithm from one Input per source (exactly two are required).
/// For each input it stores a zero-row copy of the input's columns in sample_chunks, then
/// passes the input to consume() for the matching cursor.
/// Throws LOGICAL_ERROR when the number of inputs is not two.
void MergeJoinAlgorithm::initialize(Inputs inputs)
{
    const size_t num_inputs = inputs.size();
    if (num_inputs != 2)
        throw Exception("MergeJoinAlgorithm requires exactly two inputs", ErrorCodes::LOGICAL_ERROR);

    LOG_DEBUG(log, "MergeJoinAlgorithm initialize, number of inputs: {}", num_inputs);

    for (size_t source_num = 0; source_num < num_inputs; ++source_num)
    {
        auto & input = inputs[source_num];

        /// Zero-row template of this input's columns; used later to build default-filled blocks.
        Chunk & sample = sample_chunks.emplace_back();
        copyColumnsResized(input.chunk, 0, 0, sample);

        consume(input, source_num);
    }
}
|
|
|
|
|
|
|
|
/// Replaces every column of `chunk` with convertToFullColumnIfConst() of itself, so constant
/// columns become full columns. The row count is preserved.
static void prepareChunk(Chunk & chunk)
{
    const auto rows = chunk.getNumRows();
    auto materialized = chunk.detachColumns();

    for (auto & column : materialized)
        column = column->convertToFullColumnIfConst();

    chunk.setColumns(std::move(materialized), rows);
}
|
|
|
|
|
|
|
|
/// Accepts a new chunk for cursor `source_num`. Constant columns are materialized first, and
/// chunks with EMPTY_VALUE_IDX rows or more are rejected with TOO_MANY_ROWS. The input is then
/// moved into the cursor.
void MergeJoinAlgorithm::consume(Input & input, size_t source_num)
{
    LOG_DEBUG(log, "TODO: remove. Consume from {} chunk: {}", source_num, static_cast<bool>(input.chunk));

    prepareChunk(input.chunk);

    const bool too_many_rows = input.chunk.getNumRows() >= EMPTY_VALUE_IDX;
    if (too_many_rows)
        throw Exception("Too many rows in input", ErrorCodes::TOO_MANY_ROWS);

    cursors[source_num].setInput(std::move(input));
}
|
|
|
|
|
|
|
|
/// Short alias for the AST join-kind enum (JoinKind::Left, JoinKind::Right, JoinKind::Inner, ...)
/// used by the join helpers and merge() below.
using JoinKind = ASTTableJoin::Kind;
|
|
|
|
|
|
|
|
template <JoinKind kind>
|
2022-04-07 16:14:56 +00:00
|
|
|
static void anyJoin(FullMergeJoinCursor & left_cursor, FullMergeJoinCursor & right_cursor, PaddedPODArray<UInt64> & left_map, PaddedPODArray<UInt64> & right_map)
|
2022-04-05 10:12:42 +00:00
|
|
|
{
|
|
|
|
static_assert(kind == JoinKind::Left || kind == JoinKind::Right || kind == JoinKind::Inner, "Invalid join kind");
|
|
|
|
|
2022-04-06 14:36:51 +00:00
|
|
|
size_t num_rows = kind == JoinKind::Left ? left_cursor->rowsLeft() :
|
|
|
|
kind == JoinKind::Right ? right_cursor->rowsLeft() :
|
|
|
|
std::min(left_cursor->rowsLeft(), right_cursor->rowsLeft());
|
2022-04-05 10:12:42 +00:00
|
|
|
|
|
|
|
constexpr bool is_left_or_inner = kind == JoinKind::Left || kind == JoinKind::Inner;
|
|
|
|
constexpr bool is_right_or_inner = kind == JoinKind::Right || kind == JoinKind::Inner;
|
|
|
|
|
|
|
|
if constexpr (is_left_or_inner)
|
|
|
|
right_map.reserve(num_rows);
|
|
|
|
|
|
|
|
if constexpr (is_right_or_inner)
|
|
|
|
left_map.reserve(num_rows);
|
|
|
|
|
|
|
|
while (left_cursor->isValid() && right_cursor->isValid())
|
|
|
|
{
|
|
|
|
int cmp = compareCursors(left_cursor, right_cursor);
|
|
|
|
if (cmp == 0)
|
|
|
|
{
|
|
|
|
if constexpr (is_left_or_inner)
|
|
|
|
right_map.emplace_back(right_cursor->getRow());
|
|
|
|
|
|
|
|
if constexpr (is_right_or_inner)
|
|
|
|
left_map.emplace_back(left_cursor->getRow());
|
|
|
|
|
|
|
|
if constexpr (is_left_or_inner)
|
|
|
|
left_cursor->next();
|
|
|
|
|
|
|
|
if constexpr (is_right_or_inner)
|
|
|
|
right_cursor->next();
|
|
|
|
|
|
|
|
}
|
|
|
|
else if (cmp < 0)
|
|
|
|
{
|
|
|
|
if constexpr (kind == JoinKind::Left)
|
|
|
|
right_map.emplace_back(right_cursor->rows);
|
|
|
|
left_cursor->next();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
if constexpr (kind == JoinKind::Right)
|
|
|
|
left_map.emplace_back(left_cursor->rows);
|
|
|
|
right_cursor->next();
|
|
|
|
}
|
|
|
|
}
|
2022-04-01 18:20:58 +00:00
|
|
|
}
|
|
|
|
|
2022-04-07 16:14:56 +00:00
|
|
|
/// Builds a chunk holding the columns of `lhs` followed by the columns of `rhs`. Each column is
/// produced by copyColumnsResized with the same `start` and `num_rows`, so every column has
/// `num_rows` rows.
static Chunk createBlockWithDefaults(const Chunk & lhs, const Chunk & rhs, size_t start, size_t num_rows)
{
    Chunk block;
    for (const Chunk * side : {&lhs, &rhs})
        copyColumnsResized(*side, start, num_rows, block);
    return block;
}
|
2022-04-05 10:12:42 +00:00
|
|
|
|
2022-04-07 16:14:56 +00:00
|
|
|
/// Emits the unprocessed rows of `rhs`'s current chunk (from its current row to the end).
/// The columns of `lhs` are placed first, resized to the same row count. `rhs` is reset afterwards.
static Chunk createBlockWithDefaults(const Chunk & lhs, FullMergeJoinCursor & rhs)
{
    const size_t first_row = rhs->getRow();
    const size_t rows_left = rhs->rowsLeft();

    Chunk block = createBlockWithDefaults(lhs, rhs.getCurrentChunk(), first_row, rows_left);
    rhs.reset();
    return block;
}
|
|
|
|
|
|
|
|
/// Emits the unprocessed rows of `lhs`'s current chunk (from its current row to the end).
/// The columns of `rhs` follow, resized to the same row count. `lhs` is reset afterwards.
static Chunk createBlockWithDefaults(FullMergeJoinCursor & lhs, const Chunk & rhs)
{
    const size_t first_row = lhs->getRow();
    const size_t rows_left = lhs->rowsLeft();

    Chunk block = createBlockWithDefaults(lhs.getCurrentChunk(), rhs, first_row, rows_left);
    lhs.reset();
    return block;
}
|
|
|
|
|
2022-04-06 14:36:51 +00:00
|
|
|
static bool isFinished(const std::vector<FullMergeJoinCursor> & cursors, JoinKind kind)
|
|
|
|
{
|
|
|
|
return (cursors[0].fullyCompleted() && cursors[1].fullyCompleted())
|
|
|
|
|| ((isLeft(kind) || isInner(kind)) && cursors[0].fullyCompleted())
|
|
|
|
|| ((isRight(kind) || isInner(kind)) && cursors[1].fullyCompleted());
|
|
|
|
}
|
|
|
|
|
2022-04-05 10:14:42 +00:00
|
|
|
/// Produces the next piece of ANY-join output, or asks for more input.
///
/// Returns:
///  - Status(i) when cursor i has no valid row in its current chunk but its input is not fully
///    completed, so a new chunk is needed from input i;
///  - Status({}, true) when isFinished() holds for the join kind;
///  - a Status carrying a result chunk otherwise. The chunk is flagged as final when isFinished()
///    holds after matching.
/// Throws NOT_IMPLEMENTED for join kinds other than Inner, Left and Right once row matching is needed.
IMergingAlgorithm::Status MergeJoinAlgorithm::merge()
{
    if (!cursors[0]->isValid() && !cursors[0].fullyCompleted())
        return Status(0);

    if (!cursors[1]->isValid() && !cursors[1].fullyCompleted())
        return Status(1);

    JoinKind kind = table_join->getTableJoin().kind();

    if (isFinished(cursors, kind))
        return Status({}, true);

    /// One input is fully completed. The remaining rows of the other (outer) side can only
    /// be emitted with default values for the finished side.
    if (cursors[0].fullyCompleted() && isRightOrFull(kind))
    {
        Chunk result = createBlockWithDefaults(sample_chunks[0], cursors[1]);
        return Status(std::move(result));
    }

    if (isLeftOrFull(kind) && cursors[1].fullyCompleted())
    {
        Chunk result = createBlockWithDefaults(cursors[0], sample_chunks[1]);
        return Status(std::move(result));
    }

    /// Fast path: every remaining row of one cursor sorts before the other cursor's rows, so
    /// those rows have no partner. Emit them with defaults if that side is outer, otherwise drop them.
    if (int cmp = totallyCompare(cursors[0], cursors[1]); cmp != 0)
    {
        if (cmp < 0)
        {
            if (cursors[0]->isValid() && isLeftOrFull(kind))
                return Status(createBlockWithDefaults(cursors[0], sample_chunks[1]));

            cursors[0].reset();
            return Status(0);
        }

        /// Here cmp > 0.
        if (isRightOrFull(kind) && cursors[1]->isValid())
            return Status(createBlockWithDefaults(sample_chunks[0], cursors[1]));

        cursors[1].reset();
        return Status(1);
    }

    auto left_map = ColumnUInt64::create();
    auto right_map = ColumnUInt64::create();

    /// Cursor positions before matching. A side whose index map stays empty is emitted as the
    /// contiguous row range starting here (addIndexColumn cuts from `start` when indices are empty).
    std::pair<size_t, size_t> prev_pos = std::make_pair(cursors[0]->getRow(), cursors[1]->getRow());

    /// Unsupported join kinds are rejected here, right before any row matching happens.
    if (isInner(kind))
        anyJoin<JoinKind::Inner>(cursors[0], cursors[1], left_map->getData(), right_map->getData());
    else if (isLeft(kind))
        anyJoin<JoinKind::Left>(cursors[0], cursors[1], left_map->getData(), right_map->getData());
    else if (isRight(kind))
        anyJoin<JoinKind::Right>(cursors[0], cursors[1], left_map->getData(), right_map->getData());
    else
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported join kind: \"{}\"", table_join->getTableJoin().kind());

    assert(left_map->empty() || right_map->empty() || left_map->size() == right_map->size());

    Chunk result;
    size_t num_result_rows = std::max(left_map->size(), right_map->size());
    addIndexColumn(cursors[0].getCurrentChunk().getColumns(), *left_map, result, prev_pos.first, num_result_rows);
    addIndexColumn(cursors[1].getCurrentChunk().getColumns(), *right_map, result, prev_pos.second, num_result_rows);
    return Status(std::move(result), isFinished(cursors, kind));
}
|
|
|
|
|
2022-03-30 10:07:09 +00:00
|
|
|
/// Wraps MergeJoinAlgorithm in an IMergingTransform. The trailing arguments (`table_join`,
/// `input_headers`) are the ones passed on to the MergeJoinAlgorithm constructor.
MergeJoinTransform::MergeJoinTransform(
        JoinPtr table_join,
        const Blocks & input_headers,
        const Block & output_header,
        UInt64 limit_hint)
    /// table_join is a by-value sink that is not used afterwards: move it instead of copying.
    : IMergingTransform<MergeJoinAlgorithm>(input_headers, output_header, true, limit_hint, std::move(table_join), input_headers)
    , log(&Poco::Logger::get("MergeJoinTransform"))
{
    LOG_TRACE(log, "Will use MergeJoinTransform");
}
|
|
|
|
|
|
|
|
void MergeJoinTransform::onFinish()
|
|
|
|
{
|
2022-04-07 16:14:56 +00:00
|
|
|
LOG_TRACE(log, "TODO: remove onFinish");
|
2022-03-30 10:07:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|