From 1d9d586e20806e733efe139bcdfc0ebfaff691a5 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Fri, 6 Nov 2020 22:07:56 +0800 Subject: [PATCH] Make global_context consistent. --- src/Databases/DatabaseDictionary.cpp | 4 +-- src/Databases/DatabaseDictionary.h | 2 +- .../MySQL/DatabaseConnectionMySQL.cpp | 1 + src/Dictionaries/XDBCDictionarySource.cpp | 2 +- src/Interpreters/Context.cpp | 14 ++++---- src/Interpreters/Context.h | 6 ++-- src/Interpreters/DatabaseCatalog.cpp | 36 +++++++++---------- src/Interpreters/DatabaseCatalog.h | 9 +++-- ...ExternalLoaderDatabaseConfigRepository.cpp | 4 +-- src/Storages/Distributed/DirectoryMonitor.cpp | 16 ++++----- src/Storages/Kafka/KafkaBlockInputStream.cpp | 1 + src/Storages/Kafka/KafkaBlockInputStream.h | 1 - src/Storages/Kafka/KafkaBlockOutputStream.cpp | 2 +- src/Storages/Kafka/KafkaBlockOutputStream.h | 1 - src/Storages/Kafka/StorageKafka.cpp | 3 +- src/Storages/Kafka/StorageKafka.h | 5 ++- src/Storages/MergeTree/MergeTreeData.cpp | 2 +- .../RabbitMQ/RabbitMQBlockInputStream.cpp | 6 ++-- .../RabbitMQ/RabbitMQBlockInputStream.h | 3 +- .../RabbitMQ/RabbitMQBlockOutputStream.h | 3 +- src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 2 +- src/Storages/RabbitMQ/StorageRabbitMQ.h | 4 +-- .../WriteBufferToRabbitMQProducer.cpp | 10 +++--- .../RabbitMQ/WriteBufferToRabbitMQProducer.h | 4 +-- src/Storages/StorageBuffer.cpp | 4 +-- src/Storages/StorageBuffer.h | 13 ++++--- src/Storages/StorageDistributed.cpp | 14 ++++---- src/Storages/StorageDistributed.h | 2 +- src/Storages/StorageMerge.cpp | 3 +- src/Storages/StorageMerge.h | 3 +- src/Storages/StorageMongoDB.cpp | 7 ++-- src/Storages/StorageMongoDB.h | 5 +-- src/Storages/StorageMySQL.cpp | 2 +- src/Storages/StorageMySQL.h | 3 +- src/Storages/StorageS3.cpp | 8 ++--- src/Storages/StorageS3.h | 4 +-- src/TableFunctions/TableFunctionMySQL.cpp | 1 + 37 files changed, 104 insertions(+), 106 deletions(-) diff --git a/src/Databases/DatabaseDictionary.cpp b/src/Databases/DatabaseDictionary.cpp index 3732139c66a..ff5510f0bf9 100644 --- a/src/Databases/DatabaseDictionary.cpp +++ b/src/Databases/DatabaseDictionary.cpp @@ -44,10 +44,10 @@ namespace } } -DatabaseDictionary::DatabaseDictionary(const String & name_, const Context & global_context_) +DatabaseDictionary::DatabaseDictionary(const String & name_, const Context & context_) : IDatabase(name_) , log(&Poco::Logger::get("DatabaseDictionary(" + database_name + ")")) - , global_context(global_context_.getGlobalContext()) + , global_context(context_.getGlobalContext()) { } diff --git a/src/Databases/DatabaseDictionary.h b/src/Databases/DatabaseDictionary.h index c3c6a53a894..2cfc6ef3285 100644 --- a/src/Databases/DatabaseDictionary.h +++ b/src/Databases/DatabaseDictionary.h @@ -22,7 +22,7 @@ namespace DB class DatabaseDictionary final : public IDatabase { public: - DatabaseDictionary(const String & name_, const Context & global_context); + DatabaseDictionary(const String & name_, const Context & context_); String getEngineName() const override { diff --git a/src/Databases/MySQL/DatabaseConnectionMySQL.cpp b/src/Databases/MySQL/DatabaseConnectionMySQL.cpp index 03d218d132f..45483055739 100644 --- a/src/Databases/MySQL/DatabaseConnectionMySQL.cpp +++ b/src/Databases/MySQL/DatabaseConnectionMySQL.cpp @@ -13,6 +13,7 @@ # include # include # include +# include # include # include # include diff --git a/src/Dictionaries/XDBCDictionarySource.cpp b/src/Dictionaries/XDBCDictionarySource.cpp index 793d8da7390..832c30ed4b7 100644 --- a/src/Dictionaries/XDBCDictionarySource.cpp +++ b/src/Dictionaries/XDBCDictionarySource.cpp @@ -120,7 +120,7 @@ XDBCDictionarySource::XDBCDictionarySource( , invalidate_query{config_.getString(config_prefix_ + ".invalidate_query", "")} , bridge_helper{bridge_} , timeouts{ConnectionTimeouts::getHTTPTimeouts(context_)} - , global_context(context_) + , global_context(context_.getGlobalContext()) { bridge_url = bridge_helper->getMainURI(); diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 54ee7713e95..21b69b6319d 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -336,9 +336,9 @@ struct ContextShared ReplicatedFetchList replicated_fetch_list; ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections. InterserverIOHandler interserver_io_handler; /// Handler for interserver communication. - std::optional buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables. - std::optional schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables) - std::optional distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends) + mutable std::optional buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables. + mutable std::optional schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables) + mutable std::optional distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends) MultiVersion macros; /// Substitutions extracted from config. std::unique_ptr ddl_worker; /// Process ddl commands from zk. /// Rules for selecting the compression settings, depending on the size of the part. @@ -484,7 +484,7 @@ Context Context::createGlobal(ContextShared * shared) void Context::initGlobal() { - DatabaseCatalog::init(this); + DatabaseCatalog::init(*this); TemporaryLiveViewCleaner::init(*this); } @@ -1401,7 +1401,7 @@ void Context::dropCaches() const shared->mark_cache->reset(); } -BackgroundSchedulePool & Context::getBufferFlushSchedulePool() +BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const { auto lock = getLock(); if (!shared->buffer_flush_schedule_pool) @@ -1443,7 +1443,7 @@ BackgroundTaskSchedulingSettings Context::getBackgroundMoveTaskSchedulingSetting return task_settings; } -BackgroundSchedulePool & Context::getSchedulePool() +BackgroundSchedulePool & Context::getSchedulePool() const { auto lock = getLock(); if (!shared->schedule_pool) @@ -1454,7 +1454,7 @@ BackgroundSchedulePool & Context::getSchedulePool() return *shared->schedule_pool; } -BackgroundSchedulePool & Context::getDistributedSchedulePool() +BackgroundSchedulePool & Context::getDistributedSchedulePool() const { auto lock = getLock(); if (!shared->distributed_schedule_pool) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 02a57b5d966..d583f3d970f 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -519,9 +519,9 @@ public: BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const; BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const; - BackgroundSchedulePool & getBufferFlushSchedulePool(); - BackgroundSchedulePool & getSchedulePool(); - BackgroundSchedulePool & getDistributedSchedulePool(); + BackgroundSchedulePool & getBufferFlushSchedulePool() const; + BackgroundSchedulePool & getSchedulePool() const; + BackgroundSchedulePool & getDistributedSchedulePool() const; /// Has distributed_ddl configuration or not. bool hasDistributedDDL() const; diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index 03ec4ccb1eb..81e99b742fe 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -39,7 +39,7 @@ namespace ErrorCodes TemporaryTableHolder::TemporaryTableHolder(const Context & context_, const TemporaryTableHolder::Creator & creator, const ASTPtr & query) - : global_context(&context_.getGlobalContext()) + : global_context(context_.getGlobalContext()) , temporary_tables(DatabaseCatalog::instance().getDatabaseForTemporaryTables().get()) { ASTPtr original_create; @@ -62,7 +62,7 @@ TemporaryTableHolder::TemporaryTableHolder(const Context & context_, } auto table_id = StorageID(DatabaseCatalog::TEMPORARY_DATABASE, global_name, id); auto table = creator(table_id); - temporary_tables->createTable(*global_context, global_name, table, original_create); + temporary_tables->createTable(global_context, global_name, table, original_create); table->startup(); } @@ -107,7 +107,7 @@ TemporaryTableHolder & TemporaryTableHolder::operator = (TemporaryTableHolder && TemporaryTableHolder::~TemporaryTableHolder() { if (id != UUIDHelpers::Nil) - temporary_tables->dropTable(*global_context, "_tmp_" + toString(id)); + temporary_tables->dropTable(global_context, "_tmp_" + toString(id)); } StorageID TemporaryTableHolder::getGlobalTableID() const @@ -117,7 +117,7 @@ StorageID TemporaryTableHolder::getGlobalTableID() const StoragePtr TemporaryTableHolder::getTable() const { - auto table = temporary_tables->tryGetTable("_tmp_" + toString(id), *global_context); + auto table = temporary_tables->tryGetTable("_tmp_" + toString(id), global_context); if (!table) throw Exception("Temporary table " + getGlobalTableID().getNameForLogs() + " not found", ErrorCodes::LOGICAL_ERROR); return table; @@ -126,13 +126,13 @@ StoragePtr TemporaryTableHolder::getTable() const void DatabaseCatalog::loadDatabases() { - drop_delay_sec = global_context->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec); + drop_delay_sec = global_context.getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec); - auto db_for_temporary_and_external_tables = std::make_shared(TEMPORARY_DATABASE, *global_context); + auto db_for_temporary_and_external_tables = std::make_shared(TEMPORARY_DATABASE, global_context); attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables); loadMarkedAsDroppedTables(); - auto task_holder = global_context->getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); }); + auto task_holder = global_context.getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); }); drop_task = std::make_unique(std::move(task_holder)); (*drop_task)->activate(); std::lock_guard lock{tables_marked_dropped_mutex}; @@ -328,11 +328,11 @@ DatabasePtr DatabaseCatalog::detachDatabase(const String & database_name, bool d if (drop) { /// Delete the database. - db->drop(*global_context); + db->drop(global_context); /// Old ClickHouse versions did not store database.sql files Poco::File database_metadata_file( - global_context->getPath() + "metadata/" + escapeForFileName(database_name) + ".sql"); + global_context.getPath() + "metadata/" + escapeForFileName(database_name) + ".sql"); if (database_metadata_file.exists()) database_metadata_file.remove(false); } @@ -505,14 +505,12 @@ void DatabaseCatalog::updateUUIDMapping(const UUID & uuid, DatabasePtr database, std::unique_ptr DatabaseCatalog::database_catalog; -DatabaseCatalog::DatabaseCatalog(Context * global_context_) +DatabaseCatalog::DatabaseCatalog(Context & global_context_) : global_context(global_context_), log(&Poco::Logger::get("DatabaseCatalog")) { - if (!global_context) - throw Exception("DatabaseCatalog is not initialized. It's a bug.", ErrorCodes::LOGICAL_ERROR); } -DatabaseCatalog & DatabaseCatalog::init(Context * global_context_) +DatabaseCatalog & DatabaseCatalog::init(Context & global_context_) { if (database_catalog) { @@ -651,7 +649,7 @@ void DatabaseCatalog::loadMarkedAsDroppedTables() /// we should load them and enqueue cleanup to remove data from store/ and metadata from ZooKeeper std::map dropped_metadata; - String path = global_context->getPath() + "metadata_dropped/"; + String path = global_context.getPath() + "metadata_dropped/"; if (!std::filesystem::exists(path)) { @@ -706,7 +704,7 @@ void DatabaseCatalog::loadMarkedAsDroppedTables() String DatabaseCatalog::getPathForDroppedMetadata(const StorageID & table_id) const { - return global_context->getPath() + "metadata_dropped/" + + return global_context.getPath() + "metadata_dropped/" + escapeForFileName(table_id.getDatabaseName()) + "." + escapeForFileName(table_id.getTableName()) + "." + toString(table_id.uuid) + ".sql"; @@ -729,7 +727,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr { /// Try load table from metadata to drop it correctly (e.g. remove metadata from zk or remove data from all volumes) LOG_INFO(log, "Trying load partially dropped table {} from {}", table_id.getNameForLogs(), dropped_metadata_path); - ASTPtr ast = DatabaseOnDisk::parseQueryFromMetadata(log, *global_context, dropped_metadata_path, /*throw_on_error*/ false, /*remove_empty*/false); + ASTPtr ast = DatabaseOnDisk::parseQueryFromMetadata(log, global_context, dropped_metadata_path, /*throw_on_error*/ false, /*remove_empty*/false); auto * create = typeid_cast(ast.get()); assert(!create || create->uuid == table_id.uuid); @@ -740,7 +738,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr create->table = table_id.table_name; try { - table = createTableFromAST(*create, table_id.getDatabaseName(), data_path, *global_context, false).second; + table = createTableFromAST(*create, table_id.getDatabaseName(), data_path, global_context, false).second; table->is_dropped = true; } catch (...) @@ -867,7 +865,7 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table) /// Even if table is not loaded, try remove its data from disk. /// TODO remove data from all volumes - String data_path = global_context->getPath() + "store/" + getPathForUUID(table.table_id.uuid); + String data_path = global_context.getPath() + "store/" + getPathForUUID(table.table_id.uuid); Poco::File table_data_dir{data_path}; if (table_data_dir.exists()) { @@ -901,7 +899,7 @@ String DatabaseCatalog::resolveDictionaryName(const String & name) const String maybe_database_name = name.substr(0, pos); String maybe_table_name = name.substr(pos + 1); - auto db_and_table = tryGetDatabaseAndTable({maybe_database_name, maybe_table_name}, *global_context); + auto db_and_table = tryGetDatabaseAndTable({maybe_database_name, maybe_table_name}, global_context); if (!db_and_table.first) return name; assert(db_and_table.second); diff --git a/src/Interpreters/DatabaseCatalog.h b/src/Interpreters/DatabaseCatalog.h index 2fd5c8d2be8..5146c786f64 100644 --- a/src/Interpreters/DatabaseCatalog.h +++ b/src/Interpreters/DatabaseCatalog.h @@ -73,7 +73,6 @@ struct TemporaryTableHolder : boost::noncopyable { typedef std::function Creator; - TemporaryTableHolder() = default; TemporaryTableHolder(const Context & context, const Creator & creator, const ASTPtr & query = {}); /// Creates temporary table with Engine=Memory @@ -95,7 +94,7 @@ struct TemporaryTableHolder : boost::noncopyable operator bool () const { return id != UUIDHelpers::Nil; } - const Context * global_context = nullptr; + const Context & global_context; IDatabase * temporary_tables = nullptr; UUID id = UUIDHelpers::Nil; }; @@ -111,7 +110,7 @@ public: static constexpr const char * TEMPORARY_DATABASE = "_temporary_and_external_tables"; static constexpr const char * SYSTEM_DATABASE = "system"; - static DatabaseCatalog & init(Context * global_context_); + static DatabaseCatalog & init(Context & global_context_); static DatabaseCatalog & instance(); static void shutdown(); @@ -199,7 +198,7 @@ private: // make emplace(global_context_) compile with private constructor ¯\_(ツ)_/¯. static std::unique_ptr database_catalog; - DatabaseCatalog(Context * global_context_); + DatabaseCatalog(Context & global_context_); void assertDatabaseExistsUnlocked(const String & database_name) const; void assertDatabaseDoesntExistUnlocked(const String & database_name) const; @@ -240,7 +239,7 @@ private: using UUIDToDatabaseMap = std::unordered_map; /// For some reason Context is required to get Storage from Database object - Context * global_context; + Context & global_context; mutable std::mutex databases_mutex; ViewDependencies view_dependencies; diff --git a/src/Interpreters/ExternalLoaderDatabaseConfigRepository.cpp b/src/Interpreters/ExternalLoaderDatabaseConfigRepository.cpp index bd29bfb8970..41269b91b5f 100644 --- a/src/Interpreters/ExternalLoaderDatabaseConfigRepository.cpp +++ b/src/Interpreters/ExternalLoaderDatabaseConfigRepository.cpp @@ -42,8 +42,8 @@ namespace } -ExternalLoaderDatabaseConfigRepository::ExternalLoaderDatabaseConfigRepository(IDatabase & database_, const Context & global_context_) - : global_context(global_context_.getGlobalContext()) +ExternalLoaderDatabaseConfigRepository::ExternalLoaderDatabaseConfigRepository(IDatabase & database_, const Context & context_) + : global_context(context_.getGlobalContext()) , database_name(database_.getDatabaseName()) , database(database_) { diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 43a957a2fd9..47da0a10d9e 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -83,13 +83,13 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( : storage(storage_) , pool(std::move(pool_)) , path{path_ + '/'} - , should_batch_inserts(storage.global_context->getSettingsRef().distributed_directory_monitor_batch_inserts) - , min_batched_block_size_rows(storage.global_context->getSettingsRef().min_insert_block_size_rows) - , min_batched_block_size_bytes(storage.global_context->getSettingsRef().min_insert_block_size_bytes) + , should_batch_inserts(storage.global_context.getSettingsRef().distributed_directory_monitor_batch_inserts) + , min_batched_block_size_rows(storage.global_context.getSettingsRef().min_insert_block_size_rows) + , min_batched_block_size_bytes(storage.global_context.getSettingsRef().min_insert_block_size_bytes) , current_batch_file_path{path + "current_batch.txt"} - , default_sleep_time{storage.global_context->getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()} + , default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()} , sleep_time{default_sleep_time} - , max_sleep_time{storage.global_context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()} + , max_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()} , log{&Poco::Logger::get(getLoggerName())} , monitor_blocker(monitor_blocker_) , metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0) @@ -249,7 +249,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri auto pools = createPoolsForAddresses(name, pool_factory); - const auto settings = storage.global_context->getSettings(); + const auto settings = storage.global_context.getSettings(); return pools.size() == 1 ? pools.front() : std::make_shared(pools, settings.load_balancing, settings.distributed_replica_error_half_life.totalSeconds(), @@ -308,7 +308,7 @@ bool StorageDistributedDirectoryMonitor::processFiles(const std::mapgetSettingsRef()); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef()); try { @@ -483,7 +483,7 @@ struct StorageDistributedDirectoryMonitor::Batch Poco::File{tmp_file}.renameTo(parent.current_batch_file_path); } - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context->getSettingsRef()); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context.getSettingsRef()); auto connection = parent.pool->get(timeouts); bool batch_broken = false; diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp index c5f598a756c..6a137bd4b8b 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.cpp +++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB { diff --git a/src/Storages/Kafka/KafkaBlockInputStream.h b/src/Storages/Kafka/KafkaBlockInputStream.h index ae632103653..517df6ecaf7 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.h +++ b/src/Storages/Kafka/KafkaBlockInputStream.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include diff --git a/src/Storages/Kafka/KafkaBlockOutputStream.cpp b/src/Storages/Kafka/KafkaBlockOutputStream.cpp index e7bf562339f..dc5b5283cdc 100644 --- a/src/Storages/Kafka/KafkaBlockOutputStream.cpp +++ b/src/Storages/Kafka/KafkaBlockOutputStream.cpp @@ -1,4 +1,4 @@ -#include "KafkaBlockOutputStream.h" +#include #include #include diff --git a/src/Storages/Kafka/KafkaBlockOutputStream.h b/src/Storages/Kafka/KafkaBlockOutputStream.h index 1121d2a119e..715ed39b8d6 100644 --- a/src/Storages/Kafka/KafkaBlockOutputStream.h +++ b/src/Storages/Kafka/KafkaBlockOutputStream.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include namespace DB diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index effd2869be2..388c21c6ad6 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -169,7 +170,7 @@ namespace StorageKafka::StorageKafka( const StorageID & table_id_, - Context & context_, + const Context & context_, const ColumnsDescription & columns_, std::unique_ptr kafka_settings_) : IStorage(table_id_) diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 4257f1c1854..8ec8e718011 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -4,7 +4,6 @@ #include #include #include -#include #include #include @@ -69,13 +68,13 @@ public: protected: StorageKafka( const StorageID & table_id_, - Context & context_, + const Context & context_, const ColumnsDescription & columns_, std::unique_ptr kafka_settings_); private: // Configuration and state - Context & global_context; + const Context & global_context; std::unique_ptr kafka_settings; const Names topics; const String brokers; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a11dd25d8e3..0d7d8257574 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -134,7 +134,7 @@ MergeTreeData::MergeTreeData( bool attach, BrokenPartCallback broken_part_callback_) : IStorage(table_id_) - , global_context(context_) + , global_context(context_.getGlobalContext()) , merging_params(merging_params_) , require_part_metadata(require_part_metadata_) , relative_data_path(relative_data_path_) diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index 830c6224b9e..0909f858fe4 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -1,6 +1,8 @@ -#include -#include #include + +#include +#include +#include #include namespace ErrorCodes diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h index 5f2c2a62018..2ef1ab70b95 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include @@ -38,7 +37,7 @@ public: private: StorageRabbitMQ & storage; StorageMetadataPtr metadata_snapshot; - Context context; + const Context & context; Names column_names; const size_t max_block_size; bool ack_in_suffix; diff --git a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h index f8ed79438f4..7e5c22f9f39 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h +++ b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include @@ -23,7 +22,7 @@ public: private: StorageRabbitMQ & storage; StorageMetadataPtr metadata_snapshot; - Context context; + const Context & context; ProducerBufferPtr buffer; BlockOutputStreamPtr child; }; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index f9e3c0558d3..d32cbaf66ae 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -69,7 +69,7 @@ namespace ExchangeType StorageRabbitMQ::StorageRabbitMQ( const StorageID & table_id_, - Context & context_, + const Context & context_, const ColumnsDescription & columns_, std::unique_ptr rabbitmq_settings_) : IStorage(table_id_) diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index 37627178e8b..ebdc7cd8f68 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -67,12 +67,12 @@ public: protected: StorageRabbitMQ( const StorageID & table_id_, - Context & context_, + const Context & context_, const ColumnsDescription & columns_, std::unique_ptr rabbitmq_settings_); private: - Context global_context; + const Context & global_context; Context rabbitmq_context; std::unique_ptr rabbitmq_settings; diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 134c00bdc17..ac94659d321 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -1,7 +1,9 @@ #include -#include "Core/Block.h" -#include "Columns/ColumnString.h" -#include "Columns/ColumnsNumber.h" + +#include +#include +#include +#include #include #include #include @@ -25,7 +27,7 @@ namespace ErrorCodes WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( std::pair & parsed_address_, - Context & global_context, + const Context & global_context, const std::pair & login_password_, const Names & routing_keys_, const String & exchange_name_, diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h index 28fa5df8111..6fa4ca9587f 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h @@ -9,7 +9,7 @@ #include #include #include -#include +#include namespace DB { @@ -19,7 +19,7 @@ class WriteBufferToRabbitMQProducer : public WriteBuffer public: WriteBufferToRabbitMQProducer( std::pair & parsed_address_, - Context & global_context, + const Context & global_context, const std::pair & login_password_, const Names & routing_keys_, const String & exchange_name_, diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index a2c2325bcc1..b95e2a0a6be 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -65,14 +65,14 @@ StorageBuffer::StorageBuffer( const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - Context & context_, + const Context & context_, size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, const StorageID & destination_id_, bool allow_materialized_) : IStorage(table_id_) - , global_context(context_) + , global_context(context_.getGlobalContext()) , num_shards(num_shards_), buffers(num_shards_) , min_thresholds(min_thresholds_) , max_thresholds(max_thresholds_) diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h index 3c8c4b8a040..be9443c005a 100644 --- a/src/Storages/StorageBuffer.h +++ b/src/Storages/StorageBuffer.h @@ -9,7 +9,6 @@ #include #include #include -#include namespace Poco { class Logger; } @@ -82,7 +81,13 @@ public: void startup() override; /// Flush all buffers into the subordinate table and stop background thread. void shutdown() override; - bool optimize(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override; + bool optimize( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + const ASTPtr & partition, + bool final, + bool deduplicate, + const Context & context) override; bool supportsSampling() const override { return true; } bool supportsPrewhere() const override @@ -112,7 +117,7 @@ public: private: - Context global_context; + const Context & global_context; struct Buffer { @@ -165,7 +170,7 @@ protected: const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - Context & context_, + const Context & context_, size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 911842be5e5..c6a5694843e 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -366,10 +366,10 @@ StorageDistributed::StorageDistributed( : IStorage(id_) , remote_database(remote_database_) , remote_table(remote_table_) - , global_context(std::make_unique(context_)) + , global_context(context_.getGlobalContext()) , log(&Poco::Logger::get("StorageDistributed (" + id_.table_name + ")")) , owned_cluster(std::move(owned_cluster_)) - , cluster_name(global_context->getMacros()->expand(cluster_name_)) + , cluster_name(global_context.getMacros()->expand(cluster_name_)) , has_sharding_key(sharding_key_) , relative_data_path(relative_data_path_) { @@ -380,14 +380,14 @@ StorageDistributed::StorageDistributed( if (sharding_key_) { - sharding_key_expr = buildShardingKeyExpression(sharding_key_, *global_context, storage_metadata.getColumns().getAllPhysical(), false); + sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context, storage_metadata.getColumns().getAllPhysical(), false); sharding_key_column_name = sharding_key_->getColumnName(); sharding_key_is_deterministic = isExpressionActionsDeterministics(sharding_key_expr); } if (!relative_data_path.empty()) { - storage_policy = global_context->getStoragePolicy(storage_policy_name_); + storage_policy = global_context.getStoragePolicy(storage_policy_name_); data_volume = storage_policy->getVolume(0); if (storage_policy->getVolumes().size() > 1) LOG_WARNING(log, "Storage policy for Distributed table has multiple volumes. " @@ -397,7 +397,7 @@ StorageDistributed::StorageDistributed( /// Sanity check. Skip check if the table is already created to allow the server to start. if (!attach_ && !cluster_name.empty()) { - size_t num_local_shards = global_context->getCluster(cluster_name)->getLocalShardCount(); + size_t num_local_shards = global_context.getCluster(cluster_name)->getLocalShardCount(); if (num_local_shards && remote_database == id_.database_name && remote_table == id_.table_name) throw Exception("Distributed table " + id_.table_name + " looks at itself", ErrorCodes::INFINITE_LOOP); } @@ -719,7 +719,7 @@ StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor( { node_data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this); node_data.directory_monitor = std::make_unique( - *this, path, node_data.connection_pool, monitors_blocker, global_context->getDistributedSchedulePool()); + *this, path, node_data.connection_pool, monitors_blocker, global_context.getDistributedSchedulePool()); } return *node_data.directory_monitor; } @@ -741,7 +741,7 @@ size_t StorageDistributed::getShardCount() const ClusterPtr StorageDistributed::getCluster() const { - return owned_cluster ? owned_cluster : global_context->getCluster(cluster_name); + return owned_cluster ? owned_cluster : global_context.getCluster(cluster_name); } ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query_ptr) const diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 790592033d8..58ade73b4cf 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -130,7 +130,7 @@ public: String remote_table; ASTPtr remote_table_function_ptr; - std::unique_ptr global_context; + const Context & global_context; Poco::Logger * log; /// Used to implement TableFunctionRemote. diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 3e6f99878a6..fe53383ce2f 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -76,7 +77,7 @@ StorageMerge::StorageMerge( : IStorage(table_id_) , source_database(source_database_) , table_name_regexp(table_name_regexp_) - , global_context(context_) + , global_context(context_.getGlobalContext()) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); diff --git a/src/Storages/StorageMerge.h b/src/Storages/StorageMerge.h index cdd8778d69f..681ea7015e7 100644 --- a/src/Storages/StorageMerge.h +++ b/src/Storages/StorageMerge.h @@ -4,7 +4,6 @@ #include #include -#include namespace DB @@ -50,7 +49,7 @@ public: private: String source_database; OptimizedRegularExpression table_name_regexp; - Context global_context; + const Context & global_context; using StorageWithLockAndName = std::tuple; using StorageListWithLocks = std::list; diff --git a/src/Storages/StorageMongoDB.cpp b/src/Storages/StorageMongoDB.cpp index b9ac2443472..be1159b1a63 100644 --- a/src/Storages/StorageMongoDB.cpp +++ b/src/Storages/StorageMongoDB.cpp @@ -34,8 +34,7 @@ StorageMongoDB::StorageMongoDB( const std::string & username_, const std::string & password_, const ColumnsDescription & columns_, - const ConstraintsDescription & constraints_, - const Context & context_) + const ConstraintsDescription & constraints_) : IStorage(table_id_) , host(host_) , port(port_) @@ -43,7 +42,6 @@ StorageMongoDB::StorageMongoDB( , collection_name(collection_name_) , username(username_) , password(password_) - , global_context(context_) , connection{std::make_shared(host, port)} { StorageInMemoryMetadata storage_metadata; @@ -114,8 +112,7 @@ void registerStorageMongoDB(StorageFactory & factory) username, password, args.columns, - args.constraints, - args.context); + args.constraints); }, { .source_access_type = AccessType::MONGO, diff --git a/src/Storages/StorageMongoDB.h b/src/Storages/StorageMongoDB.h index 3ed03576478..d7b71495574 100644 --- a/src/Storages/StorageMongoDB.h +++ b/src/Storages/StorageMongoDB.h @@ -3,7 +3,6 @@ #include #include -#include #include @@ -28,8 +27,7 @@ public: const std::string & username_, const std::string & password_, const ColumnsDescription & columns_, - const ConstraintsDescription & constraints_, - const Context & context_); + const ConstraintsDescription & constraints_); std::string getName() const override { return "MongoDB"; } @@ -51,7 +49,6 @@ private: std::string username; std::string password; - Context global_context; std::shared_ptr connection; }; diff --git a/src/Storages/StorageMySQL.cpp b/src/Storages/StorageMySQL.cpp index e5c59a794e1..defac1b57cf 100644 --- a/src/Storages/StorageMySQL.cpp +++ b/src/Storages/StorageMySQL.cpp @@ -55,7 +55,7 @@ StorageMySQL::StorageMySQL( , replace_query{replace_query_} , on_duplicate_clause{on_duplicate_clause_} , pool(std::move(pool_)) - , global_context(context_) + , global_context(context_.getGlobalContext()) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); diff --git a/src/Storages/StorageMySQL.h b/src/Storages/StorageMySQL.h index acab8f9290e..645f3600eee 100644 --- a/src/Storages/StorageMySQL.h +++ b/src/Storages/StorageMySQL.h @@ -8,7 +8,6 @@ # include -# include # include # include @@ -57,7 +56,7 @@ private: std::string on_duplicate_clause; mysqlxx::Pool pool; - Context global_context; + const Context & global_context; }; } diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index e5c16dad958..334ee480078 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -194,17 +194,17 @@ StorageS3::StorageS3( UInt64 min_upload_part_size_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - Context & context_, + const Context & context_, const String & compression_method_) : IStorage(table_id_) , uri(uri_) - , context_global(context_) + , global_context(context_.getGlobalContext()) , format_name(format_name_) , min_upload_part_size(min_upload_part_size_) , compression_method(compression_method_) , name(uri_.storage_name) { - context_global.getRemoteHostFilter().checkURL(uri_.uri); + global_context.getRemoteHostFilter().checkURL(uri_.uri); StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); storage_metadata.setConstraints(constraints_); @@ -325,7 +325,7 @@ BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMet { return std::make_shared( format_name, min_upload_part_size, metadata_snapshot->getSampleBlock(), - context_global, chooseCompressionMethod(uri.endpoint, compression_method), + global_context, chooseCompressionMethod(uri.endpoint, compression_method), client, uri.bucket, uri.key); } diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 1ecc9409671..96f0cf02e88 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -33,7 +33,7 @@ public: UInt64 min_upload_part_size_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - Context & context_, + const Context & context_, const String & compression_method_ = ""); String getName() const override @@ -56,7 +56,7 @@ public: private: S3::URI uri; - const Context & context_global; + const Context & global_context; String format_name; UInt64 min_upload_part_size; diff --git a/src/TableFunctions/TableFunctionMySQL.cpp b/src/TableFunctions/TableFunctionMySQL.cpp index 7281ae434e5..d6a62dc68b4 100644 --- a/src/TableFunctions/TableFunctionMySQL.cpp +++ b/src/TableFunctions/TableFunctionMySQL.cpp @@ -10,6 +10,7 @@ # include # include # include +# include # include # include # include