2017-11-23 13:12:22 +00:00
|
|
|
#pragma once
|
|
|
|
|
2018-03-03 16:46:32 +00:00
|
|
|
#include <optional>
|
|
|
|
|
2018-05-21 13:49:54 +00:00
|
|
|
#include <Common/ActionBlocker.h>
|
2023-03-31 14:09:00 +00:00
|
|
|
#include <Parsers/SyncReplicaMode.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
|
2018-04-19 14:20:18 +00:00
|
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/MergeTree/ActiveDataPartSet.h>
|
|
|
|
#include <Storages/MergeTree/MergeTreeData.h>
|
2018-06-07 13:28:39 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
|
2020-11-24 14:24:48 +00:00
|
|
|
#include <Storages/MergeTree/PinnedPartUUIDs.h>
|
2018-10-18 14:14:07 +00:00
|
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
|
2020-02-17 18:07:22 +00:00
|
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeAltersSequence.h>
|
2021-07-05 19:58:36 +00:00
|
|
|
#include <Storages/MergeTree/DropPartsRanges.h>
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-06-19 20:06:35 +00:00
|
|
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
2016-01-10 04:44:12 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2018-05-20 19:56:03 +00:00
|
|
|
class StorageReplicatedMergeTree;
|
2018-04-20 16:18:16 +00:00
|
|
|
class MergeTreeDataMergerMutator;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
class ReplicatedMergeTreeMergePredicate;
|
2020-09-18 10:57:33 +00:00
|
|
|
class ReplicatedMergeTreeMergeStrategyPicker;
|
2018-05-10 15:01:10 +00:00
|
|
|
|
2022-11-25 15:41:20 +00:00
|
|
|
using PartitionIdsHint = std::unordered_set<String>;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
|
|
|
class ReplicatedMergeTreeQueue
|
|
|
|
{
|
|
|
|
private:
|
2017-04-01 07:20:54 +00:00
|
|
|
friend class CurrentlyExecuting;
|
2018-05-10 15:01:10 +00:00
|
|
|
friend class ReplicatedMergeTreeMergePredicate;
|
2021-09-16 21:19:58 +00:00
|
|
|
friend class MergeFromLogEntryTask;
|
|
|
|
friend class ReplicatedMergeMutateTaskBase;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
using LogEntry = ReplicatedMergeTreeLogEntry;
|
|
|
|
using LogEntryPtr = LogEntry::Ptr;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
using Queue = std::list<LogEntryPtr>;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2018-05-23 14:33:55 +00:00
|
|
|
using StringSet = std::set<String>;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
struct ByTime
|
|
|
|
{
|
|
|
|
bool operator()(const LogEntryPtr & lhs, const LogEntryPtr & rhs) const
|
|
|
|
{
|
2017-09-01 18:21:01 +00:00
|
|
|
return std::forward_as_tuple(lhs->create_time, lhs.get())
|
|
|
|
< std::forward_as_tuple(rhs->create_time, rhs.get());
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
};
|
2016-01-17 13:00:42 +00:00
|
|
|
|
2020-09-02 08:18:50 +00:00
|
|
|
struct OperationsInQueue
|
|
|
|
{
|
|
|
|
size_t merges = 0;
|
|
|
|
size_t mutations = 0;
|
|
|
|
size_t merges_with_ttl = 0;
|
|
|
|
};
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// To calculate min_unprocessed_insert_time, max_processed_insert_time, for which the replica lag is calculated.
|
2017-04-01 07:20:54 +00:00
|
|
|
using InsertsByTime = std::set<LogEntryPtr, ByTime>;
|
2016-01-17 13:00:42 +00:00
|
|
|
|
2018-05-20 19:56:03 +00:00
|
|
|
StorageReplicatedMergeTree & storage;
|
2020-09-18 10:57:33 +00:00
|
|
|
ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker;
|
2017-11-28 14:07:17 +00:00
|
|
|
MergeTreeDataFormatVersion format_version;
|
2017-08-25 20:41:45 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
String zookeeper_path;
|
|
|
|
String replica_path;
|
|
|
|
String logger_name;
|
2020-05-30 21:57:37 +00:00
|
|
|
Poco::Logger * log = nullptr;
|
2018-05-10 15:01:10 +00:00
|
|
|
|
|
|
|
/// Protects the queue, future_parts and other queue state variables.
|
2018-06-20 11:12:16 +00:00
|
|
|
mutable std::mutex state_mutex;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2018-06-22 10:43:35 +00:00
|
|
|
/// A set of parts that should be on this replica according to the queue entries that have been done
|
|
|
|
/// up to this point. The invariant holds: `virtual_parts` = `current_parts` + `queue`.
|
2018-06-18 12:17:46 +00:00
|
|
|
/// Note: it can be different from the actual set of parts because the replica can decide to fetch
|
|
|
|
/// a bigger part instead of the part mentioned in the log entry.
|
|
|
|
ActiveDataPartSet current_parts;
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** The queue of what you need to do on this line to catch up. It is taken from ZooKeeper (/replicas/me/queue/).
|
2020-02-10 13:32:59 +00:00
|
|
|
* In ZK records in chronological order. Here they are executed in parallel and reorder after entry execution.
|
|
|
|
* Order of execution is not "queue" at all. Look at selectEntryToProcess.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
Queue queue;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
InsertsByTime inserts_by_time;
|
2023-02-12 06:39:16 +00:00
|
|
|
std::atomic<time_t> min_unprocessed_insert_time = 0;
|
|
|
|
std::atomic<time_t> max_processed_insert_time = 0;
|
2016-01-17 13:00:42 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
time_t last_queue_update = 0;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// parts that will appear as a result of actions performed right now by background threads (these actions are not in the queue).
|
2018-06-18 12:17:46 +00:00
|
|
|
/// Used to block other actions on parts in the range covered by future_parts.
|
2018-05-23 14:33:55 +00:00
|
|
|
using FuturePartsSet = std::map<String, LogEntryPtr>;
|
|
|
|
FuturePartsSet future_parts;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2020-08-11 20:58:39 +00:00
|
|
|
/// Avoid parallel execution of queue enties, which may remove other entries from the queue.
|
2022-08-15 18:32:03 +00:00
|
|
|
std::set<MergeTreePartInfo> currently_executing_drop_replace_ranges;
|
2020-08-11 20:58:39 +00:00
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
/** What will be the set of active parts after executing all log entries up to log_pointer.
|
|
|
|
* Used to determine which merges can be assigned (see ReplicatedMergeTreeMergePredicate)
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
ActiveDataPartSet virtual_parts;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2021-07-05 19:58:36 +00:00
|
|
|
|
2023-01-31 12:37:56 +00:00
|
|
|
/// We do not add DROP_PARTs to virtual_parts because they can intersect,
|
|
|
|
/// so we store them separately in this structure.
|
|
|
|
DropPartsRanges drop_parts;
|
2021-07-05 19:58:36 +00:00
|
|
|
|
2018-04-19 14:20:18 +00:00
|
|
|
/// A set of mutations loaded from ZooKeeper.
|
2018-06-21 13:27:36 +00:00
|
|
|
/// mutations_by_partition is an index partition ID -> block ID -> mutation into this set.
|
2018-04-19 14:20:18 +00:00
|
|
|
/// Note that mutations are updated in such a way that they are always more recent than
|
|
|
|
/// log_pointer (see pullLogsToQueue()).
|
2018-06-18 12:17:46 +00:00
|
|
|
|
|
|
|
struct MutationStatus
|
|
|
|
{
|
2020-02-05 16:30:02 +00:00
|
|
|
MutationStatus(const ReplicatedMergeTreeMutationEntryPtr & entry_, MergeTreeDataFormatVersion format_version_)
|
2019-02-05 13:03:52 +00:00
|
|
|
: entry(entry_)
|
2020-02-05 16:30:02 +00:00
|
|
|
, parts_to_do(format_version_)
|
2019-02-05 13:03:52 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2018-06-18 12:17:46 +00:00
|
|
|
ReplicatedMergeTreeMutationEntryPtr entry;
|
|
|
|
|
2020-06-15 13:37:40 +00:00
|
|
|
/// Current parts we have to mutate to complete mutation.
|
|
|
|
///
|
|
|
|
/// current_part_name =mutation> result_part_name
|
|
|
|
/// ^~~parts_to_do~~^ ^~virtual_parts~^
|
|
|
|
///
|
|
|
|
/// We use ActiveDataPartSet structure to be able to manage covering and
|
|
|
|
/// covered parts.
|
2020-02-05 16:30:02 +00:00
|
|
|
ActiveDataPartSet parts_to_do;
|
2018-06-21 13:27:36 +00:00
|
|
|
|
2020-02-05 16:30:02 +00:00
|
|
|
/// Note that is_done is not equivalent to parts_to_do.size() == 0
|
|
|
|
/// (even if parts_to_do.size() == 0 some relevant parts can still commit in the future).
|
2020-08-08 00:47:03 +00:00
|
|
|
/// Also we can jump over mutation when we download mutated part from other replica.
|
2018-06-21 13:27:36 +00:00
|
|
|
bool is_done = false;
|
2019-02-05 13:03:52 +00:00
|
|
|
|
|
|
|
String latest_failed_part;
|
|
|
|
MergeTreePartInfo latest_failed_part_info;
|
|
|
|
time_t latest_fail_time = 0;
|
|
|
|
String latest_fail_reason;
|
2018-06-18 12:17:46 +00:00
|
|
|
};
|
|
|
|
|
2020-01-15 13:00:08 +00:00
|
|
|
/// Mapping from znode path to Mutations Status
|
2018-06-18 12:17:46 +00:00
|
|
|
std::map<String, MutationStatus> mutations_by_znode;
|
2020-01-31 19:30:33 +00:00
|
|
|
/// Partition -> (block_number -> MutationStatus)
|
2018-06-18 12:17:46 +00:00
|
|
|
std::unordered_map<String, std::map<Int64, MutationStatus *>> mutations_by_partition;
|
2018-06-21 13:27:36 +00:00
|
|
|
/// Znode ID of the latest mutation that is done.
|
|
|
|
String mutation_pointer;
|
2018-04-19 14:20:18 +00:00
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
/// Provides only one simultaneous call to pullLogsToQueue.
|
|
|
|
std::mutex pull_logs_to_queue_mutex;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2020-02-17 18:07:22 +00:00
|
|
|
/// This sequence control ALTERs execution in replication queue.
|
|
|
|
/// We need it because alters have to be executed sequentially (one by one).
|
|
|
|
ReplicatedMergeTreeAltersSequence alter_sequence;
|
2018-05-28 15:37:30 +00:00
|
|
|
|
2021-10-18 20:16:02 +00:00
|
|
|
Strings broken_parts_to_enqueue_fetches_on_loading;
|
|
|
|
|
2018-05-21 13:49:54 +00:00
|
|
|
/// List of subscribers
|
|
|
|
/// A subscriber callback is called when an entry queue is deleted
|
|
|
|
mutable std::mutex subscribers_mutex;
|
|
|
|
|
2023-03-27 23:39:36 +00:00
|
|
|
using SubscriberCallBack = std::function<void(size_t /* queue_size */, const String * /* removed_log_entry_id */)>;
|
2018-05-21 13:49:54 +00:00
|
|
|
using Subscribers = std::list<SubscriberCallBack>;
|
|
|
|
using SubscriberIterator = Subscribers::iterator;
|
|
|
|
|
|
|
|
friend class SubscriberHandler;
|
|
|
|
struct SubscriberHandler : public boost::noncopyable
|
|
|
|
{
|
2019-08-03 11:02:40 +00:00
|
|
|
SubscriberHandler(SubscriberIterator it_, ReplicatedMergeTreeQueue & queue_) : it(it_), queue(queue_) {}
|
2018-05-21 13:49:54 +00:00
|
|
|
~SubscriberHandler();
|
|
|
|
|
|
|
|
private:
|
|
|
|
SubscriberIterator it;
|
|
|
|
ReplicatedMergeTreeQueue & queue;
|
|
|
|
};
|
|
|
|
|
|
|
|
Subscribers subscribers;
|
|
|
|
|
2023-02-07 15:57:58 +00:00
|
|
|
/// Notify subscribers about queue change (new queue size and entry that was removed)
|
2023-03-27 23:39:36 +00:00
|
|
|
void notifySubscribers(size_t new_queue_size, const String * removed_log_entry_id);
|
2018-05-21 13:49:54 +00:00
|
|
|
|
2018-08-06 17:18:11 +00:00
|
|
|
/// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it
|
2020-07-09 04:43:28 +00:00
|
|
|
bool checkReplaceRangeCanBeRemoved(
|
2022-03-13 12:23:51 +00:00
|
|
|
const MergeTreePartInfo & part_info, LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const;
|
2018-05-21 13:49:54 +00:00
|
|
|
|
2018-04-19 14:20:18 +00:00
|
|
|
/// Ensures that only one thread is simultaneously updating mutations.
|
|
|
|
std::mutex update_mutations_mutex;
|
|
|
|
|
2020-01-28 17:15:22 +00:00
|
|
|
/// Insert new entry from log into queue
|
2018-05-10 15:01:10 +00:00
|
|
|
void insertUnlocked(
|
|
|
|
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
|
2018-06-20 11:12:16 +00:00
|
|
|
std::lock_guard<std::mutex> & state_lock);
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2018-06-06 19:15:10 +00:00
|
|
|
void removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Can I now try this action. If not, you need to leave it in the queue and try another one.
|
2018-06-20 11:12:16 +00:00
|
|
|
* Called under the state_mutex.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2018-05-10 15:01:10 +00:00
|
|
|
bool shouldExecuteLogEntry(
|
|
|
|
const LogEntry & entry, String & out_postpone_reason,
|
2018-04-20 16:18:16 +00:00
|
|
|
MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data,
|
2022-06-15 20:08:45 +00:00
|
|
|
std::unique_lock<std::mutex> & state_lock) const;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2023-04-12 22:20:43 +00:00
|
|
|
/// Return the version (block number) of the last mutation that we don't need to apply to the part
|
|
|
|
/// with getDataVersion() == data_version. (Either this mutation was already applied or the part
|
|
|
|
/// was created after the mutation).
|
|
|
|
/// If there is no such mutation or it has already been executed and deleted, return 0.
|
|
|
|
Int64 getCurrentMutationVersion(const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* state_lock */) const;
|
2018-05-12 23:05:04 +00:00
|
|
|
|
2017-05-12 13:47:42 +00:00
|
|
|
/** Check that part isn't in currently generating parts and isn't covered by them.
|
2018-06-20 11:12:16 +00:00
|
|
|
* Should be called under state_mutex.
|
2017-05-12 13:47:42 +00:00
|
|
|
*/
|
2022-06-15 20:08:45 +00:00
|
|
|
bool isCoveredByFuturePartsImpl(
|
2021-10-18 20:16:02 +00:00
|
|
|
const LogEntry & entry,
|
2018-05-10 15:01:10 +00:00
|
|
|
const String & new_part_name, String & out_reason,
|
2022-06-15 20:08:45 +00:00
|
|
|
std::unique_lock<std::mutex> & state_lock,
|
|
|
|
std::vector<LogEntryPtr> * covered_entries_to_wait) const;
|
2017-05-12 13:47:42 +00:00
|
|
|
|
2018-06-20 11:12:16 +00:00
|
|
|
/// After removing the queue element, update the insertion times in the RAM. Running under state_mutex.
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper.
|
2018-06-18 12:17:46 +00:00
|
|
|
void updateStateOnQueueEntryRemoval(const LogEntryPtr & entry,
|
|
|
|
bool is_successful,
|
2018-03-03 16:46:32 +00:00
|
|
|
std::optional<time_t> & min_unprocessed_insert_time_changed,
|
|
|
|
std::optional<time_t> & max_processed_insert_time_changed,
|
2018-06-20 11:12:16 +00:00
|
|
|
std::unique_lock<std::mutex> & state_lock);
|
2016-01-17 13:00:42 +00:00
|
|
|
|
2020-02-05 16:30:02 +00:00
|
|
|
/// Add part for mutations with block_number > part.getDataVersion()
|
2021-11-03 18:13:42 +00:00
|
|
|
void addPartToMutations(const String & part_name, const MergeTreePartInfo & part_info);
|
2020-02-05 16:30:02 +00:00
|
|
|
|
2020-06-16 10:34:59 +00:00
|
|
|
/// Remove covered parts from mutations (parts_to_do) which were assigned
|
2020-06-16 10:50:47 +00:00
|
|
|
/// for mutation. If remove_covered_parts = true, than remove parts covered
|
|
|
|
/// by first argument. If remove_part == true, than also remove part itself.
|
|
|
|
/// Both negative flags will throw exception.
|
2020-06-16 10:34:59 +00:00
|
|
|
///
|
|
|
|
/// Part removed from mutations which satisfy contitions:
|
|
|
|
/// block_number > part.getDataVersion()
|
|
|
|
/// or block_number == part.getDataVersion()
|
|
|
|
/// ^ (this may happen if we downloaded mutated part from other replica)
|
2020-06-16 10:50:47 +00:00
|
|
|
void removeCoveredPartsFromMutations(const String & part_name, bool remove_part, bool remove_covered_parts);
|
2018-06-21 15:54:01 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Update the insertion times in ZooKeeper.
|
2018-03-03 16:46:32 +00:00
|
|
|
void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper,
|
|
|
|
std::optional<time_t> min_unprocessed_insert_time_changed,
|
|
|
|
std::optional<time_t> max_processed_insert_time_changed) const;
|
2016-01-17 13:00:42 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Marks the element of the queue as running.
|
2017-04-01 07:20:54 +00:00
|
|
|
class CurrentlyExecuting
|
|
|
|
{
|
|
|
|
private:
|
|
|
|
ReplicatedMergeTreeQueue::LogEntryPtr entry;
|
|
|
|
ReplicatedMergeTreeQueue & queue;
|
2016-02-02 21:30:27 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
friend class ReplicatedMergeTreeQueue;
|
2016-02-02 21:30:27 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Created only in the selectEntryToProcess function. It is called under mutex.
|
2022-01-26 17:44:35 +00:00
|
|
|
CurrentlyExecuting(
|
|
|
|
const ReplicatedMergeTreeQueue::LogEntryPtr & entry_,
|
|
|
|
ReplicatedMergeTreeQueue & queue_,
|
2022-06-15 20:08:45 +00:00
|
|
|
std::unique_lock<std::mutex> & state_lock);
|
2017-05-12 13:47:42 +00:00
|
|
|
|
2018-06-20 11:12:16 +00:00
|
|
|
/// In case of fetch, we determine actual part during the execution, so we need to update entry. It is called under state_mutex.
|
2022-01-26 17:44:35 +00:00
|
|
|
static void setActualPartName(
|
|
|
|
ReplicatedMergeTreeQueue::LogEntry & entry,
|
|
|
|
const String & actual_part_name,
|
|
|
|
ReplicatedMergeTreeQueue & queue,
|
2022-06-15 20:08:45 +00:00
|
|
|
std::unique_lock<std::mutex> & state_lock,
|
|
|
|
std::vector<LogEntryPtr> & covered_entries_to_wait);
|
2022-01-26 17:44:35 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
public:
|
|
|
|
~CurrentlyExecuting();
|
|
|
|
};
|
2016-02-02 21:30:27 +00:00
|
|
|
|
2020-10-23 08:54:00 +00:00
|
|
|
using CurrentlyExecutingPtr = std::unique_ptr<CurrentlyExecuting>;
|
2020-10-24 04:13:17 +00:00
|
|
|
/// ZK contains a limit on the number or total size of operations in a multi-request.
|
|
|
|
/// If the limit is exceeded, the connection is simply closed.
|
|
|
|
/// The constant is selected with a margin. The default limit in ZK is 1 MB of data in total.
|
|
|
|
/// The average size of the node value in this case is less than 10 kilobytes.
|
2020-10-24 04:21:46 +00:00
|
|
|
static constexpr size_t MAX_MULTI_OPS = 100;
|
2020-10-24 04:13:17 +00:00
|
|
|
|
|
|
|
/// Very large queue entries may appear occasionally.
|
|
|
|
/// We cannot process MAX_MULTI_OPS at once because it will fail.
|
|
|
|
/// But we have to process more than one entry at once because otherwise lagged replicas keep up slowly.
|
2022-09-02 08:54:48 +00:00
|
|
|
/// Let's start with one entry per transaction and increase it exponentially towards MAX_MULTI_OPS.
|
2020-10-24 04:13:17 +00:00
|
|
|
/// It will allow to make some progress before failing and remain operational even in extreme cases.
|
|
|
|
size_t current_multi_batch_size = 1;
|
2020-10-23 08:54:00 +00:00
|
|
|
|
2016-01-10 04:44:12 +00:00
|
|
|
public:
|
2020-09-18 10:57:33 +00:00
|
|
|
ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_);
|
2023-03-27 23:39:36 +00:00
|
|
|
~ReplicatedMergeTreeQueue() = default;
|
2018-05-21 13:49:54 +00:00
|
|
|
|
2021-06-01 13:25:23 +00:00
|
|
|
/// Clears queue state
|
|
|
|
void clear();
|
2019-08-19 17:59:16 +00:00
|
|
|
|
2021-09-09 15:19:12 +00:00
|
|
|
/// Get set of parts from zookeeper
|
|
|
|
void initialize(zkutil::ZooKeeperPtr zookeeper);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
/** Inserts an action to the end of the queue.
|
2017-04-16 15:00:33 +00:00
|
|
|
* To restore broken parts during operation.
|
|
|
|
* Do not insert the action itself into ZK (do it yourself).
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
void insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
|
|
|
|
|
2018-08-20 13:31:24 +00:00
|
|
|
/** Load (initialize) a queue from ZooKeeper (/replicas/me/queue/).
|
|
|
|
* If queue was not empty load() would not load duplicate records.
|
|
|
|
* return true, if we update queue.
|
|
|
|
*/
|
2018-08-09 15:06:39 +00:00
|
|
|
bool load(zkutil::ZooKeeperPtr zookeeper);
|
|
|
|
|
2021-06-01 13:25:23 +00:00
|
|
|
bool removeFailedQuorumPart(const MergeTreePartInfo & part_info);
|
2018-05-10 15:01:10 +00:00
|
|
|
|
2021-08-18 09:49:22 +00:00
|
|
|
enum PullLogsReason
|
|
|
|
{
|
|
|
|
LOAD,
|
|
|
|
UPDATE,
|
|
|
|
MERGE_PREDICATE,
|
|
|
|
SYNC,
|
|
|
|
OTHER,
|
|
|
|
};
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
|
2018-05-31 13:05:05 +00:00
|
|
|
* If watch_callback is not empty, will call it when new entries appear in the log.
|
2018-05-20 19:56:03 +00:00
|
|
|
* If there were new entries, notifies storage.queue_task_handle.
|
|
|
|
* Additionally loads mutations (so that the set of mutations is always more recent than the queue).
|
2020-06-12 18:24:32 +00:00
|
|
|
* Return the version of "logs" node (that is updated for every merge/mutation/... added to the log)
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2021-08-18 09:49:22 +00:00
|
|
|
int32_t pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}, PullLogsReason reason = OTHER);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-05-31 13:05:05 +00:00
|
|
|
/// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task.
|
|
|
|
/// If watch_callback is not empty, will call it when new mutations appear in ZK.
|
2018-08-25 01:58:14 +00:00
|
|
|
void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {});
|
2018-04-19 14:20:18 +00:00
|
|
|
|
2019-02-04 12:53:25 +00:00
|
|
|
/// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr
|
2020-02-17 16:33:05 +00:00
|
|
|
/// if it could not be found. Called during KILL MUTATION query execution.
|
2019-02-04 12:53:25 +00:00
|
|
|
ReplicatedMergeTreeMutationEntryPtr removeMutation(zkutil::ZooKeeperPtr zookeeper, const String & mutation_id);
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
|
|
|
|
* And also wait for the completion of their execution, if they are now being executed.
|
2021-10-25 14:01:23 +00:00
|
|
|
* covering_entry is as an entry that caused removal of entries in range (usually, DROP_RANGE)
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
Remove covered parts for fetched part
Here is an example that I found on production, simplified.
Consider the following queue (nothing of this had been processed on this
replica):
- GET_PART all_0_0_0 (queue-0000000001)
- GET_PART all_1_1_0 (queue-0000000002)
...
- GET_PART all_0_1_1 (queue-0000000003)
- GET_PART all_2_2_0 (queue-0000000004)
...
- MERGE_PARTS from [all_0_1_1, all_2_2_0] to all_0_2_2 (queue-0000000005)
And now queue-0000000005 started to executing (either because
of reording, or because at that time GET_PART fails), and it
does not have any required parts, so it will fetch them, but
not all_0_0_0 and all_1_1_0, so this replica delay will set to
the time of min(queue-0000000001, queue-0000000002), while it
is not true, since it already have parts that covers those
parts.
and since MERGE_PARTS takes 30min, it increased the replica delay
eventually to 30min, for the time range of 30min, which is pretty huge.
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-07-30 06:19:58 +00:00
|
|
|
void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper,
|
|
|
|
const MergeTreePartInfo & part_info,
|
2023-02-13 13:46:46 +00:00
|
|
|
const std::optional<ReplicatedMergeTreeLogEntryData> & covering_entry);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** In the case where there are not enough parts to perform the merge in part_name
|
|
|
|
* - move actions with merged parts to the end of the queue
|
|
|
|
* (in order to download a already merged part from another replica).
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
StringSet moveSiblingPartsForMergeToEndOfQueue(const String & part_name);
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Select the next action to process.
|
2018-06-01 18:06:43 +00:00
|
|
|
* merger_mutator is used only to check if the merges are not suspended.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2020-10-23 08:54:00 +00:00
|
|
|
struct SelectedEntry
|
|
|
|
{
|
|
|
|
ReplicatedMergeTreeQueue::LogEntryPtr log_entry;
|
|
|
|
CurrentlyExecutingPtr currently_executing_holder;
|
|
|
|
|
|
|
|
SelectedEntry(const ReplicatedMergeTreeQueue::LogEntryPtr & log_entry_, CurrentlyExecutingPtr && currently_executing_holder_)
|
|
|
|
: log_entry(log_entry_)
|
|
|
|
, currently_executing_holder(std::move(currently_executing_holder_))
|
|
|
|
{}
|
|
|
|
};
|
|
|
|
|
|
|
|
using SelectedEntryPtr = std::shared_ptr<SelectedEntry>;
|
|
|
|
SelectedEntryPtr selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Execute `func` function to handle the action.
|
|
|
|
* In this case, at runtime, mark the queue element as running
|
|
|
|
* (add into future_parts and more).
|
|
|
|
* If there was an exception during processing, it saves it in `entry`.
|
|
|
|
* Returns true if there were no exceptions during the processing.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2022-03-13 12:23:51 +00:00
|
|
|
bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, std::function<bool(LogEntryPtr &)> func);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-07-31 11:36:08 +00:00
|
|
|
/// Count the number of merges and mutations of single parts in the queue.
|
2020-09-02 08:18:50 +00:00
|
|
|
OperationsInQueue countMergesAndPartMutations() const;
|
2018-07-31 11:36:08 +00:00
|
|
|
|
|
|
|
/// Count the total number of active mutations.
|
|
|
|
size_t countMutations() const;
|
|
|
|
|
|
|
|
/// Count the total number of active mutations that are finished (is_done = true).
|
|
|
|
size_t countFinishedMutations() const;
|
2023-04-24 18:21:49 +00:00
|
|
|
/// Count the total number of active mutations that are not finished (is_done = false).
|
|
|
|
size_t countUnfinishedMutations() const;
|
2018-07-31 11:36:08 +00:00
|
|
|
|
2019-08-16 15:57:19 +00:00
|
|
|
/// Returns functor which used by MergeTreeMergerMutator to select parts for merge
|
2022-11-25 15:41:20 +00:00
|
|
|
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-04-20 19:11:20 +00:00
|
|
|
MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const;
|
|
|
|
|
2023-02-27 11:27:57 +00:00
|
|
|
/// Return mutation commands for part which could be not applied to
|
|
|
|
/// it according to part mutation version. Used when we apply alter commands on fly,
|
2020-04-03 10:40:46 +00:00
|
|
|
/// without actual data modification on disk.
|
2023-02-27 11:27:57 +00:00
|
|
|
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const;
|
2020-03-24 17:05:38 +00:00
|
|
|
|
2018-06-21 13:27:36 +00:00
|
|
|
/// Mark finished mutations as done. If the function needs to be called again at some later time
|
|
|
|
/// (because some mutations are probably done but we are not sure yet), returns true.
|
|
|
|
bool tryFinalizeMutations(zkutil::ZooKeeperPtr zookeeper);
|
|
|
|
|
2020-08-08 00:47:03 +00:00
|
|
|
/// Checks that part is already in virtual parts
|
2019-09-10 11:21:59 +00:00
|
|
|
bool isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const;
|
2019-08-20 08:38:02 +00:00
|
|
|
|
2023-01-31 12:37:56 +00:00
|
|
|
/// Returns true if part_info is covered by some DROP_RANGE or DROP_PART
|
|
|
|
bool isGoingToBeDropped(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
|
|
|
|
bool isGoingToBeDroppedImpl(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info) const;
|
2022-04-12 12:14:26 +00:00
|
|
|
|
2021-07-02 09:40:13 +00:00
|
|
|
/// Check that part produced by some entry in queue and get source parts for it.
|
|
|
|
/// If there are several entries return largest source_parts set. This rarely possible
|
|
|
|
/// for example after replica clone.
|
2021-06-30 15:24:51 +00:00
|
|
|
bool checkPartInQueueAndGetSourceParts(const String & part_name, Strings & source_parts) const;
|
|
|
|
|
2019-08-16 15:57:19 +00:00
|
|
|
/// Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts.
|
|
|
|
/// Locks queue's mutex.
|
2018-05-23 14:33:55 +00:00
|
|
|
bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason);
|
2017-05-12 13:47:42 +00:00
|
|
|
|
2018-05-21 13:49:54 +00:00
|
|
|
/// A blocker that stops selects from the queue
|
2018-05-28 15:37:30 +00:00
|
|
|
ActionBlocker actions_blocker;
|
2018-05-21 13:49:54 +00:00
|
|
|
|
2020-04-16 15:30:18 +00:00
|
|
|
/// A blocker that stops pulling entries from replication log to queue
|
|
|
|
ActionBlocker pull_log_blocker;
|
|
|
|
|
2018-05-21 13:49:54 +00:00
|
|
|
/// Adds a subscriber
|
2023-03-31 14:09:00 +00:00
|
|
|
SubscriberHandler addSubscriber(SubscriberCallBack && callback, std::unordered_set<String> & out_entry_names, SyncReplicaMode sync_mode);
|
2023-03-27 23:39:36 +00:00
|
|
|
|
|
|
|
void notifySubscribersOnPartialShutdown();
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
struct Status
|
|
|
|
{
|
2022-10-07 10:46:45 +00:00
|
|
|
/// TODO: consider using UInt64 here
|
2017-04-01 07:20:54 +00:00
|
|
|
UInt32 future_parts;
|
|
|
|
UInt32 queue_size;
|
|
|
|
UInt32 inserts_in_queue;
|
|
|
|
UInt32 merges_in_queue;
|
2018-06-06 13:22:30 +00:00
|
|
|
UInt32 part_mutations_in_queue;
|
2017-04-01 07:20:54 +00:00
|
|
|
UInt32 queue_oldest_time;
|
|
|
|
UInt32 inserts_oldest_time;
|
|
|
|
UInt32 merges_oldest_time;
|
2018-06-06 13:22:30 +00:00
|
|
|
UInt32 part_mutations_oldest_time;
|
2017-04-01 07:20:54 +00:00
|
|
|
String oldest_part_to_get;
|
|
|
|
String oldest_part_to_merge_to;
|
2018-04-20 16:18:16 +00:00
|
|
|
String oldest_part_to_mutate_to;
|
2017-04-01 07:20:54 +00:00
|
|
|
UInt32 last_queue_update;
|
|
|
|
};
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Get information about the queue.
|
2018-03-03 16:26:06 +00:00
|
|
|
Status getStatus() const;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Get the data of the queue elements.
|
2017-04-01 07:20:54 +00:00
|
|
|
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
|
2018-03-03 16:26:06 +00:00
|
|
|
void getEntries(LogEntriesData & res) const;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Get information about the insertion times.
|
2017-04-01 07:20:54 +00:00
|
|
|
void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const;
|
2018-06-07 13:28:39 +00:00
|
|
|
|
2020-07-22 12:36:19 +00:00
|
|
|
|
|
|
|
/// Return empty optional if mutation was killed. Otherwise return partially
|
|
|
|
/// filled mutation status with information about error (latest_fail*) and
|
2020-07-31 12:22:32 +00:00
|
|
|
/// is_done. mutation_ids filled with all mutations with same errors,
|
|
|
|
/// because they may be executed simultaneously as one mutation. Order is
|
|
|
|
/// important for better readability of exception message. If mutation was
|
|
|
|
/// killed doesn't return any ids.
|
2020-07-31 11:37:16 +00:00
|
|
|
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatus(const String & znode_name, std::set<String> * mutation_ids = nullptr) const;
|
2020-07-22 12:36:19 +00:00
|
|
|
|
2018-06-07 13:28:39 +00:00
|
|
|
std::vector<MergeTreeMutationStatus> getMutationsStatus() const;
|
2020-01-31 12:25:31 +00:00
|
|
|
|
2020-02-18 19:57:48 +00:00
|
|
|
void removeCurrentPartsFromMutations();
|
2020-03-27 10:53:04 +00:00
|
|
|
|
|
|
|
/// Holds three std::mutex locks at once. std::scoped_lock acquires them together,
/// which avoids lock-ordering deadlocks.
using QueueLocks = std::scoped_lock<std::mutex, std::mutex, std::mutex>;

/// Locks all important queue mutexes: state_mutex, pull_logs_to_queue and
/// update_mutations_mutex. It should be used only once, when we want to shut down
/// the queue and remove its task from the pool.
/// This is needed because the queue itself can trigger its task handler, and in that
/// case a race condition is possible.
QueueLocks lockQueue();
|
2021-10-18 20:16:02 +00:00
|
|
|
|
|
|
|
/// Can be called only on data parts loading.
|
|
|
|
/// We need loaded queue to create GET_PART entry for broken (or missing) part,
|
|
|
|
/// but queue is not loaded yet on data parts loading.
|
|
|
|
void setBrokenPartsToEnqueueFetchesOnLoading(Strings && parts_to_fetch);
|
|
|
|
/// Has to be invoked immediately after the queue has been loaded.
/// NOTE(review): presumably turns the parts passed to
/// setBrokenPartsToEnqueueFetchesOnLoading() into fetch entries; confirm.
void createLogEntriesToFetchBrokenParts();
|
2016-01-10 04:44:12 +00:00
|
|
|
};
|
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
class ReplicatedMergeTreeMergePredicate
|
|
|
|
{
|
|
|
|
public:
|
2022-11-25 15:41:20 +00:00
|
|
|
ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint_);
|
2018-05-10 15:01:10 +00:00
|
|
|
|
2020-04-16 18:47:20 +00:00
|
|
|
/// Depending on the existence of left part checks a merge predicate for two parts or for single part.
|
|
|
|
bool operator()(const MergeTreeData::DataPartPtr & left,
|
|
|
|
const MergeTreeData::DataPartPtr & right,
|
2021-05-18 17:07:29 +00:00
|
|
|
const MergeTreeTransaction * txn,
|
2020-04-16 18:47:20 +00:00
|
|
|
String * out_reason = nullptr) const;
|
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
/// Can we assign a merge with these two parts?
|
|
|
|
/// (assuming that no merge was assigned after the predicate was constructed)
|
|
|
|
/// If we can't and out_reason is not nullptr, set it to the reason why we can't merge.
|
2020-04-16 18:47:20 +00:00
|
|
|
bool canMergeTwoParts(const MergeTreeData::DataPartPtr & left,
|
|
|
|
const MergeTreeData::DataPartPtr & right,
|
|
|
|
String * out_reason = nullptr) const;
|
2018-05-10 15:01:10 +00:00
|
|
|
|
2020-04-16 18:47:20 +00:00
|
|
|
/// Can we assign a merge this part and some other part?
|
|
|
|
/// For example a merge of a part and itself is needed for TTL.
|
2021-05-10 18:03:37 +00:00
|
|
|
/// This predicate is checked for the first part of each range.
|
2020-04-10 21:29:54 +00:00
|
|
|
bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const;
|
|
|
|
|
2022-06-01 18:11:53 +00:00
|
|
|
/// Returns true if part is needed for some REPLACE_RANGE entry.
|
|
|
|
/// We should not drop part in this case, because replication queue may stuck without that part.
|
|
|
|
bool partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String * out_reason) const;
|
|
|
|
|
2020-02-17 16:33:05 +00:00
|
|
|
/// Return nonempty optional of desired mutation version and alter version.
|
|
|
|
/// If we have no alter (modify/drop) mutations in mutations queue, than we return biggest possible
|
|
|
|
/// mutation version (and -1 as alter version). In other case, we return biggest mutation version with
|
|
|
|
/// smallest alter version. This required, because we have to execute alter mutations sequentially and
|
|
|
|
/// don't glue them together. Alter is rare operation, so it shouldn't affect performance.
|
2020-01-31 12:25:31 +00:00
|
|
|
std::optional<std::pair<Int64, int>> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const;
|
2018-05-12 23:05:04 +00:00
|
|
|
|
2022-11-16 10:50:51 +00:00
|
|
|
bool isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers) const;
|
2018-06-21 13:27:36 +00:00
|
|
|
|
2020-06-12 18:24:32 +00:00
|
|
|
/// The version of "log" node that is used to check that no new merges have appeared.
|
|
|
|
int32_t getVersion() const { return merges_version; }
|
|
|
|
|
2022-07-19 11:58:59 +00:00
|
|
|
/// Returns true if there's a drop range covering new_drop_range_info
|
2023-01-31 12:37:56 +00:00
|
|
|
bool isGoingToBeDropped(const MergeTreePartInfo & new_drop_range_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
|
2021-07-05 19:58:36 +00:00
|
|
|
|
2022-07-19 11:58:59 +00:00
|
|
|
/// Returns virtual part covering part_name (if any) or empty string
|
2022-07-18 21:37:07 +00:00
|
|
|
String getCoveringVirtualPart(const String & part_name) const;
|
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
private:
|
|
|
|
const ReplicatedMergeTreeQueue & queue;
|
|
|
|
|
2022-11-25 15:41:20 +00:00
|
|
|
PartitionIdsHint partition_ids_hint;
|
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
/// A snapshot of active parts that would appear if the replica executes all log entries in its queue.
|
2018-05-12 23:05:04 +00:00
|
|
|
ActiveDataPartSet prev_virtual_parts;
|
|
|
|
/// partition ID -> block numbers of the inserts and mutations that are about to commit
|
|
|
|
/// (loaded at some later time than prev_virtual_parts).
|
|
|
|
std::unordered_map<String, std::set<Int64>> committing_blocks;
|
2018-05-10 15:01:10 +00:00
|
|
|
|
2020-11-24 14:24:48 +00:00
|
|
|
/// List of UUIDs for parts that have their identity "pinned".
|
|
|
|
PinnedPartUUIDs pinned_part_uuids;
|
|
|
|
|
2018-05-12 23:05:04 +00:00
|
|
|
/// Quorum state taken at some later time than prev_virtual_parts.
|
2018-05-10 15:01:10 +00:00
|
|
|
String inprogress_quorum_part;
|
2020-06-12 18:24:32 +00:00
|
|
|
|
|
|
|
int32_t merges_version = -1;
|
2018-05-10 15:01:10 +00:00
|
|
|
};
|
|
|
|
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Convert a number to a string in the format of the suffixes of auto-incremental nodes in ZooKeeper.
|
|
|
|
* Negative numbers are also supported - for them the name of the node looks somewhat silly
|
|
|
|
* and does not match any auto-incremented node in ZK.
|
2016-01-10 04:44:12 +00:00
|
|
|
*/
|
|
|
|
String padIndex(Int64 index);
|
|
|
|
|
|
|
|
}
|