Add support for user defined identifier on log entries

Sometimes we want to push a log entry once and only once. Because it is
not possible to create a sequential node in ZooKeeper and store its name
to a well known location in the same transaction we'll do it in the
other order. First somehow generate a unique identifier, then submit a
log entry with that identifier. Later, we can search through log entries
using the identifier we provided to find the node.

Required for part movement between shards.
This commit is contained in:
Nicolae Vartolomei 2021-09-16 16:47:57 +01:00
parent c428f433c3
commit 0381c634d4
No known key found for this signature in database
GPG Key ID: 6A71A31C4A463CF6
3 changed files with 95 additions and 3 deletions

View File

@ -26,6 +26,7 @@ enum FormatVersion : UInt8
FORMAT_WITH_DEDUPLICATE = 4,
FORMAT_WITH_UUID = 5,
FORMAT_WITH_DEDUPLICATE_BY_COLUMNS = 6,
FORMAT_WITH_LOG_ENTRY_ID = 7,
FORMAT_LAST
};
@ -43,11 +44,17 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
if (new_part_uuid != UUIDHelpers::Nil)
format_version = std::max<UInt8>(format_version, FORMAT_WITH_UUID);
if (!log_entry_id.empty())
format_version = std::max<UInt8>(format_version, FORMAT_WITH_LOG_ENTRY_ID);
out << "format version: " << format_version << "\n"
<< "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n"
<< "source replica: " << source_replica << '\n'
<< "block_id: " << escape << block_id << '\n';
if (format_version >= FORMAT_WITH_LOG_ENTRY_ID)
out << "log_entry_id: " << escape << log_entry_id << '\n';
switch (type)
{
case GET_PART:
@ -192,6 +199,9 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
in >> "block_id: " >> escape >> block_id >> "\n";
}
if (format_version >= FORMAT_WITH_LOG_ENTRY_ID)
in >> "log_entry_id: " >> escape >> log_entry_id >> "\n";
in >> type_str >> "\n";
bool trailing_newline_found = false;

View File

@ -77,6 +77,7 @@ struct ReplicatedMergeTreeLogEntryData
String toString() const;
String znode_name;
String log_entry_id;
Type type = EMPTY;
String source_replica; /// Empty string means that this entry was added to the queue immediately, and not copied from the log.

View File

@ -5107,6 +5107,7 @@ bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry(
LOG_DEBUG(log, "Waiting for {} to process log entry", replica);
if (startsWith(entry.znode_name, "log-"))
{
/// Take the number from the node name `log-xxxxxxxxxx`.
UInt64 log_index = parse<UInt64>(entry.znode_name.substr(entry.znode_name.size() - 10));
@ -5137,9 +5138,76 @@ bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry(
if (!pulled_to_queue)
return false;
}
LOG_DEBUG(log, "Looking for node corresponding to {} in {} queue", log_node_name, replica);
LOG_DEBUG(log, "Looking for node corresponding to {} in {} queue", log_node_name, replica);
}
else if (!entry.log_entry_id.empty())
{
/// First pass, check the table log.
/// If found in the log, wait for replica to fetch it to the queue.
/// If not found in the log, it is already in the queue.
LOG_DEBUG(log, "Looking for log entry with id `{}` in the log", entry.log_entry_id);
String log_pointer = getZooKeeper()->get(fs::path(table_zookeeper_path) / "replicas" / replica / "log_pointer");
Strings log_entries = getZooKeeper()->getChildren(fs::path(table_zookeeper_path) / "log");
UInt64 log_index = 0;
bool found = false;
for (const String & log_entry_name : log_entries)
{
log_index = parse<UInt64>(log_entry_name.substr(log_entry_name.size() - 10));
if (!log_pointer.empty() && log_index < parse<UInt64>(log_pointer))
continue;
String log_entry_str;
Coordination::Stat log_entry_stat;
bool exists = getZooKeeper()->tryGet(fs::path(table_zookeeper_path) / "log" / log_entry_name, log_entry_str, &log_entry_stat);
ReplicatedMergeTreeLogEntryData log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, log_entry_stat);
if (exists && entry.log_entry_id == log_entry.log_entry_id)
{
LOG_DEBUG(log, "Found log entry with id `{}` in the log", entry.log_entry_id);
found = true;
log_node_name = log_entry_name;
break;
}
}
if (found)
{
LOG_DEBUG(log, "Waiting for {} to pull {} to queue", replica, log_node_name);
/// Let's wait until entry gets into the replica queue.
bool pulled_to_queue = false;
do
{
zkutil::EventPtr event = std::make_shared<Poco::Event>();
log_pointer = getZooKeeper()->get(fs::path(table_zookeeper_path) / "replicas" / replica / "log_pointer", nullptr, event);
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
{
pulled_to_queue = true;
break;
}
/// Wait with timeout because we can be already shut down, but not dropped.
/// So log_pointer node will exist, but we will never update it because all background threads already stopped.
/// It can lead to query hung because table drop query can wait for some query (alter, optimize, etc) which called this method,
/// but the query will never finish because the drop already shut down the table.
if (!stop_waiting())
event->tryWait(event_wait_timeout_ms);
} while (!stop_waiting());
if (!pulled_to_queue)
return false;
}
}
else
{
throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR);
}
/** Second - find the corresponding entry in the queue of the specified replica.
* Its number may not match the `log` node. Therefore, we search by comparing the content.
@ -5151,12 +5219,25 @@ bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry(
for (const String & entry_name : queue_entries)
{
String queue_entry_str;
bool exists = getZooKeeper()->tryGet(fs::path(table_zookeeper_path) / "replicas" / replica / "queue" / entry_name, queue_entry_str);
Coordination::Stat queue_entry_stat;
bool exists = getZooKeeper()->tryGet(fs::path(table_zookeeper_path) / "replicas" / replica / "queue" / entry_name, queue_entry_str, &queue_entry_stat);
if (exists && queue_entry_str == entry_str)
{
queue_entry_to_wait_for = entry_name;
break;
}
else if (!entry.log_entry_id.empty())
{
/// Check if the id matches rather than just contents. This entry
/// might have been written by different ClickHouse versions and
/// it is hard to guarantee same text representation.
ReplicatedMergeTreeLogEntryData queue_entry = *ReplicatedMergeTreeLogEntry::parse(queue_entry_str, queue_entry_stat);
if (entry.log_entry_id == queue_entry.log_entry_id)
{
queue_entry_to_wait_for = entry_name;
break;
}
}
}
/// While looking for the record, it has already been executed and deleted.