Remove non-const function Context::getClientInfo().

This commit is contained in:
Vitaly Baranov 2023-07-07 12:49:50 +02:00
parent 0e4b75a282
commit 815a3857de
25 changed files with 335 additions and 97 deletions

View File

@ -1173,12 +1173,12 @@ void Client::processOptions(const OptionsDescription & options_description,
{
String traceparent = options["opentelemetry-traceparent"].as<std::string>();
String error;
if (!global_context->getClientInfo().client_trace_context.parseTraceparentHeader(traceparent, error))
if (!global_context->getClientTraceContext().parseTraceparentHeader(traceparent, error))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse OpenTelemetry traceparent '{}': {}", traceparent, error);
}
if (options.count("opentelemetry-tracestate"))
global_context->getClientInfo().client_trace_context.tracestate = options["opentelemetry-tracestate"].as<std::string>();
global_context->getClientTraceContext().tracestate = options["opentelemetry-tracestate"].as<std::string>();
}
@ -1238,10 +1238,9 @@ void Client::processConfig()
global_context->getSettingsRef().max_insert_block_size);
}
ClientInfo & client_info = global_context->getClientInfo();
client_info.setInitialQuery();
client_info.quota_key = config().getString("quota_key", "");
client_info.query_kind = query_kind;
global_context->setQueryKindInitial();
global_context->setQuotaClientKey(config().getString("quota_key", ""));
global_context->setQueryKind(query_kind);
}

View File

@ -737,9 +737,8 @@ void LocalServer::processConfig()
for (const auto & [key, value] : prompt_substitutions)
boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value);
ClientInfo & client_info = global_context->getClientInfo();
client_info.setInitialQuery();
client_info.query_kind = query_kind;
global_context->setQueryKindInitial();
global_context->setQueryKind(query_kind);
}

View File

@ -814,8 +814,8 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
{
auto query_context = Context::createCopy(getContext());
query_context->makeQueryContext();
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->getClientInfo().is_replicated_database_internal = true;
query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
query_context->setQueryKindReplicatedDatabaseInternal();
query_context->setCurrentDatabase(getDatabaseName());
query_context->setCurrentQueryId("");
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(current_zookeeper, zookeeper_path, false, "");

View File

@ -59,7 +59,7 @@ static ContextMutablePtr createQueryContext(ContextPtr context)
query_context->setSettings(new_query_settings);
query_context->setInternalQuery(true);
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
query_context->setCurrentQueryId(""); // generate random query_id
return query_context;
}

View File

@ -421,12 +421,10 @@ try
auto insert_query_id = insert_context->getCurrentQueryId();
auto query_start_time = std::chrono::system_clock::now();
Stopwatch start_watch{CLOCK_MONOTONIC};
ClientInfo & client_info = insert_context->getClientInfo();
client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
client_info.initial_query_start_time = timeInSeconds(query_start_time);
client_info.initial_query_start_time_microseconds = timeInMicroseconds(query_start_time);
client_info.current_query_id = insert_query_id;
client_info.initial_query_id = insert_query_id;
insert_context->setQueryKind(ClientInfo::QueryKind::INITIAL_QUERY);
insert_context->setInitialQueryStartTime(query_start_time);
insert_context->setCurrentQueryId(insert_query_id);
insert_context->setInitialQueryId(insert_query_id);
size_t log_queries_cut_to_length = insert_context->getSettingsRef().log_queries_cut_to_length;
String query_for_logging = insert_query.hasSecretParts()
? insert_query.formatForLogging(log_queries_cut_to_length)

View File

@ -171,7 +171,7 @@ void executeQuery(
SelectStreamFactory::Shards remote_shards;
auto new_context = updateSettingsForCluster(*query_info.getCluster(), context, settings, main_table, &query_info, log);
new_context->getClientInfo().distributed_depth += 1;
new_context->increaseDistributedDepth();
size_t shards = query_info.getCluster()->getShardCount();
for (const auto & shard_info : query_info.getCluster()->getShardsInfo())

View File

@ -3850,6 +3850,129 @@ void Context::resetInputCallbacks()
}
void Context::setClientInfo(const ClientInfo & client_info_)
{
client_info = client_info_;
need_recalculate_access = true;
}
void Context::setClientName(const String & client_name)
{
client_info.client_name = client_name;
}
void Context::setClientInterface(ClientInfo::Interface interface)
{
client_info.interface = interface;
need_recalculate_access = true;
}
void Context::setClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version)
{
client_info.client_version_major = client_version_major;
client_info.client_version_minor = client_version_minor;
client_info.client_version_patch = client_version_patch;
client_info.client_tcp_protocol_version = client_tcp_protocol_version;
}
void Context::setClientConnectionId(uint32_t connection_id_)
{
client_info.connection_id = connection_id_;
}
void Context::setHttpClientInfo(ClientInfo::HTTPMethod http_method, const String & http_user_agent, const String & http_referer)
{
client_info.http_method = http_method;
client_info.http_user_agent = http_user_agent;
client_info.http_referer = http_referer;
need_recalculate_access = true;
}
void Context::setForwardedFor(const String & forwarded_for)
{
client_info.forwarded_for = forwarded_for;
need_recalculate_access = true;
}
void Context::setQueryKind(ClientInfo::QueryKind query_kind)
{
client_info.query_kind = query_kind;
}
void Context::setQueryKindInitial()
{
/// TODO: Try to combine this function with setQueryKind().
client_info.setInitialQuery();
}
void Context::setQueryKindReplicatedDatabaseInternal()
{
/// TODO: Try to combine this function with setQueryKind().
client_info.is_replicated_database_internal = true;
}
void Context::setCurrentUserName(const String & current_user_name)
{
/// TODO: Try to combine this function with setUser().
client_info.current_user = current_user_name;
need_recalculate_access = true;
}
void Context::setCurrentAddress(const Poco::Net::SocketAddress & current_address)
{
client_info.current_address = current_address;
need_recalculate_access = true;
}
void Context::setInitialUserName(const String & initial_user_name)
{
client_info.initial_user = initial_user_name;
need_recalculate_access = true;
}
void Context::setInitialAddress(const Poco::Net::SocketAddress & initial_address)
{
client_info.initial_address = initial_address;
}
void Context::setInitialQueryId(const String & initial_query_id)
{
client_info.initial_query_id = initial_query_id;
}
void Context::setInitialQueryStartTime(std::chrono::time_point<std::chrono::system_clock> initial_query_start_time)
{
client_info.initial_query_start_time = timeInSeconds(initial_query_start_time);
client_info.initial_query_start_time_microseconds = timeInMicroseconds(initial_query_start_time);
}
void Context::setQuotaClientKey(const String & quota_key_)
{
client_info.quota_key = quota_key_;
need_recalculate_access = true;
}
void Context::setConnectionClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version)
{
client_info.connection_client_version_major = client_version_major;
client_info.connection_client_version_minor = client_version_minor;
client_info.connection_client_version_patch = client_version_patch;
client_info.connection_tcp_protocol_version = client_tcp_protocol_version;
}
void Context::setReplicaInfo(bool collaborate_with_initiator, size_t all_replicas_count, size_t number_of_current_replica)
{
client_info.collaborate_with_initiator = collaborate_with_initiator;
client_info.count_participating_replicas = all_replicas_count;
client_info.number_of_current_replica = number_of_current_replica;
}
void Context::increaseDistributedDepth()
{
++client_info.distributed_depth;
}
StorageID Context::resolveStorageID(StorageID storage_id, StorageNamespace where) const
{
if (storage_id.uuid != UUIDHelpers::Nil)

View File

@ -593,9 +593,33 @@ public:
InputBlocksReader getInputBlocksReaderCallback() const;
void resetInputCallbacks();
ClientInfo & getClientInfo() { return client_info; }
/// Returns information about the client executing a query.
const ClientInfo & getClientInfo() const { return client_info; }
/// Modify stored in the context information about the client executing a query.
void setClientInfo(const ClientInfo & client_info_);
void setClientName(const String & client_name);
void setClientInterface(ClientInfo::Interface interface);
void setClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version);
void setClientConnectionId(uint32_t connection_id);
void setHttpClientInfo(ClientInfo::HTTPMethod http_method, const String & http_user_agent, const String & http_referer);
void setForwardedFor(const String & forwarded_for);
void setQueryKind(ClientInfo::QueryKind query_kind);
void setQueryKindInitial();
void setQueryKindReplicatedDatabaseInternal();
void setCurrentUserName(const String & current_user_name);
void setCurrentAddress(const Poco::Net::SocketAddress & current_address);
void setInitialUserName(const String & initial_user_name);
void setInitialAddress(const Poco::Net::SocketAddress & initial_address);
void setInitialQueryId(const String & initial_query_id);
void setInitialQueryStartTime(std::chrono::time_point<std::chrono::system_clock> initial_query_start_time);
void setQuotaClientKey(const String & quota_key);
void setConnectionClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version);
void setReplicaInfo(bool collaborate_with_initiator, size_t all_replicas_count, size_t number_of_current_replica);
void increaseDistributedDepth();
const OpenTelemetry::TracingContext & getClientTraceContext() const { return client_info.client_trace_context; }
OpenTelemetry::TracingContext & getClientTraceContext() { return client_info.client_trace_context; }
enum StorageNamespace
{
ResolveGlobal = 1u, /// Database name must be specified

View File

@ -199,7 +199,7 @@ ContextMutablePtr DDLTaskBase::makeQueryContext(ContextPtr from_context, const Z
auto query_context = Context::createCopy(from_context);
query_context->makeQueryContext();
query_context->setCurrentQueryId(""); // generate random query_id
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
if (entry.settings)
query_context->applySettingsChanges(*entry.settings);
return query_context;
@ -439,8 +439,8 @@ void DatabaseReplicatedTask::parseQueryFromEntry(ContextPtr context)
ContextMutablePtr DatabaseReplicatedTask::makeQueryContext(ContextPtr from_context, const ZooKeeperPtr & zookeeper)
{
auto query_context = DDLTaskBase::makeQueryContext(from_context, zookeeper);
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->getClientInfo().is_replicated_database_internal = true;
query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
query_context->setQueryKindReplicatedDatabaseInternal();
query_context->setCurrentDatabase(database->getDatabaseName());
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(zookeeper, database->zookeeper_path, is_initial_query, entry_path);

View File

@ -476,7 +476,7 @@ bool DDLWorker::tryExecuteQuery(DDLTaskBase & task, const ZooKeeperPtr & zookeep
query_context->setSetting("implicit_transaction", Field{0});
}
query_context->getClientInfo().initial_query_id = task.entry.initial_query_id;
query_context->setInitialQueryId(task.entry.initial_query_id);
if (!task.is_initial_query)
query_scope.emplace(query_context);

View File

@ -451,11 +451,11 @@ void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr
auto drop_context = Context::createCopy(global_context);
if (ignore_sync_setting)
drop_context->setSetting("database_atomic_wait_for_drop_and_detach_synchronously", false);
drop_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
drop_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
if (auto txn = current_context->getZooKeeperMetadataTransaction())
{
/// For Replicated database
drop_context->getClientInfo().is_replicated_database_internal = true;
drop_context->setQueryKindReplicatedDatabaseInternal();
drop_context->setQueryContext(std::const_pointer_cast<Context>(current_context));
drop_context->initZooKeeperMetadataTransaction(txn, true);
}

View File

@ -3183,7 +3183,7 @@ void InterpreterSelectQuery::initSettings()
if (query.settings())
InterpreterSetQuery(query.settings(), context).executeForCurrentContext(options.ignore_setting_constraints);
auto & client_info = context->getClientInfo();
const auto & client_info = context->getClientInfo();
auto min_major = DBMS_MIN_MAJOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD;
auto min_minor = DBMS_MIN_MINOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD;

View File

@ -299,7 +299,10 @@ Session::~Session()
if (notified_session_log_about_login)
{
if (auto session_log = getSessionLog())
{
/// TODO: We have to ensure that the same info is added to the session log on a LoginSuccess event and on the corresponding Logout event.
session_log->addLogOut(auth_id, user, getClientInfo());
}
}
}
@ -368,17 +371,117 @@ void Session::onAuthenticationFailure(const std::optional<String> & user_name, c
}
}
ClientInfo & Session::getClientInfo()
{
/// FIXME it may produce different info for LoginSuccess and the corresponding Logout entries in the session log
return session_context ? session_context->getClientInfo() : *prepared_client_info;
}
const ClientInfo & Session::getClientInfo() const
{
return session_context ? session_context->getClientInfo() : *prepared_client_info;
}
void Session::setClientInfo(const ClientInfo & client_info)
{
if (session_context)
session_context->setClientInfo(client_info);
else
prepared_client_info = client_info;
}
void Session::setClientName(const String & client_name)
{
if (session_context)
session_context->setClientName(client_name);
else
prepared_client_info->client_name = client_name;
}
void Session::setClientInterface(ClientInfo::Interface interface)
{
if (session_context)
session_context->setClientInterface(interface);
else
prepared_client_info->interface = interface;
}
void Session::setClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version)
{
if (session_context)
{
session_context->setClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version);
}
else
{
prepared_client_info->client_version_major = client_version_major;
prepared_client_info->client_version_minor = client_version_minor;
prepared_client_info->client_version_patch = client_version_patch;
prepared_client_info->client_tcp_protocol_version = client_tcp_protocol_version;
}
}
void Session::setClientConnectionId(uint32_t connection_id)
{
if (session_context)
session_context->setClientConnectionId(connection_id);
else
prepared_client_info->connection_id = connection_id;
}
void Session::setHttpClientInfo(ClientInfo::HTTPMethod http_method, const String & http_user_agent, const String & http_referer)
{
if (session_context)
{
session_context->setHttpClientInfo(http_method, http_user_agent, http_referer);
}
else
{
prepared_client_info->http_method = http_method;
prepared_client_info->http_user_agent = http_user_agent;
prepared_client_info->http_referer = http_referer;
}
}
void Session::setForwardedFor(const String & forwarded_for)
{
if (session_context)
session_context->setForwardedFor(forwarded_for);
else
prepared_client_info->forwarded_for = forwarded_for;
}
void Session::setQuotaClientKey(const String & quota_key)
{
if (session_context)
session_context->setQuotaClientKey(quota_key);
else
prepared_client_info->quota_key = quota_key;
}
void Session::setConnectionClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version)
{
if (session_context)
{
session_context->setConnectionClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version);
}
else
{
prepared_client_info->connection_client_version_major = client_version_major;
prepared_client_info->connection_client_version_minor = client_version_minor;
prepared_client_info->connection_client_version_patch = client_version_patch;
prepared_client_info->connection_tcp_protocol_version = client_tcp_protocol_version;
}
}
const OpenTelemetry::TracingContext & Session::getClientTraceContext() const
{
if (session_context)
return session_context->getClientTraceContext();
return prepared_client_info->client_trace_context;
}
OpenTelemetry::TracingContext & Session::getClientTraceContext()
{
if (session_context)
return session_context->getClientTraceContext();
return prepared_client_info->client_trace_context;
}
ContextMutablePtr Session::makeSessionContext()
{
if (session_context)
@ -396,8 +499,7 @@ ContextMutablePtr Session::makeSessionContext()
new_session_context->makeSessionContext();
/// Copy prepared client info to the new session context.
auto & res_client_info = new_session_context->getClientInfo();
res_client_info = std::move(prepared_client_info).value();
new_session_context->setClientInfo(*prepared_client_info);
prepared_client_info.reset();
/// Set user information for the new context: current profiles, roles, access rights.
@ -436,8 +538,7 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
/// Copy prepared client info to the session context, no matter it's been just created or not.
/// If we continue using a previously created session context found by session ID
/// it's necessary to replace the client info in it anyway, because it contains actual connection information (client address, etc.)
auto & res_client_info = new_session_context->getClientInfo();
res_client_info = std::move(prepared_client_info).value();
new_session_context->setClientInfo(*prepared_client_info);
prepared_client_info.reset();
/// Set user information for the new context: current profiles, roles, access rights.
@ -492,27 +593,26 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t
}
/// Copy the specified client info to the new query context.
auto & res_client_info = query_context->getClientInfo();
if (client_info_to_move)
res_client_info = std::move(*client_info_to_move);
query_context->setClientInfo(*client_info_to_move);
else if (client_info_to_copy && (client_info_to_copy != &getClientInfo()))
res_client_info = *client_info_to_copy;
query_context->setClientInfo(*client_info_to_copy);
/// Copy current user's name and address if it was authenticated after query_client_info was initialized.
if (prepared_client_info && !prepared_client_info->current_user.empty())
{
res_client_info.current_user = prepared_client_info->current_user;
res_client_info.current_address = prepared_client_info->current_address;
query_context->setCurrentUserName(prepared_client_info->current_user);
query_context->setCurrentAddress(prepared_client_info->current_address);
}
/// Set parameters of initial query.
if (res_client_info.query_kind == ClientInfo::QueryKind::NO_QUERY)
res_client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
if (query_context->getClientInfo().query_kind == ClientInfo::QueryKind::NO_QUERY)
query_context->setQueryKind(ClientInfo::QueryKind::INITIAL_QUERY);
if (res_client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
if (query_context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
res_client_info.initial_user = res_client_info.current_user;
res_client_info.initial_address = res_client_info.current_address;
query_context->setInitialUserName(query_context->getClientInfo().current_user);
query_context->setInitialAddress(query_context->getClientInfo().current_address);
}
/// Set user information for the new context: current profiles, roles, access rights.
@ -563,4 +663,3 @@ void Session::closeSession(const String & session_id)
}
}

View File

@ -54,10 +54,23 @@ public:
/// Writes a row about login failure into session log (if enabled)
void onAuthenticationFailure(const std::optional<String> & user_name, const Poco::Net::SocketAddress & address_, const Exception & e);
/// Returns a reference to session ClientInfo.
ClientInfo & getClientInfo();
/// Returns a reference to the session's ClientInfo.
const ClientInfo & getClientInfo() const;
/// Modify the session's ClientInfo.
void setClientInfo(const ClientInfo & client_info);
void setClientName(const String & client_name);
void setClientInterface(ClientInfo::Interface interface);
void setClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version);
void setClientConnectionId(uint32_t connection_id);
void setHttpClientInfo(ClientInfo::HTTPMethod http_method, const String & http_user_agent, const String & http_referer);
void setForwardedFor(const String & forwarded_for);
void setQuotaClientKey(const String & quota_key);
void setConnectionClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version);
const OpenTelemetry::TracingContext & getClientTraceContext() const;
OpenTelemetry::TracingContext & getClientTraceContext();
/// Makes a session context, can be used one or zero times.
/// The function also assigns an user to this context.
ContextMutablePtr makeSessionContext();

View File

@ -655,7 +655,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// the value passed by the client
Stopwatch start_watch{CLOCK_MONOTONIC};
auto & client_info = context->getClientInfo();
const auto & client_info = context->getClientInfo();
if (!internal)
{
@ -667,8 +667,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
// On the other hand, if it's initialized then take it as the start of the query
if (client_info.initial_query_start_time == 0)
{
client_info.initial_query_start_time = timeInSeconds(query_start_time);
client_info.initial_query_start_time_microseconds = timeInMicroseconds(query_start_time);
context->setInitialQueryStartTime(query_start_time);
}
else
{

View File

@ -72,14 +72,10 @@ std::unique_ptr<QueryPlan> createLocalPlan(
if (coordinator)
{
new_context->parallel_reading_coordinator = coordinator;
new_context->getClientInfo().interface = ClientInfo::Interface::LOCAL;
new_context->getClientInfo().collaborate_with_initiator = true;
new_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
new_context->getClientInfo().count_participating_replicas = replica_count;
new_context->getClientInfo().number_of_current_replica = replica_num;
new_context->getClientInfo().connection_client_version_major = DBMS_VERSION_MAJOR;
new_context->getClientInfo().connection_client_version_minor = DBMS_VERSION_MINOR;
new_context->getClientInfo().connection_tcp_protocol_version = DBMS_TCP_PROTOCOL_VERSION;
new_context->setClientInterface(ClientInfo::Interface::LOCAL);
new_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
new_context->setReplicaInfo(true, replica_count, replica_num);
new_context->setConnectionClientVersion(DBMS_VERSION_MAJOR, DBMS_VERSION_MINOR, DBMS_VERSION_PATCH, DBMS_TCP_PROTOCOL_VERSION);
new_context->setParallelReplicasGroupUUID(group_uuid);
new_context->setMergeTreeAllRangesCallback([coordinator](InitialAllRangesAnnouncement announcement)
{

View File

@ -798,7 +798,7 @@ namespace
/// Authentication.
session.emplace(iserver.context(), ClientInfo::Interface::GRPC);
session->authenticate(user, password, user_address);
session->getClientInfo().quota_key = quota_key;
session->setQuotaClientKey(quota_key);
ClientInfo client_info = session->getClientInfo();

View File

@ -474,7 +474,6 @@ bool HTTPHandler::authenticateUser(
}
/// Set client info. It will be used for quota accounting parameters in 'setUser' method.
ClientInfo & client_info = session->getClientInfo();
ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN;
if (request.getMethod() == HTTPServerRequest::HTTP_GET)
@ -482,15 +481,13 @@ bool HTTPHandler::authenticateUser(
else if (request.getMethod() == HTTPServerRequest::HTTP_POST)
http_method = ClientInfo::HTTPMethod::POST;
client_info.http_method = http_method;
client_info.http_user_agent = request.get("User-Agent", "");
client_info.http_referer = request.get("Referer", "");
client_info.forwarded_for = request.get("X-Forwarded-For", "");
client_info.quota_key = quota_key;
session->setHttpClientInfo(http_method, request.get("User-Agent", ""), request.get("Referer", ""));
session->setForwardedFor(request.get("X-Forwarded-For", ""));
session->setQuotaClientKey(quota_key);
/// Extract the last entry from comma separated list of forwarded_for addresses.
/// Only the last proxy can be trusted (if any).
String forwarded_address = client_info.getLastForwardedFor();
String forwarded_address = session->getClientInfo().getLastForwardedFor();
try
{
if (!forwarded_address.empty() && server.config().getBool("auth_use_forwarded_address", false))
@ -988,22 +985,22 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
}
// Parse the OpenTelemetry traceparent header.
ClientInfo& client_info = session->getClientInfo();
auto & client_trace_context = session->getClientTraceContext();
if (request.has("traceparent"))
{
std::string opentelemetry_traceparent = request.get("traceparent");
std::string error;
if (!client_info.client_trace_context.parseTraceparentHeader(opentelemetry_traceparent, error))
if (!client_trace_context.parseTraceparentHeader(opentelemetry_traceparent, error))
{
LOG_DEBUG(log, "Failed to parse OpenTelemetry traceparent header '{}': {}", opentelemetry_traceparent, error);
}
client_info.client_trace_context.tracestate = request.get("tracestate", "");
client_trace_context.tracestate = request.get("tracestate", "");
}
// Setup tracing context for this thread
auto context = session->sessionOrGlobalContext();
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>("HTTPHandler",
client_info.client_trace_context,
client_trace_context,
context->getSettingsRef(),
context->getOpenTelemetrySpanLog());
thread_trace_context->root_span.kind = OpenTelemetry::SERVER;

View File

@ -94,7 +94,7 @@ void MySQLHandler::run()
session = std::make_unique<Session>(server.context(), ClientInfo::Interface::MYSQL);
SCOPE_EXIT({ session.reset(); });
session->getClientInfo().connection_id = connection_id;
session->setClientConnectionId(connection_id);
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
out = std::make_shared<WriteBufferFromPocoSocket>(socket());

View File

@ -58,7 +58,7 @@ void PostgreSQLHandler::run()
session = std::make_unique<Session>(server.context(), ClientInfo::Interface::POSTGRESQL);
SCOPE_EXIT({ session.reset(); });
session->getClientInfo().connection_id = connection_id;
session->setClientConnectionId(connection_id);
try
{

View File

@ -1177,21 +1177,12 @@ std::unique_ptr<Session> TCPHandler::makeSession()
auto res = std::make_unique<Session>(server.context(), interface, socket().secure(), certificate);
auto & client_info = res->getClientInfo();
client_info.forwarded_for = forwarded_for;
client_info.client_name = client_name;
client_info.client_version_major = client_version_major;
client_info.client_version_minor = client_version_minor;
client_info.client_version_patch = client_version_patch;
client_info.client_tcp_protocol_version = client_tcp_protocol_version;
client_info.connection_client_version_major = client_version_major;
client_info.connection_client_version_minor = client_version_minor;
client_info.connection_client_version_patch = client_version_patch;
client_info.connection_tcp_protocol_version = client_tcp_protocol_version;
client_info.quota_key = quota_key;
client_info.interface = interface;
res->setForwardedFor(forwarded_for);
res->setClientName(client_name);
res->setClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version);
res->setConnectionClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version);
res->setQuotaClientKey(quota_key);
res->setClientInterface(interface);
return res;
}
@ -1253,7 +1244,7 @@ void TCPHandler::receiveHello()
}
session = makeSession();
auto & client_info = session->getClientInfo();
const auto & client_info = session->getClientInfo();
#if USE_SSL
/// Authentication with SSL user certificate
@ -1286,7 +1277,7 @@ void TCPHandler::receiveAddendum()
{
readStringBinary(quota_key, *in);
if (!is_interserver_mode)
session->getClientInfo().quota_key = quota_key;
session->setQuotaClientKey(quota_key);
}
}

View File

@ -132,7 +132,7 @@ DistributedSink::DistributedSink(
const auto & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception(ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH, "Maximum distributed depth exceeded");
context->getClientInfo().distributed_depth += 1;
context->increaseDistributedDepth();
random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
}

View File

@ -914,7 +914,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistribu
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
query_context->increaseDistributedDepth();
for (size_t shard_index : collections::range(0, shards_info.size()))
{
@ -976,7 +976,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
query_context->increaseDistributedDepth();
/// Here we take addresses from destination cluster and assume source table exists on these nodes
for (const auto & replicas : getCluster()->getShardsAddresses())

View File

@ -5079,7 +5079,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
query_context->increaseDistributedDepth();
for (const auto & replicas : src_cluster->getShardsAddresses())
{

View File

@ -992,7 +992,7 @@ void StorageWindowView::cleanup()
auto cleanup_context = Context::createCopy(getContext());
cleanup_context->makeQueryContext();
cleanup_context->setCurrentQueryId("");
cleanup_context->getClientInfo().is_replicated_database_internal = true;
cleanup_context->setQueryKindReplicatedDatabaseInternal();
InterpreterAlterQuery interpreter_alter(alter_query, cleanup_context);
interpreter_alter.execute();