diff --git a/src/Interpreters/HashJoin/HashJoin.cpp b/src/Interpreters/HashJoin/HashJoin.cpp index 0c7cad4360d..a621ce16fb1 100644 --- a/src/Interpreters/HashJoin/HashJoin.cpp +++ b/src/Interpreters/HashJoin/HashJoin.cpp @@ -648,10 +648,8 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits) return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED); } -void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join) +void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_optimize) { - if (shrink_blocks) - return; /// Already shrunk Int64 current_memory_usage = getCurrentQueryMemoryUsage(); Int64 query_memory_usage_delta = current_memory_usage - memory_usage_before_adding_blocks; @@ -659,15 +657,21 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join) auto max_total_bytes_in_join = table_join->sizeLimits().max_bytes; - /** If accounted data size is more than half of `max_bytes_in_join` - * or query memory consumption growth from the beginning of adding blocks (estimation of memory consumed by join using memory tracker) - * is bigger than half of all memory available for query, - * then shrink stored blocks to fit. - */ - shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) || - (max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2); - if (!shrink_blocks) - return; + if (!force_optimize) + { + if (shrink_blocks) + return; /// Already shrunk + + /** If accounted data size is more than half of `max_bytes_in_join` + * or query memory consumption growth from the beginning of adding blocks (estimation of memory consumed by join using memory tracker) + * is bigger than half of all memory available for query, + * then shrink stored blocks to fit. + */ + shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) || + (max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2); + if (!shrink_blocks) + return; + } LOG_DEBUG(log, "Shrinking stored blocks, memory consumption is {} {} calculated by join, {} {} by memory tracker", ReadableSize(total_bytes_in_join), max_total_bytes_in_join ? fmt::format("/ {}", ReadableSize(max_total_bytes_in_join)) : "", diff --git a/src/Interpreters/HashJoin/HashJoin.h b/src/Interpreters/HashJoin/HashJoin.h index 0b115b9fdbb..00f5ef6d214 100644 --- a/src/Interpreters/HashJoin/HashJoin.h +++ b/src/Interpreters/HashJoin/HashJoin.h @@ -372,7 +372,7 @@ public: void debugKeys() const; - void shrinkStoredBlocksToFit(size_t & total_bytes_in_join); + void shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_optimize = false); void setMaxJoinedBlockRows(size_t value) { max_joined_block_rows = value; } diff --git a/src/Storages/StorageJoin.cpp b/src/Storages/StorageJoin.cpp index a0d6cf11b64..9dace45d2ac 100644 --- a/src/Storages/StorageJoin.cpp +++ b/src/Storages/StorageJoin.cpp @@ -75,6 +75,7 @@ StorageJoin::StorageJoin( table_join = std::make_shared(limits, use_nulls, kind, strictness, key_names); join = std::make_shared(table_join, getRightSampleBlock(), overwrite); restore(); + optimizeUnlocked(); } RWLockImpl::LockHolder StorageJoin::tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const @@ -99,6 +100,47 @@ SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataP return StorageSetOrJoinBase::write(query, metadata_snapshot, context, /*async_insert=*/false); } +bool StorageJoin::optimize( + const ASTPtr & /*query*/, + const StorageMetadataPtr & /*metadata_snapshot*/, + const ASTPtr & partition, + bool final, + bool deduplicate, + const Names & /* deduplicate_by_columns */, + bool cleanup, + ContextPtr context) +{ + + if (partition) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Partition cannot be specified when optimizing table of type Join"); + + if (final) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "FINAL cannot be specified when optimizing table of type Join"); + + if (deduplicate) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "DEDUPLICATE cannot be specified when optimizing table of type Join"); + + if (cleanup) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "CLEANUP cannot be specified when optimizing table of type Join"); + + std::lock_guard mutate_lock(mutate_mutex); + TableLockHolder lock_holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context); + + optimizeUnlocked(); + return true; +} + +void StorageJoin::optimizeUnlocked() +{ + size_t current_bytes = join->getTotalByteCount(); + size_t dummy = current_bytes; + join->shrinkStoredBlocksToFit(dummy, true); + + size_t optimized_bytes = join->getTotalByteCount(); + if (current_bytes > optimized_bytes) + LOG_INFO(getLogger("StorageJoin"), "Optimized Join storage from {} to {} bytes", current_bytes, optimized_bytes); +} + void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr context, TableExclusiveLockHolder &) { std::lock_guard mutate_lock(mutate_mutex); diff --git a/src/Storages/StorageJoin.h b/src/Storages/StorageJoin.h index c76df0cb452..10a551b4063 100644 --- a/src/Storages/StorageJoin.h +++ b/src/Storages/StorageJoin.h @@ -61,6 +61,18 @@ public: SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, bool async_insert) override; + bool optimize( + const ASTPtr & /*query*/, + const StorageMetadataPtr & /*metadata_snapshot*/, + const ASTPtr & /*partition*/, + bool /*final*/, + bool /*deduplicate*/, + const Names & /* deduplicate_by_columns */, + bool /*cleanup*/, + ContextPtr /*context*/) override; + + void optimizeUnlocked(); + Pipe read( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, diff --git a/tests/queries/0_stateless/03204_storage_join_optimize.reference b/tests/queries/0_stateless/03204_storage_join_optimize.reference new file mode 100644 index 00000000000..af98bcd6397 --- /dev/null +++ b/tests/queries/0_stateless/03204_storage_join_optimize.reference @@ -0,0 +1,10 @@ +0 0 +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 diff --git a/tests/queries/0_stateless/03204_storage_join_optimize.sql b/tests/queries/0_stateless/03204_storage_join_optimize.sql new file mode 100644 index 00000000000..03a4658ba6c --- /dev/null +++ b/tests/queries/0_stateless/03204_storage_join_optimize.sql @@ -0,0 +1,5 @@ +CREATE TABLE dict_03204 (k UInt64, v UInt64) ENGINE = Join(ANY, LEFT, k); +INSERT INTO dict_03204 SELECT number, number FROM numbers(10); +OPTIMIZE TABLE dict_03204; +SELECT * FROM dict_03204 ORDER BY k; +DROP TABLE dict_03204;