ClickHouse/src/Interpreters/Context.cpp
Azat Khuzhin c1dea66907 Move TraceCollector into Interpreters
Since now it relies on SystemLog that is in Interpreters, and it cannot
be moved into Common, since it has lots of dependencies.
2022-01-10 22:35:42 +03:00

3145 lines
104 KiB
C++

#include <map>
#include <set>
#include <optional>
#include <memory>
#include <Poco/Mutex.h>
#include <Poco/UUID.h>
#include <Poco/Net/IPAddress.h>
#include <Poco/Util/Application.h>
#include <Common/Macros.h>
#include <Common/escapeForFileName.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/Throttler.h>
#include <Common/thread_local_rng.h>
#include <Common/FieldVisitorToString.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Coordination/KeeperDispatcher.h>
#include <Compression/ICompressionCodec.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/CompressionCodecSelector.h>
#include <Storages/StorageS3Settings.h>
#include <Disks/DiskLocal.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/ActionLocksManager.h>
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
#include <Core/Settings.h>
#include <Core/SettingsQuirks.h>
#include <Access/AccessControl.h>
#include <Access/ContextAccess.h>
#include <Access/EnabledRolesInfo.h>
#include <Access/EnabledRowPolicies.h>
#include <Access/QuotaUsage.h>
#include <Access/User.h>
#include <Access/Credentials.h>
#include <Access/SettingsProfile.h>
#include <Access/SettingsProfilesInfo.h>
#include <Access/SettingsConstraintsAndProfileIDs.h>
#include <Access/ExternalAuthenticators.h>
#include <Access/GSSAcceptor.h>
#include <Backups/BackupFactory.h>
#include <Dictionaries/Embedded/GeoDictionariesLoader.h>
#include <Interpreters/EmbeddedDictionaries.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/ExternalUserDefinedExecutableFunctionsLoader.h>
#include <Interpreters/ExternalModelsLoader.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/InterserverIOHandler.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/SessionLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/Session.h>
#include <Interpreters/TraceCollector.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/UncompressedCache.h>
#include <IO/MMappedFileCache.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Common/StackTrace.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/Config/AbstractConfigurationComparison.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ShellCommand.h>
#include <base/logger_useful.h>
#include <base/EnumReflection.h>
#include <Common/RemoteHostFilter.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Interpreters/SynonymsExtensions.h>
#include <Interpreters/Lemmatizers.h>
#include <Interpreters/ClusterDiscovery.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event ContextLock;
}
namespace CurrentMetrics
{
extern const Metric ContextLockWait;
extern const Metric BackgroundMovePoolTask;
extern const Metric BackgroundSchedulePoolTask;
extern const Metric BackgroundBufferFlushSchedulePoolTask;
extern const Metric BackgroundDistributedSchedulePoolTask;
extern const Metric BackgroundMessageBrokerSchedulePoolTask;
extern const Metric BackgroundMergesAndMutationsPoolTask;
extern const Metric BackgroundFetchesPoolTask;
extern const Metric BackgroundCommonPoolTask;
}
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int BAD_GET;
extern const int UNKNOWN_DATABASE;
extern const int UNKNOWN_TABLE;
extern const int TABLE_ALREADY_EXISTS;
extern const int THERE_IS_NO_SESSION;
extern const int THERE_IS_NO_QUERY;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT;
extern const int LOGICAL_ERROR;
extern const int INVALID_SETTING_VALUE;
extern const int UNKNOWN_READ_METHOD;
}
/** Set of known objects (environment), that could be used in query.
* Shared (global) part. Order of members (especially, order of destruction) is very important.
*/
struct ContextSharedPart
{
Poco::Logger * log = &Poco::Logger::get("Context");
/// For access of most of shared objects. Recursive mutex.
mutable std::recursive_mutex mutex;
/// Separate mutex for access of dictionaries. Separate mutex to avoid locks when server doing request to itself.
mutable std::mutex embedded_dictionaries_mutex;
mutable std::mutex external_dictionaries_mutex;
mutable std::mutex external_user_defined_executable_functions_mutex;
mutable std::mutex external_models_mutex;
/// Separate mutex for storage policies. During server startup we may
/// initialize some important storages (system logs with MergeTree engine)
/// under context lock.
mutable std::mutex storage_policies_mutex;
/// Separate mutex for re-initialization of zookeeper session. This operation could take a long time and must not interfere with another operations.
mutable std::mutex zookeeper_mutex;
mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper.
ConfigurationPtr zookeeper_config; /// Stores zookeeper configs
#if USE_NURAFT
mutable std::mutex keeper_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
#endif
mutable std::mutex auxiliary_zookeepers_mutex;
mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
ConfigurationPtr auxiliary_zookeepers_config; /// Stores auxiliary zookeepers configs
String interserver_io_host; /// The host name by which this server is available for other servers.
UInt16 interserver_io_port = 0; /// and port.
String interserver_scheme; /// http or https
MultiVersion<InterserverCredentials> interserver_io_credentials;
String path; /// Path to the data directory, with a slash at the end.
String flags_path; /// Path to the directory with some control flags for server maintenance.
String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function.
String dictionaries_lib_path; /// Path to the directory with user provided binaries and libraries for external dictionaries.
String user_scripts_path; /// Path to the directory with user provided scripts.
ConfigurationPtr config; /// Global configuration settings.
String tmp_path; /// Path to the temporary files that occur when processing the request.
mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
mutable VolumePtr backups_volume; /// Volume for all the backups.
mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
mutable std::optional<ExternalUserDefinedExecutableFunctionsLoader> external_user_defined_executable_functions_loader;
mutable std::optional<ExternalModelsLoader> external_models_loader;
ExternalLoaderXMLConfigRepository * external_models_config_repository = nullptr;
scope_guard models_repository_guard;
ExternalLoaderXMLConfigRepository * external_dictionaries_config_repository = nullptr;
scope_guard dictionaries_xmls;
ExternalLoaderXMLConfigRepository * user_defined_executable_functions_config_repository = nullptr;
scope_guard user_defined_executable_functions_xmls;
#if USE_NLP
mutable std::optional<SynonymsExtensions> synonyms_extensions;
mutable std::optional<Lemmatizers> lemmatizers;
#endif
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
std::unique_ptr<AccessControl> access_control;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
ProcessList process_list; /// Executing queries at the moment.
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
ReplicatedFetchList replicated_fetch_list;
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
mutable std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
mutable std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
mutable std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
mutable std::optional<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
mutable ThrottlerPtr replicated_fetches_throttler; /// A server-wide throttler for replicated fetches
mutable ThrottlerPtr replicated_sends_throttler; /// A server-wide throttler for replicated sends
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
/// Storage disk chooser for MergeTree engines
mutable std::shared_ptr<const DiskSelector> merge_tree_disk_selector;
/// Storage policy chooser for MergeTree engines
mutable std::shared_ptr<const StoragePolicySelector> merge_tree_storage_policy_selector;
std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
std::optional<MergeTreeSettings> replicated_merge_tree_settings; /// Settings of ReplicatedMergeTree* engines.
std::atomic_size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
std::atomic_size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default)
String format_schema_path; /// Path to a directory that contains schema files used by input formats.
ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers
std::unique_ptr<SystemLogs> system_logs; /// Used to log queries and operations on parts
std::optional<StorageS3Settings> storage_s3_settings; /// Settings of S3 storage
std::vector<String> warnings; /// Store warning messages about server configuration.
/// Background executors for *MergeTree tables
MergeMutateBackgroundExecutorPtr merge_mutate_executor;
OrdinaryBackgroundExecutorPtr moves_executor;
OrdinaryBackgroundExecutorPtr fetch_executor;
OrdinaryBackgroundExecutorPtr common_executor;
RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
std::optional<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
/// Clusters for distributed tables
/// Initialized on demand (on distributed storages initialization) since Settings should be initialized
std::shared_ptr<Clusters> clusters;
ConfigurationPtr clusters_config; /// Stores updated configs
mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config
std::unique_ptr<ClusterDiscovery> cluster_discovery;
std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
std::map<String, UInt16> server_ports;
bool shutdown_called = false;
/// Has background executors for MergeTree tables been initialized?
bool is_background_executors_initialized = false;
Stopwatch uptime_watch;
Context::ApplicationType application_type = Context::ApplicationType::SERVER;
/// vector of xdbc-bridge commands, they will be killed when Context will be destroyed
std::vector<std::unique_ptr<ShellCommand>> bridge_commands;
Context::ConfigReloadCallback config_reload_callback;
ContextSharedPart()
: access_control(std::make_unique<AccessControl>()), macros(std::make_unique<Macros>())
{
/// TODO: make it singleton (?)
static std::atomic<size_t> num_calls{0};
if (++num_calls > 1)
{
std::cerr << "Attempting to create multiple ContextShared instances. Stack trace:\n" << StackTrace().toString();
std::cerr.flush();
std::terminate();
}
}
~ContextSharedPart()
{
try
{
shutdown();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/** Perform a complex job of destroying objects in advance.
*/
void shutdown()
{
if (shutdown_called)
return;
shutdown_called = true;
/// Stop periodic reloading of the configuration files.
/// This must be done first because otherwise the reloading may pass a changed config
/// to some destroyed parts of ContextSharedPart.
if (access_control)
access_control->stopPeriodicReloadingUsersConfigs();
if (external_dictionaries_loader)
external_dictionaries_loader->enablePeriodicUpdates(false);
if (external_user_defined_executable_functions_loader)
external_user_defined_executable_functions_loader->enablePeriodicUpdates(false);
if (external_models_loader)
external_models_loader->enablePeriodicUpdates(false);
Session::shutdownNamedSessions();
/** After system_logs have been shut down it is guaranteed that no system table gets created or written to.
* Note that part changes at shutdown won't be logged to part log.
*/
if (system_logs)
system_logs->shutdown();
DatabaseCatalog::shutdown();
if (merge_mutate_executor)
merge_mutate_executor->wait();
if (fetch_executor)
fetch_executor->wait();
if (moves_executor)
moves_executor->wait();
if (common_executor)
common_executor->wait();
std::unique_ptr<SystemLogs> delete_system_logs;
{
auto lock = std::lock_guard(mutex);
/** Compiled expressions stored in cache need to be destroyed before destruction of static objects.
* Because CHJIT instance can be static object.
*/
#if USE_EMBEDDED_COMPILER
if (auto * cache = CompiledExpressionCacheFactory::instance().tryGetCache())
cache->reset();
#endif
/// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference).
/// TODO: Get rid of this.
/// Dictionaries may be required:
/// - for storage shutdown (during final flush of the Buffer engine)
/// - before storage startup (because of some streaming of, i.e. Kafka, to
/// the table with materialized column that has dictGet)
///
/// So they should be created before any storages and preserved until storages will be terminated.
///
/// But they cannot be created before storages since they may required table as a source,
/// but at least they can be preserved for storage termination.
dictionaries_xmls.reset();
user_defined_executable_functions_xmls.reset();
delete_system_logs = std::move(system_logs);
embedded_dictionaries.reset();
external_dictionaries_loader.reset();
external_user_defined_executable_functions_loader.reset();
models_repository_guard.reset();
external_models_loader.reset();
buffer_flush_schedule_pool.reset();
schedule_pool.reset();
distributed_schedule_pool.reset();
message_broker_schedule_pool.reset();
ddl_worker.reset();
access_control.reset();
/// Stop trace collector if any
trace_collector.reset();
/// Stop zookeeper connection
zookeeper.reset();
}
/// Can be removed w/o context lock
delete_system_logs.reset();
}
bool hasTraceCollector() const
{
return trace_collector.has_value();
}
void initializeTraceCollector(std::shared_ptr<TraceLog> trace_log)
{
if (!trace_log)
return;
if (hasTraceCollector())
return;
trace_collector.emplace(std::move(trace_log));
}
void addWarningMessage(const String & message)
{
/// A warning goes both: into server's log; stored to be placed in `system.warnings` table.
log->warning(message);
warnings.push_back(message);
}
};
Context::Context() = default;
Context::Context(const Context &) = default;
Context & Context::operator=(const Context &) = default;
SharedContextHolder::SharedContextHolder(SharedContextHolder &&) noexcept = default;
SharedContextHolder & SharedContextHolder::operator=(SharedContextHolder &&) = default;
SharedContextHolder::SharedContextHolder() = default;
SharedContextHolder::~SharedContextHolder() = default;
SharedContextHolder::SharedContextHolder(std::unique_ptr<ContextSharedPart> shared_context)
: shared(std::move(shared_context)) {}
void SharedContextHolder::reset() { shared.reset(); }
ContextMutablePtr Context::createGlobal(ContextSharedPart * shared)
{
auto res = std::shared_ptr<Context>(new Context);
res->shared = shared;
return res;
}
void Context::initGlobal()
{
DatabaseCatalog::init(shared_from_this());
}
SharedContextHolder Context::createShared()
{
return SharedContextHolder(std::make_unique<ContextSharedPart>());
}
ContextMutablePtr Context::createCopy(const ContextPtr & other)
{
return std::shared_ptr<Context>(new Context(*other));
}
ContextMutablePtr Context::createCopy(const ContextWeakPtr & other)
{
auto ptr = other.lock();
if (!ptr) throw Exception("Can't copy an expired context", ErrorCodes::LOGICAL_ERROR);
return createCopy(ptr);
}
ContextMutablePtr Context::createCopy(const ContextMutablePtr & other)
{
return createCopy(std::const_pointer_cast<const Context>(other));
}
Context::~Context() = default;
InterserverIOHandler & Context::getInterserverIOHandler() { return shared->interserver_io_handler; }
std::unique_lock<std::recursive_mutex> Context::getLock() const
{
ProfileEvents::increment(ProfileEvents::ContextLock);
CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
return std::unique_lock(shared->mutex);
}
ProcessList & Context::getProcessList() { return shared->process_list; }
const ProcessList & Context::getProcessList() const { return shared->process_list; }
MergeList & Context::getMergeList() { return shared->merge_list; }
const MergeList & Context::getMergeList() const { return shared->merge_list; }
ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; }
const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; }
String Context::resolveDatabase(const String & database_name) const
{
String res = database_name.empty() ? getCurrentDatabase() : database_name;
if (res.empty())
throw Exception("Default database is not selected", ErrorCodes::UNKNOWN_DATABASE);
return res;
}
String Context::getPath() const
{
auto lock = getLock();
return shared->path;
}
String Context::getFlagsPath() const
{
auto lock = getLock();
return shared->flags_path;
}
String Context::getUserFilesPath() const
{
auto lock = getLock();
return shared->user_files_path;
}
String Context::getDictionariesLibPath() const
{
auto lock = getLock();
return shared->dictionaries_lib_path;
}
String Context::getUserScriptsPath() const
{
auto lock = getLock();
return shared->user_scripts_path;
}
Strings Context::getWarnings() const
{
Strings common_warnings;
{
auto lock = getLock();
common_warnings = shared->warnings;
}
for (const auto & setting : settings)
{
if (setting.isValueChanged() && setting.isObsolete())
{
common_warnings.emplace_back("Some obsolete setting is changed. "
"Check 'select * from system.settings where changed' and read the changelog.");
break;
}
}
return common_warnings;
}
VolumePtr Context::getTemporaryVolume() const
{
auto lock = getLock();
return shared->tmp_volume;
}
void Context::setPath(const String & path)
{
auto lock = getLock();
shared->path = path;
if (shared->tmp_path.empty() && !shared->tmp_volume)
shared->tmp_path = shared->path + "tmp/";
if (shared->flags_path.empty())
shared->flags_path = shared->path + "flags/";
if (shared->user_files_path.empty())
shared->user_files_path = shared->path + "user_files/";
if (shared->dictionaries_lib_path.empty())
shared->dictionaries_lib_path = shared->path + "dictionaries_lib/";
if (shared->user_scripts_path.empty())
shared->user_scripts_path = shared->path + "user_scripts/";
}
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name)
{
std::lock_guard lock(shared->storage_policies_mutex);
if (policy_name.empty())
{
shared->tmp_path = path;
if (!shared->tmp_path.ends_with('/'))
shared->tmp_path += '/';
auto disk = std::make_shared<DiskLocal>("_tmp_default", shared->tmp_path, 0);
shared->tmp_volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk, 0);
}
else
{
StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name);
if (tmp_policy->getVolumes().size() != 1)
throw Exception("Policy " + policy_name + " is used temporary files, such policy should have exactly one volume",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
shared->tmp_volume = tmp_policy->getVolume(0);
}
if (shared->tmp_volume->getDisks().empty())
throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
return shared->tmp_volume;
}
void Context::setFlagsPath(const String & path)
{
auto lock = getLock();
shared->flags_path = path;
}
void Context::setUserFilesPath(const String & path)
{
auto lock = getLock();
shared->user_files_path = path;
}
void Context::setDictionariesLibPath(const String & path)
{
auto lock = getLock();
shared->dictionaries_lib_path = path;
}
void Context::setUserScriptsPath(const String & path)
{
auto lock = getLock();
shared->user_scripts_path = path;
}
void Context::addWarningMessage(const String & msg)
{
auto lock = getLock();
shared->addWarningMessage(msg);
}
void Context::setConfig(const ConfigurationPtr & config)
{
auto lock = getLock();
shared->config = config;
shared->access_control->setExternalAuthenticatorsConfig(*shared->config);
}
const Poco::Util::AbstractConfiguration & Context::getConfigRef() const
{
auto lock = getLock();
return shared->config ? *shared->config : Poco::Util::Application::instance().config();
}
AccessControl & Context::getAccessControl()
{
return *shared->access_control;
}
const AccessControl & Context::getAccessControl() const
{
return *shared->access_control;
}
void Context::setExternalAuthenticatorsConfig(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();
shared->access_control->setExternalAuthenticatorsConfig(config);
}
std::unique_ptr<GSSAcceptorContext> Context::makeGSSAcceptorContext() const
{
auto lock = getLock();
return std::make_unique<GSSAcceptorContext>(shared->access_control->getExternalAuthenticators().getKerberosParams());
}
void Context::setUsersConfig(const ConfigurationPtr & config)
{
auto lock = getLock();
shared->users_config = config;
shared->access_control->setUsersConfig(*shared->users_config);
}
ConfigurationPtr Context::getUsersConfig()
{
auto lock = getLock();
return shared->users_config;
}
void Context::setUser(const UUID & user_id_)
{
auto lock = getLock();
user_id = user_id_;
access = getAccessControl().getContextAccess(
user_id_, /* current_roles = */ {}, /* use_default_roles = */ true, settings, current_database, client_info);
auto user = access->getUser();
current_roles = std::make_shared<std::vector<UUID>>(user->granted_roles.findGranted(user->default_roles));
auto default_profile_info = access->getDefaultProfileInfo();
settings_constraints_and_current_profiles = default_profile_info->getConstraintsAndProfileIDs();
applySettingsChanges(default_profile_info->settings);
if (!user->default_database.empty())
setCurrentDatabase(user->default_database);
}
std::shared_ptr<const User> Context::getUser() const
{
return getAccess()->getUser();
}
String Context::getUserName() const
{
return getAccess()->getUserName();
}
std::optional<UUID> Context::getUserID() const
{
auto lock = getLock();
return user_id;
}
void Context::setQuotaKey(String quota_key_)
{
auto lock = getLock();
client_info.quota_key = std::move(quota_key_);
}
void Context::setCurrentRoles(const std::vector<UUID> & current_roles_)
{
auto lock = getLock();
if (current_roles ? (*current_roles == current_roles_) : current_roles_.empty())
return;
current_roles = std::make_shared<std::vector<UUID>>(current_roles_);
calculateAccessRights();
}
void Context::setCurrentRolesDefault()
{
auto user = getUser();
setCurrentRoles(user->granted_roles.findGranted(user->default_roles));
}
boost::container::flat_set<UUID> Context::getCurrentRoles() const
{
return getRolesInfo()->current_roles;
}
boost::container::flat_set<UUID> Context::getEnabledRoles() const
{
return getRolesInfo()->enabled_roles;
}
std::shared_ptr<const EnabledRolesInfo> Context::getRolesInfo() const
{
return getAccess()->getRolesInfo();
}
void Context::calculateAccessRights()
{
auto lock = getLock();
if (user_id)
access = getAccessControl().getContextAccess(
*user_id,
current_roles ? *current_roles : std::vector<UUID>{},
/* use_default_roles = */ false,
settings,
current_database,
client_info);
}
template <typename... Args>
void Context::checkAccessImpl(const Args &... args) const
{
return getAccess()->checkAccess(args...);
}
void Context::checkAccess(const AccessFlags & flags) const { return checkAccessImpl(flags); }
void Context::checkAccess(const AccessFlags & flags, const std::string_view & database) const { return checkAccessImpl(flags, database); }
void Context::checkAccess(const AccessFlags & flags, const std::string_view & database, const std::string_view & table) const { return checkAccessImpl(flags, database, table); }
void Context::checkAccess(const AccessFlags & flags, const std::string_view & database, const std::string_view & table, const std::string_view & column) const { return checkAccessImpl(flags, database, table, column); }
void Context::checkAccess(const AccessFlags & flags, const std::string_view & database, const std::string_view & table, const std::vector<std::string_view> & columns) const { return checkAccessImpl(flags, database, table, columns); }
void Context::checkAccess(const AccessFlags & flags, const std::string_view & database, const std::string_view & table, const Strings & columns) const { return checkAccessImpl(flags, database, table, columns); }
void Context::checkAccess(const AccessFlags & flags, const StorageID & table_id) const { checkAccessImpl(flags, table_id.getDatabaseName(), table_id.getTableName()); }
void Context::checkAccess(const AccessFlags & flags, const StorageID & table_id, const std::string_view & column) const { checkAccessImpl(flags, table_id.getDatabaseName(), table_id.getTableName(), column); }
void Context::checkAccess(const AccessFlags & flags, const StorageID & table_id, const std::vector<std::string_view> & columns) const { checkAccessImpl(flags, table_id.getDatabaseName(), table_id.getTableName(), columns); }
void Context::checkAccess(const AccessFlags & flags, const StorageID & table_id, const Strings & columns) const { checkAccessImpl(flags, table_id.getDatabaseName(), table_id.getTableName(), columns); }
void Context::checkAccess(const AccessRightsElement & element) const { return checkAccessImpl(element); }
void Context::checkAccess(const AccessRightsElements & elements) const { return checkAccessImpl(elements); }
std::shared_ptr<const ContextAccess> Context::getAccess() const
{
auto lock = getLock();
return access ? access : ContextAccess::getFullAccess();
}
ASTPtr Context::getRowPolicyFilter(const String & database, const String & table_name, RowPolicyFilterType filter_type) const
{
auto lock = getLock();
auto row_filter_of_initial_user = row_policies_of_initial_user ? row_policies_of_initial_user->getFilter(database, table_name, filter_type) : nullptr;
return getAccess()->getRowPolicyFilter(database, table_name, filter_type, row_filter_of_initial_user);
}
void Context::enableRowPoliciesOfInitialUser()
{
auto lock = getLock();
row_policies_of_initial_user = nullptr;
if (client_info.initial_user == client_info.current_user)
return;
auto initial_user_id = getAccessControl().find<User>(client_info.initial_user);
if (!initial_user_id)
return;
row_policies_of_initial_user = getAccessControl().tryGetDefaultRowPolicies(*initial_user_id);
}
std::shared_ptr<const EnabledQuota> Context::getQuota() const
{
return getAccess()->getQuota();
}
std::optional<QuotaUsage> Context::getQuotaUsage() const
{
return getAccess()->getQuotaUsage();
}
void Context::setCurrentProfile(const String & profile_name)
{
auto lock = getLock();
try
{
UUID profile_id = getAccessControl().getID<SettingsProfile>(profile_name);
setCurrentProfile(profile_id);
}
catch (Exception & e)
{
e.addMessage(", while trying to set settings profile {}", profile_name);
throw;
}
}
void Context::setCurrentProfile(const UUID & profile_id)
{
auto lock = getLock();
auto profile_info = getAccessControl().getSettingsProfileInfo(profile_id);
checkSettingsConstraints(profile_info->settings);
applySettingsChanges(profile_info->settings);
settings_constraints_and_current_profiles = profile_info->getConstraintsAndProfileIDs(settings_constraints_and_current_profiles);
}
std::vector<UUID> Context::getCurrentProfiles() const
{
auto lock = getLock();
return settings_constraints_and_current_profiles->current_profiles;
}
std::vector<UUID> Context::getEnabledProfiles() const
{
auto lock = getLock();
return settings_constraints_and_current_profiles->enabled_profiles;
}
const Scalars & Context::getScalars() const
{
return scalars;
}
const Block & Context::getScalar(const String & name) const
{
auto it = scalars.find(name);
if (scalars.end() == it)
{
// This should be a logical error, but it fails the sql_fuzz test too
// often, so 'bad arguments' for now.
throw Exception("Scalar " + backQuoteIfNeed(name) + " doesn't exist (internal bug)", ErrorCodes::BAD_ARGUMENTS);
}
return it->second;
}
const Block * Context::tryGetLocalScalar(const String & name) const
{
auto it = local_scalars.find(name);
if (local_scalars.end() == it)
return nullptr;
return &it->second;
}
Tables Context::getExternalTables() const
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
auto lock = getLock();
Tables res;
for (const auto & table : external_tables_mapping)
res[table.first] = table.second->getTable();
auto query_context_ptr = query_context.lock();
auto session_context_ptr = session_context.lock();
if (query_context_ptr && query_context_ptr.get() != this)
{
Tables buf = query_context_ptr->getExternalTables();
res.insert(buf.begin(), buf.end());
}
else if (session_context_ptr && session_context_ptr.get() != this)
{
Tables buf = session_context_ptr->getExternalTables();
res.insert(buf.begin(), buf.end());
}
return res;
}
void Context::addExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
auto lock = getLock();
if (external_tables_mapping.end() != external_tables_mapping.find(table_name))
throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
external_tables_mapping.emplace(table_name, std::make_shared<TemporaryTableHolder>(std::move(temporary_table)));
}
std::shared_ptr<TemporaryTableHolder> Context::removeExternalTable(const String & table_name)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
std::shared_ptr<TemporaryTableHolder> holder;
{
auto lock = getLock();
auto iter = external_tables_mapping.find(table_name);
if (iter == external_tables_mapping.end())
return {};
holder = iter->second;
external_tables_mapping.erase(iter);
}
return holder;
}
void Context::addScalar(const String & name, const Block & block)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have scalars");
scalars[name] = block;
}
void Context::addLocalScalar(const String & name, const Block & block)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have local scalars");
local_scalars[name] = block;
}
bool Context::hasScalar(const String & name) const
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have scalars");
return scalars.count(name);
}
void Context::addQueryAccessInfo(
const String & quoted_database_name,
const String & full_quoted_table_name,
const Names & column_names,
const String & projection_name,
const String & view_name)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info");
std::lock_guard<std::mutex> lock(query_access_info.mutex);
query_access_info.databases.emplace(quoted_database_name);
query_access_info.tables.emplace(full_quoted_table_name);
for (const auto & column_name : column_names)
query_access_info.columns.emplace(full_quoted_table_name + "." + backQuoteIfNeed(column_name));
if (!projection_name.empty())
query_access_info.projections.emplace(full_quoted_table_name + "." + backQuoteIfNeed(projection_name));
if (!view_name.empty())
query_access_info.views.emplace(view_name);
}
void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query factories info");
auto lock = getLock();
switch (factory_type)
{
case QueryLogFactories::AggregateFunction:
query_factories_info.aggregate_functions.emplace(created_object);
break;
case QueryLogFactories::AggregateFunctionCombinator:
query_factories_info.aggregate_function_combinators.emplace(created_object);
break;
case QueryLogFactories::Database:
query_factories_info.database_engines.emplace(created_object);
break;
case QueryLogFactories::DataType:
query_factories_info.data_type_families.emplace(created_object);
break;
case QueryLogFactories::Dictionary:
query_factories_info.dictionaries.emplace(created_object);
break;
case QueryLogFactories::Format:
query_factories_info.formats.emplace(created_object);
break;
case QueryLogFactories::Function:
query_factories_info.functions.emplace(created_object);
break;
case QueryLogFactories::Storage:
query_factories_info.storages.emplace(created_object);
break;
case QueryLogFactories::TableFunction:
query_factories_info.table_functions.emplace(created_object);
}
}
StoragePtr Context::executeTableFunction(const ASTPtr & table_expression)
{
auto hash = table_expression->getTreeHash();
String key = toString(hash.first) + '_' + toString(hash.second);
StoragePtr & res = table_function_results[key];
if (!res)
{
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression, shared_from_this());
res = table_function_ptr->execute(table_expression, shared_from_this(), table_function_ptr->getName());
/// Since ITableFunction::parseArguments() may change table_expression, i.e.:
///
/// remote('127.1', system.one) -> remote('127.1', 'system.one'),
///
auto new_hash = table_expression->getTreeHash();
if (hash != new_hash)
{
key = toString(new_hash.first) + '_' + toString(new_hash.second);
table_function_results[key] = res;
}
return res;
}
return res;
}
void Context::addViewSource(const StoragePtr & storage)
{
if (view_source)
throw Exception(
"Temporary view source storage " + backQuoteIfNeed(view_source->getName()) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
view_source = storage;
}
StoragePtr Context::getViewSource() const
{
return view_source;
}
Settings Context::getSettings() const
{
auto lock = getLock();
return settings;
}
void Context::setSettings(const Settings & settings_)
{
auto lock = getLock();
auto old_readonly = settings.readonly;
auto old_allow_ddl = settings.allow_ddl;
auto old_allow_introspection_functions = settings.allow_introspection_functions;
settings = settings_;
if ((settings.readonly != old_readonly) || (settings.allow_ddl != old_allow_ddl) || (settings.allow_introspection_functions != old_allow_introspection_functions))
calculateAccessRights();
}
void Context::setSetting(const StringRef & name, const String & value)
{
auto lock = getLock();
if (name == "profile")
{
setCurrentProfile(value);
return;
}
settings.set(std::string_view{name}, value);
if (name == "readonly" || name == "allow_ddl" || name == "allow_introspection_functions")
calculateAccessRights();
}
void Context::setSetting(const StringRef & name, const Field & value)
{
auto lock = getLock();
if (name == "profile")
{
setCurrentProfile(value.safeGet<String>());
return;
}
settings.set(std::string_view{name}, value);
if (name == "readonly" || name == "allow_ddl" || name == "allow_introspection_functions")
calculateAccessRights();
}
void Context::applySettingChange(const SettingChange & change)
{
try
{
setSetting(change.name, change.value);
}
catch (Exception & e)
{
e.addMessage(fmt::format("in attempt to set the value of setting '{}' to {}",
change.name, applyVisitor(FieldVisitorToString(), change.value)));
throw;
}
}
void Context::applySettingsChanges(const SettingsChanges & changes)
{
auto lock = getLock();
for (const SettingChange & change : changes)
applySettingChange(change);
applySettingsQuirks(settings);
}
void Context::checkSettingsConstraints(const SettingChange & change) const
{
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, change);
}
void Context::checkSettingsConstraints(const SettingsChanges & changes) const
{
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, changes);
}
void Context::checkSettingsConstraints(SettingsChanges & changes) const
{
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, changes);
}
void Context::clampToSettingsConstraints(SettingsChanges & changes) const
{
getSettingsConstraintsAndCurrentProfiles()->constraints.clamp(settings, changes);
}
std::shared_ptr<const SettingsConstraintsAndProfileIDs> Context::getSettingsConstraintsAndCurrentProfiles() const
{
auto lock = getLock();
if (settings_constraints_and_current_profiles)
return settings_constraints_and_current_profiles;
static auto no_constraints_or_profiles = std::make_shared<SettingsConstraintsAndProfileIDs>(getAccessControl());
return no_constraints_or_profiles;
}
String Context::getCurrentDatabase() const
{
auto lock = getLock();
return current_database;
}
String Context::getInitialQueryId() const
{
return client_info.initial_query_id;
}
void Context::setCurrentDatabaseNameInGlobalContext(const String & name)
{
if (!isGlobalContext())
throw Exception("Cannot set current database for non global context, this method should be used during server initialization",
ErrorCodes::LOGICAL_ERROR);
auto lock = getLock();
if (!current_database.empty())
throw Exception("Default database name cannot be changed in global context without server restart",
ErrorCodes::LOGICAL_ERROR);
current_database = name;
}
void Context::setCurrentDatabase(const String & name)
{
DatabaseCatalog::instance().assertDatabaseExists(name);
auto lock = getLock();
current_database = name;
calculateAccessRights();
}
void Context::setCurrentQueryId(const String & query_id)
{
/// Generate random UUID, but using lower quality RNG,
/// because Poco::UUIDGenerator::generateRandom method is using /dev/random, that is very expensive.
/// NOTE: Actually we don't need to use UUIDs for query identifiers.
/// We could use any suitable string instead.
union
{
char bytes[16];
struct
{
UInt64 a;
UInt64 b;
} words;
UUID uuid{};
} random;
random.words.a = thread_local_rng(); //-V656
random.words.b = thread_local_rng(); //-V656
if (client_info.client_trace_context.trace_id != UUID())
{
// Use the OpenTelemetry trace context we received from the client, and
// create a new span for the query.
query_trace_context = client_info.client_trace_context;
query_trace_context.span_id = thread_local_rng();
}
else if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
// If this is an initial query without any parent OpenTelemetry trace, we
// might start the trace ourselves, with some configurable probability.
std::bernoulli_distribution should_start_trace{
settings.opentelemetry_start_trace_probability};
if (should_start_trace(thread_local_rng))
{
// Use the randomly generated default query id as the new trace id.
query_trace_context.trace_id = random.uuid;
query_trace_context.span_id = thread_local_rng();
// Mark this trace as sampled in the flags.
query_trace_context.trace_flags = 1;
}
}
String query_id_to_set = query_id;
if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves.
{
/// Use protected constructor.
struct QueryUUID : Poco::UUID
{
QueryUUID(const char * bytes, Poco::UUID::Version version)
: Poco::UUID(bytes, version) {}
};
query_id_to_set = QueryUUID(random.bytes, Poco::UUID::UUID_RANDOM).toString();
}
client_info.current_query_id = query_id_to_set;
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
client_info.initial_query_id = client_info.current_query_id;
}
void Context::killCurrentQuery()
{
if (process_list_elem)
{
process_list_elem->cancelQuery(true);
}
};
String Context::getDefaultFormat() const
{
return default_format.empty() ? "TabSeparated" : default_format;
}
void Context::setDefaultFormat(const String & name)
{
default_format = name;
}
MultiVersion<Macros>::Version Context::getMacros() const
{
return shared->macros.get();
}
void Context::setMacros(std::unique_ptr<Macros> && macros)
{
shared->macros.set(std::move(macros));
}
ContextMutablePtr Context::getQueryContext() const
{
auto ptr = query_context.lock();
if (!ptr) throw Exception("There is no query or query context has expired", ErrorCodes::THERE_IS_NO_QUERY);
return ptr;
}
bool Context::isInternalSubquery() const
{
auto ptr = query_context.lock();
return ptr && ptr.get() != this;
}
ContextMutablePtr Context::getSessionContext() const
{
auto ptr = session_context.lock();
if (!ptr) throw Exception("There is no session or session context has expired", ErrorCodes::THERE_IS_NO_SESSION);
return ptr;
}
ContextMutablePtr Context::getGlobalContext() const
{
auto ptr = global_context.lock();
if (!ptr) throw Exception("There is no global context or global context has expired", ErrorCodes::LOGICAL_ERROR);
return ptr;
}
ContextMutablePtr Context::getBufferContext() const
{
if (!buffer_context) throw Exception("There is no buffer context", ErrorCodes::LOGICAL_ERROR);
return buffer_context;
}
const EmbeddedDictionaries & Context::getEmbeddedDictionaries() const
{
return getEmbeddedDictionariesImpl(false);
}
EmbeddedDictionaries & Context::getEmbeddedDictionaries()
{
return getEmbeddedDictionariesImpl(false);
}
const ExternalDictionariesLoader & Context::getExternalDictionariesLoader() const
{
return const_cast<Context *>(this)->getExternalDictionariesLoader();
}
ExternalDictionariesLoader & Context::getExternalDictionariesLoader()
{
std::lock_guard lock(shared->external_dictionaries_mutex);
return getExternalDictionariesLoaderUnlocked();
}
ExternalDictionariesLoader & Context::getExternalDictionariesLoaderUnlocked()
{
if (!shared->external_dictionaries_loader)
shared->external_dictionaries_loader.emplace(getGlobalContext());
return *shared->external_dictionaries_loader;
}
const ExternalUserDefinedExecutableFunctionsLoader & Context::getExternalUserDefinedExecutableFunctionsLoader() const
{
return const_cast<Context *>(this)->getExternalUserDefinedExecutableFunctionsLoader();
}
ExternalUserDefinedExecutableFunctionsLoader & Context::getExternalUserDefinedExecutableFunctionsLoader()
{
std::lock_guard lock(shared->external_user_defined_executable_functions_mutex);
return getExternalUserDefinedExecutableFunctionsLoaderUnlocked();
}
ExternalUserDefinedExecutableFunctionsLoader & Context::getExternalUserDefinedExecutableFunctionsLoaderUnlocked()
{
if (!shared->external_user_defined_executable_functions_loader)
shared->external_user_defined_executable_functions_loader.emplace(getGlobalContext());
return *shared->external_user_defined_executable_functions_loader;
}
const ExternalModelsLoader & Context::getExternalModelsLoader() const
{
return const_cast<Context *>(this)->getExternalModelsLoader();
}
ExternalModelsLoader & Context::getExternalModelsLoader()
{
std::lock_guard lock(shared->external_models_mutex);
return getExternalModelsLoaderUnlocked();
}
ExternalModelsLoader & Context::getExternalModelsLoaderUnlocked()
{
if (!shared->external_models_loader)
shared->external_models_loader.emplace(getGlobalContext());
return *shared->external_models_loader;
}
void Context::loadOrReloadModels(const Poco::Util::AbstractConfiguration & config)
{
auto patterns_values = getMultipleValuesFromConfig(config, "", "models_config");
std::unordered_set<std::string> patterns(patterns_values.begin(), patterns_values.end());
std::lock_guard lock(shared->external_models_mutex);
auto & external_models_loader = getExternalModelsLoaderUnlocked();
if (shared->external_models_config_repository)
{
shared->external_models_config_repository->updatePatterns(patterns);
external_models_loader.reloadConfig(shared->external_models_config_repository->getName());
return;
}
auto app_path = getPath();
auto config_path = getConfigRef().getString("config-file", "config.xml");
auto repository = std::make_unique<ExternalLoaderXMLConfigRepository>(app_path, config_path, patterns);
shared->external_models_config_repository = repository.get();
shared->models_repository_guard = external_models_loader.addConfigRepository(std::move(repository));
}
EmbeddedDictionaries & Context::getEmbeddedDictionariesImpl(const bool throw_on_error) const
{
std::lock_guard lock(shared->embedded_dictionaries_mutex);
if (!shared->embedded_dictionaries)
{
auto geo_dictionaries_loader = std::make_unique<GeoDictionariesLoader>();
shared->embedded_dictionaries.emplace(
std::move(geo_dictionaries_loader),
getGlobalContext(),
throw_on_error);
}
return *shared->embedded_dictionaries;
}
void Context::tryCreateEmbeddedDictionaries(const Poco::Util::AbstractConfiguration & config) const
{
if (!config.getBool("dictionaries_lazy_load", true))
static_cast<void>(getEmbeddedDictionariesImpl(true));
}
void Context::loadOrReloadDictionaries(const Poco::Util::AbstractConfiguration & config)
{
bool dictionaries_lazy_load = config.getBool("dictionaries_lazy_load", true);
auto patterns_values = getMultipleValuesFromConfig(config, "", "dictionaries_config");
std::unordered_set<std::string> patterns(patterns_values.begin(), patterns_values.end());
std::lock_guard lock(shared->external_dictionaries_mutex);
auto & external_dictionaries_loader = getExternalDictionariesLoaderUnlocked();
external_dictionaries_loader.enableAlwaysLoadEverything(!dictionaries_lazy_load);
if (shared->external_dictionaries_config_repository)
{
shared->external_dictionaries_config_repository->updatePatterns(patterns);
external_dictionaries_loader.reloadConfig(shared->external_dictionaries_config_repository->getName());
return;
}
auto app_path = getPath();
auto config_path = getConfigRef().getString("config-file", "config.xml");
auto repository = std::make_unique<ExternalLoaderXMLConfigRepository>(app_path, config_path, patterns);
shared->external_dictionaries_config_repository = repository.get();
shared->dictionaries_xmls = external_dictionaries_loader.addConfigRepository(std::move(repository));
}
void Context::loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::AbstractConfiguration & config)
{
auto patterns_values = getMultipleValuesFromConfig(config, "", "user_defined_executable_functions_config");
std::unordered_set<std::string> patterns(patterns_values.begin(), patterns_values.end());
std::lock_guard lock(shared->external_user_defined_executable_functions_mutex);
auto & external_user_defined_executable_functions_loader = getExternalUserDefinedExecutableFunctionsLoaderUnlocked();
if (shared->user_defined_executable_functions_config_repository)
{
shared->user_defined_executable_functions_config_repository->updatePatterns(patterns);
external_user_defined_executable_functions_loader.reloadConfig(shared->user_defined_executable_functions_config_repository->getName());
return;
}
auto app_path = getPath();
auto config_path = getConfigRef().getString("config-file", "config.xml");
auto repository = std::make_unique<ExternalLoaderXMLConfigRepository>(app_path, config_path, patterns);
shared->user_defined_executable_functions_config_repository = repository.get();
shared->user_defined_executable_functions_xmls = external_user_defined_executable_functions_loader.addConfigRepository(std::move(repository));
}
#if USE_NLP
SynonymsExtensions & Context::getSynonymsExtensions() const
{
auto lock = getLock();
if (!shared->synonyms_extensions)
shared->synonyms_extensions.emplace(getConfigRef());
return *shared->synonyms_extensions;
}
Lemmatizers & Context::getLemmatizers() const
{
auto lock = getLock();
if (!shared->lemmatizers)
shared->lemmatizers.emplace(getConfigRef());
return *shared->lemmatizers;
}
#endif
void Context::setProgressCallback(ProgressCallback callback)
{
/// Callback is set to a session or to a query. In the session, only one query is processed at a time. Therefore, the lock is not needed.
progress_callback = callback;
}
ProgressCallback Context::getProgressCallback() const
{
return progress_callback;
}
void Context::setProcessListElement(ProcessList::Element * elem)
{
/// Set to a session or query. In the session, only one query is processed at a time. Therefore, the lock is not needed.
process_list_elem = elem;
}
ProcessList::Element * Context::getProcessListElement() const
{
return process_list_elem;
}
void Context::setUncompressedCache(size_t max_size_in_bytes)
{
auto lock = getLock();
if (shared->uncompressed_cache)
throw Exception("Uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->uncompressed_cache = std::make_shared<UncompressedCache>(max_size_in_bytes);
}
UncompressedCachePtr Context::getUncompressedCache() const
{
auto lock = getLock();
return shared->uncompressed_cache;
}
void Context::dropUncompressedCache() const
{
auto lock = getLock();
if (shared->uncompressed_cache)
shared->uncompressed_cache->reset();
}
void Context::setMarkCache(size_t cache_size_in_bytes)
{
auto lock = getLock();
if (shared->mark_cache)
throw Exception("Mark cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->mark_cache = std::make_shared<MarkCache>(cache_size_in_bytes);
}
MarkCachePtr Context::getMarkCache() const
{
auto lock = getLock();
return shared->mark_cache;
}
void Context::dropMarkCache() const
{
auto lock = getLock();
if (shared->mark_cache)
shared->mark_cache->reset();
}
void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
{
auto lock = getLock();
if (shared->index_uncompressed_cache)
throw Exception("Index uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->index_uncompressed_cache = std::make_shared<UncompressedCache>(max_size_in_bytes);
}
UncompressedCachePtr Context::getIndexUncompressedCache() const
{
auto lock = getLock();
return shared->index_uncompressed_cache;
}
void Context::dropIndexUncompressedCache() const
{
auto lock = getLock();
if (shared->index_uncompressed_cache)
shared->index_uncompressed_cache->reset();
}
void Context::setIndexMarkCache(size_t cache_size_in_bytes)
{
auto lock = getLock();
if (shared->index_mark_cache)
throw Exception("Index mark cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->index_mark_cache = std::make_shared<MarkCache>(cache_size_in_bytes);
}
MarkCachePtr Context::getIndexMarkCache() const
{
auto lock = getLock();
return shared->index_mark_cache;
}
void Context::dropIndexMarkCache() const
{
auto lock = getLock();
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
}
void Context::setMMappedFileCache(size_t cache_size_in_num_entries)
{
auto lock = getLock();
if (shared->mmap_cache)
throw Exception("Mapped file cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->mmap_cache = std::make_shared<MMappedFileCache>(cache_size_in_num_entries);
}
MMappedFileCachePtr Context::getMMappedFileCache() const
{
auto lock = getLock();
return shared->mmap_cache;
}
void Context::dropMMappedFileCache() const
{
auto lock = getLock();
if (shared->mmap_cache)
shared->mmap_cache->reset();
}
void Context::dropCaches() const
{
auto lock = getLock();
if (shared->uncompressed_cache)
shared->uncompressed_cache->reset();
if (shared->mark_cache)
shared->mark_cache->reset();
if (shared->index_uncompressed_cache)
shared->index_uncompressed_cache->reset();
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
if (shared->mmap_cache)
shared->mmap_cache->reset();
}
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
{
auto lock = getLock();
if (!shared->buffer_flush_schedule_pool)
shared->buffer_flush_schedule_pool.emplace(
settings.background_buffer_flush_schedule_pool_size,
CurrentMetrics::BackgroundBufferFlushSchedulePoolTask,
"BgBufSchPool");
return *shared->buffer_flush_schedule_pool;
}
BackgroundTaskSchedulingSettings Context::getBackgroundProcessingTaskSchedulingSettings() const
{
BackgroundTaskSchedulingSettings task_settings;
const auto & config = getConfigRef();
task_settings.thread_sleep_seconds = config.getDouble("background_processing_pool_thread_sleep_seconds", 10);
task_settings.thread_sleep_seconds_random_part = config.getDouble("background_processing_pool_thread_sleep_seconds_random_part", 1.0);
task_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
task_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_min", 10);
task_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_max", 600);
task_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
task_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
return task_settings;
}
BackgroundTaskSchedulingSettings Context::getBackgroundMoveTaskSchedulingSettings() const
{
BackgroundTaskSchedulingSettings task_settings;
const auto & config = getConfigRef();
task_settings.thread_sleep_seconds = config.getDouble("background_move_processing_pool_thread_sleep_seconds", 10);
task_settings.thread_sleep_seconds_random_part = config.getDouble("background_move_processing_pool_thread_sleep_seconds_random_part", 1.0);
task_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_move_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
task_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_min", 10);
task_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_max", 600);
task_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
task_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
return task_settings;
}
BackgroundSchedulePool & Context::getSchedulePool() const
{
auto lock = getLock();
if (!shared->schedule_pool)
shared->schedule_pool.emplace(
settings.background_schedule_pool_size,
CurrentMetrics::BackgroundSchedulePoolTask,
"BgSchPool");
return *shared->schedule_pool;
}
BackgroundSchedulePool & Context::getDistributedSchedulePool() const
{
auto lock = getLock();
if (!shared->distributed_schedule_pool)
shared->distributed_schedule_pool.emplace(
settings.background_distributed_schedule_pool_size,
CurrentMetrics::BackgroundDistributedSchedulePoolTask,
"BgDistSchPool");
return *shared->distributed_schedule_pool;
}
BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
{
auto lock = getLock();
if (!shared->message_broker_schedule_pool)
shared->message_broker_schedule_pool.emplace(
settings.background_message_broker_schedule_pool_size,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
"BgMBSchPool");
return *shared->message_broker_schedule_pool;
}
ThrottlerPtr Context::getReplicatedFetchesThrottler() const
{
auto lock = getLock();
if (!shared->replicated_fetches_throttler)
shared->replicated_fetches_throttler = std::make_shared<Throttler>(
settings.max_replicated_fetches_network_bandwidth_for_server);
return shared->replicated_fetches_throttler;
}
ThrottlerPtr Context::getReplicatedSendsThrottler() const
{
auto lock = getLock();
if (!shared->replicated_sends_throttler)
shared->replicated_sends_throttler = std::make_shared<Throttler>(
settings.max_replicated_sends_network_bandwidth_for_server);
return shared->replicated_sends_throttler;
}
bool Context::hasDistributedDDL() const
{
return getConfigRef().has("distributed_ddl");
}
void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker)
{
auto lock = getLock();
if (shared->ddl_worker)
throw Exception("DDL background thread has already been initialized", ErrorCodes::LOGICAL_ERROR);
ddl_worker->startup();
shared->ddl_worker = std::move(ddl_worker);
}
DDLWorker & Context::getDDLWorker() const
{
auto lock = getLock();
if (!shared->ddl_worker)
{
if (!hasZooKeeper())
throw Exception("There is no Zookeeper configuration in server config", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
if (!hasDistributedDDL())
throw Exception("There is no DistributedDDL configuration in server config", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
throw Exception("DDL background thread is not initialized", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
}
return *shared->ddl_worker;
}
zkutil::ZooKeeperPtr Context::getZooKeeper() const
{
std::lock_guard lock(shared->zookeeper_mutex);
const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
if (!shared->zookeeper)
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, "zookeeper", getZooKeeperLog());
else if (shared->zookeeper->expired())
shared->zookeeper = shared->zookeeper->startNewSession();
return shared->zookeeper;
}
namespace
{
bool checkZooKeeperConfigIsLocal(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
for (const auto & key : keys)
{
if (startsWith(key, "node"))
{
String host = config.getString(config_name + "." + key + ".host");
if (isLocalAddress(DNSResolver::instance().resolveHost(host)))
return true;
}
}
return false;
}
}
bool Context::tryCheckClientConnectionToMyKeeperCluster() const
{
try
{
/// If our server is part of main Keeper cluster
if (checkZooKeeperConfigIsLocal(getConfigRef(), "zookeeper"))
{
LOG_DEBUG(shared->log, "Keeper server is participant of the main zookeeper cluster, will try to connect to it");
getZooKeeper();
/// Connected, return true
return true;
}
else
{
Poco::Util::AbstractConfiguration::Keys keys;
getConfigRef().keys("auxiliary_zookeepers", keys);
/// If our server is part of some auxiliary_zookeeper
for (const auto & aux_zk_name : keys)
{
if (checkZooKeeperConfigIsLocal(getConfigRef(), "auxiliary_zookeepers." + aux_zk_name))
{
LOG_DEBUG(shared->log, "Our Keeper server is participant of the auxiliary zookeeper cluster ({}), will try to connect to it", aux_zk_name);
getAuxiliaryZooKeeper(aux_zk_name);
/// Connected, return true
return true;
}
}
}
/// Our server doesn't depend on our Keeper cluster
return true;
}
catch (...)
{
return false;
}
}
UInt32 Context::getZooKeeperSessionUptime() const
{
std::lock_guard lock(shared->zookeeper_mutex);
if (!shared->zookeeper || shared->zookeeper->expired())
return 0;
return shared->zookeeper->getSessionUptime();
}
void Context::setSystemZooKeeperLogAfterInitializationIfNeeded()
{
/// It can be nearly impossible to understand in which order global objects are initialized on server startup.
/// If getZooKeeper() is called before initializeSystemLogs(), then zkutil::ZooKeeper gets nullptr
/// instead of pointer to system table and it logs nothing.
/// This method explicitly sets correct pointer to system log after its initialization.
/// TODO get rid of this if possible
std::lock_guard lock(shared->zookeeper_mutex);
if (!shared->system_logs || !shared->system_logs->zookeeper_log)
return;
if (shared->zookeeper)
shared->zookeeper->setZooKeeperLog(shared->system_logs->zookeeper_log);
for (auto & zk : shared->auxiliary_zookeepers)
zk.second->setZooKeeperLog(shared->system_logs->zookeeper_log);
}
void Context::initializeKeeperDispatcher([[maybe_unused]] bool start_async) const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (shared->keeper_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize Keeper multiple times");
const auto & config = getConfigRef();
if (config.has("keeper_server"))
{
bool is_standalone_app = getApplicationType() == ApplicationType::KEEPER;
if (start_async)
{
assert(!is_standalone_app);
LOG_INFO(shared->log, "Connected to ZooKeeper (or Keeper) before internal Keeper start or we don't depend on our Keeper cluster, "
"will wait for Keeper asynchronously");
}
else
{
LOG_INFO(shared->log, "Cannot connect to ZooKeeper (or Keeper) before internal Keeper start, "
"will wait for Keeper synchronously");
}
shared->keeper_dispatcher = std::make_shared<KeeperDispatcher>();
shared->keeper_dispatcher->initialize(config, is_standalone_app, start_async);
}
#endif
}
#if USE_NURAFT
std::shared_ptr<KeeperDispatcher> & Context::getKeeperDispatcher() const
{
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (!shared->keeper_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Keeper must be initialized before requests");
return shared->keeper_dispatcher;
}
#endif
void Context::shutdownKeeperDispatcher() const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (shared->keeper_dispatcher)
{
shared->keeper_dispatcher->shutdown();
shared->keeper_dispatcher.reset();
}
#endif
}
void Context::updateKeeperConfiguration([[maybe_unused]] const Poco::Util::AbstractConfiguration & config)
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (!shared->keeper_dispatcher)
return;
shared->keeper_dispatcher->updateConfiguration(config);
#endif
}
zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
{
std::lock_guard lock(shared->auxiliary_zookeepers_mutex);
auto zookeeper = shared->auxiliary_zookeepers.find(name);
if (zookeeper == shared->auxiliary_zookeepers.end())
{
if (name.find(':') != std::string::npos || name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid auxiliary ZooKeeper name {}: ':' and '/' are not allowed", name);
const auto & config = shared->auxiliary_zookeepers_config ? *shared->auxiliary_zookeepers_config : getConfigRef();
if (!config.has("auxiliary_zookeepers." + name))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Unknown auxiliary ZooKeeper name '{}'. If it's required it can be added to the section <auxiliary_zookeepers> in "
"config.xml",
name);
zookeeper = shared->auxiliary_zookeepers.emplace(name,
std::make_shared<zkutil::ZooKeeper>(config, "auxiliary_zookeepers." + name, getZooKeeperLog())).first;
}
else if (zookeeper->second->expired())
zookeeper->second = zookeeper->second->startNewSession();
return zookeeper->second;
}
void Context::resetZooKeeper() const
{
std::lock_guard lock(shared->zookeeper_mutex);
shared->zookeeper.reset();
}
static void reloadZooKeeperIfChangedImpl(const ConfigurationPtr & config, const std::string & config_name, zkutil::ZooKeeperPtr & zk,
std::shared_ptr<ZooKeeperLog> zk_log)
{
if (!zk || zk->configChanged(*config, config_name))
{
if (zk)
zk->finalize("Config changed");
zk = std::make_shared<zkutil::ZooKeeper>(*config, config_name, std::move(zk_log));
}
}
void Context::reloadZooKeeperIfChanged(const ConfigurationPtr & config) const
{
std::lock_guard lock(shared->zookeeper_mutex);
shared->zookeeper_config = config;
reloadZooKeeperIfChangedImpl(config, "zookeeper", shared->zookeeper, getZooKeeperLog());
}
void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config)
{
std::lock_guard lock(shared->auxiliary_zookeepers_mutex);
shared->auxiliary_zookeepers_config = config;
for (auto it = shared->auxiliary_zookeepers.begin(); it != shared->auxiliary_zookeepers.end();)
{
if (!config->has("auxiliary_zookeepers." + it->first))
it = shared->auxiliary_zookeepers.erase(it);
else
{
reloadZooKeeperIfChangedImpl(config, "auxiliary_zookeepers." + it->first, it->second, getZooKeeperLog());
++it;
}
}
}
bool Context::hasZooKeeper() const
{
return getConfigRef().has("zookeeper");
}
bool Context::hasAuxiliaryZooKeeper(const String & name) const
{
return getConfigRef().has("auxiliary_zookeepers." + name);
}
InterserverCredentialsPtr Context::getInterserverCredentials()
{
return shared->interserver_io_credentials.get();
}
void Context::updateInterserverCredentials(const Poco::Util::AbstractConfiguration & config)
{
auto credentials = InterserverCredentials::make(config, "interserver_http_credentials");
shared->interserver_io_credentials.set(std::move(credentials));
}
void Context::setInterserverIOAddress(const String & host, UInt16 port)
{
shared->interserver_io_host = host;
shared->interserver_io_port = port;
}
std::pair<String, UInt16> Context::getInterserverIOAddress() const
{
if (shared->interserver_io_host.empty() || shared->interserver_io_port == 0)
throw Exception("Parameter 'interserver_http(s)_port' required for replication is not specified in configuration file.",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
return { shared->interserver_io_host, shared->interserver_io_port };
}
void Context::setInterserverScheme(const String & scheme)
{
shared->interserver_scheme = scheme;
}
String Context::getInterserverScheme() const
{
return shared->interserver_scheme;
}
void Context::setRemoteHostFilter(const Poco::Util::AbstractConfiguration & config)
{
shared->remote_host_filter.setValuesFromConfig(config);
}
const RemoteHostFilter & Context::getRemoteHostFilter() const
{
return shared->remote_host_filter;
}
UInt16 Context::getTCPPort() const
{
auto lock = getLock();
const auto & config = getConfigRef();
return config.getInt("tcp_port", DBMS_DEFAULT_PORT);
}
std::optional<UInt16> Context::getTCPPortSecure() const
{
auto lock = getLock();
const auto & config = getConfigRef();
if (config.has("tcp_port_secure"))
return config.getInt("tcp_port_secure");
return {};
}
void Context::registerServerPort(String port_name, UInt16 port)
{
shared->server_ports.emplace(std::move(port_name), port);
}
UInt16 Context::getServerPort(const String & port_name) const
{
auto it = shared->server_ports.find(port_name);
if (it == shared->server_ports.end())
throw Exception(ErrorCodes::BAD_GET, "There is no port named {}", port_name);
else
return it->second;
}
std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) const
{
if (auto res = tryGetCluster(cluster_name))
return res;
throw Exception("Requested cluster '" + cluster_name + "' not found", ErrorCodes::BAD_GET);
}
std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name) const
{
auto res = getClusters()->getCluster(cluster_name);
if (res)
return res;
if (!cluster_name.empty())
res = tryGetReplicatedDatabaseCluster(cluster_name);
return res;
}
void Context::reloadClusterConfig() const
{
while (true)
{
ConfigurationPtr cluster_config;
{
std::lock_guard lock(shared->clusters_mutex);
cluster_config = shared->clusters_config;
}
const auto & config = cluster_config ? *cluster_config : getConfigRef();
auto new_clusters = std::make_shared<Clusters>(config, settings);
{
std::lock_guard lock(shared->clusters_mutex);
if (shared->clusters_config.get() == cluster_config.get())
{
shared->clusters = std::move(new_clusters);
return;
}
// Clusters config has been suddenly changed, recompute clusters
}
}
}
std::shared_ptr<Clusters> Context::getClusters() const
{
std::lock_guard lock(shared->clusters_mutex);
if (!shared->clusters)
{
const auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef();
shared->clusters = std::make_shared<Clusters>(config, settings);
}
return shared->clusters;
}
void Context::startClusterDiscovery()
{
if (!shared->cluster_discovery)
return;
shared->cluster_discovery->start();
}
/// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters
void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_discovery, const String & config_name)
{
std::lock_guard lock(shared->clusters_mutex);
if (config->getBool("allow_experimental_cluster_discovery", false) && enable_discovery && !shared->cluster_discovery)
{
shared->cluster_discovery = std::make_unique<ClusterDiscovery>(*config, getGlobalContext());
}
/// Do not update clusters if this part of config wasn't changed.
if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name))
return;
auto old_clusters_config = shared->clusters_config;
shared->clusters_config = config;
if (!shared->clusters)
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, settings, config_name);
else
shared->clusters->updateClusters(*shared->clusters_config, settings, config_name, old_clusters_config);
}
void Context::setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster)
{
std::lock_guard lock(shared->clusters_mutex);
if (!shared->clusters)
throw Exception("Clusters are not set", ErrorCodes::LOGICAL_ERROR);
shared->clusters->setCluster(cluster_name, cluster);
}
void Context::initializeSystemLogs()
{
auto lock = getLock();
shared->system_logs = std::make_unique<SystemLogs>(getGlobalContext(), getConfigRef());
}
void Context::initializeTraceCollector()
{
shared->initializeTraceCollector(getTraceLog());
}
bool Context::hasTraceCollector() const
{
return shared->hasTraceCollector();
}
std::shared_ptr<QueryLog> Context::getQueryLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->query_log;
}
std::shared_ptr<QueryThreadLog> Context::getQueryThreadLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->query_thread_log;
}
std::shared_ptr<QueryViewsLog> Context::getQueryViewsLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->query_views_log;
}
std::shared_ptr<PartLog> Context::getPartLog(const String & part_database) const
{
auto lock = getLock();
/// No part log or system logs are shutting down.
if (!shared->system_logs)
return {};
/// Will not log operations on system tables (including part_log itself).
/// It doesn't make sense and not allow to destruct PartLog correctly due to infinite logging and flushing,
/// and also make troubles on startup.
if (part_database == DatabaseCatalog::SYSTEM_DATABASE)
return {};
return shared->system_logs->part_log;
}
std::shared_ptr<TraceLog> Context::getTraceLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->trace_log;
}
std::shared_ptr<TextLog> Context::getTextLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->text_log;
}
std::shared_ptr<MetricLog> Context::getMetricLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->metric_log;
}
std::shared_ptr<AsynchronousMetricLog> Context::getAsynchronousMetricLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->asynchronous_metric_log;
}
std::shared_ptr<OpenTelemetrySpanLog> Context::getOpenTelemetrySpanLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->opentelemetry_span_log;
}
std::shared_ptr<SessionLog> Context::getSessionLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->session_log;
}
std::shared_ptr<ZooKeeperLog> Context::getZooKeeperLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->zookeeper_log;
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{
auto lock = getLock();
if (!shared->compression_codec_selector)
{
constexpr auto config_name = "compression";
const auto & config = getConfigRef();
if (config.has(config_name))
shared->compression_codec_selector = std::make_unique<CompressionCodecSelector>(config, "compression");
else
shared->compression_codec_selector = std::make_unique<CompressionCodecSelector>();
}
return shared->compression_codec_selector->choose(part_size, part_size_ratio);
}
DiskPtr Context::getDisk(const String & name) const
{
std::lock_guard lock(shared->storage_policies_mutex);
auto disk_selector = getDiskSelector(lock);
return disk_selector->get(name);
}
StoragePolicyPtr Context::getStoragePolicy(const String & name) const
{
std::lock_guard lock(shared->storage_policies_mutex);
auto policy_selector = getStoragePolicySelector(lock);
return policy_selector->get(name);
}
DisksMap Context::getDisksMap() const
{
std::lock_guard lock(shared->storage_policies_mutex);
return getDiskSelector(lock)->getDisksMap();
}
StoragePoliciesMap Context::getPoliciesMap() const
{
std::lock_guard lock(shared->storage_policies_mutex);
return getStoragePolicySelector(lock)->getPoliciesMap();
}
DiskSelectorPtr Context::getDiskSelector(std::lock_guard<std::mutex> & /* lock */) const
{
if (!shared->merge_tree_disk_selector)
{
constexpr auto config_name = "storage_configuration.disks";
const auto & config = getConfigRef();
shared->merge_tree_disk_selector = std::make_shared<DiskSelector>(config, config_name, shared_from_this());
}
return shared->merge_tree_disk_selector;
}
StoragePolicySelectorPtr Context::getStoragePolicySelector(std::lock_guard<std::mutex> & lock) const
{
if (!shared->merge_tree_storage_policy_selector)
{
constexpr auto config_name = "storage_configuration.policies";
const auto & config = getConfigRef();
shared->merge_tree_storage_policy_selector = std::make_shared<StoragePolicySelector>(config, config_name, getDiskSelector(lock));
}
return shared->merge_tree_storage_policy_selector;
}
void Context::updateStorageConfiguration(const Poco::Util::AbstractConfiguration & config)
{
std::lock_guard lock(shared->storage_policies_mutex);
if (shared->merge_tree_disk_selector)
shared->merge_tree_disk_selector
= shared->merge_tree_disk_selector->updateFromConfig(config, "storage_configuration.disks", shared_from_this());
if (shared->merge_tree_storage_policy_selector)
{
try
{
shared->merge_tree_storage_policy_selector = shared->merge_tree_storage_policy_selector->updateFromConfig(
config, "storage_configuration.policies", shared->merge_tree_disk_selector);
}
catch (Exception & e)
{
LOG_ERROR(
shared->log, "An error has occurred while reloading storage policies, storage policies were not applied: {}", e.message());
}
}
if (shared->storage_s3_settings)
{
shared->storage_s3_settings->loadFromConfig("s3", config);
}
}
const MergeTreeSettings & Context::getMergeTreeSettings() const
{
auto lock = getLock();
if (!shared->merge_tree_settings)
{
const auto & config = getConfigRef();
MergeTreeSettings mt_settings;
mt_settings.loadFromConfig("merge_tree", config);
shared->merge_tree_settings.emplace(mt_settings);
}
return *shared->merge_tree_settings;
}
const MergeTreeSettings & Context::getReplicatedMergeTreeSettings() const
{
auto lock = getLock();
if (!shared->replicated_merge_tree_settings)
{
const auto & config = getConfigRef();
MergeTreeSettings mt_settings;
mt_settings.loadFromConfig("merge_tree", config);
mt_settings.loadFromConfig("replicated_merge_tree", config);
shared->replicated_merge_tree_settings.emplace(mt_settings);
}
return *shared->replicated_merge_tree_settings;
}
const StorageS3Settings & Context::getStorageS3Settings() const
{
auto lock = getLock();
if (!shared->storage_s3_settings)
{
const auto & config = getConfigRef();
shared->storage_s3_settings.emplace().loadFromConfig("s3", config);
}
return *shared->storage_s3_settings;
}
void Context::checkCanBeDropped(const String & database, const String & table, const size_t & size, const size_t & max_size_to_drop) const
{
if (!max_size_to_drop || size <= max_size_to_drop)
return;
fs::path force_file(getFlagsPath() + "force_drop_table");
bool force_file_exists = fs::exists(force_file);
if (force_file_exists)
{
try
{
fs::remove(force_file);
return;
}
catch (...)
{
/// User should recreate force file on each drop, it shouldn't be protected
tryLogCurrentException("Drop table check", "Can't remove force file to enable table or partition drop");
}
}
String size_str = formatReadableSizeWithDecimalSuffix(size);
String max_size_to_drop_str = formatReadableSizeWithDecimalSuffix(max_size_to_drop);
throw Exception(ErrorCodes::TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT,
"Table or Partition in {}.{} was not dropped.\nReason:\n"
"1. Size ({}) is greater than max_[table/partition]_size_to_drop ({})\n"
"2. File '{}' intended to force DROP {}\n"
"How to fix this:\n"
"1. Either increase (or set to zero) max_[table/partition]_size_to_drop in server config\n"
"2. Either create forcing file {} and make sure that ClickHouse has write permission for it.\n"
"Example:\nsudo touch '{}' && sudo chmod 666 '{}'",
backQuoteIfNeed(database), backQuoteIfNeed(table),
size_str, max_size_to_drop_str,
force_file.string(), force_file_exists ? "exists but not writeable (could not be removed)" : "doesn't exist",
force_file.string(),
force_file.string(), force_file.string());
}
void Context::setMaxTableSizeToDrop(size_t max_size)
{
// Is initialized at server startup and updated at config reload
shared->max_table_size_to_drop.store(max_size, std::memory_order_relaxed);
}
void Context::checkTableCanBeDropped(const String & database, const String & table, const size_t & table_size) const
{
size_t max_table_size_to_drop = shared->max_table_size_to_drop.load(std::memory_order_relaxed);
checkCanBeDropped(database, table, table_size, max_table_size_to_drop);
}
void Context::setMaxPartitionSizeToDrop(size_t max_size)
{
// Is initialized at server startup and updated at config reload
shared->max_partition_size_to_drop.store(max_size, std::memory_order_relaxed);
}
void Context::checkPartitionCanBeDropped(const String & database, const String & table, const size_t & partition_size) const
{
size_t max_partition_size_to_drop = shared->max_partition_size_to_drop.load(std::memory_order_relaxed);
checkCanBeDropped(database, table, partition_size, max_partition_size_to_drop);
}
InputFormatPtr Context::getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size, const std::optional<FormatSettings> & format_settings) const
{
return FormatFactory::instance().getInput(name, buf, sample, shared_from_this(), max_block_size, format_settings);
}
OutputFormatPtr Context::getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const
{
return FormatFactory::instance().getOutputFormat(name, buf, sample, shared_from_this());
}
OutputFormatPtr Context::getOutputFormatParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const
{
return FormatFactory::instance().getOutputFormatParallelIfPossible(name, buf, sample, shared_from_this());
}
time_t Context::getUptimeSeconds() const
{
auto lock = getLock();
return shared->uptime_watch.elapsedSeconds();
}
void Context::setConfigReloadCallback(ConfigReloadCallback && callback)
{
/// Is initialized at server startup, so lock isn't required. Otherwise use mutex.
shared->config_reload_callback = std::move(callback);
}
void Context::reloadConfig() const
{
/// Use mutex if callback may be changed after startup.
if (!shared->config_reload_callback)
throw Exception("Can't reload config because config_reload_callback is not set.", ErrorCodes::LOGICAL_ERROR);
shared->config_reload_callback();
}
void Context::shutdown()
{
// Disk selector might not be initialized if there was some error during
// its initialization. Don't try to initialize it again on shutdown.
if (shared->merge_tree_disk_selector)
{
for (auto & [disk_name, disk] : getDisksMap())
{
LOG_INFO(shared->log, "Shutdown disk {}", disk_name);
disk->shutdown();
}
}
shared->shutdown();
}
Context::ApplicationType Context::getApplicationType() const
{
return shared->application_type;
}
void Context::setApplicationType(ApplicationType type)
{
/// Lock isn't required, you should set it at start
shared->application_type = type;
}
void Context::setDefaultProfiles(const Poco::Util::AbstractConfiguration & config)
{
shared->default_profile_name = config.getString("default_profile", "default");
getAccessControl().setDefaultProfileName(shared->default_profile_name);
shared->system_profile_name = config.getString("system_profile", shared->default_profile_name);
setCurrentProfile(shared->system_profile_name);
applySettingsQuirks(settings, &Poco::Logger::get("SettingsQuirks"));
shared->buffer_profile_name = config.getString("buffer_profile", shared->system_profile_name);
buffer_context = Context::createCopy(shared_from_this());
buffer_context->setCurrentProfile(shared->buffer_profile_name);
}
String Context::getDefaultProfileName() const
{
return shared->default_profile_name;
}
String Context::getSystemProfileName() const
{
return shared->system_profile_name;
}
String Context::getFormatSchemaPath() const
{
return shared->format_schema_path;
}
void Context::setFormatSchemaPath(const String & path)
{
shared->format_schema_path = path;
}
Context::SampleBlockCache & Context::getSampleBlockCache() const
{
assert(hasQueryContext());
return getQueryContext()->sample_block_cache;
}
bool Context::hasQueryParameters() const
{
return !query_parameters.empty();
}
const NameToNameMap & Context::getQueryParameters() const
{
return query_parameters;
}
void Context::setQueryParameter(const String & name, const String & value)
{
if (!query_parameters.emplace(name, value).second)
throw Exception("Duplicate name " + backQuote(name) + " of query parameter", ErrorCodes::BAD_ARGUMENTS);
}
void Context::addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const
{
auto lock = getLock();
shared->bridge_commands.emplace_back(std::move(cmd));
}
IHostContextPtr & Context::getHostContext()
{
return host_context;
}
const IHostContextPtr & Context::getHostContext() const
{
return host_context;
}
std::shared_ptr<ActionLocksManager> Context::getActionLocksManager()
{
auto lock = getLock();
if (!shared->action_locks_manager)
shared->action_locks_manager = std::make_shared<ActionLocksManager>(shared_from_this());
return shared->action_locks_manager;
}
void Context::setExternalTablesInitializer(ExternalTablesInitializer && initializer)
{
if (external_tables_initializer_callback)
throw Exception("External tables initializer is already set", ErrorCodes::LOGICAL_ERROR);
external_tables_initializer_callback = std::move(initializer);
}
void Context::initializeExternalTablesIfSet()
{
if (external_tables_initializer_callback)
{
external_tables_initializer_callback(shared_from_this());
/// Reset callback
external_tables_initializer_callback = {};
}
}
void Context::setInputInitializer(InputInitializer && initializer)
{
if (input_initializer_callback)
throw Exception("Input initializer is already set", ErrorCodes::LOGICAL_ERROR);
input_initializer_callback = std::move(initializer);
}
void Context::initializeInput(const StoragePtr & input_storage)
{
if (!input_initializer_callback)
throw Exception("Input initializer is not set", ErrorCodes::LOGICAL_ERROR);
input_initializer_callback(shared_from_this(), input_storage);
/// Reset callback
input_initializer_callback = {};
}
void Context::setInputBlocksReaderCallback(InputBlocksReader && reader)
{
if (input_blocks_reader)
throw Exception("Input blocks reader is already set", ErrorCodes::LOGICAL_ERROR);
input_blocks_reader = std::move(reader);
}
InputBlocksReader Context::getInputBlocksReaderCallback() const
{
return input_blocks_reader;
}
void Context::resetInputCallbacks()
{
if (input_initializer_callback)
input_initializer_callback = {};
if (input_blocks_reader)
input_blocks_reader = {};
}
StorageID Context::resolveStorageID(StorageID storage_id, StorageNamespace where) const
{
if (storage_id.uuid != UUIDHelpers::Nil)
return storage_id;
StorageID resolved = StorageID::createEmpty();
std::optional<Exception> exc;
{
auto lock = getLock();
resolved = resolveStorageIDImpl(std::move(storage_id), where, &exc);
}
if (exc)
throw Exception(*exc);
if (!resolved.hasUUID() && resolved.database_name != DatabaseCatalog::TEMPORARY_DATABASE)
resolved.uuid = DatabaseCatalog::instance().getDatabase(resolved.database_name)->tryGetTableUUID(resolved.table_name);
return resolved;
}
StorageID Context::tryResolveStorageID(StorageID storage_id, StorageNamespace where) const
{
if (storage_id.uuid != UUIDHelpers::Nil)
return storage_id;
StorageID resolved = StorageID::createEmpty();
{
auto lock = getLock();
resolved = resolveStorageIDImpl(std::move(storage_id), where, nullptr);
}
if (resolved && !resolved.hasUUID() && resolved.database_name != DatabaseCatalog::TEMPORARY_DATABASE)
{
auto db = DatabaseCatalog::instance().tryGetDatabase(resolved.database_name);
if (db)
resolved.uuid = db->tryGetTableUUID(resolved.table_name);
}
return resolved;
}
StorageID Context::resolveStorageIDImpl(StorageID storage_id, StorageNamespace where, std::optional<Exception> * exception) const
{
if (storage_id.uuid != UUIDHelpers::Nil)
return storage_id;
if (!storage_id)
{
if (exception)
exception->emplace("Both table name and UUID are empty", ErrorCodes::UNKNOWN_TABLE);
return storage_id;
}
bool look_for_external_table = where & StorageNamespace::ResolveExternal;
/// Global context should not contain temporary tables
if (isGlobalContext())
look_for_external_table = false;
bool in_current_database = where & StorageNamespace::ResolveCurrentDatabase;
bool in_specified_database = where & StorageNamespace::ResolveGlobal;
if (!storage_id.database_name.empty())
{
if (in_specified_database)
return storage_id; /// NOTE There is no guarantees that table actually exists in database.
if (exception)
exception->emplace("External and temporary tables have no database, but " +
storage_id.database_name + " is specified", ErrorCodes::UNKNOWN_TABLE);
return StorageID::createEmpty();
}
/// Database name is not specified. It's temporary table or table in current database.
if (look_for_external_table)
{
auto resolved_id = StorageID::createEmpty();
auto try_resolve = [&](ContextPtr context) -> bool
{
const auto & tables = context->external_tables_mapping;
auto it = tables.find(storage_id.getTableName());
if (it == tables.end())
return false;
resolved_id = it->second->getGlobalTableID();
return true;
};
/// Firstly look for temporary table in current context
if (try_resolve(shared_from_this()))
return resolved_id;
/// If not found and current context was created from some query context, look for temporary table in query context
auto query_context_ptr = query_context.lock();
bool is_local_context = query_context_ptr && query_context_ptr.get() != this;
if (is_local_context && try_resolve(query_context_ptr))
return resolved_id;
/// If not found and current context was created from some session context, look for temporary table in session context
auto session_context_ptr = session_context.lock();
bool is_local_or_query_context = session_context_ptr && session_context_ptr.get() != this;
if (is_local_or_query_context && try_resolve(session_context_ptr))
return resolved_id;
}
/// Temporary table not found. It's table in current database.
if (in_current_database)
{
if (current_database.empty())
{
if (exception)
exception->emplace("Default database is not selected", ErrorCodes::UNKNOWN_DATABASE);
return StorageID::createEmpty();
}
storage_id.database_name = current_database;
/// NOTE There is no guarantees that table actually exists in database.
return storage_id;
}
if (exception)
exception->emplace("Cannot resolve database name for table " + storage_id.getNameForLogs(), ErrorCodes::UNKNOWN_TABLE);
return StorageID::createEmpty();
}
void Context::initZooKeeperMetadataTransaction(ZooKeeperMetadataTransactionPtr txn, [[maybe_unused]] bool attach_existing)
{
assert(!metadata_transaction);
assert(attach_existing || query_context.lock().get() == this);
metadata_transaction = std::move(txn);
}
ZooKeeperMetadataTransactionPtr Context::getZooKeeperMetadataTransaction() const
{
assert(!metadata_transaction || hasQueryContext());
return metadata_transaction;
}
void Context::resetZooKeeperMetadataTransaction()
{
assert(metadata_transaction);
assert(hasQueryContext());
metadata_transaction = nullptr;
}
PartUUIDsPtr Context::getPartUUIDs() const
{
auto lock = getLock();
if (!part_uuids)
/// For context itself, only this initialization is not const.
/// We could have done in constructor.
/// TODO: probably, remove this from Context.
const_cast<PartUUIDsPtr &>(part_uuids) = std::make_shared<PartUUIDs>();
return part_uuids;
}
ReadTaskCallback Context::getReadTaskCallback() const
{
if (!next_task_callback.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Next task callback is not set for query {}", getInitialQueryId());
return next_task_callback.value();
}
void Context::setReadTaskCallback(ReadTaskCallback && callback)
{
next_task_callback = callback;
}
MergeTreeReadTaskCallback Context::getMergeTreeReadTaskCallback() const
{
if (!merge_tree_read_task_callback.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Next task callback for is not set for query {}", getInitialQueryId());
return merge_tree_read_task_callback.value();
}
void Context::setMergeTreeReadTaskCallback(MergeTreeReadTaskCallback && callback)
{
merge_tree_read_task_callback = callback;
}
PartUUIDsPtr Context::getIgnoredPartUUIDs() const
{
auto lock = getLock();
if (!ignored_part_uuids)
const_cast<PartUUIDsPtr &>(ignored_part_uuids) = std::make_shared<PartUUIDs>();
return ignored_part_uuids;
}
AsynchronousInsertQueue * Context::getAsynchronousInsertQueue() const
{
return shared->async_insert_queue.get();
}
void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInsertQueue> & ptr)
{
using namespace std::chrono;
if (std::chrono::milliseconds(settings.async_insert_busy_timeout_ms) == 0ms)
throw Exception("Setting async_insert_busy_timeout_ms can't be zero", ErrorCodes::INVALID_SETTING_VALUE);
shared->async_insert_queue = ptr;
}
void Context::initializeBackgroundExecutorsIfNeeded()
{
auto lock = getLock();
if (shared->is_background_executors_initialized)
return;
const size_t max_merges_and_mutations = getSettingsRef().background_pool_size * getSettingsRef().background_merges_mutations_concurrency_ratio;
/// With this executor we can execute more tasks than threads we have
shared->merge_mutate_executor = MergeMutateBackgroundExecutor::create
(
"MergeMutate",
/*max_threads_count*/getSettingsRef().background_pool_size,
/*max_tasks_count*/max_merges_and_mutations,
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
);
LOG_INFO(shared->log, "Initialized background executor for merges and mutations with num_threads={}, num_tasks={}",
getSettingsRef().background_pool_size, max_merges_and_mutations);
shared->moves_executor = OrdinaryBackgroundExecutor::create
(
"Move",
getSettingsRef().background_move_pool_size,
getSettingsRef().background_move_pool_size,
CurrentMetrics::BackgroundMovePoolTask
);
LOG_INFO(shared->log, "Initialized background executor for move operations with num_threads={}, num_tasks={}",
getSettingsRef().background_move_pool_size, getSettingsRef().background_move_pool_size);
shared->fetch_executor = OrdinaryBackgroundExecutor::create
(
"Fetch",
getSettingsRef().background_fetches_pool_size,
getSettingsRef().background_fetches_pool_size,
CurrentMetrics::BackgroundFetchesPoolTask
);
LOG_INFO(shared->log, "Initialized background executor for fetches with num_threads={}, num_tasks={}",
getSettingsRef().background_fetches_pool_size, getSettingsRef().background_fetches_pool_size);
shared->common_executor = OrdinaryBackgroundExecutor::create
(
"Common",
getSettingsRef().background_common_pool_size,
getSettingsRef().background_common_pool_size,
CurrentMetrics::BackgroundCommonPoolTask
);
LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}",
getSettingsRef().background_common_pool_size, getSettingsRef().background_common_pool_size);
shared->is_background_executors_initialized = true;
}
MergeMutateBackgroundExecutorPtr Context::getMergeMutateExecutor() const
{
return shared->merge_mutate_executor;
}
OrdinaryBackgroundExecutorPtr Context::getMovesExecutor() const
{
return shared->moves_executor;
}
OrdinaryBackgroundExecutorPtr Context::getFetchesExecutor() const
{
return shared->fetch_executor;
}
OrdinaryBackgroundExecutorPtr Context::getCommonExecutor() const
{
return shared->common_executor;
}
ReadSettings Context::getReadSettings() const
{
ReadSettings res;
std::string_view read_method_str = settings.local_filesystem_read_method.value;
if (auto opt_method = magic_enum::enum_cast<LocalFSReadMethod>(read_method_str))
res.local_fs_method = *opt_method;
else
throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}' for local filesystem", read_method_str);
read_method_str = settings.remote_filesystem_read_method.value;
if (auto opt_method = magic_enum::enum_cast<RemoteFSReadMethod>(read_method_str))
res.remote_fs_method = *opt_method;
else
throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}' for remote filesystem", read_method_str);
res.local_fs_prefetch = settings.local_filesystem_read_prefetch;
res.remote_fs_prefetch = settings.remote_filesystem_read_prefetch;
res.remote_fs_read_max_backoff_ms = settings.remote_fs_read_max_backoff_ms;
res.remote_fs_read_backoff_max_tries = settings.remote_fs_read_backoff_max_tries;
res.remote_read_min_bytes_for_seek = settings.remote_read_min_bytes_for_seek;
res.local_fs_buffer_size = settings.max_read_buffer_size;
res.direct_io_threshold = settings.min_bytes_to_use_direct_io;
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;
res.priority = settings.read_priority;
res.http_max_tries = settings.http_max_tries;
res.http_retry_initial_backoff_ms = settings.http_retry_initial_backoff_ms;
res.http_retry_max_backoff_ms = settings.http_retry_max_backoff_ms;
res.mmap_cache = getMMappedFileCache().get();
return res;
}
}