More tests

kssenii 2021-06-29 23:11:46 +00:00
parent 9d5847a1a8
commit da70f85d4e
6 changed files with 235 additions and 25 deletions

View File

@@ -6,6 +6,7 @@ namespace postgres
Connection::Connection(const ConnectionInfo & connection_info_, bool replication_, size_t num_tries_)
: connection_info(connection_info_), replication(replication_), num_tries(num_tries_)
, log(&Poco::Logger::get("PostgreSQLReplicaConnection"))
{
if (replication)
{
@@ -25,8 +26,7 @@ void Connection::execWithRetry(const std::function<void(pqxx::nontransaction &)>
}
catch (const pqxx::broken_connection & e)
{
LOG_DEBUG(&Poco::Logger::get("PostgreSQLReplicaConnection"),
"Cannot execute query due to connection failure, attempt: {}/{}. (Message: {})",
LOG_DEBUG(log, "Cannot execute query due to connection failure, attempt: {}/{}. (Message: {})",
try_no, num_tries, e.what());
if (try_no == num_tries)
@@ -38,18 +38,36 @@ void Connection::execWithRetry(const std::function<void(pqxx::nontransaction &)>
pqxx::connection & Connection::getRef()
{
connect();
assert(connection != nullptr);
return *connection;
}
void Connection::tryUpdateConnection()
{
try
{
updateConnection();
}
catch (const pqxx::broken_connection & e)
{
LOG_ERROR(log, "Unable to update connection: {}", e.what());
}
}
void Connection::updateConnection()
{
if (connection)
connection->close();
/// Always throws if there is no connection.
connection = std::make_unique<pqxx::connection>(connection_info.first);
if (replication)
connection->set_variable("default_transaction_isolation", "'repeatable read'");
LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", connection_info.second);
}
void Connection::connect()
{
if (!connection || !connection->is_open())
{
/// Always throws if there is no connection.
connection = std::make_unique<pqxx::connection>(connection_info.first);
if (replication)
connection->set_variable("default_transaction_isolation", "'repeatable read'");
LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", connection_info.second);
}
updateConnection();
}
}
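The pattern behind execWithRetry() above is worth spelling out: run the callback over a nontransaction, and on pqxx::broken_connection drop the dead connection, reopen it, and try again up to num_tries. The sketch below is a minimal, standalone illustration of that pattern, not the patch's exact implementation; run_with_retry and its defaults are illustrative names, and only pqxx calls known from libpqxx (connection, is_open, nontransaction, broken_connection) are used.

#include <functional>
#include <memory>
#include <string>
#include <pqxx/pqxx>

/// Sketch: retry a callback over a fresh nontransaction, reopening the
/// connection whenever libpqxx reports it as broken.
void run_with_retry(const std::string & conninfo,
                    const std::function<void(pqxx::nontransaction &)> & exec,
                    size_t num_tries = 3)
{
    std::unique_ptr<pqxx::connection> conn;
    for (size_t try_no = 1; try_no <= num_tries; ++try_no)
    {
        try
        {
            if (!conn || !conn->is_open())
                conn = std::make_unique<pqxx::connection>(conninfo);
            pqxx::nontransaction tx(*conn);
            exec(tx);
            return;
        }
        catch (const pqxx::broken_connection &)
        {
            conn.reset();           /// drop the dead connection, reopen on the next attempt
            if (try_no == num_tries)
                throw;              /// out of attempts: propagate to the caller
        }
    }
}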

View File

@@ -4,6 +4,14 @@
#include <Core/Types.h>
#include <boost/noncopyable.hpp>
/* Methods to work with a PostgreSQL connection object.
* Should only be used when a single long-lived connection object is required
* and there are no concurrent queries over the connection.
* Currently the only use case is the replication handler for replication from PostgreSQL.
* All other integration engines use a pool with failover.
**/
namespace Poco { class Logger; }
namespace postgres
{
@@ -21,13 +29,19 @@ public:
void connect();
void tryUpdateConnection();
const ConnectionInfo & getConnectionInfo() { return connection_info; }
private:
void updateConnection();
ConnectionPtr connection;
ConnectionInfo connection_info;
bool replication;
size_t num_tries;
Poco::Logger * log;
};
}
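To make the intent of the header comment concrete, here is a hedged usage sketch of the wrapper declared above. It assumes ConnectionInfo is a (libpq connection string, human-readable address used only for logging) pair, which matches how connection_info.first and connection_info.second are used in the .cpp diff; the include path and the literal connection string are placeholders.

#include <pqxx/pqxx>
#include "Connection.h"   /// header shown above; include path assumed

int main()
{
    /// (connection string, description for logging) -- assumed meaning of ConnectionInfo.
    postgres::ConnectionInfo info{"dbname=postgres user=postgres host=127.0.0.1 port=5432",
                                  "127.0.0.1:5432"};

    /// The replication handler passes replication_ = true, which also sets
    /// default_transaction_isolation to 'repeatable read'; a plain connection is used here.
    postgres::Connection connection(info, /* replication_ */ false, /* num_tries_ */ 3);

    /// getRef() lazily (re)connects and returns the underlying pqxx::connection.
    pqxx::connection & raw = connection.getRef();
    (void)raw;

    /// execWithRetry() retries the callback on pqxx::broken_connection up to num_tries times.
    connection.execWithRetry([](pqxx::nontransaction & tx) { tx.exec("SELECT 1"); });
}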

View File

@@ -455,7 +455,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
}
void MaterializedPostgreSQLConsumer::syncTables(std::shared_ptr<pqxx::nontransaction> tx)
void MaterializedPostgreSQLConsumer::syncTables()
{
try
{
@@ -488,6 +488,7 @@ void MaterializedPostgreSQLConsumer::syncTables(std::shared_ptr<pqxx::nontransac
LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})", tables_to_sync.size(), current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
current_lsn = advanceLSN(tx);
tables_to_sync.clear();
tx->commit();
@@ -569,12 +570,11 @@ void MaterializedPostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const
/// Read binary changes from replication slot via COPY command (starting from current lsn in a slot).
bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
{
std::shared_ptr<pqxx::nontransaction> tx;
bool slot_empty = true;
try
{
tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
/// Read up to max_block_size row changes (upto_n_changes parameter). It might return a larger number,
/// as the limit is checked only after each transaction block.
@@ -611,6 +611,17 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
processReplicationMessage((*row)[1].c_str(), (*row)[1].size());
}
}
catch (const Exception &)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
return false;
}
catch (const pqxx::broken_connection & e)
{
LOG_ERROR(log, "Connection error: {}", e.what());
connection->tryUpdateConnection();
return false;
}
catch (const pqxx::sql_error & e)
{
/// For now sql replication interface is used and it has the problem that it registers relcache
@@ -628,14 +639,24 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
LOG_ERROR(log, "Conversion error: {}", e.what());
return false;
}
catch (const pqxx::broken_connection & e)
catch (const pqxx::statement_completion_unknown & e)
{
LOG_ERROR(log, "Connection error: {}", e.what());
LOG_ERROR(log, "Unknown statement completion: {}", e.what());
return false;
}
catch (const Exception &)
catch (const pqxx::in_doubt_error & e)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
LOG_ERROR(log, "PostgreSQL library has some doubts: {}", e.what());
return false;
}
catch (const pqxx::internal_error & e)
{
LOG_ERROR(log, "PostgreSQL library internal error: {}", e.what());
return false;
}
catch (const pqxx::conversion_overrun & e)
{
LOG_ERROR(log, "PostgreSQL library conversion overflow: {}", e.what());
return false;
}
catch (...)
@@ -653,9 +674,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
}
if (!tables_to_sync.empty())
{
syncTables(tx);
}
syncTables();
return true;
}
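The key behavioural change in this file is that a broken connection no longer bubbles out of readFromReplicationSlot(): it is caught, the connection is refreshed via tryUpdateConnection(), and the method reports "no progress" so the scheduling loop simply polls again later. A minimal sketch of that recovery pattern, using the Connection wrapper from the first file; poll_once is an illustrative name, not part of the patch.

#include <pqxx/pqxx>
#include "Connection.h"   /// wrapper from the first file in this commit; path assumed

bool poll_once(postgres::Connection & connection)
{
    try
    {
        pqxx::nontransaction tx(connection.getRef());
        /// ... stream changes from the replication slot and collect tables to sync here ...
        tx.commit();
    }
    catch (const pqxx::broken_connection &)
    {
        /// Do not rethrow: refresh the connection in place and report "no progress",
        /// so the caller retries on its next scheduled iteration.
        connection.tryUpdateConnection();
        return false;
    }
    return true;
}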

View File

@@ -39,7 +39,7 @@ private:
/// Read approximately up to max_block_size changes from WAL.
bool readFromReplicationSlot();
void syncTables(std::shared_ptr<pqxx::nontransaction> tx);
void syncTables();
String advanceLSN(std::shared_ptr<pqxx::nontransaction> ntx);

View File

@@ -734,6 +734,170 @@ def test_concurrent_transactions(started_cluster):
drop_materialized_db()
@pytest.mark.timeout(320)
def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 6
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
def transaction(thread_id):
if thread_id % 2:
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True, auto_commit=True)
else:
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True, auto_commit=False)
cursor_ = conn.cursor()
for query in queries:
cursor_.execute(query.format(thread_id))
print('thread {}, query {}'.format(thread_id, query))
if thread_id % 2 == 0:
conn.commit()
threads = []
threads_num = 6
for i in range(threads_num):
threads.append(threading.Thread(target=transaction, args=(i,)))
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for thread in threads:
time.sleep(random.uniform(0, 0.5))
thread.start()
# Join here because it takes time for data to reach the WAL
for thread in threads:
thread.join()
time.sleep(1)
started_cluster.pause_container('postgres1')
for i in range(NUM_TABLES):
result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
print(result) # Just debug
started_cluster.unpause_container('postgres1')
for i in range(NUM_TABLES):
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
for i in range(NUM_TABLES):
result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
print(result) # Just debug
drop_materialized_db()
def test_drop_database_while_replication_startup_not_finished(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 5
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
create_postgres_table(cursor, table_name);
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(100000)".format(table_name))
for i in range(6):
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
time.sleep(0.5 * i)
drop_materialized_db()
def test_restart_server_while_replication_startup_not_finished(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 5
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
create_postgres_table(cursor, table_name);
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(100000)".format(table_name))
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
time.sleep(0.5)
instance.restart_clickhouse()
for i in range(NUM_TABLES):
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
drop_materialized_db()
# Something is not ok with this test, needs investigation.
@pytest.mark.timeout(320)
def test_abrupt_server_restart_while_heavy_replication(started_cluster):
return
# instance.query("DROP DATABASE IF EXISTS test_database")
# conn = get_postgres_conn(ip=started_cluster.postgres_ip,
# port=started_cluster.postgres_port,
# database=True)
# cursor = conn.cursor()
# NUM_TABLES = 6
#
# for i in range(NUM_TABLES):
# create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
#
# def transaction(thread_id):
# if thread_id % 2:
# conn = get_postgres_conn(ip=started_cluster.postgres_ip,
# port=started_cluster.postgres_port,
# database=True, auto_commit=True)
# else:
# conn = get_postgres_conn(ip=started_cluster.postgres_ip,
# port=started_cluster.postgres_port,
# database=True, auto_commit=False)
# cursor_ = conn.cursor()
# for query in queries:
# cursor_.execute(query.format(thread_id))
# print('thread {}, query {}'.format(thread_id, query))
# if thread_id % 2 == 0:
# conn.commit()
#
# threads = []
# threads_num = 6
# for i in range(threads_num):
# threads.append(threading.Thread(target=transaction, args=(i,)))
#
# create_materialized_db(ip=started_cluster.postgres_ip,
# port=started_cluster.postgres_port)
#
# for thread in threads:
# time.sleep(random.uniform(0, 0.5))
# thread.start()
#
# # Join here because it takes time for data to reach wal
# for thread in threads:
# thread.join()
# time.sleep(1)
# instance.restart_clickhouse()
#
# for i in range(NUM_TABLES):
# result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
# print(result) # Just debug
#
# for i in range(NUM_TABLES):
# check_tables_are_synchronized('postgresql_replica_{}'.format(i));
#
# for i in range(NUM_TABLES):
# result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
# print(result) # Just debug
#
# drop_materialized_db()
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@@ -77,11 +77,6 @@ def started_cluster():
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def postgresql_setup_teardown():
yield # run test
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
@pytest.mark.timeout(320)
def test_initial_load_from_snapshot(started_cluster):