improvements of async inserts

This commit is contained in:
Anton Popov 2021-09-02 02:18:09 +03:00
parent 5e694596c9
commit 7c42ce8370
9 changed files with 139 additions and 35 deletions

View File

@ -1,4 +1,4 @@
#include <Interpreters/AsynchronousInsertionQueue.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <Core/Settings.h>
#include <DataStreams/BlockIO.h>
@ -17,6 +17,7 @@
#include <Storages/IStorage.h>
#include <Common/SipHash.h>
#include <Common/FieldVisitorHash.h>
#include <Access/AccessFlags.h>
namespace DB
@ -55,12 +56,24 @@ void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr excep
cv.notify_all();
}
bool AsynchronousInsertQueue::InsertData::Entry::wait(const Milliseconds & timeout)
bool AsynchronousInsertQueue::InsertData::Entry::wait(const Milliseconds & timeout) const
{
std::unique_lock lock(mutex);
return cv.wait_for(lock, timeout, [&] { return finished; });
}
bool AsynchronousInsertQueue::InsertData::Entry::isFinished() const
{
std::lock_guard lock(mutex);
return finished;
}
std::exception_ptr AsynchronousInsertQueue::InsertData::Entry::getException() const
{
std::lock_guard lock(mutex);
return exception;
}
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts)
: WithContext(context_)
@ -99,7 +112,7 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
std::lock_guard lock(currently_processing_mutex);
for (const auto & [_, entry] : currently_processing_queries)
{
if (!entry->finished)
if (!entry->isFinished())
entry->finish(std::make_exception_ptr(Exception(
ErrorCodes::TIMEOUT_EXCEEDED,
"Wait for async insert timeout exceeded)")));
@ -116,13 +129,23 @@ void AsynchronousInsertQueue::scheduleProcessDataJob(const InsertQuery & key, In
});
}
void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settings, const String & query_id)
void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
{
query = query->clone();
const auto & settings = query_context->getSettingsRef();
auto & insert_query = query->as<ASTInsertQuery &>();
InterpreterInsertQuery interpreter(query, query_context, settings.insert_allow_materialized_columns);
auto table = interpreter.getTable(insert_query);
auto sample_block = interpreter.getSampleBlock(insert_query, table, table->getInMemoryMetadataPtr());
query_context->checkAccess(AccessFlags(AccessType::INSERT), insert_query.table_id, sample_block.getNames());
auto read_buf = getReadBufferFromASTInsertQuery(query);
/// It's important to read the whole data per query as a single chunk, so we can safely drop it in case of parsing failure.
auto entry = std::make_shared<InsertData::Entry>();
entry->query_id = query_id;
entry->query_id = query_context->getCurrentQueryId();
entry->bytes.reserve(read_buf->totalSize());
WriteBufferFromString write_buf(entry->bytes);
@ -157,7 +180,7 @@ void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settin
{
std::lock_guard currently_processing_lock(currently_processing_mutex);
currently_processing_queries.emplace(query_id, entry);
currently_processing_queries.emplace(entry->query_id, entry);
}
LOG_INFO(log, "Have {} pending inserts with total {} bytes of data for query '{}'",
@ -185,8 +208,8 @@ void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, co
if (!finished)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout.count());
if (entry->exception)
std::rethrow_exception(entry->exception);
if (auto exception = entry->getException())
std::rethrow_exception(exception);
}
void AsynchronousInsertQueue::busyCheck()
@ -238,7 +261,7 @@ void AsynchronousInsertQueue::staleCheck()
void AsynchronousInsertQueue::cleanup()
{
auto timeout = busy_timeout * 3;
auto timeout = busy_timeout * 5;
while (!shutdown)
{
@ -259,16 +282,38 @@ void AsynchronousInsertQueue::cleanup()
if (!keys_to_remove.empty())
{
std::unique_lock write_lock(rwlock);
size_t total_removed = 0;
for (const auto & key : keys_to_remove)
{
auto it = queue.find(key);
if (it != queue.end() && !it->second)
if (it != queue.end() && !it->second->data)
{
queue.erase(it);
++total_removed;
}
}
if (total_removed)
LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", keys_to_remove.size());
}
{
std::vector<String> ids_to_remove;
std::lock_guard lock(currently_processing_mutex);
for (const auto & [query_id, entry] : currently_processing_queries)
if (entry->isFinished())
ids_to_remove.push_back(query_id);
if (!ids_to_remove.empty())
{
for (const auto & id : ids_to_remove)
currently_processing_queries.erase(id);
LOG_TRACE(log, "Removed {} finished entries", ids_to_remove.size());
}
}
}
}
@ -341,7 +386,7 @@ try
total_rows, total_bytes, queryToString(key.query));
for (const auto & entry : data->entries)
if (!entry->finished)
if (!entry->isFinished())
entry->finish();
}
catch (...)

View File

@ -31,7 +31,7 @@ public:
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts);
~AsynchronousInsertQueue();
void push(const ASTPtr & query, const Settings & settings, const String & query_id);
void push(ASTPtr query, ContextPtr query_context);
void waitForProcessingQuery(const String & query_id, const Milliseconds & timeout);
private:
@ -53,15 +53,17 @@ private:
String bytes;
String query_id;
bool finished = false;
std::exception_ptr exception;
void finish(std::exception_ptr exception_ = nullptr);
bool wait(const Milliseconds & timeout);
bool wait(const Milliseconds & timeout) const;
bool isFinished() const;
std::exception_ptr getException() const;
private:
std::mutex mutex;
std::condition_variable cv;
mutable std::mutex mutex;
mutable std::condition_variable cv;
bool finished = false;
std::exception_ptr exception;
};
using EntryPtr = std::shared_ptr<Entry>;
@ -92,8 +94,9 @@ private:
std::shared_mutex rwlock;
Queue queue;
using QueryIdToEntry = std::unordered_map<String, InsertData::EntryPtr>;
std::mutex currently_processing_mutex;
std::unordered_map<String, InsertData::EntryPtr> currently_processing_queries;
QueryIdToEntry currently_processing_queries;
/// Logic and events behind queue are as follows:
/// - reset_timeout: if queue is empty for some time, then we delete the queue and free all associated resources, e.g. tables.
@ -123,6 +126,10 @@ private:
void scheduleProcessDataJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);
public:
Queue getQueue() const { return queue; }
QueryIdToEntry getCurrentlyProcessingQueries() const { return currently_processing_queries; }
};
}

View File

@ -75,7 +75,7 @@
#include <Common/TraceCollector.h>
#include <common/logger_useful.h>
#include <Common/RemoteHostFilter.h>
#include <Interpreters/AsynchronousInsertionQueue.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>

View File

@ -33,10 +33,11 @@ public:
StorageID getDatabaseTable() const;
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context_) const override;
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
StoragePtr getTable(ASTInsertQuery & query);
private:
StoragePtr getTable(ASTInsertQuery & query);
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
std::pair<BlockIO, BlockOutputStreams> executeImpl(const StoragePtr & table, Block & sample_block);
ASTPtr query_ptr;

View File

@ -3,7 +3,7 @@
#include <Common/typeid_cast.h>
#include <Common/ThreadProfileEvents.h>
#include <Interpreters/AsynchronousInsertionQueue.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromVector.h>
#include <IO/LimitReadBuffer.h>
@ -556,14 +556,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (async_insert)
{
auto query_id = context->getCurrentQueryId();
queue->push(ast, settings, query_id);
queue->push(ast, context);
BlockIO io;
if (settings.wait_for_async_insert)
{
auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds();
auto source = std::make_shared<WaitForAsyncInsertSource>(query_id, timeout, context->getGlobalContext());
auto query_id = context->getCurrentQueryId();
auto source = std::make_shared<WaitForAsyncInsertSource>(query_id, timeout, *queue);
io.pipeline.init(Pipe(source));
}

View File

@ -77,7 +77,9 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
void ASTInsertQuery::updateTreeHashImpl(SipHash & hash_state) const
{
hash_state.update(table_id.getFullTableName());
hash_state.update(table_id.database_name);
hash_state.update(table_id.table_name);
hash_state.update(table_id.uuid);
hash_state.update(format);
IAST::updateTreeHashImpl(hash_state);
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <Processors/ISource.h>
#include <Interpreters/Context.h>
#include <Interpreters/AsynchronousInsertQueue.h>
namespace DB
{
@ -10,11 +10,11 @@ class WaitForAsyncInsertSource : public ISource, WithContext
{
public:
WaitForAsyncInsertSource(
const String & query_id_, size_t timeout_ms_, ContextPtr context_)
const String & query_id_, size_t timeout_ms_, AsynchronousInsertQueue & queue_)
: ISource(Block())
, WithContext(context_)
, query_id(query_id_)
, timeout_ms(timeout_ms_)
, queue(queue_)
{
}
@ -23,16 +23,14 @@ public:
protected:
Chunk generate() override
{
auto context = getContext();
auto * queue = context->getAsynchronousInsertQueue();
assert(queue);
queue->waitForProcessingQuery(query_id, std::chrono::milliseconds(timeout_ms));
queue.waitForProcessingQuery(query_id, std::chrono::milliseconds(timeout_ms));
return Chunk();
}
private:
String query_id;
size_t timeout_ms;
AsynchronousInsertQueue & queue;
};
}

View File

@ -0,0 +1,12 @@
1 a
2 b
3 c
4 d
5 e
6 f
7 g
8 h
9 i
10 j
11 k
12 l

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert_mode=1&wait_for_async_insert=1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = Memory"
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT CSV
1,"a"
2,"b"' &
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT CSV
3,"c"
4,"d"
' &
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}' &
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT JSONEachRow {"id": 7, "s": "g"} {"id": 8, "s": "h"}' &
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT CSVWithNames
"id","s"
9,"i"
10,"j"' &
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO async_inserts FORMAT CSVWithNames
"id","s"
11,"k"
12,"l"
' &
wait
${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts ORDER BY id"
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts"