Buffer left blocks optimisation for MergeJoin (#10601)

This commit is contained in:
Artem Zuikov 2020-06-16 23:13:18 +03:00 committed by GitHub
parent b6a4360c29
commit 857d84bf7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 294 additions and 102 deletions

View File

@ -163,6 +163,8 @@ using BlocksPtrs = std::shared_ptr<std::vector<BlocksPtr>>;
struct ExtraBlock struct ExtraBlock
{ {
Block block; Block block;
bool empty() const { return !block; }
}; };
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>; using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;

View File

@ -289,6 +289,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingJoinAlgorithm, join_algorithm, JoinAlgorithm::HASH, "Specify join algorithm: 'auto', 'hash', 'partial_merge', 'prefer_partial_merge'. 'auto' tries to change HashJoin to MergeJoin on the fly to avoid out of memory.", 0) \ M(SettingJoinAlgorithm, join_algorithm, JoinAlgorithm::HASH, "Specify join algorithm: 'auto', 'hash', 'partial_merge', 'prefer_partial_merge'. 'auto' tries to change HashJoin to MergeJoin on the fly to avoid out of memory.", 0) \
M(SettingBool, partial_merge_join_optimizations, true, "Enable optimizations in partial merge join", 0) \ M(SettingBool, partial_merge_join_optimizations, true, "Enable optimizations in partial merge join", 0) \
M(SettingUInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \ M(SettingUInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \
M(SettingUInt64, partial_merge_join_left_table_buffer_bytes, 32000000, "If not 0 group left table blocks in bigger ones for left-side table in partial merge join. It uses up to 2x of specified memory per joining thread. In current version work only with 'partial_merge_join_optimizations = 1'.", 0) \
M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \ M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \
M(SettingUInt64, join_on_disk_max_files_to_merge, 64, "For MergeJoin on disk set how much files it's allowed to sort simultaneously. Then this value bigger then more memory used and then less disk I/O needed. Minimum is 2.", 0) \ M(SettingUInt64, join_on_disk_max_files_to_merge, 64, "For MergeJoin on disk set how much files it's allowed to sort simultaneously. Then this value bigger then more memory used and then less disk I/O needed. Minimum is 2.", 0) \
M(SettingString, temporary_files_codec, "LZ4", "Set compression codec for temporary files (sort and join on disk). I.e. LZ4, NONE.", 0) \ M(SettingString, temporary_files_codec, "LZ4", "Set compression codec for temporary files (sort and join on disk). I.e. LZ4, NONE.", 0) \

View File

@ -55,10 +55,14 @@ Block InflatingExpressionBlockInputStream::readImpl()
} }
Block res; Block res;
if (likely(!not_processed)) bool keep_going = not_processed && not_processed->empty(); /// There's data inside expression.
if (!not_processed || keep_going)
{ {
not_processed.reset();
res = children.back()->read(); res = children.back()->read();
if (res) if (res || keep_going)
expression->execute(res, not_processed, action_number); expression->execute(res, not_processed, action_number);
} }
else else

View File

@ -1151,13 +1151,20 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
} }
} }
bool has_stream_with_non_joined_rows = (before_join && before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows()); bool join_allow_read_in_order = true;
if (before_join)
{
/// You may find it strange but we support read_in_order for HashJoin and do not support for MergeJoin.
auto join = before_join->getTableJoinAlgo();
join_allow_read_in_order = typeid_cast<HashJoin *>(join.get()) && !join->hasStreamWithNonJoinedRows();
}
optimize_read_in_order = optimize_read_in_order =
settings.optimize_read_in_order settings.optimize_read_in_order
&& storage && query.orderBy() && storage && query.orderBy()
&& !query_analyzer.hasAggregation() && !query_analyzer.hasAggregation()
&& !query.final() && !query.final()
&& !has_stream_with_non_joined_rows; && join_allow_read_in_order;
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers. /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage)); query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));

View File

@ -29,12 +29,12 @@ namespace ErrorCodes
namespace namespace
{ {
template <bool has_nulls> template <bool has_left_nulls, bool has_right_nulls>
int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos) int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos)
{ {
static constexpr int null_direction_hint = 1; static constexpr int null_direction_hint = 1;
if constexpr (has_nulls) if constexpr (has_left_nulls && has_right_nulls)
{ {
const auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column); const auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column);
const auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column); const auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column);
@ -48,16 +48,24 @@ int nullableCompareAt(const IColumn & left_column, const IColumn & right_column,
/// NULL != NULL case /// NULL != NULL case
if (left_column.isNullAt(lhs_pos)) if (left_column.isNullAt(lhs_pos))
return null_direction_hint; return null_direction_hint;
}
if (left_nullable && !right_nullable) return 0;
}
}
if constexpr (has_left_nulls)
{
if (const auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column))
{ {
if (left_column.isNullAt(lhs_pos)) if (left_column.isNullAt(lhs_pos))
return null_direction_hint; return null_direction_hint;
return left_nullable->getNestedColumn().compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint); return left_nullable->getNestedColumn().compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
} }
}
if (!left_nullable && right_nullable) if constexpr (has_right_nulls)
{
if (const auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column))
{ {
if (right_column.isNullAt(rhs_pos)) if (right_column.isNullAt(rhs_pos))
return -null_direction_hint; return -null_direction_hint;
@ -65,7 +73,6 @@ int nullableCompareAt(const IColumn & left_column, const IColumn & right_column,
} }
} }
/// !left_nullable && !right_nullable
return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint); return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
} }
@ -118,26 +125,25 @@ public:
void setCompareNullability(const MergeJoinCursor & rhs) void setCompareNullability(const MergeJoinCursor & rhs)
{ {
has_nullable_columns = false; has_left_nullable = false;
has_right_nullable = false;
for (size_t i = 0; i < impl.sort_columns_size; ++i) for (size_t i = 0; i < impl.sort_columns_size; ++i)
{ {
bool is_left_nullable = isColumnNullable(*impl.sort_columns[i]); has_left_nullable = has_left_nullable || isColumnNullable(*impl.sort_columns[i]);
bool is_right_nullable = isColumnNullable(*rhs.impl.sort_columns[i]); has_right_nullable = has_right_nullable || isColumnNullable(*rhs.impl.sort_columns[i]);
if (is_left_nullable || is_right_nullable)
{
has_nullable_columns = true;
break;
}
} }
} }
Range getNextEqualRange(MergeJoinCursor & rhs) Range getNextEqualRange(MergeJoinCursor & rhs)
{ {
if (has_nullable_columns) if (has_left_nullable && has_right_nullable)
return getNextEqualRangeImpl<true>(rhs); return getNextEqualRangeImpl<true, true>(rhs);
return getNextEqualRangeImpl<false>(rhs); else if (has_left_nullable)
return getNextEqualRangeImpl<true, false>(rhs);
else if (has_right_nullable)
return getNextEqualRangeImpl<false, true>(rhs);
return getNextEqualRangeImpl<false, false>(rhs);
} }
int intersect(const Block & min_max, const Names & key_names) int intersect(const Block & min_max, const Names & key_names)
@ -149,16 +155,16 @@ public:
int first_vs_max = 0; int first_vs_max = 0;
int last_vs_min = 0; int last_vs_min = 0;
for (size_t i = 0; i < impl.sort_columns.size(); ++i) for (size_t i = 0; i < impl.sort_columns_size; ++i)
{ {
const auto & left_column = *impl.sort_columns[i]; const auto & left_column = *impl.sort_columns[i];
const auto & right_column = *min_max.getByName(key_names[i]).column; /// cannot get by position cause of possible duplicates const auto & right_column = *min_max.getByName(key_names[i]).column; /// cannot get by position cause of possible duplicates
if (!first_vs_max) if (!first_vs_max)
first_vs_max = nullableCompareAt<true>(left_column, right_column, position(), 1); first_vs_max = nullableCompareAt<true, true>(left_column, right_column, position(), 1);
if (!last_vs_min) if (!last_vs_min)
last_vs_min = nullableCompareAt<true>(left_column, right_column, last_position, 0); last_vs_min = nullableCompareAt<true, true>(left_column, right_column, last_position, 0);
} }
if (first_vs_max > 0) if (first_vs_max > 0)
@ -170,64 +176,56 @@ public:
private: private:
SortCursorImpl impl; SortCursorImpl impl;
bool has_nullable_columns = false; bool has_left_nullable = false;
bool has_right_nullable = false;
template <bool has_nulls> template <bool left_nulls, bool right_nulls>
Range getNextEqualRangeImpl(MergeJoinCursor & rhs) Range getNextEqualRangeImpl(MergeJoinCursor & rhs)
{ {
while (!atEnd() && !rhs.atEnd()) while (!atEnd() && !rhs.atEnd())
{ {
int cmp = compareAt<has_nulls>(rhs, impl.pos, rhs.impl.pos); int cmp = compareAtCursor<left_nulls, right_nulls>(rhs);
if (cmp < 0) if (cmp < 0)
impl.next(); impl.next();
if (cmp > 0) else if (cmp > 0)
rhs.impl.next(); rhs.impl.next();
if (!cmp) else if (!cmp)
{ return Range{impl.pos, rhs.impl.pos, getEqualLength(), rhs.getEqualLength()};
Range range{impl.pos, rhs.impl.pos, 0, 0};
range.left_length = getEqualLength();
range.right_length = rhs.getEqualLength();
return range;
}
} }
return Range{impl.pos, rhs.impl.pos, 0, 0}; return Range{impl.pos, rhs.impl.pos, 0, 0};
} }
template <bool has_nulls> template <bool left_nulls, bool right_nulls>
int compareAt(const MergeJoinCursor & rhs, size_t lhs_pos, size_t rhs_pos) const int ALWAYS_INLINE compareAtCursor(const MergeJoinCursor & rhs) const
{ {
int res = 0;
for (size_t i = 0; i < impl.sort_columns_size; ++i) for (size_t i = 0; i < impl.sort_columns_size; ++i)
{ {
const auto * left_column = impl.sort_columns[i]; const auto * left_column = impl.sort_columns[i];
const auto * right_column = rhs.impl.sort_columns[i]; const auto * right_column = rhs.impl.sort_columns[i];
res = nullableCompareAt<has_nulls>(*left_column, *right_column, lhs_pos, rhs_pos); int res = nullableCompareAt<left_nulls, right_nulls>(*left_column, *right_column, impl.pos, rhs.impl.pos);
if (res) if (res)
break; return res;
} }
return res; return 0;
} }
/// Expects !atEnd()
size_t getEqualLength() size_t getEqualLength()
{ {
if (atEnd()) size_t pos = impl.pos + 1;
return 0; for (; pos < impl.rows; ++pos)
if (!samePrev(pos))
size_t pos = impl.pos; break;
while (sameNext(pos)) return pos - impl.pos;
++pos;
return pos - impl.pos + 1;
} }
bool sameNext(size_t lhs_pos) const /// Expects lhs_pos > 0
bool ALWAYS_INLINE samePrev(size_t lhs_pos) const
{ {
if (lhs_pos + 1 >= impl.rows)
return false;
for (size_t i = 0; i < impl.sort_columns_size; ++i) for (size_t i = 0; i < impl.sort_columns_size; ++i)
if (impl.sort_columns[i]->compareAt(lhs_pos, lhs_pos + 1, *(impl.sort_columns[i]), 1) != 0) if (impl.sort_columns[i]->compareAt(lhs_pos - 1, lhs_pos, *(impl.sort_columns[i]), 1) != 0)
return false; return false;
return true; return true;
} }
@ -359,7 +357,6 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right
, is_semi_join(table_join->strictness() == ASTTableJoin::Strictness::Semi) , is_semi_join(table_join->strictness() == ASTTableJoin::Strictness::Semi)
, is_inner(isInner(table_join->kind())) , is_inner(isInner(table_join->kind()))
, is_left(isLeft(table_join->kind())) , is_left(isLeft(table_join->kind()))
, skip_not_intersected(table_join->enablePartialMergeJoinOptimizations())
, max_joined_block_rows(table_join->maxJoinedBlockRows()) , max_joined_block_rows(table_join->maxJoinedBlockRows())
, max_rows_in_right_block(table_join->maxRowsInRightBlock()) , max_rows_in_right_block(table_join->maxRowsInRightBlock())
, max_files_to_merge(table_join->maxFilesToMerge()) , max_files_to_merge(table_join->maxFilesToMerge())
@ -407,6 +404,11 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right
makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description); makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description);
makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description); makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description);
/// Temporary disable 'partial_merge_join_left_table_buffer_bytes' without 'partial_merge_join_optimizations'
if (table_join->enablePartialMergeJoinOptimizations())
if (size_t max_bytes = table_join->maxBytesInLeftBuffer())
left_blocks_buffer = std::make_shared<SortedBlocksBuffer>(left_sort_description, max_bytes);
} }
void MergeJoin::setTotals(const Block & totals_block) void MergeJoin::setTotals(const Block & totals_block)
@ -499,9 +501,7 @@ bool MergeJoin::saveRightBlock(Block && block)
bool has_memory = size_limits.softCheck(right_blocks.row_count, right_blocks.bytes); bool has_memory = size_limits.softCheck(right_blocks.row_count, right_blocks.bytes);
if (!has_memory) if (!has_memory)
{ {
disk_writer = std::make_unique<SortedBlocksWriter>(size_limits, table_join->getTemporaryVolume(), initRightTableWriter();
right_sample_block, right_sort_description, right_blocks,
max_rows_in_right_block, max_files_to_merge, table_join->temporaryFilesCodec());
is_in_memory = false; is_in_memory = false;
} }
} }
@ -521,11 +521,23 @@ bool MergeJoin::addJoinedBlock(const Block & src_block, bool)
void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed) void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{ {
JoinCommon::checkTypesOfKeys(block, table_join->keyNamesLeft(), right_table_keys, table_join->keyNamesRight()); if (block)
materializeBlockInplace(block); {
JoinCommon::removeLowCardinalityInplace(block); JoinCommon::checkTypesOfKeys(block, table_join->keyNamesLeft(), right_table_keys, table_join->keyNamesRight());
materializeBlockInplace(block);
JoinCommon::removeLowCardinalityInplace(block);
sortBlock(block, left_sort_description);
}
if (!not_processed && left_blocks_buffer)
{
if (!block || block.rows())
block = left_blocks_buffer->exchange(std::move(block));
if (!block)
return;
}
sortBlock(block, left_sort_description);
if (is_in_memory) if (is_in_memory)
{ {
if (is_all_join) if (is_all_join)
@ -540,12 +552,16 @@ void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
else else
joinSortedBlock<false, false>(block, not_processed); joinSortedBlock<false, false>(block, not_processed);
} }
/// Back thread even with no data. We have some unfinished data in buffer.
if (!not_processed && left_blocks_buffer)
not_processed = std::make_shared<NotProcessed>(NotProcessed{{}, 0, 0, 0});
} }
template <bool in_memory, bool is_all> template <bool in_memory, bool is_all>
void MergeJoin::joinSortedBlock(Block & block, ExtraBlockPtr & not_processed) void MergeJoin::joinSortedBlock(Block & block, ExtraBlockPtr & not_processed)
{ {
std::shared_lock lock(rwlock); //std::shared_lock lock(rwlock);
size_t rows_to_reserve = is_left ? block.rows() : 0; size_t rows_to_reserve = is_left ? block.rows() : 0;
MutableColumns left_columns = makeMutableColumns(block, (is_all ? rows_to_reserve : 0)); MutableColumns left_columns = makeMutableColumns(block, (is_all ? rows_to_reserve : 0));
@ -829,4 +845,13 @@ std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos)
return loaded_right_blocks[pos]; return loaded_right_blocks[pos];
} }
void MergeJoin::initRightTableWriter()
{
disk_writer = std::make_unique<SortedBlocksWriter>(size_limits, table_join->getTemporaryVolume(),
right_sample_block, right_sort_description, max_rows_in_right_block, max_files_to_merge,
table_join->temporaryFilesCodec());
disk_writer->addBlocks(right_blocks);
right_blocks.clear();
}
} }

View File

@ -16,6 +16,7 @@ class TableJoin;
class MergeJoinCursor; class MergeJoinCursor;
struct MergeJoinEqualRange; struct MergeJoinEqualRange;
class MergeJoin : public IJoin class MergeJoin : public IJoin
{ {
public: public:
@ -58,6 +59,7 @@ private:
Block right_columns_to_add; Block right_columns_to_add;
SortedBlocksWriter::Blocks right_blocks; SortedBlocksWriter::Blocks right_blocks;
Blocks min_max_right_blocks; Blocks min_max_right_blocks;
std::shared_ptr<SortedBlocksBuffer> left_blocks_buffer;
std::unique_ptr<Cache> cached_right_blocks; std::unique_ptr<Cache> cached_right_blocks;
std::vector<std::shared_ptr<Block>> loaded_right_blocks; std::vector<std::shared_ptr<Block>> loaded_right_blocks;
std::unique_ptr<SortedBlocksWriter> disk_writer; std::unique_ptr<SortedBlocksWriter> disk_writer;
@ -70,7 +72,7 @@ private:
const bool is_semi_join; const bool is_semi_join;
const bool is_inner; const bool is_inner;
const bool is_left; const bool is_left;
const bool skip_not_intersected; static constexpr const bool skip_not_intersected = true; /// skip index for right blocks
const size_t max_joined_block_rows; const size_t max_joined_block_rows;
const size_t max_rows_in_right_block; const size_t max_rows_in_right_block;
const size_t max_files_to_merge; const size_t max_files_to_merge;
@ -103,6 +105,8 @@ private:
void mergeInMemoryRightBlocks(); void mergeInMemoryRightBlocks();
void mergeFlushedRightBlocks(); void mergeFlushedRightBlocks();
void initRightTableWriter();
}; };
} }

View File

@ -136,18 +136,25 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
return flushToFile(path, sample_block, sorted_input, codec); return flushToFile(path, sample_block, sorted_input, codec);
} }
SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<void(const Block &)> callback) SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
{ {
SortedFiles files;
BlocksList blocks;
/// wait other flushes if any /// wait other flushes if any
{ {
std::unique_lock lock{insert_mutex}; std::unique_lock lock{insert_mutex};
files.swap(sorted_files);
blocks.swap(inserted_blocks.blocks);
inserted_blocks.clear();
flush_condvar.wait(lock, [&]{ return !flush_inflight; }); flush_condvar.wait(lock, [&]{ return !flush_inflight; });
} }
/// flush not flushed /// flush not flushed
if (!inserted_blocks.empty()) if (!blocks.empty())
sorted_files.emplace_back(flush(inserted_blocks.blocks)); files.emplace_back(flush(blocks));
inserted_blocks.clear();
BlockInputStreams inputs; BlockInputStreams inputs;
inputs.reserve(num_files_for_merge); inputs.reserve(num_files_for_merge);
@ -155,15 +162,15 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
/// Merge by parts to save memory. It's possible to exchange disk I/O and memory by num_files_for_merge. /// Merge by parts to save memory. It's possible to exchange disk I/O and memory by num_files_for_merge.
{ {
SortedFiles new_files; SortedFiles new_files;
new_files.reserve(sorted_files.size() / num_files_for_merge + 1); new_files.reserve(files.size() / num_files_for_merge + 1);
while (sorted_files.size() > num_files_for_merge) while (files.size() > num_files_for_merge)
{ {
for (const auto & file : sorted_files) for (const auto & file : files)
{ {
inputs.emplace_back(streamFromFile(file)); inputs.emplace_back(streamFromFile(file));
if (inputs.size() == num_files_for_merge || &file == &sorted_files.back()) if (inputs.size() == num_files_for_merge || &file == &files.back())
{ {
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block); MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
new_files.emplace_back(flushToFile(getPath(), sample_block, sorted_input, codec)); new_files.emplace_back(flushToFile(getPath(), sample_block, sorted_input, codec));
@ -171,19 +178,22 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
} }
} }
sorted_files.clear(); files.clear();
sorted_files.swap(new_files); files.swap(new_files);
} }
for (const auto & file : sorted_files) for (const auto & file : files)
inputs.emplace_back(streamFromFile(file)); inputs.emplace_back(streamFromFile(file));
} }
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block); return PremergedFiles{std::move(files), std::move(inputs)};
}
SortedFiles out = flushToManyFiles(getPath(), sample_block, sorted_input, codec, callback); SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<void(const Block &)> callback)
sorted_files.clear(); {
return out; /// There're also inserted_blocks counters as indirect output PremergedFiles files = premerge();
MergingSortedBlockInputStream sorted_input(files.streams, sort_description, rows_in_block);
return flushToManyFiles(getPath(), sample_block, sorted_input, codec, callback);
} }
BlockInputStreamPtr SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const BlockInputStreamPtr SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
@ -196,4 +206,87 @@ String SortedBlocksWriter::getPath() const
return volume->getNextDisk()->getPath(); return volume->getNextDisk()->getPath();
} }
Block SortedBlocksBuffer::exchange(Block && block)
{
static constexpr const float reserve_coef = 1.2;
Blocks out_blocks;
Block empty_out = block.cloneEmpty();
{
std::lock_guard lock(mutex);
if (block)
{
current_bytes += block.bytes();
buffer.emplace_back(std::move(block));
/// Saved. Return empty block with same structure.
if (current_bytes < max_bytes)
return empty_out;
}
/// Not saved. Return buffered.
out_blocks.swap(buffer);
buffer.reserve(out_blocks.size() * reserve_coef);
current_bytes = 0;
}
if (size_t size = out_blocks.size())
{
if (size == 1)
return out_blocks[0];
return mergeBlocks(std::move(out_blocks));
}
return {};
}
Block SortedBlocksBuffer::mergeBlocks(Blocks && blocks) const
{
size_t num_rows = 0;
{ /// Merge sort blocks
BlockInputStreams inputs;
inputs.reserve(blocks.size());
for (auto & block : blocks)
{
num_rows += block.rows();
inputs.emplace_back(std::make_shared<OneBlockInputStream>(block));
}
Blocks tmp_blocks;
MergingSortedBlockInputStream stream(inputs, sort_description, num_rows);
while (const auto & block = stream.read())
tmp_blocks.emplace_back(block);
blocks.swap(tmp_blocks);
}
if (blocks.size() == 1)
return blocks[0];
Block out = blocks[0].cloneEmpty();
{ /// Concatenate blocks
MutableColumns columns = out.mutateColumns();
for (size_t i = 0; i < columns.size(); ++i)
{
columns[i]->reserve(num_rows);
for (const auto & block : blocks)
{
const auto & tmp_column = *block.getByPosition(i).column;
columns[i]->insertRangeFrom(tmp_column, 0, block.rows());
}
}
out.setColumns(std::move(columns));
}
return out;
}
} }

View File

@ -52,15 +52,21 @@ struct SortedBlocksWriter
} }
}; };
struct PremergedFiles
{
SortedFiles files;
BlockInputStreams streams;
};
static constexpr const size_t num_streams = 2; static constexpr const size_t num_streams = 2;
std::mutex insert_mutex; std::mutex insert_mutex;
std::condition_variable flush_condvar; std::condition_variable flush_condvar;
const SizeLimits & size_limits; const SizeLimits & size_limits;
VolumeJBODPtr volume; VolumeJBODPtr volume;
const Block & sample_block; Block sample_block;
const SortDescription & sort_description; const SortDescription & sort_description;
Blocks & inserted_blocks; Blocks inserted_blocks;
const size_t rows_in_block; const size_t rows_in_block;
const size_t num_files_for_merge; const size_t num_files_for_merge;
const String & codec; const String & codec;
@ -70,19 +76,20 @@ struct SortedBlocksWriter
size_t flush_number = 0; size_t flush_number = 0;
size_t flush_inflight = 0; size_t flush_inflight = 0;
SortedBlocksWriter(const SizeLimits & size_limits_, VolumeJBODPtr volume_, const Block & sample_block_, const SortDescription & description, SortedBlocksWriter(const SizeLimits & size_limits_, VolumeJBODPtr volume_, const Block & sample_block_,
Blocks & blocks, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_) const SortDescription & description, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_)
: size_limits(size_limits_) : size_limits(size_limits_)
, volume(volume_) , volume(volume_)
, sample_block(sample_block_) , sample_block(sample_block_)
, sort_description(description) , sort_description(description)
, inserted_blocks(blocks)
, rows_in_block(rows_in_block_) , rows_in_block(rows_in_block_)
, num_files_for_merge(num_files_to_merge_) , num_files_for_merge(num_files_to_merge_)
, codec(codec_) , codec(codec_)
{}
void addBlocks(const Blocks & blocks)
{ {
sorted_files.emplace_back(flush(inserted_blocks.blocks)); sorted_files.emplace_back(flush(blocks.blocks));
inserted_blocks.clear();
} }
String getPath() const; String getPath() const;
@ -90,7 +97,30 @@ struct SortedBlocksWriter
void insert(Block && block); void insert(Block && block);
TmpFilePtr flush(const BlocksList & blocks) const; TmpFilePtr flush(const BlocksList & blocks) const;
PremergedFiles premerge();
SortedFiles finishMerge(std::function<void(const Block &)> callback = [](const Block &){}); SortedFiles finishMerge(std::function<void(const Block &)> callback = [](const Block &){});
}; };
class SortedBlocksBuffer
{
public:
SortedBlocksBuffer(const SortDescription & sort_description_, size_t max_bytes_)
: max_bytes(max_bytes_)
, current_bytes(0)
, sort_description(sort_description_)
{}
Block exchange(Block && block);
private:
std::mutex mutex;
size_t max_bytes;
size_t current_bytes;
Blocks buffer;
const SortDescription & sort_description;
Block mergeBlocks(Blocks &&) const;
};
} }

View File

@ -21,6 +21,7 @@ TableJoin::TableJoin(const Settings & settings, VolumeJBODPtr tmp_volume_)
, join_algorithm(settings.join_algorithm) , join_algorithm(settings.join_algorithm)
, partial_merge_join_optimizations(settings.partial_merge_join_optimizations) , partial_merge_join_optimizations(settings.partial_merge_join_optimizations)
, partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks) , partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks)
, partial_merge_join_left_table_buffer_bytes(settings.partial_merge_join_left_table_buffer_bytes)
, max_files_to_merge(settings.join_on_disk_max_files_to_merge) , max_files_to_merge(settings.join_on_disk_max_files_to_merge)
, temporary_files_codec(settings.temporary_files_codec) , temporary_files_codec(settings.temporary_files_codec)
, tmp_volume(tmp_volume_) , tmp_volume(tmp_volume_)

View File

@ -50,6 +50,7 @@ class TableJoin
JoinAlgorithm join_algorithm = JoinAlgorithm::AUTO; JoinAlgorithm join_algorithm = JoinAlgorithm::AUTO;
const bool partial_merge_join_optimizations = false; const bool partial_merge_join_optimizations = false;
const size_t partial_merge_join_rows_in_right_blocks = 0; const size_t partial_merge_join_rows_in_right_blocks = 0;
const size_t partial_merge_join_left_table_buffer_bytes = 0;
const size_t max_files_to_merge = 0; const size_t max_files_to_merge = 0;
const String temporary_files_codec = "LZ4"; const String temporary_files_codec = "LZ4";
@ -108,6 +109,7 @@ public:
size_t defaultMaxBytes() const { return default_max_bytes; } size_t defaultMaxBytes() const { return default_max_bytes; }
size_t maxJoinedBlockRows() const { return max_joined_block_rows; } size_t maxJoinedBlockRows() const { return max_joined_block_rows; }
size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; } size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; }
size_t maxBytesInLeftBuffer() const { return partial_merge_join_left_table_buffer_bytes; }
size_t maxFilesToMerge() const { return max_files_to_merge; } size_t maxFilesToMerge() const { return max_files_to_merge; }
const String & temporaryFilesCodec() const { return temporary_files_codec; } const String & temporaryFilesCodec() const { return temporary_files_codec; }
bool enablePartialMergeJoinOptimizations() const { return partial_merge_join_optimizations; } bool enablePartialMergeJoinOptimizations() const { return partial_merge_join_optimizations; }

View File

@ -52,12 +52,23 @@ void InflatingExpressionTransform::transform(Chunk & chunk)
Block InflatingExpressionTransform::readExecute(Chunk & chunk) Block InflatingExpressionTransform::readExecute(Chunk & chunk)
{ {
Block res; Block res;
if (likely(!not_processed))
if (!not_processed)
{ {
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); if (chunk.hasColumns())
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
if (res) if (res)
expression->execute(res, not_processed, action_number); expression->execute(res, not_processed, action_number);
} }
else if (not_processed->empty()) /// There's not processed data inside expression.
{
if (chunk.hasColumns())
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
not_processed.reset();
expression->execute(res, not_processed, action_number);
}
else else
{ {
res = std::move(not_processed->block); res = std::move(not_processed->block);

View File

@ -1,9 +1,6 @@
<test> <test>
<create_query>CREATE TABLE ints (i64 Int64, i32 Int32, i16 Int16, i8 Int8) ENGINE = Memory</create_query> <create_query>CREATE TABLE ints (i64 Int64, i32 Int32, i16 Int16, i8 Int8) ENGINE = Memory</create_query>
<create_query>SET partial_merge_join = 1</create_query> <create_query>SET join_algorithm = 'partial_merge'</create_query>
<fill_query>INSERT INTO ints SELECT number AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000)</fill_query> <fill_query>INSERT INTO ints SELECT number AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000)</fill_query>
<fill_query>INSERT INTO ints SELECT 10000 + number % 1000 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000)</fill_query> <fill_query>INSERT INTO ints SELECT 10000 + number % 1000 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000)</fill_query>
@ -25,6 +22,21 @@
<query tag='LEFT KEY'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042</query> <query tag='LEFT KEY'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042</query>
<query tag='LEFT ON'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042</query> <query tag='LEFT ON'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042</query>
<query tag='LEFT IN'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042)</query> <query tag='LEFT IN'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042)</query>
<query tag='ANY LEFT (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='ANY LEFT KEY (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='ANY LEFT ON (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='ANY LEFT IN (noopt)'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042) SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER KEY (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER ON (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='INNER IN (noopt)'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042) SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT KEY (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT ON (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0</query>
<query tag='LEFT IN (noopt)'>SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042) SETTINGS partial_merge_join_optimizations = 0</query>
<drop_query>DROP TABLE IF EXISTS ints</drop_query> <drop_query>DROP TABLE IF EXISTS ints</drop_query>
</test> </test>

View File

@ -8,7 +8,7 @@ CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
INSERT INTO t1 (x, y) VALUES (0, 0); INSERT INTO t1 (x, y) VALUES (0, 0);
SET partial_merge_join = 1; SET join_algorithm = 'prefer_partial_merge';
SET any_join_distinct_right_table_keys = 1; SET any_join_distinct_right_table_keys = 1;
SELECT 't join none using'; SELECT 't join none using';

View File

@ -1,4 +1,4 @@
set partial_merge_join = 1; SET join_algorithm = 'partial_merge';
select s1.x, s2.x from (select 1 as x) s1 left join (select 1 as x) s2 using x; select s1.x, s2.x from (select 1 as x) s1 left join (select 1 as x) s2 using x;
select * from (select materialize(2) as x) s1 left join (select 2 as x) s2 using x; select * from (select materialize(2) as x) s1 left join (select 2 as x) s2 using x;

View File

@ -1,7 +1,7 @@
DROP TABLE IF EXISTS ints; DROP TABLE IF EXISTS ints;
CREATE TABLE ints (i64 Int64, i32 Int32) ENGINE = Memory; CREATE TABLE ints (i64 Int64, i32 Int32) ENGINE = Memory;
SET partial_merge_join = 1; SET join_algorithm = 'partial_merge';
INSERT INTO ints SELECT 1 AS i64, number AS i32 FROM numbers(2); INSERT INTO ints SELECT 1 AS i64, number AS i32 FROM numbers(2);

View File

@ -6,7 +6,7 @@ CREATE TABLE t0 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t1 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y); CREATE TABLE t1 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y); CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
SET partial_merge_join = 1; SET join_algorithm = 'prefer_partial_merge';
SET partial_merge_join_rows_in_right_blocks = 1; SET partial_merge_join_rows_in_right_blocks = 1;
SET any_join_distinct_right_table_keys = 1; SET any_join_distinct_right_table_keys = 1;

View File

@ -8,7 +8,7 @@ ANY LEFT JOIN (
) js2 ) js2
USING n; -- { serverError 241 } USING n; -- { serverError 241 }
SET partial_merge_join = 1; SET join_algorithm = 'partial_merge';
SET default_max_bytes_in_join = 0; SET default_max_bytes_in_join = 0;
SELECT number * 200000 as n, j FROM numbers(5) nums SELECT number * 200000 as n, j FROM numbers(5) nums

View File

@ -6,7 +6,7 @@ CREATE TABLE t0 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t1 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y); CREATE TABLE t1 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y); CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
SET partial_merge_join = 1; SET join_algorithm = 'prefer_partial_merge';
SET partial_merge_join_optimizations = 1; SET partial_merge_join_optimizations = 1;
SET any_join_distinct_right_table_keys = 1; SET any_join_distinct_right_table_keys = 1;

View File

@ -7,7 +7,7 @@ CREATE TABLE t2 (x UInt32, s String) engine = Memory;
INSERT INTO t1 (x, s) VALUES (0, 'a1'), (1, 'a2'), (2, 'a3'), (3, 'a4'), (4, 'a5'); INSERT INTO t1 (x, s) VALUES (0, 'a1'), (1, 'a2'), (2, 'a3'), (3, 'a4'), (4, 'a5');
INSERT INTO t2 (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6'); INSERT INTO t2 (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6');
SET partial_merge_join = 1; SET join_algorithm = 'prefer_partial_merge';
SET join_use_nulls = 0; SET join_use_nulls = 0;
SET any_join_distinct_right_table_keys = 0; SET any_join_distinct_right_table_keys = 0;

View File

@ -1,5 +1,5 @@
SET max_memory_usage = 50000000; SET max_memory_usage = 50000000;
SET partial_merge_join = 1; SET join_algorithm = 'partial_merge';
SELECT 'defaults'; SELECT 'defaults';

View File

@ -1,5 +1,5 @@
SET max_memory_usage = 50000000; SET max_memory_usage = 50000000;
SET partial_merge_join = 1; SET join_algorithm = 'partial_merge';
SELECT count(1) FROM ( SELECT count(1) FROM (
SELECT t2.n FROM numbers(10) t1 SELECT t2.n FROM numbers(10) t1

View File

@ -1,4 +1,4 @@
SET partial_merge_join = 1; SET join_algorithm = 'partial_merge';
SELECT count(1), uniqExact(1) FROM ( SELECT count(1), uniqExact(1) FROM (
SELECT materialize(1) as k FROM numbers(1) nums SELECT materialize(1) as k FROM numbers(1) nums