Mirror of https://github.com/ClickHouse/ClickHouse.git
Commit 1fec35c80c (Merge)
@@ -48,9 +48,11 @@ public:
 
     ~ReshardingWorker();
 
-    /// Запустить поток выполняющий задачи перешардирования.
+    /// Start the thread which performs resharding jobs.
     void start();
 
+    /// Stop the thread which performs resharding jobs.
+    /// If any job is in progress, put it on hold for further execution.
     void shutdown();
 
     /// Прислать запрос на перешардирование.
@@ -61,7 +63,7 @@ public:
 
     /// Создать новый координатор распределённой задачи. Вызывается с инициатора.
     std::string createCoordinator(const Cluster & cluster);
-    ///
+    /// Register a query into a coordinator.
     void registerQuery(const std::string & coordinator_id, const std::string & query);
     /// Удалить координатор.
     void deleteCoordinator(const std::string & coordinator_id);
@@ -72,8 +74,10 @@ public:
     void unsubscribe(const std::string & coordinator_id);
     /// Увеличить количество партиций входящих в одну распределённую задачу. Вызывается с исполнителя.
     void addPartitions(const std::string & coordinator_id, const PartitionList & partition_list);
-    ///
-    ReshardingWorker::PartitionList::iterator categorizePartitions(const std::string & coordinator_id, ReshardingWorker::PartitionList & partition_list);
+    /// Rearrange partitions into two categories: coordinated job, uncoordinated job.
+    /// Returns an iterator to the beginning of the list of uncoordinated jobs.
+    ReshardingWorker::PartitionList::iterator categorizePartitions(const std::string & coordinator_id,
+        ReshardingWorker::PartitionList & partition_list);
     /// Получить количество партиций входящих в одну распределённую задачу. Вызывается с исполнителя.
     size_t getPartitionCount(const std::string & coordinator_id);
     /// Получить количество учавствующих узлов.
@@ -83,12 +87,10 @@ public:
     /// Ждать завершение всех необходмых отмен подписей.
     void waitForOptOutCompletion(const std::string & coordinator_id, size_t count);
 
-    /// Установить статус заданной распределённой задачи.
+    /// Set the shard-independent status of a given coordinator.
     void setStatus(const std::string & coordinator_id, Status status);
-    ///
+    /// Set the status of a shard under a given coordinator.
     void setStatus(const std::string & coordinator_id, const std::string & hostname, Status status);
-    /// Получить статус заданной распределённой задачи.
-    Status getStatus();
 
     zkutil::RWLock createDeletionLock(const std::string & coordinator_id);
 
@@ -125,15 +127,17 @@ private:
     /// Удалить временные данные с локального узла и ZooKeeper'а.
     void softCleanup();
     void hardCleanup();
-    void cleanupCommon();
 
-    /// Принудительно завершить поток.
+    /// Принудительно завершить поток, если выполнено условие.
     void abortPollingIfRequested();
     void abortCoordinatorIfRequested(const std::string & coordinator_id);
     void abortRecoveryIfRequested();
     void abortJobIfRequested();
 
+    /// Get the current job-independent status of the coordinator.
     Status getCoordinatorStatus(const std::string & coordinator_id);
+    /// Get the status of the current distributed job.
+    Status getStatus();
 
     /// Зарегистрировать задачу в соответствующий координатор.
     void attachJob();
@@ -144,27 +148,35 @@ private:
 
     size_t getPartitionCountUnlocked(const std::string & coordinator_id);
 
-    bool updateOfflineNodes(const std::string & coordinator_id);
-    bool updateOfflineNodes();
-    bool updateOfflineNodesCommon(const std::string & path, const std::string & coordinator_id);
-    Status getStatusCommon(const std::string & path, const std::string & coordinator_id);
+    /// Detect offline nodes under a given coordinator.
+    bool detectOfflineNodes(const std::string & coordinator_id);
+    /// Detect offline nodes under the current job.
+    bool detectOfflineNodes();
 
     /// Функции, которые создают необходимые объекты для синхронизации
     /// распределённых задач.
     zkutil::RWLock createLock();
     zkutil::RWLock createCoordinatorLock(const std::string & coordinator_id);
 
     zkutil::SingleBarrier createCheckBarrier(const std::string & coordinator_id);
     zkutil::SingleBarrier createOptOutBarrier(const std::string & coordinator_id, size_t count);
     zkutil::SingleBarrier createRecoveryBarrier(const ReshardingJob & job);
     zkutil::SingleBarrier createUploadBarrier(const ReshardingJob & job);
 
-    std::string computeHash(const std::string & in);
-
+    /// Get the ZooKeeper path of a given coordinator.
     std::string getCoordinatorPath(const std::string & coordinator_id) const;
+    /// Get the ZooKeeper path of a given job partition.
     std::string getPartitionPath(const ReshardingJob & job) const;
 
+    /// Common code for softCleanup() and hardCleanup().
+    void cleanupCommon();
+    /// Common code for detectOfflineNodes().
+    bool detectOfflineNodesCommon(const std::string & path, const std::string & coordinator_id);
+    /// Common code for getStatus() and getCoordinatorStatus().
+    Status getStatusCommon(const std::string & path, const std::string & coordinator_id);
+
+    /// Compute the hash value of a given string.
+    std::string computeHash(const std::string & in);
+
 private:
     ReshardingJob current_job;
     std::thread polling_thread;
@@ -30,8 +30,11 @@
 #include <Poco/SharedPtr.h>
 
 #include <openssl/sha.h>
+
 #include <future>
 #include <chrono>
+#include <cstdlib>
+#include <ctime>
 
 namespace DB
 {
@@ -97,7 +100,7 @@ private:
 
 }
 
-/// Rationale for distributed jobs.
+/// Rationale for distributed jobs:
 ///
 /// A distributed job is initiated in a query ALTER TABLE RESHARD inside which
 /// we specify a distributed table. Then ClickHouse creates a job coordinating
@@ -107,6 +110,23 @@ private:
 /// receives one query ALTER TABLE RESHARD with the keyword COORDINATE WITH
 /// indicating the aforementioned coordinator id.
 ///
+/// Locking notes:
+///
+/// In order to avoid any deadlock situation, two approaches are implemented:
+///
+/// 1. As long as a cluster is busy with a distributed job, we forbid clients
+/// to submit any new distributed job on this cluster. For this purpose, clusters
+/// are identified by the hash value of the ordered list of their hostnames and
+/// ports. In the event that an initiator should identify a cluster node by means
+/// of a local address, some additional checks are performed on shards themselves.
+///
+/// 2. Also, the jobs that constitute a distributed job are submitted in identical
+/// order on all the shards of a cluster. If one or more shards fail while performing
+/// a distributed job, the latter is put on hold. Then no new jobs are performed
+/// until the failing shards have come back online.
+///
+/// ZooKeeper coordinator structure description:
+///
 /// At the highest level we have under the /resharding_distributed znode:
 ///
 /// /lock: global distributed read/write lock;
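
The locking notes above identify a cluster by the hash value of the ordered list of its hostnames and ports. Below is a minimal sketch of how such an identifier could be derived with SHA-1 from <openssl/sha.h> (already included in this file); the function name, the "host:port;" concatenation and the hex encoding are illustrative assumptions, not the actual computeHash() used by ReshardingWorker.

    #include <openssl/sha.h>
    #include <cstdio>
    #include <string>
    #include <vector>

    /// Illustrative sketch: hash the ordered list of "host:port" entries into a
    /// fixed-length hex string that can serve as a cluster identifier.
    std::string hashClusterNodes(const std::vector<std::string> & ordered_host_ports)
    {
        std::string concatenated;
        for (const auto & node : ordered_host_ports)
            concatenated += node + ";";

        unsigned char digest[SHA_DIGEST_LENGTH];
        SHA1(reinterpret_cast<const unsigned char *>(concatenated.data()), concatenated.size(), digest);

        std::string hex;
        char buf[3];
        for (unsigned char byte : digest)
        {
            std::snprintf(buf, sizeof(buf), "%02x", byte);
            hex += buf;
        }
        return hex;
    }

Any two initiators that see the same cluster definition then derive the same identifier, which is what makes the "one distributed job per cluster" restriction enforceable.
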
@@ -582,6 +602,12 @@ void ReshardingWorker::publishShardedPartitions()
         {
         }
 
+        TaskInfo(const TaskInfo &) = delete;
+        TaskInfo & operator=(const TaskInfo &) = delete;
+
+        TaskInfo(TaskInfo &&) = default;
+        TaskInfo & operator=(TaskInfo &&) = default;
+
         std::string replica_path;
         ReplicatedMergeTreeAddress dest;
         std::string part;
@@ -704,17 +730,24 @@ void ReshardingWorker::publishShardedPartitions()
 
 void ReshardingWorker::applyChanges()
 {
+    /// Note: since this method actually performs a distributed commit (i.e. it
+    /// attaches partitions on various shards), we should implement a two-phase
+    /// commit protocol in a future release in order to get even more safety
+    /// guarantees.
+
     LOG_DEBUG(log, "Attaching new partitions.");
 
     auto & storage = *(current_job.storage);
     auto zookeeper = context.getZooKeeper();
 
-    /// На локальном узле удалить первоначальную партицию.
+    /// Locally drop the initial partition.
     std::string query_str = "ALTER TABLE " + current_job.database_name + "."
         + current_job.table_name + " DROP PARTITION " + current_job.partition;
     (void) executeQuery(query_str, context, true);
 
-    /// На всех участвующих репликах добавить соответствующие шардированные партиции в таблицу.
+    /// On each participating shard, attach the corresponding sharded partition to the table.
 
+    /// Description of a task on a replica.
     struct TaskInfo
     {
         TaskInfo(const std::string & replica_path_, const ReplicatedMergeTreeAddress & dest_)
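
The note added above observes that applyChanges() is effectively a distributed commit and that a two-phase commit protocol would add safety in a future release. As a rough illustration of what that would mean, here is the generic shape of 2PC; none of these names exist in ClickHouse, this is only a sketch of the protocol.

    #include <functional>
    #include <vector>

    /// Generic two-phase commit outline; purely illustrative.
    struct Participant
    {
        std::function<bool()> prepare;   /// e.g. download and verify the sharded partition
        std::function<void()> commit;    /// e.g. ATTACH PARTITION
        std::function<void()> rollback;  /// e.g. drop the temporary data
    };

    bool twoPhaseCommit(const std::vector<Participant> & participants)
    {
        /// Phase 1: nothing becomes visible unless every participant votes "yes".
        for (const auto & p : participants)
            if (!p.prepare())
            {
                for (const auto & q : participants)
                    q.rollback();
                return false;
            }

        /// Phase 2: all prepared successfully, so the change is applied everywhere.
        for (const auto & p : participants)
            p.commit();
        return true;
    }
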
@@ -722,13 +755,52 @@ void ReshardingWorker::applyChanges()
         {
         }
 
+        TaskInfo(const TaskInfo &) = delete;
+        TaskInfo & operator=(const TaskInfo &) = delete;
+
+        TaskInfo(TaskInfo &&) = default;
+        TaskInfo & operator=(TaskInfo &&) = default;
+
         std::string replica_path;
         ReplicatedMergeTreeAddress dest;
     };
 
-    using TaskInfoList = std::vector<TaskInfo>;
+    /// Description of tasks for each replica of a shard.
+    /// For fault tolerance purposes, some fields are provided
+    /// to perform attempts on more than one replica if needed.
+    struct ShardTaskInfo
+    {
+        ShardTaskInfo()
+        {
+            struct timespec times;
+            if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times))
+                throwFromErrno("Cannot clock_gettime.", DB::ErrorCodes::CANNOT_CLOCK_GETTIME);
+
+            (void) srand48_r(reinterpret_cast<intptr_t>(this) ^ times.tv_nsec, &rand_state);
+        }
+
+        ShardTaskInfo(const ShardTaskInfo &) = delete;
+        ShardTaskInfo & operator=(const ShardTaskInfo &) = delete;
+
+        ShardTaskInfo(ShardTaskInfo &&) = default;
+        ShardTaskInfo & operator=(ShardTaskInfo &&) = default;
+
+        /// one task for each replica
+        std::vector<TaskInfo> shard_tasks;
+        /// index to the replica to be used
+        size_t next = 0;
+        /// result of the operation on the current replica
+        bool is_success = false;
+        /// index to the corresponding thread pool entry
+        size_t pool_index;
+        drand48_data rand_state;
+    };
+
+    using TaskInfoList = std::vector<ShardTaskInfo>;
+
     TaskInfoList task_info_list;
 
+    /// Initialize all the possible tasks for each replica of each shard.
     for (const auto & entry : storage.data.per_shard_data_parts)
     {
         size_t shard_no = entry.first;
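
ShardTaskInfo above keeps its own drand48_data state, seeded in the constructor from the per-thread CPU clock, so that each task can draw random numbers without sharing a global generator across threads. A minimal sketch of the glibc reentrant API it relies on (srand48_r / lrand48_r); the seed expression is simplified here and the program is illustrative only.

    #include <cstdio>
    #include <cstdlib>   /// drand48_data, srand48_r, lrand48_r (glibc-specific)
    #include <ctime>     /// clock_gettime, CLOCK_THREAD_CPUTIME_ID

    int main()
    {
        drand48_data rand_state;

        /// Seed from the per-thread CPU clock, in the same spirit as the
        /// ShardTaskInfo constructor above (simplified seed expression).
        timespec times{};
        clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times);
        srand48_r(times.tv_nsec, &rand_state);

        /// lrand48_r writes a non-negative value into rand_res using only the
        /// state passed in, so concurrent ShardTaskInfo objects do not interfere.
        long rand_res = 0;
        lrand48_r(&rand_state, &rand_res);
        std::printf("%ld\n", rand_res);
        return 0;
    }
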
@@ -739,55 +811,95 @@ void ReshardingWorker::applyChanges()
         const WeightedZooKeeperPath & weighted_path = current_job.paths[shard_no];
         const std::string & zookeeper_path = weighted_path.first;
 
+        task_info_list.emplace_back();
+        ShardTaskInfo & shard_task_info = task_info_list.back();
+
         auto children = zookeeper->getChildren(zookeeper_path + "/replicas");
         for (const auto & child : children)
         {
             const std::string replica_path = zookeeper_path + "/replicas/" + child;
             auto host = zookeeper->get(replica_path + "/host");
             ReplicatedMergeTreeAddress host_desc(host);
-            task_info_list.emplace_back(replica_path, host_desc);
+            shard_task_info.shard_tasks.emplace_back(replica_path, host_desc);
         }
     }
 
-    boost::threadpool::pool pool(task_info_list.size());
-
-    using Tasks = std::vector<std::packaged_task<bool()> >;
-    Tasks tasks(task_info_list.size());
-
-    try
-    {
-        for (size_t i = 0; i < task_info_list.size(); ++i)
-        {
-            const auto & entry = task_info_list[i];
-            const auto & replica_path = entry.replica_path;
-            const auto & dest = entry.dest;
-
-            InterserverIOEndpointLocation location(replica_path, dest.host, dest.replication_port);
-
-            std::string query_str = "ALTER TABLE " + dest.database + "."
-                + dest.table + " ATTACH PARTITION " + current_job.partition;
-
-            tasks[i] = Tasks::value_type(std::bind(&RemoteQueryExecutor::Client::executeQuery,
-                &storage.remote_query_executor_client, location, query_str));
-
-            pool.schedule([i, &tasks]{ tasks[i](); });
-        }
-    }
-    catch (...)
-    {
-        tryLogCurrentException(__PRETTY_FUNCTION__);
-        pool.wait();
-        throw;
-    }
-
-    pool.wait();
-
-    for (auto & task : tasks)
-    {
-        bool res = task.get_future().get();
-        if (!res)
-            throw Exception("Failed to attach partition on replica",
-                ErrorCodes::PARTITION_ATTACH_FAILED);
-    }
+    /// Loop as long as there are ATTACH operations that need to be performed
+    /// on some shards and there remains at least one valid replica on each of
+    /// these shards.
+    size_t remaining_task_count = task_info_list.size();
+
+    while (remaining_task_count > 0)
+    {
+        boost::threadpool::pool pool(remaining_task_count);
+
+        using Tasks = std::vector<std::packaged_task<bool()> >;
+        Tasks tasks(remaining_task_count);
+
+        try
+        {
+            size_t pool_index = 0;
+            for (auto & info : task_info_list)
+            {
+                if (info.is_success)
+                {
+                    /// We have already successfully performed the operation on this shard.
+                    continue;
+                }
+
+                /// Randomly choose a replica on which to perform the operation.
+                long int rand_res;
+                (void) lrand48_r(&info.rand_state, &rand_res);
+                size_t current = info.next + rand_res % (info.shard_tasks.size() - info.next);
+                std::swap(info.shard_tasks[info.next], info.shard_tasks[current]);
+                ++info.next;
+
+                info.pool_index = pool_index;
+
+                TaskInfo & cur_task_info = info.shard_tasks[info.next - 1];
+
+                const auto & replica_path = cur_task_info.replica_path;
+                const auto & dest = cur_task_info.dest;
+
+                /// Create and register the task.
+
+                InterserverIOEndpointLocation location(replica_path, dest.host, dest.replication_port);
+
+                std::string query_str = "ALTER TABLE " + dest.database + "."
+                    + dest.table + " ATTACH PARTITION " + current_job.partition;
+
+                tasks[pool_index] = Tasks::value_type(std::bind(&RemoteQueryExecutor::Client::executeQuery,
+                    &storage.remote_query_executor_client, location, query_str));
+
+                pool.schedule([pool_index, &tasks]{ tasks[pool_index](); });
+
+                /// Allocate an entry for the next task.
+                ++pool_index;
+            }
+        }
+        catch (...)
+        {
+            pool.wait();
+            throw;
+        }
+
+        pool.wait();
+
+        for (auto & info : task_info_list)
+        {
+            if (info.is_success)
+                continue;
+
+            info.is_success = tasks[info.pool_index].get_future().get();
+            if (info.is_success)
+                --remaining_task_count;
+            else if (info.next == info.shard_tasks.size())
+            {
+                /// No more attempts are possible.
+                throw Exception("Failed to attach partition on shard",
+                    ErrorCodes::PARTITION_ATTACH_FAILED);
+            }
+        }
+    }
 }
 
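
The retry loop above chooses an untried replica at random and swaps it into position next of shard_tasks, so every replica of a shard is attempted at most once; this is effectively a partial Fisher-Yates shuffle. Below is a standalone sketch of the same selection technique; the replica names are hypothetical and std::mt19937 stands in for the per-task lrand48_r state used by the real code.

    #include <iostream>
    #include <random>
    #include <string>
    #include <utility>
    #include <vector>

    int main()
    {
        /// Hypothetical replicas of one shard.
        std::vector<std::string> replicas{"replica1", "replica2", "replica3"};
        size_t next = 0;    /// replicas[0..next) have already been tried

        std::mt19937 gen{std::random_device{}()};

        while (next < replicas.size())
        {
            /// Pick uniformly among the untried tail and move it to position next.
            std::uniform_int_distribution<size_t> dist(next, replicas.size() - 1);
            std::swap(replicas[next], replicas[dist(gen)]);

            std::cout << "attempt on " << replicas[next] << "\n";  /// the ATTACH would go here
            ++next;    /// this replica will never be picked again
        }
        return 0;
    }
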
@@ -863,8 +975,7 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster)
         "", zkutil::CreateMode::Persistent);
 
     (void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/status",
-        "", zkutil::CreateMode::Persistent);
-    setStatus(coordinator_id, STATUS_OK);
+        toString(static_cast<UInt64>(STATUS_OK)), zkutil::CreateMode::Persistent);
 
     (void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/partitions",
         "", zkutil::CreateMode::Persistent);
@@ -995,9 +1106,7 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
         throw zkutil::KeeperException(code);
 
     zookeeper->create(getCoordinatorPath(coordinator_id) + "/status/"
-        + current_host, "", zkutil::CreateMode::Persistent);
-
-    setStatus(coordinator_id, current_host, STATUS_OK);
+        + current_host, toString(static_cast<UInt64>(STATUS_OK)), zkutil::CreateMode::Persistent);
 
     /// Assign a unique block number to the current node. We will use it in order
     /// to avoid any possible conflict when uploading resharded partitions.
@@ -1159,17 +1268,17 @@ void ReshardingWorker::setStatus(const std::string & coordinator_id, const std::
         toString(static_cast<UInt64>(status)));
 }
 
-bool ReshardingWorker::updateOfflineNodes(const std::string & coordinator_id)
+bool ReshardingWorker::detectOfflineNodes(const std::string & coordinator_id)
 {
-    return updateOfflineNodesCommon(getCoordinatorPath(coordinator_id) + "/status", coordinator_id);
+    return detectOfflineNodesCommon(getCoordinatorPath(coordinator_id) + "/status", coordinator_id);
 }
 
-bool ReshardingWorker::updateOfflineNodes()
+bool ReshardingWorker::detectOfflineNodes()
 {
-    return updateOfflineNodesCommon(getPartitionPath(current_job) + "/nodes", current_job.coordinator_id);
+    return detectOfflineNodesCommon(getPartitionPath(current_job) + "/nodes", current_job.coordinator_id);
 }
 
-bool ReshardingWorker::updateOfflineNodesCommon(const std::string & path, const std::string & coordinator_id)
+bool ReshardingWorker::detectOfflineNodesCommon(const std::string & path, const std::string & coordinator_id)
 {
     auto zookeeper = context.getZooKeeper();
 
@@ -1215,7 +1324,7 @@ ReshardingWorker::Status ReshardingWorker::getStatusCommon(const std::string & p
     if (coordinator_status != STATUS_OK)
         return static_cast<Status>(coordinator_status);
 
-    (void) updateOfflineNodesCommon(path, coordinator_id);
+    (void) detectOfflineNodesCommon(path, coordinator_id);
 
     auto nodes = zookeeper->getChildren(path);
 
@@ -1341,7 +1450,7 @@ void ReshardingWorker::abortRecoveryIfRequested()
 
     try
     {
-        has_offline_nodes = updateOfflineNodes();
+        has_offline_nodes = detectOfflineNodes();
         must_abort = must_stop || has_offline_nodes;
     }
     catch (...)
@@ -3761,6 +3761,12 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths
         {
         }
 
+        TaskInfo(const TaskInfo &) = delete;
+        TaskInfo & operator=(const TaskInfo &) = delete;
+
+        TaskInfo(TaskInfo &&) = default;
+        TaskInfo & operator=(TaskInfo &&) = default;
+
         std::string replica_path;
         ReplicatedMergeTreeAddress address;
     };