Add initial_query_start_time to query log

This commit is contained in:
Amos Bird 2021-05-20 21:21:42 +08:00
parent 58005a30a8
commit 7d9a1106b8
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
9 changed files with 66 additions and 6 deletions

View File

@ -83,10 +83,12 @@
#define DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO 54443 #define DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO 54443
#define DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO 54447 #define DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO 54447
/// Version of ClickHouse TCP protocol. Increment it manually when you change the protocol.
#define DBMS_TCP_PROTOCOL_VERSION 54448
#define DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH 54448 #define DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH 54448
/// Version of ClickHouse TCP protocol. Increment it manually when you change the protocol.
#define DBMS_TCP_PROTOCOL_VERSION 54449
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449
/// The boundary on which the blocks for asynchronous file operations should be aligned. /// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096 #define DEFAULT_AIO_FILE_BLOCK_SIZE 4096

View File

@ -34,6 +34,12 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
writeBinary(initial_query_id, out); writeBinary(initial_query_id, out);
writeBinary(initial_address.toString(), out); writeBinary(initial_address.toString(), out);
if (server_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME)
{
writeBinary(initial_query_start_time, out);
writeBinary(initial_query_start_time_microseconds, out);
}
writeBinary(UInt8(interface), out); writeBinary(UInt8(interface), out);
if (interface == Interface::TCP) if (interface == Interface::TCP)
@ -109,6 +115,12 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
readBinary(initial_address_string, in); readBinary(initial_address_string, in);
initial_address = Poco::Net::SocketAddress(initial_address_string); initial_address = Poco::Net::SocketAddress(initial_address_string);
if (client_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME)
{
readBinary(initial_query_start_time, in);
readBinary(initial_query_start_time_microseconds, in);
}
UInt8 read_interface = 0; UInt8 read_interface = 0;
readBinary(read_interface, in); readBinary(read_interface, in);
interface = Interface(read_interface); interface = Interface(read_interface);

View File

@ -61,6 +61,8 @@ public:
String initial_user; String initial_user;
String initial_query_id; String initial_query_id;
Poco::Net::SocketAddress initial_address; Poco::Net::SocketAddress initial_address;
time_t initial_query_start_time{};
Decimal64 initial_query_start_time_microseconds{};
// OpenTelemetry trace context we received from client, or which we are going // OpenTelemetry trace context we received from client, or which we are going
// to send to server. // to send to server.

View File

@ -78,6 +78,8 @@ Block QueryLogElement::createBlock()
{std::make_shared<DataTypeString>(), "initial_query_id"}, {std::make_shared<DataTypeString>(), "initial_query_id"},
{DataTypeFactory::instance().get("IPv6"), "initial_address"}, {DataTypeFactory::instance().get("IPv6"), "initial_address"},
{std::make_shared<DataTypeUInt16>(), "initial_port"}, {std::make_shared<DataTypeUInt16>(), "initial_port"},
{std::make_shared<DataTypeDateTime>(), "initial_query_start_time"},
{std::make_shared<DataTypeDateTime64>(6), "initial_query_start_time_microseconds"},
{std::make_shared<DataTypeUInt8>(), "interface"}, {std::make_shared<DataTypeUInt8>(), "interface"},
{std::make_shared<DataTypeString>(), "os_user"}, {std::make_shared<DataTypeString>(), "os_user"},
{std::make_shared<DataTypeString>(), "client_hostname"}, {std::make_shared<DataTypeString>(), "client_hostname"},
@ -256,6 +258,8 @@ void QueryLogElement::appendClientInfo(const ClientInfo & client_info, MutableCo
columns[i++]->insert(client_info.initial_query_id); columns[i++]->insert(client_info.initial_query_id);
columns[i++]->insertData(IPv6ToBinary(client_info.initial_address.host()).data(), 16); columns[i++]->insertData(IPv6ToBinary(client_info.initial_address.host()).data(), 16);
columns[i++]->insert(client_info.initial_address.port()); columns[i++]->insert(client_info.initial_address.port());
columns[i++]->insert(client_info.initial_query_start_time);
columns[i++]->insert(client_info.initial_query_start_time_microseconds);
columns[i++]->insert(UInt64(client_info.interface)); columns[i++]->insert(UInt64(client_info.interface));

View File

@ -51,6 +51,8 @@ Block QueryThreadLogElement::createBlock()
{std::make_shared<DataTypeString>(), "initial_query_id"}, {std::make_shared<DataTypeString>(), "initial_query_id"},
{DataTypeFactory::instance().get("IPv6"), "initial_address"}, {DataTypeFactory::instance().get("IPv6"), "initial_address"},
{std::make_shared<DataTypeUInt16>(), "initial_port"}, {std::make_shared<DataTypeUInt16>(), "initial_port"},
{std::make_shared<DataTypeDateTime>(), "initial_query_start_time"},
{std::make_shared<DataTypeDateTime64>(6), "initial_query_start_time_microseconds"},
{std::make_shared<DataTypeUInt8>(), "interface"}, {std::make_shared<DataTypeUInt8>(), "interface"},
{std::make_shared<DataTypeString>(), "os_user"}, {std::make_shared<DataTypeString>(), "os_user"},
{std::make_shared<DataTypeString>(), "client_hostname"}, {std::make_shared<DataTypeString>(), "client_hostname"},

View File

@ -355,6 +355,15 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{ {
const auto current_time = std::chrono::system_clock::now(); const auto current_time = std::chrono::system_clock::now();
auto & client_info = context->getClientInfo();
// If it's an initial query, set to current_time
if (!internal && client_info.initial_query_start_time == 0)
{
client_info.initial_query_start_time = time_in_seconds(current_time);
client_info.initial_query_start_time_microseconds = time_in_microseconds(current_time);
}
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
assert(internal || CurrentThread::get().getQueryContext()); assert(internal || CurrentThread::get().getQueryContext());
assert(internal || CurrentThread::get().getQueryContext()->getCurrentQueryId() == CurrentThread::getQueryId()); assert(internal || CurrentThread::get().getQueryContext()->getCurrentQueryId() == CurrentThread::getQueryId());
@ -643,7 +652,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.query = query_for_logging; elem.query = query_for_logging;
elem.normalized_query_hash = normalizedQueryHash<false>(query_for_logging); elem.normalized_query_hash = normalizedQueryHash<false>(query_for_logging);
elem.client_info = context->getClientInfo(); elem.client_info = client_info;
bool log_queries = settings.log_queries && !internal; bool log_queries = settings.log_queries && !internal;

View File

@ -1135,8 +1135,9 @@ void TCPHandler::receiveQuery()
/// Per query settings are also passed via TCP. /// Per query settings are also passed via TCP.
/// We need to check them before applying due to they can violate the settings constraints. /// We need to check them before applying due to they can violate the settings constraints.
auto settings_format = (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsWriteFormat::STRINGS_WITH_FLAGS auto settings_format = (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS)
: SettingsWriteFormat::BINARY; ? SettingsWriteFormat::STRINGS_WITH_FLAGS
: SettingsWriteFormat::BINARY;
Settings passed_settings; Settings passed_settings;
passed_settings.read(*in, settings_format); passed_settings.read(*in, settings_format);

View File

@ -0,0 +1 @@
1 1

View File

@ -0,0 +1,27 @@
#!/usr/bin/env bash
set -ue
# this test doesn't need 'current_database = currentDatabase()',
unset CLICKHOUSE_LOG_COMMENT
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "drop table if exists m"
${CLICKHOUSE_CLIENT} -q "create table m (dummy UInt8) ENGINE = Distributed('test_cluster_two_shards', 'system', 'one')"
query_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(reverse(reinterpretAsString(generateUUIDv4()))))")
${CLICKHOUSE_CLIENT} -q "select * from m format Null" "--query_id=$query_id"
${CLICKHOUSE_CLIENT} -n -q "
system flush logs;
select
anyIf(initial_query_start_time, is_initial_query) = anyIf(initial_query_start_time, not is_initial_query),
anyIf(initial_query_start_time_microseconds, is_initial_query) = anyIf(initial_query_start_time_microseconds, not is_initial_query)
from system.query_log
where initial_query_id = '$query_id' and type = 'QueryFinish';
"
${CLICKHOUSE_CLIENT} -q "drop table m"