diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index f791c39bad1..e1a33231592 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -1173,12 +1173,12 @@ void Client::processOptions(const OptionsDescription & options_description, { String traceparent = options["opentelemetry-traceparent"].as(); String error; - if (!global_context->getClientInfo().client_trace_context.parseTraceparentHeader(traceparent, error)) + if (!global_context->getClientTraceContext().parseTraceparentHeader(traceparent, error)) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse OpenTelemetry traceparent '{}': {}", traceparent, error); } if (options.count("opentelemetry-tracestate")) - global_context->getClientInfo().client_trace_context.tracestate = options["opentelemetry-tracestate"].as(); + global_context->getClientTraceContext().tracestate = options["opentelemetry-tracestate"].as(); } @@ -1238,10 +1238,9 @@ void Client::processConfig() global_context->getSettingsRef().max_insert_block_size); } - ClientInfo & client_info = global_context->getClientInfo(); - client_info.setInitialQuery(); - client_info.quota_key = config().getString("quota_key", ""); - client_info.query_kind = query_kind; + global_context->setQueryKindInitial(); + global_context->setQuotaClientKey(config().getString("quota_key", "")); + global_context->setQueryKind(query_kind); } diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 96924e3c8d9..3c2a8ae3152 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -737,9 +737,8 @@ void LocalServer::processConfig() for (const auto & [key, value] : prompt_substitutions) boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value); - ClientInfo & client_info = global_context->getClientInfo(); - client_info.setInitialQuery(); - client_info.query_kind = query_kind; + global_context->setQueryKindInitial(); + global_context->setQueryKind(query_kind); } diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 661afc6bf1f..25c23e2be17 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -814,8 +814,8 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep { auto query_context = Context::createCopy(getContext()); query_context->makeQueryContext(); - query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - query_context->getClientInfo().is_replicated_database_internal = true; + query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY); + query_context->setQueryKindReplicatedDatabaseInternal(); query_context->setCurrentDatabase(getDatabaseName()); query_context->setCurrentQueryId(""); auto txn = std::make_shared(current_zookeeper, zookeeper_path, false, ""); diff --git a/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp index a01ab2a15a8..379e6ef5097 100644 --- a/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp @@ -59,7 +59,7 @@ static ContextMutablePtr createQueryContext(ContextPtr context) query_context->setSettings(new_query_settings); query_context->setInternalQuery(true); - query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY); query_context->setCurrentQueryId(""); // generate random query_id return query_context; } diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 0da762699d2..6081919a120 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -421,12 +421,10 @@ try auto insert_query_id = insert_context->getCurrentQueryId(); auto query_start_time = std::chrono::system_clock::now(); Stopwatch start_watch{CLOCK_MONOTONIC}; - ClientInfo & client_info = insert_context->getClientInfo(); - client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY; - client_info.initial_query_start_time = timeInSeconds(query_start_time); - client_info.initial_query_start_time_microseconds = timeInMicroseconds(query_start_time); - client_info.current_query_id = insert_query_id; - client_info.initial_query_id = insert_query_id; + insert_context->setQueryKind(ClientInfo::QueryKind::INITIAL_QUERY); + insert_context->setInitialQueryStartTime(query_start_time); + insert_context->setCurrentQueryId(insert_query_id); + insert_context->setInitialQueryId(insert_query_id); size_t log_queries_cut_to_length = insert_context->getSettingsRef().log_queries_cut_to_length; String query_for_logging = insert_query.hasSecretParts() ? insert_query.formatForLogging(log_queries_cut_to_length) diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index e2f1dfe8ba7..3dea52faf46 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -171,7 +171,7 @@ void executeQuery( SelectStreamFactory::Shards remote_shards; auto new_context = updateSettingsForCluster(*query_info.getCluster(), context, settings, main_table, &query_info, log); - new_context->getClientInfo().distributed_depth += 1; + new_context->increaseDistributedDepth(); size_t shards = query_info.getCluster()->getShardCount(); for (const auto & shard_info : query_info.getCluster()->getShardsInfo()) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 5fae9374705..c097eeb87f1 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -3850,6 +3850,129 @@ void Context::resetInputCallbacks() } +void Context::setClientInfo(const ClientInfo & client_info_) +{ + client_info = client_info_; + need_recalculate_access = true; +} + +void Context::setClientName(const String & client_name) +{ + client_info.client_name = client_name; +} + +void Context::setClientInterface(ClientInfo::Interface interface) +{ + client_info.interface = interface; + need_recalculate_access = true; +} + +void Context::setClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version) +{ + client_info.client_version_major = client_version_major; + client_info.client_version_minor = client_version_minor; + client_info.client_version_patch = client_version_patch; + client_info.client_tcp_protocol_version = client_tcp_protocol_version; +} + +void Context::setClientConnectionId(uint32_t connection_id_) +{ + client_info.connection_id = connection_id_; +} + +void Context::setHttpClientInfo(ClientInfo::HTTPMethod http_method, const String & http_user_agent, const String & http_referer) +{ + client_info.http_method = http_method; + client_info.http_user_agent = http_user_agent; + client_info.http_referer = http_referer; + need_recalculate_access = true; +} + +void Context::setForwardedFor(const String & forwarded_for) +{ + client_info.forwarded_for = forwarded_for; + need_recalculate_access = true; +} + +void Context::setQueryKind(ClientInfo::QueryKind query_kind) +{ + client_info.query_kind = query_kind; +} + +void Context::setQueryKindInitial() +{ + /// TODO: Try to combine this function with setQueryKind(). + client_info.setInitialQuery(); +} + +void Context::setQueryKindReplicatedDatabaseInternal() +{ + /// TODO: Try to combine this function with setQueryKind(). + client_info.is_replicated_database_internal = true; +} + +void Context::setCurrentUserName(const String & current_user_name) +{ + /// TODO: Try to combine this function with setUser(). + client_info.current_user = current_user_name; + need_recalculate_access = true; +} + +void Context::setCurrentAddress(const Poco::Net::SocketAddress & current_address) +{ + client_info.current_address = current_address; + need_recalculate_access = true; +} + +void Context::setInitialUserName(const String & initial_user_name) +{ + client_info.initial_user = initial_user_name; + need_recalculate_access = true; +} + +void Context::setInitialAddress(const Poco::Net::SocketAddress & initial_address) +{ + client_info.initial_address = initial_address; +} + +void Context::setInitialQueryId(const String & initial_query_id) +{ + client_info.initial_query_id = initial_query_id; +} + +void Context::setInitialQueryStartTime(std::chrono::time_point initial_query_start_time) +{ + client_info.initial_query_start_time = timeInSeconds(initial_query_start_time); + client_info.initial_query_start_time_microseconds = timeInMicroseconds(initial_query_start_time); +} + +void Context::setQuotaClientKey(const String & quota_key_) +{ + client_info.quota_key = quota_key_; + need_recalculate_access = true; +} + +void Context::setConnectionClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version) +{ + client_info.connection_client_version_major = client_version_major; + client_info.connection_client_version_minor = client_version_minor; + client_info.connection_client_version_patch = client_version_patch; + client_info.connection_tcp_protocol_version = client_tcp_protocol_version; +} + +void Context::setReplicaInfo(bool collaborate_with_initiator, size_t all_replicas_count, size_t number_of_current_replica) +{ + client_info.collaborate_with_initiator = collaborate_with_initiator; + client_info.count_participating_replicas = all_replicas_count; + client_info.number_of_current_replica = number_of_current_replica; +} + +void Context::increaseDistributedDepth() +{ + ++client_info.distributed_depth; +} + + StorageID Context::resolveStorageID(StorageID storage_id, StorageNamespace where) const { if (storage_id.uuid != UUIDHelpers::Nil) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 172f3818dfd..afc4bfde6a8 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -593,9 +593,33 @@ public: InputBlocksReader getInputBlocksReaderCallback() const; void resetInputCallbacks(); - ClientInfo & getClientInfo() { return client_info; } + /// Returns information about the client executing a query. const ClientInfo & getClientInfo() const { return client_info; } + /// Modify stored in the context information about the client executing a query. + void setClientInfo(const ClientInfo & client_info_); + void setClientName(const String & client_name); + void setClientInterface(ClientInfo::Interface interface); + void setClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version); + void setClientConnectionId(uint32_t connection_id); + void setHttpClientInfo(ClientInfo::HTTPMethod http_method, const String & http_user_agent, const String & http_referer); + void setForwardedFor(const String & forwarded_for); + void setQueryKind(ClientInfo::QueryKind query_kind); + void setQueryKindInitial(); + void setQueryKindReplicatedDatabaseInternal(); + void setCurrentUserName(const String & current_user_name); + void setCurrentAddress(const Poco::Net::SocketAddress & current_address); + void setInitialUserName(const String & initial_user_name); + void setInitialAddress(const Poco::Net::SocketAddress & initial_address); + void setInitialQueryId(const String & initial_query_id); + void setInitialQueryStartTime(std::chrono::time_point initial_query_start_time); + void setQuotaClientKey(const String & quota_key); + void setConnectionClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version); + void setReplicaInfo(bool collaborate_with_initiator, size_t all_replicas_count, size_t number_of_current_replica); + void increaseDistributedDepth(); + const OpenTelemetry::TracingContext & getClientTraceContext() const { return client_info.client_trace_context; } + OpenTelemetry::TracingContext & getClientTraceContext() { return client_info.client_trace_context; } + enum StorageNamespace { ResolveGlobal = 1u, /// Database name must be specified diff --git a/src/Interpreters/DDLTask.cpp b/src/Interpreters/DDLTask.cpp index b24856a6146..4e684f5899f 100644 --- a/src/Interpreters/DDLTask.cpp +++ b/src/Interpreters/DDLTask.cpp @@ -199,7 +199,7 @@ ContextMutablePtr DDLTaskBase::makeQueryContext(ContextPtr from_context, const Z auto query_context = Context::createCopy(from_context); query_context->makeQueryContext(); query_context->setCurrentQueryId(""); // generate random query_id - query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY); if (entry.settings) query_context->applySettingsChanges(*entry.settings); return query_context; @@ -439,8 +439,8 @@ void DatabaseReplicatedTask::parseQueryFromEntry(ContextPtr context) ContextMutablePtr DatabaseReplicatedTask::makeQueryContext(ContextPtr from_context, const ZooKeeperPtr & zookeeper) { auto query_context = DDLTaskBase::makeQueryContext(from_context, zookeeper); - query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - query_context->getClientInfo().is_replicated_database_internal = true; + query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY); + query_context->setQueryKindReplicatedDatabaseInternal(); query_context->setCurrentDatabase(database->getDatabaseName()); auto txn = std::make_shared(zookeeper, database->zookeeper_path, is_initial_query, entry_path); diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 81c78000ac3..193bb5b6ab0 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -476,7 +476,7 @@ bool DDLWorker::tryExecuteQuery(DDLTaskBase & task, const ZooKeeperPtr & zookeep query_context->setSetting("implicit_transaction", Field{0}); } - query_context->getClientInfo().initial_query_id = task.entry.initial_query_id; + query_context->setInitialQueryId(task.entry.initial_query_id); if (!task.is_initial_query) query_scope.emplace(query_context); diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index 0beb4492aef..616cf80a446 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -451,11 +451,11 @@ void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr auto drop_context = Context::createCopy(global_context); if (ignore_sync_setting) drop_context->setSetting("database_atomic_wait_for_drop_and_detach_synchronously", false); - drop_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + drop_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY); if (auto txn = current_context->getZooKeeperMetadataTransaction()) { /// For Replicated database - drop_context->getClientInfo().is_replicated_database_internal = true; + drop_context->setQueryKindReplicatedDatabaseInternal(); drop_context->setQueryContext(std::const_pointer_cast(current_context)); drop_context->initZooKeeperMetadataTransaction(txn, true); } diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 32812151b59..d07a6521544 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -3183,7 +3183,7 @@ void InterpreterSelectQuery::initSettings() if (query.settings()) InterpreterSetQuery(query.settings(), context).executeForCurrentContext(options.ignore_setting_constraints); - auto & client_info = context->getClientInfo(); + const auto & client_info = context->getClientInfo(); auto min_major = DBMS_MIN_MAJOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD; auto min_minor = DBMS_MIN_MINOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD; diff --git a/src/Interpreters/Session.cpp b/src/Interpreters/Session.cpp index 8571f20b91e..97b056cfc32 100644 --- a/src/Interpreters/Session.cpp +++ b/src/Interpreters/Session.cpp @@ -299,7 +299,10 @@ Session::~Session() if (notified_session_log_about_login) { if (auto session_log = getSessionLog()) + { + /// TODO: We have to ensure that the same info is added to the session log on a LoginSuccess event and on the corresponding Logout event. session_log->addLogOut(auth_id, user, getClientInfo()); + } } } @@ -368,17 +371,117 @@ void Session::onAuthenticationFailure(const std::optional & user_name, c } } -ClientInfo & Session::getClientInfo() -{ - /// FIXME it may produce different info for LoginSuccess and the corresponding Logout entries in the session log - return session_context ? session_context->getClientInfo() : *prepared_client_info; -} - const ClientInfo & Session::getClientInfo() const { return session_context ? session_context->getClientInfo() : *prepared_client_info; } +void Session::setClientInfo(const ClientInfo & client_info) +{ + if (session_context) + session_context->setClientInfo(client_info); + else + prepared_client_info = client_info; +} + +void Session::setClientName(const String & client_name) +{ + if (session_context) + session_context->setClientName(client_name); + else + prepared_client_info->client_name = client_name; +} + +void Session::setClientInterface(ClientInfo::Interface interface) +{ + if (session_context) + session_context->setClientInterface(interface); + else + prepared_client_info->interface = interface; +} + +void Session::setClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version) +{ + if (session_context) + { + session_context->setClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version); + } + else + { + prepared_client_info->client_version_major = client_version_major; + prepared_client_info->client_version_minor = client_version_minor; + prepared_client_info->client_version_patch = client_version_patch; + prepared_client_info->client_tcp_protocol_version = client_tcp_protocol_version; + } +} + +void Session::setClientConnectionId(uint32_t connection_id) +{ + if (session_context) + session_context->setClientConnectionId(connection_id); + else + prepared_client_info->connection_id = connection_id; +} + +void Session::setHttpClientInfo(ClientInfo::HTTPMethod http_method, const String & http_user_agent, const String & http_referer) +{ + if (session_context) + { + session_context->setHttpClientInfo(http_method, http_user_agent, http_referer); + } + else + { + prepared_client_info->http_method = http_method; + prepared_client_info->http_user_agent = http_user_agent; + prepared_client_info->http_referer = http_referer; + } +} + +void Session::setForwardedFor(const String & forwarded_for) +{ + if (session_context) + session_context->setForwardedFor(forwarded_for); + else + prepared_client_info->forwarded_for = forwarded_for; +} + +void Session::setQuotaClientKey(const String & quota_key) +{ + if (session_context) + session_context->setQuotaClientKey(quota_key); + else + prepared_client_info->quota_key = quota_key; +} + +void Session::setConnectionClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version) +{ + if (session_context) + { + session_context->setConnectionClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version); + } + else + { + prepared_client_info->connection_client_version_major = client_version_major; + prepared_client_info->connection_client_version_minor = client_version_minor; + prepared_client_info->connection_client_version_patch = client_version_patch; + prepared_client_info->connection_tcp_protocol_version = client_tcp_protocol_version; + } +} + +const OpenTelemetry::TracingContext & Session::getClientTraceContext() const +{ + if (session_context) + return session_context->getClientTraceContext(); + return prepared_client_info->client_trace_context; +} + +OpenTelemetry::TracingContext & Session::getClientTraceContext() +{ + if (session_context) + return session_context->getClientTraceContext(); + return prepared_client_info->client_trace_context; +} + ContextMutablePtr Session::makeSessionContext() { if (session_context) @@ -396,8 +499,7 @@ ContextMutablePtr Session::makeSessionContext() new_session_context->makeSessionContext(); /// Copy prepared client info to the new session context. - auto & res_client_info = new_session_context->getClientInfo(); - res_client_info = std::move(prepared_client_info).value(); + new_session_context->setClientInfo(*prepared_client_info); prepared_client_info.reset(); /// Set user information for the new context: current profiles, roles, access rights. @@ -436,8 +538,7 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std: /// Copy prepared client info to the session context, no matter it's been just created or not. /// If we continue using a previously created session context found by session ID /// it's necessary to replace the client info in it anyway, because it contains actual connection information (client address, etc.) - auto & res_client_info = new_session_context->getClientInfo(); - res_client_info = std::move(prepared_client_info).value(); + new_session_context->setClientInfo(*prepared_client_info); prepared_client_info.reset(); /// Set user information for the new context: current profiles, roles, access rights. @@ -492,27 +593,26 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t } /// Copy the specified client info to the new query context. - auto & res_client_info = query_context->getClientInfo(); if (client_info_to_move) - res_client_info = std::move(*client_info_to_move); + query_context->setClientInfo(*client_info_to_move); else if (client_info_to_copy && (client_info_to_copy != &getClientInfo())) - res_client_info = *client_info_to_copy; + query_context->setClientInfo(*client_info_to_copy); /// Copy current user's name and address if it was authenticated after query_client_info was initialized. if (prepared_client_info && !prepared_client_info->current_user.empty()) { - res_client_info.current_user = prepared_client_info->current_user; - res_client_info.current_address = prepared_client_info->current_address; + query_context->setCurrentUserName(prepared_client_info->current_user); + query_context->setCurrentAddress(prepared_client_info->current_address); } /// Set parameters of initial query. - if (res_client_info.query_kind == ClientInfo::QueryKind::NO_QUERY) - res_client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY; + if (query_context->getClientInfo().query_kind == ClientInfo::QueryKind::NO_QUERY) + query_context->setQueryKind(ClientInfo::QueryKind::INITIAL_QUERY); - if (res_client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY) + if (query_context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) { - res_client_info.initial_user = res_client_info.current_user; - res_client_info.initial_address = res_client_info.current_address; + query_context->setInitialUserName(query_context->getClientInfo().current_user); + query_context->setInitialAddress(query_context->getClientInfo().current_address); } /// Set user information for the new context: current profiles, roles, access rights. @@ -563,4 +663,3 @@ void Session::closeSession(const String & session_id) } } - diff --git a/src/Interpreters/Session.h b/src/Interpreters/Session.h index d7c06a60464..36f811ccd24 100644 --- a/src/Interpreters/Session.h +++ b/src/Interpreters/Session.h @@ -54,10 +54,23 @@ public: /// Writes a row about login failure into session log (if enabled) void onAuthenticationFailure(const std::optional & user_name, const Poco::Net::SocketAddress & address_, const Exception & e); - /// Returns a reference to session ClientInfo. - ClientInfo & getClientInfo(); + /// Returns a reference to the session's ClientInfo. const ClientInfo & getClientInfo() const; + /// Modify the session's ClientInfo. + void setClientInfo(const ClientInfo & client_info); + void setClientName(const String & client_name); + void setClientInterface(ClientInfo::Interface interface); + void setClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version); + void setClientConnectionId(uint32_t connection_id); + void setHttpClientInfo(ClientInfo::HTTPMethod http_method, const String & http_user_agent, const String & http_referer); + void setForwardedFor(const String & forwarded_for); + void setQuotaClientKey(const String & quota_key); + void setConnectionClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version); + + const OpenTelemetry::TracingContext & getClientTraceContext() const; + OpenTelemetry::TracingContext & getClientTraceContext(); + /// Makes a session context, can be used one or zero times. /// The function also assigns an user to this context. ContextMutablePtr makeSessionContext(); diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 4b76d20f31d..2c74039463e 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -655,7 +655,7 @@ static std::tuple executeQueryImpl( /// the value passed by the client Stopwatch start_watch{CLOCK_MONOTONIC}; - auto & client_info = context->getClientInfo(); + const auto & client_info = context->getClientInfo(); if (!internal) { @@ -667,8 +667,7 @@ static std::tuple executeQueryImpl( // On the other hand, if it's initialized then take it as the start of the query if (client_info.initial_query_start_time == 0) { - client_info.initial_query_start_time = timeInSeconds(query_start_time); - client_info.initial_query_start_time_microseconds = timeInMicroseconds(query_start_time); + context->setInitialQueryStartTime(query_start_time); } else { diff --git a/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp b/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp index 9b9cc221ca8..b251eec2d28 100644 --- a/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp +++ b/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp @@ -72,14 +72,10 @@ std::unique_ptr createLocalPlan( if (coordinator) { new_context->parallel_reading_coordinator = coordinator; - new_context->getClientInfo().interface = ClientInfo::Interface::LOCAL; - new_context->getClientInfo().collaborate_with_initiator = true; - new_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - new_context->getClientInfo().count_participating_replicas = replica_count; - new_context->getClientInfo().number_of_current_replica = replica_num; - new_context->getClientInfo().connection_client_version_major = DBMS_VERSION_MAJOR; - new_context->getClientInfo().connection_client_version_minor = DBMS_VERSION_MINOR; - new_context->getClientInfo().connection_tcp_protocol_version = DBMS_TCP_PROTOCOL_VERSION; + new_context->setClientInterface(ClientInfo::Interface::LOCAL); + new_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY); + new_context->setReplicaInfo(true, replica_count, replica_num); + new_context->setConnectionClientVersion(DBMS_VERSION_MAJOR, DBMS_VERSION_MINOR, DBMS_VERSION_PATCH, DBMS_TCP_PROTOCOL_VERSION); new_context->setParallelReplicasGroupUUID(group_uuid); new_context->setMergeTreeAllRangesCallback([coordinator](InitialAllRangesAnnouncement announcement) { diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp index bf9ba20a5cf..67d30012b0e 100644 --- a/src/Server/GRPCServer.cpp +++ b/src/Server/GRPCServer.cpp @@ -798,7 +798,7 @@ namespace /// Authentication. session.emplace(iserver.context(), ClientInfo::Interface::GRPC); session->authenticate(user, password, user_address); - session->getClientInfo().quota_key = quota_key; + session->setQuotaClientKey(quota_key); ClientInfo client_info = session->getClientInfo(); diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index f7cdb905710..069670c84a5 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -474,7 +474,6 @@ bool HTTPHandler::authenticateUser( } /// Set client info. It will be used for quota accounting parameters in 'setUser' method. - ClientInfo & client_info = session->getClientInfo(); ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN; if (request.getMethod() == HTTPServerRequest::HTTP_GET) @@ -482,15 +481,13 @@ bool HTTPHandler::authenticateUser( else if (request.getMethod() == HTTPServerRequest::HTTP_POST) http_method = ClientInfo::HTTPMethod::POST; - client_info.http_method = http_method; - client_info.http_user_agent = request.get("User-Agent", ""); - client_info.http_referer = request.get("Referer", ""); - client_info.forwarded_for = request.get("X-Forwarded-For", ""); - client_info.quota_key = quota_key; + session->setHttpClientInfo(http_method, request.get("User-Agent", ""), request.get("Referer", "")); + session->setForwardedFor(request.get("X-Forwarded-For", "")); + session->setQuotaClientKey(quota_key); /// Extract the last entry from comma separated list of forwarded_for addresses. /// Only the last proxy can be trusted (if any). - String forwarded_address = client_info.getLastForwardedFor(); + String forwarded_address = session->getClientInfo().getLastForwardedFor(); try { if (!forwarded_address.empty() && server.config().getBool("auth_use_forwarded_address", false)) @@ -988,22 +985,22 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse } // Parse the OpenTelemetry traceparent header. - ClientInfo& client_info = session->getClientInfo(); + auto & client_trace_context = session->getClientTraceContext(); if (request.has("traceparent")) { std::string opentelemetry_traceparent = request.get("traceparent"); std::string error; - if (!client_info.client_trace_context.parseTraceparentHeader(opentelemetry_traceparent, error)) + if (!client_trace_context.parseTraceparentHeader(opentelemetry_traceparent, error)) { LOG_DEBUG(log, "Failed to parse OpenTelemetry traceparent header '{}': {}", opentelemetry_traceparent, error); } - client_info.client_trace_context.tracestate = request.get("tracestate", ""); + client_trace_context.tracestate = request.get("tracestate", ""); } // Setup tracing context for this thread auto context = session->sessionOrGlobalContext(); thread_trace_context = std::make_unique("HTTPHandler", - client_info.client_trace_context, + client_trace_context, context->getSettingsRef(), context->getOpenTelemetrySpanLog()); thread_trace_context->root_span.kind = OpenTelemetry::SERVER; diff --git a/src/Server/MySQLHandler.cpp b/src/Server/MySQLHandler.cpp index 7318b0ad89b..f98b86e6cf8 100644 --- a/src/Server/MySQLHandler.cpp +++ b/src/Server/MySQLHandler.cpp @@ -94,7 +94,7 @@ void MySQLHandler::run() session = std::make_unique(server.context(), ClientInfo::Interface::MYSQL); SCOPE_EXIT({ session.reset(); }); - session->getClientInfo().connection_id = connection_id; + session->setClientConnectionId(connection_id); in = std::make_shared(socket()); out = std::make_shared(socket()); diff --git a/src/Server/PostgreSQLHandler.cpp b/src/Server/PostgreSQLHandler.cpp index 36b05932979..7b078154252 100644 --- a/src/Server/PostgreSQLHandler.cpp +++ b/src/Server/PostgreSQLHandler.cpp @@ -58,7 +58,7 @@ void PostgreSQLHandler::run() session = std::make_unique(server.context(), ClientInfo::Interface::POSTGRESQL); SCOPE_EXIT({ session.reset(); }); - session->getClientInfo().connection_id = connection_id; + session->setClientConnectionId(connection_id); try { diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 36566832ebc..a747f06f1ce 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1177,21 +1177,12 @@ std::unique_ptr TCPHandler::makeSession() auto res = std::make_unique(server.context(), interface, socket().secure(), certificate); - auto & client_info = res->getClientInfo(); - client_info.forwarded_for = forwarded_for; - client_info.client_name = client_name; - client_info.client_version_major = client_version_major; - client_info.client_version_minor = client_version_minor; - client_info.client_version_patch = client_version_patch; - client_info.client_tcp_protocol_version = client_tcp_protocol_version; - - client_info.connection_client_version_major = client_version_major; - client_info.connection_client_version_minor = client_version_minor; - client_info.connection_client_version_patch = client_version_patch; - client_info.connection_tcp_protocol_version = client_tcp_protocol_version; - - client_info.quota_key = quota_key; - client_info.interface = interface; + res->setForwardedFor(forwarded_for); + res->setClientName(client_name); + res->setClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version); + res->setConnectionClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version); + res->setQuotaClientKey(quota_key); + res->setClientInterface(interface); return res; } @@ -1253,7 +1244,7 @@ void TCPHandler::receiveHello() } session = makeSession(); - auto & client_info = session->getClientInfo(); + const auto & client_info = session->getClientInfo(); #if USE_SSL /// Authentication with SSL user certificate @@ -1286,7 +1277,7 @@ void TCPHandler::receiveAddendum() { readStringBinary(quota_key, *in); if (!is_interserver_mode) - session->getClientInfo().quota_key = quota_key; + session->setQuotaClientKey(quota_key); } } diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index 875764f7633..0dcdae01ba9 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -132,7 +132,7 @@ DistributedSink::DistributedSink( const auto & settings = context->getSettingsRef(); if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth) throw Exception(ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH, "Maximum distributed depth exceeded"); - context->getClientInfo().distributed_depth += 1; + context->increaseDistributedDepth(); random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key; } diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index c46192ab43b..c028cf5ec77 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -914,7 +914,7 @@ std::optional StorageDistributed::distributedWriteBetweenDistribu QueryPipeline pipeline; ContextMutablePtr query_context = Context::createCopy(local_context); - ++query_context->getClientInfo().distributed_depth; + query_context->increaseDistributedDepth(); for (size_t shard_index : collections::range(0, shards_info.size())) { @@ -976,7 +976,7 @@ std::optional StorageDistributed::distributedWriteFromClusterStor QueryPipeline pipeline; ContextMutablePtr query_context = Context::createCopy(local_context); - ++query_context->getClientInfo().distributed_depth; + query_context->increaseDistributedDepth(); /// Here we take addresses from destination cluster and assume source table exists on these nodes for (const auto & replicas : getCluster()->getShardsAddresses()) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 6894368841f..066f5a42f46 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5079,7 +5079,7 @@ std::optional StorageReplicatedMergeTree::distributedWriteFromClu QueryPipeline pipeline; ContextMutablePtr query_context = Context::createCopy(local_context); - ++query_context->getClientInfo().distributed_depth; + query_context->increaseDistributedDepth(); for (const auto & replicas : src_cluster->getShardsAddresses()) { diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index 242e8e5d570..0f506040cd9 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -992,7 +992,7 @@ void StorageWindowView::cleanup() auto cleanup_context = Context::createCopy(getContext()); cleanup_context->makeQueryContext(); cleanup_context->setCurrentQueryId(""); - cleanup_context->getClientInfo().is_replicated_database_internal = true; + cleanup_context->setQueryKindReplicatedDatabaseInternal(); InterpreterAlterQuery interpreter_alter(alter_query, cleanup_context); interpreter_alter.execute();