Use ContextPtr

This commit is contained in:
kssenii 2021-04-11 07:44:40 +00:00
parent 820a32d939
commit beae1c5fa4
18 changed files with 114 additions and 122 deletions

View File

@ -31,10 +31,6 @@ SRCS(
MySQL/PacketsProtocolText.cpp MySQL/PacketsProtocolText.cpp
MySQL/PacketsReplication.cpp MySQL/PacketsReplication.cpp
NamesAndTypes.cpp NamesAndTypes.cpp
PostgreSQL/PostgreSQLConnection.cpp
PostgreSQL/PostgreSQLConnectionPool.cpp
PostgreSQL/PostgreSQLPoolWithFailover.cpp
PostgreSQL/insertPostgreSQLValue.cpp
PostgreSQLProtocol.cpp PostgreSQLProtocol.cpp
QueryProcessingStage.cpp QueryProcessingStage.cpp
Settings.cpp Settings.cpp

View File

@ -35,7 +35,7 @@ namespace ErrorCodes
static const auto METADATA_SUFFIX = ".postgresql_replica_metadata"; static const auto METADATA_SUFFIX = ".postgresql_replica_metadata";
DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL( DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL(
const Context & context_, ContextPtr context_,
const String & metadata_path_, const String & metadata_path_,
UUID uuid_, UUID uuid_,
const ASTStorage * database_engine_define_, const ASTStorage * database_engine_define_,
@ -58,7 +58,7 @@ void DatabaseMaterializePostgreSQL::startSynchronization()
remote_database_name, remote_database_name,
connection->getConnectionInfo(), connection->getConnectionInfo(),
metadata_path + METADATA_SUFFIX, metadata_path + METADATA_SUFFIX,
global_context, getContext(),
settings->postgresql_replica_max_block_size.value, settings->postgresql_replica_max_block_size.value,
settings->postgresql_replica_allow_minimal_ddl, true, settings->postgresql_replica_allow_minimal_ddl, true,
settings->postgresql_replica_tables_list.value); settings->postgresql_replica_tables_list.value);
@ -67,12 +67,12 @@ void DatabaseMaterializePostgreSQL::startSynchronization()
for (const auto & table_name : tables_to_replicate) for (const auto & table_name : tables_to_replicate)
{ {
auto storage = tryGetTable(table_name, global_context); auto storage = tryGetTable(table_name, getContext());
if (!storage) if (!storage)
storage = StorageMaterializePostgreSQL::create(StorageID(database_name, table_name), global_context); storage = StorageMaterializePostgreSQL::create(StorageID(database_name, table_name), getContext());
replication_handler->addStorage(table_name, storage->template as<StorageMaterializePostgreSQL>()); replication_handler->addStorage(table_name, storage->as<StorageMaterializePostgreSQL>());
materialized_tables[table_name] = storage; materialized_tables[table_name] = storage;
} }
@ -88,9 +88,9 @@ void DatabaseMaterializePostgreSQL::shutdown()
} }
void DatabaseMaterializePostgreSQL::loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) void DatabaseMaterializePostgreSQL::loadStoredObjects(ContextPtr local_context, bool has_force_restore_data_flag, bool force_attach)
{ {
DatabaseAtomic::loadStoredObjects(context, has_force_restore_data_flag, force_attach); DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
try try
{ {
@ -107,16 +107,16 @@ void DatabaseMaterializePostgreSQL::loadStoredObjects(Context & context, bool ha
} }
StoragePtr DatabaseMaterializePostgreSQL::tryGetTable(const String & name, const Context & context) const StoragePtr DatabaseMaterializePostgreSQL::tryGetTable(const String & name, ContextPtr local_context) const
{ {
/// When a nested ReplacingMergeTree table is managed from PostgreSQLReplicationHandler, its context is modified /// When a nested ReplacingMergeTree table is managed from PostgreSQLReplicationHandler, its context is modified
/// to show the type of managed table. /// to show the type of managed table.
if (context.hasQueryContext()) if (local_context->hasQueryContext())
{ {
auto storage_set = context.getQueryContext().getQueryFactoriesInfo().storages; auto storage_set = local_context->getQueryContext()->getQueryFactoriesInfo().storages;
if (storage_set.find("ReplacingMergeTree") != storage_set.end()) if (storage_set.find("ReplacingMergeTree") != storage_set.end())
{ {
return DatabaseAtomic::tryGetTable(name, context); return DatabaseAtomic::tryGetTable(name, local_context);
} }
} }
@ -132,14 +132,14 @@ StoragePtr DatabaseMaterializePostgreSQL::tryGetTable(const String & name, const
} }
void DatabaseMaterializePostgreSQL::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) void DatabaseMaterializePostgreSQL::createTable(ContextPtr local_context, const String & name, const StoragePtr & table, const ASTPtr & query)
{ {
if (context.hasQueryContext()) if (local_context->hasQueryContext())
{ {
auto storage_set = context.getQueryContext().getQueryFactoriesInfo().storages; auto storage_set = local_context->getQueryContext()->getQueryFactoriesInfo().storages;
if (storage_set.find("ReplacingMergeTree") != storage_set.end()) if (storage_set.find("ReplacingMergeTree") != storage_set.end())
{ {
DatabaseAtomic::createTable(context, name, table, query); DatabaseAtomic::createTable(local_context, name, table, query);
return; return;
} }
} }
@ -156,7 +156,7 @@ void DatabaseMaterializePostgreSQL::stopReplication()
} }
void DatabaseMaterializePostgreSQL::drop(const Context & context) void DatabaseMaterializePostgreSQL::drop(ContextPtr local_context)
{ {
if (replication_handler) if (replication_handler)
replication_handler->shutdownFinal(); replication_handler->shutdownFinal();
@ -167,12 +167,12 @@ void DatabaseMaterializePostgreSQL::drop(const Context & context)
if (metadata.exists()) if (metadata.exists())
metadata.remove(false); metadata.remove(false);
DatabaseAtomic::drop(context); DatabaseAtomic::drop(local_context);
} }
DatabaseTablesIteratorPtr DatabaseMaterializePostgreSQL::getTablesIterator( DatabaseTablesIteratorPtr DatabaseMaterializePostgreSQL::getTablesIterator(
const Context & /* context */, const DatabaseOnDisk::FilterByNameFunction & /* filter_by_table_name */) ContextPtr /* context */, const DatabaseOnDisk::FilterByNameFunction & /* filter_by_table_name */)
{ {
Tables nested_tables; Tables nested_tables;
for (const auto & [table_name, storage] : materialized_tables) for (const auto & [table_name, storage] : materialized_tables)

View File

@ -20,7 +20,6 @@
namespace DB namespace DB
{ {
class Context;
class PostgreSQLConnection; class PostgreSQLConnection;
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>; using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
@ -30,7 +29,7 @@ class DatabaseMaterializePostgreSQL : public DatabaseAtomic
public: public:
DatabaseMaterializePostgreSQL( DatabaseMaterializePostgreSQL(
const Context & context_, ContextPtr context_,
const String & metadata_path_, const String & metadata_path_,
UUID uuid_, UUID uuid_,
const ASTStorage * database_engine_define_, const ASTStorage * database_engine_define_,
@ -43,16 +42,16 @@ public:
String getMetadataPath() const override { return metadata_path; } String getMetadataPath() const override { return metadata_path; }
void loadStoredObjects(Context &, bool, bool force_attach) override; void loadStoredObjects(ContextPtr, bool, bool force_attach) override;
DatabaseTablesIteratorPtr getTablesIterator( DatabaseTablesIteratorPtr getTablesIterator(
const Context & context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) override; ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) override;
StoragePtr tryGetTable(const String & name, const Context & context) const override; StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
void createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) override; void createTable(ContextPtr context, const String & name, const StoragePtr & table, const ASTPtr & query) override;
void drop(const Context & context) override; void drop(ContextPtr local_context) override;
void shutdown() override; void shutdown() override;

View File

@ -149,7 +149,7 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr
if (!table_checked && !checkPostgresTable(table_name)) if (!table_checked && !checkPostgresTable(table_name))
return StoragePtr{}; return StoragePtr{};
auto use_nulls = context.getSettingsRef().external_databases_use_nulls; auto use_nulls = local_context->getSettingsRef().external_databases_use_nulls;
auto connection = pool->get(); auto connection = pool->get();
auto columns = fetchPostgreSQLTableStructure(connection->conn(), doubleQuoteString(table_name), use_nulls).columns; auto columns = fetchPostgreSQLTableStructure(connection->conn(), doubleQuoteString(table_name), use_nulls).columns;

View File

@ -216,13 +216,13 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
"Enable allow_experimental_database_replicated to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE); "Enable allow_experimental_database_replicated to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE);
} }
if (create.storage->engine->name == "MaterializePostgreSQL" && !context.getSettingsRef().allow_experimental_database_materialize_postgresql && !internal) if (create.storage->engine->name == "MaterializePostgreSQL" && !getContext()->getSettingsRef().allow_experimental_database_materialize_postgresql && !internal)
{ {
throw Exception("MaterializePostgreSQL is an experimental database engine. " throw Exception("MaterializePostgreSQL is an experimental database engine. "
"Enable allow_experimental_database_postgresql_replica to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE); "Enable allow_experimental_database_postgresql_replica to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE);
} }
DatabasePtr database = DatabaseFactory::get(create, metadata_path / "", getContext); DatabasePtr database = DatabaseFactory::get(create, metadata_path / "", getContext());
if (create.uuid != UUIDHelpers::Nil) if (create.uuid != UUIDHelpers::Nil)
create.database = TABLE_WITH_UUID_NAME_PLACEHOLDER; create.database = TABLE_WITH_UUID_NAME_PLACEHOLDER;

View File

@ -436,7 +436,7 @@ void InterpreterDropQuery::extendQueryLogElemImpl(QueryLogElement & elem, const
elem.query_kind = "Drop"; elem.query_kind = "Drop";
} }
void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, const Context & global_context, const Context & current_context, const StorageID & target_table_id, bool no_delay) void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool no_delay)
{ {
if (DatabaseCatalog::instance().tryGetTable(target_table_id, current_context)) if (DatabaseCatalog::instance().tryGetTable(target_table_id, current_context))
{ {
@ -452,13 +452,13 @@ void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, const Conte
/// to avoid "Not enough privileges" error if current user has only DROP VIEW ON mat_view_name privilege /// to avoid "Not enough privileges" error if current user has only DROP VIEW ON mat_view_name privilege
/// and not allowed to drop inner table explicitly. Allowing to drop inner table without explicit grant /// and not allowed to drop inner table explicitly. Allowing to drop inner table without explicit grant
/// looks like expected behaviour and we have tests for it. /// looks like expected behaviour and we have tests for it.
auto drop_context = Context(global_context); auto drop_context = Context::createCopy(global_context);
drop_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; drop_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (auto txn = current_context.getZooKeeperMetadataTransaction()) if (auto txn = current_context->getZooKeeperMetadataTransaction())
{ {
/// For Replicated database /// For Replicated database
drop_context.setQueryContext(const_cast<Context &>(current_context)); drop_context->setQueryContext(current_context);
drop_context.initZooKeeperMetadataTransaction(txn, true); drop_context->initZooKeeperMetadataTransaction(txn, true);
} }
InterpreterDropQuery drop_interpreter(ast_drop_query, drop_context); InterpreterDropQuery drop_interpreter(ast_drop_query, drop_context);
drop_interpreter.execute(); drop_interpreter.execute();

View File

@ -26,7 +26,7 @@ public:
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr) const override; void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr) const override;
static void executeDropQuery(ASTDropQuery::Kind kind, const Context & global_context, const Context & current_context, const StorageID & target_table_id, bool no_delay); static void executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool no_delay);
private: private:
AccessRightsElements getRequiredAccessForDDLOnCluster() const; AccessRightsElements getRequiredAccessForDDLOnCluster() const;

View File

@ -344,7 +344,7 @@ public:
*/ */
virtual void drop() {} virtual void drop() {}
virtual void dropInnerTableIfAny(bool /* no_delay */, const Context & /* context */) {} virtual void dropInnerTableIfAny(bool /* no_delay */, ContextPtr /* context */) {}
/** Clear the table data and leave it empty. /** Clear the table data and leave it empty.
* Must be called under exclusive lock (lockExclusively). * Must be called under exclusive lock (lockExclusively).

View File

@ -23,7 +23,7 @@ namespace ErrorCodes
} }
MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer( MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer(
const Context & context_, ContextPtr context_,
postgres::ConnectionPtr connection_, postgres::ConnectionPtr connection_,
const std::string & replication_slot_name_, const std::string & replication_slot_name_,
const std::string & publication_name_, const std::string & publication_name_,
@ -491,9 +491,9 @@ void MaterializePostgreSQLConsumer::syncTables(std::shared_ptr<pqxx::nontransact
insert->table_id = storage->getStorageID(); insert->table_id = storage->getStorageID();
insert->columns = buffer.columnsAST; insert->columns = buffer.columnsAST;
auto insert_context(context); auto insert_context = Context::createCopy(context);
insert_context.makeQueryContext(); insert_context->makeQueryContext();
insert_context.addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree"); insert_context->addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");
InterpreterInsertQuery interpreter(insert, insert_context, true); InterpreterInsertQuery interpreter(insert, insert_context, true);
auto block_io = interpreter.execute(); auto block_io = interpreter.execute();

View File

@ -27,7 +27,7 @@ public:
using Storages = std::unordered_map<String, StoragePtr>; using Storages = std::unordered_map<String, StoragePtr>;
MaterializePostgreSQLConsumer( MaterializePostgreSQLConsumer(
const Context & context_, ContextPtr context_,
postgres::ConnectionPtr connection_, postgres::ConnectionPtr connection_,
const std::string & replication_slot_name_, const std::string & replication_slot_name_,
const std::string & publication_name_, const std::string & publication_name_,
@ -101,7 +101,7 @@ private:
} }
Poco::Logger * log; Poco::Logger * log;
const Context & context; ContextPtr context;
const std::string replication_slot_name, publication_name; const std::string replication_slot_name, publication_name;
MaterializePostgreSQLMetadata metadata; MaterializePostgreSQLMetadata metadata;

View File

@ -17,13 +17,11 @@ namespace DB
static const auto reschedule_ms = 500; static const auto reschedule_ms = 500;
/// TODO: fetch replica identity index
PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const std::string & database_name_, const std::string & database_name_,
const postgres::ConnectionInfo & connection_info_, const postgres::ConnectionInfo & connection_info_,
const std::string & metadata_path_, const std::string & metadata_path_,
const Context & context_, ContextPtr context_,
const size_t max_block_size_, const size_t max_block_size_,
bool allow_minimal_ddl_, bool allow_minimal_ddl_,
bool is_postgresql_replica_database_engine_, bool is_postgresql_replica_database_engine_,
@ -42,8 +40,8 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
replication_slot = fmt::format("{}_ch_replication_slot", database_name); replication_slot = fmt::format("{}_ch_replication_slot", database_name);
publication_name = fmt::format("{}_ch_publication", database_name); publication_name = fmt::format("{}_ch_publication", database_name);
startup_task = context.getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ waitConnectionAndStart(); }); startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ waitConnectionAndStart(); });
consumer_task = context.getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); }); consumer_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
} }
@ -169,7 +167,7 @@ NameSet PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_na
query_str = fmt::format("SELECT * FROM {}", storage_data.first); query_str = fmt::format("SELECT * FROM {}", storage_data.first);
const StorageInMemoryMetadata & storage_metadata = nested_storage->getInMemoryMetadata(); const StorageInMemoryMetadata & storage_metadata = nested_storage->getInMemoryMetadata();
auto insert_context = storage_data.second->makeNestedTableContext(); auto insert_context = storage_data.second->getNestedTableContext();
auto insert = std::make_shared<ASTInsertQuery>(); auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = nested_storage->getStorageID(); insert->table_id = nested_storage->getStorageID();
@ -384,7 +382,7 @@ PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
if (!is_postgresql_replica_database_engine) if (!is_postgresql_replica_database_engine)
return nullptr; return nullptr;
auto use_nulls = context.getSettingsRef().external_databases_use_nulls; auto use_nulls = context->getSettingsRef().external_databases_use_nulls;
return std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, table_name, use_nulls, true, true)); return std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, table_name, use_nulls, true, true));
} }

View File

@ -30,7 +30,7 @@ public:
const std::string & database_name_, const std::string & database_name_,
const postgres::ConnectionInfo & connection_info_, const postgres::ConnectionInfo & connection_info_,
const std::string & metadata_path_, const std::string & metadata_path_,
const Context & context_, ContextPtr context_,
const size_t max_block_size_, const size_t max_block_size_,
bool allow_minimal_ddl_, bool allow_minimal_ddl_,
bool is_postgresql_replica_database_engine_, bool is_postgresql_replica_database_engine_,
@ -79,7 +79,7 @@ private:
PostgreSQLTableStructurePtr fetchTableStructure(std::shared_ptr<pqxx::ReplicationTransaction> tx, const std::string & table_name); PostgreSQLTableStructurePtr fetchTableStructure(std::shared_ptr<pqxx::ReplicationTransaction> tx, const std::string & table_name);
Poco::Logger * log; Poco::Logger * log;
const Context & context; ContextPtr context;
/// Remote database name. /// Remote database name.
const String database_name; const String database_name;

View File

@ -45,11 +45,11 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
const String & remote_table_name_, const String & remote_table_name_,
const postgres::ConnectionInfo & connection_info, const postgres::ConnectionInfo & connection_info,
const StorageInMemoryMetadata & storage_metadata, const StorageInMemoryMetadata & storage_metadata,
const Context & context_, ContextPtr context_,
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings_) std::unique_ptr<MaterializePostgreSQLSettings> replication_settings_)
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, remote_table_name(remote_table_name_) , remote_table_name(remote_table_name_)
, global_context(context_.getGlobalContext())
, replication_settings(std::move(replication_settings_)) , replication_settings(std::move(replication_settings_))
, is_postgresql_replica_database( , is_postgresql_replica_database(
DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "MaterializePostgreSQL") DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "MaterializePostgreSQL")
@ -65,7 +65,7 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
remote_database_name, remote_database_name,
connection_info, connection_info,
metadata_path, metadata_path,
global_context, getContext(),
replication_settings->postgresql_replica_max_block_size.value, replication_settings->postgresql_replica_max_block_size.value,
replication_settings->postgresql_replica_allow_minimal_ddl.value, false); replication_settings->postgresql_replica_allow_minimal_ddl.value, false);
} }
@ -73,9 +73,9 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
StorageMaterializePostgreSQL::StorageMaterializePostgreSQL( StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
const StorageID & table_id_, const StorageID & table_id_,
const Context & context_) ContextPtr context_)
: IStorage(table_id_) : IStorage(table_id_)
, global_context(context_) , WithContext(context_->getGlobalContext())
, is_postgresql_replica_database(true) , is_postgresql_replica_database(true)
, nested_table_id(table_id_) , nested_table_id(table_id_)
, nested_context(makeNestedTableContext()) , nested_context(makeNestedTableContext())
@ -274,12 +274,11 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt
void StorageMaterializePostgreSQL::createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure) void StorageMaterializePostgreSQL::createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure)
{ {
auto context = makeNestedTableContext();
const auto ast_create = getCreateNestedTableQuery(std::move(table_structure)); const auto ast_create = getCreateNestedTableQuery(std::move(table_structure));
try try
{ {
InterpreterCreateQuery interpreter(ast_create, context); InterpreterCreateQuery interpreter(ast_create, nested_context);
interpreter.execute(); interpreter.execute();
} }
catch (...) catch (...)
@ -289,11 +288,11 @@ void StorageMaterializePostgreSQL::createNestedIfNeeded(PostgreSQLTableStructure
} }
Context StorageMaterializePostgreSQL::makeNestedTableContext() const std::shared_ptr<Context> StorageMaterializePostgreSQL::makeNestedTableContext() const
{ {
auto context(global_context); auto context = Context::createCopy(getContext());
context.makeQueryContext(); context->makeQueryContext();
context.addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree"); context->addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");
return context; return context;
} }
@ -316,14 +315,14 @@ void StorageMaterializePostgreSQL::shutdown()
} }
void StorageMaterializePostgreSQL::dropInnerTableIfAny(bool no_delay, const Context & context) void StorageMaterializePostgreSQL::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
{ {
if (replication_handler) if (replication_handler)
replication_handler->shutdownFinal(); replication_handler->shutdownFinal();
auto nested_table = getNested(); auto nested_table = getNested();
if (nested_table && !is_postgresql_replica_database) if (nested_table && !is_postgresql_replica_database)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, global_context, context, nested_table_id, no_delay); InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, nested_table_id, no_delay);
} }
@ -340,7 +339,7 @@ Pipe StorageMaterializePostgreSQL::read(
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info, SelectQueryInfo & query_info,
const Context & context, ContextPtr context_,
QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum processed_stage,
size_t max_block_size, size_t max_block_size,
unsigned num_streams) unsigned num_streams)
@ -355,46 +354,46 @@ Pipe StorageMaterializePostgreSQL::read(
column_names, column_names,
metadata_snapshot, metadata_snapshot,
query_info, query_info,
context, context_,
processed_stage, processed_stage,
max_block_size, max_block_size,
num_streams); num_streams);
} }
void StorageMaterializePostgreSQL::renameInMemory(const StorageID & new_table_id) //void StorageMaterializePostgreSQL::renameInMemory(const StorageID & new_table_id)
{ //{
auto old_table_id = getStorageID(); // auto old_table_id = getStorageID();
auto metadata_snapshot = getInMemoryMetadataPtr(); // auto metadata_snapshot = getInMemoryMetadataPtr();
bool from_atomic_to_atomic_database = old_table_id.hasUUID() && new_table_id.hasUUID(); // bool from_atomic_to_atomic_database = old_table_id.hasUUID() && new_table_id.hasUUID();
//
if (has_inner_table && tryGetTargetTable() && !from_atomic_to_atomic_database) // if (has_inner_table && tryGetTargetTable() && !from_atomic_to_atomic_database)
{ // {
auto new_target_table_name = generateInnerTableName(new_table_id); // auto new_target_table_name = generateInnerTableName(new_table_id);
auto rename = std::make_shared<ASTRenameQuery>(); // auto rename = std::make_shared<ASTRenameQuery>();
//
ASTRenameQuery::Table from; // ASTRenameQuery::Table from;
from.database = target_table_id.database_name; // from.database = target_table_id.database_name;
from.table = target_table_id.table_name; // from.table = target_table_id.table_name;
//
ASTRenameQuery::Table to; // ASTRenameQuery::Table to;
to.database = target_table_id.database_name; // to.database = target_table_id.database_name;
to.table = new_target_table_name; // to.table = new_target_table_name;
//
ASTRenameQuery::Element elem; // ASTRenameQuery::Element elem;
elem.from = from; // elem.from = from;
elem.to = to; // elem.to = to;
rename->elements.emplace_back(elem); // rename->elements.emplace_back(elem);
//
InterpreterRenameQuery(rename, global_context).execute(); // InterpreterRenameQuery(rename, global_context).execute();
target_table_id.table_name = new_target_table_name; // target_table_id.table_name = new_target_table_name;
} // }
//
IStorage::renameInMemory(new_table_id); // IStorage::renameInMemory(new_table_id);
const auto & select_query = metadata_snapshot->getSelectQuery(); // const auto & select_query = metadata_snapshot->getSelectQuery();
// TODO Actually we don't need to update dependency if MV has UUID, but then db and table name will be outdated // // TODO Actually we don't need to update dependency if MV has UUID, but then db and table name will be outdated
DatabaseCatalog::instance().updateDependency(select_query.select_table_id, old_table_id, select_query.select_table_id, getStorageID()); // DatabaseCatalog::instance().updateDependency(select_query.select_table_id, old_table_id, select_query.select_table_id, getStorageID());
} //}
void registerStorageMaterializePostgreSQL(StorageFactory & factory) void registerStorageMaterializePostgreSQL(StorageFactory & factory)
@ -414,7 +413,7 @@ void registerStorageMaterializePostgreSQL(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args) for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.local_context); engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getContext());
StorageInMemoryMetadata metadata; StorageInMemoryMetadata metadata;
metadata.setColumns(args.columns); metadata.setColumns(args.columns);
@ -427,9 +426,9 @@ void registerStorageMaterializePostgreSQL(StorageFactory & factory)
throw Exception("Storage MaterializePostgreSQL needs order by key or primary key", ErrorCodes::BAD_ARGUMENTS); throw Exception("Storage MaterializePostgreSQL needs order by key or primary key", ErrorCodes::BAD_ARGUMENTS);
if (args.storage_def->primary_key) if (args.storage_def->primary_key)
metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->primary_key->ptr(), metadata.columns, args.context); metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->primary_key->ptr(), metadata.columns, args.getContext());
else else
metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->order_by->ptr(), metadata.columns, args.context); metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->order_by->ptr(), metadata.columns, args.getContext());
auto parsed_host_port = parseAddress(engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(), 5432); auto parsed_host_port = parseAddress(engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(), 5432);
const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>(); const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
@ -445,7 +444,7 @@ void registerStorageMaterializePostgreSQL(StorageFactory & factory)
return StorageMaterializePostgreSQL::create( return StorageMaterializePostgreSQL::create(
args.table_id, remote_database, remote_table, connection_info, args.table_id, remote_database, remote_table, connection_info,
metadata, args.context, metadata, args.getContext(),
std::move(postgresql_replication_settings)); std::move(postgresql_replication_settings));
}; };

View File

@ -15,7 +15,6 @@
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTColumnDeclaration.h> #include <Parsers/ASTColumnDeclaration.h>
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h> #include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/ExpressionAnalyzer.h> #include <Interpreters/ExpressionAnalyzer.h>
#include <ext/shared_ptr_helper.h> #include <ext/shared_ptr_helper.h>
@ -24,14 +23,14 @@
namespace DB namespace DB
{ {
class StorageMaterializePostgreSQL final : public ext::shared_ptr_helper<StorageMaterializePostgreSQL>, public IStorage class StorageMaterializePostgreSQL final : public ext::shared_ptr_helper<StorageMaterializePostgreSQL>, public IStorage, WithContext
{ {
friend struct ext::shared_ptr_helper<StorageMaterializePostgreSQL>; friend struct ext::shared_ptr_helper<StorageMaterializePostgreSQL>;
public: public:
StorageMaterializePostgreSQL( StorageMaterializePostgreSQL(
const StorageID & table_id_, const StorageID & table_id_,
const Context & context_); ContextPtr context_);
String getName() const override { return "MaterializePostgreSQL"; } String getName() const override { return "MaterializePostgreSQL"; }
@ -39,7 +38,7 @@ public:
void shutdown() override; void shutdown() override;
void dropInnerTableIfAny(bool no_delay, const Context & context) override; void dropInnerTableIfAny(bool no_delay, ContextPtr local_context) override;
NamesAndTypesList getVirtuals() const override; NamesAndTypesList getVirtuals() const override;
@ -47,7 +46,7 @@ public:
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info, SelectQueryInfo & query_info,
const Context & context, ContextPtr context_,
QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum processed_stage,
size_t max_block_size, size_t max_block_size,
unsigned num_streams) override; unsigned num_streams) override;
@ -58,7 +57,9 @@ public:
StoragePtr tryGetNested() const; StoragePtr tryGetNested() const;
Context makeNestedTableContext() const; ContextPtr getNestedTableContext() const { return nested_context; }
std::shared_ptr<Context> makeNestedTableContext() const;
void setNestedStatus(bool loaded) { nested_loaded.store(loaded); } void setNestedStatus(bool loaded) { nested_loaded.store(loaded); }
@ -73,7 +74,7 @@ protected:
const String & remote_table_name, const String & remote_table_name,
const postgres::ConnectionInfo & connection_info, const postgres::ConnectionInfo & connection_info,
const StorageInMemoryMetadata & storage_metadata, const StorageInMemoryMetadata & storage_metadata,
const Context & context_, ContextPtr context_,
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings_); std::unique_ptr<MaterializePostgreSQLSettings> replication_settings_);
private: private:
@ -87,7 +88,6 @@ private:
std::string getNestedTableName() const; std::string getNestedTableName() const;
std::string remote_table_name; std::string remote_table_name;
const Context global_context;
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings; std::unique_ptr<MaterializePostgreSQLSettings> replication_settings;
std::unique_ptr<PostgreSQLReplicationHandler> replication_handler; std::unique_ptr<PostgreSQLReplicationHandler> replication_handler;
@ -95,7 +95,7 @@ private:
std::atomic<bool> nested_loaded = false; std::atomic<bool> nested_loaded = false;
bool is_postgresql_replica_database = false; bool is_postgresql_replica_database = false;
StorageID nested_table_id; StorageID nested_table_id;
const Context nested_context; ContextPtr nested_context;
}; };
} }

View File

@ -21,13 +21,13 @@ Pipe readFinalFromNestedStorage(
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/, const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info, SelectQueryInfo & query_info,
const Context & context, ContextPtr context,
QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum processed_stage,
size_t max_block_size, size_t max_block_size,
unsigned int num_streams) unsigned int num_streams)
{ {
NameSet column_names_set = NameSet(column_names.begin(), column_names.end()); NameSet column_names_set = NameSet(column_names.begin(), column_names.end());
auto lock = nested_storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); auto lock = nested_storage->lockForShare(context->getCurrentQueryId(), context->getSettingsRef().lock_acquire_timeout);
const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr(); const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr();
Block nested_header = nested_metadata->getSampleBlock(); Block nested_header = nested_metadata->getSampleBlock();

View File

@ -18,7 +18,7 @@ Pipe readFinalFromNestedStorage(
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/, const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info, SelectQueryInfo & query_info,
const Context & context, ContextPtr context,
QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum processed_stage,
size_t max_block_size, size_t max_block_size,
unsigned int num_streams); unsigned int num_streams);

View File

@ -208,10 +208,10 @@ void StorageMaterializedView::drop()
dropInnerTableIfAny(true, getContext()); dropInnerTableIfAny(true, getContext());
} }
void StorageMaterializedView::dropInnerTableIfAny(bool no_delay, const Context & context) void StorageMaterializedView::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
{ {
if (has_inner_table && tryGetTargetTable()) if (has_inner_table && tryGetTargetTable())
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), context, target_table_id, no_delay); InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, no_delay);
} }
void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &) void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)

View File

@ -37,7 +37,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override; BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void drop() override; void drop() override;
void dropInnerTableIfAny(bool no_delay, ContextPtr context) override; void dropInnerTableIfAny(bool no_delay, ContextPtr local_context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override; void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;