Pavel Kartavyy 2016-03-03 18:38:13 +03:00
commit 4e2a760474
7 changed files with 190 additions and 62 deletions

View File

@ -6,7 +6,6 @@
#include <zkutil/RWLock.h>
#include <zkutil/SingleBarrier.h>
#include <zkutil/Barrier.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Poco/SharedPtr.h>
@ -91,6 +90,8 @@ public:
/// Get the status of the given distributed job.
Status getStatus();
zkutil::RWLock createDeletionLock(const std::string & coordinator_id);
private:
/// Watch for new jobs to appear and execute them sequentially.
void pollAndExecute();
@ -142,13 +143,20 @@ private:
void waitForUploadCompletion();
size_t getPartitionCountUnlocked(const std::string & coordinator_id);
bool updateOfflineNodes(const std::string & coordinator_id);
bool updateOfflineNodes();
bool updateOfflineNodesCommon(const std::string & path, const std::string & coordinator_id);
Status getStatusCommon(const std::string & path, const std::string & coordinator_id);
/// Functions that create the objects needed to synchronize
/// distributed jobs.
zkutil::RWLock createLock();
zkutil::RWLock createCoordinatorLock(const std::string & coordinator_id);
zkutil::Barrier createCheckBarrier(const std::string & coordinator_id);
zkutil::SingleBarrier createSubscribeBarrier(const std::string & coordinator_id);
zkutil::SingleBarrier createCheckBarrier(const std::string & coordinator_id);
zkutil::SingleBarrier createOptOutBarrier(const std::string & coordinator_id, size_t count);
zkutil::SingleBarrier createRecoveryBarrier(const ReshardingJob & job);
zkutil::SingleBarrier createUploadBarrier(const ReshardingJob & job);

View File

@ -110,21 +110,23 @@ private:
///
/// /lock: global distributed read/write lock;
/// /online: currently online nodes;
/// /coordinators: one znode for each coordinator.
/// /coordination: one znode for each coordinator.
///
/// A coordinator whose identifier is ${id} has the following layout
/// under the /coordinators/${id} znode:
/// under the /coordination/${id} znode:
///
/// /lock: coordinator-specific distributed read/write lock;
///
/// /deletion_lock: lock used for safe coordinator deletion;
///
/// /query_hash: hash value obtained from the query that
/// is sent to the participating nodes;
///
/// /increment: unique block number allocator;
///
/// /status: coordinator status;
/// /status: coordinator status before its participating nodes have subscribed;
///
/// /status/${host}: distributed job status on a given participating node;
/// /status/${host}: status of an individual participating node;
///
/// /cluster: cluster on which the distributed job is to be performed;
///
@ -137,6 +139,9 @@ private:
///
/// /shards: the list of shards that have subscribed;
///
/// /subscribe_barrier: when all the participating nodes have subscribed
/// to their coordinator, proceed further;
///
/// /check_barrier: when all the participating nodes have checked
/// that they can perform resharding operations, proceed further;
///
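To make the layout above concrete, here is a hedged sketch of how these paths are composed and read. It reuses only names that appear elsewhere in this diff (getCoordinatorPath(), zookeeper->get(), STATUS_OK); the surrounding scaffolding, including the host variable, is hypothetical and not part of this commit:

/// Illustration only: reading two of the znodes documented above.
auto zookeeper = context.getZooKeeper();
auto base = getCoordinatorPath(coordinator_id);               /// .../coordination/${id}
auto coordinator_status = zookeeper->get(base + "/status");   /// coordinator-wide status
auto node_status = zookeeper->get(base + "/status/" + host);  /// one participating node (host is hypothetical)
bool ok = std::stoull(coordinator_status) == STATUS_OK;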
@ -177,7 +182,7 @@ ReshardingWorker::ReshardingWorker(const Poco::Util::AbstractConfiguration & con
zookeeper->createIfNotExists(distributed_online_path, "");
/// Notify that we are online.
zookeeper->create(distributed_online_path + "/" + current_host, "",
(void) zookeeper->tryCreate(distributed_online_path + "/" + current_host, "",
zkutil::CreateMode::Ephemeral);
distributed_lock_path = distributed_path + "/lock";
@ -303,6 +308,7 @@ void ReshardingWorker::pollAndExecute()
catch (...)
{
error = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (error)
@ -389,6 +395,7 @@ void ReshardingWorker::perform(const Strings & job_nodes)
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
zookeeper->remove(child_full_path);
throw;
}
@ -752,6 +759,7 @@ void ReshardingWorker::applyChanges()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
pool.wait();
throw;
}
@ -822,6 +830,9 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster)
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/lock",
"", zkutil::CreateMode::Persistent);
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/deletion_lock",
"", zkutil::CreateMode::Persistent);
(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/cluster",
cluster_name, zkutil::CreateMode::Persistent);
@ -893,9 +904,14 @@ void ReshardingWorker::registerQuery(const std::string & coordinator_id, const s
void ReshardingWorker::deleteCoordinator(const std::string & coordinator_id)
{
/// We don't acquire a scoped lock because we delete everything including this lock.
auto deletion_lock = createDeletionLock(coordinator_id);
deletion_lock.acquireWrite(zkutil::RWLock::Blocking);
auto zookeeper = context.getZooKeeper();
if (zookeeper->exists(getCoordinatorPath(coordinator_id)))
zookeeper->removeRecursive(getCoordinatorPath(coordinator_id));
}
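Together with the read guard taken in StorageReplicatedMergeTree.cpp further down, this forms a small reader/writer protocol: each job holds the deletion lock for reading while it runs, and deletion takes it for writing, so removeRecursive() only starts once no job is inside the coordinator. The writer side condensed, as a sketch using only names from this commit:

/// Writer side of the deletion protocol (deleteCoordinator above).
auto deletion_lock = createDeletionLock(coordinator_id);
deletion_lock.acquireWrite(zkutil::RWLock::Blocking);   /// waits until all read guards are released
zookeeper->removeRecursive(getCoordinatorPath(coordinator_id));

The matching reader side is sketched at the end of the RWLock changes below.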
UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std::string & query)
@ -978,7 +994,7 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
/// any guarantee that all the required nodes for this distributed job are online.
/// We are inside a lightweight function, so it is not an issue.
auto timeout = context.getSettingsRef().resharding_barrier_timeout;
createCheckBarrier(coordinator_id).enter(timeout);
createSubscribeBarrier(coordinator_id).enter(timeout);
return block_number;
}
@ -1107,7 +1123,7 @@ size_t ReshardingWorker::getNodeCount(const std::string & coordinator_id)
void ReshardingWorker::waitForCheckCompletion(const std::string & coordinator_id)
{
createCheckBarrier(coordinator_id).leave();
createCheckBarrier(coordinator_id).enter();
}
void ReshardingWorker::waitForOptOutCompletion(const std::string & coordinator_id, size_t count)
@ -1129,44 +1145,54 @@ void ReshardingWorker::setStatus(const std::string & coordinator_id, const std::
toString(static_cast<UInt64>(status)));
}
ReshardingWorker::Status ReshardingWorker::getCoordinatorStatus(const std::string & coordinator_id)
bool ReshardingWorker::updateOfflineNodes(const std::string & coordinator_id)
{
auto zookeeper = context.getZooKeeper();
auto status_str = zookeeper->get(getCoordinatorPath(coordinator_id) + "/status");
return static_cast<Status>(std::stoull(status_str));
return updateOfflineNodesCommon(getCoordinatorPath(coordinator_id) + "/status", coordinator_id);
}
bool ReshardingWorker::updateOfflineNodes()
{
return updateOfflineNodesCommon(getPartitionPath(current_job) + "/nodes", current_job.coordinator_id);
}
bool ReshardingWorker::updateOfflineNodesCommon(const std::string & path, const std::string & coordinator_id)
{
auto zookeeper = context.getZooKeeper();
auto job_nodes = zookeeper->getChildren(getPartitionPath(current_job) + "/nodes");
std::sort(job_nodes.begin(), job_nodes.end());
auto nodes = zookeeper->getChildren(path);
std::sort(nodes.begin(), nodes.end());
auto online = zookeeper->getChildren(distributed_path + "/online");
std::sort(online.begin(), online.end());
std::vector<std::string> offline(job_nodes.size());
auto end = std::set_difference(job_nodes.begin(), job_nodes.end(),
online.begin(), online.end(),
offline.begin());
std::vector<std::string> offline(nodes.size());
auto end = std::set_difference(nodes.begin(), nodes.end(),
online.begin(), online.end(), offline.begin());
offline.resize(end - offline.begin());
for (const auto & node : offline)
zookeeper->set(getCoordinatorPath(current_job.coordinator_id) + "/status/" + node,
zookeeper->set(coordinator_id + "/status/" + node,
toString(static_cast<UInt64>(STATUS_ON_HOLD)));
return !offline.empty();
}
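The helper relies on std::set_difference, which requires both input ranges to be sorted; that is why both children lists are sorted before the difference is taken. A minimal self-contained sketch of the same offline-node computation (the free function and its name are an illustration, not part of this commit):

#include <algorithm>
#include <string>
#include <vector>

/// offline = nodes \ online, as in updateOfflineNodesCommon above.
std::vector<std::string> computeOffline(std::vector<std::string> nodes,
    std::vector<std::string> online)
{
    /// std::set_difference requires sorted input ranges.
    std::sort(nodes.begin(), nodes.end());
    std::sort(online.begin(), online.end());

    std::vector<std::string> offline(nodes.size());
    auto end = std::set_difference(nodes.begin(), nodes.end(),
        online.begin(), online.end(), offline.begin());
    offline.resize(end - offline.begin());  /// shrink to the actual count
    return offline;
}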
/// Note: we don't need any synchronization for the status of a distributed job.
/// That's why we don't protect it with a read/write lock.
ReshardingWorker::Status ReshardingWorker::getCoordinatorStatus(const std::string & coordinator_id)
{
return getStatusCommon(getCoordinatorPath(coordinator_id) + "/status", coordinator_id);
}
ReshardingWorker::Status ReshardingWorker::getStatus()
{
auto zookeeper = context.getZooKeeper();
return getStatusCommon(getPartitionPath(current_job) + "/nodes", current_job.coordinator_id);
}
auto coordinator_id = current_job.coordinator_id;
ReshardingWorker::Status ReshardingWorker::getStatusCommon(const std::string & path, const std::string & coordinator_id)
{
/// Note: we don't need any synchronization for the status.
/// That's why we don't acquire any read/write lock.
auto zookeeper = context.getZooKeeper();
auto status_str = zookeeper->get(getCoordinatorPath(coordinator_id) + "/status");
auto coordinator_status = std::stoull(status_str);
@ -1174,15 +1200,15 @@ ReshardingWorker::Status ReshardingWorker::getStatus()
if (coordinator_status != STATUS_OK)
return static_cast<Status>(coordinator_status);
(void) updateOfflineNodes();
(void) updateOfflineNodesCommon(path, coordinator_id);
auto job_nodes = zookeeper->getChildren(getPartitionPath(current_job) + "/nodes");
auto nodes = zookeeper->getChildren(path);
bool has_error = false;
bool has_on_hold = false;
/// Determine the status.
for (const auto & node : job_nodes)
for (const auto & node : nodes)
{
auto status_str = zookeeper->get(getCoordinatorPath(coordinator_id) + "/status/" + node);
auto status = std::stoull(status_str);
@ -1299,23 +1325,6 @@ void ReshardingWorker::abortPollingIfRequested()
throw Exception("Cancelled resharding", ErrorCodes::ABORTED);
}
void ReshardingWorker::abortCoordinatorIfRequested(const std::string & coordinator_id)
{
bool must_abort;
try
{
must_abort = must_stop || (getCoordinatorStatus(coordinator_id) != STATUS_OK);
}
catch (...)
{
must_abort = true;
}
if (must_abort)
throw Exception("Cancelled resharding", ErrorCodes::ABORTED);
}
void ReshardingWorker::abortRecoveryIfRequested()
{
bool has_offline_nodes = false;
@ -1329,6 +1338,7 @@ void ReshardingWorker::abortRecoveryIfRequested()
catch (...)
{
must_abort = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (must_abort)
@ -1349,6 +1359,47 @@ void ReshardingWorker::abortRecoveryIfRequested()
}
}
void ReshardingWorker::abortCoordinatorIfRequested(const std::string & coordinator_id)
{
bool is_remote_node_unavailable = false;
bool is_remote_node_error = false;
bool cancellation_result = false;
try
{
auto status = getCoordinatorStatus(coordinator_id);
if (status == STATUS_ON_HOLD)
is_remote_node_unavailable = true;
else if (status == STATUS_ERROR)
is_remote_node_error = true;
cancellation_result = status != STATUS_OK;
}
catch (...)
{
cancellation_result = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
bool must_abort = must_stop || cancellation_result;
if (must_abort)
{
/// Important: always keep the checks in the following order.
if (must_stop)
throw Exception("Cancelled resharding", ErrorCodes::ABORTED);
else if (is_remote_node_unavailable)
throw Exception("Remote node unavailable",
ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE);
else if (is_remote_node_error)
throw Exception("An error occurred on a remote node",
ErrorCodes::RESHARDING_REMOTE_NODE_ERROR);
else
throw Exception("An error occurred on local node", ErrorCodes::LOGICAL_ERROR);
}
}
void ReshardingWorker::abortJobIfRequested()
{
bool is_remote_node_unavailable = false;
@ -1370,6 +1421,7 @@ void ReshardingWorker::abortJobIfRequested()
catch (...)
{
cancellation_result = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -1397,7 +1449,7 @@ zkutil::RWLock ReshardingWorker::createLock()
{
auto zookeeper = context.getZooKeeper();
zkutil::RWLock lock(zookeeper, distributed_lock_path);
zkutil::RWLock lock{zookeeper, distributed_lock_path};
zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortPollingIfRequested, this);
lock.setCancellationHook(hook);
@ -1408,8 +1460,7 @@ zkutil::RWLock ReshardingWorker::createCoordinatorLock(const std::string & coord
{
auto zookeeper = context.getZooKeeper();
zkutil::RWLock lock(zookeeper, getCoordinatorPath(coordinator_id) + "/lock");
zkutil::RWLock lock{zookeeper, getCoordinatorPath(coordinator_id) + "/lock"};
zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
coordinator_id);
lock.setCancellationHook(hook);
@ -1417,17 +1468,42 @@ zkutil::RWLock ReshardingWorker::createCoordinatorLock(const std::string & coord
return lock;
}
zkutil::Barrier ReshardingWorker::createCheckBarrier(const std::string & coordinator_id)
zkutil::RWLock ReshardingWorker::createDeletionLock(const std::string & coordinator_id)
{
auto zookeeper = context.getZooKeeper();
zkutil::RWLock lock{zookeeper, getCoordinatorPath(coordinator_id) + "/deletion_lock"};
zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortPollingIfRequested, this);
lock.setCancellationHook(hook);
return lock;
}
zkutil::SingleBarrier ReshardingWorker::createSubscribeBarrier(const std::string & coordinator_id)
{
auto zookeeper = context.getZooKeeper();
auto node_count = zookeeper->get(getCoordinatorPath(coordinator_id) + "/node_count");
zkutil::Barrier check_barrier{zookeeper, getCoordinatorPath(coordinator_id) + "/check_barrier",
std::stoull(node_count)};
zkutil::Barrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
coordinator_id
);
zkutil::SingleBarrier subscribe_barrier{zookeeper, getCoordinatorPath(coordinator_id) + "/subscribe_barrier",
std::stoull(node_count)};
zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
coordinator_id);
subscribe_barrier.setCancellationHook(hook);
return subscribe_barrier;
}
zkutil::SingleBarrier ReshardingWorker::createCheckBarrier(const std::string & coordinator_id)
{
auto zookeeper = context.getZooKeeper();
auto node_count = zookeeper->get(getCoordinatorPath(coordinator_id) + "/node_count");
zkutil::SingleBarrier check_barrier{zookeeper, getCoordinatorPath(coordinator_id) + "/check_barrier",
std::stoull(node_count)};
zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
coordinator_id);
check_barrier.setCancellationHook(hook);
return check_barrier;
@ -1440,7 +1516,6 @@ zkutil::SingleBarrier ReshardingWorker::createOptOutBarrier(const std::string &
zkutil::SingleBarrier opt_out_barrier{zookeeper, getCoordinatorPath(coordinator_id)
+ "/opt_out_barrier", count};
zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
coordinator_id);
opt_out_barrier.setCancellationHook(hook);
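The subscribe, check and opt-out factories above all follow one shape: obtain a participant count, build a SingleBarrier over a coordinator sub-path, and attach the coordinator cancellation hook. A hedged generic sketch of that shared shape, assuming SingleBarrier is movable (this helper is an illustration, not part of the commit):

/// Hypothetical helper capturing the common factory pattern above.
zkutil::SingleBarrier makeCoordinatorBarrier(zkutil::ZooKeeperPtr zookeeper,
    const std::string & coordinator_path, const std::string & name,
    size_t count, zkutil::SingleBarrier::CancellationHook hook)
{
    zkutil::SingleBarrier barrier{zookeeper, coordinator_path + "/" + name, count};
    barrier.setCancellationHook(hook);
    return barrier;
}

Each call site would then differ only in the sub-path and in where the count comes from: the node_count znode for the subscribe and check barriers, an explicit argument for the opt-out barrier.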

View File

@ -325,13 +325,20 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
throw Exception("StorageDistributed: Internal error", ErrorCodes::LOGICAL_ERROR);
auto & stream = *stream_ptr;
stream.readPrefix();
while (!stream.isCancelled() && stream.read())
;
stream.readSuffix();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
try
{
resharding_worker.setStatus(coordinator_id, ReshardingWorker::STATUS_ERROR);
resharding_worker.deleteCoordinator(coordinator_id);
}
catch (...)

View File

@ -76,6 +76,7 @@ namespace ErrorCodes
extern const int RESHARDING_INVALID_PARAMETERS;
extern const int INVALID_SHARD_WEIGHT;
extern const int DUPLICATE_SHARD_PATHS;
extern const int RESHARDING_COORDINATOR_DELETED;
extern const int RESHARDING_NO_SUCH_COORDINATOR;
extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP;
extern const int RESHARDING_ALREADY_SUBSCRIBED;
@ -3486,11 +3487,6 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
std::string coordinator_id;
UInt64 block_number = 0;
if (has_coordinator)
{
coordinator_id = coordinator.get<const String &>();
block_number = resharding_worker.subscribe(coordinator_id, queryToString(query));
}
/// List of local partitions that need to be resharded.
ReshardingWorker::PartitionList partition_list;
@ -3506,6 +3502,21 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
try
{
zkutil::RWLock deletion_lock;
if (has_coordinator)
{
coordinator_id = coordinator.get<const String &>();
deletion_lock = std::move(resharding_worker.createDeletionLock(coordinator_id));
}
zkutil::RWLock::Guard<zkutil::RWLock::Read, zkutil::RWLock::NonBlocking> guard{deletion_lock};
if (!deletion_lock.ownsLock())
throw Exception("Coordinator has been deleted", ErrorCodes::RESHARDING_COORDINATOR_DELETED);
if (has_coordinator)
block_number = resharding_worker.subscribe(coordinator_id, queryToString(query));
for (const auto & weighted_path : weighted_zookeeper_paths)
{
UInt64 weight = weighted_path.second;
@ -3665,6 +3676,11 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
/// has willfully attempted to botch an ongoing distributed resharding job.
/// Consequently we don't take them into account.
}
else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
{
/// Nothing to do: the coordinator has already been deleted.
}
else
{
try
@ -3684,6 +3700,8 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
if (has_coordinator)
{
try

View File

@ -2,7 +2,6 @@ add_library(zkutil
src/ZooKeeper.cpp
src/Lock.cpp
src/SingleBarrier.cpp
src/Barrier.cpp
src/RWLock.cpp
src/ZooKeeperHolder.cpp
@ -11,7 +10,6 @@ add_library(zkutil
include/zkutil/KeeperException.h
include/zkutil/Lock.h
include/zkutil/SingleBarrier.h
include/zkutil/Barrier.h
include/zkutil/RWLock.h
include/zkutil/ZooKeeper.h
include/zkutil/Types.h

View File

@ -31,6 +31,8 @@ public:
using CancellationHook = std::function<void()>;
public:
RWLock() = default;
/// Create a queue for lock requests under the specified ZooKeeper path,
/// if it doesn't exist yet.
RWLock(ZooKeeperPtr & zookeeper_, const std::string & path_);
@ -41,6 +43,8 @@ public:
RWLock(RWLock &&) = default;
RWLock & operator=(RWLock &&) = default;
operator bool() const;
/// Register a function that checks whether lock acquisition should be cancelled.
void setCancellationHook(CancellationHook cancellation_hook_);

View File

@ -60,6 +60,11 @@ RWLock::RWLock(ZooKeeperPtr & zookeeper_, const std::string & path_)
throw KeeperException(code);
}
RWLock::operator bool() const
{
return zookeeper && !path.empty();
}
void RWLock::setCancellationHook(CancellationHook cancellation_hook_)
{
cancellation_hook = cancellation_hook_;
@ -79,11 +84,18 @@ void RWLock::release()
{
__sync_synchronize();
if (!*this)
{
owns_lock = false;
return;
}
if (key.empty())
throw DB::Exception("RWLock: no lock is held", DB::ErrorCodes::LOGICAL_ERROR);
zookeeper->remove(path + "/" + key);
key.clear();
owns_lock = false;
}
template <typename RWLock::Type lock_type>
@ -93,6 +105,12 @@ void RWLock::acquireImpl(Mode mode)
__sync_synchronize();
if (!*this)
{
owns_lock = true;
return;
}
if (!key.empty())
throw DB::Exception("RWLock: lock already held", DB::ErrorCodes::RWLOCK_ALREADY_HELD);