mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
full sorting all join
This commit is contained in:
parent
4e88e8f5ec
commit
a0144e115d
@ -5,22 +5,21 @@
|
||||
#include <optional>
|
||||
#include <type_traits>
|
||||
#include <vector>
|
||||
#include <Processors/Transforms/MergeJoinTransform.h>
|
||||
|
||||
#include <base/defines.h>
|
||||
#include <base/logger_useful.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
#include <Core/SortDescription.h>
|
||||
#include <base/types.h>
|
||||
#include <boost/core/noncopyable.hpp>
|
||||
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Columns/IColumn.h>
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <Core/SortCursor.h>
|
||||
#include <Core/SortDescription.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
#include <Parsers/ASTTablesInSelectQuery.h>
|
||||
#include <base/defines.h>
|
||||
#include <base/types.h>
|
||||
#include <boost/logic/tribool.hpp>
|
||||
#include "Common/Exception.h"
|
||||
#include "Core/SettingsEnums.h"
|
||||
#include <Processors/Transforms/MergeJoinTransform.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -29,14 +28,11 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int TOO_MANY_ROWS;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
constexpr size_t EMPTY_VALUE_IDX = std::numeric_limits<size_t>::max();
|
||||
using JoinKind = ASTTableJoin::Kind;
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
@ -49,7 +45,6 @@ FullMergeJoinCursorPtr createCursor(const Block & block, const Names & columns)
|
||||
return std::make_unique<FullMergeJoinCursor>(block, desc);
|
||||
}
|
||||
|
||||
|
||||
template <bool has_left_nulls, bool has_right_nulls>
|
||||
int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos, int null_direction_hint = 1)
|
||||
{
|
||||
@ -100,16 +95,8 @@ int ALWAYS_INLINE compareCursors(const SortCursorImpl & lhs, size_t lpos,
|
||||
{
|
||||
for (size_t i = 0; i < lhs.sort_columns_size; ++i)
|
||||
{
|
||||
const auto & desc = lhs.desc[i];
|
||||
int direction = desc.direction;
|
||||
int nulls_direction = desc.nulls_direction;
|
||||
|
||||
int cmp = direction * nullableCompareAt<true, true>(
|
||||
*lhs.sort_columns[i],
|
||||
*rhs.sort_columns[i],
|
||||
lpos,
|
||||
rpos,
|
||||
nulls_direction);
|
||||
/// TODO(@vdimir): use nullableCompareAt only if there's nullable columns
|
||||
int cmp = nullableCompareAt<true, true>(*lhs.sort_columns[i], *rhs.sort_columns[i], lpos, rpos);
|
||||
if (cmp != 0)
|
||||
return cmp;
|
||||
}
|
||||
@ -246,9 +233,7 @@ MergeJoinAlgorithm::MergeJoinAlgorithm(
|
||||
static ColumnPtr replicateRow(const IColumn & column, size_t pos, size_t num)
|
||||
{
|
||||
MutableColumnPtr res = column.cloneEmpty();
|
||||
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} replicateRow {} | {} | {}", __FILE__, __LINE__, column.dumpStructure(), pos, num);
|
||||
res->insertManyFrom(column, pos, num);
|
||||
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} replicateRow >>> {} | {} | {}", __FILE__, __LINE__, res->dumpStructure(), pos, num);
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -333,11 +318,6 @@ void MergeJoinAlgorithm::consume(Input & input, size_t source_num)
|
||||
if (input.permutation)
|
||||
throw DB::Exception("permutation is not supported", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
LOG_DEBUG(log, "XXXX: consume from {} chunk: {}", source_num, input.chunk.getNumRows());
|
||||
|
||||
if (input.chunk.getNumRows() >= EMPTY_VALUE_IDX)
|
||||
throw Exception("Too many rows in input", ErrorCodes::TOO_MANY_ROWS);
|
||||
|
||||
if (input.chunk)
|
||||
{
|
||||
stat.num_blocks[source_num] += 1;
|
||||
@ -346,8 +326,6 @@ void MergeJoinAlgorithm::consume(Input & input, size_t source_num)
|
||||
|
||||
prepareChunk(input.chunk);
|
||||
cursors[source_num]->setChunk(std::move(input.chunk));
|
||||
|
||||
// stat.max_blocks_loaded = std::max(stat.max_blocks_loaded, std::max(cursors[0]->blocksCount(), cursors[1]->blocksCount()));
|
||||
}
|
||||
|
||||
static Chunk getRowFromChunk(const Chunk & chunk, size_t pos)
|
||||
@ -429,16 +407,22 @@ struct AllJoinImpl
|
||||
else if (cmp < 0)
|
||||
{
|
||||
size_t num = nextDistinct(left_cursor.cursor);
|
||||
right_map.resize_fill(right_map.size() + num, right_cursor->rows);
|
||||
for (size_t i = lpos; i < left_cursor->getRow(); ++i)
|
||||
left_map.push_back(i);
|
||||
if constexpr (isLeft(kind))
|
||||
{
|
||||
right_map.resize_fill(right_map.size() + num, right_cursor->rows);
|
||||
for (size_t i = lpos; i < left_cursor->getRow(); ++i)
|
||||
left_map.push_back(i);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
size_t num = nextDistinct(right_cursor.cursor);
|
||||
left_map.resize_fill(left_map.size() + num, left_cursor->rows);
|
||||
for (size_t i = rpos; i < right_cursor->getRow(); ++i)
|
||||
right_map.push_back(i);
|
||||
if constexpr (isRight(kind))
|
||||
{
|
||||
left_map.resize_fill(left_map.size() + num, left_cursor->rows);
|
||||
for (size_t i = rpos; i < right_cursor->getRow(); ++i)
|
||||
right_map.push_back(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -461,7 +445,7 @@ void dispatchKind(JoinKind kind, Args && ... args)
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
||||
MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind)
|
||||
std::optional<MergeJoinAlgorithm::Status> MergeJoinAlgorithm::handleAllJoinState()
|
||||
{
|
||||
if (all_join_state && all_join_state->finished())
|
||||
{
|
||||
@ -473,7 +457,7 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind)
|
||||
/// Accumulate blocks with same key in all_join_state
|
||||
for (size_t i = 0; i < cursors.size(); ++i)
|
||||
{
|
||||
if (cursors[i]->cursor.isValid() && all_join_state->left_key.equals(cursors[i]->cursor))
|
||||
if (cursors[i]->cursor.isValid() && all_join_state->keys[i].equals(cursors[i]->cursor))
|
||||
{
|
||||
size_t pos = cursors[i]->cursor.getRow();
|
||||
size_t num = nextDistinct(cursors[i]->cursor);
|
||||
@ -490,6 +474,8 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind)
|
||||
}
|
||||
/// If current position is valid, then we've found new key, can join accumulated data
|
||||
|
||||
stat.max_blocks_loaded = std::max(stat.max_blocks_loaded, all_join_state->blocksStored());
|
||||
|
||||
/// join all rows with current key
|
||||
MutableColumns result_cols;
|
||||
for (size_t i = 0; i < 2; ++i)
|
||||
@ -521,14 +507,17 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind)
|
||||
if (total_rows)
|
||||
return Status(Chunk(std::move(result_cols), total_rows));
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind)
|
||||
{
|
||||
auto left_map = ColumnUInt64::create();
|
||||
auto right_map = ColumnUInt64::create();
|
||||
size_t prev_pos[] = {cursors[0]->cursor.getRow(), cursors[1]->cursor.getRow()};
|
||||
|
||||
dispatchKind<AllJoinImpl>(kind, *cursors[0], *cursors[1], max_block_size, left_map->getData(), right_map->getData(), all_join_state);
|
||||
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} left_map->size({}) == right_map->size({})",
|
||||
__FILE__, __LINE__, left_map->size(), right_map->size());
|
||||
|
||||
assert(left_map->empty() || right_map->empty() || left_map->size() == right_map->size());
|
||||
size_t num_result_rows = std::max(left_map->size(), right_map->size());
|
||||
|
||||
@ -540,153 +529,134 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind)
|
||||
|
||||
|
||||
template <JoinKind kind>
|
||||
static void anyJoinImpl(FullMergeJoinCursor & left_cursor, FullMergeJoinCursor & right_cursor,
|
||||
PaddedPODArray<UInt64> & left_map, PaddedPODArray<UInt64> & right_map,
|
||||
AnyJoinState & state)
|
||||
struct AnyJoinImpl
|
||||
{
|
||||
static_assert(kind == JoinKind::Left || kind == JoinKind::Right || kind == JoinKind::Inner, "Invalid join kind");
|
||||
constexpr static bool enabled = isInner(kind) || isLeft(kind) || isRight(kind);
|
||||
|
||||
size_t num_rows = isLeft(kind) ? left_cursor->rowsLeft() :
|
||||
isRight(kind) ? right_cursor->rowsLeft() :
|
||||
std::min(left_cursor->rowsLeft(), right_cursor->rowsLeft());
|
||||
|
||||
if constexpr (isLeft(kind) || isInner(kind))
|
||||
right_map.reserve(num_rows);
|
||||
|
||||
if constexpr (isRight(kind) || isInner(kind))
|
||||
left_map.reserve(num_rows);
|
||||
|
||||
size_t rpos = std::numeric_limits<size_t>::max();
|
||||
size_t lpos = std::numeric_limits<size_t>::max();
|
||||
assert(left_cursor->isValid() && right_cursor->isValid());
|
||||
int cmp = 0;
|
||||
while (left_cursor->isValid() && right_cursor->isValid())
|
||||
static void join(FullMergeJoinCursor & left_cursor,
|
||||
FullMergeJoinCursor & right_cursor,
|
||||
PaddedPODArray<UInt64> & left_map,
|
||||
PaddedPODArray<UInt64> & right_map,
|
||||
AnyJoinState & state)
|
||||
{
|
||||
lpos = left_cursor->getRow();
|
||||
rpos = right_cursor->getRow();
|
||||
size_t num_rows = isLeft(kind) ? left_cursor->rowsLeft() :
|
||||
isRight(kind) ? right_cursor->rowsLeft() :
|
||||
std::min(left_cursor->rowsLeft(), right_cursor->rowsLeft());
|
||||
|
||||
cmp = compareCursors(left_cursor.cursor, right_cursor.cursor);
|
||||
if (cmp == 0)
|
||||
if constexpr (isLeft(kind) || isInner(kind))
|
||||
right_map.reserve(num_rows);
|
||||
|
||||
if constexpr (isRight(kind) || isInner(kind))
|
||||
left_map.reserve(num_rows);
|
||||
|
||||
size_t rpos = std::numeric_limits<size_t>::max();
|
||||
size_t lpos = std::numeric_limits<size_t>::max();
|
||||
assert(left_cursor->isValid() && right_cursor->isValid());
|
||||
int cmp = 0;
|
||||
while (left_cursor->isValid() && right_cursor->isValid())
|
||||
{
|
||||
if constexpr (isLeft(kind))
|
||||
{
|
||||
size_t lnum = nextDistinct(left_cursor.cursor);
|
||||
right_map.resize_fill(right_map.size() + lnum, rpos);
|
||||
}
|
||||
lpos = left_cursor->getRow();
|
||||
rpos = right_cursor->getRow();
|
||||
|
||||
if constexpr (isRight(kind))
|
||||
cmp = compareCursors(left_cursor.cursor, right_cursor.cursor);
|
||||
if (cmp == 0)
|
||||
{
|
||||
size_t rnum = nextDistinct(right_cursor.cursor);
|
||||
left_map.resize_fill(left_map.size() + rnum, lpos);
|
||||
}
|
||||
if constexpr (isLeft(kind))
|
||||
{
|
||||
size_t lnum = nextDistinct(left_cursor.cursor);
|
||||
right_map.resize_fill(right_map.size() + lnum, rpos);
|
||||
}
|
||||
|
||||
if constexpr (isInner(kind))
|
||||
if constexpr (isRight(kind))
|
||||
{
|
||||
size_t rnum = nextDistinct(right_cursor.cursor);
|
||||
left_map.resize_fill(left_map.size() + rnum, lpos);
|
||||
}
|
||||
|
||||
if constexpr (isInner(kind))
|
||||
{
|
||||
nextDistinct(left_cursor.cursor);
|
||||
nextDistinct(right_cursor.cursor);
|
||||
left_map.emplace_back(lpos);
|
||||
right_map.emplace_back(rpos);
|
||||
}
|
||||
}
|
||||
else if (cmp < 0)
|
||||
{
|
||||
nextDistinct(left_cursor.cursor);
|
||||
nextDistinct(right_cursor.cursor);
|
||||
left_map.emplace_back(lpos);
|
||||
right_map.emplace_back(rpos);
|
||||
size_t num = nextDistinct(left_cursor.cursor);
|
||||
if constexpr (isLeft(kind))
|
||||
right_map.resize_fill(right_map.size() + num, right_cursor->rows);
|
||||
}
|
||||
else
|
||||
{
|
||||
size_t num = nextDistinct(right_cursor.cursor);
|
||||
if constexpr (isRight(kind))
|
||||
left_map.resize_fill(left_map.size() + num, left_cursor->rows);
|
||||
}
|
||||
}
|
||||
else if (cmp < 0)
|
||||
|
||||
/// Remember index of last joined row to propagate it to next block
|
||||
|
||||
state.setValue({});
|
||||
if (!left_cursor->isValid())
|
||||
{
|
||||
size_t num = nextDistinct(left_cursor.cursor);
|
||||
if constexpr (isLeft(kind))
|
||||
right_map.resize_fill(right_map.size() + num, right_cursor->rows);
|
||||
state.set(0, left_cursor.cursor);
|
||||
if (cmp == 0 && isLeft(kind))
|
||||
state.setValue(getRowFromChunk(right_cursor.getCurrent(), rpos));
|
||||
}
|
||||
|
||||
if (!right_cursor->isValid())
|
||||
{
|
||||
state.set(1, right_cursor.cursor);
|
||||
if (cmp == 0 && isRight(kind))
|
||||
state.setValue(getRowFromChunk(left_cursor.getCurrent(), lpos));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
std::optional<MergeJoinAlgorithm::Status> MergeJoinAlgorithm::handleAnyJoinState()
|
||||
{
|
||||
if (any_join_state.empty())
|
||||
return {};
|
||||
|
||||
auto kind = table_join->getTableJoin().kind();
|
||||
|
||||
Chunk result;
|
||||
|
||||
for (size_t source_num = 0; source_num < 2; ++source_num)
|
||||
{
|
||||
auto & current = *cursors[source_num];
|
||||
auto & state = any_join_state;
|
||||
if (any_join_state.keys[source_num].equals(current.cursor))
|
||||
{
|
||||
size_t start_pos = current->getRow();
|
||||
size_t length = nextDistinct(current.cursor);
|
||||
if (length && isLeft(kind) && source_num == 0)
|
||||
result = createBlockWithDefaults(current.getCurrent(), state.value ? state.value : sample_chunks[1 - source_num], start_pos, length);
|
||||
if (length && isRight(kind) && source_num == 1)
|
||||
result = createBlockWithDefaults(state.value ? state.value : sample_chunks[1 - source_num], current.getCurrent(), start_pos, length);
|
||||
|
||||
/// We've found row with other key, no need to skip more rows with current key
|
||||
if (current->isValid())
|
||||
{
|
||||
state.keys[source_num].reset();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
size_t num = nextDistinct(right_cursor.cursor);
|
||||
if constexpr (isRight(kind))
|
||||
left_map.resize_fill(left_map.size() + num, left_cursor->rows);
|
||||
any_join_state.keys[source_num].reset();
|
||||
}
|
||||
}
|
||||
|
||||
/// Remember index of last joined row to propagate it to next block
|
||||
|
||||
state.setValue({});
|
||||
if (!left_cursor->isValid())
|
||||
{
|
||||
state.set(0, left_cursor.cursor);
|
||||
if (cmp == 0 && isLeft(kind))
|
||||
state.setValue(getRowFromChunk(right_cursor.getCurrent(), rpos));
|
||||
}
|
||||
|
||||
if (!right_cursor->isValid())
|
||||
{
|
||||
state.set(1, right_cursor.cursor);
|
||||
if (cmp == 0 && isRight(kind))
|
||||
state.setValue(getRowFromChunk(left_cursor.getCurrent(), lpos));
|
||||
}
|
||||
}
|
||||
|
||||
static void anyJoinDispatch(JoinKind kind,
|
||||
FullMergeJoinCursor & left_cursor,
|
||||
FullMergeJoinCursor & right_cursor,
|
||||
PaddedPODArray<UInt64> & left_map,
|
||||
PaddedPODArray<UInt64> & right_map,
|
||||
AnyJoinState & state)
|
||||
{
|
||||
if (isInner(kind))
|
||||
{
|
||||
return anyJoinImpl<JoinKind::Inner>(left_cursor, right_cursor, left_map, right_map, state);
|
||||
}
|
||||
else if (isLeft(kind))
|
||||
{
|
||||
return anyJoinImpl<JoinKind::Left>(left_cursor, right_cursor, left_map, right_map, state);
|
||||
}
|
||||
else if (isRight(kind))
|
||||
{
|
||||
return anyJoinImpl<JoinKind::Right>(left_cursor, right_cursor, left_map, right_map, state);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported join kind: \"{}\"", kind);
|
||||
}
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
||||
static bool handleAnyJoinState(JoinKind kind,
|
||||
size_t source_num,
|
||||
AnyJoinState & state,
|
||||
FullMergeJoinCursor & current,
|
||||
const Chunk & sample_chunk,
|
||||
Chunk & result)
|
||||
{
|
||||
bool has_more = state.keys[source_num].equals(current.cursor);
|
||||
if (has_more)
|
||||
{
|
||||
size_t start_pos = current->getRow();
|
||||
size_t length = nextDistinct(current.cursor);
|
||||
if (length && isLeft(kind) && source_num == 0)
|
||||
result = createBlockWithDefaults(current.getCurrent(), state.value ? state.value : sample_chunk, start_pos, length);
|
||||
if (length && isRight(kind) && source_num == 1)
|
||||
result = createBlockWithDefaults(state.value ? state.value : sample_chunk, current.getCurrent(), start_pos, length);
|
||||
|
||||
/// We've found row with other key, no need to skip more rows with current key
|
||||
if (current->isValid())
|
||||
has_more = false;
|
||||
}
|
||||
return !has_more;
|
||||
if (result)
|
||||
return Status(std::move(result));
|
||||
return {};
|
||||
}
|
||||
|
||||
MergeJoinAlgorithm::Status MergeJoinAlgorithm::anyJoin(JoinKind kind)
|
||||
{
|
||||
if (!any_join_state.empty())
|
||||
{
|
||||
Chunk result;
|
||||
|
||||
bool left_finished = handleAnyJoinState(kind, 0, any_join_state, *cursors[0], sample_chunks[1], result);
|
||||
if (left_finished)
|
||||
any_join_state.keys[0].reset();
|
||||
|
||||
bool right_finished = handleAnyJoinState(kind, 1, any_join_state, *cursors[1], sample_chunks[0], result);
|
||||
if (right_finished)
|
||||
any_join_state.keys[1].reset();
|
||||
|
||||
if (result)
|
||||
return Status(std::move(result));
|
||||
}
|
||||
if (auto result = handleAnyJoinState())
|
||||
return std::move(*result);
|
||||
|
||||
auto & current_left = cursors[0]->cursor;
|
||||
if (!current_left.isValid())
|
||||
@ -700,7 +670,7 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::anyJoin(JoinKind kind)
|
||||
auto right_map = ColumnUInt64::create();
|
||||
size_t prev_pos[] = {current_left.getRow(), current_right.getRow()};
|
||||
|
||||
anyJoinDispatch(kind, *cursors[0], *cursors[1], left_map->getData(), right_map->getData(), any_join_state);
|
||||
dispatchKind<AnyJoinImpl>(kind, *cursors[0], *cursors[1], left_map->getData(), right_map->getData(), any_join_state);
|
||||
|
||||
assert(left_map->empty() || right_map->empty() || left_map->size() == right_map->size());
|
||||
size_t num_result_rows = std::max(left_map->size(), right_map->size());
|
||||
@ -711,19 +681,20 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::anyJoin(JoinKind kind)
|
||||
return Status(std::move(result));
|
||||
}
|
||||
|
||||
IMergingAlgorithm::Status MergeJoinAlgorithm::mergeImpl()
|
||||
IMergingAlgorithm::Status MergeJoinAlgorithm::merge()
|
||||
{
|
||||
auto kind = table_join->getTableJoin().kind();
|
||||
auto strictness = table_join->getTableJoin().strictness();
|
||||
|
||||
LOG_DEBUG(log, "XXXX: merge, {} {}", kind, strictness);
|
||||
|
||||
if (!cursors[0]->cursor.isValid() && !cursors[0]->fullyCompleted())
|
||||
return Status(0);
|
||||
|
||||
if (!cursors[1]->cursor.isValid() && !cursors[1]->fullyCompleted())
|
||||
return Status(1);
|
||||
|
||||
if (auto result = handleAllJoinState())
|
||||
return std::move(*result);
|
||||
|
||||
if (cursors[0]->fullyCompleted() || cursors[1]->fullyCompleted())
|
||||
{
|
||||
if (!cursors[0]->fullyCompleted() && isLeftOrFull(kind))
|
||||
@ -772,7 +743,7 @@ MergeJoinTransform::MergeJoinTransform(
|
||||
: IMergingTransform<MergeJoinAlgorithm>(input_headers, output_header, true, limit_hint, table_join, input_headers, max_block_size)
|
||||
, log(&Poco::Logger::get("MergeJoinTransform"))
|
||||
{
|
||||
LOG_TRACE(log, "Will use MergeJoinTransform");
|
||||
LOG_TRACE(log, "Use MergeJoinTransform");
|
||||
}
|
||||
|
||||
void MergeJoinTransform::onFinish()
|
||||
@ -780,5 +751,4 @@ void MergeJoinTransform::onFinish()
|
||||
algorithm.onFinish(total_stopwatch.elapsedSeconds());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -79,8 +79,9 @@ struct JoinKeyRow
|
||||
}
|
||||
};
|
||||
|
||||
struct AnyJoinState : boost::noncopyable
|
||||
class AnyJoinState : boost::noncopyable
|
||||
{
|
||||
public:
|
||||
AnyJoinState() = default;
|
||||
|
||||
void set(size_t source_num, const SortCursorImpl & cursor)
|
||||
@ -103,8 +104,9 @@ struct AnyJoinState : boost::noncopyable
|
||||
Chunk value;
|
||||
};
|
||||
|
||||
struct AllJoinState : boost::noncopyable
|
||||
class AllJoinState : boost::noncopyable
|
||||
{
|
||||
public:
|
||||
struct Range
|
||||
{
|
||||
Range() = default;
|
||||
@ -127,8 +129,7 @@ struct AllJoinState : boost::noncopyable
|
||||
|
||||
AllJoinState(const SortCursorImpl & lcursor, size_t lpos,
|
||||
const SortCursorImpl & rcursor, size_t rpos)
|
||||
: left_key(lcursor, lpos)
|
||||
, right_key(rcursor, rpos)
|
||||
: keys{JoinKeyRow(lcursor, lpos), JoinKeyRow(rcursor, rpos)}
|
||||
{
|
||||
}
|
||||
|
||||
@ -156,6 +157,14 @@ struct AllJoinState : boost::noncopyable
|
||||
|
||||
bool finished() const { return lidx >= left.size(); }
|
||||
|
||||
size_t blocksStored() const { return left.size() + right.size(); }
|
||||
const Range & getLeft() const { return left[lidx]; }
|
||||
const Range & getRight() const { return right[ridx]; }
|
||||
|
||||
/// Left and right types can be different because of nullable
|
||||
JoinKeyRow keys[2];
|
||||
|
||||
private:
|
||||
bool nextLeft()
|
||||
{
|
||||
lidx += 1;
|
||||
@ -179,17 +188,9 @@ struct AllJoinState : boost::noncopyable
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
const Range & getLeft() const { return left[lidx]; }
|
||||
const Range & getRight() const { return right[ridx]; }
|
||||
|
||||
std::vector<Range> left;
|
||||
std::vector<Range> right;
|
||||
|
||||
/// Left and right types can be different because of nullable
|
||||
JoinKeyRow left_key;
|
||||
JoinKeyRow right_key;
|
||||
|
||||
size_t lidx = 0;
|
||||
size_t ridx = 0;
|
||||
};
|
||||
@ -236,13 +237,7 @@ public:
|
||||
|
||||
virtual void initialize(Inputs inputs) override;
|
||||
virtual void consume(Input & input, size_t source_num) override;
|
||||
virtual Status merge() override
|
||||
{
|
||||
Status result = mergeImpl();
|
||||
LOG_TRACE(log, "XXXX: merge result: chunk: {}, required: {}, finished: {}",
|
||||
result.chunk.getNumRows(), result.required_source, result.is_finished);
|
||||
return result ;
|
||||
}
|
||||
virtual Status merge() override;
|
||||
|
||||
void onFinish(double seconds)
|
||||
{
|
||||
@ -255,9 +250,10 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
Status mergeImpl();
|
||||
|
||||
std::optional<Status> handleAnyJoinState();
|
||||
Status anyJoin(ASTTableJoin::Kind kind);
|
||||
|
||||
std::optional<Status> handleAllJoinState();
|
||||
Status allJoin(ASTTableJoin::Kind kind);
|
||||
|
||||
std::vector<FullMergeJoinCursorPtr> cursors;
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -18,31 +18,30 @@ INSERT INTO t2
|
||||
'val' || toString(number) as s
|
||||
FROM numbers_mt({{ table_size - 3 }});
|
||||
|
||||
SET join_algorithm = 'full_sorting_merge';
|
||||
-- SET join_algorithm = 'full_sorting_merge';
|
||||
|
||||
{% for block_size in range (1, table_size + 1) -%}
|
||||
{% for kind in ['ALL', 'ANY'] -%}
|
||||
{% for block_size in range(1, table_size + 1) -%}
|
||||
|
||||
SET max_block_size = {{ block_size }};
|
||||
|
||||
SELECT 'simple cases, block size = {{ block_size }}';
|
||||
|
||||
SELECT 'ANY INNER';
|
||||
SELECT '{{ kind }} INNER | bs = {{ block_size }}';
|
||||
SELECT t1.key, t2.key, empty(t1.s), empty(t2.s) FROM t1
|
||||
ANY INNER JOIN t2
|
||||
{{ kind }} INNER JOIN t2
|
||||
ON t1.key == t2.key
|
||||
ORDER BY t1.key, t2.key
|
||||
;
|
||||
|
||||
SELECT 'ANY LEFT';
|
||||
SELECT '{{ kind }} LEFT | bs = {{ block_size }}';
|
||||
SELECT t1.key, t2.key, t1.s, empty(t2.s) FROM t1
|
||||
ANY LEFT JOIN t2
|
||||
{{ kind }} LEFT JOIN t2
|
||||
ON t1.key == t2.key
|
||||
ORDER BY t1.key, t2.key, t1.s
|
||||
;
|
||||
|
||||
SELECT 'ANY RIGHT';
|
||||
SELECT '{{ kind }} RIGHT | bs = {{ block_size }}';
|
||||
SELECT t1.key, t2.key, empty(t1.s), t2.s FROM t1
|
||||
ANY RIGHT JOIN t2
|
||||
{{ kind }} RIGHT JOIN t2
|
||||
ON t1.key == t2.key
|
||||
ORDER BY t1.key, t2.key, t2.s
|
||||
;
|
||||
@ -51,25 +50,25 @@ ORDER BY t1.key, t2.key, t2.s
|
||||
|
||||
SET join_use_nulls = 1;
|
||||
|
||||
SELECT 'simple cases: join_use_nulls';
|
||||
|
||||
SELECT 'ANY INNER';
|
||||
SELECT '{{ kind }} INNER | join_use_nulls = 1';
|
||||
SELECT t1.key, t2.key, isNull(t1.s), isNull(t2.s) FROM t1
|
||||
ANY INNER JOIN t2
|
||||
{{ kind }} INNER JOIN t2
|
||||
ON t1.key == t2.key
|
||||
ORDER BY t1.key, t2.key
|
||||
;
|
||||
|
||||
SELECT 'ANY LEFT';
|
||||
SELECT '{{ kind }} LEFT | join_use_nulls = 1';
|
||||
SELECT t1.key, t2.key, t1.s, isNull(t2.s) FROM t1
|
||||
ANY LEFT JOIN t2
|
||||
{{ kind }} LEFT JOIN t2
|
||||
ON t1.key == t2.key
|
||||
ORDER BY t1.key, t2.key, t1.s
|
||||
;
|
||||
|
||||
SELECT 'ANY RIGHT';
|
||||
SELECT '{{ kind }} RIGHT | join_use_nulls = 1';
|
||||
SELECT t1.key, t2.key, isNull(t1.s), t2.s FROM t1
|
||||
ANY RIGHT JOIN t2
|
||||
{{ kind }} RIGHT JOIN t2
|
||||
ON t1.key == t2.key
|
||||
ORDER BY t1.key, t2.key, t2.s
|
||||
;
|
||||
|
||||
{% endfor -%}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,36 +1,38 @@
|
||||
DROP TABLE IF EXISTS t1;
|
||||
DROP TABLE IF EXISTS t2;
|
||||
DROP TABLE IF EXISTS tn1;
|
||||
DROP TABLE IF EXISTS tn2;
|
||||
|
||||
CREATE TABLE t1 (key UInt32, s String) engine = TinyLog;
|
||||
CREATE TABLE tn1 (key Nullable(UInt32), s String) engine = TinyLog;
|
||||
CREATE TABLE t2 (key UInt32, s String) engine = TinyLog;
|
||||
CREATE TABLE tn2 (key Nullable(UInt32), s String) engine = TinyLog;
|
||||
|
||||
{% set table_size = 10 %}
|
||||
|
||||
INSERT INTO t1 VALUES (1, 'val1'), (2, 'val21'), (2, 'val22'), (2, 'val23'), (2, 'val24'), (2, 'val25'), (2, 'val26'), (2, 'val27'), (3, 'val3');
|
||||
INSERT INTO tn1 VALUES (1, 'val1'), (NULL, 'val21'), (NULL, 'val22'), (NULL, 'val23'), (NULL, 'val24'), (NULL, 'val25'), (NULL, 'val26'), (NULL, 'val27'), (3, 'val3');
|
||||
INSERT INTO t2 VALUES (1, 'val11'), (1, 'val12'), (2, 'val22'), (2, 'val23'), (2, 'val24'), (2, 'val25'), (2, 'val26'), (2, 'val27'), (2, 'val28'), (3, 'val3');
|
||||
INSERT INTO tn2 VALUES (1, 'val11'), (1, 'val12'), (NULL, 'val22'), (NULL, 'val23'), (NULL, 'val24'), (NULL, 'val25'), (NULL, 'val26'), (NULL, 'val27'), (NULL, 'val28'), (3, 'val3');
|
||||
|
||||
SET join_algorithm = 'full_sorting_merge';
|
||||
-- SET join_algorithm = 'full_sorting_merge';
|
||||
|
||||
{% for block_size in range (1, table_size + 1) -%}
|
||||
{% for block_size in range(1, 11) -%}
|
||||
|
||||
SET max_block_size = {{ block_size }};
|
||||
|
||||
SELECT 'block size = {{ block_size }}';
|
||||
|
||||
{% for t1, t2 in [('t1', 't2'), ('t1', 'tn2'), ('tn1', 't2'), ('tn1', 'tn2')] -%}
|
||||
{% for kind in ['ALL', 'ANY'] -%}
|
||||
|
||||
SELECT 'ANY INNER';
|
||||
SELECT t1.key, t2.key, length(t1.s), length(t2.s) FROM {{ t1 }} AS t1 ANY INNER JOIN {{ t2 }} AS t2 ON t1.key == t2.key ORDER BY t1.key, t2.key;
|
||||
SELECT '{{ t1 }} {{ kind }} INNER JOIN {{ t2 }} | bs = {{ block_size }}';
|
||||
SELECT t1.key, t2.key, length(t1.s), length(t2.s) FROM {{ t1 }} AS t1 {{ kind }} INNER JOIN {{ t2 }} AS t2 ON t1.key == t2.key ORDER BY t1.key, t2.key;
|
||||
|
||||
SELECT 'ANY LEFT';
|
||||
SELECT t1.key, t2.key, t1.s, length(t2.s) FROM {{ t1 }} AS t1 ANY LEFT JOIN {{ t2 }} AS t2 ON t1.key == t2.key ORDER BY t1.key, t2.key, t1.s;
|
||||
SELECT '{{ t1 }} {{ kind }} LEFT JOIN {{ t2 }} | bs = {{ block_size }}';
|
||||
SELECT t1.key, t2.key, t1.s, length(t2.s) FROM {{ t1 }} AS t1 {{ kind }} LEFT JOIN {{ t2 }} AS t2 ON t1.key == t2.key ORDER BY t1.key, t2.key, t1.s;
|
||||
|
||||
SELECT 'ANY RIGHT';
|
||||
SELECT t1.key, t2.key, length(t1.s), t2.s FROM {{ t1 }} AS t1 ANY RIGHT JOIN {{ t2 }} AS t2 ON t1.key == t2.key ORDER BY t1.key, t2.key, t2.s;
|
||||
SELECT '{{ t1 }} {{ kind }} RIGHT JOIN {{ t2 }} | bs = {{ block_size }}';
|
||||
SELECT t1.key, t2.key, length(t1.s), t2.s FROM {{ t1 }} AS t1 {{ kind }} RIGHT JOIN {{ t2 }} AS t2 ON t1.key == t2.key ORDER BY t1.key, t2.key, t2.s;
|
||||
|
||||
{% endfor -%}
|
||||
{% endfor -%}
|
||||
{% endfor -%}
|
||||
|
@ -1,3 +1,27 @@
|
||||
ALL INNER
|
||||
500353531835 500353531835 1000342 1000342 1000342
|
||||
ALL LEFT
|
||||
50195752660639 500353531835 10369589 10369589 1000342
|
||||
ALL RIGHT
|
||||
500353531835 684008812186 1367170 1000342 1367170
|
||||
ALL INNER
|
||||
500353531835 500353531835 1000342 1000342 1000342
|
||||
ALL LEFT
|
||||
50195752660639 500353531835 10369589 10369589 1000342
|
||||
ALL RIGHT
|
||||
500353531835 684008812186 1367170 1000342 1367170
|
||||
ALL INNER
|
||||
500353531835 500353531835 1000342 1000342 1000342
|
||||
ALL LEFT
|
||||
50195752660639 500353531835 10369589 10369589 1000342
|
||||
ALL RIGHT
|
||||
500353531835 684008812186 1367170 1000342 1367170
|
||||
ALL INNER
|
||||
500353531835 500353531835 1000342 1000342 1000342
|
||||
ALL LEFT
|
||||
50195752660639 500353531835 10369589 10369589 1000342
|
||||
ALL RIGHT
|
||||
500353531835 684008812186 1367170 1000342 1367170
|
||||
ANY INNER
|
||||
199622811843 199622811843 399458 399458 399458
|
||||
ANY LEFT
|
||||
|
@ -1,3 +1,4 @@
|
||||
-- Tags: long
|
||||
DROP TABLE IF EXISTS t1;
|
||||
DROP TABLE IF EXISTS t2;
|
||||
|
||||
@ -21,28 +22,30 @@ INSERT INTO t2
|
||||
FROM numbers_mt({{ rtable_size }})
|
||||
;
|
||||
|
||||
SET join_algorithm = 'full_sorting_merge';
|
||||
-- SET join_algorithm = 'full_sorting_merge';
|
||||
|
||||
{% for kind in ['ALL', 'ANY'] -%}
|
||||
{% for block_size in [32001, 65505, 65536, range(32001, 65536) | random] %}
|
||||
|
||||
SET max_block_size = {{ block_size }};
|
||||
|
||||
SELECT 'ANY INNER';
|
||||
SELECT '{{ kind }} INNER';
|
||||
SELECT sum(t1.key), sum(t2.key), count(), countIf(t1.key != 0), countIf(t2.key != 0) FROM t1
|
||||
ANY INNER JOIN t2
|
||||
{{ kind }} INNER JOIN t2
|
||||
ON t1.key == t2.key
|
||||
;
|
||||
|
||||
SELECT 'ANY LEFT';
|
||||
SELECT '{{ kind }} LEFT';
|
||||
SELECT sum(t1.key), sum(t2.key), count(), countIf(t1.key != 0), countIf(t2.key != 0) FROM t1
|
||||
ANY LEFT JOIN t2
|
||||
{{ kind }} LEFT JOIN t2
|
||||
ON t1.key == t2.key
|
||||
;
|
||||
|
||||
SELECT 'ANY RIGHT';
|
||||
SELECT '{{ kind }} RIGHT';
|
||||
SELECT sum(t1.key), sum(t2.key), count(), countIf(t1.key != 0), countIf(t2.key != 0) FROM t1
|
||||
ANY RIGHT JOIN t2
|
||||
{{ kind }} RIGHT JOIN t2
|
||||
ON t1.key == t2.key
|
||||
;
|
||||
|
||||
{% endfor %}
|
||||
{% endfor -%}
|
||||
{% endfor -%}
|
||||
|
Loading…
Reference in New Issue
Block a user