From 489a92c0679b51ed2f4b6b15ae634b6f30fa59b7 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 15 Sep 2021 18:22:33 +0300 Subject: [PATCH] Review fixes --- src/Databases/DatabaseAtomic.cpp | 1 - src/Databases/IDatabase.h | 2 +- .../PostgreSQL/DatabaseMaterializedPostgreSQL.cpp | 15 +++++++++++---- .../PostgreSQL/DatabaseMaterializedPostgreSQL.h | 2 +- src/Interpreters/InterpreterAlterQuery.cpp | 4 ++-- .../PostgreSQL/PostgreSQLReplicationHandler.cpp | 12 ++++++------ 6 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/Databases/DatabaseAtomic.cpp b/src/Databases/DatabaseAtomic.cpp index 7b1a8c6446e..2dbcd652004 100644 --- a/src/Databases/DatabaseAtomic.cpp +++ b/src/Databases/DatabaseAtomic.cpp @@ -2,7 +2,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Databases/IDatabase.h b/src/Databases/IDatabase.h index 92041b366a7..3cb1856d08d 100644 --- a/src/Databases/IDatabase.h +++ b/src/Databases/IDatabase.h @@ -282,7 +282,7 @@ public: /// Delete data and metadata stored inside the database, if exists. virtual void drop(ContextPtr /*context*/) {} - virtual void applyNewSettings(const SettingsChanges &, ContextPtr) + virtual void applySettingsChanges(const SettingsChanges &, ContextPtr) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Database engine {} either does not support settings, or does not support altering settings", diff --git a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp index 42720fa4eb1..cb3cda8ab79 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp @@ -127,9 +127,11 @@ void DatabaseMaterializedPostgreSQL::loadStoredObjects( } -void DatabaseMaterializedPostgreSQL::applyNewSettings(const SettingsChanges & settings_changes, ContextPtr query_context) +void DatabaseMaterializedPostgreSQL::applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context) { std::lock_guard lock(handler_mutex); + bool need_update_on_disk = false; + for (const auto & change : settings_changes) { if (!settings->has(change.name)) @@ -140,12 +142,12 @@ void DatabaseMaterializedPostgreSQL::applyNewSettings(const SettingsChanges & se if (!query_context->isInternalQuery()) throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Changing setting `{}` is not allowed", change.name); - DatabaseOnDisk::modifySettingsMetadata(settings_changes, query_context); + need_update_on_disk = true; } else if ((change.name == "materialized_postgresql_allow_automatic_update") || (change.name == "materialized_postgresql_max_block_size")) { - DatabaseOnDisk::modifySettingsMetadata(settings_changes, query_context); replication_handler->setSetting(change); + need_update_on_disk = true; } else { @@ -154,6 +156,9 @@ void DatabaseMaterializedPostgreSQL::applyNewSettings(const SettingsChanges & se settings->applyChange(change); } + + if (need_update_on_disk) + DatabaseOnDisk::modifySettingsMetadata(settings_changes, query_context); } @@ -185,6 +190,8 @@ StoragePtr DatabaseMaterializedPostgreSQL::tryGetTable(const String & name, Cont /// `except` is not empty in case it is detach and it will contain only one table name - name of detached table. +/// In case we have a user defined setting `materialized_postgresql_tables_list`, then list of tables is always taken there. +/// Otherwise we traverse materialized storages to find out the list. String DatabaseMaterializedPostgreSQL::getFormattedTablesList(const String & except) const { String tables_list; @@ -334,7 +341,7 @@ StoragePtr DatabaseMaterializedPostgreSQL::detachTable(const String & table_name auto nested = table_to_delete->as()->getNested(); if (!nested) - throw Exception(ErrorCodes::UNKNOWN_TABLE, "Inner table `{}` does not exist", table_name); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Inner table `{}` does not exist", table_name); std::lock_guard lock(handler_mutex); replication_handler->removeTableFromReplication(table_name); diff --git a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h index effd0ec653a..a0f9b3fce7a 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h +++ b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h @@ -61,7 +61,7 @@ public: void stopReplication(); - void applyNewSettings(const SettingsChanges & settings_changes, ContextPtr query_context) override; + void applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context) override; void shutdown() override; diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index d0054439c56..6595e1c02be 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -189,7 +189,7 @@ BlockIO InterpreterAlterQuery::executeToDatabase(const ASTAlterQuery & alter) for (const auto & command : alter_commands) { if (command.type != AlterCommand::MODIFY_DATABASE_SETTING) - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by databases", alterTypeToString(command.type)); + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported alter type for database engines"); } for (const auto & command : alter_commands) @@ -197,7 +197,7 @@ BlockIO InterpreterAlterQuery::executeToDatabase(const ASTAlterQuery & alter) if (!command.ignore) { if (command.type == AlterCommand::MODIFY_DATABASE_SETTING) - database->applyNewSettings(command.settings_changes, getContext()); + database->applySettingsChanges(command.settings_changes, getContext()); else throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported alter command"); } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 23ee7532b5e..456ca2c514e 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -119,6 +119,10 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) /// Data older than this is not available anymore. String snapshot_name, start_lsn; + /// Also lets have a separate non-replication connection, because we need two parallel transactions and + /// one connection can have one transaction at a time. + auto tmp_connection = std::make_shared(connection_info); + auto initial_sync = [&]() { LOG_TRACE(log, "Starting tables sync load"); @@ -136,15 +140,11 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) createReplicationSlot(tx, start_lsn, snapshot_name); } - /// Loading tables from snapshot requires a certain transaction type, so we need to open a new transactin. - /// But we cannot have more than one open transaciton on the same connection. Therefore we have - /// a separate connection to load tables. - postgres::Connection tmp_connection(connection_info); for (const auto & [table_name, storage] : materialized_storages) { try { - nested_storages[table_name] = loadFromSnapshot(tmp_connection, snapshot_name, table_name, storage->as ()); + nested_storages[table_name] = loadFromSnapshot(*tmp_connection, snapshot_name, table_name, storage->as()); } catch (Exception & e) { @@ -211,7 +211,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) /// Handler uses it only for loadFromSnapshot and shutdown methods. consumer = std::make_shared( context, - std::make_shared(connection_info), + std::move(tmp_connection), replication_slot, publication_name, start_lsn,