From 1b827ac4249c6f891037a48e62fbf943e24ccc84 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sat, 8 May 2021 14:55:53 +0000 Subject: [PATCH] Correct merge, finish refactoring --- src/Core/PostgreSQL/Connection.cpp | 35 ++++ src/Core/PostgreSQL/Connection.h | 30 ++++ src/Core/PostgreSQL/ConnectionHolder.h | 37 +++++ src/Core/PostgreSQL/PoolWithFailover.cpp | 138 ++++++++++++++++ src/Core/PostgreSQL/PoolWithFailover.h | 65 ++++++++ src/Core/PostgreSQL/Utils.cpp | 19 +++ src/Core/PostgreSQL/Utils.h | 37 +++++ .../MaterializePostgreSQLConsumer.cpp | 8 +- .../MaterializePostgreSQLConsumer.h | 4 +- .../PostgreSQLReplicationHandler.cpp | 156 +++++++++--------- .../PostgreSQL/PostgreSQLReplicationHandler.h | 3 +- .../PostgreSQL/StorageMaterializePostgreSQL.h | 2 +- .../TableFunctionPostgreSQL.cpp | 4 +- src/TableFunctions/TableFunctionPostgreSQL.h | 2 +- .../__init__.py | 0 .../configs/config.xml | 30 ++++ .../configs/users.xml | 23 +++ 17 files changed, 505 insertions(+), 88 deletions(-) create mode 100644 src/Core/PostgreSQL/Connection.cpp create mode 100644 src/Core/PostgreSQL/Connection.h create mode 100644 src/Core/PostgreSQL/ConnectionHolder.h create mode 100644 src/Core/PostgreSQL/PoolWithFailover.cpp create mode 100644 src/Core/PostgreSQL/PoolWithFailover.h create mode 100644 src/Core/PostgreSQL/Utils.cpp create mode 100644 src/Core/PostgreSQL/Utils.h create mode 100644 tests/integration/test_dictionaries_update_field/__init__.py create mode 100644 tests/integration/test_dictionaries_update_field/configs/config.xml create mode 100644 tests/integration/test_dictionaries_update_field/configs/users.xml diff --git a/src/Core/PostgreSQL/Connection.cpp b/src/Core/PostgreSQL/Connection.cpp new file mode 100644 index 00000000000..ff6197d1390 --- /dev/null +++ b/src/Core/PostgreSQL/Connection.cpp @@ -0,0 +1,35 @@ +#include "Connection.h" +#include + +namespace postgres +{ + +Connection::Connection(const ConnectionInfo & connection_info_, bool replication_) + : connection_info(connection_info_), replication(replication_) +{ + if (replication) + { + connection_info = std::make_pair( + fmt::format("{} replication=database", connection_info.first), connection_info.second); + } +} + +pqxx::connection & Connection::getRef() +{ + connect(); + assert(connection != nullptr); + return *connection; +} + +void Connection::connect() +{ + if (!connection || !connection->is_open()) + { + /// Always throws if there is no connection. + connection = std::make_unique(connection_info.first); + if (replication) + connection->set_variable("default_transaction_isolation", "'repeatable read'"); + LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", connection_info.second); + } +} +} diff --git a/src/Core/PostgreSQL/Connection.h b/src/Core/PostgreSQL/Connection.h new file mode 100644 index 00000000000..1e9334eace5 --- /dev/null +++ b/src/Core/PostgreSQL/Connection.h @@ -0,0 +1,30 @@ +#pragma once + +#include // Y_IGNORE +#include + + +namespace postgres +{ +using ConnectionInfo = std::pair; +using ConnectionPtr = std::unique_ptr; + +class Connection +{ +public: + Connection(const ConnectionInfo & connection_info_, bool replication_ = false); + + Connection(const Connection & other) = delete; + + pqxx::connection & getRef(); + + void connect(); + + const ConnectionInfo & getConnectionInfo() { return connection_info; } + +private: + ConnectionPtr connection; + ConnectionInfo connection_info; + bool replication; +}; +} diff --git a/src/Core/PostgreSQL/ConnectionHolder.h b/src/Core/PostgreSQL/ConnectionHolder.h new file mode 100644 index 00000000000..98ab7df182d --- /dev/null +++ b/src/Core/PostgreSQL/ConnectionHolder.h @@ -0,0 +1,37 @@ +#pragma once + +#include // Y_IGNORE +#include +#include + + +namespace postgres +{ + +using ConnectionPtr = std::unique_ptr; +using Pool = BorrowedObjectPool; +using PoolPtr = std::shared_ptr; + +class ConnectionHolder +{ + +public: + ConnectionHolder(PoolPtr pool_, ConnectionPtr connection_) : pool(pool_), connection(std::move(connection_)) {} + + ConnectionHolder(const ConnectionHolder & other) = delete; + + ~ConnectionHolder() { pool->returnObject(std::move(connection)); } + + pqxx::connection & get() + { + assert(connection != nullptr); + return *connection; + } + +private: + PoolPtr pool; + ConnectionPtr connection; +}; + +using ConnectionHolderPtr = std::unique_ptr; +} diff --git a/src/Core/PostgreSQL/PoolWithFailover.cpp b/src/Core/PostgreSQL/PoolWithFailover.cpp new file mode 100644 index 00000000000..6bf756b8a12 --- /dev/null +++ b/src/Core/PostgreSQL/PoolWithFailover.cpp @@ -0,0 +1,138 @@ +#include "PoolWithFailover.h" +#include "Utils.h" +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int POSTGRESQL_CONNECTION_FAILURE; +} +} + +namespace postgres +{ + +PoolWithFailover::PoolWithFailover( + const Poco::Util::AbstractConfiguration & config, const String & config_prefix, + size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_) + : pool_wait_timeout(pool_wait_timeout_) + , max_tries(max_tries_) +{ + LOG_TRACE(&Poco::Logger::get("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}", + pool_size, pool_wait_timeout, max_tries_); + + auto db = config.getString(config_prefix + ".db", ""); + auto host = config.getString(config_prefix + ".host", ""); + auto port = config.getUInt(config_prefix + ".port", 0); + auto user = config.getString(config_prefix + ".user", ""); + auto password = config.getString(config_prefix + ".password", ""); + + if (config.has(config_prefix + ".replica")) + { + Poco::Util::AbstractConfiguration::Keys config_keys; + config.keys(config_prefix, config_keys); + + for (const auto & config_key : config_keys) + { + if (config_key.starts_with("replica")) + { + std::string replica_name = config_prefix + "." + config_key; + size_t priority = config.getInt(replica_name + ".priority", 0); + + auto replica_host = config.getString(replica_name + ".host", host); + auto replica_port = config.getUInt(replica_name + ".port", port); + auto replica_user = config.getString(replica_name + ".user", user); + auto replica_password = config.getString(replica_name + ".password", password); + + auto connection_string = formatConnectionString(db, replica_host, replica_port, replica_user, replica_password).first; + replicas_with_priority[priority].emplace_back(connection_string, pool_size); + } + } + } + else + { + auto connection_string = formatConnectionString(db, host, port, user, password).first; + replicas_with_priority[0].emplace_back(connection_string, pool_size); + } +} + +PoolWithFailover::PoolWithFailover( + const std::string & database, + const RemoteDescription & addresses, + const std::string & user, const std::string & password, + size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_) + : pool_wait_timeout(pool_wait_timeout_) + , max_tries(max_tries_) +{ + LOG_TRACE(&Poco::Logger::get("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}", + pool_size, pool_wait_timeout, max_tries_); + + /// Replicas have the same priority, but traversed replicas are moved to the end of the queue. + for (const auto & [host, port] : addresses) + { + LOG_DEBUG(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address host: {}, port: {} to connection pool", host, port); + auto connection_string = formatConnectionString(database, host, port, user, password).first; + replicas_with_priority[0].emplace_back(connection_string, pool_size); + } +} + +ConnectionHolderPtr PoolWithFailover::get() +{ + std::lock_guard lock(mutex); + + for (size_t try_idx = 0; try_idx < max_tries; ++try_idx) + { + for (auto & priority : replicas_with_priority) + { + auto & replicas = priority.second; + for (size_t i = 0; i < replicas.size(); ++i) + { + auto & replica = replicas[i]; + + ConnectionPtr connection; + auto connection_available = replica.pool->tryBorrowObject(connection, []() { return nullptr; }, pool_wait_timeout); + + if (!connection_available) + { + LOG_WARNING(log, "Unable to fetch connection within the timeout"); + continue; + } + + try + { + /// Create a new connection or reopen an old connection if it became invalid. + if (!connection || !connection->is_open()) + { + connection = std::make_unique(replica.connection_string); + LOG_DEBUG(log, "New connection to {}:{}", connection->hostname(), connection->port()); + } + } + catch (const pqxx::broken_connection & pqxx_error) + { + LOG_ERROR(log, "Connection error: {}", pqxx_error.what()); + + replica.pool->returnObject(std::move(connection)); + continue; + } + catch (...) + { + replica.pool->returnObject(std::move(connection)); + throw; + } + + auto connection_holder = std::make_unique(replica.pool, std::move(connection)); + + /// Move all traversed replicas to the end. + if (replicas.size() > 1) + std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end()); + + return connection_holder; + } + } + } + + throw DB::Exception(DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas"); +} +} diff --git a/src/Core/PostgreSQL/PoolWithFailover.h b/src/Core/PostgreSQL/PoolWithFailover.h new file mode 100644 index 00000000000..f4ae2c6cd1b --- /dev/null +++ b/src/Core/PostgreSQL/PoolWithFailover.h @@ -0,0 +1,65 @@ +#pragma once + +#include "ConnectionHolder.h" +#include +#include +#include + + +namespace postgres +{ + +class PoolWithFailover +{ + +using RemoteDescription = std::vector>; + +public: + static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16; + static constexpr inline auto POSTGRESQL_POOL_WAIT_TIMEOUT = 5000; + static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5; + + PoolWithFailover( + const Poco::Util::AbstractConfiguration & config, + const std::string & config_prefix, + size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE, + size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT, + size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); + + PoolWithFailover( + const std::string & database, + const RemoteDescription & addresses, + const std::string & user, + const std::string & password, + size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE, + size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT, + size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); + + PoolWithFailover(const PoolWithFailover & other) = delete; + + ConnectionHolderPtr get(); + +private: + struct PoolHolder + { + String connection_string; + PoolPtr pool; + + PoolHolder(const String & connection_string_, size_t pool_size) + : connection_string(connection_string_), pool(std::make_shared(pool_size)) {} + }; + + /// Highest priority is 0, the bigger the number in map, the less the priority + using Replicas = std::vector; + using ReplicasWithPriority = std::map; + + ReplicasWithPriority replicas_with_priority; + size_t pool_wait_timeout; + size_t max_tries; + std::mutex mutex; + Poco::Logger * log = &Poco::Logger::get("PostgreSQLConnectionPool"); +}; + +using PoolWithFailoverPtr = std::shared_ptr; + +} diff --git a/src/Core/PostgreSQL/Utils.cpp b/src/Core/PostgreSQL/Utils.cpp new file mode 100644 index 00000000000..98e76da99d2 --- /dev/null +++ b/src/Core/PostgreSQL/Utils.cpp @@ -0,0 +1,19 @@ +#include "Utils.h" +#include + +namespace postgres +{ + +ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password) +{ + DB::WriteBufferFromOwnString out; + out << "dbname=" << DB::quote << dbname + << " host=" << DB::quote << host + << " port=" << port + << " user=" << DB::quote << user + << " password=" << DB::quote << password + << " connect_timeout=10"; + return std::make_pair(out.str(), host + ':' + DB::toString(port)); +} + +} diff --git a/src/Core/PostgreSQL/Utils.h b/src/Core/PostgreSQL/Utils.h new file mode 100644 index 00000000000..ccb133112d9 --- /dev/null +++ b/src/Core/PostgreSQL/Utils.h @@ -0,0 +1,37 @@ +#pragma once + +#include // Y_IGNORE +#include +#include "Connection.h" + +namespace pqxx +{ + using ReadTransaction = pqxx::read_transaction; + using ReplicationTransaction = pqxx::transaction; +} + + +namespace postgres +{ + +ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password); + +Connection createReplicationConnection(const ConnectionInfo & connection_info); + +template +class Transaction +{ +public: + Transaction(pqxx::connection & connection) : transaction(connection) {} + + ~Transaction() { transaction.commit(); } + + T & getRef() { return transaction; } + + void exec(const String & query) { transaction.exec(query); } + +private: + T transaction; +}; + +} diff --git a/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.cpp b/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.cpp index 7eabce7c270..79a98b7b070 100644 --- a/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.cpp +++ b/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.cpp @@ -24,7 +24,7 @@ namespace ErrorCodes MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer( ContextPtr context_, - postgres::Connection && connection_, + std::shared_ptr connection_, const std::string & replication_slot_name_, const std::string & publication_name_, const std::string & metadata_path, @@ -37,7 +37,7 @@ MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer( , replication_slot_name(replication_slot_name_) , publication_name(publication_name_) , metadata(metadata_path) - , connection(std::move(connection_)) + , connection(connection_) , current_lsn(start_lsn) , max_block_size(max_block_size_) , allow_automatic_update(allow_automatic_update_) @@ -88,7 +88,7 @@ void MaterializePostgreSQLConsumer::readMetadata() if (!metadata.lsn().empty()) { - auto tx = std::make_shared(connection.getRef()); + auto tx = std::make_shared(connection->getRef()); final_lsn = metadata.lsn(); final_lsn = advanceLSN(tx); tx->commit(); @@ -600,7 +600,7 @@ bool MaterializePostgreSQLConsumer::readFromReplicationSlot() try { - tx = std::make_shared(connection.getRef()); + tx = std::make_shared(connection->getRef()); /// Read up to max_block_size rows changes (upto_n_changes parameter). It might return larger number as the limit /// is checked only after each transaction block. diff --git a/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.h b/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.h index ffe80c93ca6..afb39519715 100644 --- a/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.h +++ b/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.h @@ -28,7 +28,7 @@ public: MaterializePostgreSQLConsumer( ContextPtr context_, - postgres::Connection && connection_, + std::shared_ptr connection_, const std::string & replication_slot_name_, const std::string & publication_name_, const std::string & metadata_path, @@ -106,7 +106,7 @@ private: const std::string replication_slot_name, publication_name; MaterializePostgreSQLMetadata metadata; - postgres::Connection connection; + std::shared_ptr connection; std::string current_lsn, final_lsn; const size_t max_block_size; diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index dc38d18759e..30d3f1e6e97 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include namespace DB @@ -42,7 +42,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( , allow_automatic_update(allow_automatic_update_) , is_materialize_postgresql_database(is_materialize_postgresql_database_) , tables_list(tables_list_) - , connection(connection_info_) + , connection(std::make_shared(connection_info_)) { replication_slot = fmt::format("{}_ch_replication_slot", current_database_name); publication_name = fmt::format("{}_ch_publication", current_database_name); @@ -68,8 +68,7 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart() { try { - /// Will throw pqxx::broken_connection if no connection at the moment - connection.isValid(); + connection->connect(); /// Will throw pqxx::broken_connection if no connection at the moment startSynchronization(false); } catch (const pqxx::broken_connection & pqxx_error) @@ -95,7 +94,7 @@ void PostgreSQLReplicationHandler::shutdown() void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) { { - postgres::Transaction tx(connection.getRef()); + postgres::Transaction tx(connection->getRef()); createPublicationIfNeeded(tx.getRef()); } @@ -121,6 +120,8 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) e.addMessage("while loading table {}.{}", remote_database_name, table_name); tryLogCurrentException(__PRETTY_FUNCTION__); + /// Throw in case of single MaterializePostgreSQL storage, becuase initial setup is done immediately + /// (unlike database engine where it is done in a separate thread). if (throw_on_error) throw; } @@ -134,16 +135,17 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) { initial_sync(); } + /// Replication slot depends on publication, so if replication slot exists and new + /// publication was just created - drop that replication slot and start from scratch. else if (new_publication_created) { - /// Replication slot depends on publication, so if replication slot exists and new - /// publication was just created - drop that replication slot and start from scratch. dropReplicationSlot(tx.getRef()); initial_sync(); } + /// Synchronization and initial load already took place - do not create any new tables, just fetch StoragePtr's + /// and pass them to replication consumer. else { - /// Synchronization and initial load already took place. LOG_TRACE(log, "Loading {} tables...", materialized_storages.size()); for (const auto & [table_name, storage] : materialized_storages) { @@ -179,9 +181,12 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) } } + /// Pass current connection to consumer. It is not std::moved implicitly, but a shared_ptr is passed. + /// Consumer and replication handler are always executed one after another (not concurrently) and share the same connection. + /// Handler uses it only for loadFromSnapshot and shutdown methods. consumer = std::make_shared( context, - std::move(connection), + connection, replication_slot, publication_name, metadata_path, @@ -197,10 +202,10 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) } -StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_name, const String & table_name, +StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(String & snapshot_name, const String & table_name, StorageMaterializePostgreSQL * materialized_storage) { - auto tx = std::make_shared(connection.getRef()); + auto tx = std::make_shared(connection->getRef()); std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name); tx->exec(query_str); @@ -242,7 +247,16 @@ void PostgreSQLReplicationHandler::consumerFunc() bool schedule_now = consumer->consume(skipped_tables); if (!skipped_tables.empty()) - reloadFromSnapshot(skipped_tables); + { + try + { + reloadFromSnapshot(skipped_tables); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } if (stop_synchronization) return; @@ -270,6 +284,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx) void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bool create_without_check) { + /// For database engine a publication can be created earlier than in startReplication(). if (new_publication_created) return; @@ -370,12 +385,10 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx) void PostgreSQLReplicationHandler::shutdownFinal() { - if (Poco::File(metadata_path).exists()) - Poco::File(metadata_path).remove(); - - postgres::Connection connection_(connection_info); - postgres::Transaction tx(connection_.getRef()); + if (std::filesystem::exists(metadata_path)) + std::filesystem::remove(metadata_path); + postgres::Transaction tx(connection->getRef()); dropPublication(tx.getRef()); if (isReplicationSlotExist(tx.getRef(), replication_slot)) dropReplicationSlot(tx.getRef()); @@ -432,80 +445,69 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector tx(replication_connection.getRef()); + + std::string snapshot_name, start_lsn; + createReplicationSlot(tx.getRef(), start_lsn, snapshot_name, true); + + for (const auto & [relation_id, table_name] : relation_data) { - postgres::Connection replication_connection(connection_info, /* replication */true); - postgres::Transaction tx(replication_connection.getRef()); + auto storage = DatabaseCatalog::instance().getTable(StorageID(current_database_name, table_name), context); + auto * materialized_storage = storage->as (); - std::string snapshot_name, start_lsn; - createReplicationSlot(tx.getRef(), start_lsn, snapshot_name, true); + auto temp_materialized_storage = materialized_storage->createTemporary(); - for (const auto & [relation_id, table_name] : relation_data) + /// This snapshot is valid up to the end of the transaction, which exported it. + StoragePtr temp_nested_storage = loadFromSnapshot(snapshot_name, table_name, temp_materialized_storage->as ()); + + auto table_id = materialized_storage->getNestedStorageID(); + auto temp_table_id = temp_nested_storage->getStorageID(); + + LOG_TRACE(log, "Starting background update of table {}.{} ({}) with table {}.{} ({})", + table_id.database_name, table_id.table_name, toString(table_id.uuid), + temp_table_id.database_name, temp_table_id.table_name, toString(temp_table_id.uuid)); + + auto ast_rename = std::make_shared(); + ASTRenameQuery::Element elem { - auto storage = DatabaseCatalog::instance().getTable( - StorageID(current_database_name, table_name), - context); - auto * materialized_storage = storage->as (); + ASTRenameQuery::Table{table_id.database_name, table_id.table_name}, + ASTRenameQuery::Table{temp_table_id.database_name, temp_table_id.table_name} + }; + ast_rename->elements.push_back(std::move(elem)); + ast_rename->exchange = true; - auto temp_materialized_storage = materialized_storage->createTemporary(); + auto nested_context = materialized_storage->getNestedTableContext(); - /// This snapshot is valid up to the end of the transaction, which exported it. - StoragePtr temp_nested_storage = loadFromSnapshot(snapshot_name, table_name, temp_materialized_storage->as ()); + try + { + auto materialized_table_lock = materialized_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout); + InterpreterRenameQuery(ast_rename, nested_context).execute(); - auto table_id = materialized_storage->getNestedStorageID(); - auto temp_table_id = temp_nested_storage->getStorageID(); - - LOG_TRACE(log, "Starting background update of table {}.{} ({}) with table {}.{} ({})", - table_id.database_name, table_id.table_name, toString(table_id.uuid), - temp_table_id.database_name, temp_table_id.table_name, toString(temp_table_id.uuid)); - - auto ast_rename = std::make_shared(); - ASTRenameQuery::Element elem { - ASTRenameQuery::Table{table_id.database_name, table_id.table_name}, - ASTRenameQuery::Table{temp_table_id.database_name, temp_table_id.table_name} - }; - ast_rename->elements.push_back(std::move(elem)); - ast_rename->exchange = true; + auto nested_storage = DatabaseCatalog::instance().getTable(StorageID(table_id.database_name, table_id.table_name), nested_context); + auto nested_table_lock = nested_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout); + auto nested_table_id = nested_storage->getStorageID(); - auto nested_context = materialized_storage->getNestedTableContext(); + materialized_storage->setNestedStorageID(nested_table_id); + nested_storage = materialized_storage->prepare(); + LOG_TRACE(log, "Updated table {}.{} ({})", nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid)); - try - { - auto materialized_table_lock = materialized_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout); - InterpreterRenameQuery(ast_rename, nested_context).execute(); - - { - auto nested_storage = DatabaseCatalog::instance().getTable(StorageID(table_id.database_name, table_id.table_name), nested_context); - auto nested_table_lock = nested_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout); - auto nested_table_id = nested_storage->getStorageID(); - - materialized_storage->setNestedStorageID(nested_table_id); - nested_storage = materialized_storage->prepare(); - LOG_TRACE(log, "Updated table {}.{} ({})", nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid)); - - /// Pass pointer to new nested table into replication consumer, remove current table from skip list and set start lsn position. - consumer->updateNested(table_name, nested_storage, relation_id, start_lsn); - } - - LOG_DEBUG(log, "Dropping table {}.{} ({})", temp_table_id.database_name, temp_table_id.table_name, toString(temp_table_id.uuid)); - InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, nested_context, nested_context, temp_table_id, true); - - dropReplicationSlot(tx.getRef(), /* temporary */true); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); + /// Pass pointer to new nested table into replication consumer, remove current table from skip list and set start lsn position. + consumer->updateNested(table_name, nested_storage, relation_id, start_lsn); } + + LOG_DEBUG(log, "Dropping table {}.{} ({})", temp_table_id.database_name, temp_table_id.table_name, toString(temp_table_id.uuid)); + InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, nested_context, nested_context, temp_table_id, true); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); } } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } + + dropReplicationSlot(tx.getRef(), /* temporary */true); } - - } #endif diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index ddea1d03763..c955b2fbe3a 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -113,7 +113,8 @@ private: String replication_slot, publication_name; - postgres::Connection connection; + /// Shared between replication_consumer and replication_handler, but never accessed concurrently. + std::shared_ptr connection; /// Replication consumer. Manages decoding of replication stream and syncing into tables. std::shared_ptr consumer; diff --git a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h index 6b896c24dfa..e38041a1b78 100644 --- a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h +++ b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h @@ -173,7 +173,7 @@ private: /// It results in the fact: single MaterializePostgreSQL storage is created only if its nested table is created. /// In case of attach - this setup will be done in a separate thread in the background. It will also /// be checked for nested table and attempted to load it if it does not exist for some reason. - bool is_attach; + bool is_attach = true; }; } diff --git a/src/TableFunctions/TableFunctionPostgreSQL.cpp b/src/TableFunctions/TableFunctionPostgreSQL.cpp index db609cd6081..54facb9ca0b 100644 --- a/src/TableFunctions/TableFunctionPostgreSQL.cpp +++ b/src/TableFunctions/TableFunctionPostgreSQL.cpp @@ -43,9 +43,9 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/, ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr context) const { const bool use_nulls = context->getSettingsRef().external_table_functions_use_nulls; - auto connection = connection_pool->get(); + auto connection_holder = connection_pool->get(); auto columns = fetchPostgreSQLTableStructure( - connection->conn(), + connection_holder->get(), remote_table_schema.empty() ? doubleQuoteString(remote_table_name) : doubleQuoteString(remote_table_schema) + '.' + doubleQuoteString(remote_table_name), use_nulls).columns; diff --git a/src/TableFunctions/TableFunctionPostgreSQL.h b/src/TableFunctions/TableFunctionPostgreSQL.h index a3d024c1a50..c31d02fa955 100644 --- a/src/TableFunctions/TableFunctionPostgreSQL.h +++ b/src/TableFunctions/TableFunctionPostgreSQL.h @@ -5,7 +5,7 @@ #if USE_LIBPQXX #include -#include +#include namespace DB diff --git a/tests/integration/test_dictionaries_update_field/__init__.py b/tests/integration/test_dictionaries_update_field/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_dictionaries_update_field/configs/config.xml b/tests/integration/test_dictionaries_update_field/configs/config.xml new file mode 100644 index 00000000000..a1518083be3 --- /dev/null +++ b/tests/integration/test_dictionaries_update_field/configs/config.xml @@ -0,0 +1,30 @@ + + + + trace + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + 1000M + 10 + + + 9000 + 127.0.0.1 + + + + true + none + + AcceptCertificateHandler + + + + + 500 + 5368709120 + ./clickhouse/ + users.xml + + /etc/clickhouse-server/config.d/*.xml + diff --git a/tests/integration/test_dictionaries_update_field/configs/users.xml b/tests/integration/test_dictionaries_update_field/configs/users.xml new file mode 100644 index 00000000000..6061af8e33d --- /dev/null +++ b/tests/integration/test_dictionaries_update_field/configs/users.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + ::/0 + + default + default + + + + + + + +