Merge pull request #56996 from ClickHouse/vdimir/hash_join_max_block_size

HashJoin respects max_joined_block_size_rows
This commit is contained in:
Alexey Milovidov 2023-12-27 15:46:46 +01:00 committed by GitHub
commit 0e678fb6c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 201 additions and 47 deletions

View File

@ -44,7 +44,8 @@ ConcurrentHashJoin::ConcurrentHashJoin(ContextPtr context_, std::shared_ptr<Tabl
for (size_t i = 0; i < slots; ++i)
{
auto inner_hash_join = std::make_shared<InternalHashJoin>();
inner_hash_join->data = std::make_unique<HashJoin>(table_join_, right_sample_block, any_take_last_row_);
inner_hash_join->data = std::make_unique<HashJoin>(table_join_, right_sample_block, any_take_last_row_, 0, fmt::format("concurrent{}", i));
hash_joins.emplace_back(std::move(inner_hash_join));
}
}

View File

@ -271,7 +271,7 @@ GraceHashJoin::GraceHashJoin(
, left_key_names(table_join->getOnlyClause().key_names_left)
, right_key_names(table_join->getOnlyClause().key_names_right)
, tmp_data(std::make_unique<TemporaryDataOnDisk>(tmp_data_, CurrentMetrics::TemporaryFilesForJoin))
, hash_join(makeInMemoryJoin())
, hash_join(makeInMemoryJoin("grace0"))
, hash_join_sample_block(hash_join->savedBlockSample())
{
if (!GraceHashJoin::isSupported(table_join))
@ -424,8 +424,10 @@ void GraceHashJoin::initialize(const Block & sample_block)
{
left_sample_block = sample_block.cloneEmpty();
output_sample_block = left_sample_block.cloneEmpty();
ExtraBlockPtr not_processed;
ExtraBlockPtr not_processed = nullptr;
hash_join->joinBlock(output_sample_block, not_processed);
if (not_processed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unhandled not processed block in GraceHashJoin");
initBuckets();
}
@ -447,9 +449,6 @@ void GraceHashJoin::joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_p
block = std::move(blocks[current_bucket->idx]);
hash_join->joinBlock(block, not_processed);
if (not_processed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unhandled not processed block in GraceHashJoin");
flushBlocksToBuckets<JoinTableSide::Left>(blocks, buckets);
}
@ -528,6 +527,29 @@ public:
Block nextImpl() override
{
ExtraBlockPtr not_processed = nullptr;
{
std::lock_guard lock(extra_block_mutex);
if (!not_processed_blocks.empty())
{
not_processed = std::move(not_processed_blocks.front());
not_processed_blocks.pop_front();
}
}
if (not_processed)
{
Block block = std::move(not_processed->block);
hash_join->joinBlock(block, not_processed);
if (not_processed)
{
std::lock_guard lock(extra_block_mutex);
not_processed_blocks.emplace_back(std::move(not_processed));
}
return block;
}
Block block;
size_t num_buckets = buckets.size();
size_t current_idx = buckets[current_bucket]->idx;
@ -565,12 +587,12 @@ public:
}
} while (block.rows() == 0);
ExtraBlockPtr not_processed;
hash_join->joinBlock(block, not_processed);
if (not_processed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported hash join type");
{
std::lock_guard lock(extra_block_mutex);
not_processed_blocks.emplace_back(std::move(not_processed));
}
return block;
}
@ -582,6 +604,9 @@ public:
Names left_key_names;
Names right_key_names;
std::mutex extra_block_mutex;
std::list<ExtraBlockPtr> not_processed_blocks TSA_GUARDED_BY(extra_block_mutex);
};
IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
@ -611,7 +636,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
continue;
}
hash_join = makeInMemoryJoin(prev_keys_num);
hash_join = makeInMemoryJoin(fmt::format("grace{}", bucket_idx), prev_keys_num);
auto right_reader = current_bucket->startJoining();
size_t num_rows = 0; /// count rows that were written and rehashed
while (Block block = right_reader.read())
@ -632,10 +657,9 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
return nullptr;
}
GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin(size_t reserve_num)
GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin(const String & bucket_id, size_t reserve_num)
{
auto ret = std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row, reserve_num);
return std::move(ret);
return std::make_unique<HashJoin>(table_join, right_sample_block, any_take_last_row, reserve_num, bucket_id);
}
Block GraceHashJoin::prepareRightBlock(const Block & block)
@ -661,7 +685,7 @@ void GraceHashJoin::addBlockToJoinImpl(Block block)
{
std::lock_guard lock(hash_join_mutex);
if (!hash_join)
hash_join = makeInMemoryJoin();
hash_join = makeInMemoryJoin(fmt::format("grace{}", bucket_index));
// buckets size has been changed in other threads. Need to scatter current_block again.
// rehash could only happen under hash_join_mutex's scope.
@ -705,7 +729,7 @@ void GraceHashJoin::addBlockToJoinImpl(Block block)
current_block = concatenateBlocks(current_blocks);
}
hash_join = makeInMemoryJoin(prev_keys_num);
hash_join = makeInMemoryJoin(fmt::format("grace{}", bucket_index), prev_keys_num);
if (current_block.rows() > 0)
hash_join->addBlockToJoin(current_block, /* check_limits = */ false);

View File

@ -44,9 +44,8 @@ class GraceHashJoin final : public IJoin
{
class FileBucket;
class DelayedBlocks;
using InMemoryJoin = HashJoin;
using InMemoryJoinPtr = std::shared_ptr<InMemoryJoin>;
using InMemoryJoinPtr = std::shared_ptr<HashJoin>;
public:
using BucketPtr = std::shared_ptr<FileBucket>;
@ -91,7 +90,7 @@ public:
private:
void initBuckets();
/// Create empty join for in-memory processing.
InMemoryJoinPtr makeInMemoryJoin(size_t reserve_num = 0);
InMemoryJoinPtr makeInMemoryJoin(const String & bucket_id, size_t reserve_num = 0);
/// Add right table block to the @join. Calls @rehash on overflow.
void addBlockToJoinImpl(Block block);

View File

@ -233,7 +233,8 @@ static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nulla
JoinCommon::removeColumnNullability(column);
}
HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_, size_t reserve_num)
HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_,
bool any_take_last_row_, size_t reserve_num, const String & instance_id_)
: table_join(table_join_)
, kind(table_join->kind())
, strictness(table_join->strictness())
@ -241,10 +242,11 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
, asof_inequality(table_join->getAsofInequality())
, data(std::make_shared<RightTableData>())
, right_sample_block(right_sample_block_)
, instance_log_id(!instance_id_.empty() ? "(" + instance_id_ + ") " : "")
, log(&Poco::Logger::get("HashJoin"))
{
LOG_DEBUG(log, "({}) Datatype: {}, kind: {}, strictness: {}, right header: {}", fmt::ptr(this), data->type, kind, strictness, right_sample_block.dumpStructure());
LOG_DEBUG(log, "({}) Keys: {}", fmt::ptr(this), TableJoin::formatClauses(table_join->getClauses(), true));
LOG_TRACE(log, "{}Keys: {}, datatype: {}, kind: {}, strictness: {}, right header: {}",
instance_log_id, TableJoin::formatClauses(table_join->getClauses(), true), data->type, kind, strictness, right_sample_block.dumpStructure());
if (isCrossOrComma(kind))
{
@ -1165,9 +1167,27 @@ public:
std::vector<JoinOnKeyColumns> join_on_keys;
size_t max_joined_block_rows = 0;
size_t rows_to_add;
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
bool need_filter = false;
IColumn::Filter filter;
void reserve(bool need_replicate)
{
if (!max_joined_block_rows)
return;
/// Do not allow big allocations when user set max_joined_block_rows to huge value
size_t reserve_size = std::min<size_t>(max_joined_block_rows, DEFAULT_BLOCK_SIZE * 2);
if (need_replicate)
/// Reserve 10% more space for columns, because some rows can be repeated
reserve_size = static_cast<size_t>(1.1 * reserve_size);
for (auto & column : columns)
column->reserve(reserve_size);
}
private:
std::vector<TypeAndName> type_name;
@ -1356,7 +1376,7 @@ void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unuse
/// Joins right table columns which indexes are present in right_indexes using specified map.
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, bool multiple_disjuncts>
NO_INLINE IColumn::Filter joinRightColumns(
NO_INLINE size_t joinRightColumns(
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
AddedColumns & added_columns,
@ -1365,9 +1385,8 @@ NO_INLINE IColumn::Filter joinRightColumns(
constexpr JoinFeatures<KIND, STRICTNESS> join_features;
size_t rows = added_columns.rows_to_add;
IColumn::Filter filter;
if constexpr (need_filter)
filter = IColumn::Filter(rows, 0);
added_columns.filter = IColumn::Filter(rows, 0);
Arena pool;
@ -1375,9 +1394,20 @@ NO_INLINE IColumn::Filter joinRightColumns(
added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
IColumn::Offset current_offset = 0;
for (size_t i = 0; i < rows; ++i)
size_t max_joined_block_rows = added_columns.max_joined_block_rows;
size_t i = 0;
for (; i < rows; ++i)
{
if constexpr (join_features.need_replication)
{
if (unlikely(current_offset > max_joined_block_rows))
{
added_columns.offsets_to_replicate->resize_assume_reserved(i);
added_columns.filter.resize_assume_reserved(i);
break;
}
}
bool right_row_found = false;
KnownRowsHolder<multiple_disjuncts> known_rows;
@ -1402,7 +1432,7 @@ NO_INLINE IColumn::Filter joinRightColumns(
auto row_ref = mapped->findAsof(left_asof_key, i);
if (row_ref.block)
{
setUsed<need_filter>(filter, i);
setUsed<need_filter>(added_columns.filter, i);
if constexpr (multiple_disjuncts)
used_flags.template setUsed<join_features.need_flags, multiple_disjuncts>(row_ref.block, row_ref.row_num, 0);
else
@ -1415,7 +1445,7 @@ NO_INLINE IColumn::Filter joinRightColumns(
}
else if constexpr (join_features.is_all_join)
{
setUsed<need_filter>(filter, i);
setUsed<need_filter>(added_columns.filter, i);
used_flags.template setUsed<join_features.need_flags, multiple_disjuncts>(find_result);
auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
addFoundRowAll<Map, join_features.add_missing>(mapped, added_columns, current_offset, known_rows, used_flags_opt);
@ -1427,7 +1457,7 @@ NO_INLINE IColumn::Filter joinRightColumns(
if (used_once)
{
auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
setUsed<need_filter>(filter, i);
setUsed<need_filter>(added_columns.filter, i);
addFoundRowAll<Map, join_features.add_missing>(mapped, added_columns, current_offset, known_rows, used_flags_opt);
}
}
@ -1438,7 +1468,7 @@ NO_INLINE IColumn::Filter joinRightColumns(
/// Use first appeared left key only
if (used_once)
{
setUsed<need_filter>(filter, i);
setUsed<need_filter>(added_columns.filter, i);
added_columns.appendFromBlock<join_features.add_missing>(*mapped.block, mapped.row_num);
}
@ -1455,7 +1485,7 @@ NO_INLINE IColumn::Filter joinRightColumns(
}
else /// ANY LEFT, SEMI LEFT, old ANY (RightAny)
{
setUsed<need_filter>(filter, i);
setUsed<need_filter>(added_columns.filter, i);
used_flags.template setUsed<join_features.need_flags, multiple_disjuncts>(find_result);
added_columns.appendFromBlock<join_features.add_missing>(*mapped.block, mapped.row_num);
@ -1470,7 +1500,7 @@ NO_INLINE IColumn::Filter joinRightColumns(
if (!right_row_found)
{
if constexpr (join_features.is_anti_join && join_features.left)
setUsed<need_filter>(filter, i);
setUsed<need_filter>(added_columns.filter, i);
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
}
@ -1481,11 +1511,11 @@ NO_INLINE IColumn::Filter joinRightColumns(
}
added_columns.applyLazyDefaults();
return filter;
return i;
}
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter>
IColumn::Filter joinRightColumnsSwitchMultipleDisjuncts(
size_t joinRightColumnsSwitchMultipleDisjuncts(
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
AddedColumns & added_columns,
@ -1497,7 +1527,7 @@ IColumn::Filter joinRightColumnsSwitchMultipleDisjuncts(
}
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map>
IColumn::Filter joinRightColumnsSwitchNullability(
size_t joinRightColumnsSwitchNullability(
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
AddedColumns & added_columns,
@ -1514,7 +1544,7 @@ IColumn::Filter joinRightColumnsSwitchNullability(
}
template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps>
IColumn::Filter switchJoinRightColumns(
size_t switchJoinRightColumns(
const std::vector<const Maps *> & mapv,
AddedColumns & added_columns,
HashJoin::Type type,
@ -1597,10 +1627,27 @@ ColumnWithTypeAndName copyLeftKeyColumnToRight(
return right_column;
}
/// Cut first num_rows rows from block in place and returns block with remaining rows
Block sliceBlock(Block & block, size_t num_rows)
{
size_t total_rows = block.rows();
if (num_rows >= total_rows)
return {};
size_t remaining_rows = total_rows - num_rows;
Block remaining_block = block.cloneEmpty();
for (size_t i = 0; i < block.columns(); ++i)
{
auto & col = block.getByPosition(i);
remaining_block.getByPosition(i).column = col.column->cut(num_rows, remaining_rows);
col.column = col.column->cut(0, num_rows);
}
return remaining_block;
}
} /// nameless
template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps>
void HashJoin::joinBlockImpl(
Block HashJoin::joinBlockImpl(
Block & block,
const Block & block_with_columns_to_add,
const std::vector<const Maps *> & maps_,
@ -1642,8 +1689,16 @@ void HashJoin::joinBlockImpl(
bool has_required_right_keys = (required_right_keys.columns() != 0);
added_columns.need_filter = join_features.need_filter || has_required_right_keys;
added_columns.max_joined_block_rows = table_join->maxJoinedBlockRows();
if (!added_columns.max_joined_block_rows)
added_columns.max_joined_block_rows = std::numeric_limits<size_t>::max();
else
added_columns.reserve(join_features.need_replication);
IColumn::Filter row_filter = switchJoinRightColumns<KIND, STRICTNESS>(maps_, added_columns, data->type, used_flags);
size_t num_joined = switchJoinRightColumns<KIND, STRICTNESS>(maps_, added_columns, data->type, used_flags);
/// Do not hold memory for join_on_keys anymore
added_columns.join_on_keys.clear();
Block remaining_block = sliceBlock(block, num_joined);
for (size_t i = 0; i < added_columns.size(); ++i)
block.insert(added_columns.moveColumn(i));
@ -1654,7 +1709,7 @@ void HashJoin::joinBlockImpl(
{
/// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones.
for (size_t i = 0; i < existing_columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(row_filter, -1);
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(added_columns.filter, -1);
/// Add join key columns from right block if needed using value from left table because of equality
for (size_t i = 0; i < required_right_keys.columns(); ++i)
@ -1688,7 +1743,7 @@ void HashJoin::joinBlockImpl(
continue;
const auto & left_column = block.getByName(required_right_keys_sources[i]);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column, &row_filter);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column, &added_columns.filter);
block.insert(std::move(right_col));
if constexpr (join_features.need_replication)
@ -1709,6 +1764,8 @@ void HashJoin::joinBlockImpl(
for (size_t pos : right_keys_to_replicate)
block.safeGetByPosition(pos).column = block.safeGetByPosition(pos).column->replicate(*offsets_to_replicate);
}
return remaining_block;
}
void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed) const
@ -1885,7 +1942,11 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
if (joinDispatch(kind, strictness, maps_vector, [&](auto kind_, auto strictness_, auto & maps_vector_)
{
joinBlockImpl<kind_, strictness_>(block, sample_block_with_columns_to_add, maps_vector_);
Block remaining_block = joinBlockImpl<kind_, strictness_>(block, sample_block_with_columns_to_add, maps_vector_);
if (remaining_block.rows())
not_processed = std::make_shared<ExtraBlock>(ExtraBlock{std::move(remaining_block)});
else
not_processed.reset();
}))
{
/// Joined
@ -1899,10 +1960,10 @@ HashJoin::~HashJoin()
{
if (!data)
{
LOG_TRACE(log, "({}) Join data has been already released", fmt::ptr(this));
LOG_TRACE(log, "{}Join data has been already released", instance_log_id);
return;
}
LOG_TRACE(log, "({}) Join data is being destroyed, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount());
LOG_TRACE(log, "{}Join data is being destroyed, {} bytes and {} rows in hash table", instance_log_id, getTotalByteCount(), getTotalRowCount());
}
template <typename Mapped>
@ -2183,7 +2244,7 @@ void HashJoin::reuseJoinedData(const HashJoin & join)
BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
{
LOG_TRACE(log, "({}) Join data is being released, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount());
LOG_TRACE(log, "{}Join data is being released, {} bytes and {} rows in hash table", instance_log_id, getTotalByteCount(), getTotalRowCount());
BlocksList right_blocks = std::move(data->blocks);
if (!restructure)

View File

@ -147,7 +147,8 @@ class HashJoin : public IJoin
{
public:
HashJoin(
std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false, size_t reserve_num = 0);
std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block,
bool any_take_last_row_ = false, size_t reserve_num = 0, const String & instance_id_ = "");
~HashJoin() override;
@ -436,6 +437,10 @@ private:
bool shrink_blocks = false;
Int64 memory_usage_before_adding_blocks = 0;
/// Identifier to distinguish different HashJoin instances in logs
/// Several instances can be created, for example, in GraceHashJoin to handle different buckets
String instance_log_id;
Poco::Logger * log;
/// Should be set via setLock to protect hash table from modification from StorageJoin
@ -447,7 +452,7 @@ private:
void initRightBlockStructure(Block & saved_block_sample);
template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps>
void joinBlockImpl(
Block joinBlockImpl(
Block & block,
const Block & block_with_columns_to_add,
const std::vector<const Maps *> & maps_,

View File

@ -0,0 +1,64 @@
<test>
<settings>
<max_threads>16</max_threads>
<max_memory_usage>10G</max_memory_usage>
</settings>
<substitutions>
<substitution>
<name>settings</name>
<values>
<value>join_algorithm='hash'</value>
<value>join_algorithm='grace_hash'</value>
</values>
</substitution>
</substitutions>
<create_query>
create table test_left
(
k1 String,
v1 String
)
engine = Memory();
</create_query>
<create_query>
create table test_right
(
k1 String,
v1 String,
v2 String,
v3 String,
v4 String,
v5 String,
v6 String,
v7 String,
v8 String,
v9 String
)
engine = Memory();
</create_query>
<fill_query>insert into test_left SELECT toString(number % 20), toString(number) from system.numbers limit 10000;</fill_query>
<fill_query>
insert into test_right
SELECT
toString(number % 20),
toString(number * 10000),
toString(number * 10000 + 1),
toString(number * 10000 + 2),
toString(number * 10000 + 3),
toString(number * 10000 + 4),
toString(number * 10000 + 5),
toString(number * 10000 + 6),
toString(number * 10000 + 7),
toString(number * 10000 + 8)
from system.numbers limit 10000;
</fill_query>
<query>
select * from test_left all inner join test_right on test_left.k1 = test_right.k1 SETTINGS {settings} format Null
</query>
<drop_query>DROP TABLE IF EXISTS test_left</drop_query>
<drop_query>DROP TABLE IF EXISTS test_right</drop_query>
</test>