Fix and test server restart

This commit is contained in:
kssenii 2021-04-08 20:39:56 +00:00
parent 4482a35a3a
commit 093e53e65a
5 changed files with 73 additions and 2 deletions

View File

@ -119,11 +119,13 @@ void PostgreSQLReplicationHandler::startSynchronization()
}
else
{
LOG_TRACE(log, "Restoring tables...");
for (const auto & [table_name, storage] : storages)
{
try
{
nested_storages[table_name] = storage->getNested();
storage->setStorageMetadata();
storage->setNestedLoaded();
}
catch (...)

View File

@ -152,6 +152,13 @@ ASTPtr StorageMaterializePostgreSQL::getColumnDeclaration(const DataTypePtr & da
}
void StorageMaterializePostgreSQL::setStorageMetadata()
{
auto storage_metadata = getNested()->getInMemoryMetadataPtr();
setInMemoryMetadata(*storage_metadata);
}
/// For single storage MaterializePostgreSQL get columns and primary key columns from storage definition.
/// For database engine MaterializePostgreSQL get columns and primary key columns by fetching from PostgreSQL, also using the same
/// transaction with snapshot, which is used for initial tables dump.

View File

@ -69,6 +69,8 @@ public:
void dropNested();
void setStorageMetadata();
protected:
StorageMaterializePostgreSQL(
const StorageID & table_id_,

View File

@ -10,7 +10,10 @@ from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', main_configs=['configs/log_conf.xml'], user_configs = ['configs/users.xml'], with_postgres=True)
instance = cluster.add_instance('instance',
main_configs=['configs/log_conf.xml'],
user_configs = ['configs/users.xml'],
with_postgres=True, stay_alive=True)
postgres_table_template = """
CREATE TABLE IF NOT EXISTS {} (
@ -55,6 +58,7 @@ def assert_nested_table_is_created(table_name):
@pytest.mark.timeout(30)
def check_tables_are_synchronized(table_name, order_by='key'):
assert_nested_table_is_created(table_name)
print("nested ok")
expected = instance.query('select * from postgres_database.{} order by {};'.format(table_name, order_by))
result = instance.query('select * from test_database.{} order by {};'.format(table_name, order_by))
@ -364,6 +368,32 @@ def test_changing_replica_identity_value(started_cluster):
check_tables_are_synchronized('postgresql_replica');
@pytest.mark.timeout(320)
def test_clickhouse_restart(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
cursor = conn.cursor()
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format(i, i))
instance.query("CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
for i in range(NUM_TABLES):
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 50 + number, {} from numbers(50000)".format(i, i))
instance.restart_clickhouse()
for i in range(NUM_TABLES):
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -9,7 +9,7 @@ from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', main_configs=['configs/log_conf.xml'], with_postgres=True)
instance = cluster.add_instance('instance', main_configs=['configs/log_conf.xml'], with_postgres=True, stay_alive=True)
postgres_table_template = """
CREATE TABLE IF NOT EXISTS {} (
@ -400,6 +400,36 @@ def test_connection_loss(started_cluster):
assert(int(result) == 100050)
@pytest.mark.timeout(320)
def test_clickhouse_restart(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key; ''')
i = 50
while i < 100000:
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT {} + number, number from numbers(10000)".format(i))
i += 10000
instance.restart_clickhouse()
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while int(result) < 100050:
time.sleep(1)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
cursor.execute('DROP TABLE postgresql_replica;')
print(result)
assert(int(result) == 100050)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")