#include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int BAD_ARGUMENTS; extern const int LOGICAL_ERROR; extern const int TIMEOUT_EXCEEDED; } PartMovesBetweenShardsOrchestrator::PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_) : storage(storage_) , zookeeper_path(storage.zookeeper_path) , logger_name(storage.getStorageID().getFullTableName() + " (PartMovesBetweenShardsOrchestrator)") , log(&Poco::Logger::get(logger_name)) , entries_znode_path(zookeeper_path + "/part_moves_shard") { /// Schedule pool is not designed for long-running tasks. TODO replace with a separate thread? task = storage.getContext()->getSchedulePool().createTask(logger_name, [this]{ run(); }); } void PartMovesBetweenShardsOrchestrator::run() { if (!storage.getSettings()->part_moves_between_shards_enable) return; if (need_stop) return; /// Don't poll ZooKeeper too often. auto sleep_ms = 3 * 1000; try { syncStateFromZK(); /// Schedule for immediate re-execution as likely there is more work /// to be done. if (step()) task->schedule(); } catch (...) { tryLogCurrentException(log, __PRETTY_FUNCTION__); } task->scheduleAfter(sleep_ms); } void PartMovesBetweenShardsOrchestrator::shutdown() { need_stop = true; task->deactivate(); LOG_TRACE(log, "PartMovesBetweenShardsOrchestrator thread finished"); } void PartMovesBetweenShardsOrchestrator::syncStateFromZK() { std::lock_guard lock(state_mutex); std::vector new_entries; auto zk = storage.getZooKeeper(); Strings task_names = zk->getChildren(entries_znode_path); for (auto const & task_name : task_names) { PartMovesBetweenShardsOrchestrator::Entry e; Coordination::Stat stat; e.znode_path = entries_znode_path + "/" + task_name; auto entry_str = zk->get(e.znode_path, &stat); e.fromString(entry_str); e.version = stat.version; e.znode_name = task_name; new_entries.push_back(std::move(e)); } // Replace in-memory state. entries = new_entries; } bool PartMovesBetweenShardsOrchestrator::step() { if (!storage.is_leader) return false; auto zk = storage.getZooKeeper(); std::optional entry_to_process; /// Try find an entry to process and copy it. { std::lock_guard lock(state_mutex); for (auto const & entry : entries) { if (entry.state.value == EntryState::DONE || entry.state.value == EntryState::CANCELLED) continue; entry_to_process.emplace(entry); break; } } if (!entry_to_process.has_value()) return false; /// Since some state transitions are long running (waiting on replicas acknowledgement we create this lock to avoid /// other replicas trying to do the same work. All state transitions should be idempotent so is is safe to lose the /// lock and have another replica retry. /// /// Note: This blocks all other entries from being executed. Technical debt. zkutil::EphemeralNodeHolder::Ptr entry_node_holder; try { entry_node_holder = zkutil::EphemeralNodeHolder::create(entry_to_process->znode_path + "/lock_holder", *zk, storage.replica_name); } catch (const Coordination::Exception & e) { if (e.code == Coordination::Error::ZNODEEXISTS) { LOG_DEBUG(log, "Task {} is being processed by another replica", entry_to_process->znode_name); return false; } throw; } LOG_DEBUG(log, "stepEntry on task {} from state {} (rollback: {}), try: {}", entry_to_process->znode_name, entry_to_process->state.toString(), entry_to_process->rollback, entry_to_process->num_tries); try { /// Use the same ZooKeeper connection. If we'd lost the lock then connection /// will become expired and all consequent operations will fail. Entry new_entry = stepEntry(entry_to_process.value(), zk); new_entry.last_exception_msg = ""; new_entry.num_tries = 0; new_entry.update_time = std::time(nullptr); zk->set(new_entry.znode_path, new_entry.toString(), new_entry.version); } catch (...) { tryLogCurrentException(log, __PRETTY_FUNCTION__); Entry entry_copy = entry_to_process.value(); entry_copy.last_exception_msg = getCurrentExceptionMessage(false); entry_copy.num_tries += 1; entry_copy.update_time = std::time(nullptr); zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); return false; } return true; } PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::stepEntry(Entry entry, zkutil::ZooKeeperPtr zk) { switch (entry.state.value) { case EntryState::DONE: [[fallthrough]]; case EntryState::CANCELLED: throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't stepEntry after terminal state. This is a bug."); case EntryState::TODO: { if (entry.rollback) { removePins(entry, zk); entry.state = EntryState::CANCELLED; return entry; } /// The forward transition happens implicitly when task is created by `StorageReplicatedMergeTree::movePartitionToShard`. else throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected entry state ({}) in stepEntry. This is a bug.", entry.state.toString()); } case EntryState::SYNC_SOURCE: { if (entry.rollback) { entry.state = EntryState::TODO; return entry; } else { ReplicatedMergeTreeLogEntryData sync_source_log_entry; String sync_source_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString()); Coordination::Stat sync_source_log_entry_stat; String sync_source_log_entry_str; if (zk->tryGet(sync_source_log_entry_barrier_path, sync_source_log_entry_str, &sync_source_log_entry_stat)) { LOG_DEBUG(log, "Log entry was already created will check the existing one."); sync_source_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_source_log_entry_str, sync_source_log_entry_stat); } else { /// Log entry. Coordination::Requests ops; ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); sync_source_log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS; sync_source_log_entry.log_entry_id = sync_source_log_entry_barrier_path; sync_source_log_entry.create_time = std::time(nullptr); sync_source_log_entry.source_replica = storage.replica_name; ops.emplace_back(zkutil::makeCreateRequest(sync_source_log_entry_barrier_path, sync_source_log_entry.toString(), -1)); ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); ops.emplace_back(zkutil::makeCreateRequest( zookeeper_path + "/log/log-", sync_source_log_entry.toString(), zkutil::CreateMode::PersistentSequential)); Coordination::Responses responses; Coordination::Error rc = zk->tryMulti(ops, responses); zkutil::KeeperMultiException::check(rc, ops, responses); String log_znode_path = dynamic_cast(*responses.back()).path_created; sync_source_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); } Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(zookeeper_path, sync_source_log_entry, 1); if (!unwaited.empty()) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); entry.state = EntryState::SYNC_DESTINATION; return entry; } } case EntryState::SYNC_DESTINATION: { if (entry.rollback) { Entry entry_copy = entry; entry_copy.state = EntryState::SYNC_SOURCE; return entry_copy; } else { ReplicatedMergeTreeLogEntryData sync_destination_log_entry; String sync_destination_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString()); Coordination::Stat sync_destination_log_entry_stat; String sync_destination_log_entry_str; if (zk->tryGet(sync_destination_log_entry_barrier_path, sync_destination_log_entry_str, &sync_destination_log_entry_stat)) { LOG_DEBUG(log, "Log entry was already created will check the existing one."); sync_destination_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_destination_log_entry_str, sync_destination_log_entry_stat); } else { /// Log entry. Coordination::Requests ops; ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); sync_destination_log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS; sync_destination_log_entry.log_entry_id = sync_destination_log_entry_barrier_path; sync_destination_log_entry.create_time = std::time(nullptr); sync_destination_log_entry.source_replica = storage.replica_name; sync_destination_log_entry.source_shard = zookeeper_path; ops.emplace_back(zkutil::makeCreateRequest(sync_destination_log_entry_barrier_path, sync_destination_log_entry.toString(), -1)); ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); ops.emplace_back(zkutil::makeCreateRequest( entry.to_shard + "/log/log-", sync_destination_log_entry.toString(), zkutil::CreateMode::PersistentSequential)); Coordination::Responses responses; Coordination::Error rc = zk->tryMulti(ops, responses); zkutil::KeeperMultiException::check(rc, ops, responses); String log_znode_path = dynamic_cast(*responses.back()).path_created; sync_destination_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); } Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, sync_destination_log_entry, 1); if (!unwaited.empty()) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); entry.state = EntryState::DESTINATION_FETCH; return entry; } } case EntryState::DESTINATION_FETCH: { if (entry.rollback) { // TODO(nv): Do we want to cleanup fetched data on the destination? // Maybe leave it there and make sure a background cleanup will take // care of it sometime later. entry.state = EntryState::SYNC_DESTINATION; return entry; } else { /// Note: Table structure shouldn't be changed while there are part movements in progress. ReplicatedMergeTreeLogEntryData fetch_log_entry; String fetch_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString()); Coordination::Stat fetch_log_entry_stat; String fetch_log_entry_str; if (zk->tryGet(fetch_log_entry_barrier_path, fetch_log_entry_str, &fetch_log_entry_stat)) { LOG_DEBUG(log, "Log entry was already created will check the existing one."); fetch_log_entry = *ReplicatedMergeTreeLogEntry::parse(fetch_log_entry_str, fetch_log_entry_stat); } else { Coordination::Requests ops; ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); fetch_log_entry.type = ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD; fetch_log_entry.log_entry_id = fetch_log_entry_barrier_path; fetch_log_entry.create_time = std::time(nullptr); fetch_log_entry.new_part_name = entry.part_name; fetch_log_entry.source_replica = storage.replica_name; fetch_log_entry.source_shard = zookeeper_path; ops.emplace_back(zkutil::makeCreateRequest(fetch_log_entry_barrier_path, fetch_log_entry.toString(), -1)); ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); ops.emplace_back(zkutil::makeCreateRequest( entry.to_shard + "/log/log-", fetch_log_entry.toString(), zkutil::CreateMode::PersistentSequential)); Coordination::Responses responses; Coordination::Error rc = zk->tryMulti(ops, responses); zkutil::KeeperMultiException::check(rc, ops, responses); String log_znode_path = dynamic_cast(*responses.back()).path_created; fetch_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); } Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, fetch_log_entry, 1); if (!unwaited.empty()) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); entry.state = EntryState::DESTINATION_ATTACH; return entry; } } case EntryState::DESTINATION_ATTACH: { String attach_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString()); if (entry.rollback) { Coordination::Stat attach_log_entry_stat; String attach_log_entry_str; if (!zk->tryGet(attach_log_entry_barrier_path, attach_log_entry_str, &attach_log_entry_stat)) { LOG_DEBUG(log, "Log entry for DESTINATION_ATTACH not found. Not sending DROP_RANGE log entry."); // ATTACH_PART wasn't issued, nothing to revert. entry.state = EntryState::DESTINATION_FETCH; return entry; } else { // Need to remove ATTACH_PART from the queue or drop data. // Similar to `StorageReplicatedMergeTree::dropPart` without extra // checks as we know drop shall be possible. ReplicatedMergeTreeLogEntryData attach_rollback_log_entry; String attach_rollback_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString() + "_rollback"); Coordination::Stat attach_rollback_log_entry_stat; String attach_rollback_log_entry_str; if (zk->tryGet(attach_rollback_log_entry_barrier_path, attach_rollback_log_entry_str, &attach_rollback_log_entry_stat)) { LOG_DEBUG(log, "Log entry was already created will check the existing one."); attach_rollback_log_entry = *ReplicatedMergeTreeLogEntry::parse(attach_rollback_log_entry_str, attach_rollback_log_entry_stat); } else { const auto attach_log_entry = ReplicatedMergeTreeLogEntry::parse(attach_log_entry_str, attach_log_entry_stat); Coordination::Requests ops; ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); auto drop_part_info = MergeTreePartInfo::fromPartName(attach_log_entry->new_part_name, storage.format_version); storage.getClearBlocksInPartitionOps( ops, *zk, drop_part_info.partition_id, drop_part_info.min_block, drop_part_info.max_block); size_t clear_block_ops_size = ops.size(); attach_rollback_log_entry.type = ReplicatedMergeTreeLogEntryData::DROP_RANGE; attach_rollback_log_entry.log_entry_id = attach_rollback_log_entry_barrier_path; attach_rollback_log_entry.source_replica = storage.replica_name; attach_rollback_log_entry.source_shard = zookeeper_path; attach_rollback_log_entry.new_part_name = getPartNamePossiblyFake(storage.format_version, drop_part_info); attach_rollback_log_entry.create_time = time(nullptr); ops.emplace_back(zkutil::makeCreateRequest(attach_rollback_log_entry_barrier_path, attach_rollback_log_entry.toString(), -1)); ops.emplace_back(zkutil::makeCreateRequest( entry.to_shard + "/log/log-", attach_rollback_log_entry.toString(), zkutil::CreateMode::PersistentSequential)); ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); Coordination::Responses responses; Coordination::Error rc = zk->tryMulti(ops, responses); zkutil::KeeperMultiException::check(rc, ops, responses); String log_znode_path = dynamic_cast(*responses[clear_block_ops_size]).path_created; attach_rollback_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); } Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, attach_rollback_log_entry, 1); if (!unwaited.empty()) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); entry.state = EntryState::DESTINATION_FETCH; return entry; } } else { /// There is a chance that attach on destination will fail and this task will be left in the queue forever. Coordination::Requests ops; ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); auto part = storage.getActiveContainingPart(entry.part_name); /// Allocating block number in other replicas zookeeper path /// TODO Maybe we can do better. auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zk, attach_log_entry_barrier_path, entry.to_shard); ReplicatedMergeTreeLogEntryData log_entry; if (block_number_lock) { auto block_number = block_number_lock->getNumber(); auto part_info = part->info; part_info.min_block = block_number; part_info.max_block = block_number; part_info.level = 0; part_info.mutation = 0; /// Attach log entry (all replicas already fetched part) log_entry.type = ReplicatedMergeTreeLogEntryData::ATTACH_PART; log_entry.log_entry_id = attach_log_entry_barrier_path; log_entry.part_checksum = part->checksums.getTotalChecksumHex(); log_entry.create_time = std::time(nullptr); log_entry.new_part_name = part_info.getPartName(); ops.emplace_back(zkutil::makeCreateRequest(attach_log_entry_barrier_path, log_entry.toString(), -1)); ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); ops.emplace_back(zkutil::makeCreateRequest( entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential)); Coordination::Responses responses; Coordination::Error rc = zk->tryMulti(ops, responses); zkutil::KeeperMultiException::check(rc, ops, responses); String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); } else { LOG_DEBUG(log, "Log entry was already created will check the existing one."); Coordination::Stat stat; String log_entry_str = zk->get(attach_log_entry_barrier_path, &stat); log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, stat); } Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 1); if (!unwaited.empty()) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); entry.dst_part_name = log_entry.new_part_name; entry.state = EntryState::SOURCE_DROP_PRE_DELAY; return entry; } } case EntryState::SOURCE_DROP_PRE_DELAY: { if (entry.rollback) { entry.state = EntryState::DESTINATION_ATTACH; return entry; } else { std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds)); entry.state = EntryState::SOURCE_DROP; return entry; } } case EntryState::SOURCE_DROP: { if (entry.rollback) throw Exception(ErrorCodes::LOGICAL_ERROR, "It is not possible to rollback from this state. This is a bug."); else { // Can't use dropPartImpl directly as we need additional zk ops to remember the log entry // for subsequent retries. ReplicatedMergeTreeLogEntryData source_drop_log_entry; String source_drop_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString()); Coordination::Stat source_drop_log_entry_stat; String source_drop_log_entry_str; if (zk->tryGet(source_drop_log_entry_barrier_path, source_drop_log_entry_str, &source_drop_log_entry_stat)) { LOG_DEBUG(log, "Log entry was already created will check the existing one."); source_drop_log_entry = *ReplicatedMergeTreeLogEntry::parse(source_drop_log_entry_str, source_drop_log_entry_stat); } else { auto source_drop_part_info = MergeTreePartInfo::fromPartName(entry.part_name, storage.format_version); Coordination::Requests ops; ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); storage.getClearBlocksInPartitionOps(ops, *zk, source_drop_part_info.partition_id, source_drop_part_info.min_block, source_drop_part_info.max_block); source_drop_log_entry.type = ReplicatedMergeTreeLogEntryData::DROP_RANGE; source_drop_log_entry.log_entry_id = source_drop_log_entry_barrier_path; source_drop_log_entry.create_time = std::time(nullptr); source_drop_log_entry.new_part_name = getPartNamePossiblyFake(storage.format_version, source_drop_part_info); source_drop_log_entry.source_replica = storage.replica_name; ops.emplace_back(zkutil::makeCreateRequest(source_drop_log_entry_barrier_path, source_drop_log_entry.toString(), -1)); ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); ops.emplace_back(zkutil::makeCreateRequest( zookeeper_path + "/log/log-", source_drop_log_entry.toString(), zkutil::CreateMode::PersistentSequential)); Coordination::Responses responses; Coordination::Error rc = zk->tryMulti(ops, responses); zkutil::KeeperMultiException::check(rc, ops, responses); String log_znode_path = dynamic_cast(*responses.back()).path_created; source_drop_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); } Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(zookeeper_path, source_drop_log_entry, 1); if (!unwaited.empty()) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); entry.state = EntryState::SOURCE_DROP_POST_DELAY; return entry; } } case EntryState::SOURCE_DROP_POST_DELAY: { if (entry.rollback) throw Exception(ErrorCodes::LOGICAL_ERROR, "It is not possible to rollback from this state. This is a bug."); else { std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds)); entry.state = EntryState::REMOVE_UUID_PIN; return entry; } } case EntryState::REMOVE_UUID_PIN: { if (entry.rollback) throw Exception(ErrorCodes::LOGICAL_ERROR, "It is not possible to rollback from this state. This is a bug."); else { removePins(entry, zk); entry.state = EntryState::DONE; return entry; } } } UNREACHABLE(); } void PartMovesBetweenShardsOrchestrator::removePins(const Entry & entry, zkutil::ZooKeeperPtr zk) { PinnedPartUUIDs src_pins; PinnedPartUUIDs dst_pins; { String s = zk->get(zookeeper_path + "/pinned_part_uuids", &src_pins.stat); src_pins.fromString(s); } { String s = zk->get(entry.to_shard + "/pinned_part_uuids", &dst_pins.stat); dst_pins.fromString(s); } dst_pins.part_uuids.erase(entry.part_uuid); src_pins.part_uuids.erase(entry.part_uuid); Coordination::Requests ops; ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/pinned_part_uuids", src_pins.toString(), src_pins.stat.version)); ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/pinned_part_uuids", dst_pins.toString(), dst_pins.stat.version)); zk->multi(ops); } CancellationCode PartMovesBetweenShardsOrchestrator::killPartMoveToShard(const UUID & task_uuid) { while (true) { auto entry = getEntryByUUID(task_uuid); // If the task is in this state or any that follows it is too late to rollback // since we can't be sure if the source data still exists. auto not_possible_to_rollback_after_state = EntryState(EntryState::SOURCE_DROP); if (entry.state.value >= not_possible_to_rollback_after_state.value) { LOG_DEBUG(log, "Can't kill move part between shards entry {} ({}) after state {}. Current state: {}.", toString(entry.task_uuid), entry.znode_name, not_possible_to_rollback_after_state.toString(), entry.state.toString()); return CancellationCode::CancelCannotBeSent; } LOG_TRACE(log, "Will try to mark move part between shards entry {} ({}) for rollback.", toString(entry.task_uuid), entry.znode_name); auto zk = storage.getZooKeeper(); // State transition. entry.rollback = true; entry.update_time = std::time(nullptr); entry.num_tries = 0; entry.last_exception_msg = ""; auto code = zk->trySet(entry.znode_path, entry.toString(), entry.version); if (code == Coordination::Error::ZOK) { // Orchestrator will process it in background. return CancellationCode::CancelSent; } else if (code == Coordination::Error::ZBADVERSION) { /// Node was updated meanwhile. We must re-read it and repeat all the actions. continue; } else throw Coordination::Exception(code, entry.znode_path); } } std::vector PartMovesBetweenShardsOrchestrator::getEntries() { // Force sync. Also catches parsing errors. syncStateFromZK(); std::lock_guard lock(state_mutex); return entries; } PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::getEntryByUUID(const UUID & task_uuid) { /// Need latest state in case user tries to kill a move observed on a different replica. syncStateFromZK(); std::lock_guard lock(state_mutex); for (auto const & entry : entries) { if (entry.task_uuid == task_uuid) return entry; } throw Exception(ErrorCodes::BAD_ARGUMENTS, "Task with id {} not found", toString(task_uuid)); } String PartMovesBetweenShardsOrchestrator::Entry::toString() const { Poco::JSON::Object json; json.set(JSON_KEY_CREATE_TIME, DB::toString(create_time)); json.set(JSON_KEY_UPDATE_TIME, DB::toString(update_time)); json.set(JSON_KEY_TASK_UUID, DB::toString(task_uuid)); json.set(JSON_KEY_PART_NAME, part_name); json.set(JSON_KEY_PART_UUID, DB::toString(part_uuid)); json.set(JSON_KEY_TO_SHARD, to_shard); json.set(JSON_KEY_DST_PART_NAME, dst_part_name); json.set(JSON_KEY_STATE, state.toString()); json.set(JSON_KEY_ROLLBACK, DB::toString(rollback)); json.set(JSON_KEY_LAST_EX_MSG, last_exception_msg); json.set(JSON_KEY_NUM_TRIES, DB::toString(num_tries)); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); // Always escape unicode to make last_exception_msg json safe. // It may contain random binary data when exception is a parsing error // of unexpected contents. Poco::JSON::Stringifier::stringify(json, oss, 0, -1, Poco::JSON_WRAP_STRINGS | Poco::JSON_ESCAPE_UNICODE); return oss.str(); } void PartMovesBetweenShardsOrchestrator::Entry::fromString(const String & buf) { Poco::JSON::Parser parser; auto json = parser.parse(buf).extract(); create_time = parseFromString(json->getValue(JSON_KEY_CREATE_TIME)); update_time = parseFromString(json->getValue(JSON_KEY_UPDATE_TIME)); task_uuid = parseFromString(json->getValue(JSON_KEY_TASK_UUID)); part_name = json->getValue(JSON_KEY_PART_NAME); part_uuid = parseFromString(json->getValue(JSON_KEY_PART_UUID)); to_shard = json->getValue(JSON_KEY_TO_SHARD); dst_part_name = json->getValue(JSON_KEY_DST_PART_NAME); state.value = EntryState::fromString(json->getValue(JSON_KEY_STATE)); rollback = json->getValue(JSON_KEY_ROLLBACK); last_exception_msg = json->getValue(JSON_KEY_LAST_EX_MSG); num_tries = json->getValue(JSON_KEY_NUM_TRIES); } }