diff --git a/src/Core/PostgreSQL/insertPostgreSQLValue.cpp b/src/Core/PostgreSQL/insertPostgreSQLValue.cpp index 8d490f253db..70537767dc5 100644 --- a/src/Core/PostgreSQL/insertPostgreSQLValue.cpp +++ b/src/Core/PostgreSQL/insertPostgreSQLValue.cpp @@ -1,5 +1,6 @@ #include "insertPostgreSQLValue.h" +#if USE_LIBPQXX #include #include #include @@ -233,3 +234,5 @@ void preparePostgreSQLArrayInfo( array_info[column_idx] = {count_dimensions, default_value, parser}; } } + +#endif diff --git a/src/Core/PostgreSQL/insertPostgreSQLValue.h b/src/Core/PostgreSQL/insertPostgreSQLValue.h index 89d63e44ed3..7acba4f09bd 100644 --- a/src/Core/PostgreSQL/insertPostgreSQLValue.h +++ b/src/Core/PostgreSQL/insertPostgreSQLValue.h @@ -1,5 +1,11 @@ #pragma once +#if !defined(ARCADIA_BUILD) +#include "config_core.h" +#endif + +#if USE_LIBPQXX + #include #include #include @@ -28,3 +34,5 @@ void preparePostgreSQLArrayInfo( void insertDefaultPostgreSQLValue(IColumn & column, const IColumn & sample_column); } + +#endif diff --git a/src/Databases/DatabaseAtomic.cpp b/src/Databases/DatabaseAtomic.cpp index b4222a7e349..ef1917091e3 100644 --- a/src/Databases/DatabaseAtomic.cpp +++ b/src/Databases/DatabaseAtomic.cpp @@ -108,12 +108,10 @@ StoragePtr DatabaseAtomic::detachTable(const String & name) void DatabaseAtomic::dropTable(const Context & context, const String & table_name, bool no_delay) { - if (auto * mv = dynamic_cast(tryGetTable(table_name, context).get())) - { - /// Remove the inner table (if any) to avoid deadlock - /// (due to attempt to execute DROP from the worker thread) - mv->dropInnerTable(no_delay, context); - } + auto * storage = tryGetTable(table_name, context).get(); + /// Remove the inner table (if any) to avoid deadlock + /// (due to attempt to execute DROP from the worker thread) + storage->dropInnerTableIfAny(no_delay, context); String table_metadata_path = getObjectMetadataPath(table_name); String table_metadata_path_drop; @@ -594,4 +592,3 @@ void DatabaseAtomic::waitDetachedTableNotInUse(const UUID & uuid) } } - diff --git a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp index 7f808007ebc..ad60b6242ff 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp @@ -70,9 +70,7 @@ void DatabaseMaterializePostgreSQL::startSynchronization() auto storage = tryGetTable(table_name, global_context); if (!storage) - { - storage = StorageMaterializePostgreSQL::create(StorageID(database_name, table_name), StoragePtr{}, global_context); - } + storage = StorageMaterializePostgreSQL::create(StorageID(database_name, table_name), global_context); replication_handler->addStorage(table_name, storage->template as()); materialized_tables[table_name] = storage; @@ -151,13 +149,17 @@ void DatabaseMaterializePostgreSQL::createTable(const Context & context, const S } +void DatabaseMaterializePostgreSQL::stopReplication() +{ + if (replication_handler) + replication_handler->shutdown(); +} + + void DatabaseMaterializePostgreSQL::drop(const Context & context) { if (replication_handler) - { - replication_handler->shutdown(); replication_handler->shutdownFinal(); - } /// Remove metadata Poco::File metadata(getMetadataPath() + METADATA_SUFFIX); diff --git a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h index 405bfd80283..b87d281d7a5 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h +++ b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h @@ -56,6 +56,8 @@ public: void shutdown() override; + void stopReplication(); + private: void startSynchronization(); diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp index 63124ba12d7..01ade1da180 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp @@ -184,7 +184,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure( table.primary_key_columns = readNamesAndTypesList(tx, postgres_table_name, query, use_nulls, true); } - if (with_replica_identity_index) + if (with_replica_identity_index && !table.primary_key_columns) { query = fmt::format( "SELECT " @@ -201,7 +201,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure( "and a.attrelid = t.oid " "and a.attnum = ANY(ix.indkey) " "and t.relkind = 'r' " /// simple tables - "and t.relname = '{}' " + "and t.relname = '{}' " /// Connection is alread done to a needed database, only table name is needed. "and ix.indisreplident = 't' " /// index is is replica identity index "ORDER BY a.attname", /// column names postgres_table_name); diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index 2c698b5e3c1..6417b32e389 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -23,7 +23,7 @@ #endif #if USE_LIBPQXX -# include +# include #endif namespace DB @@ -186,11 +186,6 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP table->shutdown(); -#if USE_LIBPQXX - if (table->getName() == "MaterializePostgreSQL") - table->as()->shutdownFinal(); -#endif - TableExclusiveLockHolder table_lock; if (database->getUUID() == UUIDHelpers::Nil) table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); @@ -353,6 +348,10 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query, #endif if (auto * replicated = typeid_cast(database.get())) replicated->stopReplication(); +#if USE_LIBPQXX + if (auto * materialize_postgresql = typeid_cast(database.get())) + materialize_postgresql->stopReplication(); +#endif if (database->shouldBeEmptyOnDetach()) { @@ -434,4 +433,33 @@ void InterpreterDropQuery::extendQueryLogElemImpl(QueryLogElement & elem, const elem.query_kind = "Drop"; } +void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, const Context & global_context, const Context & current_context, const StorageID & target_table_id, bool no_delay) +{ + if (DatabaseCatalog::instance().tryGetTable(target_table_id, current_context)) + { + /// We create and execute `drop` query for internal table. + auto drop_query = std::make_shared(); + drop_query->database = target_table_id.database_name; + drop_query->table = target_table_id.table_name; + drop_query->kind = kind; + drop_query->no_delay = no_delay; + drop_query->if_exists = true; + ASTPtr ast_drop_query = drop_query; + /// FIXME We have to use global context to execute DROP query for inner table + /// to avoid "Not enough privileges" error if current user has only DROP VIEW ON mat_view_name privilege + /// and not allowed to drop inner table explicitly. Allowing to drop inner table without explicit grant + /// looks like expected behaviour and we have tests for it. + auto drop_context = Context(global_context); + drop_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + if (auto txn = current_context.getZooKeeperMetadataTransaction()) + { + /// For Replicated database + drop_context.setQueryContext(const_cast(current_context)); + drop_context.initZooKeeperMetadataTransaction(txn, true); + } + InterpreterDropQuery drop_interpreter(ast_drop_query, drop_context); + drop_interpreter.execute(); + } +} + } diff --git a/src/Interpreters/InterpreterDropQuery.h b/src/Interpreters/InterpreterDropQuery.h index 4a67857767f..3c05c2788f1 100644 --- a/src/Interpreters/InterpreterDropQuery.h +++ b/src/Interpreters/InterpreterDropQuery.h @@ -26,6 +26,8 @@ public: void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, const Context &) const override; + static void executeDropQuery(ASTDropQuery::Kind kind, const Context & global_context, const Context & current_context, const StorageID & target_table_id, bool no_delay); + private: AccessRightsElements getRequiredAccessForDDLOnCluster() const; ASTPtr query_ptr; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index b859e654967..2ecf1e33560 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -345,6 +345,8 @@ public: */ virtual void drop() {} + virtual void dropInnerTableIfAny(bool /* no_delay */, const Context & /* context */) {} + /** Clear the table data and leave it empty. * Must be called under exclusive lock (lockExclusively). */ diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 556ede4ce7b..4b14035fad7 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -4,10 +4,10 @@ #include #include #include - +#include +#include #include #include -#include #include @@ -124,7 +124,7 @@ void PostgreSQLReplicationHandler::startSynchronization() { nested_storages[table_name] = storage->getNested(); storage->setStorageMetadata(); - storage->setNestedLoaded(); + storage->setNestedStatus(true); } catch (...) { @@ -183,7 +183,7 @@ NameSet PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_na assertBlocksHaveEqualStructure(input.getHeader(), block_io.out->getHeader(), "postgresql replica load from snapshot"); copyData(input, *block_io.out); - storage_data.second->setNestedLoaded(); + storage_data.second->setNestedStatus(true); nested_storages[table_name] = nested_storage; /// This is needed if this method is called from reloadFromSnapshot() method below. @@ -401,7 +401,9 @@ std::unordered_map PostgreSQLReplicationHandler::reloadFromSnapsh const auto & table_name = relation.second; auto * storage = storages[table_name]; sync_storages[table_name] = storage; - storage->dropNested(); + auto nested_storage = storage->getNested(); + storage->setNestedStatus(false); + InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, context, context, nested_storage->getStorageID(), true); } auto replication_connection = postgres::createReplicationConnection(connection_info); diff --git a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp index e4b87b8410b..b26ecf805f6 100644 --- a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp +++ b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp @@ -23,6 +23,7 @@ #include #include #include +#include namespace DB @@ -52,6 +53,8 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL( , replication_settings(std::move(replication_settings_)) , is_postgresql_replica_database( DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "MaterializePostgreSQL") + , nested_table_id(StorageID(table_id_.database_name, getNestedTableName())) + , nested_context(makeNestedTableContext()) { setInMemoryMetadata(storage_metadata); @@ -70,16 +73,28 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL( StorageMaterializePostgreSQL::StorageMaterializePostgreSQL( const StorageID & table_id_, - StoragePtr nested_storage_, const Context & context_) : IStorage(table_id_) , global_context(context_) - , nested_storage(nested_storage_) , is_postgresql_replica_database(true) + , nested_table_id(table_id_) + , nested_context(makeNestedTableContext()) { } +StoragePtr StorageMaterializePostgreSQL::getNested() const +{ + return DatabaseCatalog::instance().getTable(nested_table_id, nested_context); +} + + +StoragePtr StorageMaterializePostgreSQL::tryGetNested() const +{ + return DatabaseCatalog::instance().tryGetTable(nested_table_id, nested_context); +} + + std::string StorageMaterializePostgreSQL::getNestedTableName() const { auto table_name = getStorageID().table_name; @@ -91,6 +106,17 @@ std::string StorageMaterializePostgreSQL::getNestedTableName() const } +void StorageMaterializePostgreSQL::setStorageMetadata() +{ + /// If it is a MaterializePostgreSQL database engine, then storage with engine MaterializePostgreSQL + /// gets its metadata when it is fetch from postges, but if inner tables exist (i.e. it is a server restart) + /// then metadata for storage needs to be set from inner table metadata. + auto nested_table = getNested(); + auto storage_metadata = nested_table->getInMemoryMetadataPtr(); + setInMemoryMetadata(*storage_metadata); +} + + std::shared_ptr StorageMaterializePostgreSQL::getMaterializedColumnsDeclaration( const String name, const String type, UInt64 default_value) { @@ -150,13 +176,6 @@ ASTPtr StorageMaterializePostgreSQL::getColumnDeclaration(const DataTypePtr & da } -void StorageMaterializePostgreSQL::setStorageMetadata() -{ - auto storage_metadata = getNested()->getInMemoryMetadataPtr(); - setInMemoryMetadata(*storage_metadata); -} - - /// For single storage MaterializePostgreSQL get columns and primary key columns from storage definition. /// For database engine MaterializePostgreSQL get columns and primary key columns by fetching from PostgreSQL, also using the same /// transaction with snapshot, which is used for initial tables dump. @@ -231,8 +250,8 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt columns_declare_list->set(columns_declare_list->columns, columns_expression_list); - columns_declare_list->columns->children.emplace_back(getMaterializedColumnsDeclaration("_sign", "Int8", UInt64(1))); - columns_declare_list->columns->children.emplace_back(getMaterializedColumnsDeclaration("_version", "UInt64", UInt64(1))); + columns_declare_list->columns->children.emplace_back(getMaterializedColumnsDeclaration("_sign", "Int8", 1)); + columns_declare_list->columns->children.emplace_back(getMaterializedColumnsDeclaration("_version", "UInt64", 1)); create_table_query->set(create_table_query->columns_list, columns_declare_list); @@ -255,14 +274,6 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt void StorageMaterializePostgreSQL::createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure) { - if (nested_loaded) - { - nested_storage = tryGetNested(); - - if (nested_storage) - return; - } - auto context = makeNestedTableContext(); const auto ast_create = getCreateNestedTableQuery(std::move(table_structure)); @@ -275,8 +286,6 @@ void StorageMaterializePostgreSQL::createNestedIfNeeded(PostgreSQLTableStructure { tryLogCurrentException(__PRETTY_FUNCTION__); } - - nested_storage = getNested(); } @@ -290,32 +299,6 @@ Context StorageMaterializePostgreSQL::makeNestedTableContext() const } -StoragePtr StorageMaterializePostgreSQL::getNested() -{ - if (nested_storage) - return nested_storage; - - auto context = makeNestedTableContext(); - nested_storage = DatabaseCatalog::instance().getTable( - StorageID(getStorageID().database_name, getNestedTableName()), context); - - return nested_storage; -} - - -StoragePtr StorageMaterializePostgreSQL::tryGetNested() -{ - if (nested_storage) - return nested_storage; - - auto context = makeNestedTableContext(); - nested_storage = DatabaseCatalog::instance().tryGetTable( - StorageID(getStorageID().database_name, getNestedTableName()), context); - - return nested_storage; -} - - void StorageMaterializePostgreSQL::startup() { if (!is_postgresql_replica_database) @@ -333,47 +316,23 @@ void StorageMaterializePostgreSQL::shutdown() } -void StorageMaterializePostgreSQL::shutdownFinal() +void StorageMaterializePostgreSQL::dropInnerTableIfAny(bool no_delay, const Context & context) { - if (is_postgresql_replica_database) - return; - if (replication_handler) replication_handler->shutdownFinal(); - if (nested_storage) - dropNested(); -} - - -void StorageMaterializePostgreSQL::dropNested() -{ - std::lock_guard lock(nested_mutex); - nested_loaded = false; - - auto table_id = nested_storage->getStorageID(); - auto ast_drop = std::make_shared(); - - ast_drop->kind = ASTDropQuery::Drop; - ast_drop->table = table_id.table_name; - ast_drop->database = table_id.database_name; - ast_drop->if_exists = true; - - auto context = makeNestedTableContext(); - auto interpreter = InterpreterDropQuery(ast_drop, context); - interpreter.execute(); - - nested_storage = nullptr; - LOG_TRACE(&Poco::Logger::get("StorageMaterializePostgreSQL"), "Dropped (possibly temporarily) nested table {}", getNestedTableName()); + auto nested_table = getNested(); + if (nested_table && !is_postgresql_replica_database) + InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, global_context, context, nested_table_id, no_delay); } NamesAndTypesList StorageMaterializePostgreSQL::getVirtuals() const { - if (nested_storage) - return nested_storage->getVirtuals(); - - return {}; + return NamesAndTypesList{ + {"_sign", std::make_shared()}, + {"_version", std::make_shared()} + }; } @@ -386,26 +345,20 @@ Pipe StorageMaterializePostgreSQL::read( size_t max_block_size, unsigned num_streams) { - std::unique_lock lock(nested_mutex, std::defer_lock); + if (!nested_loaded) + return Pipe(); - if (nested_loaded && lock.try_lock()) - { - if (!nested_storage) - getNested(); + auto nested_table = getNested(); - return readFinalFromNestedStorage( - nested_storage, - column_names, - metadata_snapshot, - query_info, - context, - processed_stage, - max_block_size, - num_streams); - } - - LOG_WARNING(&Poco::Logger::get("StorageMaterializePostgreSQL"), "Nested table {} is unavailable or is not loaded yet", getNestedTableName()); - return Pipe(); + return readFinalFromNestedStorage( + nested_table, + column_names, + metadata_snapshot, + query_info, + context, + processed_stage, + max_block_size, + num_streams); } diff --git a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h index 5bbea64133a..f311414c041 100644 --- a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h +++ b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h @@ -31,14 +31,16 @@ class StorageMaterializePostgreSQL final : public ext::shared_ptr_helper replication_handler; std::atomic nested_loaded = false; - StoragePtr nested_storage; - std::mutex nested_mutex; - bool is_postgresql_replica_database = false; + StorageID nested_table_id; + const Context nested_context; }; } diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index c89187a46e2..75e34f97532 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include @@ -198,36 +197,6 @@ BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const } -static void executeDropQuery(ASTDropQuery::Kind kind, const Context & global_context, const Context & current_context, const StorageID & target_table_id, bool no_delay) -{ - if (DatabaseCatalog::instance().tryGetTable(target_table_id, current_context)) - { - /// We create and execute `drop` query for internal table. - auto drop_query = std::make_shared(); - drop_query->database = target_table_id.database_name; - drop_query->table = target_table_id.table_name; - drop_query->kind = kind; - drop_query->no_delay = no_delay; - drop_query->if_exists = true; - ASTPtr ast_drop_query = drop_query; - /// FIXME We have to use global context to execute DROP query for inner table - /// to avoid "Not enough privileges" error if current user has only DROP VIEW ON mat_view_name privilege - /// and not allowed to drop inner table explicitly. Allowing to drop inner table without explicit grant - /// looks like expected behaviour and we have tests for it. - auto drop_context = Context(global_context); - drop_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - if (auto txn = current_context.getZooKeeperMetadataTransaction()) - { - /// For Replicated database - drop_context.setQueryContext(const_cast(current_context)); - drop_context.initZooKeeperMetadataTransaction(txn, true); - } - InterpreterDropQuery drop_interpreter(ast_drop_query, drop_context); - drop_interpreter.execute(); - } -} - - void StorageMaterializedView::drop() { auto table_id = getStorageID(); @@ -235,19 +204,19 @@ void StorageMaterializedView::drop() if (!select_query.select_table_id.empty()) DatabaseCatalog::instance().removeDependency(select_query.select_table_id, table_id); - dropInnerTable(true, global_context); + dropInnerTableIfAny(true, global_context); } -void StorageMaterializedView::dropInnerTable(bool no_delay, const Context & context) +void StorageMaterializedView::dropInnerTableIfAny(bool no_delay, const Context & context) { if (has_inner_table && tryGetTargetTable()) - executeDropQuery(ASTDropQuery::Kind::Drop, global_context, context, target_table_id, no_delay); + InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, global_context, context, target_table_id, no_delay); } void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context & context, TableExclusiveLockHolder &) { if (has_inner_table) - executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, context, target_table_id, true); + InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, context, target_table_id, true); } void StorageMaterializedView::checkStatementCanBeForwarded() const diff --git a/src/Storages/StorageMaterializedView.h b/src/Storages/StorageMaterializedView.h index a5dc089d68e..cd89af154a9 100644 --- a/src/Storages/StorageMaterializedView.h +++ b/src/Storages/StorageMaterializedView.h @@ -37,7 +37,7 @@ public: BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override; void drop() override; - void dropInnerTable(bool no_delay, const Context & context); + void dropInnerTableIfAny(bool no_delay, const Context & context) override; void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;