From 4baa7690ae556926a288dc098f285716cf674fc8 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 20 May 2022 18:19:40 +0300 Subject: [PATCH] Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin --- src/Client/ClientBase.cpp | 6 ++-- src/Client/ClientBase.h | 2 +- src/Client/LocalConnection.cpp | 30 +++++++++---------- src/Client/LocalConnection.h | 2 +- src/Core/ProtocolDefines.h | 4 ++- src/QueryPipeline/RemoteInserter.cpp | 8 +++++ src/Server/TCPHandler.cpp | 29 ++++++++++++++---- src/Server/TCPHandler.h | 2 ++ .../02310_profile_events_insert.reference | 4 +++ .../02310_profile_events_insert.sh | 13 ++++++++ 10 files changed, 72 insertions(+), 28 deletions(-) create mode 100644 tests/queries/0_stateless/02310_profile_events_insert.reference create mode 100755 tests/queries/0_stateless/02310_profile_events_insert.sh diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 733c2d6b4df..72144c99f3c 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -1275,7 +1275,7 @@ try } /// Check if server send Log packet - receiveLogs(parsed_query); + receiveLogsAndProfileEvents(parsed_query); /// Check if server send Exception packet auto packet_type = connection->checkPacket(0); @@ -1328,11 +1328,11 @@ void ClientBase::sendDataFromStdin(Block & sample, const ColumnsDescription & co /// Process Log packets, used when inserting data by blocks -void ClientBase::receiveLogs(ASTPtr parsed_query) +void ClientBase::receiveLogsAndProfileEvents(ASTPtr parsed_query) { auto packet_type = connection->checkPacket(0); - while (packet_type && *packet_type == Protocol::Server::Log) + while (packet_type && (*packet_type == Protocol::Server::Log || *packet_type == Protocol::Server::ProfileEvents)) { receiveAndProcessPacket(parsed_query, false); packet_type = connection->checkPacket(0); diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index d11977e984a..d34fe282839 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -117,7 +117,7 @@ protected: private: void receiveResult(ASTPtr parsed_query); bool receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled_); - void receiveLogs(ASTPtr parsed_query); + void receiveLogsAndProfileEvents(ASTPtr parsed_query); bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query); bool receiveEndOfQuery(); void cancelQuery(); diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp index 0707b0bcdc0..425e54fb392 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -18,6 +18,7 @@ namespace ErrorCodes extern const int UNKNOWN_PACKET_FROM_SERVER; extern const int UNKNOWN_EXCEPTION; extern const int NOT_IMPLEMENTED; + extern const int LOGICAL_ERROR; } LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool send_profile_events_, const String & server_display_name_) @@ -62,9 +63,13 @@ void LocalConnection::updateProgress(const Progress & value) state->progress.incrementPiecewiseAtomically(value); } -void LocalConnection::getProfileEvents(Block & block) +void LocalConnection::sendProfileEvents() { - ProfileEvents::getProfileEvents(server_display_name, state->profile_queue, block, last_sent_snapshots); + Block profile_block; + state->after_send_profile_events.restart(); + next_packet_type = Protocol::Server::ProfileEvents; + ProfileEvents::getProfileEvents(server_display_name, state->profile_queue, profile_block, last_sent_snapshots); + state->block.emplace(std::move(profile_block)); } void LocalConnection::sendQuery( @@ -192,13 +197,14 @@ void LocalConnection::sendData(const Block & block, const String &, bool) return; if (state->pushing_async_executor) - { state->pushing_async_executor->push(block); - } else if (state->pushing_executor) - { state->pushing_executor->push(block); - } + else + throw Exception("Unknown executor", ErrorCodes::LOGICAL_ERROR); + + if (send_profile_events) + sendProfileEvents(); } void LocalConnection::sendCancel() @@ -264,11 +270,7 @@ bool LocalConnection::poll(size_t) if (send_profile_events && (state->after_send_profile_events.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay)) { - Block block; - state->after_send_profile_events.restart(); - next_packet_type = Protocol::Server::ProfileEvents; - getProfileEvents(block); - state->block.emplace(std::move(block)); + sendProfileEvents(); return true; } @@ -349,11 +351,7 @@ bool LocalConnection::poll(size_t) if (send_profile_events && state->executor) { - Block block; - state->after_send_profile_events.restart(); - next_packet_type = Protocol::Server::ProfileEvents; - getProfileEvents(block); - state->block.emplace(std::move(block)); + sendProfileEvents(); return true; } } diff --git a/src/Client/LocalConnection.h b/src/Client/LocalConnection.h index 1ad6ad73238..1ebe4a1d901 100644 --- a/src/Client/LocalConnection.h +++ b/src/Client/LocalConnection.h @@ -142,7 +142,7 @@ private: void updateProgress(const Progress & value); - void getProfileEvents(Block & block); + void sendProfileEvents(); bool pollImpl(); diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index 6ee491f3ab5..2df48a79776 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -52,6 +52,8 @@ /// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION, /// later is just a number for server version (one number instead of commit SHA) /// for simplicity (sometimes it may be more convenient in some use cases). -#define DBMS_TCP_PROTOCOL_VERSION 54455 +#define DBMS_TCP_PROTOCOL_VERSION 54456 #define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449 + +#define DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT 54456 diff --git a/src/QueryPipeline/RemoteInserter.cpp b/src/QueryPipeline/RemoteInserter.cpp index d5cef72b020..1e1c4537d2f 100644 --- a/src/QueryPipeline/RemoteInserter.cpp +++ b/src/QueryPipeline/RemoteInserter.cpp @@ -72,6 +72,10 @@ RemoteInserter::RemoteInserter( if (auto log_queue = CurrentThread::getInternalTextLogsQueue()) log_queue->pushBlock(std::move(packet.block)); } + else if (Protocol::Server::ProfileEvents == packet.type) + { + // Do nothing + } else if (Protocol::Server::TableColumns == packet.type) { /// Server could attach ColumnsDescription in front of stream for column defaults. There's no need to pass it through cause @@ -132,6 +136,10 @@ void RemoteInserter::onFinish() { // Do nothing } + else if (Protocol::Server::ProfileEvents == packet.type) + { + // Do nothing + } else throw NetException( ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER, diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index cc51901ac40..eff91ae2302 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -357,7 +357,7 @@ void TCPHandler::runImpl() return true; sendProgress(); - sendProfileEvents(); + sendSelectProfileEvents(); sendLogs(); return false; @@ -586,7 +586,10 @@ bool TCPHandler::readDataNext() } if (read_ok) + { sendLogs(); + sendInsertProfileEvents(); + } else state.read_all_data = true; @@ -659,6 +662,8 @@ void TCPHandler::processInsertQuery() PushingPipelineExecutor executor(state.io.pipeline); run_executor(executor); } + + sendInsertProfileEvents(); } @@ -701,7 +706,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors() /// Some time passed and there is a progress. after_send_progress.restart(); sendProgress(); - sendProfileEvents(); + sendSelectProfileEvents(); } sendLogs(); @@ -727,7 +732,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors() sendProfileInfo(executor.getProfileInfo()); sendProgress(); sendLogs(); - sendProfileEvents(); + sendSelectProfileEvents(); } if (state.is_connection_closed) @@ -861,9 +866,6 @@ void TCPHandler::sendExtremes(const Block & extremes) void TCPHandler::sendProfileEvents() { - if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS) - return; - Block block; ProfileEvents::getProfileEvents(server_display_name, state.profile_queue, block, last_sent_snapshots); if (block.rows() != 0) @@ -878,6 +880,21 @@ void TCPHandler::sendProfileEvents() } } +void TCPHandler::sendSelectProfileEvents() +{ + if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS) + return; + + sendProfileEvents(); +} + +void TCPHandler::sendInsertProfileEvents() +{ + if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT) + return; + + sendProfileEvents(); +} bool TCPHandler::receiveProxyHeader() { diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h index 4f2516e7923..a873f9ba75c 100644 --- a/src/Server/TCPHandler.h +++ b/src/Server/TCPHandler.h @@ -251,6 +251,8 @@ private: void sendTotals(const Block & totals); void sendExtremes(const Block & extremes); void sendProfileEvents(); + void sendSelectProfileEvents(); + void sendInsertProfileEvents(); /// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled. void initBlockInput(); diff --git a/tests/queries/0_stateless/02310_profile_events_insert.reference b/tests/queries/0_stateless/02310_profile_events_insert.reference new file mode 100644 index 00000000000..7308b2da5b1 --- /dev/null +++ b/tests/queries/0_stateless/02310_profile_events_insert.reference @@ -0,0 +1,4 @@ +client +InsertedRows: 1 (increment) +local +InsertedRows: 1 (increment) diff --git a/tests/queries/0_stateless/02310_profile_events_insert.sh b/tests/queries/0_stateless/02310_profile_events_insert.sh new file mode 100755 index 00000000000..e51297ea7c9 --- /dev/null +++ b/tests/queries/0_stateless/02310_profile_events_insert.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +echo client +$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -q "insert into function null('foo Int') values (1)" |& grep -o 'InsertedRows: .*' + +echo local +$CLICKHOUSE_LOCAL --print-profile-events --profile-events-delay-ms=-1 -q "insert into function null('foo Int') values (1)" |& grep -o 'InsertedRows: .*' + +exit 0