2017-11-23 13:12:22 +00:00
|
|
|
#pragma once
|
|
|
|
|
2018-03-03 16:46:32 +00:00
|
|
|
#include <optional>
|
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
|
2018-04-19 14:20:18 +00:00
|
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/MergeTree/ActiveDataPartSet.h>
|
|
|
|
#include <Storages/MergeTree/MergeTreeData.h>
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-06-19 20:06:35 +00:00
|
|
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
2016-01-10 04:44:12 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
/// Forward declarations: these types are only named in declarations below.
class MergeTreeDataMerger;
class ReplicatedMergeTreeMergePredicate;
|
|
|
|
|
2016-01-10 04:44:12 +00:00
|
|
|
|
|
|
|
class ReplicatedMergeTreeQueue
|
|
|
|
{
|
|
|
|
private:
|
2017-04-01 07:20:54 +00:00
|
|
|
friend class CurrentlyExecuting;
|
2018-05-10 15:01:10 +00:00
|
|
|
friend class ReplicatedMergeTreeMergePredicate;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
using StringSet = std::set<String>;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
using LogEntry = ReplicatedMergeTreeLogEntry;
|
|
|
|
using LogEntryPtr = LogEntry::Ptr;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
using Queue = std::list<LogEntryPtr>;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
struct ByTime
|
|
|
|
{
|
|
|
|
bool operator()(const LogEntryPtr & lhs, const LogEntryPtr & rhs) const
|
|
|
|
{
|
2017-09-01 18:21:01 +00:00
|
|
|
return std::forward_as_tuple(lhs->create_time, lhs.get())
|
|
|
|
< std::forward_as_tuple(rhs->create_time, rhs.get());
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
};
|
2016-01-17 13:00:42 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// To calculate min_unprocessed_insert_time, max_processed_insert_time, for which the replica lag is calculated.
|
2017-04-01 07:20:54 +00:00
|
|
|
using InsertsByTime = std::set<LogEntryPtr, ByTime>;
|
2016-01-17 13:00:42 +00:00
|
|
|
|
|
|
|
|
2017-11-28 14:07:17 +00:00
|
|
|
MergeTreeDataFormatVersion format_version;
|
2017-08-25 20:41:45 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
String zookeeper_path;
|
|
|
|
String replica_path;
|
|
|
|
String logger_name;
|
2018-05-10 15:01:10 +00:00
|
|
|
Logger * log = nullptr;
|
|
|
|
|
|
|
|
/// Protects the queue, future_parts and other queue state variables.
|
|
|
|
mutable std::mutex queue_mutex;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** The queue of what you need to do on this line to catch up. It is taken from ZooKeeper (/replicas/me/queue/).
|
|
|
|
* In ZK records in chronological order. Here it is not necessary.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
Queue queue;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
InsertsByTime inserts_by_time;
|
|
|
|
time_t min_unprocessed_insert_time = 0;
|
|
|
|
time_t max_processed_insert_time = 0;
|
2016-01-17 13:00:42 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
time_t last_queue_update = 0;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// parts that will appear as a result of actions performed right now by background threads (these actions are not in the queue).
|
|
|
|
/// Used to not perform other actions at the same time with these parts.
|
2017-04-01 07:20:54 +00:00
|
|
|
StringSet future_parts;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2018-04-19 14:20:18 +00:00
|
|
|
/// Protects virtual_parts, log_pointer, mutations.
|
2018-05-10 15:01:10 +00:00
|
|
|
/// If you intend to lock both target_state_mutex and queue_mutex, lock target_state_mutex first.
|
|
|
|
mutable std::mutex target_state_mutex;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
/// Index of the first log entry that we didn't see yet.
|
|
|
|
Int64 log_pointer = 0;
|
2016-01-13 02:36:11 +00:00
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
/** What will be the set of active parts after executing all log entries up to log_pointer.
|
|
|
|
* Used to determine which merges can be assigned (see ReplicatedMergeTreeMergePredicate)
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
ActiveDataPartSet virtual_parts;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2018-04-19 14:20:18 +00:00
|
|
|
/// A set of mutations loaded from ZooKeeper.
|
|
|
|
/// mutations_by_partition is an index partition ID -> block ID -> mutation into this list.
|
|
|
|
/// Note that mutations are updated in such a way that they are always more recent than
|
|
|
|
/// log_pointer (see pullLogsToQueue()).
|
|
|
|
std::map<String, ReplicatedMergeTreeMutationEntry> mutations_by_znode;
|
|
|
|
std::unordered_map<String, std::map<Int64, const ReplicatedMergeTreeMutationEntry *>> mutations_by_partition;
|
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
/// Provides only one simultaneous call to pullLogsToQueue.
|
|
|
|
std::mutex pull_logs_to_queue_mutex;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2018-04-19 14:20:18 +00:00
|
|
|
/// Ensures that only one thread is simultaneously updating mutations.
|
|
|
|
std::mutex update_mutations_mutex;
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Put a set of (already existing) parts in virtual_parts.
|
2017-04-01 07:20:54 +00:00
|
|
|
void initVirtualParts(const MergeTreeData::DataParts & parts);
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Load (initialize) a queue from ZooKeeper (/replicas/me/queue/).
|
2017-11-28 14:07:17 +00:00
|
|
|
bool load(zkutil::ZooKeeperPtr zookeeper);
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
void insertUnlocked(
|
|
|
|
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
|
|
|
|
std::lock_guard<std::mutex> & target_state_lock,
|
|
|
|
std::lock_guard<std::mutex> & queue_lock);
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
void remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Can I now try this action. If not, you need to leave it in the queue and try another one.
|
2018-05-10 15:01:10 +00:00
|
|
|
* Called under the queue_mutex.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2018-05-10 15:01:10 +00:00
|
|
|
bool shouldExecuteLogEntry(
|
|
|
|
const LogEntry & entry, String & out_postpone_reason,
|
|
|
|
MergeTreeDataMerger & merger, MergeTreeData & data,
|
|
|
|
std::lock_guard<std::mutex> & queue_lock) const;
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-05-12 13:47:42 +00:00
|
|
|
/** Check that part isn't in currently generating parts and isn't covered by them.
|
2018-05-10 15:01:10 +00:00
|
|
|
* Should be called under queue_mutex.
|
2017-05-12 13:47:42 +00:00
|
|
|
*/
|
2018-05-10 15:01:10 +00:00
|
|
|
bool isNotCoveredByFuturePartsImpl(
|
|
|
|
const String & new_part_name, String & out_reason,
|
|
|
|
std::lock_guard<std::mutex> & queue_lock) const;
|
2017-05-12 13:47:42 +00:00
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
/// After removing the queue element, update the insertion times in the RAM. Running under mutex.
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper.
|
2018-03-03 16:46:32 +00:00
|
|
|
void updateTimesOnRemoval(const LogEntryPtr & entry,
|
|
|
|
std::optional<time_t> & min_unprocessed_insert_time_changed,
|
|
|
|
std::optional<time_t> & max_processed_insert_time_changed,
|
2018-05-10 15:01:10 +00:00
|
|
|
std::unique_lock<std::mutex> & queue_lock);
|
2016-01-17 13:00:42 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Update the insertion times in ZooKeeper.
|
2018-03-03 16:46:32 +00:00
|
|
|
void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper,
|
|
|
|
std::optional<time_t> min_unprocessed_insert_time_changed,
|
|
|
|
std::optional<time_t> max_processed_insert_time_changed) const;
|
2016-01-17 13:00:42 +00:00
|
|
|
|
2018-03-03 16:26:06 +00:00
|
|
|
/// Returns list of currently executing entries blocking execution of specified CLEAR_COLUMN command
|
2018-05-10 15:01:10 +00:00
|
|
|
Queue getConflictsForClearColumnCommand(const LogEntry & entry, String * out_conflicts_description, std::lock_guard<std::mutex> & queue_lock) const;
|
2016-02-02 21:30:27 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Marks the element of the queue as running.
|
2017-04-01 07:20:54 +00:00
|
|
|
class CurrentlyExecuting
|
|
|
|
{
|
|
|
|
private:
|
|
|
|
ReplicatedMergeTreeQueue::LogEntryPtr entry;
|
|
|
|
ReplicatedMergeTreeQueue & queue;
|
2016-02-02 21:30:27 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
friend class ReplicatedMergeTreeQueue;
|
2016-02-02 21:30:27 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Created only in the selectEntryToProcess function. It is called under mutex.
|
2017-04-01 07:20:54 +00:00
|
|
|
CurrentlyExecuting(ReplicatedMergeTreeQueue::LogEntryPtr & entry, ReplicatedMergeTreeQueue & queue);
|
2017-05-12 13:47:42 +00:00
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
/// In case of fetch, we determine actual part during the execution, so we need to update entry. It is called under queue_mutex.
|
2017-05-12 13:47:42 +00:00
|
|
|
static void setActualPartName(const ReplicatedMergeTreeLogEntry & entry, const String & actual_part_name,
|
|
|
|
ReplicatedMergeTreeQueue & queue);
|
2017-04-01 07:20:54 +00:00
|
|
|
public:
|
|
|
|
~CurrentlyExecuting();
|
|
|
|
};
|
2016-02-02 21:30:27 +00:00
|
|
|
|
2016-01-10 04:44:12 +00:00
|
|
|
public:
|
2017-11-28 14:07:17 +00:00
|
|
|
ReplicatedMergeTreeQueue(MergeTreeDataFormatVersion format_version_)
|
|
|
|
: format_version(format_version_)
|
|
|
|
, virtual_parts(format_version)
|
|
|
|
{
|
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
|
|
|
|
const MergeTreeData::DataParts & parts, zkutil::ZooKeeperPtr zookeeper);
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
/** Inserts an action to the end of the queue.
|
2017-04-16 15:00:33 +00:00
|
|
|
* To restore broken parts during operation.
|
|
|
|
* Do not insert the action itself into ZK (do it yourself).
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
void insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Delete the action with the specified part (as new_part_name) from the queue.
|
|
|
|
* Called for unreachable actions in the queue - old lost parts.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
|
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
bool removeFromVirtualParts(const MergeTreePartInfo & part_info);
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
|
|
|
|
* If next_update_event != nullptr, will call this event when new entries appear in the log.
|
|
|
|
* Returns true if new entries have been.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2017-12-21 18:17:06 +00:00
|
|
|
bool pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-04-19 14:20:18 +00:00
|
|
|
bool updateMutations(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event);
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
|
|
|
|
* And also wait for the completion of their execution, if they are now being executed.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
void removeGetsAndMergesInRange(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
|
|
|
|
|
2017-06-16 16:47:09 +00:00
|
|
|
/** Disables future merges and fetches inside entry.new_part_name
|
|
|
|
* If there are currently executing merges or fetches then throws exception.
|
|
|
|
*/
|
|
|
|
void disableMergesAndFetchesInRange(const LogEntry & entry);
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** In the case where there are not enough parts to perform the merge in part_name
|
|
|
|
* - move actions with merged parts to the end of the queue
|
|
|
|
* (in order to download a already merged part from another replica).
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
StringSet moveSiblingPartsForMergeToEndOfQueue(const String & part_name);
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Select the next action to process.
|
|
|
|
* merger is used only to check if the merges is not suspended.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
using SelectedEntry = std::pair<ReplicatedMergeTreeQueue::LogEntryPtr, std::unique_ptr<CurrentlyExecuting>>;
|
|
|
|
SelectedEntry selectEntryToProcess(MergeTreeDataMerger & merger, MergeTreeData & data);
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Execute `func` function to handle the action.
|
|
|
|
* In this case, at runtime, mark the queue element as running
|
|
|
|
* (add into future_parts and more).
|
|
|
|
* If there was an exception during processing, it saves it in `entry`.
|
|
|
|
* Returns true if there were no exceptions during the processing.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func);
|
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper) const;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Prohibit merges in the specified range.
|
2017-04-01 07:20:54 +00:00
|
|
|
void disableMergesInRange(const String & part_name);
|
|
|
|
|
2017-05-12 13:47:42 +00:00
|
|
|
/** Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts.
|
|
|
|
* Locks queue's mutex.
|
|
|
|
*/
|
|
|
|
bool addFuturePartIfNotCoveredByThem(const String & part_name, const LogEntry & entry, String & reject_reason);
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Count the number of merges in the queue.
|
2018-03-03 16:26:06 +00:00
|
|
|
size_t countMerges() const;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
struct Status
|
|
|
|
{
|
|
|
|
UInt32 future_parts;
|
|
|
|
UInt32 queue_size;
|
|
|
|
UInt32 inserts_in_queue;
|
|
|
|
UInt32 merges_in_queue;
|
|
|
|
UInt32 queue_oldest_time;
|
|
|
|
UInt32 inserts_oldest_time;
|
|
|
|
UInt32 merges_oldest_time;
|
|
|
|
String oldest_part_to_get;
|
|
|
|
String oldest_part_to_merge_to;
|
|
|
|
UInt32 last_queue_update;
|
|
|
|
};
|
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Get information about the queue.
|
2018-03-03 16:26:06 +00:00
|
|
|
Status getStatus() const;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Get the data of the queue elements.
|
2017-04-01 07:20:54 +00:00
|
|
|
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
|
2018-03-03 16:26:06 +00:00
|
|
|
void getEntries(LogEntriesData & res) const;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/// Get information about the insertion times.
|
2017-04-01 07:20:54 +00:00
|
|
|
void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const;
|
2016-01-10 04:44:12 +00:00
|
|
|
};
|
|
|
|
|
2018-05-10 15:01:10 +00:00
|
|
|
class ReplicatedMergeTreeMergePredicate
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
ReplicatedMergeTreeMergePredicate(
|
|
|
|
const ReplicatedMergeTreeQueue & queue_, ActiveDataPartSet virtual_parts_, Int64 log_pointer,
|
|
|
|
zkutil::ZooKeeperPtr & zookeeper);
|
|
|
|
|
|
|
|
/// Can we assign a merge with these two parts?
|
|
|
|
/// (assuming that no merge was assigned after the predicate was constructed)
|
|
|
|
/// If we can't and out_reason is not nullptr, set it to the reason why we can't merge.
|
|
|
|
bool operator()(
|
|
|
|
const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right,
|
|
|
|
String * out_reason = nullptr) const;
|
|
|
|
|
|
|
|
private:
|
|
|
|
const ReplicatedMergeTreeQueue & queue;
|
|
|
|
|
|
|
|
/// A snapshot of active parts that would appear if the replica executes all log entries in its queue.
|
|
|
|
ActiveDataPartSet virtual_parts;
|
|
|
|
/// partition ID -> block numbers of the inserts that are about to commit (loaded at some later time than virtual_parts).
|
|
|
|
std::unordered_map<String, std::set<Int64>> current_inserts;
|
|
|
|
/// The same as virtual_parts but loaded at some later time than current inserts.
|
|
|
|
ActiveDataPartSet next_virtual_parts;
|
|
|
|
|
|
|
|
/// Quorum state taken at some later time than virtual_parts.
|
|
|
|
String last_quorum_part;
|
|
|
|
String inprogress_quorum_part;
|
|
|
|
};
|
|
|
|
|
2016-01-10 04:44:12 +00:00
|
|
|
|
2017-04-16 15:00:33 +00:00
|
|
|
/** Convert a number to a string in the format of the suffixes of auto-incremental nodes in ZooKeeper.
|
|
|
|
* Negative numbers are also supported - for them the name of the node looks somewhat silly
|
|
|
|
* and does not match any auto-incremented node in ZK.
|
2016-01-10 04:44:12 +00:00
|
|
|
*/
|
|
|
|
String padIndex(Int64 index);
|
|
|
|
|
|
|
|
}
|