diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp index efc5c4614e7..aecbb7b3bec 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp @@ -181,7 +181,7 @@ void PostgreSQLReplicaConsumer::readTupleData( Int32 col_len = readInt32(message, pos, size); String value; - for (Int16 i = 0; i < col_len; ++i) + for (Int32 i = 0; i < col_len; ++i) value += readInt8(message, pos, size); insertValue(buffer, value, column_idx); diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index a6b1ca64330..c95b4bacad9 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -398,7 +398,7 @@ std::string PostgreSQLReplicationHandler::reloadFromSnapshot(NameSet & table_nam Storages sync_storages; for (const auto & table_name : table_names) { - auto storage = storages[table_name]; + auto * storage = storages[table_name]; sync_storages[table_name] = storage; storage->dropNested(); } diff --git a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp index f85b9510f50..c54f9a31cd4 100644 --- a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp +++ b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp @@ -336,7 +336,8 @@ void StoragePostgreSQLReplica::shutdownFinal() void StoragePostgreSQLReplica::dropNested() { - nested_loaded.store(false); + std::lock_guard lock(nested_mutex); + nested_loaded = false; auto table_id = nested_storage->getStorageID(); auto ast_drop = std::make_shared(); @@ -373,7 +374,9 @@ Pipe StoragePostgreSQLReplica::read( unsigned num_streams) { /// If initial table sync has not yet finished, nested tables might not be created yet. - if (!nested_loaded) + /// Or nested table might be attempted to get dropped. (Second mutex lock in dropNested()). + std::unique_lock lock(nested_mutex, std::defer_lock); + if (!nested_loaded || !lock.try_lock()) { LOG_WARNING(&Poco::Logger::get("StoragePostgreSQLReplica"), "Table {} is not loaded yet", getNestedTableName()); return Pipe(); @@ -383,7 +386,7 @@ Pipe StoragePostgreSQLReplica::read( if (!nested_storage) getNested(); - auto lock = nested_storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); + auto storage_lock = nested_storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr(); @@ -428,7 +431,7 @@ Pipe StoragePostgreSQLReplica::read( nested_metadata, query_info, context, processed_stage, max_block_size, num_streams); - pipe.addTableLock(lock); + pipe.addTableLock(storage_lock); if (!expressions->children.empty() && !pipe.empty()) { diff --git a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h index d2bb80307fc..4d407f337ad 100644 --- a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h +++ b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h @@ -97,6 +97,7 @@ private: std::atomic nested_loaded = false; StoragePtr nested_storage; + std::mutex nested_mutex; bool is_postgresql_replica_database = false; };