From aa81937eeea7f17000ec2cd7682d723907584fcb Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 25 Apr 2023 00:31:55 +0000 Subject: [PATCH 1/2] add field with number of rows to async insert log --- src/Interpreters/AsynchronousInsertLog.cpp | 2 ++ src/Interpreters/AsynchronousInsertLog.h | 1 + src/Interpreters/AsynchronousInsertQueue.cpp | 4 +++- .../0_stateless/02456_async_inserts_logs.reference | 12 ++++++------ .../queries/0_stateless/02456_async_inserts_logs.sh | 2 +- 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/Interpreters/AsynchronousInsertLog.cpp b/src/Interpreters/AsynchronousInsertLog.cpp index 916ec8f3d56..eeccd9ad92e 100644 --- a/src/Interpreters/AsynchronousInsertLog.cpp +++ b/src/Interpreters/AsynchronousInsertLog.cpp @@ -36,6 +36,7 @@ NamesAndTypesList AsynchronousInsertLogElement::getNamesAndTypes() {"format", std::make_shared(std::make_shared())}, {"query_id", std::make_shared()}, {"bytes", std::make_shared()}, + {"rows", std::make_shared()}, {"exception", std::make_shared()}, {"status", type_status}, @@ -71,6 +72,7 @@ void AsynchronousInsertLogElement::appendToBlock(MutableColumns & columns) const columns[i++]->insert(insert_query.format); columns[i++]->insert(query_id); columns[i++]->insert(bytes); + columns[i++]->insert(rows); columns[i++]->insert(exception); columns[i++]->insert(status); diff --git a/src/Interpreters/AsynchronousInsertLog.h b/src/Interpreters/AsynchronousInsertLog.h index e2fdd4c90a0..f1e39db1ded 100644 --- a/src/Interpreters/AsynchronousInsertLog.h +++ b/src/Interpreters/AsynchronousInsertLog.h @@ -24,6 +24,7 @@ struct AsynchronousInsertLogElement ASTPtr query; String query_id; UInt64 bytes{}; + UInt64 rows{}; String exception; Status status{}; diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index b8de0246ae2..76f956341fd 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -444,7 +444,8 @@ try { auto buffer = std::make_unique(entry->bytes); current_entry = entry; - total_rows += executor.execute(*buffer); + size_t num_rows = executor.execute(*buffer); + total_rows += num_rows; chunk_info->offsets.push_back(total_rows); /// Keep buffer, because it still can be used @@ -459,6 +460,7 @@ try elem.query = key.query; elem.query_id = entry->query_id; elem.bytes = entry->bytes.size(); + elem.rows = num_rows; elem.exception = current_exception; current_exception.clear(); diff --git a/tests/queries/0_stateless/02456_async_inserts_logs.reference b/tests/queries/0_stateless/02456_async_inserts_logs.reference index efd8a88eca4..79f7ea458a7 100644 --- a/tests/queries/0_stateless/02456_async_inserts_logs.reference +++ b/tests/queries/0_stateless/02456_async_inserts_logs.reference @@ -1,7 +1,7 @@ 5 - Values 21 1 Ok 1 -t_async_inserts_logs JSONEachRow 39 1 Ok 1 -t_async_inserts_logs Values 8 1 Ok 1 -t_async_inserts_logs JSONEachRow 6 0 ParsingError 1 -t_async_inserts_logs Values 6 0 ParsingError 1 -t_async_inserts_logs Values 8 0 FlushError 1 + Values 21 2 1 Ok 1 +t_async_inserts_logs JSONEachRow 39 2 1 Ok 1 +t_async_inserts_logs Values 8 1 1 Ok 1 +t_async_inserts_logs JSONEachRow 6 0 0 ParsingError 1 +t_async_inserts_logs Values 6 0 0 ParsingError 1 +t_async_inserts_logs Values 8 1 0 FlushError 1 diff --git a/tests/queries/0_stateless/02456_async_inserts_logs.sh b/tests/queries/0_stateless/02456_async_inserts_logs.sh index 006455e2d42..2e4db67c069 100755 --- a/tests/queries/0_stateless/02456_async_inserts_logs.sh +++ b/tests/queries/0_stateless/02456_async_inserts_logs.sh @@ -30,7 +30,7 @@ ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM t_async_inserts_logs" ${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS" ${CLICKHOUSE_CLIENT} -q " - SELECT table, format, bytes, empty(exception), status, + SELECT table, format, bytes, rows, empty(exception), status, status = 'ParsingError' ? flush_time_microseconds = 0 : flush_time_microseconds > event_time_microseconds AS time_ok FROM system.asynchronous_insert_log WHERE database = '$CLICKHOUSE_DATABASE' OR query ILIKE 'INSERT INTO FUNCTION%$CLICKHOUSE_DATABASE%' From cc5acfbe62a52c2b651794db9c8173e0d8be0ea5 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 25 Apr 2023 01:37:37 +0000 Subject: [PATCH 2/2] add profile event --- src/Common/ProfileEvents.cpp | 1 + src/Interpreters/AsynchronousInsertQueue.cpp | 2 ++ tests/queries/0_stateless/02456_async_inserts_logs.reference | 3 +++ tests/queries/0_stateless/02456_async_inserts_logs.sh | 5 +++++ 4 files changed, 11 insertions(+) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index da096085d5b..166db25e14c 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -10,6 +10,7 @@ M(InsertQuery, "Same as Query, but only for INSERT queries.") \ M(AsyncInsertQuery, "Same as InsertQuery, but only for asynchronous INSERT queries.") \ M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.") \ + M(AsyncInsertRows, "Number of rows inserted by asynchronous INSERT queries.") \ M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.") \ M(FailedQuery, "Number of failed queries.") \ M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.") \ diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 76f956341fd..88233f58a54 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -40,6 +40,7 @@ namespace ProfileEvents { extern const Event AsyncInsertQuery; extern const Event AsyncInsertBytes; + extern const Event AsyncInsertRows; extern const Event FailedAsyncInsertQuery; } @@ -481,6 +482,7 @@ try format->addBuffer(std::move(last_buffer)); auto insert_query_id = insert_context->getCurrentQueryId(); + ProfileEvents::increment(ProfileEvents::AsyncInsertRows, total_rows); auto finish_entries = [&] { diff --git a/tests/queries/0_stateless/02456_async_inserts_logs.reference b/tests/queries/0_stateless/02456_async_inserts_logs.reference index 79f7ea458a7..ba1b19fb184 100644 --- a/tests/queries/0_stateless/02456_async_inserts_logs.reference +++ b/tests/queries/0_stateless/02456_async_inserts_logs.reference @@ -5,3 +5,6 @@ t_async_inserts_logs Values 8 1 1 Ok 1 t_async_inserts_logs JSONEachRow 6 0 0 ParsingError 1 t_async_inserts_logs Values 6 0 0 ParsingError 1 t_async_inserts_logs Values 8 1 0 FlushError 1 +AsyncInsertBytes 1 +AsyncInsertQuery 1 +AsyncInsertRows 1 diff --git a/tests/queries/0_stateless/02456_async_inserts_logs.sh b/tests/queries/0_stateless/02456_async_inserts_logs.sh index 2e4db67c069..43cd73d7231 100755 --- a/tests/queries/0_stateless/02456_async_inserts_logs.sh +++ b/tests/queries/0_stateless/02456_async_inserts_logs.sh @@ -37,3 +37,8 @@ ${CLICKHOUSE_CLIENT} -q " ORDER BY table, status, format" ${CLICKHOUSE_CLIENT} -q "DROP TABLE t_async_inserts_logs" + +${CLICKHOUSE_CLIENT} -q " +SELECT event, value > 0 FROM system.events +WHERE event IN ('AsyncInsertQuery', 'AsyncInsertBytes', 'AsyncInsertRows') +ORDER BY event"