diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.cpp b/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.cpp index 35d808c0dec..9105dc9ba25 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.cpp +++ b/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.cpp @@ -88,7 +88,6 @@ void DatabasePostgreSQLReplica::startSynchronization() : (global_context.getSettingsRef().max_insert_block_size.value), global_context.getMacros()->expand(settings->postgresql_tables_list.value)); - /// TODO: may be no need to always fetch std::unordered_set tables_to_replicate = replication_handler->fetchRequiredTables(connection->conn()); for (const auto & table_name : tables_to_replicate) @@ -179,7 +178,6 @@ void DatabasePostgreSQLReplica::createTable(const Context & context, const if (storage_set.find("ReplacingMergeTree") != storage_set.end()) { Base::createTable(context, name, table, query); - /// TODO: Update table cached tables list or not return; } } @@ -191,7 +189,6 @@ void DatabasePostgreSQLReplica::createTable(const Context & context, const template void DatabasePostgreSQLReplica::dropTable(const Context & context, const String & name, bool no_delay) { - /// TODO: If called from non sync thread, add dropped storage to skip list Base::dropTable(context, name, no_delay); } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h index f8b214db4b7..55f8a949cd1 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h @@ -19,7 +19,7 @@ /// TODO: There is ALTER PUBLICATION command to dynamically add and remove tables for replicating (the command is transactional). -/// This can also be supported. (Probably, if in a replication stream comes a relation name, which does not currenly +/// This can also be supported. (Probably, if in a replication stream comes a relation name, which does not currently /// exist in CH, it can be loaded from snapshot and handled the same way as some ddl by comparing lsn positions of wal, /// but there is the case that a known table has been just renamed, then the previous version might be just dropped by user). @@ -139,7 +139,7 @@ private: /// if relation definition has changed since the last relation definition message. std::unordered_map schema_data; - /// skip_list contains relation ids for tables on which ddl was perfomed, which can break synchronization. + /// skip_list contains relation ids for tables on which ddl was performed, which can break synchronization. /// This breaking changes are detected in replication stream in according replication message and table is added to skip list. /// After it is finished, a temporary replication slot is created with 'export snapshot' option, and start_lsn is returned. /// Skipped tables are reloaded from snapshot (nested tables are also updated). Afterwards, if a replication message is diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 83afe658661..eccaa3c7acf 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -420,9 +420,9 @@ std::unordered_map PostgreSQLReplicationHandler::reloadFromSnapsh auto ntx = std::make_shared(*replication_connection->conn()); std::string snapshot_name, start_lsn; createReplicationSlot(ntx, start_lsn, snapshot_name, true); - ntx->commit(); - + /// This snapshot is valid up to the end of the transaction, which exported it. auto success_tables = loadFromSnapshot(snapshot_name, sync_storages); + ntx->commit(); for (const auto & relation : relation_data) { diff --git a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp index 0f895c21ae4..4b2e746a557 100644 --- a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp +++ b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp @@ -374,7 +374,6 @@ Pipe StoragePostgreSQLReplica::read( size_t max_block_size, unsigned num_streams) { - /// TODO: are there other places where this lock is needed std::unique_lock lock(nested_mutex, std::defer_lock); if (nested_loaded && lock.try_lock())