diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 8677cf59d79..a1f83c81a81 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -466,7 +466,7 @@ StorageID InterpreterInsertQuery::getDatabaseTable() const } -void InterpreterInsertQuery::extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr context_) const +void InterpreterInsertQuery::extendQueryLogElemImpl(QueryLogElement & elem, ContextPtr context_) { elem.query_kind = "Insert"; const auto & insert_table = context_->getInsertionTable(); @@ -477,4 +477,9 @@ void InterpreterInsertQuery::extendQueryLogElemImpl(QueryLogElement & elem, cons } } +void InterpreterInsertQuery::extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr context_) const +{ + extendQueryLogElemImpl(elem, context_); +} + } diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h index e5733a8c28b..93de92a0680 100644 --- a/src/Interpreters/InterpreterInsertQuery.h +++ b/src/Interpreters/InterpreterInsertQuery.h @@ -40,6 +40,7 @@ public: ThreadStatus * thread_status = nullptr, std::atomic_uint64_t * elapsed_counter_ms = nullptr); + static void extendQueryLogElemImpl(QueryLogElement & elem, ContextPtr context_); void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context_) const override; StoragePtr getTable(ASTInsertQuery & query); diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index e177fd8e6b3..bd3c35c12f6 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -556,9 +556,14 @@ static std::tuple executeQueryImpl( auto * insert_query = ast->as(); - if (insert_query && insert_query->table_id) - /// Resolve database before trying to use async insert feature - to properly hash the query. - insert_query->table_id = context->resolveStorageID(insert_query->table_id); + /// Resolve database before trying to use async insert feature - to properly hash the query. + if (insert_query) + { + if (insert_query->table_id) + insert_query->table_id = context->resolveStorageID(insert_query->table_id); + else if (auto table = insert_query->getTable(); !table.empty()) + insert_query->table_id = context->resolveStorageID(StorageID{insert_query->getDatabase(), table}); + } if (insert_query && insert_query->select) { @@ -579,8 +584,14 @@ static std::tuple executeQueryImpl( } } else + { /// reset Input callbacks if query is not INSERT SELECT context->resetInputCallbacks(); + } + + StreamLocalLimits limits; + std::shared_ptr quota; + std::unique_ptr interpreter; auto * queue = context->getAsynchronousInsertQueue(); const bool async_insert = queue @@ -591,65 +602,71 @@ static std::tuple executeQueryImpl( { queue->push(ast, context); - BlockIO io; if (settings.wait_for_async_insert) { auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds(); auto query_id = context->getCurrentQueryId(); auto source = std::make_shared(query_id, timeout, *queue); - io.pipeline = QueryPipeline(Pipe(std::move(source))); + res.pipeline = QueryPipeline(Pipe(std::move(source))); } - return std::make_tuple(ast, std::move(io)); - } - - auto interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal)); - - std::shared_ptr quota; - if (!interpreter->ignoreQuota()) - { quota = context->getQuota(); if (quota) { - if (ast->as() || ast->as()) - { - quota->used(QuotaType::QUERY_SELECTS, 1); - } - else if (ast->as()) - { - quota->used(QuotaType::QUERY_INSERTS, 1); - } + quota->used(QuotaType::QUERY_INSERTS, 1); quota->used(QuotaType::QUERIES, 1); - quota->checkExceeded(QuotaType::ERRORS); } - } - StreamLocalLimits limits; - if (!interpreter->ignoreLimits()) - { - limits.mode = LimitsMode::LIMITS_CURRENT; //-V1048 - limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); - } - - { - std::unique_ptr span; - if (context->query_trace_context.trace_id != UUID()) - { - auto * raw_interpreter_ptr = interpreter.get(); - std::string class_name(abi::__cxa_demangle(typeid(*raw_interpreter_ptr).name(), nullptr, nullptr, nullptr)); - span = std::make_unique(class_name + "::execute()"); - } - res = interpreter->execute(); - } - - QueryPipeline & pipeline = res.pipeline; - - if (const auto * insert_interpreter = typeid_cast(&*interpreter)) - { - /// Save insertion table (not table function). TODO: support remote() table function. - auto table_id = insert_interpreter->getDatabaseTable(); + const auto & table_id = insert_query->table_id; if (!table_id.empty()) - context->setInsertionTable(std::move(table_id)); + context->setInsertionTable(table_id); + } + else + { + interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal)); + + if (!interpreter->ignoreQuota()) + { + quota = context->getQuota(); + if (quota) + { + if (ast->as() || ast->as()) + { + quota->used(QuotaType::QUERY_SELECTS, 1); + } + else if (ast->as()) + { + quota->used(QuotaType::QUERY_INSERTS, 1); + } + quota->used(QuotaType::QUERIES, 1); + quota->checkExceeded(QuotaType::ERRORS); + } + } + + if (!interpreter->ignoreLimits()) + { + limits.mode = LimitsMode::LIMITS_CURRENT; //-V1048 + limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); + } + + { + std::unique_ptr span; + if (context->query_trace_context.trace_id != UUID()) + { + auto * raw_interpreter_ptr = interpreter.get(); + std::string class_name(abi::__cxa_demangle(typeid(*raw_interpreter_ptr).name(), nullptr, nullptr, nullptr)); + span = std::make_unique(class_name + "::execute()"); + } + res = interpreter->execute(); + } + + if (const auto * insert_interpreter = typeid_cast(&*interpreter)) + { + /// Save insertion table (not table function). TODO: support remote() table function. + auto table_id = insert_interpreter->getDatabaseTable(); + if (!table_id.empty()) + context->setInsertionTable(std::move(table_id)); + } } if (process_list_entry) @@ -663,6 +680,8 @@ static std::tuple executeQueryImpl( /// Hold element of process list till end of query execution. res.process_list_entry = process_list_entry; + auto & pipeline = res.pipeline; + if (pipeline.pulling() || pipeline.completed()) { /// Limits on the result, the quota on the result, and also callback for progress. @@ -712,7 +731,10 @@ static std::tuple executeQueryImpl( elem.query_views = info.views; } - interpreter->extendQueryLogElem(elem, ast, context, query_database, query_table); + if (async_insert) + InterpreterInsertQuery::extendQueryLogElemImpl(elem, context); + else if (interpreter) + interpreter->extendQueryLogElem(elem, ast, context, query_database, query_table); if (settings.log_query_settings) elem.query_settings = std::make_shared(context->getSettingsRef()); diff --git a/tests/queries/0_stateless/02156_async_insert_query_log.reference b/tests/queries/0_stateless/02156_async_insert_query_log.reference new file mode 100644 index 00000000000..404dbfe753d --- /dev/null +++ b/tests/queries/0_stateless/02156_async_insert_query_log.reference @@ -0,0 +1,4 @@ +1 a +2 b +INSERT INTO async_inserts_2156 VALUES 1 Insert 1 0 +INSERT INTO async_inserts_2156 VALUES 1 Insert 1 diff --git a/tests/queries/0_stateless/02156_async_insert_query_log.sh b/tests/queries/0_stateless/02156_async_insert_query_log.sh new file mode 100755 index 00000000000..d7177fbe70c --- /dev/null +++ b/tests/queries/0_stateless/02156_async_insert_query_log.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_2156" +${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts_2156 (id UInt32, s String) ENGINE = Memory" + +${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=0" -d "INSERT INTO async_inserts_2156 VALUES (1, 'a')" +${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" -d "INSERT INTO async_inserts_2156 VALUES (2, 'b')" + +${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_2156 ORDER BY id" + +${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS" + +${CLICKHOUSE_CLIENT} -q "SELECT query, arrayExists(x -> x LIKE '%async_inserts_2156', tables), \ + query_kind, Settings['async_insert'], Settings['wait_for_async_insert'] FROM system.query_log \ + WHERE event_date >= yesterday() AND current_database = '$CLICKHOUSE_DATABASE' \ + AND query ILIKE 'INSERT INTO async_inserts_2156 VALUES%' AND type = 'QueryFinish' \ + ORDER BY query_start_time_microseconds" + +${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts_2156"