diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp
index 221b5d2599c..51d4c7d1f4b 100644
--- a/src/Interpreters/GraceHashJoin.cpp
+++ b/src/Interpreters/GraceHashJoin.cpp
@@ -41,7 +41,7 @@ namespace
     public:
         AccumulatedBlockReader(TemporaryFileStream & reader_,
                                std::mutex & mutex_,
-                               size_t result_block_size_ = DEFAULT_BLOCK_SIZE * 8)
+                               size_t result_block_size_ = 0)
            : reader(reader_)
            , mutex(mutex_)
            , result_block_size(result_block_size_)
@@ -59,18 +59,22 @@ namespace
             Blocks blocks;
             size_t rows_read = 0;
-            while (rows_read < result_block_size)
+            do
             {
                 Block block = reader.read();
                 rows_read += block.rows();
                 if (!block)
                 {
                     eof = true;
+                    if (blocks.size() == 1)
+                        return blocks.front();
                     return concatenateBlocks(blocks);
                 }
                 blocks.push_back(std::move(block));
-            }
+            } while (rows_read < result_block_size);
 
+            if (blocks.size() == 1)
+                return blocks.front();
             return concatenateBlocks(blocks);
         }
@@ -118,21 +122,12 @@ class GraceHashJoin::FileBucket : boost::noncopyable
 public:
     using BucketLock = std::unique_lock<std::mutex>;
 
-    struct Stats
-    {
-        TemporaryFileStream::Stat left;
-        TemporaryFileStream::Stat right;
-    };
-
-    explicit FileBucket(size_t bucket_index_,
-                        TemporaryFileStream & left_file_,
-                        TemporaryFileStream & right_file_,
-                        Poco::Logger * log_)
+    explicit FileBucket(size_t bucket_index_, TemporaryFileStream & left_file_, TemporaryFileStream & right_file_, Poco::Logger * log_)
         : idx{bucket_index_}
         , left_file{left_file_}
         , right_file{right_file_}
         , state{State::WRITING_BLOCKS}
-        , log(log_)
+        , log{log_}
     {
     }
@@ -168,21 +163,18 @@ public:
     bool empty() const { return is_empty.load(); }
 
-    Stats getStat() const { return stats; }
-
     AccumulatedBlockReader startJoining()
     {
         LOG_TRACE(log, "Joining file bucket {}", idx);
-
         {
             std::unique_lock left_lock(left_file_mutex);
             std::unique_lock right_lock(right_file_mutex);
 
-            stats.left = left_file.finishWriting();
-            stats.right = right_file.finishWriting();
+            left_file.finishWriting();
+            right_file.finishWriting();
+
             state = State::JOINING_BLOCKS;
         }
-
         return AccumulatedBlockReader(right_file, right_file_mutex);
     }
@@ -231,22 +223,23 @@ private:
     std::atomic_bool is_empty = true;
     std::atomic<State> state;
 
-    Stats stats;
     Poco::Logger * log;
 };
 
 namespace
 {
+
 template <JoinTableSide table_side>
-void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets)
+void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets, size_t except_index = 0)
 {
     chassert(blocks.size() == buckets.size());
 
     retryForEach(
         generateRandomPermutation(1, buckets.size()), // skipping 0 block, since we join it in memory w/o spilling on disk
         [&](size_t i)
         {
-            if (!blocks[i].rows())
+            /// Skip empty and current bucket
+            if (!blocks[i].rows() || i == except_index)
                 return true;
 
             bool flushed = false;
@@ -281,6 +274,7 @@ GraceHashJoin::GraceHashJoin(
     , right_key_names(table_join->getOnlyClause().key_names_right)
     , tmp_data(std::make_unique<TemporaryDataOnDisk>(tmp_data_, CurrentMetrics::TemporaryFilesForJoin))
     , hash_join(makeInMemoryJoin())
+    , hash_join_sample_block(hash_join->savedBlockSample())
 {
     if (!GraceHashJoin::isSupported(table_join))
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "GraceHashJoin is not supported for this join type");
@@ -288,6 +282,9 @@ GraceHashJoin::GraceHashJoin(
 
 void GraceHashJoin::initBuckets()
 {
+    if (!buckets.empty())
+        return;
+
     const auto & settings = context->getSettingsRef();
 
     size_t initial_num_buckets = roundUpToPowerOfTwoOrZero(std::clamp<size_t>(settings.grace_hash_join_initial_buckets, 1, settings.grace_hash_join_max_buckets));
@@ -300,7 +297,7 @@ void GraceHashJoin::initBuckets()
     if (buckets.empty())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "No buckets created");
 
-    LOG_TRACE(log, "Initialize {} buckets", buckets.size());
+    LOG_TRACE(log, "Initialize {} bucket{}", buckets.size(), buckets.size() > 1 ? "s" : "");
 
     current_bucket = buckets.front().get();
     current_bucket->startJoining();
@@ -320,18 +317,44 @@ bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "GraceHashJoin is not initialized");
 
     Block materialized = materializeBlock(block);
-    addJoinedBlockImpl(materialized);
+    addJoinedBlockImpl(std::move(materialized));
     return true;
 }
 
-bool GraceHashJoin::fitsInMemory() const
+bool GraceHashJoin::hasMemoryOverflow(size_t total_rows, size_t total_bytes) const
 {
     /// One row can't be split, avoid loop
-    size_t total_row_count = hash_join->getTotalRowCount();
-    if (total_row_count < 2)
-        return true;
+    if (total_rows < 2)
+        return false;
 
-    return table_join->sizeLimits().softCheck(total_row_count, hash_join->getTotalByteCount());
+    bool has_overflow = !table_join->sizeLimits().softCheck(total_rows, total_bytes);
+
+    if (has_overflow)
+        LOG_TRACE(log, "Memory overflow, size exceeded {} / {} bytes, {} / {} rows",
+            ReadableSize(total_bytes), ReadableSize(table_join->sizeLimits().max_bytes),
+            total_rows, table_join->sizeLimits().max_rows);
+
+    return has_overflow;
+}
+
+bool GraceHashJoin::hasMemoryOverflow(const BlocksList & blocks) const
+{
+    size_t total_rows = 0;
+    size_t total_bytes = 0;
+    for (const auto & block : blocks)
+    {
+        total_rows += block.rows();
+        total_bytes += block.allocatedBytes();
+    }
+    return hasMemoryOverflow(total_rows, total_bytes);
+}
+
+bool GraceHashJoin::hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const
+{
+    size_t total_rows = hash_join_->getTotalRowCount();
+    size_t total_bytes = hash_join_->getTotalByteCount();
+
+    return hasMemoryOverflow(total_rows, total_bytes);
 }
 
 GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
@@ -342,7 +365,7 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
     if (to_size <= current_size)
         return buckets;
 
-    assert(isPowerOf2(to_size));
+    chassert(isPowerOf2(to_size));
 
     if (to_size > max_num_buckets)
     {
@@ -363,14 +386,16 @@ void GraceHashJoin::addBucket(Buckets & destination)
 {
-    BucketPtr new_bucket = std::make_shared<FileBucket>(
-        destination.size(), tmp_data->createStream(left_sample_block), tmp_data->createStream(right_sample_block), log);
+    auto & left_file = tmp_data->createStream(left_sample_block);
+    auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
+
+    BucketPtr new_bucket = std::make_shared<FileBucket>(destination.size(), left_file, right_file, log);
     destination.emplace_back(std::move(new_bucket));
 }
 
 void GraceHashJoin::checkTypesOfKeys(const Block & block) const
 {
-    assert(hash_join);
+    chassert(hash_join);
     return hash_join->checkTypesOfKeys(block);
 }
@@ -423,7 +448,7 @@ size_t GraceHashJoin::getTotalRowCount() const
 
 size_t GraceHashJoin::getTotalByteCount() const
 {
     std::lock_guard lock(hash_join_mutex);
-    assert(hash_join);
+    chassert(hash_join);
     return hash_join->getTotalByteCount();
 }
@@ -437,9 +462,14 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const
         std::shared_lock lock(rehash_mutex);
         return std::all_of(buckets.begin(), buckets.end(), [](const auto & bucket) { return bucket->empty(); });
     }();
 
-    bool hash_join_is_empty = hash_join && hash_join->alwaysReturnsEmptySet();
-    return hash_join_is_empty && file_buckets_are_empty;
+    if (!file_buckets_are_empty)
+        return false;
+
+    chassert(hash_join);
+    bool hash_join_is_empty = hash_join->alwaysReturnsEmptySet();
+
+    return hash_join_is_empty;
 }
 
 IBlocksStreamPtr GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const
@@ -528,17 +558,11 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
 
     if (hash_join)
     {
-        auto right_blocks = hash_join->releaseJoinedBlocks();
-        Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_blocks, buckets.size());
-
-        for (size_t i = 0; i < blocks.size(); ++i)
+        auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
+        for (auto & block : right_blocks)
         {
-            if (blocks[i].rows() == 0 || i == bucket_idx)
-                continue;
-
-            if (i < bucket_idx)
-                throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected bucket index {} when current bucket is {}", i, bucket_idx);
-            buckets[i]->addRightBlock(blocks[i]);
+            Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets.size());
+            flushBlocksToBuckets(blocks, buckets, bucket_idx);
         }
     }
@@ -570,7 +594,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
         return std::make_unique<DelayedBlocks>(current_bucket->idx, buckets, hash_join, left_key_names, right_key_names);
     }
 
-    LOG_TRACE(log, "Finished loading all buckets");
+    LOG_TRACE(log, "Finished loading all {} buckets", buckets.size());
 
     current_bucket = nullptr;
     return nullptr;
@@ -581,42 +605,64 @@ GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin()
 {
     return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row);
 }
 
+Block GraceHashJoin::prepareRightBlock(const Block & block)
+{
+    return HashJoin::prepareRightBlock(block, hash_join_sample_block);
+}
+
 void GraceHashJoin::addJoinedBlockImpl(Block block)
 {
     Buckets buckets_snapshot = getCurrentBuckets();
-    Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size());
     size_t bucket_index = current_bucket->idx;
+    Block current_block;
+
+    {
+        Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size());
+        flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index);
+        current_block = std::move(blocks[bucket_index]);
+    }
 
     // Add block to the in-memory join
-    if (blocks[bucket_index].rows() > 0)
+    if (current_block.rows() > 0)
     {
         std::lock_guard lock(hash_join_mutex);
 
-        hash_join->addJoinedBlock(blocks[bucket_index], /* check_limits = */ false);
-        bool overflow = !fitsInMemory();
-
-        if (overflow)
-        {
-            auto right_blocks = hash_join->releaseJoinedBlocks();
-            right_blocks.pop_back();
-
-            for (const auto & right_block : right_blocks)
-                blocks.push_back(right_block);
-        }
-
-        while (overflow)
-        {
-            buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);
-
-            blocks = JoinCommon::scatterBlockByHash(right_key_names, blocks, buckets_snapshot.size());
+        if (!hash_join)
             hash_join = makeInMemoryJoin();
-            hash_join->addJoinedBlock(blocks[bucket_index], /* check_limits = */ false);
-            overflow = !fitsInMemory();
-        }
-        blocks[bucket_index].clear();
-    }
-    flushBlocksToBuckets(blocks, buckets_snapshot);
+        hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
+
+        if (!hasMemoryOverflow(hash_join))
+            return;
+
+        current_block = {};
+
+        auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
+        hash_join = nullptr;
+
+        buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);
+
+        {
+            Blocks current_blocks;
+            current_blocks.reserve(right_blocks.size());
+            for (const auto & right_block : right_blocks)
+            {
+                Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_block, buckets_snapshot.size());
+                flushBlocksToBuckets(blocks, buckets_snapshot, bucket_index);
+                current_blocks.emplace_back(std::move(blocks[bucket_index]));
+            }
+
+            if (current_blocks.size() == 1)
+                current_block = std::move(current_blocks.front());
+            else
+                current_block = concatenateBlocks(current_blocks);
+        }
+
+        hash_join = makeInMemoryJoin();
+
+        if (current_block.rows() > 0)
+            hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
+    }
 }
 
 size_t GraceHashJoin::getNumBuckets() const
diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h
index f4e75f142f3..be03cee4a35 100644
--- a/src/Interpreters/GraceHashJoin.h
+++ b/src/Interpreters/GraceHashJoin.h
@@ -95,8 +95,10 @@ private:
     /// Add right table block to the @join. Calls @rehash on overflow.
     void addJoinedBlockImpl(Block block);
 
-    /// Check that @join satisifes limits on rows/bytes in @table_join.
-    bool fitsInMemory() const;
+    /// Check that join satisfies limits on rows/bytes in table_join.
+    bool hasMemoryOverflow(size_t total_rows, size_t total_bytes) const;
+    bool hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const;
+    bool hasMemoryOverflow(const BlocksList & blocks) const;
 
     /// Create new bucket at the end of @destination.
     void addBucket(Buckets & destination);
@@ -114,6 +116,9 @@ private:
     size_t getNumBuckets() const;
     Buckets getCurrentBuckets() const;
 
+    /// Structure block to store in the HashJoin according to sample_block.
+    Block prepareRightBlock(const Block & block);
+
     Poco::Logger * log;
     ContextPtr context;
     std::shared_ptr<TableJoin> table_join;
@@ -136,6 +141,7 @@ private:
     mutable std::mutex current_bucket_mutex;
 
     InMemoryJoinPtr hash_join;
+    Block hash_join_sample_block;
     mutable std::mutex hash_join_mutex;
 };
 
diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp
index db6507ae96f..22d02c4dbdf 100644
--- a/src/Interpreters/HashJoin.cpp
+++ b/src/Interpreters/HashJoin.cpp
@@ -221,8 +221,8 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
     , right_sample_block(right_sample_block_)
     , log(&Poco::Logger::get("HashJoin"))
 {
-    LOG_DEBUG(log, "Datatype: {}, kind: {}, strictness: {}, right header: {}", data->type, kind, strictness, right_sample_block.dumpStructure());
-    LOG_DEBUG(log, "Keys: {}", TableJoin::formatClauses(table_join->getClauses(), true));
+    LOG_DEBUG(log, "({}) Datatype: {}, kind: {}, strictness: {}, right header: {}", fmt::ptr(this), data->type, kind, strictness, right_sample_block.dumpStructure());
+    LOG_DEBUG(log, "({}) Keys: {}", fmt::ptr(this), TableJoin::formatClauses(table_join->getClauses(), true));
 
     if (isCrossOrComma(kind))
     {
@@ -469,6 +469,9 @@ bool HashJoin::alwaysReturnsEmptySet() const
 
 size_t HashJoin::getTotalRowCount() const
 {
+    if (!data)
+        return 0;
+
     size_t res = 0;
 
     if (data->type == Type::CROSS)
@@ -484,28 +487,45 @@ size_t HashJoin::getTotalRowCount() const
         }
     }
-
     return res;
 }
 
 size_t HashJoin::getTotalByteCount() const
 {
+    if (!data)
+        return 0;
+
+#ifndef NDEBUG
+    size_t debug_blocks_allocated_size = 0;
+    for (const auto & block : data->blocks)
+        debug_blocks_allocated_size += block.allocatedBytes();
+
+    if (data->blocks_allocated_size != debug_blocks_allocated_size)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_allocated_size != debug_blocks_allocated_size ({} != {})",
+                        data->blocks_allocated_size, debug_blocks_allocated_size);
+
+    size_t debug_blocks_nullmaps_allocated_size = 0;
+    for (const auto & nullmap : data->blocks_nullmaps)
+        debug_blocks_nullmaps_allocated_size += nullmap.second->allocatedBytes();
+
+    if (data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
+                        data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size);
+#endif
+
     size_t res = 0;
 
-    if (data->type == Type::CROSS)
-    {
-        for (const auto & block : data->blocks)
-            res += block.bytes();
-    }
-    else
+    res += data->blocks_allocated_size;
+    res += data->blocks_nullmaps_allocated_size;
+    res += data->pool.size();
+
+    if (data->type != Type::CROSS)
     {
         for (const auto & map : data->maps)
         {
             joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { res += map_.getTotalByteCountImpl(data->type); });
         }
-        res += data->pool.size();
     }
-
     return res;
 }
 
@@ -656,41 +676,57 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
     }
 }
 
-Block HashJoin::structureRightBlock(const Block & block) const
+Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block_sample_)
 {
     Block structured_block;
-    for (const auto & sample_column : savedBlockSample().getColumnsWithTypeAndName())
+    for (const auto & sample_column : saved_block_sample_.getColumnsWithTypeAndName())
     {
         ColumnWithTypeAndName column = block.getByName(sample_column.name);
         if (sample_column.column->isNullable())
             JoinCommon::convertColumnToNullable(column);
-        structured_block.insert(column);
+
+        if (column.column->lowCardinality() && !sample_column.column->lowCardinality())
+        {
+            column.column = column.column->convertToFullColumnIfLowCardinality();
+            column.type = removeLowCardinality(column.type);
+        }
+
+        /// There's no optimization for right side const columns. Remove constness if any.
+        column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst());
+        structured_block.insert(std::move(column));
     }
 
     return structured_block;
 }
 
+Block HashJoin::prepareRightBlock(const Block & block) const
+{
+    return prepareRightBlock(block, savedBlockSample());
+}
+
 bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
 {
+    if (!data)
+        throw Exception("Join data was released", ErrorCodes::LOGICAL_ERROR);
+
     /// RowRef::SizeT is uint32_t (not size_t) for hash table Cell memory efficiency.
     /// It's possible to split bigger blocks and insert them by parts here. But it would be a dead code.
     if (unlikely(source_block.rows() > std::numeric_limits<RowRef::SizeT>::max()))
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Too many rows in right table block for HashJoin: {}", source_block.rows());
 
-    /// There's no optimization for right side const columns. Remove constness if any.
-    Block block = materializeBlock(source_block);
-    size_t rows = block.rows();
+    size_t rows = source_block.rows();
 
-    ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(block, table_join->getAllNames(JoinTableSide::Right));
+    ColumnPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(source_block, table_join->getAllNames(JoinTableSide::Right));
 
-    Block structured_block = structureRightBlock(block);
+    Block block_to_save = prepareRightBlock(source_block);
 
     size_t total_rows = 0;
     size_t total_bytes = 0;
     {
         if (storage_join_lock)
             throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "addJoinedBlock called when HashJoin locked to prevent updates");
 
-        data->blocks.emplace_back(std::move(structured_block));
+        data->blocks_allocated_size += block_to_save.allocatedBytes();
+        data->blocks.emplace_back(std::move(block_to_save));
         Block * stored_block = &data->blocks.back();
 
         if (rows)
@@ -702,7 +738,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
         {
             ColumnRawPtrs key_columns;
             for (const auto & name : onexprs[onexpr_idx].key_names_right)
-                key_columns.push_back(all_key_columns[name]);
+                key_columns.push_back(all_key_columns[name].get());
 
             /// We will insert to the map only keys, where all components are not NULL.
             ConstNullMapPtr null_map{};
@@ -717,14 +753,14 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
                     save_nullmap |= (*null_map)[i];
             }
 
-            auto join_mask_col = JoinCommon::getColumnAsMask(block, onexprs[onexpr_idx].condColumnNames().second);
+            auto join_mask_col = JoinCommon::getColumnAsMask(source_block, onexprs[onexpr_idx].condColumnNames().second);
             /// Save blocks that do not hold conditions in ON section
             ColumnUInt8::MutablePtr not_joined_map = nullptr;
-            if (!multiple_disjuncts && isRightOrFull(kind) && !join_mask_col.isConstant())
+            if (!multiple_disjuncts && isRightOrFull(kind) && join_mask_col.hasData())
             {
                 const auto & join_mask = join_mask_col.getData();
                 /// Save rows that do not hold conditions
-                not_joined_map = ColumnUInt8::create(block.rows(), 0);
+                not_joined_map = ColumnUInt8::create(rows, 0);
                 for (size_t i = 0, sz = join_mask->size(); i < sz; ++i)
                 {
                     /// Condition hold, do not save row
@@ -758,10 +794,16 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
             }
         }
 
         if (!multiple_disjuncts && save_nullmap)
+        {
+            data->blocks_nullmaps_allocated_size += null_map_holder->allocatedBytes();
             data->blocks_nullmaps.emplace_back(stored_block, null_map_holder);
+        }
 
         if (!multiple_disjuncts && not_joined_map)
+        {
+            data->blocks_nullmaps_allocated_size += not_joined_map->allocatedBytes();
             data->blocks_nullmaps.emplace_back(stored_block, std::move(not_joined_map));
+        }
 
         if (!check_limits)
             return true;
@@ -794,7 +836,6 @@ struct JoinOnKeyColumns
 
     Sizes key_sizes;
 
-
     explicit JoinOnKeyColumns(const Block & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_)
         : key_names(key_names_)
         , materialized_keys_holder(JoinCommon::materializeColumns(block, key_names)) /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them.
@@ -1672,7 +1713,7 @@ void HashJoin::checkTypesOfKeys(const Block & block) const
 
 void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
 {
-    if (data->released)
+    if (!data)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
 
     for (const auto & onexpr : table_join->getClauses())
@@ -1711,6 +1752,16 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
     }
 }
 
+HashJoin::~HashJoin()
+{
+    if (!data)
+    {
+        LOG_TRACE(log, "({}) Join data has been already released", fmt::ptr(this));
+        return;
+    }
+    LOG_TRACE(log, "({}) Join data is being destroyed, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount());
+}
+
 template <typename Mapped>
 struct AdderNonJoined
 {
@@ -1749,7 +1800,6 @@ struct AdderNonJoined
     }
 };
 
-
 /// Stream from not joined earlier rows of the right table.
 /// Based on:
 ///   - map offsetInternal saved in used_flags for single disjuncts
@@ -1760,7 +1810,10 @@ class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller
 public:
     NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_)
         : parent(parent_), max_block_size(max_block_size_), current_block_start(0)
-    {}
+    {
+        if (parent.data == nullptr)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
+    }
 
     Block getEmptyBlock() override { return parent.savedBlockSample().cloneEmpty(); }
 
@@ -1957,7 +2010,6 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
         size_t left_columns_count = left_sample_block.columns();
         auto non_joined = std::make_unique<NotJoinedHash<false>>(*this, max_block_size);
         return std::make_unique<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, *table_join);
-
     }
     else
     {
@@ -1986,10 +2038,20 @@ void HashJoin::reuseJoinedData(const HashJoin & join)
     }
 }
 
-BlocksList HashJoin::releaseJoinedBlocks()
+BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
 {
+    LOG_TRACE(log, "({}) Join data is being released, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount());
+
     BlocksList right_blocks = std::move(data->blocks);
-    data->released = true;
+    if (!restructure)
+    {
+        data.reset();
+        return right_blocks;
+    }
+
+    data->maps.clear();
+    data->blocks_nullmaps.clear();
+
     BlocksList restored_blocks;
 
     /// names to positions optimization
@@ -2018,6 +2080,7 @@ BlocksList HashJoin::releaseJoinedBlocks()
         restored_blocks.emplace_back(std::move(restored_block));
     }
 
+    data.reset();
     return restored_blocks;
 }
 
diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h
index 5ea47823b69..b29b6e617c8 100644
--- a/src/Interpreters/HashJoin.h
+++ b/src/Interpreters/HashJoin.h
@@ -149,6 +149,8 @@ class HashJoin : public IJoin
 public:
     HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);
 
+    ~HashJoin() override;
+
     const TableJoin & getTableJoin() const override { return *table_join; }
 
     /** Add block of data from right hand of JOIN to the map.
@@ -336,7 +338,8 @@ public:
         /// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
         Arena pool;
 
-        bool released = false;
+        size_t blocks_allocated_size = 0;
+        size_t blocks_nullmaps_allocated_size = 0;
     };
 
     using RightTableDataPtr = std::shared_ptr<RightTableData>;
@@ -351,7 +354,13 @@ public:
     void reuseJoinedData(const HashJoin & join);
 
     RightTableDataPtr getJoinedData() const { return data; }
-    BlocksList releaseJoinedBlocks();
+    BlocksList releaseJoinedBlocks(bool restructure = false);
+
+    /// Modify right block (update structure according to sample block) to save it in block list
+    static Block prepareRightBlock(const Block & block, const Block & saved_block_sample_);
+    Block prepareRightBlock(const Block & block) const;
+
+    const Block & savedBlockSample() const { return data->sample_block; }
 
     bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); }
     bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); }
@@ -403,10 +412,6 @@ private:
 
     void dataMapInit(MapsVariant &);
 
-    const Block & savedBlockSample() const { return data->sample_block; }
-
-    /// Modify (structure) right block to save it in block list
-    Block structureRightBlock(const Block & stored_block) const;
     void initRightBlockStructure(Block & saved_block_sample);
 
     template
diff --git a/src/Interpreters/JoinSwitcher.cpp b/src/Interpreters/JoinSwitcher.cpp
index 996fd1e4ac7..15702784d74 100644
--- a/src/Interpreters/JoinSwitcher.cpp
+++ b/src/Interpreters/JoinSwitcher.cpp
@@ -41,7 +41,7 @@ bool JoinSwitcher::addJoinedBlock(const Block & block, bool)
 bool JoinSwitcher::switchJoin()
 {
     HashJoin * hash_join = assert_cast<HashJoin *>(join.get());
-    BlocksList right_blocks = hash_join->releaseJoinedBlocks();
+    BlocksList right_blocks = hash_join->releaseJoinedBlocks(true);
 
     /// Destroy old join & create new one.
     join = std::make_shared<MergeJoin>(table_join, right_sample_block);
diff --git a/src/Interpreters/JoinUtils.cpp b/src/Interpreters/JoinUtils.cpp
index a4ec64ab70e..b8d8dd5df74 100644
--- a/src/Interpreters/JoinUtils.cpp
+++ b/src/Interpreters/JoinUtils.cpp
@@ -324,17 +324,20 @@ ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names)
     return ptrs;
 }
 
-ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names)
+ColumnPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names)
 {
-    ColumnRawPtrMap ptrs;
+    ColumnPtrMap ptrs;
     ptrs.reserve(names.size());
 
     for (const auto & column_name : names)
     {
-        auto & column = block.getByName(column_name);
-        column.column = recursiveRemoveLowCardinality(column.column->convertToFullColumnIfConst());
-        column.type = recursiveRemoveLowCardinality(column.type);
-        ptrs[column_name] = column.column.get();
+        ColumnPtr column = block.getByName(column_name).column;
+
+        column = column->convertToFullColumnIfConst();
+        column = recursiveRemoveLowCardinality(column);
+        column = recursiveRemoveSparse(column);
+
+        ptrs[column_name] = column;
     }
 
     return ptrs;
@@ -529,24 +532,24 @@ bool typesEqualUpToNullability(DataTypePtr left_type, DataTypePtr right_type)
 JoinMask getColumnAsMask(const Block & block, const String & column_name)
 {
     if (column_name.empty())
-        return JoinMask(true);
+        return JoinMask(true, block.rows());
 
     const auto & src_col = block.getByName(column_name);
 
     DataTypePtr col_type = recursiveRemoveLowCardinality(src_col.type);
     if (isNothing(col_type))
-        return JoinMask(false);
+        return JoinMask(false, block.rows());
 
     if (const auto * const_cond = checkAndGetColumn<ColumnConst>(*src_col.column))
     {
-        return JoinMask(const_cond->getBool(0));
+        return JoinMask(const_cond->getBool(0), block.rows());
     }
 
     ColumnPtr join_condition_col = recursiveRemoveLowCardinality(src_col.column->convertToFullColumnIfConst());
     if (const auto * nullable_col = typeid_cast<const ColumnNullable *>(join_condition_col.get()))
     {
         if (isNothing(assert_cast<const DataTypeNullable &>(*col_type).getNestedType()))
-            return JoinMask(false);
+            return JoinMask(false, block.rows());
 
         /// Return nested column with NULL set to false
         const auto & nest_col = assert_cast<const ColumnUInt8 &>(nullable_col->getNestedColumn());
@@ -639,9 +642,8 @@ Blocks scatterBlockByHash(const Strings & key_columns_names, const Block & block
 {
     if (num_shards == 0)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of shards must be positive");
-    UNUSED(scatterBlockByHashPow2);
-    // if (likely(isPowerOf2(num_shards)))
-    //     return scatterBlockByHashPow2(key_columns_names, block, num_shards);
+    if (likely(isPowerOf2(num_shards)))
+        return scatterBlockByHashPow2(key_columns_names, block, num_shards);
     return scatterBlockByHashGeneric(key_columns_names, block, num_shards);
 }
 
diff --git a/src/Interpreters/JoinUtils.h b/src/Interpreters/JoinUtils.h
index 36be71f2a91..f112ca22e5b 100644
--- a/src/Interpreters/JoinUtils.h
+++ b/src/Interpreters/JoinUtils.h
@@ -14,30 +14,34 @@ class TableJoin;
 class IColumn;
 
 using ColumnRawPtrs = std::vector<const IColumn *>;
+using ColumnPtrMap = std::unordered_map<String, ColumnPtr>;
 using ColumnRawPtrMap = std::unordered_map<String, const IColumn *>;
 using UInt8ColumnDataPtr = const ColumnUInt8::Container *;
 
 namespace JoinCommon
 {
 
-/// Store boolean column handling constant value without materializing
-/// Behaves similar to std::variant, but provides more convenient specialized interface
+/// Helper interface to work with mask from JOIN ON section
 class JoinMask
 {
 public:
-    explicit JoinMask(bool value)
+    explicit JoinMask()
         : column(nullptr)
-        , const_value(value)
+    {}
+
+    explicit JoinMask(bool value, size_t size)
+        : column(ColumnUInt8::create(size, value))
     {}
 
     explicit JoinMask(ColumnPtr col)
         : column(col)
-        , const_value(false)
     {}
 
-    bool isConstant() { return !column; }
+    bool hasData()
+    {
+        return column != nullptr;
+    }
 
-    /// Return data if mask is not constant
     UInt8ColumnDataPtr getData()
     {
         if (column)
@@ -47,15 +51,11 @@ public:
 
     inline bool isRowFiltered(size_t row) const
     {
-        if (column)
-            return !assert_cast<const ColumnUInt8 &>(*column).getData()[row];
-        return !const_value;
+        return !assert_cast<const ColumnUInt8 &>(*column).getData()[row];
     }
 
 private:
     ColumnPtr column;
-    /// Used if column is null
-    bool const_value;
 };
 
@@ -71,7 +71,7 @@ ColumnPtr emptyNotNullableClone(const ColumnPtr & column);
 ColumnPtr materializeColumn(const Block & block, const String & name);
 Columns materializeColumns(const Block & block, const Names & names);
 ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names);
-ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names);
+ColumnPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names);
 ColumnRawPtrs getRawPointers(const Columns & columns);
 void convertToFullColumnsInplace(Block & block);
 void convertToFullColumnsInplace(Block & block, const Names & names, bool change_type = true);
diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp
index bce5f9210bf..a5ab6b25d02 100644
--- a/src/Interpreters/MergeJoin.cpp
+++ b/src/Interpreters/MergeJoin.cpp
@@ -55,7 +55,7 @@ ColumnWithTypeAndName condtitionColumnToJoinable(const Block & block, const Stri
     if (!src_column_name.empty())
     {
         auto join_mask = JoinCommon::getColumnAsMask(block, src_column_name);
-        if (!join_mask.isConstant())
+        if (join_mask.hasData())
         {
             for (size_t i = 0; i < res_size; ++i)
                 null_map->getData()[i] = join_mask.isRowFiltered(i);
diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h
index 9d03c9bd57b..84390adc0df 100644
--- a/src/Interpreters/TableJoin.h
+++ b/src/Interpreters/TableJoin.h
@@ -198,7 +198,7 @@ public:
         : size_limits(limits)
         , default_max_bytes(0)
         , join_use_nulls(use_nulls)
-        , join_algorithm(JoinAlgorithm::HASH)
+        , join_algorithm(JoinAlgorithm::DEFAULT)
     {
         clauses.emplace_back().key_names_right = key_names_right;
         table_join.kind = kind;
diff --git a/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2 b/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2
index 0d6bef7fadb..d2cc066a1b1 100644
--- a/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2
+++ b/tests/queries/0_stateless/01881_join_on_conditions_hash.sql.j2
@@ -30,6 +30,8 @@ SELECT t1.key, t1.key2 FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.key ==
 SELECT '--';
 SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2;
+SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 0; -- { serverError INVALID_JOIN_ON_EXPRESSION }
+SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 1; -- { serverError INVALID_JOIN_ON_EXPRESSION }
 
 SELECT '--';
 SELECT '333' = t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND t2.id > 2;
diff --git a/tests/queries/1_stateful/00172_hits_joins.reference.j2 b/tests/queries/1_stateful/00172_hits_joins.reference.j2
index c357ede4c2c..1a43f1fb6ef 100644
--- a/tests/queries/1_stateful/00172_hits_joins.reference.j2
+++ b/tests/queries/1_stateful/00172_hits_joins.reference.j2
@@ -1,4 +1,4 @@
-{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%}
+{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge', 'grace_hash'] -%}
 --- {{ join_algorithm }} ---
 2014-03-17	1406958	265108
 2014-03-19	1405797	261624
diff --git a/tests/queries/1_stateful/00172_hits_joins.sql.j2 b/tests/queries/1_stateful/00172_hits_joins.sql.j2
index 07ea899f536..4599d1d5a5d 100644
--- a/tests/queries/1_stateful/00172_hits_joins.sql.j2
+++ b/tests/queries/1_stateful/00172_hits_joins.sql.j2
@@ -1,6 +1,7 @@
-{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%}
-SET max_bytes_in_join = '{% if join_algorithm == 'grace_hash' %}20K{% else %}0{% endif %}';
+{% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge', 'grace_hash'] -%}
+SET max_rows_in_join = '{% if join_algorithm == 'grace_hash' %}10K{% else %}0{% endif %}';
+SET grace_hash_join_initial_buckets = 4;
 
 SELECT '--- {{ join_algorithm }} ---';