More correct startup on create query

This commit is contained in:
kssenii 2021-07-04 14:56:31 +00:00
parent 8351f1db99
commit 32b7d7b750
8 changed files with 80 additions and 52 deletions

View File

@ -23,6 +23,7 @@ void Connection::execWithRetry(const std::function<void(pqxx::nontransaction &)>
{
pqxx::nontransaction tx(getRef());
exec(tx);
break;
}
catch (const pqxx::broken_connection & e)
{

View File

@ -293,7 +293,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
postgresql_replica_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializedPostgreSQL>(
context, metadata_path, uuid, engine_define,
context, metadata_path, uuid, engine_define, create.attach,
database_name, postgres_database_name, connection_info,
std::move(postgresql_replica_settings));
}

View File

@ -37,12 +37,14 @@ DatabaseMaterializedPostgreSQL::DatabaseMaterializedPostgreSQL(
const String & metadata_path_,
UUID uuid_,
const ASTStorage * database_engine_define_,
bool is_attach_,
const String & database_name_,
const String & postgres_database_name,
const postgres::ConnectionInfo & connection_info_,
std::unique_ptr<MaterializedPostgreSQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid_, "DatabaseMaterializedPostgreSQL (" + database_name_ + ")", context_)
, database_engine_define(database_engine_define_->clone())
, is_attach(is_attach_)
, remote_database_name(postgres_database_name)
, connection_info(connection_info_)
, settings(std::move(settings_))
@ -58,6 +60,7 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
database_name,
connection_info,
getContext(),
is_attach,
settings->materialized_postgresql_max_block_size.value,
settings->materialized_postgresql_allow_automatic_update,
/* is_materialized_postgresql_database = */ true,

View File

@ -33,6 +33,7 @@ public:
const String & metadata_path_,
UUID uuid_,
const ASTStorage * database_engine_define_,
bool is_attach_,
const String & database_name_,
const String & postgres_database_name,
const postgres::ConnectionInfo & connection_info,
@ -63,6 +64,7 @@ private:
void startSynchronization();
ASTPtr database_engine_define;
bool is_attach;
String remote_database_name;
postgres::ConnectionInfo connection_info;
std::unique_ptr<MaterializedPostgreSQLSettings> settings;

View File

@ -29,12 +29,14 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const String & current_database_name_,
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
bool is_attach_,
const size_t max_block_size_,
bool allow_automatic_update_,
bool is_materialized_postgresql_database_,
const String tables_list_)
: log(&Poco::Logger::get("PostgreSQLReplicationHandler"))
, context(context_)
, is_attach(is_attach_)
, remote_database_name(remote_database_name_)
, current_database_name(current_database_name_)
, connection_info(connection_info_)
@ -145,10 +147,8 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
{
initial_sync();
}
/// Replication slot depends on publication, so if replication slot exists and new
/// publication was just created - drop that replication slot and start from scratch.
/// TODO: tests
else if (new_publication_created)
/// Always drop replication slot if it is CREATE query and not ATTACH.
else if (!is_attach || new_publication)
{
dropReplicationSlot(tx);
initial_sync();
@ -285,22 +285,25 @@ bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx)
std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name);
pqxx::result result{tx.exec(query_str)};
assert(!result.empty());
bool publication_exists = (result[0][0].as<std::string>() == "t");
if (publication_exists)
LOG_INFO(log, "Publication {} already exists. Using existing version", publication_name);
return publication_exists;
return result[0][0].as<std::string>() == "t";
}
void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bool create_without_check)
void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx)
{
/// For database engine a publication can be created earlier than in startReplication().
if (new_publication_created)
return;
auto publication_exists = isPublicationExist(tx);
if (create_without_check || !isPublicationExist(tx))
if (!is_attach && publication_exists)
{
/// This is a case for single Materialized storage. In case of database engine this check is done in advance.
LOG_WARNING(log,
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
publication_name);
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
}
if (!is_attach || !publication_exists)
{
if (tables_list.empty())
{
@ -320,8 +323,8 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bo
try
{
tx.exec(query_str);
new_publication_created = true;
LOG_TRACE(log, "Created publication {} with tables list: {}", publication_name, tables_list);
new_publication = true;
}
catch (Exception & e)
{
@ -329,6 +332,10 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bo
throw;
}
}
else
{
LOG_TRACE(log, "Using existing publication ({}) version", publication_name);
}
}
@ -401,6 +408,7 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
{
std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name);
tx.exec(query_str);
LOG_TRACE(log, "Dropped publication: {}", publication_name);
}
@ -438,9 +446,11 @@ void PostgreSQLReplicationHandler::shutdownFinal()
NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection & connection_)
{
pqxx::work tx(connection_.getRef());
bool publication_exists_before_startup = isPublicationExist(tx);
NameSet result_tables;
bool publication_exists_before_startup = isPublicationExist(tx);
LOG_DEBUG(log, "Publication exists: {}, is attach: {}", publication_exists_before_startup, is_attach);
Strings expected_tables;
if (!tables_list.empty())
{
@ -453,49 +463,58 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
if (publication_exists_before_startup)
{
if (tables_list.empty())
if (!is_attach)
{
/// There is no tables list, but publication already exists, then the expected behaviour
/// is to replicate the whole database. But it could be a server restart, so we can't drop it.
LOG_WARNING(log,
"Publication {} already exists and tables list is empty. Assuming publication is correct",
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
publication_name);
result_tables = fetchPostgreSQLTablesList(tx);
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
}
/// Check tables list from publication is the same as expected tables list.
/// If not - drop publication and return expected tables list.
else
{
result_tables = fetchTablesFromPublication(tx);
NameSet diff;
std::set_symmetric_difference(expected_tables.begin(), expected_tables.end(),
result_tables.begin(), result_tables.end(),
std::inserter(diff, diff.begin()));
if (!diff.empty())
if (tables_list.empty())
{
String diff_tables;
for (const auto & table_name : diff)
{
if (!diff_tables.empty())
diff_tables += ", ";
diff_tables += table_name;
}
LOG_WARNING(log,
"Publication {} already exists, but specified tables list differs from publication tables list in tables: {}",
publication_name, diff_tables);
"Publication {} already exists and tables list is empty. Assuming publication is correct.",
publication_name);
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
result_tables = fetchPostgreSQLTablesList(tx);
}
/// Check tables list from publication is the same as expected tables list.
/// If not - drop publication and return expected tables list.
else
{
result_tables = fetchTablesFromPublication(tx);
NameSet diff;
std::set_symmetric_difference(expected_tables.begin(), expected_tables.end(),
result_tables.begin(), result_tables.end(),
std::inserter(diff, diff.begin()));
if (!diff.empty())
{
String diff_tables;
for (const auto & table_name : diff)
{
if (!diff_tables.empty())
diff_tables += ", ";
diff_tables += table_name;
}
LOG_WARNING(log,
"Publication {} already exists, but specified tables list differs from publication tables list in tables: {}.",
publication_name, diff_tables);
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
}
}
}
}
else
if (result_tables.empty())
{
if (!tables_list.empty())
{
tx.commit();
return NameSet(expected_tables.begin(), expected_tables.end());
result_tables = NameSet(expected_tables.begin(), expected_tables.end());
}
else
{

View File

@ -24,6 +24,7 @@ public:
const String & current_database_name_,
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
bool is_attach_,
const size_t max_block_size_,
bool allow_automatic_update_,
bool is_materialized_postgresql_database_,
@ -54,7 +55,7 @@ private:
bool isPublicationExist(pqxx::work & tx);
void createPublicationIfNeeded(pqxx::work & tx, bool create_without_check = false);
void createPublicationIfNeeded(pqxx::work & tx);
NameSet fetchTablesFromPublication(pqxx::work & tx);
@ -83,6 +84,12 @@ private:
Poco::Logger * log;
ContextPtr context;
/// If it is not attach, i.e. a create query, then if publication already exists - always drop it.
bool is_attach;
/// If new publication is created at start up - always drop replication slot if it exists.
bool new_publication = false;
const String remote_database_name, current_database_name;
/// Connection string and address for logs.
@ -113,11 +120,6 @@ private:
std::atomic<bool> stop_synchronization = false;
/// For database engine there are 2 places where it is checked for publication:
/// 1. to fetch tables list from already created publication when database is loaded
/// 2. at replication startup
bool new_publication_created = false;
/// MaterializedPostgreSQL tables. Used for managing all operations with its internal nested tables.
MaterializedStorages materialized_storages;

View File

@ -70,6 +70,7 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
table_id_.database_name,
connection_info,
getContext(),
is_attach,
replication_settings->materialized_postgresql_max_block_size.value,
/* allow_automatic_update */ false, /* is_materialized_postgresql_database */false);
}

View File

@ -1068,7 +1068,7 @@ class ClickHouseCluster:
logging.error("Can't connect to MySQL:{}".format(errors))
raise Exception("Cannot wait MySQL container")
def wait_postgres_to_start(self, timeout=180):
def wait_postgres_to_start(self, timeout=260):
self.postgres_ip = self.get_instance_ip(self.postgres_host)
start = time.time()
while time.time() - start < timeout: