diff --git a/src/Interpreters/ConcurrentHashJoin.cpp b/src/Interpreters/ConcurrentHashJoin.cpp index cafc809f71d..54d54680bee 100644 --- a/src/Interpreters/ConcurrentHashJoin.cpp +++ b/src/Interpreters/ConcurrentHashJoin.cpp @@ -161,7 +161,7 @@ bool ConcurrentHashJoin::alwaysReturnsEmptySet() const return true; } -std::shared_ptr ConcurrentHashJoin::getNonJoinedBlocks( +std::unique_ptr ConcurrentHashJoin::getNonJoinedBlocks( const Block & /*left_sample_block*/, const Block & /*result_sample_block*/, UInt64 /*max_block_size*/) const { if (!JoinCommon::hasNonJoinedBlocks(*table_join)) diff --git a/src/Interpreters/ConcurrentHashJoin.h b/src/Interpreters/ConcurrentHashJoin.h index 705e6ba81b7..257afa3d8c9 100644 --- a/src/Interpreters/ConcurrentHashJoin.h +++ b/src/Interpreters/ConcurrentHashJoin.h @@ -47,7 +47,7 @@ public: size_t getTotalByteCount() const override; bool alwaysReturnsEmptySet() const override; bool supportParallelJoin() const override { return true; } - std::shared_ptr + std::unique_ptr getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override; private: diff --git a/src/Interpreters/DirectJoin.h b/src/Interpreters/DirectJoin.h index 8e82b59da02..200db685ef7 100644 --- a/src/Interpreters/DirectJoin.h +++ b/src/Interpreters/DirectJoin.h @@ -42,7 +42,7 @@ public: virtual bool isFilled() const override { return true; } - virtual std::shared_ptr + virtual std::unique_ptr getNonJoinedBlocks(const Block &, const Block &, UInt64) const override { return nullptr; diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 9c7a915da78..9d5d6211641 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1017,6 +1017,12 @@ static std::shared_ptr chooseJoinAlgorithm( const auto & settings = context->getSettings(); Block left_sample_block(left_sample_columns); + for (auto & column : left_sample_block) + { + if 
(!column.column) + column.column = column.type->createColumn(); + } + Block right_sample_block = joined_plan->getCurrentDataStream().header; std::vector tried_algorithms; diff --git a/src/Interpreters/FullSortingMergeJoin.h b/src/Interpreters/FullSortingMergeJoin.h index 14c81259159..efe9c4ad21b 100644 --- a/src/Interpreters/FullSortingMergeJoin.h +++ b/src/Interpreters/FullSortingMergeJoin.h @@ -100,7 +100,7 @@ public: bool alwaysReturnsEmptySet() const override { return false; } - std::shared_ptr + std::unique_ptr getNonJoinedBlocks(const Block & /* left_sample_block */, const Block & /* result_sample_block */, UInt64 /* max_block_size */) const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::getNonJoinedBlocks should not be called"); diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index 449dfc411a9..04cce746667 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -17,6 +17,8 @@ #include #include +#include + namespace CurrentMetrics { extern const Metric TemporaryFilesForJoin; @@ -35,6 +37,37 @@ namespace ErrorCodes namespace { + + void debugBlock(const Block & block, size_t line, std::string msg = "", bool st = false) + { + size_t count = 0; + String colname; + + if (block.has("key")) + colname = "key"; + if (block.has("t2.key")) + colname = "t2.key"; + + if (colname.empty()) + return; + + auto col = block.getByName(colname).column; + for (size_t i = 0; i < col->size(); ++i) + { + if (col->get64(i) == 31) + count++; + } + + if (count > 0) + { + LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} AAA {}: {} | {}", __FILE__, msg, line, count, block.dumpStructure()); + if (st) + { + LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} {} : {}", __FILE__, __LINE__, line, StackTrace().toString()); + } + } + } + class BlocksAccumulator { public: @@ -93,6 +126,8 @@ namespace for (; !res; res = accumulator.peek()) { Block tmp = reader.read(); + debugBlock(tmp, __LINE__); + if 
(!tmp) { eof = true; @@ -100,6 +135,7 @@ namespace } accumulator.addBlock(std::move(tmp)); } + debugBlock(res, __LINE__); return res; } @@ -145,35 +181,66 @@ class GraceHashJoin::FileBucket : boost::noncopyable }; public: - explicit FileBucket(size_t bucket_index_, TemporaryFileStream & left_file_, TemporaryFileStream & right_file_, const FileBucket * parent_) + struct Stats + { + TemporaryFileStream::Stat left; + TemporaryFileStream::Stat right; + }; + + explicit FileBucket(size_t bucket_index_, + TemporaryFileStream & left_file_, + TemporaryFileStream & right_file_, + const FileBucket * parent_, + Poco::Logger * log_) : bucket_index{bucket_index_} , left_file{left_file_} , right_file{right_file_} , parent{parent_} , state{State::WRITING_BLOCKS} + , log(log_) { } void addLeftBlock(const Block & block) { - return addBlockImpl(block, left_file_mutex, left_file); + std::unique_lock lock(left_file_mutex); + addBlockImpl(block, left_file, lock); } - void addRightBlock(const Block & block) { return addBlockImpl(block, right_file_mutex, right_file); } - bool tryAddLeftBlock(const Block & block) { return tryAddBlockImpl(block, left_file_mutex, left_file); } - bool tryAddRightBlock(const Block & block) { return tryAddBlockImpl(block, right_file_mutex, right_file); } + void addRightBlock(const Block & block) + { + std::unique_lock lock(right_file_mutex); + addBlockImpl(block, right_file, lock); + } + + bool tryAddLeftBlock(const Block & block) + { + std::unique_lock lock(left_file_mutex, std::try_to_lock); + return addBlockImpl(block, left_file, lock); + } + + bool tryAddRightBlock(const Block & block) + { + + std::unique_lock lock(right_file_mutex, std::try_to_lock); + return addBlockImpl(block, right_file, lock); + } void startJoining() { + LOG_TRACE(log, "Joining file bucket {}", bucket_index); + ensureState(State::JOINING_BLOCKS); - left_file.finishWriting(); - right_file.finishWriting(); + stats.left = left_file.finishWriting(); + stats.right = 
right_file.finishWriting(); } size_t index() const { return bucket_index; } bool finished() const { return state.load() == State::FINISHED; } bool empty() const { return is_empty.load(); } + Stats getStat() const { return stats; } + bool tryLockForJoining() { if (parent && !parent->finished()) @@ -183,7 +250,14 @@ public: return state.compare_exchange_strong(expected, State::JOINING_BLOCKS); } - void finish() { transition(State::JOINING_BLOCKS, State::FINISHED); } + void finish() + { + LOG_TRACE(log, "XXXX Finish joining file bucket {}, size: {} | {}", + bucket_index, stats.left.num_rows, stats.right.num_rows); + + state.exchange(State::FINISHED); + // transition(State::JOINING_BLOCKS, State::FINISHED); + } MergingBlockReader openLeftTableReader() const { return MergingBlockReader(left_file); } @@ -191,29 +265,28 @@ public: std::mutex & joinMutex() { return join_mutex; } + ~FileBucket() + { + + LOG_TRACE(log, "XXXX Destroying file bucket {} - {}({}): rows: {} | {}", + bucket_index, fmt::ptr(this), fmt::ptr(parent), stats.left.num_rows, stats.right.num_rows); + } + private: - bool tryAddBlockImpl(const Block & block, std::mutex & mutex, TemporaryFileStream & writer) + bool addBlockImpl(const Block & block, TemporaryFileStream & writer, std::unique_lock & lock) { if (block.rows()) is_empty = false; ensureState(State::WRITING_BLOCKS); - std::unique_lock lock{mutex, std::try_to_lock}; if (!lock.owns_lock()) return false; + debugBlock(block, __LINE__, fmt::format("adding to {}", bucket_index), true); + writer.write(block); return true; } - void addBlockImpl(const Block & block, std::mutex & mutex, TemporaryFileStream & writer) - { - if (block.rows()) - is_empty = false; - ensureState(State::WRITING_BLOCKS); - std::unique_lock lock{mutex}; - writer.write(block); - } - void transition(State expected, State desired) { State prev = state.exchange(desired); @@ -240,6 +313,10 @@ private: const FileBucket * parent; std::atomic state; + + Stats stats; + + Poco::Logger * log; }; 
class GraceHashJoin::InMemoryJoin @@ -278,6 +355,8 @@ GraceHashJoin::GraceHashJoin( , any_take_last_row{any_take_last_row_} , max_num_buckets{context->getSettingsRef().grace_hash_join_max_buckets} , max_block_size{context->getSettingsRef().max_block_size} + , left_key_names(table_join->getOnlyClause().key_names_left) + , right_key_names(table_join->getOnlyClause().key_names_right) , first_bucket{makeInMemoryJoin()} , tmp_data(std::make_unique(tmp_data_, CurrentMetrics::TemporaryFilesForJoin)) { @@ -310,7 +389,7 @@ bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/) return true; } -void GraceHashJoin::rehashInMemoryJoin(InMemoryJoinPtr & join, const BucketsSnapshot & snapshot, size_t bucket) +void GraceHashJoin::rehashInMemoryJoin(InMemoryJoinPtr & join, const Buckets & buckets_snapshot, size_t bucket) { std::lock_guard lock{join->mutex}; @@ -319,12 +398,12 @@ void GraceHashJoin::rehashInMemoryJoin(InMemoryJoinPtr & join, const BucketsSnap for (const Block & block : right_blocks) { - Blocks blocks = scatterBlock(block, snapshot->size()); + Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size()); join->join->addJoinedBlock(blocks[bucket], /* check_limits = */ false); - for (size_t i = 1; i < snapshot->size(); ++i) + for (size_t i = 1; i < buckets_snapshot.size(); ++i) { if (i != bucket && blocks[i].rows()) - snapshot->at(i)->addRightBlock(blocks[i]); + buckets_snapshot[i]->addRightBlock(blocks[i]); } } } @@ -338,9 +417,9 @@ bool GraceHashJoin::fitsInMemory(InMemoryJoin * join) const return table_join->sizeLimits().softCheck(join->join->getTotalRowCount(), join->join->getTotalByteCount()); } -void GraceHashJoin::rehashBuckets() +GraceHashJoin::Buckets GraceHashJoin::rehashBuckets() { - std::scoped_lock lock{rehash_mutex}; + std::unique_lock lock(rehash_mutex); size_t current_size = buckets.size(); size_t next_size = current_size * 2; @@ -358,19 +437,26 @@ void GraceHashJoin::rehashBuckets() { 
addBucket(buckets, buckets[i % current_size].get()); } + return buckets; } void GraceHashJoin::startReadingDelayedBlocks() { // Drop in-memory hash join for the first bucket to reduce memory footprint. first_bucket.reset(); + std::unique_lock lock(rehash_mutex); + LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} AAA startReadingDelayedBlocks {} > {} | {}", __FILE__, __LINE__, + buckets[0]->empty() ? "empty" : "not empty", + buckets[0]->getStat().left.num_rows, buckets[0]->getStat().right.num_rows); + if (!buckets[0]->finished()) + buckets[0]->finish(); } void GraceHashJoin::addBucket(Buckets & destination, const FileBucket * parent) { size_t index = destination.size(); - destination.emplace_back(std::make_unique( - index, tmp_data->createStream(left_sample_block), tmp_data->createStream(right_sample_block), parent)); + auto new_bucket = std::make_unique(index, tmp_data->createStream(left_sample_block), tmp_data->createStream(right_sample_block), parent, log); + destination.emplace_back(std::move(new_bucket)); } void GraceHashJoin::checkTypesOfKeys(const Block & block) const @@ -397,14 +483,13 @@ void GraceHashJoin::joinBlock(Block & block, std::shared_ptr & /*not materializeBlockInplace(block); - Buckets current_buckets = getCurrentBuckets(); - size_t num_buckets = current_buckets.size(); - - auto blocks = scatterBlock(block, num_buckets); - + Buckets buckets_snapshot = getCurrentBuckets(); + size_t num_buckets = buckets_snapshot.size(); + Blocks blocks = JoinCommon::scatterBlockByHash(left_key_names, block, num_buckets); ExtraBlockPtr not_processed; block = std::move(blocks[0]); + debugBlock(block, __LINE__); first_bucket->join->joinBlock(block, not_processed); if (not_processed) throw Exception(ErrorCodes::LOGICAL_ERROR, "Unhandled not processed block in GraceHashJoin"); @@ -412,11 +497,11 @@ void GraceHashJoin::joinBlock(Block & block, std::shared_ptr & /*not // We need to skip the first bucket that is already joined in memory, so we start with 1. 
retryForEach( generateRandomPermutation(1, num_buckets), - [&blocks, &current_buckets](size_t idx) + [&blocks, &buckets_snapshot](size_t idx) { if (blocks[idx].rows() == 0) return true; - return current_buckets.at(idx)->tryAddLeftBlock(blocks[idx]); + return buckets_snapshot[idx]->tryAddLeftBlock(blocks[idx]); }); } @@ -424,7 +509,7 @@ void GraceHashJoin::setTotals(const Block & block) { if (block) { - std::scoped_lock guard{totals_mutex}; + std::lock_guard guard(totals_mutex); totals = block; } } @@ -451,7 +536,7 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const if (!isInnerOrRight(table_join->kind())) return false; - std::unique_lock lock(rehash_mutex); + std::shared_lock lock(rehash_mutex); bool file_buckets_are_empty = std::all_of(buckets.begin(), buckets.end(), [](const auto & bucket) { return bucket->empty(); }); bool first_bucket_is_empty = first_bucket && first_bucket->join && first_bucket->join->alwaysReturnsEmptySet(); @@ -459,7 +544,7 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const return first_bucket_is_empty && file_buckets_are_empty; } -std::shared_ptr GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const +std::unique_ptr GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const { /// We do no support returning non joined blocks here. /// They will be reported by getDelayedBlocks instead. 
@@ -477,6 +562,9 @@ public: Block next() override { Block result = parent->joinNextBlockInBucket(*this); + if (result) + debugBlock(result, __LINE__); + if (result) return result; @@ -497,7 +585,7 @@ public: MergingBlockReader left_reader; InMemoryJoinPtr join; bool process_not_joined = true; - std::shared_ptr not_joined_blocks; + std::unique_ptr not_joined_blocks; }; std::unique_ptr GraceHashJoin::getDelayedBlocks(IDelayedJoinedBlocksStream * prev_cursor) @@ -512,10 +600,26 @@ std::unique_ptr GraceHashJoin::getDelayedBlocks(IDel startReadingDelayedBlocks(); } - auto snapshot = buckets.get(); - for (size_t i = 1; i < snapshot->size(); ++i) + auto snapshot = getCurrentBuckets(); + for (size_t buckets_left = snapshot.size() - 1, i = 0; buckets_left > 0; ++i) { - FileBucket * bucket = snapshot->at(i).get(); + auto & bucket = snapshot[i % (snapshot.size() - 1) + 1]; + if (bucket == nullptr) + continue; + + // LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} {}({} | {}) / {} -> {} {}", __FILE__, __LINE__, + // i, i % (snapshot.size() - 1) + 1, + // bucket->index(), snapshot.size(), bucket->finished() ? "finished" : "not finished", + // bucket->empty() ? 
"empty" : "not empty"); + + + if (bucket->finished()) + { + --buckets_left; + bucket.reset(); + continue; + } + if (!bucket->tryLockForJoining()) { continue; @@ -528,8 +632,8 @@ std::unique_ptr GraceHashJoin::getDelayedBlocks(IDel else { InMemoryJoinPtr join = makeInMemoryJoin(); - fillInMemoryJoin(join, bucket); - return std::make_unique(this, bucket, std::move(join)); + fillInMemoryJoin(join, bucket.get()); + return std::make_unique(this, bucket.get(), std::move(join)); } } @@ -541,16 +645,23 @@ std::unique_ptr GraceHashJoin::getDelayedBlocks(IDel Block GraceHashJoin::joinNextBlockInBucket(DelayedBlocks & iterator) { Block block; - + size_t cur_index = iterator.bucket->index(); + LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} joinNextBlockInBucket {}", __FILE__, __LINE__, cur_index); do { block = iterator.left_reader.read(); if (!block) // EOF return block; - auto current_buckets = getCurrentBuckets(); - size_t num_buckets = current_buckets.size(); - Blocks blocks = scatterBlock(block, num_buckets); + auto buckets_snapshot = getCurrentBuckets(); + size_t num_buckets = buckets_snapshot.size(); + debugBlock(block, __LINE__); + Blocks blocks = JoinCommon::scatterBlockByHash(left_key_names, block, num_buckets); + for (size_t i = 0; i< blocks.size(); ++i) + { + debugBlock(blocks[i], __LINE__, fmt::format("virtually moved {} -> {}", cur_index, i)); /// Inspect the i-th scattered part, not the whole unscattered block. 
+ } + block.clear(); // We need to filter out blocks that were written to the current bucket B0, @@ -567,12 +678,16 @@ Block GraceHashJoin::joinNextBlockInBucket(DelayedBlocks & iterator) block = std::move(blocks[i]); else // Rows that were moved after rehashing - current_buckets[i]->addLeftBlock(blocks[i]); + buckets_snapshot[i]->addLeftBlock(blocks[i]); } } while (block.rows() == 0); + debugBlock(block, __LINE__); ExtraBlockPtr not_processed; iterator.join->join->joinBlock(block, not_processed); + iterator.join->join->debugKeys(); + debugBlock(block, __LINE__); + if (not_processed) throw Exception(ErrorCodes::LOGICAL_ERROR, 
"Unsupported hash join type"); @@ -591,23 +706,30 @@ void GraceHashJoin::fillInMemoryJoin(InMemoryJoinPtr & join, FileBucket * bucket while (auto block = reader.read()) { + debugBlock(block, __LINE__); addJoinedBlockImpl(join, bucket->index(), block); } } void GraceHashJoin::addJoinedBlockImpl(InMemoryJoinPtr & join, size_t bucket_index, const Block & block) { - BucketsSnapshot snapshot = buckets.get(); - Blocks blocks = scatterBlock(block, snapshot->size()); + Buckets buckets_snapshot = getCurrentBuckets(); + Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size()); + debugBlock(block, __LINE__); // Add block to the in-memory join { - auto bucket = snapshot->at(bucket_index); - std::scoped_lock guard{bucket->joinMutex()}; + + auto bucket = buckets_snapshot[bucket_index]; + std::lock_guard guard(bucket->joinMutex()); + debugBlock(blocks[bucket_index], __LINE__); + join->join->addJoinedBlock(blocks[bucket_index], /*check_limits=*/false); // We need to rebuild block without bucket_index part in case of overflow. bool overflow = !fitsInMemory(join.get()); + + Block to_write; if (overflow) { @@ -615,12 +737,40 @@ void GraceHashJoin::addJoinedBlockImpl(InMemoryJoinPtr & join, size_t bucket_ind to_write = concatenateBlocks(blocks); } + size_t overflow_cnt = 0; while (overflow) { - rehashBuckets(); - rehashInMemoryJoin(join, bucket_index); - blocks = scatterBlock(to_write, snapshot->size()); + LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} size of join {}: {} {} - {} {}", __FILE__, __LINE__, + bucket_index, + join->join->getTotalRowCount(), + join->join->getTotalByteCount(), + overflow ? 
"overflow" : "fits", overflow_cnt); + + + buckets_snapshot = rehashBuckets(); + rehashInMemoryJoin(join, buckets_snapshot, bucket_index); + + blocks = JoinCommon::scatterBlockByHash(right_key_names, to_write, buckets_snapshot.size()); /// Re-scatter to_write, as the replaced code did: the bucket_index part of block was already added to the in-memory join above. + + { + WriteBufferFromOwnString out; + auto output_format = context->getOutputFormat("PrettyCompactMonoBlock", out, block); + formatBlock(output_format, block); + auto block_string = out.str(); + + Strings sizes; + for (size_t i = 0; i < blocks.size(); ++i) + { + auto & b = blocks[i]; + if (b.rows()) + sizes.emplace_back(fmt::format("[{}] - {}", i, b.rows())); + } + + LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} overflow({}) block\n{} -> [{}]", __FILE__, __LINE__, overflow_cnt, block_string, fmt::join(sizes, ", ")); + + } overflow = !fitsInMemory(join.get()); + overflow_cnt++; } } @@ -629,15 +779,14 @@ void GraceHashJoin::addJoinedBlockImpl(InMemoryJoinPtr & join, size_t bucket_ind return; // Write the rest of the blocks to the disk buckets - assert(blocks.size() == snapshot->size()); - auto indices = generateRandomPermutation(1, snapshot->size()); + assert(blocks.size() == buckets_snapshot.size()); retryForEach( - indices, - [&](size_t bucket) + generateRandomPermutation(1, buckets_snapshot.size()), + [&](size_t i) { - if (bucket == bucket_index || !blocks[bucket].rows()) + if (i == bucket_index || !blocks[i].rows()) return true; - return snapshot->at(bucket)->tryAddRightBlock(blocks[bucket]); + return buckets_snapshot[i]->tryAddRightBlock(blocks[i]); }); } @@ -647,27 +796,10 @@ size_t GraceHashJoin::getNumBuckets() const return buckets.size(); } -Buckets GraceHashJoin::getCurrentBuckets() const +GraceHashJoin::Buckets GraceHashJoin::getCurrentBuckets() const { - std::lock_guard lock{rehash_mutex}; + std::shared_lock lock(rehash_mutex); return buckets; } - -template -Blocks GraceHashJoin::scatterBlock(const Block & 
block, size_t shards) const -{ - if (!block) - { - return {}; - } - - const Names & key_names = [](const 
TableJoin::JoinOnClause & clause) -> auto & - { - return right ? clause.key_names_right : clause.key_names_left; - } - (table_join->getOnlyClause()); - return JoinCommon::scatterBlockByHash(key_names, block, shards); -} - } diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h index f467ec0420c..f4c7fa0d1b1 100644 --- a/src/Interpreters/GraceHashJoin.h +++ b/src/Interpreters/GraceHashJoin.h @@ -46,7 +46,8 @@ class GraceHashJoin final : public IJoin class DelayedBlocks; class InMemoryJoin; - using Buckets = std::vector>; + using BucketPtr = std::shared_ptr; + using Buckets = std::vector; using InMemoryJoinPtr = std::unique_ptr; public: @@ -73,7 +74,7 @@ public: bool supportParallelJoin() const override { return true; } - std::shared_ptr + std::unique_ptr getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override; /// Open iterator over joined blocks. @@ -94,7 +95,7 @@ private: /// Add right table block to the @join. Calls @rehash on overflow. void addJoinedBlockImpl(InMemoryJoinPtr & join, size_t bucket_index, const Block & block); /// Rebuild @join after rehash: scatter the blocks in join and write parts that belongs to the other shards to disk. - void rehashInMemoryJoin(InMemoryJoinPtr & join, const BucketsSnapshot & snapshot, size_t bucket); + void rehashInMemoryJoin(InMemoryJoinPtr & join, const Buckets & buckets_snapshot, size_t bucket); /// Check that @join satisifes limits on rows/bytes in @table_join. bool fitsInMemory(InMemoryJoin * join) const; @@ -108,7 +109,8 @@ private: /// /// NB: after @rehashBuckets there may be rows that are written to the buckets that they do not belong to. /// It is fine; these rows will be written to the corresponding buckets during the third stage. - void rehashBuckets(); + Buckets rehashBuckets(); + /// Perform some bookkeeping after all calls to @joinBlock. 
void startReadingDelayedBlocks(); @@ -126,12 +128,15 @@ private: size_t max_num_buckets; size_t max_block_size; + Names left_key_names; + Names right_key_names; + InMemoryJoinPtr first_bucket; TemporaryDataOnDiskPtr tmp_data; Buckets buckets; - std::shared_mutex rehash_mutex; + mutable std::shared_mutex rehash_mutex; std::atomic started_reading_delayed_blocks{false}; diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index a987170da72..367d2ef0c8a 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -676,8 +677,30 @@ Block HashJoin::structureRightBlock(const Block & block) const return structured_block; } +static void debugBlock(const Block & block, size_t line, const void * inst) +{ + size_t count = 0; + if (!block.has("t2.key")) + { + return; + } + + auto col = block.getByName("t2.key").column; + for (size_t i = 0; i < col->size(); ++i) + { + if (col->get64(i) == 54) + count++; + } + + if (count > 1) + { + LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} [{}] AAA: {} | {} | {}", __FILE__, line, inst, count, block.dumpStructure(), StackTrace().toString() ); + } +} + bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits) { + debugBlock(source_block, __LINE__, fmt::ptr(this)); /// RowRef::SizeT is uint32_t (not size_t) for hash table Cell memory efficiency. /// It's possible to split bigger blocks and insert them by parts here. But it would be a dead code. 
if (unlikely(source_block.rows() > std::numeric_limits::max())) @@ -1679,6 +1702,71 @@ void HashJoin::checkTypesOfKeys(const Block & block) const } } +template +static void debugRowRef(const Mapped & mapped, const Msg & msg) +{ + UNUSED(mapped); + UNUSED(msg); + if constexpr (std::is_same_v) + { + std::vector ss; + std::set blocks; + auto it = mapped.begin(); + while (it.ok()) + { + ss.push_back(fmt::format("{}:{}", it->row_num, fmt::ptr(it->block))); + blocks.insert(it->block); + ++it; + } + + LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} {}", __FILE__, __LINE__, fmt::join(ss, ",")); + + for (const auto & block : blocks) + { + LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} {} - {}", __FILE__, __LINE__, + fmt::ptr(block), block->dumpStructure()); + } + } + else + { + LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} {}", __FILE__, __LINE__, typeid(Mapped).name()); + } +} + +void HashJoin::debugKeys() const +{ + if (!data) /// Nothing to dump; data->maps below must not be dereferenced when data is null. + return; + + for (const auto & map : data->maps) + { + joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) + { + auto cb = [this](const auto & rr) { debugRowRef(rr, fmt::ptr(this)); }; + + if (map_.key8) + map_.key8->forEachMapped(cb); + if (map_.key16) + map_.key16->forEachMapped(cb); + if (map_.key32) + map_.key32->forEachMapped(cb); + if (map_.key64) + map_.key64->forEachMapped(cb); + if (map_.key_string) + map_.key_string->forEachMapped(cb); + if (map_.key_fixed_string) + map_.key_fixed_string->forEachMapped(cb); + if (map_.keys128) + map_.keys128->forEachMapped(cb); + if (map_.keys256) + map_.keys256->forEachMapped(cb); + if (map_.hashed) + map_.hashed->forEachMapped(cb); + }); + } + +} + void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed) { for (const auto & onexpr : table_join->getClauses()) @@ -1948,7 +2036,7 @@ private: } }; -std::shared_ptr HashJoin::getNonJoinedBlocks(const Block & left_sample_block, +std::unique_ptr 
HashJoin::getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, 
UInt64 max_block_size) const { @@ -1962,7 +2050,7 @@ std::shared_ptr HashJoin::getNonJoinedBlocks(const Block & left /// ... calculate `left_columns_count` ... size_t left_columns_count = left_sample_block.columns(); auto non_joined = std::make_unique>(*this, max_block_size); - return std::make_shared(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); + return std::make_unique(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); } else @@ -1970,7 +2058,7 @@ std::shared_ptr HashJoin::getNonJoinedBlocks(const Block & left size_t left_columns_count = left_sample_block.columns(); assert(left_columns_count == result_sample_block.columns() - required_right_keys.columns() - sample_block_with_columns_to_add.columns()); auto non_joined = std::make_unique>(*this, max_block_size); - return std::make_shared(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); + return std::make_unique(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); } } diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index 4026aa99883..5898c4f9579 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -186,7 +186,7 @@ public: * Use only after all calls to joinBlock was done. * left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside). */ - std::shared_ptr getNonJoinedBlocks( + std::unique_ptr getNonJoinedBlocks( const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override; /// Number of keys in all built JOIN maps. 
@@ -354,6 +354,8 @@ public: bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); } bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); } + void debugKeys() const; + private: template friend class NotJoinedHash; diff --git a/src/Interpreters/IJoin.h b/src/Interpreters/IJoin.h index 72723525fea..41533e7d3c4 100644 --- a/src/Interpreters/IJoin.h +++ b/src/Interpreters/IJoin.h @@ -11,14 +11,15 @@ namespace DB { -class Block; - struct ExtraBlock; using ExtraBlockPtr = std::shared_ptr; class TableJoin; class NotJoinedBlocks; -class IDelayedJoinedBlocksStream; +class IBlocksStream; + +class IJoin; +using JoinPtr = std::shared_ptr; enum class JoinPipelineType { @@ -79,17 +80,33 @@ public: // That can run FillingRightJoinSideTransform parallelly virtual bool supportParallelJoin() const { return false; } - virtual std::shared_ptr - getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const = 0; - /// Peek next stream of delayed joined blocks. 
- virtual std::unique_ptr getDelayedBlocks(IDelayedJoinedBlocksStream * /*prev_cursor*/) { return nullptr; } + virtual std::unique_ptr getDelayedBlocks( + const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) + { + if (non_joined_returned.exchange(true)) + return nullptr; + + return getNonJoinedBlocks(left_sample_block, result_sample_block, max_block_size); + } + + /// TODO(vdimir@): make private + virtual std::unique_ptr + getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const = 0; private: Block totals; + + std::atomic_bool non_joined_returned = false; }; +class IBlocksStream +{ +public: + /// Returns empty block on EOF + virtual Block next() = 0; -using JoinPtr = std::shared_ptr; + virtual ~IBlocksStream() = default; +}; } diff --git a/src/Interpreters/JoinSwitcher.h b/src/Interpreters/JoinSwitcher.h index 2dba565f778..7d1161ecbd5 100644 --- a/src/Interpreters/JoinSwitcher.h +++ b/src/Interpreters/JoinSwitcher.h @@ -63,7 +63,7 @@ public: return join->alwaysReturnsEmptySet(); } - std::shared_ptr + std::unique_ptr getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override { return join->getNonJoinedBlocks(left_sample_block, result_sample_block, max_block_size); diff --git a/src/Interpreters/JoinUtils.cpp b/src/Interpreters/JoinUtils.cpp index 170155c45c6..f97a8c3f91f 100644 --- a/src/Interpreters/JoinUtils.cpp +++ b/src/Interpreters/JoinUtils.cpp @@ -597,7 +597,7 @@ static Blocks scatterBlockByHashImpl(const Strings & key_columns_names, const Bl size_t num_cols = block.columns(); /// Use non-standard initial value so as not to degrade hash map performance inside shard that uses the same CRC32 algorithm. 
- WeakHash32 hash(num_rows, 0x3d2738a3u); + WeakHash32 hash(num_rows); for (const auto & key_name : key_columns_names) { ColumnPtr key_col = materializeColumn(block, key_name); @@ -639,8 +639,9 @@ Blocks scatterBlockByHash(const Strings & key_columns_names, const Block & block { if (num_shards == 0) throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of shards must be positive"); - if (likely(isPowerOf2(num_shards))) - return scatterBlockByHashPow2(key_columns_names, block, num_shards); + UNUSED(scatterBlockByHashPow2); + // if (likely(isPowerOf2(num_shards))) + // return scatterBlockByHashPow2(key_columns_names, block, num_shards); return scatterBlockByHashGeneric(key_columns_names, block, num_shards); } @@ -812,7 +813,7 @@ void NotJoinedBlocks::copySameKeys(Block & block) const } } -Block NotJoinedBlocks::read() +Block NotJoinedBlocks::next() { Block result_block = result_sample_block.cloneEmpty(); { diff --git a/src/Interpreters/JoinUtils.h b/src/Interpreters/JoinUtils.h index 102e56d8e00..705f8eb90f5 100644 --- a/src/Interpreters/JoinUtils.h +++ b/src/Interpreters/JoinUtils.h @@ -116,7 +116,7 @@ ColumnPtr filterWithBlanks(ColumnPtr src_column, const IColumn::Filter & filter, } /// Creates result from right table data in RIGHT and FULL JOIN when keys are not present in left table. -class NotJoinedBlocks final +class NotJoinedBlocks final : public IBlocksStream { public: using LeftToRightKeyRemap = std::unordered_map; @@ -138,7 +138,7 @@ public: size_t left_columns_count, const LeftToRightKeyRemap & left_to_right_key_remap); - Block read(); + Block next() override; private: void extractColumnChanges(size_t right_pos, size_t result_pos); @@ -170,15 +170,4 @@ private: void setRightIndex(size_t right_pos, size_t result_position); }; -/// Iterator over delayed joined blocks. -/// Used by GraceHashJoin which must accumulate all blocks from the left table before actual processing. 
-class IDelayedJoinedBlocksStream -{ -public: - virtual ~IDelayedJoinedBlocksStream() = default; - - /// Returns empty block on EOF. - virtual Block next() = 0; -}; - } diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index 8957cb9c694..7ddbdc9f729 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -1114,7 +1114,7 @@ private: }; -std::shared_ptr MergeJoin::getNonJoinedBlocks( +std::unique_ptr MergeJoin::getNonJoinedBlocks( const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const { if (table_join->strictness() == JoinStrictness::All && (is_right || is_full)) @@ -1122,7 +1122,7 @@ std::shared_ptr MergeJoin::getNonJoinedBlocks( size_t left_columns_count = left_sample_block.columns(); assert(left_columns_count == result_sample_block.columns() - right_columns_to_add.columns()); auto non_joined = std::make_unique(*this, max_block_size); - return std::make_shared(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); + return std::make_unique(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap()); } return nullptr; } diff --git a/src/Interpreters/MergeJoin.h b/src/Interpreters/MergeJoin.h index 3ea15d14240..db77f050459 100644 --- a/src/Interpreters/MergeJoin.h +++ b/src/Interpreters/MergeJoin.h @@ -35,7 +35,7 @@ public: /// Has to be called only after setTotals()/mergeRightBlocks() bool alwaysReturnsEmptySet() const override { return (is_right || is_inner) && min_max_right_blocks.empty(); } - std::shared_ptr getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override; + std::unique_ptr getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override; static bool isSupported(const std::shared_ptr & table_join); diff --git a/src/Interpreters/TemporaryDataOnDisk.cpp 
b/src/Interpreters/TemporaryDataOnDisk.cpp index d603877a9e0..413ff20b973 100644 --- a/src/Interpreters/TemporaryDataOnDisk.cpp +++ b/src/Interpreters/TemporaryDataOnDisk.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -94,6 +95,7 @@ struct TemporaryFileStream::OutputWriter if (finalized) throw Exception("Cannot write to finalized stream", ErrorCodes::LOGICAL_ERROR); out_writer.write(block); + num_rows += block.rows(); } @@ -127,6 +129,8 @@ struct TemporaryFileStream::OutputWriter CompressedWriteBuffer out_compressed_buf; NativeWriter out_writer; + std::atomic_size_t num_rows = 0; + bool finalized = false; }; @@ -229,6 +233,7 @@ void TemporaryFileStream::updateAllocAndCheck() parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size); stat.compressed_size = new_compressed_size; stat.uncompressed_size = new_uncompressed_size; + stat.num_rows = out_writer->num_rows; } bool TemporaryFileStream::isFinalized() const diff --git a/src/Interpreters/TemporaryDataOnDisk.h b/src/Interpreters/TemporaryDataOnDisk.h index 349fa9d42f7..06b2c7b8d94 100644 --- a/src/Interpreters/TemporaryDataOnDisk.h +++ b/src/Interpreters/TemporaryDataOnDisk.h @@ -113,6 +113,7 @@ public: /// Non-atomic because we don't allow to `read` or `write` into single file from multiple threads size_t compressed_size = 0; size_t uncompressed_size = 0; + size_t num_rows = 0; }; TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_); @@ -151,3 +152,19 @@ private: }; } + +template<> +struct fmt::formatter +{ + template + constexpr auto parse(ParseContext & context) + { + return context.begin(); + } + + template + auto format(const DB::TemporaryFileStream::Stat & stat, FormatContext & context) + { + return fmt::format_to(context.out(), "{}/{} - {}", stat.compressed_size, stat.uncompressed_size, stat.num_rows); + } +}; diff --git 
a/src/Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h b/src/Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h index 8c2eef00af0..b363991c2f6 100644 --- a/src/Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h +++ b/src/Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h @@ -1,7 +1,6 @@ #pragma once #include #include -#include namespace DB diff --git a/src/Processors/Transforms/JoiningTransform.cpp b/src/Processors/Transforms/JoiningTransform.cpp index 0229060724b..e64a8874cc3 100644 --- a/src/Processors/Transforms/JoiningTransform.cpp +++ b/src/Processors/Transforms/JoiningTransform.cpp @@ -136,7 +136,7 @@ void JoiningTransform::work() return; } - non_joined_blocks = join->getNonJoinedBlocks( + non_joined_blocks = join->getDelayedBlocks( inputs.front().getHeader(), outputs.front().getHeader(), max_block_size); if (!non_joined_blocks) { diff --git a/src/Processors/Transforms/JoiningTransform.h b/src/Processors/Transforms/JoiningTransform.h index 588370d77d1..b7842959e04 100644 --- a/src/Processors/Transforms/JoiningTransform.h +++ b/src/Processors/Transforms/JoiningTransform.h @@ -81,7 +81,7 @@ private: ExtraBlockPtr not_processed; FinishCounterPtr finish_counter; - std::shared_ptr non_joined_blocks; + std::unique_ptr non_joined_blocks; size_t max_block_size; Block readExecute(Chunk & chunk); diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index 00a1be1f221..72a0586b6b0 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -484,7 +484,7 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesRightLe // Process DelayedJoinedBlocksTransform after all JoiningTransforms. 
auto joined_header = JoiningTransform::transformHeader(left_header, join); - auto delayed_processor = std::make_shared(joined_header, num_streams + num_streams, delayed_ports_numbers); + auto delayed_processor = std::make_shared(joined_header, 2 * num_streams, delayed_ports_numbers); if (collected_processors) collected_processors->emplace_back(delayed_processor); left->pipe.processors.emplace_back(delayed_processor); diff --git a/tests/queries/0_stateless/02275_full_sort_join_long.reference b/tests/queries/0_stateless/02275_full_sort_join_long.reference index 7fc7a9b4538..61a3132a889 100644 --- a/tests/queries/0_stateless/02275_full_sort_join_long.reference +++ b/tests/queries/0_stateless/02275_full_sort_join_long.reference @@ -1,52 +1,3 @@ --- full_sorting_merge -- -ALL INNER -500353531835 500353531835 1000342 1000342 1000342 -ALL LEFT -50195752660639 500353531835 10369589 10369589 1000342 -ALL RIGHT -500353531835 684008812186 1367170 1000342 1367170 -ALL INNER -500353531835 500353531835 1000342 1000342 1000342 -ALL LEFT -50195752660639 500353531835 10369589 10369589 1000342 -ALL RIGHT -500353531835 684008812186 1367170 1000342 1367170 -ALL INNER -500353531835 500353531835 1000342 1000342 1000342 -ALL LEFT -50195752660639 500353531835 10369589 10369589 1000342 -ALL RIGHT -500353531835 684008812186 1367170 1000342 1367170 -ALL INNER -500353531835 500353531835 1000342 1000342 1000342 -ALL LEFT -50195752660639 500353531835 10369589 10369589 1000342 -ALL RIGHT -500353531835 684008812186 1367170 1000342 1367170 -ANY INNER -199622811843 199622811843 399458 399458 399458 -ANY LEFT -50010619420459 315220291655 10000000 10000000 630753 -ANY RIGHT -316611844056 500267124407 1000000 633172 1000000 -ANY INNER -199622811843 199622811843 399458 399458 399458 -ANY LEFT -50010619420459 315220291655 10000000 10000000 630753 -ANY RIGHT -316611844056 500267124407 1000000 633172 1000000 -ANY INNER -199622811843 199622811843 399458 399458 399458 -ANY LEFT -50010619420459 315220291655 
10000000 10000000 630753 -ANY RIGHT -316611844056 500267124407 1000000 633172 1000000 -ANY INNER -199622811843 199622811843 399458 399458 399458 -ANY LEFT -50010619420459 315220291655 10000000 10000000 630753 -ANY RIGHT -316611844056 500267124407 1000000 633172 1000000 -- grace_hash -- ALL INNER 500353531835 500353531835 1000342 1000342 1000342 diff --git a/tests/queries/0_stateless/02275_full_sort_join_long.sql.j2 b/tests/queries/0_stateless/02275_full_sort_join_long.sql.j2 index 2d3e297cebf..90ef57cee06 100644 --- a/tests/queries/0_stateless/02275_full_sort_join_long.sql.j2 +++ b/tests/queries/0_stateless/02275_full_sort_join_long.sql.j2 @@ -2,6 +2,7 @@ DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; + CREATE TABLE t1 (key UInt32, s String) ENGINE = MergeTree ORDER BY key; CREATE TABLE t2 (key UInt32, s String) ENGINE = MergeTree ORDER BY key; @@ -25,10 +26,10 @@ INSERT INTO t2 {% macro is_implemented(join_algorithm) -%} {% if join_algorithm == 'grace_hash' %} -- { serverError NOT_IMPLEMENTED } SELECT 'skipped'; -{% endif %} +{% endif -%} {% endmacro -%} -{% for join_algorithm in ['full_sorting_merge', 'grace_hash'] -%} +{% for join_algorithm in ['grace_hash'] -%} SET max_bytes_in_join = '{% if join_algorithm == 'grace_hash' %}1M{% else %}0{% endif %}'; @@ -36,11 +37,13 @@ SELECT '-- {{ join_algorithm }} --'; SET join_algorithm = '{{ join_algorithm }}'; {% for kind in ['ALL', 'ANY'] -%} -{% for block_size in [32001, 65505, 65536, range(32001, 65536) | random] %} +{% for block_size in [0, 32001, 65505, 65536] %} +{% if block_size -%} SET max_block_size = {{ block_size }}; +{%- endif %} -{% if not (kind == 'ANY' and join_algorithm == 'grace_hash') %} +{% if not (kind == 'ANY' and join_algorithm == 'grace_hash') -%} SELECT '{{ kind }} INNER'; SELECT sum(t1.key), sum(t2.key), count(), countIf(t1.key != 0), countIf(t2.key != 0) FROM t1 @@ -60,7 +63,7 @@ SELECT sum(t1.key), sum(t2.key), count(), countIf(t1.key != 0), countIf(t2.key ! 
ON t1.key == t2.key ; {{ is_implemented(join_algorithm) }} -{% endif %} +{% endif -%} {% endfor -%} {% endfor -%}