squash left-hand blocks in pmj (phase 2)

This commit is contained in:
chertus 2019-09-26 21:51:17 +03:00
parent dd08f06c81
commit 10eaef1adc
8 changed files with 30 additions and 18 deletions

View File

@ -292,9 +292,9 @@ struct Settings : public SettingsCollection<Settings>
M(SettingOverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
M(SettingBool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.") \
M(SettingBool, partial_merge_join, false, "Use partial merge join instead of hash join for LEFT and INNER JOINs.") \
M(SettingBool, partial_merge_join_optimisations, false, "Enable optimisations in partial merge join") \
M(SettingBool, partial_merge_join_optimizations, false, "Enable optimizations in partial merge join") \
M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size.") \
M(SettingFloat, partial_merge_join_memory_coefficient, 0.25, "") \
M(SettingFloat, partial_merge_join_memory_coefficient, 0.25, "How much query memory would be used for left|right table in join. Do not include result data memory.") \
\
M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \

View File

@ -6,7 +6,7 @@ namespace DB
SquashingBlockInputStream::SquashingBlockInputStream(
const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes)
: header(src->getHeader()), transform(min_block_size_rows, min_block_size_bytes)
: header(src->getHeader()), transform(min_block_size_rows, min_block_size_bytes, true)
{
children.emplace_back(src);
}

View File

@ -4,8 +4,10 @@
namespace DB
{
SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_)
: min_block_size_rows(min_block_size_rows_), min_block_size_bytes(min_block_size_bytes_)
SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_, bool reserve_memory_)
: min_block_size_rows(min_block_size_rows_)
, min_block_size_bytes(min_block_size_bytes_)
, reserve_memory(reserve_memory_)
{
}
@ -59,7 +61,12 @@ void SquashingTransform::append(MutableColumns && columns)
}
for (size_t i = 0, size = columns.size(); i < size; ++i)
accumulated_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
{
auto & column = accumulated_columns[i];
if (reserve_memory)
column->reserve(min_block_size_bytes);
column->insertRangeFrom(*columns[i], 0, columns[i]->size());
}
}

View File

@ -23,7 +23,7 @@ class SquashingTransform
{
public:
/// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored.
SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_);
SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_, bool reserve_memory_ = false);
/// When not ready, you need to pass more blocks to add function.
struct Result
@ -43,6 +43,7 @@ public:
private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
bool reserve_memory;
MutableColumns accumulated_columns;

View File

@ -21,6 +21,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int PARAMETER_OUT_OF_BOUND;
}
AnalyzedJoin::AnalyzedJoin(const Settings & settings)
@ -31,9 +32,13 @@ AnalyzedJoin::AnalyzedJoin(const Settings & settings)
settings.join_overflow_mode})
, join_use_nulls(settings.join_use_nulls)
, partial_merge_join(settings.partial_merge_join)
, partial_merge_join_optimisations(settings.partial_merge_join_optimisations)
, partial_merge_join_optimizations(settings.partial_merge_join_optimizations)
, partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks)
{}
{
Float32 memory_coef = settings.partial_merge_join_memory_coefficient;
if (memory_coef < 0.0f || memory_coef > 1.0f)
throw Exception("Wrond partial_merge_join_memory_coefficient. It should be in range [0,1]", ErrorCodes::PARAMETER_OUT_OF_BOUND);
}
void AnalyzedJoin::addUsingKey(const ASTPtr & ast)
{

View File

@ -39,9 +39,9 @@ class AnalyzedJoin
const SizeLimits size_limits;
const bool join_use_nulls;
const bool partial_merge_join;
const bool partial_merge_join_optimisations;
const size_t partial_merge_join_rows_in_right_blocks;
const bool partial_merge_join = false;
const bool partial_merge_join_optimizations = false;
const size_t partial_merge_join_rows_in_right_blocks = 0;
Names key_names_left;
Names key_names_right; /// Duplicating names are qualified.
@ -67,9 +67,6 @@ public:
const Names & key_names_right_)
: size_limits(limits)
, join_use_nulls(use_nulls)
, partial_merge_join(false)
, partial_merge_join_optimisations(false)
, partial_merge_join_rows_in_right_blocks(0)
, key_names_right(key_names_right_)
{
table_join.kind = kind;
@ -83,7 +80,7 @@ public:
bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); }
bool forceNullableLeft() const { return join_use_nulls && isRightOrFull(table_join.kind); }
size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; }
bool enablePartialMergeJoinOptimisations() const { return partial_merge_join_optimisations; }
bool enablePartialMergeJoinOptimizations() const { return partial_merge_join_optimizations; }
void addUsingKey(const ASTPtr & ast);
void addOnKeys(ASTPtr & left_table_ast, ASTPtr & right_table_ast);

View File

@ -1118,10 +1118,12 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
for (auto & stream : pipeline.streams)
stream = std::make_shared<ExpressionBlockInputStream>(stream, expressions.before_join);
if (isMergeJoin(expressions.before_join->getTableJoinAlgo()) && settings.partial_merge_join_optimisations)
if (isMergeJoin(expressions.before_join->getTableJoinAlgo()) && settings.partial_merge_join_optimizations)
{
/// TODO: * min(query_memoty_limit, max_bytes_in_join)
size_t bytes_in_block = settings.partial_merge_join_memory_coefficient * settings.max_bytes_in_join;
if (pipeline.streams.size())
bytes_in_block /= pipeline.streams.size();
if (bytes_in_block)
for (auto & stream : pipeline.streams)
stream = std::make_shared<SquashingBlockInputStream>(stream, 0, bytes_in_block);

View File

@ -330,7 +330,7 @@ MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & ri
, is_all(table_join->strictness() == ASTTableJoin::Strictness::All)
, is_inner(isInner(table_join->kind()))
, is_left(isLeft(table_join->kind()))
, skip_not_intersected(table_join->enablePartialMergeJoinOptimisations())
, skip_not_intersected(table_join->enablePartialMergeJoinOptimizations())
{
if (!isLeft(table_join->kind()) && !isInner(table_join->kind()))
throw Exception("Partial merge supported for LEFT and INNER JOINs only", ErrorCodes::NOT_IMPLEMENTED);