commit c9158735e0
pufit, 2024-11-20 15:12:06 -08:00, committed by GitHub
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
4 changed files with 49 additions and 1 deletion

View File

@@ -198,6 +198,7 @@ namespace DB
     DECLARE(UInt64, parts_killer_pool_size, 128, "Threads for cleanup of shared merge tree outdated threads. Only available in ClickHouse Cloud", 0) \
     DECLARE(UInt64, keeper_multiread_batch_size, 10'000, "Maximum size of batch for MultiRead request to [Zoo]Keeper that support batching. If set to 0, batching is disabled. Available only in ClickHouse Cloud.", 0) \
     DECLARE(Bool, use_legacy_mongodb_integration, true, "Use the legacy MongoDB integration implementation. Note: it's highly recommended to set this option to false, since legacy implementation will be removed in the future. Please submit any issues you encounter with the new implementation.", 0) \
+    DECLARE(Bool, validate_access_consistency_between_instances, true, "Validate that the instance has the same user with exactly the same access before executing a DDL query. Note: turning this setting off may expose your cluster to potential permission escalation. Change this setting only if you know what you are doing.", 0) \
     \
     DECLARE(UInt64, prefetch_threadpool_pool_size, 100, "Size of background pool for prefetches for remote object storages", 0) \
     DECLARE(UInt64, prefetch_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into prefetches pool", 0) \
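The new server setting defaults to strict validation. For clusters that need to roll the change out gradually it can be disabled per instance; a minimal sketch, assuming the setting is read from the top level of the server's config.xml like other server-level settings (the element name mirrors the setting name, the value shown is illustrative, and disabling it reintroduces the permission-escalation risk described above):

<clickhouse>
    <validate_access_consistency_between_instances>false</validate_access_consistency_between_instances>
</clickhouse>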

View File

@@ -1,7 +1,10 @@
+#include <Access/AccessControl.h>
+#include <Access/User.h>
 #include <Interpreters/DDLTask.h>
 #include <base/sort.h>
 #include <Common/DNSResolver.h>
 #include <Common/isLocalAddress.h>
+#include <Core/ServerSettings.h>
 #include <Core/Settings.h>
 #include <Databases/DatabaseReplicated.h>
 #include <Interpreters/DatabaseCatalog.h>
@@ -32,6 +35,11 @@ namespace Setting
     extern const SettingsUInt64 max_query_size;
 }
 
+namespace ServerSetting
+{
+    extern const ServerSettingsBool validate_access_consistency_between_instances;
+}
+
 namespace ErrorCodes
 {
     extern const int UNKNOWN_FORMAT_VERSION;
@@ -147,6 +155,14 @@ String DDLLogEntry::toString() const
         wb << "\n";
     }
 
+    if (version >= INITIATOR_USER_VERSION)
+    {
+        wb << "initiator_user: ";
+        writeEscapedString(initiator_user, wb);
+        wb << '\n';
+        wb << "access_hash: " << access_hash << "\n";
+    }
+
     return wb.str();
 }
@@ -219,6 +235,14 @@ void DDLLogEntry::parse(const String & data)
         rb >> "\n";
     }
 
+    if (version >= INITIATOR_USER_VERSION)
+    {
+        rb >> "initiator_user: ";
+        readEscapedString(initiator_user, rb);
+        rb >> "\n";
+        rb >> "access_hash: " >> access_hash >> "\n";
+    }
+
     assertEOF(rb);
 
     if (!host_id_strings.empty())
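For illustration, the serialization and parsing hunks above append two plain-text lines to any entry written with version >= INITIATOR_USER_VERSION: the escaped initiator user name and the access hash as a decimal UInt64. The values below are made-up placeholders; the rest of the entry format is unchanged:

initiator_user: alice
access_hash: 1234567890123456789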
@@ -253,8 +277,25 @@ ContextMutablePtr DDLTaskBase::makeQueryContext(ContextPtr from_context, const Z
     query_context->makeQueryContext();
     query_context->setCurrentQueryId(""); // generate random query_id
     query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
 
+    const auto & access_control = from_context->getAccessControl();
+    const auto user = access_control.tryRead<User>(entry.initiator_user);
+    if (!user)
+    {
+        if (from_context->getServerSettings()[ServerSetting::validate_access_consistency_between_instances])
+            throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, "Initiator user is not present on instance.");
+
+        LOG_WARNING(getLogger("DDLTask"), "Initiator user is not present on the instance. Will use the global user for the query execution. This is a security vulnerability!");
+    }
+    else
+    {
+        query_context->setUser(access_control.getID<User>(entry.initiator_user));
+
+        if (sipHash64(user->access.toString()) != entry.access_hash && from_context->getServerSettings()[ServerSetting::validate_access_consistency_between_instances])
+            throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, "Inconsistent access for the '{}' user on the instance.", user->getName());
+    }
+
     if (entry.settings)
         query_context->applySettingsChanges(*entry.settings);
     return query_context;
 }

View File

@@ -78,10 +78,11 @@ struct DDLLogEntry
     static constexpr const UInt64 PRESERVE_INITIAL_QUERY_ID_VERSION = 5;
     static constexpr const UInt64 BACKUP_RESTORE_FLAG_IN_ZK_VERSION = 6;
     static constexpr const UInt64 PARENT_TABLE_UUID_VERSION = 7;
+    static constexpr const UInt64 INITIATOR_USER_VERSION = 8;
     /// Add new version here
 
     /// Remember to update the value below once new version is added
-    static constexpr const UInt64 DDL_ENTRY_FORMAT_MAX_VERSION = 7;
+    static constexpr const UInt64 DDL_ENTRY_FORMAT_MAX_VERSION = 9;
 
     UInt64 version = 1;
     String query;
@@ -95,6 +96,9 @@ struct DDLLogEntry
     /// Only for DatabaseReplicated.
     std::optional<UUID> parent_table_uuid;
 
+    String initiator_user;
+    UInt64 access_hash;
+
     void setSettingsIfRequired(ContextPtr context);
     String toString() const;
     void parse(const String & data);

View File

@@ -189,6 +189,8 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
     entry.setSettingsIfRequired(context);
     entry.tracing_context = OpenTelemetry::CurrentContext();
     entry.initial_query_id = context->getClientInfo().initial_query_id;
+    entry.initiator_user = context->getUserName();
+    entry.access_hash = sipHash64(context->getAccess()->getAccessRights()->toString());
 
     String node_path = ddl_worker.enqueueQuery(entry, params.retries_info, context->getProcessListElement());
     return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context);
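Taken together, the initiator records which user issued the ON CLUSTER query plus a hash of that user's access rights, and every executing replica rejects the entry (or only warns, when validation is disabled) if its local definition of that user differs. A minimal self-contained sketch of this consistency check, with std::hash standing in for ClickHouse's sipHash64 and made-up grant strings as the serialized access rights:

#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

// Stand-in for sipHash64(access.toString()): hash a serialized set of grants.
static uint64_t accessHash(const std::string & serialized_grants)
{
    return std::hash<std::string>{}(serialized_grants);
}

int main()
{
    // Initiator side: the DDL log entry carries the user name and the hash of its grants.
    const std::string initiator_user = "alice";
    const uint64_t entry_access_hash = accessHash("GRANT SELECT, CREATE TABLE ON default.*");

    // Executor side: this replica's local users and their serialized grants
    // (placeholder data; "alice" here has drifted and is missing CREATE TABLE).
    const std::unordered_map<std::string, std::string> local_users
        = {{"alice", "GRANT SELECT ON default.*"}};

    const bool validate = true;  // validate_access_consistency_between_instances

    const auto it = local_users.find(initiator_user);
    if (it == local_users.end())
    {
        if (validate)
            std::cout << "reject: initiator user is not present on this instance\n";
        else
            std::cout << "warn: initiator user missing, executing with the global user\n";
    }
    else if (validate && accessHash(it->second) != entry_access_hash)
    {
        std::cout << "reject: inconsistent access for user '" << initiator_user << "'\n";
    }
    else
    {
        std::cout << "execute the DDL as user '" << initiator_user << "'\n";
    }
}

Any difference in the serialized grants changes the hash, so with validation enabled the executor rejects even small drift in the initiator user's access, which mirrors the strict default of the new server setting.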