mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
dbms: saving create time for log entries in ZK [#METR-18085].
This commit is contained in:
parent
51b2951d62
commit
10b86afd3c
@ -288,6 +288,7 @@ namespace ErrorCodes
|
||||
UNKNOWN_GLOBAL_SUBQUERIES_METHOD = 284,
|
||||
TOO_LESS_LIVE_REPLICAS = 285,
|
||||
UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE = 286,
|
||||
UNKNOWN_FORMAT_VERSION = 287,
|
||||
|
||||
KEEPER_EXCEPTION = 999,
|
||||
POCO_EXCEPTION = 1000,
|
||||
|
@ -3,10 +3,14 @@
|
||||
#include <DB/Core/Exception.h>
|
||||
#include <DB/Core/ErrorCodes.h>
|
||||
#include <DB/Core/Types.h>
|
||||
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
|
||||
|
||||
struct Stat;
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -77,7 +81,7 @@ struct ReplicatedMergeTreeLogEntry
|
||||
void readText(ReadBuffer & in);
|
||||
|
||||
String toString() const;
|
||||
static Ptr parse(const String & s);
|
||||
static Ptr parse(const String & s, const Stat & stat);
|
||||
};
|
||||
|
||||
|
||||
|
@ -1,3 +1,5 @@
|
||||
#include <zkutil/Types.h>
|
||||
|
||||
#include <DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
|
||||
#include <DB/Storages/StorageReplicatedMergeTree.h>
|
||||
#include <DB/IO/Operators.h>
|
||||
@ -43,7 +45,8 @@ void ReplicatedMergeTreeLogEntry::tagPartAsFuture(StorageReplicatedMergeTree & s
|
||||
|
||||
void ReplicatedMergeTreeLogEntry::writeText(WriteBuffer & out) const
|
||||
{
|
||||
out << "format version: 1\n"
|
||||
out << "format version: 2\n"
|
||||
<< "create_time: " << mysqlxx::DateTime(create_time ? create_time : time(0)) << "\n"
|
||||
<< "source replica: " << source_replica << '\n';
|
||||
|
||||
switch (type)
|
||||
@ -85,10 +88,22 @@ void ReplicatedMergeTreeLogEntry::writeText(WriteBuffer & out) const
|
||||
|
||||
void ReplicatedMergeTreeLogEntry::readText(ReadBuffer & in)
|
||||
{
|
||||
UInt8 format_version = 0;
|
||||
String type_str;
|
||||
|
||||
in >> "format version: 1\n"
|
||||
>> "source replica: " >> source_replica >> "\n"
|
||||
in >> "format version: " >> format_version >> "\n";
|
||||
|
||||
if (format_version != 1 && format_version != 2)
|
||||
throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION);
|
||||
|
||||
if (format_version == 2)
|
||||
{
|
||||
mysqlxx::DateTime create_time_dt;
|
||||
in >> "create_time: " >> create_time_dt >> "\n";
|
||||
create_time = create_time_dt;
|
||||
}
|
||||
|
||||
in >> "source replica: " >> source_replica >> "\n"
|
||||
>> type_str >> "\n";
|
||||
|
||||
if (type_str == "get")
|
||||
@ -146,12 +161,16 @@ String ReplicatedMergeTreeLogEntry::toString() const
|
||||
return s;
|
||||
}
|
||||
|
||||
ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String & s)
|
||||
ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String & s, const zkutil::Stat & stat)
|
||||
{
|
||||
ReadBufferFromString in(s);
|
||||
Ptr res = new ReplicatedMergeTreeLogEntry;
|
||||
res->readText(in);
|
||||
assertEOF(in);
|
||||
|
||||
if (!res->create_time)
|
||||
res->create_time = stat.ctime / 1000;
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -422,6 +422,18 @@ void StorageReplicatedMergeTree::createReplica()
|
||||
log_entry.source_replica = "";
|
||||
log_entry.new_part_name = name;
|
||||
|
||||
/// Узнаем время создания part-а, если он ещё не удалён (не был, например, смерджен).
|
||||
{
|
||||
zkutil::Stat stat;
|
||||
String unused;
|
||||
if (zookeeper->tryGet(source_path + "/parts/" + name, unused, &stat))
|
||||
log_entry.create_time = stat.ctime / 1000;
|
||||
|
||||
/** Иначе временем создания будет текущее время.
|
||||
* Это время используется для измерения отставания реплик.
|
||||
*/
|
||||
}
|
||||
|
||||
zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
|
||||
}
|
||||
LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched");
|
||||
@ -665,8 +677,7 @@ void StorageReplicatedMergeTree::loadQueue()
|
||||
{
|
||||
zkutil::Stat stat;
|
||||
String s = zookeeper->get(replica_path + "/queue/" + child, &stat);
|
||||
LogEntryPtr entry = LogEntry::parse(s);
|
||||
entry->create_time = stat.ctime / 1000;
|
||||
LogEntryPtr entry = LogEntry::parse(s, stat);
|
||||
entry->znode_name = child;
|
||||
entry->addResultToVirtualParts(*this);
|
||||
queue.push_back(entry);
|
||||
@ -706,8 +717,7 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev
|
||||
++count;
|
||||
++index;
|
||||
|
||||
LogEntryPtr entry = LogEntry::parse(entry_str);
|
||||
entry->create_time = stat.ctime / 1000;
|
||||
LogEntryPtr entry = LogEntry::parse(entry_str, stat);
|
||||
|
||||
/// Одновременно добавим запись в очередь и продвинем указатель на лог.
|
||||
zkutil::Ops ops;
|
||||
|
Loading…
Reference in New Issue
Block a user