Merge pull request #33239 from CurtizJ/async-insert-query-log

Add asynchronous inserts to query log
This commit is contained in:
Kseniia Sumarokova 2021-12-29 09:34:08 +03:00 committed by GitHub
commit 9b63fa6949
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 105 additions and 50 deletions

View File

@ -466,7 +466,7 @@ StorageID InterpreterInsertQuery::getDatabaseTable() const
}
void InterpreterInsertQuery::extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr context_) const
void InterpreterInsertQuery::extendQueryLogElemImpl(QueryLogElement & elem, ContextPtr context_)
{
elem.query_kind = "Insert";
const auto & insert_table = context_->getInsertionTable();
@ -477,4 +477,9 @@ void InterpreterInsertQuery::extendQueryLogElemImpl(QueryLogElement & elem, cons
}
}
void InterpreterInsertQuery::extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr context_) const
{
extendQueryLogElemImpl(elem, context_);
}
}

View File

@ -40,6 +40,7 @@ public:
ThreadStatus * thread_status = nullptr,
std::atomic_uint64_t * elapsed_counter_ms = nullptr);
static void extendQueryLogElemImpl(QueryLogElement & elem, ContextPtr context_);
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context_) const override;
StoragePtr getTable(ASTInsertQuery & query);

View File

@ -556,9 +556,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto * insert_query = ast->as<ASTInsertQuery>();
if (insert_query && insert_query->table_id)
/// Resolve database before trying to use async insert feature - to properly hash the query.
insert_query->table_id = context->resolveStorageID(insert_query->table_id);
/// Resolve database before trying to use async insert feature - to properly hash the query.
if (insert_query)
{
if (insert_query->table_id)
insert_query->table_id = context->resolveStorageID(insert_query->table_id);
else if (auto table = insert_query->getTable(); !table.empty())
insert_query->table_id = context->resolveStorageID(StorageID{insert_query->getDatabase(), table});
}
if (insert_query && insert_query->select)
{
@ -579,8 +584,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
}
else
{
/// reset Input callbacks if query is not INSERT SELECT
context->resetInputCallbacks();
}
StreamLocalLimits limits;
std::shared_ptr<const EnabledQuota> quota;
std::unique_ptr<IInterpreter> interpreter;
auto * queue = context->getAsynchronousInsertQueue();
const bool async_insert = queue
@ -591,65 +602,71 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{
queue->push(ast, context);
BlockIO io;
if (settings.wait_for_async_insert)
{
auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds();
auto query_id = context->getCurrentQueryId();
auto source = std::make_shared<WaitForAsyncInsertSource>(query_id, timeout, *queue);
io.pipeline = QueryPipeline(Pipe(std::move(source)));
res.pipeline = QueryPipeline(Pipe(std::move(source)));
}
return std::make_tuple(ast, std::move(io));
}
auto interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal));
std::shared_ptr<const EnabledQuota> quota;
if (!interpreter->ignoreQuota())
{
quota = context->getQuota();
if (quota)
{
if (ast->as<ASTSelectQuery>() || ast->as<ASTSelectWithUnionQuery>())
{
quota->used(QuotaType::QUERY_SELECTS, 1);
}
else if (ast->as<ASTInsertQuery>())
{
quota->used(QuotaType::QUERY_INSERTS, 1);
}
quota->used(QuotaType::QUERY_INSERTS, 1);
quota->used(QuotaType::QUERIES, 1);
quota->checkExceeded(QuotaType::ERRORS);
}
}
StreamLocalLimits limits;
if (!interpreter->ignoreLimits())
{
limits.mode = LimitsMode::LIMITS_CURRENT; //-V1048
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
{
std::unique_ptr<OpenTelemetrySpanHolder> span;
if (context->query_trace_context.trace_id != UUID())
{
auto * raw_interpreter_ptr = interpreter.get();
std::string class_name(abi::__cxa_demangle(typeid(*raw_interpreter_ptr).name(), nullptr, nullptr, nullptr));
span = std::make_unique<OpenTelemetrySpanHolder>(class_name + "::execute()");
}
res = interpreter->execute();
}
QueryPipeline & pipeline = res.pipeline;
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
auto table_id = insert_interpreter->getDatabaseTable();
const auto & table_id = insert_query->table_id;
if (!table_id.empty())
context->setInsertionTable(std::move(table_id));
context->setInsertionTable(table_id);
}
else
{
interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal));
if (!interpreter->ignoreQuota())
{
quota = context->getQuota();
if (quota)
{
if (ast->as<ASTSelectQuery>() || ast->as<ASTSelectWithUnionQuery>())
{
quota->used(QuotaType::QUERY_SELECTS, 1);
}
else if (ast->as<ASTInsertQuery>())
{
quota->used(QuotaType::QUERY_INSERTS, 1);
}
quota->used(QuotaType::QUERIES, 1);
quota->checkExceeded(QuotaType::ERRORS);
}
}
if (!interpreter->ignoreLimits())
{
limits.mode = LimitsMode::LIMITS_CURRENT; //-V1048
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
{
std::unique_ptr<OpenTelemetrySpanHolder> span;
if (context->query_trace_context.trace_id != UUID())
{
auto * raw_interpreter_ptr = interpreter.get();
std::string class_name(abi::__cxa_demangle(typeid(*raw_interpreter_ptr).name(), nullptr, nullptr, nullptr));
span = std::make_unique<OpenTelemetrySpanHolder>(class_name + "::execute()");
}
res = interpreter->execute();
}
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
auto table_id = insert_interpreter->getDatabaseTable();
if (!table_id.empty())
context->setInsertionTable(std::move(table_id));
}
}
if (process_list_entry)
@ -663,6 +680,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Hold element of process list till end of query execution.
res.process_list_entry = process_list_entry;
auto & pipeline = res.pipeline;
if (pipeline.pulling() || pipeline.completed())
{
/// Limits on the result, the quota on the result, and also callback for progress.
@ -712,7 +731,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.query_views = info.views;
}
interpreter->extendQueryLogElem(elem, ast, context, query_database, query_table);
if (async_insert)
InterpreterInsertQuery::extendQueryLogElemImpl(elem, context);
else if (interpreter)
interpreter->extendQueryLogElem(elem, ast, context, query_database, query_table);
if (settings.log_query_settings)
elem.query_settings = std::make_shared<Settings>(context->getSettingsRef());

View File

@ -0,0 +1,4 @@
1 a
2 b
INSERT INTO async_inserts_2156 VALUES 1 Insert 1 0
INSERT INTO async_inserts_2156 VALUES 1 Insert 1

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_2156"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts_2156 (id UInt32, s String) ENGINE = Memory"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=0" -d "INSERT INTO async_inserts_2156 VALUES (1, 'a')"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" -d "INSERT INTO async_inserts_2156 VALUES (2, 'b')"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_2156 ORDER BY id"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "SELECT query, arrayExists(x -> x LIKE '%async_inserts_2156', tables), \
query_kind, Settings['async_insert'], Settings['wait_for_async_insert'] FROM system.query_log \
WHERE event_date >= yesterday() AND current_database = '$CLICKHOUSE_DATABASE' \
AND query ILIKE 'INSERT INTO async_inserts_2156 VALUES%' AND type = 'QueryFinish' \
ORDER BY query_start_time_microseconds"
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts_2156"