Lockless SYSTEM FLUSH DISTRIBUTED

Right now SYSTEM FLUSH DISTRIBUTED will block:
- INSERT into this Distributed table (requireDirectoryMonitor())
- SELECT * FROM system.distribution_queue
This commit is contained in:
Azat Khuzhin 2021-02-08 22:07:30 +03:00
parent ebf5868652
commit ce91c257b2
2 changed files with 13 additions and 18 deletions

View File

@ -681,7 +681,7 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, co
for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();)
{
it->second.shutdownAndDropAllData();
it->second.directory_monitor->shutdownAndDropAllData();
it = cluster_nodes_data.erase(it);
}
@ -799,16 +799,6 @@ ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, cons
return cluster;
}
void StorageDistributed::ClusterNodeData::flushAllData() const
{
directory_monitor->flushAllData();
}
void StorageDistributed::ClusterNodeData::shutdownAndDropAllData() const
{
directory_monitor->shutdownAndDropAllData();
}
IColumn::Selector StorageDistributed::createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
{
const auto & slot_to_shard = cluster->getSlotToShard();
@ -894,11 +884,19 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
void StorageDistributed::flushClusterNodesAllData()
{
std::lock_guard lock(cluster_nodes_mutex);
std::vector<std::shared_ptr<StorageDistributedDirectoryMonitor>> directory_monitors;
{
std::lock_guard lock(cluster_nodes_mutex);
directory_monitors.reserve(cluster_nodes_data.size());
for (auto & node : cluster_nodes_data)
directory_monitors.push_back(node.second.directory_monitor);
}
/// TODO: Maybe it should be executed in parallel
for (auto & node : cluster_nodes_data)
node.second.flushAllData();
for (auto & node : directory_monitors)
node->flushAllData();
}
void StorageDistributed::rename(const String & new_path_to_table_data, const StorageID & new_table_id)

View File

@ -200,11 +200,8 @@ protected:
struct ClusterNodeData
{
std::unique_ptr<StorageDistributedDirectoryMonitor> directory_monitor;
std::shared_ptr<StorageDistributedDirectoryMonitor> directory_monitor;
ConnectionPoolPtr connection_pool;
void flushAllData() const;
void shutdownAndDropAllData() const;
};
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
mutable std::mutex cluster_nodes_mutex;