From 8b07a7f1807ef771b7b163b7728db215f9c7552a Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Tue, 27 Jul 2021 15:35:20 +0100 Subject: [PATCH] Store exception generated when we tried to update the queue last time The use case is to alert when queue contains broken entries. Especially important when ClickHouse breaks backwards compatibility between versions and log entries written by newer versions aren't parseable by old versions. ``` Code: 27, e.displayText() = DB::Exception: Cannot parse input: expected 'quorum: ' before: 'merge_type: 2\n' ``` --- .../ReplicatedMergeTreeRestartingThread.cpp | 17 +++++++--- src/Storages/StorageReplicatedMergeTree.cpp | 31 ++++++++++++------- src/Storages/StorageReplicatedMergeTree.h | 2 ++ src/Storages/System/StorageSystemReplicas.cpp | 2 ++ 4 files changed, 36 insertions(+), 16 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 1c9921aad1d..eadd414f1d5 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -153,11 +153,20 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() storage.cloneReplicaIfNeeded(zookeeper); - storage.queue.load(zookeeper); + try + { + storage.queue.load(zookeeper); + + /// pullLogsToQueue() after we mark replica 'is_active' (and after we repair if it was lost); + /// because cleanup_thread doesn't delete log_pointer of active replicas. + storage.queue.pullLogsToQueue(zookeeper); + } + catch (...) + { + storage.last_queue_update_exception.set(std::make_unique(getCurrentExceptionMessage(false))); + throw; + } - /// pullLogsToQueue() after we mark replica 'is_active' (and after we repair if it was lost); - /// because cleanup_thread doesn't delete log_pointer of active replicas. - storage.queue.pullLogsToQueue(zookeeper); storage.queue.removeCurrentPartsFromMutations(); storage.last_queue_update_finish_time.store(time(nullptr)); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index d44b86fe9bb..8966a34e825 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3087,21 +3087,24 @@ void StorageReplicatedMergeTree::queueUpdatingTask() last_queue_update_finish_time.store(time(nullptr)); queue_update_in_progress = false; } - catch (const Coordination::Exception & e) - { - tryLogCurrentException(log, __PRETTY_FUNCTION__); - - if (e.code == Coordination::Error::ZSESSIONEXPIRED) - { - restarting_thread.wakeup(); - return; - } - - queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); - } catch (...) { + last_queue_update_exception.set(std::make_unique(getCurrentExceptionMessage(false))); tryLogCurrentException(log, __PRETTY_FUNCTION__); + + try + { + throw; + } + catch (const Coordination::Exception & e) + { + if (e.code == Coordination::Error::ZSESSIONEXPIRED) + { + restarting_thread.wakeup(); + return; + } + } + queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); } } @@ -5562,6 +5565,10 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) res.total_replicas = 0; res.active_replicas = 0; + MultiVersion::Version queue_exception = last_queue_update_exception.get(); + if (queue_exception) + res.last_queue_update_exception = *queue_exception; + if (with_zk_fields && !res.is_session_expired) { try diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 800f419cb76..9c3b9b12e37 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -174,6 +174,7 @@ public: UInt64 absolute_delay; UInt8 total_replicas; UInt8 active_replicas; + String last_queue_update_exception; /// If the error has happened fetching the info from ZooKeeper, this field will be set. String zookeeper_exception; }; @@ -329,6 +330,7 @@ private: ReplicatedMergeTreeQueue queue; std::atomic last_queue_update_start_time{0}; std::atomic last_queue_update_finish_time{0}; + MultiVersion last_queue_update_exception; DataPartsExchange::Fetcher fetcher; diff --git a/src/Storages/System/StorageSystemReplicas.cpp b/src/Storages/System/StorageSystemReplicas.cpp index fc33c6b421b..3af7352616f 100644 --- a/src/Storages/System/StorageSystemReplicas.cpp +++ b/src/Storages/System/StorageSystemReplicas.cpp @@ -50,6 +50,7 @@ StorageSystemReplicas::StorageSystemReplicas(const StorageID & table_id_) { "absolute_delay", std::make_shared() }, { "total_replicas", std::make_shared() }, { "active_replicas", std::make_shared() }, + { "last_queue_update_exception", std::make_shared() }, { "zookeeper_exception", std::make_shared() }, })); setInMemoryMetadata(storage_metadata); @@ -183,6 +184,7 @@ Pipe StorageSystemReplicas::read( res_columns[col_num++]->insert(status.absolute_delay); res_columns[col_num++]->insert(status.total_replicas); res_columns[col_num++]->insert(status.active_replicas); + res_columns[col_num++]->insert(status.last_queue_update_exception); res_columns[col_num++]->insert(status.zookeeper_exception); }