ClickHouse/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h

204 lines
10 KiB
C
Raw Normal View History

2016-01-10 04:44:12 +00:00
#include <DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <DB/Storages/MergeTree/ActiveDataPartSet.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <zkutil/ZooKeeper.h>
namespace DB
{
class MergeTreeDataMerger;
class ReplicatedMergeTreeQueue
{
private:
friend class CurrentlyExecuting;
using StringSet = std::set<String>;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;
using Queue = std::list<LogEntryPtr>;
struct ByTime
{
bool operator()(const LogEntryPtr & lhs, const LogEntryPtr & rhs) const
{
return std::forward_as_tuple(lhs.get()->create_time, lhs.get())
< std::forward_as_tuple(rhs.get()->create_time, rhs.get());
}
};
/// Для вычисления min_unprocessed_insert_time, max_processed_insert_time, по которым вычисляется отставание реплик.
using InsertsByTime = std::set<LogEntryPtr, ByTime>;
2016-01-10 04:44:12 +00:00
String zookeeper_path;
String replica_path;
String logger_name;
/** Очередь того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/).
* В ZK записи в хронологическом порядке. Здесь - не обязательно.
*/
Queue queue;
InsertsByTime inserts_by_time;
time_t min_unprocessed_insert_time = 0;
time_t max_processed_insert_time = 0;
2016-01-10 04:44:12 +00:00
time_t last_queue_update = 0;
/// Куски, которые появятся в результате действий, выполняемых прямо сейчас фоновыми потоками (этих действий нет в очереди).
/// Используется, чтобы не выполнять в тот же момент другие действия с этими кусками.
StringSet future_parts;
/// На доступ к queue, future_parts, ...
2016-01-10 04:44:12 +00:00
std::mutex mutex;
/// Обеспечивает только один одновременный вызов pullLogsToQueue.
std::mutex pull_logs_to_queue_mutex;
2016-01-10 04:44:12 +00:00
/** Каким будет множество активных кусков после выполнения всей текущей очереди - добавления новых кусков и выполнения слияний.
* Используется, чтобы определять, какие мерджи уже были назначены:
* - если в этом множестве есть кусок, то мерджи более мелких кусков внутри его диапазона не делаются.
* Дополнительно, сюда также добавляются специальные элементы, чтобы явно запретить мерджи в некотором диапазоне (см. disableMergesInRange).
* Это множество защищено своим mutex-ом.
*/
ActiveDataPartSet virtual_parts;
Logger * log = nullptr;
/// Положить набор (уже существующих) кусков в virtual_parts.
void initVirtualParts(const MergeTreeData::DataParts & parts);
/// Загрузить (инициализировать) очередь из ZooKeeper (/replicas/me/queue/).
void load(zkutil::ZooKeeperPtr zookeeper);
void insertUnlocked(LogEntryPtr & entry);
void remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
/** Можно ли сейчас попробовать выполнить это действие. Если нет, нужно оставить его в очереди и попробовать выполнить другое.
* Вызывается под queue_mutex.
*/
bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger);
/// После удаления элемента очереди, обновить времена insert-ов в оперативке. Выполняется под queue_mutex.
/// Возвращает информацию, какие времена изменились - эту информацию можно передать в updateTimesInZooKeeper.
void updateTimesOnRemoval(const LogEntryPtr & entry, bool & min_unprocessed_insert_time_changed, bool & max_processed_insert_time_changed);
/// Обновить времена insert-ов в ZooKeeper.
void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper, bool min_unprocessed_insert_time_changed, bool max_processed_insert_time_changed);
/// Помечает элемент очереди как выполняющийся.
class CurrentlyExecuting
{
private:
2016-02-03 01:24:12 +00:00
ReplicatedMergeTreeQueue::LogEntryPtr entry;
ReplicatedMergeTreeQueue & queue;
friend class ReplicatedMergeTreeQueue;
/// Создаётся только в функции selectEntryToProcess. Вызывается под mutex-ом.
CurrentlyExecuting(ReplicatedMergeTreeQueue::LogEntryPtr & entry, ReplicatedMergeTreeQueue & queue);
public:
~CurrentlyExecuting();
};
2016-01-10 04:44:12 +00:00
public:
ReplicatedMergeTreeQueue() {}
void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
const MergeTreeData::DataParts & parts, zkutil::ZooKeeperPtr zookeeper);
/** Вставить действие в конец очереди.
* Для восстановления битых кусков во время работы.
* Не вставляет само действие в ZK (сделайте это самостоятельно).
*/
void insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
2016-01-10 04:44:12 +00:00
/** Удалить действие с указанным куском (в качестве new_part_name) из очереди.
* Вызывается для невыполнимых действий в очереди - старых потерянных кусков.
*/
2016-01-10 04:44:12 +00:00
bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
/** Скопировать новые записи из общего лога в очередь этой реплики. Установить log_pointer в соответствующее значение.
* Если next_update_event != nullptr, вызовет это событие, когда в логе появятся новые записи.
* Возвращает true, если новые записи были.
*/
bool pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event);
/** Удалить из очереди действия с кусками, покрываемыми part_name (из ZK и из оперативки).
* А также дождаться завершения их выполнения, если они сейчас выполняются.
*/
void removeGetsAndMergesInRange(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
/** В случае, когда для выполнения мерджа в part_name недостаёт кусков
* - переместить действия со сливаемыми кусками в конец очереди
* (чтобы раньше скачать готовый смердженный кусок с другой реплики).
*/
StringSet moveSiblingPartsForMergeToEndOfQueue(const String & part_name);
/** Выбрать следующее действие для обработки.
* merger используется только чтобы проверить, не приостановлены ли мерджи.
*/
using SelectedEntry = std::pair<ReplicatedMergeTreeQueue::LogEntryPtr, std::unique_ptr<CurrentlyExecuting>>;
SelectedEntry selectEntryToProcess(MergeTreeDataMerger & merger);
2016-01-10 04:44:12 +00:00
/** Выполнить функцию func для обработки действия.
* При этом, на время выполнения, отметить элемент очереди как выполняющийся
* (добавить в future_parts и другое).
* Если в процессе обработки было исключение - сохраняет его в entry.
* Возвращает true, если в процессе обработки не было исключений.
*/
bool processEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func);
/// Будет ли кусок в будущем слит в более крупный (или мерджи кусков в данном диапазоне запрещены)?
bool partWillBeMergedOrMergesDisabled(const String & part_name) const;
/// Запретить слияния в указанном диапазоне.
void disableMergesInRange(const String & part_name);
/// Посчитать количество слияний в очереди.
void countMerges(size_t & all_merges, size_t & big_merges, size_t max_big_merges, const std::function<bool(const String &)> & is_part_big);
struct Status
{
UInt32 future_parts;
UInt32 queue_size;
UInt32 inserts_in_queue;
UInt32 merges_in_queue;
UInt32 queue_oldest_time;
UInt32 inserts_oldest_time;
UInt32 merges_oldest_time;
String oldest_part_to_get;
String oldest_part_to_merge_to;
UInt32 last_queue_update;
};
/// Получить информацию об очереди.
Status getStatus();
/// Получить данные элементов очереди.
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
void getEntries(LogEntriesData & res);
/// Получить информацию о временах insert-ов.
void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const;
2016-01-10 04:44:12 +00:00
};
/** Преобразовать число в строку формате суффиксов автоинкрементных нод в ZooKeeper.
* Поддерживаются также отрицательные числа - для них имя ноды выглядит несколько глупо
* и не соответствует никакой автоинкрементной ноде в ZK.
*/
String padIndex(Int64 index);
}