This commit is contained in:
kssenii 2021-02-20 18:37:54 +00:00
parent d590c32708
commit 883cc2c0ef
4 changed files with 10 additions and 6 deletions

View File

@ -181,7 +181,7 @@ void PostgreSQLReplicaConsumer::readTupleData(
Int32 col_len = readInt32(message, pos, size);
String value;
for (Int16 i = 0; i < col_len; ++i)
for (Int32 i = 0; i < col_len; ++i)
value += readInt8(message, pos, size);
insertValue(buffer, value, column_idx);

View File

@ -398,7 +398,7 @@ std::string PostgreSQLReplicationHandler::reloadFromSnapshot(NameSet & table_nam
Storages sync_storages;
for (const auto & table_name : table_names)
{
auto storage = storages[table_name];
auto * storage = storages[table_name];
sync_storages[table_name] = storage;
storage->dropNested();
}

View File

@ -336,7 +336,8 @@ void StoragePostgreSQLReplica::shutdownFinal()
void StoragePostgreSQLReplica::dropNested()
{
nested_loaded.store(false);
std::lock_guard lock(nested_mutex);
nested_loaded = false;
auto table_id = nested_storage->getStorageID();
auto ast_drop = std::make_shared<ASTDropQuery>();
@ -373,7 +374,9 @@ Pipe StoragePostgreSQLReplica::read(
unsigned num_streams)
{
/// If initial table sync has not yet finished, nested tables might not be created yet.
if (!nested_loaded)
/// Or nested table might be attempted to get dropped. (Second mutex lock in dropNested()).
std::unique_lock lock(nested_mutex, std::defer_lock);
if (!nested_loaded || !lock.try_lock())
{
LOG_WARNING(&Poco::Logger::get("StoragePostgreSQLReplica"), "Table {} is not loaded yet", getNestedTableName());
return Pipe();
@ -383,7 +386,7 @@ Pipe StoragePostgreSQLReplica::read(
if (!nested_storage)
getNested();
auto lock = nested_storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto storage_lock = nested_storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr();
@ -428,7 +431,7 @@ Pipe StoragePostgreSQLReplica::read(
nested_metadata, query_info, context,
processed_stage, max_block_size, num_streams);
pipe.addTableLock(lock);
pipe.addTableLock(storage_lock);
if (!expressions->children.empty() && !pipe.empty())
{

View File

@ -97,6 +97,7 @@ private:
std::atomic<bool> nested_loaded = false;
StoragePtr nested_storage;
std::mutex nested_mutex;
bool is_postgresql_replica_database = false;
};