change iface for one-to-many blocks expressions calculation

This commit is contained in:
chertus 2020-01-13 21:00:32 +03:00
parent 4b9acaaa90
commit 8ab68e2c77
10 changed files with 131 additions and 45 deletions

View File

@ -44,4 +44,28 @@ Block ExpressionBlockInputStream::readImpl()
return res;
}
/// Returns the next output block.
/// If the previous call left a non-empty `not_processed` block, that block is taken as the input;
/// otherwise a new block is read from the last child stream. The expression is then executed on it,
/// starting from `action_number`, and both `action_number` and `not_processed` are updated for the next call.
Block SplittingExpressionBlockInputStream::readImpl()
{
    /// Until the stream is marked as initialized, every call asks the expression
    /// whether its result is always empty and, if so, returns an empty block.
    if (!initialized && expression->resultIsAlwaysEmpty())
        return {};
    initialized = true;

    Block current;

    if (likely(!not_processed))
    {
        /// Nothing is pending: take the next block from the source.
        current = children.back()->read();

        /// An empty block from the child is passed through without executing the expression.
        if (!current)
            return current;
    }
    else
    {
        /// Take over the pending rows; `not_processed` becomes empty until execute() refills it.
        current.swap(not_processed);
    }

    action_number = expression->execute(current, action_number, not_processed);
    return current;
}
}

View File

@ -15,10 +15,9 @@ class ExpressionActions;
*/
class ExpressionBlockInputStream : public IBlockInputStream
{
private:
public:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:
ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_);
String getName() const override;
@ -26,12 +25,29 @@ public:
Block getHeader() const override;
protected:
bool initialized = false;
ExpressionActionsPtr expression;
Block readImpl() override;
private:
ExpressionActionsPtr expression;
Block cached_header;
bool initialized = false;
};
/** ExpressionBlockInputStream that can produce several output blocks for a single input block.
  * State is kept between readImpl() calls so that the not yet processed part of a block
  * can be continued on the next call.
  */
class SplittingExpressionBlockInputStream : public ExpressionBlockInputStream
{
public:
    /// Forwards both arguments to the ExpressionBlockInputStream constructor.
    SplittingExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_)
        : ExpressionBlockInputStream(input, expression_)
    {
    }

protected:
    Block readImpl() override;

private:
    /// Block that the last expression execution reported as not processed; empty if there is none.
    Block not_processed;

    /// Index of the action to start from on the next expression execution.
    size_t action_number = 0;
};
}

View File

@ -346,7 +346,7 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings,
}
void ExpressionAction::execute(Block & block, bool dry_run) const
Block ExpressionAction::execute(Block & block, bool dry_run) const
{
size_t input_rows_count = block.rows();
@ -477,8 +477,9 @@ void ExpressionAction::execute(Block & block, bool dry_run) const
case JOIN:
{
join->joinBlock(block);
break;
Block not_processed;
join->joinBlock(block, not_processed);
return not_processed;
}
case PROJECT:
@ -537,6 +538,8 @@ void ExpressionAction::execute(Block & block, bool dry_run) const
break;
}
return {};
}
@ -762,6 +765,20 @@ void ExpressionActions::execute(Block & block, bool dry_run) const
}
}
/// Executes the actions with indices [start_action, actions.size()) on `block`, one by one (never in dry-run mode).
/// After each action the result of that action is stored into `not_processed` and limits are checked.
/// If an action leaves a non-empty `not_processed` block, execution stops and the index of that action is returned,
/// so the caller can continue from it. Returns 0 when all actions have been executed.
/// NOTE(review): when execution stops early, the actions after the returned index are not applied to `block`
/// in this call - confirm that callers only rely on this for expressions where the splitting action is the last one.
size_t ExpressionActions::execute(Block & block, size_t start_action, Block & not_processed) const
{
    size_t action_index = start_action;

    while (action_index < actions.size())
    {
        not_processed = actions[action_index].execute(block, false);
        checkLimits(block);

        /// The action has split the block: report where to resume.
        if (not_processed)
            return action_index;

        ++action_index;
    }

    return 0;
}
bool ExpressionActions::hasTotalsInJoin() const
{
for (const auto & action : actions)

View File

@ -138,8 +138,11 @@ private:
friend class ExpressionActions;
void prepare(Block & sample_block, const Settings & settings, NameSet & names_not_for_constant_folding);
void execute(Block & block, bool dry_run) const;
void executeOnTotals(Block & block) const;
/// Executes the action on the block (modifying it in place). If the block is split, @returns a block with the not processed rows, an empty block otherwise.
/// A block could be split in case of JOIN (or another row-multiplying action).
Block execute(Block & block, bool dry_run) const;
};
@ -221,6 +224,10 @@ public:
/// Execute the expression on the block. The block must contain all the columns returned by getRequiredColumns.
void execute(Block & block, bool dry_run = false) const;
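/// Execute the expression on the block, starting from the action with index start_action.
/// @returns the index of the action to continue with on the next call (0 if all actions were executed); rows that were not processed, if any, are put into not_processed.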
size_t execute(Block & block, size_t start_action, Block & not_processed) const;
/// Check if joined subquery has totals.
bool hasTotalsInJoin() const;

View File

@ -23,7 +23,7 @@ public:
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addJoinedBlock).
/// Could be called from different threads in parallel.
virtual void joinBlock(Block & block) = 0;
virtual void joinBlock(Block & block, Block & not_processed) = 0;
virtual bool hasTotals() const = 0;
virtual void setTotals(const Block & block) = 0;

View File

@ -1113,7 +1113,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
header_before_join = pipeline.firstStream()->getHeader();
/// Applies to all sources except stream_with_non_joined_data.
for (auto & stream : pipeline.streams)
stream = std::make_shared<ExpressionBlockInputStream>(stream, expressions.before_join);
stream = std::make_shared<SplittingExpressionBlockInputStream>(stream, expressions.before_join);
if (isMergeJoin(expressions.before_join->getTableJoinAlgo()) && settings.partial_merge_join_optimizations)
{

View File

@ -1091,7 +1091,7 @@ void Join::joinGet(Block & block, const String & column_name) const
}
void Join::joinBlock(Block & block)
void Join::joinBlock(Block & block, Block &)
{
std::shared_lock lock(data->rwlock);

View File

@ -158,7 +158,7 @@ public:
/** Join data from the map (that was previously built by calls to addJoinedBlock) to the block with data from "left" table.
* Could be called from different threads in parallel.
*/
void joinBlock(Block & block) override;
void joinBlock(Block & block, Block & not_processed) override;
/// Infer the return type for joinGet function
DataTypePtr joinGetReturnType(const String & column_name) const;

View File

@ -294,11 +294,14 @@ void joinEqualsAnyLeft(const Block & right_block, const Block & right_columns_to
copyRightRange(right_block, right_columns_to_add, right_columns, range.right_start, range.left_length);
}
template <bool is_all>
void joinEquals(const Block & left_block, const Block & right_block, const Block & right_columns_to_add,
MutableColumns & left_columns, MutableColumns & right_columns, const Range & range, bool is_all)
MutableColumns & left_columns, MutableColumns & right_columns, const Range & range)
{
size_t left_rows_to_add = range.left_length;
size_t right_rows_to_add = is_all ? range.right_length : 1;
size_t right_rows_to_add = 1;
if constexpr (is_all)
right_rows_to_add = range.right_length;
size_t row_position = range.right_start;
for (size_t right_row = 0; right_row < right_rows_to_add; ++right_row, ++row_position)
@ -308,22 +311,20 @@ void joinEquals(const Block & left_block, const Block & right_block, const Block
}
}
/// Calls insertManyDefaults(rows_to_add) on every column of `right_columns`.
void appendNulls(MutableColumns & right_columns, size_t rows_to_add)
{
    for (auto & right_column : right_columns)
    {
        right_column->insertManyDefaults(rows_to_add);
    }
}
template <bool copy_left>
void joinInequalsLeft(const Block & left_block, MutableColumns & left_columns, MutableColumns & right_columns,
size_t start, size_t end, bool copy_left)
size_t start, size_t end)
{
if (end <= start)
return;
size_t rows_to_add = end - start;
if (copy_left)
if constexpr (copy_left)
copyLeftRange(left_block, left_columns, start, rows_to_add);
appendNulls(right_columns, rows_to_add);
/// append nulls
for (auto & column : right_columns)
column->insertManyDefaults(rows_to_add);
}
Blocks blocksListToBlocks(const BlocksList & in_blocks)
@ -427,7 +428,7 @@ MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & ri
, size_limits(table_join->sizeLimits())
, right_sample_block(right_sample_block_)
, nullable_right_side(table_join->forceNullableRight())
, is_all(table_join->strictness() == ASTTableJoin::Strictness::All)
, is_all_join(table_join->strictness() == ASTTableJoin::Strictness::All)
, is_inner(isInner(table_join->kind()))
, is_left(isLeft(table_join->kind()))
, skip_not_intersected(table_join->enablePartialMergeJoinOptimizations())
@ -567,7 +568,8 @@ bool MergeJoin::addJoinedBlock(const Block & src_block)
return saveRightBlock(std::move(block));
}
void MergeJoin::joinBlock(Block & block)
/// TODO: support not_processed (the second argument is currently ignored).
void MergeJoin::joinBlock(Block & block, Block &)
{
JoinCommon::checkTypesOfKeys(block, table_join->keyNamesLeft(), right_table_keys, table_join->keyNamesRight());
materializeBlockInplace(block);
@ -575,12 +577,22 @@ void MergeJoin::joinBlock(Block & block)
sortBlock(block, left_sort_description);
if (is_in_memory)
joinSortedBlock<true>(block);
{
if (is_all_join)
joinSortedBlock<true, true>(block);
else
joinSortedBlock<true, false>(block);
}
else
joinSortedBlock<false>(block);
{
if (is_all_join)
joinSortedBlock<false, true>(block);
else
joinSortedBlock<false, false>(block);
}
}
template <bool in_memory>
template <bool in_memory, bool is_all>
void MergeJoin::joinSortedBlock(Block & block)
{
std::shared_lock lock(rwlock);
@ -610,11 +622,11 @@ void MergeJoin::joinSortedBlock(Block & block)
std::shared_ptr<Block> right_block = loadRightBlock<in_memory>(i);
leftJoin(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail);
leftJoin<is_all>(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail);
}
left_cursor.nextN(left_key_tail);
joinInequalsLeft(block, left_columns, right_columns, left_cursor.position(), left_cursor.end(), is_all);
joinInequalsLeft<is_all>(block, left_columns, right_columns, left_cursor.position(), left_cursor.end());
//left_cursor.nextN(left_cursor.end() - left_cursor.position());
changeLeftColumns(block, std::move(left_columns));
@ -638,7 +650,7 @@ void MergeJoin::joinSortedBlock(Block & block)
std::shared_ptr<Block> right_block = loadRightBlock<in_memory>(i);
innerJoin(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail);
innerJoin<is_all>(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail);
}
left_cursor.nextN(left_key_tail);
@ -647,6 +659,7 @@ void MergeJoin::joinSortedBlock(Block & block)
}
}
template <bool is_all>
void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
{
@ -662,28 +675,32 @@ void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & left_block
Range range = left_cursor.getNextEqualRange(right_cursor);
joinInequalsLeft(left_block, left_columns, right_columns, left_unequal_position, range.left_start, is_all);
joinInequalsLeft<is_all>(left_block, left_columns, right_columns, left_unequal_position, range.left_start);
if (range.empty())
break;
if (is_all)
joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
if constexpr (is_all)
joinEquals<true>(left_block, right_block, right_columns_to_add, left_columns, right_columns, range);
else
joinEqualsAnyLeft(right_block, right_columns_to_add, right_columns, range);
right_cursor.nextN(range.right_length);
/// Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
if (is_all && right_cursor.atEnd())
if constexpr (is_all)
{
left_key_tail = range.left_length;
break;
if (right_cursor.atEnd())
{
left_key_tail = range.left_length;
break;
}
}
left_cursor.nextN(range.left_length);
}
}
template <bool is_all>
void MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
{
@ -696,14 +713,17 @@ void MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_bloc
if (range.empty())
break;
joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
joinEquals<is_all>(left_block, right_block, right_columns_to_add, left_columns, right_columns, range);
right_cursor.nextN(range.right_length);
/// Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
if (is_all && right_cursor.atEnd())
if constexpr (is_all)
{
left_key_tail = range.left_length;
break;
if (right_cursor.atEnd())
{
left_key_tail = range.left_length;
break;
}
}
left_cursor.nextN(range.left_length);
}
@ -711,7 +731,7 @@ void MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_bloc
void MergeJoin::changeLeftColumns(Block & block, MutableColumns && columns)
{
if (is_left && !is_all)
if (is_left && !is_all_join)
return;
block.setColumns(std::move(columns));
}

View File

@ -48,7 +48,7 @@ public:
MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block);
bool addJoinedBlock(const Block & block) override;
void joinBlock(Block &) override;
void joinBlock(Block &, Block & not_processed) override;
void joinTotals(Block &) const override;
void setTotals(const Block &) override;
bool hasTotals() const override { return totals; }
@ -85,7 +85,7 @@ private:
size_t right_blocks_bytes = 0;
bool is_in_memory = true;
const bool nullable_right_side;
const bool is_all;
const bool is_all_join;
const bool is_inner;
const bool is_left;
const bool skip_not_intersected;
@ -98,13 +98,15 @@ private:
template <bool in_memory>
size_t rightBlocksCount();
template <bool in_memory>
template <bool in_memory, bool is_all>
void joinSortedBlock(Block & block);
template <bool in_memory>
std::shared_ptr<Block> loadRightBlock(size_t pos);
template <bool is_all>
void leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);
template <bool is_all>
void innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);