From 57a35cae3334ead5b8290d27cd3a9967dc1f6ddf Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 20 Dec 2022 12:50:27 +0000 Subject: [PATCH] wip --- src/Core/Block.cpp | 3 + src/Interpreters/GraceHashJoin.cpp | 133 ++++++++++++++++++----------- src/Interpreters/GraceHashJoin.h | 7 +- src/Interpreters/HashJoin.cpp | 63 ++++++++++---- src/Interpreters/HashJoin.h | 13 +-- src/Interpreters/JoinUtils.cpp | 11 ++- 6 files changed, 149 insertions(+), 81 deletions(-) diff --git a/src/Core/Block.cpp b/src/Core/Block.cpp index 2aa66c3e682..6cb324ca084 100644 --- a/src/Core/Block.cpp +++ b/src/Core/Block.cpp @@ -841,6 +841,9 @@ Block concatenateBlocks(const std::vector & blocks) if (blocks.empty()) return {}; + if (blocks.size() == 1) + return blocks.front(); + size_t num_rows = 0; for (const auto & block : blocks) num_rows += block.rows(); diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index bac61b8205f..aa52ae164e5 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -41,7 +41,7 @@ namespace public: AccumulatedBlockReader(TemporaryFileStream & reader_, std::mutex & mutex_, - size_t result_block_size_ = DEFAULT_BLOCK_SIZE * 8) + size_t result_block_size_ = 0) : reader(reader_) , mutex(mutex_) , result_block_size(result_block_size_) @@ -59,7 +59,7 @@ namespace Blocks blocks; size_t rows_read = 0; - while (rows_read < result_block_size) + while (true) { Block block = reader.read(); rows_read += block.rows(); @@ -69,6 +69,9 @@ namespace return concatenateBlocks(blocks); } blocks.push_back(std::move(block)); + + if (rows_read >= result_block_size) + break; } return concatenateBlocks(blocks); @@ -124,15 +127,14 @@ public: TemporaryFileStream::Stat right; }; - explicit FileBucket(size_t bucket_index_, - TemporaryFileStream & left_file_, - TemporaryFileStream & right_file_, - Poco::Logger * log_) + explicit FileBucket(size_t bucket_index_, GraceHashJoin & grace_join_) : idx{bucket_index_} - , left_file{left_file_} - , right_file{right_file_} + , left_file{ + grace_join_.tmp_data->createStream(grace_join_.left_sample_block)} + , right_file{ + grace_join_.tmp_data->createStream(grace_join_.prepareRightBlock(grace_join_.right_sample_block))} , state{State::WRITING_BLOCKS} - , log(log_) + , log{grace_join_.log} { } @@ -174,14 +176,13 @@ public: { LOG_TRACE(log, "Joining file bucket {}", idx); - { - std::unique_lock left_lock(left_file_mutex); - std::unique_lock right_lock(right_file_mutex); + std::unique_lock left_lock(left_file_mutex); + std::unique_lock right_lock(right_file_mutex); - stats.left = left_file.finishWriting(); - stats.right = right_file.finishWriting(); - state = State::JOINING_BLOCKS; - } + stats.left = left_file.finishWriting(); + stats.right = right_file.finishWriting(); + + state = State::JOINING_BLOCKS; return AccumulatedBlockReader(right_file, right_file_mutex); } @@ -238,15 +239,17 @@ private: namespace { + template -void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets) +void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets, size_t except_index = 0) { chassert(blocks.size() == buckets.size()); retryForEach( generateRandomPermutation(1, buckets.size()), // skipping 0 block, since we join it in memory w/o spilling on disk [&](size_t i) { - if (!blocks[i].rows()) + /// Skip empty and current bucket + if (!blocks[i].rows() || i == except_index) return true; bool flushed = false; @@ -281,6 +284,7 @@ GraceHashJoin::GraceHashJoin( , right_key_names(table_join->getOnlyClause().key_names_right) , tmp_data(std::make_unique(tmp_data_, CurrentMetrics::TemporaryFilesForJoin)) , hash_join(makeInMemoryJoin()) + , hash_join_sample_block(hash_join->savedBlockSample()) { if (!GraceHashJoin::isSupported(table_join)) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "GraceHashJoin is not supported for this join type"); @@ -303,7 +307,7 @@ void GraceHashJoin::initBuckets() if (buckets.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "No buckets created"); - LOG_TRACE(log, "Initialize {} buckets", buckets.size()); + LOG_TRACE(log, "Initialize {} bucket{}", buckets.size(), buckets.size() > 1 ? "s" : ""); current_bucket = buckets.front().get(); current_bucket->startJoining(); @@ -323,10 +327,26 @@ bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/) throw Exception(ErrorCodes::LOGICAL_ERROR, "GraceHashJoin is not initialized"); Block materialized = materializeBlock(block); - addJoinedBlockImpl(materialized); + addJoinedBlockImpl(std::move(materialized)); return true; } +bool GraceHashJoin::hasMemoryOverflow(const Block & block) const +{ + /// One row can't be split, avoid loop + if (block.rows() < 2) + return false; + + bool has_overflow = !table_join->sizeLimits().softCheck(block.rows(), block.allocatedBytes()); + + if (has_overflow) + LOG_TRACE(log, "GraceHashJoin has memory overflow, block {} / {} bytes, {} / {} rows", + ReadableSize(block.allocatedBytes()), ReadableSize(table_join->sizeLimits().max_bytes), + block.rows(), table_join->sizeLimits().max_rows); + + return has_overflow; +} + bool GraceHashJoin::hasMemoryOverflow() const { /// One row can't be split, avoid loop @@ -339,7 +359,7 @@ bool GraceHashJoin::hasMemoryOverflow() const bool has_overflow = !table_join->sizeLimits().softCheck(total_rows, total_bytes); if (has_overflow) - LOG_TRACE(log, "GraceHashJoin has memory overflow {} / {} bytes, {} / {} rows", + LOG_TRACE(log, "GraceHashJoin has memory overflow, hash {} / {} bytes, {} / {} rows", ReadableSize(total_bytes), ReadableSize(table_join->sizeLimits().max_bytes), total_rows, table_join->sizeLimits().max_rows); return has_overflow; @@ -353,7 +373,7 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size) if (to_size <= current_size) return buckets; - assert(isPowerOf2(to_size)); + chassert(isPowerOf2(to_size)); if (to_size > max_num_buckets) { @@ -373,8 +393,7 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size) void GraceHashJoin::addBucket(Buckets & destination) { - BucketPtr new_bucket = std::make_shared( - destination.size(), tmp_data->createStream(left_sample_block), tmp_data->createStream(right_sample_block), log); + BucketPtr new_bucket = std::make_shared(destination.size(), *this); destination.emplace_back(std::move(new_bucket)); } @@ -538,17 +557,11 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks() if (hash_join) { - auto right_blocks = hash_join->releaseJoinedBlocks(); - Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_blocks, buckets.size()); - - for (size_t i = 0; i < blocks.size(); ++i) + auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false); + for (auto & block : right_blocks) { - if (blocks[i].rows() == 0 || i == bucket_idx) - continue; - - if (i < bucket_idx) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected bucket index {} when current bucket is {}", i, bucket_idx); - buckets[i]->addRightBlock(blocks[i]); + Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets.size()); + flushBlocksToBuckets(blocks, buckets, bucket_idx); } } @@ -591,42 +604,58 @@ GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin() return std::make_unique(table_join, right_sample_block, any_take_last_row); } +Block GraceHashJoin::prepareRightBlock(const Block & block) +{ + return HashJoin::prepareRightBlock(block, hash_join_sample_block); +} + void GraceHashJoin::addJoinedBlockImpl(Block block) { Buckets buckets_snapshot = getCurrentBuckets(); - Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size()); size_t bucket_index = current_bucket->idx; + Block current_block; + + { + Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size()); + flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index); + current_block = std::move(blocks[bucket_index]); + } // Add block to the in-memory join - if (blocks[bucket_index].rows() > 0) + if (current_block.rows() > 0) { std::lock_guard lock(hash_join_mutex); - hash_join->addJoinedBlock(std::move(blocks[bucket_index]), /* check_limits = */ false); - bool overflow = hasMemoryOverflow(); + hash_join->addJoinedBlock(current_block, /* check_limits = */ false); - if (overflow) + if (!hasMemoryOverflow()) + return; + + current_block = {}; + + auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false); + + for (const auto & right_block : right_blocks) { - auto right_blocks = hash_join->releaseJoinedBlocks(); - right_blocks.pop_back(); - - for (const auto & right_block : right_blocks) - blocks.push_back(right_block); + Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_block, buckets_snapshot.size()); + flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index); + current_block = blocks[bucket_index]; } - while (overflow) + while (hasMemoryOverflow(current_block)) { buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2); - blocks = JoinCommon::scatterBlockByHash(right_key_names, blocks, buckets_snapshot.size()); - hash_join = makeInMemoryJoin(); - hash_join->addJoinedBlock(blocks[bucket_index], /* check_limits = */ false); - overflow = hasMemoryOverflow(); + Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, current_block, buckets_snapshot.size()); + flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index); + current_block = blocks[bucket_index]; } - blocks[bucket_index].clear(); - } - flushBlocksToBuckets(blocks, buckets_snapshot); + hash_join = makeInMemoryJoin(); + + if (current_block.rows() > 0) + hash_join->addJoinedBlock(current_block, /* check_limits = */ false); + } } size_t GraceHashJoin::getNumBuckets() const diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h index aa29838fd27..3076c8870d7 100644 --- a/src/Interpreters/GraceHashJoin.h +++ b/src/Interpreters/GraceHashJoin.h @@ -95,8 +95,10 @@ private: /// Add right table block to the @join. Calls @rehash on overflow. void addJoinedBlockImpl(Block block); - /// Check that @join satisifes limits on rows/bytes in @table_join. + /// Check that join satisifes limits on rows/bytes in table_join. bool hasMemoryOverflow() const; + /// Check that block satisifes limits on rows/bytes in table_join. + bool hasMemoryOverflow(const Block & block) const; /// Create new bucket at the end of @destination. void addBucket(Buckets & destination); @@ -114,6 +116,8 @@ private: size_t getNumBuckets() const; Buckets getCurrentBuckets() const; + Block prepareRightBlock(const Block & block); + Poco::Logger * log; ContextPtr context; std::shared_ptr table_join; @@ -136,6 +140,7 @@ private: mutable std::mutex current_bucket_mutex; InMemoryJoinPtr hash_join; + Block hash_join_sample_block; mutable std::mutex hash_join_mutex; }; diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 5ff7d0fe7f8..e7ad3f30c4a 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -469,6 +469,9 @@ bool HashJoin::alwaysReturnsEmptySet() const size_t HashJoin::getTotalRowCount() const { + if (!data) + return 0; + size_t res = 0; if (data->type == Type::CROSS) @@ -489,6 +492,9 @@ size_t HashJoin::getTotalRowCount() const size_t HashJoin::getTotalByteCount() const { + if (!data) + return 0; + #ifdef NDEBUG size_t debug_blocks_allocated_size = 0; for (const auto & block : data->blocks) @@ -670,34 +676,44 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample) } } -Block HashJoin::structureRightBlock(const Block & block) const +Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block_sample_) { Block structured_block; - for (const auto & sample_column : savedBlockSample().getColumnsWithTypeAndName()) + for (const auto & sample_column : saved_block_sample_.getColumnsWithTypeAndName()) { ColumnWithTypeAndName column = block.getByName(sample_column.name); if (sample_column.column->isNullable()) JoinCommon::convertColumnToNullable(column); - structured_block.insert(column); - } + /// There's no optimization for right side const columns. Remove constness if any. + column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst()); + structured_block.insert(std::move(column)); + } return structured_block; } +Block HashJoin::prepareRightBlock(const Block & block) const +{ + return prepareRightBlock(block, savedBlockSample()); +} + bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) { + if (!data) + throw Exception("Join data was released", ErrorCodes::LOGICAL_ERROR); + + Block structured_block = source_block; + prepareRightBlock(structured_block); + /// RowRef::SizeT is uint32_t (not size_t) for hash table Cell memory efficiency. /// It's possible to split bigger blocks and insert them by parts here. But it would be a dead code. - if (unlikely(source_block.rows() > std::numeric_limits::max())) - throw Exception("Too many rows in right table block for HashJoin: " + toString(source_block.rows()), ErrorCodes::NOT_IMPLEMENTED); + if (unlikely(structured_block.rows() > std::numeric_limits::max())) + throw Exception("Too many rows in right table block for HashJoin: " + toString(structured_block.rows()), ErrorCodes::NOT_IMPLEMENTED); - /// There's no optimization for right side const columns. Remove constness if any. - Block block = materializeBlock(source_block); - size_t rows = block.rows(); + size_t rows = structured_block.rows(); - ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(block, table_join->getAllNames(JoinTableSide::Right)); + ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(structured_block, table_join->getAllNames(JoinTableSide::Right)); - Block structured_block = structureRightBlock(block); size_t total_rows = 0; size_t total_bytes = 0; { @@ -733,14 +749,14 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) save_nullmap |= (*null_map)[i]; } - auto join_mask_col = JoinCommon::getColumnAsMask(block, onexprs[onexpr_idx].condColumnNames().second); + auto join_mask_col = JoinCommon::getColumnAsMask(structured_block, onexprs[onexpr_idx].condColumnNames().second); /// Save blocks that do not hold conditions in ON section ColumnUInt8::MutablePtr not_joined_map = nullptr; if (!multiple_disjuncts && isRightOrFull(kind) && !join_mask_col.isConstant()) { const auto & join_mask = join_mask_col.getData(); /// Save rows that do not hold conditions - not_joined_map = ColumnUInt8::create(block.rows(), 0); + not_joined_map = ColumnUInt8::create(rows, 0); for (size_t i = 0, sz = join_mask->size(); i < sz; ++i) { /// Condition hold, do not save row @@ -1697,7 +1713,7 @@ void HashJoin::checkTypesOfKeys(const Block & block) const void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed) { - if (data->released) + if (!data) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released"); for (const auto & onexpr : table_join->getClauses()) @@ -1794,7 +1810,10 @@ class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller public: NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_) : parent(parent_), max_block_size(max_block_size_), current_block_start(0) - {} + { + if (parent.data == nullptr) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released"); + } Block getEmptyBlock() override { return parent.savedBlockSample().cloneEmpty(); } @@ -1991,7 +2010,6 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block, size_t left_columns_count = left_sample_block.columns(); auto non_joined = std::make_unique>(*this, max_block_size); return std::make_unique(std::move(non_joined), result_sample_block, left_columns_count, *table_join); - } else { @@ -2020,10 +2038,18 @@ void HashJoin::reuseJoinedData(const HashJoin & join) } } -BlocksList HashJoin::releaseJoinedBlocks() +BlocksList HashJoin::releaseJoinedBlocks(bool restructure) { BlocksList right_blocks = std::move(data->blocks); - data->released = true; + if (!restructure) + { + data.reset(); + return right_blocks; + } + + data->maps.clear(); + data->blocks_nullmaps.clear(); + BlocksList restored_blocks; /// names to positions optimization @@ -2052,6 +2078,7 @@ BlocksList HashJoin::releaseJoinedBlocks() restored_blocks.emplace_back(std::move(restored_block)); } + data.reset(); return restored_blocks; } diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index cb8f3e83022..b348829a990 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -338,7 +338,6 @@ public: /// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows. Arena pool; - bool released = false; size_t blocks_allocated_size = 0; size_t blocks_nullmaps_allocated_size = 0; }; @@ -355,7 +354,13 @@ public: void reuseJoinedData(const HashJoin & join); RightTableDataPtr getJoinedData() const { return data; } - BlocksList releaseJoinedBlocks(); + BlocksList releaseJoinedBlocks(bool restructure = false); + + /// Modify (structure) right block to save it in block list + static Block prepareRightBlock(const Block & block, const Block & saved_block_sample_); + Block prepareRightBlock(const Block & block) const; + + const Block & savedBlockSample() const { return data->sample_block; } bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); } bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); } @@ -407,10 +412,6 @@ private: void dataMapInit(MapsVariant &); - const Block & savedBlockSample() const { return data->sample_block; } - - /// Modify (structure) right block to save it in block list - Block structureRightBlock(const Block & stored_block) const; void initRightBlockStructure(Block & saved_block_sample); template diff --git a/src/Interpreters/JoinUtils.cpp b/src/Interpreters/JoinUtils.cpp index a4ec64ab70e..ce151ccaa6a 100644 --- a/src/Interpreters/JoinUtils.cpp +++ b/src/Interpreters/JoinUtils.cpp @@ -332,7 +332,11 @@ ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names) for (const auto & column_name : names) { auto & column = block.getByName(column_name); - column.column = recursiveRemoveLowCardinality(column.column->convertToFullColumnIfConst()); + + column.column = column.column->convertToFullColumnIfConst(); + column.column = recursiveRemoveLowCardinality(column.column); + column.column = recursiveRemoveSparse(column.column); + column.type = recursiveRemoveLowCardinality(column.type); ptrs[column_name] = column.column.get(); } @@ -639,9 +643,8 @@ Blocks scatterBlockByHash(const Strings & key_columns_names, const Block & block { if (num_shards == 0) throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of shards must be positive"); - UNUSED(scatterBlockByHashPow2); - // if (likely(isPowerOf2(num_shards))) - // return scatterBlockByHashPow2(key_columns_names, block, num_shards); + if (likely(isPowerOf2(num_shards))) + return scatterBlockByHashPow2(key_columns_names, block, num_shards); return scatterBlockByHashGeneric(key_columns_names, block, num_shards); }