add idempotent retries of updating the replica queue in ZooKeeper [#CLICKHOUSE-3405]

This commit is contained in:
Alexey Zatelepin 2017-11-23 16:12:22 +03:00
parent 167618f778
commit 78aaa42ddd
4 changed files with 50 additions and 18 deletions

View File

@ -1,7 +1,8 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/MergeTreeDataMerger.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/StringUtils.h>
@ -15,6 +16,13 @@ namespace ErrorCodes
}
ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(const StorageReplicatedMergeTree & storage_)
: storage(storage_)
, virtual_parts(storage.data.format_version)
{
}
void ReplicatedMergeTreeQueue::initVirtualParts(const MergeTreeData::DataParts & parts)
{
std::lock_guard<std::mutex> lock(mutex);
@ -233,7 +241,8 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B
{
std::lock_guard<std::mutex> lock(pull_logs_to_queue_mutex);
String index_str = zookeeper->get(replica_path + "/log_pointer");
zkutil::Stat log_pointer_stat;
String index_str = zookeeper->get(replica_path + "/log_pointer", &log_pointer_stat);
UInt64 index;
Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log");
@ -243,7 +252,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B
/// If we do not already have a pointer to the log, put a pointer to the first entry in it.
index = log_entries.empty() ? 0 : parse<UInt64>(std::min_element(log_entries.begin(), log_entries.end())->substr(strlen("log-")));
zookeeper->set(replica_path + "/log_pointer", toString(index));
zookeeper->set(replica_path + "/log_pointer", toString(index), -1, &log_pointer_stat);
}
else
{
@ -320,13 +329,36 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B
}
ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
replica_path + "/log_pointer", toString(last_entry_index + 1), -1));
replica_path + "/log_pointer", toString(last_entry_index + 1), log_pointer_stat.version));
if (min_unprocessed_insert_time_changed)
ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
replica_path + "/min_unprocessed_insert_time", toString(min_unprocessed_insert_time), -1));
auto results = zookeeper->multi(ops);
for (size_t num_tries = 0; ; ++num_tries)
{
if (storage.shutdown_called)
return false;
try
{
zookeeper->multi(ops);
break;
}
catch (const zkutil::KeeperException & ex)
{
if (num_tries > 0 && ex.code == ZBADVERSION)
{
LOG_TRACE(log, "Version mismatch while updating the queue. Assuming we have already updated it.");
break;
}
if (!ex.isTemporaryError())
throw;
tryLogCurrentException(log, "Temporary error while updating the queue (will retry)");
}
}
/// Now we have successfully updated the queue in ZooKeeper. Update it in RAM.
@ -422,7 +454,7 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z
for (Queue::iterator it = queue.begin(); it != queue.end();)
{
if (((*it)->type == LogEntry::GET_PART || (*it)->type == LogEntry::MERGE_PARTS) &&
MergeTreePartInfo::contains(part_name, (*it)->new_part_name, format_version))
MergeTreePartInfo::contains(part_name, (*it)->new_part_name, storage.data.format_version))
{
if ((*it)->currently_executing)
to_wait.push_back(*it);
@ -461,14 +493,14 @@ ReplicatedMergeTreeQueue::Queue ReplicatedMergeTreeQueue::getConflictsForClearCo
{
if (elem->type == LogEntry::MERGE_PARTS || elem->type == LogEntry::GET_PART || elem->type == LogEntry::ATTACH_PART)
{
if (MergeTreePartInfo::contains(entry.new_part_name, elem->new_part_name, format_version))
if (MergeTreePartInfo::contains(entry.new_part_name, elem->new_part_name, storage.data.format_version))
conflicts.emplace_back(elem);
}
if (elem->type == LogEntry::CLEAR_COLUMN)
{
auto cur_part = MergeTreePartInfo::fromPartName(elem->new_part_name, format_version);
auto part = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
auto cur_part = MergeTreePartInfo::fromPartName(elem->new_part_name, storage.data.format_version);
auto part = MergeTreePartInfo::fromPartName(entry.new_part_name, storage.data.format_version);
if (part.partition_id == cur_part.partition_id)
conflicts.emplace_back(elem);
@ -524,12 +556,12 @@ bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_
/// A more complex check is whether another part is currently created by other action that will cover this part.
/// NOTE The above is redundant, but left for a more convenient message in the log.
auto result_part = MergeTreePartInfo::fromPartName(new_part_name, format_version);
auto result_part = MergeTreePartInfo::fromPartName(new_part_name, storage.data.format_version);
/// It can slow down when the size of `future_parts` is large. But it can not be large, since `BackgroundProcessingPool` is limited.
for (const auto & future_part_name : future_parts)
{
auto future_part = MergeTreePartInfo::fromPartName(future_part_name, format_version);
auto future_part = MergeTreePartInfo::fromPartName(future_part_name, storage.data.format_version);
if (future_part.contains(result_part))
{

View File

@ -1,3 +1,5 @@
#pragma once
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/MergeTreeData.h>
@ -11,6 +13,7 @@ namespace DB
class MergeTreeDataMerger;
class StorageReplicatedMergeTree;
class ReplicatedMergeTreeQueue
@ -38,7 +41,7 @@ private:
using InsertsByTime = std::set<LogEntryPtr, ByTime>;
MergeTreeDataFormatVersion format_version;
const StorageReplicatedMergeTree & storage;
String zookeeper_path;
String replica_path;
@ -124,11 +127,7 @@ private:
};
public:
ReplicatedMergeTreeQueue(MergeTreeDataFormatVersion format_version_)
: format_version(format_version_)
, virtual_parts(format_version)
{
}
ReplicatedMergeTreeQueue(const StorageReplicatedMergeTree & storage_);
void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
const MergeTreeData::DataParts & parts, zkutil::ZooKeeperPtr zookeeper);

View File

@ -188,7 +188,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
settings_, database_name_ + "." + table_name, true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); },
[this] () { clearOldPartsAndRemoveFromZK(); }),
reader(data), writer(data), merger(data, context.getBackgroundPool()), queue(data.format_version),
reader(data), writer(data), merger(data, context.getBackgroundPool()), queue(*this),
fetcher(data),
shutdown_event(false), part_check_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))

View File

@ -205,6 +205,7 @@ private:
friend class ReplicatedMergeTreeRestartingThread;
friend struct ReplicatedMergeTreeLogEntry;
friend class ScopedPartitionMergeLock;
friend class ReplicatedMergeTreeQueue;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;