ClickHouse/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h

187 lines
8.2 KiB
C
Raw Normal View History

2016-01-28 01:00:27 +00:00
#pragma once
2016-03-01 17:47:53 +00:00
#include <DB/Storages/MergeTree/ReshardingJob.h>
2016-01-28 01:00:27 +00:00
#include <DB/Storages/AlterCommands.h>
#include <common/logger_useful.h>
2016-03-01 17:47:53 +00:00
#include <zkutil/RWLock.h>
#include <zkutil/SingleBarrier.h>
2016-01-28 01:00:42 +00:00
#include <Poco/Util/LayeredConfiguration.h>
2016-01-28 01:00:27 +00:00
#include <Poco/SharedPtr.h>
2016-03-01 17:47:53 +00:00
2016-01-28 01:00:27 +00:00
#include <string>
#include <thread>
#include <atomic>
2016-03-01 17:47:53 +00:00
#include <functional>
2016-01-28 01:00:27 +00:00
namespace DB
{
class Context;
2016-03-01 17:47:53 +00:00
class Cluster;
2016-01-28 01:00:27 +00:00
class StorageReplicatedMergeTree;
/** Исполнитель задач перешардирования.
* Рабоает в фоновом режиме внутри одного потока.
* Следит за появлением задач и назначает их на выполнение.
* Задачи выполняются последовательно.
*/
class ReshardingWorker final
{
2016-03-01 17:47:53 +00:00
public:
using PartitionList = std::vector<std::string>;
enum Status
{
STATUS_OK = 0,
STATUS_ERROR, /// Произошла ошибка на одном исполнителе.
STATUS_ON_HOLD /// Задача приостановлена.
};
2016-01-28 01:00:27 +00:00
public:
2016-01-28 01:00:42 +00:00
ReshardingWorker(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name, Context & context_);
2016-01-28 01:00:27 +00:00
ReshardingWorker(const ReshardingWorker &) = delete;
ReshardingWorker & operator=(const ReshardingWorker &) = delete;
~ReshardingWorker();
/// Запустить поток выполняющий задачи перешардирования.
void start();
2016-03-01 17:47:53 +00:00
void shutdown();
2016-01-28 01:00:27 +00:00
/// Прислать запрос на перешардирование.
2016-03-01 17:47:53 +00:00
void submitJob(const ReshardingJob & job);
2016-01-28 01:00:27 +00:00
/// Был ли поток запущен?
bool isStarted() const;
2016-03-01 17:47:53 +00:00
/// Создать новый координатор распределённой задачи. Вызывается с инициатора.
std::string createCoordinator(const Cluster & cluster);
///
void registerQuery(const std::string & coordinator_id, const std::string & query);
/// Удалить координатор.
void deleteCoordinator(const std::string & coordinator_id);
/// Подписаться к заданному координатору. Вызывается с исполнителя.
UInt64 subscribe(const std::string & coordinator_id, const std::string & query);
/// Отменить подпись к заданному координатору. Вызывается с исполнителя.
void unsubscribe(const std::string & coordinator_id);
/// Увеличить количество партиций входящих в одну распределённую задачу. Вызывается с исполнителя.
void addPartitions(const std::string & coordinator_id, const PartitionList & partition_list);
///
ReshardingWorker::PartitionList::iterator categorizePartitions(const std::string & coordinator_id, ReshardingWorker::PartitionList & partition_list);
/// Получить количество партиций входящих в одну распределённую задачу. Вызывается с исполнителя.
size_t getPartitionCount(const std::string & coordinator_id);
/// Получить количество учавствующих узлов.
size_t getNodeCount(const std::string & coordinator_id);
/// Ждать завершение проверок на всех исполнителях. Вызывается с исполнителя.
void waitForCheckCompletion(const std::string & coordinator_id);
/// Ждать завершение всех необходмых отмен подписей.
void waitForOptOutCompletion(const std::string & coordinator_id, size_t count);
/// Установить статус заданной распределённой задачи.
void setStatus(const std::string & coordinator_id, Status status);
///
void setStatus(const std::string & coordinator_id, const std::string & hostname, Status status);
/// Получить статус заданной распределённой задачи.
Status getStatus();
zkutil::RWLock createDeletionLock(const std::string & coordinator_id);
2016-01-28 01:00:27 +00:00
private:
/// Следить за появлением новых задач. Выполнить их последовательно.
void pollAndExecute();
2016-03-01 17:47:53 +00:00
/// Подтолкнуть планировщик задач.
void jabScheduler();
2016-01-28 01:00:27 +00:00
/// Выполнить задачи, которые были в очереди выполнения при запуске узла.
void performPendingJobs();
/// Выполнить задачи, которые заданы по путям в БД ZooKeeper.
void perform(const Strings & job_nodes);
/// Выполнить одну задачу.
2016-03-01 17:47:53 +00:00
void perform(const std::string & job_descriptor);
2016-01-28 01:00:27 +00:00
/// Разбить куски входящие в партицию на несколько, согласно ключу шардирования.
/// Оновременно перегруппировать эти куски по шардам и слить куски в каждой группе.
/// При завершении этого процесса создаётся новая партиция для каждого шарда.
2016-03-01 17:47:53 +00:00
void createShardedPartitions();
2016-01-28 01:00:27 +00:00
/// Копировать все партиции полученные путём перешардирования на каждую реплику
/// соответствующих шардов.
2016-03-01 17:47:53 +00:00
void publishShardedPartitions();
2016-01-28 01:00:27 +00:00
/// Для каждого шарда добавить данные из новой партиции этого шарда в таблицу на всех
/// репликах входящих в этот же шард. На локальном узле, который выполняет задачу
/// перешардирования, удалить данные из первоначальной партиции.
2016-03-01 17:47:53 +00:00
void applyChanges();
2016-01-28 01:00:27 +00:00
/// Удалить временные данные с локального узла и ZooKeeper'а.
2016-03-01 17:47:53 +00:00
void softCleanup();
void hardCleanup();
void cleanupCommon();
2016-01-28 01:00:27 +00:00
/// Принудительно завершить поток.
2016-03-01 17:47:53 +00:00
void abortPollingIfRequested();
void abortCoordinatorIfRequested(const std::string & coordinator_id);
2016-03-01 17:47:53 +00:00
void abortRecoveryIfRequested();
void abortJobIfRequested();
Status getCoordinatorStatus(const std::string & coordinator_id);
2016-03-01 17:47:53 +00:00
/// Зарегистрировать задачу в соответствующий координатор.
void attachJob();
/// Снять задачу с координатора.
void detachJob();
/// Ждать завершение загрузок на всех исполнителях.
void waitForUploadCompletion();
size_t getPartitionCountUnlocked(const std::string & coordinator_id);
bool updateOfflineNodes(const std::string & coordinator_id);
2016-03-01 17:47:53 +00:00
bool updateOfflineNodes();
bool updateOfflineNodesCommon(const std::string & path, const std::string & coordinator_id);
Status getStatusCommon(const std::string & path, const std::string & coordinator_id);
2016-03-01 17:47:53 +00:00
/// Функции, которые создают необходимые объекты для синхронизации
/// распределённых задач.
zkutil::RWLock createLock();
zkutil::RWLock createCoordinatorLock(const std::string & coordinator_id);
zkutil::SingleBarrier createSubscribeBarrier(const std::string & coordinator_id);
zkutil::SingleBarrier createCheckBarrier(const std::string & coordinator_id);
2016-03-01 17:47:53 +00:00
zkutil::SingleBarrier createOptOutBarrier(const std::string & coordinator_id, size_t count);
zkutil::SingleBarrier createRecoveryBarrier(const ReshardingJob & job);
zkutil::SingleBarrier createUploadBarrier(const ReshardingJob & job);
std::string computeHash(const std::string & in);
std::string getCoordinatorPath(const std::string & coordinator_id) const;
std::string getPartitionPath(const ReshardingJob & job) const;
2016-01-28 01:00:27 +00:00
private:
2016-03-01 17:47:53 +00:00
ReshardingJob current_job;
std::thread polling_thread;
std::string host_task_queue_path;
std::string distributed_path;
std::string distributed_online_path;
std::string distributed_lock_path;
std::string coordination_path;
2016-01-28 01:00:27 +00:00
Context & context;
Logger * log;
2016-03-01 17:47:53 +00:00
2016-01-28 01:00:27 +00:00
std::atomic<bool> is_started{false};
std::atomic<bool> must_stop{false};
};
2016-01-28 01:00:42 +00:00
using ReshardingWorkerPtr = std::shared_ptr<ReshardingWorker>;
2016-01-28 01:00:27 +00:00
}