Store exception generated when we tried to update the queue last time

The use case is to alert when queue contains broken entries. Especially
important when ClickHouse breaks backwards compatibility between
versions and log entries written by newer versions aren't parseable by
old versions.

```
Code: 27, e.displayText() = DB::Exception: Cannot parse input: expected 'quorum: ' before: 'merge_type: 2\n'
```
This commit is contained in:
Nicolae Vartolomei 2021-07-27 15:35:20 +01:00
parent e83ca20ac3
commit 8b07a7f180
4 changed files with 36 additions and 16 deletions

View File

@ -153,11 +153,20 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.cloneReplicaIfNeeded(zookeeper); storage.cloneReplicaIfNeeded(zookeeper);
storage.queue.load(zookeeper); try
{
storage.queue.load(zookeeper);
/// pullLogsToQueue() after we mark replica 'is_active' (and after we repair if it was lost);
/// because cleanup_thread doesn't delete log_pointer of active replicas.
storage.queue.pullLogsToQueue(zookeeper);
}
catch (...)
{
storage.last_queue_update_exception.set(std::make_unique<String>(getCurrentExceptionMessage(false)));
throw;
}
/// pullLogsToQueue() after we mark replica 'is_active' (and after we repair if it was lost);
/// because cleanup_thread doesn't delete log_pointer of active replicas.
storage.queue.pullLogsToQueue(zookeeper);
storage.queue.removeCurrentPartsFromMutations(); storage.queue.removeCurrentPartsFromMutations();
storage.last_queue_update_finish_time.store(time(nullptr)); storage.last_queue_update_finish_time.store(time(nullptr));

View File

@ -3087,21 +3087,24 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
last_queue_update_finish_time.store(time(nullptr)); last_queue_update_finish_time.store(time(nullptr));
queue_update_in_progress = false; queue_update_in_progress = false;
} }
catch (const Coordination::Exception & e)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (e.code == Coordination::Error::ZSESSIONEXPIRED)
{
restarting_thread.wakeup();
return;
}
queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
}
catch (...) catch (...)
{ {
last_queue_update_exception.set(std::make_unique<String>(getCurrentExceptionMessage(false)));
tryLogCurrentException(log, __PRETTY_FUNCTION__); tryLogCurrentException(log, __PRETTY_FUNCTION__);
try
{
throw;
}
catch (const Coordination::Exception & e)
{
if (e.code == Coordination::Error::ZSESSIONEXPIRED)
{
restarting_thread.wakeup();
return;
}
}
queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
} }
} }
@ -5562,6 +5565,10 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
res.total_replicas = 0; res.total_replicas = 0;
res.active_replicas = 0; res.active_replicas = 0;
MultiVersion<String>::Version queue_exception = last_queue_update_exception.get();
if (queue_exception)
res.last_queue_update_exception = *queue_exception;
if (with_zk_fields && !res.is_session_expired) if (with_zk_fields && !res.is_session_expired)
{ {
try try

View File

@ -174,6 +174,7 @@ public:
UInt64 absolute_delay; UInt64 absolute_delay;
UInt8 total_replicas; UInt8 total_replicas;
UInt8 active_replicas; UInt8 active_replicas;
String last_queue_update_exception;
/// If the error has happened fetching the info from ZooKeeper, this field will be set. /// If the error has happened fetching the info from ZooKeeper, this field will be set.
String zookeeper_exception; String zookeeper_exception;
}; };
@ -329,6 +330,7 @@ private:
ReplicatedMergeTreeQueue queue; ReplicatedMergeTreeQueue queue;
std::atomic<time_t> last_queue_update_start_time{0}; std::atomic<time_t> last_queue_update_start_time{0};
std::atomic<time_t> last_queue_update_finish_time{0}; std::atomic<time_t> last_queue_update_finish_time{0};
MultiVersion<String> last_queue_update_exception;
DataPartsExchange::Fetcher fetcher; DataPartsExchange::Fetcher fetcher;

View File

@ -50,6 +50,7 @@ StorageSystemReplicas::StorageSystemReplicas(const StorageID & table_id_)
{ "absolute_delay", std::make_shared<DataTypeUInt64>() }, { "absolute_delay", std::make_shared<DataTypeUInt64>() },
{ "total_replicas", std::make_shared<DataTypeUInt8>() }, { "total_replicas", std::make_shared<DataTypeUInt8>() },
{ "active_replicas", std::make_shared<DataTypeUInt8>() }, { "active_replicas", std::make_shared<DataTypeUInt8>() },
{ "last_queue_update_exception", std::make_shared<DataTypeString>() },
{ "zookeeper_exception", std::make_shared<DataTypeString>() }, { "zookeeper_exception", std::make_shared<DataTypeString>() },
})); }));
setInMemoryMetadata(storage_metadata); setInMemoryMetadata(storage_metadata);
@ -183,6 +184,7 @@ Pipe StorageSystemReplicas::read(
res_columns[col_num++]->insert(status.absolute_delay); res_columns[col_num++]->insert(status.absolute_delay);
res_columns[col_num++]->insert(status.total_replicas); res_columns[col_num++]->insert(status.total_replicas);
res_columns[col_num++]->insert(status.active_replicas); res_columns[col_num++]->insert(status.active_replicas);
res_columns[col_num++]->insert(status.last_queue_update_exception);
res_columns[col_num++]->insert(status.zookeeper_exception); res_columns[col_num++]->insert(status.zookeeper_exception);
} }