Correct merge, finish refactoring

This commit is contained in:
kssenii 2021-05-08 14:55:53 +00:00
parent 5c24f0943c
commit 1b827ac424
17 changed files with 505 additions and 88 deletions

View File

@ -0,0 +1,35 @@
#include "Connection.h"
#include <common/logger_useful.h>
namespace postgres
{
Connection::Connection(const ConnectionInfo & connection_info_, bool replication_)
: connection_info(connection_info_), replication(replication_)
{
if (replication)
{
connection_info = std::make_pair(
fmt::format("{} replication=database", connection_info.first), connection_info.second);
}
}
pqxx::connection & Connection::getRef()
{
connect();
assert(connection != nullptr);
return *connection;
}
void Connection::connect()
{
if (!connection || !connection->is_open())
{
/// Always throws if there is no connection.
connection = std::make_unique<pqxx::connection>(connection_info.first);
if (replication)
connection->set_variable("default_transaction_isolation", "'repeatable read'");
LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", connection_info.second);
}
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h>
namespace postgres
{
using ConnectionInfo = std::pair<String, String>;
using ConnectionPtr = std::unique_ptr<pqxx::connection>;
class Connection
{
public:
Connection(const ConnectionInfo & connection_info_, bool replication_ = false);
Connection(const Connection & other) = delete;
pqxx::connection & getRef();
void connect();
const ConnectionInfo & getConnectionInfo() { return connection_info; }
private:
ConnectionPtr connection;
ConnectionInfo connection_info;
bool replication;
};
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h>
#include <common/BorrowedObjectPool.h>
namespace postgres
{
using ConnectionPtr = std::unique_ptr<pqxx::connection>;
using Pool = BorrowedObjectPool<ConnectionPtr>;
using PoolPtr = std::shared_ptr<Pool>;
class ConnectionHolder
{
public:
ConnectionHolder(PoolPtr pool_, ConnectionPtr connection_) : pool(pool_), connection(std::move(connection_)) {}
ConnectionHolder(const ConnectionHolder & other) = delete;
~ConnectionHolder() { pool->returnObject(std::move(connection)); }
pqxx::connection & get()
{
assert(connection != nullptr);
return *connection;
}
private:
PoolPtr pool;
ConnectionPtr connection;
};
using ConnectionHolderPtr = std::unique_ptr<ConnectionHolder>;
}

View File

@ -0,0 +1,138 @@
#include "PoolWithFailover.h"
#include "Utils.h"
#include <Common/parseRemoteDescription.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int POSTGRESQL_CONNECTION_FAILURE;
}
}
namespace postgres
{
PoolWithFailover::PoolWithFailover(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix,
size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
{
LOG_TRACE(&Poco::Logger::get("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
pool_size, pool_wait_timeout, max_tries_);
auto db = config.getString(config_prefix + ".db", "");
auto host = config.getString(config_prefix + ".host", "");
auto port = config.getUInt(config_prefix + ".port", 0);
auto user = config.getString(config_prefix + ".user", "");
auto password = config.getString(config_prefix + ".password", "");
if (config.has(config_prefix + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (config_key.starts_with("replica"))
{
std::string replica_name = config_prefix + "." + config_key;
size_t priority = config.getInt(replica_name + ".priority", 0);
auto replica_host = config.getString(replica_name + ".host", host);
auto replica_port = config.getUInt(replica_name + ".port", port);
auto replica_user = config.getString(replica_name + ".user", user);
auto replica_password = config.getString(replica_name + ".password", password);
auto connection_string = formatConnectionString(db, replica_host, replica_port, replica_user, replica_password).first;
replicas_with_priority[priority].emplace_back(connection_string, pool_size);
}
}
}
else
{
auto connection_string = formatConnectionString(db, host, port, user, password).first;
replicas_with_priority[0].emplace_back(connection_string, pool_size);
}
}
PoolWithFailover::PoolWithFailover(
const std::string & database,
const RemoteDescription & addresses,
const std::string & user, const std::string & password,
size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
{
LOG_TRACE(&Poco::Logger::get("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
pool_size, pool_wait_timeout, max_tries_);
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
for (const auto & [host, port] : addresses)
{
LOG_DEBUG(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address host: {}, port: {} to connection pool", host, port);
auto connection_string = formatConnectionString(database, host, port, user, password).first;
replicas_with_priority[0].emplace_back(connection_string, pool_size);
}
}
ConnectionHolderPtr PoolWithFailover::get()
{
std::lock_guard lock(mutex);
for (size_t try_idx = 0; try_idx < max_tries; ++try_idx)
{
for (auto & priority : replicas_with_priority)
{
auto & replicas = priority.second;
for (size_t i = 0; i < replicas.size(); ++i)
{
auto & replica = replicas[i];
ConnectionPtr connection;
auto connection_available = replica.pool->tryBorrowObject(connection, []() { return nullptr; }, pool_wait_timeout);
if (!connection_available)
{
LOG_WARNING(log, "Unable to fetch connection within the timeout");
continue;
}
try
{
/// Create a new connection or reopen an old connection if it became invalid.
if (!connection || !connection->is_open())
{
connection = std::make_unique<pqxx::connection>(replica.connection_string);
LOG_DEBUG(log, "New connection to {}:{}", connection->hostname(), connection->port());
}
}
catch (const pqxx::broken_connection & pqxx_error)
{
LOG_ERROR(log, "Connection error: {}", pqxx_error.what());
replica.pool->returnObject(std::move(connection));
continue;
}
catch (...)
{
replica.pool->returnObject(std::move(connection));
throw;
}
auto connection_holder = std::make_unique<ConnectionHolder>(replica.pool, std::move(connection));
/// Move all traversed replicas to the end.
if (replicas.size() > 1)
std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end());
return connection_holder;
}
}
}
throw DB::Exception(DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas");
}
}

View File

@ -0,0 +1,65 @@
#pragma once
#include "ConnectionHolder.h"
#include <mutex>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
namespace postgres
{
class PoolWithFailover
{
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
public:
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
static constexpr inline auto POSTGRESQL_POOL_WAIT_TIMEOUT = 5000;
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
PoolWithFailover(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(
const std::string & database,
const RemoteDescription & addresses,
const std::string & user,
const std::string & password,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(const PoolWithFailover & other) = delete;
ConnectionHolderPtr get();
private:
struct PoolHolder
{
String connection_string;
PoolPtr pool;
PoolHolder(const String & connection_string_, size_t pool_size)
: connection_string(connection_string_), pool(std::make_shared<Pool>(pool_size)) {}
};
/// Highest priority is 0, the bigger the number in map, the less the priority
using Replicas = std::vector<PoolHolder>;
using ReplicasWithPriority = std::map<size_t, Replicas>;
ReplicasWithPriority replicas_with_priority;
size_t pool_wait_timeout;
size_t max_tries;
std::mutex mutex;
Poco::Logger * log = &Poco::Logger::get("PostgreSQLConnectionPool");
};
using PoolWithFailoverPtr = std::shared_ptr<PoolWithFailover>;
}

View File

@ -0,0 +1,19 @@
#include "Utils.h"
#include <IO/Operators.h>
namespace postgres
{
ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password)
{
DB::WriteBufferFromOwnString out;
out << "dbname=" << DB::quote << dbname
<< " host=" << DB::quote << host
<< " port=" << port
<< " user=" << DB::quote << user
<< " password=" << DB::quote << password
<< " connect_timeout=10";
return std::make_pair(out.str(), host + ':' + DB::toString(port));
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h>
#include "Connection.h"
namespace pqxx
{
using ReadTransaction = pqxx::read_transaction;
using ReplicationTransaction = pqxx::transaction<isolation_level::repeatable_read, write_policy::read_only>;
}
namespace postgres
{
ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password);
Connection createReplicationConnection(const ConnectionInfo & connection_info);
template <typename T>
class Transaction
{
public:
Transaction(pqxx::connection & connection) : transaction(connection) {}
~Transaction() { transaction.commit(); }
T & getRef() { return transaction; }
void exec(const String & query) { transaction.exec(query); }
private:
T transaction;
};
}

View File

@ -24,7 +24,7 @@ namespace ErrorCodes
MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer( MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer(
ContextPtr context_, ContextPtr context_,
postgres::Connection && connection_, std::shared_ptr<postgres::Connection> connection_,
const std::string & replication_slot_name_, const std::string & replication_slot_name_,
const std::string & publication_name_, const std::string & publication_name_,
const std::string & metadata_path, const std::string & metadata_path,
@ -37,7 +37,7 @@ MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer(
, replication_slot_name(replication_slot_name_) , replication_slot_name(replication_slot_name_)
, publication_name(publication_name_) , publication_name(publication_name_)
, metadata(metadata_path) , metadata(metadata_path)
, connection(std::move(connection_)) , connection(connection_)
, current_lsn(start_lsn) , current_lsn(start_lsn)
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, allow_automatic_update(allow_automatic_update_) , allow_automatic_update(allow_automatic_update_)
@ -88,7 +88,7 @@ void MaterializePostgreSQLConsumer::readMetadata()
if (!metadata.lsn().empty()) if (!metadata.lsn().empty())
{ {
auto tx = std::make_shared<pqxx::nontransaction>(connection.getRef()); auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
final_lsn = metadata.lsn(); final_lsn = metadata.lsn();
final_lsn = advanceLSN(tx); final_lsn = advanceLSN(tx);
tx->commit(); tx->commit();
@ -600,7 +600,7 @@ bool MaterializePostgreSQLConsumer::readFromReplicationSlot()
try try
{ {
tx = std::make_shared<pqxx::nontransaction>(connection.getRef()); tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
/// Read up to max_block_size rows changes (upto_n_changes parameter). It might return larger number as the limit /// Read up to max_block_size rows changes (upto_n_changes parameter). It might return larger number as the limit
/// is checked only after each transaction block. /// is checked only after each transaction block.

View File

@ -28,7 +28,7 @@ public:
MaterializePostgreSQLConsumer( MaterializePostgreSQLConsumer(
ContextPtr context_, ContextPtr context_,
postgres::Connection && connection_, std::shared_ptr<postgres::Connection> connection_,
const std::string & replication_slot_name_, const std::string & replication_slot_name_,
const std::string & publication_name_, const std::string & publication_name_,
const std::string & metadata_path, const std::string & metadata_path,
@ -106,7 +106,7 @@ private:
const std::string replication_slot_name, publication_name; const std::string replication_slot_name, publication_name;
MaterializePostgreSQLMetadata metadata; MaterializePostgreSQLMetadata metadata;
postgres::Connection connection; std::shared_ptr<postgres::Connection> connection;
std::string current_lsn, final_lsn; std::string current_lsn, final_lsn;
const size_t max_block_size; const size_t max_block_size;

View File

@ -9,7 +9,7 @@
#include <Interpreters/InterpreterRenameQuery.h> #include <Interpreters/InterpreterRenameQuery.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <DataStreams/copyData.h> #include <DataStreams/copyData.h>
#include <Poco/File.h> #include <filesystem>
namespace DB namespace DB
@ -42,7 +42,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
, allow_automatic_update(allow_automatic_update_) , allow_automatic_update(allow_automatic_update_)
, is_materialize_postgresql_database(is_materialize_postgresql_database_) , is_materialize_postgresql_database(is_materialize_postgresql_database_)
, tables_list(tables_list_) , tables_list(tables_list_)
, connection(connection_info_) , connection(std::make_shared<postgres::Connection>(connection_info_))
{ {
replication_slot = fmt::format("{}_ch_replication_slot", current_database_name); replication_slot = fmt::format("{}_ch_replication_slot", current_database_name);
publication_name = fmt::format("{}_ch_publication", current_database_name); publication_name = fmt::format("{}_ch_publication", current_database_name);
@ -68,8 +68,7 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart()
{ {
try try
{ {
/// Will throw pqxx::broken_connection if no connection at the moment connection->connect(); /// Will throw pqxx::broken_connection if no connection at the moment
connection.isValid();
startSynchronization(false); startSynchronization(false);
} }
catch (const pqxx::broken_connection & pqxx_error) catch (const pqxx::broken_connection & pqxx_error)
@ -95,7 +94,7 @@ void PostgreSQLReplicationHandler::shutdown()
void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
{ {
{ {
postgres::Transaction<pqxx::work> tx(connection.getRef()); postgres::Transaction<pqxx::work> tx(connection->getRef());
createPublicationIfNeeded(tx.getRef()); createPublicationIfNeeded(tx.getRef());
} }
@ -121,6 +120,8 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
e.addMessage("while loading table {}.{}", remote_database_name, table_name); e.addMessage("while loading table {}.{}", remote_database_name, table_name);
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
/// Throw in case of single MaterializePostgreSQL storage, becuase initial setup is done immediately
/// (unlike database engine where it is done in a separate thread).
if (throw_on_error) if (throw_on_error)
throw; throw;
} }
@ -134,16 +135,17 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
{ {
initial_sync(); initial_sync();
} }
/// Replication slot depends on publication, so if replication slot exists and new
/// publication was just created - drop that replication slot and start from scratch.
else if (new_publication_created) else if (new_publication_created)
{ {
/// Replication slot depends on publication, so if replication slot exists and new
/// publication was just created - drop that replication slot and start from scratch.
dropReplicationSlot(tx.getRef()); dropReplicationSlot(tx.getRef());
initial_sync(); initial_sync();
} }
/// Synchronization and initial load already took place - do not create any new tables, just fetch StoragePtr's
/// and pass them to replication consumer.
else else
{ {
/// Synchronization and initial load already took place.
LOG_TRACE(log, "Loading {} tables...", materialized_storages.size()); LOG_TRACE(log, "Loading {} tables...", materialized_storages.size());
for (const auto & [table_name, storage] : materialized_storages) for (const auto & [table_name, storage] : materialized_storages)
{ {
@ -179,9 +181,12 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
} }
} }
/// Pass current connection to consumer. It is not std::moved implicitly, but a shared_ptr is passed.
/// Consumer and replication handler are always executed one after another (not concurrently) and share the same connection.
/// Handler uses it only for loadFromSnapshot and shutdown methods.
consumer = std::make_shared<MaterializePostgreSQLConsumer>( consumer = std::make_shared<MaterializePostgreSQLConsumer>(
context, context,
std::move(connection), connection,
replication_slot, replication_slot,
publication_name, publication_name,
metadata_path, metadata_path,
@ -197,10 +202,10 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
} }
StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_name, const String & table_name, StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(String & snapshot_name, const String & table_name,
StorageMaterializePostgreSQL * materialized_storage) StorageMaterializePostgreSQL * materialized_storage)
{ {
auto tx = std::make_shared<pqxx::ReplicationTransaction>(connection.getRef()); auto tx = std::make_shared<pqxx::ReplicationTransaction>(connection->getRef());
std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name); std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name);
tx->exec(query_str); tx->exec(query_str);
@ -242,7 +247,16 @@ void PostgreSQLReplicationHandler::consumerFunc()
bool schedule_now = consumer->consume(skipped_tables); bool schedule_now = consumer->consume(skipped_tables);
if (!skipped_tables.empty()) if (!skipped_tables.empty())
reloadFromSnapshot(skipped_tables); {
try
{
reloadFromSnapshot(skipped_tables);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (stop_synchronization) if (stop_synchronization)
return; return;
@ -270,6 +284,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx)
void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bool create_without_check) void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bool create_without_check)
{ {
/// For database engine a publication can be created earlier than in startReplication().
if (new_publication_created) if (new_publication_created)
return; return;
@ -370,12 +385,10 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
void PostgreSQLReplicationHandler::shutdownFinal() void PostgreSQLReplicationHandler::shutdownFinal()
{ {
if (Poco::File(metadata_path).exists()) if (std::filesystem::exists(metadata_path))
Poco::File(metadata_path).remove(); std::filesystem::remove(metadata_path);
postgres::Connection connection_(connection_info);
postgres::Transaction<pqxx::nontransaction> tx(connection_.getRef());
postgres::Transaction<pqxx::nontransaction> tx(connection->getRef());
dropPublication(tx.getRef()); dropPublication(tx.getRef());
if (isReplicationSlotExist(tx.getRef(), replication_slot)) if (isReplicationSlotExist(tx.getRef(), replication_slot))
dropReplicationSlot(tx.getRef()); dropReplicationSlot(tx.getRef());
@ -432,80 +445,69 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
/// If `allow_automatic_update` is true, create a new table in the background, load new table schema /// If `allow_automatic_update` is true, create a new table in the background, load new table schema
/// and all data from scratch. Then execute REPLACE query. /// and all data from scratch. Then execute REPLACE query.
/// This is only allowed for MaterializePostgreSQL database engine. /// This is only allowed for MaterializePostgreSQL database engine.
try postgres::Connection replication_connection(connection_info, /* replication */true);
postgres::Transaction<pqxx::nontransaction> tx(replication_connection.getRef());
std::string snapshot_name, start_lsn;
createReplicationSlot(tx.getRef(), start_lsn, snapshot_name, true);
for (const auto & [relation_id, table_name] : relation_data)
{ {
postgres::Connection replication_connection(connection_info, /* replication */true); auto storage = DatabaseCatalog::instance().getTable(StorageID(current_database_name, table_name), context);
postgres::Transaction<pqxx::nontransaction> tx(replication_connection.getRef()); auto * materialized_storage = storage->as <StorageMaterializePostgreSQL>();
std::string snapshot_name, start_lsn; auto temp_materialized_storage = materialized_storage->createTemporary();
createReplicationSlot(tx.getRef(), start_lsn, snapshot_name, true);
for (const auto & [relation_id, table_name] : relation_data) /// This snapshot is valid up to the end of the transaction, which exported it.
StoragePtr temp_nested_storage = loadFromSnapshot(snapshot_name, table_name, temp_materialized_storage->as <StorageMaterializePostgreSQL>());
auto table_id = materialized_storage->getNestedStorageID();
auto temp_table_id = temp_nested_storage->getStorageID();
LOG_TRACE(log, "Starting background update of table {}.{} ({}) with table {}.{} ({})",
table_id.database_name, table_id.table_name, toString(table_id.uuid),
temp_table_id.database_name, temp_table_id.table_name, toString(temp_table_id.uuid));
auto ast_rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Element elem
{ {
auto storage = DatabaseCatalog::instance().getTable( ASTRenameQuery::Table{table_id.database_name, table_id.table_name},
StorageID(current_database_name, table_name), ASTRenameQuery::Table{temp_table_id.database_name, temp_table_id.table_name}
context); };
auto * materialized_storage = storage->as <StorageMaterializePostgreSQL>(); ast_rename->elements.push_back(std::move(elem));
ast_rename->exchange = true;
auto temp_materialized_storage = materialized_storage->createTemporary(); auto nested_context = materialized_storage->getNestedTableContext();
/// This snapshot is valid up to the end of the transaction, which exported it. try
StoragePtr temp_nested_storage = loadFromSnapshot(snapshot_name, table_name, temp_materialized_storage->as <StorageMaterializePostgreSQL>()); {
auto materialized_table_lock = materialized_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
InterpreterRenameQuery(ast_rename, nested_context).execute();
auto table_id = materialized_storage->getNestedStorageID();
auto temp_table_id = temp_nested_storage->getStorageID();
LOG_TRACE(log, "Starting background update of table {}.{} ({}) with table {}.{} ({})",
table_id.database_name, table_id.table_name, toString(table_id.uuid),
temp_table_id.database_name, temp_table_id.table_name, toString(temp_table_id.uuid));
auto ast_rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Element elem
{ {
ASTRenameQuery::Table{table_id.database_name, table_id.table_name}, auto nested_storage = DatabaseCatalog::instance().getTable(StorageID(table_id.database_name, table_id.table_name), nested_context);
ASTRenameQuery::Table{temp_table_id.database_name, temp_table_id.table_name} auto nested_table_lock = nested_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
}; auto nested_table_id = nested_storage->getStorageID();
ast_rename->elements.push_back(std::move(elem));
ast_rename->exchange = true;
auto nested_context = materialized_storage->getNestedTableContext(); materialized_storage->setNestedStorageID(nested_table_id);
nested_storage = materialized_storage->prepare();
LOG_TRACE(log, "Updated table {}.{} ({})", nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid));
try /// Pass pointer to new nested table into replication consumer, remove current table from skip list and set start lsn position.
{ consumer->updateNested(table_name, nested_storage, relation_id, start_lsn);
auto materialized_table_lock = materialized_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
InterpreterRenameQuery(ast_rename, nested_context).execute();
{
auto nested_storage = DatabaseCatalog::instance().getTable(StorageID(table_id.database_name, table_id.table_name), nested_context);
auto nested_table_lock = nested_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
auto nested_table_id = nested_storage->getStorageID();
materialized_storage->setNestedStorageID(nested_table_id);
nested_storage = materialized_storage->prepare();
LOG_TRACE(log, "Updated table {}.{} ({})", nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid));
/// Pass pointer to new nested table into replication consumer, remove current table from skip list and set start lsn position.
consumer->updateNested(table_name, nested_storage, relation_id, start_lsn);
}
LOG_DEBUG(log, "Dropping table {}.{} ({})", temp_table_id.database_name, temp_table_id.table_name, toString(temp_table_id.uuid));
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, nested_context, nested_context, temp_table_id, true);
dropReplicationSlot(tx.getRef(), /* temporary */true);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
} }
LOG_DEBUG(log, "Dropping table {}.{} ({})", temp_table_id.database_name, temp_table_id.table_name, toString(temp_table_id.uuid));
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, nested_context, nested_context, temp_table_id, true);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
} }
} }
catch (...)
{ dropReplicationSlot(tx.getRef(), /* temporary */true);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
} }
#endif #endif

View File

@ -113,7 +113,8 @@ private:
String replication_slot, publication_name; String replication_slot, publication_name;
postgres::Connection connection; /// Shared between replication_consumer and replication_handler, but never accessed concurrently.
std::shared_ptr<postgres::Connection> connection;
/// Replication consumer. Manages decoding of replication stream and syncing into tables. /// Replication consumer. Manages decoding of replication stream and syncing into tables.
std::shared_ptr<MaterializePostgreSQLConsumer> consumer; std::shared_ptr<MaterializePostgreSQLConsumer> consumer;

View File

@ -173,7 +173,7 @@ private:
/// It results in the fact: single MaterializePostgreSQL storage is created only if its nested table is created. /// It results in the fact: single MaterializePostgreSQL storage is created only if its nested table is created.
/// In case of attach - this setup will be done in a separate thread in the background. It will also /// In case of attach - this setup will be done in a separate thread in the background. It will also
/// be checked for nested table and attempted to load it if it does not exist for some reason. /// be checked for nested table and attempted to load it if it does not exist for some reason.
bool is_attach; bool is_attach = true;
}; };
} }

View File

@ -43,9 +43,9 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr context) const
{ {
const bool use_nulls = context->getSettingsRef().external_table_functions_use_nulls; const bool use_nulls = context->getSettingsRef().external_table_functions_use_nulls;
auto connection = connection_pool->get(); auto connection_holder = connection_pool->get();
auto columns = fetchPostgreSQLTableStructure( auto columns = fetchPostgreSQLTableStructure(
connection->conn(), connection_holder->get(),
remote_table_schema.empty() ? doubleQuoteString(remote_table_name) remote_table_schema.empty() ? doubleQuoteString(remote_table_name)
: doubleQuoteString(remote_table_schema) + '.' + doubleQuoteString(remote_table_name), : doubleQuoteString(remote_table_schema) + '.' + doubleQuoteString(remote_table_name),
use_nulls).columns; use_nulls).columns;

View File

@ -5,7 +5,7 @@
#if USE_LIBPQXX #if USE_LIBPQXX
#include <TableFunctions/ITableFunction.h> #include <TableFunctions/ITableFunction.h>
#include <Storages/PostgreSQL/PoolWithFailover.h> #include <Core/PostgreSQL/PoolWithFailover.h>
namespace DB namespace DB

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<dictionaries_config>/etc/clickhouse-server/config.d/*.xml</dictionaries_config>
</yandex>

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>