dbms: set queue entry time when cloning replica or repairing missing or damaged parts [#METR-18085].

This commit is contained in:
Alexey Milovidov 2015-09-20 08:50:15 +03:00
parent 615181b3a6
commit 227b41b0eb

View File

@ -344,6 +344,25 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo
}
/** При необходимости восстановить кусок, реплика сама добавляет в свою очередь запись на его получение.
* Какое поставить время для этой записи в очереди? Время учитывается при расчёте отставания реплики.
* Для этих целей имеет смысл использовать время создания недостающего куска
* (то есть, при расчёте отставания будет учитано, насколько старый кусок нам нужно восстановить).
*/
static time_t tryGetPartCreateTime(zkutil::ZooKeeperPtr & zookeeper, const String & replica_path, const String & part_name)
{
time_t res = 0;
/// Узнаем время создания part-а, если он ещё существует (не был, например, смерджен).
zkutil::Stat stat;
String unused;
if (zookeeper->tryGet(replica_path + "/parts/" + part_name, unused, &stat))
res = stat.ctime / 1000;
return res;
}
void StorageReplicatedMergeTree::createReplica()
{
auto zookeeper = getZooKeeper();
@ -458,18 +477,7 @@ void StorageReplicatedMergeTree::createReplica()
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = name;
/// Узнаем время создания part-а, если он ещё не удалён (не был, например, смерджен).
{
zkutil::Stat stat;
String unused;
if (zookeeper->tryGet(source_path + "/parts/" + name, unused, &stat))
log_entry.create_time = stat.ctime / 1000;
/** Иначе временем создания будет текущее время.
* Это время используется для измерения отставания реплик.
*/
}
log_entry.create_time = tryGetPartCreateTime(zookeeper, source_path, name);
zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
}
@ -480,6 +488,9 @@ void StorageReplicatedMergeTree::createReplica()
{
zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential);
}
/// Далее оно будет загружено в переменную queue в методе loadQueue.
LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries");
}
@ -605,6 +616,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = name;
log_entry.create_time = tryGetPartCreateTime(zookeeper, replica_path, name);
/// Полагаемся, что это происходит до загрузки очереди (loadQueue).
zkutil::Ops ops;
@ -1725,7 +1737,7 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
LogEntryPtr log_entry = new LogEntry;
log_entry->type = LogEntry::GET_PART;
log_entry->create_time = time(0);
log_entry->create_time = tryGetPartCreateTime(zookeeper, replica_path, part_name);
log_entry->source_replica = "";
log_entry->new_part_name = part_name;