Send profile events for INSERT queries (previously only SELECT was supported)

Reproducer:

    echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1

However, clickhouse-local is differnt, it does sent the periodically,
but only if query was long enough, i.e.:

    # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment)
    [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment)

v2: Proper support ProfileEvents in INSERTs (with protocol change)
v3: Receive profile events on INSERT queries
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-05-20 18:19:40 +03:00
parent 3b29db6e9f
commit 4baa7690ae
10 changed files with 72 additions and 28 deletions

View File

@ -1275,7 +1275,7 @@ try
}
/// Check if server send Log packet
receiveLogs(parsed_query);
receiveLogsAndProfileEvents(parsed_query);
/// Check if server send Exception packet
auto packet_type = connection->checkPacket(0);
@ -1328,11 +1328,11 @@ void ClientBase::sendDataFromStdin(Block & sample, const ColumnsDescription & co
/// Process Log packets, used when inserting data by blocks
void ClientBase::receiveLogs(ASTPtr parsed_query)
void ClientBase::receiveLogsAndProfileEvents(ASTPtr parsed_query)
{
auto packet_type = connection->checkPacket(0);
while (packet_type && *packet_type == Protocol::Server::Log)
while (packet_type && (*packet_type == Protocol::Server::Log || *packet_type == Protocol::Server::ProfileEvents))
{
receiveAndProcessPacket(parsed_query, false);
packet_type = connection->checkPacket(0);

View File

@ -117,7 +117,7 @@ protected:
private:
void receiveResult(ASTPtr parsed_query);
bool receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled_);
void receiveLogs(ASTPtr parsed_query);
void receiveLogsAndProfileEvents(ASTPtr parsed_query);
bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query);
bool receiveEndOfQuery();
void cancelQuery();

View File

@ -18,6 +18,7 @@ namespace ErrorCodes
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int UNKNOWN_EXCEPTION;
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool send_profile_events_, const String & server_display_name_)
@ -62,9 +63,13 @@ void LocalConnection::updateProgress(const Progress & value)
state->progress.incrementPiecewiseAtomically(value);
}
void LocalConnection::getProfileEvents(Block & block)
void LocalConnection::sendProfileEvents()
{
ProfileEvents::getProfileEvents(server_display_name, state->profile_queue, block, last_sent_snapshots);
Block profile_block;
state->after_send_profile_events.restart();
next_packet_type = Protocol::Server::ProfileEvents;
ProfileEvents::getProfileEvents(server_display_name, state->profile_queue, profile_block, last_sent_snapshots);
state->block.emplace(std::move(profile_block));
}
void LocalConnection::sendQuery(
@ -192,13 +197,14 @@ void LocalConnection::sendData(const Block & block, const String &, bool)
return;
if (state->pushing_async_executor)
{
state->pushing_async_executor->push(block);
}
else if (state->pushing_executor)
{
state->pushing_executor->push(block);
}
else
throw Exception("Unknown executor", ErrorCodes::LOGICAL_ERROR);
if (send_profile_events)
sendProfileEvents();
}
void LocalConnection::sendCancel()
@ -264,11 +270,7 @@ bool LocalConnection::poll(size_t)
if (send_profile_events && (state->after_send_profile_events.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
{
Block block;
state->after_send_profile_events.restart();
next_packet_type = Protocol::Server::ProfileEvents;
getProfileEvents(block);
state->block.emplace(std::move(block));
sendProfileEvents();
return true;
}
@ -349,11 +351,7 @@ bool LocalConnection::poll(size_t)
if (send_profile_events && state->executor)
{
Block block;
state->after_send_profile_events.restart();
next_packet_type = Protocol::Server::ProfileEvents;
getProfileEvents(block);
state->block.emplace(std::move(block));
sendProfileEvents();
return true;
}
}

View File

@ -142,7 +142,7 @@ private:
void updateProgress(const Progress & value);
void getProfileEvents(Block & block);
void sendProfileEvents();
bool pollImpl();

View File

@ -52,6 +52,8 @@
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
#define DBMS_TCP_PROTOCOL_VERSION 54455
#define DBMS_TCP_PROTOCOL_VERSION 54456
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449
#define DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT 54456

View File

@ -72,6 +72,10 @@ RemoteInserter::RemoteInserter(
if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
log_queue->pushBlock(std::move(packet.block));
}
else if (Protocol::Server::ProfileEvents == packet.type)
{
// Do nothing
}
else if (Protocol::Server::TableColumns == packet.type)
{
/// Server could attach ColumnsDescription in front of stream for column defaults. There's no need to pass it through cause
@ -132,6 +136,10 @@ void RemoteInserter::onFinish()
{
// Do nothing
}
else if (Protocol::Server::ProfileEvents == packet.type)
{
// Do nothing
}
else
throw NetException(
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,

View File

@ -357,7 +357,7 @@ void TCPHandler::runImpl()
return true;
sendProgress();
sendProfileEvents();
sendSelectProfileEvents();
sendLogs();
return false;
@ -586,7 +586,10 @@ bool TCPHandler::readDataNext()
}
if (read_ok)
{
sendLogs();
sendInsertProfileEvents();
}
else
state.read_all_data = true;
@ -659,6 +662,8 @@ void TCPHandler::processInsertQuery()
PushingPipelineExecutor executor(state.io.pipeline);
run_executor(executor);
}
sendInsertProfileEvents();
}
@ -701,7 +706,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
/// Some time passed and there is a progress.
after_send_progress.restart();
sendProgress();
sendProfileEvents();
sendSelectProfileEvents();
}
sendLogs();
@ -727,7 +732,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
sendProfileInfo(executor.getProfileInfo());
sendProgress();
sendLogs();
sendProfileEvents();
sendSelectProfileEvents();
}
if (state.is_connection_closed)
@ -861,9 +866,6 @@ void TCPHandler::sendExtremes(const Block & extremes)
void TCPHandler::sendProfileEvents()
{
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
return;
Block block;
ProfileEvents::getProfileEvents(server_display_name, state.profile_queue, block, last_sent_snapshots);
if (block.rows() != 0)
@ -878,6 +880,21 @@ void TCPHandler::sendProfileEvents()
}
}
void TCPHandler::sendSelectProfileEvents()
{
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
return;
sendProfileEvents();
}
void TCPHandler::sendInsertProfileEvents()
{
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT)
return;
sendProfileEvents();
}
bool TCPHandler::receiveProxyHeader()
{

View File

@ -251,6 +251,8 @@ private:
void sendTotals(const Block & totals);
void sendExtremes(const Block & extremes);
void sendProfileEvents();
void sendSelectProfileEvents();
void sendInsertProfileEvents();
/// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled.
void initBlockInput();

View File

@ -0,0 +1,4 @@
client
InsertedRows: 1 (increment)
local
InsertedRows: 1 (increment)

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
echo client
$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -q "insert into function null('foo Int') values (1)" |& grep -o 'InsertedRows: .*'
echo local
$CLICKHOUSE_LOCAL --print-profile-events --profile-events-delay-ms=-1 -q "insert into function null('foo Int') values (1)" |& grep -o 'InsertedRows: .*'
exit 0