Merge pull request #16121 from azat/total_memory_tracker-by-default

Significantly decrease MemoryTracking drift
This commit is contained in:
alexey-milovidov 2020-10-24 21:36:38 +03:00 committed by GitHub
commit b19311339f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 274 additions and 24 deletions

View File

@ -268,7 +268,9 @@ TESTS_TO_SKIP=(
00974_query_profiler
# Look at DistributedFilesToInsert, so cannot run in parallel.
01460_DistributedFilesToInsert
01457_DistributedFilesToInsert
01541_max_memory_usage_for_user
# Require python libraries like scipy, pandas and numpy
01322_ttest_scipy

View File

@ -258,7 +258,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
Poco::Logger * log = &logger();
UseSSL use_ssl;
ThreadStatus thread_status;
MainThreadStatus::getInstance();
registerFunctions();
registerAggregateFunctions();

View File

@ -13,6 +13,24 @@
#include <random>
#include <cstdlib>
namespace
{
MemoryTracker * getMemoryTracker()
{
if (auto * thread_memory_tracker = DB::CurrentThread::getMemoryTracker())
return thread_memory_tracker;
/// Once the main thread is initialized,
/// total_memory_tracker is initialized too.
/// And can be used, since MainThreadStatus is required for profiling.
if (DB::MainThreadStatus::get())
return &total_memory_tracker;
return nullptr;
}
}
namespace DB
{
@ -192,14 +210,15 @@ void MemoryTracker::free(Int64 size)
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), -size);
}
Int64 accounted_size = size;
if (level == VariableContext::Thread)
{
/// Could become negative if memory allocated in this thread is freed in another one
amount.fetch_sub(size, std::memory_order_relaxed);
amount.fetch_sub(accounted_size, std::memory_order_relaxed);
}
else
{
Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;
Int64 new_amount = amount.fetch_sub(accounted_size, std::memory_order_relaxed) - accounted_size;
/** Sometimes, query could free some data, that was allocated outside of query context.
* Example: cache eviction.
@ -210,7 +229,7 @@ void MemoryTracker::free(Int64 size)
if (unlikely(new_amount < 0))
{
amount.fetch_sub(new_amount);
size += new_amount;
accounted_size += new_amount;
}
}
@ -218,7 +237,7 @@ void MemoryTracker::free(Int64 size)
loaded_next->free(size);
if (metric != CurrentMetrics::end())
CurrentMetrics::sub(metric, size);
CurrentMetrics::sub(metric, accounted_size);
}
@ -270,16 +289,24 @@ namespace CurrentMemoryTracker
void alloc(Int64 size)
{
if (auto * memory_tracker = DB::CurrentThread::getMemoryTracker())
if (auto * memory_tracker = getMemoryTracker())
{
current_thread->untracked_memory += size;
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
if (current_thread)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
memory_tracker->alloc(tmp);
current_thread->untracked_memory += size;
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
memory_tracker->alloc(tmp);
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->alloc(size);
}
}
}
@ -292,13 +319,21 @@ namespace CurrentMemoryTracker
void free(Int64 size)
{
if (auto * memory_tracker = DB::CurrentThread::getMemoryTracker())
if (auto * memory_tracker = getMemoryTracker())
{
current_thread->untracked_memory -= size;
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
if (current_thread)
{
memory_tracker->free(-current_thread->untracked_memory);
current_thread->untracked_memory = 0;
current_thread->untracked_memory -= size;
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
{
memory_tracker->free(-current_thread->untracked_memory);
current_thread->untracked_memory = 0;
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->free(size);
}
}
}

View File

@ -20,6 +20,7 @@ namespace ErrorCodes
thread_local ThreadStatus * current_thread = nullptr;
thread_local ThreadStatus * main_thread = nullptr;
ThreadStatus::ThreadStatus()
@ -115,4 +116,20 @@ void ThreadStatus::onFatalError()
fatal_error_callback();
}
ThreadStatus * MainThreadStatus::main_thread = nullptr;
MainThreadStatus & MainThreadStatus::getInstance()
{
static MainThreadStatus thread_status;
return thread_status;
}
MainThreadStatus::MainThreadStatus()
: ThreadStatus()
{
main_thread = current_thread;
}
MainThreadStatus::~MainThreadStatus()
{
main_thread = nullptr;
}
}

View File

@ -215,4 +215,22 @@ private:
void setupState(const ThreadGroupStatusPtr & thread_group_);
};
/**
* Creates ThreadStatus for the main thread.
*/
class MainThreadStatus : public ThreadStatus
{
public:
static MainThreadStatus & getInstance();
static ThreadStatus * get() { return main_thread; }
static bool isMainThread() { return main_thread == current_thread; }
~MainThreadStatus();
private:
MainThreadStatus();
static ThreadStatus * main_thread;
};
}

View File

@ -66,10 +66,20 @@ void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trac
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
StringRef query_id;
UInt64 thread_id;
auto thread_id = CurrentThread::get().thread_id;
if (CurrentThread::isInitialized())
{
query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
thread_id = CurrentThread::get().thread_id;
}
else
{
thread_id = MainThreadStatus::get()->thread_id;
}
writeChar(false, out); /// true if requested to stop the collecting thread.
writeStringBinary(query_id, out);

View File

@ -300,8 +300,8 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
performance_counters.setParent(&ProfileEvents::global_counters);
memory_tracker.reset();
/// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below.
memory_tracker.setParent(nullptr);
/// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below (will reset to its parent).
memory_tracker.setParent(thread_group->memory_tracker.getParent());
query_id.clear();
query_context = nullptr;

View File

@ -0,0 +1,4 @@
<yandex>
<!-- this update period also syncs MemoryTracking with RSS, disable this, by using period = 1 day -->
<asynchronous_metrics_update_period_s>86400</asynchronous_metrics_update_period_s>
</yandex>

View File

@ -0,0 +1,7 @@
<yandex>
<metric_log remove="remove"/>
<query_masking_rules remove="remove"/>
<query_thread_log remove="remove"/>
<text_log remove="remove"/>
<trace_log remove="remove"/>
</yandex>

View File

@ -0,0 +1,95 @@
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long
import logging
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=[
'configs/no_system_log.xml',
'configs/asynchronous_metrics_update_period_s.xml',
])
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
@pytest.fixture(scope='module', autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
query_settings = {
'max_threads': 1,
'query_profiler_real_time_period_ns': 0,
'query_profiler_cpu_time_period_ns': 0,
'log_queries': 0,
}
sample_query = "SELECT groupArray(repeat('a', 1000)) FROM numbers(10000) GROUP BY number%10 FORMAT JSON"
def query(*args, **kwargs):
if 'settings' not in kwargs:
kwargs['settings'] = query_settings
else:
kwargs['settings'].update(query_settings)
return node.query(*args, **kwargs)
def http_query(*args, **kwargs):
if 'params' not in kwargs:
kwargs['params'] = query_settings
else:
kwargs['params'].update(query_settings)
return node.http_query(*args, **kwargs)
def get_MemoryTracking():
return int(http_query("SELECT value FROM system.metrics WHERE metric = 'MemoryTracking'"))
def check_memory(memory):
# 3 changes to MemoryTracking is minimum, since:
# - this is not that high to not detect inacuracy
# - memory can go like X/X+N due to some background allocations
# - memory can go like X/X+N/X, so at least 2 changes
changes_allowed = 3
# if number of samples is large enough, use 10% from them
# (actually most of the time there will be only few changes, it was made 10% to avoid flackiness)
changes_allowed_auto=int(len(memory) * 0.1)
changes_allowed = max(changes_allowed_auto, changes_allowed)
changed=len(set(memory))
logging.info('Changes: allowed=%s, actual=%s, sample=%s',
changes_allowed, changed, len(memory))
assert changed < changes_allowed
def test_http():
memory = []
memory.append(get_MemoryTracking())
for _ in range(100):
http_query(sample_query)
memory.append(get_MemoryTracking())
check_memory(memory)
def test_tcp_multiple_sessions():
memory = []
memory.append(get_MemoryTracking())
for _ in range(100):
query(sample_query)
memory.append(get_MemoryTracking())
check_memory(memory)
def test_tcp_single_session():
memory = []
memory.append(get_MemoryTracking())
sample_queries = [
sample_query,
"SELECT metric, value FROM system.metrics WHERE metric = 'MemoryTracking'",
] * 100
rows = query(';'.join(sample_queries))
memory = rows.split('\n')
memory = filter(lambda x: x.startswith('MemoryTracking'), memory)
memory = map(lambda x: x.split('\t')[1], memory)
memory = [*memory]
check_memory(memory)

View File

@ -0,0 +1,5 @@
HTTP
TCP_ONE_SESSION
TCP
OK
KILL sleep

View File

@ -0,0 +1,57 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
# Regression for MemoryTracker drift via HTTP queries.
#
# For this will be used:
# - max_memory_usage_for_user
# - one users' query in background (to avoid reseting max_memory_usage_for_user)
query="SELECT groupArray(repeat('a', 1000)) FROM numbers(10000) GROUP BY number%10 FORMAT JSON"
function execute_http()
{
for _ in {1..100}; do
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL&max_memory_usage_for_user=100Mi&max_threads=1" -d@- <<<"$query" | grep -F DB::Exception:
done
}
function execute_tcp()
{
# slow in debug, but should trigger the problem in ~10 iterations, so 20 is ok
for _ in {1..20}; do
${CLICKHOUSE_CLIENT} --max_memory_usage_for_user=100Mi --max_threads=1 -q "$query" | grep -F DB::Exception:
done
}
function execute_tcp_one_session()
{
for _ in {1..30}; do
echo "$query;"
done | ${CLICKHOUSE_CLIENT} -nm --max_memory_usage_for_user=100Mi --max_threads=1 | grep -F DB::Exception:
}
# one users query in background (to avoid reseting max_memory_usage_for_user)
# --max_block_size=1 to make it killable (check the state each 1 second, 1 row)
# (the test takes ~40 seconds in debug build, so 60 seconds is ok)
${CLICKHOUSE_CLIENT} --max_block_size=1 --format Null -q 'SELECT sleepEachRow(1) FROM numbers(600)' &
# trap
sleep_query_pid=$!
function cleanup()
{
echo 'KILL sleep'
# if the timeout will not be enough, it will trigger "No such process" error/message
kill $sleep_query_pid
}
trap cleanup EXIT
echo 'HTTP'
execute_http
echo 'TCP_ONE_SESSION'
execute_tcp_one_session
echo 'TCP'
execute_tcp
echo 'OK'
exit 0