From f01c8edbff57717a027184c559d899148755066d Mon Sep 17 00:00:00 2001
From: kssenii
Date: Thu, 1 Jul 2021 07:33:58 +0000
Subject: [PATCH] Fix access to tables after restart

---
 .../DatabaseMaterializedPostgreSQL.cpp        |   6 +
 .../DatabaseMaterializedPostgreSQL.h          |   2 +
 src/Interpreters/DatabaseCatalog.cpp          |  12 ++
 .../MaterializedPostgreSQLConsumer.cpp        |  47 +++++---
 .../PostgreSQLReplicationHandler.cpp          |   3 +-
 .../StorageMaterializedPostgreSQL.cpp         |   3 +
 .../test.py                                   | 109 ++++++++----------
 7 files changed, 103 insertions(+), 79 deletions(-)

diff --git a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp
index 37a464c6cda..4cfb5a4d137 100644
--- a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp
+++ b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp
@@ -153,6 +153,12 @@ void DatabaseMaterializedPostgreSQL::createTable(ContextPtr local_context, const
 }
 
 
+void DatabaseMaterializedPostgreSQL::shutdown()
+{
+    stopReplication();
+}
+
+
 void DatabaseMaterializedPostgreSQL::stopReplication()
 {
     if (replication_handler)
diff --git a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h
index 0a60f47cbe4..f998a0c54de 100644
--- a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h
+++ b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h
@@ -57,6 +57,8 @@ public:
 
     void stopReplication();
 
+    void shutdown() override;
+
 private:
     void startSynchronization();
 
diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp
index 4ed4f258b29..0d0c82f1abc 100644
--- a/src/Interpreters/DatabaseCatalog.cpp
+++ b/src/Interpreters/DatabaseCatalog.cpp
@@ -28,6 +28,10 @@
 # include <Storages/StorageMaterializeMySQL.h>
 #endif
 
+#if USE_LIBPQXX
+# include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
+#endif
+
 namespace fs = std::filesystem;
 
 namespace CurrentMetrics
@@ -234,6 +238,13 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
             return {};
         }
 
+#if USE_LIBPQXX
+        if (!context_->isInternalQuery() && (db_and_table.first->getEngineName() == "MaterializedPostgreSQL"))
+        {
+            db_and_table.second = std::make_shared<StorageMaterializedPostgreSQL>(std::move(db_and_table.second), getContext());
+        }
+#endif
+
 #if USE_MYSQL
         /// It's definitely not the best place for this logic, but behaviour must be consistent with DatabaseMaterializeMySQL::tryGetTable(...)
         if (db_and_table.first->getEngineName() == "MaterializeMySQL")
@@ -245,6 +256,7 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
         return db_and_table;
     }
 
+
     if (table_id.database_name == TEMPORARY_DATABASE)
     {
         /// For temporary tables UUIDs are set in Context::resolveStorageID(...).
diff --git a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp
index 2d98fc79c40..f80353f5586 100644
--- a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp
+++ b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp
@@ -38,6 +38,12 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
     , allow_automatic_update(allow_automatic_update_)
     , storages(storages_)
 {
+    final_lsn = start_lsn;
+    auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
+    current_lsn = advanceLSN(tx);
+    LOG_TRACE(log, "Starting replication. LSN: {} (last: {})", getLSNValue(current_lsn), getLSNValue(final_lsn));
+    tx->commit();
+
     for (const auto & [table_name, storage] : storages)
     {
         buffers.emplace(table_name, Buffer(storage));
@@ -298,7 +304,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
         /// In this case, first comes a tuple with old replica identity indexes and all other values will come as
         /// nulls. Then comes a full new row.
         case 'K': [[fallthrough]];
-        /// Old row. Only if replica identity is set to full. Does notreally make sense to use it as
+        /// Old row. Only if replica identity is set to full. Does not really make sense to use it as
         /// it is much more efficient to use replica identity index, but support all possible cases.
         case 'O':
         {
@@ -371,7 +377,9 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
             if (storages.find(relation_name) == storages.end())
             {
                 markTableAsSkipped(relation_id, relation_name);
-                LOG_ERROR(log, "Storage for table {} does not exist, but is included in replication stream", relation_name);
+                LOG_ERROR(log,
+                          "Storage for table {} does not exist, but is included in replication stream. (Storages number: {})",
+                          relation_name, storages.size());
                 return;
             }
 
@@ -468,13 +476,13 @@ void MaterializedPostgreSQLConsumer::syncTables()
         {
             auto storage = storages[table_name];
 
+            auto insert_context = Context::createCopy(context);
+            insert_context->setInternalQuery(true);
+
             auto insert = std::make_shared<ASTInsertQuery>();
             insert->table_id = storage->getStorageID();
             insert->columns = buffer.columnsAST;
 
-            auto insert_context = Context::createCopy(context);
-            insert_context->setInternalQuery(true);
-
             InterpreterInsertQuery interpreter(insert, insert_context, true);
             auto block_io = interpreter.execute();
             OneBlockInputStream input(result_rows);
@@ -505,10 +513,8 @@ String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontransaction> tx)
     pqxx::result result{tx->exec(query_str)};
 
-    if (!result.empty())
-        return result[0][0].as<std::string>();
-
-    LOG_TRACE(log, "Advanced LSN up to: {}", final_lsn);
+    final_lsn = result[0][0].as<std::string>();
+    LOG_TRACE(log, "Advanced LSN up to: {}", getLSNValue(final_lsn));
     return final_lsn;
 }
 
@@ -552,18 +558,21 @@ void MaterializedPostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const
     /// Empty lsn string means - continue waiting for valid lsn.
     skip_list.insert({relation_id, ""});
 
-    /// Erase cached schema identifiers. It will be updated again once table is allowed back into replication stream
-    /// and it receives first data after update.
-    schema_data.erase(relation_id);
+    if (storages.count(relation_name))
+    {
+        /// Erase cached schema identifiers. It will be updated again once table is allowed back into replication stream
+        /// and it receives first data after update.
+        schema_data.erase(relation_id);
 
-    /// Clear table buffer.
-    auto & buffer = buffers.find(relation_name)->second;
-    buffer.columns = buffer.description.sample_block.cloneEmptyColumns();
+        /// Clear table buffer.
+        auto & buffer = buffers.find(relation_name)->second;
+        buffer.columns = buffer.description.sample_block.cloneEmptyColumns();
 
-    if (allow_automatic_update)
-        LOG_TRACE(log, "Table {} (relation_id: {}) is skipped temporarily. It will be reloaded in the background", relation_name, relation_id);
-    else
-        LOG_WARNING(log, "Table {} (relation_id: {}) is skipped, because table schema has changed", relation_name, relation_id);
+        if (allow_automatic_update)
+            LOG_TRACE(log, "Table {} (relation_id: {}) is skipped temporarily. It will be reloaded in the background", relation_name, relation_id);
+        else
+            LOG_WARNING(log, "Table {} (relation_id: {}) is skipped, because table schema has changed", relation_name, relation_id);
+    }
 }
 
diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp
index 74e3a2fb965..1d8ab04cfec 100644
--- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp
+++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp
@@ -156,7 +156,6 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
     /// and pass them to replication consumer.
     else
     {
-        LOG_TRACE(log, "Loading {} tables...", materialized_storages.size());
         for (const auto & [table_name, storage] : materialized_storages)
         {
             auto * materialized_storage = storage->as <StorageMaterializedPostgreSQL>();
@@ -174,12 +173,14 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
                 throw;
             }
         }
+        LOG_TRACE(log, "Loaded {} tables", nested_storages.size());
     }
 
     tx.commit();
 
     /// Pass current connection to consumer. It is not std::moved implicitly, but a shared_ptr is passed.
     /// Consumer and replication handler are always executed one after another (not concurrently) and share the same connection.
+    /// (Apart from the case when shutdownFinal is called.)
     /// Handler uses it only for loadFromSnapshot and shutdown methods.
     consumer = std::make_shared<MaterializedPostgreSQLConsumer>(
             context,
diff --git a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp
index 52fe7be35a7..903bab2b12c 100644
--- a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp
+++ b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp
@@ -225,6 +225,9 @@ void StorageMaterializedPostgreSQL::shutdown()
 {
     if (replication_handler)
         replication_handler->shutdown();
+    auto nested = getNested();
+    if (nested)
+        nested->shutdown();
 }
 
 
diff --git a/tests/integration/test_postgresql_replica_database_engine/test.py b/tests/integration/test_postgresql_replica_database_engine/test.py
index 4be2ab01d6b..dca9be87311 100644
--- a/tests/integration/test_postgresql_replica_database_engine/test.py
+++ b/tests/integration/test_postgresql_replica_database_engine/test.py
@@ -101,7 +101,8 @@ queries = [
     'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
     "UPDATE postgresql_replica_{} SET key=key+10000000",
     'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
-    'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;']
+    'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;'
+    ]
 
 
 @pytest.mark.timeout(30)
@@ -836,66 +837,56 @@ def test_restart_server_while_replication_startup_not_finished(started_cluster):
     drop_materialized_db()
 
 
-# Something not ok with this test, need to investigate.
 @pytest.mark.timeout(320)
 def test_abrupt_server_restart_while_heavy_replication(started_cluster):
-    return
-#    instance.query("DROP DATABASE IF EXISTS test_database")
-#    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
-#                             port=started_cluster.postgres_port,
-#                             database=True)
-#    cursor = conn.cursor()
-#    NUM_TABLES = 6
-#
-#    for i in range(NUM_TABLES):
-#        create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
-#
-#    def transaction(thread_id):
-#        if thread_id % 2:
-#            conn = get_postgres_conn(ip=started_cluster.postgres_ip,
-#                                     port=started_cluster.postgres_port,
-#                                     database=True, auto_commit=True)
-#        else:
-#            conn = get_postgres_conn(ip=started_cluster.postgres_ip,
-#                                     port=started_cluster.postgres_port,
-#                                     database=True, auto_commit=False)
-#        cursor_ = conn.cursor()
-#        for query in queries:
-#            cursor_.execute(query.format(thread_id))
-#            print('thread {}, query {}'.format(thread_id, query))
-#        if thread_id % 2 == 0:
-#            conn.commit()
-#
-#    threads = []
-#    threads_num = 6
-#    for i in range(threads_num):
-#        threads.append(threading.Thread(target=transaction, args=(i,)))
-#
-#    create_materialized_db(ip=started_cluster.postgres_ip,
-#                           port=started_cluster.postgres_port)
-#
-#    for thread in threads:
-#        time.sleep(random.uniform(0, 0.5))
-#        thread.start()
-#
-#    # Join here because it takes time for data to reach wal
-#    for thread in threads:
-#        thread.join()
-#    time.sleep(1)
-#    instance.restart_clickhouse()
-#
-#    for i in range(NUM_TABLES):
-#        result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
-#        print(result) # Just debug
-#
-#    for i in range(NUM_TABLES):
-#        check_tables_are_synchronized('postgresql_replica_{}'.format(i));
-#
-#    for i in range(NUM_TABLES):
-#        result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
-#        print(result) # Just debug
-#
-#    drop_materialized_db()
+    instance.query("DROP DATABASE IF EXISTS test_database")
+    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
+                             port=started_cluster.postgres_port,
+                             database=True)
+    cursor = conn.cursor()
+    NUM_TABLES = 2
+
+    for i in range(NUM_TABLES):
+        create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
+
+    def transaction(thread_id):
+        conn = get_postgres_conn(ip=started_cluster.postgres_ip,
+                                 port=started_cluster.postgres_port,
+                                 database=True, auto_commit=True)
+        cursor_ = conn.cursor()
+        for query in queries:
+            cursor_.execute(query.format(thread_id))
+            print('thread {}, query {}'.format(thread_id, query))
+
+    threads = []
+    threads_num = 2
+    for i in range(threads_num):
+        threads.append(threading.Thread(target=transaction, args=(i,)))
+
+    create_materialized_db(ip=started_cluster.postgres_ip,
+                           port=started_cluster.postgres_port)
+
+    for thread in threads:
+        time.sleep(random.uniform(0, 0.5))
+        thread.start()
+
+    # Join here because it takes time for data to reach wal
+    for thread in threads:
+        thread.join()
+    instance.restart_clickhouse()
+
+    for i in range(NUM_TABLES):
+        result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
+        print(result) # Just debug
+
+    for i in range(NUM_TABLES):
+        check_tables_are_synchronized('postgresql_replica_{}'.format(i));
+
+    for i in range(NUM_TABLES):
+        result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
+        print(result) # Just debug
+
+    drop_materialized_db()
 
 
 if __name__ == '__main__':
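
Not part of the patch itself: below is a minimal standalone sketch of the lookup-time gating that the DatabaseCatalog::getTableImpl change introduces. A non-internal query against a MaterializedPostgreSQL database gets the nested table wrapped into a read-only materialized storage, while a query flagged as internal (as syncTables() now does via setInternalQuery(true)) resolves to the nested table directly. All type and function names here (Context, IStorage, NestedStorage, MaterializedWrapper, resolveTable) are invented stand-ins for illustration, not ClickHouse APIs.

// Standalone illustration; compiles on its own, does not use ClickHouse headers.
#include <iostream>
#include <memory>
#include <string>
#include <utility>

struct Context { bool is_internal_query = false; };

struct IStorage
{
    virtual ~IStorage() = default;
    virtual std::string name() const = 0;
};

/// Plays the role of the plain nested table that actually holds the replicated data.
struct NestedStorage : IStorage
{
    std::string name() const override { return "nested"; }
};

/// Plays the role of StorageMaterializedPostgreSQL: a wrapper handed out to user queries.
struct MaterializedWrapper : IStorage
{
    explicit MaterializedWrapper(std::shared_ptr<IStorage> nested_) : nested(std::move(nested_)) {}
    std::string name() const override { return "wrapper(" + nested->name() + ")"; }
    std::shared_ptr<IStorage> nested;
};

/// The gating pattern from the patch: wrap only for external queries against
/// a MaterializedPostgreSQL database; internal queries keep the nested table.
std::shared_ptr<IStorage> resolveTable(std::shared_ptr<IStorage> found, const Context & ctx, bool is_materialized_postgres_db)
{
    if (!ctx.is_internal_query && is_materialized_postgres_db)
        return std::make_shared<MaterializedWrapper>(std::move(found));
    return found;
}

int main()
{
    auto nested = std::make_shared<NestedStorage>();

    Context user_query;                     // e.g. a SELECT issued by a client
    Context internal_query;                 // e.g. the consumer's own INSERT into the nested table
    internal_query.is_internal_query = true;

    std::cout << resolveTable(nested, user_query, true)->name() << '\n';      // prints: wrapper(nested)
    std::cout << resolveTable(nested, internal_query, true)->name() << '\n';  // prints: nested
}

As far as the diff shows, the point of the flag is that the replication consumer's own inserts must reach the underlying nested table, while user queries issued after a server restart always go through the wrapper and therefore remain answerable.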
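Also not part of the patch: a similarly simplified sketch of the shutdown chaining the diff establishes. DatabaseMaterializedPostgreSQL::shutdown() now stops replication, and StorageMaterializedPostgreSQL::shutdown() additionally shuts down its nested table. The class names below are stand-ins, not the real ClickHouse types.

// Standalone illustration of the shutdown ordering; compiles on its own.
#include <iostream>
#include <memory>

struct ReplicationHandler
{
    void shutdown() { std::cout << "replication handler stopped\n"; }
};

struct NestedTable
{
    void shutdown() { std::cout << "nested table shut down\n"; }
};

struct MaterializedStorage
{
    std::shared_ptr<ReplicationHandler> replication_handler;
    std::shared_ptr<NestedTable> nested;

    /// Mirrors the patched storage shutdown: stop the handler first,
    /// then shut down the nested storage as well.
    void shutdown()
    {
        if (replication_handler)
            replication_handler->shutdown();
        if (nested)
            nested->shutdown();
    }
};

int main()
{
    MaterializedStorage storage{std::make_shared<ReplicationHandler>(), std::make_shared<NestedTable>()};
    storage.shutdown();
}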