Some fixes

This commit is contained in:
kssenii 2021-05-10 13:51:05 +00:00
parent 65c574db93
commit 626e87bae5
6 changed files with 46 additions and 28 deletions

View File

@ -3,6 +3,7 @@
#include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h>
#include "Connection.h"
#include <Common/Exception.h>
namespace pqxx
{
@ -24,7 +25,17 @@ class Transaction
public:
Transaction(pqxx::connection & connection) : transaction(connection) {}
~Transaction() { transaction.commit(); }
~Transaction()
{
try
{
transaction.commit();
}
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
T & getRef() { return transaction; }

View File

@ -92,13 +92,6 @@ void DatabaseMaterializePostgreSQL::startSynchronization()
}
void DatabaseMaterializePostgreSQL::shutdown()
{
if (replication_handler)
replication_handler->shutdown();
}
void DatabaseMaterializePostgreSQL::loadStoredObjects(ContextPtr local_context, bool has_force_restore_data_flag, bool force_attach)
{
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);

View File

@ -55,8 +55,6 @@ public:
void drop(ContextPtr local_context) override;
void shutdown() override;
void stopReplication();
private:

View File

@ -136,7 +136,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// There is one replication slot for each replication handler. In case of MaterializePostgreSQL database engine,
/// there is one replication slot per database. Its lifetime must be equal to the lifetime of replication handler.
/// Recreation of a replication slot imposes reloading of all tables.
if (!isReplicationSlotExist(tx.getRef(), replication_slot, start_lsn))
if (!isReplicationSlotExist(tx.getRef(), start_lsn, /* temporary */false))
{
initial_sync();
}
@ -322,9 +322,15 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bo
}
bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction & tx, String & slot_name, String & start_lsn)
bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction & tx, String & start_lsn, bool temporary)
{
std::string query_str = fmt::format("SELECT active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = '{}'", slot_name);
String slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
else
slot_name = replication_slot;
String query_str = fmt::format("SELECT active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = '{}'", slot_name);
pqxx::result result{tx.exec(query_str)};
/// Replication slot does not exist
@ -343,9 +349,7 @@ bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction &
void PostgreSQLReplicationHandler::createReplicationSlot(
pqxx::nontransaction & tx, String & start_lsn, String & snapshot_name, bool temporary)
{
std::string query_str;
std::string slot_name;
String query_str, slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
else
@ -395,8 +399,10 @@ void PostgreSQLReplicationHandler::shutdownFinal()
postgres::Transaction<pqxx::nontransaction> tx(connection->getRef());
dropPublication(tx.getRef());
String last_committed_lsn;
if (isReplicationSlotExist(tx.getRef(), replication_slot, last_committed_lsn))
dropReplicationSlot(tx.getRef());
if (isReplicationSlotExist(tx.getRef(), last_committed_lsn, /* temporary */false))
dropReplicationSlot(tx.getRef(), /* temporary */false);
if (isReplicationSlotExist(tx.getRef(), last_committed_lsn, /* temporary */true))
dropReplicationSlot(tx.getRef(), /* temporary */true);
}
@ -453,8 +459,10 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
postgres::Connection replication_connection(connection_info, /* replication */true);
postgres::Transaction<pqxx::nontransaction> tx(replication_connection.getRef());
std::string snapshot_name, start_lsn;
createReplicationSlot(tx.getRef(), start_lsn, snapshot_name, true);
String snapshot_name, start_lsn;
if (isReplicationSlotExist(tx.getRef(), start_lsn, /* temporary */true))
dropReplicationSlot(tx.getRef(), /* temporary */true);
createReplicationSlot(tx.getRef(), start_lsn, snapshot_name, /* temporary */true);
for (const auto & [relation_id, table_name] : relation_data)
{

View File

@ -61,7 +61,7 @@ private:
/// Methods to manage Replication Slots.
bool isReplicationSlotExist(pqxx::nontransaction & tx, String & slot_name, String & start_lsn);
bool isReplicationSlotExist(pqxx::nontransaction & tx, String & start_lsn, bool temporary = false);
void createReplicationSlot(pqxx::nontransaction & tx, String & start_lsn, String & snapshot_name, bool temporary = false);

View File

@ -443,16 +443,16 @@ def test_random_queries(started_cluster):
n = [10000]
query = ['DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key > 6000;',
'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;',
'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;',
'UPDATE postgresql_replica_{} SET value = value*value WHERE key < 5000;',
'UPDATE postgresql_replica_{} SET value = value*value WHERE key % 2 = 1;',
'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;',
'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key < 5000;',
'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;',
'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;',
'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 == 0;',
'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;',
'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key >= 5000;',
'DELETE FROM postgresql_replica_{} WHERE value-3 = 3;']
'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;']
def attack(thread_id):
print('thread {}'.format(thread_id))
@ -467,10 +467,17 @@ def test_random_queries(started_cluster):
# allow some thread to do inserts (not to violate key constraints)
if thread_id < 5:
instance.query('INSERT INTO postgres_database.postgresql_replica_{} SELECT {} + number, number from numbers(1000)'.format(thread_id, k))
print("try insert table {}".format(thread_id))
instance.query('INSERT INTO postgres_database.postgresql_replica_{} SELECT {}*10000*({} + number), number from numbers(1000)'.format(i, thread_id, k))
k += 1
print("insert table {} ok".format(thread_id))
if i == 5:
# also change primary key value
print("try update primary key {}".format(thread_id))
cursor.execute("UPDATE postgresql_replica_{} SET key=key%100000+100000*{} WHERE key%{}=0".format(thread_id, i+1, i+1))
print("update primary key {} ok".format(thread_id))
threads = []
threads_num = 16
@ -487,6 +494,7 @@ def test_random_queries(started_cluster):
for table_id in range(NUM_TABLES):
n[0] += 1
instance.query('INSERT INTO postgres_database.postgresql_replica_{} SELECT {} + number, number from numbers(5000)'.format(table_id, n[0]))
#cursor.execute("UPDATE postgresql_replica_{} SET key=key%100000+100000*{} WHERE key%{}=0".format(table_id, table_id+1, table_id+1))
for thread in threads:
thread.join()