mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Update ReplicatedMergeTreeQueue.cpp
This commit is contained in:
parent
984d6b5db8
commit
9bce62719f
@ -407,8 +407,8 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
|
||||
/// Multiple log entries that must be copied to the queue.
|
||||
|
||||
log_entries.erase(
|
||||
std::remove_if(log_entries.begin(), log_entries.end(), [&min_log_entry](const String & entry) { return entry < min_log_entry; }),
|
||||
log_entries.end());
|
||||
std::remove_if(log_entries.begin(), log_entries.end(), [&min_log_entry](const String & entry) { return entry < min_log_entry; }),
|
||||
log_entries.end());
|
||||
|
||||
if (!log_entries.empty())
|
||||
{
|
||||
@ -425,14 +425,14 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
|
||||
{
|
||||
auto begin = log_entries.begin() + i;
|
||||
auto end = i + MAX_MULTI_OPS >= log_entries.size()
|
||||
? log_entries.end()
|
||||
: (begin + MAX_MULTI_OPS);
|
||||
? log_entries.end()
|
||||
: (begin + MAX_MULTI_OPS);
|
||||
auto last = end - 1;
|
||||
|
||||
String last_entry = *last;
|
||||
if (!startsWith(last_entry, "log-"))
|
||||
throw Exception("Error in zookeeper data: unexpected node " + last_entry + " in " + zookeeper_path + "/log",
|
||||
ErrorCodes::UNEXPECTED_NODE_IN_ZOOKEEPER);
|
||||
ErrorCodes::UNEXPECTED_NODE_IN_ZOOKEEPER);
|
||||
|
||||
UInt64 last_entry_index = parse<UInt64>(last_entry.substr(strlen("log-")));
|
||||
|
||||
@ -459,7 +459,7 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
|
||||
copied_entries.emplace_back(LogEntry::parse(res.data, res.stat));
|
||||
|
||||
ops.emplace_back(zkutil::makeCreateRequest(
|
||||
replica_path + "/queue/queue-", res.data, zkutil::CreateMode::PersistentSequential));
|
||||
replica_path + "/queue/queue-", res.data, zkutil::CreateMode::PersistentSequential));
|
||||
|
||||
const auto & entry = *copied_entries.back();
|
||||
if (entry.type == LogEntry::GET_PART)
|
||||
@ -474,11 +474,11 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
|
||||
}
|
||||
|
||||
ops.emplace_back(zkutil::makeSetRequest(
|
||||
replica_path + "/log_pointer", toString(last_entry_index + 1), -1));
|
||||
replica_path + "/log_pointer", toString(last_entry_index + 1), -1));
|
||||
|
||||
if (min_unprocessed_insert_time_changed)
|
||||
ops.emplace_back(zkutil::makeSetRequest(
|
||||
replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));
|
||||
replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));
|
||||
|
||||
auto responses = zookeeper->multi(ops);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user