ClickHouse/dbms/src/Storages/MergeTree/ReshardingWorker.cpp

2599 lines
76 KiB
C++
Raw Normal View History

2016-01-28 01:00:27 +00:00
#include <DB/Storages/MergeTree/ReshardingWorker.h>
#include <DB/Storages/MergeTree/ReshardingJob.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
2016-03-02 14:12:04 +00:00
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
2016-01-28 01:00:27 +00:00
#include <DB/Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <DB/Storages/MergeTree/MergeTreeSharder.h>
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
2016-03-01 17:47:53 +00:00
2016-01-28 01:00:27 +00:00
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h>
2016-03-01 17:47:53 +00:00
2016-01-28 01:00:27 +00:00
#include <DB/Common/getFQDNOrHostName.h>
2016-03-25 11:48:45 +00:00
#include <DB/Common/SHA512Utils.h>
2016-03-01 17:47:53 +00:00
2016-01-28 01:00:27 +00:00
#include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/Context.h>
2016-03-01 17:47:53 +00:00
#include <DB/Interpreters/Cluster.h>
#include <threadpool.hpp>
2016-03-01 17:47:53 +00:00
2016-01-28 01:00:27 +00:00
#include <zkutil/ZooKeeper.h>
2016-03-01 17:47:53 +00:00
2016-01-28 01:00:27 +00:00
#include <Poco/Event.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
2016-03-01 17:47:53 +00:00
#include <openssl/sha.h>
2016-01-28 01:00:27 +00:00
#include <future>
2016-03-01 17:47:53 +00:00
#include <chrono>
#include <cstdlib>
#include <ctime>
2016-01-28 01:00:27 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ABORTED;
extern const int UNEXPECTED_ZOOKEEPER_ERROR;
extern const int PARTITION_COPY_FAILED;
extern const int PARTITION_ATTACH_FAILED;
2016-01-28 01:00:42 +00:00
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int INVALID_CONFIG_PARAMETER;
2016-03-01 17:47:53 +00:00
extern const int RESHARDING_BUSY_CLUSTER;
extern const int RESHARDING_BUSY_SHARD;
2016-03-01 17:47:53 +00:00
extern const int RESHARDING_NO_SUCH_COORDINATOR;
extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP;
extern const int RESHARDING_ALREADY_SUBSCRIBED;
extern const int RESHARDING_REMOTE_NODE_UNAVAILABLE;
extern const int RESHARDING_REMOTE_NODE_ERROR;
extern const int RESHARDING_COORDINATOR_DELETED;
extern const int RESHARDING_DISTRIBUTED_JOB_ON_HOLD;
extern const int RESHARDING_INVALID_QUERY;
extern const int RWLOCK_NO_SUCH_LOCK;
2016-03-25 11:48:45 +00:00
extern const int NO_SUCH_BARRIER;
extern const int RESHARDING_ILL_FORMED_LOG;
2016-01-28 01:00:27 +00:00
}
namespace
{
2016-03-01 17:47:53 +00:00
/// Base waiting period, in milliseconds (it is passed to std::chrono::milliseconds
/// and to the tryWait() call of the polling event).
constexpr long wait_duration = 1000;
2016-03-25 11:48:45 +00:00
/// Helper class which extracts from the ClickHouse configuration file
/// the parameters we need for operating the resharding thread.
2016-01-28 01:00:42 +00:00
class Arguments final
{
public:
Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
for (const auto & key : keys)
{
if (key == "task_queue_path")
{
task_queue_path = config.getString(config_name + "." + key);
if (task_queue_path.empty())
2016-03-25 11:48:45 +00:00
throw Exception{"Invalid parameter in resharding configuration", ErrorCodes::INVALID_CONFIG_PARAMETER};
2016-01-28 01:00:42 +00:00
}
else
2016-03-25 11:48:45 +00:00
throw Exception{"Unknown parameter in resharding configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
2016-01-28 01:00:42 +00:00
}
}
Arguments(const Arguments &) = delete;
Arguments & operator=(const Arguments &) = delete;
std::string getTaskQueuePath() const
{
return task_queue_path;
}
private:
std::string task_queue_path;
};
2016-03-25 11:48:45 +00:00
/// Helper class we use to read and write the status of a coordinator
/// or a node that has subscribed to a coordinator.
/// The serialized form is: status_code [, optional_message]
class Status final
{
public:
    /// Build a status from its components.
    Status(ReshardingWorker::StatusCode code_, const std::string & msg_)
        : code{code_}, msg{msg_}
    {
    }

    /// Parse a serialized status. Everything located before the first comma
    /// is the numeric status code; everything after it, if any, is the message.
    Status(const std::string & str)
    {
        const size_t separator_pos = str.find(',');
        const std::string code_str = str.substr(0, separator_pos);
        code = static_cast<ReshardingWorker::StatusCode>(std::stoull(code_str));

        const bool has_message = (separator_pos != std::string::npos)
            && ((separator_pos + 1) < str.length());
        if (has_message)
            msg = str.substr(separator_pos + 1);
    }

    Status(const Status &) = delete;
    Status & operator=(const Status &) = delete;

    /// Serialize the status. The comma is emitted only if there is a message.
    std::string toString() const
    {
        std::string res = DB::toString(static_cast<UInt64>(code));
        if (!msg.empty())
        {
            res += ",";
            res += msg;
        }
        return res;
    }

    ReshardingWorker::StatusCode getCode() const
    {
        return code;
    }

    std::string getMessage() const
    {
        return msg;
    }

private:
    ReshardingWorker::StatusCode code;
    std::string msg;
};
2016-01-28 01:00:27 +00:00
}
2016-03-25 11:48:45 +00:00
/// Job structure:
///
/// /shards: various information on target shards that is needed in order to build
/// the log;
///
/// /log: contains one log record for each operation to be performed;
///
/// /is_published: znode created after uploading sharded partitions. If we have just
/// recovered from a failure, and this znode exists, apply changes without resharding
/// the initial partition. Indeed we could experience a failure after completing a
/// "ALTER TABLE ... DROP PARTITION ...", which would rule out any further attempt
/// to reshard the initial partition;
///
/// /is_log_created: znode created after creating the log. Prevents from recreating
/// it after a failure;
///
/// /is_committed: changes have been committed locally.
///
2016-03-09 13:59:34 +00:00
/// Rationale for distributed jobs:
2016-03-01 17:47:53 +00:00
///
/// A distributed job is initiated in a query ALTER TABLE RESHARD inside which
/// we specify a distributed table. Then ClickHouse creates a job coordinating
/// structure in ZooKeeper, namely a coordinator, identified by a so-called
/// coordinator id.
/// Each shard of the cluster specified in the distributed table metadata
/// receives one query ALTER TABLE RESHARD with the keyword COORDINATE WITH
/// indicating the aforementioned coordinator id.
///
2016-03-09 13:59:34 +00:00
/// Locking notes:
///
/// In order to avoid any deadlock situation, two approaches are implemented:
///
/// 1. As long as a cluster is busy with a distributed job, we forbid clients
/// to submit any new distributed job on this cluster. For this purpose, clusters
/// are identified by the hash value of the ordered list of their hostnames and
/// ports. In the event that an initiator should identify a cluster node by means
/// of a local address, some additional checks are performed on shards themselves.
///
/// 2. Also, the jobs that constitute a distributed job are submitted in identical
/// order on all the shards of a cluster. If one or more shards fail while performing
/// a distributed job, the latter is put on hold. Then no new jobs are performed
/// until the failing shards have come back online.
///
/// ZooKeeper coordinator structure description:
///
2016-03-01 17:47:53 +00:00
/// At the highest level we have under the /resharding_distributed znode:
///
/// /lock: global distributed read/write lock;
/// /online: currently online nodes;
/// /coordination: one znode for each coordinator.
2016-03-01 17:47:53 +00:00
///
/// A coordinator whose identifier is ${id} has the following layout
/// under the /coordination/${id} znode:
2016-03-01 17:47:53 +00:00
///
/// /lock: coordinator-specific distributed read/write lock;
///
/// /deletion_lock: for safe coordinator deletion
///
2016-03-01 17:47:53 +00:00
/// /query_hash: hash value obtained from the query that
/// is sent to the participating nodes;
///
/// /increment: unique block number allocator;
///
/// /status: coordinator status before its participating nodes have subscribed;
2016-03-01 17:47:53 +00:00
///
/// /status/${host}: status of an individual participating node;
2016-03-01 17:47:53 +00:00
///
2016-03-25 11:48:45 +00:00
/// /status_probe: node that we update just after having updated either the status
/// of a participating node or the status of the coordinator as a whole;
///
2016-03-01 17:47:53 +00:00
/// /cluster: cluster on which the distributed job is to be performed;
///
/// /node_count: number of nodes of the cluster that participate in at
/// least one distributed resharding job;
///
/// /cluster_addresses: the list of addresses, in both IP and hostname
/// representations, of all the nodes of the cluster; used to check if a given node
/// is a member of the cluster;
///
/// /shards: the list of shards that have subscribed;
///
/// /subscribe_barrier: when all the participating nodes have subscribed
/// to their coordinator, proceed further
///
2016-03-01 17:47:53 +00:00
/// /check_barrier: when all the participating nodes have checked
/// that they can perform resharding operations, proceed further;
///
/// /opt_out_barrier: after having crossed this barrier, each node of the cluster
/// knows exactly whether it will take part in distributed jobs or not.
///
/// /partitions: partitions that must be resharded on more than one shard;
///
/// /partitions/${partition_id}/nodes: participating nodes;
///
/// /partitions/${partition_id}/upload_barrier: when all the participating
/// nodes have uploaded new data to their respective replicas, we can apply changes;
///
2016-03-25 11:48:45 +00:00
/// /partitions/${partition_id}/election_barrier: used for the election of
/// a leader among the participating nodes;
///
/// /partitions/${partition_id}/commit_barrier: crossed when all the changes
/// have been applied on the target nodes;
///
2016-03-01 17:47:53 +00:00
/// /partitions/${partition_id}/recovery_barrier: recovery if
/// one or several participating nodes had previously gone offline.
///
2016-01-28 01:00:42 +00:00
ReshardingWorker::ReshardingWorker(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name, Context & context_)
: context{context_}, get_zookeeper{[&]() { return context.getZooKeeper(); }}
2016-01-28 01:00:27 +00:00
{
2016-01-28 01:00:42 +00:00
Arguments arguments(config, config_name);
2016-01-28 01:00:27 +00:00
auto zookeeper = context.getZooKeeper();
2016-03-01 17:47:53 +00:00
std::string root = arguments.getTaskQueuePath();
if (root.back() != '/')
2016-03-01 17:47:53 +00:00
root += "/";
auto current_host = getFQDNOrHostName();
2016-03-25 11:48:45 +00:00
task_queue_path = root + "resharding/";
host_task_queue_path = task_queue_path + current_host;
2016-01-28 16:06:57 +00:00
zookeeper->createAncestors(host_task_queue_path + "/");
2016-03-01 17:47:53 +00:00
distributed_path = root + "resharding_distributed";
zookeeper->createAncestors(distributed_path + "/");
distributed_online_path = distributed_path + "/online";
zookeeper->createIfNotExists(distributed_online_path, "");
/// Notify that we are online.
2016-03-04 10:00:00 +00:00
int32_t code = zookeeper->tryCreate(distributed_online_path + "/" + current_host, "",
2016-03-01 17:47:53 +00:00
zkutil::CreateMode::Ephemeral);
2016-03-04 10:00:00 +00:00
if ((code != ZOK) && (code != ZNODEEXISTS))
2016-03-25 11:48:45 +00:00
throw zkutil::KeeperException{code};
2016-03-01 17:47:53 +00:00
distributed_lock_path = distributed_path + "/lock";
zookeeper->createIfNotExists(distributed_lock_path, "");
coordination_path = distributed_path + "/coordination";
zookeeper->createAncestors(coordination_path + "/");
2016-01-28 01:00:27 +00:00
}
/// Calls shutdown(). Any exception it raises is logged and swallowed so that
/// nothing propagates out of the destructor.
ReshardingWorker::~ReshardingWorker()
{
2016-03-01 17:47:53 +00:00
try
2016-01-28 01:00:42 +00:00
{
2016-03-01 17:47:53 +00:00
shutdown();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
2016-01-28 01:00:42 +00:00
}
2016-01-28 01:00:27 +00:00
}
/// Launches pollAndExecute() on a dedicated thread kept in polling_thread.
/// Note that the is_started flag is raised by pollAndExecute() itself, not here.
void ReshardingWorker::start()
{
2016-03-25 11:48:45 +00:00
polling_thread = std::thread{&ReshardingWorker::pollAndExecute, this};
2016-01-28 01:00:27 +00:00
}
2016-03-01 17:47:53 +00:00
void ReshardingWorker::shutdown()
2016-01-28 01:00:27 +00:00
{
2016-03-03 08:40:21 +00:00
if (is_started)
{
must_stop = true;
if (polling_thread.joinable())
polling_thread.join();
is_started = false;
}
2016-03-01 17:47:53 +00:00
}
void ReshardingWorker::submitJob(const ReshardingJob & job)
{
auto serialized_job = job.toString();
2016-01-28 01:00:42 +00:00
auto zookeeper = context.getZooKeeper();
(void) zookeeper->create(host_task_queue_path + "/task-", serialized_job,
zkutil::CreateMode::PersistentSequential);
2016-01-28 01:00:27 +00:00
}
/// Returns the current value of the is_started flag.
bool ReshardingWorker::isStarted() const
{
    return is_started.load();
}
void ReshardingWorker::pollAndExecute()
{
2016-03-25 11:48:45 +00:00
std::string error_msg;
2016-01-28 01:00:42 +00:00
2016-01-28 01:00:27 +00:00
try
{
bool old_val = false;
if (!is_started.compare_exchange_strong(old_val, true, std::memory_order_seq_cst,
std::memory_order_relaxed))
2016-03-25 11:48:45 +00:00
throw Exception{"Resharding background thread already started", ErrorCodes::LOGICAL_ERROR};
2016-01-28 01:00:27 +00:00
2016-01-28 01:00:42 +00:00
LOG_DEBUG(log, "Started resharding background thread.");
2016-01-28 01:00:27 +00:00
try
{
performPendingJobs();
}
catch (const Exception & ex)
{
2016-01-28 01:00:42 +00:00
if (ex.code() == ErrorCodes::ABORTED)
2016-01-28 01:00:27 +00:00
throw;
else
2016-01-28 01:00:42 +00:00
LOG_ERROR(log, ex.message());
2016-01-28 01:00:27 +00:00
}
2016-03-25 11:48:45 +00:00
catch (const std::exception & ex)
{
LOG_ERROR(log, ex.what());
}
2016-01-28 01:00:27 +00:00
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
while (true)
{
try
{
Strings children;
while (true)
{
auto zookeeper = context.getZooKeeper();
children = zookeeper->getChildren(host_task_queue_path, nullptr, event);
if (!children.empty())
break;
do
{
2016-03-01 17:47:53 +00:00
abortPollingIfRequested();
2016-01-28 01:00:27 +00:00
}
2016-03-01 17:47:53 +00:00
while (!event->tryWait(wait_duration));
2016-01-28 01:00:27 +00:00
}
std::sort(children.begin(), children.end());
perform(children);
}
catch (const Exception & ex)
{
2016-01-28 01:00:42 +00:00
if (ex.code() == ErrorCodes::ABORTED)
2016-01-28 01:00:27 +00:00
throw;
else
2016-01-28 01:00:42 +00:00
LOG_ERROR(log, ex.message());
2016-01-28 01:00:27 +00:00
}
2016-03-25 11:48:45 +00:00
catch (const std::exception & ex)
{
LOG_ERROR(log, ex.what());
}
2016-01-28 01:00:27 +00:00
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
catch (const Exception & ex)
{
2016-01-28 01:00:42 +00:00
if (ex.code() != ErrorCodes::ABORTED)
2016-03-25 11:48:45 +00:00
error_msg = ex.message();
}
catch (const std::exception & ex)
{
error_msg = ex.what();
2016-01-28 01:00:42 +00:00
}
catch (...)
{
2016-03-25 11:48:45 +00:00
error_msg = "unspecified";
tryLogCurrentException(__PRETTY_FUNCTION__);
2016-01-28 01:00:27 +00:00
}
2016-03-25 11:48:45 +00:00
if (!error_msg.empty())
LOG_ERROR(log, "Resharding background thread terminated with critical error: "
<< error_msg);
2016-01-28 01:00:42 +00:00
else
LOG_DEBUG(log, "Resharding background thread terminated.");
2016-01-28 01:00:27 +00:00
}
2016-03-01 17:47:53 +00:00
void ReshardingWorker::jabScheduler()
{
2016-03-08 13:46:00 +00:00
/// We inform the job scheduler that something has just happened. This forces
/// the scheduler to fetch all the current pending jobs. We need this when a
/// distributed job is not ready to be performed yet. Otherwise if no new jobs
/// were submitted, we wouldn't be able to check again whether we can perform
/// the distributed job.
event->set();
2016-03-01 17:47:53 +00:00
/// Sleep for 3 time units in order to prevent scheduler overloading
/// if we were the only job in the queue.
2016-03-08 13:46:00 +00:00
auto zookeeper = context.getZooKeeper();
2016-03-01 17:47:53 +00:00
if (zookeeper->getChildren(host_task_queue_path).size() == 1)
std::this_thread::sleep_for(3 * std::chrono::milliseconds(wait_duration));
}
2016-01-28 01:00:27 +00:00
void ReshardingWorker::performPendingJobs()
{
auto zookeeper = context.getZooKeeper();
Strings children = zookeeper->getChildren(host_task_queue_path);
std::sort(children.begin(), children.end());
perform(children);
}
void ReshardingWorker::perform(const Strings & job_nodes)
{
auto zookeeper = context.getZooKeeper();
for (const auto & child : job_nodes)
{
std::string child_full_path = host_task_queue_path + "/" + child;
auto job_descriptor = zookeeper->get(child_full_path);
2016-01-28 01:00:42 +00:00
try
{
2016-03-25 11:48:45 +00:00
perform(job_descriptor, child);
2016-01-28 01:00:42 +00:00
}
catch (const Exception & ex)
{
2016-03-01 17:47:53 +00:00
/// If the job has been cancelled, either locally or remotely, we keep it
/// in the corresponding task queues for a later execution.
/// If an error has occurred, either locally or remotely, while
/// performing the job, we delete it from the corresponding task queues.
try
2016-03-01 17:47:53 +00:00
{
if (ex.code() == ErrorCodes::ABORTED)
2016-03-01 17:47:53 +00:00
{
/// nothing here
2016-03-01 17:47:53 +00:00
}
else if (ex.code() == ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE)
2016-03-01 17:47:53 +00:00
{
/// nothing here
2016-03-01 17:47:53 +00:00
}
else if (ex.code() == ErrorCodes::RESHARDING_DISTRIBUTED_JOB_ON_HOLD)
{
/// nothing here
}
else if (ex.code() == ErrorCodes::RESHARDING_REMOTE_NODE_ERROR)
2016-03-25 11:48:45 +00:00
zookeeper->removeRecursive(child_full_path);
else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
2016-03-25 11:48:45 +00:00
zookeeper->removeRecursive(child_full_path);
else if (ex.code() == ErrorCodes::RWLOCK_NO_SUCH_LOCK)
2016-03-25 11:48:45 +00:00
zookeeper->removeRecursive(child_full_path);
else if (ex.code() == ErrorCodes::NO_SUCH_BARRIER)
zookeeper->removeRecursive(child_full_path);
else
2016-03-25 11:48:45 +00:00
zookeeper->removeRecursive(child_full_path);
2016-03-01 17:47:53 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
throw;
2016-01-28 01:00:42 +00:00
}
catch (...)
{
2016-03-25 11:48:45 +00:00
zookeeper->removeRecursive(child_full_path);
2016-01-28 01:00:42 +00:00
throw;
}
2016-03-25 11:48:45 +00:00
zookeeper->removeRecursive(child_full_path);
2016-01-28 01:00:27 +00:00
}
}
2016-03-25 11:48:45 +00:00
/// Performs a single resharding job.
///
/// job_descriptor is the serialized job; job_name is stored into the job as its name.
/// Outline:
///  - acquire, without blocking, a read guard on the deletion lock (the lock is
///    created from the coordinator id if the job is coordinated) and fail with
///    RESHARDING_COORDINATOR_DELETED if the lock is not owned;
///  - freeze the source partition;
///  - publish the sharded partitions, wait for uploads, elect a leader, create the
///    log and commit; the publish, log and commit steps are skipped if the
///    corresponding isPublished() / isLogCreated() / isCommitted() check reports
///    that they have already been done;
///  - on success, release the lock, unfreeze the source partition and clean up.
/// On failure, depending on the error code, the status of this node is updated,
/// the state of the coordinator is dumped and logged, some cleanup is performed,
/// and the exception is rethrown.
void ReshardingWorker::perform(const std::string & job_descriptor, const std::string & job_name)
2016-01-28 01:00:27 +00:00
{
LOG_DEBUG(log, "Performing resharding job.");
2016-03-01 17:47:53 +00:00
current_job = ReshardingJob{job_descriptor};
2016-03-25 11:48:45 +00:00
current_job.job_name = job_name;
2016-03-01 17:47:53 +00:00
zkutil::RWLock deletion_lock;
if (current_job.isCoordinated())
2016-03-04 10:04:01 +00:00
deletion_lock = createDeletionLock(current_job.coordinator_id);
zkutil::RWLock::Guard<zkutil::RWLock::Read, zkutil::RWLock::NonBlocking> guard{deletion_lock};
if (!deletion_lock.ownsLock())
2016-03-25 11:48:45 +00:00
throw Exception{"Coordinator has been deleted", ErrorCodes::RESHARDING_COORDINATOR_DELETED};
2016-03-01 17:47:53 +00:00
StoragePtr generic_storage = context.getTable(current_job.database_name, current_job.table_name);
2016-01-28 01:00:27 +00:00
auto & storage = typeid_cast<StorageReplicatedMergeTree &>(*(generic_storage.get()));
2016-03-01 17:47:53 +00:00
current_job.storage = &storage;
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
/// Protect the source partition from merging jobs.
freezeSourcePartition();
/// Filled by the error handlers below; logged before rethrowing.
std::string dumped_coordinator_state;
/// Common handler for the errors that have no dedicated processing.
/// cancel_msg is the status message used if a stop has been requested,
/// error_msg is the one used otherwise. This handler never throws.
auto handle_exception = [&](const std::string & cancel_msg, const std::string & error_msg)
{
try
{
/// Cancellation has priority over errors.
if (must_stop)
{
if (current_job.isCoordinated())
{
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ON_HOLD,
cancel_msg);
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
}
softCleanup();
}
else
{
/// An error has occurred on this node.
if (current_job.isCoordinated())
{
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ERROR,
error_msg);
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
}
deletion_lock.release();
unfreezeSourcePartition();
hardCleanup();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
};
2016-01-28 01:00:27 +00:00
try
{
2016-03-01 17:47:53 +00:00
attachJob();
2016-03-25 11:48:45 +00:00
{
ScopedAnomalyMonitor scoped_anomaly_monitor{anomaly_monitor};
/// Create the new sharded partitions. Upload them to their target
/// shards. Collect into persistent storage all the information we
/// need in order to commit changes.
if (!isPublished())
{
createShardedPartitions();
storeTargetShards();
publishShardedPartitions();
deleteTemporaryData();
markAsPublished();
}
waitForUploadCompletion();
/// If the current job is part of a distributed job, take participation
/// in a leader election among all the participating nodes. All the
/// participating nodes drop their source partition. Moreover the leader
/// sends all the required attach requests to the target shards.
electLeader();
/// Build into persistent storage a log consisting of a description
/// of all the operations to be performed within the commit operation.
if (!isLogCreated())
{
createLog();
markLogAsCreated();
}
if (!isCommitted())
{
commit();
markAsCommitted();
}
/// A distributed job is considered to be complete if and only if
/// changes have been committed on all the participating nodes.
waitForCommitCompletion();
}
2016-01-28 01:00:27 +00:00
}
catch (const Exception & ex)
{
2016-03-01 17:47:53 +00:00
/// The error processing itself must not throw: the original exception
/// is the one that gets rethrown below.
try
{
if (ex.code() == ErrorCodes::ABORTED)
{
LOG_DEBUG(log, "Resharding job cancelled.");
/// A soft shutdown is being performed on this node.
/// Put the current distributed job on hold in order to reliably handle
/// the scenario in which the remote nodes undergo a hard shutdown.
if (current_job.isCoordinated())
2016-03-25 11:48:45 +00:00
{
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ON_HOLD,
ex.message());
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
}
2016-03-01 17:47:53 +00:00
softCleanup();
}
else if (ex.code() == ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE)
{
/// A remote node has gone offline.
/// Put the current distributed job on hold. Also jab the job scheduler
/// so that it will come across this distributed job even if no new jobs
/// are submitted.
2016-03-25 11:48:45 +00:00
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ON_HOLD,
ex.message());
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
2016-03-01 17:47:53 +00:00
softCleanup();
jabScheduler();
}
else if (ex.code() == ErrorCodes::RESHARDING_REMOTE_NODE_ERROR)
{
/// An error has been reported for a remote node: we do not update
/// our own status, but we release our resources and clean up.
2016-03-25 11:48:45 +00:00
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
deletion_lock.release();
2016-03-25 11:48:45 +00:00
unfreezeSourcePartition();
2016-03-01 17:47:53 +00:00
hardCleanup();
}
else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
unfreezeSourcePartition();
2016-03-01 17:47:53 +00:00
else if (ex.code() == ErrorCodes::RESHARDING_DISTRIBUTED_JOB_ON_HOLD)
{
/// The current distributed job is on hold and one or more required nodes
/// have not gone online yet. Jab the job scheduler so that it will come
/// across this distributed job even if no new jobs are submitted.
2016-03-25 11:48:45 +00:00
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ON_HOLD,
ex.message());
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
2016-03-01 17:47:53 +00:00
jabScheduler();
}
else
2016-03-25 11:48:45 +00:00
handle_exception(ex.message(), ex.message());
2016-03-01 17:47:53 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
LOG_ERROR(log, dumped_coordinator_state);
throw;
}
catch (const std::exception & ex)
{
/// An error has occurred on this node.
handle_exception("Resharding job cancelled", ex.what());
LOG_ERROR(log, dumped_coordinator_state);
2016-01-28 01:00:27 +00:00
throw;
}
catch (...)
{
2016-03-01 17:47:53 +00:00
/// An error has occurred on this node.
2016-03-25 11:48:45 +00:00
handle_exception("Resharding job cancelled", "An unspecified error has occurred");
LOG_ERROR(log, dumped_coordinator_state);
2016-01-28 01:00:27 +00:00
throw;
}
deletion_lock.release();
2016-03-25 11:48:45 +00:00
/// Although the source partition has been dropped, the following function
/// must be called in order to delete all the data that makes freezing fail-safe.
unfreezeSourcePartition();
2016-03-01 17:47:53 +00:00
hardCleanup();
2016-03-25 11:48:45 +00:00
2016-01-28 01:00:27 +00:00
LOG_DEBUG(log, "Resharding job successfully completed.");
}
2016-03-01 17:47:53 +00:00
void ReshardingWorker::createShardedPartitions()
2016-01-28 01:00:27 +00:00
{
2016-03-01 17:47:53 +00:00
abortJobIfRequested();
2016-01-28 01:00:27 +00:00
LOG_DEBUG(log, "Splitting partition shard-wise.");
2016-03-01 17:47:53 +00:00
auto & storage = *(current_job.storage);
2016-03-02 14:12:04 +00:00
MergeTreeDataMerger merger{storage.data};
2016-03-01 17:47:53 +00:00
MergeTreeDataMerger::CancellationHook hook = std::bind(&ReshardingWorker::abortJobIfRequested, this);
2016-03-02 14:12:04 +00:00
merger.setCancellationHook(hook);
2016-01-28 01:00:42 +00:00
2016-01-28 16:06:57 +00:00
MergeTreeData::PerShardDataParts & per_shard_data_parts = storage.data.per_shard_data_parts;
2016-03-02 14:12:04 +00:00
per_shard_data_parts = merger.reshardPartition(current_job);
2016-01-28 01:00:27 +00:00
}
2016-03-25 11:48:45 +00:00
void ReshardingWorker::storeTargetShards()
{
auto & storage = *(current_job.storage);
MergeTreeData::PerShardDataParts & per_shard_data_parts = storage.data.per_shard_data_parts;
auto zookeeper = context.getZooKeeper();
zookeeper->tryRemove(getLocalJobPath(current_job) + "/shards");
std::string out;
WriteBufferFromString buf{out};
size_t entries_count = 0;
for (const auto & entry : per_shard_data_parts)
{
const MergeTreeData::MutableDataPartPtr & part_from_shard = entry.second;
if (!part_from_shard)
continue;
++entries_count;
}
writeVarUInt(entries_count, buf);
for (const auto & entry : per_shard_data_parts)
{
size_t shard_no = entry.first;
const MergeTreeData::MutableDataPartPtr & part_from_shard = entry.second;
if (!part_from_shard)
continue;
std::string part = storage.data.getFullPath() + "reshard/" + toString(shard_no) + "/" + part_from_shard->name;
auto hash = SHA512Utils::computeHashFromFolder(part);
writeVarUInt(shard_no, buf);
writeBinary(part_from_shard->name, buf);
writeBinary(hash, buf);
}
buf.next();
(void) zookeeper->create(getLocalJobPath(current_job) + "/shards", out,
zkutil::CreateMode::Persistent);
}
/// Read the "shards" znode of the job job_name located in the task queue
/// of the host hostname. Returns one entry (shard number, part name, hash)
/// for each record found, in the order in which they were stored.
ReshardingWorker::ShardList ReshardingWorker::getTargetShards(const std::string & hostname,
    const std::string & job_name)
{
    ShardList result;

    auto zookeeper = context.getZooKeeper();

    const std::string shards_path = task_queue_path + hostname + "/" + job_name + "/shards";
    auto serialized = zookeeper->get(shards_path);

    ReadBufferFromString in_buf{serialized};

    size_t remaining;
    readVarUInt(remaining, in_buf);

    for (; remaining > 0; --remaining)
    {
        size_t shard_no;
        std::string part_name;
        std::string hash;

        readVarUInt(shard_no, in_buf);
        readBinary(part_name, in_buf);
        readBinary(hash, in_buf);

        result.emplace_back(shard_no, part_name, hash);
    }

    return result;
}
2016-03-01 17:47:53 +00:00
/// Sends each sharded part of the current job to all the replicas of its target
/// shard. One upload task is created for each remote replica and run in a thread
/// pool. If the local replica is among the targets, its part is not uploaded:
/// it is moved into the detached/ folder of the storage.
/// Throws PARTITION_COPY_FAILED if an upload task reports a failure, and
/// LOGICAL_ERROR if more than one local replica is detected.
void ReshardingWorker::publishShardedPartitions()
2016-01-28 01:00:27 +00:00
{
2016-03-01 17:47:53 +00:00
abortJobIfRequested();
2016-01-28 01:00:27 +00:00
LOG_DEBUG(log, "Sending newly created partitions to their respective shards.");
2016-03-01 17:47:53 +00:00
auto & storage = *(current_job.storage);
auto zookeeper = context.getZooKeeper();
2016-01-28 01:00:27 +00:00
/// Description of one transfer: which part goes to which replica.
struct TaskInfo
{
TaskInfo(const std::string & replica_path_,
2016-01-28 16:06:57 +00:00
const std::string & part_,
2016-01-28 01:00:27 +00:00
const ReplicatedMergeTreeAddress & dest_,
size_t shard_no_)
2016-01-28 16:06:57 +00:00
: replica_path(replica_path_), dest(dest_), part(part_),
2016-01-28 01:00:27 +00:00
shard_no(shard_no_)
{
}
TaskInfo(const TaskInfo &) = delete;
TaskInfo & operator=(const TaskInfo &) = delete;
TaskInfo(TaskInfo &&) = default;
TaskInfo & operator=(TaskInfo &&) = default;
2016-01-28 01:00:27 +00:00
/// Path of the target replica in ZooKeeper.
std::string replica_path;
/// Address read from the "host" znode of the target replica.
ReplicatedMergeTreeAddress dest;
/// Name of the part to be transferred.
2016-01-28 16:06:57 +00:00
std::string part;
/// Number of the shard the part has been created for.
2016-01-28 01:00:27 +00:00
size_t shard_no;
};
using TaskInfoList = std::vector<TaskInfo>;
TaskInfoList task_info_list;
/// Copy the new partitions to the replicas of the corresponding shards.
/// Number of participating local replicas. Must be <= 1.
size_t local_count = 0;
2016-01-28 01:00:42 +00:00
for (const auto & entry : storage.data.per_shard_data_parts)
2016-01-28 01:00:27 +00:00
{
2016-01-28 01:00:42 +00:00
size_t shard_no = entry.first;
2016-01-28 16:06:57 +00:00
const MergeTreeData::MutableDataPartPtr & part_from_shard = entry.second;
/// Nothing to send to this shard.
if (!part_from_shard)
2016-01-28 01:00:42 +00:00
continue;
2016-01-28 01:00:27 +00:00
2016-03-01 17:47:53 +00:00
const WeightedZooKeeperPath & weighted_path = current_job.paths[shard_no];
2016-01-28 01:00:42 +00:00
const std::string & zookeeper_path = weighted_path.first;
2016-01-28 01:00:27 +00:00
auto children = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const auto & child : children)
{
const std::string replica_path = zookeeper_path + "/replicas/" + child;
auto host = zookeeper->get(replica_path + "/host");
ReplicatedMergeTreeAddress host_desc(host);
2016-01-28 16:06:57 +00:00
task_info_list.emplace_back(replica_path, part_from_shard->name, host_desc, shard_no);
2016-01-28 01:00:27 +00:00
if (replica_path == storage.replica_path)
{
++local_count;
if (local_count > 1)
2016-03-25 11:48:45 +00:00
throw Exception{"Detected more than one local replica", ErrorCodes::LOGICAL_ERROR};
2016-01-28 01:00:27 +00:00
/// Keep the local replica in slot 0, so that all the entries
/// starting from index local_count are remote ones.
std::swap(task_info_list[0], task_info_list[task_info_list.size() - 1]);
}
}
}
2016-03-01 17:47:53 +00:00
abortJobIfRequested();
2016-01-28 01:00:27 +00:00
/// One pool thread and one task for each remote replica.
size_t remote_count = task_info_list.size() - local_count;
boost::threadpool::pool pool(remote_count);
using Tasks = std::vector<std::packaged_task<bool()> >;
Tasks tasks(remote_count);
2016-03-25 11:48:45 +00:00
ReplicatedMergeTreeAddress local_address{zookeeper->get(storage.replica_path + "/host")};
InterserverIOEndpointLocation from_location{storage.replica_path, local_address.host, local_address.replication_port};
2016-01-28 01:00:27 +00:00
2016-03-01 17:47:53 +00:00
ShardedPartitionUploader::Client::CancellationHook hook = std::bind(&ReshardingWorker::abortJobIfRequested, this);
2016-03-25 11:48:45 +00:00
2016-03-01 17:47:53 +00:00
storage.sharded_partition_uploader_client.setCancellationHook(hook);
2016-01-28 01:00:27 +00:00
try
{
for (size_t i = local_count; i < task_info_list.size(); ++i)
{
const TaskInfo & entry = task_info_list[i];
const auto & replica_path = entry.replica_path;
const auto & dest = entry.dest;
2016-01-28 16:06:57 +00:00
const auto & part = entry.part;
2016-01-28 01:00:27 +00:00
size_t shard_no = entry.shard_no;
2016-03-25 11:48:45 +00:00
InterserverIOEndpointLocation to_location{replica_path, dest.host, dest.replication_port};
2016-01-28 01:00:27 +00:00
/// Index of the task among the remote ones.
size_t j = i - local_count;
2016-03-25 11:48:45 +00:00
tasks[j] = Tasks::value_type{std::bind(&ShardedPartitionUploader::Client::send,
&storage.sharded_partition_uploader_client, part, shard_no, to_location)};
2016-01-28 01:00:27 +00:00
pool.schedule([j, &tasks]{ tasks[j](); });
}
}
catch (...)
{
2016-03-01 17:47:53 +00:00
/// Let the tasks that have already been scheduled finish before leaving,
/// since they refer to local objects.
try
{
pool.wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!current_job.is_aborted)
{
/// We may have caught an error because one or more remote nodes have
/// gone offline while performing I/O. The following check is here to
/// sort out this ambiguity.
abortJobIfRequested();
}
2016-01-28 01:00:27 +00:00
throw;
}
pool.wait();
/// NOTE(review): an exception thrown inside an upload task is stored by its
/// std::packaged_task and is rethrown by get() here, i.e. outside of the
/// try block above - confirm that this is intended.
for (auto & task : tasks)
{
bool res = task.get_future().get();
if (!res)
2016-03-25 11:48:45 +00:00
throw Exception{"Failed to copy partition", ErrorCodes::PARTITION_COPY_FAILED};
2016-01-28 01:00:27 +00:00
}
2016-03-01 17:47:53 +00:00
abortJobIfRequested();
2016-01-28 01:00:27 +00:00
if (local_count == 1)
{
/// On the local replica, simply move the sharded partition to the detached/ folder.
const TaskInfo & entry = task_info_list[0];
2016-01-28 16:06:57 +00:00
const auto & part = entry.part;
2016-01-28 01:00:27 +00:00
size_t shard_no = entry.shard_no;
2016-01-28 16:06:57 +00:00
std::string from_path = storage.full_path + "reshard/" + toString(shard_no) + "/" + part + "/";
std::string to_path = storage.full_path + "detached/";
2016-03-25 11:48:45 +00:00
Poco::File{from_path}.moveTo(to_path);
2016-01-28 01:00:27 +00:00
}
}
2016-03-25 11:48:45 +00:00
void ReshardingWorker::commit()
2016-01-28 01:00:27 +00:00
{
2016-03-25 11:48:45 +00:00
/// Note: we never rollback any change. After having recovered from an abnormal
/// situation, we attempt to apply all the pending changes.
2016-03-01 17:47:53 +00:00
auto zookeeper = context.getZooKeeper();
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
auto log_path = getLocalJobPath(current_job) + "/log";
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
std::vector<LogRecord> log_records;
2016-03-25 11:48:45 +00:00
auto children = zookeeper->getChildren(log_path);
if (children.empty())
return;
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
LOG_DEBUG(log, "Committing changes.");
2016-03-25 11:48:45 +00:00
std::sort(children.begin(), children.end());
2016-03-25 11:48:45 +00:00
for (const auto & child : children)
log_records.emplace_back(zookeeper, log_path + "/" + child);
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
/// Find defective log records and repair them.
for (LogRecord & log_record : log_records)
{
2016-03-25 11:48:45 +00:00
if (log_record.state == LogRecord::RUNNING)
repairLogRecord(log_record);
}
2016-03-25 11:48:45 +00:00
size_t operation_count = 0;
for (const LogRecord & log_record : log_records)
{
if (log_record.state == LogRecord::READY)
++operation_count;
}
2016-03-25 11:48:45 +00:00
if (operation_count == 0)
{
/// All the operations have already been performed.
return;
}
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
if (!current_job.do_copy)
2016-01-28 01:00:27 +00:00
{
2016-03-25 11:48:45 +00:00
/// If the keyword COPY is not specified, we have drop operations. They are
/// always performed first. This is to prevent a name clash if an attach
/// operation should ever happen to run locally.
if (log_records[0].operation != LogRecord::OP_DROP)
throw Exception{"Ill-formed log", ErrorCodes::RESHARDING_ILL_FORMED_LOG};
2016-01-28 01:00:42 +00:00
2016-03-25 11:48:45 +00:00
if (log_records[0].state == LogRecord::READY)
executeLogRecord(log_records[0]);
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
--operation_count;
2016-03-25 11:48:45 +00:00
if (operation_count == 0)
2016-01-28 01:00:27 +00:00
{
2016-03-25 11:48:45 +00:00
/// The drop operation was the only task to be performed.
return;
2016-01-28 01:00:27 +00:00
}
}
2016-03-25 11:48:45 +00:00
/// Execute all the remaining log records.
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
size_t pool_size = operation_count;
boost::threadpool::pool pool(pool_size);
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
using Tasks = std::vector<std::packaged_task<void()> >;
Tasks tasks(pool_size);
try
{
size_t j = 0;
/// Note: yes, i = 0 is correct indeed since, if we have just performed
/// a drop, it won't be run again. Maintaining a supplementary variable
/// to keep track of what we have already done is definitely not worth
/// the price.
for (size_t i = 0; i < log_records.size(); ++i)
2016-01-28 01:00:27 +00:00
{
2016-03-25 11:48:45 +00:00
if (log_records[i].state == LogRecord::READY)
{
2016-03-25 11:48:45 +00:00
if (operation_count == 0)
throw Exception{"ReshardingWorker: found discrepancy while committing",
ErrorCodes::LOGICAL_ERROR};
tasks[j] = Tasks::value_type{std::bind(&ReshardingWorker::executeLogRecord, this,
log_records[i])};
pool.schedule([j, &tasks]{ tasks[j](); });
++j;
--operation_count;
}
2016-01-28 01:00:27 +00:00
}
2016-03-25 11:48:45 +00:00
}
catch (...)
{
try
{
pool.wait();
2016-03-25 11:48:45 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2016-03-25 11:48:45 +00:00
throw;
}
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
pool.wait();
LOG_DEBUG(log, "Changes successfully committed.");
}
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
/// Decide what to do with a log record that was found in the RUNNING state:
/// it is marked either READY (it must be executed again) or DONE (nothing
/// more is to be done for it), and the new state is written back.
/// Throws RESHARDING_ILL_FORMED_LOG if the operation is neither a drop nor an attach.
void ReshardingWorker::repairLogRecord(LogRecord & log_record)
{
    bool must_redo = true;

    if (log_record.operation == LogRecord::OP_ATTACH)
        must_redo = checkAttachLogRecord(log_record);
    else if (log_record.operation == LogRecord::OP_DROP)
    {
        /// We make the following conservative assumptions:
        /// 1. If there is no partition with the source partition name, the source partition
        /// has already been dropped.
        /// 2. If there is such a partition but it has changed since we built from it
        /// new sharded partitions, we cannot drop it since it could result in data loss.
        auto & storage = *(current_job.storage);

        if (storage.data.getPartitionSize(current_job.partition) == 0)
            must_redo = false;
        else if (storage.data.computePartitionHash(current_job.partition) != log_record.partition_hash)
        {
            LOG_WARNING(log, "The source partition " << current_job.partition
                << " cannot be dropped because it has changed since the last"
                " time we were online");
            must_redo = false;
        }
    }
    else
        throw Exception{"Ill-formed log", ErrorCodes::RESHARDING_ILL_FORMED_LOG};

    /// READY: the operation is to be performed again. DONE: nothing left to do.
    log_record.state = must_redo ? LogRecord::READY : LogRecord::DONE;
    log_record.writeBack();
}
/// Dispatch a log record to the handler that matches its operation.
/// Throws RESHARDING_ILL_FORMED_LOG for any operation other than drop or attach.
void ReshardingWorker::executeLogRecord(LogRecord & log_record)
{
    if (log_record.operation == LogRecord::OP_DROP)
    {
        executeDrop(log_record);
        return;
    }

    if (log_record.operation == LogRecord::OP_ATTACH)
    {
        executeAttach(log_record);
        return;
    }

    throw Exception{"Ill-formed log", ErrorCodes::RESHARDING_ILL_FORMED_LOG};
}
/// Run the drop operation described by a log record: the record is flagged
/// RUNNING, the source partition of the current job is dropped through an
/// ALTER TABLE ... DROP PARTITION query, then the record is flagged DONE.
void ReshardingWorker::executeDrop(LogRecord & log_record)
{
    log_record.state = LogRecord::RUNNING;
    log_record.writeBack();

    /// Locally drop the source partition.
    std::string drop_query = "ALTER TABLE " + current_job.database_name + "."
        + current_job.table_name + " DROP PARTITION " + current_job.partition;
    (void) executeQuery(drop_query, context, true);

    log_record.state = LogRecord::DONE;
    log_record.writeBack();
}
/// Tell whether the attach request described by a log record may be applied.
///
/// One check is issued, in parallel, for every (replica, part) pair of the
/// target shard. Returns true only if every check reports Status::OK;
/// a warning is logged for each INCONSISTENT or ERROR result.
bool ReshardingWorker::checkAttachLogRecord(LogRecord & log_record)
{
    /// We check that all the replicas of the shard that must receive an attach
    /// request store all the required detached partitions. Moreover we verify
    /// that these detached partitions have not changed. We conservatively
    /// assume that, if this check fails, the attach request cannot be applied.

    auto & storage = *(current_job.storage);
    auto zookeeper = context.getZooKeeper();

    /// What must be checked on which replica.
    struct TaskInfo
    {
        TaskInfo(const std::string & replica_path_,
            const ReplicatedMergeTreeAddress & dest_,
            const std::string & part_, const std::string & hash_)
            : replica_path(replica_path_), dest(dest_),
            part(part_), hash(hash_)
        {
        }

        TaskInfo(const TaskInfo &) = delete;
        TaskInfo & operator=(const TaskInfo &) = delete;

        TaskInfo(TaskInfo &&) = default;
        TaskInfo & operator=(TaskInfo &&) = default;

        std::string replica_path;
        ReplicatedMergeTreeAddress dest;
        std::string part;
        std::string hash;
    };

    using TaskInfoList = std::vector<TaskInfo>;
    TaskInfoList task_info_list;

    const WeightedZooKeeperPath & weighted_path = current_job.paths[log_record.shard_no];
    const std::string & zookeeper_path = weighted_path.first;

    /// Build the list of checks: every part on every replica of the shard.
    auto replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
    for (const auto & replica : replicas)
    {
        const std::string & replica_path = zookeeper_path + "/replicas/" + replica;
        auto host = zookeeper->get(replica_path + "/host");
        ReplicatedMergeTreeAddress host_desc{host};

        for (const auto & part_with_hash : log_record.parts_with_hash)
            task_info_list.emplace_back(replica_path, host_desc, part_with_hash.first, part_with_hash.second);
    }

    const size_t task_count = task_info_list.size();

    boost::threadpool::pool pool(task_count);

    using Tasks = std::vector<std::packaged_task<RemotePartChecker::Status()> >;
    Tasks tasks(task_count);

    try
    {
        for (size_t i = 0; i < task_count; ++i)
        {
            const TaskInfo & info = task_info_list[i];

            InterserverIOEndpointLocation to_location{info.replica_path, info.dest.host,
                info.dest.replication_port};

            tasks[i] = Tasks::value_type{std::bind(&RemotePartChecker::Client::check,
                &storage.remote_part_checker_client, info.part, info.hash, to_location)};
            pool.schedule([i, &tasks]{ tasks[i](); });
        }
    }
    catch (...)
    {
        /// Wait for whatever has already been scheduled before propagating the error.
        try
        {
            pool.wait();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
        throw;
    }

    pool.wait();

    bool may_perform_attach = true;

    for (size_t i = 0; i < tasks.size(); ++i)
    {
        RemotePartChecker::Status status = tasks[i].get_future().get();
        if (status == RemotePartChecker::Status::OK)
            continue;

        may_perform_attach = false;
        const TaskInfo & info = task_info_list[i];

        if (status == RemotePartChecker::Status::INCONSISTENT)
            LOG_WARNING(log, "Cannot attach sharded partition " << current_job.partition
                << " on host " << info.dest.host << " because some changes have occurred "
                " in part " << info.part << " since the last time we were online.");
        else if (status == RemotePartChecker::Status::ERROR)
            LOG_WARNING(log, "Cannot attach sharded partition " << current_job.partition
                << " on host " << info.dest.host << " because an unexpected error "
                << " was triggered while trying to check part " << info.part);
    }

    return may_perform_attach;
}
/// Run the attach operation described by a log record.
///
/// The ALTER TABLE ... ATTACH PARTITION query is sent to one replica of the
/// target shard, chosen at random. If it fails, the remaining replicas are
/// tried in random order, each at most once. The record is flagged RUNNING
/// before the first attempt and DONE after a successful one.
///
/// Throws PARTITION_ATTACH_FAILED if the shard has no replica or if the query
/// failed on every replica.
void ReshardingWorker::executeAttach(LogRecord & log_record)
{
    auto & storage = *(current_job.storage);
    auto zookeeper = context.getZooKeeper();

    /// Description of a task on a replica.
    struct TaskInfo
    {
        TaskInfo(const std::string & replica_path_, const ReplicatedMergeTreeAddress & dest_)
            : replica_path(replica_path_), dest(dest_)
        {
        }

        TaskInfo(const TaskInfo &) = delete;
        TaskInfo & operator=(const TaskInfo &) = delete;

        TaskInfo(TaskInfo &&) = default;
        TaskInfo & operator=(TaskInfo &&) = default;

        std::string replica_path;
        ReplicatedMergeTreeAddress dest;
    };

    /// Description of tasks for each replica of a shard.
    /// For fault tolerance purposes, some fields are provided
    /// to perform attempts on more than one replica if needed.
    struct ShardTaskInfo
    {
        ShardTaskInfo()
        {
            /// Seed the generator from this object's address and the thread CPU time.
            struct timespec times;
            if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times))
                throwFromErrno("Cannot clock_gettime.", DB::ErrorCodes::CANNOT_CLOCK_GETTIME);

            (void) srand48_r(reinterpret_cast<intptr_t>(this) ^ times.tv_nsec, &rand_state);
        }

        ShardTaskInfo(const ShardTaskInfo &) = delete;
        ShardTaskInfo & operator=(const ShardTaskInfo &) = delete;

        ShardTaskInfo(ShardTaskInfo &&) = default;
        ShardTaskInfo & operator=(ShardTaskInfo &&) = default;

        /// one task for each replica
        std::vector<TaskInfo> shard_tasks;
        /// index to the replica to be used
        size_t next = 0;
        /// result of the operation on the current replica
        bool is_success = false;
        /// For pseudo-random number generation.
        drand48_data rand_state;
    };

    const WeightedZooKeeperPath & weighted_path = current_job.paths[log_record.shard_no];
    const std::string & zookeeper_path = weighted_path.first;

    ShardTaskInfo shard_task_info;

    auto children = zookeeper->getChildren(zookeeper_path + "/replicas");
    for (const auto & child : children)
    {
        const std::string replica_path = zookeeper_path + "/replicas/" + child;
        auto host = zookeeper->get(replica_path + "/host");
        ReplicatedMergeTreeAddress host_desc(host);

        shard_task_info.shard_tasks.emplace_back(replica_path, host_desc);
    }

    /// Without any replica there is nothing to choose from: the random selection
    /// below would compute a remainder modulo zero and index an empty vector.
    /// Fail before flagging the record as RUNNING since nothing was attempted.
    if (shard_task_info.shard_tasks.empty())
        throw Exception{"Failed to attach partition on shard",
            ErrorCodes::PARTITION_ATTACH_FAILED};

    log_record.state = LogRecord::RUNNING;
    log_record.writeBack();

    while (true)
    {
        /// Randomly choose a replica on which to perform the operation.
        /// Replicas in [0, next) have already been tried; the chosen one is
        /// swapped into position next so that it is never picked again.
        long int rand_res;
        (void) lrand48_r(&shard_task_info.rand_state, &rand_res);
        size_t current = shard_task_info.next + rand_res % (shard_task_info.shard_tasks.size() - shard_task_info.next);
        std::swap(shard_task_info.shard_tasks[shard_task_info.next], shard_task_info.shard_tasks[current]);
        ++shard_task_info.next;

        TaskInfo & cur_task_shard_task_info = shard_task_info.shard_tasks[shard_task_info.next - 1];

        const auto & replica_path = cur_task_shard_task_info.replica_path;
        const auto & dest = cur_task_shard_task_info.dest;

        /// Run the operation.

        InterserverIOEndpointLocation location(replica_path, dest.host, dest.replication_port);

        std::string query_str = "ALTER TABLE " + dest.database + "."
            + dest.table + " ATTACH PARTITION " + current_job.partition;

        bool res = storage.remote_query_executor_client.executeQuery(location, query_str);
        if (res)
            break;
        else if (shard_task_info.next == shard_task_info.shard_tasks.size())
        {
            /// No more attempts are possible.
            throw Exception{"Failed to attach partition on shard",
                ErrorCodes::PARTITION_ATTACH_FAILED};
        }
    }

    log_record.state = LogRecord::DONE;
    log_record.writeBack();
}
/// Elect the leader among the nodes taking part in the current distributed job.
/// Does nothing for a job that is not coordinated, or if a leader has already
/// been published.
void ReshardingWorker::electLeader()
{
    /// A job that is not distributed has an obvious leader: ourselves.
    /// For a distributed job, every participating node registers an ephemeral
    /// sequential znode. Once all of them have done so, i.e. the election
    /// barrier is released, the node owning the znode with the lowest ID wins.
    /// One of the nodes then publishes the name of the winner as a new znode.
    ///
    /// This scheme always succeeds, even in the presence of failures:
    ///
    /// 1. A node that fails will eventually learn who the winner is, provided
    /// that another node managed to publish it.
    ///
    /// 2. If every node fails before any of them publishes the winner, the
    /// election is merely run again. This is possible since SingleBarrier
    /// objects may be, by design, crossed again after release.
    ///
    /// 3. Two nodes A and B may disagree on the winner because of the failure of
    /// a third node C (e.g. A determines that C is the winner, then C fails and
    /// its znode disappears, then B determines that A is the winner). Save for
    /// any failure, either A or B publishes its information first, which is
    /// all we need.

    if (!current_job.isCoordinated())
        return;

    LOG_DEBUG(log, "Performing leader election");

    auto leader = getPartitionPath(current_job) + "/leader";
    auto election_path = getPartitionPath(current_job) + "/leader_election";

    auto zookeeper = context.getZooKeeper();

    /// The outcome of the election is already known.
    if (zookeeper->exists(leader))
        return;

    zookeeper->create(election_path + "/node-", getFQDNOrHostName(),
        zkutil::CreateMode::EphemeralSequential);

    waitForElectionCompletion();

    /// The candidate with the smallest znode name wins.
    auto candidates = zookeeper->getChildren(election_path);
    std::sort(candidates.begin(), candidates.end());
    auto winner = zookeeper->get(election_path + "/" + candidates.front());

    zookeeper->createIfNotExists(leader, winner);
}
/// Tell whether this host is the leader of the current job.
/// A job that is not coordinated is always led by the local host; otherwise
/// the published leader name is compared with the name of this host.
bool ReshardingWorker::isLeader()
{
    if (current_job.isCoordinated())
    {
        auto zookeeper = context.getZooKeeper();
        return zookeeper->get(getPartitionPath(current_job) + "/leader") == getFQDNOrHostName();
    }

    return true;
}
void ReshardingWorker::createLog()
{
LOG_DEBUG(log, "Creating log");
auto & storage = *(current_job.storage);
auto zookeeper = context.getZooKeeper();
auto log_path = getLocalJobPath(current_job) + "/log";
if (zookeeper->exists(log_path))
{
/// An abnormal condition occurred while creating the log the last time
/// we were online. Therefore we assume that it must be garbage.
zookeeper->removeRecursive(log_path);
}
(void) zookeeper->create(log_path, "", zkutil::CreateMode::Persistent);
/// If the keyword COPY is not specified, a drop request is performed on
/// each participating node.
if (!current_job.do_copy)
{
LogRecord log_record{zookeeper};
log_record.operation = LogRecord::OP_DROP;
log_record.partition = current_job.partition;
log_record.partition_hash = storage.data.computePartitionHash(current_job.partition);
log_record.state = LogRecord::READY;
log_record.enqueue(log_path);
}
if (!isLeader())
return;
/// The leader performs all the attach requests on the target shards.
std::unordered_map<size_t, ShardList> shard_to_info;
if (current_job.isCoordinated())
{
auto nodes = zookeeper->getChildren(getPartitionPath(current_job) + "/nodes");
for (const auto & node : nodes)
{
auto job_name = zookeeper->get(getPartitionPath(current_job) + "/nodes/" + node);
ShardList shards_from_node = getTargetShards(node, job_name);
for (const TargetShardInfo & shard_info : shards_from_node)
shard_to_info[shard_info.shard_no].push_back(shard_info);
}
}
else
{
ShardList shards_from_node = getTargetShards(getFQDNOrHostName(), current_job.job_name);
for (const TargetShardInfo & shard_info : shards_from_node)
shard_to_info[shard_info.shard_no].push_back(shard_info);
}
for (const auto & entry : shard_to_info)
{
size_t shard_no = entry.first;
const ShardList & shard_list = entry.second;
LogRecord log_record{zookeeper};
log_record.operation = LogRecord::OP_ATTACH;
log_record.partition = current_job.partition;
log_record.shard_no = shard_no;
log_record.state = LogRecord::READY;
for (const TargetShardInfo & info : shard_list)
log_record.parts_with_hash.emplace(info.part_name, info.hash);
log_record.enqueue(log_path);
}
2016-01-28 01:00:27 +00:00
}
2016-03-01 17:47:53 +00:00
void ReshardingWorker::softCleanup()
{
LOG_DEBUG(log, "Performing soft cleanup.");
2016-03-25 11:48:45 +00:00
deleteTemporaryData();
2016-03-01 17:47:53 +00:00
current_job.clear();
}
void ReshardingWorker::hardCleanup()
2016-01-28 01:00:27 +00:00
{
LOG_DEBUG(log, "Performing cleanup.");
2016-03-25 11:48:45 +00:00
deleteTemporaryData();
2016-03-01 17:47:53 +00:00
detachJob();
current_job.clear();
}
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
void ReshardingWorker::deleteTemporaryData()
2016-03-01 17:47:53 +00:00
{
auto & storage = *(current_job.storage);
2016-01-28 01:00:42 +00:00
storage.data.per_shard_data_parts.clear();
2016-01-28 01:00:27 +00:00
2016-03-25 11:48:45 +00:00
if (Poco::File{storage.full_path + "/reshard"}.exists())
2016-01-28 01:00:27 +00:00
{
2016-03-01 17:47:53 +00:00
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(storage.full_path + "/reshard"); it != end; ++it)
{
auto absolute_path = it.path().absolute().toString();
2016-03-25 11:48:45 +00:00
Poco::File{absolute_path}.remove(true);
2016-03-01 17:47:53 +00:00
}
2016-01-28 01:00:27 +00:00
}
}
2016-03-01 17:47:53 +00:00
std::string ReshardingWorker::createCoordinator(const Cluster & cluster)
{
const std::string cluster_name = cluster.getName();
auto zookeeper = context.getZooKeeper();
auto lock = createLock();
zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};
auto coordinators = zookeeper->getChildren(coordination_path);
for (const auto & coordinator : coordinators)
{
auto effective_cluster_name = zookeeper->get(coordination_path + "/" + coordinator + "/cluster");
if (effective_cluster_name == cluster_name)
2016-03-25 11:48:45 +00:00
throw Exception{"The cluster specified for this table is currently busy with another "
"distributed job. Please try later", ErrorCodes::RESHARDING_BUSY_CLUSTER};
2016-03-01 17:47:53 +00:00
}
std::string coordinator_id = zookeeper->create(coordination_path + "/coordinator-", "",
zkutil::CreateMode::PersistentSequential);
coordinator_id = coordinator_id.substr(coordination_path.length() + 1);
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/lock",
"", zkutil::CreateMode::Persistent);
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/deletion_lock",
"", zkutil::CreateMode::Persistent);
2016-03-01 17:47:53 +00:00
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/cluster",
cluster_name, zkutil::CreateMode::Persistent);
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/increment",
"0", zkutil::CreateMode::Persistent);
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/node_count",
toString(cluster.getRemoteShardCount() + cluster.getLocalShardCount()),
zkutil::CreateMode::Persistent);
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/shards",
"", zkutil::CreateMode::Persistent);
2016-03-01 17:47:53 +00:00
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/status",
toString(static_cast<UInt64>(STATUS_OK)), zkutil::CreateMode::Persistent);
2016-03-01 17:47:53 +00:00
2016-03-25 11:48:45 +00:00
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/status_probe",
"", zkutil::CreateMode::Persistent);
2016-03-01 17:47:53 +00:00
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/partitions",
"", zkutil::CreateMode::Persistent);
/// Register the addresses, IP and hostnames, of all the nodes of the cluster.
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/cluster_addresses",
"", zkutil::CreateMode::Persistent);
auto publish_address = [&](const std::string & host, size_t shard_no)
2016-03-01 17:47:53 +00:00
{
int32_t code = zookeeper->tryCreate(getCoordinatorPath(coordinator_id) + "/cluster_addresses/"
+ host, toString(shard_no), zkutil::CreateMode::Persistent);
if ((code != ZOK) && (code != ZNODEEXISTS))
2016-03-25 11:48:45 +00:00
throw zkutil::KeeperException{code};
};
2016-03-01 17:47:53 +00:00
if (!cluster.getShardsAddresses().empty())
2016-03-01 17:47:53 +00:00
{
size_t shard_no = 0;
for (const auto & address : cluster.getShardsAddresses())
2016-03-01 17:47:53 +00:00
{
publish_address(address.host_name, shard_no);
publish_address(address.resolved_address.host().toString(), shard_no);
++shard_no;
2016-03-01 17:47:53 +00:00
}
}
else if (!cluster.getShardsWithFailoverAddresses().empty())
2016-03-01 17:47:53 +00:00
{
size_t shard_no = 0;
for (const auto & addresses : cluster.getShardsWithFailoverAddresses())
{
for (const auto & address : addresses)
{
publish_address(address.host_name, shard_no);
publish_address(address.resolved_address.host().toString(), shard_no);
}
++shard_no;
}
2016-03-01 17:47:53 +00:00
}
else
2016-03-25 11:48:45 +00:00
throw Exception{"ReshardingWorker: ill-formed cluster", ErrorCodes::LOGICAL_ERROR};
2016-03-01 17:47:53 +00:00
return coordinator_id;
}
void ReshardingWorker::registerQuery(const std::string & coordinator_id, const std::string & query)
{
auto zookeeper = context.getZooKeeper();
2016-03-25 11:48:45 +00:00
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/query_hash",
SHA512Utils::computeHashFromString(query), zkutil::CreateMode::Persistent);
2016-03-01 17:47:53 +00:00
}
void ReshardingWorker::deleteCoordinator(const std::string & coordinator_id)
{
/// We don't acquire a scoped lock because we delete everything including this lock.
auto deletion_lock = createDeletionLock(coordinator_id);
deletion_lock.acquireWrite(zkutil::RWLock::Blocking);
2016-03-01 17:47:53 +00:00
auto zookeeper = context.getZooKeeper();
if (zookeeper->exists(getCoordinatorPath(coordinator_id)))
zookeeper->removeRecursive(getCoordinatorPath(coordinator_id));
}
/// Subscribe this host to the distributed job handled by the given coordinator.
///
/// The subscription is rejected with an exception if the coordinator does not
/// exist, if this host already belongs to the cluster of another coordinator,
/// if it is not a member of the cluster of this coordinator, if the query does
/// not match the one registered by the coordinator, or if the shard of this
/// host has already subscribed.
///
/// Returns the block number assigned to this host; the coordinator's counter
/// is incremented by one as a side effect.
UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std::string & query)
{
    auto zookeeper = context.getZooKeeper();

    if (!zookeeper->exists(getCoordinatorPath(coordinator_id)))
        throw Exception{"Coordinator " + coordinator_id + " does not exist",
            ErrorCodes::RESHARDING_NO_SUCH_COORDINATOR};

    auto current_host = getFQDNOrHostName();

    /// Make sure that this shard is not busy in another distributed job.
    {
        auto lock = createLock();
        zkutil::RWLock::Guard<zkutil::RWLock::Read> guard{lock};

        auto coordinators = zookeeper->getChildren(coordination_path);
        for (const auto & coordinator : coordinators)
        {
            if (coordinator != coordinator_id)
            {
                auto other_addresses = zookeeper->getChildren(coordination_path + "/" + coordinator
                    + "/cluster_addresses");
                auto it = std::find(other_addresses.begin(), other_addresses.end(), current_host);
                if (it != other_addresses.end())
                    throw Exception{"This shard is already busy with another distributed job",
                        ErrorCodes::RESHARDING_BUSY_SHARD};
            }
        }
    }

    UInt64 block_number;

    {
        auto lock = createCoordinatorLock(coordinator_id);
        zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};

        /// Make sure that the query ALTER TABLE RESHARD with the "COORDINATE WITH" tag
        /// is not bogus.

        auto cluster_addresses = zookeeper->getChildren(getCoordinatorPath(coordinator_id)
            + "/cluster_addresses");
        auto it = std::find(cluster_addresses.begin(), cluster_addresses.end(), current_host);
        if (it == cluster_addresses.end())
            throw Exception{"This host is not allowed to subscribe to coordinator "
                + coordinator_id,
                ErrorCodes::RESHARDING_NO_COORDINATOR_MEMBERSHIP};

        /// Check that the coordinator recognizes our query.
        auto query_hash = zookeeper->get(getCoordinatorPath(coordinator_id) + "/query_hash");
        if (SHA512Utils::computeHashFromString(query) != query_hash)
            throw Exception{"Coordinator " + coordinator_id + " does not handle this query",
                ErrorCodes::RESHARDING_INVALID_QUERY};

        /// Access granted. Now perform subscription.
        auto my_shard_no = zookeeper->get(getCoordinatorPath(coordinator_id) + "/cluster_addresses/"
            + current_host);
        int32_t code = zookeeper->tryCreate(getCoordinatorPath(coordinator_id) + "/shards/"
            + my_shard_no, "", zkutil::CreateMode::Persistent);
        if (code == ZNODEEXISTS)
            throw Exception{"This shard has already subscribed to coordinator " + coordinator_id,
                ErrorCodes::RESHARDING_ALREADY_SUBSCRIBED};
        else if (code != ZOK)
            throw zkutil::KeeperException{code};

        /// Publish the initial status of this host.
        zookeeper->create(getCoordinatorPath(coordinator_id) + "/status/"
            + current_host, Status(STATUS_OK, "").toString(), zkutil::CreateMode::Persistent);

        /// Assign a unique block number to the current node. We will use it in order
        /// to avoid any possible conflict when uploading resharded partitions.
        const auto increment_path = getCoordinatorPath(coordinator_id) + "/increment";
        block_number = std::stoull(zookeeper->get(increment_path));
        zookeeper->set(getCoordinatorPath(coordinator_id) + "/increment", toString(block_number + 1));
    }

    return block_number;
}
void ReshardingWorker::unsubscribe(const std::string & coordinator_id)
{
/// Note: we don't remove this shard from the /shards znode because
/// it can subscribe to a distributed job only if its cluster is not
/// currently busy with any distributed job.
2016-03-01 17:47:53 +00:00
auto zookeeper = context.getZooKeeper();
auto lock = createCoordinatorLock(coordinator_id);
2016-03-01 17:47:53 +00:00
zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};
auto current_host = getFQDNOrHostName();
zookeeper->remove(getCoordinatorPath(coordinator_id) + "/status/" + current_host);
auto node_count = zookeeper->get(getCoordinatorPath(coordinator_id) + "/node_count");
UInt64 cur_node_count = std::stoull(node_count);
if (cur_node_count == 0)
2016-03-25 11:48:45 +00:00
throw Exception{"ReshardingWorker: invalid node count", ErrorCodes::LOGICAL_ERROR};
2016-03-01 17:47:53 +00:00
zookeeper->set(getCoordinatorPath(coordinator_id) + "/node_count", toString(cur_node_count - 1));
}
void ReshardingWorker::addPartitions(const std::string & coordinator_id,
const PartitionList & partition_list)
{
auto zookeeper = context.getZooKeeper();
auto lock = createCoordinatorLock(coordinator_id);
2016-03-01 17:47:53 +00:00
zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};
auto current_host = getFQDNOrHostName();
for (const auto & partition : partition_list)
{
2016-03-25 11:48:45 +00:00
auto partition_path = getCoordinatorPath(coordinator_id) + "/partitions/" + partition;
auto nodes_path = partition_path + "/nodes/";
zookeeper->createAncestors(nodes_path);
(void) zookeeper->create(nodes_path + current_host, "", zkutil::CreateMode::Persistent);
zookeeper->createAncestors(partition_path + "/leader_election/");
2016-03-01 17:47:53 +00:00
}
}
/// Reorder the partition list so that the coordinated partitions come first
/// and the uncoordinated ones last, each group being sorted.
///
/// A partition is uncoordinated when this host is its only registered node;
/// the znode of such a partition is removed from the coordinator.
///
/// Returns an iterator to the first uncoordinated partition.
ReshardingWorker::PartitionList::iterator ReshardingWorker::categorizePartitions(const std::string & coordinator_id,
    PartitionList & partition_list)
{
    auto current_host = getFQDNOrHostName();
    auto zookeeper = context.getZooKeeper();

    /// Note: this predicate has a side effect and must therefore be applied
    /// exactly once to each partition.
    auto is_coordinated = [&](const std::string & partition)
    {
        auto partition_path = getCoordinatorPath(coordinator_id) + "/partitions/" + partition;
        auto nodes = zookeeper->getChildren(partition_path + "/nodes");

        const bool only_us = (nodes.size() == 1) && (nodes[0] == current_host);
        if (only_us)
            zookeeper->removeRecursive(getCoordinatorPath(coordinator_id) + "/partitions/" + partition);

        return !only_us;
    };

    PartitionList::iterator uncoordinated_begin;

    {
        auto lock = createCoordinatorLock(coordinator_id);
        zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};

        /// Coordinated partitions are moved to the front. The relative order
        /// inside each group is irrelevant since both groups are sorted below.
        uncoordinated_begin = std::partition(partition_list.begin(), partition_list.end(), is_coordinated);
    }

    std::sort(partition_list.begin(), uncoordinated_begin);
    std::sort(uncoordinated_begin, partition_list.end());

    return uncoordinated_begin;
}
size_t ReshardingWorker::getPartitionCount(const std::string & coordinator_id)
{
auto lock = createCoordinatorLock(coordinator_id);
2016-03-01 17:47:53 +00:00
zkutil::RWLock::Guard<zkutil::RWLock::Read> guard{lock};
return getPartitionCountUnlocked(coordinator_id);
}
/// Return the number of children of the "partitions" znode of the given
/// coordinator. No lock is taken here.
size_t ReshardingWorker::getPartitionCountUnlocked(const std::string & coordinator_id)
{
    auto zookeeper = context.getZooKeeper();

    const auto partitions_path = getCoordinatorPath(coordinator_id) + "/partitions";
    return zookeeper->getChildren(partitions_path).size();
}
size_t ReshardingWorker::getNodeCount(const std::string & coordinator_id)
{
auto lock = createCoordinatorLock(coordinator_id);
2016-03-01 17:47:53 +00:00
zkutil::RWLock::Guard<zkutil::RWLock::Read> guard{lock};
auto zookeeper = context.getZooKeeper();
auto count = zookeeper->get(getCoordinatorPath(coordinator_id) + "/node_count");
return std::stoull(count);
}
void ReshardingWorker::waitForCheckCompletion(const std::string & coordinator_id)
{
2016-03-08 15:38:06 +00:00
/// Since we get the information about all the shards only after
/// having crosssed this barrier, we set up a timeout for safety
/// purposes.
auto timeout = context.getSettingsRef().resharding_barrier_timeout;
createCheckBarrier(coordinator_id).enter(timeout);
2016-03-01 17:47:53 +00:00
}
/// Enter the opt-out barrier created for the given coordinator and count.
/// Contrary to waitForCheckCompletion(), no timeout is passed.
void ReshardingWorker::waitForOptOutCompletion(const std::string & coordinator_id, size_t count)
{
    createOptOutBarrier(coordinator_id, count).enter();
}
2016-03-25 11:48:45 +00:00
void ReshardingWorker::setStatus(const std::string & coordinator_id, StatusCode status,
const std::string & msg)
2016-03-01 17:47:53 +00:00
{
auto zookeeper = context.getZooKeeper();
2016-03-25 11:48:45 +00:00
zookeeper->set(getCoordinatorPath(coordinator_id) + "/status", Status(status, msg).toString());
zookeeper->set(getCoordinatorPath(coordinator_id) + "/status_probe", "");
2016-03-01 17:47:53 +00:00
}
void ReshardingWorker::setStatus(const std::string & coordinator_id, const std::string & hostname,
2016-03-25 11:48:45 +00:00
StatusCode status, const std::string & msg)
2016-03-01 17:47:53 +00:00
{
auto zookeeper = context.getZooKeeper();
zookeeper->set(getCoordinatorPath(coordinator_id) + "/status/" + hostname,
2016-03-25 11:48:45 +00:00
Status(status, msg).toString());
zookeeper->set(getCoordinatorPath(coordinator_id) + "/status_probe", "");
2016-03-01 17:47:53 +00:00
}
2016-03-09 13:40:47 +00:00
bool ReshardingWorker::detectOfflineNodes(const std::string & coordinator_id)
2016-03-01 17:47:53 +00:00
{
2016-03-09 13:40:47 +00:00
return detectOfflineNodesCommon(getCoordinatorPath(coordinator_id) + "/status", coordinator_id);
2016-03-01 17:47:53 +00:00
}
2016-03-09 13:40:47 +00:00
bool ReshardingWorker::detectOfflineNodes()
{
2016-03-09 13:40:47 +00:00
return detectOfflineNodesCommon(getPartitionPath(current_job) + "/nodes", current_job.coordinator_id);
}
2016-03-09 13:40:47 +00:00
bool ReshardingWorker::detectOfflineNodesCommon(const std::string & path, const std::string & coordinator_id)
2016-03-01 17:47:53 +00:00
{
auto zookeeper = context.getZooKeeper();
auto nodes = zookeeper->getChildren(path);
std::sort(nodes.begin(), nodes.end());
2016-03-01 17:47:53 +00:00
auto online = zookeeper->getChildren(distributed_path + "/online");
std::sort(online.begin(), online.end());
std::vector<std::string> offline(nodes.size());
auto end = std::set_difference(nodes.begin(), nodes.end(),
online.begin(), online.end(), offline.begin());
2016-03-01 17:47:53 +00:00
offline.resize(end - offline.begin());
2016-03-25 11:48:45 +00:00
if (!offline.empty())
{
for (const auto & node : offline)
zookeeper->set(getCoordinatorPath(coordinator_id) + "/status/" + node,
Status(STATUS_ON_HOLD, "Node has gone offline").toString());
zookeeper->set(getCoordinatorPath(coordinator_id) + "/status_probe", "");
}
2016-03-01 17:47:53 +00:00
return !offline.empty();
}
2016-03-25 11:48:45 +00:00
/// Tells whether the "is_published" flag node exists for the current local job.
bool ReshardingWorker::isPublished()
{
    auto zookeeper = context.getZooKeeper();
    const auto flag_path = getLocalJobPath(current_job) + "/is_published";
    return zookeeper->exists(flag_path);
}
/// Creates the persistent "is_published" flag node for the current local job.
void ReshardingWorker::markAsPublished()
{
    auto zookeeper = context.getZooKeeper();
    const auto flag_path = getLocalJobPath(current_job) + "/is_published";
    (void) zookeeper->create(flag_path, "", zkutil::CreateMode::Persistent);
}
/// Tells whether the "is_log_created" flag node exists for the current local job.
bool ReshardingWorker::isLogCreated()
{
    auto zookeeper = context.getZooKeeper();
    const auto flag_path = getLocalJobPath(current_job) + "/is_log_created";
    return zookeeper->exists(flag_path);
}
/// Creates the persistent "is_log_created" flag node for the current local job.
void ReshardingWorker::markLogAsCreated()
{
    auto zookeeper = context.getZooKeeper();
    const auto flag_path = getLocalJobPath(current_job) + "/is_log_created";
    (void) zookeeper->create(flag_path, "", zkutil::CreateMode::Persistent);
}
/// Tells whether the "is_committed" flag node exists for the current local job.
bool ReshardingWorker::isCommitted()
{
    auto zookeeper = context.getZooKeeper();
    const auto flag_path = getLocalJobPath(current_job) + "/is_committed";
    return zookeeper->exists(flag_path);
}
/// Creates the persistent "is_committed" flag node for the current local job.
void ReshardingWorker::markAsCommitted()
{
    auto zookeeper = context.getZooKeeper();
    const auto flag_path = getLocalJobPath(current_job) + "/is_committed";
    (void) zookeeper->create(flag_path, "", zkutil::CreateMode::Persistent);
}
/// Computes the aggregated status of the given coordinator, taking into account
/// every host registered under its "status" node.
ReshardingWorker::StatusCode ReshardingWorker::getCoordinatorStatus(const std::string & coordinator_id)
{
    const auto status_path = getCoordinatorPath(coordinator_id) + "/status";
    return getStatusCommon(status_path, coordinator_id);
}
2016-03-25 11:48:45 +00:00
/// Computes the aggregated status of the current job, taking into account
/// every host registered under the "nodes" node of its partition.
ReshardingWorker::StatusCode ReshardingWorker::getStatus()
{
    const auto nodes_path = getPartitionPath(current_job) + "/nodes";
    return getStatusCommon(nodes_path, current_job.coordinator_id);
}
2016-03-25 11:48:45 +00:00
/// Computes an aggregated status:
///  - if the global status of the coordinator is not STATUS_OK, it is returned as is;
///  - otherwise offline nodes are detected (and flagged), then the individual
///    statuses of the children of `path` are combined.
ReshardingWorker::StatusCode ReshardingWorker::getStatusCommon(const std::string & path, const std::string & coordinator_id)
{
    /// Note: no read/write lock is acquired here. The status does not need
    /// any synchronization since all the operations are either reads
    /// or idempotent writes.

    auto zookeeper = context.getZooKeeper();

    auto global_status = Status(zookeeper->get(getCoordinatorPath(coordinator_id) + "/status")).getCode();
    if (global_status != STATUS_OK)
        return global_status;

    (void) detectOfflineNodesCommon(path, coordinator_id);

    bool found_error = false;
    bool found_on_hold = false;

    /// Collect which kinds of abnormal statuses are present among the nodes.
    auto participants = zookeeper->getChildren(path);
    for (const auto & participant : participants)
    {
        auto node_status = Status(zookeeper->get(getCoordinatorPath(coordinator_id) + "/status/" + participant)).getCode();
        if (node_status == STATUS_ERROR)
            found_error = true;
        else if (node_status == STATUS_ON_HOLD)
            found_on_hold = true;
    }

    /// Cancellation notifications have priority over error notifications.
    if (found_on_hold)
        return STATUS_ON_HOLD;
    if (found_error)
        return STATUS_ERROR;
    return STATUS_OK;
}
2016-03-25 11:48:45 +00:00
std::string ReshardingWorker::dumpCoordinatorState(const std::string & coordinator_id)
{
std::string out;
auto current_host = getFQDNOrHostName();
try
{
WriteBufferFromString buf{out};
writeString("Coordinator dump: ", buf);
writeString("ID: {", buf);
writeString(coordinator_id + "}; ", buf);
auto zookeeper = context.getZooKeeper();
Status status(zookeeper->get(getCoordinatorPath(coordinator_id) + "/status"));
if (status.getCode() != STATUS_OK)
{
writeString("Global status: {", buf);
writeString(status.getMessage() + "}; ", buf);
}
auto hosts = zookeeper->getChildren(getCoordinatorPath(coordinator_id) + "/status");
for (const auto & host : hosts)
{
Status status(zookeeper->get(getCoordinatorPath(coordinator_id) + "/status/" + host));
if (status.getCode() != STATUS_OK)
{
writeString("NODE ", buf);
writeString(((host == current_host) ? "localhost" : host) + ": {", buf);
writeString(status.getMessage() + "}; ", buf);
}
}
buf.next();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
return out;
}
/// Determines whether the current job must be interrupted, and why.
/// For a coordinated job, the aggregated status is fetched; a failure to
/// fetch it is logged and treated as a reason to abort.
/// For an uncoordinated job, only the must_stop flag is examined.
ReshardingWorker::AnomalyType ReshardingWorker::probeForAnomaly()
{
    bool remote_node_unavailable = false;
    bool remote_node_error = false;
    bool is_cancelled = false;

    if (current_job.isCoordinated())
    {
        try
        {
            auto job_status = getStatus();
            remote_node_unavailable = (job_status == STATUS_ON_HOLD);
            remote_node_error = (job_status == STATUS_ERROR);
            is_cancelled = (job_status != STATUS_OK);
        }
        catch (...)
        {
            is_cancelled = true;
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    bool must_abort = must_stop || is_cancelled;
    if (!must_abort)
        return ANOMALY_NONE;

    /// Important: always keep the following order.
    if (must_stop)
        return ANOMALY_LOCAL_SHUTDOWN;
    if (remote_node_unavailable)
        return ANOMALY_REMOTE_NODE_UNAVAILABLE;
    if (remote_node_error)
        return ANOMALY_REMOTE_ERROR;
    return ANOMALY_LOCAL_ERROR;
}
/// Does nothing for ANOMALY_NONE. For any other anomaly, marks the current job
/// as aborted and throws the exception corresponding to the anomaly.
void ReshardingWorker::processAnomaly(AnomalyType anomaly_type)
{
    if (anomaly_type == ANOMALY_NONE)
        return;

    current_job.is_aborted = true;

    switch (anomaly_type)
    {
        case ANOMALY_LOCAL_SHUTDOWN:
            throw Exception{"Cancelled resharding", ErrorCodes::ABORTED};

        case ANOMALY_REMOTE_NODE_UNAVAILABLE:
            throw Exception{"Remote node unavailable",
                ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE};

        case ANOMALY_REMOTE_ERROR:
            throw Exception{"An error occurred on a remote node",
                ErrorCodes::RESHARDING_REMOTE_NODE_ERROR};

        default:
            throw Exception{"An error occurred on local node", ErrorCodes::LOGICAL_ERROR};
    }
}
2016-03-01 17:47:53 +00:00
void ReshardingWorker::attachJob()
{
if (!current_job.isCoordinated())
return;
auto zookeeper = context.getZooKeeper();
auto status = getStatus();
if (status == STATUS_ERROR)
{
/// This case is triggered when an error occured on a participating node
/// while we went offline.
2016-03-25 11:48:45 +00:00
throw Exception{"An error occurred on a remote node", ErrorCodes::RESHARDING_REMOTE_NODE_ERROR};
2016-03-01 17:47:53 +00:00
}
else if (status == STATUS_ON_HOLD)
{
2016-03-25 11:48:45 +00:00
/// The current distributed job is on hold. Check that all the required nodes
/// are online. If it is so, wait for them to be ready to perform the job.
2016-03-01 17:47:53 +00:00
2016-03-25 11:48:45 +00:00
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_OK);
2016-03-01 17:47:53 +00:00
createRecoveryBarrier(current_job).enter();
/// Catch any error that could have happened while crossing the barrier.
2016-03-25 11:48:45 +00:00
processAnomaly(probeForAnomaly());
2016-03-01 17:47:53 +00:00
}
else if (status == STATUS_OK)
{
2016-03-25 11:48:45 +00:00
/// For the purpose of creating the log, we need to know the locations
/// of all the jobs that constitute this distributed job.
zookeeper->set(getPartitionPath(current_job) + "/nodes/" + getFQDNOrHostName(),
current_job.job_name);
2016-03-01 17:47:53 +00:00
}
else
{
/// This should never happen but we must take this case into account for the sake
/// of completeness.
2016-03-25 11:48:45 +00:00
throw Exception{"ReshardingWorker: unexpected status", ErrorCodes::LOGICAL_ERROR};
2016-03-01 17:47:53 +00:00
}
}
void ReshardingWorker::detachJob()
{
if (!current_job.isCoordinated())
return;
auto zookeeper = context.getZooKeeper();
bool delete_coordinator = false;
{
2016-03-25 11:48:45 +00:00
/// detachJob() may be called when an error has occurred. For this reason,
/// in the call to createCoordinatorLock(), the flag may_use_in_emergency
/// is set to true: local or remote errors won't interrupt lock acquisition.
auto lock = createCoordinatorLock(current_job.coordinator_id, true);
2016-03-01 17:47:53 +00:00
zkutil::RWLock::Guard<zkutil::RWLock::Write> guard{lock};
auto children = zookeeper->getChildren(getPartitionPath(current_job) + "/nodes");
if (children.empty())
2016-03-25 11:48:45 +00:00
throw Exception{"ReshardingWorker: unable to detach job", ErrorCodes::LOGICAL_ERROR};
2016-03-01 17:47:53 +00:00
bool was_last_node = children.size() == 1;
auto current_host = getFQDNOrHostName();
zookeeper->remove(getPartitionPath(current_job) + "/nodes/" + current_host);
if (was_last_node)
{
/// All the participating nodes have processed the current partition.
zookeeper->removeRecursive(getPartitionPath(current_job));
if (getPartitionCountUnlocked(current_job.coordinator_id) == 0)
{
/// All the partitions of the current distributed job have been processed.
delete_coordinator = true;
}
}
}
if (delete_coordinator)
deleteCoordinator(current_job.coordinator_id);
}
/// Enters the upload barrier of the current job. No-op for an uncoordinated job.
void ReshardingWorker::waitForUploadCompletion()
{
    if (current_job.isCoordinated())
    {
        auto upload_barrier = createUploadBarrier(current_job);
        upload_barrier.enter();
    }
}
2016-03-25 11:48:45 +00:00
void ReshardingWorker::waitForElectionCompletion()
{
createElectionBarrier(current_job).enter();
}
/// Enters the commit barrier of the current job. No-op for an uncoordinated job.
void ReshardingWorker::waitForCommitCompletion()
{
    if (current_job.isCoordinated())
    {
        auto commit_barrier = createCommitBarrier(current_job);
        commit_barrier.enter();
    }
}
2016-03-01 17:47:53 +00:00
void ReshardingWorker::abortPollingIfRequested()
2016-01-28 01:00:27 +00:00
{
if (must_stop)
2016-03-25 11:48:45 +00:00
throw Exception{"Cancelled resharding", ErrorCodes::ABORTED};
2016-01-28 01:00:27 +00:00
}
2016-03-01 17:47:53 +00:00
void ReshardingWorker::abortRecoveryIfRequested()
{
bool has_offline_nodes = false;
bool must_abort;
try
{
2016-03-09 13:40:47 +00:00
has_offline_nodes = detectOfflineNodes();
2016-03-01 17:47:53 +00:00
must_abort = must_stop || has_offline_nodes;
}
catch (...)
{
must_abort = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
2016-03-01 17:47:53 +00:00
}
if (must_abort)
{
if (must_stop)
2016-03-25 11:48:45 +00:00
throw Exception{"Cancelled resharding", ErrorCodes::ABORTED};
2016-03-01 17:47:53 +00:00
else if (has_offline_nodes)
2016-03-25 11:48:45 +00:00
throw Exception{"Distributed job on hold. Ignoring for now",
ErrorCodes::RESHARDING_DISTRIBUTED_JOB_ON_HOLD};
2016-03-01 17:47:53 +00:00
else
{
/// We don't throw any exception here because the other nodes waiting
/// to cross the recovery barrier would get stuck forever into a loop.
/// Instead we rely on cancellation points to detect this error and
/// therefore terminate the job.
2016-03-25 11:48:45 +00:00
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ERROR,
"Recovery failed for an unspecified reason");
2016-03-01 17:47:53 +00:00
}
}
}
void ReshardingWorker::abortCoordinatorIfRequested(const std::string & coordinator_id)
{
bool is_remote_node_unavailable = false;
bool is_remote_node_error = false;
bool cancellation_result = false;
try
{
auto status = getCoordinatorStatus(coordinator_id);
if (status == STATUS_ON_HOLD)
is_remote_node_unavailable = true;
else if (status == STATUS_ERROR)
is_remote_node_error = true;
cancellation_result = status != STATUS_OK;
}
catch (...)
{
cancellation_result = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
bool must_abort = must_stop || cancellation_result;
if (must_abort)
{
/// Important: always keep the following order.
if (must_stop)
2016-03-25 11:48:45 +00:00
throw Exception{"Cancelled resharding", ErrorCodes::ABORTED};
else if (is_remote_node_unavailable)
2016-03-25 11:48:45 +00:00
throw Exception{"Remote node unavailable",
ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE};
else if (is_remote_node_error)
2016-03-25 11:48:45 +00:00
throw Exception{"An error occurred on a remote node",
ErrorCodes::RESHARDING_REMOTE_NODE_ERROR};
else
2016-03-25 11:48:45 +00:00
throw Exception{"An error occurred on local node", ErrorCodes::LOGICAL_ERROR};
}
}
2016-03-01 17:47:53 +00:00
void ReshardingWorker::abortJobIfRequested()
{
2016-03-25 11:48:45 +00:00
AnomalyType anomaly;
2016-03-01 17:47:53 +00:00
if (current_job.isCoordinated())
2016-03-25 11:48:45 +00:00
anomaly = anomaly_monitor.getAnomalyType();
else
2016-03-01 17:47:53 +00:00
{
2016-03-25 11:48:45 +00:00
/// Very cheap because it actually just checks for must_stop.
anomaly = probeForAnomaly();
2016-03-01 17:47:53 +00:00
}
2016-03-25 11:48:45 +00:00
processAnomaly(anomaly);
2016-03-01 17:47:53 +00:00
}
/// Creates a read/write lock on distributed_lock_path whose acquisition is
/// cancelled when must_stop is set (see abortPollingIfRequested()).
zkutil::RWLock ReshardingWorker::createLock()
{
    zkutil::RWLock distributed_lock{get_zookeeper, distributed_lock_path};

    zkutil::RWLock::CancellationHook cancellation_hook = [this] { abortPollingIfRequested(); };
    distributed_lock.setCancellationHook(cancellation_hook);

    return distributed_lock;
}
2016-03-25 11:48:45 +00:00
/// Creates a read/write lock on the "lock" node of the given coordinator.
/// If usable_in_emergency is true, the acquisition is cancelled only when
/// must_stop is set; otherwise it is also cancelled according to the status
/// of the coordinator (see abortCoordinatorIfRequested()).
zkutil::RWLock ReshardingWorker::createCoordinatorLock(const std::string & coordinator_id,
    bool usable_in_emergency)
{
    zkutil::RWLock coordinator_lock{get_zookeeper, getCoordinatorPath(coordinator_id) + "/lock"};

    zkutil::RWLock::CancellationHook cancellation_hook;
    if (usable_in_emergency)
        cancellation_hook = [this] { abortPollingIfRequested(); };
    else
        cancellation_hook = [this, coordinator_id] { abortCoordinatorIfRequested(coordinator_id); };

    coordinator_lock.setCancellationHook(cancellation_hook);

    return coordinator_lock;
}
/// Creates a read/write lock on the "deletion_lock" node of the given coordinator.
/// Its acquisition is cancelled when must_stop is set.
zkutil::RWLock ReshardingWorker::createDeletionLock(const std::string & coordinator_id)
{
    zkutil::RWLock deletion_lock{get_zookeeper, getCoordinatorPath(coordinator_id) + "/deletion_lock"};

    zkutil::RWLock::CancellationHook cancellation_hook = [this] { abortPollingIfRequested(); };
    deletion_lock.setCancellationHook(cancellation_hook);

    return deletion_lock;
}
/// Creates the barrier located at the "check_barrier" node of the given coordinator.
/// The counter passed to the barrier is the value stored in the "node_count" node.
/// Waiting on the barrier is cancelled by abortCoordinatorIfRequested().
zkutil::SingleBarrier ReshardingWorker::createCheckBarrier(const std::string & coordinator_id)
{
    auto zookeeper = context.getZooKeeper();

    const auto serialized_node_count = zookeeper->get(getCoordinatorPath(coordinator_id) + "/node_count");

    zkutil::SingleBarrier barrier{get_zookeeper, getCoordinatorPath(coordinator_id) + "/check_barrier",
        std::stoull(serialized_node_count)};

    zkutil::SingleBarrier::CancellationHook cancellation_hook =
        [this, coordinator_id] { abortCoordinatorIfRequested(coordinator_id); };
    barrier.setCancellationHook(cancellation_hook);

    return barrier;
}
/// Creates the barrier located at the "opt_out_barrier" node of the given
/// coordinator, with `count` as its counter. Waiting on the barrier is
/// cancelled by abortCoordinatorIfRequested().
zkutil::SingleBarrier ReshardingWorker::createOptOutBarrier(const std::string & coordinator_id,
    size_t count)
{
    zkutil::SingleBarrier barrier{get_zookeeper, getCoordinatorPath(coordinator_id)
        + "/opt_out_barrier", count};

    zkutil::SingleBarrier::CancellationHook cancellation_hook =
        [this, coordinator_id] { abortCoordinatorIfRequested(coordinator_id); };
    barrier.setCancellationHook(cancellation_hook);

    return barrier;
}
/// Creates the barrier located at the "recovery_barrier" node of the partition
/// of the given job. Its counter is the number of children of the "nodes" node
/// of that partition. Waiting is cancelled by abortRecoveryIfRequested().
zkutil::SingleBarrier ReshardingWorker::createRecoveryBarrier(const ReshardingJob & job)
{
    auto zookeeper = context.getZooKeeper();

    auto participant_count = zookeeper->getChildren(getPartitionPath(job) + "/nodes").size();

    zkutil::SingleBarrier barrier{get_zookeeper, getPartitionPath(job) + "/recovery_barrier", participant_count};

    zkutil::SingleBarrier::CancellationHook cancellation_hook = [this] { abortRecoveryIfRequested(); };
    barrier.setCancellationHook(cancellation_hook);

    return barrier;
}
/// Creates the barrier located at the "upload_barrier" node of the partition
/// of the given job. Its counter is the number of children of the "nodes" node
/// of that partition. Waiting is cancelled by abortJobIfRequested().
zkutil::SingleBarrier ReshardingWorker::createUploadBarrier(const ReshardingJob & job)
{
    auto zookeeper = context.getZooKeeper();

    auto participant_count = zookeeper->getChildren(getPartitionPath(job) + "/nodes").size();

    zkutil::SingleBarrier barrier{get_zookeeper, getPartitionPath(job) + "/upload_barrier", participant_count};

    zkutil::SingleBarrier::CancellationHook cancellation_hook = [this] { abortJobIfRequested(); };
    barrier.setCancellationHook(cancellation_hook);

    return barrier;
}
2016-03-25 11:48:45 +00:00
/// Creates the barrier located at the "election_barrier" node of the partition
/// of the given job. Its counter is the number of children of the "nodes" node
/// of that partition. Waiting is cancelled by abortJobIfRequested().
zkutil::SingleBarrier ReshardingWorker::createElectionBarrier(const ReshardingJob & job)
{
    auto zookeeper = context.getZooKeeper();

    auto participant_count = zookeeper->getChildren(getPartitionPath(job) + "/nodes").size();

    zkutil::SingleBarrier barrier{get_zookeeper, getPartitionPath(job) + "/election_barrier", participant_count};

    zkutil::SingleBarrier::CancellationHook cancellation_hook = [this] { abortJobIfRequested(); };
    barrier.setCancellationHook(cancellation_hook);

    return barrier;
}
/// Creates the barrier located at the "commit_barrier" node of the partition
/// of the given job. Its counter is the number of children of the "nodes" node
/// of that partition. Waiting is cancelled by abortJobIfRequested().
zkutil::SingleBarrier ReshardingWorker::createCommitBarrier(const ReshardingJob & job)
{
    auto zookeeper = context.getZooKeeper();

    auto participant_count = zookeeper->getChildren(getPartitionPath(job) + "/nodes").size();

    zkutil::SingleBarrier barrier{get_zookeeper, getPartitionPath(job) + "/commit_barrier", participant_count};

    zkutil::SingleBarrier::CancellationHook cancellation_hook = [this] { abortJobIfRequested(); };
    barrier.setCancellationHook(cancellation_hook);

    return barrier;
}
/// Records the partition of the current job under the "frozen_partitions" node
/// of the replica (unless already recorded), then asks the merger of the
/// storage to freeze that partition.
void ReshardingWorker::freezeSourcePartition()
{
    auto zookeeper = context.getZooKeeper();
    auto & source_storage = *(current_job.storage);

    const auto frozen_partition_path = source_storage.replica_path + "/frozen_partitions/"
        + current_job.partition;

    zookeeper->createIfNotExists(frozen_partition_path, "");
    source_storage.merger.freezePartition(current_job.partition);
}
/// Removes the partition of the current job from the "frozen_partitions" node
/// of the replica, then asks the merger of the storage to unfreeze that partition.
void ReshardingWorker::unfreezeSourcePartition()
{
    auto zookeeper = context.getZooKeeper();
    auto & source_storage = *(current_job.storage);

    const auto frozen_partition_path = source_storage.replica_path + "/frozen_partitions/"
        + current_job.partition;

    zookeeper->remove(frozen_partition_path);
    source_storage.merger.unfreezePartition(current_job.partition);
}
std::string ReshardingWorker::getCoordinatorPath(const std::string & coordinator_id) const
{
return coordination_path + "/" + coordinator_id;
}
std::string ReshardingWorker::getPartitionPath(const ReshardingJob & job) const
{
return coordination_path + "/" + job.coordinator_id + "/partitions/" + job.partition;
}
2016-03-25 11:48:45 +00:00
/// Returns the path of the given job in the task queue of the local host:
/// <host_task_queue_path>/<job_name>.
std::string ReshardingWorker::getLocalJobPath(const ReshardingJob & job) const
{
    std::string local_job_path = host_task_queue_path;
    local_job_path += "/";
    local_job_path += job.job_name;
    return local_job_path;
}
/// Binds the monitor to the worker whose current job it will watch.
/// No thread is started here; see start().
ReshardingWorker::AnomalyMonitor::AnomalyMonitor(ReshardingWorker & resharding_worker_)
    : resharding_worker(resharding_worker_)
{
}
/// Stops the monitoring thread, if any. A destructor must not throw,
/// so any exception raised by shutdown() is logged and swallowed.
ReshardingWorker::AnomalyMonitor::~AnomalyMonitor()
{
    try
    {
        shutdown();
    }
    catch (...)
    {
        /// Best effort: report the problem and carry on with the destruction.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
void ReshardingWorker::AnomalyMonitor::start()
{
if (resharding_worker.current_job.isCoordinated())
thread_routine = std::thread{&ReshardingWorker::AnomalyMonitor::routine, this};
}
/// Requests the monitoring thread to stop, waits for its completion, then
/// resets the state of the monitor so that start() may be called again.
/// Does nothing if no monitoring thread has been launched.
void ReshardingWorker::AnomalyMonitor::shutdown()
{
    /// The is_started flag is raised by the thread itself, at the beginning of
    /// routine(). Relying on it alone would miss a thread that has been launched
    /// but has not reached that point yet: such a thread would never be joined,
    /// and the next start() (or the destruction of the thread object) would call
    /// std::terminate(). That is why joinability is checked as well.
    if (!is_started && !thread_routine.joinable())
        return;

    /// Raised before joining: even if the thread has not entered its loop yet,
    /// it will see the flag and exit right away.
    must_stop = true;

    if (thread_routine.joinable())
        thread_routine.join();

    is_started = false;
    must_stop = false;
    anomaly_type = ANOMALY_NONE;
}
void ReshardingWorker::AnomalyMonitor::routine()
{
bool old_val = false;
if (!is_started.compare_exchange_strong(old_val, true, std::memory_order_seq_cst,
std::memory_order_relaxed))
throw Exception{"Anomaly probing thread already started", ErrorCodes::LOGICAL_ERROR};
anomaly_type = ANOMALY_NONE;
while (!must_stop)
{
auto zookeeper = resharding_worker.context.getZooKeeper();
auto coordinator_id = resharding_worker.current_job.coordinator_id;
/// We create a new instance of Poco::Event each time we run
/// the loop body in order to avoid multiple notifications.
zkutil::EventPtr event = new Poco::Event;
/// Monitor both status changes and movements of the participating nodes.
(void) zookeeper->get(resharding_worker.getCoordinatorPath(coordinator_id)
+ "/status_probe", nullptr, event);
(void) zookeeper->getChildren(resharding_worker.distributed_online_path,
nullptr, event);
auto probed_anomaly_type = resharding_worker.probeForAnomaly();
if (probed_anomaly_type != ANOMALY_NONE)
{
/// An anomaly has just been found. No need to monitor further.
anomaly_type = probed_anomaly_type;
break;
}
while (!event->tryWait(wait_duration))
{
/// We are going offline.
if (resharding_worker.must_stop)
break;
/// We have received a request to stop this thread.
if (must_stop)
break;
}
}
}
/// Returns the anomaly last recorded by the monitoring thread
/// (ANOMALY_NONE if none has been recorded).
ReshardingWorker::AnomalyType ReshardingWorker::AnomalyMonitor::getAnomalyType() const
{
    return anomaly_type;
}
/// Creates a record that is not bound to any ZooKeeper node yet.
/// Only the ZooKeeper handle is stored.
ReshardingWorker::LogRecord::LogRecord(zkutil::ZooKeeperPtr zookeeper_)
    : zookeeper{zookeeper_}
{
}
/// Loads a record from the ZooKeeper node located at zk_path_.
/// The fields are read in the same order as toString() writes them.
ReshardingWorker::LogRecord::LogRecord(zkutil::ZooKeeperPtr zookeeper_, const std::string & zk_path_)
    : zookeeper{zookeeper_}, zk_path{zk_path_}
{
    auto serialized_record = zookeeper->get(zk_path);
    ReadBufferFromString buf{serialized_record};

    /// The operation and the state are stored as unsigned integers.
    unsigned int raw_value;

    readVarUInt(raw_value, buf);
    operation = static_cast<Operation>(raw_value);

    readVarUInt(raw_value, buf);
    state = static_cast<State>(raw_value);

    readBinary(partition, buf);
    readBinary(partition_hash, buf);
    readVarUInt(shard_no, buf);

    /// Number of (part, hash) pairs that follow.
    size_t part_count;
    readVarUInt(part_count, buf);

    for (size_t remaining = part_count; remaining > 0; --remaining)
    {
        std::string part_name;
        readBinary(part_name, buf);

        std::string part_hash;
        readBinary(part_hash, buf);

        parts_with_hash.emplace(part_name, part_hash);
    }
}
/// Appends this record to the log located at log_path, as a new persistent
/// sequential node whose name starts with "rec-".
void ReshardingWorker::LogRecord::enqueue(const std::string & log_path)
{
    const auto record_path_prefix = log_path + "/rec-";
    (void) zookeeper->create(record_path_prefix, toString(), zkutil::CreateMode::PersistentSequential);
}
/// Overwrites the ZooKeeper node at zk_path with the serialized
/// current contents of this record.
void ReshardingWorker::LogRecord::writeBack()
{
    const auto serialized_record = toString();
    zookeeper->set(zk_path, serialized_record);
}
/// Serializes this record. Layout: operation, state, partition, partition hash,
/// shard number, number of parts, then one (part, hash) pair per part.
/// This is the layout expected by the constructor that loads a record.
std::string ReshardingWorker::LogRecord::toString()
{
    std::string serialized;
    WriteBufferFromString buf{serialized};

    writeVarUInt(static_cast<unsigned int>(operation), buf);
    writeVarUInt(static_cast<unsigned int>(state), buf);
    writeBinary(partition, buf);
    writeBinary(partition_hash, buf);
    writeVarUInt(shard_no, buf);

    writeVarUInt(parts_with_hash.size(), buf);
    for (const auto & part_and_hash : parts_with_hash)
    {
        writeBinary(part_and_hash.first, buf);
        writeBinary(part_and_hash.second, buf);
    }

    buf.next();

    return serialized;
}
2016-01-28 01:00:27 +00:00
}