From 8429162038354296235ee4bede827ea78fe37d1c Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 20 Nov 2023 14:56:05 +0000 Subject: [PATCH 1/6] HashJoin respects max_joined_block_size_rows --- src/Interpreters/HashJoin.cpp | 79 ++++++++++++++++++++++++++--------- src/Interpreters/HashJoin.h | 2 +- 2 files changed, 60 insertions(+), 21 deletions(-) diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 0d7c40cc27d..cb0dc6db28e 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -1165,9 +1165,11 @@ public: std::vector join_on_keys; + size_t max_joined_block_rows = 0; size_t rows_to_add; std::unique_ptr offsets_to_replicate; bool need_filter = false; + IColumn::Filter filter; private: std::vector type_name; @@ -1356,7 +1358,7 @@ void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unuse /// Joins right table columns which indexes are present in right_indexes using specified map. /// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS). 
template -NO_INLINE IColumn::Filter joinRightColumns( +NO_INLINE size_t joinRightColumns( std::vector && key_getter_vector, const std::vector & mapv, AddedColumns & added_columns, @@ -1365,9 +1367,8 @@ NO_INLINE IColumn::Filter joinRightColumns( constexpr JoinFeatures join_features; size_t rows = added_columns.rows_to_add; - IColumn::Filter filter; if constexpr (need_filter) - filter = IColumn::Filter(rows, 0); + added_columns.filter = IColumn::Filter(rows, 0); Arena pool; @@ -1375,9 +1376,20 @@ NO_INLINE IColumn::Filter joinRightColumns( added_columns.offsets_to_replicate = std::make_unique(rows); IColumn::Offset current_offset = 0; - - for (size_t i = 0; i < rows; ++i) + size_t max_joined_block_rows = added_columns.max_joined_block_rows; + size_t i = 0; + for (; i < rows; ++i) { + if constexpr (join_features.need_replication) + { + if (unlikely(current_offset > max_joined_block_rows)) + { + added_columns.offsets_to_replicate->resize_assume_reserved(i); + added_columns.filter.resize_assume_reserved(i); + break; + } + } + bool right_row_found = false; KnownRowsHolder known_rows; @@ -1402,7 +1414,7 @@ NO_INLINE IColumn::Filter joinRightColumns( auto row_ref = mapped->findAsof(left_asof_key, i); if (row_ref.block) { - setUsed(filter, i); + setUsed(added_columns.filter, i); if constexpr (multiple_disjuncts) used_flags.template setUsed(row_ref.block, row_ref.row_num, 0); else @@ -1415,7 +1427,7 @@ NO_INLINE IColumn::Filter joinRightColumns( } else if constexpr (join_features.is_all_join) { - setUsed(filter, i); + setUsed(added_columns.filter, i); used_flags.template setUsed(find_result); auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr; addFoundRowAll(mapped, added_columns, current_offset, known_rows, used_flags_opt); @@ -1427,7 +1439,7 @@ NO_INLINE IColumn::Filter joinRightColumns( if (used_once) { auto used_flags_opt = join_features.need_flags ? 
&used_flags : nullptr; - setUsed(filter, i); + setUsed(added_columns.filter, i); addFoundRowAll(mapped, added_columns, current_offset, known_rows, used_flags_opt); } } @@ -1438,7 +1450,7 @@ NO_INLINE IColumn::Filter joinRightColumns( /// Use first appeared left key only if (used_once) { - setUsed(filter, i); + setUsed(added_columns.filter, i); added_columns.appendFromBlock(*mapped.block, mapped.row_num); } @@ -1455,7 +1467,7 @@ NO_INLINE IColumn::Filter joinRightColumns( } else /// ANY LEFT, SEMI LEFT, old ANY (RightAny) { - setUsed(filter, i); + setUsed(added_columns.filter, i); used_flags.template setUsed(find_result); added_columns.appendFromBlock(*mapped.block, mapped.row_num); @@ -1470,7 +1482,7 @@ NO_INLINE IColumn::Filter joinRightColumns( if (!right_row_found) { if constexpr (join_features.is_anti_join && join_features.left) - setUsed(filter, i); + setUsed(added_columns.filter, i); addNotFoundRow(added_columns, current_offset); } @@ -1481,11 +1493,11 @@ NO_INLINE IColumn::Filter joinRightColumns( } added_columns.applyLazyDefaults(); - return filter; + return i; } template -IColumn::Filter joinRightColumnsSwitchMultipleDisjuncts( +size_t joinRightColumnsSwitchMultipleDisjuncts( std::vector && key_getter_vector, const std::vector & mapv, AddedColumns & added_columns, @@ -1497,7 +1509,7 @@ IColumn::Filter joinRightColumnsSwitchMultipleDisjuncts( } template -IColumn::Filter joinRightColumnsSwitchNullability( +size_t joinRightColumnsSwitchNullability( std::vector && key_getter_vector, const std::vector & mapv, AddedColumns & added_columns, @@ -1514,7 +1526,7 @@ IColumn::Filter joinRightColumnsSwitchNullability( } template -IColumn::Filter switchJoinRightColumns( +size_t switchJoinRightColumns( const std::vector & mapv, AddedColumns & added_columns, HashJoin::Type type, @@ -1597,10 +1609,27 @@ ColumnWithTypeAndName copyLeftKeyColumnToRight( return right_column; } +/// Cut first num_rows rows from block in place and returns block with remaining rows +Block 
sliceBlock(Block & block, size_t num_rows) +{ + size_t total_rows = block.rows(); + if (num_rows >= total_rows) + return {}; + + Block remaining_block = block.cloneEmpty(); + for (size_t i = 0; i < block.columns(); ++i) + { + auto & col = block.getByPosition(i); + remaining_block.getByPosition(i).column = col.column->cut(num_rows, total_rows - num_rows); + col.column = col.column->cut(0, num_rows); + } + return remaining_block; +} + } /// nameless template -void HashJoin::joinBlockImpl( +Block HashJoin::joinBlockImpl( Block & block, const Block & block_with_columns_to_add, const std::vector & maps_, @@ -1642,8 +1671,12 @@ void HashJoin::joinBlockImpl( bool has_required_right_keys = (required_right_keys.columns() != 0); added_columns.need_filter = join_features.need_filter || has_required_right_keys; + added_columns.max_joined_block_rows = table_join->maxJoinedBlockRows(); + if (!added_columns.max_joined_block_rows) + added_columns.max_joined_block_rows = std::numeric_limits::max(); - IColumn::Filter row_filter = switchJoinRightColumns(maps_, added_columns, data->type, used_flags); + size_t num_joined = switchJoinRightColumns(maps_, added_columns, data->type, used_flags); + Block remaining_block = sliceBlock(block, num_joined); for (size_t i = 0; i < added_columns.size(); ++i) block.insert(added_columns.moveColumn(i)); @@ -1654,7 +1687,7 @@ void HashJoin::joinBlockImpl( { /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones. 
for (size_t i = 0; i < existing_columns; ++i) - block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(row_filter, -1); + block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(added_columns.filter, -1); /// Add join key columns from right block if needed using value from left table because of equality for (size_t i = 0; i < required_right_keys.columns(); ++i) @@ -1688,7 +1721,7 @@ void HashJoin::joinBlockImpl( continue; const auto & left_column = block.getByName(required_right_keys_sources[i]); - auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column, &row_filter); + auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column, &added_columns.filter); block.insert(std::move(right_col)); if constexpr (join_features.need_replication) @@ -1709,6 +1742,8 @@ void HashJoin::joinBlockImpl( for (size_t pos : right_keys_to_replicate) block.safeGetByPosition(pos).column = block.safeGetByPosition(pos).column->replicate(*offsets_to_replicate); } + + return remaining_block; } void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed) const @@ -1885,7 +1920,11 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed) if (joinDispatch(kind, strictness, maps_vector, [&](auto kind_, auto strictness_, auto & maps_vector_) { - joinBlockImpl(block, sample_block_with_columns_to_add, maps_vector_); + Block remaining_block = joinBlockImpl(block, sample_block_with_columns_to_add, maps_vector_); + if (remaining_block.rows()) + not_processed = std::make_shared(ExtraBlock{std::move(remaining_block)}); + else + not_processed.reset(); })) { /// Joined diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index d125e56057f..a2cd5eca618 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -447,7 +447,7 @@ private: void initRightBlockStructure(Block & saved_block_sample); template - void joinBlockImpl( + Block 
joinBlockImpl( Block & block, const Block & block_with_columns_to_add, const std::vector & maps_, From afda7f10796b20561a5fc9c72f78e94f25f5a142 Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 20 Nov 2023 14:59:37 +0000 Subject: [PATCH 2/6] Add tests/performance/hashjoin_with_large_output.xml Co-authored-by: liuneng <1398775315@qq.com> --- .../hashjoin_with_large_output.xml | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 tests/performance/hashjoin_with_large_output.xml diff --git a/tests/performance/hashjoin_with_large_output.xml b/tests/performance/hashjoin_with_large_output.xml new file mode 100644 index 00000000000..f4b61c15f82 --- /dev/null +++ b/tests/performance/hashjoin_with_large_output.xml @@ -0,0 +1,64 @@ + + + 16 + 10G + + + + + settings + + join_algorithm='hash' + join_algorithm='grace_hash' + + + + + + create table test_left + ( + k1 String, + v1 String + ) + engine = Memory(); + + + create table test_right + ( + k1 String, + v1 String, + v2 String, + v3 String, + v4 String, + v5 String, + v6 String, + v7 String, + v8 String, + v9 String + ) + engine = Memory(); + + insert into test_left SELECT toString(number % 20), toString(number) from system.numbers limit 10000; + + insert into test_right + SELECT + toString(number % 20), + toString(number * 10000), + toString(number * 10000 + 1), + toString(number * 10000 + 2), + toString(number * 10000 + 3), + toString(number * 10000 + 4), + toString(number * 10000 + 5), + toString(number * 10000 + 6), + toString(number * 10000 + 7), + toString(number * 10000 + 8) + from system.numbers limit 10000; + + + + select * from test_left all inner join test_right on test_left.k1 = test_right.k1 SETTINGS {settings} format Null + + + DROP TABLE IF EXISTS test_left + DROP TABLE IF EXISTS test_right + From 6c9aa2ced53402b3322fa8cb178dddc778df838d Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 20 Nov 2023 16:16:37 +0000 Subject: [PATCH 3/6] handle not_processed in GraceHashJoin --- 
src/Interpreters/GraceHashJoin.cpp | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index 89ea3a326cc..3f9216d9346 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -424,8 +424,10 @@ void GraceHashJoin::initialize(const Block & sample_block) { left_sample_block = sample_block.cloneEmpty(); output_sample_block = left_sample_block.cloneEmpty(); - ExtraBlockPtr not_processed; + ExtraBlockPtr not_processed = nullptr; hash_join->joinBlock(output_sample_block, not_processed); + if (not_processed) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unhandled not processed block in GraceHashJoin"); initBuckets(); } @@ -447,9 +449,6 @@ void GraceHashJoin::joinBlock(Block & block, std::shared_ptr & not_p block = std::move(blocks[current_bucket->idx]); hash_join->joinBlock(block, not_processed); - if (not_processed) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unhandled not processed block in GraceHashJoin"); - flushBlocksToBuckets(blocks, buckets); } @@ -528,6 +527,13 @@ public: Block nextImpl() override { + if (not_processed) + { + Block block = not_processed->block; + hash_join->joinBlock(block, not_processed); + return block; + } + Block block; size_t num_buckets = buckets.size(); size_t current_idx = buckets[current_bucket]->idx; @@ -565,12 +571,7 @@ public: } } while (block.rows() == 0); - ExtraBlockPtr not_processed; hash_join->joinBlock(block, not_processed); - - if (not_processed) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported hash join type"); - return block; } @@ -582,6 +583,8 @@ public: Names left_key_names; Names right_key_names; + + ExtraBlockPtr not_processed = nullptr; }; IBlocksStreamPtr GraceHashJoin::getDelayedBlocks() From 47e46e0c50556c99ac23e53b5f6d904be6332e8d Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 21 Nov 2023 13:36:53 +0000 Subject: [PATCH 4/6] Fix race in GraceHashJoin::DelayedBlocks 
--- src/Interpreters/GraceHashJoin.cpp | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index 3f9216d9346..c77c708d0df 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -527,10 +527,26 @@ public: Block nextImpl() override { + ExtraBlockPtr not_processed = nullptr; + + { + std::lock_guard lock(extra_block_mutex); + if (!not_processed_blocks.empty()) + { + not_processed = std::move(not_processed_blocks.front()); + not_processed_blocks.pop_front(); + } + } + if (not_processed) { - Block block = not_processed->block; + Block block = std::move(not_processed->block); hash_join->joinBlock(block, not_processed); + if (not_processed) + { + std::lock_guard lock(extra_block_mutex); + not_processed_blocks.emplace_back(std::move(not_processed)); + } return block; } @@ -572,6 +588,11 @@ public: } while (block.rows() == 0); hash_join->joinBlock(block, not_processed); + if (not_processed) + { + std::lock_guard lock(extra_block_mutex); + not_processed_blocks.emplace_back(std::move(not_processed)); + } return block; } @@ -584,7 +605,8 @@ public: Names left_key_names; Names right_key_names; - ExtraBlockPtr not_processed = nullptr; + std::mutex extra_block_mutex; + std::list not_processed_blocks TSA_GUARDED_BY(extra_block_mutex); }; IBlocksStreamPtr GraceHashJoin::getDelayedBlocks() From 4309d4573066efe5e7137c68d70dee6189c82d74 Mon Sep 17 00:00:00 2001 From: vdimir Date: Wed, 22 Nov 2023 11:59:53 +0000 Subject: [PATCH 5/6] Reserve memory in AddedColumns --- src/Interpreters/HashJoin.cpp | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index cb0dc6db28e..b09eb906e4f 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -1171,6 +1171,22 @@ public: bool need_filter = false; IColumn::Filter filter; + 
void reserve(bool need_replicate) + { + if (!max_joined_block_rows) + return; + + /// Do not allow big allocations when the user sets max_joined_block_rows to a huge value + size_t reserve_size = std::min(max_joined_block_rows, DEFAULT_BLOCK_SIZE * 2); + + if (need_replicate) + /// Reserve 10% more space for columns, because some rows can be repeated + reserve_size = static_cast(1.1 * reserve_size); + + for (auto & column : columns) + column->reserve(reserve_size); + } + private: std::vector type_name; MutableColumns columns; @@ -1615,12 +1631,12 @@ Block sliceBlock(Block & block, size_t num_rows) size_t total_rows = block.rows(); if (num_rows >= total_rows) return {}; - + size_t remaining_rows = total_rows - num_rows; Block remaining_block = block.cloneEmpty(); for (size_t i = 0; i < block.columns(); ++i) { auto & col = block.getByPosition(i); - remaining_block.getByPosition(i).column = col.column->cut(num_rows, total_rows - num_rows); + remaining_block.getByPosition(i).column = col.column->cut(num_rows, remaining_rows); col.column = col.column->cut(0, num_rows); } return remaining_block; @@ -1674,8 +1690,12 @@ Block HashJoin::joinBlockImpl( added_columns.max_joined_block_rows = table_join->maxJoinedBlockRows(); if (!added_columns.max_joined_block_rows) added_columns.max_joined_block_rows = std::numeric_limits::max(); + else + added_columns.reserve(join_features.need_replication); size_t num_joined = switchJoinRightColumns(maps_, added_columns, data->type, used_flags); + /// Do not hold memory for join_on_keys anymore + added_columns.join_on_keys.clear(); Block remaining_block = sliceBlock(block, num_joined); for (size_t i = 0; i < added_columns.size(); ++i) From 9b137059a32de4ac7a0a2fcf3038b6a46988ef06 Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 28 Nov 2023 10:28:11 +0000 Subject: [PATCH 6/6] Better logging in hash join --- src/Interpreters/ConcurrentHashJoin.cpp | 3 ++- src/Interpreters/GraceHashJoin.cpp | 13 ++++++------- src/Interpreters/GraceHashJoin.h | 5 
++--- src/Interpreters/HashJoin.cpp | 14 ++++++++------ src/Interpreters/HashJoin.h | 7 ++++++- 5 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/Interpreters/ConcurrentHashJoin.cpp b/src/Interpreters/ConcurrentHashJoin.cpp index 1a8e0ad96fa..8e73bc8b484 100644 --- a/src/Interpreters/ConcurrentHashJoin.cpp +++ b/src/Interpreters/ConcurrentHashJoin.cpp @@ -44,7 +44,8 @@ ConcurrentHashJoin::ConcurrentHashJoin(ContextPtr context_, std::shared_ptr(); - inner_hash_join->data = std::make_unique(table_join_, right_sample_block, any_take_last_row_); + + inner_hash_join->data = std::make_unique(table_join_, right_sample_block, any_take_last_row_, 0, fmt::format("concurrent{}", i)); hash_joins.emplace_back(std::move(inner_hash_join)); } } diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index c77c708d0df..f6fa9b44d57 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -271,7 +271,7 @@ GraceHashJoin::GraceHashJoin( , left_key_names(table_join->getOnlyClause().key_names_left) , right_key_names(table_join->getOnlyClause().key_names_right) , tmp_data(std::make_unique(tmp_data_, CurrentMetrics::TemporaryFilesForJoin)) - , hash_join(makeInMemoryJoin()) + , hash_join(makeInMemoryJoin("grace0")) , hash_join_sample_block(hash_join->savedBlockSample()) { if (!GraceHashJoin::isSupported(table_join)) @@ -636,7 +636,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks() continue; } - hash_join = makeInMemoryJoin(prev_keys_num); + hash_join = makeInMemoryJoin(fmt::format("grace{}", bucket_idx), prev_keys_num); auto right_reader = current_bucket->startJoining(); size_t num_rows = 0; /// count rows that were written and rehashed while (Block block = right_reader.read()) @@ -657,10 +657,9 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks() return nullptr; } -GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin(size_t reserve_num) +GraceHashJoin::InMemoryJoinPtr 
GraceHashJoin::makeInMemoryJoin(const String & bucket_id, size_t reserve_num) { - auto ret = std::make_unique(table_join, right_sample_block, any_take_last_row, reserve_num); - return std::move(ret); + return std::make_unique(table_join, right_sample_block, any_take_last_row, reserve_num, bucket_id); } Block GraceHashJoin::prepareRightBlock(const Block & block) @@ -686,7 +685,7 @@ void GraceHashJoin::addBlockToJoinImpl(Block block) { std::lock_guard lock(hash_join_mutex); if (!hash_join) - hash_join = makeInMemoryJoin(); + hash_join = makeInMemoryJoin(fmt::format("grace{}", bucket_index)); // buckets size has been changed in other threads. Need to scatter current_block again. // rehash could only happen under hash_join_mutex's scope. @@ -730,7 +729,7 @@ void GraceHashJoin::addBlockToJoinImpl(Block block) current_block = concatenateBlocks(current_blocks); } - hash_join = makeInMemoryJoin(prev_keys_num); + hash_join = makeInMemoryJoin(fmt::format("grace{}", bucket_index), prev_keys_num); if (current_block.rows() > 0) hash_join->addBlockToJoin(current_block, /* check_limits = */ false); diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h index 44949440467..2cadeee10b9 100644 --- a/src/Interpreters/GraceHashJoin.h +++ b/src/Interpreters/GraceHashJoin.h @@ -44,9 +44,8 @@ class GraceHashJoin final : public IJoin { class FileBucket; class DelayedBlocks; - using InMemoryJoin = HashJoin; - using InMemoryJoinPtr = std::shared_ptr; + using InMemoryJoinPtr = std::shared_ptr; public: using BucketPtr = std::shared_ptr; @@ -91,7 +90,7 @@ public: private: void initBuckets(); /// Create empty join for in-memory processing. - InMemoryJoinPtr makeInMemoryJoin(size_t reserve_num = 0); + InMemoryJoinPtr makeInMemoryJoin(const String & bucket_id, size_t reserve_num = 0); /// Add right table block to the @join. Calls @rehash on overflow. 
void addBlockToJoinImpl(Block block); diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index b09eb906e4f..8cbc0647e75 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -233,7 +233,8 @@ static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nulla JoinCommon::removeColumnNullability(column); } -HashJoin::HashJoin(std::shared_ptr table_join_, const Block & right_sample_block_, bool any_take_last_row_, size_t reserve_num) +HashJoin::HashJoin(std::shared_ptr table_join_, const Block & right_sample_block_, + bool any_take_last_row_, size_t reserve_num, const String & instance_id_) : table_join(table_join_) , kind(table_join->kind()) , strictness(table_join->strictness()) @@ -241,10 +242,11 @@ HashJoin::HashJoin(std::shared_ptr table_join_, const Block & right_s , asof_inequality(table_join->getAsofInequality()) , data(std::make_shared()) , right_sample_block(right_sample_block_) + , instance_log_id(!instance_id_.empty() ? 
"(" + instance_id_ + ") " : "") , log(&Poco::Logger::get("HashJoin")) { - LOG_DEBUG(log, "({}) Datatype: {}, kind: {}, strictness: {}, right header: {}", fmt::ptr(this), data->type, kind, strictness, right_sample_block.dumpStructure()); - LOG_DEBUG(log, "({}) Keys: {}", fmt::ptr(this), TableJoin::formatClauses(table_join->getClauses(), true)); + LOG_TRACE(log, "{}Keys: {}, datatype: {}, kind: {}, strictness: {}, right header: {}", + instance_log_id, TableJoin::formatClauses(table_join->getClauses(), true), data->type, kind, strictness, right_sample_block.dumpStructure()); if (isCrossOrComma(kind)) { @@ -1958,10 +1960,10 @@ HashJoin::~HashJoin() { if (!data) { - LOG_TRACE(log, "({}) Join data has been already released", fmt::ptr(this)); + LOG_TRACE(log, "{}Join data has been already released", instance_log_id); return; } - LOG_TRACE(log, "({}) Join data is being destroyed, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount()); + LOG_TRACE(log, "{}Join data is being destroyed, {} bytes and {} rows in hash table", instance_log_id, getTotalByteCount(), getTotalRowCount()); } template @@ -2242,7 +2244,7 @@ void HashJoin::reuseJoinedData(const HashJoin & join) BlocksList HashJoin::releaseJoinedBlocks(bool restructure) { - LOG_TRACE(log, "({}) Join data is being released, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount()); + LOG_TRACE(log, "{}Join data is being released, {} bytes and {} rows in hash table", instance_log_id, getTotalByteCount(), getTotalRowCount()); BlocksList right_blocks = std::move(data->blocks); if (!restructure) diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index a2cd5eca618..284cf5d0e7f 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -147,7 +147,8 @@ class HashJoin : public IJoin { public: HashJoin( - std::shared_ptr table_join_, const Block & right_sample_block, bool any_take_last_row_ = false, size_t reserve_num 
= 0); + std::shared_ptr table_join_, const Block & right_sample_block, + bool any_take_last_row_ = false, size_t reserve_num = 0, const String & instance_id_ = ""); ~HashJoin() override; @@ -436,6 +437,10 @@ private: bool shrink_blocks = false; Int64 memory_usage_before_adding_blocks = 0; + /// Identifier to distinguish different HashJoin instances in logs + /// Several instances can be created, for example, in GraceHashJoin to handle different buckets + String instance_log_id; + Poco::Logger * log; /// Should be set via setLock to protect hash table from modification from StorageJoin