mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 09:40:49 +00:00
Merge pull request #16751 from amosbird/globalcontext
Make global_context consistent.
This commit is contained in:
commit
5cc9cb01cd
@ -44,10 +44,10 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
DatabaseDictionary::DatabaseDictionary(const String & name_, const Context & global_context_)
|
||||
DatabaseDictionary::DatabaseDictionary(const String & name_, const Context & context_)
|
||||
: IDatabase(name_)
|
||||
, log(&Poco::Logger::get("DatabaseDictionary(" + database_name + ")"))
|
||||
, global_context(global_context_.getGlobalContext())
|
||||
, global_context(context_.getGlobalContext())
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,7 @@ namespace DB
|
||||
class DatabaseDictionary final : public IDatabase
|
||||
{
|
||||
public:
|
||||
DatabaseDictionary(const String & name_, const Context & global_context);
|
||||
DatabaseDictionary(const String & name_, const Context & context_);
|
||||
|
||||
String getEngineName() const override
|
||||
{
|
||||
|
@ -13,6 +13,7 @@
|
||||
# include <Databases/MySQL/FetchTablesColumnsList.h>
|
||||
# include <Formats/MySQLBlockInputStream.h>
|
||||
# include <IO/Operators.h>
|
||||
# include <Interpreters/Context.h>
|
||||
# include <Parsers/ASTCreateQuery.h>
|
||||
# include <Parsers/ASTFunction.h>
|
||||
# include <Parsers/ParserCreateQuery.h>
|
||||
|
@ -120,7 +120,7 @@ XDBCDictionarySource::XDBCDictionarySource(
|
||||
, invalidate_query{config_.getString(config_prefix_ + ".invalidate_query", "")}
|
||||
, bridge_helper{bridge_}
|
||||
, timeouts{ConnectionTimeouts::getHTTPTimeouts(context_)}
|
||||
, global_context(context_)
|
||||
, global_context(context_.getGlobalContext())
|
||||
{
|
||||
bridge_url = bridge_helper->getMainURI();
|
||||
|
||||
|
@ -336,9 +336,9 @@ struct ContextShared
|
||||
ReplicatedFetchList replicated_fetch_list;
|
||||
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
|
||||
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
|
||||
std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
|
||||
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
|
||||
std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
|
||||
mutable std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
|
||||
mutable std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
|
||||
mutable std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
|
||||
MultiVersion<Macros> macros; /// Substitutions extracted from config.
|
||||
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
|
||||
/// Rules for selecting the compression settings, depending on the size of the part.
|
||||
@ -484,7 +484,7 @@ Context Context::createGlobal(ContextShared * shared)
|
||||
|
||||
void Context::initGlobal()
|
||||
{
|
||||
DatabaseCatalog::init(this);
|
||||
DatabaseCatalog::init(*this);
|
||||
TemporaryLiveViewCleaner::init(*this);
|
||||
}
|
||||
|
||||
@ -1401,7 +1401,7 @@ void Context::dropCaches() const
|
||||
shared->mark_cache->reset();
|
||||
}
|
||||
|
||||
BackgroundSchedulePool & Context::getBufferFlushSchedulePool()
|
||||
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
|
||||
{
|
||||
auto lock = getLock();
|
||||
if (!shared->buffer_flush_schedule_pool)
|
||||
@ -1443,7 +1443,7 @@ BackgroundTaskSchedulingSettings Context::getBackgroundMoveTaskSchedulingSetting
|
||||
return task_settings;
|
||||
}
|
||||
|
||||
BackgroundSchedulePool & Context::getSchedulePool()
|
||||
BackgroundSchedulePool & Context::getSchedulePool() const
|
||||
{
|
||||
auto lock = getLock();
|
||||
if (!shared->schedule_pool)
|
||||
@ -1454,7 +1454,7 @@ BackgroundSchedulePool & Context::getSchedulePool()
|
||||
return *shared->schedule_pool;
|
||||
}
|
||||
|
||||
BackgroundSchedulePool & Context::getDistributedSchedulePool()
|
||||
BackgroundSchedulePool & Context::getDistributedSchedulePool() const
|
||||
{
|
||||
auto lock = getLock();
|
||||
if (!shared->distributed_schedule_pool)
|
||||
|
@ -519,9 +519,9 @@ public:
|
||||
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
|
||||
BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const;
|
||||
|
||||
BackgroundSchedulePool & getBufferFlushSchedulePool();
|
||||
BackgroundSchedulePool & getSchedulePool();
|
||||
BackgroundSchedulePool & getDistributedSchedulePool();
|
||||
BackgroundSchedulePool & getBufferFlushSchedulePool() const;
|
||||
BackgroundSchedulePool & getSchedulePool() const;
|
||||
BackgroundSchedulePool & getDistributedSchedulePool() const;
|
||||
|
||||
/// Has distributed_ddl configuration or not.
|
||||
bool hasDistributedDDL() const;
|
||||
|
@ -39,7 +39,7 @@ namespace ErrorCodes
|
||||
|
||||
TemporaryTableHolder::TemporaryTableHolder(const Context & context_,
|
||||
const TemporaryTableHolder::Creator & creator, const ASTPtr & query)
|
||||
: global_context(&context_.getGlobalContext())
|
||||
: global_context(context_.getGlobalContext())
|
||||
, temporary_tables(DatabaseCatalog::instance().getDatabaseForTemporaryTables().get())
|
||||
{
|
||||
ASTPtr original_create;
|
||||
@ -62,7 +62,7 @@ TemporaryTableHolder::TemporaryTableHolder(const Context & context_,
|
||||
}
|
||||
auto table_id = StorageID(DatabaseCatalog::TEMPORARY_DATABASE, global_name, id);
|
||||
auto table = creator(table_id);
|
||||
temporary_tables->createTable(*global_context, global_name, table, original_create);
|
||||
temporary_tables->createTable(global_context, global_name, table, original_create);
|
||||
table->startup();
|
||||
}
|
||||
|
||||
@ -107,7 +107,7 @@ TemporaryTableHolder & TemporaryTableHolder::operator = (TemporaryTableHolder &&
|
||||
TemporaryTableHolder::~TemporaryTableHolder()
|
||||
{
|
||||
if (id != UUIDHelpers::Nil)
|
||||
temporary_tables->dropTable(*global_context, "_tmp_" + toString(id));
|
||||
temporary_tables->dropTable(global_context, "_tmp_" + toString(id));
|
||||
}
|
||||
|
||||
StorageID TemporaryTableHolder::getGlobalTableID() const
|
||||
@ -117,7 +117,7 @@ StorageID TemporaryTableHolder::getGlobalTableID() const
|
||||
|
||||
StoragePtr TemporaryTableHolder::getTable() const
|
||||
{
|
||||
auto table = temporary_tables->tryGetTable("_tmp_" + toString(id), *global_context);
|
||||
auto table = temporary_tables->tryGetTable("_tmp_" + toString(id), global_context);
|
||||
if (!table)
|
||||
throw Exception("Temporary table " + getGlobalTableID().getNameForLogs() + " not found", ErrorCodes::LOGICAL_ERROR);
|
||||
return table;
|
||||
@ -126,13 +126,13 @@ StoragePtr TemporaryTableHolder::getTable() const
|
||||
|
||||
void DatabaseCatalog::loadDatabases()
|
||||
{
|
||||
drop_delay_sec = global_context->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);
|
||||
drop_delay_sec = global_context.getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);
|
||||
|
||||
auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE, *global_context);
|
||||
auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE, global_context);
|
||||
attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables);
|
||||
|
||||
loadMarkedAsDroppedTables();
|
||||
auto task_holder = global_context->getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); });
|
||||
auto task_holder = global_context.getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); });
|
||||
drop_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(task_holder));
|
||||
(*drop_task)->activate();
|
||||
std::lock_guard lock{tables_marked_dropped_mutex};
|
||||
@ -328,11 +328,11 @@ DatabasePtr DatabaseCatalog::detachDatabase(const String & database_name, bool d
|
||||
if (drop)
|
||||
{
|
||||
/// Delete the database.
|
||||
db->drop(*global_context);
|
||||
db->drop(global_context);
|
||||
|
||||
/// Old ClickHouse versions did not store database.sql files
|
||||
Poco::File database_metadata_file(
|
||||
global_context->getPath() + "metadata/" + escapeForFileName(database_name) + ".sql");
|
||||
global_context.getPath() + "metadata/" + escapeForFileName(database_name) + ".sql");
|
||||
if (database_metadata_file.exists())
|
||||
database_metadata_file.remove(false);
|
||||
}
|
||||
@ -505,14 +505,12 @@ void DatabaseCatalog::updateUUIDMapping(const UUID & uuid, DatabasePtr database,
|
||||
|
||||
std::unique_ptr<DatabaseCatalog> DatabaseCatalog::database_catalog;
|
||||
|
||||
DatabaseCatalog::DatabaseCatalog(Context * global_context_)
|
||||
DatabaseCatalog::DatabaseCatalog(Context & global_context_)
|
||||
: global_context(global_context_), log(&Poco::Logger::get("DatabaseCatalog"))
|
||||
{
|
||||
if (!global_context)
|
||||
throw Exception("DatabaseCatalog is not initialized. It's a bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
DatabaseCatalog & DatabaseCatalog::init(Context * global_context_)
|
||||
DatabaseCatalog & DatabaseCatalog::init(Context & global_context_)
|
||||
{
|
||||
if (database_catalog)
|
||||
{
|
||||
@ -651,7 +649,7 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
|
||||
/// we should load them and enqueue cleanup to remove data from store/ and metadata from ZooKeeper
|
||||
|
||||
std::map<String, StorageID> dropped_metadata;
|
||||
String path = global_context->getPath() + "metadata_dropped/";
|
||||
String path = global_context.getPath() + "metadata_dropped/";
|
||||
|
||||
if (!std::filesystem::exists(path))
|
||||
{
|
||||
@ -706,7 +704,7 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
|
||||
|
||||
String DatabaseCatalog::getPathForDroppedMetadata(const StorageID & table_id) const
|
||||
{
|
||||
return global_context->getPath() + "metadata_dropped/" +
|
||||
return global_context.getPath() + "metadata_dropped/" +
|
||||
escapeForFileName(table_id.getDatabaseName()) + "." +
|
||||
escapeForFileName(table_id.getTableName()) + "." +
|
||||
toString(table_id.uuid) + ".sql";
|
||||
@ -729,7 +727,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
|
||||
{
|
||||
/// Try load table from metadata to drop it correctly (e.g. remove metadata from zk or remove data from all volumes)
|
||||
LOG_INFO(log, "Trying load partially dropped table {} from {}", table_id.getNameForLogs(), dropped_metadata_path);
|
||||
ASTPtr ast = DatabaseOnDisk::parseQueryFromMetadata(log, *global_context, dropped_metadata_path, /*throw_on_error*/ false, /*remove_empty*/false);
|
||||
ASTPtr ast = DatabaseOnDisk::parseQueryFromMetadata(log, global_context, dropped_metadata_path, /*throw_on_error*/ false, /*remove_empty*/false);
|
||||
auto * create = typeid_cast<ASTCreateQuery *>(ast.get());
|
||||
assert(!create || create->uuid == table_id.uuid);
|
||||
|
||||
@ -740,7 +738,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
|
||||
create->table = table_id.table_name;
|
||||
try
|
||||
{
|
||||
table = createTableFromAST(*create, table_id.getDatabaseName(), data_path, *global_context, false).second;
|
||||
table = createTableFromAST(*create, table_id.getDatabaseName(), data_path, global_context, false).second;
|
||||
table->is_dropped = true;
|
||||
}
|
||||
catch (...)
|
||||
@ -867,7 +865,7 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)
|
||||
|
||||
/// Even if table is not loaded, try remove its data from disk.
|
||||
/// TODO remove data from all volumes
|
||||
String data_path = global_context->getPath() + "store/" + getPathForUUID(table.table_id.uuid);
|
||||
String data_path = global_context.getPath() + "store/" + getPathForUUID(table.table_id.uuid);
|
||||
Poco::File table_data_dir{data_path};
|
||||
if (table_data_dir.exists())
|
||||
{
|
||||
@ -901,7 +899,7 @@ String DatabaseCatalog::resolveDictionaryName(const String & name) const
|
||||
String maybe_database_name = name.substr(0, pos);
|
||||
String maybe_table_name = name.substr(pos + 1);
|
||||
|
||||
auto db_and_table = tryGetDatabaseAndTable({maybe_database_name, maybe_table_name}, *global_context);
|
||||
auto db_and_table = tryGetDatabaseAndTable({maybe_database_name, maybe_table_name}, global_context);
|
||||
if (!db_and_table.first)
|
||||
return name;
|
||||
assert(db_and_table.second);
|
||||
|
@ -73,7 +73,6 @@ struct TemporaryTableHolder : boost::noncopyable
|
||||
{
|
||||
typedef std::function<StoragePtr(const StorageID &)> Creator;
|
||||
|
||||
TemporaryTableHolder() = default;
|
||||
TemporaryTableHolder(const Context & context, const Creator & creator, const ASTPtr & query = {});
|
||||
|
||||
/// Creates temporary table with Engine=Memory
|
||||
@ -95,7 +94,7 @@ struct TemporaryTableHolder : boost::noncopyable
|
||||
|
||||
operator bool () const { return id != UUIDHelpers::Nil; }
|
||||
|
||||
const Context * global_context = nullptr;
|
||||
const Context & global_context;
|
||||
IDatabase * temporary_tables = nullptr;
|
||||
UUID id = UUIDHelpers::Nil;
|
||||
};
|
||||
@ -111,7 +110,7 @@ public:
|
||||
static constexpr const char * TEMPORARY_DATABASE = "_temporary_and_external_tables";
|
||||
static constexpr const char * SYSTEM_DATABASE = "system";
|
||||
|
||||
static DatabaseCatalog & init(Context * global_context_);
|
||||
static DatabaseCatalog & init(Context & global_context_);
|
||||
static DatabaseCatalog & instance();
|
||||
static void shutdown();
|
||||
|
||||
@ -199,7 +198,7 @@ private:
|
||||
// make emplace(global_context_) compile with private constructor ¯\_(ツ)_/¯.
|
||||
static std::unique_ptr<DatabaseCatalog> database_catalog;
|
||||
|
||||
DatabaseCatalog(Context * global_context_);
|
||||
DatabaseCatalog(Context & global_context_);
|
||||
void assertDatabaseExistsUnlocked(const String & database_name) const;
|
||||
void assertDatabaseDoesntExistUnlocked(const String & database_name) const;
|
||||
|
||||
@ -240,7 +239,7 @@ private:
|
||||
using UUIDToDatabaseMap = std::unordered_map<UUID, DatabasePtr>;
|
||||
|
||||
/// For some reason Context is required to get Storage from Database object
|
||||
Context * global_context;
|
||||
Context & global_context;
|
||||
mutable std::mutex databases_mutex;
|
||||
|
||||
ViewDependencies view_dependencies;
|
||||
|
@ -42,8 +42,8 @@ namespace
|
||||
}
|
||||
|
||||
|
||||
ExternalLoaderDatabaseConfigRepository::ExternalLoaderDatabaseConfigRepository(IDatabase & database_, const Context & global_context_)
|
||||
: global_context(global_context_.getGlobalContext())
|
||||
ExternalLoaderDatabaseConfigRepository::ExternalLoaderDatabaseConfigRepository(IDatabase & database_, const Context & context_)
|
||||
: global_context(context_.getGlobalContext())
|
||||
, database_name(database_.getDatabaseName())
|
||||
, database(database_)
|
||||
{
|
||||
|
@ -83,13 +83,13 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
|
||||
: storage(storage_)
|
||||
, pool(std::move(pool_))
|
||||
, path{path_ + '/'}
|
||||
, should_batch_inserts(storage.global_context->getSettingsRef().distributed_directory_monitor_batch_inserts)
|
||||
, min_batched_block_size_rows(storage.global_context->getSettingsRef().min_insert_block_size_rows)
|
||||
, min_batched_block_size_bytes(storage.global_context->getSettingsRef().min_insert_block_size_bytes)
|
||||
, should_batch_inserts(storage.global_context.getSettingsRef().distributed_directory_monitor_batch_inserts)
|
||||
, min_batched_block_size_rows(storage.global_context.getSettingsRef().min_insert_block_size_rows)
|
||||
, min_batched_block_size_bytes(storage.global_context.getSettingsRef().min_insert_block_size_bytes)
|
||||
, current_batch_file_path{path + "current_batch.txt"}
|
||||
, default_sleep_time{storage.global_context->getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
|
||||
, default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
|
||||
, sleep_time{default_sleep_time}
|
||||
, max_sleep_time{storage.global_context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()}
|
||||
, max_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()}
|
||||
, log{&Poco::Logger::get(getLoggerName())}
|
||||
, monitor_blocker(monitor_blocker_)
|
||||
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
|
||||
@ -249,7 +249,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
|
||||
|
||||
auto pools = createPoolsForAddresses(name, pool_factory);
|
||||
|
||||
const auto settings = storage.global_context->getSettings();
|
||||
const auto settings = storage.global_context.getSettings();
|
||||
return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools,
|
||||
settings.load_balancing,
|
||||
settings.distributed_replica_error_half_life.totalSeconds(),
|
||||
@ -308,7 +308,7 @@ bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std
|
||||
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
|
||||
{
|
||||
LOG_TRACE(log, "Started processing `{}`", file_path);
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context->getSettingsRef());
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
|
||||
|
||||
try
|
||||
{
|
||||
@ -483,7 +483,7 @@ struct StorageDistributedDirectoryMonitor::Batch
|
||||
|
||||
Poco::File{tmp_file}.renameTo(parent.current_batch_file_path);
|
||||
}
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context->getSettingsRef());
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context.getSettingsRef());
|
||||
auto connection = parent.pool->get(timeouts);
|
||||
|
||||
bool batch_broken = false;
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
|
||||
#include <Processors/Formats/InputStreamFromInputFormat.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -1,7 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include <Storages/Kafka/StorageKafka.h>
|
||||
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
|
||||
|
@ -1,4 +1,4 @@
|
||||
#include "KafkaBlockOutputStream.h"
|
||||
#include <Storages/Kafka/KafkaBlockOutputStream.h>
|
||||
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Storages/Kafka/WriteBufferToKafkaProducer.h>
|
||||
|
@ -1,7 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/Kafka/StorageKafka.h>
|
||||
|
||||
namespace DB
|
||||
|
@ -33,6 +33,7 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Processors/Sources/SourceFromInputStream.h>
|
||||
#include <librdkafka/rdkafka.h>
|
||||
#include <common/getFQDNOrHostName.h>
|
||||
@ -169,7 +170,7 @@ namespace
|
||||
|
||||
StorageKafka::StorageKafka(
|
||||
const StorageID & table_id_,
|
||||
Context & context_,
|
||||
const Context & context_,
|
||||
const ColumnsDescription & columns_,
|
||||
std::unique_ptr<KafkaSettings> kafka_settings_)
|
||||
: IStorage(table_id_)
|
||||
|
@ -4,7 +4,6 @@
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/Kafka/Buffer_fwd.h>
|
||||
#include <Storages/Kafka/KafkaSettings.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Common/SettingsChanges.h>
|
||||
|
||||
#include <Poco/Semaphore.h>
|
||||
@ -69,13 +68,13 @@ public:
|
||||
protected:
|
||||
StorageKafka(
|
||||
const StorageID & table_id_,
|
||||
Context & context_,
|
||||
const Context & context_,
|
||||
const ColumnsDescription & columns_,
|
||||
std::unique_ptr<KafkaSettings> kafka_settings_);
|
||||
|
||||
private:
|
||||
// Configuration and state
|
||||
Context & global_context;
|
||||
const Context & global_context;
|
||||
std::unique_ptr<KafkaSettings> kafka_settings;
|
||||
const Names topics;
|
||||
const String brokers;
|
||||
|
@ -134,7 +134,7 @@ MergeTreeData::MergeTreeData(
|
||||
bool attach,
|
||||
BrokenPartCallback broken_part_callback_)
|
||||
: IStorage(table_id_)
|
||||
, global_context(context_)
|
||||
, global_context(context_.getGlobalContext())
|
||||
, merging_params(merging_params_)
|
||||
, require_part_metadata(require_part_metadata_)
|
||||
, relative_data_path(relative_data_path_)
|
||||
|
@ -1,6 +1,8 @@
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Processors/Formats/InputStreamFromInputFormat.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
|
||||
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Processors/Formats/InputStreamFromInputFormat.h>
|
||||
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
|
||||
|
||||
namespace ErrorCodes
|
||||
|
@ -1,7 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
|
||||
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
|
||||
|
||||
@ -38,7 +37,7 @@ public:
|
||||
private:
|
||||
StorageRabbitMQ & storage;
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
Context context;
|
||||
const Context & context;
|
||||
Names column_names;
|
||||
const size_t max_block_size;
|
||||
bool ack_in_suffix;
|
||||
|
@ -1,7 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
|
||||
|
||||
|
||||
@ -23,7 +22,7 @@ public:
|
||||
private:
|
||||
StorageRabbitMQ & storage;
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
Context context;
|
||||
const Context & context;
|
||||
ProducerBufferPtr buffer;
|
||||
BlockOutputStreamPtr child;
|
||||
};
|
||||
|
@ -69,7 +69,7 @@ namespace ExchangeType
|
||||
|
||||
StorageRabbitMQ::StorageRabbitMQ(
|
||||
const StorageID & table_id_,
|
||||
Context & context_,
|
||||
const Context & context_,
|
||||
const ColumnsDescription & columns_,
|
||||
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_)
|
||||
: IStorage(table_id_)
|
||||
|
@ -67,12 +67,12 @@ public:
|
||||
protected:
|
||||
StorageRabbitMQ(
|
||||
const StorageID & table_id_,
|
||||
Context & context_,
|
||||
const Context & context_,
|
||||
const ColumnsDescription & columns_,
|
||||
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_);
|
||||
|
||||
private:
|
||||
Context global_context;
|
||||
const Context & global_context;
|
||||
Context rabbitmq_context;
|
||||
std::unique_ptr<RabbitMQSettings> rabbitmq_settings;
|
||||
|
||||
|
@ -1,7 +1,9 @@
|
||||
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
|
||||
#include "Core/Block.h"
|
||||
#include "Columns/ColumnString.h"
|
||||
#include "Columns/ColumnsNumber.h"
|
||||
|
||||
#include <Core/Block.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <amqpcpp.h>
|
||||
#include <uv.h>
|
||||
@ -25,7 +27,7 @@ namespace ErrorCodes
|
||||
|
||||
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
|
||||
std::pair<String, UInt16> & parsed_address_,
|
||||
Context & global_context,
|
||||
const Context & global_context,
|
||||
const std::pair<String, String> & login_password_,
|
||||
const Names & routing_keys_,
|
||||
const String & exchange_name_,
|
||||
|
@ -9,7 +9,7 @@
|
||||
#include <Storages/RabbitMQ/RabbitMQHandler.h>
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
#include <Core/BackgroundSchedulePool.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Core/Names.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -19,7 +19,7 @@ class WriteBufferToRabbitMQProducer : public WriteBuffer
|
||||
public:
|
||||
WriteBufferToRabbitMQProducer(
|
||||
std::pair<String, UInt16> & parsed_address_,
|
||||
Context & global_context,
|
||||
const Context & global_context,
|
||||
const std::pair<String, String> & login_password_,
|
||||
const Names & routing_keys_,
|
||||
const String & exchange_name_,
|
||||
|
@ -65,14 +65,14 @@ StorageBuffer::StorageBuffer(
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
Context & context_,
|
||||
const Context & context_,
|
||||
size_t num_shards_,
|
||||
const Thresholds & min_thresholds_,
|
||||
const Thresholds & max_thresholds_,
|
||||
const StorageID & destination_id_,
|
||||
bool allow_materialized_)
|
||||
: IStorage(table_id_)
|
||||
, global_context(context_)
|
||||
, global_context(context_.getGlobalContext())
|
||||
, num_shards(num_shards_), buffers(num_shards_)
|
||||
, min_thresholds(min_thresholds_)
|
||||
, max_thresholds(max_thresholds_)
|
||||
|
@ -9,7 +9,6 @@
|
||||
#include <Storages/IStorage.h>
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
#include <Poco/Event.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
|
||||
namespace Poco { class Logger; }
|
||||
@ -82,7 +81,13 @@ public:
|
||||
void startup() override;
|
||||
/// Flush all buffers into the subordinate table and stop background thread.
|
||||
void shutdown() override;
|
||||
bool optimize(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
|
||||
bool optimize(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const ASTPtr & partition,
|
||||
bool final,
|
||||
bool deduplicate,
|
||||
const Context & context) override;
|
||||
|
||||
bool supportsSampling() const override { return true; }
|
||||
bool supportsPrewhere() const override
|
||||
@ -112,7 +117,7 @@ public:
|
||||
|
||||
|
||||
private:
|
||||
Context global_context;
|
||||
const Context & global_context;
|
||||
|
||||
struct Buffer
|
||||
{
|
||||
@ -165,7 +170,7 @@ protected:
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
Context & context_,
|
||||
const Context & context_,
|
||||
size_t num_shards_,
|
||||
const Thresholds & min_thresholds_,
|
||||
const Thresholds & max_thresholds_,
|
||||
|
@ -366,10 +366,10 @@ StorageDistributed::StorageDistributed(
|
||||
: IStorage(id_)
|
||||
, remote_database(remote_database_)
|
||||
, remote_table(remote_table_)
|
||||
, global_context(std::make_unique<Context>(context_))
|
||||
, global_context(context_.getGlobalContext())
|
||||
, log(&Poco::Logger::get("StorageDistributed (" + id_.table_name + ")"))
|
||||
, owned_cluster(std::move(owned_cluster_))
|
||||
, cluster_name(global_context->getMacros()->expand(cluster_name_))
|
||||
, cluster_name(global_context.getMacros()->expand(cluster_name_))
|
||||
, has_sharding_key(sharding_key_)
|
||||
, relative_data_path(relative_data_path_)
|
||||
{
|
||||
@ -380,14 +380,14 @@ StorageDistributed::StorageDistributed(
|
||||
|
||||
if (sharding_key_)
|
||||
{
|
||||
sharding_key_expr = buildShardingKeyExpression(sharding_key_, *global_context, storage_metadata.getColumns().getAllPhysical(), false);
|
||||
sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context, storage_metadata.getColumns().getAllPhysical(), false);
|
||||
sharding_key_column_name = sharding_key_->getColumnName();
|
||||
sharding_key_is_deterministic = isExpressionActionsDeterministics(sharding_key_expr);
|
||||
}
|
||||
|
||||
if (!relative_data_path.empty())
|
||||
{
|
||||
storage_policy = global_context->getStoragePolicy(storage_policy_name_);
|
||||
storage_policy = global_context.getStoragePolicy(storage_policy_name_);
|
||||
data_volume = storage_policy->getVolume(0);
|
||||
if (storage_policy->getVolumes().size() > 1)
|
||||
LOG_WARNING(log, "Storage policy for Distributed table has multiple volumes. "
|
||||
@ -397,7 +397,7 @@ StorageDistributed::StorageDistributed(
|
||||
/// Sanity check. Skip check if the table is already created to allow the server to start.
|
||||
if (!attach_ && !cluster_name.empty())
|
||||
{
|
||||
size_t num_local_shards = global_context->getCluster(cluster_name)->getLocalShardCount();
|
||||
size_t num_local_shards = global_context.getCluster(cluster_name)->getLocalShardCount();
|
||||
if (num_local_shards && remote_database == id_.database_name && remote_table == id_.table_name)
|
||||
throw Exception("Distributed table " + id_.table_name + " looks at itself", ErrorCodes::INFINITE_LOOP);
|
||||
}
|
||||
@ -719,7 +719,7 @@ StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(
|
||||
{
|
||||
node_data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
|
||||
node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
|
||||
*this, path, node_data.connection_pool, monitors_blocker, global_context->getDistributedSchedulePool());
|
||||
*this, path, node_data.connection_pool, monitors_blocker, global_context.getDistributedSchedulePool());
|
||||
}
|
||||
return *node_data.directory_monitor;
|
||||
}
|
||||
@ -741,7 +741,7 @@ size_t StorageDistributed::getShardCount() const
|
||||
|
||||
ClusterPtr StorageDistributed::getCluster() const
|
||||
{
|
||||
return owned_cluster ? owned_cluster : global_context->getCluster(cluster_name);
|
||||
return owned_cluster ? owned_cluster : global_context.getCluster(cluster_name);
|
||||
}
|
||||
|
||||
ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query_ptr) const
|
||||
|
@ -130,7 +130,7 @@ public:
|
||||
String remote_table;
|
||||
ASTPtr remote_table_function_ptr;
|
||||
|
||||
std::unique_ptr<Context> global_context;
|
||||
const Context & global_context;
|
||||
Poco::Logger * log;
|
||||
|
||||
/// Used to implement TableFunctionRemote.
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Storages/AlterCommands.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/TreeRewriter.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
@ -76,7 +77,7 @@ StorageMerge::StorageMerge(
|
||||
: IStorage(table_id_)
|
||||
, source_database(source_database_)
|
||||
, table_name_regexp(table_name_regexp_)
|
||||
, global_context(context_)
|
||||
, global_context(context_.getGlobalContext())
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(columns_);
|
||||
|
@ -4,7 +4,6 @@
|
||||
|
||||
#include <Common/OptimizedRegularExpression.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -50,7 +49,7 @@ public:
|
||||
private:
|
||||
String source_database;
|
||||
OptimizedRegularExpression table_name_regexp;
|
||||
Context global_context;
|
||||
const Context & global_context;
|
||||
|
||||
using StorageWithLockAndName = std::tuple<StoragePtr, TableLockHolder, String>;
|
||||
using StorageListWithLocks = std::list<StorageWithLockAndName>;
|
||||
|
@ -34,8 +34,7 @@ StorageMongoDB::StorageMongoDB(
|
||||
const std::string & username_,
|
||||
const std::string & password_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const Context & context_)
|
||||
const ConstraintsDescription & constraints_)
|
||||
: IStorage(table_id_)
|
||||
, host(host_)
|
||||
, port(port_)
|
||||
@ -43,7 +42,6 @@ StorageMongoDB::StorageMongoDB(
|
||||
, collection_name(collection_name_)
|
||||
, username(username_)
|
||||
, password(password_)
|
||||
, global_context(context_)
|
||||
, connection{std::make_shared<Poco::MongoDB::Connection>(host, port)}
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
@ -114,8 +112,7 @@ void registerStorageMongoDB(StorageFactory & factory)
|
||||
username,
|
||||
password,
|
||||
args.columns,
|
||||
args.constraints,
|
||||
args.context);
|
||||
args.constraints);
|
||||
},
|
||||
{
|
||||
.source_access_type = AccessType::MONGO,
|
||||
|
@ -3,7 +3,6 @@
|
||||
#include <ext/shared_ptr_helper.h>
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include <Poco/MongoDB/Connection.h>
|
||||
|
||||
@ -28,8 +27,7 @@ public:
|
||||
const std::string & username_,
|
||||
const std::string & password_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const Context & context_);
|
||||
const ConstraintsDescription & constraints_);
|
||||
|
||||
std::string getName() const override { return "MongoDB"; }
|
||||
|
||||
@ -51,7 +49,6 @@ private:
|
||||
std::string username;
|
||||
std::string password;
|
||||
|
||||
Context global_context;
|
||||
std::shared_ptr<Poco::MongoDB::Connection> connection;
|
||||
};
|
||||
|
||||
|
@ -55,7 +55,7 @@ StorageMySQL::StorageMySQL(
|
||||
, replace_query{replace_query_}
|
||||
, on_duplicate_clause{on_duplicate_clause_}
|
||||
, pool(std::move(pool_))
|
||||
, global_context(context_)
|
||||
, global_context(context_.getGlobalContext())
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(columns_);
|
||||
|
@ -8,7 +8,6 @@
|
||||
|
||||
# include <ext/shared_ptr_helper.h>
|
||||
|
||||
# include <Interpreters/Context.h>
|
||||
# include <Storages/IStorage.h>
|
||||
# include <mysqlxx/Pool.h>
|
||||
|
||||
@ -57,7 +56,7 @@ private:
|
||||
std::string on_duplicate_clause;
|
||||
|
||||
mysqlxx::Pool pool;
|
||||
Context global_context;
|
||||
const Context & global_context;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -194,17 +194,17 @@ StorageS3::StorageS3(
|
||||
UInt64 min_upload_part_size_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
Context & context_,
|
||||
const Context & context_,
|
||||
const String & compression_method_)
|
||||
: IStorage(table_id_)
|
||||
, uri(uri_)
|
||||
, context_global(context_)
|
||||
, global_context(context_.getGlobalContext())
|
||||
, format_name(format_name_)
|
||||
, min_upload_part_size(min_upload_part_size_)
|
||||
, compression_method(compression_method_)
|
||||
, name(uri_.storage_name)
|
||||
{
|
||||
context_global.getRemoteHostFilter().checkURL(uri_.uri);
|
||||
global_context.getRemoteHostFilter().checkURL(uri_.uri);
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(columns_);
|
||||
storage_metadata.setConstraints(constraints_);
|
||||
@ -325,7 +325,7 @@ BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMet
|
||||
{
|
||||
return std::make_shared<StorageS3BlockOutputStream>(
|
||||
format_name, min_upload_part_size, metadata_snapshot->getSampleBlock(),
|
||||
context_global, chooseCompressionMethod(uri.endpoint, compression_method),
|
||||
global_context, chooseCompressionMethod(uri.endpoint, compression_method),
|
||||
client, uri.bucket, uri.key);
|
||||
}
|
||||
|
||||
|
@ -33,7 +33,7 @@ public:
|
||||
UInt64 min_upload_part_size_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
Context & context_,
|
||||
const Context & context_,
|
||||
const String & compression_method_ = "");
|
||||
|
||||
String getName() const override
|
||||
@ -56,7 +56,7 @@ public:
|
||||
|
||||
private:
|
||||
S3::URI uri;
|
||||
const Context & context_global;
|
||||
const Context & global_context;
|
||||
|
||||
String format_name;
|
||||
UInt64 min_upload_part_size;
|
||||
|
@ -10,6 +10,7 @@
|
||||
# include <DataTypes/convertMySQLDataType.h>
|
||||
# include <Formats/MySQLBlockInputStream.h>
|
||||
# include <IO/Operators.h>
|
||||
# include <Interpreters/Context.h>
|
||||
# include <Interpreters/evaluateConstantExpression.h>
|
||||
# include <Parsers/ASTFunction.h>
|
||||
# include <Parsers/ASTLiteral.h>
|
||||
|
Loading…
Reference in New Issue
Block a user