Merge pull request #47102 from CheSema/merge-mutate-do-thread-group

Call flushUntrackedMemory when the context switches
This commit is contained in:
Sema Checherinda 2023-03-03 20:23:25 +01:00 committed by GitHub
commit 85178c9609
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 30 additions and 18 deletions

View File

@ -126,11 +126,7 @@ void CurrentThread::flushUntrackedMemory()
{
if (unlikely(!current_thread))
return;
if (current_thread->untracked_memory == 0)
return;
current_thread->memory_tracker.adjustWithUntrackedMemory(current_thread->untracked_memory);
current_thread->untracked_memory = 0;
current_thread->flushUntrackedMemory();
}
}

View File

@ -144,9 +144,18 @@ ThreadStatus::ThreadStatus()
#endif
}
/// Passes the accumulated untracked_memory to memory_tracker and then zeroes the counter.
/// Does nothing when the counter is already zero.
void ThreadStatus::flushUntrackedMemory()
{
    if (untracked_memory != 0)
    {
        memory_tracker.adjustWithUntrackedMemory(untracked_memory);
        untracked_memory = 0;
    }
}
ThreadStatus::~ThreadStatus()
{
memory_tracker.adjustWithUntrackedMemory(untracked_memory);
flushUntrackedMemory();
if (thread_group)
{

View File

@ -290,6 +290,8 @@ public:
void logToQueryViewsLog(const ViewRuntimeData & vinfo);
void flushUntrackedMemory();
protected:
void applyQuerySettings();

View File

@ -352,8 +352,10 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
thread_group->threads.erase(this);
}
performance_counters.setParent(&ProfileEvents::global_counters);
memory_tracker.reset();
flushUntrackedMemory();
memory_tracker.reset();
memory_tracker.setParent(thread_group->memory_tracker.getParent());
query_id.clear();

View File

@ -144,8 +144,11 @@ MergeInfo MergeListElement::getInfo() const
/// On destruction, passes any remaining untracked_memory to the tracker returned by
/// CurrentThread::getMemoryTracker() and resets the counter.
MergeListElement::~MergeListElement()
{
    /// A single guarded flush is enough: once the counter has been zeroed, a second
    /// adjustment would have nothing left to report.
    if (untracked_memory != 0)
    {
        CurrentThread::getMemoryTracker()->adjustWithUntrackedMemory(untracked_memory);
        untracked_memory = 0;
    }
}

View File

@ -1056,13 +1056,13 @@ def test_seekable_formats(started_cluster):
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
exec_query_with_retry(
instance,
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(1000000) settings s3_truncate_on_insert=1",
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(1500000) settings s3_truncate_on_insert=1",
)
result = instance.query(
f"SELECT count() FROM {table_function} SETTINGS max_memory_usage='50M'"
f"SELECT count() FROM {table_function} SETTINGS max_memory_usage='60M'"
)
assert int(result) == 1000000
assert int(result) == 1500000
instance.query(f"SELECT * FROM {table_function} FORMAT Null")
@ -1073,7 +1073,7 @@ def test_seekable_formats(started_cluster):
result = result.strip()
assert result.endswith("MiB")
result = result[: result.index(".")]
assert int(result) > 80
assert int(result) > 150
def test_seekable_formats_url(started_cluster):
@ -1083,23 +1083,23 @@ def test_seekable_formats_url(started_cluster):
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
exec_query_with_retry(
instance,
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(1000000) settings s3_truncate_on_insert=1",
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(1500000) settings s3_truncate_on_insert=1",
)
result = instance.query(f"SELECT count() FROM {table_function}")
assert int(result) == 1000000
assert int(result) == 1500000
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
exec_query_with_retry(
instance,
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(1000000) settings s3_truncate_on_insert=1",
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(1500000) settings s3_truncate_on_insert=1",
)
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_parquet', 'Parquet', 'a Int32, b String')"
result = instance.query(
f"SELECT count() FROM {table_function} SETTINGS max_memory_usage='50M'"
f"SELECT count() FROM {table_function} SETTINGS max_memory_usage='60M'"
)
assert int(result) == 1000000
assert int(result) == 1500000
def test_empty_file(started_cluster):