Co-authored-by: Enmk <V.Nemkov@gmail.com>

fix memory access
This commit is contained in:
Andrey Zvonov 2024-10-04 12:05:59 +00:00
parent 0f4990d2e7
commit 86eb3d6425
34 changed files with 733 additions and 41 deletions

View File

@ -4215,3 +4215,9 @@ SELECT toFloat64('1.7091'), toFloat64('1.5008753E7') SETTINGS precise_float_pars
│ 1.7091 │ 15008753 │
└─────────────────────┴──────────────────────────┘
```
## push_external_roles_in_interserver_queries
Позволяет передавать роли пользователя от инициатора запроса другим нодам при выполнении запроса.
Значение по умолчанию: `true`.

View File

@ -220,7 +220,7 @@ std::vector<String> Client::loadWarningMessages()
"" /* query_id */,
QueryProcessingStage::Complete,
&client_context->getSettingsRef(),
&client_context->getClientInfo(), false, {});
&client_context->getClientInfo(), false, {}, {});
while (true)
{
Packet packet = connection->receivePacket();

View File

@ -366,6 +366,13 @@ void ContextAccess::setUser(const UserPtr & user_) const
current_roles_with_admin_option = user->granted_roles.findGrantedWithAdminOption(*params.current_roles);
}
if (params.external_roles && !params.external_roles->empty())
{
current_roles.insert(current_roles.end(), params.external_roles->begin(), params.external_roles->end());
auto new_granted_with_admin_option = user->granted_roles.findGrantedWithAdminOption(*params.external_roles);
current_roles_with_admin_option.insert(current_roles_with_admin_option.end(), new_granted_with_admin_option.begin(), new_granted_with_admin_option.end());
}
subscription_for_roles_changes.reset();
enabled_roles = access_control->getEnabledRoles(current_roles, current_roles_with_admin_option);
subscription_for_roles_changes = enabled_roles->subscribeForChanges([weak_ptr = weak_from_this()](const std::shared_ptr<const EnabledRolesInfo> & roles_info_)
@ -516,7 +523,6 @@ std::optional<QuotaUsage> ContextAccess::getQuotaUsage() const
return getQuota()->getUsage();
}
SettingsChanges ContextAccess::getDefaultSettings() const
{
std::lock_guard lock{mutex};

View File

@ -18,6 +18,7 @@ ContextAccessParams::ContextAccessParams(
bool full_access_,
bool use_default_roles_,
const std::shared_ptr<const std::vector<UUID>> & current_roles_,
const std::shared_ptr<const std::vector<UUID>> & external_roles_,
const Settings & settings_,
const String & current_database_,
const ClientInfo & client_info_)
@ -25,6 +26,7 @@ ContextAccessParams::ContextAccessParams(
, full_access(full_access_)
, use_default_roles(use_default_roles_)
, current_roles(current_roles_)
, external_roles(external_roles_)
, readonly(settings_[Setting::readonly])
, allow_ddl(settings_[Setting::allow_ddl])
, allow_introspection(settings_[Setting::allow_introspection_functions])
@ -59,6 +61,17 @@ String ContextAccessParams::toString() const
}
out << "]";
}
if (external_roles && !external_roles->empty())
{
out << separator() << "external_roles = [";
for (size_t i = 0; i != external_roles->size(); ++i)
{
if (i)
out << ", ";
out << (*external_roles)[i];
}
out << "]";
}
if (readonly)
out << separator() << "readonly = " << readonly;
if (allow_ddl)
@ -107,6 +120,7 @@ bool operator ==(const ContextAccessParams & left, const ContextAccessParams & r
CONTEXT_ACCESS_PARAMS_EQUALS(full_access)
CONTEXT_ACCESS_PARAMS_EQUALS(use_default_roles)
CONTEXT_ACCESS_PARAMS_EQUALS(current_roles)
CONTEXT_ACCESS_PARAMS_EQUALS(external_roles)
CONTEXT_ACCESS_PARAMS_EQUALS(readonly)
CONTEXT_ACCESS_PARAMS_EQUALS(allow_ddl)
CONTEXT_ACCESS_PARAMS_EQUALS(allow_introspection)
@ -157,6 +171,7 @@ bool operator <(const ContextAccessParams & left, const ContextAccessParams & ri
CONTEXT_ACCESS_PARAMS_LESS(full_access)
CONTEXT_ACCESS_PARAMS_LESS(use_default_roles)
CONTEXT_ACCESS_PARAMS_LESS(current_roles)
CONTEXT_ACCESS_PARAMS_LESS(external_roles)
CONTEXT_ACCESS_PARAMS_LESS(readonly)
CONTEXT_ACCESS_PARAMS_LESS(allow_ddl)
CONTEXT_ACCESS_PARAMS_LESS(allow_introspection)

View File

@ -19,6 +19,7 @@ public:
bool full_access_,
bool use_default_roles_,
const std::shared_ptr<const std::vector<UUID>> & current_roles_,
const std::shared_ptr<const std::vector<UUID>> & external_roles_,
const Settings & settings_,
const String & current_database_,
const ClientInfo & client_info_);
@ -31,6 +32,7 @@ public:
const bool use_default_roles;
const std::shared_ptr<const std::vector<UUID>> current_roles;
const std::shared_ptr<const std::vector<UUID>> external_roles;
const UInt64 readonly;
const bool allow_ddl;

View File

@ -26,7 +26,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
LDAPAccessStorage::LDAPAccessStorage(const String & storage_name_, AccessControl & access_control_, const Poco::Util::AbstractConfiguration & config, const String & prefix)
: IAccessStorage(storage_name_), access_control(access_control_), memory_storage(storage_name_, access_control.getChangesNotifier(), false)
{
@ -320,6 +319,10 @@ std::set<String> LDAPAccessStorage::mapExternalRolesNoLock(const LDAPClient::Sea
{
std::set<String> role_names;
// If this node can't access LDAP server (or has not privileges to fetch roles) and gets empty list of external roles
if (external_roles.size() == 0)
return role_names;
if (external_roles.size() != role_search_params.size())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unable to map external roles");

View File

@ -1121,6 +1121,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
&client_context->getSettingsRef(),
&client_context->getClientInfo(),
true,
{},
[&](const Progress & progress) { onProgress(progress); });
if (send_external_tables)
@ -1624,6 +1625,7 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
&client_context->getSettingsRef(),
&client_context->getClientInfo(),
true,
{},
[&](const Progress & progress) { onProgress(progress); });
if (send_external_tables)

View File

@ -14,6 +14,7 @@
#include <Client/ClientBase.h>
#include <Client/Connection.h>
#include <Client/ConnectionParameters.h>
#include "Common/logger_useful.h"
#include <Common/ClickHouseRevision.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
@ -22,8 +23,8 @@
#include <Common/StringUtils.h>
#include <Common/OpenSSLHelpers.h>
#include <Common/randomSeed.h>
#include <Common/logger_useful.h>
#include <Core/Block.h>
#include <Core/ProtocolDefines.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Compression/CompressionFactory.h>
@ -752,6 +753,7 @@ void Connection::sendQuery(
const Settings * settings,
const ClientInfo * client_info,
bool with_pending_data,
const std::vector<String> & external_roles,
std::function<void(const Progress &)>)
{
OpenTelemetry::SpanHolder span("Connection::sendQuery()", OpenTelemetry::SpanKind::CLIENT);
@ -824,6 +826,18 @@ void Connection::sendQuery(
else
writeStringBinary("" /* empty string is a marker of the end of settings */, *out);
String external_roles_str;
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES)
{
WriteBufferFromString buffer(external_roles_str);
writeVectorBinary(external_roles, buffer);
buffer.finalize();
LOG_DEBUG(log_wrapper.get(), "Sending external_roles with query: [{}] ({})", fmt::join(external_roles, ", "), external_roles.size());
writeStringBinary(external_roles_str, *out);
}
/// Interserver secret
if (server_revision >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET)
{
@ -844,6 +858,7 @@ void Connection::sendQuery(
data += query;
data += query_id;
data += client_info->initial_user;
data += external_roles_str;
/// TODO: add source/target host/ip-address
std::string hash = encodeSHA256(data);

View File

@ -108,6 +108,7 @@ public:
const Settings * settings/* = nullptr */,
const ClientInfo * client_info/* = nullptr */,
bool with_pending_data/* = false */,
const std::vector<String> & external_roles,
std::function<void(const Progress &)> process_progress_callback) override;
void sendCancel() override;

View File

@ -161,7 +161,8 @@ void HedgedConnections::sendQuery(
const String & query_id,
UInt64 stage,
ClientInfo & client_info,
bool with_pending_data)
bool with_pending_data,
const std::vector<String> & external_roles)
{
std::lock_guard lock(cancel_mutex);
@ -188,7 +189,7 @@ void HedgedConnections::sendQuery(
hedged_connections_factory.skipReplicasWithTwoLevelAggregationIncompatibility();
}
auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data](ReplicaState & replica)
auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data, external_roles](ReplicaState & replica)
{
Settings modified_settings = settings;
@ -218,7 +219,8 @@ void HedgedConnections::sendQuery(
modified_settings.set("allow_experimental_analyzer", static_cast<bool>(modified_settings[Setting::allow_experimental_analyzer]));
replica.connection->sendQuery(
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, external_roles, {});
replica.change_replica_timeout.setRelative(timeouts.receive_data_timeout);
replica.packet_receiver->setTimeout(hedged_connections_factory.getConnectionTimeouts().receive_timeout);
};

View File

@ -90,7 +90,8 @@ public:
const String & query_id,
UInt64 stage,
ClientInfo & client_info,
bool with_pending_data) override;
bool with_pending_data,
const std::vector<String> & external_roles) override;
void sendReadTaskResponse(const String &) override
{

View File

@ -23,7 +23,8 @@ public:
const String & query_id,
UInt64 stage,
ClientInfo & client_info,
bool with_pending_data) = 0;
bool with_pending_data,
const std::vector<String> & external_roles) = 0;
virtual void sendReadTaskResponse(const String &) = 0;
virtual void sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) = 0;

View File

@ -100,6 +100,7 @@ public:
const Settings * settings,
const ClientInfo * client_info,
bool with_pending_data,
const std::vector<String> & external_roles,
std::function<void(const Progress &)> process_progress_callback) = 0;
virtual void sendCancel() = 0;

View File

@ -106,6 +106,7 @@ void LocalConnection::sendQuery(
const Settings *,
const ClientInfo * client_info,
bool,
const std::vector<String> & /*external_roles*/,
std::function<void(const Progress &)> process_progress_callback)
{
/// Last query may not have been finished or cancelled due to exception on client side.

View File

@ -114,6 +114,7 @@ public:
const Settings * settings/* = nullptr */,
const ClientInfo * client_info/* = nullptr */,
bool with_pending_data/* = false */,
const std::vector<String> & external_roles,
std::function<void(const Progress &)> process_progress_callback) override;
void sendCancel() override;

View File

@ -128,7 +128,8 @@ void MultiplexedConnections::sendQuery(
const String & query_id,
UInt64 stage,
ClientInfo & client_info,
bool with_pending_data)
bool with_pending_data,
const std::vector<String> & external_roles)
{
std::lock_guard lock(cancel_mutex);
@ -181,14 +182,14 @@ void MultiplexedConnections::sendQuery(
modified_settings[Setting::parallel_replica_offset] = i;
replica_states[i].connection->sendQuery(
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, external_roles, {});
}
}
else
{
/// Use single replica.
replica_states[0].connection->sendQuery(
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, external_roles, {});
}
sent_query = true;

View File

@ -36,7 +36,8 @@ public:
const String & query_id,
UInt64 stage,
ClientInfo & client_info,
bool with_pending_data) override;
bool with_pending_data,
const std::vector<String> & external_roles) override;
void sendReadTaskResponse(const String &) override;
void sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) override;

View File

@ -163,7 +163,7 @@ void Suggest::load(IServerConnection & connection,
void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query, const ClientInfo & client_info)
{
connection.sendQuery(
timeouts, query, {} /* query_parameters */, "" /* query_id */, QueryProcessingStage::Complete, nullptr, &client_info, false, {});
timeouts, query, {} /* query_parameters */, "" /* query_id */, QueryProcessingStage::Complete, nullptr, &client_info, false, {} /* external_roles*/, {});
while (true)
{

View File

@ -90,6 +90,9 @@ static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470;
static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL = 54471;
/// Push externally granted roles to other nodes
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES = 54472;
/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.
@ -97,6 +100,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCO
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54471;
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54472;
}

View File

@ -5727,6 +5727,9 @@ If enabled, MongoDB tables will return an error when a MongoDB query cannot be b
Allow writing simple SELECT queries without the leading SELECT keyword, which makes it simple for calculator-style usage, e.g. `1 + 2` becomes a valid query.
In `clickhouse-local` it is enabled by default and can be explicitly disabled.
)", 0) \
DECLARE(Bool, push_external_roles_in_interserver_queries, true, R"(
Enable pushing user roles from originator to other nodes while performing a query.
)", 0) \
\
\

View File

@ -86,6 +86,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"filesystem_cache_prefer_bigger_buffer_size", true, true, "New setting"},
{"read_in_order_use_virtual_row", false, false, "Use virtual row while reading in order of primary key or its monotonic function fashion. It is useful when searching over multiple parts as only relevant ones are touched."},
{"filesystem_cache_boundary_alignment", 0, 0, "New setting"},
{"push_external_roles_in_interserver_queries", false, false, "New setting."},
}
},
{"24.10",

View File

@ -1493,7 +1493,7 @@ ConfigurationPtr Context::getUsersConfig()
return shared->users_config;
}
void Context::setUser(const UUID & user_id_)
void Context::setUser(const UUID & user_id_, const std::vector<UUID> & external_roles_)
{
/// Prepare lists of user's profiles, constraints, settings, roles.
/// NOTE: AccessControl::read<User>() and other AccessControl's functions may require some IO work,
@ -1508,7 +1508,6 @@ void Context::setUser(const UUID & user_id_)
const auto & database = user->default_database;
/// Apply user's profiles, constraints, settings, roles.
std::lock_guard lock(mutex);
setUserIDWithLock(user_id_, lock);
@ -1518,6 +1517,7 @@ void Context::setUser(const UUID & user_id_)
setCurrentProfilesWithLock(*enabled_profiles, /* check_constraints= */ false, lock);
setCurrentRolesWithLock(default_roles, lock);
setExternalRolesWithLock(external_roles_, lock);
/// It's optional to specify the DEFAULT DATABASE in the user's definition.
if (!database.empty())
@ -1561,6 +1561,18 @@ void Context::setCurrentRolesWithLock(const std::vector<UUID> & new_current_role
need_recalculate_access = true;
}
void Context::setExternalRolesWithLock(const std::vector<UUID> & new_external_roles, const std::lock_guard<ContextSharedMutex> &)
{
if (external_roles && !external_roles->empty())
{
if (current_roles)
current_roles->insert(current_roles->end(), new_external_roles.begin(), new_external_roles.end());
else
current_roles = std::make_shared<std::vector<UUID>>(new_external_roles);
}
need_recalculate_access = true;
}
void Context::setCurrentRolesImpl(const std::vector<UUID> & new_current_roles, bool throw_if_not_granted, bool skip_if_not_granted, const std::shared_ptr<const User> & user)
{
if (skip_if_not_granted)
@ -1675,7 +1687,7 @@ std::shared_ptr<const ContextAccessWrapper> Context::getAccess() const
bool full_access = !user_id;
return ContextAccessParams{
user_id, full_access, /* use_default_roles= */ false, current_roles, *settings, current_database, client_info};
user_id, full_access, /* use_default_roles= */ false, current_roles, external_roles, *settings, current_database, client_info};
};
/// Check if the current access rights are still valid, otherwise get parameters for recalculating access rights.

View File

@ -289,6 +289,7 @@ protected:
std::optional<UUID> user_id;
std::shared_ptr<std::vector<UUID>> current_roles;
std::shared_ptr<std::vector<UUID>> external_roles;
std::shared_ptr<const SettingsConstraintsAndProfileIDs> settings_constraints_and_current_profiles;
mutable std::shared_ptr<const ContextAccess> access;
mutable bool need_recalculate_access = true;
@ -634,7 +635,7 @@ public:
/// Sets the current user assuming that he/she is already authenticated.
/// WARNING: This function doesn't check password!
void setUser(const UUID & user_id_);
void setUser(const UUID & user_id_, const std::vector<UUID> & external_roles_ = {});
UserPtr getUser() const;
std::optional<UUID> getUserID() const;
@ -1398,6 +1399,8 @@ private:
void setCurrentRolesWithLock(const std::vector<UUID> & new_current_roles, const std::lock_guard<ContextSharedMutex> & lock);
void setExternalRolesWithLock(const std::vector<UUID> & new_external_roles, const std::lock_guard<ContextSharedMutex> & lock);
void setSettingWithLock(std::string_view name, const String & value, const std::lock_guard<ContextSharedMutex> & lock);
void setSettingWithLock(std::string_view name, const Field & value, const std::lock_guard<ContextSharedMutex> & lock);

View File

@ -6,6 +6,8 @@
#include <Access/ContextAccess.h>
#include <Access/SettingsProfilesInfo.h>
#include <Access/User.h>
#include <Access/Role.h>
#include <Common/typeid_cast.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
@ -25,12 +27,12 @@
#include <unordered_map>
#include <vector>
namespace DB
{
namespace Setting
{
extern const SettingsUInt64 max_sessions_for_user;
extern const SettingsBool push_external_roles_in_interserver_queries;
}
namespace ErrorCodes
@ -288,7 +290,7 @@ void Session::shutdownNamedSessions()
Session::Session(const ContextPtr & global_context_, ClientInfo::Interface interface_, bool is_secure, const std::string & certificate)
: auth_id(UUIDHelpers::generateV4()),
global_context(global_context_),
log(getLogger(String{magic_enum::enum_name(interface_)} + "-Session"))
log(getLogger(String{magic_enum::enum_name(interface_)} + "-Session-" + toString(auth_id)))
{
prepared_client_info.emplace();
prepared_client_info->interface = interface_;
@ -342,12 +344,12 @@ std::unordered_set<AuthenticationType> Session::getAuthenticationTypesOrLogInFai
}
}
void Session::authenticate(const String & user_name, const String & password, const Poco::Net::SocketAddress & address)
void Session::authenticate(const String & user_name, const String & password, const Poco::Net::SocketAddress & address, const Strings & external_roles_)
{
authenticate(BasicCredentials{user_name, password}, address);
authenticate(BasicCredentials{user_name, password}, address, external_roles_);
}
void Session::authenticate(const Credentials & credentials_, const Poco::Net::SocketAddress & address_)
void Session::authenticate(const Credentials & credentials_, const Poco::Net::SocketAddress & address_, const Strings & external_roles_)
{
if (session_context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "If there is a session context it must be created after authentication");
@ -359,8 +361,8 @@ void Session::authenticate(const Credentials & credentials_, const Poco::Net::So
if ((address == Poco::Net::SocketAddress{}) && (prepared_client_info->interface == ClientInfo::Interface::LOCAL))
address = Poco::Net::SocketAddress{"127.0.0.1", 0};
LOG_DEBUG(log, "{} Authenticating user '{}' from {}",
toString(auth_id), credentials_.getUserName(), address.toString());
LOG_DEBUG(log, "Authenticating user '{}' from {}",
credentials_.getUserName(), address.toString());
try
{
@ -370,6 +372,14 @@ void Session::authenticate(const Credentials & credentials_, const Poco::Net::So
settings_from_auth_server = auth_result.settings;
LOG_DEBUG(log, "{} Authenticated with global context as user {}",
toString(auth_id), toString(*user_id));
if (!external_roles_.empty() && global_context->getSettingsRef()[Setting::push_external_roles_in_interserver_queries])
{
external_roles = global_context->getAccessControl().find<Role>(external_roles_);
LOG_DEBUG(log, "User {} has external_roles applied: [{}] ({})",
toString(*user_id), fmt::join(external_roles_, ", "), external_roles_.size());
}
}
catch (const Exception & e)
{
@ -394,7 +404,7 @@ void Session::checkIfUserIsStillValid()
void Session::onAuthenticationFailure(const std::optional<String> & user_name, const Poco::Net::SocketAddress & address_, const Exception & e)
{
LOG_DEBUG(log, "{} Authentication failed with error: {}", toString(auth_id), e.what());
LOG_DEBUG(log, "Authentication failed with error: {}", e.what());
if (auto session_log = getSessionLog())
{
/// Add source address to the log
@ -520,8 +530,8 @@ ContextMutablePtr Session::makeSessionContext()
if (session_tracker_handle)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session tracker handle was created before making session");
LOG_DEBUG(log, "{} Creating session context with user_id: {}",
toString(auth_id), toString(*user_id));
LOG_DEBUG(log, "Creating session context with user_id: {}",
toString(*user_id));
/// Make a new session context.
ContextMutablePtr new_session_context;
new_session_context = Context::createCopy(global_context);
@ -532,7 +542,7 @@ ContextMutablePtr Session::makeSessionContext()
prepared_client_info.reset();
/// Set user information for the new context: current profiles, roles, access rights.
new_session_context->setUser(*user_id);
new_session_context->setUser(*user_id, external_roles);
/// Session context is ready.
session_context = new_session_context;
@ -563,8 +573,8 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
if (session_tracker_handle)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session tracker handle was created before making session");
LOG_DEBUG(log, "{} Creating named session context with name: {}, user_id: {}",
toString(auth_id), session_name_, toString(*user_id));
LOG_DEBUG(log, "Creating named session context with name: {}, user_id: {}",
session_name_, toString(*user_id));
/// Make a new session context OR
/// if the `session_id` and `user_id` were used before then just get a previously created session context.
@ -587,7 +597,7 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
/// Set user information for the new context: current profiles, roles, access rights.
if (!access->tryGetUser())
{
new_session_context->setUser(*user_id);
new_session_context->setUser(*user_id, external_roles);
max_sessions_for_user = new_session_context->getSettingsRef()[Setting::max_sessions_for_user];
}
else
@ -639,7 +649,7 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query context must be created after authentication");
/// We can create a query context either from a session context or from a global context.
bool from_session_context = static_cast<bool>(session_context);
const bool from_session_context = static_cast<bool>(session_context);
/// Create a new query context.
ContextMutablePtr query_context = Context::createCopy(from_session_context ? session_context : global_context);
@ -679,7 +689,7 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t
/// Set user information for the new context: current profiles, roles, access rights.
if (user_id && !query_context->getAccess()->tryGetUser())
query_context->setUser(*user_id);
query_context->setUser(*user_id, external_roles);
/// Query context is ready.
query_context_created = true;

View File

@ -10,6 +10,7 @@
#include <memory>
#include <mutex>
#include <optional>
#include <vector>
namespace Poco::Net { class SocketAddress; }
@ -50,8 +51,11 @@ public:
/// Sets the current user, checks the credentials and that the specified address is allowed to connect from.
/// The function throws an exception if there is no such user or password is wrong.
void authenticate(const String & user_name, const String & password, const Poco::Net::SocketAddress & address);
void authenticate(const Credentials & credentials_, const Poco::Net::SocketAddress & address_);
void authenticate(const String & user_name, const String & password, const Poco::Net::SocketAddress & address, const Strings & external_roles_ = {});
/// `external_roles_` names of the additional roles (over what is granted via local access control mechanisms) that would be granted to user during this session.
/// Role is not granted if it can't be found by name via AccessControl (i.e. doesn't exist on this instance).
void authenticate(const Credentials & credentials_, const Poco::Net::SocketAddress & address_, const Strings & external_roles_ = {});
// Verifies whether the user's validity extends beyond the current time.
// Throws an exception if the user's validity has expired.
@ -112,6 +116,7 @@ private:
mutable UserPtr user;
std::optional<UUID> user_id;
std::vector<UUID> external_roles;
AuthenticationData user_authenticated_with;
ContextMutablePtr session_context;

View File

@ -56,8 +56,9 @@ RemoteInserter::RemoteInserter(
/** Send query and receive "header", that describes table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.
*/
/// TODO (vnemkov): figure out should we pass additional roles in this case or not.
connection.sendQuery(
timeouts, query, /* query_parameters */ {}, "", QueryProcessingStage::Complete, &settings, &modified_client_info, false, {});
timeouts, query, /* query_parameters */ {}, "", QueryProcessingStage::Complete, &settings, &modified_client_info, false, /* external_roles */ {}, {});
while (true)
{

View File

@ -22,8 +22,12 @@
#include <Client/MultiplexedConnections.h>
#include <Client/HedgedConnections.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Storages/StorageMemory.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <Storages/StorageMemory.h>
#include <Access/AccessControl.h>
#include <Access/User.h>
#include <Access/Role.h>
namespace ProfileEvents
{
@ -43,6 +47,7 @@ namespace Setting
extern const SettingsBool skip_unavailable_shards;
extern const SettingsOverflowMode timeout_overflow_mode;
extern const SettingsBool use_hedged_requests;
extern const SettingsBool push_external_roles_in_interserver_queries;
}
namespace ErrorCodes
@ -398,7 +403,25 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
if (!duplicated_part_uuids.empty())
connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
// Collect all roles granted on this node and pass those to the remote node
std::vector<String> local_granted_roles;
if (context->getSettingsRef()[Setting::push_external_roles_in_interserver_queries] && !modified_client_info.initial_user.empty())
{
auto user = context->getAccessControl().read<User>(modified_client_info.initial_user, true);
boost::container::flat_set<String> granted_roles;
if (user)
{
const auto & access_control = context->getAccessControl();
for (const auto & e : user->granted_roles.getElements())
{
auto names = access_control.readNames(e.ids);
granted_roles.insert(names.begin(), names.end());
}
}
local_granted_roles.insert(local_granted_roles.end(), granted_roles.begin(), granted_roles.end());
}
connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true, local_granted_roles);
established = false;
sent_query = true;

View File

@ -63,12 +63,17 @@
#include <Core/Protocol.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <Interpreters/ClientInfo.h>
#include "TCPHandler.h"
#include <Common/config_version.h>
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <Common/StringUtils.h>
using namespace std::literals;
using namespace DB;
@ -1960,6 +1965,13 @@ void TCPHandler::processQuery(std::optional<QueryState> & state)
Settings passed_settings;
passed_settings.read(*in, settings_format);
std::string received_extra_roles;
// TODO: check if having `is_interserver_mode` doesn't break interoperability with the CH-client.
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES)
{
readStringBinary(received_extra_roles, *in);
}
/// Interserver secret.
std::string received_hash;
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET)
@ -2019,6 +2031,7 @@ void TCPHandler::processQuery(std::optional<QueryState> & state)
data += state->query;
data += state->query_id;
data += client_info.initial_user;
data += received_extra_roles;
std::string calculated_hash = encodeSHA256(data);
assert(calculated_hash.size() == 32);
@ -2039,13 +2052,25 @@ void TCPHandler::processQuery(std::optional<QueryState> & state)
}
else
{
// In a cluster, query originator may have an access to the external auth provider (like LDAP server),
// that grants specific roles to the user. We want these roles to be granted to the user on other nodes of cluster when
// query is executed.
Strings external_roles;
if (!received_extra_roles.empty())
{
ReadBufferFromString buffer(received_extra_roles);
readVectorBinary(external_roles, buffer);
LOG_DEBUG(log, "Parsed extra roles [{}]", fmt::join(external_roles, ", "));
}
LOG_DEBUG(log, "User (initial, interserver mode): {} (client: {})", client_info.initial_user, getClientAddress(client_info).toString());
/// In case of inter-server mode authorization is done with the
/// initial address of the client, not the real address from which
/// the query was come, since the real address is the address of
/// the initiator server, while we are interested in client's
/// address.
session->authenticate(AlwaysAllowCredentials{client_info.initial_user}, client_info.initial_address);
session->authenticate(AlwaysAllowCredentials{client_info.initial_user}, client_info.initial_address, external_roles);
}
is_interserver_authenticated = true;

View File

@ -0,0 +1,11 @@
<clickhouse>
<ldap_servers>
<openldap>
<host>openldap</host>
<port>389</port>
<bind_dn>cn={user_name},ou=users,dc=company,dc=com</bind_dn>
<enable_tls>no</enable_tls>
<tls_require_cert>never</tls_require_cert>
</openldap>
</ldap_servers>
</clickhouse>

View File

@ -0,0 +1,15 @@
<clickhouse>
<logger>
<level>debug</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
<library_bridge_log>/var/log/clickhouse-server/clickhouse-library-bridge.log</library_bridge_log>
<library_bridge_errlog>/var/log/clickhouse-server/clickhouse-library-bridge.err.log</library_bridge_errlog>
<library_bridge_level>trace</library_bridge_level>
</logger>
</clickhouse>

View File

@ -0,0 +1,46 @@
<clickhouse>
<remote_servers>
<sharded_cluster_without_secret>
<shard>
<replica>
<host>instance1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>instance2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>instance3</host>
<port>9000</port>
</replica>
</shard>
</sharded_cluster_without_secret>
<sharded_cluster_with_secret>
<secret>qwerty123</secret>
<shard>
<replica>
<host>instance1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>instance2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>instance3</host>
<port>9000</port>
</replica>
</shard>
</sharded_cluster_with_secret>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,64 @@
# LDIF Export for dc=company,dc=com
# Server: openldap (openldap)
# Search Scope: sub
# Search Filter: (objectClass=*)
# Total Entries: 7
#
# Generated by phpLDAPadmin (http://phpldapadmin.sourceforge.net) on May 22, 2020 5:51 pm
# Version: 1.2.5
# Entry 1: dc=company,dc=com
#dn: dc=company,dc=com
#dc: company
#o: company
#objectclass: top
#objectclass: dcObject
#objectclass: organization
# Entry 2: cn=admin,dc=company,dc=com
#dn: cn=admin,dc=company,dc=com
#cn: admin
#description: LDAP administrator
#objectclass: simpleSecurityObject
#objectclass: organizationalRole
#userpassword: {SSHA}eUEupkQCTvq9SkrxfWGSe5rX+orrjVbF
# Entry 3: ou=groups,dc=company,dc=com
dn: ou=groups,dc=company,dc=com
objectclass: organizationalUnit
objectclass: top
ou: groups
# Entry 4: cn=admin,ou=groups,dc=company,dc=com
dn: cn=admin,ou=groups,dc=company,dc=com
cn: admin
gidnumber: 500
objectclass: posixGroup
objectclass: top
# Entry 5: cn=users,ou=groups,dc=company,dc=com
dn: cn=users,ou=groups,dc=company,dc=com
cn: users
gidnumber: 501
objectclass: posixGroup
objectclass: top
# Entry 6: ou=users,dc=company,dc=com
dn: ou=users,dc=company,dc=com
objectclass: organizationalUnit
objectclass: top
ou: users
# Entry 7: cn=myuser,ou=users,dc=company,dc=com
# dn: cn=myuser,ou=users,dc=company,dc=com
# cn: myuser
# gidnumber: 501
# givenname: John
# homedirectory: /home/users/myuser
# objectclass: inetOrgPerson
# objectclass: posixAccount
# objectclass: top
# sn: User
# uid: myuser
# uidnumber: 1101
# userpassword: myuser

View File

@ -0,0 +1,411 @@
import time
from os import getuid
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"instance1",
main_configs=[
"configs/clickhouse/config.d/logger.xml",
"configs/clickhouse/config.d/ldap_servers.xml",
"configs/clickhouse/config.d/remote_servers.xml",
],
macros={"shard": 1, "replica": "instance1"},
stay_alive=True,
with_ldap=True,
with_zookeeper=True,
)
cluster.add_instance(
"instance2",
main_configs=[
"configs/clickhouse/config.d/logger.xml",
"configs/clickhouse/config.d/ldap_servers.xml",
"configs/clickhouse/config.d/remote_servers.xml",
],
macros={"shard": 1, "replica": "instance2"},
stay_alive=True,
with_ldap=True,
with_zookeeper=True,
)
cluster.add_instance(
"instance3",
main_configs=[
"configs/clickhouse/config.d/logger.xml",
"configs/clickhouse/config.d/ldap_servers.xml",
"configs/clickhouse/config.d/remote_servers.xml",
],
macros={"shard": 1, "replica": "instance3"},
stay_alive=True,
with_ldap=True,
with_zookeeper=True,
)
instances = [
cluster.instances["instance1"],
cluster.instances["instance2"],
cluster.instances["instance3"],
]
ldap_server = {
"host": "openldap",
"port": "389",
"enable_tls": "no",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
}
# Fixtures
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
# Helpers
def create_table(node, on_cluster, name=None):
if name is None:
name = f"tbl_{getuid()}"
node.query(f"DROP TABLE IF EXISTS {name} ON CLUSTER {on_cluster} SYNC")
node.query(
f"CREATE TABLE {name} ON CLUSTER {on_cluster} (d Date, a String, b UInt8, x String, y Int8) "
f"ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{shard}}/{name}', '{{replica}}') "
"PARTITION BY y ORDER BY (d, b)"
)
return name
def create_distributed_table(node, on_cluster, over, name=None):
if name is None:
name = f"dis_tbl_{getuid()}"
node.query(f"DROP TABLE IF EXISTS {name} ON CLUSTER {on_cluster} SYNC")
node.query(
f"CREATE TABLE {name} ON CLUSTER {on_cluster} AS {over} "
f"ENGINE = Distributed({on_cluster}, default, {over}, rand())"
)
return name
def drop_table(node, name, on_cluster):
node.query(f"DROP TABLE IF EXISTS {name} ON CLUSTER {on_cluster} SYNC")
def grant_select(cluster, privilege, role_or_user, node):
"""Grant select privilege on a table on a given cluster
to a role or a user.
"""
node.query(f"GRANT ON CLUSTER {cluster} {privilege} TO {role_or_user}")
def revoke_select(cluster, privilege, role_or_user, node):
node.query(f"REVOKE ON CLUSTER {cluster} {privilege} FROM {role_or_user}")
def fix_ldap_permissions_command():
ldif = (
"dn: olcDatabase={1}mdb,cn=config\n"
"changetype: modify\n"
"delete: olcAccess\n"
"-\n"
"add: olcAccess\n"
'olcAccess: to attrs=userPassword,shadowLastChange by self write by dn=\\"cn=admin,dc=company,dc=com\\" write by anonymous auth by * none\n'
'olcAccess: to * by self write by dn=\\"cn=admin,dc=company,dc=com\\" read by users read by * none'
)
return f'echo -e "{ldif}" | ldapmodify -Y EXTERNAL -Q -H ldapi:///'
def add_user_to_ldap_command(
cn,
userpassword,
givenname=None,
homedirectory=None,
sn=None,
uid=None,
uidnumber=None,
):
if uid is None:
uid = cn
if givenname is None:
givenname = "John"
if homedirectory is None:
homedirectory = f"/home/{cn}"
if sn is None:
sn = "User"
if uidnumber is None:
uidnumber = 2000
user = {
"dn": f"cn={cn},ou=users,dc=company,dc=com",
"cn": cn,
"gidnumber": 501,
"givenname": givenname,
"homedirectory": homedirectory,
"objectclass": ["inetOrgPerson", "posixAccount", "top"],
"sn": sn,
"uid": uid,
"uidnumber": uidnumber,
"userpassword": userpassword,
"_server": cluster.ldap_host,
}
lines = []
for key, value in list(user.items()):
if key.startswith("_"):
continue
elif key == "objectclass":
for cls in value:
lines.append(f"objectclass: {cls}")
else:
lines.append(f"{key}: {value}")
ldif = "\n".join(lines)
return f'echo -e "{ldif}" | ldapadd -x -H ldap://localhost -D "cn=admin,dc=company,dc=com" -w admin'
def add_rbac_user(user, node):
username = user.get("username", None) or user["cn"]
password = user.get("password", None) or user["userpassword"]
node.query(
f"CREATE USER OR REPLACE {username} IDENTIFIED WITH PLAINTEXT_PASSWORD BY '{password}'"
)
def add_rbac_role(role, node):
node.query(f"DROP ROLE IF EXISTS {role}")
node.query(f"CREATE ROLE OR REPLACE {role}")
def outline_test_select_using_mapped_role(cluster, role_name, role_mapped, user):
# default cluster node
node = instances[0]
query_settings = {"user": user["username"], "password": user["password"]}
# create base table on cluster
src_table = create_table(node=node, on_cluster=cluster)
# create distristibuted table over base table on cluster
dist_table = create_distributed_table(on_cluster=cluster, over=src_table, node=node)
# check that grants for the user
for instance in instances:
for _ in range(10):
r = instance.query(f"SHOW GRANTS", settings=query_settings)
if role_mapped:
assert role_name in r
else:
time.sleep(1)
# no privilege on source table
for instance in instances:
assert "Not enough privileges" in instance.query_and_get_error(
f"SELECT * FROM {src_table}", settings=query_settings
)
# with privilege on source table
grant_select(
cluster=cluster,
privilege=f"SELECT ON {src_table}",
role_or_user=role_name,
node=node,
)
# user should be able to read from the source table
for instance in instances:
if role_mapped:
instance.query(f"SELECT * FROM {src_table}", settings=query_settings)
else:
instance.query_and_get_error(
f"SELECT * FROM {src_table}", settings=query_settings
) == 241
revoke_select(
cluster=cluster,
privilege=f"SELECT ON {src_table}",
role_or_user=role_name,
node=node,
)
# privilege only on distributed table
grant_select(
cluster=cluster,
privilege=f"SELECT ON {dist_table}",
role_or_user=role_name,
node=node,
)
# user should still not be able to read from distributed table
for instance in instances:
instance.query_and_get_error(
f"SELECT * FROM {dist_table}", settings=query_settings
) == 241
revoke_select(
cluster=cluster,
privilege=f"SELECT ON {dist_table}",
role_or_user=role_name,
node=node,
)
# privilege only on source but not on distributed table
grant_select(
cluster=cluster,
privilege=f"SELECT ON {src_table}",
role_or_user=role_name,
node=node,
)
# user should still not be able to read from distributed table
for instance in instances:
instance.query_and_get_error(
f"SELECT * FROM {dist_table}", settings=query_settings
) == 241
revoke_select(
cluster=cluster,
privilege=f"SELECT ON {src_table}",
role_or_user=role_name,
node=node,
)
# privilege on source and distributed
grant_select(
cluster=cluster,
privilege=f"SELECT ON {src_table}",
role_or_user=role_name,
node=node,
)
grant_select(
cluster=cluster,
privilege=f"SELECT ON {dist_table}",
role_or_user=role_name,
node=node,
)
# user should be able to read from the distributed table
for instance in instances:
if role_mapped:
instance.query(f"SELECT * FROM {dist_table}", settings=query_settings)
else:
instance.query_and_get_error(
f"SELECT * FROM {dist_table}", settings=query_settings
) == 241
revoke_select(
cluster=cluster,
privilege=f"SELECT ON {src_table}",
role_or_user=role_name,
node=node,
)
revoke_select(
cluster=cluster,
privilege=f"SELECT ON {dist_table}",
role_or_user=role_name,
node=node,
)
def execute_tests(role_name, role_mapped, ldap_user, local_user):
for cluster_type in ["with_secret", "without_secret"]:
for user in [ldap_user, local_user]:
if role_mapped and user["type"] == "local user":
for instance in instances:
instance.query(f"GRANT {role_name} TO {local_user['username']}")
outline_test_select_using_mapped_role(
cluster=f"sharded_cluster_{cluster_type}",
role_name=role_name,
role_mapped=role_mapped,
user=user,
)
def test_using_authenticated_users(started_cluster):
role_name = f"role_{getuid()}"
ldap_user = {
"type": "ldap authenticated user",
"cn": "myuser",
"username": "myuser",
"userpassword": "myuser",
"password": "myuser",
"server": "openldap",
"uidnumber": 1101,
}
local_user = {
"type": "local user",
"username": "local_user2",
"password": "local_user2",
}
# fix_ldap_permissions
cluster.exec_in_container(
cluster.ldap_id, ["bash", "-c", fix_ldap_permissions_command()]
)
# add LDAP user
cluster.exec_in_container(
cluster.ldap_id,
[
"bash",
"-c",
add_user_to_ldap_command(
cn=ldap_user["cn"],
userpassword=ldap_user["userpassword"],
uidnumber=ldap_user["uidnumber"],
),
],
)
# cluster.exec_in_container(cluster.ldap_id, ["ldapsearch -x -LLL uid=*"])
# add local RBAC user
for instance in instances:
add_rbac_user(user=local_user, node=instance)
# add RBAC role on cluster that user will use
for instance in instances:
add_rbac_role(role_name, instance)
# create LDAP-auth user and grant role
for instance in instances:
instance.query(
f"CREATE USER OR REPLACE {ldap_user['username']} IDENTIFIED WITH LDAP SERVER '{ldap_user['server']}'"
)
for instance in instances:
instance.query(f"GRANT {role_name} TO {ldap_user['username']}")
# grant role to local RBAC user
for instance in instances:
instance.query(f"GRANT {role_name} TO {local_user['username']}")
execute_tests(
role_name=role_name,
role_mapped=role_name,
ldap_user=ldap_user,
local_user=local_user,
)