hash join use memory tracker delta to decide on shrink to fit

This commit is contained in:
vdimir 2023-09-14 12:03:30 +00:00
parent 5a47803a80
commit 40d42f4123
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
4 changed files with 68 additions and 11 deletions

View File

@ -59,6 +59,16 @@ struct NotProcessedCrossJoin : public ExtraBlock
size_t right_block;
};
/// Returns the amount of memory reported by the parent of the current thread's memory tracker.
/// Returns 0 when the current thread has no memory tracker or that tracker has no parent.
Int64 getCurrentQueryMemoryUsage()
{
    auto * thread_level_tracker = CurrentThread::getMemoryTracker();
    if (!thread_level_tracker)
        return 0;

    /// The parent of the thread's tracker is used as the query-level memory tracker.
    auto * query_level_tracker = thread_level_tracker->getParent();
    if (!query_level_tracker)
        return 0;

    return query_level_tracker->get();
}
}
namespace JoinStuff
@ -738,6 +748,13 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (unlikely(source_block_.rows() > std::numeric_limits<RowRef::SizeT>::max()))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Too many rows in right table block for HashJoin: {}", source_block_.rows());
/** We do not allocate memory for stored blocks inside HashJoin, only for hash table.
* In case when we have all the blocks allocated before the first `addBlockToJoin` call, `memory_usage_before_adding_blocks` will already be quite high.
* In that case memory consumed by stored blocks will be underestimated.
*/
if (!memory_usage_before_adding_blocks)
memory_usage_before_adding_blocks = getCurrentQueryMemoryUsage();
Block source_block = source_block_;
if (strictness == JoinStrictness::Asof)
{
@ -882,12 +899,35 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
}
}
auto max_total_bytes = table_join->sizeLimits().max_bytes;
if (!shrink_blocks && max_total_bytes && total_bytes > max_total_bytes / 2)
shrinkStoredBlocksToFit(total_bytes);
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join)
{
if (shrink_blocks)
return; /// Already shrunk
Int64 current_memory_usage = getCurrentQueryMemoryUsage();
Int64 query_memory_usage_delta = current_memory_usage - memory_usage_before_adding_blocks;
Int64 max_total_bytes_for_query = memory_usage_before_adding_blocks ? table_join->getMaxMemoryUsage() : 0;
auto max_total_bytes_in_join = table_join->sizeLimits().max_bytes;
/** If accounted data size is more than half of `max_bytes_in_join`
* or query memory consumption growth from the beginning of adding blocks (estimation of memory consumed by join using memory tracker)
* is bigger than half of all memory available for query,
* then shrink stored blocks to fit.
*/
if ((max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) ||
(max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2))
{
shrink_blocks = true;
LOG_DEBUG(log, "Shrinking stored blocks table after {} of {} consumed",
ReadableSize(total_bytes), ReadableSize(max_total_bytes));
LOG_DEBUG(log, "Shrinking stored blocks, memory consumption is {} / {} caclulated by join, {} / {} by memory tracker",
ReadableSize(total_bytes_in_join), ReadableSize(max_total_bytes_in_join),
ReadableSize(query_memory_usage_delta), ReadableSize(max_total_bytes_for_query));
for (auto & stored_block : data->blocks)
{
size_t old_size = stored_block.allocatedBytes();
@ -896,15 +936,17 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
chassert(old_size >= new_size);
data->blocks_allocated_size -= old_size - new_size;
}
auto new_total_bytes = getTotalByteCount();
size_t total_bytes_delta = total_bytes - new_total_bytes;
chassert(new_total_bytes <= total_bytes);
LOG_DEBUG(log, "Shrunk stored blocks {} freed, new memory consumption is {}",
ReadableSize(total_bytes_delta), ReadableSize(new_total_bytes));
total_bytes = new_total_bytes;
auto new_total_bytes_in_join = getTotalByteCount();
Int64 new_current_memory_usage = getCurrentQueryMemoryUsage();
size_t total_bytes_delta = total_bytes_in_join - new_total_bytes_in_join;
chassert(new_total_bytes_in_join <= total_bytes_in_join);
LOG_DEBUG(log, "Shrunk stored blocks {} freed ({} by memory tracker), new memory consumption is {} ({} by memory tracker)",
ReadableSize(total_bytes_delta), ReadableSize(new_current_memory_usage - current_memory_usage),
ReadableSize(new_total_bytes_in_join), ReadableSize(new_current_memory_usage));
total_bytes_in_join = new_total_bytes_in_join;
}
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}

View File

@ -393,6 +393,8 @@ public:
void debugKeys() const;
void shrinkStoredBlocksToFit(size_t & total_bytes_in_join);
private:
template<bool> friend class NotJoinedHash;
@ -432,6 +434,7 @@ private:
/// When tracked memory consumption is more than a threshold, we will shrink to fit stored blocks.
bool shrink_blocks = false;
Int64 memory_usage_before_adding_blocks = 0;
Poco::Logger * log;

View File

@ -112,6 +112,7 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
, partial_merge_join_left_table_buffer_bytes(settings.partial_merge_join_left_table_buffer_bytes)
, max_files_to_merge(settings.join_on_disk_max_files_to_merge)
, temporary_files_codec(settings.temporary_files_codec)
, max_memory_usage(settings.max_memory_usage)
, tmp_volume(tmp_volume_)
{
}
@ -953,4 +954,10 @@ ActionsDAGPtr TableJoin::createJoinedBlockActions(ContextPtr context) const
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
}
/// Returns the stored `max_memory_usage` value, which the constructor
/// initializes from `settings.max_memory_usage`.
size_t TableJoin::getMaxMemoryUsage() const
{
return max_memory_usage;
}
}

View File

@ -146,6 +146,9 @@ private:
const size_t max_files_to_merge = 0;
const String temporary_files_codec = "LZ4";
/// Value of the setting max_memory_usage for the query; can be used when max_bytes_in_join is not specified.
size_t max_memory_usage = 0;
ASTs key_asts_left;
ASTs key_asts_right;
@ -244,6 +247,8 @@ public:
JoinStrictness strictness() const { return table_join.strictness; }
bool sameStrictnessAndKind(JoinStrictness, JoinKind) const;
const SizeLimits & sizeLimits() const { return size_limits; }
size_t getMaxMemoryUsage() const;
VolumePtr getGlobalTemporaryVolume() { return tmp_volume; }
ActionsDAGPtr createJoinedBlockActions(ContextPtr context) const;