mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
commit
250f495456
@ -23,6 +23,7 @@ void Connection::execWithRetry(const std::function<void(pqxx::nontransaction &)>
|
||||
{
|
||||
pqxx::nontransaction tx(getRef());
|
||||
exec(tx);
|
||||
break;
|
||||
}
|
||||
catch (const pqxx::broken_connection & e)
|
||||
{
|
||||
|
@ -293,7 +293,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
postgresql_replica_settings->loadFromQuery(*engine_define);
|
||||
|
||||
return std::make_shared<DatabaseMaterializedPostgreSQL>(
|
||||
context, metadata_path, uuid, engine_define,
|
||||
context, metadata_path, uuid, engine_define, create.attach,
|
||||
database_name, postgres_database_name, connection_info,
|
||||
std::move(postgresql_replica_settings));
|
||||
}
|
||||
|
@ -37,12 +37,14 @@ DatabaseMaterializedPostgreSQL::DatabaseMaterializedPostgreSQL(
|
||||
const String & metadata_path_,
|
||||
UUID uuid_,
|
||||
const ASTStorage * database_engine_define_,
|
||||
bool is_attach_,
|
||||
const String & database_name_,
|
||||
const String & postgres_database_name,
|
||||
const postgres::ConnectionInfo & connection_info_,
|
||||
std::unique_ptr<MaterializedPostgreSQLSettings> settings_)
|
||||
: DatabaseAtomic(database_name_, metadata_path_, uuid_, "DatabaseMaterializedPostgreSQL (" + database_name_ + ")", context_)
|
||||
, database_engine_define(database_engine_define_->clone())
|
||||
, is_attach(is_attach_)
|
||||
, remote_database_name(postgres_database_name)
|
||||
, connection_info(connection_info_)
|
||||
, settings(std::move(settings_))
|
||||
@ -58,6 +60,7 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
|
||||
database_name,
|
||||
connection_info,
|
||||
getContext(),
|
||||
is_attach,
|
||||
settings->materialized_postgresql_max_block_size.value,
|
||||
settings->materialized_postgresql_allow_automatic_update,
|
||||
/* is_materialized_postgresql_database = */ true,
|
||||
|
@ -33,6 +33,7 @@ public:
|
||||
const String & metadata_path_,
|
||||
UUID uuid_,
|
||||
const ASTStorage * database_engine_define_,
|
||||
bool is_attach_,
|
||||
const String & database_name_,
|
||||
const String & postgres_database_name,
|
||||
const postgres::ConnectionInfo & connection_info,
|
||||
@ -63,6 +64,7 @@ private:
|
||||
void startSynchronization();
|
||||
|
||||
ASTPtr database_engine_define;
|
||||
bool is_attach;
|
||||
String remote_database_name;
|
||||
postgres::ConnectionInfo connection_info;
|
||||
std::unique_ptr<MaterializedPostgreSQLSettings> settings;
|
||||
|
@ -29,12 +29,14 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
|
||||
const String & current_database_name_,
|
||||
const postgres::ConnectionInfo & connection_info_,
|
||||
ContextPtr context_,
|
||||
bool is_attach_,
|
||||
const size_t max_block_size_,
|
||||
bool allow_automatic_update_,
|
||||
bool is_materialized_postgresql_database_,
|
||||
const String tables_list_)
|
||||
: log(&Poco::Logger::get("PostgreSQLReplicationHandler"))
|
||||
, context(context_)
|
||||
, is_attach(is_attach_)
|
||||
, remote_database_name(remote_database_name_)
|
||||
, current_database_name(current_database_name_)
|
||||
, connection_info(connection_info_)
|
||||
@ -145,10 +147,8 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
|
||||
{
|
||||
initial_sync();
|
||||
}
|
||||
/// Replication slot depends on publication, so if replication slot exists and new
|
||||
/// publication was just created - drop that replication slot and start from scratch.
|
||||
/// TODO: tests
|
||||
else if (new_publication_created)
|
||||
/// Always drop replication slot if it is CREATE query and not ATTACH.
|
||||
else if (!is_attach || new_publication)
|
||||
{
|
||||
dropReplicationSlot(tx);
|
||||
initial_sync();
|
||||
@ -285,22 +285,25 @@ bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx)
|
||||
std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name);
|
||||
pqxx::result result{tx.exec(query_str)};
|
||||
assert(!result.empty());
|
||||
bool publication_exists = (result[0][0].as<std::string>() == "t");
|
||||
|
||||
if (publication_exists)
|
||||
LOG_INFO(log, "Publication {} already exists. Using existing version", publication_name);
|
||||
|
||||
return publication_exists;
|
||||
return result[0][0].as<std::string>() == "t";
|
||||
}
|
||||
|
||||
|
||||
void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bool create_without_check)
|
||||
void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx)
|
||||
{
|
||||
/// For database engine a publication can be created earlier than in startReplication().
|
||||
if (new_publication_created)
|
||||
return;
|
||||
auto publication_exists = isPublicationExist(tx);
|
||||
|
||||
if (create_without_check || !isPublicationExist(tx))
|
||||
if (!is_attach && publication_exists)
|
||||
{
|
||||
/// This is a case for single Materialized storage. In case of database engine this check is done in advance.
|
||||
LOG_WARNING(log,
|
||||
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
|
||||
publication_name);
|
||||
|
||||
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
|
||||
}
|
||||
|
||||
if (!is_attach || !publication_exists)
|
||||
{
|
||||
if (tables_list.empty())
|
||||
{
|
||||
@ -320,8 +323,8 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bo
|
||||
try
|
||||
{
|
||||
tx.exec(query_str);
|
||||
new_publication_created = true;
|
||||
LOG_TRACE(log, "Created publication {} with tables list: {}", publication_name, tables_list);
|
||||
new_publication = true;
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
@ -329,6 +332,10 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bo
|
||||
throw;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(log, "Using existing publication ({}) version", publication_name);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -401,6 +408,7 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
|
||||
{
|
||||
std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name);
|
||||
tx.exec(query_str);
|
||||
LOG_TRACE(log, "Dropped publication: {}", publication_name);
|
||||
}
|
||||
|
||||
|
||||
@ -438,9 +446,11 @@ void PostgreSQLReplicationHandler::shutdownFinal()
|
||||
NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection & connection_)
|
||||
{
|
||||
pqxx::work tx(connection_.getRef());
|
||||
bool publication_exists_before_startup = isPublicationExist(tx);
|
||||
NameSet result_tables;
|
||||
|
||||
bool publication_exists_before_startup = isPublicationExist(tx);
|
||||
LOG_DEBUG(log, "Publication exists: {}, is attach: {}", publication_exists_before_startup, is_attach);
|
||||
|
||||
Strings expected_tables;
|
||||
if (!tables_list.empty())
|
||||
{
|
||||
@ -453,49 +463,58 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
|
||||
|
||||
if (publication_exists_before_startup)
|
||||
{
|
||||
if (tables_list.empty())
|
||||
if (!is_attach)
|
||||
{
|
||||
/// There is no tables list, but publication already exists, then the expected behaviour
|
||||
/// is to replicate the whole database. But it could be a server restart, so we can't drop it.
|
||||
LOG_WARNING(log,
|
||||
"Publication {} already exists and tables list is empty. Assuming publication is correct",
|
||||
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
|
||||
publication_name);
|
||||
|
||||
result_tables = fetchPostgreSQLTablesList(tx);
|
||||
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
|
||||
}
|
||||
/// Check tables list from publication is the same as expected tables list.
|
||||
/// If not - drop publication and return expected tables list.
|
||||
else
|
||||
{
|
||||
result_tables = fetchTablesFromPublication(tx);
|
||||
NameSet diff;
|
||||
std::set_symmetric_difference(expected_tables.begin(), expected_tables.end(),
|
||||
result_tables.begin(), result_tables.end(),
|
||||
std::inserter(diff, diff.begin()));
|
||||
if (!diff.empty())
|
||||
if (tables_list.empty())
|
||||
{
|
||||
String diff_tables;
|
||||
for (const auto & table_name : diff)
|
||||
{
|
||||
if (!diff_tables.empty())
|
||||
diff_tables += ", ";
|
||||
diff_tables += table_name;
|
||||
}
|
||||
|
||||
LOG_WARNING(log,
|
||||
"Publication {} already exists, but specified tables list differs from publication tables list in tables: {}",
|
||||
publication_name, diff_tables);
|
||||
"Publication {} already exists and tables list is empty. Assuming publication is correct.",
|
||||
publication_name);
|
||||
|
||||
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
|
||||
result_tables = fetchPostgreSQLTablesList(tx);
|
||||
}
|
||||
/// Check tables list from publication is the same as expected tables list.
|
||||
/// If not - drop publication and return expected tables list.
|
||||
else
|
||||
{
|
||||
result_tables = fetchTablesFromPublication(tx);
|
||||
NameSet diff;
|
||||
std::set_symmetric_difference(expected_tables.begin(), expected_tables.end(),
|
||||
result_tables.begin(), result_tables.end(),
|
||||
std::inserter(diff, diff.begin()));
|
||||
if (!diff.empty())
|
||||
{
|
||||
String diff_tables;
|
||||
for (const auto & table_name : diff)
|
||||
{
|
||||
if (!diff_tables.empty())
|
||||
diff_tables += ", ";
|
||||
diff_tables += table_name;
|
||||
}
|
||||
|
||||
LOG_WARNING(log,
|
||||
"Publication {} already exists, but specified tables list differs from publication tables list in tables: {}.",
|
||||
publication_name, diff_tables);
|
||||
|
||||
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
|
||||
if (result_tables.empty())
|
||||
{
|
||||
if (!tables_list.empty())
|
||||
{
|
||||
tx.commit();
|
||||
return NameSet(expected_tables.begin(), expected_tables.end());
|
||||
result_tables = NameSet(expected_tables.begin(), expected_tables.end());
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -24,6 +24,7 @@ public:
|
||||
const String & current_database_name_,
|
||||
const postgres::ConnectionInfo & connection_info_,
|
||||
ContextPtr context_,
|
||||
bool is_attach_,
|
||||
const size_t max_block_size_,
|
||||
bool allow_automatic_update_,
|
||||
bool is_materialized_postgresql_database_,
|
||||
@ -54,7 +55,7 @@ private:
|
||||
|
||||
bool isPublicationExist(pqxx::work & tx);
|
||||
|
||||
void createPublicationIfNeeded(pqxx::work & tx, bool create_without_check = false);
|
||||
void createPublicationIfNeeded(pqxx::work & tx);
|
||||
|
||||
NameSet fetchTablesFromPublication(pqxx::work & tx);
|
||||
|
||||
@ -83,6 +84,12 @@ private:
|
||||
Poco::Logger * log;
|
||||
ContextPtr context;
|
||||
|
||||
/// If it is not attach, i.e. a create query, then if publication already exists - always drop it.
|
||||
bool is_attach;
|
||||
|
||||
/// If new publication is created at start up - always drop replication slot if it exists.
|
||||
bool new_publication = false;
|
||||
|
||||
const String remote_database_name, current_database_name;
|
||||
|
||||
/// Connection string and address for logs.
|
||||
@ -113,11 +120,6 @@ private:
|
||||
|
||||
std::atomic<bool> stop_synchronization = false;
|
||||
|
||||
/// For database engine there are 2 places where it is checked for publication:
|
||||
/// 1. to fetch tables list from already created publication when database is loaded
|
||||
/// 2. at replication startup
|
||||
bool new_publication_created = false;
|
||||
|
||||
/// MaterializedPostgreSQL tables. Used for managing all operations with its internal nested tables.
|
||||
MaterializedStorages materialized_storages;
|
||||
|
||||
|
@ -70,6 +70,7 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
|
||||
table_id_.database_name,
|
||||
connection_info,
|
||||
getContext(),
|
||||
is_attach,
|
||||
replication_settings->materialized_postgresql_max_block_size.value,
|
||||
/* allow_automatic_update */ false, /* is_materialized_postgresql_database */false);
|
||||
}
|
||||
|
@ -1068,7 +1068,7 @@ class ClickHouseCluster:
|
||||
logging.error("Can't connect to MySQL:{}".format(errors))
|
||||
raise Exception("Cannot wait MySQL container")
|
||||
|
||||
def wait_postgres_to_start(self, timeout=180):
|
||||
def wait_postgres_to_start(self, timeout=260):
|
||||
self.postgres_ip = self.get_instance_ip(self.postgres_host)
|
||||
start = time.time()
|
||||
while time.time() - start < timeout:
|
||||
|
Loading…
Reference in New Issue
Block a user