start counting absolute replica delay when there are problems with queue update [#CLICKHOUSE-2141]

This commit is contained in:
Alexey Zatelepin 2017-04-17 18:06:12 +03:00
parent b51c6e7592
commit 2bd089d589
5 changed files with 133 additions and 82 deletions

View File

@ -718,6 +718,7 @@ size_t ReplicatedMergeTreeQueue::countMerges()
void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const
{ {
std::lock_guard<std::mutex> lock(mutex);
out_min_unprocessed_insert_time = min_unprocessed_insert_time; out_min_unprocessed_insert_time = min_unprocessed_insert_time;
out_max_processed_insert_time = max_processed_insert_time; out_max_processed_insert_time = max_processed_insert_time;
} }

View File

@ -57,7 +57,7 @@ private:
StringSet future_parts; StringSet future_parts;
/// To access the queue, future_parts, ... /// To access the queue, future_parts, ...
std::mutex mutex; mutable std::mutex mutex;
/// Provides only one simultaneous call to pullLogsToQueue. /// Provides only one simultaneous call to pullLogsToQueue.
std::mutex pull_logs_to_queue_mutex; std::mutex pull_logs_to_queue_mutex;

View File

@ -1568,9 +1568,11 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
while (!shutdown_called) while (!shutdown_called)
{ {
last_queue_update_attempt_time.store(time(nullptr));
try try
{ {
pullLogsToQueue(queue_updating_event); pullLogsToQueue(queue_updating_event);
last_successful_queue_update_attempt_time.store(time(nullptr));
queue_updating_event->wait(); queue_updating_event->wait();
} }
catch (const zkutil::KeeperException & e) catch (const zkutil::KeeperException & e)
@ -3201,6 +3203,8 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
res.is_session_expired = !zookeeper || zookeeper->expired(); res.is_session_expired = !zookeeper || zookeeper->expired();
res.queue = queue.getStatus(); res.queue = queue.getStatus();
res.absolute_delay = getAbsoluteDelay(); /// NOTE: may be slightly inconsistent with queue status.
res.parts_to_check = part_check_thread.size(); res.parts_to_check = part_check_thread.size();
res.zookeeper_path = zookeeper_path; res.zookeeper_path = zookeeper_path;
@ -3249,25 +3253,53 @@ void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica
queue.getEntries(res); queue.getEntries(res);
} }
time_t StorageReplicatedMergeTree::getAbsoluteDelay() const
{
time_t min_unprocessed_insert_time = 0;
time_t max_processed_insert_time = 0;
queue.getInsertTimes(min_unprocessed_insert_time, max_processed_insert_time);
/// Load in reverse order to preserve consistency (successful update time must be after update start time).
/// Probably doesn't matter because pullLogsToQueue() acts as a barrier.
time_t successful_queue_update_time = last_successful_queue_update_attempt_time.load();
time_t queue_update_time = last_queue_update_attempt_time.load();
time_t current_time = time(nullptr);
if (!queue_update_time)
{
/// We have not even tried to update queue yet (perhaps replica is readonly).
/// As we have no info about the current state of replication log, return effectively infinite delay.
return current_time;
}
else if (min_unprocessed_insert_time)
{
/// There are some unprocessed insert entries in queue.
return (current_time > min_unprocessed_insert_time) ? (current_time - min_unprocessed_insert_time) : 0;
}
else if (queue_update_time > successful_queue_update_time)
{
/// Queue is empty, but there are some in-flight or failed queue update attempts
/// (likely because of problems with connecting to ZooKeeper).
/// Return the time passed since last attempt.
return (current_time > queue_update_time) ? (current_time - queue_update_time) : 0;
}
else
{
/// Everything is up-to-date.
return 0;
}
}
void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay)
{ {
assertNotReadonly(); assertNotReadonly();
/** Absolute delay - the lag of the current replica from real time. time_t current_time = time(nullptr);
*/
time_t min_unprocessed_insert_time = 0; out_absolute_delay = getAbsoluteDelay();
time_t max_processed_insert_time = 0;
queue.getInsertTimes(min_unprocessed_insert_time, max_processed_insert_time);
time_t current_time = time(0);
out_absolute_delay = 0;
out_relative_delay = 0; out_relative_delay = 0;
if (min_unprocessed_insert_time)
out_absolute_delay = current_time - min_unprocessed_insert_time;
/** Relative delay is the maximum difference of absolute delay from any other replica, /** Relative delay is the maximum difference of absolute delay from any other replica,
* (if this replica lags behind any other live replica, or zero, otherwise). * (if this replica lags behind any other live replica, or zero, otherwise).
* Calculated only if the absolute delay is large enough. * Calculated only if the absolute delay is large enough.
@ -3318,8 +3350,13 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
if (have_replica_with_nothing_unprocessed) if (have_replica_with_nothing_unprocessed)
out_relative_delay = out_absolute_delay; out_relative_delay = out_absolute_delay;
else if (max_replicas_unprocessed_insert_time > min_unprocessed_insert_time) else
out_relative_delay = max_replicas_unprocessed_insert_time - min_unprocessed_insert_time; {
max_replicas_unprocessed_insert_time = std::min(current_time, max_replicas_unprocessed_insert_time);
time_t min_replicas_delay = current_time - max_replicas_unprocessed_insert_time;
if (out_absolute_delay > min_replicas_delay)
out_relative_delay = out_absolute_delay - min_replicas_delay;
}
} }

View File

@ -177,6 +177,7 @@ public:
Int32 columns_version; Int32 columns_version;
UInt64 log_max_index; UInt64 log_max_index;
UInt64 log_pointer; UInt64 log_pointer;
UInt64 absolute_delay;
UInt8 total_replicas; UInt8 total_replicas;
UInt8 active_replicas; UInt8 active_replicas;
}; };
@ -187,6 +188,12 @@ public:
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>; using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
void getQueue(LogEntriesData & res, String & replica_name); void getQueue(LogEntriesData & res, String & replica_name);
/// Get replica delay relative to current time.
time_t getAbsoluteDelay() const;
/// If the absolute delay is greater than min_relative_delay_to_yield_leadership,
/// will also calculate the difference from the unprocessed time of the best replica.
/// NOTE: Will communicate to ZooKeeper to calculate relative delay.
void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay); void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay);
/// Add a part to the queue of parts whose data you want to check in the background thread. /// Add a part to the queue of parts whose data you want to check in the background thread.
@ -238,6 +245,8 @@ private:
* In ZK entries in chronological order. Here it is not necessary. * In ZK entries in chronological order. Here it is not necessary.
*/ */
ReplicatedMergeTreeQueue queue; ReplicatedMergeTreeQueue queue;
std::atomic<time_t> last_queue_update_attempt_time{0};
std::atomic<time_t> last_successful_queue_update_attempt_time{0};
/** /replicas/me/is_active. /** /replicas/me/is_active.
*/ */

View File

@ -39,6 +39,7 @@ StorageSystemReplicas::StorageSystemReplicas(const std::string & name_)
{ "log_max_index", std::make_shared<DataTypeUInt64>() }, { "log_max_index", std::make_shared<DataTypeUInt64>() },
{ "log_pointer", std::make_shared<DataTypeUInt64>() }, { "log_pointer", std::make_shared<DataTypeUInt64>() },
{ "last_queue_update", std::make_shared<DataTypeDateTime>() }, { "last_queue_update", std::make_shared<DataTypeDateTime>() },
{ "absolute_delay", std::make_shared<DataTypeUInt64>() },
{ "total_replicas", std::make_shared<DataTypeUInt8>() }, { "total_replicas", std::make_shared<DataTypeUInt8>() },
{ "active_replicas", std::make_shared<DataTypeUInt8>() }, { "active_replicas", std::make_shared<DataTypeUInt8>() },
} }
@ -132,6 +133,7 @@ BlockInputStreams StorageSystemReplicas::read(
ColumnWithTypeAndName col_log_max_index{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "log_max_index"}; ColumnWithTypeAndName col_log_max_index{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "log_max_index"};
ColumnWithTypeAndName col_log_pointer{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "log_pointer"}; ColumnWithTypeAndName col_log_pointer{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "log_pointer"};
ColumnWithTypeAndName col_last_queue_update{std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeDateTime>(), "last_queue_update"}; ColumnWithTypeAndName col_last_queue_update{std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeDateTime>(), "last_queue_update"};
ColumnWithTypeAndName col_absolute_delay{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "absolute_delay"};
ColumnWithTypeAndName col_total_replicas{std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "total_replicas"}; ColumnWithTypeAndName col_total_replicas{std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "total_replicas"};
ColumnWithTypeAndName col_active_replicas{std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "active_replicas"}; ColumnWithTypeAndName col_active_replicas{std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "active_replicas"};
@ -163,6 +165,7 @@ BlockInputStreams StorageSystemReplicas::read(
col_log_max_index.column->insert(status.log_max_index); col_log_max_index.column->insert(status.log_max_index);
col_log_pointer.column->insert(status.log_pointer); col_log_pointer.column->insert(status.log_pointer);
col_last_queue_update.column->insert(UInt64(status.queue.last_queue_update)); col_last_queue_update.column->insert(UInt64(status.queue.last_queue_update));
col_absolute_delay.column->insert(UInt64(status.absolute_delay));
col_total_replicas.column->insert(UInt64(status.total_replicas)); col_total_replicas.column->insert(UInt64(status.total_replicas));
col_active_replicas.column->insert(UInt64(status.active_replicas)); col_active_replicas.column->insert(UInt64(status.active_replicas));
} }
@ -191,6 +194,7 @@ BlockInputStreams StorageSystemReplicas::read(
col_log_max_index, col_log_max_index,
col_log_pointer, col_log_pointer,
col_last_queue_update, col_last_queue_update,
col_absolute_delay,
col_total_replicas, col_total_replicas,
col_active_replicas, col_active_replicas,
}; };