Much better

kssenii 2021-11-05 17:25:02 +03:00
parent 26f6a697b5
commit be88441cd6
5 changed files with 134 additions and 155 deletions

src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp

@@ -83,38 +83,32 @@ void PostgreSQLReplicationHandler::startup()
}
String PostgreSQLReplicationHandler::probablyDoubleQuoteWithSchema(const String & table_name, bool quote) const
std::pair<String, String> PostgreSQLReplicationHandler::getSchemaAndTableName(const String & table_name) const
{
if (table_name.starts_with("\""))
{
if (!quote)
return table_name.substr(1, table_name.size() - 1);
return table_name;
}
/// !schema_list.empty() -- we replicate all tables from the specified schemas.
/// In that case, when the tables list is fetched, we append the schema with a dot, but without quotes.
/// If there is a setting `tables_list`, then table names can be put there along with a schema,
/// separated by a dot and with no quotes. We add double quotes in this case.
if (!postgres_schema.empty())
return std::make_pair(postgres_schema, table_name);
if (auto pos = table_name.find('.'); schema_as_a_part_of_table_name && pos != std::string::npos)
{
auto schema = table_name.substr(0, pos);
auto table = table_name.substr(pos + 1);
return doubleQuoteString(schema) + '.' + doubleQuoteString(table);
}
return std::make_pair(table_name.substr(0, pos), table_name.substr(pos + 1));
if (postgres_schema.empty())
{
/// We do not need quotes to fetch the table structure when there is no schema (a quoted name will not work there).
if (quote)
return doubleQuoteString(table_name);
else
return table_name;
}
return std::make_pair("", table_name);
}
return doubleQuoteString(postgres_schema) + '.' + doubleQuoteString(table_name);
String PostgreSQLReplicationHandler::doubleQuoteWithSchema(const String & table_name) const
{
auto [schema, table] = getSchemaAndTableName(table_name);
if (schema.empty())
return doubleQuoteString(table);
return doubleQuoteString(schema) + '.' + doubleQuoteString(table);
}
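
The refactoring above splits the old probablyDoubleQuoteWithSchema(table_name, quote) into two single-purpose helpers: getSchemaAndTableName() only parses, doubleQuoteWithSchema() only quotes. A minimal self-contained sketch of the same logic, assuming free functions with the schema setting passed in explicitly (the real helpers are methods on PostgreSQLReplicationHandler and also consult schema_as_a_part_of_table_name):

#include <string>
#include <utility>

/// Stand-in for DB::doubleQuoteString(): wrap an identifier in double quotes.
static std::string doubleQuoteString(const std::string & s) { return "\"" + s + "\""; }

/// Split an optionally schema-qualified name ("schema.table") into its parts.
/// `postgres_schema` models the materialized_postgresql_schema setting.
static std::pair<std::string, std::string>
getSchemaAndTableName(const std::string & table_name, const std::string & postgres_schema)
{
    if (!postgres_schema.empty())
        return {postgres_schema, table_name};
    if (auto pos = table_name.find('.'); pos != std::string::npos)
        return {table_name.substr(0, pos), table_name.substr(pos + 1)};
    return {"", table_name};
}

/// Produce "schema"."table" (or just "table") for interpolation into SQL text.
static std::string doubleQuoteWithSchema(const std::string & table_name, const std::string & postgres_schema)
{
    auto [schema, table] = getSchemaAndTableName(table_name, postgres_schema);
    if (schema.empty())
        return doubleQuoteString(table);
    return doubleQuoteString(schema) + '.' + doubleQuoteString(table);
}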
@@ -128,6 +122,8 @@ void PostgreSQLReplicationHandler::checkConnectionAndStart()
}
catch (const pqxx::broken_connection & pqxx_error)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
if (!is_attach)
throw;
@@ -136,10 +132,10 @@ void PostgreSQLReplicationHandler::checkConnectionAndStart()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
if (!is_attach)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
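
Both catch blocks now follow the same shape: log first, then rethrow only if the table is being created rather than attached. A condensed sketch of the pattern, with a hypothetical connect call standing in for the real connection attempt:

try
{
    connection->connect();  /// may throw pqxx::broken_connection or anything else
}
catch (...)
{
    tryLogCurrentException(__PRETTY_FUNCTION__);  /// always record the failure first
    if (!is_attach)
        throw;  /// CREATE: surface the error to the user; ATTACH: swallow and retry later
}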
@@ -236,10 +232,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
auto * materialized_storage = storage->as<StorageMaterializedPostgreSQL>();
try
{
/// FIXME: It looks like we might get here if there is no nested storage, or at least the nested storage id field might be empty.
/// Caught once while doing something else incorrectly, but there is no obvious way it could happen.
/// Try load nested table, set materialized table metadata.
nested_storages[table_name] = materialized_storage->prepare();
nested_storages[table_name] = materialized_storage->getNested();
}
catch (Exception & e)
{
@@ -299,7 +292,7 @@ StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection &
/// Load from snapshot, which will show table state before creation of replication slot.
/// Already connected to needed database, no need to add it to query.
auto quoted_name = probablyDoubleQuoteWithSchema(table_name);
auto quoted_name = doubleQuoteWithSchema(table_name);
query_str = fmt::format("SELECT * FROM {}", quoted_name);
LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name);
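
The quoting matters for the snapshot query because PostgreSQL folds unquoted identifiers to lower case, so only a quoted name preserves mixed-case schemas and tables. An illustration with hypothetical names:

/// doubleQuoteWithSchema("MySchema.MyTable") yields "MySchema"."MyTable",
/// so the generated statement keeps the identifier case intact:
///   SELECT * FROM "MySchema"."MyTable"
query_str = fmt::format("SELECT * FROM {}", doubleQuoteWithSchema("MySchema.MyTable"));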
@@ -324,7 +317,7 @@ StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection &
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
nested_storage = materialized_storage->prepare();
materialized_storage->set(nested_storage);
auto nested_table_id = nested_storage->getStorageID();
LOG_DEBUG(log, "Loaded table {}.{} (uuid: {})", nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid));
@@ -408,7 +401,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransactio
WriteBufferFromOwnString buf;
for (const auto & storage_data : materialized_storages)
{
buf << probablyDoubleQuoteWithSchema(storage_data.first);
buf << doubleQuoteWithSchema(storage_data.first);
buf << ",";
}
tables_list = buf.str();
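
The publication body is just the comma-joined list of schema-qualified quoted names. A hypothetical mirror of the loop in plain std::string (reusing the two-argument doubleQuoteWithSchema sketch above, and joining between entries instead of appending a trailing separator):

#include <string>
#include <vector>

std::string makeTablesList(const std::vector<std::string> & names)
{
    std::string list;
    for (const auto & name : names)
    {
        if (!list.empty())
            list += ',';
        list += doubleQuoteWithSchema(name, /*postgres_schema=*/"");
    }
    return list;  /// e.g. "abc","my_schema"."tbl" -- used in CREATE PUBLICATION ... FOR TABLE <list>
}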
@@ -690,10 +683,6 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
result_tables = fetchPostgreSQLTablesList(tx, schema_list.empty() ? postgres_schema : schema_list);
}
}
else
{
result_tables = std::set(expected_tables.begin(), expected_tables.end());
}
}
@@ -711,7 +700,7 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
for (auto & table_name : tables_names)
{
boost::trim(table_name);
buf << probablyDoubleQuoteWithSchema(table_name);
buf << doubleQuoteWithSchema(table_name);
buf << ",";
}
tables_list = buf.str();
@@ -747,7 +736,8 @@ PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
PostgreSQLTableStructure structure;
try
{
structure = fetchPostgreSQLTableStructure(tx, table_name, probablyDoubleQuoteWithSchema(table_name, false), true, true, true);
auto [schema, table] = getSchemaAndTableName(table_name);
structure = fetchPostgreSQLTableStructure(tx, table, schema, true, true, true);
}
catch (...)
{
@@ -781,13 +771,9 @@ void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPost
if (!nested)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Internal table was not created");
{
postgres::Connection tmp_connection(connection_info);
nested_storage = loadFromSnapshot(tmp_connection, snapshot_name, postgres_table_name, materialized_storage);
}
auto nested_table_id = nested_storage->getStorageID();
materialized_storage->setNestedStorageID(nested_table_id);
nested_storage = materialized_storage->prepare();
postgres::Connection tmp_connection(connection_info);
nested_storage = loadFromSnapshot(tmp_connection, snapshot_name, postgres_table_name, materialized_storage);
materialized_storage->set(nested_storage);
}
{
@@ -863,6 +849,7 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
{
auto storage = DatabaseCatalog::instance().getTable(StorageID(current_database_name, table_name), context);
auto * materialized_storage = storage->as<StorageMaterializedPostgreSQL>();
auto materialized_table_lock = materialized_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
/// If for some reason this temporary table already exists - also drop it.
auto temp_materialized_storage = materialized_storage->createTemporary();
@@ -890,34 +877,30 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
try
{
auto materialized_table_lock = materialized_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
InterpreterRenameQuery(ast_rename, nested_context).execute();
{
auto nested_storage = DatabaseCatalog::instance().getTable(StorageID(table_id.database_name, table_id.table_name),
nested_context);
auto nested_table_lock = nested_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
auto nested_table_id = nested_storage->getStorageID();
auto nested_storage = DatabaseCatalog::instance().getTable(StorageID(table_id.database_name, table_id.table_name, temp_table_id.uuid), nested_context);
materialized_storage->set(nested_storage);
materialized_storage->setNestedStorageID(nested_table_id);
nested_storage = materialized_storage->prepare();
auto nested_sample_block = nested_storage->getInMemoryMetadataPtr()->getSampleBlock();
auto materialized_sample_block = materialized_storage->getInMemoryMetadataPtr()->getSampleBlock();
assertBlocksHaveEqualStructure(nested_sample_block, materialized_sample_block, "while reloading table in the background");
auto nested_storage_metadata = nested_storage->getInMemoryMetadataPtr();
auto nested_sample_block = nested_storage_metadata->getSampleBlock();
LOG_DEBUG(log, "Updated table {}. New structure: {}",
nested_table_id.getNameForLogs(), nested_sample_block.dumpStructure());
LOG_INFO(log, "Updated table {}. New structure: {}",
nested_storage->getStorageID().getNameForLogs(), nested_sample_block.dumpStructure());
auto materialized_storage_metadata = nested_storage->getInMemoryMetadataPtr();
auto materialized_sample_block = materialized_storage_metadata->getSampleBlock();
/// Pass pointer to new nested table into replication consumer, remove current table from skip list and set start lsn position.
consumer->updateNested(table_name, nested_storage, relation_id, start_lsn);
assertBlocksHaveEqualStructure(nested_sample_block, materialized_sample_block, "while reloading table in the background");
auto table_to_drop = DatabaseCatalog::instance().getTable(StorageID(temp_table_id.database_name, temp_table_id.table_name, table_id.uuid), nested_context);
auto drop_table_id = table_to_drop->getStorageID();
/// Pass pointer to new nested table into replication consumer, remove current table from skip list and set start lsn position.
consumer->updateNested(table_name, nested_storage, relation_id, start_lsn);
}
if (drop_table_id == nested_storage->getStorageID())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot drop table because is has the same uuid as new table: {}", drop_table_id.getNameForLogs());
LOG_DEBUG(log, "Dropping table {}", temp_table_id.getNameForLogs());
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, nested_context, nested_context, temp_table_id, true);
LOG_DEBUG(log, "Dropping table {}", drop_table_id.getNameForLogs());
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, nested_context, nested_context, drop_table_id, true);
}
catch (...)
{
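
The reload path above swaps the freshly loaded table in by rename, re-points the materialized wrapper at it, verifies that the structures match, and only then drops the old table, which now lives under the temporary name. A condensed sketch of that order of operations, not the verbatim implementation, using names from this diff:

/// 1. RENAME exchanges the nested table and the temporary one.
InterpreterRenameQuery(ast_rename, nested_context).execute();

/// 2. Re-resolve the nested table (it now carries the temp table's uuid)
///    and publish it through the wrapper.
auto nested_storage = DatabaseCatalog::instance().getTable(
    StorageID(table_id.database_name, table_id.table_name, temp_table_id.uuid), nested_context);
materialized_storage->set(nested_storage);

/// 3. Sanity-check the structures before dropping anything.
assertBlocksHaveEqualStructure(
    nested_storage->getInMemoryMetadataPtr()->getSampleBlock(),
    materialized_storage->getInMemoryMetadataPtr()->getSampleBlock(),
    "while reloading table in the background");

/// 4. Point the consumer at the new table, then drop the old one,
///    guarding against dropping the table that was just installed.
consumer->updateNested(table_name, nested_storage, relation_id, start_lsn);
auto drop_table_id = DatabaseCatalog::instance().getTable(
    StorageID(temp_table_id.database_name, temp_table_id.table_name, table_id.uuid), nested_context)->getStorageID();
if (drop_table_id == nested_storage->getStorageID())
    throw Exception(ErrorCodes::LOGICAL_ERROR,
        "Cannot drop table because it has the same uuid as the new table: {}", drop_table_id.getNameForLogs());
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, nested_context, nested_context, drop_table_id, true);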

src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h

@@ -89,7 +89,9 @@ private:
PostgreSQLTableStructurePtr fetchTableStructure(pqxx::ReplicationTransaction & tx, const String & table_name) const;
String probablyDoubleQuoteWithSchema(const String & table_name, bool quote = true) const;
String doubleQuoteWithSchema(const String & table_name) const;
std::pair<String, String> getSchemaAndTableName(const String & table_name) const;
Poco::Logger * log;
ContextPtr context;

src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp

@@ -113,7 +113,7 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
ContextPtr context_,
const String & postgres_database_name,
const String & postgres_table_name)
: IStorage(nested_storage_->getStorageID())
: IStorage(StorageID(nested_storage_->getStorageID().database_name, nested_storage_->getStorageID().table_name))
, WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("StorageMaterializedPostgreSQL(" + postgres::formatNameForLogs(postgres_database_name, postgres_table_name) + ")"))
, is_materialized_postgresql_database(true)
@@ -214,12 +214,11 @@ std::shared_ptr<Context> StorageMaterializedPostgreSQL::makeNestedTableContext(C
}
StoragePtr StorageMaterializedPostgreSQL::prepare()
void StorageMaterializedPostgreSQL::set(StoragePtr nested_storage)
{
auto nested_table = getNested();
setInMemoryMetadata(nested_table->getInMemoryMetadata());
nested_table_id = nested_storage->getStorageID();
setInMemoryMetadata(nested_storage->getInMemoryMetadata());
has_nested.store(true);
return nested_table;
}
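
prepare() used to look the nested table up, adopt its metadata, and return it; set() instead receives the nested table from the caller, records its StorageID, adopts the metadata, and flips has_nested. The call sites change accordingly; a condensed sketch with names from this diff:

/// Before:  nested_storage = materialized_storage->prepare();
/// After: the caller resolves (or creates) the nested table itself and hands it over:
nested_storage = materialized_storage->getNested();  /// or the table returned by loadFromSnapshot()
materialized_storage->set(nested_storage);           /// stores the id and metadata, sets has_nested = true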

src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.h

@@ -117,14 +117,10 @@ public:
StorageID getNestedStorageID() const;
void setNestedStorageID(const StorageID & id) { nested_table_id.emplace(id); }
void set(StoragePtr nested_storage);
static std::shared_ptr<Context> makeNestedTableContext(ContextPtr from_context);
/// Get nested table (or throw if it does not exist), set in-memory metadata (taken from nested table)
/// for current table, set has_nested = true.
StoragePtr prepare();
bool supportsFinal() const override { return true; }
ASTPtr getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure);

tests/integration/test_postgresql_replica_database_engine/test.py

@@ -153,7 +153,7 @@ def assert_nested_table_is_created(table_name, materialized_database='test_datab
table = table_name
else:
table = schema_name + "." + table_name
print('Checking table exists:', table)
print(f'Checking table {table} exists in {materialized_database}')
database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
while table not in database_tables:
time.sleep(0.2)
@@ -166,11 +166,12 @@ def assert_number_of_columns(expected, table_name, database_name='test_database'
while (int(result) != expected):
time.sleep(1)
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
print('Number of columns ok')
@pytest.mark.timeout(320)
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database'):
assert_nested_table_is_created(table_name, materialized_database)
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
assert_nested_table_is_created(table_name, materialized_database, schema_name)
print("Checking table is synchronized:", table_name)
expected = instance.query('select * from {}.{} order by {};'.format(postgres_database, table_name, order_by))
@@ -1182,6 +1183,66 @@ def test_predefined_connection_configuration(started_cluster):
insert_counter = 0
def test_database_with_single_non_default_schema(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
NUM_TABLES=5
schema_name = 'test_schema'
clickhouse_postgres_db = 'postgres_database_with_schema'
global insert_counter
insert_counter = 0
def insert_into_tables():
global insert_counter
clickhouse_postgres_db = 'postgres_database_with_schema'
for i in range(NUM_TABLES):
table_name = f'postgresql_replica_{i}'
instance.query(f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)")
insert_counter += 1
def assert_show_tables(expected):
result = instance.query('SHOW TABLES FROM test_database')
assert(result == expected)
print('assert show tables Ok')
def check_all_tables_are_synchronized():
for i in range(NUM_TABLES):
print('checking table', i)
check_tables_are_synchronized("postgresql_replica_{}".format(i), postgres_database=clickhouse_postgres_db);
print('synchronization Ok')
create_postgres_schema(cursor, schema_name)
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
create_postgres_table_with_schema(cursor, schema_name, table_name);
insert_into_tables()
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_schema = '{schema_name}'", "materialized_postgresql_allow_automatic_update = 1"])
insert_into_tables()
check_all_tables_are_synchronized()
assert_show_tables("postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
instance.restart_clickhouse()
check_all_tables_are_synchronized()
assert_show_tables("postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
insert_into_tables()
check_all_tables_are_synchronized()
print('ALTER')
altered_table = random.randint(0, NUM_TABLES-1)
cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
assert_number_of_columns(3, f'postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", postgres_database=clickhouse_postgres_db);
drop_materialized_db()
def test_database_with_multiple_non_default_schemas_1(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
@@ -1190,6 +1251,7 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
schema_name = 'test_schema'
clickhouse_postgres_db = 'postgres_database_with_schema'
publication_tables = ''
global insert_counter
insert_counter = 0
def insert_into_tables():
@@ -1235,15 +1297,13 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
insert_into_tables()
check_all_tables_are_synchronized()
#altered_table = random.randint(0, NUM_TABLES-1)
#cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
#table_name = 'postgresql_replica_{}'.format(altered_table)
#instance.query("INSERT INTO {}.{} SELECT number, number, number from numbers(5000, 1000)".format(clickhouse_postgres_db, table_name))
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
#print('Ok')
print('ALTER')
altered_table = random.randint(0, NUM_TABLES-1)
cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
assert_number_of_columns(3, f'{schema_name}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db);
drop_materialized_db()
@@ -1254,6 +1314,7 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
NUM_TABLES = 2
schemas_num = 2
schema_list = 'schema0, schema1'
global insert_counter
insert_counter = 0
def check_all_tables_are_synchronized():
@@ -1303,76 +1364,14 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
insert_into_tables()
check_all_tables_are_synchronized()
#altered_table = random.randint(0, NUM_TABLES-1)
#cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
#table_name = 'postgresql_replica_{}'.format(altered_table)
#instance.query("INSERT INTO {}.{} SELECT number, number, number from numbers(5000, 1000)".format(clickhouse_postgres_db, table_name))
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
#print('Ok')
drop_materialized_db()
def test_database_with_single_non_default_schema(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
NUM_TABLES=5
schema_name = 'test_schema'
clickhouse_postgres_db = 'postgres_database_with_schema'
insert_counter = 0
def insert_into_tables():
global insert_counter
clickhouse_postgres_db = 'postgres_database_with_schema'
for i in range(NUM_TABLES):
table_name = f'postgresql_replica_{i}'
instance.query(f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)")
insert_counter += 1
def assert_show_tables(expected):
result = instance.query('SHOW TABLES FROM test_database')
assert(result == expected)
print('assert show tables Ok')
def check_all_tables_are_synchronized():
for i in range(NUM_TABLES):
print('checking table', i)
check_tables_are_synchronized("postgresql_replica_{}".format(i), postgres_database=clickhouse_postgres_db);
print('synchronization Ok')
create_postgres_schema(cursor, schema_name)
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
create_postgres_table_with_schema(cursor, schema_name, table_name);
insert_into_tables()
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_schema = '{schema_name}'", "materialized_postgresql_allow_automatic_update = 1"])
insert_into_tables()
check_all_tables_are_synchronized()
assert_show_tables("postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
instance.restart_clickhouse()
check_all_tables_are_synchronized()
assert_show_tables("postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
insert_into_tables()
check_all_tables_are_synchronized()
#altered_table = random.randint(0, NUM_TABLES-1)
#cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), postgres_database=clickhouse_postgres_db);
#table_name = 'postgresql_replica_{}'.format(altered_table)
#instance.query("INSERT INTO {}.{} SELECT number, number, number from numbers(5000, 1000)".format(clickhouse_postgres_db, table_name))
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), postgres_database=clickhouse_postgres_db);
#print('Ok')
print('ALTER')
altered_schema = random.randint(0, schemas_num-1)
altered_table = random.randint(0, NUM_TABLES-1)
cursor.execute(f"ALTER TABLE schema{altered_schema}.postgresql_replica_{altered_table} ADD COLUMN value2 integer")
instance.query(f"INSERT INTO clickhouse_postgres_db{altered_schema}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(1000 * {insert_counter}, 1000)")
assert_number_of_columns(3, f'schema{altered_schema}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db);
drop_materialized_db()