Fix access to tables after restart

This commit is contained in:
kssenii 2021-07-01 07:33:58 +00:00
parent 1a5c30e249
commit f01c8edbff
7 changed files with 103 additions and 79 deletions

View File

@@ -153,6 +153,12 @@ void DatabaseMaterializedPostgreSQL::createTable(ContextPtr local_context, const
}
void DatabaseMaterializedPostgreSQL::shutdown()
{
stopReplication();
}
void DatabaseMaterializedPostgreSQL::stopReplication()
{
if (replication_handler)
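The hunk is cut off at the guard above. Judging by the analogous StorageMaterializedPostgreSQL::shutdown() change later in this commit, the body presumably just forwards to the replication handler; a minimal sketch, assuming the handler exposes the same shutdown() used in that later hunk:

void DatabaseMaterializedPostgreSQL::stopReplication()
{
    /// Sketch only: stop background replication if it was ever started.
    if (replication_handler)
        replication_handler->shutdown();
}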

View File

@@ -57,6 +57,8 @@ public:
void stopReplication();
void shutdown() override;
private:
void startSynchronization();

View File

@@ -28,6 +28,10 @@
# include <Storages/StorageMaterializeMySQL.h>
#endif
#if USE_LIBPQXX
# include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
#endif
namespace fs = std::filesystem;
namespace CurrentMetrics
@@ -234,6 +238,13 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
return {};
}
#if USE_LIBPQXX
if (!context_->isInternalQuery() && (db_and_table.first->getEngineName() == "MaterializedPostgreSQL"))
{
db_and_table.second = std::make_shared<StorageMaterializedPostgreSQL>(std::move(db_and_table.second), getContext());
}
#endif
#if USE_MYSQL
/// It's definitely not the best place for this logic, but behaviour must be consistent with DatabaseMaterializeMySQL::tryGetTable(...)
if (db_and_table.first->getEngineName() == "MaterializeMySQL")
@@ -245,6 +256,7 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
return db_and_table;
}
if (table_id.database_name == TEMPORARY_DATABASE)
{
/// For temporary tables UUIDs are set in Context::resolveStorageID(...).
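Note how the MaterializedPostgreSQL branch added above interacts with the consumer change later in this commit: syncTables() marks its insert context as an internal query, so the consumer's own inserts bypass this hook and write straight to the nested table, while ordinary user queries get the StorageMaterializedPostgreSQL wrapper. A minimal sketch of that contract, using only what the two hunks show:

/// Replication-internal writes: getTableImpl() does not wrap the table,
/// because isInternalQuery() is true for this context.
auto insert_context = Context::createCopy(context);
insert_context->setInternalQuery(true);

/// Ordinary reads: isInternalQuery() is false, so getTableImpl() returns
/// the nested table wrapped into StorageMaterializedPostgreSQL.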

View File

@@ -38,6 +38,12 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
, allow_automatic_update(allow_automatic_update_)
, storages(storages_)
{
final_lsn = start_lsn;
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
current_lsn = advanceLSN(tx);
LOG_TRACE(log, "Starting replication. LSN: {} (last: {})", getLSNValue(current_lsn), getLSNValue(final_lsn));
tx->commit();
for (const auto & [table_name, storage] : storages)
{
buffers.emplace(table_name, Buffer(storage));
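getLSNValue(), used in the new log line above, is not part of this diff. PostgreSQL prints an LSN as two hexadecimal halves separated by a slash (for example 0/16B6C50), so a numeric form suitable for logging and comparison is presumably derived along these lines; this is a sketch, not the project's implementation:

#include <cstdint>
#include <cstdio>
#include <string>

/// Sketch: turn a textual pg_lsn like "0/16B6C50" into a comparable integer.
uint64_t getLSNValue(const std::string & lsn)
{
    uint32_t upper = 0, lower = 0;
    std::sscanf(lsn.data(), "%X/%X", &upper, &lower);
    return (static_cast<uint64_t>(upper) << 32) + lower;
}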
@@ -298,7 +304,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
/// In this case, first comes a tuple with old replica identity indexes and all other values will come as
/// nulls. Then comes a full new row.
case 'K': [[fallthrough]];
/// Old row. Only if replica identity is set to full. Does notreally make sense to use it as
/// Old row. Only if replica identity is set to full. Does not really make sense to use it as
/// it is much more efficient to use replica identity index, but support all possible cases.
case 'O':
{
@@ -371,7 +377,9 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
if (storages.find(relation_name) == storages.end())
{
markTableAsSkipped(relation_id, relation_name);
LOG_ERROR(log, "Storage for table {} does not exist, but is included in replication stream", relation_name);
LOG_ERROR(log,
"Storage for table {} does not exist, but is included in replication stream. (Storages number: {})",
relation_name, storages.size());
return;
}
@@ -468,13 +476,13 @@ void MaterializedPostgreSQLConsumer::syncTables()
{
auto storage = storages[table_name];
auto insert_context = Context::createCopy(context);
insert_context->setInternalQuery(true);
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID();
insert->columns = buffer.columnsAST;
auto insert_context = Context::createCopy(context);
insert_context->setInternalQuery(true);
InterpreterInsertQuery interpreter(insert, insert_context, true);
auto block_io = interpreter.execute();
OneBlockInputStream input(result_rows);
@@ -505,10 +513,8 @@ String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontrans
std::string query_str = fmt::format("SELECT end_lsn FROM pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn);
pqxx::result result{tx->exec(query_str)};
if (!result.empty())
return result[0][0].as<std::string>();
LOG_TRACE(log, "Advanced LSN up to: {}", final_lsn);
final_lsn = result[0][0].as<std::string>();
LOG_TRACE(log, "Advanced LSN up to: {}", getLSNValue(final_lsn));
return final_lsn;
}
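The fix in this hunk: the old body returned the fresh position straight out of the query result, so final_lsn was never updated across calls; the new body stores the advanced position first and logs the value actually reached. pg_replication_slot_advance() is a stock PostgreSQL function that moves a replication slot's confirmed position forward and reports where it ended up. A standalone sketch of the call over libpqxx, with a placeholder slot name rather than anything from this commit:

#include <pqxx/pqxx>
#include <string>

/// Sketch: advance a logical replication slot up to `lsn` and return the
/// position PostgreSQL actually reached ('test_slot' is a placeholder).
std::string advanceSlot(pqxx::connection & conn, const std::string & lsn)
{
    pqxx::nontransaction tx{conn};
    pqxx::result result = tx.exec(
        "SELECT end_lsn FROM pg_replication_slot_advance('test_slot', '" + lsn + "')");
    tx.commit();
    return result.empty() ? lsn : result[0][0].as<std::string>();
}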
@@ -552,18 +558,21 @@ void MaterializedPostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const
/// Empty lsn string means - continue waiting for valid lsn.
skip_list.insert({relation_id, ""});
/// Erase cached schema identifiers. It will be updated again once table is allowed back into replication stream
/// and it receives first data after update.
schema_data.erase(relation_id);
if (storages.count(relation_name))
{
/// Erase cached schema identifiers. It will be updated again once table is allowed back into replication stream
/// and it receives first data after update.
schema_data.erase(relation_id);
/// Clear table buffer.
auto & buffer = buffers.find(relation_name)->second;
buffer.columns = buffer.description.sample_block.cloneEmptyColumns();
/// Clear table buffer.
auto & buffer = buffers.find(relation_name)->second;
buffer.columns = buffer.description.sample_block.cloneEmptyColumns();
if (allow_automatic_update)
LOG_TRACE(log, "Table {} (relation_id: {}) is skipped temporarily. It will be reloaded in the background", relation_name, relation_id);
else
LOG_WARNING(log, "Table {} (relation_id: {}) is skipped, because table schema has changed", relation_name, relation_id);
if (allow_automatic_update)
LOG_TRACE(log, "Table {} (relation_id: {}) is skipped temporarily. It will be reloaded in the background", relation_name, relation_id);
else
LOG_WARNING(log, "Table {} (relation_id: {}) is skipped, because table schema has changed", relation_name, relation_id);
}
}
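The restructuring guards the cleanup behind storages.count(relation_name), so a relation without a storage can be skipped without touching a missing buffer. The reading side of the skip list is outside this hunk; going only by the comment above ("empty lsn string means - continue waiting for valid lsn"), it presumably looks something like this sketch, which is an assumption rather than code from this commit:

/// Sketch (assumed): while a relation sits on the skip list with an empty
/// LSN, its replication messages are dropped until the table is reloaded
/// and a valid LSN is recorded.
auto skipped = skip_list.find(relation_id);
if (skipped != skip_list.end() && skipped->second.empty())
    return;  /// still waiting for a valid LSN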

View File

@@ -156,7 +156,6 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// and pass them to replication consumer.
else
{
LOG_TRACE(log, "Loading {} tables...", materialized_storages.size());
for (const auto & [table_name, storage] : materialized_storages)
{
auto * materialized_storage = storage->as<StorageMaterializedPostgreSQL>();
@@ -174,12 +173,14 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
throw;
}
}
LOG_TRACE(log, "Loaded {} tables", nested_storages.size());
}
tx.commit();
/// Pass current connection to consumer. It is not std::moved implicitly, but a shared_ptr is passed.
/// Consumer and replication handler are always executed one after another (not concurrently) and share the same connection.
/// (Apart from the case, when shutdownFinal is called).
/// Handler uses it only for loadFromSnapshot and shutdown methods.
consumer = std::make_shared<MaterializedPostgreSQLConsumer>(
context,

View File

@@ -225,6 +225,9 @@ void StorageMaterializedPostgreSQL::shutdown()
{
if (replication_handler)
replication_handler->shutdown();
auto nested = getNested();
if (nested)
nested->shutdown();
}

View File

@@ -101,7 +101,8 @@ queries = [
'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
"UPDATE postgresql_replica_{} SET key=key+10000000",
'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;']
'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;'
]
@pytest.mark.timeout(30)
@@ -836,66 +837,56 @@ def test_restart_server_while_replication_startup_not_finished(started_cluster):
drop_materialized_db()
# Something not ok with this test, need to investigate.
@pytest.mark.timeout(320)
def test_abrupt_server_restart_while_heavy_replication(started_cluster):
return
# instance.query("DROP DATABASE IF EXISTS test_database")
# conn = get_postgres_conn(ip=started_cluster.postgres_ip,
# port=started_cluster.postgres_port,
# database=True)
# cursor = conn.cursor()
# NUM_TABLES = 6
#
# for i in range(NUM_TABLES):
# create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
#
# def transaction(thread_id):
# if thread_id % 2:
# conn = get_postgres_conn(ip=started_cluster.postgres_ip,
# port=started_cluster.postgres_port,
# database=True, auto_commit=True)
# else:
# conn = get_postgres_conn(ip=started_cluster.postgres_ip,
# port=started_cluster.postgres_port,
# database=True, auto_commit=False)
# cursor_ = conn.cursor()
# for query in queries:
# cursor_.execute(query.format(thread_id))
# print('thread {}, query {}'.format(thread_id, query))
# if thread_id % 2 == 0:
# conn.commit()
#
# threads = []
# threads_num = 6
# for i in range(threads_num):
# threads.append(threading.Thread(target=transaction, args=(i,)))
#
# create_materialized_db(ip=started_cluster.postgres_ip,
# port=started_cluster.postgres_port)
#
# for thread in threads:
# time.sleep(random.uniform(0, 0.5))
# thread.start()
#
# # Join here because it takes time for data to reach wal
# for thread in threads:
# thread.join()
# time.sleep(1)
# instance.restart_clickhouse()
#
# for i in range(NUM_TABLES):
# result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
# print(result) # Just debug
#
# for i in range(NUM_TABLES):
# check_tables_are_synchronized('postgresql_replica_{}'.format(i));
#
# for i in range(NUM_TABLES):
# result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
# print(result) # Just debug
#
# drop_materialized_db()
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 2
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
def transaction(thread_id):
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True, auto_commit=True)
cursor_ = conn.cursor()
for query in queries:
cursor_.execute(query.format(thread_id))
print('thread {}, query {}'.format(thread_id, query))
threads = []
threads_num = 2
for i in range(threads_num):
threads.append(threading.Thread(target=transaction, args=(i,)))
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for thread in threads:
time.sleep(random.uniform(0, 0.5))
thread.start()
# Join here because it takes time for data to reach wal
for thread in threads:
thread.join()
instance.restart_clickhouse()
for i in range(NUM_TABLES):
result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
print(result) # Just debug
for i in range(NUM_TABLES):
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
for i in range(NUM_TABLES):
result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
print(result) # Just debug
drop_materialized_db()
if __name__ == '__main__':