From 608aa276ab3737da9f3d9aca9f2da79bd6a763f7 Mon Sep 17 00:00:00 2001
From: Ding Xiang Fei
Date: Mon, 11 Nov 2019 14:53:21 +0800
Subject: [PATCH] add safeguard against contention to check target partition
 row counts

---
 dbms/programs/copier/ClusterCopier.cpp | 428 +++++++++++++++++++------
 1 file changed, 322 insertions(+), 106 deletions(-)

diff --git a/dbms/programs/copier/ClusterCopier.cpp b/dbms/programs/copier/ClusterCopier.cpp
index 5fc1d76b542..95a2b2fcc69 100644
--- a/dbms/programs/copier/ClusterCopier.cpp
+++ b/dbms/programs/copier/ClusterCopier.cpp
@@ -1,6 +1,7 @@
 #include "ClusterCopier.h"
 
 #include <chrono>
+#include <optional>
 #include <Poco/Util/XMLConfiguration.h>
 #include <Poco/Logger.h>
 #include <Poco/ConsoleChannel.h>
@@ -178,7 +179,9 @@ struct ShardPartition
     ShardPartition(TaskShard & parent, const String & name_quoted_) : task_shard(parent), name(name_quoted_) {}
 
     String getPartitionPath() const;
+    String getPartitionCleanStartPath() const;
     String getCommonPartitionIsDirtyPath() const;
+    String getCommonPartitionIsCleanedPath() const;
     String getPartitionActiveWorkersPath() const;
     String getActiveWorkerPath() const;
     String getPartitionShardsPath() const;
@@ -259,6 +262,8 @@ struct TaskTable
 
     String getPartitionPath(const String & partition_name) const;
     String getPartitionIsDirtyPath(const String & partition_name) const;
+    String getPartitionIsCleanedPath(const String & partition_name) const;
+    String getPartitionTaskStatusPath(const String & partition_name) const;
 
     String name_in_config;
@@ -369,23 +374,6 @@ struct MultiTransactionInfo
     Coordination::Responses responses;
 };
 
-
-/// Atomically checks that is_dirty node is not exists, and made the remaining op
-/// Returns relative number of failed operation in the second field (the passed op has 0 index)
-static MultiTransactionInfo checkNoNodeAndCommit(
-    const zkutil::ZooKeeperPtr & zookeeper,
-    const String & checking_node_path,
-    Coordination::RequestPtr && op)
-{
-    MultiTransactionInfo info;
-    info.requests.emplace_back(zkutil::makeCreateRequest(checking_node_path, "", zkutil::CreateMode::Persistent));
-    info.requests.emplace_back(zkutil::makeRemoveRequest(checking_node_path, -1));
-    info.requests.emplace_back(std::move(op));
-    info.code = zookeeper->tryMulti(info.requests, info.responses);
-    return info;
-}
-
-
 // Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])
 std::shared_ptr<ASTStorage> createASTStorageDistributed(
     const String & cluster_name, const String & database, const String & table, const ASTPtr & sharding_key_ast = nullptr)
@@ -431,6 +419,11 @@ String TaskTable::getPartitionPath(const String & partition_name) const
         + "/" + escapeForFileName(partition_name); // 201701
 }
 
+String ShardPartition::getPartitionCleanStartPath() const
+{
+    return getPartitionPath() + "/clean_start";
+}
+
 String ShardPartition::getPartitionPath() const
 {
     return task_shard.task_table.getPartitionPath(name);
@@ -438,8 +431,9 @@ String ShardPartition::getPartitionPath() const
 
 String ShardPartition::getShardStatusPath() const
 {
-    // /root/table_test.hits/201701/1
-    return getPartitionPath() + "/shards/" + toString(task_shard.numberInCluster());
+    // schema: /<root...>/tables/<table>/<partition>/shards/<shard>
+    // e.g. /root/table_test.hits/201701/shards/1
+    return getPartitionShardsPath() + "/" + toString(task_shard.numberInCluster());
 }
 
 String ShardPartition::getPartitionShardsPath() const
@@ -462,11 +456,25 @@ String ShardPartition::getCommonPartitionIsDirtyPath() const
     return getPartitionPath() + "/is_dirty";
 }
 
+String ShardPartition::getCommonPartitionIsCleanedPath() const
+{
+    return getCommonPartitionIsDirtyPath() + "/cleaned";
+}
+
 String TaskTable::getPartitionIsDirtyPath(const String & partition_name) const
 {
     return getPartitionPath(partition_name) + "/is_dirty";
 }
 
+String TaskTable::getPartitionIsCleanedPath(const String & partition_name) const
+{
+    return getPartitionIsDirtyPath(partition_name) + "/cleaned";
+}
+
+String TaskTable::getPartitionTaskStatusPath(const String & partition_name) const
+{
+    return getPartitionPath(partition_name) + "/shards";
+}
+
 String DB::TaskShard::getDescription() const
 {
     std::stringstream ss;
@@ -1129,9 +1137,9 @@ protected:
     }
 
     /** Checks that the whole partition of a table was copied. We should do it carefully due to dirty lock.
-     * State of some task could be changed during the processing.
-     * We have to ensure that all shards have the finished state and there are no dirty flag.
-     * Moreover, we have to check status twice and check zxid, because state could be changed during the checking.
+     * State of some task could change during the processing.
+     * We have to ensure that all shards have the finished state and there is no dirty flag.
+     * Moreover, we have to check status twice and check zxid, because state can change during the checking.
      */
     bool checkPartitionIsDone(const TaskTable & task_table, const String & partition_name, const TasksShard & shards_with_partition)
     {
@@ -1170,10 +1178,22 @@ protected:
         }
 
         // Check that partition is not dirty
-        if (zookeeper->exists(task_table.getPartitionIsDirtyPath(partition_name)))
         {
-            LOG_INFO(log, "Partition " << partition_name << " become dirty");
-            return false;
+            CleanStateClock clean_state_clock (
+                zookeeper,
+                task_table.getPartitionIsDirtyPath(partition_name),
+                task_table.getPartitionIsCleanedPath(partition_name)
+            );
+            Coordination::Stat stat;
+            LogicalClock task_start_clock;
+            if (zookeeper->exists(task_table.getPartitionTaskStatusPath(partition_name), &stat))
+                task_start_clock = LogicalClock(stat.mzxid);
+            zookeeper->get(task_table.getPartitionTaskStatusPath(partition_name), &stat);
+            if (!clean_state_clock.is_clean() || task_start_clock <= clean_state_clock.discovery_zxid)
+            {
+                LOG_INFO(log, "Partition " << partition_name << " became dirty");
+                return false;
+            }
         }
 
         get_futures.clear();
@@ -1260,17 +1280,122 @@ protected:
         return res;
     }
 
-    bool tryDropPartition(ShardPartition & task_partition, const zkutil::ZooKeeperPtr & zookeeper)
+    class LogicalClock
+    {
+    public:
+        std::optional<UInt64> zxid;
+
+        LogicalClock() = default;
+
+        LogicalClock(UInt64 zxid_)
+            : zxid(zxid_)
+        {}
+
+        bool hasHappened() const
+        {
+            return bool(zxid);
+        }
+
+        // happens-before relation with a reasonable time bound
+        bool happensBefore(const LogicalClock & other) const
+        {
+            const UInt64 HALF = 1ull << 63;
+            return
+                !zxid
+                || (other.zxid && *zxid <= *other.zxid && *other.zxid - *zxid < HALF)
+                || (other.zxid && *zxid >= *other.zxid && *zxid - *other.zxid > HALF);
+        }
+
+        bool operator<=(const LogicalClock & other) const
+        {
+            return happensBefore(other);
+        }
+
+        // strict equality check
+        bool operator==(const LogicalClock & other) const
+        {
+            return zxid == other.zxid;
+        }
+    };
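The HALF constant is what makes happensBefore tolerant of zxid wraparound: an unset clock happens-before every event, a difference below 2^63 counts as genuine ordering, and a larger difference is interpreted as a wrapped 64-bit counter. A minimal standalone restatement of that rule (the free function happens_before and the main driver are illustrative, not part of the patch):

#include <cassert>
#include <cstdint>
#include <optional>

// Wraparound-tolerant ordering over 64-bit zxids, mirroring LogicalClock:
// an unset clock precedes every event; otherwise a difference below 2^63
// is genuine ordering and a larger one is treated as counter wraparound.
static bool happens_before(std::optional<uint64_t> a, std::optional<uint64_t> b)
{
    const uint64_t HALF = 1ull << 63;
    return !a
        || (b && *a <= *b && *b - *a < HALF)
        || (b && *a >= *b && *a - *b > HALF);
}

int main()
{
    assert(happens_before({}, 42));      // unset clock precedes any event
    assert(happens_before(1, 2));        // ordinary case
    assert(!happens_before(2, 1));
    assert(happens_before(~0ull, 1));    // wrapped counter: max zxid precedes a small one
}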
+
+    class CleanStateClock
+    {
+    public:
+        LogicalClock discovery_zxid;
+        std::optional<UInt32> discovery_version;
+
+        LogicalClock clean_state_zxid;
+        std::optional<UInt32> clean_state_version;
+
+        std::shared_ptr<std::atomic_bool> stale;
+
+        bool is_clean() const
+        {
+            return
+                !is_stale()
+                && (!discovery_zxid.hasHappened()
+                    || (clean_state_zxid.hasHappened() && discovery_zxid <= clean_state_zxid));
+        }
+
+        bool is_stale() const
+        {
+            return stale->load();
+        }
+
+        CleanStateClock(
+            const zkutil::ZooKeeperPtr & zookeeper,
+            const String & discovery_path,
+            const String & clean_state_path)
+            : stale(std::make_shared<std::atomic_bool>(false))
+        {
+            Coordination::Stat stat;
+            String _some_data;
+            auto watch_callback =
+                [stale = stale] (const Coordination::WatchResponse & rsp)
+                {
+                    auto logger = &Poco::Logger::get("ClusterCopier");
+                    if (rsp.error == Coordination::ZOK)
+                    {
+                        switch (rsp.type)
+                        {
+                            case Coordination::CREATED:
+                                LOG_DEBUG(logger, "CleanStateClock change: CREATED, at " << rsp.path);
+                                stale->store(true);
+                                break;
+                            case Coordination::CHANGED:
+                                LOG_DEBUG(logger, "CleanStateClock change: CHANGED, at " << rsp.path);
+                                stale->store(true);
+                        }
+                    }
+                };
+            if (zookeeper->tryGetWatch(discovery_path, _some_data, &stat, watch_callback))
+            {
+                discovery_zxid = LogicalClock(stat.mzxid);
+                discovery_version = stat.version;
+            }
+            if (zookeeper->tryGetWatch(clean_state_path, _some_data, &stat, watch_callback))
+            {
+                clean_state_zxid = LogicalClock(stat.mzxid);
+                clean_state_version = stat.version;
+            }
+        }
+
+        bool operator==(const CleanStateClock & other) const
+        {
+            return !is_stale()
+                && !other.is_stale()
+                && discovery_zxid == other.discovery_zxid
+                && discovery_version == other.discovery_version
+                && clean_state_zxid == other.clean_state_zxid
+                && clean_state_version == other.clean_state_version;
+        }
+
+        bool operator!=(const CleanStateClock & other) const
+        {
+            return !(*this == other);
+        }
+    };
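CleanStateClock snapshots the is_dirty and is_dirty/cleaned nodes with tryGetWatch, so any later CREATED or CHANGED event on either node flips the shared stale flag; because the flag lives behind a shared_ptr, every copy of the clock observes the invalidation. A reduced sketch of this snapshot-plus-watch pattern, with the ZooKeeper watch simulated by an ordinary function call (all names here are illustrative, not part of the patch):

#include <atomic>
#include <cassert>
#include <functional>
#include <memory>

// A snapshot that invalidates itself when the watched node changes.
struct Snapshot
{
    std::shared_ptr<std::atomic_bool> stale = std::make_shared<std::atomic_bool>(false);

    // The callback a ZooKeeper watch would invoke on CREATED/CHANGED events.
    std::function<void()> watch_callback() const
    {
        return [flag = stale] { flag->store(true); };
    }
};

int main()
{
    Snapshot snap;
    auto on_change = snap.watch_callback();

    Snapshot copy = snap;              // copies share the same flag
    assert(!copy.stale->load());

    on_change();                       // simulate the watch firing
    assert(snap.stale->load() && copy.stale->load());
}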
+
+    bool tryDropPartition(ShardPartition & task_partition, const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock)
     {
         if (is_safe_mode)
             throw Exception("DROP PARTITION is prohibited in safe mode", ErrorCodes::NOT_IMPLEMENTED);
 
         TaskTable & task_table = task_partition.task_shard.task_table;
 
-        String current_shards_path = task_partition.getPartitionShardsPath();
-        String current_partition_active_workers_dir = task_partition.getPartitionActiveWorkersPath();
-        String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
-        String dirt_cleaner_path = is_dirty_flag_path + "/cleaner";
+        const String current_shards_path = task_partition.getPartitionShardsPath();
+        const String current_partition_active_workers_dir = task_partition.getPartitionActiveWorkersPath();
+        const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
+        const String dirt_cleaner_path = is_dirty_flag_path + "/cleaner";
+        const String is_dirt_cleaned_path = task_partition.getCommonPartitionIsCleanedPath();
 
         zkutil::EphemeralNodeHolder::Ptr cleaner_holder;
         try
@@ -1294,44 +1419,92 @@ protected:
         {
             if (stat.numChildren != 0)
             {
-                LOG_DEBUG(log, "Partition " << task_partition.name << " contains " << stat.numChildren << " active workers, sleep");
+                LOG_DEBUG(log, "Partition " << task_partition.name << " contains " << stat.numChildren << " active workers while trying to drop it. Going to sleep.");
                 std::this_thread::sleep_for(default_sleep_time);
                 return false;
             }
+            else
+            {
+                zookeeper->remove(current_partition_active_workers_dir);
+            }
         }
 
-        /// Remove all status nodes
-        zookeeper->tryRemoveRecursive(current_shards_path);
-
-        String query = "ALTER TABLE " + getQuotedTable(task_table.table_push);
-        query += " DROP PARTITION " + task_partition.name + "";
-
-        /// TODO: use this statement after servers will be updated up to 1.1.54310
-        // query += " DROP PARTITION ID '" + task_partition.name + "'";
-
-        ClusterPtr & cluster_push = task_table.cluster_push;
-        Settings settings_push = task_cluster->settings_push;
-
-        /// It is important, DROP PARTITION must be done synchronously
-        settings_push.replication_alter_partitions_sync = 2;
-
-        LOG_DEBUG(log, "Execute distributed DROP PARTITION: " << query);
-        /// Limit number of max executing replicas to 1
-        UInt64 num_shards = executeQueryOnCluster(cluster_push, query, nullptr, &settings_push, PoolMode::GET_ONE, 1);
-
-        if (num_shards < cluster_push->getShardCount())
         {
-            LOG_INFO(log, "DROP PARTITION wasn't successfully executed on " << cluster_push->getShardCount() - num_shards << " shards");
-            return false;
+            zkutil::EphemeralNodeHolder::Ptr active_workers_lock;
+            try
+            {
+                active_workers_lock = zkutil::EphemeralNodeHolder::create(current_partition_active_workers_dir, *zookeeper, host_id);
+            }
+            catch (const Coordination::Exception & e)
+            {
+                if (e.code == Coordination::ZNODEEXISTS)
+                {
+                    LOG_DEBUG(log, "Partition " << task_partition.name << " is being filled now by somebody, sleep");
+                    return false;
+                }
+
+                throw;
+            }
+
+            // Lock the dirty flag
+            zookeeper->set(is_dirty_flag_path, host_id, clean_state_clock.discovery_version.value());
+            zookeeper->tryRemove(task_partition.getPartitionCleanStartPath());
+            CleanStateClock my_clock(zookeeper, is_dirty_flag_path, is_dirt_cleaned_path);
+
+            /// Remove all status nodes
+            {
+                Strings children;
+                if (zookeeper->tryGetChildren(current_shards_path, children) == Coordination::ZOK)
+                    for (const auto & child : children)
+                    {
+                        zookeeper->removeRecursive(current_shards_path + "/" + child);
+                    }
+            }
+
+            String query = "ALTER TABLE " + getQuotedTable(task_table.table_push);
+            query += " DROP PARTITION " + task_partition.name + "";
+
+            /// TODO: use this statement after servers will be updated up to 1.1.54310
+            // query += " DROP PARTITION ID '" + task_partition.name + "'";
+
+            ClusterPtr & cluster_push = task_table.cluster_push;
+            Settings settings_push = task_cluster->settings_push;
+
+            /// It is important, DROP PARTITION must be done synchronously
+            settings_push.replication_alter_partitions_sync = 2;
+
+            LOG_DEBUG(log, "Execute distributed DROP PARTITION: " << query);
+            /// Limit number of max executing replicas to 1
+            UInt64 num_shards = executeQueryOnCluster(cluster_push, query, nullptr, &settings_push, PoolMode::GET_ONE, 1);
+
+            if (num_shards < cluster_push->getShardCount())
+            {
+                LOG_INFO(log, "DROP PARTITION wasn't successfully executed on " << cluster_push->getShardCount() - num_shards << " shards");
+                return false;
+            }
+
+            /// Update the locking node
+            if (!my_clock.is_stale())
+            {
+                zookeeper->set(is_dirty_flag_path, host_id, my_clock.discovery_version.value());
+                if (my_clock.clean_state_version)
+                    zookeeper->set(is_dirt_cleaned_path, host_id, my_clock.clean_state_version.value());
+                else
+                    zookeeper->create(is_dirt_cleaned_path, host_id, zkutil::CreateMode::Persistent);
+            }
+            else
+            {
+                LOG_DEBUG(log, "Clean state was altered while dropping the partition, cowardly bailing");
+                /// clean state is stale
+                return false;
+            }
+
+            LOG_INFO(log, "Partition " << task_partition.name << " was dropped on cluster " << task_table.cluster_push_name);
+            if (zookeeper->tryCreate(current_shards_path, host_id, zkutil::CreateMode::Persistent) == Coordination::ZNODEEXISTS)
+                zookeeper->set(current_shards_path, host_id);
         }
 
-        /// Remove the locking node
-        Coordination::Requests requests;
-        requests.emplace_back(zkutil::makeRemoveRequest(dirt_cleaner_path, -1));
-        requests.emplace_back(zkutil::makeRemoveRequest(is_dirty_flag_path, -1));
-        zookeeper->multi(requests);
-
-        LOG_INFO(log, "Partition " << task_partition.name << " was dropped on cluster " << task_table.cluster_push_name);
+        LOG_INFO(log, "Partition " << task_partition.name << " is safe for work now.");
         return true;
     }
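tryDropPartition publishes its work optimistically: it writes is_dirty at exactly the version recorded in the CleanStateClock snapshot, performs the distributed DROP PARTITION, and only stamps the cleaned marker if its own clock has not gone stale in the meantime, so a dirt report racing with the cleaner can never be silently overwritten. ZooKeeper's version-conditioned set is the primitive doing the work; a sketch of its semantics against an in-memory stand-in for a znode (FakeNode and set_with_version are invented for illustration):

#include <cassert>
#include <cstdint>
#include <string>

// In-memory stand-in for a znode: every successful write bumps the version,
// and a write is accepted only if the caller still holds the latest version.
struct FakeNode
{
    std::string data;
    int32_t version = 0;

    bool set_with_version(const std::string & new_data, int32_t expected_version)
    {
        if (expected_version != version)
            return false;          // somebody wrote in between: our clock is stale
        data = new_data;
        ++version;
        return true;
    }
};

int main()
{
    FakeNode is_dirty;
    int32_t observed = is_dirty.version;   // the CleanStateClock snapshot

    assert(is_dirty.set_with_version("cleaner", observed));    // we win the race
    assert(!is_dirty.set_with_version("cleaner", observed));   // a stale clock is rejected
}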
@@ -1362,6 +1535,7 @@ protected:
 
         /// Process each source shard having current partition and copy current partition
         /// NOTE: shards are sorted by "distance" to current host
+        bool has_shard_to_process = false;
         for (const TaskShardPtr & shard : task_table.all_shards)
         {
             /// Does shard have a node with current partition?
@@ -1405,6 +1579,7 @@ protected:
             bool is_unprioritized_task = !previous_shard_is_instantly_finished && shard->priority.is_remote;
             PartitionTaskStatus task_status = PartitionTaskStatus::Error;
             bool was_error = false;
+            has_shard_to_process = true;
             for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
             {
                 task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task);
@@ -1432,11 +1607,13 @@ protected:
         cluster_partition.elapsed_time_seconds += watch.elapsedSeconds();
 
         /// Check that whole cluster partition is done
-        /// Firstly check number failed partition tasks, than look into ZooKeeper and ensure that each partition is done
+        /// Firstly check the number of failed partition tasks, then look into ZooKeeper and ensure that each partition is done
        bool partition_is_done = num_failed_shards == 0;
         try
         {
-            partition_is_done = partition_is_done && checkPartitionIsDone(task_table, partition_name, expected_shards);
+            partition_is_done =
+                !has_shard_to_process
+                || (partition_is_done && checkPartitionIsDone(task_table, partition_name, expected_shards));
         }
         catch (...)
         {
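The new has_shard_to_process flag closes a corner case: if no source shard owns the partition, the loop body never runs, nothing is registered in ZooKeeper, and checkPartitionIsDone would be reasoning about state that no worker ever wrote. The completion predicate therefore short-circuits; a restatement of that logic (illustrative, not the patch's code):

#include <cassert>

// Restatement of the cluster-level completion predicate: a partition with no
// shards to process is vacuously done; otherwise every shard must have
// succeeded and the ZooKeeper double-check must agree.
static bool cluster_partition_done(bool has_shard_to_process, int num_failed_shards, bool zk_check)
{
    if (!has_shard_to_process)
        return true;                        // nothing was (or needed to be) copied
    return num_failed_shards == 0 && zk_check;
}

int main()
{
    assert(cluster_partition_done(false, 0, false));   // vacuous case skips the ZK check
    assert(cluster_partition_done(true, 0, true));
    assert(!cluster_partition_done(true, 1, true));    // a failed shard blocks completion
}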
@@ -1526,20 +1703,35 @@ protected:
         TaskTable & task_table = task_shard.task_table;
         ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name);
 
+        /// We need to update table definitions for each partition, as they could be changed after ALTER
+        createShardInternalTables(timeouts, task_shard);
+
         auto zookeeper = context.getZooKeeper();
 
-        String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
-        String current_task_is_active_path = task_partition.getActiveWorkerPath();
-        String current_task_status_path = task_partition.getShardStatusPath();
+        const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
+        const String is_dirt_cleaned_path = task_partition.getCommonPartitionIsCleanedPath();
+        const String current_task_is_active_path = task_partition.getActiveWorkerPath();
+        const String current_task_status_path = task_partition.getShardStatusPath();
 
         /// Auxiliary functions:
 
         /// Creates is_dirty node to initialize DROP PARTITION
-        auto create_is_dirty_node = [&] ()
+        auto create_is_dirty_node = [&, this] (const CleanStateClock & clock)
         {
-            auto code = zookeeper->tryCreate(is_dirty_flag_path, current_task_status_path, zkutil::CreateMode::Persistent);
-            if (code && code != Coordination::ZNODEEXISTS)
-                throw Coordination::Exception(code, is_dirty_flag_path);
+            if (clock.is_stale())
+                LOG_DEBUG(log, "Clean state clock is stale while setting dirty flag, cowardly bailing");
+            else if (!clock.is_clean())
+                LOG_DEBUG(log, "Partition is already dirty, nothing to do");
+            else if (clock.discovery_version)
+            {
+                LOG_DEBUG(log, "Updating clean state clock");
+                zookeeper->set(is_dirty_flag_path, host_id, clock.discovery_version.value());
+            }
+            else
+            {
+                LOG_DEBUG(log, "Creating clean state clock");
+                zookeeper->create(is_dirty_flag_path, host_id, zkutil::CreateMode::Persistent);
+            }
         };
 
         /// Returns SELECT query filtering current partition and applying user filter
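create_is_dirty_node now degrades gracefully instead of racing: a stale clock means bail out, an already-dirty partition means nothing to do, a known discovery_version means a version-conditioned set, and only a never-created node is created outright. The dirty check that follows (like its twin in checkPartitionIsDone) then reduces to comparing two zxids: the partition is workable only when dirt discovery happened no later than the creation of the task's /shards subtree. A simplified restatement (hypothetical helper, illustrative values):

#include <cassert>
#include <cstdint>
#include <optional>

// Simplified version of the check: dirt discovered at or before task start
// means the current /shards subtree was created against the current clean state.
static bool partition_is_workable(std::optional<uint64_t> discovery_zxid,
                                  std::optional<uint64_t> task_start_zxid)
{
    if (!discovery_zxid)
        return true;                       // no dirt ever reported
    if (!task_start_zxid)
        return true;                       // no task status yet: safe to create one
    return *discovery_zxid <= *task_start_zxid;   // dirt predates the task start
}

int main()
{
    assert(partition_is_workable({}, {}));     // fresh partition
    assert(partition_is_workable(10, 20));     // dirt reported, then cleaned and restarted
    assert(!partition_is_workable(20, 10));    // dirt reported after the task started
}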
@@ -1563,14 +1755,29 @@ protected:
 
         LOG_DEBUG(log, "Processing " << current_task_status_path);
 
+        CleanStateClock clean_state_clock (zookeeper, is_dirty_flag_path, is_dirt_cleaned_path);
+
+        LogicalClock task_start_clock;
+        {
+            Coordination::Stat stat;
+            if (zookeeper->exists(task_partition.getPartitionShardsPath(), &stat))
+                task_start_clock = LogicalClock(stat.mzxid);
+        }
+
         /// Do not start if partition is dirty, try to clean it
-        if (zookeeper->exists(is_dirty_flag_path))
+        if (clean_state_clock.is_clean()
+            && (!task_start_clock.hasHappened() || clean_state_clock.discovery_zxid <= task_start_clock))
+        {
+            LOG_DEBUG(log, "Partition " << task_partition.name << " appears to be clean");
+            zookeeper->createAncestors(current_task_status_path);
+        }
+        else
         {
             LOG_DEBUG(log, "Partition " << task_partition.name << " is dirty, try to drop it");
 
             try
             {
-                tryDropPartition(task_partition, zookeeper);
+                tryDropPartition(task_partition, zookeeper, clean_state_clock);
             }
             catch (...)
             {
@@ -1598,7 +1805,8 @@ protected:
             throw;
         }
 
-        /// Exit if task has been already processed, create blocking node if it is abandoned
+        /// Exit if the task has already been processed;
+        /// create a blocking node to signal cleaning up if it is abandoned
         {
             String status_data;
             if (zookeeper->tryGet(current_task_status_path, status_data))
@@ -1611,21 +1819,21 @@ protected:
                 }
 
                 // Task is abandoned, initialize DROP PARTITION
-                LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner);
+                LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner << ". Partition will be dropped and refilled.");
 
-                create_is_dirty_node();
+                create_is_dirty_node(clean_state_clock);
                 return PartitionTaskStatus::Error;
             }
         }
 
-        zookeeper->createAncestors(current_task_status_path);
-
-        /// We need to update table definitions for each partition, it could be changed after ALTER
-        createShardInternalTables(timeouts, task_shard);
-
         /// Check that destination partition is empty if we are first worker
         /// NOTE: this check is incorrect if pull and push tables have different partition key!
+        String clean_start_status;
+        if (!zookeeper->tryGet(task_partition.getPartitionCleanStartPath(), clean_start_status) || clean_start_status != "ok")
+        {
+            zookeeper->createIfNotExists(task_partition.getPartitionCleanStartPath(), "");
+            auto checker = zkutil::EphemeralNodeHolder::create(task_partition.getPartitionCleanStartPath() + "/checker", *zookeeper, host_id);
+            // Maybe we are the first worker
             ASTPtr query_select_ast = get_select_query(task_shard.table_split_shard, "count()");
             UInt64 count;
             {
@@ -1643,36 +1851,38 @@ protected:
                 Coordination::Stat stat_shards;
                 zookeeper->get(task_partition.getPartitionShardsPath(), &stat_shards);
 
+                /// NOTE: partition is still fresh if dirt discovery happens before cleaning
                 if (stat_shards.numChildren == 0)
                 {
-                    LOG_WARNING(log, "There are no any workers for partition " << task_partition.name
+                    LOG_WARNING(log, "There are no workers for partition " << task_partition.name
                         << ", but destination table contains " << count << " rows"
                        << ". Partition will be dropped and refilled.");
 
-                    create_is_dirty_node();
+                    create_is_dirty_node(clean_state_clock);
                     return PartitionTaskStatus::Error;
                 }
             }
+            zookeeper->set(task_partition.getPartitionCleanStartPath(), "ok");
+        }
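The clean_start node turns the expensive destination row-count probe into a once-per-clean-epoch action: the first worker creates the node, holds an ephemeral checker child while counting, and stamps it "ok" once the destination is known to be consistent; tryDropPartition removes the node again, so the next clean epoch re-checks. A toy model of the gate (ToyZk and must_check_destination are invented for illustration):

#include <cassert>
#include <map>
#include <optional>
#include <string>

// Toy key-value stand-in for the coordination nodes used by the clean-start gate.
struct ToyZk
{
    std::map<std::string, std::string> nodes;

    std::optional<std::string> get(const std::string & path) const
    {
        auto it = nodes.find(path);
        if (it == nodes.end())
            return {};
        return it->second;
    }
};

// Returns true when this worker must run the expensive row-count check:
// either nobody has checked yet, or the partition was re-dirtied and the
// clean_start marker was removed by the cleaner.
static bool must_check_destination(const ToyZk & zk, const std::string & clean_start_path)
{
    auto status = zk.get(clean_start_path);
    return !status || *status != "ok";
}

int main()
{
    ToyZk zk;
    const std::string path = "/task/table/201701/clean_start";

    assert(must_check_destination(zk, path));    // first worker: node absent
    zk.nodes[path] = "ok";                       // check passed, stamped "ok"
    assert(!must_check_destination(zk, path));   // later workers skip the probe
}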
+
+        /// At this point, we need to be sure that the destination table is clean
+        /// before doing any actual work
 
         /// Try start processing, create node about it
         {
             String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
-            auto op_create = zkutil::makeCreateRequest(current_task_status_path, start_state, zkutil::CreateMode::Persistent);
-            MultiTransactionInfo info = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_create));
-
-            if (info.code)
+            CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirt_cleaned_path);
+            if (clean_state_clock != new_clean_state_clock)
             {
-                zkutil::KeeperMultiException exception(info.code, info.requests, info.responses);
-
-                if (exception.getPathForFirstFailedOp() == is_dirty_flag_path)
-                {
-                    LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled");
-                    return PartitionTaskStatus::Error;
-                }
-
-                throw exception;
+                LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing");
+                return PartitionTaskStatus::Error;
             }
+            else if (!new_clean_state_clock.is_clean())
+            {
+                LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled");
+                create_is_dirty_node(new_clean_state_clock);
+                return PartitionTaskStatus::Error;
+            }
+            zookeeper->create(current_task_status_path, start_state, zkutil::CreateMode::Persistent);
         }
 
         /// Try create table (if not exists) on each shard
@@ -1733,12 +1943,13 @@ protected:
             output = io_insert.out;
         }
 
+        /// Fail-fast optimization to abort copying when the current clean state expires
         std::future<Coordination::ExistsResponse> future_is_dirty_checker;
 
         Stopwatch watch(CLOCK_MONOTONIC_COARSE);
         constexpr UInt64 check_period_milliseconds = 500;
 
-        /// Will asynchronously check that ZooKeeper connection and is_dirty flag appearing while copy data
+        /// Will asynchronously check the ZooKeeper connection and whether the is_dirty flag appeared while copying data
         auto cancel_check = [&] ()
         {
             if (zookeeper->expired())
@@ -1754,7 +1965,13 @@ protected:
                 Coordination::ExistsResponse status = future_is_dirty_checker.get();
 
                 if (status.error != Coordination::ZNONODE)
+                {
+                    LogicalClock dirt_discovery_epoch (status.stat.mzxid);
+                    if (dirt_discovery_epoch == clean_state_clock.discovery_zxid)
+                    {
+                        return false;
+                    }
                     throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
+                }
             }
 
             return false;
         };
@@ -1789,20 +2006,19 @@ protected:
         /// Finalize the processing, change state of current partition task (and also check is_dirty flag)
         {
             String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
-            auto op_set = zkutil::makeSetRequest(current_task_status_path, state_finished, 0);
-            MultiTransactionInfo info = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_set));
-
-            if (info.code)
+            CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirt_cleaned_path);
+            if (clean_state_clock != new_clean_state_clock)
             {
-                zkutil::KeeperMultiException exception(info.code, info.requests, info.responses);
-
-                if (exception.getPathForFirstFailedOp() == is_dirty_flag_path)
-                    LOG_INFO(log, "Partition " << task_partition.name << " became dirty and will be dropped and refilled");
-                else
-                    LOG_INFO(log, "Someone made the node abandoned. Will refill partition. " << zkutil::ZooKeeper::error2string(info.code));
-
+                LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing");
                 return PartitionTaskStatus::Error;
             }
+            else if (!new_clean_state_clock.is_clean())
+            {
+                LOG_INFO(log, "Partition " << task_partition.name << " became dirty and will be dropped and refilled");
+                create_is_dirty_node(new_clean_state_clock);
+                return PartitionTaskStatus::Error;
+            }
+            zookeeper->set(current_task_status_path, state_finished, 0);
         }
 
         LOG_INFO(log, "Partition " << task_partition.name << " copied");