diff --git a/dbms/src/Interpreters/InterpreterSystemQuery.cpp b/dbms/src/Interpreters/InterpreterSystemQuery.cpp index 5ab0a979f3d..25319e65756 100644 --- a/dbms/src/Interpreters/InterpreterSystemQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSystemQuery.cpp @@ -206,8 +206,8 @@ BlockIO InterpreterSystemQuery::execute() case Type::SYNC_REPLICA: syncReplica(query); break; - case Type::SYNC_DISTRIBUTED: - syncDistributed(query); + case Type::FLUSH_DISTRIBUTED: + flushDistributed(query); break; case Type::RESTART_REPLICAS: restartReplicas(system_context); @@ -321,13 +321,13 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query) throw Exception("Table " + database_name + "." + table_name + " is not replicated", ErrorCodes::BAD_ARGUMENTS); } -void InterpreterSystemQuery::syncDistributed(ASTSystemQuery & query) +void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query) { String database_name = !query.target_database.empty() ? query.target_database : context.getCurrentDatabase(); String & table_name = query.target_table; if (auto storage_distributed = dynamic_cast(context.getTable(database_name, table_name).get())) - storage_distributed->syncReplicaSends(); + storage_distributed->flushClusterNodesAllData(); else throw Exception("Table " + database_name + "." + table_name + " is not distributed", ErrorCodes::BAD_ARGUMENTS); } diff --git a/dbms/src/Interpreters/InterpreterSystemQuery.h b/dbms/src/Interpreters/InterpreterSystemQuery.h index 6c38c2c146b..31945745c1e 100644 --- a/dbms/src/Interpreters/InterpreterSystemQuery.h +++ b/dbms/src/Interpreters/InterpreterSystemQuery.h @@ -31,7 +31,7 @@ private: void restartReplicas(Context & system_context); void syncReplica(ASTSystemQuery & query); - void syncDistributed(ASTSystemQuery & query); + void flushDistributed(ASTSystemQuery & query); }; diff --git a/dbms/src/Parsers/ASTSystemQuery.cpp b/dbms/src/Parsers/ASTSystemQuery.cpp index b3477206371..699dd9d0f54 100644 --- a/dbms/src/Parsers/ASTSystemQuery.cpp +++ b/dbms/src/Parsers/ASTSystemQuery.cpp @@ -41,8 +41,8 @@ const char * ASTSystemQuery::typeToString(Type type) return "RESTART REPLICA"; case Type::SYNC_REPLICA: return "SYNC REPLICA"; - case Type::SYNC_DISTRIBUTED: - return "SYNC DISTRIBUTED"; + case Type::FLUSH_DISTRIBUTED: + return "FLUSH DISTRIBUTED"; case Type::RELOAD_DICTIONARY: return "RELOAD DICTIONARY"; case Type::RELOAD_DICTIONARIES: @@ -112,7 +112,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, if (!target_table.empty()) print_database_table(); } - else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::SYNC_DISTRIBUTED) + else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::FLUSH_DISTRIBUTED) { print_database_table(); } diff --git a/dbms/src/Parsers/ASTSystemQuery.h b/dbms/src/Parsers/ASTSystemQuery.h index 46be199f8c3..0ff8228e2e0 100644 --- a/dbms/src/Parsers/ASTSystemQuery.h +++ b/dbms/src/Parsers/ASTSystemQuery.h @@ -40,7 +40,7 @@ public: STOP_REPLICATION_QUEUES, START_REPLICATION_QUEUES, FLUSH_LOGS, - SYNC_DISTRIBUTED, + FLUSH_DISTRIBUTED, STOP_DISTRIBUTED_SENDS, START_DISTRIBUTED_SENDS, END diff --git a/dbms/src/Parsers/ParserSystemQuery.cpp b/dbms/src/Parsers/ParserSystemQuery.cpp index f6680996e4f..333613e9512 100644 --- a/dbms/src/Parsers/ParserSystemQuery.cpp +++ b/dbms/src/Parsers/ParserSystemQuery.cpp @@ -49,7 +49,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & case Type::RESTART_REPLICA: case Type::SYNC_REPLICA: - case Type::SYNC_DISTRIBUTED: + case Type::FLUSH_DISTRIBUTED: if (!parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table)) return false; break; diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp index fea7b2deb97..2d19804cdc8 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp @@ -87,15 +87,12 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor() } } -void StorageDistributedDirectoryMonitor::syncReplicaSends() +void StorageDistributedDirectoryMonitor::flushAllData() { if (!quit) { - if (monitor_blocker.isCancelled()) - throw Exception("Cancelled sync distributed sends.", ErrorCodes::ABORTED); - std::unique_lock lock{mutex}; - findFiles(); + processFiles(); } } @@ -131,7 +128,7 @@ void StorageDistributedDirectoryMonitor::run() { try { - do_sleep = !findFiles(); + do_sleep = !processFiles(); } catch (...) { @@ -195,7 +192,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri } -bool StorageDistributedDirectoryMonitor::findFiles() +bool StorageDistributedDirectoryMonitor::processFiles() { std::map files; diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.h b/dbms/src/Storages/Distributed/DirectoryMonitor.h index 538dd8d3c25..9416db9be2c 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.h +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.h @@ -26,12 +26,12 @@ public: static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage); - void syncReplicaSends(); + void flushAllData(); void shutdownAndDropAllData(); private: void run(); - bool findFiles(); + bool processFiles(); void processFile(const std::string & file_path); void processFilesWithBatching(const std::map & files); diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index ffa2764e9db..97eac4a9374 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -466,9 +466,9 @@ void StorageDistributed::ClusterNodeData::requireDirectoryMonitor( directory_monitor = std::make_unique(storage, name, conneciton_pool, monitor_blocker); } -void StorageDistributed::ClusterNodeData::syncReplicaSends() +void StorageDistributed::ClusterNodeData::flushAllData() { - directory_monitor->syncReplicaSends(); + directory_monitor->flushAllData(); } void StorageDistributed::ClusterNodeData::shutdownAndDropAllData() @@ -516,13 +516,13 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type) return {}; } -void StorageDistributed::syncReplicaSends() +void StorageDistributed::flushClusterNodesAllData() { std::lock_guard lock(cluster_nodes_mutex); /// TODO: Maybe it should be executed in parallel for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end(); ++it) - it->second.syncReplicaSends(); + it->second.flushAllData(); } diff --git a/dbms/src/Storages/StorageDistributed.h b/dbms/src/Storages/StorageDistributed.h index 000d969dabe..fee3ba78d8d 100644 --- a/dbms/src/Storages/StorageDistributed.h +++ b/dbms/src/Storages/StorageDistributed.h @@ -106,7 +106,7 @@ public: /// ensure connection pool creation and return it ConnectionPoolPtr requireConnectionPool(const std::string & name); - void syncReplicaSends(); + void flushClusterNodesAllData(); ClusterPtr getCluster() const; @@ -141,7 +141,7 @@ public: /// Creates directory_monitor if not exists. void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker); - void syncReplicaSends(); + void flushAllData(); void shutdownAndDropAllData(); }; diff --git a/dbms/tests/integration/test_system_replica_query_with_distributed/__init__.py b/dbms/tests/integration/test_distributed_system_query/__init__.py similarity index 100% rename from dbms/tests/integration/test_system_replica_query_with_distributed/__init__.py rename to dbms/tests/integration/test_distributed_system_query/__init__.py diff --git a/dbms/tests/integration/test_system_replica_query_with_distributed/configs/remote_servers.xml b/dbms/tests/integration/test_distributed_system_query/configs/remote_servers.xml similarity index 100% rename from dbms/tests/integration/test_system_replica_query_with_distributed/configs/remote_servers.xml rename to dbms/tests/integration/test_distributed_system_query/configs/remote_servers.xml diff --git a/dbms/tests/integration/test_system_replica_query_with_distributed/test.py b/dbms/tests/integration/test_distributed_system_query/test.py similarity index 95% rename from dbms/tests/integration/test_system_replica_query_with_distributed/test.py rename to dbms/tests/integration/test_distributed_system_query/test.py index fc16d4d4585..0eac816fc1b 100644 --- a/dbms/tests/integration/test_system_replica_query_with_distributed/test.py +++ b/dbms/tests/integration/test_distributed_system_query/test.py @@ -36,6 +36,6 @@ def test_start_and_stop_replica_send(started_cluster): assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '1' node1.query("SYSTEM START DISTRIBUTED SENDS distributed_table;") - node1.query("SYSTEM SYNC DISTRIBUTED distributed_table;") + node1.query("SYSTEM FLUSH DISTRIBUTED distributed_table;") assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '2'