Improve right table insert for Partial MergeJoin on disk (#10467)

This commit is contained in:
Artem Zuikov 2020-04-28 16:55:50 +03:00 committed by GitHub
parent 1b6e6b7e5c
commit e34a9457e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 368 additions and 186 deletions

View File

@ -325,9 +325,11 @@ struct Settings : public SettingsCollection<Settings>
M(SettingOverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \ M(SettingOverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(SettingBool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.", IMPORTANT) \ M(SettingBool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.", IMPORTANT) \
M(SettingJoinAlgorithm, join_algorithm, JoinAlgorithm::HASH, "Specify join algorithm: 'auto', 'hash', 'partial_merge', 'prefer_partial_merge'. 'auto' tries to change HashJoin to MergeJoin on the fly to avoid out of memory.", 0) \ M(SettingJoinAlgorithm, join_algorithm, JoinAlgorithm::HASH, "Specify join algorithm: 'auto', 'hash', 'partial_merge', 'prefer_partial_merge'. 'auto' tries to change HashJoin to MergeJoin on the fly to avoid out of memory.", 0) \
M(SettingBool, partial_merge_join_optimizations, false, "Enable optimizations in partial merge join", 0) \ M(SettingBool, partial_merge_join_optimizations, true, "Enable optimizations in partial merge join", 0) \
M(SettingUInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \ M(SettingUInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \
M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \ M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \
M(SettingUInt64, join_on_disk_max_files_to_merge, 64, "For MergeJoin on disk set how much files it's allowed to sort simultaneously. Then this value bigger then more memory used and then less disk I/O needed. Minimum is 2.", 0) \
M(SettingString, temporary_files_codec, "LZ4", "Set compression codec for temporary files (sort and join on disk). I.e. LZ4, NONE.", 0) \
\ \
M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.", 0) \ M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.", 0) \
M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.", 0) \ M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.", 0) \

View File

@ -26,10 +26,11 @@ namespace ErrorCodes
MergeSortingBlockInputStream::MergeSortingBlockInputStream( MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input, SortDescription & description_, const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_, size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_, size_t min_free_disk_space_) size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_, const String & codec_, size_t min_free_disk_space_)
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_), : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_remerge(max_bytes_before_remerge_), max_bytes_before_remerge(max_bytes_before_remerge_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_), max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_),
codec(codec_),
min_free_disk_space(min_free_disk_space_) min_free_disk_space(min_free_disk_space_)
{ {
children.push_back(input); children.push_back(input);
@ -96,7 +97,7 @@ Block MergeSortingBlockInputStream::readImpl()
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path); LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart); ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
TemporaryFileStream::write(path, header_without_constants, block_in, &is_cancelled); /// NOTE. Possibly limit disk usage. TemporaryFileStream::write(path, header_without_constants, block_in, &is_cancelled, codec); /// NOTE. Possibly limit disk usage.
LOG_INFO(log, "Done writing part of data into temporary file " + path); LOG_INFO(log, "Done writing part of data into temporary file " + path);
blocks.clear(); blocks.clear();

View File

@ -80,6 +80,7 @@ public:
size_t max_merged_block_size_, UInt64 limit_, size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_, size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_, size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_,
const String & codec_,
size_t min_free_disk_space_); size_t min_free_disk_space_);
String getName() const override { return "MergeSorting"; } String getName() const override { return "MergeSorting"; }
@ -100,6 +101,7 @@ private:
size_t max_bytes_before_remerge; size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort; size_t max_bytes_before_external_sort;
VolumePtr tmp_volume; VolumePtr tmp_volume;
String codec;
size_t min_free_disk_space; size_t min_free_disk_space;
Logger * log = &Logger::get("MergeSortingBlockInputStream"); Logger * log = &Logger::get("MergeSortingBlockInputStream");

View File

@ -33,10 +33,11 @@ struct TemporaryFileStream
{} {}
/// Flush data from input stream into file for future reading /// Flush data from input stream into file for future reading
static void write(const std::string & path, const Block & header, IBlockInputStream & input, std::atomic<bool> * is_cancelled = nullptr) static void write(const std::string & path, const Block & header, IBlockInputStream & input,
std::atomic<bool> * is_cancelled, const std::string & codec)
{ {
WriteBufferFromFile file_buf(path); WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf); CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));
NativeBlockOutputStream output(compressed_buf, 0, header); NativeBlockOutputStream output(compressed_buf, 0, header);
copyData(input, output, is_cancelled); copyData(input, output, is_cancelled);
} }
@ -58,19 +59,26 @@ public:
protected: protected:
Block readImpl() override Block readImpl() override
{ {
if (!done) if (done)
return {};
if (!stream)
stream = std::make_unique<TemporaryFileStream>(path, header);
auto block = stream->block_in->read();
if (!block)
{ {
done = true; done = true;
TemporaryFileStream stream(path, header); stream.reset();
return stream.block_in->read();
} }
return {}; return block;
} }
private: private:
const std::string path; const std::string path;
Block header; Block header;
bool done; bool done;
std::unique_ptr<TemporaryFileStream> stream;
}; };
} }

View File

@ -2033,7 +2033,9 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, InputSortingInfoP
sorting_stream, output_order_descr, settings.max_block_size, limit, sorting_stream, output_order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort, settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort / pipeline.streams.size(), settings.max_bytes_before_external_sort / pipeline.streams.size(),
context->getTemporaryVolume(), settings.min_free_disk_space_for_temporary_data); context->getTemporaryVolume(),
settings.temporary_files_codec,
settings.min_free_disk_space_for_temporary_data);
stream = merging_stream; stream = merging_stream;
}); });

View File

@ -9,11 +9,7 @@
#include <Interpreters/join_common.h> #include <Interpreters/join_common.h>
#include <DataStreams/materializeBlock.h> #include <DataStreams/materializeBlock.h>
#include <DataStreams/MergeSortingBlockInputStream.h> #include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h> #include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <Disks/DiskSpaceMonitor.h>
namespace DB namespace DB
{ {
@ -22,7 +18,6 @@ namespace ErrorCodes
{ {
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int PARAMETER_OUT_OF_BOUND; extern const int PARAMETER_OUT_OF_BOUND;
extern const int NOT_ENOUGH_SPACE;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
@ -355,93 +350,6 @@ Blocks blocksListToBlocks(const BlocksList & in_blocks)
return out_blocks; return out_blocks;
} }
std::unique_ptr<TemporaryFile> flushBlockToFile(const String & tmp_path, const Block & header, Block && block)
{
auto tmp_file = createTemporaryFile(tmp_path);
OneBlockInputStream stream(block);
std::atomic<bool> is_cancelled{false};
TemporaryFileStream::write(tmp_file->path(), header, stream, &is_cancelled);
if (is_cancelled)
throw Exception("Cannot flush MergeJoin data on disk. No space at " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
return tmp_file;
}
void flushStreamToFiles(const String & tmp_path, const Block & header, IBlockInputStream & stream,
std::vector<std::unique_ptr<TemporaryFile>> & files,
std::function<void(const Block &)> callback = [](const Block &){})
{
while (Block block = stream.read())
{
if (!block.rows())
continue;
callback(block);
auto tmp_file = flushBlockToFile(tmp_path, header, std::move(block));
files.emplace_back(std::move(tmp_file));
}
}
BlockInputStreams makeSortedInputStreams(std::vector<MiniLSM::SortedFiles> & sorted_files, const Block & header)
{
BlockInputStreams inputs;
for (const auto & track : sorted_files)
{
BlockInputStreams sequence;
for (const auto & file : track)
sequence.emplace_back(std::make_shared<TemporaryFileLazyInputStream>(file->path(), header));
inputs.emplace_back(std::make_shared<ConcatBlockInputStream>(sequence));
}
return inputs;
}
}
void MiniLSM::insert(const BlocksList & blocks)
{
if (blocks.empty())
return;
const std::string path(volume->getNextDisk()->getPath());
SortedFiles sorted_blocks;
if (blocks.size() > 1)
{
BlockInputStreams inputs;
inputs.reserve(blocks.size());
for (const auto & block : blocks)
inputs.push_back(std::make_shared<OneBlockInputStream>(block));
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
flushStreamToFiles(path, sample_block, sorted_input, sorted_blocks);
}
else
{
OneBlockInputStream sorted_input(blocks.front());
flushStreamToFiles(path, sample_block, sorted_input, sorted_blocks);
}
sorted_files.emplace_back(std::move(sorted_blocks));
if (sorted_files.size() >= max_size)
merge();
}
/// TODO: better merge strategy
void MiniLSM::merge(std::function<void(const Block &)> callback)
{
BlockInputStreams inputs = makeSortedInputStreams(sorted_files, sample_block);
MergingSortedBlockInputStream sorted_stream(inputs, sort_description, rows_in_block);
const std::string path(volume->getNextDisk()->getPath());
SortedFiles out;
flushStreamToFiles(path, sample_block, sorted_stream, out, callback);
sorted_files.clear();
sorted_files.emplace_back(std::move(out));
} }
@ -458,6 +366,7 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right
, skip_not_intersected(table_join->enablePartialMergeJoinOptimizations()) , skip_not_intersected(table_join->enablePartialMergeJoinOptimizations())
, max_joined_block_rows(table_join->maxJoinedBlockRows()) , max_joined_block_rows(table_join->maxJoinedBlockRows())
, max_rows_in_right_block(table_join->maxRowsInRightBlock()) , max_rows_in_right_block(table_join->maxRowsInRightBlock())
, max_files_to_merge(table_join->maxFilesToMerge())
{ {
if (!isLeft(table_join->kind()) && !isInner(table_join->kind())) if (!isLeft(table_join->kind()) && !isInner(table_join->kind()))
throw Exception("Not supported. PartialMergeJoin supports LEFT and INNER JOINs kinds.", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Not supported. PartialMergeJoin supports LEFT and INNER JOINs kinds.", ErrorCodes::NOT_IMPLEMENTED);
@ -475,6 +384,9 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right
if (!max_rows_in_right_block) if (!max_rows_in_right_block)
throw Exception("partial_merge_join_rows_in_right_blocks cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND); throw Exception("partial_merge_join_rows_in_right_blocks cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND);
if (max_files_to_merge < 2)
throw Exception("max_files_to_merge cannot be less than 2", ErrorCodes::PARAMETER_OUT_OF_BOUND);
if (!size_limits.hasLimits()) if (!size_limits.hasLimits())
{ {
size_limits.max_bytes = table_join->defaultMaxBytes(); size_limits.max_bytes = table_join->defaultMaxBytes();
@ -499,8 +411,6 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right
makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description); makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description);
makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description); makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description);
lsm = std::make_unique<MiniLSM>(table_join->getTemporaryVolume(), right_sample_block, right_sort_description, max_rows_in_right_block);
} }
void MergeJoin::setTotals(const Block & totals_block) void MergeJoin::setTotals(const Block & totals_block)
@ -529,8 +439,8 @@ void MergeJoin::mergeInMemoryRightBlocks()
if (right_blocks.empty()) if (right_blocks.empty())
return; return;
Blocks blocks_to_merge = blocksListToBlocks(right_blocks); Blocks blocks_to_merge = blocksListToBlocks(right_blocks.blocks);
clearRightBlocksList(); right_blocks.clear();
/// TODO: there should be no splitted keys by blocks for RIGHT|FULL JOIN /// TODO: there should be no splitted keys by blocks for RIGHT|FULL JOIN
MergeSortingBlocksBlockInputStream sorted_input(blocks_to_merge, right_sort_description, max_rows_in_right_block); MergeSortingBlocksBlockInputStream sorted_input(blocks_to_merge, right_sort_description, max_rows_in_right_block);
@ -542,7 +452,7 @@ void MergeJoin::mergeInMemoryRightBlocks()
if (skip_not_intersected) if (skip_not_intersected)
min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys)); min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys));
countBlockSize(block); right_blocks.countBlockSize(block);
loaded_right_blocks.emplace_back(std::make_shared<Block>(std::move(block))); loaded_right_blocks.emplace_back(std::make_shared<Block>(std::move(block)));
} }
} }
@ -551,47 +461,50 @@ void MergeJoin::mergeFlushedRightBlocks()
{ {
std::unique_lock lock(rwlock); std::unique_lock lock(rwlock);
lsm->insert(right_blocks);
clearRightBlocksList();
auto callback = [&](const Block & block) auto callback = [&](const Block & block)
{ {
if (skip_not_intersected) if (skip_not_intersected)
min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys)); min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys));
countBlockSize(block); right_blocks.countBlockSize(block);
}; };
lsm->merge(callback); flushed_right_blocks = disk_writer->finishMerge(callback);
flushed_right_blocks.swap(lsm->sorted_files.front()); disk_writer.reset();
/// Get memory limit or approximate it from row limit and bytes per row factor /// Get memory limit or approximate it from row limit and bytes per row factor
UInt64 memory_limit = size_limits.max_bytes; UInt64 memory_limit = size_limits.max_bytes;
UInt64 rows_limit = size_limits.max_rows; UInt64 rows_limit = size_limits.max_rows;
if (!memory_limit && rows_limit) if (!memory_limit && rows_limit)
memory_limit = right_blocks_bytes * rows_limit / right_blocks_row_count; memory_limit = right_blocks.bytes * rows_limit / right_blocks.row_count;
cached_right_blocks = std::make_unique<Cache>(memory_limit); cached_right_blocks = std::make_unique<Cache>(memory_limit);
} }
void MergeJoin::flushRightBlocks()
{
/// it's under unique_lock(rwlock)
is_in_memory = false;
lsm->insert(right_blocks);
clearRightBlocksList();
}
bool MergeJoin::saveRightBlock(Block && block) bool MergeJoin::saveRightBlock(Block && block)
{ {
if (is_in_memory)
{
std::unique_lock lock(rwlock); std::unique_lock lock(rwlock);
countBlockSize(block); if (!is_in_memory)
right_blocks.emplace_back(std::move(block)); {
disk_writer->insert(std::move(block));
return true;
}
bool has_memory = size_limits.softCheck(right_blocks_row_count, right_blocks_bytes); right_blocks.insert(std::move(block));
bool has_memory = size_limits.softCheck(right_blocks.row_count, right_blocks.bytes);
if (!has_memory) if (!has_memory)
flushRightBlocks(); {
disk_writer = std::make_unique<SortedBlocksWriter>(size_limits, table_join->getTemporaryVolume(),
right_sample_block, right_sort_description, right_blocks,
max_rows_in_right_block, max_files_to_merge, table_join->temporaryFilesCodec());
is_in_memory = false;
}
}
else
disk_writer->insert(std::move(block));
return true; return true;
} }

View File

@ -1,13 +1,12 @@
#pragma once #pragma once
#include <memory>
#include <shared_mutex> #include <shared_mutex>
#include <Common/LRUCache.h> #include <Common/LRUCache.h>
#include <Common/filesystemHelpers.h>
#include <Core/Block.h> #include <Core/Block.h>
#include <Core/SortDescription.h> #include <Core/SortDescription.h>
#include <Interpreters/IJoin.h> #include <Interpreters/IJoin.h>
#include <Interpreters/SortedBlocksWriter.h>
#include <DataStreams/SizeLimits.h> #include <DataStreams/SizeLimits.h>
namespace DB namespace DB
@ -17,34 +16,6 @@ class TableJoin;
class MergeJoinCursor; class MergeJoinCursor;
struct MergeJoinEqualRange; struct MergeJoinEqualRange;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
struct MiniLSM
{
using SortedFiles = std::vector<std::unique_ptr<TemporaryFile>>;
VolumePtr volume;
const Block & sample_block;
const SortDescription & sort_description;
const size_t rows_in_block;
const size_t max_size;
std::vector<SortedFiles> sorted_files;
MiniLSM(VolumePtr volume_, const Block & sample_block_, const SortDescription & description,
size_t rows_in_block_, size_t max_size_ = 16)
: volume(volume_)
, sample_block(sample_block_)
, sort_description(description)
, rows_in_block(rows_in_block_)
, max_size(max_size_)
{}
void insert(const BlocksList & blocks);
void merge(std::function<void(const Block &)> callback = [](const Block &){});
};
class MergeJoin : public IJoin class MergeJoin : public IJoin
{ {
public: public:
@ -55,8 +26,8 @@ public:
void joinTotals(Block &) const override; void joinTotals(Block &) const override;
void setTotals(const Block &) override; void setTotals(const Block &) override;
bool hasTotals() const override { return totals; } bool hasTotals() const override { return totals; }
size_t getTotalRowCount() const override { return right_blocks_row_count; } size_t getTotalRowCount() const override { return right_blocks.row_count; }
size_t getTotalByteCount() const override { return right_blocks_bytes; } size_t getTotalByteCount() const override { return right_blocks.bytes; }
private: private:
struct NotProcessed : public ExtraBlock struct NotProcessed : public ExtraBlock
@ -85,16 +56,14 @@ private:
Block right_sample_block; Block right_sample_block;
Block right_table_keys; Block right_table_keys;
Block right_columns_to_add; Block right_columns_to_add;
BlocksList right_blocks; SortedBlocksWriter::Blocks right_blocks;
Blocks min_max_right_blocks; Blocks min_max_right_blocks;
std::unique_ptr<Cache> cached_right_blocks; std::unique_ptr<Cache> cached_right_blocks;
std::vector<std::shared_ptr<Block>> loaded_right_blocks; std::vector<std::shared_ptr<Block>> loaded_right_blocks;
std::unique_ptr<MiniLSM> lsm; std::unique_ptr<SortedBlocksWriter> disk_writer;
MiniLSM::SortedFiles flushed_right_blocks; SortedBlocksWriter::SortedFiles flushed_right_blocks;
Block totals; Block totals;
size_t right_blocks_row_count = 0; std::atomic<bool> is_in_memory{true};
size_t right_blocks_bytes = 0;
bool is_in_memory = true;
const bool nullable_right_side; const bool nullable_right_side;
const bool is_any_join; const bool is_any_join;
const bool is_all_join; const bool is_all_join;
@ -104,6 +73,7 @@ private:
const bool skip_not_intersected; const bool skip_not_intersected;
const size_t max_joined_block_rows; const size_t max_joined_block_rows;
const size_t max_rows_in_right_block; const size_t max_rows_in_right_block;
const size_t max_files_to_merge;
void changeLeftColumns(Block & block, MutableColumns && columns) const; void changeLeftColumns(Block & block, MutableColumns && columns) const;
void addRightColumns(Block & block, MutableColumns && columns); void addRightColumns(Block & block, MutableColumns && columns);
@ -130,23 +100,9 @@ private:
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail, size_t & skip_right); MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail, size_t & skip_right);
bool saveRightBlock(Block && block); bool saveRightBlock(Block && block);
void flushRightBlocks();
void mergeInMemoryRightBlocks(); void mergeInMemoryRightBlocks();
void mergeFlushedRightBlocks(); void mergeFlushedRightBlocks();
void clearRightBlocksList()
{
right_blocks.clear();
right_blocks_row_count = 0;
right_blocks_bytes = 0;
}
void countBlockSize(const Block & block)
{
right_blocks_row_count += block.rows();
right_blocks_bytes += block.bytes();
}
}; };
} }

View File

@ -0,0 +1,199 @@
#include <Core/SortCursor.h>
#include <Interpreters/SortedBlocksWriter.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <Disks/DiskSpaceMonitor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
namespace
{
std::unique_ptr<TemporaryFile> flushToFile(const String & tmp_path, const Block & header, IBlockInputStream & stream, const String & codec)
{
auto tmp_file = createTemporaryFile(tmp_path);
std::atomic<bool> is_cancelled{false};
TemporaryFileStream::write(tmp_file->path(), header, stream, &is_cancelled, codec);
if (is_cancelled)
throw Exception("Cannot flush MergeJoin data on disk. No space at " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
return tmp_file;
}
SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const Block & header, IBlockInputStream & stream,
const String & codec, std::function<void(const Block &)> callback = [](const Block &){})
{
std::vector<std::unique_ptr<TemporaryFile>> files;
while (Block block = stream.read())
{
if (!block.rows())
continue;
callback(block);
OneBlockInputStream block_stream(block);
auto tmp_file = flushToFile(tmp_path, header, block_stream, codec);
files.emplace_back(std::move(tmp_file));
}
return files;
}
}
void SortedBlocksWriter::insert(Block && block)
{
bool can_insert_more = false;
bool has_data_to_flush = false;
BlocksList current_blocks;
size_t row_count = 0;
size_t bytes = 0;
size_t flush_no = 0;
{
std::lock_guard lock{insert_mutex};
/// insert bock into BlocksList undef lock
inserted_blocks.insert(std::move(block));
size_t total_row_count = inserted_blocks.row_count + row_count_in_flush;
size_t total_bytes = inserted_blocks.bytes + bytes_in_flush;
can_insert_more = size_limits.softCheck(total_row_count, total_bytes);
has_data_to_flush = !size_limits.softCheck(inserted_blocks.row_count * num_streams, inserted_blocks.bytes * num_streams);
if (has_data_to_flush)
{
++flush_inflight;
current_blocks.swap(inserted_blocks.blocks);
row_count_in_flush = total_row_count;
bytes_in_flush = total_bytes;
row_count = inserted_blocks.row_count;
bytes = inserted_blocks.bytes;
inserted_blocks.clear();
}
else if (can_insert_more)
flush_no = flush_number;
}
if (has_data_to_flush)
{
/// flush new blocks without lock
auto flushed = flush(current_blocks);
current_blocks.clear();
std::lock_guard lock{insert_mutex};
sorted_files.emplace_back(std::move(flushed));
row_count_in_flush -= row_count;
bytes_in_flush -= bytes;
/// notify another insert (flush_number) and merge (flush_inflight)
++flush_number;
--flush_inflight;
flush_condvar.notify_all();
}
else if (!can_insert_more)
{
/// wakeup insert blocked by out of limit
std::unique_lock lock{insert_mutex};
flush_condvar.wait(lock, [&]{ return flush_no < flush_number; });
}
}
SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & blocks) const
{
const std::string path = getPath();
if (blocks.empty())
return {};
if (blocks.size() == 1)
{
OneBlockInputStream sorted_input(blocks.front());
return flushToFile(path, sample_block, sorted_input, codec);
}
BlockInputStreams inputs;
inputs.reserve(blocks.size());
for (const auto & block : blocks)
if (block.rows())
inputs.push_back(std::make_shared<OneBlockInputStream>(block));
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
return flushToFile(path, sample_block, sorted_input, codec);
}
SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<void(const Block &)> callback)
{
/// wait other flushes if any
{
std::unique_lock lock{insert_mutex};
flush_condvar.wait(lock, [&]{ return !flush_inflight; });
}
/// flush not flushed
if (!inserted_blocks.empty())
sorted_files.emplace_back(flush(inserted_blocks.blocks));
inserted_blocks.clear();
BlockInputStreams inputs;
inputs.reserve(num_files_for_merge);
/// Merge by parts to save memory. It's possible to exchange disk I/O and memory by num_files_for_merge.
{
SortedFiles new_files;
new_files.reserve(sorted_files.size() / num_files_for_merge + 1);
while (sorted_files.size() > num_files_for_merge)
{
for (const auto & file : sorted_files)
{
inputs.emplace_back(streamFromFile(file));
if (inputs.size() == num_files_for_merge || &file == &sorted_files.back())
{
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
new_files.emplace_back(flushToFile(getPath(), sample_block, sorted_input, codec));
inputs.clear();
}
}
sorted_files.clear();
sorted_files.swap(new_files);
}
for (const auto & file : sorted_files)
inputs.emplace_back(streamFromFile(file));
}
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
SortedFiles out = flushToManyFiles(getPath(), sample_block, sorted_input, codec, callback);
sorted_files.clear();
return out; /// There're also inserted_blocks counters as indirect output
}
BlockInputStreamPtr SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
{
return std::make_shared<TemporaryFileLazyInputStream>(file->path(), sample_block);
}
String SortedBlocksWriter::getPath() const
{
return volume->getNextDisk()->getPath();
}
}

View File

@ -0,0 +1,96 @@
#pragma once
#include <mutex>
#include <condition_variable>
#include <Common/filesystemHelpers.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockStream_fwd.h>
namespace DB
{
class TableJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
struct SortedBlocksWriter
{
using TmpFilePtr = std::unique_ptr<TemporaryFile>;
using SortedFiles = std::vector<TmpFilePtr>;
struct Blocks
{
BlocksList blocks;
size_t row_count = 0;
size_t bytes = 0;
bool empty() const { return blocks.empty(); }
void insert(Block && block)
{
countBlockSize(block);
blocks.emplace_back(std::move(block));
}
void countBlockSize(const Block & block)
{
row_count += block.rows();
bytes += block.bytes();
}
void clear()
{
blocks.clear();
row_count = 0;
bytes = 0;
}
};
static constexpr const size_t num_streams = 2;
std::mutex insert_mutex;
std::condition_variable flush_condvar;
const SizeLimits & size_limits;
VolumePtr volume;
const Block & sample_block;
const SortDescription & sort_description;
Blocks & inserted_blocks;
const size_t rows_in_block;
const size_t num_files_for_merge;
const String & codec;
SortedFiles sorted_files;
size_t row_count_in_flush = 0;
size_t bytes_in_flush = 0;
size_t flush_number = 0;
size_t flush_inflight = 0;
SortedBlocksWriter(const SizeLimits & size_limits_, VolumePtr volume_, const Block & sample_block_, const SortDescription & description,
Blocks & blocks, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_)
: size_limits(size_limits_)
, volume(volume_)
, sample_block(sample_block_)
, sort_description(description)
, inserted_blocks(blocks)
, rows_in_block(rows_in_block_)
, num_files_for_merge(num_files_to_merge_)
, codec(codec_)
{
sorted_files.emplace_back(flush(inserted_blocks.blocks));
inserted_blocks.clear();
}
String getPath() const;
BlockInputStreamPtr streamFromFile(const TmpFilePtr & file) const;
void insert(Block && block);
TmpFilePtr flush(const BlocksList & blocks) const;
SortedFiles finishMerge(std::function<void(const Block &)> callback = [](const Block &){});
};
}

View File

@ -13,10 +13,6 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
}
TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_) TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode}) : size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
, default_max_bytes(settings.default_max_bytes_in_join) , default_max_bytes(settings.default_max_bytes_in_join)
@ -25,6 +21,8 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
, join_algorithm(settings.join_algorithm) , join_algorithm(settings.join_algorithm)
, partial_merge_join_optimizations(settings.partial_merge_join_optimizations) , partial_merge_join_optimizations(settings.partial_merge_join_optimizations)
, partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks) , partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks)
, max_files_to_merge(settings.join_on_disk_max_files_to_merge)
, temporary_files_codec(settings.temporary_files_codec)
, tmp_volume(tmp_volume_) , tmp_volume(tmp_volume_)
{ {
if (settings.partial_merge_join) if (settings.partial_merge_join)

View File

@ -50,6 +50,8 @@ class TableJoin
JoinAlgorithm join_algorithm = JoinAlgorithm::AUTO; JoinAlgorithm join_algorithm = JoinAlgorithm::AUTO;
const bool partial_merge_join_optimizations = false; const bool partial_merge_join_optimizations = false;
const size_t partial_merge_join_rows_in_right_blocks = 0; const size_t partial_merge_join_rows_in_right_blocks = 0;
const size_t max_files_to_merge = 0;
const String temporary_files_codec = "LZ4";
Names key_names_left; Names key_names_left;
Names key_names_right; /// Duplicating names are qualified. Names key_names_right; /// Duplicating names are qualified.
@ -106,6 +108,8 @@ public:
size_t defaultMaxBytes() const { return default_max_bytes; } size_t defaultMaxBytes() const { return default_max_bytes; }
size_t maxJoinedBlockRows() const { return max_joined_block_rows; } size_t maxJoinedBlockRows() const { return max_joined_block_rows; }
size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; } size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; }
size_t maxFilesToMerge() const { return max_files_to_merge; }
const String & temporaryFilesCodec() const { return temporary_files_codec; }
bool enablePartialMergeJoinOptimizations() const { return partial_merge_join_optimizations; } bool enablePartialMergeJoinOptimizations() const { return partial_merge_join_optimizations; }
void addUsingKey(const ASTPtr & ast); void addUsingKey(const ASTPtr & ast);

View File

@ -1,4 +1,5 @@
SET max_memory_usage = 32000000; SET max_memory_usage = 32000000;
SET join_on_disk_max_files_to_merge = 4;
SELECT number * 200000 as n, j FROM numbers(5) nums SELECT number * 200000 as n, j FROM numbers(5) nums
ANY LEFT JOIN ( ANY LEFT JOIN (