2022-04-12 13:23:36 +00:00
|
|
|
#include <cassert>
|
2022-04-03 15:54:22 +00:00
|
|
|
#include <cstddef>
|
2022-04-14 12:30:34 +00:00
|
|
|
#include <limits>
|
2022-04-05 10:12:42 +00:00
|
|
|
#include <memory>
|
2022-04-12 13:23:36 +00:00
|
|
|
#include <optional>
|
2022-04-05 10:14:42 +00:00
|
|
|
#include <type_traits>
|
2022-04-03 15:54:22 +00:00
|
|
|
#include <vector>
|
2022-04-26 13:51:33 +00:00
|
|
|
|
|
|
|
#include <base/defines.h>
|
2022-03-30 10:07:09 +00:00
|
|
|
#include <base/logger_useful.h>
|
2022-04-26 13:51:33 +00:00
|
|
|
#include <base/types.h>
|
2022-04-05 10:12:42 +00:00
|
|
|
#include <boost/core/noncopyable.hpp>
|
2022-04-26 13:51:33 +00:00
|
|
|
|
|
|
|
#include <Columns/ColumnNullable.h>
|
2022-04-05 10:14:42 +00:00
|
|
|
#include <Columns/ColumnsNumber.h>
|
|
|
|
#include <Columns/IColumn.h>
|
|
|
|
#include <Core/SortCursor.h>
|
2022-04-26 13:51:33 +00:00
|
|
|
#include <Core/SortDescription.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <Interpreters/TableJoin.h>
|
2022-04-05 10:14:42 +00:00
|
|
|
#include <Parsers/ASTTablesInSelectQuery.h>
|
2022-04-26 13:51:33 +00:00
|
|
|
#include <Processors/Transforms/MergeJoinTransform.h>
|
2022-03-30 10:07:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes thrown from this file. Only `extern` declarations appear here;
/// the values themselves are defined elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}
|
|
|
|
|
2022-04-12 13:23:36 +00:00
|
|
|
using JoinKind = ASTTableJoin::Kind;
|
|
|
|
|
|
|
|
namespace
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Builds a join cursor for one input.
/// The sort description holds one entry per name in `columns`, in the same order;
/// the cursor is constructed from `materializeBlock(block)` and that description.
FullMergeJoinCursorPtr createCursor(const Block & block, const Names & columns)
{
    SortDescription sort_description;
    sort_description.reserve(columns.size());

    for (size_t key_idx = 0; key_idx < columns.size(); ++key_idx)
        sort_description.emplace_back(columns[key_idx]);

    return std::make_unique<FullMergeJoinCursor>(materializeBlock(block), sort_description);
}
|
|
|
|
|
2022-04-05 10:14:42 +00:00
|
|
|
template <bool has_left_nulls, bool has_right_nulls>
|
2022-04-12 13:23:36 +00:00
|
|
|
int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos, int null_direction_hint = 1)
|
2022-04-05 10:14:42 +00:00
|
|
|
{
|
|
|
|
if constexpr (has_left_nulls && has_right_nulls)
|
|
|
|
{
|
|
|
|
const auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column);
|
|
|
|
const auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column);
|
|
|
|
|
|
|
|
if (left_nullable && right_nullable)
|
|
|
|
{
|
|
|
|
int res = left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
|
|
|
|
if (res)
|
|
|
|
return res;
|
|
|
|
|
|
|
|
/// NULL != NULL case
|
|
|
|
if (left_column.isNullAt(lhs_pos))
|
|
|
|
return null_direction_hint;
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if constexpr (has_left_nulls)
|
|
|
|
{
|
|
|
|
if (const auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column))
|
|
|
|
{
|
|
|
|
if (left_column.isNullAt(lhs_pos))
|
|
|
|
return null_direction_hint;
|
|
|
|
return left_nullable->getNestedColumn().compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if constexpr (has_right_nulls)
|
|
|
|
{
|
|
|
|
if (const auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column))
|
|
|
|
{
|
|
|
|
if (right_column.isNullAt(rhs_pos))
|
|
|
|
return -null_direction_hint;
|
|
|
|
return left_column.compareAt(lhs_pos, rhs_pos, right_nullable->getNestedColumn(), null_direction_hint);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
|
|
|
|
}
|
|
|
|
|
2022-04-12 13:23:36 +00:00
|
|
|
/// Lexicographic comparison of row `lpos` of `lhs` with row `rpos` of `rhs` over the sort (key) columns.
/// The number of keys is taken from `lhs.sort_columns_size`.
/// Returns the first non-zero per-column result, or 0 when all key columns compare equal.
int ALWAYS_INLINE compareCursors(const SortCursorImpl & lhs, size_t lpos,
                                 const SortCursorImpl & rhs, size_t rpos)
{
    size_t key_idx = 0;
    while (key_idx < lhs.sort_columns_size)
    {
        /// TODO(@vdimir): use nullableCompareAt only if there's nullable columns
        const int order = nullableCompareAt<true, true>(*lhs.sort_columns[key_idx], *rhs.sort_columns[key_idx], lpos, rpos);
        if (order)
            return order;
        ++key_idx;
    }
    return 0;
}
|
|
|
|
|
|
|
|
/// Compares the current rows of two cursors (see the four-argument overload).
int ALWAYS_INLINE compareCursors(const SortCursorImpl & lhs, const SortCursorImpl & rhs)
{
    const size_t left_row = lhs.getRow();
    const size_t right_row = rhs.getRow();
    return compareCursors(lhs, left_row, rhs, right_row);
}
|
|
|
|
|
2022-04-14 12:30:34 +00:00
|
|
|
/// Returns true when the last row of `lhs` compares strictly less than the current row of `rhs`.
/// Presumably both cursors are sorted by key, in which case this means every remaining row
/// of `lhs` precedes the current row of `rhs` — verify against callers.
///
/// Precondition: `lhs` holds at least one row.
bool ALWAYS_INLINE totallyLess(SortCursorImpl & lhs, SortCursorImpl & rhs)
{
    /// `lhs.rows - 1` would wrap around for an empty cursor (for example a default-constructed
    /// SortCursorImpl) and address a row far outside the key columns.
    assert(lhs.rows > 0);

    /// The last row of left cursor is less than the current row of the right cursor.
    int cmp = compareCursors(lhs, lhs.rows - 1, rhs, rhs.getRow());
    return cmp < 0;
}
|
|
|
|
|
2022-04-14 12:30:34 +00:00
|
|
|
/// Three-way variant of `totallyLess`:
///  -1 if `lhs` is totally less than `rhs`,
///   1 if `rhs` is totally less than `lhs`,
///   0 otherwise.
int ALWAYS_INLINE totallyCompare(SortCursorImpl & lhs, SortCursorImpl & rhs)
{
    const bool left_before_right = totallyLess(lhs, rhs);
    if (left_before_right)
        return -1;

    return totallyLess(rhs, lhs) ? 1 : 0;
}
|
|
|
|
|
|
|
|
void addIndexColumn(const Columns & columns, ColumnUInt64 & indices, Chunk & result, size_t start, size_t limit)
|
|
|
|
{
|
|
|
|
for (const auto & col : columns)
|
2022-04-08 15:05:38 +00:00
|
|
|
{
|
2022-04-12 13:23:36 +00:00
|
|
|
if (indices.empty())
|
|
|
|
{
|
|
|
|
result.addColumn(col->cut(start, limit));
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
if (limit == 0)
|
|
|
|
limit = indices.size();
|
|
|
|
|
|
|
|
assert(limit == indices.size());
|
2022-04-26 14:55:20 +00:00
|
|
|
/// rows where default value should be inserted have index == size
|
2022-04-26 14:32:48 +00:00
|
|
|
/// add row with defaults to handle it
|
2022-04-12 13:23:36 +00:00
|
|
|
auto tmp_col = col->cloneResized(col->size() + 1);
|
|
|
|
ColumnPtr new_col = tmp_col->index(indices, limit);
|
|
|
|
result.addColumn(std::move(new_col));
|
|
|
|
}
|
2022-04-08 15:05:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-25 21:29:23 +00:00
|
|
|
bool sameNext(const SortCursorImpl & impl, std::optional<size_t> pos_opt = {})
|
2022-04-08 15:05:38 +00:00
|
|
|
{
|
2022-04-25 21:29:23 +00:00
|
|
|
size_t pos = pos_opt.value_or(impl.getRow());
|
2022-04-08 15:05:38 +00:00
|
|
|
for (size_t i = 0; i < impl.sort_columns_size; ++i)
|
|
|
|
{
|
|
|
|
const auto & col = *impl.sort_columns[i];
|
2022-04-15 09:46:44 +00:00
|
|
|
if (auto cmp = col.compareAt(pos, pos + 1, col, impl.desc[i].nulls_direction); cmp != 0)
|
2022-04-08 15:05:38 +00:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2022-04-12 13:23:36 +00:00
|
|
|
/// Advances the cursor past the run of rows whose key equals the key of the current row.
/// Returns the number of rows skipped. When the run reaches the end of the cursor's rows,
/// the cursor is left invalid and the count is measured up to `impl.rows`.
/// The cursor must be valid on entry.
size_t nextDistinct(SortCursorImpl & impl)
{
    assert(impl.isValid());

    const size_t first_row = impl.getRow();

    /// Walk to the last row of the current run ...
    for (; !impl.isLast() && sameNext(impl); impl.next())
    {
    }
    /// ... and step over it.
    impl.next();

    const size_t end_row = impl.isValid() ? impl.getRow() : impl.rows;
    return end_row - first_row;
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2022-04-25 21:29:23 +00:00
|
|
|
/// Read-only access to the chunk currently held by the cursor.
const Chunk & FullMergeJoinCursor::getCurrent() const
{
    return current_chunk;
}
|
2022-04-08 15:05:38 +00:00
|
|
|
|
2022-04-25 21:29:23 +00:00
|
|
|
/// Resets the sort cursor to a default-constructed one and hands the held chunk over to the caller.
Chunk FullMergeJoinCursor::detach()
{
    cursor = SortCursorImpl();

    Chunk released(std::move(current_chunk));
    return released;
}
|
|
|
|
|
2022-04-25 21:29:23 +00:00
|
|
|
/// Gives the cursor its next chunk.
/// A chunk that evaluates to false marks the end of input: the "all blocks received" flag is set
/// and the current chunk is dropped. Otherwise the chunk is stored and a sort cursor is built over it.
/// Must not be called after the end of input, nor while the cursor is still valid.
void FullMergeJoinCursor::setChunk(Chunk && chunk)
{
    assert(!recieved_all_blocks);
    assert(!cursor.isValid());

    if (chunk)
    {
        current_chunk = std::move(chunk);
        cursor = SortCursorImpl(sample_block, current_chunk.getColumns(), desc);
    }
    else
    {
        recieved_all_blocks = true;
        detach();
    }
}
|
|
|
|
|
2022-04-25 21:29:23 +00:00
|
|
|
bool FullMergeJoinCursor::fullyCompleted() const
|
2022-04-12 13:23:36 +00:00
|
|
|
{
|
2022-04-25 21:29:23 +00:00
|
|
|
return !cursor.isValid() && recieved_all_blocks;
|
2022-04-03 15:54:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Validates the join configuration and creates one cursor per input.
///
/// Throws LOGICAL_ERROR unless exactly two input headers are given.
/// Throws NOT_IMPLEMENTED for a strictness other than Any/All, for a kind other than
/// Inner/Left/Right/Full, and when the ON clause carries a filter condition on either side.
MergeJoinAlgorithm::MergeJoinAlgorithm(
    JoinPtr table_join_,
    const Blocks & input_headers,
    size_t max_block_size_)
    : table_join(table_join_)
    , max_block_size(max_block_size_)
    , log(&Poco::Logger::get("MergeJoinAlgorithm"))
{
    if (input_headers.size() != 2)
        throw Exception("MergeJoinAlgorithm requires exactly two inputs", ErrorCodes::LOGICAL_ERROR);

    const auto & join_info = table_join->getTableJoin();

    auto strictness = join_info.strictness();
    const bool strictness_supported
        = strictness == ASTTableJoin::Strictness::Any || strictness == ASTTableJoin::Strictness::All;
    if (!strictness_supported)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergeJoinAlgorithm is not implemented for strictness {}", strictness);

    auto kind = join_info.kind();
    const bool kind_supported = isInner(kind) || isLeft(kind) || isRight(kind) || isFull(kind);
    if (!kind_supported)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergeJoinAlgorithm is not implemented for kind {}", kind);

    const auto & join_on = join_info.getOnlyClause();

    if (join_on.on_filter_condition_left || join_on.on_filter_condition_right)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergeJoinAlgorithm does not support ON filter conditions");

    /// Cursor 0 is the left input, cursor 1 the right one.
    cursors.push_back(createCursor(input_headers[0], join_on.key_names_left));
    cursors.push_back(createCursor(input_headers[1], join_on.key_names_right));
}
|
|
|
|
|
2022-04-29 14:02:00 +00:00
|
|
|
/// Returns a new column made of `num` copies of row 0 of `column`.
static ColumnPtr replicateRow(const IColumn & column, size_t num)
{
    MutableColumnPtr replicated = column.cloneEmpty();
    replicated->insertManyFrom(column, 0, num);
    return replicated;
}
|
|
|
|
|
2022-04-29 14:02:00 +00:00
|
|
|
template <typename TColumns>
|
|
|
|
static void copyColumnsResized(const TColumns & cols, size_t start, size_t size, Chunk & result_chunk)
|
2022-04-05 10:12:42 +00:00
|
|
|
{
|
|
|
|
for (const auto & col : cols)
|
|
|
|
{
|
2022-04-14 20:28:09 +00:00
|
|
|
if (col->empty())
|
|
|
|
{
|
2022-04-25 21:29:23 +00:00
|
|
|
/// add defaults
|
2022-04-07 16:14:56 +00:00
|
|
|
result_chunk.addColumn(col->cloneResized(size));
|
2022-04-14 20:28:09 +00:00
|
|
|
}
|
|
|
|
else if (col->size() == 1)
|
|
|
|
{
|
2022-04-25 21:29:23 +00:00
|
|
|
/// copy same row n times
|
2022-04-29 14:02:00 +00:00
|
|
|
result_chunk.addColumn(replicateRow(*col, size));
|
2022-04-14 20:28:09 +00:00
|
|
|
}
|
2022-04-07 16:14:56 +00:00
|
|
|
else
|
|
|
|
{
|
2022-04-25 21:29:23 +00:00
|
|
|
/// cut column
|
2022-04-14 20:28:09 +00:00
|
|
|
assert(start + size <= col->size());
|
2022-04-07 16:14:56 +00:00
|
|
|
result_chunk.addColumn(col->cut(start, size));
|
|
|
|
}
|
2022-04-05 10:12:42 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-29 14:02:00 +00:00
|
|
|
/// Builds a chunk holding the columns of `lhs` followed by the columns of `rhs`,
/// each passed through `copyColumnsResized` with the same `start` and `num_rows`.
static Chunk copyChunkResized(const Chunk & lhs, const Chunk & rhs, size_t start, size_t num_rows)
{
    Chunk combined;
    for (const Chunk * side : {&lhs, &rhs})
        copyColumnsResized(side->getColumns(), start, num_rows, combined);
    return combined;
}
|
|
|
|
|
2022-04-01 18:20:58 +00:00
|
|
|
/// Replaces every column of `chunk` with the result of `convertToFullColumnIfConst()`,
/// keeping the row count. A chunk that evaluates to false is left untouched.
static void prepareChunk(Chunk & chunk)
{
    if (chunk)
    {
        /// The row count is read before the columns are detached and is passed back unchanged.
        auto row_count = chunk.getNumRows();
        auto detached = chunk.detachColumns();

        for (size_t col_idx = 0; col_idx < detached.size(); ++col_idx)
            detached[col_idx] = detached[col_idx]->convertToFullColumnIfConst();

        chunk.setColumns(std::move(detached), row_count);
    }
}
|
|
|
|
|
2022-04-26 20:22:57 +00:00
|
|
|
/// Takes the first chunk of each of the two inputs.
/// For every input, an empty (zero-row) copy of its columns is stored in `sample_chunks`
/// and the chunk itself is then passed to `consume`.
/// Throws LOGICAL_ERROR unless exactly two inputs are given.
void MergeJoinAlgorithm::initialize(Inputs inputs)
{
    if (inputs.size() != 2)
        throw Exception("MergeJoinAlgorithm requires exactly two inputs", ErrorCodes::LOGICAL_ERROR);

    LOG_DEBUG(log, "MergeJoinAlgorithm initialize, number of inputs: {}", inputs.size());

    size_t source_num = 0;
    for (auto & input : inputs)
    {
        assert(input.chunk.getNumColumns() == cursors[source_num]->sampleBlock().columns());

        prepareChunk(input.chunk);
        /// start = 0 and size = 0: only the column structure is kept.
        copyColumnsResized(input.chunk.getColumns(), 0, 0, sample_chunks.emplace_back());
        consume(input, source_num);

        ++source_num;
    }
}
|
|
|
|
|
2022-04-01 18:20:58 +00:00
|
|
|
/// Accepts the next chunk from input `source_num`, updates the block/row statistics
/// and hands the chunk to the corresponding cursor.
/// Throws NOT_IMPLEMENTED when the input asks for `skip_last_row` or carries a permutation.
void MergeJoinAlgorithm::consume(Input & input, size_t source_num)
{
    if (input.skip_last_row)
        throw Exception("skip_last_row is not supported", ErrorCodes::NOT_IMPLEMENTED);

    if (input.permutation)
        throw Exception("permutation is not supported", ErrorCodes::NOT_IMPLEMENTED);

    /// Statistics are only counted for chunks that evaluate to true.
    if (input.chunk)
    {
        ++stat.num_blocks[source_num];
        stat.num_rows[source_num] += input.chunk.getNumRows();
    }

    prepareChunk(input.chunk);
    cursors[source_num]->setChunk(std::move(input.chunk));
}
|
|
|
|
|
2022-04-14 20:28:09 +00:00
|
|
|
/// Returns a one-row chunk built from `chunk` via `copyColumnsResized(…, pos, 1, …)`.
static Chunk getRowFromChunk(const Chunk & chunk, size_t pos)
{
    Chunk single_row;
    copyColumnsResized(chunk.getColumns(), pos, 1, single_row);
    return single_row;
}
|
|
|
|
|
2022-04-25 21:29:23 +00:00
|
|
|
/// Appends the consecutive indices `start, start + 1, …, end - 1` to `left_map`.
/// The range is asserted to be non-empty.
static void ALWAYS_INLINE addRange(PaddedPODArray<UInt64> & left_map, size_t start, size_t end)
{
    assert(end > start);

    size_t row_idx = start;
    while (row_idx < end)
    {
        left_map.push_back(row_idx);
        ++row_idx;
    }
}
|
|
|
|
|
|
|
|
/// Appends the index `idx` to `left_map` `num` times.
static void ALWAYS_INLINE addMany(PaddedPODArray<UInt64> & left_map, size_t idx, size_t num)
{
    for (size_t remaining = num; remaining > 0; --remaining)
        left_map.push_back(idx);
}
|
|
|
|
|
|
|
|
template <JoinKind kind>
|
|
|
|
struct AllJoinImpl
|
|
|
|
{
|
|
|
|
constexpr static bool enabled = isInner(kind) || isLeft(kind) || isRight(kind) || isFull(kind);
|
|
|
|
|
|
|
|
static void join(FullMergeJoinCursor & left_cursor,
|
|
|
|
FullMergeJoinCursor & right_cursor,
|
|
|
|
size_t max_block_size,
|
|
|
|
PaddedPODArray<UInt64> & left_map,
|
|
|
|
PaddedPODArray<UInt64> & right_map,
|
|
|
|
std::unique_ptr<AllJoinState> & state)
|
|
|
|
{
|
2022-04-29 14:02:00 +00:00
|
|
|
right_map.clear();
|
|
|
|
right_map.reserve(max_block_size);
|
|
|
|
left_map.clear();
|
|
|
|
left_map.reserve(max_block_size);
|
2022-04-25 21:29:23 +00:00
|
|
|
|
|
|
|
size_t rpos = std::numeric_limits<size_t>::max();
|
|
|
|
size_t lpos = std::numeric_limits<size_t>::max();
|
|
|
|
int cmp = 0;
|
|
|
|
assert(left_cursor->isValid() && right_cursor->isValid());
|
|
|
|
while (left_cursor->isValid() && right_cursor->isValid())
|
|
|
|
{
|
|
|
|
lpos = left_cursor->getRow();
|
|
|
|
rpos = right_cursor->getRow();
|
|
|
|
|
|
|
|
cmp = compareCursors(left_cursor.cursor, right_cursor.cursor);
|
|
|
|
if (cmp == 0)
|
|
|
|
{
|
|
|
|
size_t lnum = nextDistinct(left_cursor.cursor);
|
|
|
|
size_t rnum = nextDistinct(right_cursor.cursor);
|
|
|
|
|
|
|
|
bool all_fit_in_block = std::max(left_map.size(), right_map.size()) + lnum * rnum <= max_block_size;
|
|
|
|
bool have_all_ranges = left_cursor.cursor.isValid() && right_cursor.cursor.isValid();
|
|
|
|
if (all_fit_in_block && have_all_ranges)
|
|
|
|
{
|
|
|
|
/// fast path if all joined rows fit in one block
|
|
|
|
for (size_t i = 0; i < rnum; ++i)
|
|
|
|
{
|
|
|
|
addRange(left_map, lpos, left_cursor.cursor.getRow());
|
|
|
|
addMany(right_map, rpos + i, lnum);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2022-04-26 20:22:57 +00:00
|
|
|
assert(state == nullptr);
|
2022-04-25 21:29:23 +00:00
|
|
|
state = std::make_unique<AllJoinState>(left_cursor.cursor, lpos, right_cursor.cursor, rpos);
|
|
|
|
state->addRange(0, left_cursor.getCurrent().clone(), lpos, lnum);
|
|
|
|
state->addRange(1, right_cursor.getCurrent().clone(), rpos, rnum);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else if (cmp < 0)
|
|
|
|
{
|
|
|
|
size_t num = nextDistinct(left_cursor.cursor);
|
2022-04-29 14:02:00 +00:00
|
|
|
if constexpr (isLeftOrFull(kind))
|
2022-04-26 13:51:33 +00:00
|
|
|
{
|
|
|
|
right_map.resize_fill(right_map.size() + num, right_cursor->rows);
|
|
|
|
for (size_t i = lpos; i < left_cursor->getRow(); ++i)
|
|
|
|
left_map.push_back(i);
|
|
|
|
}
|
2022-04-25 21:29:23 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
size_t num = nextDistinct(right_cursor.cursor);
|
2022-04-29 14:02:00 +00:00
|
|
|
if constexpr (isRightOrFull(kind))
|
2022-04-26 13:51:33 +00:00
|
|
|
{
|
|
|
|
left_map.resize_fill(left_map.size() + num, left_cursor->rows);
|
|
|
|
for (size_t i = rpos; i < right_cursor->getRow(); ++i)
|
|
|
|
right_map.push_back(i);
|
|
|
|
}
|
2022-04-25 21:29:23 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
template <template<JoinKind> class Impl, typename ... Args>
|
|
|
|
void dispatchKind(JoinKind kind, Args && ... args)
|
|
|
|
{
|
|
|
|
if (Impl<JoinKind::Inner>::enabled && kind == JoinKind::Inner)
|
|
|
|
return Impl<JoinKind::Inner>::join(std::forward<Args>(args)...);
|
|
|
|
else if (Impl<JoinKind::Left>::enabled && kind == JoinKind::Left)
|
|
|
|
return Impl<JoinKind::Left>::join(std::forward<Args>(args)...);
|
|
|
|
else if (Impl<JoinKind::Right>::enabled && kind == JoinKind::Right)
|
|
|
|
return Impl<JoinKind::Right>::join(std::forward<Args>(args)...);
|
|
|
|
else if (Impl<JoinKind::Full>::enabled && kind == JoinKind::Full)
|
|
|
|
return Impl<JoinKind::Full>::join(std::forward<Args>(args)...);
|
|
|
|
else
|
|
|
|
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported join kind: \"{}\"", kind);
|
|
|
|
|
|
|
|
__builtin_unreachable();
|
|
|
|
}
|
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
std::optional<MergeJoinAlgorithm::Status> MergeJoinAlgorithm::handleAllJoinState()
|
2022-04-25 21:29:23 +00:00
|
|
|
{
|
|
|
|
if (all_join_state && all_join_state->finished())
|
|
|
|
{
|
|
|
|
all_join_state.reset();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (all_join_state)
|
|
|
|
{
|
|
|
|
/// Accumulate blocks with same key in all_join_state
|
|
|
|
for (size_t i = 0; i < cursors.size(); ++i)
|
|
|
|
{
|
2022-04-26 13:51:33 +00:00
|
|
|
if (cursors[i]->cursor.isValid() && all_join_state->keys[i].equals(cursors[i]->cursor))
|
2022-04-25 21:29:23 +00:00
|
|
|
{
|
|
|
|
size_t pos = cursors[i]->cursor.getRow();
|
|
|
|
size_t num = nextDistinct(cursors[i]->cursor);
|
|
|
|
all_join_state->addRange(i, cursors[i]->getCurrent().clone(), pos, num);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for (size_t i = 0; i < cursors.size(); ++i)
|
|
|
|
{
|
|
|
|
if (!cursors[i]->cursor.isValid() && !cursors[i]->fullyCompleted())
|
|
|
|
{
|
|
|
|
return Status(i);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
/// If current position is valid, then we've found new key, can join accumulated data
|
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
stat.max_blocks_loaded = std::max(stat.max_blocks_loaded, all_join_state->blocksStored());
|
|
|
|
|
2022-04-25 21:29:23 +00:00
|
|
|
/// join all rows with current key
|
|
|
|
MutableColumns result_cols;
|
|
|
|
for (size_t i = 0; i < 2; ++i)
|
|
|
|
{
|
|
|
|
for (const auto & col : sample_chunks[i].getColumns())
|
|
|
|
result_cols.push_back(col->cloneEmpty());
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t total_rows = 0;
|
|
|
|
while (total_rows < max_block_size)
|
|
|
|
{
|
|
|
|
const auto & left_range = all_join_state->getLeft();
|
|
|
|
const auto & right_range = all_join_state->getRight();
|
|
|
|
|
|
|
|
total_rows += left_range.length;
|
|
|
|
|
|
|
|
size_t i = 0;
|
|
|
|
/// Copy left block
|
|
|
|
for (const auto & col : left_range.chunk.getColumns())
|
|
|
|
result_cols[i++]->insertRangeFrom(*col, left_range.begin, left_range.length);
|
|
|
|
/// And replicate current right column
|
|
|
|
for (const auto & col : right_range.chunk.getColumns())
|
|
|
|
result_cols[i++]->insertManyFrom(*col, right_range.current, left_range.length);
|
|
|
|
bool valid = all_join_state->next();
|
|
|
|
if (!valid)
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (total_rows)
|
|
|
|
return Status(Chunk(std::move(result_cols), total_rows));
|
|
|
|
}
|
2022-04-26 13:51:33 +00:00
|
|
|
return {};
|
|
|
|
}
|
2022-04-25 21:29:23 +00:00
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
/// One step of the ALL join over the current chunks of both cursors.
/// `AllJoinImpl` produces row indices; the result chunk is then assembled from them.
MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind)
{
    auto left_map = ColumnUInt64::create();
    auto right_map = ColumnUInt64::create();

    /// Positions before the join step; used as slice starts when an index map stays empty.
    const size_t left_start = cursors[0]->cursor.getRow();
    const size_t right_start = cursors[1]->cursor.getRow();

    dispatchKind<AllJoinImpl>(kind, *cursors[0], *cursors[1], max_block_size, left_map->getData(), right_map->getData(), all_join_state);

    assert(left_map->empty() || right_map->empty() || left_map->size() == right_map->size());
    const size_t num_result_rows = std::max(left_map->size(), right_map->size());

    Chunk joined;
    addIndexColumn(cursors[0]->getCurrent().getColumns(), *left_map, joined, left_start, num_result_rows);
    addIndexColumn(cursors[1]->getCurrent().getColumns(), *right_map, joined, right_start, num_result_rows);
    return Status(std::move(joined));
}
|
|
|
|
|
|
|
|
|
2022-04-05 10:12:42 +00:00
|
|
|
template <JoinKind kind>
|
2022-04-26 13:51:33 +00:00
|
|
|
struct AnyJoinImpl
|
2022-04-05 10:12:42 +00:00
|
|
|
{
|
2022-04-26 13:51:33 +00:00
|
|
|
constexpr static bool enabled = isInner(kind) || isLeft(kind) || isRight(kind);
|
2022-04-05 10:12:42 +00:00
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
static void join(FullMergeJoinCursor & left_cursor,
|
|
|
|
FullMergeJoinCursor & right_cursor,
|
|
|
|
PaddedPODArray<UInt64> & left_map,
|
|
|
|
PaddedPODArray<UInt64> & right_map,
|
|
|
|
AnyJoinState & state)
|
|
|
|
{
|
2022-04-29 14:02:00 +00:00
|
|
|
assert(enabled);
|
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
size_t num_rows = isLeft(kind) ? left_cursor->rowsLeft() :
|
|
|
|
isRight(kind) ? right_cursor->rowsLeft() :
|
|
|
|
std::min(left_cursor->rowsLeft(), right_cursor->rowsLeft());
|
2022-04-05 10:12:42 +00:00
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
if constexpr (isLeft(kind) || isInner(kind))
|
|
|
|
right_map.reserve(num_rows);
|
2022-04-05 10:12:42 +00:00
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
if constexpr (isRight(kind) || isInner(kind))
|
|
|
|
left_map.reserve(num_rows);
|
2022-04-05 10:12:42 +00:00
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
size_t rpos = std::numeric_limits<size_t>::max();
|
|
|
|
size_t lpos = std::numeric_limits<size_t>::max();
|
|
|
|
assert(left_cursor->isValid() && right_cursor->isValid());
|
|
|
|
int cmp = 0;
|
|
|
|
while (left_cursor->isValid() && right_cursor->isValid())
|
2022-04-14 12:30:34 +00:00
|
|
|
{
|
2022-04-26 13:51:33 +00:00
|
|
|
lpos = left_cursor->getRow();
|
|
|
|
rpos = right_cursor->getRow();
|
|
|
|
|
|
|
|
cmp = compareCursors(left_cursor.cursor, right_cursor.cursor);
|
|
|
|
if (cmp == 0)
|
2022-04-14 12:30:34 +00:00
|
|
|
{
|
2022-04-29 14:02:00 +00:00
|
|
|
if constexpr (isLeftOrFull(kind))
|
2022-04-26 13:51:33 +00:00
|
|
|
{
|
|
|
|
size_t lnum = nextDistinct(left_cursor.cursor);
|
|
|
|
right_map.resize_fill(right_map.size() + lnum, rpos);
|
|
|
|
}
|
|
|
|
|
2022-04-29 14:02:00 +00:00
|
|
|
if constexpr (isRightOrFull(kind))
|
2022-04-26 13:51:33 +00:00
|
|
|
{
|
|
|
|
size_t rnum = nextDistinct(right_cursor.cursor);
|
|
|
|
left_map.resize_fill(left_map.size() + rnum, lpos);
|
|
|
|
}
|
2022-04-14 12:30:34 +00:00
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
if constexpr (isInner(kind))
|
|
|
|
{
|
|
|
|
nextDistinct(left_cursor.cursor);
|
|
|
|
nextDistinct(right_cursor.cursor);
|
|
|
|
left_map.emplace_back(lpos);
|
|
|
|
right_map.emplace_back(rpos);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else if (cmp < 0)
|
2022-04-14 12:30:34 +00:00
|
|
|
{
|
2022-04-26 13:51:33 +00:00
|
|
|
size_t num = nextDistinct(left_cursor.cursor);
|
2022-04-29 14:02:00 +00:00
|
|
|
if constexpr (isLeftOrFull(kind))
|
2022-04-26 13:51:33 +00:00
|
|
|
right_map.resize_fill(right_map.size() + num, right_cursor->rows);
|
2022-04-14 12:30:34 +00:00
|
|
|
}
|
2022-04-26 13:51:33 +00:00
|
|
|
else
|
2022-04-14 12:30:34 +00:00
|
|
|
{
|
2022-04-26 13:51:33 +00:00
|
|
|
size_t num = nextDistinct(right_cursor.cursor);
|
2022-04-29 14:02:00 +00:00
|
|
|
if constexpr (isRightOrFull(kind))
|
2022-04-26 13:51:33 +00:00
|
|
|
left_map.resize_fill(left_map.size() + num, left_cursor->rows);
|
2022-04-14 12:30:34 +00:00
|
|
|
}
|
2022-04-05 10:12:42 +00:00
|
|
|
}
|
2022-04-26 13:51:33 +00:00
|
|
|
|
|
|
|
/// Remember index of last joined row to propagate it to next block
|
|
|
|
|
|
|
|
state.setValue({});
|
|
|
|
if (!left_cursor->isValid())
|
2022-04-05 10:12:42 +00:00
|
|
|
{
|
2022-04-26 13:51:33 +00:00
|
|
|
state.set(0, left_cursor.cursor);
|
|
|
|
if (cmp == 0 && isLeft(kind))
|
|
|
|
state.setValue(getRowFromChunk(right_cursor.getCurrent(), rpos));
|
2022-04-05 10:12:42 +00:00
|
|
|
}
|
2022-04-26 13:51:33 +00:00
|
|
|
|
|
|
|
if (!right_cursor->isValid())
|
2022-04-05 10:12:42 +00:00
|
|
|
{
|
2022-04-26 13:51:33 +00:00
|
|
|
state.set(1, right_cursor.cursor);
|
|
|
|
if (cmp == 0 && isRight(kind))
|
|
|
|
state.setValue(getRowFromChunk(left_cursor.getCurrent(), lpos));
|
2022-04-05 10:12:42 +00:00
|
|
|
}
|
|
|
|
}
|
2022-04-26 13:51:33 +00:00
|
|
|
};
|
2022-04-12 13:23:36 +00:00
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
std::optional<MergeJoinAlgorithm::Status> MergeJoinAlgorithm::handleAnyJoinState()
|
|
|
|
{
|
|
|
|
if (any_join_state.empty())
|
|
|
|
return {};
|
2022-04-14 12:30:34 +00:00
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
auto kind = table_join->getTableJoin().kind();
|
2022-04-14 12:30:34 +00:00
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
Chunk result;
|
2022-04-01 18:20:58 +00:00
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
for (size_t source_num = 0; source_num < 2; ++source_num)
|
2022-04-12 13:23:36 +00:00
|
|
|
{
|
2022-04-26 13:51:33 +00:00
|
|
|
auto & current = *cursors[source_num];
|
|
|
|
auto & state = any_join_state;
|
|
|
|
if (any_join_state.keys[source_num].equals(current.cursor))
|
|
|
|
{
|
|
|
|
size_t start_pos = current->getRow();
|
|
|
|
size_t length = nextDistinct(current.cursor);
|
|
|
|
if (length && isLeft(kind) && source_num == 0)
|
2022-04-29 14:02:00 +00:00
|
|
|
result = copyChunkResized(current.getCurrent(), state.value ? state.value : sample_chunks[1 - source_num], start_pos, length);
|
2022-04-26 13:51:33 +00:00
|
|
|
if (length && isRight(kind) && source_num == 1)
|
2022-04-29 14:02:00 +00:00
|
|
|
result = copyChunkResized(state.value ? state.value : sample_chunks[1 - source_num], current.getCurrent(), start_pos, length);
|
2022-04-26 13:51:33 +00:00
|
|
|
|
|
|
|
/// We've found row with other key, no need to skip more rows with current key
|
|
|
|
if (current->isValid())
|
|
|
|
{
|
|
|
|
state.keys[source_num].reset();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
any_join_state.keys[source_num].reset();
|
|
|
|
}
|
2022-04-12 13:23:36 +00:00
|
|
|
}
|
2022-04-05 10:12:42 +00:00
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
if (result)
|
|
|
|
return Status(std::move(result));
|
|
|
|
return {};
|
2022-04-07 16:14:56 +00:00
|
|
|
}
|
|
|
|
|
2022-04-14 12:30:34 +00:00
|
|
|
/// One step of the ANY join.
/// Pending work from `handleAnyJoinState` is returned first; then, if a cursor is not valid,
/// the next chunk of that input is requested (left before right); otherwise `AnyJoinImpl`
/// is run over the current chunks and the result chunk is assembled from the row indices.
MergeJoinAlgorithm::Status MergeJoinAlgorithm::anyJoin(JoinKind kind)
{
    if (auto pending = handleAnyJoinState())
        return std::move(*pending);

    auto & current_left = cursors[0]->cursor;
    if (!current_left.isValid())
        return Status(0);

    auto & current_right = cursors[1]->cursor;
    if (!current_right.isValid())
        return Status(1);

    /// join doesn't build result block, but returns indices where result rows should be placed
    auto left_map = ColumnUInt64::create();
    auto right_map = ColumnUInt64::create();

    /// Positions before the join step; used as slice starts when an index map stays empty.
    const size_t left_start = current_left.getRow();
    const size_t right_start = current_right.getRow();

    dispatchKind<AnyJoinImpl>(kind, *cursors[0], *cursors[1], left_map->getData(), right_map->getData(), any_join_state);

    assert(left_map->empty() || right_map->empty() || left_map->size() == right_map->size());
    const size_t num_result_rows = std::max(left_map->size(), right_map->size());

    /// build result block from indices
    Chunk joined;
    addIndexColumn(cursors[0]->getCurrent().getColumns(), *left_map, joined, left_start, num_result_rows);
    addIndexColumn(cursors[1]->getCurrent().getColumns(), *right_map, joined, right_start, num_result_rows);
    return Status(std::move(joined));
}
|
|
|
|
|
2022-04-29 14:02:00 +00:00
|
|
|
|
|
|
|
/// Debug helper: renders the key values of one cursor row as text.
/// `pos` defaults to the cursor's current row. A position at or past `lhs.rows` is rendered as END.
/// Values from nullable key columns are wrapped in angle brackets, NULL is shown as `<NULL>`.
String logRow(const SortCursorImpl & lhs, std::optional<size_t> pos = {})
{
    const size_t row = pos.value_or(lhs.getRow());

    if (row >= lhs.rows)
        return fmt::format("[{}/{}] : END", row, lhs.rows);

    std::vector<String> rendered_keys;
    for (size_t key_idx = 0; key_idx < lhs.sort_columns_size; ++key_idx)
    {
        const auto * nullable = checkAndGetColumn<ColumnNullable>(lhs.sort_columns[key_idx]);

        if (nullable == nullptr)
        {
            Field value;
            lhs.sort_columns[key_idx]->get(row, value);
            rendered_keys.push_back(fmt::format("{}", value.dump()));
            continue;
        }

        if (nullable->isNullAt(row))
        {
            rendered_keys.push_back("<NULL>");
            continue;
        }

        Field value;
        nullable->getNestedColumn().get(row, value);
        rendered_keys.push_back(fmt::format("<{}>", value.dump()));
    }

    return fmt::format("[{}/{}] : ({})", row, lhs.rows, fmt::join(rendered_keys, ", "));
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Debug helper: renders each column of `block` as `name :: type [v0, v1, ...] `,
/// printing at most the first 10 values and `...` when there are more.
/// When `chunk` evaluates to true, its columns replace the block's columns first
/// (the column counts are asserted to match).
String dumpBlock(Block block, const Chunk & chunk = {})
{
    if (chunk)
    {
        assert(block.columns() == chunk.getNumColumns());
        block.setColumns(chunk.getColumns());
    }

    WriteBufferFromOwnString out;

    const size_t num_columns = block.columns();
    for (size_t col_idx = 0; col_idx < num_columns; ++col_idx)
    {
        const auto & entry = block.safeGetByPosition(col_idx);

        writeString(entry.name, out);
        writeString(" :: ", out);
        writeString(entry.type->getName(), out);
        writeString(" [", out);

        const size_t total_rows = entry.column->size();
        const size_t shown_rows = std::min<size_t>(total_rows, 10);

        for (size_t row = 0; row < shown_rows; ++row)
        {
            if (row != 0)
                writeString(", ", out);

            Field value;
            entry.column->get(row, value);
            writeString(value.dump(), out);
        }

        if (shown_rows < total_rows)
            writeString("...", out);
        writeString("] ", out);
    }

    out.finalize();
    return out.str();
}
|
|
|
|
|
|
|
|
/// if `source_num == 0` get data from left cursor and fill defaults at right
|
|
|
|
/// otherwise vise versa
|
|
|
|
Chunk MergeJoinAlgorithm::createBlockWithDefaults(size_t source_num)
|
|
|
|
{
|
|
|
|
size_t start = cursors[source_num]->cursor.getRow();
|
|
|
|
size_t num_rows = cursors[source_num]->cursor.rowsLeft();
|
|
|
|
|
|
|
|
ColumnRawPtrs cols;
|
|
|
|
{
|
|
|
|
const Chunk & chunk_left = source_num == 0 ? cursors[0]->getCurrent() : sample_chunks[0];
|
|
|
|
for (const auto & col : chunk_left.getColumns())
|
|
|
|
cols.push_back(col.get());
|
|
|
|
|
|
|
|
const Chunk & chunk_right = source_num == 1 ? cursors[1]->getCurrent() : sample_chunks[1];
|
|
|
|
for (const auto & col : chunk_right.getColumns())
|
|
|
|
cols.push_back(col.get());
|
|
|
|
}
|
|
|
|
|
|
|
|
Chunk result_chunk;
|
|
|
|
copyColumnsResized(cols, start, num_rows, result_chunk);
|
|
|
|
cursors[source_num]->detach();
|
|
|
|
return result_chunk;
|
|
|
|
}
|
|
|
|
|
2022-04-26 13:51:33 +00:00
|
|
|
/// Performs one step of the merge join over the two input cursors.
///
/// The steps are tried in this order:
///  1. If a cursor has no valid row but is not fully completed, return `Status(<input index>)`
///     (NOTE(review): presumably a request for more data from that input - confirm against
///     IMergingAlgorithm::Status).
///  2. If `handleAllJoinState()` produces a result, return it.
///  3. If at least one input is fully completed, emit the remaining rows of the other input
///     padded with defaults when the join kind requires it (LEFT/FULL for the left input,
///     RIGHT/FULL for the right one), otherwise return `Status({}, true)`.
///  4. If the current blocks do not intersect at all, the smaller one is either emitted with
///     defaults (when the join kind keeps its rows) or dropped and the same input is returned.
///  5. Otherwise dispatch to `anyJoin` / `allJoin` depending on the strictness.
///
/// Throws NOT_IMPLEMENTED for any other strictness.
IMergingAlgorithm::Status MergeJoinAlgorithm::merge()
{
    auto kind = table_join->getTableJoin().kind();

    if (!cursors[0]->cursor.isValid() && !cursors[0]->fullyCompleted())
        return Status(0);

    if (!cursors[1]->cursor.isValid() && !cursors[1]->fullyCompleted())
        return Status(1);

    if (auto result = handleAllJoinState())
        return std::move(*result);

    if (cursors[0]->fullyCompleted() || cursors[1]->fullyCompleted())
    {
        if (!cursors[0]->fullyCompleted() && isLeftOrFull(kind))
            return Status(createBlockWithDefaults(0));

        if (!cursors[1]->fullyCompleted() && isRightOrFull(kind))
            return Status(createBlockWithDefaults(1));

        return Status({}, true);
    }

    /// Check if blocks are not intersecting at all.
    /// Any non-zero result is handled here: the previous condition `cmp == 111` made the
    /// `cmp < 0` branch unreachable and effectively disabled this fast path.
    /// NOTE(review): assumes `totallyCompare` returns a negative / zero / positive value
    /// when the left block is entirely before / overlapping / entirely after the right one.
    if (int cmp = totallyCompare(cursors[0]->cursor, cursors[1]->cursor); cmp != 0)
    {
        if (cmp < 0)
        {
            /// The whole left block has no match in the current right block.
            if (isLeftOrFull(kind))
                return Status(createBlockWithDefaults(0));
            cursors[0]->detach();
            return Status(0);
        }

        /// Here cmp > 0: the whole right block has no match in the current left block.
        if (isRightOrFull(kind))
            return Status(createBlockWithDefaults(1));
        cursors[1]->detach();
        return Status(1);
    }

    auto strictness = table_join->getTableJoin().strictness();

    if (strictness == ASTTableJoin::Strictness::Any)
        return anyJoin(kind);

    if (strictness == ASTTableJoin::Strictness::All)
        return allJoin(kind);

    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported strictness '{}'", strictness);
}
|
|
|
|
|
2022-03-30 10:07:09 +00:00
|
|
|
/// Creates the transform and logs (at trace level) that the merge join transform is in use.
///
/// All arguments are handed over to the `IMergingTransform<MergeJoinAlgorithm>` base.
/// NOTE(review): the arguments after `limit_hint` (`table_join`, `input_headers`,
/// `max_block_size`) are presumably forwarded to the MergeJoinAlgorithm constructor, and the
/// literal `true` is a positional flag of the base class - confirm against IMergingTransform.
MergeJoinTransform::MergeJoinTransform(
        JoinPtr table_join,
        const Blocks & input_headers,
        const Block & output_header,
        size_t max_block_size,
        UInt64 limit_hint)
    : IMergingTransform<MergeJoinAlgorithm>(
        input_headers,
        output_header,
        true,
        limit_hint,
        table_join,
        input_headers,
        max_block_size)
    , log(&Poco::Logger::get("MergeJoinTransform"))
{
    LOG_TRACE(log, "Use MergeJoinTransform");
}
|
|
|
|
|
|
|
|
void MergeJoinTransform::onFinish()
|
|
|
|
{
|
2022-04-08 15:05:38 +00:00
|
|
|
algorithm.onFinish(total_stopwatch.elapsedSeconds());
|
2022-03-30 10:07:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|