PartialMergeJoin flush data on disk (#7186)

This commit is contained in:
Artem Zuikov 2019-10-15 19:31:49 +03:00 committed by Alexander Kuzmenkov
parent 1b8cb59e4b
commit cf99f88bc4
15 changed files with 509 additions and 53 deletions

View File

@ -306,8 +306,9 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.") \
M(SettingBool, partial_merge_join, false, "Use partial merge join instead of hash join for LEFT and INNER JOINs.") \
M(SettingBool, partial_merge_join_optimizations, false, "Enable optimizations in partial merge join") \
M(SettingUInt64, default_max_bytes_in_join, 100000000, "Maximum size of right-side table if limit's required but max_bytes_in_join is not set.") \
M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.") \
M(SettingFloat, partial_merge_join_rows_in_left_blocks, 10000, "Group left-hand joining data in bigger blocks. Setting it to a bigger value increase JOIN performance and memory usage.") \
M(SettingUInt64, partial_merge_join_rows_in_left_blocks, 10000, "Group left-hand joining data in bigger blocks. Setting it to a bigger value increase JOIN performance and memory usage.") \
\
M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \

View File

@ -2,7 +2,7 @@
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/processConstants.h>
#include <Common/formatReadable.h>
#include <IO/WriteBufferFromFile.h>
@ -84,14 +84,11 @@ Block MergeSortingBlockInputStream::readImpl()
temporary_files.emplace_back(createTemporaryFile(tmp_path));
const std::string & path = temporary_files.back()->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf, 0, header_without_constants);
MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit);
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
copyData(block_in, block_out, &is_cancelled); /// NOTE. Possibly limit disk usage.
TemporaryFileStream::write(path, header_without_constants, block_in, &is_cancelled); /// NOTE. Possibly limit disk usage.
LOG_INFO(log, "Done writing part of data into temporary file " + path);
blocks.clear();
@ -138,7 +135,7 @@ Block MergeSortingBlockInputStream::readImpl()
MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
Blocks & blocks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
Blocks & blocks_, const SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
: blocks(blocks_), header(blocks.at(0).cloneEmpty()), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
{
Blocks nonempty_blocks;

View File

@ -3,14 +3,13 @@
#include <queue>
#include <common/logger_useful.h>
#include <Common/filesystemHelpers.h>
#include <Common/filesystemHelpers.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
@ -19,6 +18,8 @@
namespace DB
{
struct TemporaryFileStream;
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
@ -34,7 +35,7 @@ class MergeSortingBlocksBlockInputStream : public IBlockInputStream
{
public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlocksBlockInputStream(Blocks & blocks_, SortDescription & description_,
MergeSortingBlocksBlockInputStream(Blocks & blocks_, const SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_ = 0);
String getName() const override { return "MergeSortingBlocks"; }

View File

@ -9,24 +9,28 @@ namespace DB
bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int exception_code) const
{
if (max_rows && rows > max_rows)
if (overflow_mode == OverflowMode::THROW)
{
if (overflow_mode == OverflowMode::THROW)
if (max_rows && rows > max_rows)
throw Exception("Limit for " + std::string(what) + " exceeded, max rows: " + formatReadableQuantity(max_rows)
+ ", current rows: " + formatReadableQuantity(rows), exception_code);
else
return false;
}
if (max_bytes && bytes > max_bytes)
{
if (overflow_mode == OverflowMode::THROW)
if (max_bytes && bytes > max_bytes)
throw Exception("Limit for " + std::string(what) + " exceeded, max bytes: " + formatReadableSizeWithBinarySuffix(max_bytes)
+ ", current bytes: " + formatReadableSizeWithBinarySuffix(bytes), exception_code);
else
return false;
return true;
}
return softCheck(rows, bytes);
}
bool SizeLimits::softCheck(UInt64 rows, UInt64 bytes) const
{
if (max_rows && rows > max_rows)
return false;
if (max_bytes && bytes > max_bytes)
return false;
return true;
}

View File

@ -32,6 +32,11 @@ struct SizeLimits
/// Check limits. If exceeded, return false or throw an exception, depending on overflow_mode.
bool check(UInt64 rows, UInt64 bytes, const char * what, int exception_code) const;
/// Check limits. No exceptions.
bool softCheck(UInt64 rows, UInt64 bytes) const;
bool hasLimits() const { return max_rows || max_bytes; }
};
}

View File

@ -3,8 +3,12 @@
#include <Common/ClickHouseRevision.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
namespace DB
{
@ -27,6 +31,46 @@ struct TemporaryFileStream
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0))
{}
/// Flush data from input stream into file for future reading
static void write(const std::string & path, const Block & header, IBlockInputStream & input, std::atomic<bool> * is_cancelled = nullptr)
{
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream output(compressed_buf, 0, header);
copyData(input, output, is_cancelled);
}
};
class TemporaryFileLazyInputStream : public IBlockInputStream
{
public:
TemporaryFileLazyInputStream(const std::string & path_, const Block & header_)
: path(path_)
, header(header_)
, done(false)
{}
String getName() const override { return "TemporaryFile"; }
Block getHeader() const override { return header; }
void readSuffix() override {}
protected:
Block readImpl() override
{
if (!done)
{
done = true;
TemporaryFileStream stream(path, header);
return stream.block_in->read();
}
return {};
}
private:
const std::string path;
Block header;
bool done;
};
}

View File

@ -24,12 +24,14 @@ namespace ErrorCodes
extern const int PARAMETER_OUT_OF_BOUND;
}
AnalyzedJoin::AnalyzedJoin(const Settings & settings)
AnalyzedJoin::AnalyzedJoin(const Settings & settings, const String & tmp_path_)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
, default_max_bytes(settings.default_max_bytes_in_join)
, join_use_nulls(settings.join_use_nulls)
, partial_merge_join(settings.partial_merge_join)
, partial_merge_join_optimizations(settings.partial_merge_join_optimizations)
, partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks)
, tmp_path(tmp_path_)
{}
void AnalyzedJoin::addUsingKey(const ASTPtr & ast)

View File

@ -38,6 +38,7 @@ class AnalyzedJoin
friend class SyntaxAnalyzer;
const SizeLimits size_limits;
const size_t default_max_bytes;
const bool join_use_nulls;
const bool partial_merge_join = false;
const bool partial_merge_join_optimizations = false;
@ -59,13 +60,16 @@ class AnalyzedJoin
/// Original name -> name. Only ranamed columns.
std::unordered_map<String, String> renames;
String tmp_path;
public:
AnalyzedJoin(const Settings &);
AnalyzedJoin(const Settings &, const String & tmp_path);
/// for StorageJoin
AnalyzedJoin(SizeLimits limits, bool use_nulls, ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness,
const Names & key_names_right_)
: size_limits(limits)
, default_max_bytes(0)
, join_use_nulls(use_nulls)
, key_names_right(key_names_right_)
{
@ -76,9 +80,11 @@ public:
ASTTableJoin::Kind kind() const { return table_join.kind; }
ASTTableJoin::Strictness strictness() const { return table_join.strictness; }
const SizeLimits & sizeLimits() const { return size_limits; }
const String & getTemporaryPath() const { return tmp_path; }
bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); }
bool forceNullableLeft() const { return join_use_nulls && isRightOrFull(table_join.kind); }
size_t defaultMaxBytes() const { return default_max_bytes; }
size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; }
bool enablePartialMergeJoinOptimizations() const { return partial_merge_join_optimizations; }

View File

@ -1,3 +1,5 @@
#include <limits>
#include <Core/NamesAndTypes.h>
#include <Core/SortCursor.h>
#include <Columns/ColumnNullable.h>
@ -7,6 +9,10 @@
#include <Interpreters/join_common.h>
#include <DataStreams/materializeBlock.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
namespace DB
{
@ -16,6 +22,7 @@ namespace ErrorCodes
extern const int SET_SIZE_LIMIT_EXCEEDED;
extern const int NOT_IMPLEMENTED;
extern const int PARAMETER_OUT_OF_BOUND;
extern const int NOT_ENOUGH_SPACE;
extern const int LOGICAL_ERROR;
}
@ -133,9 +140,8 @@ public:
return getNextEqualRangeImpl<false>(rhs);
}
int intersect(const Block & right_block, const Block & right_table_keys, const Names & key_names)
int intersect(const Block & min_max, const Names & key_names)
{
const Block min_max = extractMinMax(right_block, right_table_keys);
if (end() == 0 || min_max.rows() != 2)
throw Exception("Unexpected block size", ErrorCodes::LOGICAL_ERROR);
@ -320,21 +326,127 @@ void joinInequalsLeft(const Block & left_block, MutableColumns & left_columns, M
appendNulls(right_columns, rows_to_add);
}
Blocks blocksListToBlocks(const BlocksList & in_blocks)
{
Blocks out_blocks;
out_blocks.reserve(in_blocks.size());
for (const auto & block : in_blocks)
out_blocks.push_back(block);
return out_blocks;
}
std::unique_ptr<TemporaryFile> flushBlockToFile(const String & tmp_path, const Block & header, Block && block)
{
auto tmp_file = createTemporaryFile(tmp_path);
OneBlockInputStream stream(block);
std::atomic<bool> is_cancelled{false};
TemporaryFileStream::write(tmp_file->path(), header, stream, &is_cancelled);
if (is_cancelled)
throw Exception("Cannot flush MergeJoin data on disk. No space at " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
return tmp_file;
}
void flushStreamToFiles(const String & tmp_path, const Block & header, IBlockInputStream & stream,
std::vector<std::unique_ptr<TemporaryFile>> & files,
std::function<void(const Block &)> callback = [](const Block &){})
{
while (Block block = stream.read())
{
if (!block.rows())
continue;
callback(block);
auto tmp_file = flushBlockToFile(tmp_path, header, std::move(block));
files.emplace_back(std::move(tmp_file));
}
}
BlockInputStreams makeSortedInputStreams(std::vector<MiniLSM::SortedFiles> & sorted_files, const Block & header)
{
BlockInputStreams inputs;
for (const auto & track : sorted_files)
{
BlockInputStreams sequence;
for (const auto & file : track)
sequence.emplace_back(std::make_shared<TemporaryFileLazyInputStream>(file->path(), header));
inputs.emplace_back(std::make_shared<ConcatBlockInputStream>(sequence));
}
return inputs;
}
}
void MiniLSM::insert(const BlocksList & blocks)
{
if (blocks.empty())
return;
MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block)
SortedFiles sorted_blocks;
if (blocks.size() > 1)
{
BlockInputStreams inputs;
inputs.reserve(blocks.size());
for (auto & block : blocks)
inputs.push_back(std::make_shared<OneBlockInputStream>(block));
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
flushStreamToFiles(path, sample_block, sorted_input, sorted_blocks);
}
else
{
OneBlockInputStream sorted_input(blocks.front());
flushStreamToFiles(path, sample_block, sorted_input, sorted_blocks);
}
sorted_files.emplace_back(std::move(sorted_blocks));
if (sorted_files.size() >= max_size)
merge();
}
/// TODO: better merge strategy
void MiniLSM::merge(std::function<void(const Block &)> callback)
{
BlockInputStreams inputs = makeSortedInputStreams(sorted_files, sample_block);
MergingSortedBlockInputStream sorted_stream(inputs, sort_description, rows_in_block);
SortedFiles out;
flushStreamToFiles(path, sample_block, sorted_stream, out, callback);
sorted_files.clear();
sorted_files.emplace_back(std::move(out));
}
MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block_)
: table_join(table_join_)
, size_limits(table_join->sizeLimits())
, right_sample_block(right_sample_block_)
, nullable_right_side(table_join->forceNullableRight())
, is_all(table_join->strictness() == ASTTableJoin::Strictness::All)
, is_inner(isInner(table_join->kind()))
, is_left(isLeft(table_join->kind()))
, skip_not_intersected(table_join->enablePartialMergeJoinOptimizations())
, max_rows_in_right_block(table_join->maxRowsInRightBlock())
{
if (!isLeft(table_join->kind()) && !isInner(table_join->kind()))
throw Exception("Partial merge supported for LEFT and INNER JOINs only", ErrorCodes::NOT_IMPLEMENTED);
if (!max_rows_in_right_block)
throw Exception("partial_merge_join_rows_in_right_blocks cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND);
if (!size_limits.hasLimits())
{
size_limits.max_bytes = table_join->defaultMaxBytes();
if (!size_limits.max_bytes)
throw Exception("No limit for MergeJoin (max_rows_in_join, max_bytes_in_join or default_max_bytes_in_join have to be set)",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
}
JoinCommon::extractKeysForJoin(table_join->keyNamesRight(), right_sample_block, right_table_keys, right_columns_to_add);
const NameSet required_right_keys = table_join->requiredRightKeys();
@ -350,6 +462,8 @@ MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & ri
makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description);
makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description);
lsm = std::make_unique<MiniLSM>(table_join->getTemporaryPath(), right_sample_block, right_sort_description, max_rows_in_right_block);
}
void MergeJoin::setTotals(const Block & totals_block)
@ -365,24 +479,83 @@ void MergeJoin::joinTotals(Block & block) const
void MergeJoin::mergeRightBlocks()
{
if (is_in_memory)
mergeInMemoryRightBlocks();
else
mergeFlushedRightBlocks();
}
void MergeJoin::mergeInMemoryRightBlocks()
{
std::unique_lock lock(rwlock);
if (right_blocks.empty())
return;
Blocks unsorted_blocks;
unsorted_blocks.reserve(right_blocks.size());
for (const auto & block : right_blocks)
unsorted_blocks.push_back(block);
size_t max_rows_in_block = table_join->maxRowsInRightBlock();
if (!max_rows_in_block)
throw Exception("partial_merge_join_rows_in_right_blocks cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND);
Blocks blocks_to_merge = blocksListToBlocks(right_blocks);
clearRightBlocksList();
/// TODO: there should be no splitted keys by blocks for RIGHT|FULL JOIN
MergeSortingBlocksBlockInputStream stream(unsorted_blocks, right_sort_description, max_rows_in_block);
MergeSortingBlocksBlockInputStream sorted_input(blocks_to_merge, right_sort_description, max_rows_in_right_block);
right_blocks.clear();
while (Block block = stream.read())
right_blocks.emplace_back(std::move(block));
while (Block block = sorted_input.read())
{
if (!block.rows())
continue;
if (skip_not_intersected)
min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys));
countBlockSize(block);
loaded_right_blocks.emplace_back(std::make_shared<Block>(std::move(block)));
}
}
void MergeJoin::mergeFlushedRightBlocks()
{
std::unique_lock lock(rwlock);
lsm->insert(right_blocks);
clearRightBlocksList();
auto callback = [&](const Block & block)
{
if (skip_not_intersected)
min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys));
countBlockSize(block);
};
lsm->merge(callback);
flushed_right_blocks.swap(lsm->sorted_files.front());
/// Get memory limit or aproximate it from row limit and bytes per row factor
UInt64 memory_limit = size_limits.max_bytes;
UInt64 rows_limit = size_limits.max_rows;
if (!memory_limit && rows_limit)
memory_limit = right_blocks_bytes * rows_limit / right_blocks_row_count;
cached_right_blocks = std::make_unique<Cache>(memory_limit);
}
void MergeJoin::flushRightBlocks()
{
/// it's under unique_lock(rwlock)
is_in_memory = false;
lsm->insert(right_blocks);
clearRightBlocksList();
}
bool MergeJoin::saveRightBlock(Block && block)
{
std::unique_lock lock(rwlock);
countBlockSize(block);
right_blocks.emplace_back(std::move(block));
bool has_memory = size_limits.softCheck(right_blocks_row_count, right_blocks_bytes);
if (!has_memory)
flushRightBlocks();
return true;
}
bool MergeJoin::addJoinedBlock(const Block & src_block)
@ -391,14 +564,7 @@ bool MergeJoin::addJoinedBlock(const Block & src_block)
JoinCommon::removeLowCardinalityInplace(block);
sortBlock(block, right_sort_description);
std::unique_lock lock(rwlock);
right_blocks.push_back(block);
right_blocks_row_count += block.rows();
right_blocks_bytes += block.bytes();
return table_join->sizeLimits().check(right_blocks_row_count, right_blocks_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
return saveRightBlock(std::move(block));
}
void MergeJoin::joinBlock(Block & block)
@ -408,7 +574,15 @@ void MergeJoin::joinBlock(Block & block)
JoinCommon::removeLowCardinalityInplace(block);
sortBlock(block, left_sort_description);
if (is_in_memory)
joinSortedBlock<true>(block);
else
joinSortedBlock<false>(block);
}
template <bool in_memory>
void MergeJoin::joinSortedBlock(Block & block)
{
std::shared_lock lock(rwlock);
size_t rows_to_reserve = is_left ? block.rows() : 0;
@ -416,24 +590,27 @@ void MergeJoin::joinBlock(Block & block)
MutableColumns right_columns = makeMutableColumns(right_columns_to_add, rows_to_reserve);
MergeJoinCursor left_cursor(block, left_merge_description);
size_t left_key_tail = 0;
size_t right_blocks_count = rightBlocksCount<in_memory>();
if (is_left)
{
for (auto it = right_blocks.begin(); it != right_blocks.end(); ++it)
for (size_t i = 0; i < right_blocks_count; ++i)
{
if (left_cursor.atEnd())
break;
if (skip_not_intersected)
{
int intersection = left_cursor.intersect(*it, right_table_keys, table_join->keyNamesRight());
int intersection = left_cursor.intersect(min_max_right_blocks[i], table_join->keyNamesRight());
if (intersection < 0)
break; /// (left) ... (right)
if (intersection > 0)
continue; /// (right) ... (left)
}
leftJoin(left_cursor, block, *it, left_columns, right_columns, left_key_tail);
std::shared_ptr<Block> right_block = loadRightBlock<in_memory>(i);
leftJoin(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail);
}
left_cursor.nextN(left_key_tail);
@ -445,21 +622,23 @@ void MergeJoin::joinBlock(Block & block)
}
else if (is_inner)
{
for (auto it = right_blocks.begin(); it != right_blocks.end(); ++it)
for (size_t i = 0; i < right_blocks_count; ++i)
{
if (left_cursor.atEnd())
break;
if (skip_not_intersected)
{
int intersection = left_cursor.intersect(*it, right_table_keys, table_join->keyNamesRight());
int intersection = left_cursor.intersect(min_max_right_blocks[i], table_join->keyNamesRight());
if (intersection < 0)
break; /// (left) ... (right)
if (intersection > 0)
continue; /// (right) ... (left)
}
innerJoin(left_cursor, block, *it, left_columns, right_columns, left_key_tail);
std::shared_ptr<Block> right_block = loadRightBlock<in_memory>(i);
innerJoin(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail);
}
left_cursor.nextN(left_key_tail);
@ -546,4 +725,30 @@ void MergeJoin::addRightColumns(Block & block, MutableColumns && right_columns)
}
}
template <bool in_memory>
size_t MergeJoin::rightBlocksCount()
{
if constexpr (!in_memory)
return flushed_right_blocks.size();
else
return loaded_right_blocks.size();
}
template <bool in_memory>
std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos)
{
if constexpr (!in_memory)
{
auto load_func = [&]() -> std::shared_ptr<Block>
{
TemporaryFileStream input(flushed_right_blocks[pos]->path(), right_sample_block);
return std::make_shared<Block>(input.block_in->read());
};
return cached_right_blocks->getOrSet(pos, load_func).first;
}
else
return loaded_right_blocks[pos];
}
}

View File

@ -3,10 +3,12 @@
#include <memory>
#include <shared_mutex>
#include <Common/LRUCache.h>
#include <Common/filesystemHelpers.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Interpreters/IJoin.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
@ -15,6 +17,31 @@ class AnalyzedJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
struct MiniLSM
{
using SortedFiles = std::vector<std::unique_ptr<TemporaryFile>>;
const String & path;
const Block & sample_block;
const SortDescription & sort_description;
const size_t rows_in_block;
const size_t max_size;
std::vector<SortedFiles> sorted_files;
MiniLSM(const String & path_, const Block & sample_block_, const SortDescription & description,
size_t rows_in_block_, size_t max_size_ = 16)
: path(path_)
, sample_block(sample_block_)
, sort_description(description)
, rows_in_block(rows_in_block_)
, max_size(max_size_)
{}
void insert(const BlocksList & blocks);
void merge(std::function<void(const Block &)> callback = [](const Block &){});
};
class MergeJoin : public IJoin
{
public:
@ -28,32 +55,77 @@ public:
size_t getTotalRowCount() const override { return right_blocks_row_count; }
private:
/// There're two size limits for right-hand table: max_rows_in_join, max_bytes_in_join.
/// max_bytes is prefered. If it isn't set we aproximate it as (max_rows * bytes/row).
struct BlockByteWeight
{
size_t operator()(const Block & block) const { return block.bytes(); }
};
using Cache = LRUCache<size_t, Block, std::hash<size_t>, BlockByteWeight>;
mutable std::shared_mutex rwlock;
std::shared_ptr<AnalyzedJoin> table_join;
SizeLimits size_limits;
SortDescription left_sort_description;
SortDescription right_sort_description;
SortDescription left_merge_description;
SortDescription right_merge_description;
Block right_sample_block;
Block right_table_keys;
Block right_columns_to_add;
BlocksList right_blocks;
Blocks min_max_right_blocks;
std::unique_ptr<Cache> cached_right_blocks;
std::vector<std::shared_ptr<Block>> loaded_right_blocks;
std::unique_ptr<MiniLSM> lsm;
MiniLSM::SortedFiles flushed_right_blocks;
Block totals;
size_t right_blocks_row_count = 0;
size_t right_blocks_bytes = 0;
bool is_in_memory = true;
const bool nullable_right_side;
const bool is_all;
const bool is_inner;
const bool is_left;
const bool skip_not_intersected;
const size_t max_rows_in_right_block;
void changeLeftColumns(Block & block, MutableColumns && columns);
void addRightColumns(Block & block, MutableColumns && columns);
void mergeRightBlocks();
template <bool in_memory>
size_t rightBlocksCount();
template <bool in_memory>
void joinSortedBlock(Block & block);
template <bool in_memory>
std::shared_ptr<Block> loadRightBlock(size_t pos);
void leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);
void innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);
bool saveRightBlock(Block && block);
void flushRightBlocks();
void mergeInMemoryRightBlocks();
void mergeFlushedRightBlocks();
void clearRightBlocksList()
{
right_blocks.clear();
right_blocks_row_count = 0;
right_blocks_bytes = 0;
}
void countBlockSize(const Block & block)
{
right_blocks_row_count += block.rows();
right_blocks_bytes += block.bytes();
}
};
}

View File

@ -805,7 +805,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
SyntaxAnalyzerResult result;
result.storage = storage;
result.source_columns = source_columns_;
result.analyzed_join = std::make_shared<AnalyzedJoin>(settings); /// TODO: move to select_query logic
result.analyzed_join = std::make_shared<AnalyzedJoin>(settings, context.getTemporaryPath()); /// TODO: move to select_query logic
collectSourceColumns(select_query, result.storage, result.source_columns);
NameSet source_columns_set = removeDuplicateColumns(result.source_columns);

View File

@ -0,0 +1,12 @@
0 10
1 0
2 11
3 0
0 10
1 0
2 11
3 0
0 10
1 0
2 11
3 0

View File

@ -0,0 +1,35 @@
SET partial_merge_join = 0;
SELECT number as n, j FROM numbers(4)
ANY LEFT JOIN (
SELECT number * 2 AS n, number + 10 AS j
FROM numbers(4000)
) js2
USING n;
SET max_rows_in_join = 1000;
SELECT number as n, j FROM numbers(4)
ANY LEFT JOIN (
SELECT number * 2 AS n, number + 10 AS j
FROM numbers(4000)
) js2
USING n; -- { serverError 191 }
SET partial_merge_join = 1;
SELECT number as n, j FROM numbers(4)
ANY LEFT JOIN (
SELECT number * 2 AS n, number + 10 AS j
FROM numbers(4000)
) js2
USING n;
SET partial_merge_join_optimizations = 1;
SELECT number as n, j FROM numbers(4)
ANY LEFT JOIN (
SELECT number * 2 AS n, number + 10 AS j
FROM numbers(4000)
) js2
USING n;

View File

@ -0,0 +1,15 @@
0 0
200000 100000
400000 200000
600000 300000
800000 400000
0 0
200000 100000
400000 200000
600000 300000
800000 400000
0 0
200000 100000
400000 200000
600000 300000
800000 400000

View File

@ -0,0 +1,57 @@
SET max_memory_usage = 32000000;
SELECT number * 200000 as n, j FROM numbers(5)
ANY LEFT JOIN (
SELECT number * 2 AS n, number AS j
FROM numbers(1000000)
) js2
USING n; -- { serverError 241 }
SET partial_merge_join = 1;
SET default_max_bytes_in_join = 0;
SELECT number * 200000 as n, j FROM numbers(5)
ANY LEFT JOIN (
SELECT number * 2 AS n, number AS j
FROM numbers(1000000)
) js2
USING n; -- { serverError 12 }
SELECT number * 200000 as n, j FROM numbers(5)
ANY LEFT JOIN (
SELECT number * 2 AS n, number AS j
FROM numbers(1000000)
) js2
USING n
SETTINGS max_bytes_in_join = 30000000; -- { serverError 241 }
SELECT number * 200000 as n, j FROM numbers(5)
ANY LEFT JOIN (
SELECT number * 2 AS n, number AS j
FROM numbers(1000000)
) js2
USING n
ORDER BY n
SETTINGS max_bytes_in_join = 10000000;
SET partial_merge_join_optimizations = 1;
SET partial_merge_join_rows_in_left_blocks = 100000;
SELECT number * 200000 as n, j FROM numbers(5)
LEFT JOIN (
SELECT number * 2 AS n, number AS j
FROM numbers(1000000)
) js2
USING n
ORDER BY n
SETTINGS max_rows_in_join = 100000;
SET default_max_bytes_in_join = 10000000;
SELECT number * 200000 as n, j FROM numbers(5)
JOIN (
SELECT number * 2 AS n, number AS j
FROM numbers(1000000)
) js2
USING n
ORDER BY n;