From e34a9457e664fc6ea05936cd8d7a97a6759456e1 Mon Sep 17 00:00:00 2001 From: Artem Zuikov Date: Tue, 28 Apr 2020 16:55:50 +0300 Subject: [PATCH] Improve right table insert for Partial MergeJoin on disk (#10467) --- src/Core/Settings.h | 6 +- .../MergeSortingBlockInputStream.cpp | 5 +- .../MergeSortingBlockInputStream.h | 2 + src/DataStreams/TemporaryFileStream.h | 20 +- src/Interpreters/InterpreterSelectQuery.cpp | 4 +- src/Interpreters/MergeJoin.cpp | 151 +++---------- src/Interpreters/MergeJoin.h | 60 +----- src/Interpreters/SortedBlocksWriter.cpp | 199 ++++++++++++++++++ src/Interpreters/SortedBlocksWriter.h | 96 +++++++++ src/Interpreters/TableJoin.cpp | 6 +- src/Interpreters/TableJoin.h | 4 + .../01010_pmj_right_table_memory_limits.sql | 1 + 12 files changed, 368 insertions(+), 186 deletions(-) create mode 100644 src/Interpreters/SortedBlocksWriter.cpp create mode 100644 src/Interpreters/SortedBlocksWriter.h diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 458e76f3fb5..2ce8553864c 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -325,9 +325,11 @@ struct Settings : public SettingsCollection M(SettingOverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \ M(SettingBool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.", IMPORTANT) \ M(SettingJoinAlgorithm, join_algorithm, JoinAlgorithm::HASH, "Specify join algorithm: 'auto', 'hash', 'partial_merge', 'prefer_partial_merge'. 'auto' tries to change HashJoin to MergeJoin on the fly to avoid out of memory.", 0) \ - M(SettingBool, partial_merge_join_optimizations, false, "Enable optimizations in partial merge join", 0) \ + M(SettingBool, partial_merge_join_optimizations, true, "Enable optimizations in partial merge join", 0) \ M(SettingUInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \ - M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \ + M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \ + M(SettingUInt64, join_on_disk_max_files_to_merge, 64, "For MergeJoin on disk set how much files it's allowed to sort simultaneously. Then this value bigger then more memory used and then less disk I/O needed. Minimum is 2.", 0) \ + M(SettingString, temporary_files_codec, "LZ4", "Set compression codec for temporary files (sort and join on disk). I.e. LZ4, NONE.", 0) \ \ M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.", 0) \ M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.", 0) \ diff --git a/src/DataStreams/MergeSortingBlockInputStream.cpp b/src/DataStreams/MergeSortingBlockInputStream.cpp index 9a8e938e35d..0ac919f7a98 100644 --- a/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -26,10 +26,11 @@ namespace ErrorCodes MergeSortingBlockInputStream::MergeSortingBlockInputStream( const BlockInputStreamPtr & input, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_, - size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_, size_t min_free_disk_space_) + size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_, const String & codec_, size_t min_free_disk_space_) : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_), max_bytes_before_remerge(max_bytes_before_remerge_), max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_), + codec(codec_), min_free_disk_space(min_free_disk_space_) { children.push_back(input); @@ -96,7 +97,7 @@ Block MergeSortingBlockInputStream::readImpl() LOG_INFO(log, "Sorting and writing part of data into temporary file " + path); ProfileEvents::increment(ProfileEvents::ExternalSortWritePart); - TemporaryFileStream::write(path, header_without_constants, block_in, &is_cancelled); /// NOTE. Possibly limit disk usage. + TemporaryFileStream::write(path, header_without_constants, block_in, &is_cancelled, codec); /// NOTE. Possibly limit disk usage. LOG_INFO(log, "Done writing part of data into temporary file " + path); blocks.clear(); diff --git a/src/DataStreams/MergeSortingBlockInputStream.h b/src/DataStreams/MergeSortingBlockInputStream.h index b4b9aafdddf..89b16688e9e 100644 --- a/src/DataStreams/MergeSortingBlockInputStream.h +++ b/src/DataStreams/MergeSortingBlockInputStream.h @@ -80,6 +80,7 @@ public: size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_, size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_, + const String & codec_, size_t min_free_disk_space_); String getName() const override { return "MergeSorting"; } @@ -100,6 +101,7 @@ private: size_t max_bytes_before_remerge; size_t max_bytes_before_external_sort; VolumePtr tmp_volume; + String codec; size_t min_free_disk_space; Logger * log = &Logger::get("MergeSortingBlockInputStream"); diff --git a/src/DataStreams/TemporaryFileStream.h b/src/DataStreams/TemporaryFileStream.h index 25ea54a7505..6871800a540 100644 --- a/src/DataStreams/TemporaryFileStream.h +++ b/src/DataStreams/TemporaryFileStream.h @@ -33,10 +33,11 @@ struct TemporaryFileStream {} /// Flush data from input stream into file for future reading - static void write(const std::string & path, const Block & header, IBlockInputStream & input, std::atomic * is_cancelled = nullptr) + static void write(const std::string & path, const Block & header, IBlockInputStream & input, + std::atomic * is_cancelled, const std::string & codec) { WriteBufferFromFile file_buf(path); - CompressedWriteBuffer compressed_buf(file_buf); + CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {})); NativeBlockOutputStream output(compressed_buf, 0, header); copyData(input, output, is_cancelled); } @@ -58,19 +59,26 @@ public: protected: Block readImpl() override { - if (!done) + if (done) + return {}; + + if (!stream) + stream = std::make_unique(path, header); + + auto block = stream->block_in->read(); + if (!block) { done = true; - TemporaryFileStream stream(path, header); - return stream.block_in->read(); + stream.reset(); } - return {}; + return block; } private: const std::string path; Block header; bool done; + std::unique_ptr stream; }; } diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 2f98addd975..290bc26856a 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -2033,7 +2033,9 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, InputSortingInfoP sorting_stream, output_order_descr, settings.max_block_size, limit, settings.max_bytes_before_remerge_sort, settings.max_bytes_before_external_sort / pipeline.streams.size(), - context->getTemporaryVolume(), settings.min_free_disk_space_for_temporary_data); + context->getTemporaryVolume(), + settings.temporary_files_codec, + settings.min_free_disk_space_for_temporary_data); stream = merging_stream; }); diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index 859a341846f..b4d39c0738e 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -9,11 +9,7 @@ #include #include #include -#include -#include #include -#include -#include namespace DB { @@ -22,7 +18,6 @@ namespace ErrorCodes { extern const int NOT_IMPLEMENTED; extern const int PARAMETER_OUT_OF_BOUND; - extern const int NOT_ENOUGH_SPACE; extern const int LOGICAL_ERROR; } @@ -355,93 +350,6 @@ Blocks blocksListToBlocks(const BlocksList & in_blocks) return out_blocks; } -std::unique_ptr flushBlockToFile(const String & tmp_path, const Block & header, Block && block) -{ - auto tmp_file = createTemporaryFile(tmp_path); - - OneBlockInputStream stream(block); - std::atomic is_cancelled{false}; - TemporaryFileStream::write(tmp_file->path(), header, stream, &is_cancelled); - if (is_cancelled) - throw Exception("Cannot flush MergeJoin data on disk. No space at " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); - - return tmp_file; -} - -void flushStreamToFiles(const String & tmp_path, const Block & header, IBlockInputStream & stream, - std::vector> & files, - std::function callback = [](const Block &){}) -{ - while (Block block = stream.read()) - { - if (!block.rows()) - continue; - - callback(block); - auto tmp_file = flushBlockToFile(tmp_path, header, std::move(block)); - files.emplace_back(std::move(tmp_file)); - } -} - -BlockInputStreams makeSortedInputStreams(std::vector & sorted_files, const Block & header) -{ - BlockInputStreams inputs; - - for (const auto & track : sorted_files) - { - BlockInputStreams sequence; - for (const auto & file : track) - sequence.emplace_back(std::make_shared(file->path(), header)); - inputs.emplace_back(std::make_shared(sequence)); - } - - return inputs; -} - -} - - -void MiniLSM::insert(const BlocksList & blocks) -{ - if (blocks.empty()) - return; - - const std::string path(volume->getNextDisk()->getPath()); - - SortedFiles sorted_blocks; - if (blocks.size() > 1) - { - BlockInputStreams inputs; - inputs.reserve(blocks.size()); - for (const auto & block : blocks) - inputs.push_back(std::make_shared(block)); - - MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block); - flushStreamToFiles(path, sample_block, sorted_input, sorted_blocks); - } - else - { - OneBlockInputStream sorted_input(blocks.front()); - flushStreamToFiles(path, sample_block, sorted_input, sorted_blocks); - } - - sorted_files.emplace_back(std::move(sorted_blocks)); - if (sorted_files.size() >= max_size) - merge(); -} - -/// TODO: better merge strategy -void MiniLSM::merge(std::function callback) -{ - BlockInputStreams inputs = makeSortedInputStreams(sorted_files, sample_block); - MergingSortedBlockInputStream sorted_stream(inputs, sort_description, rows_in_block); - - const std::string path(volume->getNextDisk()->getPath()); - SortedFiles out; - flushStreamToFiles(path, sample_block, sorted_stream, out, callback); - - sorted_files.clear(); - sorted_files.emplace_back(std::move(out)); } @@ -458,6 +366,7 @@ MergeJoin::MergeJoin(std::shared_ptr table_join_, const Block & right , skip_not_intersected(table_join->enablePartialMergeJoinOptimizations()) , max_joined_block_rows(table_join->maxJoinedBlockRows()) , max_rows_in_right_block(table_join->maxRowsInRightBlock()) + , max_files_to_merge(table_join->maxFilesToMerge()) { if (!isLeft(table_join->kind()) && !isInner(table_join->kind())) throw Exception("Not supported. PartialMergeJoin supports LEFT and INNER JOINs kinds.", ErrorCodes::NOT_IMPLEMENTED); @@ -475,6 +384,9 @@ MergeJoin::MergeJoin(std::shared_ptr table_join_, const Block & right if (!max_rows_in_right_block) throw Exception("partial_merge_join_rows_in_right_blocks cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND); + if (max_files_to_merge < 2) + throw Exception("max_files_to_merge cannot be less than 2", ErrorCodes::PARAMETER_OUT_OF_BOUND); + if (!size_limits.hasLimits()) { size_limits.max_bytes = table_join->defaultMaxBytes(); @@ -499,8 +411,6 @@ MergeJoin::MergeJoin(std::shared_ptr table_join_, const Block & right makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description); makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description); - - lsm = std::make_unique(table_join->getTemporaryVolume(), right_sample_block, right_sort_description, max_rows_in_right_block); } void MergeJoin::setTotals(const Block & totals_block) @@ -529,8 +439,8 @@ void MergeJoin::mergeInMemoryRightBlocks() if (right_blocks.empty()) return; - Blocks blocks_to_merge = blocksListToBlocks(right_blocks); - clearRightBlocksList(); + Blocks blocks_to_merge = blocksListToBlocks(right_blocks.blocks); + right_blocks.clear(); /// TODO: there should be no splitted keys by blocks for RIGHT|FULL JOIN MergeSortingBlocksBlockInputStream sorted_input(blocks_to_merge, right_sort_description, max_rows_in_right_block); @@ -542,7 +452,7 @@ void MergeJoin::mergeInMemoryRightBlocks() if (skip_not_intersected) min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys)); - countBlockSize(block); + right_blocks.countBlockSize(block); loaded_right_blocks.emplace_back(std::make_shared(std::move(block))); } } @@ -551,47 +461,50 @@ void MergeJoin::mergeFlushedRightBlocks() { std::unique_lock lock(rwlock); - lsm->insert(right_blocks); - clearRightBlocksList(); - auto callback = [&](const Block & block) { if (skip_not_intersected) min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys)); - countBlockSize(block); + right_blocks.countBlockSize(block); }; - lsm->merge(callback); - flushed_right_blocks.swap(lsm->sorted_files.front()); + flushed_right_blocks = disk_writer->finishMerge(callback); + disk_writer.reset(); /// Get memory limit or approximate it from row limit and bytes per row factor UInt64 memory_limit = size_limits.max_bytes; UInt64 rows_limit = size_limits.max_rows; if (!memory_limit && rows_limit) - memory_limit = right_blocks_bytes * rows_limit / right_blocks_row_count; + memory_limit = right_blocks.bytes * rows_limit / right_blocks.row_count; cached_right_blocks = std::make_unique(memory_limit); } -void MergeJoin::flushRightBlocks() -{ - /// it's under unique_lock(rwlock) - - is_in_memory = false; - lsm->insert(right_blocks); - clearRightBlocksList(); -} - bool MergeJoin::saveRightBlock(Block && block) { - std::unique_lock lock(rwlock); + if (is_in_memory) + { + std::unique_lock lock(rwlock); - countBlockSize(block); - right_blocks.emplace_back(std::move(block)); + if (!is_in_memory) + { + disk_writer->insert(std::move(block)); + return true; + } - bool has_memory = size_limits.softCheck(right_blocks_row_count, right_blocks_bytes); - if (!has_memory) - flushRightBlocks(); + right_blocks.insert(std::move(block)); + + bool has_memory = size_limits.softCheck(right_blocks.row_count, right_blocks.bytes); + if (!has_memory) + { + disk_writer = std::make_unique(size_limits, table_join->getTemporaryVolume(), + right_sample_block, right_sort_description, right_blocks, + max_rows_in_right_block, max_files_to_merge, table_join->temporaryFilesCodec()); + is_in_memory = false; + } + } + else + disk_writer->insert(std::move(block)); return true; } diff --git a/src/Interpreters/MergeJoin.h b/src/Interpreters/MergeJoin.h index b04720ca168..479b39b1b9a 100644 --- a/src/Interpreters/MergeJoin.h +++ b/src/Interpreters/MergeJoin.h @@ -1,13 +1,12 @@ #pragma once -#include #include #include -#include #include #include #include +#include #include namespace DB @@ -17,34 +16,6 @@ class TableJoin; class MergeJoinCursor; struct MergeJoinEqualRange; -class Volume; -using VolumePtr = std::shared_ptr; - -struct MiniLSM -{ - using SortedFiles = std::vector>; - - VolumePtr volume; - const Block & sample_block; - const SortDescription & sort_description; - const size_t rows_in_block; - const size_t max_size; - std::vector sorted_files; - - MiniLSM(VolumePtr volume_, const Block & sample_block_, const SortDescription & description, - size_t rows_in_block_, size_t max_size_ = 16) - : volume(volume_) - , sample_block(sample_block_) - , sort_description(description) - , rows_in_block(rows_in_block_) - , max_size(max_size_) - {} - - void insert(const BlocksList & blocks); - void merge(std::function callback = [](const Block &){}); -}; - - class MergeJoin : public IJoin { public: @@ -55,8 +26,8 @@ public: void joinTotals(Block &) const override; void setTotals(const Block &) override; bool hasTotals() const override { return totals; } - size_t getTotalRowCount() const override { return right_blocks_row_count; } - size_t getTotalByteCount() const override { return right_blocks_bytes; } + size_t getTotalRowCount() const override { return right_blocks.row_count; } + size_t getTotalByteCount() const override { return right_blocks.bytes; } private: struct NotProcessed : public ExtraBlock @@ -85,16 +56,14 @@ private: Block right_sample_block; Block right_table_keys; Block right_columns_to_add; - BlocksList right_blocks; + SortedBlocksWriter::Blocks right_blocks; Blocks min_max_right_blocks; std::unique_ptr cached_right_blocks; std::vector> loaded_right_blocks; - std::unique_ptr lsm; - MiniLSM::SortedFiles flushed_right_blocks; + std::unique_ptr disk_writer; + SortedBlocksWriter::SortedFiles flushed_right_blocks; Block totals; - size_t right_blocks_row_count = 0; - size_t right_blocks_bytes = 0; - bool is_in_memory = true; + std::atomic is_in_memory{true}; const bool nullable_right_side; const bool is_any_join; const bool is_all_join; @@ -104,6 +73,7 @@ private: const bool skip_not_intersected; const size_t max_joined_block_rows; const size_t max_rows_in_right_block; + const size_t max_files_to_merge; void changeLeftColumns(Block & block, MutableColumns && columns) const; void addRightColumns(Block & block, MutableColumns && columns); @@ -130,23 +100,9 @@ private: MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail, size_t & skip_right); bool saveRightBlock(Block && block); - void flushRightBlocks(); void mergeInMemoryRightBlocks(); void mergeFlushedRightBlocks(); - - void clearRightBlocksList() - { - right_blocks.clear(); - right_blocks_row_count = 0; - right_blocks_bytes = 0; - } - - void countBlockSize(const Block & block) - { - right_blocks_row_count += block.rows(); - right_blocks_bytes += block.bytes(); - } }; } diff --git a/src/Interpreters/SortedBlocksWriter.cpp b/src/Interpreters/SortedBlocksWriter.cpp new file mode 100644 index 00000000000..625f7d1bcbe --- /dev/null +++ b/src/Interpreters/SortedBlocksWriter.cpp @@ -0,0 +1,199 @@ +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NOT_ENOUGH_SPACE; +} + +namespace +{ + +std::unique_ptr flushToFile(const String & tmp_path, const Block & header, IBlockInputStream & stream, const String & codec) +{ + auto tmp_file = createTemporaryFile(tmp_path); + + std::atomic is_cancelled{false}; + TemporaryFileStream::write(tmp_file->path(), header, stream, &is_cancelled, codec); + if (is_cancelled) + throw Exception("Cannot flush MergeJoin data on disk. No space at " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); + + return tmp_file; +} + +SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const Block & header, IBlockInputStream & stream, + const String & codec, std::function callback = [](const Block &){}) +{ + std::vector> files; + + while (Block block = stream.read()) + { + if (!block.rows()) + continue; + + callback(block); + + OneBlockInputStream block_stream(block); + auto tmp_file = flushToFile(tmp_path, header, block_stream, codec); + files.emplace_back(std::move(tmp_file)); + } + + return files; +} + +} + + +void SortedBlocksWriter::insert(Block && block) +{ + bool can_insert_more = false; + bool has_data_to_flush = false; + + BlocksList current_blocks; + size_t row_count = 0; + size_t bytes = 0; + size_t flush_no = 0; + + { + std::lock_guard lock{insert_mutex}; + + /// insert bock into BlocksList undef lock + inserted_blocks.insert(std::move(block)); + + size_t total_row_count = inserted_blocks.row_count + row_count_in_flush; + size_t total_bytes = inserted_blocks.bytes + bytes_in_flush; + + can_insert_more = size_limits.softCheck(total_row_count, total_bytes); + has_data_to_flush = !size_limits.softCheck(inserted_blocks.row_count * num_streams, inserted_blocks.bytes * num_streams); + + if (has_data_to_flush) + { + ++flush_inflight; + current_blocks.swap(inserted_blocks.blocks); + row_count_in_flush = total_row_count; + bytes_in_flush = total_bytes; + + row_count = inserted_blocks.row_count; + bytes = inserted_blocks.bytes; + inserted_blocks.clear(); + } + else if (can_insert_more) + flush_no = flush_number; + } + + if (has_data_to_flush) + { + /// flush new blocks without lock + auto flushed = flush(current_blocks); + current_blocks.clear(); + + std::lock_guard lock{insert_mutex}; + + sorted_files.emplace_back(std::move(flushed)); + row_count_in_flush -= row_count; + bytes_in_flush -= bytes; + + /// notify another insert (flush_number) and merge (flush_inflight) + ++flush_number; + --flush_inflight; + flush_condvar.notify_all(); + } + else if (!can_insert_more) + { + /// wakeup insert blocked by out of limit + std::unique_lock lock{insert_mutex}; + flush_condvar.wait(lock, [&]{ return flush_no < flush_number; }); + } +} + +SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & blocks) const +{ + const std::string path = getPath(); + + if (blocks.empty()) + return {}; + + if (blocks.size() == 1) + { + OneBlockInputStream sorted_input(blocks.front()); + return flushToFile(path, sample_block, sorted_input, codec); + } + + BlockInputStreams inputs; + inputs.reserve(blocks.size()); + for (const auto & block : blocks) + if (block.rows()) + inputs.push_back(std::make_shared(block)); + + MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block); + return flushToFile(path, sample_block, sorted_input, codec); +} + +SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function callback) +{ + /// wait other flushes if any + { + std::unique_lock lock{insert_mutex}; + flush_condvar.wait(lock, [&]{ return !flush_inflight; }); + } + + /// flush not flushed + if (!inserted_blocks.empty()) + sorted_files.emplace_back(flush(inserted_blocks.blocks)); + inserted_blocks.clear(); + + BlockInputStreams inputs; + inputs.reserve(num_files_for_merge); + + /// Merge by parts to save memory. It's possible to exchange disk I/O and memory by num_files_for_merge. + { + SortedFiles new_files; + new_files.reserve(sorted_files.size() / num_files_for_merge + 1); + + while (sorted_files.size() > num_files_for_merge) + { + for (const auto & file : sorted_files) + { + inputs.emplace_back(streamFromFile(file)); + + if (inputs.size() == num_files_for_merge || &file == &sorted_files.back()) + { + MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block); + new_files.emplace_back(flushToFile(getPath(), sample_block, sorted_input, codec)); + inputs.clear(); + } + } + + sorted_files.clear(); + sorted_files.swap(new_files); + } + + for (const auto & file : sorted_files) + inputs.emplace_back(streamFromFile(file)); + } + + MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block); + + SortedFiles out = flushToManyFiles(getPath(), sample_block, sorted_input, codec, callback); + sorted_files.clear(); + return out; /// There're also inserted_blocks counters as indirect output +} + +BlockInputStreamPtr SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const +{ + return std::make_shared(file->path(), sample_block); +} + +String SortedBlocksWriter::getPath() const +{ + return volume->getNextDisk()->getPath(); +} + +} diff --git a/src/Interpreters/SortedBlocksWriter.h b/src/Interpreters/SortedBlocksWriter.h new file mode 100644 index 00000000000..6a28e60e553 --- /dev/null +++ b/src/Interpreters/SortedBlocksWriter.h @@ -0,0 +1,96 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +namespace DB +{ + +class TableJoin; +class MergeJoinCursor; +struct MergeJoinEqualRange; + +class Volume; +using VolumePtr = std::shared_ptr; + +struct SortedBlocksWriter +{ + using TmpFilePtr = std::unique_ptr; + using SortedFiles = std::vector; + + struct Blocks + { + BlocksList blocks; + size_t row_count = 0; + size_t bytes = 0; + + bool empty() const { return blocks.empty(); } + + void insert(Block && block) + { + countBlockSize(block); + blocks.emplace_back(std::move(block)); + } + + void countBlockSize(const Block & block) + { + row_count += block.rows(); + bytes += block.bytes(); + } + + void clear() + { + blocks.clear(); + row_count = 0; + bytes = 0; + } + }; + + static constexpr const size_t num_streams = 2; + + std::mutex insert_mutex; + std::condition_variable flush_condvar; + const SizeLimits & size_limits; + VolumePtr volume; + const Block & sample_block; + const SortDescription & sort_description; + Blocks & inserted_blocks; + const size_t rows_in_block; + const size_t num_files_for_merge; + const String & codec; + SortedFiles sorted_files; + size_t row_count_in_flush = 0; + size_t bytes_in_flush = 0; + size_t flush_number = 0; + size_t flush_inflight = 0; + + SortedBlocksWriter(const SizeLimits & size_limits_, VolumePtr volume_, const Block & sample_block_, const SortDescription & description, + Blocks & blocks, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_) + : size_limits(size_limits_) + , volume(volume_) + , sample_block(sample_block_) + , sort_description(description) + , inserted_blocks(blocks) + , rows_in_block(rows_in_block_) + , num_files_for_merge(num_files_to_merge_) + , codec(codec_) + { + sorted_files.emplace_back(flush(inserted_blocks.blocks)); + inserted_blocks.clear(); + } + + String getPath() const; + BlockInputStreamPtr streamFromFile(const TmpFilePtr & file) const; + + void insert(Block && block); + TmpFilePtr flush(const BlocksList & blocks) const; + SortedFiles finishMerge(std::function callback = [](const Block &){}); +}; + +} diff --git a/src/Interpreters/TableJoin.cpp b/src/Interpreters/TableJoin.cpp index c06dd50803a..eea1c576f38 100644 --- a/src/Interpreters/TableJoin.cpp +++ b/src/Interpreters/TableJoin.cpp @@ -13,10 +13,6 @@ namespace DB { -namespace ErrorCodes -{ -} - TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_) : size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode}) , default_max_bytes(settings.default_max_bytes_in_join) @@ -25,6 +21,8 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_) , join_algorithm(settings.join_algorithm) , partial_merge_join_optimizations(settings.partial_merge_join_optimizations) , partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks) + , max_files_to_merge(settings.join_on_disk_max_files_to_merge) + , temporary_files_codec(settings.temporary_files_codec) , tmp_volume(tmp_volume_) { if (settings.partial_merge_join) diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h index 2047f935966..61f6d122ec1 100644 --- a/src/Interpreters/TableJoin.h +++ b/src/Interpreters/TableJoin.h @@ -50,6 +50,8 @@ class TableJoin JoinAlgorithm join_algorithm = JoinAlgorithm::AUTO; const bool partial_merge_join_optimizations = false; const size_t partial_merge_join_rows_in_right_blocks = 0; + const size_t max_files_to_merge = 0; + const String temporary_files_codec = "LZ4"; Names key_names_left; Names key_names_right; /// Duplicating names are qualified. @@ -106,6 +108,8 @@ public: size_t defaultMaxBytes() const { return default_max_bytes; } size_t maxJoinedBlockRows() const { return max_joined_block_rows; } size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; } + size_t maxFilesToMerge() const { return max_files_to_merge; } + const String & temporaryFilesCodec() const { return temporary_files_codec; } bool enablePartialMergeJoinOptimizations() const { return partial_merge_join_optimizations; } void addUsingKey(const ASTPtr & ast); diff --git a/tests/queries/0_stateless/01010_pmj_right_table_memory_limits.sql b/tests/queries/0_stateless/01010_pmj_right_table_memory_limits.sql index 8a23b58b66f..5646a7b091d 100644 --- a/tests/queries/0_stateless/01010_pmj_right_table_memory_limits.sql +++ b/tests/queries/0_stateless/01010_pmj_right_table_memory_limits.sql @@ -1,4 +1,5 @@ SET max_memory_usage = 32000000; +SET join_on_disk_max_files_to_merge = 4; SELECT number * 200000 as n, j FROM numbers(5) nums ANY LEFT JOIN (