Rename system sync distributed to system flush distributed

This commit is contained in:
zhang2014 2019-05-10 12:19:02 +08:00
parent 1a33840964
commit 80788cd7a8
12 changed files with 23 additions and 26 deletions

View File

@ -206,8 +206,8 @@ BlockIO InterpreterSystemQuery::execute()
case Type::SYNC_REPLICA:
syncReplica(query);
break;
case Type::SYNC_DISTRIBUTED:
syncDistributed(query);
case Type::FLUSH_DISTRIBUTED:
flushDistributed(query);
break;
case Type::RESTART_REPLICAS:
restartReplicas(system_context);
@ -321,13 +321,13 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
throw Exception("Table " + database_name + "." + table_name + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
}
void InterpreterSystemQuery::syncDistributed(ASTSystemQuery & query)
void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
{
String database_name = !query.target_database.empty() ? query.target_database : context.getCurrentDatabase();
String & table_name = query.target_table;
if (auto storage_distributed = dynamic_cast<StorageDistributed *>(context.getTable(database_name, table_name).get()))
storage_distributed->syncReplicaSends();
storage_distributed->flushClusterNodesAllData();
else
throw Exception("Table " + database_name + "." + table_name + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
}

View File

@ -31,7 +31,7 @@ private:
void restartReplicas(Context & system_context);
void syncReplica(ASTSystemQuery & query);
void syncDistributed(ASTSystemQuery & query);
void flushDistributed(ASTSystemQuery & query);
};

View File

@ -41,8 +41,8 @@ const char * ASTSystemQuery::typeToString(Type type)
return "RESTART REPLICA";
case Type::SYNC_REPLICA:
return "SYNC REPLICA";
case Type::SYNC_DISTRIBUTED:
return "SYNC DISTRIBUTED";
case Type::FLUSH_DISTRIBUTED:
return "FLUSH DISTRIBUTED";
case Type::RELOAD_DICTIONARY:
return "RELOAD DICTIONARY";
case Type::RELOAD_DICTIONARIES:
@ -112,7 +112,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
if (!target_table.empty())
print_database_table();
}
else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::SYNC_DISTRIBUTED)
else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::FLUSH_DISTRIBUTED)
{
print_database_table();
}

View File

@ -40,7 +40,7 @@ public:
STOP_REPLICATION_QUEUES,
START_REPLICATION_QUEUES,
FLUSH_LOGS,
SYNC_DISTRIBUTED,
FLUSH_DISTRIBUTED,
STOP_DISTRIBUTED_SENDS,
START_DISTRIBUTED_SENDS,
END

View File

@ -49,7 +49,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::RESTART_REPLICA:
case Type::SYNC_REPLICA:
case Type::SYNC_DISTRIBUTED:
case Type::FLUSH_DISTRIBUTED:
if (!parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table))
return false;
break;

View File

@ -87,15 +87,12 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
}
}
void StorageDistributedDirectoryMonitor::syncReplicaSends()
void StorageDistributedDirectoryMonitor::flushAllData()
{
if (!quit)
{
if (monitor_blocker.isCancelled())
throw Exception("Cancelled sync distributed sends.", ErrorCodes::ABORTED);
std::unique_lock lock{mutex};
findFiles();
processFiles();
}
}
@ -131,7 +128,7 @@ void StorageDistributedDirectoryMonitor::run()
{
try
{
do_sleep = !findFiles();
do_sleep = !processFiles();
}
catch (...)
{
@ -195,7 +192,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
}
bool StorageDistributedDirectoryMonitor::findFiles()
bool StorageDistributedDirectoryMonitor::processFiles()
{
std::map<UInt64, std::string> files;

View File

@ -26,12 +26,12 @@ public:
static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
void syncReplicaSends();
void flushAllData();
void shutdownAndDropAllData();
private:
void run();
bool findFiles();
bool processFiles();
void processFile(const std::string & file_path);
void processFilesWithBatching(const std::map<UInt64, std::string> & files);

View File

@ -466,9 +466,9 @@ void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(
directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(storage, name, conneciton_pool, monitor_blocker);
}
void StorageDistributed::ClusterNodeData::syncReplicaSends()
void StorageDistributed::ClusterNodeData::flushAllData()
{
directory_monitor->syncReplicaSends();
directory_monitor->flushAllData();
}
void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()
@ -516,13 +516,13 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
return {};
}
void StorageDistributed::syncReplicaSends()
void StorageDistributed::flushClusterNodesAllData()
{
std::lock_guard lock(cluster_nodes_mutex);
/// TODO: Maybe it should be executed in parallel
for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end(); ++it)
it->second.syncReplicaSends();
it->second.flushAllData();
}

View File

@ -106,7 +106,7 @@ public:
/// ensure connection pool creation and return it
ConnectionPoolPtr requireConnectionPool(const std::string & name);
void syncReplicaSends();
void flushClusterNodesAllData();
ClusterPtr getCluster() const;
@ -141,7 +141,7 @@ public:
/// Creates directory_monitor if not exists.
void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker);
void syncReplicaSends();
void flushAllData();
void shutdownAndDropAllData();
};

View File

@ -36,6 +36,6 @@ def test_start_and_stop_replica_send(started_cluster):
assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '1'
node1.query("SYSTEM START DISTRIBUTED SENDS distributed_table;")
node1.query("SYSTEM SYNC DISTRIBUTED distributed_table;")
node1.query("SYSTEM FLUSH DISTRIBUTED distributed_table;")
assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '2'