This commit is contained in:
Michael Kolupaev 2014-04-22 14:57:19 +04:00
parent ad3954a28a
commit dba95b4de5
2 changed files with 21 additions and 11 deletions

View File

@ -34,6 +34,7 @@ public:
const String & sign_column_ = "",
const MergeTreeSettings & settings_ = MergeTreeSettings());
void startup();
void shutdown();
~StorageReplicatedMergeTree();

View File

@ -60,14 +60,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
loadQueue();
activateReplica();
leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", zookeeper,
std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);
queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
for (size_t i = 0; i < settings_.replication_threads; ++i)
queue_threads.push_back(std::thread(&StorageReplicatedMergeTree::queueThread, this));
}
StoragePtr StorageReplicatedMergeTree::create(
@ -88,9 +80,7 @@ StoragePtr StorageReplicatedMergeTree::create(
path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_);
StoragePtr res_ptr = res->thisPtr();
String endpoint_name = "ReplicatedMergeTree:" + res->replica_path;
InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, res_ptr);
res->endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, res->context.getInterserverIOHandler());
res->startup();
return res_ptr;
}
@ -1022,6 +1012,7 @@ void StorageReplicatedMergeTree::shutdown()
LOG_TRACE(log, "Waiting for threads to finish");
if (is_leader_node)
{
is_leader_node = false;
merge_selecting_thread.join();
clear_old_blocks_thread.join();
}
@ -1031,6 +1022,24 @@ void StorageReplicatedMergeTree::shutdown()
LOG_TRACE(log, "Threads finished");
}
void StorageReplicatedMergeTree::startup()
{
shutdown_called = false;
String endpoint_name = "ReplicatedMergeTree:" + replica_path;
InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(data, thisPtr());
endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, context.getInterserverIOHandler());
activateReplica();
leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", zookeeper,
std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);
queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
for (size_t i = 0; i < data.settings.replication_threads; ++i)
queue_threads.push_back(std::thread(&StorageReplicatedMergeTree::queueThread, this));
}
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
try