Merge pull request #67883 from canhld94/optimize_join_engine

Join engine support OPTIMIZE query
This commit is contained in:
vdimir 2024-08-07 16:24:50 +00:00 committed by GitHub
commit 5e9c3c222b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 86 additions and 13 deletions

View File

@ -648,10 +648,8 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join)
void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_optimize)
{
if (shrink_blocks)
return; /// Already shrunk
Int64 current_memory_usage = getCurrentQueryMemoryUsage();
Int64 query_memory_usage_delta = current_memory_usage - memory_usage_before_adding_blocks;
@ -659,15 +657,21 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join)
auto max_total_bytes_in_join = table_join->sizeLimits().max_bytes;
/** If accounted data size is more than half of `max_bytes_in_join`
* or query memory consumption growth from the beginning of adding blocks (estimation of memory consumed by join using memory tracker)
* is bigger than half of all memory available for query,
* then shrink stored blocks to fit.
*/
shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) ||
(max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2);
if (!shrink_blocks)
return;
if (!force_optimize)
{
if (shrink_blocks)
return; /// Already shrunk
/** If accounted data size is more than half of `max_bytes_in_join`
* or query memory consumption growth from the beginning of adding blocks (estimation of memory consumed by join using memory tracker)
* is bigger than half of all memory available for query,
* then shrink stored blocks to fit.
*/
shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) ||
(max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2);
if (!shrink_blocks)
return;
}
LOG_DEBUG(log, "Shrinking stored blocks, memory consumption is {} {} calculated by join, {} {} by memory tracker",
ReadableSize(total_bytes_in_join), max_total_bytes_in_join ? fmt::format("/ {}", ReadableSize(max_total_bytes_in_join)) : "",

View File

@ -372,7 +372,7 @@ public:
void debugKeys() const;
void shrinkStoredBlocksToFit(size_t & total_bytes_in_join);
/// Shrinks the blocks stored in the join.
/// With force_optimize == false the call returns early when the blocks were already shrunk, or when neither
/// the accounted join size nor the query memory growth exceeds half of its respective limit.
/// With force_optimize == true those early-return checks are skipped.
void shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_optimize = false);
/// Stores `value` in max_joined_block_rows.
void setMaxJoinedBlockRows(size_t value) { max_joined_block_rows = value; }

View File

@ -75,6 +75,7 @@ StorageJoin::StorageJoin(
table_join = std::make_shared<TableJoin>(limits, use_nulls, kind, strictness, key_names);
join = std::make_shared<HashJoin>(table_join, getRightSampleBlock(), overwrite);
restore();
optimizeUnlocked();
}
RWLockImpl::LockHolder StorageJoin::tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const
@ -99,6 +100,47 @@ SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataP
return StorageSetOrJoinBase::write(query, metadata_snapshot, context, /*async_insert=*/false);
}
/// Handles `OPTIMIZE TABLE` for a table of type Join.
///
/// None of the OPTIMIZE modifiers are accepted: a partition, FINAL, DEDUPLICATE or CLEANUP
/// (checked in that order) is rejected with NOT_IMPLEMENTED.
/// Otherwise the call takes `mutate_mutex`, then a write lock on `rwlock`, runs optimizeUnlocked()
/// and returns true.
bool StorageJoin::optimize(
    const ASTPtr & /*query*/,
    const StorageMetadataPtr & /*metadata_snapshot*/,
    const ASTPtr & partition,
    bool final,
    bool deduplicate,
    const Names & /* deduplicate_by_columns */,
    bool cleanup,
    ContextPtr context)
{
    /// Every branch throws, so the chain reports the first unsupported modifier only.
    if (partition)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Partition cannot be specified when optimizing table of type Join");
    }
    else if (final)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "FINAL cannot be specified when optimizing table of type Join");
    }
    else if (deduplicate)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "DEDUPLICATE cannot be specified when optimizing table of type Join");
    }
    else if (cleanup)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "CLEANUP cannot be specified when optimizing table of type Join");
    }

    /// The mutation mutex is acquired before the write lock; both are held until return.
    std::lock_guard mutation_guard(mutate_mutex);
    TableLockHolder write_lock = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);

    optimizeUnlocked();
    return true;
}
/// Forces the underlying join to shrink its stored blocks and logs the before/after byte counts
/// when the reported total went down. Acquires no locks itself.
void StorageJoin::optimizeUnlocked()
{
    const size_t bytes_before = join->getTotalByteCount();

    /// shrinkStoredBlocksToFit takes the accounted size by non-const reference,
    /// so it is handed a scratch copy rather than the value we compare against later.
    size_t accounted_bytes = bytes_before;
    join->shrinkStoredBlocksToFit(accounted_bytes, true);

    const size_t bytes_after = join->getTotalByteCount();
    if (bytes_after < bytes_before)
        LOG_INFO(getLogger("StorageJoin"), "Optimized Join storage from {} to {} bytes", bytes_before, bytes_after);
}
void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr context, TableExclusiveLockHolder &)
{
std::lock_guard mutate_lock(mutate_mutex);

View File

@ -61,6 +61,18 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, bool async_insert) override;
/// OPTIMIZE handler for the Join storage.
/// Throws NOT_IMPLEMENTED if a partition, FINAL, DEDUPLICATE or CLEANUP is specified.
/// Otherwise runs optimizeUnlocked() while holding the mutation mutex and a write lock, and returns true.
bool optimize(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const ASTPtr & /*partition*/,
bool /*final*/,
bool /*deduplicate*/,
const Names & /* deduplicate_by_columns */,
bool /*cleanup*/,
ContextPtr /*context*/) override;
/// Asks the join to shrink its stored blocks (forced) and logs the byte counts if the total decreased.
/// Does not take any lock itself.
void optimizeUnlocked();
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,

View File

@ -0,0 +1,10 @@
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9

View File

@ -0,0 +1,5 @@
-- OPTIMIZE on a Join engine table must succeed and leave the stored rows intact.

-- Guard against a table left over from an aborted previous run.
DROP TABLE IF EXISTS dict_03204;

CREATE TABLE dict_03204 (k UInt64, v UInt64) ENGINE = Join(ANY, LEFT, k);
INSERT INTO dict_03204 SELECT number, number FROM numbers(10);
OPTIMIZE TABLE dict_03204;

-- Expect the ten inserted (k, v) pairs, in key order.
SELECT * FROM dict_03204 ORDER BY k;
DROP TABLE dict_03204;