Add replication identifier, allow proper access to _version and _sign columns from nested

This commit is contained in:
kssenii 2021-05-12 22:47:41 +00:00
parent 407db17e2e
commit 7c81103ea3
5 changed files with 86 additions and 32 deletions

View File

@ -52,6 +52,7 @@ DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL(
void DatabaseMaterializePostgreSQL::startSynchronization()
{
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
/* replication_identifier */database_name,
remote_database_name,
database_name,
connection_info,

View File

@ -22,6 +22,7 @@ namespace ErrorCodes
}
PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const String & replication_identifier,
const String & remote_database_name_,
const String & current_database_name_,
const postgres::ConnectionInfo & connection_info_,
@ -41,8 +42,8 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
, tables_list(tables_list_)
, connection(std::make_shared<postgres::Connection>(connection_info_))
{
replication_slot = fmt::format("{}_ch_replication_slot", current_database_name);
publication_name = fmt::format("{}_ch_publication", current_database_name);
replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier);
publication_name = fmt::format("{}_ch_publication", replication_identifier);
startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ waitConnectionAndStart(); });
consumer_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
@ -402,11 +403,20 @@ void PostgreSQLReplicationHandler::shutdownFinal()
pqxx::nontransaction tx(connection->getRef());
dropPublication(tx);
String last_committed_lsn;
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false))
dropReplicationSlot(tx, /* temporary */false);
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true);
tx.commit();
try
{
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false))
dropReplicationSlot(tx, /* temporary */false);
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true);
tx.commit();
}
catch (Exception & e)
{
e.addMessage("while dropping replication slot: {}", replication_slot);
LOG_ERROR(log, "Failed to drop replication slot: {}. It must be dropped manually.", replication_slot);
throw;
}
}

View File

@ -19,6 +19,7 @@ class PostgreSQLReplicationHandler
{
public:
PostgreSQLReplicationHandler(
const String & replication_identifier,
const String & remote_database_name_,
const String & current_database_name_,
const postgres::ConnectionInfo & connection_info_,

View File

@ -63,7 +63,9 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
setInMemoryMetadata(storage_metadata);
String replication_identifier = remote_database_name + "_" + remote_table_name_;
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
replication_identifier,
remote_database_name,
table_id_.database_name,
connection_info,
@ -351,11 +353,7 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt
"No columns returned for table {}.{}", table_id.database_name, table_id.table_name);
}
StorageInMemoryMetadata storage_metadata;
ordinary_columns_and_types = *table_structure->columns;
storage_metadata.setColumns(ColumnsDescription(ordinary_columns_and_types));
setInMemoryMetadata(storage_metadata);
if (!table_structure->primary_key_columns && !table_structure->replica_identity_columns)
{
@ -406,6 +404,17 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt
create_table_query->set(create_table_query->storage, storage);
/// Add columns _sign and _version, so that they can be accessed from nested ReplacingMergeTree table if needed.
/// TODO: add test for case of database engine, test same case after table reload.
ordinary_columns_and_types.push_back({"_sign", std::make_shared<DataTypeInt8>()});
ordinary_columns_and_types.push_back({"_version", std::make_shared<DataTypeUInt64>()});
StorageInMemoryMetadata metadata;
metadata.setColumns(ColumnsDescription(ordinary_columns_and_types));
metadata.setConstraints(metadata_snapshot->getConstraints());
setInMemoryMetadata(metadata);
return create_table_query;
}

View File

@ -77,8 +77,9 @@ def test_initial_load_from_snapshot(started_cluster):
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
@ -100,8 +101,9 @@ def test_no_connection_at_startup(started_cluster):
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
@ -132,8 +134,9 @@ def test_detach_attach_is_ok(started_cluster):
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
@ -167,8 +170,9 @@ def test_replicating_insert_queries(started_cluster):
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
@ -208,8 +212,9 @@ def test_replicating_delete_queries(started_cluster):
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
@ -246,8 +251,9 @@ def test_replicating_update_queries(started_cluster):
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number + 10 from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
@ -276,8 +282,9 @@ def test_resume_from_written_version(started_cluster):
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number + 10 from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
@ -318,12 +325,9 @@ def test_many_replication_messages(started_cluster):
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (
key UInt64, value UInt64,
_sign Int8 MATERIALIZED 1,
_version UInt64 MATERIALIZED 1,
PRIMARY KEY(key))
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, PRIMARY KEY(key))
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
SETTINGS materialize_postgresql_max_block_size = 50000;
@ -376,8 +380,9 @@ def test_connection_loss(started_cluster):
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
@ -412,8 +417,9 @@ def test_clickhouse_restart(started_cluster):
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key; ''')
@ -439,32 +445,59 @@ def test_rename_table(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key; ''')
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(25)")
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while int(result) != 50:
while int(result) != 25:
time.sleep(0.5)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
instance.query('RENAME TABLE test.postgresql_replica TO test.postgresql_replica_renamed')
assert(int(instance.query('SELECT count() FROM test.postgresql_replica_renamed;')) == 25)
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(25, 25)")
result = instance.query('SELECT count() FROM test.postgresql_replica_renamed;')
while int(result) != 50:
time.sleep(0.5)
result = instance.query('SELECT count() FROM test.postgresql_replica_renamed;')
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50, 50)")
result = instance.query('SELECT * FROM test.postgresql_replica_renamed ORDER BY key;')
postgresql_replica_check_result(result, True)
cursor.execute('DROP TABLE postgresql_replica;')
instance.query('DROP TABLE IF EXISTS test.postgresql_replica_renamed')
result = instance.query('SELECT count() FROM test.postgresql_replica_renamed;')
while int(result) != 100:
def test_virtual_columns(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key; ''')
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)")
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while int(result) != 10:
time.sleep(0.5)
result = instance.query('SELECT count() FROM test.postgresql_replica_renamed;')
result = instance.query('SELECT count() FROM test.postgresql_replica;')
# just check that it works, no check with `expected` becuase _version is taken as LSN, which will be different each time.
result = instance.query('SELECT key, value, _sign, _version FROM test.postgresql_replica;')
print(result)
cursor.execute('DROP TABLE postgresql_replica;')
if __name__ == '__main__':