From e30ab0874bb221b7175ed3bea7ecfe8d491938d7 Mon Sep 17 00:00:00 2001 From: vdimir Date: Fri, 20 Jan 2023 16:30:34 +0000 Subject: [PATCH] Review fixes --- src/Core/Block.cpp | 3 -- src/Interpreters/GraceHashJoin.cpp | 44 +++++++++---------- src/Interpreters/GraceHashJoin.h | 2 +- src/Interpreters/HashJoin.cpp | 6 +-- src/Interpreters/JoinUtils.h | 1 + .../01881_join_on_conditions_hash.sql.j2 | 2 + 6 files changed, 27 insertions(+), 31 deletions(-) diff --git a/src/Core/Block.cpp b/src/Core/Block.cpp index 6cb324ca084..2aa66c3e682 100644 --- a/src/Core/Block.cpp +++ b/src/Core/Block.cpp @@ -841,9 +841,6 @@ Block concatenateBlocks(const std::vector & blocks) if (blocks.empty()) return {}; - if (blocks.size() == 1) - return blocks.front(); - size_t num_rows = 0; for (const auto & block : blocks) num_rows += block.rows(); diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index 46c60360b62..4cf8f91d0a9 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -59,21 +59,22 @@ namespace Blocks blocks; size_t rows_read = 0; - while (true) + do { Block block = reader.read(); rows_read += block.rows(); if (!block) { eof = true; + if (blocks.size() == 1) + return blocks.front(); return concatenateBlocks(blocks); } blocks.push_back(std::move(block)); + } while (rows_read < result_block_size); - if (rows_read >= result_block_size) - break; - } - + if (blocks.size() == 1) + return blocks.front(); return concatenateBlocks(blocks); } @@ -121,20 +122,12 @@ class GraceHashJoin::FileBucket : boost::noncopyable public: using BucketLock = std::unique_lock; - struct Stats - { - TemporaryFileStream::Stat left; - TemporaryFileStream::Stat right; - }; - - explicit FileBucket(size_t bucket_index_, GraceHashJoin & grace_join_) + explicit FileBucket(size_t bucket_index_, TemporaryFileStream & left_file_, TemporaryFileStream & right_file_, Poco::Logger * log_) : idx{bucket_index_} - , left_file{ - grace_join_.tmp_data->createStream(grace_join_.left_sample_block)} - , right_file{ - grace_join_.tmp_data->createStream(grace_join_.prepareRightBlock(grace_join_.right_sample_block))} + , left_file{left_file_} + , right_file{right_file_} , state{State::WRITING_BLOCKS} - , log{grace_join_.log} + , log{log_} { } @@ -170,8 +163,6 @@ public: bool empty() const { return is_empty.load(); } - Stats getStat() const { return stats; } - AccumulatedBlockReader startJoining() { LOG_TRACE(log, "Joining file bucket {}", idx); @@ -179,8 +170,8 @@ public: std::unique_lock left_lock(left_file_mutex); std::unique_lock right_lock(right_file_mutex); - stats.left = left_file.finishWriting(); - stats.right = right_file.finishWriting(); + left_file.finishWriting(); + right_file.finishWriting(); state = State::JOINING_BLOCKS; @@ -232,7 +223,6 @@ private: std::atomic_bool is_empty = true; std::atomic state; - Stats stats; Poco::Logger * log; }; @@ -395,7 +385,10 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size) void GraceHashJoin::addBucket(Buckets & destination) { - BucketPtr new_bucket = std::make_shared(destination.size(), *this); + auto & left_file = tmp_data->createStream(left_sample_block); + auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block)); + + BucketPtr new_bucket = std::make_shared(destination.size(), left_file, right_file, log); destination.emplace_back(std::move(new_bucket)); } @@ -649,7 +642,10 @@ void GraceHashJoin::addJoinedBlockImpl(Block block) current_blocks.emplace_back(std::move(blocks[bucket_index])); } - current_block = concatenateBlocks(current_blocks); + if (current_blocks.size() == 1) + current_block = std::move(current_blocks.front()); + else + current_block = concatenateBlocks(current_blocks); } hash_join = makeInMemoryJoin(); diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h index ff0f1a96ca2..781cbf24edf 100644 --- a/src/Interpreters/GraceHashJoin.h +++ b/src/Interpreters/GraceHashJoin.h @@ -95,7 +95,7 @@ private: /// Add right table block to the @join. Calls @rehash on overflow. void addJoinedBlockImpl(Block block); - /// Check that join satisifes limits on rows/bytes in table_join. + /// Check that join satisfies limits on rows/bytes in table_join. bool hasMemoryOverflow(size_t total_rows, size_t total_bytes) const; bool hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const; bool hasMemoryOverflow(const BlocksList & blocks) const; diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 2ef7810e8dd..005f24cba2a 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -1774,10 +1774,10 @@ HashJoin::~HashJoin() { if (!data) { - LOG_DEBUG(log, "Join data has been released"); + LOG_TRACE(log, "Join data has been released"); return; } - LOG_DEBUG(log, "Join data is being destroyed, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); + LOG_TRACE(log, "Join data is being destroyed, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); } template @@ -2058,7 +2058,7 @@ void HashJoin::reuseJoinedData(const HashJoin & join) BlocksList HashJoin::releaseJoinedBlocks(bool restructure) { - LOG_DEBUG(log, "Join data is being released, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); + LOG_TRACE(log, "Join data is being released, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount()); BlocksList right_blocks = std::move(data->blocks); if (!restructure) diff --git a/src/Interpreters/JoinUtils.h b/src/Interpreters/JoinUtils.h index f4f5f5bdc8d..f112ca22e5b 100644 --- a/src/Interpreters/JoinUtils.h +++ b/src/Interpreters/JoinUtils.h @@ -21,6 +21,7 @@ using UInt8ColumnDataPtr = const ColumnUInt8::Container *; namespace JoinCommon { +/// Helper interface to work with mask from JOIN ON section class JoinMask { public: diff --git a/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2 b/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2 index 0d6bef7fadb..d2cc066a1b1 100644 --- a/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2 +++ b/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2 @@ -30,6 +30,8 @@ SELECT t1.key, t1.key2 FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.key == SELECT '--'; SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2; +SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 0; -- { serverError INVALID_JOIN_ON_EXPRESSION } +SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 1; -- { serverError INVALID_JOIN_ON_EXPRESSION } SELECT '--'; SELECT '333' = t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND t2.id > 2;