fix review

This commit is contained in:
zhang2014 2019-04-22 23:11:16 +08:00
parent c44f608868
commit 1a33840964
9 changed files with 57 additions and 14 deletions

View File

@ -13,6 +13,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsFetch = 2;
extern const StorageActionBlockType PartsSend = 3;
extern const StorageActionBlockType ReplicationQueue = 4;
extern const StorageActionBlockType DistributedSend = 5;
}

View File

@ -23,6 +23,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <csignal>
#include <algorithm>
#include "InterpreterSystemQuery.h"
namespace DB
@ -43,6 +44,7 @@ namespace ActionLocks
extern StorageActionBlockType PartsFetch;
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType ReplicationQueue;
extern StorageActionBlockType DistributedSend;
}
@ -195,9 +197,18 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_REPLICATION_QUEUES:
startStopAction(context, query, ActionLocks::ReplicationQueue, true);
break;
case Type::STOP_DISTRIBUTED_SENDS:
startStopAction(context, query, ActionLocks::DistributedSend, false);
break;
case Type::START_DISTRIBUTED_SENDS:
startStopAction(context, query, ActionLocks::DistributedSend, true);
break;
case Type::SYNC_REPLICA:
syncReplica(query);
break;
case Type::SYNC_DISTRIBUTED:
syncDistributed(query);
break;
case Type::RESTART_REPLICAS:
restartReplicas(system_context);
break;
@ -304,13 +315,22 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
StoragePtr table = context.getTable(database_name, table_name);
if (auto storage_distributed = dynamic_cast<StorageDistributed *>(table.get()))
storage_distributed->syncReplicaSends();
else if (auto storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
if (auto storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.value.milliseconds());
else
throw Exception("Table " + database_name + "." + table_name + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
}
void InterpreterSystemQuery::syncDistributed(ASTSystemQuery & query)
{
String database_name = !query.target_database.empty() ? query.target_database : context.getCurrentDatabase();
String & table_name = query.target_table;
if (auto storage_distributed = dynamic_cast<StorageDistributed *>(context.getTable(database_name, table_name).get()))
storage_distributed->syncReplicaSends();
else
throw Exception("Table " + database_name + "." + table_name + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
}
}

View File

@ -31,6 +31,7 @@ private:
void restartReplicas(Context & system_context);
void syncReplica(ASTSystemQuery & query);
void syncDistributed(ASTSystemQuery & query);
};

View File

@ -41,6 +41,8 @@ const char * ASTSystemQuery::typeToString(Type type)
return "RESTART REPLICA";
case Type::SYNC_REPLICA:
return "SYNC REPLICA";
case Type::SYNC_DISTRIBUTED:
return "SYNC DISTRIBUTED";
case Type::RELOAD_DICTIONARY:
return "RELOAD DICTIONARY";
case Type::RELOAD_DICTIONARIES:
@ -65,6 +67,10 @@ const char * ASTSystemQuery::typeToString(Type type)
return "STOP REPLICATION QUEUES";
case Type::START_REPLICATION_QUEUES:
return "START REPLICATION QUEUES";
case Type::STOP_DISTRIBUTED_SENDS:
return "STOP DISTRIBUTED SENDS";
case Type::START_DISTRIBUTED_SENDS:
return "START DISTRIBUTED SENDS";
case Type::FLUSH_LOGS:
return "FLUSH LOGS";
default:
@ -99,12 +105,14 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
|| type == Type::STOP_REPLICATED_SENDS
|| type == Type::START_REPLICATED_SENDS
|| type == Type::STOP_REPLICATION_QUEUES
|| type == Type::START_REPLICATION_QUEUES)
|| type == Type::START_REPLICATION_QUEUES
|| type == Type::STOP_DISTRIBUTED_SENDS
|| type == Type::START_DISTRIBUTED_SENDS)
{
if (!target_table.empty())
print_database_table();
}
else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA)
else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::SYNC_DISTRIBUTED)
{
print_database_table();
}

View File

@ -40,6 +40,9 @@ public:
STOP_REPLICATION_QUEUES,
START_REPLICATION_QUEUES,
FLUSH_LOGS,
SYNC_DISTRIBUTED,
STOP_DISTRIBUTED_SENDS,
START_DISTRIBUTED_SENDS,
END
};

View File

@ -49,6 +49,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::RESTART_REPLICA:
case Type::SYNC_REPLICA:
case Type::SYNC_DISTRIBUTED:
if (!parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table))
return false;
break;
@ -61,6 +62,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::START_REPLICATED_SENDS:
case Type::STOP_REPLICATION_QUEUES:
case Type::START_REPLICATION_QUEUES:
case Type::STOP_DISTRIBUTED_SENDS:
case Type::START_DISTRIBUTED_SENDS:
parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table);
break;

View File

@ -89,11 +89,14 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
void StorageDistributedDirectoryMonitor::syncReplicaSends()
{
if (quit || monitor_blocker.isCancelled())
throw Exception("Cancelled sync distributed sync replica sends.", ErrorCodes::ABORTED);
if (!quit)
{
if (monitor_blocker.isCancelled())
throw Exception("Cancelled sync distributed sends.", ErrorCodes::ABORTED);
std::unique_lock lock{mutex};
findFiles();
std::unique_lock lock{mutex};
findFiles();
}
}
void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
@ -140,6 +143,10 @@ void StorageDistributedDirectoryMonitor::run()
tryLogCurrentException(getLoggerName().data());
}
}
else
{
LOG_DEBUG(log, "Skipping send data over distributed table.");
}
if (do_sleep)
cond.wait_for(lock, sleep_time, quit_requested);

View File

@ -67,7 +67,7 @@ namespace ErrorCodes
namespace ActionLocks
{
extern const StorageActionBlockType PartsSend;
extern const StorageActionBlockType DistributedSend;
}
namespace
@ -511,7 +511,7 @@ ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const Select
ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
{
if (type == ActionLocks::PartsSend)
if (type == ActionLocks::DistributedSend)
return monitors_blocker.cancel();
return {};
}

View File

@ -27,7 +27,7 @@ def started_cluster():
def test_start_and_stop_replica_send(started_cluster):
node1.query("SYSTEM STOP REPLICATED SENDS distributed_table;")
node1.query("SYSTEM STOP DISTRIBUTED SENDS distributed_table;")
node1.query("INSERT INTO distributed_table VALUES (0, 'node1')")
node1.query("INSERT INTO distributed_table VALUES (1, 'node2')")
@ -35,7 +35,7 @@ def test_start_and_stop_replica_send(started_cluster):
# Write only to this node when stop replicated sends
assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '1'
node1.query("SYSTEM START REPLICATED SENDS distributed_table;")
node1.query("SYSTEM SYNC REPLICA distributed_table;")
node1.query("SYSTEM START DISTRIBUTED SENDS distributed_table;")
node1.query("SYSTEM SYNC DISTRIBUTED distributed_table;")
assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '2'