diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index bc09aff79e6..5af59c206f1 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -288,6 +288,7 @@ namespace ErrorCodes UNKNOWN_GLOBAL_SUBQUERIES_METHOD = 284, TOO_LESS_LIVE_REPLICAS = 285, UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE = 286, + UNKNOWN_FORMAT_VERSION = 287, KEEPER_EXCEPTION = 999, POCO_EXCEPTION = 1000, diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index 471d8cd6bba..3511cfa7caf 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -3,10 +3,14 @@ #include #include #include + #include #include +struct Stat; + + namespace DB { @@ -77,7 +81,7 @@ struct ReplicatedMergeTreeLogEntry void readText(ReadBuffer & in); String toString() const; - static Ptr parse(const String & s); + static Ptr parse(const String & s, const Stat & stat); }; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index 241f1cd393a..7d4a4989044 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -1,3 +1,5 @@ +#include + #include #include #include @@ -43,7 +45,8 @@ void ReplicatedMergeTreeLogEntry::tagPartAsFuture(StorageReplicatedMergeTree & s void ReplicatedMergeTreeLogEntry::writeText(WriteBuffer & out) const { - out << "format version: 1\n" + out << "format version: 2\n" + << "create_time: " << mysqlxx::DateTime(create_time ? create_time : time(0)) << "\n" << "source replica: " << source_replica << '\n'; switch (type) @@ -85,10 +88,22 @@ void ReplicatedMergeTreeLogEntry::writeText(WriteBuffer & out) const void ReplicatedMergeTreeLogEntry::readText(ReadBuffer & in) { + UInt8 format_version = 0; String type_str; - in >> "format version: 1\n" - >> "source replica: " >> source_replica >> "\n" + in >> "format version: " >> format_version >> "\n"; + + if (format_version != 1 && format_version != 2) + throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION); + + if (format_version == 2) + { + mysqlxx::DateTime create_time_dt; + in >> "create_time: " >> create_time_dt >> "\n"; + create_time = create_time_dt; + } + + in >> "source replica: " >> source_replica >> "\n" >> type_str >> "\n"; if (type_str == "get") @@ -146,12 +161,16 @@ String ReplicatedMergeTreeLogEntry::toString() const return s; } -ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String & s) +ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String & s, const zkutil::Stat & stat) { ReadBufferFromString in(s); Ptr res = new ReplicatedMergeTreeLogEntry; res->readText(in); assertEOF(in); + + if (!res->create_time) + res->create_time = stat.ctime / 1000; + return res; } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 23edccd9341..691e6de9050 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -422,6 +422,18 @@ void StorageReplicatedMergeTree::createReplica() log_entry.source_replica = ""; log_entry.new_part_name = name; + /// Узнаем время создания part-а, если он ещё не удалён (не был, например, смерджен). + { + zkutil::Stat stat; + String unused; + if (zookeeper->tryGet(source_path + "/parts/" + name, unused, &stat)) + log_entry.create_time = stat.ctime / 1000; + + /** Иначе временем создания будет текущее время. + * Это время используется для измерения отставания реплик. + */ + } + zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential); } LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched"); @@ -665,8 +677,7 @@ void StorageReplicatedMergeTree::loadQueue() { zkutil::Stat stat; String s = zookeeper->get(replica_path + "/queue/" + child, &stat); - LogEntryPtr entry = LogEntry::parse(s); - entry->create_time = stat.ctime / 1000; + LogEntryPtr entry = LogEntry::parse(s, stat); entry->znode_name = child; entry->addResultToVirtualParts(*this); queue.push_back(entry); @@ -706,8 +717,7 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev ++count; ++index; - LogEntryPtr entry = LogEntry::parse(entry_str); - entry->create_time = stat.ctime / 1000; + LogEntryPtr entry = LogEntry::parse(entry_str, stat); /// Одновременно добавим запись в очередь и продвинем указатель на лог. zkutil::Ops ops;