From c53b96a11ce776f96e7cbd3b8cfe74f9fd495526 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Fri, 28 Oct 2022 15:09:01 +0000 Subject: [PATCH] Add more comments --- .../MergeTree/ReplicatedMergeTreeSink.cpp | 16 +++++++++++++--- src/Storages/MergeTree/ZooKeeperRetries.h | 1 + 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 8b226e2309d..16a5bbd587e 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -368,11 +368,15 @@ void ReplicatedMergeTreeSink::commitPart( { zookeeper->setKeeper(storage.getZooKeeper()); - /// if we are in retry, check if last iteration was actually successful - /// we could get network error on latest keeper operation in iteration - /// but operation could be completed by keeper server if (retries_ctl.isRetry()) { + /// If we are retrying, check if last iteration was actually successful, + /// we could get network error on committing part to zk + /// but the operation could be completed by zk server + + /// If this flag is true, then part is in Active state, and we'll not retry anymore + /// we only check if part was committed to zk and return success or failure correspondingly + /// Note: if commit to zk failed then cleanup thread will mark the part as Outdated later if (part_committed_locally_but_zookeeper) { /// check that info about the part was actually written in zk @@ -616,9 +620,15 @@ void ReplicatedMergeTreeSink::commitPart( * if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again. */ transaction.commit(); + + /// Setting this flag is point of no return + /// On next retry, we'll just check if actually operation succeed or failed + /// and return ok or error correspondingly part_committed_locally_but_zookeeper = true; /// if all retries will be exhausted by accessing zookeeper on fresh retry -> we'll add committed part to queue in the action + /// here lambda capture part name, it's ok since we'll not generate new one for this insert, + /// see comments around 'part_committed_locally_but_zookeeper' flag retries_ctl.actionAfterLastFailedRetry( [&storage = storage, part_name = part->name]() { storage.enqueuePartForCheck(part_name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER); }); diff --git a/src/Storages/MergeTree/ZooKeeperRetries.h b/src/Storages/MergeTree/ZooKeeperRetries.h index c000e297ceb..22ace074245 100644 --- a/src/Storages/MergeTree/ZooKeeperRetries.h +++ b/src/Storages/MergeTree/ZooKeeperRetries.h @@ -136,6 +136,7 @@ public: Coordination::Error getLastKeeperErrorCode() const { return keeper_error.code; } + /// action will be called only once and only after latest failed retry void actionAfterLastFailedRetry(std::function f) { action_after_last_failed_retry = std::move(f); } private: