From 857d84bf7cdccafa2f6f06347813cab19f8e0205 Mon Sep 17 00:00:00 2001 From: Artem Zuikov Date: Tue, 16 Jun 2020 23:13:18 +0300 Subject: [PATCH] Buffer left blocks optimisation for MergeJoin (#10601) --- src/Core/Block.h | 2 + src/Core/Settings.h | 1 + .../ExpressionBlockInputStream.cpp | 8 +- src/Interpreters/ExpressionAnalyzer.cpp | 11 +- src/Interpreters/MergeJoin.cpp | 143 ++++++++++-------- src/Interpreters/MergeJoin.h | 6 +- src/Interpreters/SortedBlocksWriter.cpp | 123 +++++++++++++-- src/Interpreters/SortedBlocksWriter.h | 44 +++++- src/Interpreters/TableJoin.cpp | 1 + src/Interpreters/TableJoin.h | 2 + .../InflatingExpressionTransform.cpp | 15 +- tests/performance/joins_in_memory_pmj.xml | 20 ++- .../0_stateless/01010_partial_merge_join.sql | 2 +- .../01010_partial_merge_join_const_and_lc.sql | 2 +- .../01010_pm_join_all_join_bug.sql | 2 +- .../0_stateless/01010_pmj_one_row_blocks.sql | 2 +- .../01010_pmj_right_table_memory_limits.sql | 2 +- .../0_stateless/01010_pmj_skip_blocks.sql | 2 +- .../01031_pmj_new_any_semi_join.sql | 2 +- ...62_pm_all_join_with_block_continuation.sql | 2 +- .../01062_pm_multiple_all_join_same_value.sql | 2 +- .../01064_pm_all_join_const_and_nullable.sql | 2 +- 22 files changed, 294 insertions(+), 102 deletions(-) diff --git a/src/Core/Block.h b/src/Core/Block.h index ce804ddc0b5..31e3ffc14de 100644 --- a/src/Core/Block.h +++ b/src/Core/Block.h @@ -163,6 +163,8 @@ using BlocksPtrs = std::shared_ptr>; struct ExtraBlock { Block block; + + bool empty() const { return !block; } }; using ExtraBlockPtr = std::shared_ptr; diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 0693f53db9f..0fe70297981 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -289,6 +289,7 @@ struct Settings : public SettingsCollection M(SettingJoinAlgorithm, join_algorithm, JoinAlgorithm::HASH, "Specify join algorithm: 'auto', 'hash', 'partial_merge', 'prefer_partial_merge'. 'auto' tries to change HashJoin to MergeJoin on the fly to avoid out of memory.", 0) \ M(SettingBool, partial_merge_join_optimizations, true, "Enable optimizations in partial merge join", 0) \ M(SettingUInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \ + M(SettingUInt64, partial_merge_join_left_table_buffer_bytes, 32000000, "If not 0 group left table blocks in bigger ones for left-side table in partial merge join. It uses up to 2x of specified memory per joining thread. In current version work only with 'partial_merge_join_optimizations = 1'.", 0) \ M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \ M(SettingUInt64, join_on_disk_max_files_to_merge, 64, "For MergeJoin on disk set how much files it's allowed to sort simultaneously. Then this value bigger then more memory used and then less disk I/O needed. Minimum is 2.", 0) \ M(SettingString, temporary_files_codec, "LZ4", "Set compression codec for temporary files (sort and join on disk). I.e. LZ4, NONE.", 0) \ diff --git a/src/DataStreams/ExpressionBlockInputStream.cpp b/src/DataStreams/ExpressionBlockInputStream.cpp index 5a4fe4ecf1d..cce02af8262 100644 --- a/src/DataStreams/ExpressionBlockInputStream.cpp +++ b/src/DataStreams/ExpressionBlockInputStream.cpp @@ -55,10 +55,14 @@ Block InflatingExpressionBlockInputStream::readImpl() } Block res; - if (likely(!not_processed)) + bool keep_going = not_processed && not_processed->empty(); /// There's data inside expression. + + if (!not_processed || keep_going) { + not_processed.reset(); + res = children.back()->read(); - if (res) + if (res || keep_going) expression->execute(res, not_processed, action_number); } else diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index ecfa011f1c8..04abba06912 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1151,13 +1151,20 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( } } - bool has_stream_with_non_joined_rows = (before_join && before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows()); + bool join_allow_read_in_order = true; + if (before_join) + { + /// You may find it strange but we support read_in_order for HashJoin and do not support for MergeJoin. + auto join = before_join->getTableJoinAlgo(); + join_allow_read_in_order = typeid_cast(join.get()) && !join->hasStreamWithNonJoinedRows(); + } + optimize_read_in_order = settings.optimize_read_in_order && storage && query.orderBy() && !query_analyzer.hasAggregation() && !query.final() - && !has_stream_with_non_joined_rows; + && join_allow_read_in_order; /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers. query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage)); diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index eb816a96e52..bb054169e71 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -29,12 +29,12 @@ namespace ErrorCodes namespace { -template +template int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos) { static constexpr int null_direction_hint = 1; - if constexpr (has_nulls) + if constexpr (has_left_nulls && has_right_nulls) { const auto * left_nullable = checkAndGetColumn(left_column); const auto * right_nullable = checkAndGetColumn(right_column); @@ -48,16 +48,24 @@ int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, /// NULL != NULL case if (left_column.isNullAt(lhs_pos)) return null_direction_hint; - } - if (left_nullable && !right_nullable) + return 0; + } + } + + if constexpr (has_left_nulls) + { + if (const auto * left_nullable = checkAndGetColumn(left_column)) { if (left_column.isNullAt(lhs_pos)) return null_direction_hint; return left_nullable->getNestedColumn().compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint); } + } - if (!left_nullable && right_nullable) + if constexpr (has_right_nulls) + { + if (const auto * right_nullable = checkAndGetColumn(right_column)) { if (right_column.isNullAt(rhs_pos)) return -null_direction_hint; @@ -65,7 +73,6 @@ int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, } } - /// !left_nullable && !right_nullable return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint); } @@ -118,26 +125,25 @@ public: void setCompareNullability(const MergeJoinCursor & rhs) { - has_nullable_columns = false; + has_left_nullable = false; + has_right_nullable = false; for (size_t i = 0; i < impl.sort_columns_size; ++i) { - bool is_left_nullable = isColumnNullable(*impl.sort_columns[i]); - bool is_right_nullable = isColumnNullable(*rhs.impl.sort_columns[i]); - - if (is_left_nullable || is_right_nullable) - { - has_nullable_columns = true; - break; - } + has_left_nullable = has_left_nullable || isColumnNullable(*impl.sort_columns[i]); + has_right_nullable = has_right_nullable || isColumnNullable(*rhs.impl.sort_columns[i]); } } Range getNextEqualRange(MergeJoinCursor & rhs) { - if (has_nullable_columns) - return getNextEqualRangeImpl(rhs); - return getNextEqualRangeImpl(rhs); + if (has_left_nullable && has_right_nullable) + return getNextEqualRangeImpl(rhs); + else if (has_left_nullable) + return getNextEqualRangeImpl(rhs); + else if (has_right_nullable) + return getNextEqualRangeImpl(rhs); + return getNextEqualRangeImpl(rhs); } int intersect(const Block & min_max, const Names & key_names) @@ -149,16 +155,16 @@ public: int first_vs_max = 0; int last_vs_min = 0; - for (size_t i = 0; i < impl.sort_columns.size(); ++i) + for (size_t i = 0; i < impl.sort_columns_size; ++i) { const auto & left_column = *impl.sort_columns[i]; const auto & right_column = *min_max.getByName(key_names[i]).column; /// cannot get by position cause of possible duplicates if (!first_vs_max) - first_vs_max = nullableCompareAt(left_column, right_column, position(), 1); + first_vs_max = nullableCompareAt(left_column, right_column, position(), 1); if (!last_vs_min) - last_vs_min = nullableCompareAt(left_column, right_column, last_position, 0); + last_vs_min = nullableCompareAt(left_column, right_column, last_position, 0); } if (first_vs_max > 0) @@ -170,64 +176,56 @@ public: private: SortCursorImpl impl; - bool has_nullable_columns = false; + bool has_left_nullable = false; + bool has_right_nullable = false; - template + template Range getNextEqualRangeImpl(MergeJoinCursor & rhs) { while (!atEnd() && !rhs.atEnd()) { - int cmp = compareAt(rhs, impl.pos, rhs.impl.pos); + int cmp = compareAtCursor(rhs); if (cmp < 0) impl.next(); - if (cmp > 0) + else if (cmp > 0) rhs.impl.next(); - if (!cmp) - { - Range range{impl.pos, rhs.impl.pos, 0, 0}; - range.left_length = getEqualLength(); - range.right_length = rhs.getEqualLength(); - return range; - } + else if (!cmp) + return Range{impl.pos, rhs.impl.pos, getEqualLength(), rhs.getEqualLength()}; } return Range{impl.pos, rhs.impl.pos, 0, 0}; } - template - int compareAt(const MergeJoinCursor & rhs, size_t lhs_pos, size_t rhs_pos) const + template + int ALWAYS_INLINE compareAtCursor(const MergeJoinCursor & rhs) const { - int res = 0; for (size_t i = 0; i < impl.sort_columns_size; ++i) { const auto * left_column = impl.sort_columns[i]; const auto * right_column = rhs.impl.sort_columns[i]; - res = nullableCompareAt(*left_column, *right_column, lhs_pos, rhs_pos); + int res = nullableCompareAt(*left_column, *right_column, impl.pos, rhs.impl.pos); if (res) - break; + return res; } - return res; + return 0; } + /// Expects !atEnd() size_t getEqualLength() { - if (atEnd()) - return 0; - - size_t pos = impl.pos; - while (sameNext(pos)) - ++pos; - return pos - impl.pos + 1; + size_t pos = impl.pos + 1; + for (; pos < impl.rows; ++pos) + if (!samePrev(pos)) + break; + return pos - impl.pos; } - bool sameNext(size_t lhs_pos) const + /// Expects lhs_pos > 0 + bool ALWAYS_INLINE samePrev(size_t lhs_pos) const { - if (lhs_pos + 1 >= impl.rows) - return false; - for (size_t i = 0; i < impl.sort_columns_size; ++i) - if (impl.sort_columns[i]->compareAt(lhs_pos, lhs_pos + 1, *(impl.sort_columns[i]), 1) != 0) + if (impl.sort_columns[i]->compareAt(lhs_pos - 1, lhs_pos, *(impl.sort_columns[i]), 1) != 0) return false; return true; } @@ -359,7 +357,6 @@ MergeJoin::MergeJoin(std::shared_ptr table_join_, const Block & right , is_semi_join(table_join->strictness() == ASTTableJoin::Strictness::Semi) , is_inner(isInner(table_join->kind())) , is_left(isLeft(table_join->kind())) - , skip_not_intersected(table_join->enablePartialMergeJoinOptimizations()) , max_joined_block_rows(table_join->maxJoinedBlockRows()) , max_rows_in_right_block(table_join->maxRowsInRightBlock()) , max_files_to_merge(table_join->maxFilesToMerge()) @@ -407,6 +404,11 @@ MergeJoin::MergeJoin(std::shared_ptr table_join_, const Block & right makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description); makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description); + + /// Temporary disable 'partial_merge_join_left_table_buffer_bytes' without 'partial_merge_join_optimizations' + if (table_join->enablePartialMergeJoinOptimizations()) + if (size_t max_bytes = table_join->maxBytesInLeftBuffer()) + left_blocks_buffer = std::make_shared(left_sort_description, max_bytes); } void MergeJoin::setTotals(const Block & totals_block) @@ -499,9 +501,7 @@ bool MergeJoin::saveRightBlock(Block && block) bool has_memory = size_limits.softCheck(right_blocks.row_count, right_blocks.bytes); if (!has_memory) { - disk_writer = std::make_unique(size_limits, table_join->getTemporaryVolume(), - right_sample_block, right_sort_description, right_blocks, - max_rows_in_right_block, max_files_to_merge, table_join->temporaryFilesCodec()); + initRightTableWriter(); is_in_memory = false; } } @@ -521,11 +521,23 @@ bool MergeJoin::addJoinedBlock(const Block & src_block, bool) void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed) { - JoinCommon::checkTypesOfKeys(block, table_join->keyNamesLeft(), right_table_keys, table_join->keyNamesRight()); - materializeBlockInplace(block); - JoinCommon::removeLowCardinalityInplace(block); + if (block) + { + JoinCommon::checkTypesOfKeys(block, table_join->keyNamesLeft(), right_table_keys, table_join->keyNamesRight()); + materializeBlockInplace(block); + JoinCommon::removeLowCardinalityInplace(block); + + sortBlock(block, left_sort_description); + } + + if (!not_processed && left_blocks_buffer) + { + if (!block || block.rows()) + block = left_blocks_buffer->exchange(std::move(block)); + if (!block) + return; + } - sortBlock(block, left_sort_description); if (is_in_memory) { if (is_all_join) @@ -540,12 +552,16 @@ void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed) else joinSortedBlock(block, not_processed); } + + /// Back thread even with no data. We have some unfinished data in buffer. + if (!not_processed && left_blocks_buffer) + not_processed = std::make_shared(NotProcessed{{}, 0, 0, 0}); } template void MergeJoin::joinSortedBlock(Block & block, ExtraBlockPtr & not_processed) { - std::shared_lock lock(rwlock); + //std::shared_lock lock(rwlock); size_t rows_to_reserve = is_left ? block.rows() : 0; MutableColumns left_columns = makeMutableColumns(block, (is_all ? rows_to_reserve : 0)); @@ -829,4 +845,13 @@ std::shared_ptr MergeJoin::loadRightBlock(size_t pos) return loaded_right_blocks[pos]; } +void MergeJoin::initRightTableWriter() +{ + disk_writer = std::make_unique(size_limits, table_join->getTemporaryVolume(), + right_sample_block, right_sort_description, max_rows_in_right_block, max_files_to_merge, + table_join->temporaryFilesCodec()); + disk_writer->addBlocks(right_blocks); + right_blocks.clear(); +} + } diff --git a/src/Interpreters/MergeJoin.h b/src/Interpreters/MergeJoin.h index 479b39b1b9a..0bd2bcbd127 100644 --- a/src/Interpreters/MergeJoin.h +++ b/src/Interpreters/MergeJoin.h @@ -16,6 +16,7 @@ class TableJoin; class MergeJoinCursor; struct MergeJoinEqualRange; + class MergeJoin : public IJoin { public: @@ -58,6 +59,7 @@ private: Block right_columns_to_add; SortedBlocksWriter::Blocks right_blocks; Blocks min_max_right_blocks; + std::shared_ptr left_blocks_buffer; std::unique_ptr cached_right_blocks; std::vector> loaded_right_blocks; std::unique_ptr disk_writer; @@ -70,7 +72,7 @@ private: const bool is_semi_join; const bool is_inner; const bool is_left; - const bool skip_not_intersected; + static constexpr const bool skip_not_intersected = true; /// skip index for right blocks const size_t max_joined_block_rows; const size_t max_rows_in_right_block; const size_t max_files_to_merge; @@ -103,6 +105,8 @@ private: void mergeInMemoryRightBlocks(); void mergeFlushedRightBlocks(); + + void initRightTableWriter(); }; } diff --git a/src/Interpreters/SortedBlocksWriter.cpp b/src/Interpreters/SortedBlocksWriter.cpp index 385c67292ad..3fa6fbe153e 100644 --- a/src/Interpreters/SortedBlocksWriter.cpp +++ b/src/Interpreters/SortedBlocksWriter.cpp @@ -136,18 +136,25 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc return flushToFile(path, sample_block, sorted_input, codec); } -SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function callback) +SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge() { + SortedFiles files; + BlocksList blocks; + /// wait other flushes if any { std::unique_lock lock{insert_mutex}; + + files.swap(sorted_files); + blocks.swap(inserted_blocks.blocks); + inserted_blocks.clear(); + flush_condvar.wait(lock, [&]{ return !flush_inflight; }); } /// flush not flushed - if (!inserted_blocks.empty()) - sorted_files.emplace_back(flush(inserted_blocks.blocks)); - inserted_blocks.clear(); + if (!blocks.empty()) + files.emplace_back(flush(blocks)); BlockInputStreams inputs; inputs.reserve(num_files_for_merge); @@ -155,15 +162,15 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function num_files_for_merge) + while (files.size() > num_files_for_merge) { - for (const auto & file : sorted_files) + for (const auto & file : files) { inputs.emplace_back(streamFromFile(file)); - if (inputs.size() == num_files_for_merge || &file == &sorted_files.back()) + if (inputs.size() == num_files_for_merge || &file == &files.back()) { MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block); new_files.emplace_back(flushToFile(getPath(), sample_block, sorted_input, codec)); @@ -171,19 +178,22 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function callback) +{ + PremergedFiles files = premerge(); + MergingSortedBlockInputStream sorted_input(files.streams, sort_description, rows_in_block); + return flushToManyFiles(getPath(), sample_block, sorted_input, codec, callback); } BlockInputStreamPtr SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const @@ -196,4 +206,87 @@ String SortedBlocksWriter::getPath() const return volume->getNextDisk()->getPath(); } + +Block SortedBlocksBuffer::exchange(Block && block) +{ + static constexpr const float reserve_coef = 1.2; + + Blocks out_blocks; + Block empty_out = block.cloneEmpty(); + + { + std::lock_guard lock(mutex); + + if (block) + { + current_bytes += block.bytes(); + buffer.emplace_back(std::move(block)); + + /// Saved. Return empty block with same structure. + if (current_bytes < max_bytes) + return empty_out; + } + + /// Not saved. Return buffered. + out_blocks.swap(buffer); + buffer.reserve(out_blocks.size() * reserve_coef); + current_bytes = 0; + } + + if (size_t size = out_blocks.size()) + { + if (size == 1) + return out_blocks[0]; + return mergeBlocks(std::move(out_blocks)); + } + + return {}; +} + +Block SortedBlocksBuffer::mergeBlocks(Blocks && blocks) const +{ + size_t num_rows = 0; + + { /// Merge sort blocks + BlockInputStreams inputs; + inputs.reserve(blocks.size()); + + for (auto & block : blocks) + { + num_rows += block.rows(); + inputs.emplace_back(std::make_shared(block)); + } + + Blocks tmp_blocks; + MergingSortedBlockInputStream stream(inputs, sort_description, num_rows); + while (const auto & block = stream.read()) + tmp_blocks.emplace_back(block); + + blocks.swap(tmp_blocks); + } + + if (blocks.size() == 1) + return blocks[0]; + + Block out = blocks[0].cloneEmpty(); + + { /// Concatenate blocks + MutableColumns columns = out.mutateColumns(); + + for (size_t i = 0; i < columns.size(); ++i) + { + columns[i]->reserve(num_rows); + for (const auto & block : blocks) + { + const auto & tmp_column = *block.getByPosition(i).column; + columns[i]->insertRangeFrom(tmp_column, 0, block.rows()); + } + } + + out.setColumns(std::move(columns)); + } + + return out; +} + } diff --git a/src/Interpreters/SortedBlocksWriter.h b/src/Interpreters/SortedBlocksWriter.h index 073c37c4ff9..11e7c8c1413 100644 --- a/src/Interpreters/SortedBlocksWriter.h +++ b/src/Interpreters/SortedBlocksWriter.h @@ -52,15 +52,21 @@ struct SortedBlocksWriter } }; + struct PremergedFiles + { + SortedFiles files; + BlockInputStreams streams; + }; + static constexpr const size_t num_streams = 2; std::mutex insert_mutex; std::condition_variable flush_condvar; const SizeLimits & size_limits; VolumeJBODPtr volume; - const Block & sample_block; + Block sample_block; const SortDescription & sort_description; - Blocks & inserted_blocks; + Blocks inserted_blocks; const size_t rows_in_block; const size_t num_files_for_merge; const String & codec; @@ -70,19 +76,20 @@ struct SortedBlocksWriter size_t flush_number = 0; size_t flush_inflight = 0; - SortedBlocksWriter(const SizeLimits & size_limits_, VolumeJBODPtr volume_, const Block & sample_block_, const SortDescription & description, - Blocks & blocks, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_) + SortedBlocksWriter(const SizeLimits & size_limits_, VolumeJBODPtr volume_, const Block & sample_block_, + const SortDescription & description, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_) : size_limits(size_limits_) , volume(volume_) , sample_block(sample_block_) , sort_description(description) - , inserted_blocks(blocks) , rows_in_block(rows_in_block_) , num_files_for_merge(num_files_to_merge_) , codec(codec_) + {} + + void addBlocks(const Blocks & blocks) { - sorted_files.emplace_back(flush(inserted_blocks.blocks)); - inserted_blocks.clear(); + sorted_files.emplace_back(flush(blocks.blocks)); } String getPath() const; @@ -90,7 +97,30 @@ struct SortedBlocksWriter void insert(Block && block); TmpFilePtr flush(const BlocksList & blocks) const; + PremergedFiles premerge(); SortedFiles finishMerge(std::function callback = [](const Block &){}); }; + +class SortedBlocksBuffer +{ +public: + SortedBlocksBuffer(const SortDescription & sort_description_, size_t max_bytes_) + : max_bytes(max_bytes_) + , current_bytes(0) + , sort_description(sort_description_) + {} + + Block exchange(Block && block); + +private: + std::mutex mutex; + size_t max_bytes; + size_t current_bytes; + Blocks buffer; + const SortDescription & sort_description; + + Block mergeBlocks(Blocks &&) const; +}; + } diff --git a/src/Interpreters/TableJoin.cpp b/src/Interpreters/TableJoin.cpp index aa88df2e921..b286baed2bd 100644 --- a/src/Interpreters/TableJoin.cpp +++ b/src/Interpreters/TableJoin.cpp @@ -21,6 +21,7 @@ TableJoin::TableJoin(const Settings & settings, VolumeJBODPtr tmp_volume_) , join_algorithm(settings.join_algorithm) , partial_merge_join_optimizations(settings.partial_merge_join_optimizations) , partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks) + , partial_merge_join_left_table_buffer_bytes(settings.partial_merge_join_left_table_buffer_bytes) , max_files_to_merge(settings.join_on_disk_max_files_to_merge) , temporary_files_codec(settings.temporary_files_codec) , tmp_volume(tmp_volume_) diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h index 46d2c2e457c..bb633acaa1d 100644 --- a/src/Interpreters/TableJoin.h +++ b/src/Interpreters/TableJoin.h @@ -50,6 +50,7 @@ class TableJoin JoinAlgorithm join_algorithm = JoinAlgorithm::AUTO; const bool partial_merge_join_optimizations = false; const size_t partial_merge_join_rows_in_right_blocks = 0; + const size_t partial_merge_join_left_table_buffer_bytes = 0; const size_t max_files_to_merge = 0; const String temporary_files_codec = "LZ4"; @@ -108,6 +109,7 @@ public: size_t defaultMaxBytes() const { return default_max_bytes; } size_t maxJoinedBlockRows() const { return max_joined_block_rows; } size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; } + size_t maxBytesInLeftBuffer() const { return partial_merge_join_left_table_buffer_bytes; } size_t maxFilesToMerge() const { return max_files_to_merge; } const String & temporaryFilesCodec() const { return temporary_files_codec; } bool enablePartialMergeJoinOptimizations() const { return partial_merge_join_optimizations; } diff --git a/src/Processors/Transforms/InflatingExpressionTransform.cpp b/src/Processors/Transforms/InflatingExpressionTransform.cpp index 017cfc7cf82..de4e93ef8d2 100644 --- a/src/Processors/Transforms/InflatingExpressionTransform.cpp +++ b/src/Processors/Transforms/InflatingExpressionTransform.cpp @@ -52,12 +52,23 @@ void InflatingExpressionTransform::transform(Chunk & chunk) Block InflatingExpressionTransform::readExecute(Chunk & chunk) { Block res; - if (likely(!not_processed)) + + if (!not_processed) { - res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); + if (chunk.hasColumns()) + res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); + if (res) expression->execute(res, not_processed, action_number); } + else if (not_processed->empty()) /// There's not processed data inside expression. + { + if (chunk.hasColumns()) + res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); + + not_processed.reset(); + expression->execute(res, not_processed, action_number); + } else { res = std::move(not_processed->block); diff --git a/tests/performance/joins_in_memory_pmj.xml b/tests/performance/joins_in_memory_pmj.xml index 6742943151e..054e4f2d108 100644 --- a/tests/performance/joins_in_memory_pmj.xml +++ b/tests/performance/joins_in_memory_pmj.xml @@ -1,9 +1,6 @@ - - - CREATE TABLE ints (i64 Int64, i32 Int32, i16 Int16, i8 Int8) ENGINE = Memory - SET partial_merge_join = 1 + SET join_algorithm = 'partial_merge' INSERT INTO ints SELECT number AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000) INSERT INTO ints SELECT 10000 + number % 1000 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000) @@ -25,6 +22,21 @@ SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042 SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042 SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042) + + SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0 + SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0 + SELECT COUNT() FROM ints l ANY LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0 + SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042) SETTINGS partial_merge_join_optimizations = 0 + SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0 + SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0 + SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0 + SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042) SETTINGS partial_merge_join_optimizations = 0 + + SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0 + SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0 + SELECT COUNT() FROM ints l LEFT JOIN ints r ON l.i64 = r.i64 WHERE i32 = 200042 SETTINGS partial_merge_join_optimizations = 0 + SELECT COUNT() FROM ints l LEFT JOIN ints r USING i64 WHERE i32 IN(42, 100042, 200042, 300042, 400042) SETTINGS partial_merge_join_optimizations = 0 + DROP TABLE IF EXISTS ints diff --git a/tests/queries/0_stateless/01010_partial_merge_join.sql b/tests/queries/0_stateless/01010_partial_merge_join.sql index 99bba62b48e..a978437bc68 100644 --- a/tests/queries/0_stateless/01010_partial_merge_join.sql +++ b/tests/queries/0_stateless/01010_partial_merge_join.sql @@ -8,7 +8,7 @@ CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y); INSERT INTO t1 (x, y) VALUES (0, 0); -SET partial_merge_join = 1; +SET join_algorithm = 'prefer_partial_merge'; SET any_join_distinct_right_table_keys = 1; SELECT 't join none using'; diff --git a/tests/queries/0_stateless/01010_partial_merge_join_const_and_lc.sql b/tests/queries/0_stateless/01010_partial_merge_join_const_and_lc.sql index 9f45f14a44b..9d266a0ece7 100644 --- a/tests/queries/0_stateless/01010_partial_merge_join_const_and_lc.sql +++ b/tests/queries/0_stateless/01010_partial_merge_join_const_and_lc.sql @@ -1,4 +1,4 @@ -set partial_merge_join = 1; +SET join_algorithm = 'partial_merge'; select s1.x, s2.x from (select 1 as x) s1 left join (select 1 as x) s2 using x; select * from (select materialize(2) as x) s1 left join (select 2 as x) s2 using x; diff --git a/tests/queries/0_stateless/01010_pm_join_all_join_bug.sql b/tests/queries/0_stateless/01010_pm_join_all_join_bug.sql index 0a00628d3b5..ef406108514 100644 --- a/tests/queries/0_stateless/01010_pm_join_all_join_bug.sql +++ b/tests/queries/0_stateless/01010_pm_join_all_join_bug.sql @@ -1,7 +1,7 @@ DROP TABLE IF EXISTS ints; CREATE TABLE ints (i64 Int64, i32 Int32) ENGINE = Memory; -SET partial_merge_join = 1; +SET join_algorithm = 'partial_merge'; INSERT INTO ints SELECT 1 AS i64, number AS i32 FROM numbers(2); diff --git a/tests/queries/0_stateless/01010_pmj_one_row_blocks.sql b/tests/queries/0_stateless/01010_pmj_one_row_blocks.sql index 59aa7d7d4d7..23f468294ed 100644 --- a/tests/queries/0_stateless/01010_pmj_one_row_blocks.sql +++ b/tests/queries/0_stateless/01010_pmj_one_row_blocks.sql @@ -6,7 +6,7 @@ CREATE TABLE t0 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y); CREATE TABLE t1 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y); CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y); -SET partial_merge_join = 1; +SET join_algorithm = 'prefer_partial_merge'; SET partial_merge_join_rows_in_right_blocks = 1; SET any_join_distinct_right_table_keys = 1; diff --git a/tests/queries/0_stateless/01010_pmj_right_table_memory_limits.sql b/tests/queries/0_stateless/01010_pmj_right_table_memory_limits.sql index 5646a7b091d..2c5f0ef6d99 100644 --- a/tests/queries/0_stateless/01010_pmj_right_table_memory_limits.sql +++ b/tests/queries/0_stateless/01010_pmj_right_table_memory_limits.sql @@ -8,7 +8,7 @@ ANY LEFT JOIN ( ) js2 USING n; -- { serverError 241 } -SET partial_merge_join = 1; +SET join_algorithm = 'partial_merge'; SET default_max_bytes_in_join = 0; SELECT number * 200000 as n, j FROM numbers(5) nums diff --git a/tests/queries/0_stateless/01010_pmj_skip_blocks.sql b/tests/queries/0_stateless/01010_pmj_skip_blocks.sql index cb66fe60382..7815f711d1a 100644 --- a/tests/queries/0_stateless/01010_pmj_skip_blocks.sql +++ b/tests/queries/0_stateless/01010_pmj_skip_blocks.sql @@ -6,7 +6,7 @@ CREATE TABLE t0 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y); CREATE TABLE t1 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y); CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y); -SET partial_merge_join = 1; +SET join_algorithm = 'prefer_partial_merge'; SET partial_merge_join_optimizations = 1; SET any_join_distinct_right_table_keys = 1; diff --git a/tests/queries/0_stateless/01031_pmj_new_any_semi_join.sql b/tests/queries/0_stateless/01031_pmj_new_any_semi_join.sql index 28cb7345e31..4c306828bb8 100644 --- a/tests/queries/0_stateless/01031_pmj_new_any_semi_join.sql +++ b/tests/queries/0_stateless/01031_pmj_new_any_semi_join.sql @@ -7,7 +7,7 @@ CREATE TABLE t2 (x UInt32, s String) engine = Memory; INSERT INTO t1 (x, s) VALUES (0, 'a1'), (1, 'a2'), (2, 'a3'), (3, 'a4'), (4, 'a5'); INSERT INTO t2 (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6'); -SET partial_merge_join = 1; +SET join_algorithm = 'prefer_partial_merge'; SET join_use_nulls = 0; SET any_join_distinct_right_table_keys = 0; diff --git a/tests/queries/0_stateless/01062_pm_all_join_with_block_continuation.sql b/tests/queries/0_stateless/01062_pm_all_join_with_block_continuation.sql index d264d73b0bf..15a28e92386 100644 --- a/tests/queries/0_stateless/01062_pm_all_join_with_block_continuation.sql +++ b/tests/queries/0_stateless/01062_pm_all_join_with_block_continuation.sql @@ -1,5 +1,5 @@ SET max_memory_usage = 50000000; -SET partial_merge_join = 1; +SET join_algorithm = 'partial_merge'; SELECT 'defaults'; diff --git a/tests/queries/0_stateless/01062_pm_multiple_all_join_same_value.sql b/tests/queries/0_stateless/01062_pm_multiple_all_join_same_value.sql index a2e1d95e2b6..048da47de6d 100644 --- a/tests/queries/0_stateless/01062_pm_multiple_all_join_same_value.sql +++ b/tests/queries/0_stateless/01062_pm_multiple_all_join_same_value.sql @@ -1,5 +1,5 @@ SET max_memory_usage = 50000000; -SET partial_merge_join = 1; +SET join_algorithm = 'partial_merge'; SELECT count(1) FROM ( SELECT t2.n FROM numbers(10) t1 diff --git a/tests/queries/0_stateless/01064_pm_all_join_const_and_nullable.sql b/tests/queries/0_stateless/01064_pm_all_join_const_and_nullable.sql index 6b87fd67744..10306777f25 100644 --- a/tests/queries/0_stateless/01064_pm_all_join_const_and_nullable.sql +++ b/tests/queries/0_stateless/01064_pm_all_join_const_and_nullable.sql @@ -1,4 +1,4 @@ -SET partial_merge_join = 1; +SET join_algorithm = 'partial_merge'; SELECT count(1), uniqExact(1) FROM ( SELECT materialize(1) as k FROM numbers(1) nums