Fix build, small changes

This commit is contained in:
vdimir 2022-04-06 14:36:51 +00:00
parent d34a66c915
commit ba787db0bb
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
3 changed files with 64 additions and 46 deletions

View File

@ -142,9 +142,12 @@ struct SortCursorImpl
/// True when the current position is the final row, or is already at/past the end (pos + 1 >= rows).
bool isLast() const { return pos + 1 >= rows; }
/// True when moving forward by `size` rows from the current position would reach or pass `rows`.
bool isLast(size_t size) const { return pos + size >= rows; }
/// True while the current position is strictly less than the row count.
bool isValid() const { return pos < rows; }
/// Advance the position by one row. No bounds check is performed here.
void next() { ++pos; }
/// Advance the position by `size` rows. No bounds check is performed here, so pos may end up beyond rows.
void next(size_t size) { pos += size; }
/// Total row count (`rows`), not the number of rows remaining.
size_t getSize() const { return rows; }
/// Number of rows between the current position and `rows`.
/// NOTE(review): if pos and rows are unsigned (as the size_t return type suggests), this wraps around
/// when pos > rows, which next(size) does not prevent — presumably callers check isValid() first; confirm.
size_t rowsLeft() const { return rows - pos; }
/// Prevent using pos instead of getRow()
private:

View File

@ -31,19 +31,15 @@ namespace
constexpr size_t EMPTY_VALUE = std::numeric_limits<size_t>::max();
FullMergeJoinCursor createCursor(const Block & block, const Names & columns)
{
SortDescription desc;
desc.reserve(columns.size());
for (const auto & name : columns)
desc.emplace_back(block.getPositionByName(name));
return FullMergeJoinCursor(block.cloneEmpty().getColumns(), desc);
desc.emplace_back(name);
return FullMergeJoinCursor(block, desc);
}
template <bool has_left_nulls, bool has_right_nulls>
int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos, int null_direction_hint = 1)
{
@ -127,6 +123,15 @@ bool ALWAYS_INLINE totallyLess(const FullMergeJoinCursor & lhs, const FullMergeJ
return cmp < 0;
}
/// Three-way comparison built on top of totallyLess():
///   -1 if totallyLess(lhs, rhs) holds,
///    1 if totallyLess(rhs, lhs) holds,
///    0 if neither holds.
/// The second check is only evaluated when the first one fails.
int ALWAYS_INLINE totallyCompare(const FullMergeJoinCursor & lhs, const FullMergeJoinCursor & rhs)
{
    const bool lhs_is_before = totallyLess(lhs, rhs);
    if (lhs_is_before)
        return -1;

    return totallyLess(rhs, lhs) ? 1 : 0;
}
void addIndexColumn(const Columns & columns, const IColumn & indices, Chunk & result)
{
for (const auto & col : columns)
@ -201,9 +206,6 @@ void MergeJoinAlgorithm::consume(Input & input, size_t source_num)
{
LOG_DEBUG(log, "Consume from {} chunk: {}", source_num, bool(input.chunk));
if (!input.chunk)
cursors[source_num].completeAll();
prepareChunk(input.chunk);
if (input.chunk.getNumRows() >= EMPTY_VALUE)
@ -212,11 +214,6 @@ void MergeJoinAlgorithm::consume(Input & input, size_t source_num)
cursors[source_num].setInput(std::move(input));
}
/// Difference between the cursor's `rows` and its current position (as reported by getPosRef()).
/// NOTE(review): presumably this is the number of rows not yet consumed — confirm against SortCursor.
static size_t ALWAYS_INLINE rowsLeft(SortCursor cursor)
{
    const auto total_rows = cursor->rows;
    const auto current_pos = cursor->getPosRef();
    return total_rows - current_pos;
}
using JoinKind = ASTTableJoin::Kind;
template <JoinKind kind>
@ -224,9 +221,9 @@ static void leftOrFullAny(SortCursor left_cursor, SortCursor right_cursor, Padde
{
static_assert(kind == JoinKind::Left || kind == JoinKind::Right || kind == JoinKind::Inner, "Invalid join kind");
size_t num_rows = kind == JoinKind::Left ? rowsLeft(left_cursor) :
kind == JoinKind::Right ? rowsLeft(right_cursor) :
std::min(rowsLeft(left_cursor), rowsLeft(right_cursor));
size_t num_rows = kind == JoinKind::Left ? left_cursor->rowsLeft() :
kind == JoinKind::Right ? right_cursor->rowsLeft() :
std::min(left_cursor->rowsLeft(), right_cursor->rowsLeft());
constexpr bool is_left_or_inner = kind == JoinKind::Left || kind == JoinKind::Inner;
constexpr bool is_right_or_inner = kind == JoinKind::Right || kind == JoinKind::Inner;
@ -291,27 +288,32 @@ static Chunk createBlockWithDefaults(const Chunk & lhs, const Chunk & rhs)
return result;
}
/// Decides whether the join has nothing more to produce, based on which inputs are fully completed:
///  - both inputs completed: finished for any kind;
///  - left (cursors[0]) completed: finished for Left and Inner kinds;
///  - right (cursors[1]) completed: finished for Right and Inner kinds.
/// Expects `cursors` to hold at least two elements (index 0 and 1 are accessed unchecked).
static bool isFinished(const std::vector<FullMergeJoinCursor> & cursors, JoinKind kind)
{
    if (cursors[0].fullyCompleted())
    {
        if (cursors[1].fullyCompleted())
            return true;

        if (isLeft(kind) || isInner(kind))
            return true;
    }

    /// Reaching this point means the two cases above did not apply; only the right side can end the join.
    if (!(isRight(kind) || isInner(kind)))
        return false;

    return cursors[1].fullyCompleted();
}
IMergingAlgorithm::Status MergeJoinAlgorithm::merge()
{
if (!cursors[0].isValid() && !cursors[0].fullyCompleted())
{
return Status(0);
}
if (!cursors[1].isValid() && !cursors[1].fullyCompleted())
{
return Status(1);
}
JoinKind kind = table_join->getTableJoin().kind();
if (cursors[0].fullyCompleted() && cursors[1].fullyCompleted())
if (isFinished(cursors, kind))
{
return Status({}, true);
}
if (isInner(kind) && (cursors[0].fullyCompleted() || cursors[1].fullyCompleted()))
{
LOG_DEBUG(log, "{}:{} ", __FILE__, __LINE__);
return Status({}, true);
}
if (cursors[0].fullyCompleted() && isRightOrFull(kind))
{
Chunk result = createBlockWithDefaults(sample_chunks[0], cursors[1].moveCurrentChunk());
@ -324,21 +326,32 @@ IMergingAlgorithm::Status MergeJoinAlgorithm::merge()
return Status(std::move(result));
}
if (!cursors[0]->isValid() || totallyLess(cursors[0], cursors[1]))
if (int cmp = totallyCompare(cursors[0], cursors[1]); cmp != 0)
{
if (cursors[0]->isValid() && isLeft(kind))
if (cursors[0]->isValid() && isLeftOrFull(kind))
{
Chunk result = createBlockWithDefaults(cursors[0].moveCurrentChunk(), sample_chunks[1]);
return Status(std::move(result), false);
return Status(createBlockWithDefaults(cursors[0].moveCurrentChunk(), sample_chunks[1]));
}
if (isRightOrFull(kind) && cursors[1]->isValid())
{
return Status(createBlockWithDefaults(sample_chunks[0], cursors[1].moveCurrentChunk()));
}
cursors[0].moveCurrentChunk();
if (cursors[0].fullyCompleted())
return Status({}, true);
return Status(0);
}
// if (!cursors[1]->isValid() || totallyLess(cursors[1], cursors[0]))
// ...
if (cmp < 0)
{
cursors[0].moveCurrentChunk();
return Status(0);
}
if (cmp > 0)
{
cursors[1].moveCurrentChunk();
return Status(1);
}
if (!isInner(kind) && !isLeft(kind) && !isRight(kind) && !isFull(kind))
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for kind {}", kind);
}
auto left_map = ColumnUInt64::create();
auto right_map = ColumnUInt64::create();
@ -359,11 +372,13 @@ IMergingAlgorithm::Status MergeJoinAlgorithm::merge()
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported join kind: \"{}\"", table_join->getTableJoin().kind());
}
assert(left_map->empty() || right_map->empty() || left_map->size() == right_map->size());
Chunk result;
addIndexColumn(cursors[0].getCurrentChunk().getColumns(), *left_map, result);
addIndexColumn(cursors[1].getCurrentChunk().getColumns(), *right_map, result);
return Status(std::move(result), cursors[0].fullyCompleted() && cursors[1].fullyCompleted());
return Status(std::move(result), isFinished(cursors, kind));
}
MergeJoinTransform::MergeJoinTransform(

View File

@ -29,8 +29,9 @@ class FullMergeJoinCursor
{
public:
FullMergeJoinCursor(const Columns & columns, const SortDescription & desc)
: impl(columns, desc)
FullMergeJoinCursor(const Block & block, const SortDescription & desc)
: impl(block, desc)
, sample_block(block)
{
}
@ -82,14 +83,14 @@ public:
current_input = std::move(input);
if (!current_input.chunk)
completeAll();
fully_completed = true;
if (current_input.skip_last_row)
throw Exception("MergeJoinAlgorithm does not support skipLastRow", ErrorCodes::LOGICAL_ERROR);
if (current_input.chunk)
{
impl.reset(current_input.chunk.getColumns(), {}, current_input.permutation);
impl.reset(current_input.chunk.getColumns(), sample_block, current_input.permutation);
}
}
@ -98,17 +99,19 @@ public:
return current_input.chunk && impl.isValid();
}
bool fullyCompleted() const { return fully_completed; }
void completeAll() { fully_completed = true; }
bool fullyCompleted() const { return !isValid() && fully_completed; }
SortCursorImpl * operator-> () { return &impl; }
const SortCursorImpl * operator-> () const { return &impl; }
private:
SortCursorImpl impl;
IMergingAlgorithm::Input current_input;
bool fully_completed = false;
Block sample_block;
// bool has_left_nullable = false;
// bool has_right_nullable = false;
};
@ -127,9 +130,6 @@ public:
virtual Status merge() override;
private:
SortDescription left_desc;
SortDescription right_desc;
std::vector<FullMergeJoinCursor> cursors;
std::vector<Chunk> sample_chunks;