mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 07:01:59 +00:00
Merge pull request #70332 from zvonand/ldap-remote-roles
Passing external user roles from query originator to other nodes
This commit is contained in:
commit
07be02d297
@ -4215,3 +4215,9 @@ SELECT toFloat64('1.7091'), toFloat64('1.5008753E7') SETTINGS precise_float_pars
|
||||
│ 1.7091 │ 15008753 │
|
||||
└─────────────────────┴──────────────────────────┘
|
||||
```
|
||||
|
||||
## push_external_roles_in_interserver_queries
|
||||
|
||||
Позволяет передавать роли пользователя от инициатора запроса другим нодам при выполнении запроса.
|
||||
|
||||
Значение по умолчанию: `true`.
|
||||
|
@ -220,7 +220,7 @@ std::vector<String> Client::loadWarningMessages()
|
||||
"" /* query_id */,
|
||||
QueryProcessingStage::Complete,
|
||||
&client_context->getSettingsRef(),
|
||||
&client_context->getClientInfo(), false, {});
|
||||
&client_context->getClientInfo(), false, {}, {});
|
||||
while (true)
|
||||
{
|
||||
Packet packet = connection->receivePacket();
|
||||
|
@ -366,6 +366,13 @@ void ContextAccess::setUser(const UserPtr & user_) const
|
||||
current_roles_with_admin_option = user->granted_roles.findGrantedWithAdminOption(*params.current_roles);
|
||||
}
|
||||
|
||||
if (params.external_roles && !params.external_roles->empty())
|
||||
{
|
||||
current_roles.insert(current_roles.end(), params.external_roles->begin(), params.external_roles->end());
|
||||
auto new_granted_with_admin_option = user->granted_roles.findGrantedWithAdminOption(*params.external_roles);
|
||||
current_roles_with_admin_option.insert(current_roles_with_admin_option.end(), new_granted_with_admin_option.begin(), new_granted_with_admin_option.end());
|
||||
}
|
||||
|
||||
subscription_for_roles_changes.reset();
|
||||
enabled_roles = access_control->getEnabledRoles(current_roles, current_roles_with_admin_option);
|
||||
subscription_for_roles_changes = enabled_roles->subscribeForChanges([weak_ptr = weak_from_this()](const std::shared_ptr<const EnabledRolesInfo> & roles_info_)
|
||||
@ -516,7 +523,6 @@ std::optional<QuotaUsage> ContextAccess::getQuotaUsage() const
|
||||
return getQuota()->getUsage();
|
||||
}
|
||||
|
||||
|
||||
SettingsChanges ContextAccess::getDefaultSettings() const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
@ -18,6 +18,7 @@ ContextAccessParams::ContextAccessParams(
|
||||
bool full_access_,
|
||||
bool use_default_roles_,
|
||||
const std::shared_ptr<const std::vector<UUID>> & current_roles_,
|
||||
const std::shared_ptr<const std::vector<UUID>> & external_roles_,
|
||||
const Settings & settings_,
|
||||
const String & current_database_,
|
||||
const ClientInfo & client_info_)
|
||||
@ -25,6 +26,7 @@ ContextAccessParams::ContextAccessParams(
|
||||
, full_access(full_access_)
|
||||
, use_default_roles(use_default_roles_)
|
||||
, current_roles(current_roles_)
|
||||
, external_roles(external_roles_)
|
||||
, readonly(settings_[Setting::readonly])
|
||||
, allow_ddl(settings_[Setting::allow_ddl])
|
||||
, allow_introspection(settings_[Setting::allow_introspection_functions])
|
||||
@ -59,6 +61,17 @@ String ContextAccessParams::toString() const
|
||||
}
|
||||
out << "]";
|
||||
}
|
||||
if (external_roles && !external_roles->empty())
|
||||
{
|
||||
out << separator() << "external_roles = [";
|
||||
for (size_t i = 0; i != external_roles->size(); ++i)
|
||||
{
|
||||
if (i)
|
||||
out << ", ";
|
||||
out << (*external_roles)[i];
|
||||
}
|
||||
out << "]";
|
||||
}
|
||||
if (readonly)
|
||||
out << separator() << "readonly = " << readonly;
|
||||
if (allow_ddl)
|
||||
@ -107,6 +120,7 @@ bool operator ==(const ContextAccessParams & left, const ContextAccessParams & r
|
||||
CONTEXT_ACCESS_PARAMS_EQUALS(full_access)
|
||||
CONTEXT_ACCESS_PARAMS_EQUALS(use_default_roles)
|
||||
CONTEXT_ACCESS_PARAMS_EQUALS(current_roles)
|
||||
CONTEXT_ACCESS_PARAMS_EQUALS(external_roles)
|
||||
CONTEXT_ACCESS_PARAMS_EQUALS(readonly)
|
||||
CONTEXT_ACCESS_PARAMS_EQUALS(allow_ddl)
|
||||
CONTEXT_ACCESS_PARAMS_EQUALS(allow_introspection)
|
||||
@ -157,6 +171,7 @@ bool operator <(const ContextAccessParams & left, const ContextAccessParams & ri
|
||||
CONTEXT_ACCESS_PARAMS_LESS(full_access)
|
||||
CONTEXT_ACCESS_PARAMS_LESS(use_default_roles)
|
||||
CONTEXT_ACCESS_PARAMS_LESS(current_roles)
|
||||
CONTEXT_ACCESS_PARAMS_LESS(external_roles)
|
||||
CONTEXT_ACCESS_PARAMS_LESS(readonly)
|
||||
CONTEXT_ACCESS_PARAMS_LESS(allow_ddl)
|
||||
CONTEXT_ACCESS_PARAMS_LESS(allow_introspection)
|
||||
|
@ -19,6 +19,7 @@ public:
|
||||
bool full_access_,
|
||||
bool use_default_roles_,
|
||||
const std::shared_ptr<const std::vector<UUID>> & current_roles_,
|
||||
const std::shared_ptr<const std::vector<UUID>> & external_roles_,
|
||||
const Settings & settings_,
|
||||
const String & current_database_,
|
||||
const ClientInfo & client_info_);
|
||||
@ -31,6 +32,7 @@ public:
|
||||
|
||||
const bool use_default_roles;
|
||||
const std::shared_ptr<const std::vector<UUID>> current_roles;
|
||||
const std::shared_ptr<const std::vector<UUID>> external_roles;
|
||||
|
||||
const UInt64 readonly;
|
||||
const bool allow_ddl;
|
||||
|
@ -26,7 +26,6 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
|
||||
LDAPAccessStorage::LDAPAccessStorage(const String & storage_name_, AccessControl & access_control_, const Poco::Util::AbstractConfiguration & config, const String & prefix)
|
||||
: IAccessStorage(storage_name_), access_control(access_control_), memory_storage(storage_name_, access_control.getChangesNotifier(), false)
|
||||
{
|
||||
@ -320,6 +319,10 @@ std::set<String> LDAPAccessStorage::mapExternalRolesNoLock(const LDAPClient::Sea
|
||||
{
|
||||
std::set<String> role_names;
|
||||
|
||||
// If this node can't access LDAP server (or has not privileges to fetch roles) and gets empty list of external roles
|
||||
if (external_roles.empty())
|
||||
return role_names;
|
||||
|
||||
if (external_roles.size() != role_search_params.size())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unable to map external roles");
|
||||
|
||||
|
@ -1121,6 +1121,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
|
||||
&client_context->getSettingsRef(),
|
||||
&client_context->getClientInfo(),
|
||||
true,
|
||||
{},
|
||||
[&](const Progress & progress) { onProgress(progress); });
|
||||
|
||||
if (send_external_tables)
|
||||
@ -1624,6 +1625,7 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
|
||||
&client_context->getSettingsRef(),
|
||||
&client_context->getClientInfo(),
|
||||
true,
|
||||
{},
|
||||
[&](const Progress & progress) { onProgress(progress); });
|
||||
|
||||
if (send_external_tables)
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include <Client/ClientBase.h>
|
||||
#include <Client/Connection.h>
|
||||
#include <Client/ConnectionParameters.h>
|
||||
#include "Common/logger_useful.h"
|
||||
#include <Common/ClickHouseRevision.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/NetException.h>
|
||||
@ -22,8 +23,8 @@
|
||||
#include <Common/StringUtils.h>
|
||||
#include <Common/OpenSSLHelpers.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Core/ProtocolDefines.h>
|
||||
#include <Interpreters/ClientInfo.h>
|
||||
#include <Interpreters/OpenTelemetrySpanLog.h>
|
||||
#include <Compression/CompressionFactory.h>
|
||||
@ -752,6 +753,7 @@ void Connection::sendQuery(
|
||||
const Settings * settings,
|
||||
const ClientInfo * client_info,
|
||||
bool with_pending_data,
|
||||
const std::vector<String> & external_roles,
|
||||
std::function<void(const Progress &)>)
|
||||
{
|
||||
OpenTelemetry::SpanHolder span("Connection::sendQuery()", OpenTelemetry::SpanKind::CLIENT);
|
||||
@ -824,6 +826,18 @@ void Connection::sendQuery(
|
||||
else
|
||||
writeStringBinary("" /* empty string is a marker of the end of settings */, *out);
|
||||
|
||||
String external_roles_str;
|
||||
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES)
|
||||
{
|
||||
WriteBufferFromString buffer(external_roles_str);
|
||||
writeVectorBinary(external_roles, buffer);
|
||||
buffer.finalize();
|
||||
|
||||
LOG_TRACE(log_wrapper.get(), "Sending external_roles with query: [{}] ({})", fmt::join(external_roles, ", "), external_roles.size());
|
||||
|
||||
writeStringBinary(external_roles_str, *out);
|
||||
}
|
||||
|
||||
/// Interserver secret
|
||||
if (server_revision >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET)
|
||||
{
|
||||
@ -844,6 +858,9 @@ void Connection::sendQuery(
|
||||
data += query;
|
||||
data += query_id;
|
||||
data += client_info->initial_user;
|
||||
// Also for backwards compatibility
|
||||
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES)
|
||||
data += external_roles_str;
|
||||
/// TODO: add source/target host/ip-address
|
||||
|
||||
std::string hash = encodeSHA256(data);
|
||||
|
@ -108,6 +108,7 @@ public:
|
||||
const Settings * settings/* = nullptr */,
|
||||
const ClientInfo * client_info/* = nullptr */,
|
||||
bool with_pending_data/* = false */,
|
||||
const std::vector<String> & external_roles,
|
||||
std::function<void(const Progress &)> process_progress_callback) override;
|
||||
|
||||
void sendCancel() override;
|
||||
|
@ -161,7 +161,8 @@ void HedgedConnections::sendQuery(
|
||||
const String & query_id,
|
||||
UInt64 stage,
|
||||
ClientInfo & client_info,
|
||||
bool with_pending_data)
|
||||
bool with_pending_data,
|
||||
const std::vector<String> & external_roles)
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
|
||||
@ -188,7 +189,7 @@ void HedgedConnections::sendQuery(
|
||||
hedged_connections_factory.skipReplicasWithTwoLevelAggregationIncompatibility();
|
||||
}
|
||||
|
||||
auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data](ReplicaState & replica)
|
||||
auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data, external_roles](ReplicaState & replica)
|
||||
{
|
||||
Settings modified_settings = settings;
|
||||
|
||||
@ -218,7 +219,8 @@ void HedgedConnections::sendQuery(
|
||||
modified_settings.set("allow_experimental_analyzer", static_cast<bool>(modified_settings[Setting::allow_experimental_analyzer]));
|
||||
|
||||
replica.connection->sendQuery(
|
||||
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
|
||||
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, external_roles, {});
|
||||
|
||||
replica.change_replica_timeout.setRelative(timeouts.receive_data_timeout);
|
||||
replica.packet_receiver->setTimeout(hedged_connections_factory.getConnectionTimeouts().receive_timeout);
|
||||
};
|
||||
|
@ -90,7 +90,8 @@ public:
|
||||
const String & query_id,
|
||||
UInt64 stage,
|
||||
ClientInfo & client_info,
|
||||
bool with_pending_data) override;
|
||||
bool with_pending_data,
|
||||
const std::vector<String> & external_roles) override;
|
||||
|
||||
void sendReadTaskResponse(const String &) override
|
||||
{
|
||||
|
@ -23,7 +23,8 @@ public:
|
||||
const String & query_id,
|
||||
UInt64 stage,
|
||||
ClientInfo & client_info,
|
||||
bool with_pending_data) = 0;
|
||||
bool with_pending_data,
|
||||
const std::vector<String> & external_roles) = 0;
|
||||
|
||||
virtual void sendReadTaskResponse(const String &) = 0;
|
||||
virtual void sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) = 0;
|
||||
|
@ -100,6 +100,7 @@ public:
|
||||
const Settings * settings,
|
||||
const ClientInfo * client_info,
|
||||
bool with_pending_data,
|
||||
const std::vector<String> & external_roles,
|
||||
std::function<void(const Progress &)> process_progress_callback) = 0;
|
||||
|
||||
virtual void sendCancel() = 0;
|
||||
|
@ -106,6 +106,7 @@ void LocalConnection::sendQuery(
|
||||
const Settings *,
|
||||
const ClientInfo * client_info,
|
||||
bool,
|
||||
const std::vector<String> & /*external_roles*/,
|
||||
std::function<void(const Progress &)> process_progress_callback)
|
||||
{
|
||||
/// Last query may not have been finished or cancelled due to exception on client side.
|
||||
|
@ -114,6 +114,7 @@ public:
|
||||
const Settings * settings/* = nullptr */,
|
||||
const ClientInfo * client_info/* = nullptr */,
|
||||
bool with_pending_data/* = false */,
|
||||
const std::vector<String> & external_roles,
|
||||
std::function<void(const Progress &)> process_progress_callback) override;
|
||||
|
||||
void sendCancel() override;
|
||||
|
@ -128,7 +128,8 @@ void MultiplexedConnections::sendQuery(
|
||||
const String & query_id,
|
||||
UInt64 stage,
|
||||
ClientInfo & client_info,
|
||||
bool with_pending_data)
|
||||
bool with_pending_data,
|
||||
const std::vector<String> & external_roles)
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
|
||||
@ -181,14 +182,14 @@ void MultiplexedConnections::sendQuery(
|
||||
modified_settings[Setting::parallel_replica_offset] = i;
|
||||
|
||||
replica_states[i].connection->sendQuery(
|
||||
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
|
||||
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, external_roles, {});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Use single replica.
|
||||
replica_states[0].connection->sendQuery(
|
||||
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
|
||||
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, external_roles, {});
|
||||
}
|
||||
|
||||
sent_query = true;
|
||||
|
@ -36,7 +36,8 @@ public:
|
||||
const String & query_id,
|
||||
UInt64 stage,
|
||||
ClientInfo & client_info,
|
||||
bool with_pending_data) override;
|
||||
bool with_pending_data,
|
||||
const std::vector<String> & external_roles) override;
|
||||
|
||||
void sendReadTaskResponse(const String &) override;
|
||||
void sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) override;
|
||||
|
@ -163,7 +163,7 @@ void Suggest::load(IServerConnection & connection,
|
||||
void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query, const ClientInfo & client_info)
|
||||
{
|
||||
connection.sendQuery(
|
||||
timeouts, query, {} /* query_parameters */, "" /* query_id */, QueryProcessingStage::Complete, nullptr, &client_info, false, {});
|
||||
timeouts, query, {} /* query_parameters */, "" /* query_id */, QueryProcessingStage::Complete, nullptr, &client_info, false, {} /* external_roles*/, {});
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
@ -90,6 +90,9 @@ static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470;
|
||||
|
||||
static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL = 54471;
|
||||
|
||||
/// Push externally granted roles to other nodes
|
||||
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES = 54472;
|
||||
|
||||
/// Version of ClickHouse TCP protocol.
|
||||
///
|
||||
/// Should be incremented manually on protocol changes.
|
||||
@ -97,6 +100,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCO
|
||||
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
|
||||
/// later is just a number for server version (one number instead of commit SHA)
|
||||
/// for simplicity (sometimes it may be more convenient in some use cases).
|
||||
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54471;
|
||||
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54472;
|
||||
|
||||
}
|
||||
|
@ -5734,6 +5734,9 @@ If enabled, MongoDB tables will return an error when a MongoDB query cannot be b
|
||||
Allow writing simple SELECT queries without the leading SELECT keyword, which makes it simple for calculator-style usage, e.g. `1 + 2` becomes a valid query.
|
||||
|
||||
In `clickhouse-local` it is enabled by default and can be explicitly disabled.
|
||||
)", 0) \
|
||||
DECLARE(Bool, push_external_roles_in_interserver_queries, true, R"(
|
||||
Enable pushing user roles from originator to other nodes while performing a query.
|
||||
)", 0) \
|
||||
\
|
||||
\
|
||||
|
@ -88,6 +88,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
|
||||
{"filesystem_cache_prefer_bigger_buffer_size", true, true, "New setting"},
|
||||
{"read_in_order_use_virtual_row", false, false, "Use virtual row while reading in order of primary key or its monotonic function fashion. It is useful when searching over multiple parts as only relevant ones are touched."},
|
||||
{"filesystem_cache_boundary_alignment", 0, 0, "New setting"},
|
||||
{"push_external_roles_in_interserver_queries", false, false, "New setting."},
|
||||
}
|
||||
},
|
||||
{"24.10",
|
||||
|
@ -1493,7 +1493,7 @@ ConfigurationPtr Context::getUsersConfig()
|
||||
return shared->users_config;
|
||||
}
|
||||
|
||||
void Context::setUser(const UUID & user_id_)
|
||||
void Context::setUser(const UUID & user_id_, const std::vector<UUID> & external_roles_)
|
||||
{
|
||||
/// Prepare lists of user's profiles, constraints, settings, roles.
|
||||
/// NOTE: AccessControl::read<User>() and other AccessControl's functions may require some IO work,
|
||||
@ -1508,7 +1508,6 @@ void Context::setUser(const UUID & user_id_)
|
||||
const auto & database = user->default_database;
|
||||
|
||||
/// Apply user's profiles, constraints, settings, roles.
|
||||
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
setUserIDWithLock(user_id_, lock);
|
||||
@ -1518,6 +1517,7 @@ void Context::setUser(const UUID & user_id_)
|
||||
setCurrentProfilesWithLock(*enabled_profiles, /* check_constraints= */ false, lock);
|
||||
|
||||
setCurrentRolesWithLock(default_roles, lock);
|
||||
setExternalRolesWithLock(external_roles_, lock);
|
||||
|
||||
/// It's optional to specify the DEFAULT DATABASE in the user's definition.
|
||||
if (!database.empty())
|
||||
@ -1561,6 +1561,18 @@ void Context::setCurrentRolesWithLock(const std::vector<UUID> & new_current_role
|
||||
need_recalculate_access = true;
|
||||
}
|
||||
|
||||
void Context::setExternalRolesWithLock(const std::vector<UUID> & new_external_roles, const std::lock_guard<ContextSharedMutex> &)
|
||||
{
|
||||
if (!new_external_roles.empty())
|
||||
{
|
||||
if (current_roles)
|
||||
current_roles->insert(current_roles->end(), new_external_roles.begin(), new_external_roles.end());
|
||||
else
|
||||
current_roles = std::make_shared<std::vector<UUID>>(new_external_roles);
|
||||
need_recalculate_access = true;
|
||||
}
|
||||
}
|
||||
|
||||
void Context::setCurrentRolesImpl(const std::vector<UUID> & new_current_roles, bool throw_if_not_granted, bool skip_if_not_granted, const std::shared_ptr<const User> & user)
|
||||
{
|
||||
if (skip_if_not_granted)
|
||||
@ -1675,7 +1687,7 @@ std::shared_ptr<const ContextAccessWrapper> Context::getAccess() const
|
||||
bool full_access = !user_id;
|
||||
|
||||
return ContextAccessParams{
|
||||
user_id, full_access, /* use_default_roles= */ false, current_roles, *settings, current_database, client_info};
|
||||
user_id, full_access, /* use_default_roles= */ false, current_roles, external_roles, *settings, current_database, client_info};
|
||||
};
|
||||
|
||||
/// Check if the current access rights are still valid, otherwise get parameters for recalculating access rights.
|
||||
|
@ -289,6 +289,7 @@ protected:
|
||||
|
||||
std::optional<UUID> user_id;
|
||||
std::shared_ptr<std::vector<UUID>> current_roles;
|
||||
std::shared_ptr<std::vector<UUID>> external_roles;
|
||||
std::shared_ptr<const SettingsConstraintsAndProfileIDs> settings_constraints_and_current_profiles;
|
||||
mutable std::shared_ptr<const ContextAccess> access;
|
||||
mutable bool need_recalculate_access = true;
|
||||
@ -634,7 +635,7 @@ public:
|
||||
|
||||
/// Sets the current user assuming that he/she is already authenticated.
|
||||
/// WARNING: This function doesn't check password!
|
||||
void setUser(const UUID & user_id_);
|
||||
void setUser(const UUID & user_id_, const std::vector<UUID> & external_roles_ = {});
|
||||
UserPtr getUser() const;
|
||||
|
||||
std::optional<UUID> getUserID() const;
|
||||
@ -1398,6 +1399,8 @@ private:
|
||||
|
||||
void setCurrentRolesWithLock(const std::vector<UUID> & new_current_roles, const std::lock_guard<ContextSharedMutex> & lock);
|
||||
|
||||
void setExternalRolesWithLock(const std::vector<UUID> & new_external_roles, const std::lock_guard<ContextSharedMutex> & lock);
|
||||
|
||||
void setSettingWithLock(std::string_view name, const String & value, const std::lock_guard<ContextSharedMutex> & lock);
|
||||
|
||||
void setSettingWithLock(std::string_view name, const Field & value, const std::lock_guard<ContextSharedMutex> & lock);
|
||||
|
@ -6,6 +6,8 @@
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Access/SettingsProfilesInfo.h>
|
||||
#include <Access/User.h>
|
||||
#include <Access/Role.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
@ -25,12 +27,12 @@
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace Setting
|
||||
{
|
||||
extern const SettingsUInt64 max_sessions_for_user;
|
||||
extern const SettingsBool push_external_roles_in_interserver_queries;
|
||||
}
|
||||
|
||||
namespace ErrorCodes
|
||||
@ -288,7 +290,7 @@ void Session::shutdownNamedSessions()
|
||||
Session::Session(const ContextPtr & global_context_, ClientInfo::Interface interface_, bool is_secure, const std::string & certificate)
|
||||
: auth_id(UUIDHelpers::generateV4()),
|
||||
global_context(global_context_),
|
||||
log(getLogger(String{magic_enum::enum_name(interface_)} + "-Session"))
|
||||
log(getLogger(String{magic_enum::enum_name(interface_)} + "-Session-" + toString(auth_id)))
|
||||
{
|
||||
prepared_client_info.emplace();
|
||||
prepared_client_info->interface = interface_;
|
||||
@ -342,12 +344,12 @@ std::unordered_set<AuthenticationType> Session::getAuthenticationTypesOrLogInFai
|
||||
}
|
||||
}
|
||||
|
||||
void Session::authenticate(const String & user_name, const String & password, const Poco::Net::SocketAddress & address)
|
||||
void Session::authenticate(const String & user_name, const String & password, const Poco::Net::SocketAddress & address, const Strings & external_roles_)
|
||||
{
|
||||
authenticate(BasicCredentials{user_name, password}, address);
|
||||
authenticate(BasicCredentials{user_name, password}, address, external_roles_);
|
||||
}
|
||||
|
||||
void Session::authenticate(const Credentials & credentials_, const Poco::Net::SocketAddress & address_)
|
||||
void Session::authenticate(const Credentials & credentials_, const Poco::Net::SocketAddress & address_, const Strings & external_roles_)
|
||||
{
|
||||
if (session_context)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "If there is a session context it must be created after authentication");
|
||||
@ -359,8 +361,8 @@ void Session::authenticate(const Credentials & credentials_, const Poco::Net::So
|
||||
if ((address == Poco::Net::SocketAddress{}) && (prepared_client_info->interface == ClientInfo::Interface::LOCAL))
|
||||
address = Poco::Net::SocketAddress{"127.0.0.1", 0};
|
||||
|
||||
LOG_DEBUG(log, "{} Authenticating user '{}' from {}",
|
||||
toString(auth_id), credentials_.getUserName(), address.toString());
|
||||
LOG_DEBUG(log, "Authenticating user '{}' from {}",
|
||||
credentials_.getUserName(), address.toString());
|
||||
|
||||
try
|
||||
{
|
||||
@ -370,6 +372,14 @@ void Session::authenticate(const Credentials & credentials_, const Poco::Net::So
|
||||
settings_from_auth_server = auth_result.settings;
|
||||
LOG_DEBUG(log, "{} Authenticated with global context as user {}",
|
||||
toString(auth_id), toString(*user_id));
|
||||
|
||||
if (!external_roles_.empty() && global_context->getSettingsRef()[Setting::push_external_roles_in_interserver_queries])
|
||||
{
|
||||
external_roles = global_context->getAccessControl().find<Role>(external_roles_);
|
||||
|
||||
LOG_DEBUG(log, "User {} has external_roles applied: [{}] ({})",
|
||||
toString(*user_id), fmt::join(external_roles_, ", "), external_roles_.size());
|
||||
}
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
@ -394,7 +404,7 @@ void Session::checkIfUserIsStillValid()
|
||||
|
||||
void Session::onAuthenticationFailure(const std::optional<String> & user_name, const Poco::Net::SocketAddress & address_, const Exception & e)
|
||||
{
|
||||
LOG_DEBUG(log, "{} Authentication failed with error: {}", toString(auth_id), e.what());
|
||||
LOG_DEBUG(log, "Authentication failed with error: {}", e.what());
|
||||
if (auto session_log = getSessionLog())
|
||||
{
|
||||
/// Add source address to the log
|
||||
@ -520,8 +530,8 @@ ContextMutablePtr Session::makeSessionContext()
|
||||
if (session_tracker_handle)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session tracker handle was created before making session");
|
||||
|
||||
LOG_DEBUG(log, "{} Creating session context with user_id: {}",
|
||||
toString(auth_id), toString(*user_id));
|
||||
LOG_DEBUG(log, "Creating session context with user_id: {}",
|
||||
toString(*user_id));
|
||||
/// Make a new session context.
|
||||
ContextMutablePtr new_session_context;
|
||||
new_session_context = Context::createCopy(global_context);
|
||||
@ -532,7 +542,7 @@ ContextMutablePtr Session::makeSessionContext()
|
||||
prepared_client_info.reset();
|
||||
|
||||
/// Set user information for the new context: current profiles, roles, access rights.
|
||||
new_session_context->setUser(*user_id);
|
||||
new_session_context->setUser(*user_id, external_roles);
|
||||
|
||||
/// Session context is ready.
|
||||
session_context = new_session_context;
|
||||
@ -563,8 +573,8 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
|
||||
if (session_tracker_handle)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session tracker handle was created before making session");
|
||||
|
||||
LOG_DEBUG(log, "{} Creating named session context with name: {}, user_id: {}",
|
||||
toString(auth_id), session_name_, toString(*user_id));
|
||||
LOG_DEBUG(log, "Creating named session context with name: {}, user_id: {}",
|
||||
session_name_, toString(*user_id));
|
||||
|
||||
/// Make a new session context OR
|
||||
/// if the `session_id` and `user_id` were used before then just get a previously created session context.
|
||||
@ -587,7 +597,7 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
|
||||
/// Set user information for the new context: current profiles, roles, access rights.
|
||||
if (!access->tryGetUser())
|
||||
{
|
||||
new_session_context->setUser(*user_id);
|
||||
new_session_context->setUser(*user_id, external_roles);
|
||||
max_sessions_for_user = new_session_context->getSettingsRef()[Setting::max_sessions_for_user];
|
||||
}
|
||||
else
|
||||
@ -639,7 +649,7 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query context must be created after authentication");
|
||||
|
||||
/// We can create a query context either from a session context or from a global context.
|
||||
bool from_session_context = static_cast<bool>(session_context);
|
||||
const bool from_session_context = static_cast<bool>(session_context);
|
||||
|
||||
/// Create a new query context.
|
||||
ContextMutablePtr query_context = Context::createCopy(from_session_context ? session_context : global_context);
|
||||
@ -679,7 +689,7 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t
|
||||
|
||||
/// Set user information for the new context: current profiles, roles, access rights.
|
||||
if (user_id && !query_context->getAccess()->tryGetUser())
|
||||
query_context->setUser(*user_id);
|
||||
query_context->setUser(*user_id, external_roles);
|
||||
|
||||
/// Query context is ready.
|
||||
query_context_created = true;
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <vector>
|
||||
|
||||
namespace Poco::Net { class SocketAddress; }
|
||||
|
||||
@ -50,8 +51,11 @@ public:
|
||||
|
||||
/// Sets the current user, checks the credentials and that the specified address is allowed to connect from.
|
||||
/// The function throws an exception if there is no such user or password is wrong.
|
||||
void authenticate(const String & user_name, const String & password, const Poco::Net::SocketAddress & address);
|
||||
void authenticate(const Credentials & credentials_, const Poco::Net::SocketAddress & address_);
|
||||
void authenticate(const String & user_name, const String & password, const Poco::Net::SocketAddress & address, const Strings & external_roles_ = {});
|
||||
|
||||
/// `external_roles_` names of the additional roles (over what is granted via local access control mechanisms) that would be granted to user during this session.
|
||||
/// Role is not granted if it can't be found by name via AccessControl (i.e. doesn't exist on this instance).
|
||||
void authenticate(const Credentials & credentials_, const Poco::Net::SocketAddress & address_, const Strings & external_roles_ = {});
|
||||
|
||||
// Verifies whether the user's validity extends beyond the current time.
|
||||
// Throws an exception if the user's validity has expired.
|
||||
@ -112,6 +116,7 @@ private:
|
||||
|
||||
mutable UserPtr user;
|
||||
std::optional<UUID> user_id;
|
||||
std::vector<UUID> external_roles;
|
||||
AuthenticationData user_authenticated_with;
|
||||
|
||||
ContextMutablePtr session_context;
|
||||
|
@ -56,8 +56,9 @@ RemoteInserter::RemoteInserter(
|
||||
/** Send query and receive "header", that describes table structure.
|
||||
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.
|
||||
*/
|
||||
/// TODO (vnemkov): figure out should we pass additional roles in this case or not.
|
||||
connection.sendQuery(
|
||||
timeouts, query, /* query_parameters */ {}, "", QueryProcessingStage::Complete, &settings, &modified_client_info, false, {});
|
||||
timeouts, query, /* query_parameters */ {}, "", QueryProcessingStage::Complete, &settings, &modified_client_info, false, /* external_roles */ {}, {});
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
@ -22,8 +22,12 @@
|
||||
#include <Client/MultiplexedConnections.h>
|
||||
#include <Client/HedgedConnections.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
|
||||
#include <Storages/StorageMemory.h>
|
||||
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
|
||||
#include <Storages/StorageMemory.h>
|
||||
|
||||
#include <Access/AccessControl.h>
|
||||
#include <Access/User.h>
|
||||
#include <Access/Role.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
@ -43,6 +47,7 @@ namespace Setting
|
||||
extern const SettingsBool skip_unavailable_shards;
|
||||
extern const SettingsOverflowMode timeout_overflow_mode;
|
||||
extern const SettingsBool use_hedged_requests;
|
||||
extern const SettingsBool push_external_roles_in_interserver_queries;
|
||||
}
|
||||
|
||||
namespace ErrorCodes
|
||||
@ -398,7 +403,25 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
|
||||
if (!duplicated_part_uuids.empty())
|
||||
connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
|
||||
|
||||
connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
|
||||
// Collect all roles granted on this node and pass those to the remote node
|
||||
std::vector<String> local_granted_roles;
|
||||
if (context->getSettingsRef()[Setting::push_external_roles_in_interserver_queries] && !modified_client_info.initial_user.empty())
|
||||
{
|
||||
auto user = context->getAccessControl().read<User>(modified_client_info.initial_user, true);
|
||||
boost::container::flat_set<String> granted_roles;
|
||||
if (user)
|
||||
{
|
||||
const auto & access_control = context->getAccessControl();
|
||||
for (const auto & e : user->granted_roles.getElements())
|
||||
{
|
||||
auto names = access_control.readNames(e.ids);
|
||||
granted_roles.insert(names.begin(), names.end());
|
||||
}
|
||||
}
|
||||
local_granted_roles.insert(local_granted_roles.end(), granted_roles.begin(), granted_roles.end());
|
||||
}
|
||||
|
||||
connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true, local_granted_roles);
|
||||
|
||||
established = false;
|
||||
sent_query = true;
|
||||
|
@ -63,12 +63,17 @@
|
||||
|
||||
#include <Core/Protocol.h>
|
||||
#include <Storages/MergeTree/RequestResponse.h>
|
||||
#include <Interpreters/ClientInfo.h>
|
||||
|
||||
#include "TCPHandler.h"
|
||||
|
||||
#include <Common/config_version.h>
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include <fmt/ostream.h>
|
||||
#include <Common/StringUtils.h>
|
||||
|
||||
using namespace std::literals;
|
||||
using namespace DB;
|
||||
|
||||
@ -1960,6 +1965,13 @@ void TCPHandler::processQuery(std::optional<QueryState> & state)
|
||||
Settings passed_settings;
|
||||
passed_settings.read(*in, settings_format);
|
||||
|
||||
std::string received_extra_roles;
|
||||
// TODO: check if having `is_interserver_mode` doesn't break interoperability with the CH-client.
|
||||
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES)
|
||||
{
|
||||
readStringBinary(received_extra_roles, *in);
|
||||
}
|
||||
|
||||
/// Interserver secret.
|
||||
std::string received_hash;
|
||||
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET)
|
||||
@ -2019,6 +2031,7 @@ void TCPHandler::processQuery(std::optional<QueryState> & state)
|
||||
data += state->query;
|
||||
data += state->query_id;
|
||||
data += client_info.initial_user;
|
||||
data += received_extra_roles;
|
||||
|
||||
std::string calculated_hash = encodeSHA256(data);
|
||||
assert(calculated_hash.size() == 32);
|
||||
@ -2039,13 +2052,25 @@ void TCPHandler::processQuery(std::optional<QueryState> & state)
|
||||
}
|
||||
else
|
||||
{
|
||||
// In a cluster, query originator may have an access to the external auth provider (like LDAP server),
|
||||
// that grants specific roles to the user. We want these roles to be granted to the user on other nodes of cluster when
|
||||
// query is executed.
|
||||
Strings external_roles;
|
||||
if (!received_extra_roles.empty())
|
||||
{
|
||||
ReadBufferFromString buffer(received_extra_roles);
|
||||
|
||||
readVectorBinary(external_roles, buffer);
|
||||
LOG_DEBUG(log, "Parsed extra roles [{}]", fmt::join(external_roles, ", "));
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "User (initial, interserver mode): {} (client: {})", client_info.initial_user, getClientAddress(client_info).toString());
|
||||
/// In case of inter-server mode authorization is done with the
|
||||
/// initial address of the client, not the real address from which
|
||||
/// the query was come, since the real address is the address of
|
||||
/// the initiator server, while we are interested in client's
|
||||
/// address.
|
||||
session->authenticate(AlwaysAllowCredentials{client_info.initial_user}, client_info.initial_address);
|
||||
session->authenticate(AlwaysAllowCredentials{client_info.initial_user}, client_info.initial_address, external_roles);
|
||||
}
|
||||
|
||||
is_interserver_authenticated = true;
|
||||
|
@ -0,0 +1,18 @@
|
||||
<clickhouse>
|
||||
<remote_servers>
|
||||
<test_ldap_cluster>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>instance1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>instance2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_ldap_cluster>
|
||||
</remote_servers>
|
||||
</clickhouse>
|
@ -9,8 +9,22 @@ LDAP_ADMIN_BIND_DN = "cn=admin,dc=example,dc=org"
|
||||
LDAP_ADMIN_PASSWORD = "clickhouse"
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
instance = cluster.add_instance(
|
||||
"instance", main_configs=["configs/ldap_with_role_mapping.xml"], with_ldap=True
|
||||
|
||||
instance1 = cluster.add_instance(
|
||||
"instance1",
|
||||
main_configs=["configs/ldap_with_role_mapping.xml", "configs/remote_servers.xml"],
|
||||
macros={"shard": 1, "replica": "instance1"},
|
||||
stay_alive=True,
|
||||
with_ldap=True,
|
||||
with_zookeeper=True,
|
||||
)
|
||||
|
||||
instance2 = cluster.add_instance(
|
||||
"instance2",
|
||||
main_configs=["configs/remote_servers.xml"],
|
||||
macros={"shard": 1, "replica": "instance2"},
|
||||
stay_alive=True,
|
||||
with_zookeeper=True,
|
||||
)
|
||||
|
||||
|
||||
@ -74,59 +88,98 @@ def delete_ldap_group(ldap_cluster, group_cn):
|
||||
|
||||
|
||||
def test_authentication_pass():
|
||||
assert instance.query(
|
||||
assert instance1.query(
|
||||
"SELECT currentUser()", user="janedoe", password="qwerty"
|
||||
) == TSV([["janedoe"]])
|
||||
|
||||
|
||||
def test_authentication_fail():
|
||||
# User doesn't exist.
|
||||
assert "doesnotexist: Authentication failed" in instance.query_and_get_error(
|
||||
assert "doesnotexist: Authentication failed" in instance1.query_and_get_error(
|
||||
"SELECT currentUser()", user="doesnotexist"
|
||||
)
|
||||
|
||||
# Wrong password.
|
||||
assert "janedoe: Authentication failed" in instance.query_and_get_error(
|
||||
assert "janedoe: Authentication failed" in instance1.query_and_get_error(
|
||||
"SELECT currentUser()", user="janedoe", password="123"
|
||||
)
|
||||
|
||||
|
||||
def test_role_mapping(ldap_cluster):
|
||||
instance.query("DROP ROLE IF EXISTS role_1")
|
||||
instance.query("DROP ROLE IF EXISTS role_2")
|
||||
instance.query("DROP ROLE IF EXISTS role_3")
|
||||
instance.query("CREATE ROLE role_1")
|
||||
instance.query("CREATE ROLE role_2")
|
||||
instance1.query("DROP ROLE IF EXISTS role_1")
|
||||
instance1.query("DROP ROLE IF EXISTS role_2")
|
||||
instance1.query("DROP ROLE IF EXISTS role_3")
|
||||
instance1.query("CREATE ROLE role_1")
|
||||
instance1.query("CREATE ROLE role_2")
|
||||
add_ldap_group(ldap_cluster, group_cn="clickhouse-role_1", member_cn="johndoe")
|
||||
add_ldap_group(ldap_cluster, group_cn="clickhouse-role_2", member_cn="johndoe")
|
||||
|
||||
assert instance.query(
|
||||
assert instance1.query(
|
||||
"select currentUser()", user="johndoe", password="qwertz"
|
||||
) == TSV([["johndoe"]])
|
||||
|
||||
assert instance.query(
|
||||
assert instance1.query(
|
||||
"select role_name from system.current_roles ORDER BY role_name",
|
||||
user="johndoe",
|
||||
password="qwertz",
|
||||
) == TSV([["role_1"], ["role_2"]])
|
||||
|
||||
instance.query("CREATE ROLE role_3")
|
||||
instance1.query("CREATE ROLE role_3")
|
||||
add_ldap_group(ldap_cluster, group_cn="clickhouse-role_3", member_cn="johndoe")
|
||||
# Check that non-existing role in ClickHouse is ignored during role update
|
||||
# See https://github.com/ClickHouse/ClickHouse/issues/54318
|
||||
add_ldap_group(ldap_cluster, group_cn="clickhouse-role_4", member_cn="johndoe")
|
||||
|
||||
assert instance.query(
|
||||
assert instance1.query(
|
||||
"select role_name from system.current_roles ORDER BY role_name",
|
||||
user="johndoe",
|
||||
password="qwertz",
|
||||
) == TSV([["role_1"], ["role_2"], ["role_3"]])
|
||||
|
||||
instance.query("DROP ROLE role_1")
|
||||
instance.query("DROP ROLE role_2")
|
||||
instance.query("DROP ROLE role_3")
|
||||
instance1.query("DROP ROLE role_1")
|
||||
instance1.query("DROP ROLE role_2")
|
||||
instance1.query("DROP ROLE role_3")
|
||||
|
||||
delete_ldap_group(ldap_cluster, group_cn="clickhouse-role_1")
|
||||
delete_ldap_group(ldap_cluster, group_cn="clickhouse-role_2")
|
||||
delete_ldap_group(ldap_cluster, group_cn="clickhouse-role_3")
|
||||
delete_ldap_group(ldap_cluster, group_cn="clickhouse-role_4")
|
||||
|
||||
|
||||
def test_push_role_to_other_nodes(ldap_cluster):
|
||||
instance1.query("DROP TABLE IF EXISTS distributed_table SYNC")
|
||||
instance1.query("DROP TABLE IF EXISTS local_table SYNC")
|
||||
instance2.query("DROP TABLE IF EXISTS local_table SYNC")
|
||||
instance1.query("DROP ROLE IF EXISTS role_read")
|
||||
|
||||
instance1.query("CREATE ROLE role_read")
|
||||
instance1.query("GRANT SELECT ON *.* TO role_read")
|
||||
|
||||
add_ldap_group(ldap_cluster, group_cn="clickhouse-role_read", member_cn="johndoe")
|
||||
|
||||
assert instance1.query(
|
||||
"select currentUser()", user="johndoe", password="qwertz"
|
||||
) == TSV([["johndoe"]])
|
||||
|
||||
instance1.query(
|
||||
"CREATE TABLE IF NOT EXISTS local_table (id UInt32) ENGINE = MergeTree() ORDER BY id"
|
||||
)
|
||||
instance2.query(
|
||||
"CREATE TABLE IF NOT EXISTS local_table (id UInt32) ENGINE = MergeTree() ORDER BY id"
|
||||
)
|
||||
instance2.query("INSERT INTO local_table VALUES (1), (2), (3)")
|
||||
instance1.query(
|
||||
"CREATE TABLE IF NOT EXISTS distributed_table AS local_table ENGINE = Distributed(test_ldap_cluster, default, local_table)"
|
||||
)
|
||||
|
||||
result = instance1.query(
|
||||
"SELECT sum(id) FROM distributed_table", user="johndoe", password="qwertz"
|
||||
)
|
||||
assert result.strip() == "6"
|
||||
|
||||
instance1.query("DROP TABLE IF EXISTS distributed_table SYNC")
|
||||
instance1.query("DROP TABLE IF EXISTS local_table SYNC")
|
||||
instance2.query("DROP TABLE IF EXISTS local_table SYNC")
|
||||
instance2.query("DROP ROLE IF EXISTS role_read")
|
||||
|
||||
delete_ldap_group(ldap_cluster, group_cn="clickhouse-role_read")
|
||||
|
Loading…
Reference in New Issue
Block a user