Merge pull request #20215 from azat/dist-lockless-SYSTEM-FLUSH-DISTRIBUTED

Lockless SYSTEM FLUSH DISTRIBUTED
tavplubix, 2021-02-11 12:30:46 +03:00 (committed by GitHub)
commit 17af32a59b
3 changed files with 19 additions and 21 deletions

InterpreterSystemQuery.cpp

@@ -606,7 +606,7 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
     context.checkAccess(AccessType::SYSTEM_FLUSH_DISTRIBUTED, table_id);
     if (auto * storage_distributed = dynamic_cast<StorageDistributed *>(DatabaseCatalog::instance().getTable(table_id, context).get()))
-        storage_distributed->flushClusterNodesAllData();
+        storage_distributed->flushClusterNodesAllData(context);
     else
         throw Exception("Table " + table_id.getNameForLogs() + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
 }
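
The only change in the interpreter is that the query Context is now passed down, giving the storage the current query id and the lock_acquire_timeout setting used for the table lock taken in flushClusterNodesAllData() further down. As a generic illustration (plain std::shared_mutex, not the ClickHouse locking API), a share lock taken by the flush path is exactly what serializes it against an exclusive lock taken by TRUNCATE:

#include <iostream>
#include <shared_mutex>
#include <thread>

std::shared_mutex table_lock;   // stand-in for the storage-level table lock

void flush_distributed()
{
    std::shared_lock lock(table_lock);       // SYSTEM FLUSH DISTRIBUTED: share lock
    std::cout << "flushing pending data\n";  // cannot overlap with truncate_table()
}

void truncate_table()
{
    std::unique_lock lock(table_lock);       // TRUNCATE: exclusive lock
    std::cout << "dropping pending data\n";
}

int main()
{
    std::thread flusher(flush_distributed);
    std::thread truncater(truncate_table);
    flusher.join();
    truncater.join();
}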

StorageDistributed.cpp

@@ -681,7 +681,7 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, co
     for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();)
     {
-        it->second.shutdownAndDropAllData();
+        it->second.directory_monitor->shutdownAndDropAllData();
         it = cluster_nodes_data.erase(it);
     }
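
With the per-node wrapper removed (see the deletions in the next hunk), truncate() calls shutdownAndDropAllData() on the directory monitor directly, and keeps using the standard erase-while-iterating idiom visible in the context lines. A standalone sketch of that idiom, with an illustrative Monitor type standing in for StorageDistributedDirectoryMonitor:

#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

struct Monitor
{
    void shutdownAndDropAllData() { std::cout << "pending data dropped\n"; }
};

int main()
{
    std::unordered_map<std::string, std::shared_ptr<Monitor>> cluster_nodes{
        {"shard1", std::make_shared<Monitor>()},
        {"shard2", std::make_shared<Monitor>()},
    };

    for (auto it = cluster_nodes.begin(); it != cluster_nodes.end();)
    {
        it->second->shutdownAndDropAllData();  // drop this node's queued data
        it = cluster_nodes.erase(it);          // erase() returns the next valid iterator
    }
}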
@@ -799,16 +799,6 @@ ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, cons
     return cluster;
 }
 
-void StorageDistributed::ClusterNodeData::flushAllData() const
-{
-    directory_monitor->flushAllData();
-}
-
-void StorageDistributed::ClusterNodeData::shutdownAndDropAllData() const
-{
-    directory_monitor->shutdownAndDropAllData();
-}
-
 IColumn::Selector StorageDistributed::createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
 {
     const auto & slot_to_shard = cluster->getSlotToShard();
@@ -892,13 +882,24 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
     return {};
 }
 
-void StorageDistributed::flushClusterNodesAllData()
+void StorageDistributed::flushClusterNodesAllData(const Context & context)
 {
-    std::lock_guard lock(cluster_nodes_mutex);
+    /// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE
+    auto table_lock = lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
+
+    std::vector<std::shared_ptr<StorageDistributedDirectoryMonitor>> directory_monitors;
+
+    {
+        std::lock_guard lock(cluster_nodes_mutex);
+
+        directory_monitors.reserve(cluster_nodes_data.size());
+        for (auto & node : cluster_nodes_data)
+            directory_monitors.push_back(node.second.directory_monitor);
+    }
 
     /// TODO: Maybe it should be executed in parallel
-    for (auto & node : cluster_nodes_data)
-        node.second.flushAllData();
+    for (auto & node : directory_monitors)
+        node->flushAllData();
 }
 
 void StorageDistributed::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
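
This is the core of the change: cluster_nodes_mutex is now held only long enough to copy the monitors' shared_ptrs into a local vector, and the potentially slow per-node flushes run without it, so concurrent work that needs the same mutex (for example creating a monitor for a new shard on INSERT) is no longer blocked for the duration of the flush. A minimal standalone sketch of that snapshot-under-lock pattern (illustrative names, not the ClickHouse classes):

#include <chrono>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

struct Monitor
{
    void flushAllData()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // pretend this is slow I/O
        std::cout << "node flushed\n";
    }
};

std::mutex nodes_mutex;
std::unordered_map<std::string, std::shared_ptr<Monitor>> nodes;

void flushAllNodes()
{
    std::vector<std::shared_ptr<Monitor>> monitors;
    {
        std::lock_guard lock(nodes_mutex);   // held only while copying pointers
        monitors.reserve(nodes.size());
        for (auto & node : nodes)
            monitors.push_back(node.second);
    }

    for (auto & monitor : monitors)          // slow work runs without the mutex, so other
        monitor->flushAllData();             // threads can register new nodes meanwhile
}

int main()
{
    nodes.emplace("shard1", std::make_shared<Monitor>());
    nodes.emplace("shard2", std::make_shared<Monitor>());
    flushAllNodes();
}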

StorageDistributed.h

@@ -114,7 +114,7 @@ public:
     /// (note that monitors are created lazily, i.e. until at least one INSERT executed)
     std::vector<StorageDistributedDirectoryMonitor::Status> getDirectoryMonitorsStatuses() const;
 
-    void flushClusterNodesAllData();
+    void flushClusterNodesAllData(const Context & context);
 
     ClusterPtr getCluster() const;
@@ -200,11 +200,8 @@ protected:
     struct ClusterNodeData
     {
-        std::unique_ptr<StorageDistributedDirectoryMonitor> directory_monitor;
+        std::shared_ptr<StorageDistributedDirectoryMonitor> directory_monitor;
         ConnectionPoolPtr connection_pool;
-
-        void flushAllData() const;
-        void shutdownAndDropAllData() const;
     };
     std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
     mutable std::mutex cluster_nodes_mutex;
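
The member changes from unique_ptr to shared_ptr because flushClusterNodesAllData() now copies the pointer out of the map and uses it after releasing cluster_nodes_mutex; the copy keeps the monitor alive even if its owning map entry disappears in the meantime, and with unique_ptr such a copy would not compile. A standalone sketch of that lifetime guarantee, again with an illustrative Monitor type:

#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

struct Monitor
{
    ~Monitor() { std::cout << "monitor destroyed\n"; }
    void flushAllData() { std::cout << "flushing\n"; }
};

int main()
{
    std::unordered_map<std::string, std::shared_ptr<Monitor>> nodes;
    nodes.emplace("shard1", std::make_shared<Monitor>());

    std::shared_ptr<Monitor> snapshot = nodes.at("shard1"); // copy taken "under the mutex"
    nodes.erase("shard1");                                  // owning entry goes away

    snapshot->flushAllData(); // still valid: prints "flushing" before "monitor destroyed"
}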