diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 5ba585a6178..bfad3140498 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -718,6 +718,7 @@ size_t ReplicatedMergeTreeQueue::countMerges() void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const { + std::lock_guard lock(mutex); out_min_unprocessed_insert_time = min_unprocessed_insert_time; out_max_processed_insert_time = max_processed_insert_time; } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 80eee2e6338..65be4e3d24b 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -57,7 +57,7 @@ private: StringSet future_parts; /// To access the queue, future_parts, ... - std::mutex mutex; + mutable std::mutex mutex; /// Provides only one simultaneous call to pullLogsToQueue. std::mutex pull_logs_to_queue_mutex; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index c4055c2266b..6f7a370e457 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1568,9 +1568,11 @@ void StorageReplicatedMergeTree::queueUpdatingThread() while (!shutdown_called) { + last_queue_update_attempt_time.store(time(nullptr)); try { pullLogsToQueue(queue_updating_event); + last_successful_queue_update_attempt_time.store(time(nullptr)); queue_updating_event->wait(); } catch (const zkutil::KeeperException & e) @@ -3201,6 +3203,8 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) res.is_session_expired = !zookeeper || zookeeper->expired(); res.queue = queue.getStatus(); + res.absolute_delay = getAbsoluteDelay(); /// NOTE: may be slightly inconsistent with queue status. + res.parts_to_check = part_check_thread.size(); res.zookeeper_path = zookeeper_path; @@ -3249,25 +3253,53 @@ void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica queue.getEntries(res); } +time_t StorageReplicatedMergeTree::getAbsoluteDelay() const +{ + time_t min_unprocessed_insert_time = 0; + time_t max_processed_insert_time = 0; + queue.getInsertTimes(min_unprocessed_insert_time, max_processed_insert_time); + + /// Load in reverse order to preserve consistency (successful update time must be after update start time). + /// Probably doesn't matter because pullLogsToQueue() acts as a barrier. + time_t successful_queue_update_time = last_successful_queue_update_attempt_time.load(); + time_t queue_update_time = last_queue_update_attempt_time.load(); + + time_t current_time = time(nullptr); + + if (!queue_update_time) + { + /// We have not even tried to update queue yet (perhaps replica is readonly). + /// As we have no info about the current state of replication log, return effectively infinite delay. + return current_time; + } + else if (min_unprocessed_insert_time) + { + /// There are some unprocessed insert entries in queue. + return (current_time > min_unprocessed_insert_time) ? (current_time - min_unprocessed_insert_time) : 0; + } + else if (queue_update_time > successful_queue_update_time) + { + /// Queue is empty, but there are some in-flight or failed queue update attempts + /// (likely because of problems with connecting to ZooKeeper). + /// Return the time passed since last attempt. + return (current_time > queue_update_time) ? (current_time - queue_update_time) : 0; + } + else + { + /// Everything is up-to-date. + return 0; + } +} void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) { assertNotReadonly(); - /** Absolute delay - the lag of the current replica from real time. - */ + time_t current_time = time(nullptr); - time_t min_unprocessed_insert_time = 0; - time_t max_processed_insert_time = 0; - queue.getInsertTimes(min_unprocessed_insert_time, max_processed_insert_time); - - time_t current_time = time(0); - out_absolute_delay = 0; + out_absolute_delay = getAbsoluteDelay(); out_relative_delay = 0; - if (min_unprocessed_insert_time) - out_absolute_delay = current_time - min_unprocessed_insert_time; - /** Relative delay is the maximum difference of absolute delay from any other replica, * (if this replica lags behind any other live replica, or zero, otherwise). * Calculated only if the absolute delay is large enough. @@ -3318,8 +3350,13 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t if (have_replica_with_nothing_unprocessed) out_relative_delay = out_absolute_delay; - else if (max_replicas_unprocessed_insert_time > min_unprocessed_insert_time) - out_relative_delay = max_replicas_unprocessed_insert_time - min_unprocessed_insert_time; + else + { + max_replicas_unprocessed_insert_time = std::min(current_time, max_replicas_unprocessed_insert_time); + time_t min_replicas_delay = current_time - max_replicas_unprocessed_insert_time; + if (out_absolute_delay > min_replicas_delay) + out_relative_delay = out_absolute_delay - min_replicas_delay; + } } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 2a99ce4a46e..00d29f22dfa 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -177,6 +177,7 @@ public: Int32 columns_version; UInt64 log_max_index; UInt64 log_pointer; + UInt64 absolute_delay; UInt8 total_replicas; UInt8 active_replicas; }; @@ -187,6 +188,12 @@ public: using LogEntriesData = std::vector; void getQueue(LogEntriesData & res, String & replica_name); + /// Get replica delay relative to current time. + time_t getAbsoluteDelay() const; + + /// If the absolute delay is greater than min_relative_delay_to_yield_leadership, + /// will also calculate the difference from the unprocessed time of the best replica. + /// NOTE: Will communicate to ZooKeeper to calculate relative delay. void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay); /// Add a part to the queue of parts whose data you want to check in the background thread. @@ -238,6 +245,8 @@ private: * In ZK entries in chronological order. Here it is not necessary. */ ReplicatedMergeTreeQueue queue; + std::atomic last_queue_update_attempt_time{0}; + std::atomic last_successful_queue_update_attempt_time{0}; /** /replicas/me/is_active. */ diff --git a/dbms/src/Storages/System/StorageSystemReplicas.cpp b/dbms/src/Storages/System/StorageSystemReplicas.cpp index 8b236ec81af..53b722ea5cf 100644 --- a/dbms/src/Storages/System/StorageSystemReplicas.cpp +++ b/dbms/src/Storages/System/StorageSystemReplicas.cpp @@ -16,31 +16,32 @@ namespace DB StorageSystemReplicas::StorageSystemReplicas(const std::string & name_) : name(name_) , columns{ - { "database", std::make_shared() }, - { "table", std::make_shared() }, - { "engine", std::make_shared() }, - { "is_leader", std::make_shared() }, - { "is_readonly", std::make_shared() }, - { "is_session_expired", std::make_shared() }, - { "future_parts", std::make_shared() }, - { "parts_to_check", std::make_shared() }, - { "zookeeper_path", std::make_shared() }, - { "replica_name", std::make_shared() }, - { "replica_path", std::make_shared() }, - { "columns_version", std::make_shared() }, - { "queue_size", std::make_shared() }, - { "inserts_in_queue", std::make_shared() }, - { "merges_in_queue", std::make_shared() }, - { "queue_oldest_time", std::make_shared()}, - { "inserts_oldest_time", std::make_shared()}, - { "merges_oldest_time", std::make_shared()}, - { "oldest_part_to_get", std::make_shared() }, - { "oldest_part_to_merge_to",std::make_shared() }, - { "log_max_index", std::make_shared() }, - { "log_pointer", std::make_shared() }, - { "last_queue_update", std::make_shared()}, - { "total_replicas", std::make_shared() }, - { "active_replicas", std::make_shared() }, + { "database", std::make_shared() }, + { "table", std::make_shared() }, + { "engine", std::make_shared() }, + { "is_leader", std::make_shared() }, + { "is_readonly", std::make_shared() }, + { "is_session_expired", std::make_shared() }, + { "future_parts", std::make_shared() }, + { "parts_to_check", std::make_shared() }, + { "zookeeper_path", std::make_shared() }, + { "replica_name", std::make_shared() }, + { "replica_path", std::make_shared() }, + { "columns_version", std::make_shared() }, + { "queue_size", std::make_shared() }, + { "inserts_in_queue", std::make_shared() }, + { "merges_in_queue", std::make_shared() }, + { "queue_oldest_time", std::make_shared() }, + { "inserts_oldest_time", std::make_shared() }, + { "merges_oldest_time", std::make_shared() }, + { "oldest_part_to_get", std::make_shared() }, + { "oldest_part_to_merge_to", std::make_shared() }, + { "log_max_index", std::make_shared() }, + { "log_pointer", std::make_shared() }, + { "last_queue_update", std::make_shared() }, + { "absolute_delay", std::make_shared() }, + { "total_replicas", std::make_shared() }, + { "active_replicas", std::make_shared() }, } { } @@ -112,28 +113,29 @@ BlockInputStreams StorageSystemReplicas::read( col_engine = filtered_block.getByName("engine"); } - ColumnWithTypeAndName col_is_leader { std::make_shared(), std::make_shared(), "is_leader"}; - ColumnWithTypeAndName col_is_readonly { std::make_shared(), std::make_shared(), "is_readonly"}; - ColumnWithTypeAndName col_is_session_expired{ std::make_shared(), std::make_shared(), "is_session_expired"}; - ColumnWithTypeAndName col_future_parts { std::make_shared(), std::make_shared(), "future_parts"}; - ColumnWithTypeAndName col_parts_to_check { std::make_shared(), std::make_shared(), "parts_to_check"}; - ColumnWithTypeAndName col_zookeeper_path { std::make_shared(), std::make_shared(), "zookeeper_path"}; - ColumnWithTypeAndName col_replica_name { std::make_shared(), std::make_shared(), "replica_name"}; - ColumnWithTypeAndName col_replica_path { std::make_shared(), std::make_shared(), "replica_path"}; - ColumnWithTypeAndName col_columns_version { std::make_shared(), std::make_shared(), "columns_version"}; - ColumnWithTypeAndName col_queue_size { std::make_shared(), std::make_shared(), "queue_size"}; - ColumnWithTypeAndName col_inserts_in_queue { std::make_shared(), std::make_shared(), "inserts_in_queue"}; - ColumnWithTypeAndName col_merges_in_queue { std::make_shared(), std::make_shared(), "merges_in_queue"}; - ColumnWithTypeAndName col_queue_oldest_time { std::make_shared(), std::make_shared(), "queue_oldest_time"}; - ColumnWithTypeAndName col_inserts_oldest_time{ std::make_shared(),std::make_shared(), "inserts_oldest_time"}; - ColumnWithTypeAndName col_merges_oldest_time{ std::make_shared(), std::make_shared(), "merges_oldest_time"}; - ColumnWithTypeAndName col_oldest_part_to_get{ std::make_shared(), std::make_shared(), "oldest_part_to_get"}; - ColumnWithTypeAndName col_oldest_part_to_merge_to{ std::make_shared(), std::make_shared(), "oldest_part_to_merge_to"}; - ColumnWithTypeAndName col_log_max_index { std::make_shared(), std::make_shared(), "log_max_index"}; - ColumnWithTypeAndName col_log_pointer { std::make_shared(), std::make_shared(), "log_pointer"}; - ColumnWithTypeAndName col_last_queue_update { std::make_shared(), std::make_shared(), "last_queue_update"}; - ColumnWithTypeAndName col_total_replicas { std::make_shared(), std::make_shared(), "total_replicas"}; - ColumnWithTypeAndName col_active_replicas { std::make_shared(), std::make_shared(), "active_replicas"}; + ColumnWithTypeAndName col_is_leader{std::make_shared(), std::make_shared(), "is_leader"}; + ColumnWithTypeAndName col_is_readonly{std::make_shared(), std::make_shared(), "is_readonly"}; + ColumnWithTypeAndName col_is_session_expired{std::make_shared(), std::make_shared(), "is_session_expired"}; + ColumnWithTypeAndName col_future_parts{std::make_shared(), std::make_shared(), "future_parts"}; + ColumnWithTypeAndName col_parts_to_check{std::make_shared(), std::make_shared(), "parts_to_check"}; + ColumnWithTypeAndName col_zookeeper_path{std::make_shared(), std::make_shared(), "zookeeper_path"}; + ColumnWithTypeAndName col_replica_name{std::make_shared(), std::make_shared(), "replica_name"}; + ColumnWithTypeAndName col_replica_path{std::make_shared(), std::make_shared(), "replica_path"}; + ColumnWithTypeAndName col_columns_version{std::make_shared(), std::make_shared(), "columns_version"}; + ColumnWithTypeAndName col_queue_size{std::make_shared(), std::make_shared(), "queue_size"}; + ColumnWithTypeAndName col_inserts_in_queue{std::make_shared(), std::make_shared(), "inserts_in_queue"}; + ColumnWithTypeAndName col_merges_in_queue{std::make_shared(), std::make_shared(), "merges_in_queue"}; + ColumnWithTypeAndName col_queue_oldest_time{std::make_shared(), std::make_shared(), "queue_oldest_time"}; + ColumnWithTypeAndName col_inserts_oldest_time{std::make_shared(), std::make_shared(), "inserts_oldest_time"}; + ColumnWithTypeAndName col_merges_oldest_time{std::make_shared(), std::make_shared(), "merges_oldest_time"}; + ColumnWithTypeAndName col_oldest_part_to_get{std::make_shared(), std::make_shared(), "oldest_part_to_get"}; + ColumnWithTypeAndName col_oldest_part_to_merge_to{std::make_shared(), std::make_shared(), "oldest_part_to_merge_to"}; + ColumnWithTypeAndName col_log_max_index{std::make_shared(), std::make_shared(), "log_max_index"}; + ColumnWithTypeAndName col_log_pointer{std::make_shared(), std::make_shared(), "log_pointer"}; + ColumnWithTypeAndName col_last_queue_update{std::make_shared(), std::make_shared(), "last_queue_update"}; + ColumnWithTypeAndName col_absolute_delay{std::make_shared(), std::make_shared(), "absolute_delay"}; + ColumnWithTypeAndName col_total_replicas{std::make_shared(), std::make_shared(), "total_replicas"}; + ColumnWithTypeAndName col_active_replicas{std::make_shared(), std::make_shared(), "active_replicas"}; for (size_t i = 0, size = col_database.column->size(); i < size; ++i) { @@ -143,28 +145,29 @@ BlockInputStreams StorageSystemReplicas::read( [(*col_database.column)[i].safeGet()] [(*col_table.column)[i].safeGet()]).getStatus(status, with_zk_fields); - col_is_leader .column->insert(UInt64(status.is_leader)); - col_is_readonly .column->insert(UInt64(status.is_readonly)); - col_is_session_expired .column->insert(UInt64(status.is_session_expired)); - col_future_parts .column->insert(UInt64(status.queue.future_parts)); - col_parts_to_check .column->insert(UInt64(status.parts_to_check)); - col_zookeeper_path .column->insert(status.zookeeper_path); - col_replica_name .column->insert(status.replica_name); - col_replica_path .column->insert(status.replica_path); - col_columns_version .column->insert(Int64(status.columns_version)); - col_queue_size .column->insert(UInt64(status.queue.queue_size)); - col_inserts_in_queue .column->insert(UInt64(status.queue.inserts_in_queue)); - col_merges_in_queue .column->insert(UInt64(status.queue.merges_in_queue)); - col_queue_oldest_time .column->insert(UInt64(status.queue.queue_oldest_time)); - col_inserts_oldest_time .column->insert(UInt64(status.queue.inserts_oldest_time)); - col_merges_oldest_time .column->insert(UInt64(status.queue.merges_oldest_time)); - col_oldest_part_to_get .column->insert(status.queue.oldest_part_to_get); + col_is_leader.column->insert(UInt64(status.is_leader)); + col_is_readonly.column->insert(UInt64(status.is_readonly)); + col_is_session_expired.column->insert(UInt64(status.is_session_expired)); + col_future_parts.column->insert(UInt64(status.queue.future_parts)); + col_parts_to_check.column->insert(UInt64(status.parts_to_check)); + col_zookeeper_path.column->insert(status.zookeeper_path); + col_replica_name.column->insert(status.replica_name); + col_replica_path.column->insert(status.replica_path); + col_columns_version.column->insert(Int64(status.columns_version)); + col_queue_size.column->insert(UInt64(status.queue.queue_size)); + col_inserts_in_queue.column->insert(UInt64(status.queue.inserts_in_queue)); + col_merges_in_queue.column->insert(UInt64(status.queue.merges_in_queue)); + col_queue_oldest_time.column->insert(UInt64(status.queue.queue_oldest_time)); + col_inserts_oldest_time.column->insert(UInt64(status.queue.inserts_oldest_time)); + col_merges_oldest_time.column->insert(UInt64(status.queue.merges_oldest_time)); + col_oldest_part_to_get.column->insert(status.queue.oldest_part_to_get); col_oldest_part_to_merge_to.column->insert(status.queue.oldest_part_to_merge_to); - col_log_max_index .column->insert(status.log_max_index); - col_log_pointer .column->insert(status.log_pointer); - col_last_queue_update .column->insert(UInt64(status.queue.last_queue_update)); - col_total_replicas .column->insert(UInt64(status.total_replicas)); - col_active_replicas .column->insert(UInt64(status.active_replicas)); + col_log_max_index.column->insert(status.log_max_index); + col_log_pointer.column->insert(status.log_pointer); + col_last_queue_update.column->insert(UInt64(status.queue.last_queue_update)); + col_absolute_delay.column->insert(UInt64(status.absolute_delay)); + col_total_replicas.column->insert(UInt64(status.total_replicas)); + col_active_replicas.column->insert(UInt64(status.active_replicas)); } Block block{ @@ -191,6 +194,7 @@ BlockInputStreams StorageSystemReplicas::read( col_log_max_index, col_log_pointer, col_last_queue_update, + col_absolute_delay, col_total_replicas, col_active_replicas, };