CLICKHOUSE-3847 we can do pullToLog() after we mark replica is_active and before we calc delay.

This commit is contained in:
VadimPE 2018-08-08 16:53:06 +03:00
parent 01e9d15914
commit 8d28b9f6b4
2 changed files with 8 additions and 40 deletions

View File

@ -197,6 +197,14 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
{ {
removeFailedQuorumParts(); removeFailedQuorumParts();
activateReplica(); activateReplica();
storage.cloneReplicaIfNeeded();
/// pullLogsToQueue() after we mark replica 'is_active' and clone();
/// because cleanup_thread don't del our log_pointer.
storage.queue.pullLogsToQueue(storage.getZooKeeper());
storage.last_queue_update_finish_time.store(time(nullptr));
storage.cloneReplicaIfNeeded(); storage.cloneReplicaIfNeeded();
updateQuorumIfWeHavePart(); updateQuorumIfWeHavePart();

View File

@ -616,39 +616,6 @@ void StorageReplicatedMergeTree::createReplica()
*/ */
zookeeper->set(zookeeper_path + "/replicas", "last added replica: " + replica_name); zookeeper->set(zookeeper_path + "/replicas", "last added replica: " + replica_name);
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
/** "Reference" replica, from which we take information about the set of parts, queue and pointer to the log.
* Take random replica created earlier than this.
*/
String source_replica;
zkutil::Stat stat;
zookeeper->exists(replica_path, &stat);
auto my_create_time = stat.czxid;
std::shuffle(replicas.begin(), replicas.end(), rng);
for (const String & replica : replicas)
{
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica, &stat))
throw Exception("Replica " + zookeeper_path + "/replicas/" + replica + " was removed from right under our feet.",
ErrorCodes::NO_SUCH_REPLICA);
if (stat.czxid < my_create_time)
{
source_replica = replica;
break;
}
}
if (source_replica.empty())
{
LOG_INFO(log, "This is the first replica");
}
else
{
cloneReplicaIfNeeded();
}
zookeeper->create(replica_path + "/columns", getColumns().toString(), zkutil::CreateMode::Persistent); zookeeper->create(replica_path + "/columns", getColumns().toString(), zkutil::CreateMode::Persistent);
} }
@ -2099,8 +2066,6 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded()
} }
} while (cloneReplica(source_replica, zookeeper)); } while (cloneReplica(source_replica, zookeeper));
zookeeper->remove(replica_path + "/is_lost", -1);
} }
@ -2841,11 +2806,6 @@ void StorageReplicatedMergeTree::startup()
database_name + "." + table_name + " (ReplicatedMergeTreeQueue)", database_name + "." + table_name + " (ReplicatedMergeTreeQueue)",
data.getDataParts(), current_zookeeper); data.getDataParts(), current_zookeeper);
queue.pullLogsToQueue(current_zookeeper);
last_queue_update_finish_time.store(time(nullptr));
/// NOTE: not updating last_queue_update_start_time because it must contain the time when
/// the notification of queue change was received. In the beginning it is effectively infinite.
StoragePtr ptr = shared_from_this(); StoragePtr ptr = shared_from_this();
InterserverIOEndpointPtr data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(data, ptr); InterserverIOEndpointPtr data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(data, ptr);
data_parts_exchange_endpoint_holder = std::make_shared<InterserverIOEndpointHolder>( data_parts_exchange_endpoint_holder = std::make_shared<InterserverIOEndpointHolder>(