add safeguard against contention to check target partition row counts

This commit is contained in:
Ding Xiang Fei 2019-11-11 14:53:21 +08:00
parent b75b6c294d
commit 608aa276ab
No known key found for this signature in database
GPG Key ID: 3CD748647EEF6359

View File

@ -1,6 +1,7 @@
#include "ClusterCopier.h"
#include <chrono>
#include <optional>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/Logger.h>
#include <Poco/ConsoleChannel.h>
@ -178,7 +179,9 @@ struct ShardPartition
ShardPartition(TaskShard & parent, const String & name_quoted_) : task_shard(parent), name(name_quoted_) {}
String getPartitionPath() const;
String getPartitionCleanStartPath() const;
String getCommonPartitionIsDirtyPath() const;
String getCommonPartitionIsCleanedPath() const;
String getPartitionActiveWorkersPath() const;
String getActiveWorkerPath() const;
String getPartitionShardsPath() const;
@ -259,6 +262,8 @@ struct TaskTable
String getPartitionPath(const String & partition_name) const;
String getPartitionIsDirtyPath(const String & partition_name) const;
String getPartitionIsCleanedPath(const String & partition_name) const;
String getPartitionTaskStatusPath(const String & partition_name) const;
String name_in_config;
@ -369,23 +374,6 @@ struct MultiTransactionInfo
Coordination::Responses responses;
};
/// Submits `op` as the last request of a single ZooKeeper multi-request whose first two
/// requests create and then remove `checking_node_path`. Per the original intent, this
/// atomically verifies that the checked node (e.g. is_dirty) does not exist while applying `op`.
/// Returns the requests that were sent, the responses received and the code reported by tryMulti().
static MultiTransactionInfo checkNoNodeAndCommit(
    const zkutil::ZooKeeperPtr & zookeeper,
    const String & checking_node_path,
    Coordination::RequestPtr && op)
{
    MultiTransactionInfo transaction;
    auto & ops = transaction.requests;

    /// Probe pair: the node is created and removed again within the same transaction.
    ops.emplace_back(zkutil::makeCreateRequest(checking_node_path, "", zkutil::CreateMode::Persistent));
    ops.emplace_back(zkutil::makeRemoveRequest(checking_node_path, -1));

    /// The caller's operation goes last.
    ops.emplace_back(std::move(op));

    transaction.code = zookeeper->tryMulti(transaction.requests, transaction.responses);
    return transaction;
}
// Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])'
std::shared_ptr<ASTStorage> createASTStorageDistributed(
const String & cluster_name, const String & database, const String & table, const ASTPtr & sharding_key_ast = nullptr)
@ -431,6 +419,11 @@ String TaskTable::getPartitionPath(const String & partition_name) const
+ "/" + escapeForFileName(partition_name); // 201701
}
/// Path of the "clean_start" node that lives directly under this partition's node.
String ShardPartition::getPartitionCleanStartPath() const
{
    const String partition_path = getPartitionPath();
    return partition_path + "/clean_start";
}
String ShardPartition::getPartitionPath() const
{
return task_shard.task_table.getPartitionPath(name);
@ -438,8 +431,9 @@ String ShardPartition::getPartitionPath() const
/// Path of the status node of this shard inside the partition.
/// Schema: <partition path>/shards/<shard number in cluster>
/// e.g. /root/table_test.hits/201701/shards/1
String ShardPartition::getShardStatusPath() const
{
    /// Only one return statement is kept: the second one that used to follow was unreachable.
    return getPartitionPath() + "/shards/" + toString(task_shard.numberInCluster());
}
String ShardPartition::getPartitionShardsPath() const
@ -462,11 +456,25 @@ String ShardPartition::getCommonPartitionIsDirtyPath() const
return getPartitionPath() + "/is_dirty";
}
/// Path of the "cleaned" node, which is a child of the partition's is_dirty node.
String ShardPartition::getCommonPartitionIsCleanedPath() const
{
    const String dirty_flag_path = getCommonPartitionIsDirtyPath();
    return dirty_flag_path + "/cleaned";
}
/// Path of the "is_dirty" node under the node of the partition called `partition_name`.
String TaskTable::getPartitionIsDirtyPath(const String & partition_name) const
{
    const String partition_path = getPartitionPath(partition_name);
    return partition_path + "/is_dirty";
}
/// Path of the "cleaned" node, a child of the is_dirty node of the partition called `partition_name`.
String TaskTable::getPartitionIsCleanedPath(const String & partition_name) const
{
    const String dirty_flag_path = getPartitionIsDirtyPath(partition_name);
    return dirty_flag_path + "/cleaned";
}
/// Path of the "shards" node under the node of the partition called `partition_name`.
String TaskTable::getPartitionTaskStatusPath(const String & partition_name) const
{
    const String partition_path = getPartitionPath(partition_name);
    return partition_path + "/shards";
}
String DB::TaskShard::getDescription() const
{
std::stringstream ss;
@ -1129,9 +1137,9 @@ protected:
}
/** Checks that the whole partition of a table was copied. We should do it carefully due to dirty lock.
* State of some task could change during the processing.
* We have to ensure that all shards have the finished state and there is no dirty flag.
* Moreover, we have to check status twice and check zxid, because state can change during the checking.
*/
bool checkPartitionIsDone(const TaskTable & task_table, const String & partition_name, const TasksShard & shards_with_partition)
{
@ -1170,11 +1178,23 @@ protected:
}
// Check that partition is not dirty
if (zookeeper->exists(task_table.getPartitionIsDirtyPath(partition_name)))
{
CleanStateClock clean_state_clock (
zookeeper,
task_table.getPartitionIsDirtyPath(partition_name),
task_table.getPartitionIsCleanedPath(partition_name)
);
Coordination::Stat stat;
LogicalClock task_start_clock;
if (zookeeper->exists(task_table.getPartitionTaskStatusPath(partition_name), &stat))
task_start_clock = LogicalClock(stat.mzxid);
zookeeper->get(task_table.getPartitionTaskStatusPath(partition_name), &stat);
if (!clean_state_clock.is_clean() || task_start_clock <= clean_state_clock.discovery_zxid)
{
LOG_INFO(log, "Partition " << partition_name << " become dirty");
return false;
}
}
get_futures.clear();
for (const String & path : status_paths)
@ -1260,17 +1280,122 @@ protected:
return res;
}
bool tryDropPartition(ShardPartition & task_partition, const zkutil::ZooKeeperPtr & zookeeper)
/// Optional 64-bit transaction id used as a logical timestamp.
/// An empty clock stands for an event that has not happened.
class LogicalClock
{
public:
    std::optional<UInt64> zxid;

    LogicalClock() = default;

    LogicalClock(UInt64 zxid_)
        : zxid(zxid_)
    {}

    /// True when the clock carries a value.
    bool hasHappened() const
    {
        return zxid.has_value();
    }

    /// Happens-before relation with a reasonable time bound.
    /// An empty clock precedes everything (including another empty clock); a set clock never
    /// precedes an empty one. For two set clocks the comparison tolerates counter wrap-around:
    /// a forward distance below 2^63 counts as "before", and so does a backward distance above 2^63.
    bool happensBefore(const LogicalClock & other) const
    {
        if (!zxid)
            return true;
        if (!other.zxid)
            return false;

        constexpr UInt64 half_range = 1ull << 63;
        const UInt64 mine = *zxid;
        const UInt64 theirs = *other.zxid;

        if (mine <= theirs)
            return theirs - mine < half_range;
        return mine - theirs > half_range;
    }

    bool operator<=(const LogicalClock & other) const
    {
        return happensBefore(other);
    }

    /// Strict equality check: both empty, or both set to the same value.
    bool operator==(const LogicalClock & other) const
    {
        return zxid == other.zxid;
    }
};
/** Snapshot of two ZooKeeper nodes: the one at `discovery_path` and the one at `clean_state_path`.
  * For each node that could be read, its mzxid and version are remembered.
  * Both reads register a watch; when a watch reports a CREATED or CHANGED event, the shared
  * `stale` flag is raised, which makes the snapshot neither "clean" nor equal to anything.
  */
class CleanStateClock
{
public:
    /// mzxid / version of the node at discovery_path; empty if tryGetWatch() returned false.
    LogicalClock discovery_zxid;
    std::optional<UInt32> discovery_version;

    /// mzxid / version of the node at clean_state_path; empty if tryGetWatch() returned false.
    LogicalClock clean_state_zxid;
    std::optional<UInt32> clean_state_version;

    /// Shared with the watch callback, which holds its own copy of the pointer.
    std::shared_ptr<std::atomic_bool> stale;

    /// The state is clean when the snapshot is not stale and either nothing was discovered,
    /// or a clean state exists and the discovery happens-before it.
    bool is_clean() const
    {
        return
            !is_stale()
            && (
                !discovery_zxid.hasHappened()
                || (clean_state_zxid.hasHappened() && discovery_zxid <= clean_state_zxid));
    }

    /// True once a watch has reported that one of the observed nodes was created or changed.
    bool is_stale() const
    {
        return stale->load();
    }

    /// Reads both nodes through `zookeeper` and installs the staleness watch on each read.
    CleanStateClock(
        const zkutil::ZooKeeperPtr & zookeeper,
        const String & discovery_path,
        const String & clean_state_path)
        : stale(std::make_shared<std::atomic_bool>(false))
    {
        Coordination::Stat stat;
        String unused_data;

        /// The callback captures the flag by value (shared_ptr copy), not `this`.
        auto watch_callback =
            [stale = stale] (const Coordination::WatchResponse & rsp)
            {
                auto logger = &Poco::Logger::get("ClusterCopier");
                if (rsp.error == Coordination::ZOK)
                {
                    switch (rsp.type)
                    {
                        case Coordination::CREATED:
                            LOG_DEBUG(logger, "CleanStateClock change: CREATED, at " << rsp.path);
                            stale->store(true);
                            break;
                        case Coordination::CHANGED:
                            /// Message now has the trailing space so the path is not glued to "at".
                            LOG_DEBUG(logger, "CleanStateClock change: CHANGED, at " << rsp.path);
                            stale->store(true);
                            break;
                        default:
                            /// Other event types do not affect staleness.
                            break;
                    }
                }
            };

        if (zookeeper->tryGetWatch(discovery_path, unused_data, &stat, watch_callback))
        {
            discovery_zxid = LogicalClock(stat.mzxid);
            discovery_version = stat.version;
        }

        if (zookeeper->tryGetWatch(clean_state_path, unused_data, &stat, watch_callback))
        {
            clean_state_zxid = LogicalClock(stat.mzxid);
            clean_state_version = stat.version;
        }
    }

    /// Two snapshots are equal only if neither is stale and all recorded zxids and versions match.
    /// Note that a stale snapshot is therefore not equal even to itself.
    bool operator==(const CleanStateClock & other) const
    {
        return !is_stale()
            && !other.is_stale()
            && discovery_zxid == other.discovery_zxid
            && discovery_version == other.discovery_version
            && clean_state_zxid == other.clean_state_zxid
            && clean_state_version == other.clean_state_version;
    }

    bool operator!=(const CleanStateClock & other) const
    {
        return !(*this == other);
    }
};
bool tryDropPartition(ShardPartition & task_partition, const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock)
{
if (is_safe_mode)
throw Exception("DROP PARTITION is prohibited in safe mode", ErrorCodes::NOT_IMPLEMENTED);
TaskTable & task_table = task_partition.task_shard.task_table;
String current_shards_path = task_partition.getPartitionShardsPath();
String current_partition_active_workers_dir = task_partition.getPartitionActiveWorkersPath();
String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
String dirt_cleaner_path = is_dirty_flag_path + "/cleaner";
const String current_shards_path = task_partition.getPartitionShardsPath();
const String current_partition_active_workers_dir = task_partition.getPartitionActiveWorkersPath();
const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
const String dirt_cleaner_path = is_dirty_flag_path + "/cleaner";
const String is_dirt_cleaned_path = task_partition.getCommonPartitionIsCleanedPath();
zkutil::EphemeralNodeHolder::Ptr cleaner_holder;
try
@ -1294,14 +1419,47 @@ protected:
{
if (stat.numChildren != 0)
{
LOG_DEBUG(log, "Partition " << task_partition.name << " contains " << stat.numChildren << " active workers, sleep");
LOG_DEBUG(log, "Partition " << task_partition.name << " contains " << stat.numChildren << " active workers while trying to drop it. Going to sleep.");
std::this_thread::sleep_for(default_sleep_time);
return false;
}
else
{
zookeeper->remove(current_partition_active_workers_dir);
}
}
{
zkutil::EphemeralNodeHolder::Ptr active_workers_lock;
try
{
active_workers_lock = zkutil::EphemeralNodeHolder::create(current_partition_active_workers_dir, *zookeeper, host_id);
}
catch (const Coordination::Exception & e)
{
if (e.code == Coordination::ZNODEEXISTS)
{
LOG_DEBUG(log, "Partition " << task_partition.name << " is being filled now by somebody, sleep");
return false;
}
throw;
}
// Lock the dirty flag
zookeeper->set(is_dirty_flag_path, host_id, clean_state_clock.discovery_version.value());
zookeeper->tryRemove(task_partition.getPartitionCleanStartPath());
CleanStateClock my_clock(zookeeper, is_dirty_flag_path, is_dirt_cleaned_path);
/// Remove all status nodes
zookeeper->tryRemoveRecursive(current_shards_path);
{
Strings children;
if (zookeeper->tryGetChildren(current_shards_path, children) == Coordination::ZOK)
for (const auto & child : children)
{
zookeeper->removeRecursive(current_shards_path + "/" + child);
}
}
String query = "ALTER TABLE " + getQuotedTable(task_table.table_push);
query += " DROP PARTITION " + task_partition.name + "";
@ -1325,13 +1483,28 @@ protected:
return false;
}
/// Remove the locking node
Coordination::Requests requests;
requests.emplace_back(zkutil::makeRemoveRequest(dirt_cleaner_path, -1));
requests.emplace_back(zkutil::makeRemoveRequest(is_dirty_flag_path, -1));
zookeeper->multi(requests);
/// Update the locking node
if (!my_clock.is_stale())
{
zookeeper->set(is_dirty_flag_path, host_id, my_clock.discovery_version.value());
if (my_clock.clean_state_version)
zookeeper->set(is_dirt_cleaned_path, host_id, my_clock.clean_state_version.value());
else
zookeeper->create(is_dirt_cleaned_path, host_id, zkutil::CreateMode::Persistent);
}
else
{
LOG_DEBUG(log, "Clean state is altered when dropping the partition, cowardly bailing");
/// clean state is stale
return false;
}
LOG_INFO(log, "Partition " << task_partition.name << " was dropped on cluster " << task_table.cluster_push_name);
if (zookeeper->tryCreate(current_shards_path, host_id, zkutil::CreateMode::Persistent) == Coordination::ZNODEEXISTS)
zookeeper->set(current_shards_path, host_id);
}
LOG_INFO(log, "Partition " << task_partition.name << " is safe for work now.");
return true;
}
@ -1362,6 +1535,7 @@ protected:
/// Process each source shard having current partition and copy current partition
/// NOTE: shards are sorted by "distance" to current host
bool has_shard_to_process = false;
for (const TaskShardPtr & shard : task_table.all_shards)
{
/// Does shard have a node with current partition?
@ -1405,6 +1579,7 @@ protected:
bool is_unprioritized_task = !previous_shard_is_instantly_finished && shard->priority.is_remote;
PartitionTaskStatus task_status = PartitionTaskStatus::Error;
bool was_error = false;
has_shard_to_process = true;
for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
{
task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task);
@ -1432,11 +1607,13 @@ protected:
cluster_partition.elapsed_time_seconds += watch.elapsedSeconds();
/// Check that whole cluster partition is done
/// Firstly check number failed partition tasks, than look into ZooKeeper and ensure that each partition is done
/// Firstly check the number of failed partition tasks, then look into ZooKeeper and ensure that each partition is done
bool partition_is_done = num_failed_shards == 0;
try
{
partition_is_done = partition_is_done && checkPartitionIsDone(task_table, partition_name, expected_shards);
partition_is_done =
!has_shard_to_process
|| (partition_is_done && checkPartitionIsDone(task_table, partition_name, expected_shards));
}
catch (...)
{
@ -1526,20 +1703,35 @@ protected:
TaskTable & task_table = task_shard.task_table;
ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name);
/// We need to update table definitions for each partition, it could be changed after ALTER
createShardInternalTables(timeouts, task_shard);
auto zookeeper = context.getZooKeeper();
String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
String current_task_is_active_path = task_partition.getActiveWorkerPath();
String current_task_status_path = task_partition.getShardStatusPath();
const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
const String is_dirt_cleaned_path = task_partition.getCommonPartitionIsCleanedPath();
const String current_task_is_active_path = task_partition.getActiveWorkerPath();
const String current_task_status_path = task_partition.getShardStatusPath();
/// Auxiliary functions:
/// Creates is_dirty node to initialize DROP PARTITION
auto create_is_dirty_node = [&] ()
auto create_is_dirty_node = [&, this] (const CleanStateClock & clock)
{
auto code = zookeeper->tryCreate(is_dirty_flag_path, current_task_status_path, zkutil::CreateMode::Persistent);
if (code && code != Coordination::ZNODEEXISTS)
throw Coordination::Exception(code, is_dirty_flag_path);
if (clock.is_stale())
LOG_DEBUG(log, "Clean state clock is stale while setting dirty flag, cowardly bailing");
else if (!clock.is_clean())
LOG_DEBUG(log, "Thank you, Captain Obvious");
else if (clock.discovery_version)
{
LOG_DEBUG(log, "Updating clean state clock");
zookeeper->set(is_dirty_flag_path, host_id, clock.discovery_version.value());
}
else
{
LOG_DEBUG(log, "Creating clean state clock");
zookeeper->create(is_dirty_flag_path, host_id, zkutil::CreateMode::Persistent);
}
};
/// Returns SELECT query filtering current partition and applying user filter
@ -1563,14 +1755,29 @@ protected:
LOG_DEBUG(log, "Processing " << current_task_status_path);
CleanStateClock clean_state_clock (zookeeper, is_dirty_flag_path, is_dirt_cleaned_path);
LogicalClock task_start_clock;
{
Coordination::Stat stat;
if (zookeeper->exists(task_partition.getPartitionShardsPath(), &stat))
task_start_clock = LogicalClock(stat.mzxid);
}
/// Do not start if partition is dirty, try to clean it
if (zookeeper->exists(is_dirty_flag_path))
if (clean_state_clock.is_clean()
&& (!task_start_clock.hasHappened() || clean_state_clock.discovery_zxid <= task_start_clock))
{
LOG_DEBUG(log, "Partition " << task_partition.name << " appears to be clean");
zookeeper->createAncestors(current_task_status_path);
}
else
{
LOG_DEBUG(log, "Partition " << task_partition.name << " is dirty, try to drop it");
try
{
tryDropPartition(task_partition, zookeeper);
tryDropPartition(task_partition, zookeeper, clean_state_clock);
}
catch (...)
{
@ -1598,7 +1805,8 @@ protected:
throw;
}
/// Exit if task has been already processed, create blocking node if it is abandoned
/// Exit if task has been already processed;
/// create blocking node to signal cleaning up if it is abandoned
{
String status_data;
if (zookeeper->tryGet(current_task_status_path, status_data))
@ -1611,21 +1819,21 @@ protected:
}
// Task is abandoned, initialize DROP PARTITION
LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner);
LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner << ". Partition will be dropped and refilled.");
create_is_dirty_node();
create_is_dirty_node(clean_state_clock);
return PartitionTaskStatus::Error;
}
}
zookeeper->createAncestors(current_task_status_path);
/// We need to update table definitions for each partition, it could be changed after ALTER
createShardInternalTables(timeouts, task_shard);
/// Check that destination partition is empty if we are first worker
/// NOTE: this check is incorrect if pull and push tables have different partition key!
String clean_start_status;
if (!zookeeper->tryGet(task_partition.getPartitionCleanStartPath(), clean_start_status) || clean_start_status != "ok")
{
zookeeper->createIfNotExists(task_partition.getPartitionCleanStartPath(), "");
auto checker = zkutil::EphemeralNodeHolder::create(task_partition.getPartitionCleanStartPath() + "/checker", *zookeeper, host_id);
// Maybe we are the first worker
ASTPtr query_select_ast = get_select_query(task_shard.table_split_shard, "count()");
UInt64 count;
{
@ -1643,36 +1851,38 @@ protected:
Coordination::Stat stat_shards;
zookeeper->get(task_partition.getPartitionShardsPath(), &stat_shards);
/// NOTE: partition is still fresh if dirt discovery happens before cleaning
if (stat_shards.numChildren == 0)
{
LOG_WARNING(log, "There are no any workers for partition " << task_partition.name
LOG_WARNING(log, "There are no workers for partition " << task_partition.name
<< ", but destination table contains " << count << " rows"
<< ". Partition will be dropped and refilled.");
create_is_dirty_node();
create_is_dirty_node(clean_state_clock);
return PartitionTaskStatus::Error;
}
}
zookeeper->set(task_partition.getPartitionCleanStartPath(), "ok");
}
/// At this point, we need to sync that the destination table is clean
/// before any actual work
/// Try start processing, create node about it
{
String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
auto op_create = zkutil::makeCreateRequest(current_task_status_path, start_state, zkutil::CreateMode::Persistent);
MultiTransactionInfo info = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_create));
if (info.code)
CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirt_cleaned_path);
if (clean_state_clock != new_clean_state_clock)
{
zkutil::KeeperMultiException exception(info.code, info.requests, info.responses);
if (exception.getPathForFirstFailedOp() == is_dirty_flag_path)
{
LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled");
LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing");
return PartitionTaskStatus::Error;
}
throw exception;
else if (!new_clean_state_clock.is_clean())
{
LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled");
create_is_dirty_node(new_clean_state_clock);
return PartitionTaskStatus::Error;
}
zookeeper->create(current_task_status_path, start_state, zkutil::CreateMode::Persistent);
}
/// Try create table (if not exists) on each shard
@ -1733,12 +1943,13 @@ protected:
output = io_insert.out;
}
/// Fail-fast optimization to abort copying when the current clean state expires
std::future<Coordination::ExistsResponse> future_is_dirty_checker;
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
constexpr UInt64 check_period_milliseconds = 500;
/// Will asynchronously check that ZooKeeper connection and is_dirty flag appearing while copy data
/// Will asynchronously check that ZooKeeper connection and is_dirty flag appearing while copying data
auto cancel_check = [&] ()
{
if (zookeeper->expired())
@ -1754,8 +1965,14 @@ protected:
Coordination::ExistsResponse status = future_is_dirty_checker.get();
if (status.error != Coordination::ZNONODE)
{
LogicalClock dirt_discovery_epoch (status.stat.mzxid);
if (dirt_discovery_epoch == clean_state_clock.discovery_zxid) {
return false;
}
throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
}
}
return false;
};
@ -1789,20 +2006,19 @@ protected:
/// Finalize the processing, change state of current partition task (and also check is_dirty flag)
{
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
auto op_set = zkutil::makeSetRequest(current_task_status_path, state_finished, 0);
MultiTransactionInfo info = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_set));
if (info.code)
CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirt_cleaned_path);
if (clean_state_clock != new_clean_state_clock)
{
zkutil::KeeperMultiException exception(info.code, info.requests, info.responses);
if (exception.getPathForFirstFailedOp() == is_dirty_flag_path)
LOG_INFO(log, "Partition " << task_partition.name << " became dirty and will be dropped and refilled");
else
LOG_INFO(log, "Someone made the node abandoned. Will refill partition. " << zkutil::ZooKeeper::error2string(info.code));
LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing");
return PartitionTaskStatus::Error;
}
else if (!new_clean_state_clock.is_clean())
{
LOG_INFO(log, "Partition " << task_partition.name << " became dirty and will be dropped and refilled");
create_is_dirty_node(new_clean_state_clock);
return PartitionTaskStatus::Error;
}
zookeeper->set(current_task_status_path, state_finished, 0);
}
LOG_INFO(log, "Partition " << task_partition.name << " copied");