mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge pull request #47630 from ClickHouse/revert-47476-revert_46622
Fix user MemoryTracker counter in async inserts
This commit is contained in:
commit
befcdbcc6d
@ -106,4 +106,23 @@ std::string_view CurrentThread::getQueryId()
|
||||
return current_thread->getQueryId();
|
||||
}
|
||||
|
||||
/// Returns the first MemoryTracker with level VariableContext::User found while
/// walking up the parent chain of the current thread's memory tracker.
/// The search starts from the parent of the thread's own tracker.
/// Returns nullptr if there is no current thread or no tracker of that level in the chain.
MemoryTracker * CurrentThread::getUserMemoryTracker()
{
    if (unlikely(!current_thread))
        return nullptr;

    for (auto * candidate = current_thread->memory_tracker.getParent(); candidate; candidate = candidate->getParent())
    {
        if (candidate->level == VariableContext::User)
            return candidate;
    }

    /// Reached the top of the hierarchy without meeting a user-level tracker.
    return nullptr;
}
|
||||
|
||||
void CurrentThread::flushUntrackedMemory()
|
||||
{
|
||||
if (unlikely(!current_thread))
|
||||
return;
|
||||
current_thread->flushUntrackedMemory();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -41,6 +41,12 @@ public:
|
||||
/// Group to which belongs current thread
|
||||
static ThreadGroupPtr getGroup();
|
||||
|
||||
/// MemoryTracker for user that owns current thread if any
|
||||
static MemoryTracker * getUserMemoryTracker();
|
||||
|
||||
/// Adjust counters in MemoryTracker hierarchy if untracked_memory is not 0.
|
||||
static void flushUntrackedMemory();
|
||||
|
||||
/// A logs queue used by TCPHandler to pass logs to a client
|
||||
static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
|
||||
LogsLevel client_logs_level);
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/FieldVisitorHash.h>
|
||||
#include <Common/DateLUT.h>
|
||||
@ -106,9 +107,10 @@ bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other)
|
||||
return query_str == other.query_str && settings == other.settings;
|
||||
}
|
||||
|
||||
AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_)
|
||||
AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_, MemoryTracker * user_memory_tracker_)
|
||||
: bytes(std::move(bytes_))
|
||||
, query_id(std::move(query_id_))
|
||||
, user_memory_tracker(user_memory_tracker_)
|
||||
, create_time(std::chrono::system_clock::now())
|
||||
{
|
||||
}
|
||||
@ -118,6 +120,15 @@ void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr excep
|
||||
if (finished.exchange(true))
|
||||
return;
|
||||
|
||||
{
|
||||
// To avoid races on counter of user's MemoryTracker we should free memory at this moment.
|
||||
// Entries data must be destroyed in context of user who runs async insert.
|
||||
// Each entry in the list may correspond to a different user,
|
||||
// so we need to switch current thread's MemoryTracker.
|
||||
UserMemoryTrackerSwitcher switcher(user_memory_tracker);
|
||||
bytes = "";
|
||||
}
|
||||
|
||||
if (exception_)
|
||||
{
|
||||
promise.set_exception(exception_);
|
||||
@ -237,7 +248,7 @@ AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
|
||||
if (auto quota = query_context->getQuota())
|
||||
quota->used(QuotaType::WRITTEN_BYTES, bytes.size());
|
||||
|
||||
auto entry = std::make_shared<InsertData::Entry>(std::move(bytes), query_context->getCurrentQueryId());
|
||||
auto entry = std::make_shared<InsertData::Entry>(std::move(bytes), query_context->getCurrentQueryId(), CurrentThread::getUserMemoryTracker());
|
||||
|
||||
InsertQuery key{query, settings};
|
||||
InsertDataPtr data_to_process;
|
||||
@ -445,6 +456,7 @@ try
|
||||
{
|
||||
auto buffer = std::make_unique<ReadBufferFromString>(entry->bytes);
|
||||
current_entry = entry;
|
||||
auto bytes_size = entry->bytes.size();
|
||||
size_t num_rows = executor.execute(*buffer);
|
||||
total_rows += num_rows;
|
||||
chunk_info->offsets.push_back(total_rows);
|
||||
@ -460,7 +472,7 @@ try
|
||||
elem.event_time_microseconds = timeInMicroseconds(entry->create_time);
|
||||
elem.query = key.query;
|
||||
elem.query_id = entry->query_id;
|
||||
elem.bytes = entry->bytes.size();
|
||||
elem.bytes = bytes_size;
|
||||
elem.rows = num_rows;
|
||||
elem.exception = current_exception;
|
||||
current_exception.clear();
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Core/Settings.h>
|
||||
#include <Poco/Logger.h>
|
||||
@ -59,16 +60,42 @@ private:
|
||||
UInt128 calculateHash() const;
|
||||
};
|
||||
|
||||
struct UserMemoryTrackerSwitcher
|
||||
{
|
||||
explicit UserMemoryTrackerSwitcher(MemoryTracker * new_tracker)
|
||||
{
|
||||
auto * thread_tracker = CurrentThread::getMemoryTracker();
|
||||
prev_untracked_memory = current_thread->untracked_memory;
|
||||
prev_memory_tracker_parent = thread_tracker->getParent();
|
||||
|
||||
current_thread->untracked_memory = 0;
|
||||
thread_tracker->setParent(new_tracker);
|
||||
}
|
||||
|
||||
~UserMemoryTrackerSwitcher()
|
||||
{
|
||||
CurrentThread::flushUntrackedMemory();
|
||||
auto * thread_tracker = CurrentThread::getMemoryTracker();
|
||||
|
||||
current_thread->untracked_memory = prev_untracked_memory;
|
||||
thread_tracker->setParent(prev_memory_tracker_parent);
|
||||
}
|
||||
|
||||
MemoryTracker * prev_memory_tracker_parent;
|
||||
Int64 prev_untracked_memory;
|
||||
};
|
||||
|
||||
struct InsertData
|
||||
{
|
||||
struct Entry
|
||||
{
|
||||
public:
|
||||
const String bytes;
|
||||
String bytes;
|
||||
const String query_id;
|
||||
MemoryTracker * const user_memory_tracker;
|
||||
const std::chrono::time_point<std::chrono::system_clock> create_time;
|
||||
|
||||
Entry(String && bytes_, String && query_id_);
|
||||
Entry(String && bytes_, String && query_id_, MemoryTracker * user_memory_tracker_);
|
||||
|
||||
void finish(std::exception_ptr exception_ = nullptr);
|
||||
std::future<void> getFuture() { return promise.get_future(); }
|
||||
@ -79,6 +106,19 @@ private:
|
||||
std::atomic_bool finished = false;
|
||||
};
|
||||
|
||||
/// Releases the entries one by one, from the front of the list.
/// Entries must be destroyed in context of the user who ran the async insert,
/// and each entry may belong to a different user, so the parent of the current
/// thread's MemoryTracker is switched separately for every removed entry.
~InsertData()
{
    while (!entries.empty())
    {
        /// The switcher is created before the entry is dropped and destroyed after it.
        UserMemoryTrackerSwitcher switcher(entries.front()->user_memory_tracker);
        entries.pop_front();
    }
}
|
||||
|
||||
using EntryPtr = std::shared_ptr<Entry>;
|
||||
|
||||
std::list<EntryPtr> entries;
|
||||
|
11
tests/integration/test_async_insert_memory/configs/users.xml
Normal file
11
tests/integration/test_async_insert_memory/configs/users.xml
Normal file
@ -0,0 +1,11 @@
|
||||
<clickhouse>
|
||||
<users>
|
||||
<default>
|
||||
<password></password>
|
||||
<profile>default</profile>
|
||||
<quota>default</quota>
|
||||
<named_collection_control>1</named_collection_control>
|
||||
<show_named_collections_secrets>1</show_named_collections_secrets>
|
||||
</default>
|
||||
</users>
|
||||
</clickhouse>
|
55
tests/integration/test_async_insert_memory/test.py
Normal file
55
tests/integration/test_async_insert_memory/test.py
Normal file
@ -0,0 +1,55 @@
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node = cluster.add_instance(
|
||||
"node",
|
||||
user_configs=[
|
||||
"configs/users.xml",
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Start the cluster once per module and always shut it down afterwards.

    Yields the module-level ``cluster`` object to the tests.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        # Runs whether start-up or any test using the fixture failed or not.
        cluster.shutdown()
|
||||
|
||||
|
||||
def test_memory_usage():
    """Check that async inserts do not leave memory accounted to the user.

    Runs ten large async inserts as user ``A``, waits, and then runs a query
    under a ``max_memory_usage_for_user`` limit; the query must finish without
    an error.
    """
    node.query("CREATE USER IF NOT EXISTS A")
    node.query("GRANT ALL ON *.* TO A")

    node.query(
        "CREATE TABLE async_table(data Array(UInt64)) ENGINE=MergeTree() ORDER BY data"
    )

    # The request object is intentionally dropped and never awaited here.
    # NOTE(review): presumably this keeps a query of user A running so that the
    # user's memory accounting persists for the whole test - confirm.
    node.get_query_request("SELECT count() FROM system.numbers", user="A")

    insert_template = "INSERT INTO async_table SETTINGS async_insert=1, wait_for_async_insert=1,async_insert_max_data_size=150000000 VALUES ({})"
    chunk_size = 5000000
    for chunk_index in range(10):
        first_value = chunk_index * chunk_size
        values = list(range(first_value, first_value + chunk_size))
        node.query(insert_template.format(values), user="A")

    # Wait until buffers are freed
    time.sleep(5)

    memory_limit = 30 * (2**23)
    response = node.get_query_request(
        "SELECT groupArray(number) FROM numbers(1000000) SETTINGS max_memory_usage_for_user={}".format(
            memory_limit
        ),
        user="A",
    )

    _, err = response.get_answer_and_error()
    assert err == "", "Query failed with error {}".format(err)

    node.query("DROP TABLE async_table")
    node.query("DROP USER IF EXISTS A")
|
Loading…
Reference in New Issue
Block a user