Alexey Milovidov 2016-03-29 19:59:22 +03:00
commit 34f6887a28
9 changed files with 331 additions and 357 deletions

View File

@ -394,7 +394,6 @@ add_library (dbms
include/DB/Common/getNumberOfPhysicalCPUCores.h
include/DB/Common/BitHelpers.h
include/DB/Common/BlockFilterCreator.h
include/DB/Common/SHA512Utils.h
include/DB/IO/CompressedStream.h
include/DB/IO/ReadBufferFromFileDescriptor.h
include/DB/IO/CompressedWriteBuffer.h
@ -563,7 +562,6 @@ add_library (dbms
src/Common/Exception.cpp
src/Common/ShellCommand.cpp
src/Common/getNumberOfPhysicalCPUCores.cpp
src/Common/SHA512Utils.cpp
src/Core/Field.cpp
src/Core/FieldVisitors.cpp

View File

@ -1,13 +0,0 @@
#pragma once
#include <openssl/sha.h>
#include <string>
namespace SHA512Utils
{
void updateHash(SHA512_CTX & ctx, const std::string & path);
std::string computeHashFromString(const std::string & in);
std::string computeHashFromFolder(const std::string & path);
}

View File

@ -403,10 +403,6 @@ public:
*/
size_t getPartitionSize(const std::string & partition_name) const;
/** Returns the hash of a partition.
*/
std::string computePartitionHash(const std::string & partition_name) const;
size_t getColumnSize(const std::string & name) const
{
std::lock_guard<std::mutex> lock{data_parts_mutex};

View File

@ -22,11 +22,9 @@ class Context;
class Cluster;
class StorageReplicatedMergeTree;
/** Performer of resharding jobs.
* Works in the background within a single thread.
* Keeps track of new jobs and schedules them for execution.
* Jobs are performed sequentially.
*/
/// Performer of resharding jobs. It works as a background task within a thread.
/// Its main duties are to keep track of newly submitted resharding jobs and
/// to execute them sequentially.
class ReshardingWorker final
{
friend class AnomalyMonitor;
@ -34,13 +32,13 @@ class ReshardingWorker final
public:
using PartitionList = std::vector<std::string>;
/// Possible status values of a coordinator or a node that
/// Possible status values of a coordinator or a performer that
/// has subscribed to a coordinator.
enum StatusCode
{
STATUS_OK = 0,
STATUS_ERROR, /// An error occurred on one performer.
STATUS_ON_HOLD /// The job is paused.
STATUS_ERROR, /// An error occurred on a performer.
STATUS_ON_HOLD /// Job is on hold.
};
public:
@ -52,29 +50,29 @@ public:
~ReshardingWorker();
/// Start the thread which performs resharding jobs.
/// Start the job tracker thread.
void start();
/// Stop the thread which performs resharding jobs.
/// If any job is in progress, put it on hold for further execution.
/// Stop the job tracker thread. If any job is in progress, put it on hold
/// for future execution.
void shutdown();
/// Send a resharding request.
/// Send a request for resharding on the current performer.
void submitJob(const ReshardingJob & job);
/// Has the thread been started?
/// Is the job tracker thread started?
bool isStarted() const;
/// Create a new coordinator for a distributed job. Called from the initiator.
/// Create a new coordinator for a distributed job. Called from the initiator.
std::string createCoordinator(const Cluster & cluster);
/// Register a query into a coordinator.
void registerQuery(const std::string & coordinator_id, const std::string & query);
/// Delete a coordinator.
/// Clear all the information related to a given coordinator.
void deleteCoordinator(const std::string & coordinator_id);
/// Subscribe to a given coordinator. Called from a performer.
/// Subscribe the performer, from which this method is called, to a given coordinator.
UInt64 subscribe(const std::string & coordinator_id, const std::string & query);
/// Unsubscribe from a given coordinator. Called from a performer.
/// Cancel the aforementioned subscription.
void unsubscribe(const std::string & coordinator_id);
/// Increase the number of partitions included in one distributed job. Called from a performer.
void addPartitions(const std::string & coordinator_id, const PartitionList & partition_list);
@ -82,13 +80,14 @@ public:
/// Returns an iterator to the beginning of the list of uncoordinated jobs.
ReshardingWorker::PartitionList::iterator categorizePartitions(const std::string & coordinator_id,
ReshardingWorker::PartitionList & partition_list);
/// Get the number of partitions included in one distributed job. Called from a performer.
/// Get the number of partitions of a distributed job. Called from performers.
size_t getPartitionCount(const std::string & coordinator_id);
/// Get the number of participating nodes.
/// Get the number of performers.
size_t getNodeCount(const std::string & coordinator_id);
/// Wait for the checks to complete on all performers. Called from a performer.
/// Wait for all the preliminary sanity checks to be completed on all the performers.
/// Called from performers.
void waitForCheckCompletion(const std::string & coordinator_id);
/// Wait for all the required unsubscribe operations to complete.
/// Wait for all the required unsubscribe operations to be completed.
void waitForOptOutCompletion(const std::string & coordinator_id, size_t count);
/// Set the shard-independent status of a given coordinator.
@ -99,11 +98,13 @@ public:
zkutil::RWLock createDeletionLock(const std::string & coordinator_id);
/// Dump the status messages of the coordinator and all the participating nodes.
/// Dump the status messages of the coordinator and all the performers.
std::string dumpCoordinatorState(const std::string & coordinator_id);
static std::string computeHashFromPart(const std::string & path);
private:
/// Anomalies that may be detected by the local node.
/// Anomalies that may be detected by the current performer.
enum AnomalyType
{
ANOMALY_NONE = 0,
@ -179,19 +180,21 @@ private:
};
private:
/// Keep track of new jobs. Perform them sequentially.
void pollAndExecute();
/// Keep track of newly submitted jobs. Perform them sequentially.
void trackAndPerform();
/// Nudge the job scheduler.
void jabScheduler();
/// Wake up the tracker thread if it was ever sleeping.
void wakeUpTrackerThread();
/// Perform the jobs that were already in the execution queue when the node started.
/// Perform all the jobs that were already pending when the ClickHouse instance
/// on this node was starting.
void performPendingJobs();
/// Perform the jobs specified by their paths in the ZooKeeper database.
/// Sequentially perform jobs which are specified by their corresponding
/// znodes in ZooKeeper.
void perform(const Strings & job_nodes);
/// Perform a single job.
/// Perform one job.
void perform(const std::string & job_descriptor, const std::string & job_name);
/// Split the parts of a partition into several parts according to the sharding key.
@ -199,13 +202,12 @@ private:
/// Upon completion of this process, a new partition is created for each shard.
void createShardedPartitions();
/// Copy all the partitions obtained by resharding to every replica of the
/// corresponding shards.
/// Upload all the partitions resulting from source partition resharding to their
/// respective target shards.
void publishShardedPartitions();
/// For each shard, attach the data from that shard's new partition to the table
/// on all replicas belonging to the same shard. On the local node that performs
/// the resharding job, delete the data of the source partition.
/// On each target shard attach its corresponding new sharded partition.
/// Locally drop the source partition.
void commit();
void repairLogRecord(LogRecord & log_record);
@ -214,14 +216,18 @@ private:
void executeAttach(LogRecord & log_record);
bool checkAttachLogRecord(LogRecord & log_record);
/// Delete temporary data from the local node and from ZooKeeper.
/// Delete temporary data from the local node and ZooKeeper.
void softCleanup();
void hardCleanup();
/// Forcibly terminate the thread if the condition is met.
void abortPollingIfRequested();
/// Forcibly abort the job tracker thread if requested.
void abortTrackingIfRequested();
/// Forcibly abort the distributed job coordinated by the given coordinator
/// if requested.
void abortCoordinatorIfRequested(const std::string & coordinator_id);
/// Forcibly abort the recovery of the current distributed job if requested.
void abortRecoveryIfRequested();
/// Forcibly abort the current job if requested.
void abortJobIfRequested();
/// Get the current job-independent status of the coordinator.
@ -229,22 +235,25 @@ private:
/// Get the status of the current distributed job.
StatusCode getStatus();
/// Register the job with the corresponding coordinator.
void attachJob();
/// Detach the job from its coordinator.
void detachJob();
/// Wait for uploads to complete on all performers.
/// Prepare the current job for execution.
void initializeJob();
/// Remove the current job's data from its coordinator.
/// If there is no job left, also delete the coordinator.
void finalizeJob();
/// Wait for all the necessary sharded partition uploads to their respective
/// target shards to be completed.
void waitForUploadCompletion();
///
/// Wait for all the performers to be checked in for leader election.
void waitForElectionCompletion();
/// Wait for all the partition operations to be completed on all the participating nodes.
/// Wait for all the changes required by the current distributed resharding job
/// to be applied by all the performers on all the target shards.
void waitForCommitCompletion();
size_t getPartitionCountUnlocked(const std::string & coordinator_id);
/// Detect offline nodes under a given coordinator.
/// Detect offline performers under a given coordinator.
bool detectOfflineNodes(const std::string & coordinator_id);
/// Detect offline nodes under the current job.
/// Detect offline performers under the current job.
bool detectOfflineNodes();
bool isPublished();
@ -256,28 +265,34 @@ private:
bool isCommitted();
void markAsCommitted();
///
void storeTargetShards();
///
ShardList getTargetShards(const std::string & hostname, const std::string & job_name);
/// Store into ZooKeeper information about the target shards. It is used
/// to create the log of operations that applies all the changes required
/// by the resharding operation.
void storeTargetShardsInfo();
///
/// Retrieve from ZooKeeper the aforementioned information about the target shards.
ShardList getTargetShardsInfo(const std::string & hostname, const std::string & job_name);
/// Create the log of operations.
void createLog();
void electLeader();
bool isLeader();
/// Functions that create the objects required for synchronizing
/// distributed jobs.
zkutil::RWLock createLock();
zkutil::RWLock createCoordinatorLock(const std::string & coordinator_id,
/// Access the global lock handler.
zkutil::RWLock getGlobalLock();
/// Access the given coordinator lock handler.
zkutil::RWLock getCoordinatorLock(const std::string & coordinator_id,
bool usable_in_emergency = false);
zkutil::SingleBarrier createCheckBarrier(const std::string & coordinator_id);
zkutil::SingleBarrier createOptOutBarrier(const std::string & coordinator_id, size_t count);
zkutil::SingleBarrier createRecoveryBarrier(const ReshardingJob & job);
zkutil::SingleBarrier createUploadBarrier(const ReshardingJob & job);
zkutil::SingleBarrier createElectionBarrier(const ReshardingJob & job);
zkutil::SingleBarrier createCommitBarrier(const ReshardingJob & job);
/// The following functions provide access to the barrier handlers we use
/// to synchronize the progress of distributed jobs.
zkutil::SingleBarrier getCheckBarrier(const std::string & coordinator_id);
zkutil::SingleBarrier getOptOutBarrier(const std::string & coordinator_id, size_t count);
zkutil::SingleBarrier getRecoveryBarrier();
zkutil::SingleBarrier getUploadBarrier();
zkutil::SingleBarrier getElectionBarrier();
zkutil::SingleBarrier getCommitBarrier();
/// Prevent merging jobs from being performed on the partition that we
/// want to reshard on the current host. This operation is persistent:
@ -292,9 +307,9 @@ private:
/// Get the ZooKeeper path of a given coordinator.
std::string getCoordinatorPath(const std::string & coordinator_id) const;
/// Get the ZooKeeper path of a given job partition.
std::string getPartitionPath(const ReshardingJob & job) const;
///
std::string getLocalJobPath(const ReshardingJob & job) const;
std::string getPartitionPath() const;
/// Get the ZooKeeper path of the job currently running on this performer.
std::string getLocalJobPath() const;
/// Common code for softCleanup() and hardCleanup().
void deleteTemporaryData();
@ -370,7 +385,7 @@ private:
private:
ReshardingJob current_job;
std::thread polling_thread;
std::thread job_tracker;
AnomalyMonitor anomaly_monitor{*this};
std::string task_queue_path;

View File

@ -1,84 +0,0 @@
#include <DB/Common/SHA512Utils.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/HexWriteBuffer.h>
#include <DB/IO/WriteBufferFromString.h>
#include <Poco/File.h>
#include <vector>
#include <deque>
#include <array>
namespace SHA512Utils
{
void updateHash(SHA512_CTX & ctx, const std::string & path)
{
constexpr size_t buffer_size = 1024;
std::deque<std::string> to_process;
to_process.push_back(path);
while (!to_process.empty())
{
std::string filename = to_process.front();
Poco::File cur{filename};
to_process.pop_front();
if (cur.isFile())
{
DB::ReadBufferFromFile buf{filename};
while (!buf.eof())
{
std::array<char, buffer_size> in;
size_t read_count = buf.read(in.data(), in.size());
SHA512_Update(&ctx, reinterpret_cast<const void*>(in.data()), read_count);
}
}
else
{
std::vector<std::string> files;
cur.list(files);
for (const auto & file : files)
to_process.push_back(filename + "/" + file);
}
}
}
std::string computeHashFromString(const std::string & in)
{
unsigned char hash[SHA512_DIGEST_LENGTH];
SHA512_CTX ctx;
SHA512_Init(&ctx);
SHA512_Update(&ctx, reinterpret_cast<const void *>(in.data()), in.size());
SHA512_Final(hash, &ctx);
std::string out;
{
DB::WriteBufferFromString buf{out};
DB::HexWriteBuffer hex_buf{buf};
hex_buf.write(reinterpret_cast<const char *>(hash), sizeof(hash));
}
return out;
}
std::string computeHashFromFolder(const std::string & path)
{
unsigned char hash[SHA512_DIGEST_LENGTH];
SHA512_CTX ctx;
SHA512_Init(&ctx);
updateHash(ctx, path);
SHA512_Final(hash, &ctx);
std::string out;
{
DB::WriteBufferFromString buf{out};
DB::HexWriteBuffer hex_buf{buf};
hex_buf.write(reinterpret_cast<const char *>(hash), sizeof(hash));
}
return out;
}
}

View File

@ -17,7 +17,6 @@
#include <DB/Functions/FunctionFactory.h>
#include <Poco/DirectoryIterator.h>
#include <DB/Common/Increment.h>
#include <DB/Common/SHA512Utils.h>
#include <algorithm>
#include <iomanip>
@ -1252,44 +1251,6 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_name) const
return size;
}
std::string MergeTreeData::computePartitionHash(const std::string & partition_name) const
{
unsigned char hash[SHA512_DIGEST_LENGTH];
SHA512_CTX ctx;
SHA512_Init(&ctx);
Poco::DirectoryIterator end;
Poco::DirectoryIterator end2;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
const auto filename = it.name();
if (!ActiveDataPartSet::isPartDirectory(filename))
continue;
if (0 != filename.compare(0, partition_name.size(), partition_name))
continue;
const auto part_path = it.path().absolute().toString();
for (Poco::DirectoryIterator it2(part_path); it2 != end2; ++it2)
{
const auto part_file_path = it2.path().absolute().toString();
SHA512Utils::updateHash(ctx, part_file_path);
}
}
SHA512_Final(hash, &ctx);
std::string out;
{
WriteBufferFromString buf(out);
HexWriteBuffer hex_buf(buf);
hex_buf.write(reinterpret_cast<const char *>(hash), sizeof(hash));
}
return out;
}
static std::pair<String, DayNum_t> getMonthNameAndDayNum(const Field & partition)
{
String month_name = partition.getType() == Field::Types::UInt64

View File

@ -1,9 +1,9 @@
#include <DB/Storages/MergeTree/RemotePartChecker.h>
#include <DB/Storages/MergeTree/ReshardingWorker.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Common/SHA512Utils.h>
namespace DB
{
@ -45,7 +45,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
status = Status::NOT_FOUND;
else
{
auto computed_hash = SHA512Utils::computeHashFromFolder(part_path);
auto computed_hash = ReshardingWorker::computeHashFromPart(part_path);
if (computed_hash != hash)
status = Status::INCONSISTENT;
}

View File

@ -7,13 +7,14 @@
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Common/SHA512Utils.h>
#include <DB/Common/SipHash.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/Context.h>
@ -27,8 +28,6 @@
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <openssl/sha.h>
#include <future>
#include <chrono>
#include <cstdlib>
@ -64,6 +63,7 @@ namespace ErrorCodes
namespace
{
constexpr size_t hash_size = 16;
constexpr long wait_duration = 1000;
/// Helper class which extracts from the ClickHouse configuration file
@ -101,7 +101,7 @@ private:
};
/// Helper class we use to read and write the status of a coordinator
/// or a node that has subscribed to a coordinator.
/// or a performer that has subscribed to a coordinator.
/// The status format is: status_code [, optional_message]
class Status final
{
@ -145,6 +145,65 @@ private:
std::string msg;
};
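The body of Status is mostly elided from this hunk; below is a minimal sketch of the serialization that the format line above describes. The helper name and member layout are assumptions, not the literal code of this commit; it relies on DB::toString from WriteHelpers.h, which this file includes.

/// Sketch only (assumed shape): serialize as "status_code [, optional_message]",
/// the format parsed back by getStatusCommon() further below.
std::string statusToStringSketch(ReshardingWorker::StatusCode code, const std::string & msg)
{
    std::string out = toString(static_cast<UInt64>(code));
    if (!msg.empty())
        out += ", " + msg;
    return out;
}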
/// Compute the hash value of a specified string.
/// The hash function we use is SipHash.
std::string computeHashFromString(const std::string & in)
{
SipHash hash;
hash.update(in.data(), in.size());
char out[hash_size];
hash.get128(out);
return {out, hash_size};
}
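The 16 bytes returned here are raw, not printable. If a hex form were ever needed for logs, it could be produced the way the removed SHA512Utils code did; a minimal sketch under that assumption (the helper is hypothetical and would require DB/IO/HexWriteBuffer.h, which this file does not include):

/// Sketch only: hex-encode a raw hash for display, mirroring the buffer
/// pattern of the removed SHA512Utils::computeHashFromString().
std::string toHexStringSketch(const std::string & raw_hash)
{
    std::string out;
    {
        WriteBufferFromString buf{out};
        HexWriteBuffer hex_buf{buf};
        hex_buf.write(raw_hash.data(), raw_hash.size());
    }
    return out;
}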
/// Compute the hash value from the checksum files of a given partition.
/// The hash function we use is SipHash.
std::string computeHashFromPartition(const std::string & data_path, const std::string & partition_name)
{
std::vector<std::string> files;
Poco::DirectoryIterator end;
Poco::DirectoryIterator end2;
for (Poco::DirectoryIterator it(data_path); it != end; ++it)
{
const auto filename = it.name();
if (!ActiveDataPartSet::isPartDirectory(filename))
continue;
if (0 != filename.compare(0, partition_name.size(), partition_name))
continue;
const auto part_path = it.path().absolute().toString();
for (Poco::DirectoryIterator it2(part_path); it2 != end2; ++it2)
{
const auto & filename = it2.name();
if (filename == "checksums.txt")
files.push_back(it2.path().absolute().toString());
}
}
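/// Sort so that the resulting hash does not depend on directory iteration order.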
std::sort(files.begin(), files.end());
SipHash hash;
for (const auto & file : files)
{
ReadBufferFromFile buf{file};
while (buf.next())
{
size_t byte_count = buf.buffer().end() - buf.position();
hash.update(buf.position(), byte_count);
}
}
char out[hash_size];
hash.get128(out);
return {out, hash_size};
}
}
/// Job structure:
@ -196,7 +255,7 @@ private:
/// At the highest level we have under the /resharding_distributed znode:
///
/// /lock: global distributed read/write lock;
/// /online: currently online nodes;
/// /online: currently online performers;
/// /coordination: one znode for each coordinator.
///
/// A coordinator whose identifier is ${id} has the following layout
@ -206,17 +265,16 @@ private:
///
/// /deletion_lock: for safe coordinator deletion
///
/// /query_hash: hash value obtained from the query that
/// is sent to the participating nodes;
/// /query_hash: hash value obtained from the query that is sent to the performers;
///
/// /increment: unique block number allocator;
///
/// /status: coordinator status before its participating nodes have subscribed;
/// /status: coordinator status before its performers have subscribed;
///
/// /status/${host}: status of an individual participating node;
/// /status/${host}: status of an individual performer;
///
/// /status_probe: node that we update just after having updated either the status
/// of a participating node or the status of the coordinator as a whole;
/// /status_probe: znode that we update just after having updated either the status
/// of a performer or the status of the coordinator as a whole;
///
/// /cluster: cluster on which the distributed job is to be performed;
///
@ -229,30 +287,30 @@ private:
///
/// /shards: the list of shards that have subscribed;
///
/// /subscribe_barrier: when all the participating nodes have subscribed
/// to their coordinator, proceed further
/// /subscribe_barrier: when all the performers have subscribed to their coordinator,
/// proceed further
///
/// /check_barrier: when all the participating nodes have checked
/// that they can perform resharding operations, proceed further;
/// /check_barrier: when all the performers have checked that they can perform
/// resharding operations, proceed further;
///
/// /opt_out_barrier: after having crossed this barrier, each node of the cluster
/// knows exactly whether it will take part in distributed jobs or not.
///
/// /partitions: partitions that must be resharded on more than one shard;
///
/// /partitions/${partition_id}/nodes: participating nodes;
/// /partitions/${partition_id}/nodes: performers;
///
/// /partitions/${partition_id}/upload_barrier: when all the participating
/// nodes have uploaded new data to their respective replicas, we can apply changes;
/// /partitions/${partition_id}/upload_barrier: when all the performers have uploaded
/// new data to the target shards, we can apply changes;
///
/// /partitions/${partition_id}/election_barrier: used for the election of
/// a leader among the participating nodes;
/// a leader among the performers;
///
/// /partitions/${partition_id}/commit_barrier: crossed when all the changes
/// have been applied on the target nodes;
/// required by a resharding operation have been applied;
///
/// /partitions/${partition_id}/recovery_barrier: recovery if
/// one or several participating nodes had previously gone offline.
/// /partitions/${partition_id}/recovery_barrier: recovery if one or several
/// performers had previously gone offline.
///
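/// As an illustration (hypothetical names, not part of this commit), a job on
/// partition 201603 coordinated by ${id} with two performers would use znodes like:
///
/// /resharding_distributed/coordination/${id}/partitions/201603/nodes/host1
/// /resharding_distributed/coordination/${id}/partitions/201603/nodes/host2
/// /resharding_distributed/coordination/${id}/partitions/201603/upload_barrier
/// /resharding_distributed/coordination/${id}/partitions/201603/election_barrier
/// /resharding_distributed/coordination/${id}/partitions/201603/commit_barrier
///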
ReshardingWorker::ReshardingWorker(const Poco::Util::AbstractConfiguration & config,
@ -306,7 +364,7 @@ ReshardingWorker::~ReshardingWorker()
void ReshardingWorker::start()
{
polling_thread = std::thread{&ReshardingWorker::pollAndExecute, this};
job_tracker = std::thread{&ReshardingWorker::trackAndPerform, this};
}
void ReshardingWorker::shutdown()
@ -314,8 +372,8 @@ void ReshardingWorker::shutdown()
if (is_started)
{
must_stop = true;
if (polling_thread.joinable())
polling_thread.join();
if (job_tracker.joinable())
job_tracker.join();
is_started = false;
}
}
@ -333,7 +391,7 @@ bool ReshardingWorker::isStarted() const
return is_started;
}
void ReshardingWorker::pollAndExecute()
void ReshardingWorker::trackAndPerform()
{
std::string error_msg;
@ -382,7 +440,7 @@ void ReshardingWorker::pollAndExecute()
do
{
abortPollingIfRequested();
abortTrackingIfRequested();
}
while (!event->tryWait(wait_duration));
}
@ -429,17 +487,17 @@ void ReshardingWorker::pollAndExecute()
LOG_DEBUG(log, "Resharding background thread terminated.");
}
void ReshardingWorker::jabScheduler()
void ReshardingWorker::wakeUpTrackerThread()
{
/// We inform the job scheduler that something has just happened. This forces
/// the scheduler to fetch all the current pending jobs. We need this when a
/// We inform the job tracker thread that something has just happened. This forces
/// the job tracker to fetch all the current pending jobs. We need this when a
/// distributed job is not ready to be performed yet. Otherwise if no new jobs
/// were submitted, we wouldn't be able to check again whether we can perform
/// the distributed job.
event->set();
/// Sleep for 3 time units in order to prevent scheduler overloading
/// if we were the only job in the queue.
/// Sleep for 3 time units in order to prevent overloading of the job tracker
/// thread if we are the only job in the queue.
auto zookeeper = context.getZooKeeper();
if (zookeeper->getChildren(host_task_queue_path).size() == 1)
std::this_thread::sleep_for(3 * std::chrono::milliseconds(wait_duration));
@ -557,7 +615,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
}
else
{
/// An error has occurred on this node.
/// An error has occurred on this performer.
if (current_job.isCoordinated())
{
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ERROR,
@ -577,7 +635,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
try
{
attachJob();
initializeJob();
{
ScopedAnomalyMonitor scoped_anomaly_monitor{anomaly_monitor};
@ -588,7 +646,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
if (!isPublished())
{
createShardedPartitions();
storeTargetShards();
storeTargetShardsInfo();
publishShardedPartitions();
deleteTemporaryData();
markAsPublished();
@ -596,10 +654,12 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
waitForUploadCompletion();
/// If the current job is part of a distributed job, take participation
/// in a leader election among all the participating nodes. All the
/// participating nodes drop their source partition. Moreover the leader
/// sends all the required attach requests to the target shards.
/// If the current job is part of a distributed job, participate in a
/// leader election among all the performers. This is required because
/// of the following: when changes are applied on the target shards,
/// all the performers drop their respective source partitions; in
/// addition the leader sends all the required attach requests to the
/// target shards.
electLeader();
/// Build into persistent storage a log consisting of a description
@ -617,7 +677,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
}
/// A distributed job is considered to be complete if and only if
/// changes have been committed on all the participating nodes.
/// changes have been committed by all the performers.
waitForCommitCompletion();
}
}
@ -628,9 +688,9 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
if (ex.code() == ErrorCodes::ABORTED)
{
LOG_DEBUG(log, "Resharding job cancelled.");
/// A soft shutdown is being performed on this node.
/// A soft shutdown is being performed on this performer.
/// Put the current distributed job on hold in order to reliably handle
/// the scenario in which the remote nodes undergo a hard shutdown.
/// the scenario in which the remote performers undergo a hard shutdown.
if (current_job.isCoordinated())
{
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ON_HOLD,
@ -641,7 +701,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
}
else if (ex.code() == ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE)
{
/// A remote node has gone offline.
/// A remote performer has gone offline.
/// Put the current distributed job on hold. Also wake up the job tracker
/// thread so that it will come across this distributed job even if no new
/// jobs are submitted.
@ -649,7 +709,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
ex.message());
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
softCleanup();
jabScheduler();
wakeUpTrackerThread();
}
else if (ex.code() == ErrorCodes::RESHARDING_REMOTE_NODE_ERROR)
{
@ -662,13 +722,13 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
unfreezeSourcePartition();
else if (ex.code() == ErrorCodes::RESHARDING_DISTRIBUTED_JOB_ON_HOLD)
{
/// The current distributed job is on hold and one or more required nodes
/// The current distributed job is on hold and one or more required performers
/// have not gone online yet. Wake up the job tracker thread so that it will
/// come across this distributed job even if no new jobs are submitted.
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ON_HOLD,
ex.message());
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
jabScheduler();
wakeUpTrackerThread();
}
else
handle_exception(ex.message(), ex.message());
@ -683,14 +743,14 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
}
catch (const std::exception & ex)
{
/// An error has occurred on this node.
/// An error has occurred on this performer.
handle_exception("Resharding job cancelled", ex.what());
LOG_ERROR(log, dumped_coordinator_state);
throw;
}
catch (...)
{
/// An error has occurred on this node.
/// An error has occurred on this performer.
handle_exception("Resharding job cancelled", "An unspecified error has occurred");
LOG_ERROR(log, dumped_coordinator_state);
throw;
@ -722,14 +782,16 @@ void ReshardingWorker::createShardedPartitions()
per_shard_data_parts = merger.reshardPartition(current_job);
}
void ReshardingWorker::storeTargetShards()
void ReshardingWorker::storeTargetShardsInfo()
{
LOG_DEBUG(log, "### Storing target shards info");
auto & storage = *(current_job.storage);
MergeTreeData::PerShardDataParts & per_shard_data_parts = storage.data.per_shard_data_parts;
auto zookeeper = context.getZooKeeper();
zookeeper->tryRemove(getLocalJobPath(current_job) + "/shards");
zookeeper->tryRemove(getLocalJobPath() + "/shards");
std::string out;
WriteBufferFromString buf{out};
@ -753,7 +815,7 @@ void ReshardingWorker::storeTargetShards()
continue;
std::string part = storage.data.getFullPath() + "reshard/" + toString(shard_no) + "/" + part_from_shard->name;
auto hash = SHA512Utils::computeHashFromFolder(part);
auto hash = computeHashFromPart(part);
writeVarUInt(shard_no, buf);
writeBinary(part_from_shard->name, buf);
@ -762,11 +824,13 @@ void ReshardingWorker::storeTargetShards()
buf.next();
(void) zookeeper->create(getLocalJobPath(current_job) + "/shards", out,
(void) zookeeper->create(getLocalJobPath() + "/shards", out,
zkutil::CreateMode::Persistent);
LOG_DEBUG(log, "### Stored target shards info");
}
ReshardingWorker::ShardList ReshardingWorker::getTargetShards(const std::string & hostname,
ReshardingWorker::ShardList ReshardingWorker::getTargetShardsInfo(const std::string & hostname,
const std::string & job_name)
{
ShardList shard_list;
@ -911,9 +975,9 @@ void ReshardingWorker::publishShardedPartitions()
if (!current_job.is_aborted)
{
/// We may have caught an error because one or more remote nodes have
/// gone offline while performing I/O. The following check is here to
/// sort out this ambiguity.
/// We may have caught an error because one or more remote performers have
/// gone offline while performing I/O. The following check is here to sort
/// out this ambiguity.
abortJobIfRequested();
}
@ -951,7 +1015,7 @@ void ReshardingWorker::commit()
auto zookeeper = context.getZooKeeper();
auto log_path = getLocalJobPath(current_job) + "/log";
auto log_path = getLocalJobPath() + "/log";
std::vector<LogRecord> log_records;
@ -1073,7 +1137,8 @@ void ReshardingWorker::repairLogRecord(LogRecord & log_record)
found = false;
else
{
auto current_hash = storage.data.computePartitionHash(current_job.partition);
auto current_hash = computeHashFromPartition(storage.data.getFullPath(),
current_job.partition);
if (current_hash != log_record.partition_hash)
{
LOG_WARNING(log, "The source partition " << current_job.partition
@ -1351,23 +1416,23 @@ void ReshardingWorker::executeAttach(LogRecord & log_record)
void ReshardingWorker::electLeader()
{
/// If the job is not coordinated, do nothing since we are obviously the
/// leader. Otherwise each node of the distributed job creates an ephemeral
/// sequential znode onto ZooKeeper persistent storage. When all these nodes
/// leader. Otherwise each performer of the distributed job creates an ephemeral
/// sequential znode onto ZooKeeper persistent storage. When all the performers
/// have entered the game, i.e. the election barrier is released, the winner
/// is the node having the znode with the lowest ID.
/// is the performer having the znode with the lowest ID.
/// Then one of the performers publishes this piece of information as a new znode.
///
/// In case of failure this election scheme is guaranteed to always succeed:
///
/// 1. If one node experiences a failure, it will eventually get winner
/// information if another node was able to publish it.
/// 1. If one performer experiences a failure, it will eventually get winner
/// information if another performer was able to publish it.
///
/// 2. If all the nodes experience a failure before any of them could publish
/// 2. If all the performers experience a failure before any of them could publish
/// winner information, the election is simply re-run. This is possible since
/// SingleBarrier objects may be, by design, crossed again after release.
///
/// 3. If two nodes A, B get inconsistent winner information because of the
/// failure of a third node C (e.g. A determines that C is the winner, but
/// 3. If two performers A, B get inconsistent winner information because of the
/// failure of a third performer C (e.g. A determines that C is the winner, but
/// then, C fails; consequently the corresponding znode disappears; then B
/// determines that A is the winner), save for any failure, either A or B
/// will succeed in publishing its information first. That is indeed all that
@ -1378,8 +1443,8 @@ void ReshardingWorker::electLeader()
LOG_DEBUG(log, "Performing leader election");
auto leader = getPartitionPath(current_job) + "/leader";
auto election_path = getPartitionPath(current_job) + "/leader_election";
auto leader = getPartitionPath() + "/leader";
auto election_path = getPartitionPath() + "/leader_election";
auto zookeeper = context.getZooKeeper();
@ -1404,7 +1469,7 @@ bool ReshardingWorker::isLeader()
return true;
auto zookeeper = context.getZooKeeper();
return zookeeper->get(getPartitionPath(current_job) + "/leader") == getFQDNOrHostName();
return zookeeper->get(getPartitionPath() + "/leader") == getFQDNOrHostName();
}
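Condensed into code, the election scheme documented above amounts to a handful of zkutil calls. The following is a sketch only, built on the zkutil primitives used elsewhere in this file; the helper and znode names are illustrative, and the barrier wait is elided:

/// Sketch of the election scheme described in electLeader()'s comment.
void electLeaderSketch(zkutil::ZooKeeperPtr zookeeper,
    const std::string & election_path, const std::string & leader_path,
    const std::string & host)
{
    /// Each performer enters the game with an ephemeral sequential znode.
    zookeeper->create(election_path + "/node-", host,
        zkutil::CreateMode::EphemeralSequential);

    /// (Elided: wait for the election barrier to be released.)

    /// The winner owns the znode with the lowest sequence number.
    auto candidates = zookeeper->getChildren(election_path);
    std::sort(candidates.begin(), candidates.end());
    std::string winner = zookeeper->get(election_path + "/" + candidates.front());

    /// Publish the winner. Concurrent publishers are harmless: the first
    /// create wins and later ones fail without effect (cf. point 3 above).
    zookeeper->tryCreate(leader_path, winner, zkutil::CreateMode::Persistent);
}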
void ReshardingWorker::createLog()
@ -1414,7 +1479,7 @@ void ReshardingWorker::createLog()
auto & storage = *(current_job.storage);
auto zookeeper = context.getZooKeeper();
auto log_path = getLocalJobPath(current_job) + "/log";
auto log_path = getLocalJobPath() + "/log";
if (zookeeper->exists(log_path))
{
@ -1426,13 +1491,14 @@ void ReshardingWorker::createLog()
(void) zookeeper->create(log_path, "", zkutil::CreateMode::Persistent);
/// If the keyword COPY is not specified, a drop request is performed on
/// each participating node.
/// each performer.
if (!current_job.do_copy)
{
LogRecord log_record{zookeeper};
log_record.operation = LogRecord::OP_DROP;
log_record.partition = current_job.partition;
log_record.partition_hash = storage.data.computePartitionHash(current_job.partition);
log_record.partition_hash = computeHashFromPartition(storage.data.getFullPath(),
current_job.partition);
log_record.state = LogRecord::READY;
log_record.enqueue(log_path);
@ -1447,18 +1513,18 @@ void ReshardingWorker::createLog()
if (current_job.isCoordinated())
{
auto nodes = zookeeper->getChildren(getPartitionPath(current_job) + "/nodes");
auto nodes = zookeeper->getChildren(getPartitionPath() + "/nodes");
for (const auto & node : nodes)
{
auto job_name = zookeeper->get(getPartitionPath(current_job) + "/nodes/" + node);
ShardList shards_from_node = getTargetShards(node, job_name);
auto job_name = zookeeper->get(getPartitionPath() + "/nodes/" + node);
ShardList shards_from_node = getTargetShardsInfo(node, job_name);
for (const TargetShardInfo & shard_info : shards_from_node)
shard_to_info[shard_info.shard_no].push_back(shard_info);
}
}
else
{
ShardList shards_from_node = getTargetShards(getFQDNOrHostName(), current_job.job_name);
ShardList shards_from_node = getTargetShardsInfo(getFQDNOrHostName(), current_job.job_name);
for (const TargetShardInfo & shard_info : shards_from_node)
shard_to_info[shard_info.shard_no].push_back(shard_info);
}
@ -1492,7 +1558,7 @@ void ReshardingWorker::hardCleanup()
{
LOG_DEBUG(log, "Performing cleanup.");
deleteTemporaryData();
detachJob();
finalizeJob();
current_job.clear();
}
@ -1517,7 +1583,7 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster)
const std::string cluster_name = cluster.getName();
auto zookeeper = context.getZooKeeper();
auto lock = createLock();
auto lock = getGlobalLock();
zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};
auto coordinators = zookeeper->getChildren(coordination_path);
@ -1607,7 +1673,7 @@ void ReshardingWorker::registerQuery(const std::string & coordinator_id, const s
{
auto zookeeper = context.getZooKeeper();
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/query_hash",
SHA512Utils::computeHashFromString(query), zkutil::CreateMode::Persistent);
computeHashFromString(query), zkutil::CreateMode::Persistent);
}
void ReshardingWorker::deleteCoordinator(const std::string & coordinator_id)
@ -1633,7 +1699,7 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
/// Make sure that this shard is not busy in another distributed job.
{
auto lock = createLock();
auto lock = getGlobalLock();
zkutil::RWLock::Guard<zkutil::RWLock::Read> guard{lock};
auto coordinators = zookeeper->getChildren(coordination_path);
@ -1654,7 +1720,7 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
UInt64 block_number;
{
auto lock = createCoordinatorLock(coordinator_id);
auto lock = getCoordinatorLock(coordinator_id);
zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};
/// Make sure that the query ALTER TABLE RESHARD with the "COORDINATE WITH" tag
@ -1670,7 +1736,7 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
/// Check that the coordinator recognizes our query.
auto query_hash = zookeeper->get(getCoordinatorPath(coordinator_id) + "/query_hash");
if (SHA512Utils::computeHashFromString(query) != query_hash)
if (computeHashFromString(query) != query_hash)
throw Exception{"Coordinator " + coordinator_id + " does not handle this query",
ErrorCodes::RESHARDING_INVALID_QUERY};
@ -1688,7 +1754,7 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
zookeeper->create(getCoordinatorPath(coordinator_id) + "/status/"
+ current_host, Status(STATUS_OK, "").toString(), zkutil::CreateMode::Persistent);
/// Assign a unique block number to the current node. We will use it in order
/// Assign a unique block number to the current performer. We will use it in order
/// to avoid any possible conflict when uploading resharded partitions.
auto current_block_number = zookeeper->get(getCoordinatorPath(coordinator_id) + "/increment");
block_number = std::stoull(current_block_number);
@ -1706,7 +1772,7 @@ void ReshardingWorker::unsubscribe(const std::string & coordinator_id)
auto zookeeper = context.getZooKeeper();
auto lock = createCoordinatorLock(coordinator_id);
auto lock = getCoordinatorLock(coordinator_id);
zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};
auto current_host = getFQDNOrHostName();
@ -1724,7 +1790,7 @@ void ReshardingWorker::addPartitions(const std::string & coordinator_id,
{
auto zookeeper = context.getZooKeeper();
auto lock = createCoordinatorLock(coordinator_id);
auto lock = getCoordinatorLock(coordinator_id);
zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};
auto current_host = getFQDNOrHostName();
@ -1765,7 +1831,7 @@ ReshardingWorker::PartitionList::iterator ReshardingWorker::categorizePartitions
int j = size;
{
auto lock = createCoordinatorLock(coordinator_id);
auto lock = getCoordinatorLock(coordinator_id);
zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};
while (true)
@ -1802,7 +1868,7 @@ ReshardingWorker::PartitionList::iterator ReshardingWorker::categorizePartitions
size_t ReshardingWorker::getPartitionCount(const std::string & coordinator_id)
{
auto lock = createCoordinatorLock(coordinator_id);
auto lock = getCoordinatorLock(coordinator_id);
zkutil::RWLock::Guard<zkutil::RWLock::Read> guard{lock};
return getPartitionCountUnlocked(coordinator_id);
@ -1816,7 +1882,7 @@ size_t ReshardingWorker::getPartitionCountUnlocked(const std::string & coordinat
size_t ReshardingWorker::getNodeCount(const std::string & coordinator_id)
{
auto lock = createCoordinatorLock(coordinator_id);
auto lock = getCoordinatorLock(coordinator_id);
zkutil::RWLock::Guard<zkutil::RWLock::Read> guard{lock};
auto zookeeper = context.getZooKeeper();
@ -1830,12 +1896,12 @@ void ReshardingWorker::waitForCheckCompletion(const std::string & coordinator_id
/// having crossed this barrier, we set up a timeout for safety
/// purposes.
auto timeout = context.getSettingsRef().resharding_barrier_timeout;
createCheckBarrier(coordinator_id).enter(timeout);
getCheckBarrier(coordinator_id).enter(timeout);
}
void ReshardingWorker::waitForOptOutCompletion(const std::string & coordinator_id, size_t count)
{
createOptOutBarrier(coordinator_id, count).enter();
getOptOutBarrier(coordinator_id, count).enter();
}
void ReshardingWorker::setStatus(const std::string & coordinator_id, StatusCode status,
@ -1862,7 +1928,7 @@ bool ReshardingWorker::detectOfflineNodes(const std::string & coordinator_id)
bool ReshardingWorker::detectOfflineNodes()
{
return detectOfflineNodesCommon(getPartitionPath(current_job) + "/nodes", current_job.coordinator_id);
return detectOfflineNodesCommon(getPartitionPath() + "/nodes", current_job.coordinator_id);
}
bool ReshardingWorker::detectOfflineNodesCommon(const std::string & path, const std::string & coordinator_id)
@ -1894,39 +1960,39 @@ bool ReshardingWorker::detectOfflineNodesCommon(const std::string & path, const
bool ReshardingWorker::isPublished()
{
auto zookeeper = context.getZooKeeper();
return zookeeper->exists(getLocalJobPath(current_job) + "/is_published");
return zookeeper->exists(getLocalJobPath() + "/is_published");
}
void ReshardingWorker::markAsPublished()
{
auto zookeeper = context.getZooKeeper();
(void) zookeeper->create(getLocalJobPath(current_job) + "/is_published",
(void) zookeeper->create(getLocalJobPath() + "/is_published",
"", zkutil::CreateMode::Persistent);
}
bool ReshardingWorker::isLogCreated()
{
auto zookeeper = context.getZooKeeper();
return zookeeper->exists(getLocalJobPath(current_job) + "/is_log_created");
return zookeeper->exists(getLocalJobPath() + "/is_log_created");
}
void ReshardingWorker::markLogAsCreated()
{
auto zookeeper = context.getZooKeeper();
(void) zookeeper->create(getLocalJobPath(current_job) + "/is_log_created",
(void) zookeeper->create(getLocalJobPath() + "/is_log_created",
"", zkutil::CreateMode::Persistent);
}
bool ReshardingWorker::isCommitted()
{
auto zookeeper = context.getZooKeeper();
return zookeeper->exists(getLocalJobPath(current_job) + "/is_committed");
return zookeeper->exists(getLocalJobPath() + "/is_committed");
}
void ReshardingWorker::markAsCommitted()
{
auto zookeeper = context.getZooKeeper();
(void) zookeeper->create(getLocalJobPath(current_job) + "/is_committed",
(void) zookeeper->create(getLocalJobPath() + "/is_committed",
"", zkutil::CreateMode::Persistent);
}
@ -1937,7 +2003,7 @@ ReshardingWorker::StatusCode ReshardingWorker::getCoordinatorStatus(const std::s
ReshardingWorker::StatusCode ReshardingWorker::getStatus()
{
return getStatusCommon(getPartitionPath(current_job) + "/nodes", current_job.coordinator_id);
return getStatusCommon(getPartitionPath() + "/nodes", current_job.coordinator_id);
}
ReshardingWorker::StatusCode ReshardingWorker::getStatusCommon(const std::string & path, const std::string & coordinator_id)
@ -2026,6 +2092,40 @@ std::string ReshardingWorker::dumpCoordinatorState(const std::string & coordinat
return out;
}
/// Compute the hash value from the checksum files of a given part.
/// The hash function we use is SipHash.
std::string ReshardingWorker::computeHashFromPart(const std::string & path)
{
std::vector<std::string> files;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(path); it != end; ++it)
{
const auto & filename = it.name();
if (filename == "checksums.txt")
files.push_back(it.path().absolute().toString());
}
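/// Sort the checksum files so the hash is independent of iteration order.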
std::sort(files.begin(), files.end());
SipHash hash;
for (const auto & file : files)
{
ReadBufferFromFile buf{file};
while (buf.next())
{
size_t byte_count = buf.buffer().end() - buf.position();
hash.update(buf.position(), byte_count);
}
}
char out[hash_size];
hash.get128(out);
return {out, hash_size};
}
ReshardingWorker::AnomalyType ReshardingWorker::probeForAnomaly()
{
AnomalyType anomaly_type = ANOMALY_NONE;
@ -2090,7 +2190,7 @@ void ReshardingWorker::processAnomaly(AnomalyType anomaly_type)
throw Exception{"An error occurred on local node", ErrorCodes::LOGICAL_ERROR};
}
void ReshardingWorker::attachJob()
void ReshardingWorker::initializeJob()
{
if (!current_job.isCoordinated())
return;
@ -2101,18 +2201,19 @@ void ReshardingWorker::attachJob()
if (status == STATUS_ERROR)
{
/// This case is triggered when an error occurred on a participating node
/// while we went offline.
/// This case is triggered if an error occurred on a performer
/// while we were going offline.
throw Exception{"An error occurred on a remote node", ErrorCodes::RESHARDING_REMOTE_NODE_ERROR};
}
else if (status == STATUS_ON_HOLD)
{
/// The current distributed job is on hold. Check that all the required nodes
/// are online. If it is so, wait for them to be ready to perform the job.
/// The current distributed job is on hold. Check that all the required
/// performers are online. If it is so, wait for them to be ready to
/// perform the job.
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_OK);
createRecoveryBarrier(current_job).enter();
getRecoveryBarrier().enter();
/// Catch any error that could have happened while crossing the barrier.
processAnomaly(probeForAnomaly());
@ -2121,7 +2222,7 @@ void ReshardingWorker::attachJob()
{
/// For the purpose of creating the log, we need to know the locations
/// of all the jobs that constitute this distributed job.
zookeeper->set(getPartitionPath(current_job) + "/nodes/" + getFQDNOrHostName(),
zookeeper->set(getPartitionPath() + "/nodes/" + getFQDNOrHostName(),
current_job.job_name);
}
else
@ -2132,7 +2233,7 @@ void ReshardingWorker::attachJob()
}
}
void ReshardingWorker::detachJob()
void ReshardingWorker::finalizeJob()
{
if (!current_job.isCoordinated())
return;
@ -2142,24 +2243,24 @@ void ReshardingWorker::detachJob()
bool delete_coordinator = false;
{
/// detachJob() may be called when an error has occurred. For this reason,
/// in the call to createCoordinatorLock(), the flag usable_in_emergency
/// finalizeJob() may be called when an error has occurred. For this reason,
/// in the call to getCoordinatorLock(), the flag usable_in_emergency
/// is set to true: local or remote errors won't interrupt lock acquisition.
auto lock = createCoordinatorLock(current_job.coordinator_id, true);
auto lock = getCoordinatorLock(current_job.coordinator_id, true);
zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};
auto children = zookeeper->getChildren(getPartitionPath(current_job) + "/nodes");
auto children = zookeeper->getChildren(getPartitionPath() + "/nodes");
if (children.empty())
throw Exception{"ReshardingWorker: unable to detach job", ErrorCodes::LOGICAL_ERROR};
bool was_last_node = children.size() == 1;
auto current_host = getFQDNOrHostName();
zookeeper->remove(getPartitionPath(current_job) + "/nodes/" + current_host);
zookeeper->remove(getPartitionPath() + "/nodes/" + current_host);
if (was_last_node)
{
/// All the participating nodes have processed the current partition.
zookeeper->removeRecursive(getPartitionPath(current_job));
/// All the performers have processed the current partition.
zookeeper->removeRecursive(getPartitionPath());
if (getPartitionCountUnlocked(current_job.coordinator_id) == 0)
{
/// All the partitions of the current distributed job have been processed.
@ -2176,22 +2277,22 @@ void ReshardingWorker::waitForUploadCompletion()
{
if (!current_job.isCoordinated())
return;
createUploadBarrier(current_job).enter();
getUploadBarrier().enter();
}
void ReshardingWorker::waitForElectionCompletion()
{
createElectionBarrier(current_job).enter();
getElectionBarrier().enter();
}
void ReshardingWorker::waitForCommitCompletion()
{
if (!current_job.isCoordinated())
return;
createCommitBarrier(current_job).enter();
getCommitBarrier().enter();
}
void ReshardingWorker::abortPollingIfRequested()
void ReshardingWorker::abortTrackingIfRequested()
{
if (must_stop)
throw Exception{"Cancelled resharding", ErrorCodes::ABORTED};
@ -2222,7 +2323,7 @@ void ReshardingWorker::abortRecoveryIfRequested()
ErrorCodes::RESHARDING_DISTRIBUTED_JOB_ON_HOLD};
else
{
/// We don't throw any exception here because the other nodes waiting
/// We don't throw any exception here because the other performers waiting
/// to cross the recovery barrier would get stuck forever in a loop.
/// Instead we rely on cancellation points to detect this error and
/// therefore terminate the job.
@ -2288,23 +2389,23 @@ void ReshardingWorker::abortJobIfRequested()
processAnomaly(anomaly);
}
zkutil::RWLock ReshardingWorker::createLock()
zkutil::RWLock ReshardingWorker::getGlobalLock()
{
zkutil::RWLock lock{get_zookeeper, distributed_lock_path};
zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortPollingIfRequested, this);
zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortTrackingIfRequested, this);
lock.setCancellationHook(hook);
return lock;
}
zkutil::RWLock ReshardingWorker::createCoordinatorLock(const std::string & coordinator_id,
zkutil::RWLock ReshardingWorker::getCoordinatorLock(const std::string & coordinator_id,
bool usable_in_emergency)
{
zkutil::RWLock lock{get_zookeeper, getCoordinatorPath(coordinator_id) + "/lock"};
zkutil::RWLock::CancellationHook hook;
if (usable_in_emergency)
hook = std::bind(&ReshardingWorker::abortPollingIfRequested, this);
hook = std::bind(&ReshardingWorker::abortTrackingIfRequested, this);
else
hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested,
this, coordinator_id);
@ -2317,13 +2418,13 @@ zkutil::RWLock ReshardingWorker::createCoordinatorLock(const std::string & coord
zkutil::RWLock ReshardingWorker::createDeletionLock(const std::string & coordinator_id)
{
zkutil::RWLock lock{get_zookeeper, getCoordinatorPath(coordinator_id) + "/deletion_lock"};
zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortPollingIfRequested, this);
zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortTrackingIfRequested, this);
lock.setCancellationHook(hook);
return lock;
}
zkutil::SingleBarrier ReshardingWorker::createCheckBarrier(const std::string & coordinator_id)
zkutil::SingleBarrier ReshardingWorker::getCheckBarrier(const std::string & coordinator_id)
{
auto zookeeper = context.getZooKeeper();
@ -2338,7 +2439,7 @@ zkutil::SingleBarrier ReshardingWorker::createCheckBarrier(const std::string & c
return check_barrier;
}
zkutil::SingleBarrier ReshardingWorker::createOptOutBarrier(const std::string & coordinator_id,
zkutil::SingleBarrier ReshardingWorker::getOptOutBarrier(const std::string & coordinator_id,
size_t count)
{
zkutil::SingleBarrier opt_out_barrier{get_zookeeper, getCoordinatorPath(coordinator_id)
@ -2350,26 +2451,26 @@ zkutil::SingleBarrier ReshardingWorker::createOptOutBarrier(const std::string &
return opt_out_barrier;
}
zkutil::SingleBarrier ReshardingWorker::createRecoveryBarrier(const ReshardingJob & job)
zkutil::SingleBarrier ReshardingWorker::getRecoveryBarrier()
{
auto zookeeper = context.getZooKeeper();
auto node_count = zookeeper->getChildren(getPartitionPath(job) + "/nodes").size();
auto node_count = zookeeper->getChildren(getPartitionPath() + "/nodes").size();
zkutil::SingleBarrier recovery_barrier{get_zookeeper, getPartitionPath(job) + "/recovery_barrier", node_count};
zkutil::SingleBarrier recovery_barrier{get_zookeeper, getPartitionPath() + "/recovery_barrier", node_count};
zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortRecoveryIfRequested, this);
recovery_barrier.setCancellationHook(hook);
return recovery_barrier;
}
zkutil::SingleBarrier ReshardingWorker::createUploadBarrier(const ReshardingJob & job)
zkutil::SingleBarrier ReshardingWorker::getUploadBarrier()
{
auto zookeeper = context.getZooKeeper();
auto node_count = zookeeper->getChildren(getPartitionPath(job) + "/nodes").size();
auto node_count = zookeeper->getChildren(getPartitionPath() + "/nodes").size();
zkutil::SingleBarrier upload_barrier{get_zookeeper, getPartitionPath(job) + "/upload_barrier", node_count};
zkutil::SingleBarrier upload_barrier{get_zookeeper, getPartitionPath() + "/upload_barrier", node_count};
zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortJobIfRequested, this);
upload_barrier.setCancellationHook(hook);
@ -2377,13 +2478,13 @@ zkutil::SingleBarrier ReshardingWorker::createUploadBarrier(const ReshardingJob
return upload_barrier;
}
zkutil::SingleBarrier ReshardingWorker::createElectionBarrier(const ReshardingJob & job)
zkutil::SingleBarrier ReshardingWorker::getElectionBarrier()
{
auto zookeeper = context.getZooKeeper();
auto node_count = zookeeper->getChildren(getPartitionPath(job) + "/nodes").size();
auto node_count = zookeeper->getChildren(getPartitionPath() + "/nodes").size();
zkutil::SingleBarrier election_barrier{get_zookeeper, getPartitionPath(job) + "/election_barrier", node_count};
zkutil::SingleBarrier election_barrier{get_zookeeper, getPartitionPath() + "/election_barrier", node_count};
zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortJobIfRequested, this);
election_barrier.setCancellationHook(hook);
@ -2391,13 +2492,13 @@ zkutil::SingleBarrier ReshardingWorker::createElectionBarrier(const ReshardingJo
return election_barrier;
}
zkutil::SingleBarrier ReshardingWorker::createCommitBarrier(const ReshardingJob & job)
zkutil::SingleBarrier ReshardingWorker::getCommitBarrier()
{
auto zookeeper = context.getZooKeeper();
auto node_count = zookeeper->getChildren(getPartitionPath(job) + "/nodes").size();
auto node_count = zookeeper->getChildren(getPartitionPath() + "/nodes").size();
zkutil::SingleBarrier commit_barrier{get_zookeeper, getPartitionPath(job) + "/commit_barrier", node_count};
zkutil::SingleBarrier commit_barrier{get_zookeeper, getPartitionPath() + "/commit_barrier", node_count};
zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortJobIfRequested, this);
commit_barrier.setCancellationHook(hook);
@ -2428,14 +2529,14 @@ std::string ReshardingWorker::getCoordinatorPath(const std::string & coordinator
return coordination_path + "/" + coordinator_id;
}
std::string ReshardingWorker::getPartitionPath(const ReshardingJob & job) const
std::string ReshardingWorker::getPartitionPath() const
{
return coordination_path + "/" + job.coordinator_id + "/partitions/" + job.partition;
return coordination_path + "/" + current_job.coordinator_id + "/partitions/" + current_job.partition;
}
std::string ReshardingWorker::getLocalJobPath(const ReshardingJob & job) const
std::string ReshardingWorker::getLocalJobPath() const
{
return host_task_queue_path + "/" + job.job_name;
return host_task_queue_path + "/" + current_job.job_name;
}
ReshardingWorker::AnomalyMonitor::AnomalyMonitor(ReshardingWorker & resharding_worker_)
@ -2494,7 +2595,7 @@ void ReshardingWorker::AnomalyMonitor::routine()
/// the loop body in order to avoid multiple notifications.
zkutil::EventPtr event = new Poco::Event;
/// Monitor both status changes and movements of the participating nodes.
/// Monitor both status changes and movements of the performers.
(void) zookeeper->get(resharding_worker.getCoordinatorPath(coordinator_id)
+ "/status_probe", nullptr, event);
(void) zookeeper->getChildren(resharding_worker.distributed_online_path,

View File

@ -3625,8 +3625,8 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
resharding_worker.addPartitions(coordinator_id, partition_list);
resharding_worker.waitForCheckCompletion(coordinator_id);
/// At this point, all the participating nodes know exactly the number
/// of partitions that are to be processed.
/// At this point, all the performers know exactly the number of partitions
/// that are to be processed.
auto count = resharding_worker.getPartitionCount(coordinator_id);
if (count == 0)
@ -3640,8 +3640,8 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
resharding_worker.waitForOptOutCompletion(coordinator_id, old_node_count);
/// At this point, all the participating nodes that actually have some
/// partitions are in a coherent state.
/// At this point, all the performers that actually have some partitions
/// are in a coherent state.
if (partition_list.empty())
return;