From ce91c257b29f0a6d39f807cd745c56c75af3c87c Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 8 Feb 2021 22:07:30 +0300 Subject: [PATCH 1/2] Lockless SYSTEM FLUSH DISTRIBUTED Right now SYSTEM FLUSH DISTRIBUTED will block: - INSERT into this Distributed table (requireDirectoryMonitor()) - SELECT * FROM system.distribution_queue --- src/Storages/StorageDistributed.cpp | 26 ++++++++++++-------------- src/Storages/StorageDistributed.h | 5 +---- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 02ee70dc8f4..8605013c65d 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -681,7 +681,7 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, co for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();) { - it->second.shutdownAndDropAllData(); + it->second.directory_monitor->shutdownAndDropAllData(); it = cluster_nodes_data.erase(it); } @@ -799,16 +799,6 @@ ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, cons return cluster; } -void StorageDistributed::ClusterNodeData::flushAllData() const -{ - directory_monitor->flushAllData(); -} - -void StorageDistributed::ClusterNodeData::shutdownAndDropAllData() const -{ - directory_monitor->shutdownAndDropAllData(); -} - IColumn::Selector StorageDistributed::createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result) { const auto & slot_to_shard = cluster->getSlotToShard(); @@ -894,11 +884,19 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type) void StorageDistributed::flushClusterNodesAllData() { - std::lock_guard lock(cluster_nodes_mutex); + std::vector<std::shared_ptr<StorageDistributedDirectoryMonitor>> directory_monitors; + + { + std::lock_guard lock(cluster_nodes_mutex); + + directory_monitors.reserve(cluster_nodes_data.size()); + for (auto & node : cluster_nodes_data) + 
directory_monitors.push_back(node.second.directory_monitor); + } /// TODO: Maybe it should be executed in parallel - for (auto & node : cluster_nodes_data) - node.second.flushAllData(); + for (auto & node : directory_monitors) + node->flushAllData(); } void StorageDistributed::rename(const String & new_path_to_table_data, const StorageID & new_table_id) diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 585efafddfb..928b6297297 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -200,11 +200,8 @@ protected: struct ClusterNodeData { - std::unique_ptr<StorageDistributedDirectoryMonitor> directory_monitor; + std::shared_ptr<StorageDistributedDirectoryMonitor> directory_monitor; ConnectionPoolPtr connection_pool; - - void flushAllData() const; - void shutdownAndDropAllData() const; }; std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data; mutable std::mutex cluster_nodes_mutex; From 809fa7e4cc1b4f204533c84d31cab36b0a8ef68d Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Wed, 10 Feb 2021 23:07:28 +0300 Subject: [PATCH 2/2] Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE --- src/Interpreters/InterpreterSystemQuery.cpp | 2 +- src/Storages/StorageDistributed.cpp | 5 ++++- src/Storages/StorageDistributed.h | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 86706701141..0e9683de95f 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -606,7 +606,7 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &) context.checkAccess(AccessType::SYSTEM_FLUSH_DISTRIBUTED, table_id); if (auto * storage_distributed = dynamic_cast<StorageDistributed *>(DatabaseCatalog::instance().getTable(table_id, context).get())) - storage_distributed->flushClusterNodesAllData(); + storage_distributed->flushClusterNodesAllData(context); else throw Exception("Table " + table_id.getNameForLogs() + " is not distributed", ErrorCodes::BAD_ARGUMENTS); } diff --git 
a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 8605013c65d..c08dc38fa2d 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -882,8 +882,11 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type) return {}; } -void StorageDistributed::flushClusterNodesAllData() +void StorageDistributed::flushClusterNodesAllData(const Context & context) { + /// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE + auto table_lock = lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); + std::vector<std::shared_ptr<StorageDistributedDirectoryMonitor>> directory_monitors; { diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 928b6297297..4d3869f7c5c 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -114,7 +114,7 @@ public: /// (note that monitors are created lazily, i.e. until at least one INSERT executed) std::vector<StorageDistributedDirectoryMonitor::Status> getDirectoryMonitorsStatuses() const; - void flushClusterNodesAllData(); + void flushClusterNodesAllData(const Context & context); ClusterPtr getCluster() const;