From b0c4e18464dc36584e4034ee1cb6e820de1cecd1 Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 19 Dec 2022 15:15:52 +0000 Subject: [PATCH 01/13] Fix double initialization GraceHashJoin::initBuckets --- src/Interpreters/GraceHashJoin.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index b8c6c639e82..a18d0b5e505 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -288,6 +288,9 @@ GraceHashJoin::GraceHashJoin( void GraceHashJoin::initBuckets() { + if (!buckets.empty()) + return; + const auto & settings = context->getSettingsRef(); size_t initial_num_buckets = roundUpToPowerOfTwoOrZero(std::clamp(settings.grace_hash_join_initial_buckets, 1, settings.grace_hash_join_max_buckets)); From efcfcca545e3412f6f948b620c1f2a1cfffe1ce4 Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 19 Dec 2022 15:19:49 +0000 Subject: [PATCH 02/13] Fix HashJoin::getTotalByteCount caclulation --- src/Interpreters/GraceHashJoin.cpp | 23 +++++++++----- src/Interpreters/GraceHashJoin.h | 2 +- src/Interpreters/HashJoin.cpp | 50 ++++++++++++++++++++++++------ src/Interpreters/HashJoin.h | 4 +++ 4 files changed, 61 insertions(+), 18 deletions(-) diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index a18d0b5e505..bac61b8205f 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -327,13 +327,22 @@ bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/) return true; } -bool GraceHashJoin::fitsInMemory() const +bool GraceHashJoin::hasMemoryOverflow() const { /// One row can't be split, avoid loop if (hash_join->getTotalRowCount() < 2) - return true; + return false; - return table_join->sizeLimits().softCheck(hash_join->getTotalRowCount(), hash_join->getTotalByteCount()); + size_t total_rows = hash_join->getTotalRowCount(); + size_t total_bytes = hash_join->getTotalByteCount(); + + bool has_overflow = !table_join->sizeLimits().softCheck(total_rows, total_bytes); + + if (has_overflow) + LOG_TRACE(log, "GraceHashJoin has memory overflow {} / {} bytes, {} / {} rows", + ReadableSize(total_bytes), ReadableSize(table_join->sizeLimits().max_bytes), + total_rows, table_join->sizeLimits().max_rows); + return has_overflow; } GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size) @@ -571,7 +580,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks() return std::make_unique(current_bucket->idx, buckets, hash_join, left_key_names, right_key_names); } - LOG_TRACE(log, "Finished loading all buckets"); + LOG_TRACE(log, "Finished loading all {} buckets", buckets.size()); current_bucket = nullptr; return nullptr; @@ -593,8 +602,8 @@ void GraceHashJoin::addJoinedBlockImpl(Block block) { std::lock_guard lock(hash_join_mutex); - hash_join->addJoinedBlock(blocks[bucket_index], /* check_limits = */ false); - bool overflow = !fitsInMemory(); + hash_join->addJoinedBlock(std::move(blocks[bucket_index]), /* check_limits = */ false); + bool overflow = hasMemoryOverflow(); if (overflow) { @@ -612,7 +621,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block) blocks = JoinCommon::scatterBlockByHash(right_key_names, blocks, buckets_snapshot.size()); hash_join = makeInMemoryJoin(); hash_join->addJoinedBlock(blocks[bucket_index], /* check_limits = */ false); - overflow = !fitsInMemory(); + overflow = hasMemoryOverflow(); } blocks[bucket_index].clear(); } diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h index f4e75f142f3..aa29838fd27 100644 --- a/src/Interpreters/GraceHashJoin.h +++ b/src/Interpreters/GraceHashJoin.h @@ -96,7 +96,7 @@ private: void addJoinedBlockImpl(Block block); /// Check that @join satisifes limits on rows/bytes in @table_join. - bool fitsInMemory() const; + bool hasMemoryOverflow() const; /// Create new bucket at the end of @destination. void addBucket(Buckets & destination); diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 5ff4f9beb05..5ff7d0fe7f8 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -484,28 +484,42 @@ size_t HashJoin::getTotalRowCount() const } } - return res; } size_t HashJoin::getTotalByteCount() const { +#ifdef NDEBUG + size_t debug_blocks_allocated_size = 0; + for (const auto & block : data->blocks) + debug_blocks_allocated_size += block.allocatedBytes(); + + if (data->blocks_allocated_size != debug_blocks_allocated_size) + throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_allocated_size != debug_blocks_allocated_size ({} != {})", + data->blocks_allocated_size, debug_blocks_allocated_size); + + size_t debug_blocks_nullmaps_allocated_size = 0; + for (const auto & nullmap : data->blocks_nullmaps) + debug_blocks_nullmaps_allocated_size += nullmap.second->allocatedBytes(); + + if (data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size) + throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})", + data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size); +#endif + size_t res = 0; - if (data->type == Type::CROSS) - { - for (const auto & block : data->blocks) - res += block.bytes(); - } - else + res += data->blocks_allocated_size; + res += data->blocks_nullmaps_allocated_size; + res += data->pool.size(); + + if (data->type != Type::CROSS) { for (const auto & map : data->maps) { joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { res += map_.getTotalByteCountImpl(data->type); }); } - res += data->pool.size(); } - return res; } @@ -691,6 +705,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) throw DB::Exception("addJoinedBlock called when HashJoin locked to prevent updates", ErrorCodes::LOGICAL_ERROR); + data->blocks_allocated_size += structured_block.allocatedBytes(); data->blocks.emplace_back(std::move(structured_block)); Block * stored_block = &data->blocks.back(); @@ -759,10 +774,16 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) } if (!multiple_disjuncts && save_nullmap) + { + data->blocks_nullmaps_allocated_size += null_map_holder->allocatedBytes(); data->blocks_nullmaps.emplace_back(stored_block, null_map_holder); + } if (!multiple_disjuncts && not_joined_map) + { + data->blocks_nullmaps_allocated_size += not_joined_map->allocatedBytes(); data->blocks_nullmaps.emplace_back(stored_block, std::move(not_joined_map)); + } if (!check_limits) return true; @@ -1715,6 +1736,16 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed) } } +HashJoin::~HashJoin() +{ + if (!data) + { + LOG_DEBUG(log, "Join data has been released"); + return; + } + LOG_DEBUG(log, "Join data is being destroyed, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); +} + template struct AdderNonJoined { @@ -1753,7 +1784,6 @@ struct AdderNonJoined } }; - /// Stream from not joined earlier rows of the right table. /// Based on: /// - map offsetInternal saved in used_flags for single disjuncts diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index 5ea47823b69..cb8f3e83022 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -149,6 +149,8 @@ class HashJoin : public IJoin public: HashJoin(std::shared_ptr table_join_, const Block & right_sample_block, bool any_take_last_row_ = false); + ~HashJoin() override; + const TableJoin & getTableJoin() const override { return *table_join; } /** Add block of data from right hand of JOIN to the map. @@ -337,6 +339,8 @@ public: Arena pool; bool released = false; + size_t blocks_allocated_size = 0; + size_t blocks_nullmaps_allocated_size = 0; }; using RightTableDataPtr = std::shared_ptr; From 57a35cae3334ead5b8290d27cd3a9967dc1f6ddf Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 20 Dec 2022 12:50:27 +0000 Subject: [PATCH 03/13] wip --- src/Core/Block.cpp | 3 + src/Interpreters/GraceHashJoin.cpp | 133 ++++++++++++++++++----------- src/Interpreters/GraceHashJoin.h | 7 +- src/Interpreters/HashJoin.cpp | 63 ++++++++++---- src/Interpreters/HashJoin.h | 13 +-- src/Interpreters/JoinUtils.cpp | 11 ++- 6 files changed, 149 insertions(+), 81 deletions(-) diff --git a/src/Core/Block.cpp b/src/Core/Block.cpp index 2aa66c3e682..6cb324ca084 100644 --- a/src/Core/Block.cpp +++ b/src/Core/Block.cpp @@ -841,6 +841,9 @@ Block concatenateBlocks(const std::vector & blocks) if (blocks.empty()) return {}; + if (blocks.size() == 1) + return blocks.front(); + size_t num_rows = 0; for (const auto & block : blocks) num_rows += block.rows(); diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index bac61b8205f..aa52ae164e5 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -41,7 +41,7 @@ namespace public: AccumulatedBlockReader(TemporaryFileStream & reader_, std::mutex & mutex_, - size_t result_block_size_ = DEFAULT_BLOCK_SIZE * 8) + size_t result_block_size_ = 0) : reader(reader_) , mutex(mutex_) , result_block_size(result_block_size_) @@ -59,7 +59,7 @@ namespace Blocks blocks; size_t rows_read = 0; - while (rows_read < result_block_size) + while (true) { Block block = reader.read(); rows_read += block.rows(); @@ -69,6 +69,9 @@ namespace return concatenateBlocks(blocks); } blocks.push_back(std::move(block)); + + if (rows_read >= result_block_size) + break; } return concatenateBlocks(blocks); @@ -124,15 +127,14 @@ public: TemporaryFileStream::Stat right; }; - explicit FileBucket(size_t bucket_index_, - TemporaryFileStream & left_file_, - TemporaryFileStream & right_file_, - Poco::Logger * log_) + explicit FileBucket(size_t bucket_index_, GraceHashJoin & grace_join_) : idx{bucket_index_} - , left_file{left_file_} - , right_file{right_file_} + , left_file{ + grace_join_.tmp_data->createStream(grace_join_.left_sample_block)} + , right_file{ + grace_join_.tmp_data->createStream(grace_join_.prepareRightBlock(grace_join_.right_sample_block))} , state{State::WRITING_BLOCKS} - , log(log_) + , log{grace_join_.log} { } @@ -174,14 +176,13 @@ public: { LOG_TRACE(log, "Joining file bucket {}", idx); - { - std::unique_lock left_lock(left_file_mutex); - std::unique_lock right_lock(right_file_mutex); + std::unique_lock left_lock(left_file_mutex); + std::unique_lock right_lock(right_file_mutex); - stats.left = left_file.finishWriting(); - stats.right = right_file.finishWriting(); - state = State::JOINING_BLOCKS; - } + stats.left = left_file.finishWriting(); + stats.right = right_file.finishWriting(); + + state = State::JOINING_BLOCKS; return AccumulatedBlockReader(right_file, right_file_mutex); } @@ -238,15 +239,17 @@ private: namespace { + template -void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets) +void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets, size_t except_index = 0) { chassert(blocks.size() == buckets.size()); retryForEach( generateRandomPermutation(1, buckets.size()), // skipping 0 block, since we join it in memory w/o spilling on disk [&](size_t i) { - if (!blocks[i].rows()) + /// Skip empty and current bucket + if (!blocks[i].rows() || i == except_index) return true; bool flushed = false; @@ -281,6 +284,7 @@ GraceHashJoin::GraceHashJoin( , right_key_names(table_join->getOnlyClause().key_names_right) , tmp_data(std::make_unique(tmp_data_, CurrentMetrics::TemporaryFilesForJoin)) , hash_join(makeInMemoryJoin()) + , hash_join_sample_block(hash_join->savedBlockSample()) { if (!GraceHashJoin::isSupported(table_join)) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "GraceHashJoin is not supported for this join type"); @@ -303,7 +307,7 @@ void GraceHashJoin::initBuckets() if (buckets.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "No buckets created"); - LOG_TRACE(log, "Initialize {} buckets", buckets.size()); + LOG_TRACE(log, "Initialize {} bucket{}", buckets.size(), buckets.size() > 1 ? "s" : ""); current_bucket = buckets.front().get(); current_bucket->startJoining(); @@ -323,10 +327,26 @@ bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/) throw Exception(ErrorCodes::LOGICAL_ERROR, "GraceHashJoin is not initialized"); Block materialized = materializeBlock(block); - addJoinedBlockImpl(materialized); + addJoinedBlockImpl(std::move(materialized)); return true; } +bool GraceHashJoin::hasMemoryOverflow(const Block & block) const +{ + /// One row can't be split, avoid loop + if (block.rows() < 2) + return false; + + bool has_overflow = !table_join->sizeLimits().softCheck(block.rows(), block.allocatedBytes()); + + if (has_overflow) + LOG_TRACE(log, "GraceHashJoin has memory overflow, block {} / {} bytes, {} / {} rows", + ReadableSize(block.allocatedBytes()), ReadableSize(table_join->sizeLimits().max_bytes), + block.rows(), table_join->sizeLimits().max_rows); + + return has_overflow; +} + bool GraceHashJoin::hasMemoryOverflow() const { /// One row can't be split, avoid loop @@ -339,7 +359,7 @@ bool GraceHashJoin::hasMemoryOverflow() const bool has_overflow = !table_join->sizeLimits().softCheck(total_rows, total_bytes); if (has_overflow) - LOG_TRACE(log, "GraceHashJoin has memory overflow {} / {} bytes, {} / {} rows", + LOG_TRACE(log, "GraceHashJoin has memory overflow, hash {} / {} bytes, {} / {} rows", ReadableSize(total_bytes), ReadableSize(table_join->sizeLimits().max_bytes), total_rows, table_join->sizeLimits().max_rows); return has_overflow; @@ -353,7 +373,7 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size) if (to_size <= current_size) return buckets; - assert(isPowerOf2(to_size)); + chassert(isPowerOf2(to_size)); if (to_size > max_num_buckets) { @@ -373,8 +393,7 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size) void GraceHashJoin::addBucket(Buckets & destination) { - BucketPtr new_bucket = std::make_shared( - destination.size(), tmp_data->createStream(left_sample_block), tmp_data->createStream(right_sample_block), log); + BucketPtr new_bucket = std::make_shared(destination.size(), *this); destination.emplace_back(std::move(new_bucket)); } @@ -538,17 +557,11 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks() if (hash_join) { - auto right_blocks = hash_join->releaseJoinedBlocks(); - Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_blocks, buckets.size()); - - for (size_t i = 0; i < blocks.size(); ++i) + auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false); + for (auto & block : right_blocks) { - if (blocks[i].rows() == 0 || i == bucket_idx) - continue; - - if (i < bucket_idx) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected bucket index {} when current bucket is {}", i, bucket_idx); - buckets[i]->addRightBlock(blocks[i]); + Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets.size()); + flushBlocksToBuckets(blocks, buckets, bucket_idx); } } @@ -591,42 +604,58 @@ GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin() return std::make_unique(table_join, right_sample_block, any_take_last_row); } +Block GraceHashJoin::prepareRightBlock(const Block & block) +{ + return HashJoin::prepareRightBlock(block, hash_join_sample_block); +} + void GraceHashJoin::addJoinedBlockImpl(Block block) { Buckets buckets_snapshot = getCurrentBuckets(); - Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size()); size_t bucket_index = current_bucket->idx; + Block current_block; + + { + Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size()); + flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index); + current_block = std::move(blocks[bucket_index]); + } // Add block to the in-memory join - if (blocks[bucket_index].rows() > 0) + if (current_block.rows() > 0) { std::lock_guard lock(hash_join_mutex); - hash_join->addJoinedBlock(std::move(blocks[bucket_index]), /* check_limits = */ false); - bool overflow = hasMemoryOverflow(); + hash_join->addJoinedBlock(current_block, /* check_limits = */ false); - if (overflow) + if (!hasMemoryOverflow()) + return; + + current_block = {}; + + auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false); + + for (const auto & right_block : right_blocks) { - auto right_blocks = hash_join->releaseJoinedBlocks(); - right_blocks.pop_back(); - - for (const auto & right_block : right_blocks) - blocks.push_back(right_block); + Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_block, buckets_snapshot.size()); + flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index); + current_block = blocks[bucket_index]; } - while (overflow) + while (hasMemoryOverflow(current_block)) { buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2); - blocks = JoinCommon::scatterBlockByHash(right_key_names, blocks, buckets_snapshot.size()); - hash_join = makeInMemoryJoin(); - hash_join->addJoinedBlock(blocks[bucket_index], /* check_limits = */ false); - overflow = hasMemoryOverflow(); + Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, current_block, buckets_snapshot.size()); + flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index); + current_block = blocks[bucket_index]; } - blocks[bucket_index].clear(); - } - flushBlocksToBuckets(blocks, buckets_snapshot); + hash_join = makeInMemoryJoin(); + + if (current_block.rows() > 0) + hash_join->addJoinedBlock(current_block, /* check_limits = */ false); + } } size_t GraceHashJoin::getNumBuckets() const diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h index aa29838fd27..3076c8870d7 100644 --- a/src/Interpreters/GraceHashJoin.h +++ b/src/Interpreters/GraceHashJoin.h @@ -95,8 +95,10 @@ private: /// Add right table block to the @join. Calls @rehash on overflow. void addJoinedBlockImpl(Block block); - /// Check that @join satisifes limits on rows/bytes in @table_join. + /// Check that join satisifes limits on rows/bytes in table_join. bool hasMemoryOverflow() const; + /// Check that block satisifes limits on rows/bytes in table_join. + bool hasMemoryOverflow(const Block & block) const; /// Create new bucket at the end of @destination. void addBucket(Buckets & destination); @@ -114,6 +116,8 @@ private: size_t getNumBuckets() const; Buckets getCurrentBuckets() const; + Block prepareRightBlock(const Block & block); + Poco::Logger * log; ContextPtr context; std::shared_ptr table_join; @@ -136,6 +140,7 @@ private: mutable std::mutex current_bucket_mutex; InMemoryJoinPtr hash_join; + Block hash_join_sample_block; mutable std::mutex hash_join_mutex; }; diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 5ff7d0fe7f8..e7ad3f30c4a 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -469,6 +469,9 @@ bool HashJoin::alwaysReturnsEmptySet() const size_t HashJoin::getTotalRowCount() const { + if (!data) + return 0; + size_t res = 0; if (data->type == Type::CROSS) @@ -489,6 +492,9 @@ size_t HashJoin::getTotalRowCount() const size_t HashJoin::getTotalByteCount() const { + if (!data) + return 0; + #ifdef NDEBUG size_t debug_blocks_allocated_size = 0; for (const auto & block : data->blocks) @@ -670,34 +676,44 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample) } } -Block HashJoin::structureRightBlock(const Block & block) const +Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block_sample_) { Block structured_block; - for (const auto & sample_column : savedBlockSample().getColumnsWithTypeAndName()) + for (const auto & sample_column : saved_block_sample_.getColumnsWithTypeAndName()) { ColumnWithTypeAndName column = block.getByName(sample_column.name); if (sample_column.column->isNullable()) JoinCommon::convertColumnToNullable(column); - structured_block.insert(column); - } + /// There's no optimization for right side const columns. Remove constness if any. + column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst()); + structured_block.insert(std::move(column)); + } return structured_block; } +Block HashJoin::prepareRightBlock(const Block & block) const +{ + return prepareRightBlock(block, savedBlockSample()); +} + bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) { + if (!data) + throw Exception("Join data was released", ErrorCodes::LOGICAL_ERROR); + + Block structured_block = source_block; + prepareRightBlock(structured_block); + /// RowRef::SizeT is uint32_t (not size_t) for hash table Cell memory efficiency. /// It's possible to split bigger blocks and insert them by parts here. But it would be a dead code. - if (unlikely(source_block.rows() > std::numeric_limits::max())) - throw Exception("Too many rows in right table block for HashJoin: " + toString(source_block.rows()), ErrorCodes::NOT_IMPLEMENTED); + if (unlikely(structured_block.rows() > std::numeric_limits::max())) + throw Exception("Too many rows in right table block for HashJoin: " + toString(structured_block.rows()), ErrorCodes::NOT_IMPLEMENTED); - /// There's no optimization for right side const columns. Remove constness if any. - Block block = materializeBlock(source_block); - size_t rows = block.rows(); + size_t rows = structured_block.rows(); - ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(block, table_join->getAllNames(JoinTableSide::Right)); + ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(structured_block, table_join->getAllNames(JoinTableSide::Right)); - Block structured_block = structureRightBlock(block); size_t total_rows = 0; size_t total_bytes = 0; { @@ -733,14 +749,14 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) save_nullmap |= (*null_map)[i]; } - auto join_mask_col = JoinCommon::getColumnAsMask(block, onexprs[onexpr_idx].condColumnNames().second); + auto join_mask_col = JoinCommon::getColumnAsMask(structured_block, onexprs[onexpr_idx].condColumnNames().second); /// Save blocks that do not hold conditions in ON section ColumnUInt8::MutablePtr not_joined_map = nullptr; if (!multiple_disjuncts && isRightOrFull(kind) && !join_mask_col.isConstant()) { const auto & join_mask = join_mask_col.getData(); /// Save rows that do not hold conditions - not_joined_map = ColumnUInt8::create(block.rows(), 0); + not_joined_map = ColumnUInt8::create(rows, 0); for (size_t i = 0, sz = join_mask->size(); i < sz; ++i) { /// Condition hold, do not save row @@ -1697,7 +1713,7 @@ void HashJoin::checkTypesOfKeys(const Block & block) const void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed) { - if (data->released) + if (!data) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released"); for (const auto & onexpr : table_join->getClauses()) @@ -1794,7 +1810,10 @@ class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller public: NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_) : parent(parent_), max_block_size(max_block_size_), current_block_start(0) - {} + { + if (parent.data == nullptr) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released"); + } Block getEmptyBlock() override { return parent.savedBlockSample().cloneEmpty(); } @@ -1991,7 +2010,6 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block, size_t left_columns_count = left_sample_block.columns(); auto non_joined = std::make_unique>(*this, max_block_size); return std::make_unique(std::move(non_joined), result_sample_block, left_columns_count, *table_join); - } else { @@ -2020,10 +2038,18 @@ void HashJoin::reuseJoinedData(const HashJoin & join) } } -BlocksList HashJoin::releaseJoinedBlocks() +BlocksList HashJoin::releaseJoinedBlocks(bool restructure) { BlocksList right_blocks = std::move(data->blocks); - data->released = true; + if (!restructure) + { + data.reset(); + return right_blocks; + } + + data->maps.clear(); + data->blocks_nullmaps.clear(); + BlocksList restored_blocks; /// names to positions optimization @@ -2052,6 +2078,7 @@ BlocksList HashJoin::releaseJoinedBlocks() restored_blocks.emplace_back(std::move(restored_block)); } + data.reset(); return restored_blocks; } diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index cb8f3e83022..b348829a990 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -338,7 +338,6 @@ public: /// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows. Arena pool; - bool released = false; size_t blocks_allocated_size = 0; size_t blocks_nullmaps_allocated_size = 0; }; @@ -355,7 +354,13 @@ public: void reuseJoinedData(const HashJoin & join); RightTableDataPtr getJoinedData() const { return data; } - BlocksList releaseJoinedBlocks(); + BlocksList releaseJoinedBlocks(bool restructure = false); + + /// Modify (structure) right block to save it in block list + static Block prepareRightBlock(const Block & block, const Block & saved_block_sample_); + Block prepareRightBlock(const Block & block) const; + + const Block & savedBlockSample() const { return data->sample_block; } bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); } bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); } @@ -407,10 +412,6 @@ private: void dataMapInit(MapsVariant &); - const Block & savedBlockSample() const { return data->sample_block; } - - /// Modify (structure) right block to save it in block list - Block structureRightBlock(const Block & stored_block) const; void initRightBlockStructure(Block & saved_block_sample); template diff --git a/src/Interpreters/JoinUtils.cpp b/src/Interpreters/JoinUtils.cpp index a4ec64ab70e..ce151ccaa6a 100644 --- a/src/Interpreters/JoinUtils.cpp +++ b/src/Interpreters/JoinUtils.cpp @@ -332,7 +332,11 @@ ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names) for (const auto & column_name : names) { auto & column = block.getByName(column_name); - column.column = recursiveRemoveLowCardinality(column.column->convertToFullColumnIfConst()); + + column.column = column.column->convertToFullColumnIfConst(); + column.column = recursiveRemoveLowCardinality(column.column); + column.column = recursiveRemoveSparse(column.column); + column.type = recursiveRemoveLowCardinality(column.type); ptrs[column_name] = column.column.get(); } @@ -639,9 +643,8 @@ Blocks scatterBlockByHash(const Strings & key_columns_names, const Block & block { if (num_shards == 0) throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of shards must be positive"); - UNUSED(scatterBlockByHashPow2); - // if (likely(isPowerOf2(num_shards))) - // return scatterBlockByHashPow2(key_columns_names, block, num_shards); + if (likely(isPowerOf2(num_shards))) + return scatterBlockByHashPow2(key_columns_names, block, num_shards); return scatterBlockByHashGeneric(key_columns_names, block, num_shards); } From beb8ba7e62f0da2ce316c271674e98406979091c Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 27 Dec 2022 10:13:12 +0000 Subject: [PATCH 04/13] wip --- src/Interpreters/GraceHashJoin.cpp | 63 +++++++++++++++--------------- src/Interpreters/GraceHashJoin.h | 6 +-- src/Interpreters/HashJoin.cpp | 35 ++++++++++++----- src/Interpreters/HashJoin.h | 2 + src/Interpreters/JoinUtils.cpp | 4 +- src/Interpreters/JoinUtils.h | 2 +- 6 files changed, 64 insertions(+), 48 deletions(-) diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index aa52ae164e5..0ec1a308553 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -331,40 +331,42 @@ bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/) return true; } -bool GraceHashJoin::hasMemoryOverflow(const Block & block) const +bool GraceHashJoin::hasMemoryOverflow(size_t total_rows, size_t total_bytes) const { /// One row can't be split, avoid loop - if (block.rows() < 2) + if (total_rows < 2) return false; - bool has_overflow = !table_join->sizeLimits().softCheck(block.rows(), block.allocatedBytes()); - - if (has_overflow) - LOG_TRACE(log, "GraceHashJoin has memory overflow, block {} / {} bytes, {} / {} rows", - ReadableSize(block.allocatedBytes()), ReadableSize(table_join->sizeLimits().max_bytes), - block.rows(), table_join->sizeLimits().max_rows); - - return has_overflow; -} - -bool GraceHashJoin::hasMemoryOverflow() const -{ - /// One row can't be split, avoid loop - if (hash_join->getTotalRowCount() < 2) - return false; - - size_t total_rows = hash_join->getTotalRowCount(); - size_t total_bytes = hash_join->getTotalByteCount(); - bool has_overflow = !table_join->sizeLimits().softCheck(total_rows, total_bytes); if (has_overflow) - LOG_TRACE(log, "GraceHashJoin has memory overflow, hash {} / {} bytes, {} / {} rows", + LOG_TRACE(log, "Memory overflow, size exceeded {} / {} bytes, {} / {} rows", ReadableSize(total_bytes), ReadableSize(table_join->sizeLimits().max_bytes), total_rows, table_join->sizeLimits().max_rows); + return has_overflow; } +bool GraceHashJoin::hasMemoryOverflow(const BlocksList & blocks) const +{ + size_t total_rows = 0; + size_t total_bytes = 0; + for (const auto & block : blocks) + { + total_rows += block.rows(); + total_bytes += block.allocatedBytes(); + } + return hasMemoryOverflow(total_rows, total_bytes); +} + +bool GraceHashJoin::hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const +{ + size_t total_rows = hash_join_->getTotalRowCount(); + size_t total_bytes = hash_join_->getTotalByteCount(); + + return hasMemoryOverflow(total_rows, total_bytes); +} + GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size) { std::unique_lock lock(rehash_mutex); @@ -628,28 +630,25 @@ void GraceHashJoin::addJoinedBlockImpl(Block block) hash_join->addJoinedBlock(current_block, /* check_limits = */ false); - if (!hasMemoryOverflow()) + if (!hasMemoryOverflow(hash_join)) return; current_block = {}; auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false); + buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2); + + Blocks current_blocks; + current_blocks.reserve(right_blocks.size()); for (const auto & right_block : right_blocks) { Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_block, buckets_snapshot.size()); flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index); - current_block = blocks[bucket_index]; + current_blocks.emplace_back(std::move(blocks[bucket_index])); } - while (hasMemoryOverflow(current_block)) - { - buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2); - - Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, current_block, buckets_snapshot.size()); - flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index); - current_block = blocks[bucket_index]; - } + current_block = concatenateBlocks(std::move(current_blocks)); hash_join = makeInMemoryJoin(); diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h index 3076c8870d7..ff0f1a96ca2 100644 --- a/src/Interpreters/GraceHashJoin.h +++ b/src/Interpreters/GraceHashJoin.h @@ -96,9 +96,9 @@ private: void addJoinedBlockImpl(Block block); /// Check that join satisifes limits on rows/bytes in table_join. - bool hasMemoryOverflow() const; - /// Check that block satisifes limits on rows/bytes in table_join. - bool hasMemoryOverflow(const Block & block) const; + bool hasMemoryOverflow(size_t total_rows, size_t total_bytes) const; + bool hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const; + bool hasMemoryOverflow(const BlocksList & blocks) const; /// Create new bucket at the end of @destination. void addBucket(Buckets & destination); diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index e7ad3f30c4a..abcf14ef1a4 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -676,6 +676,20 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample) } } +Block HashJoin::structureRightBlock(const Block & block) const +{ + Block structured_block; + for (const auto & sample_column : savedBlockSample().getColumnsWithTypeAndName()) + { + ColumnWithTypeAndName column = block.getByName(sample_column.name); + if (sample_column.column->isNullable()) + JoinCommon::convertColumnToNullable(column); + structured_block.insert(column); + } + + return structured_block; +} + Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block_sample_) { Block structured_block; @@ -689,6 +703,7 @@ Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst()); structured_block.insert(std::move(column)); } + return structured_block; } @@ -702,18 +717,16 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) if (!data) throw Exception("Join data was released", ErrorCodes::LOGICAL_ERROR); - Block structured_block = source_block; - prepareRightBlock(structured_block); - /// RowRef::SizeT is uint32_t (not size_t) for hash table Cell memory efficiency. /// It's possible to split bigger blocks and insert them by parts here. But it would be a dead code. - if (unlikely(structured_block.rows() > std::numeric_limits::max())) - throw Exception("Too many rows in right table block for HashJoin: " + toString(structured_block.rows()), ErrorCodes::NOT_IMPLEMENTED); + if (unlikely(source_block.rows() > std::numeric_limits::max())) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Too many rows in right table block for HashJoin: {}", source_block.rows()); - size_t rows = structured_block.rows(); + size_t rows = source_block.rows(); - ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(structured_block, table_join->getAllNames(JoinTableSide::Right)); + ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(source_block, table_join->getAllNames(JoinTableSide::Right)); + Block block_to_save = prepareRightBlock(source_block); size_t total_rows = 0; size_t total_bytes = 0; { @@ -721,8 +734,8 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) throw DB::Exception("addJoinedBlock called when HashJoin locked to prevent updates", ErrorCodes::LOGICAL_ERROR); - data->blocks_allocated_size += structured_block.allocatedBytes(); - data->blocks.emplace_back(std::move(structured_block)); + data->blocks_allocated_size += block_to_save.allocatedBytes(); + data->blocks.emplace_back(std::move(block_to_save)); Block * stored_block = &data->blocks.back(); if (rows) @@ -749,7 +762,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) save_nullmap |= (*null_map)[i]; } - auto join_mask_col = JoinCommon::getColumnAsMask(structured_block, onexprs[onexpr_idx].condColumnNames().second); + auto join_mask_col = JoinCommon::getColumnAsMask(source_block, onexprs[onexpr_idx].condColumnNames().second); /// Save blocks that do not hold conditions in ON section ColumnUInt8::MutablePtr not_joined_map = nullptr; if (!multiple_disjuncts && isRightOrFull(kind) && !join_mask_col.isConstant()) @@ -2040,6 +2053,8 @@ void HashJoin::reuseJoinedData(const HashJoin & join) BlocksList HashJoin::releaseJoinedBlocks(bool restructure) { + LOG_DEBUG(log, "Join data is being released, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); + BlocksList right_blocks = std::move(data->blocks); if (!restructure) { diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index b348829a990..7b4fa1d5344 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -356,6 +356,8 @@ public: RightTableDataPtr getJoinedData() const { return data; } BlocksList releaseJoinedBlocks(bool restructure = false); + Block structureRightBlock(const Block & block) const; + /// Modify (structure) right block to save it in block list static Block prepareRightBlock(const Block & block, const Block & saved_block_sample_); Block prepareRightBlock(const Block & block) const; diff --git a/src/Interpreters/JoinUtils.cpp b/src/Interpreters/JoinUtils.cpp index ce151ccaa6a..7a2a7dc5037 100644 --- a/src/Interpreters/JoinUtils.cpp +++ b/src/Interpreters/JoinUtils.cpp @@ -324,14 +324,14 @@ ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names) return ptrs; } -ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names) +ColumnRawPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names) { ColumnRawPtrMap ptrs; ptrs.reserve(names.size()); for (const auto & column_name : names) { - auto & column = block.getByName(column_name); + ColumnWithTypeAndName column = block.getByName(column_name); column.column = column.column->convertToFullColumnIfConst(); column.column = recursiveRemoveLowCardinality(column.column); diff --git a/src/Interpreters/JoinUtils.h b/src/Interpreters/JoinUtils.h index 36be71f2a91..6a5449d7a58 100644 --- a/src/Interpreters/JoinUtils.h +++ b/src/Interpreters/JoinUtils.h @@ -71,7 +71,7 @@ ColumnPtr emptyNotNullableClone(const ColumnPtr & column); ColumnPtr materializeColumn(const Block & block, const String & name); Columns materializeColumns(const Block & block, const Names & names); ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names); -ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names); +ColumnRawPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names); ColumnRawPtrs getRawPointers(const Columns & columns); void convertToFullColumnsInplace(Block & block); void convertToFullColumnsInplace(Block & block, const Names & names, bool change_type = true); From 18d751aed4e5ecc61ba69139b19750d6306b4163 Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 27 Dec 2022 17:49:17 +0000 Subject: [PATCH 05/13] wip --- src/Interpreters/HashJoin.cpp | 4 ++-- src/Interpreters/JoinUtils.cpp | 15 +++++++-------- src/Interpreters/JoinUtils.h | 3 ++- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index abcf14ef1a4..1da55537408 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -724,7 +724,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) size_t rows = source_block.rows(); - ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(source_block, table_join->getAllNames(JoinTableSide::Right)); + ColumnPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(source_block, table_join->getAllNames(JoinTableSide::Right)); Block block_to_save = prepareRightBlock(source_block); size_t total_rows = 0; @@ -747,7 +747,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) { ColumnRawPtrs key_columns; for (const auto & name : onexprs[onexpr_idx].key_names_right) - key_columns.push_back(all_key_columns[name]); + key_columns.push_back(all_key_columns[name].get()); /// We will insert to the map only keys, where all components are not NULL. ConstNullMapPtr null_map{}; diff --git a/src/Interpreters/JoinUtils.cpp b/src/Interpreters/JoinUtils.cpp index 7a2a7dc5037..9a0781cd2f3 100644 --- a/src/Interpreters/JoinUtils.cpp +++ b/src/Interpreters/JoinUtils.cpp @@ -324,21 +324,20 @@ ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names) return ptrs; } -ColumnRawPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names) +ColumnPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names) { - ColumnRawPtrMap ptrs; + ColumnPtrMap ptrs; ptrs.reserve(names.size()); for (const auto & column_name : names) { - ColumnWithTypeAndName column = block.getByName(column_name); + ColumnPtr column = block.getByName(column_name).column; - column.column = column.column->convertToFullColumnIfConst(); - column.column = recursiveRemoveLowCardinality(column.column); - column.column = recursiveRemoveSparse(column.column); + column = column->convertToFullColumnIfConst(); + column = recursiveRemoveLowCardinality(column); + column = recursiveRemoveSparse(column); - column.type = recursiveRemoveLowCardinality(column.type); - ptrs[column_name] = column.column.get(); + ptrs[column_name] = column; } return ptrs; diff --git a/src/Interpreters/JoinUtils.h b/src/Interpreters/JoinUtils.h index 6a5449d7a58..b5bdf801b0a 100644 --- a/src/Interpreters/JoinUtils.h +++ b/src/Interpreters/JoinUtils.h @@ -14,6 +14,7 @@ class TableJoin; class IColumn; using ColumnRawPtrs = std::vector; +using ColumnPtrMap = std::unordered_map; using ColumnRawPtrMap = std::unordered_map; using UInt8ColumnDataPtr = const ColumnUInt8::Container *; @@ -71,7 +72,7 @@ ColumnPtr emptyNotNullableClone(const ColumnPtr & column); ColumnPtr materializeColumn(const Block & block, const String & name); Columns materializeColumns(const Block & block, const Names & names); ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names); -ColumnRawPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names); +ColumnPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names); ColumnRawPtrs getRawPointers(const Columns & columns); void convertToFullColumnsInplace(Block & block); void convertToFullColumnsInplace(Block & block, const Names & names, bool change_type = true); From 4aecb836a9afe0451829e95c5513a9058f69d2a0 Mon Sep 17 00:00:00 2001 From: vdimir Date: Wed, 28 Dec 2022 13:20:58 +0000 Subject: [PATCH 06/13] Fix JoinMask --- src/Interpreters/HashJoin.cpp | 3 +-- src/Interpreters/JoinUtils.cpp | 8 ++++---- src/Interpreters/JoinUtils.h | 22 ++++++++++------------ src/Interpreters/MergeJoin.cpp | 2 +- 4 files changed, 16 insertions(+), 19 deletions(-) diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 1da55537408..e35b0e8a225 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -765,7 +765,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) auto join_mask_col = JoinCommon::getColumnAsMask(source_block, onexprs[onexpr_idx].condColumnNames().second); /// Save blocks that do not hold conditions in ON section ColumnUInt8::MutablePtr not_joined_map = nullptr; - if (!multiple_disjuncts && isRightOrFull(kind) && !join_mask_col.isConstant()) + if (!multiple_disjuncts && isRightOrFull(kind) && join_mask_col.hasData()) { const auto & join_mask = join_mask_col.getData(); /// Save rows that do not hold conditions @@ -845,7 +845,6 @@ struct JoinOnKeyColumns Sizes key_sizes; - explicit JoinOnKeyColumns(const Block & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_) : key_names(key_names_) , materialized_keys_holder(JoinCommon::materializeColumns(block, key_names)) /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them. diff --git a/src/Interpreters/JoinUtils.cpp b/src/Interpreters/JoinUtils.cpp index 9a0781cd2f3..b8d8dd5df74 100644 --- a/src/Interpreters/JoinUtils.cpp +++ b/src/Interpreters/JoinUtils.cpp @@ -532,24 +532,24 @@ bool typesEqualUpToNullability(DataTypePtr left_type, DataTypePtr right_type) JoinMask getColumnAsMask(const Block & block, const String & column_name) { if (column_name.empty()) - return JoinMask(true); + return JoinMask(true, block.rows()); const auto & src_col = block.getByName(column_name); DataTypePtr col_type = recursiveRemoveLowCardinality(src_col.type); if (isNothing(col_type)) - return JoinMask(false); + return JoinMask(false, block.rows()); if (const auto * const_cond = checkAndGetColumn(*src_col.column)) { - return JoinMask(const_cond->getBool(0)); + return JoinMask(const_cond->getBool(0), block.rows()); } ColumnPtr join_condition_col = recursiveRemoveLowCardinality(src_col.column->convertToFullColumnIfConst()); if (const auto * nullable_col = typeid_cast(join_condition_col.get())) { if (isNothing(assert_cast(*col_type).getNestedType())) - return JoinMask(false); + return JoinMask(false, block.rows()); /// Return nested column with NULL set to false const auto & nest_col = assert_cast(nullable_col->getNestedColumn()); diff --git a/src/Interpreters/JoinUtils.h b/src/Interpreters/JoinUtils.h index b5bdf801b0a..f4f5f5bdc8d 100644 --- a/src/Interpreters/JoinUtils.h +++ b/src/Interpreters/JoinUtils.h @@ -21,24 +21,26 @@ using UInt8ColumnDataPtr = const ColumnUInt8::Container *; namespace JoinCommon { -/// Store boolean column handling constant value without materializing -/// Behaves similar to std::variant, but provides more convenient specialized interface class JoinMask { public: - explicit JoinMask(bool value) + explicit JoinMask() : column(nullptr) - , const_value(value) + {} + + explicit JoinMask(bool value, size_t size) + : column(ColumnUInt8::create(size, value)) {} explicit JoinMask(ColumnPtr col) : column(col) - , const_value(false) {} - bool isConstant() { return !column; } + bool hasData() + { + return column != nullptr; + } - /// Return data if mask is not constant UInt8ColumnDataPtr getData() { if (column) @@ -48,15 +50,11 @@ public: inline bool isRowFiltered(size_t row) const { - if (column) - return !assert_cast(*column).getData()[row]; - return !const_value; + return !assert_cast(*column).getData()[row]; } private: ColumnPtr column; - /// Used if column is null - bool const_value; }; diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index 2d54accc76a..1b3f35614f9 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -55,7 +55,7 @@ ColumnWithTypeAndName condtitionColumnToJoinable(const Block & block, const Stri if (!src_column_name.empty()) { auto join_mask = JoinCommon::getColumnAsMask(block, src_column_name); - if (!join_mask.isConstant()) + if (join_mask.hasData()) { for (size_t i = 0; i < res_size; ++i) null_map->getData()[i] = join_mask.isRowFiltered(i); From e0e60bb460a866cfd8bea6851af4e669ff560a57 Mon Sep 17 00:00:00 2001 From: vdimir Date: Wed, 28 Dec 2022 16:02:32 +0000 Subject: [PATCH 07/13] wip --- src/Interpreters/HashJoin.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index e35b0e8a225..2ef7810e8dd 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -699,6 +699,12 @@ Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block if (sample_column.column->isNullable()) JoinCommon::convertColumnToNullable(column); + if (column.column->lowCardinality() && !sample_column.column->lowCardinality()) + { + column.column = column.column->convertToFullColumnIfLowCardinality(); + column.type = removeLowCardinality(column.type); + } + /// There's no optimization for right side const columns. Remove constness if any. column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst()); structured_block.insert(std::move(column)); From 40bf9939b72f4ac99a873e53635842ee6ce398d2 Mon Sep 17 00:00:00 2001 From: vdimir Date: Fri, 30 Dec 2022 15:54:39 +0000 Subject: [PATCH 08/13] Update JoinSwitcher::switchJoin --- src/Interpreters/JoinSwitcher.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Interpreters/JoinSwitcher.cpp b/src/Interpreters/JoinSwitcher.cpp index 996fd1e4ac7..15702784d74 100644 --- a/src/Interpreters/JoinSwitcher.cpp +++ b/src/Interpreters/JoinSwitcher.cpp @@ -41,7 +41,7 @@ bool JoinSwitcher::addJoinedBlock(const Block & block, bool) bool JoinSwitcher::switchJoin() { HashJoin * hash_join = assert_cast(join.get()); - BlocksList right_blocks = hash_join->releaseJoinedBlocks(); + BlocksList right_blocks = hash_join->releaseJoinedBlocks(true); /// Destroy old join & create new one. join = std::make_shared(table_join, right_sample_block); From 1e9ccfb4b977da394bc71b7fe51124975e441dab Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 16 Jan 2023 12:33:45 +0000 Subject: [PATCH 09/13] wip --- src/Interpreters/TableJoin.h | 2 +- tests/queries/1_stateful/00172_hits_joins.reference.j2 | 2 +- tests/queries/1_stateful/00172_hits_joins.sql.j2 | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h index 9d03c9bd57b..84390adc0df 100644 --- a/src/Interpreters/TableJoin.h +++ b/src/Interpreters/TableJoin.h @@ -198,7 +198,7 @@ public: : size_limits(limits) , default_max_bytes(0) , join_use_nulls(use_nulls) - , join_algorithm(JoinAlgorithm::HASH) + , join_algorithm(JoinAlgorithm::DEFAULT) { clauses.emplace_back().key_names_right = key_names_right; table_join.kind = kind; diff --git a/tests/queries/1_stateful/00172_hits_joins.reference.j2 b/tests/queries/1_stateful/00172_hits_joins.reference.j2 index c357ede4c2c..1a43f1fb6ef 100644 --- a/tests/queries/1_stateful/00172_hits_joins.reference.j2 +++ b/tests/queries/1_stateful/00172_hits_joins.reference.j2 @@ -1,4 +1,4 @@ -{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%} +{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge', 'grace_hash'] -%} --- {{ join_algorithm }} --- 2014-03-17 1406958 265108 2014-03-19 1405797 261624 diff --git a/tests/queries/1_stateful/00172_hits_joins.sql.j2 b/tests/queries/1_stateful/00172_hits_joins.sql.j2 index 07ea899f536..4599d1d5a5d 100644 --- a/tests/queries/1_stateful/00172_hits_joins.sql.j2 +++ b/tests/queries/1_stateful/00172_hits_joins.sql.j2 @@ -1,6 +1,7 @@ -{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%} +{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge', 'grace_hash'] -%} -SET max_bytes_in_join = '{% if join_algorithm == 'grace_hash' %}20K{% else %}0{% endif %}'; +SET max_rows_in_join = '{% if join_algorithm == 'grace_hash' %}10K{% else %}0{% endif %}'; +SET grace_hash_join_initial_buckets = 4; SELECT '--- {{ join_algorithm }} ---'; From 60acd5e424397a8fbeb9ccc458482e529110d690 Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 17 Jan 2023 12:21:35 +0000 Subject: [PATCH 10/13] fix clang tidy --- src/Interpreters/GraceHashJoin.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index 0ec1a308553..46c60360b62 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -639,16 +639,18 @@ void GraceHashJoin::addJoinedBlockImpl(Block block) buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2); - Blocks current_blocks; - current_blocks.reserve(right_blocks.size()); - for (const auto & right_block : right_blocks) { - Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_block, buckets_snapshot.size()); - flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index); - current_blocks.emplace_back(std::move(blocks[bucket_index])); - } + Blocks current_blocks; + current_blocks.reserve(right_blocks.size()); + for (const auto & right_block : right_blocks) + { + Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_block, buckets_snapshot.size()); + flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index); + current_blocks.emplace_back(std::move(blocks[bucket_index])); + } - current_block = concatenateBlocks(std::move(current_blocks)); + current_block = concatenateBlocks(current_blocks); + } hash_join = makeInMemoryJoin(); From e30ab0874bb221b7175ed3bea7ecfe8d491938d7 Mon Sep 17 00:00:00 2001 From: vdimir Date: Fri, 20 Jan 2023 16:30:34 +0000 Subject: [PATCH 11/13] Review fixes --- src/Core/Block.cpp | 3 -- src/Interpreters/GraceHashJoin.cpp | 44 +++++++++---------- src/Interpreters/GraceHashJoin.h | 2 +- src/Interpreters/HashJoin.cpp | 6 +-- src/Interpreters/JoinUtils.h | 1 + .../01881_join_on_conditions_hash.sql.j2 | 2 + 6 files changed, 27 insertions(+), 31 deletions(-) diff --git a/src/Core/Block.cpp b/src/Core/Block.cpp index 6cb324ca084..2aa66c3e682 100644 --- a/src/Core/Block.cpp +++ b/src/Core/Block.cpp @@ -841,9 +841,6 @@ Block concatenateBlocks(const std::vector & blocks) if (blocks.empty()) return {}; - if (blocks.size() == 1) - return blocks.front(); - size_t num_rows = 0; for (const auto & block : blocks) num_rows += block.rows(); diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index 46c60360b62..4cf8f91d0a9 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -59,21 +59,22 @@ namespace Blocks blocks; size_t rows_read = 0; - while (true) + do { Block block = reader.read(); rows_read += block.rows(); if (!block) { eof = true; + if (blocks.size() == 1) + return blocks.front(); return concatenateBlocks(blocks); } blocks.push_back(std::move(block)); + } while (rows_read < result_block_size); - if (rows_read >= result_block_size) - break; - } - + if (blocks.size() == 1) + return blocks.front(); return concatenateBlocks(blocks); } @@ -121,20 +122,12 @@ class GraceHashJoin::FileBucket : boost::noncopyable public: using BucketLock = std::unique_lock; - struct Stats - { - TemporaryFileStream::Stat left; - TemporaryFileStream::Stat right; - }; - - explicit FileBucket(size_t bucket_index_, GraceHashJoin & grace_join_) + explicit FileBucket(size_t bucket_index_, TemporaryFileStream & left_file_, TemporaryFileStream & right_file_, Poco::Logger * log_) : idx{bucket_index_} - , left_file{ - grace_join_.tmp_data->createStream(grace_join_.left_sample_block)} - , right_file{ - grace_join_.tmp_data->createStream(grace_join_.prepareRightBlock(grace_join_.right_sample_block))} + , left_file{left_file_} + , right_file{right_file_} , state{State::WRITING_BLOCKS} - , log{grace_join_.log} + , log{log_} { } @@ -170,8 +163,6 @@ public: bool empty() const { return is_empty.load(); } - Stats getStat() const { return stats; } - AccumulatedBlockReader startJoining() { LOG_TRACE(log, "Joining file bucket {}", idx); @@ -179,8 +170,8 @@ public: std::unique_lock left_lock(left_file_mutex); std::unique_lock right_lock(right_file_mutex); - stats.left = left_file.finishWriting(); - stats.right = right_file.finishWriting(); + left_file.finishWriting(); + right_file.finishWriting(); state = State::JOINING_BLOCKS; @@ -232,7 +223,6 @@ private: std::atomic_bool is_empty = true; std::atomic state; - Stats stats; Poco::Logger * log; }; @@ -395,7 +385,10 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size) void GraceHashJoin::addBucket(Buckets & destination) { - BucketPtr new_bucket = std::make_shared(destination.size(), *this); + auto & left_file = tmp_data->createStream(left_sample_block); + auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block)); + + BucketPtr new_bucket = std::make_shared(destination.size(), left_file, right_file, log); destination.emplace_back(std::move(new_bucket)); } @@ -649,7 +642,10 @@ void GraceHashJoin::addJoinedBlockImpl(Block block) current_blocks.emplace_back(std::move(blocks[bucket_index])); } - current_block = concatenateBlocks(current_blocks); + if (current_blocks.size() == 1) + current_block = std::move(current_blocks.front()); + else + current_block = concatenateBlocks(current_blocks); } hash_join = makeInMemoryJoin(); diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h index ff0f1a96ca2..781cbf24edf 100644 --- a/src/Interpreters/GraceHashJoin.h +++ b/src/Interpreters/GraceHashJoin.h @@ -95,7 +95,7 @@ private: /// Add right table block to the @join. Calls @rehash on overflow. void addJoinedBlockImpl(Block block); - /// Check that join satisifes limits on rows/bytes in table_join. + /// Check that join satisfies limits on rows/bytes in table_join. bool hasMemoryOverflow(size_t total_rows, size_t total_bytes) const; bool hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const; bool hasMemoryOverflow(const BlocksList & blocks) const; diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 2ef7810e8dd..005f24cba2a 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -1774,10 +1774,10 @@ HashJoin::~HashJoin() { if (!data) { - LOG_DEBUG(log, "Join data has been released"); + LOG_TRACE(log, "Join data has been released"); return; } - LOG_DEBUG(log, "Join data is being destroyed, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); + LOG_TRACE(log, "Join data is being destroyed, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); } template @@ -2058,7 +2058,7 @@ void HashJoin::reuseJoinedData(const HashJoin & join) BlocksList HashJoin::releaseJoinedBlocks(bool restructure) { - LOG_DEBUG(log, "Join data is being released, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); + LOG_TRACE(log, "Join data is being released, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); BlocksList right_blocks = std::move(data->blocks); if (!restructure) diff --git a/src/Interpreters/JoinUtils.h b/src/Interpreters/JoinUtils.h index f4f5f5bdc8d..f112ca22e5b 100644 --- a/src/Interpreters/JoinUtils.h +++ b/src/Interpreters/JoinUtils.h @@ -21,6 +21,7 @@ using UInt8ColumnDataPtr = const ColumnUInt8::Container *; namespace JoinCommon { +/// Helper interface to work with mask from JOIN ON section class JoinMask { public: diff --git a/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2 b/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2 index 0d6bef7fadb..d2cc066a1b1 100644 --- a/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2 +++ b/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2 @@ -30,6 +30,8 @@ SELECT t1.key, t1.key2 FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.key == SELECT '--'; SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2; +SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 0; -- { serverError INVALID_JOIN_ON_EXPRESSION } +SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 1; -- { serverError INVALID_JOIN_ON_EXPRESSION } SELECT '--'; SELECT '333' = t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND t2.id > 2; From dac86d48d2a82dcb5296b42b454b02d8f8e6a8ee Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 23 Jan 2023 12:34:36 +0000 Subject: [PATCH 12/13] fixing join data was released --- src/Interpreters/GraceHashJoin.cpp | 17 +++++++++++++---- src/Interpreters/HashJoin.cpp | 10 +++++----- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index 4cf8f91d0a9..c2175eb9e45 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -394,7 +394,7 @@ void GraceHashJoin::addBucket(Buckets & destination) void GraceHashJoin::checkTypesOfKeys(const Block & block) const { - assert(hash_join); + chassert(hash_join); return hash_join->checkTypesOfKeys(block); } @@ -447,7 +447,7 @@ size_t GraceHashJoin::getTotalRowCount() const size_t GraceHashJoin::getTotalByteCount() const { std::lock_guard lock(hash_join_mutex); - assert(hash_join); + chassert(hash_join); return hash_join->getTotalByteCount(); } @@ -461,9 +461,14 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const std::shared_lock lock(rehash_mutex); return std::all_of(buckets.begin(), buckets.end(), [](const auto & bucket) { return bucket->empty(); }); }(); - bool hash_join_is_empty = hash_join && hash_join->alwaysReturnsEmptySet(); - return hash_join_is_empty && file_buckets_are_empty; + if (!file_buckets_are_empty) + return false; + + chassert(hash_join); + bool hash_join_is_empty = hash_join->alwaysReturnsEmptySet(); + + return hash_join_is_empty; } IBlocksStreamPtr GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const @@ -621,6 +626,9 @@ void GraceHashJoin::addJoinedBlockImpl(Block block) { std::lock_guard lock(hash_join_mutex); + if (!hash_join) + hash_join = makeInMemoryJoin(); + hash_join->addJoinedBlock(current_block, /* check_limits = */ false); if (!hasMemoryOverflow(hash_join)) @@ -629,6 +637,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block) current_block = {}; auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false); + hash_join = nullptr; buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2); diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 005f24cba2a..2d71fe13191 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -221,8 +221,8 @@ HashJoin::HashJoin(std::shared_ptr table_join_, const Block & right_s , right_sample_block(right_sample_block_) , log(&Poco::Logger::get("HashJoin")) { - LOG_DEBUG(log, "Datatype: {}, kind: {}, strictness: {}, right header: {}", data->type, kind, strictness, right_sample_block.dumpStructure()); - LOG_DEBUG(log, "Keys: {}", TableJoin::formatClauses(table_join->getClauses(), true)); + LOG_DEBUG(log, "({}) Datatype: {}, kind: {}, strictness: {}, right header: {}", fmt::ptr(this), data->type, kind, strictness, right_sample_block.dumpStructure()); + LOG_DEBUG(log, "({}) Keys: {}", fmt::ptr(this), TableJoin::formatClauses(table_join->getClauses(), true)); if (isCrossOrComma(kind)) { @@ -1774,10 +1774,10 @@ HashJoin::~HashJoin() { if (!data) { - LOG_TRACE(log, "Join data has been released"); + LOG_TRACE(log, "({}) Join data has been already released", fmt::ptr(this)); return; } - LOG_TRACE(log, "Join data is being destroyed, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); + LOG_TRACE(log, "({}) Join data is being destroyed, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount()); } template @@ -2058,7 +2058,7 @@ void HashJoin::reuseJoinedData(const HashJoin & join) BlocksList HashJoin::releaseJoinedBlocks(bool restructure) { - LOG_TRACE(log, "Join data is being released, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); + LOG_TRACE(log, "({}) Join data is being released, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount()); BlocksList right_blocks = std::move(data->blocks); if (!restructure) From 5be2b31e6733c725c938c44849d74df33065a74b Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 23 Jan 2023 18:09:26 +0000 Subject: [PATCH 13/13] Review fixes --- src/Interpreters/GraceHashJoin.cpp | 14 +++++++------- src/Interpreters/GraceHashJoin.h | 1 + src/Interpreters/HashJoin.cpp | 14 -------------- src/Interpreters/HashJoin.h | 4 +--- 4 files changed, 9 insertions(+), 24 deletions(-) diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index c2175eb9e45..2f611d4386d 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -166,15 +166,15 @@ public: AccumulatedBlockReader startJoining() { LOG_TRACE(log, "Joining file bucket {}", idx); + { + std::unique_lock left_lock(left_file_mutex); + std::unique_lock right_lock(right_file_mutex); - std::unique_lock left_lock(left_file_mutex); - std::unique_lock right_lock(right_file_mutex); - - left_file.finishWriting(); - right_file.finishWriting(); - - state = State::JOINING_BLOCKS; + left_file.finishWriting(); + right_file.finishWriting(); + state = State::JOINING_BLOCKS; + } return AccumulatedBlockReader(right_file, right_file_mutex); } diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h index 781cbf24edf..be03cee4a35 100644 --- a/src/Interpreters/GraceHashJoin.h +++ b/src/Interpreters/GraceHashJoin.h @@ -116,6 +116,7 @@ private: size_t getNumBuckets() const; Buckets getCurrentBuckets() const; + /// Structure block to store in the HashJoin according to sample_block. Block prepareRightBlock(const Block & block); Poco::Logger * log; diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 2d71fe13191..d5f88a748de 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -676,20 +676,6 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample) } } -Block HashJoin::structureRightBlock(const Block & block) const -{ - Block structured_block; - for (const auto & sample_column : savedBlockSample().getColumnsWithTypeAndName()) - { - ColumnWithTypeAndName column = block.getByName(sample_column.name); - if (sample_column.column->isNullable()) - JoinCommon::convertColumnToNullable(column); - structured_block.insert(column); - } - - return structured_block; -} - Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block_sample_) { Block structured_block; diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index 7b4fa1d5344..b29b6e617c8 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -356,9 +356,7 @@ public: RightTableDataPtr getJoinedData() const { return data; } BlocksList releaseJoinedBlocks(bool restructure = false); - Block structureRightBlock(const Block & block) const; - - /// Modify (structure) right block to save it in block list + /// Modify right block (update structure according to sample block) to save it in block list static Block prepareRightBlock(const Block & block, const Block & saved_block_sample_); Block prepareRightBlock(const Block & block) const;