diff --git a/src/Access/AccessType.h b/src/Access/AccessType.h index 8e6a8be8e36..120a97d47b7 100644 --- a/src/Access/AccessType.h +++ b/src/Access/AccessType.h @@ -106,7 +106,7 @@ enum class AccessType (anyone can kill his own queries) */\ \ M(MOVE_PARTITION_BETWEEN_SHARDS, "", GLOBAL, ALL) /* required to be able to move a part/partition to a table - identified by it's ZooKeeper path */\ + identified by its ZooKeeper path */\ \ M(CREATE_USER, "", GLOBAL, ACCESS_MANAGEMENT) \ M(ALTER_USER, "", GLOBAL, ACCESS_MANAGEMENT) \ diff --git a/src/Interpreters/InterpreterKillQueryQuery.cpp b/src/Interpreters/InterpreterKillQueryQuery.cpp index d43d697fcd5..bfff1f1cd7b 100644 --- a/src/Interpreters/InterpreterKillQueryQuery.cpp +++ b/src/Interpreters/InterpreterKillQueryQuery.cpp @@ -31,6 +31,7 @@ namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int ACCESS_DENIED; + extern const int NOT_IMPLEMENTED; } @@ -290,6 +291,72 @@ BlockIO InterpreterKillQueryQuery::execute() break; } + case ASTKillQueryQuery::Type::PartMoveToShard: + { + if (query.sync) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "SYNC modifier is not supported for this statement."); + + Block moves_block = getSelectResult( + "database, table, task_name, task_uuid, part_name, to_shard, state", + "system.part_moves_between_shards"); + + if (!moves_block) + return res_io; + + const ColumnString & database_col = typeid_cast<const ColumnString &>(*moves_block.getByName("database").column); + const ColumnString & table_col = typeid_cast<const ColumnString &>(*moves_block.getByName("table").column); + const ColumnUUID & task_uuid_col = typeid_cast<const ColumnUUID &>(*moves_block.getByName("task_uuid").column); + + auto header = moves_block.cloneEmpty(); + header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"}); + + MutableColumns res_columns = header.cloneEmptyColumns(); + auto table_id = StorageID::createEmpty(); + AccessRightsElements required_access_rights; + auto access = getContext()->getAccess(); + bool access_denied = false; + + for (size_t i = 0; i < moves_block.rows(); ++i) + { + table_id = StorageID{database_col.getDataAt(i).toString(), table_col.getDataAt(i).toString()}; + auto task_uuid = get<UUID>(task_uuid_col[i]); + + CancellationCode code = CancellationCode::Unknown; + + if (!query.test) + { + auto storage = DatabaseCatalog::instance().tryGetTable(table_id, getContext()); + if (!storage) + code = CancellationCode::NotFound; + else + { + ASTAlterCommand alter_command{}; + alter_command.type = ASTAlterCommand::MOVE_PARTITION; + alter_command.move_destination_type = DataDestinationType::SHARD; + required_access_rights = InterpreterAlterQuery::getRequiredAccessForCommand( + alter_command, table_id.database_name, table_id.table_name); + if (!access->isGranted(required_access_rights)) + { + access_denied = true; + continue; + } + code = storage->killPartMoveToShard(task_uuid); + } + } + + insertResultRow(i, code, moves_block, header, res_columns); + } + + if (res_columns[0]->empty() && access_denied)
+ throw Exception( + "Not allowed to kill move partition. To execute this query it's necessary to have the grant " + required_access_rights.toString(), + ErrorCodes::ACCESS_DENIED); + + + res_io.in = std::make_shared<OneBlockInputStream>(header.cloneWithColumns(std::move(res_columns))); + + break; + } } return res_io; diff --git a/src/Parsers/ASTKillQueryQuery.cpp b/src/Parsers/ASTKillQueryQuery.cpp index 72bdd7d6b0b..71c3011dd2c 100644 --- a/src/Parsers/ASTKillQueryQuery.cpp +++ b/src/Parsers/ASTKillQueryQuery.cpp @@ -11,8 +11,20 @@ String ASTKillQueryQuery::getID(char delim) const void ASTKillQueryQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const { - settings.ostr << (settings.hilite ? hilite_keyword : "") << "KILL " - << (type == Type::Query ? "QUERY" : "MUTATION"); + settings.ostr << (settings.hilite ? hilite_keyword : "") << "KILL "; + + switch (type) + { + case Type::Query: + settings.ostr << "QUERY"; + break; + case Type::Mutation: + settings.ostr << "MUTATION"; + break; + case Type::PartMoveToShard: + settings.ostr << "PART_MOVE_TO_SHARD"; + break; + } formatOnCluster(settings); diff --git a/src/Parsers/ASTKillQueryQuery.h b/src/Parsers/ASTKillQueryQuery.h index c1b3956962f..6ff12bcba93 100644 --- a/src/Parsers/ASTKillQueryQuery.h +++ b/src/Parsers/ASTKillQueryQuery.h @@ -13,6 +13,7 @@ public: { Query, /// KILL QUERY Mutation, /// KILL MUTATION + PartMoveToShard, /// KILL PART_MOVE_TO_SHARD }; Type type = Type::Query; diff --git a/src/Parsers/ParserKillQueryQuery.cpp b/src/Parsers/ParserKillQueryQuery.cpp index a195a778ed2..bc895406c9f 100644 --- a/src/Parsers/ParserKillQueryQuery.cpp +++ b/src/Parsers/ParserKillQueryQuery.cpp @@ -17,6 +17,7 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect ParserKeyword p_kill{"KILL"}; ParserKeyword p_query{"QUERY"}; ParserKeyword p_mutation{"MUTATION"}; + ParserKeyword p_part_move_to_shard{"PART_MOVE_TO_SHARD"}; ParserKeyword p_on{"ON"}; ParserKeyword p_test{"TEST"}; ParserKeyword p_sync{"SYNC"}; @@ -31,6 +32,8 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect query->type = ASTKillQueryQuery::Type::Query; else if (p_mutation.ignore(pos, expected)) query->type = ASTKillQueryQuery::Type::Mutation; + else if (p_part_move_to_shard.ignore(pos, expected)) + query->type = ASTKillQueryQuery::Type::PartMoveToShard; else return false; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 90cb963e064..9c340100469 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -460,6 +460,12 @@ public: throw Exception("Mutations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); } + /// Cancel a part move to shard.
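
This virtual is the storage-level entry point for KILL PART_MOVE_TO_SHARD. A supporting engine overrides it and delegates to whichever component owns the task state; the StorageReplicatedMergeTree override added further down in this diff does exactly this:

    CancellationCode StorageReplicatedMergeTree::killPartMoveToShard(const UUID & task_uuid)
    {
        /// The orchestrator owns the task entries in ZooKeeper and decides
        /// whether a rollback is still possible for the given task.
        return part_moves_between_shards_orchestrator.killPartMoveToShard(task_uuid);
    }

Engines without cross-shard move support inherit the throwing default: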
+ virtual CancellationCode killPartMoveToShard(const UUID & /*task_uuid*/) + { + throw Exception("Part moves between shards are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + /** If the table have to do some complicated work on startup, * that must be postponed after creation of table object * (like launching some background threads), diff --git a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp index c227febbbc2..b3a17250549 100644 --- a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp +++ b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp @@ -1,13 +1,20 @@ #include #include #include -#include #include #include #include namespace DB { + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int LOGICAL_ERROR; + extern const int TIMEOUT_EXCEEDED; +} + PartMovesBetweenShardsOrchestrator::PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_) : storage(storage_) , zookeeper_path(storage.zookeeper_path) @@ -27,16 +34,17 @@ void PartMovesBetweenShardsOrchestrator::run() if (need_stop) return; - auto sleep_ms = 10; + /// Don't poll ZooKeeper too often. + auto sleep_ms = 3 * 1000; try { - fetchStateFromZK(); + syncStateFromZK(); + /// Schedule for immediate re-execution as likely there is more work + /// to be done. if (step()) - fetchStateFromZK(); - else - sleep_ms = 3 * 1000; + task->schedule(); } catch (...) { @@ -54,11 +62,11 @@ void PartMovesBetweenShardsOrchestrator::shutdown() LOG_TRACE(log, "PartMovesBetweenShardsOrchestrator thread finished"); } -void PartMovesBetweenShardsOrchestrator::fetchStateFromZK() +void PartMovesBetweenShardsOrchestrator::syncStateFromZK() { std::lock_guard lock(state_mutex); - entries.clear(); + std::vector new_entries; auto zk = storage.getZooKeeper(); @@ -76,8 +84,11 @@ void PartMovesBetweenShardsOrchestrator::fetchStateFromZK() e.version = stat.version; e.znode_name = task_name; - entries[task_name] = std::move(e); + new_entries.push_back(std::move(e)); } + + // Replace in-memory state. + entries = new_entries; } bool PartMovesBetweenShardsOrchestrator::step() @@ -93,7 +104,7 @@ bool PartMovesBetweenShardsOrchestrator::step() { std::lock_guard lock(state_mutex); - for (auto const & entry : entries | boost::adaptors::map_values) + for (auto const & entry : entries) { if (entry.state.value == EntryState::DONE || entry.state.value == EntryState::CANCELLED) continue; @@ -128,11 +139,21 @@ bool PartMovesBetweenShardsOrchestrator::step() throw; } + LOG_DEBUG(log, "stepEntry on task {} from state {} (rollback: {}), try: {}", + entry_to_process->znode_name, + entry_to_process->state.toString(), + entry_to_process->rollback, + entry_to_process->num_tries); + try { /// Use the same ZooKeeper connection. If we'd lost the lock then connection /// will become expired and all consequent operations will fail. - stepEntry(entry_to_process.value(), zk); + Entry new_entry = stepEntry(entry_to_process.value(), zk); + new_entry.last_exception_msg = ""; + new_entry.num_tries = 0; + new_entry.update_time = std::time(nullptr); + zk->set(new_entry.znode_path, new_entry.toString(), new_entry.version); } catch (...) 
{ @@ -140,6 +161,7 @@ bool PartMovesBetweenShardsOrchestrator::step() Entry entry_copy = entry_to_process.value(); entry_copy.last_exception_msg = getCurrentExceptionMessage(false); + entry_copy.num_tries += 1; entry_copy.update_time = std::time(nullptr); zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); @@ -149,276 +171,537 @@ bool PartMovesBetweenShardsOrchestrator::step() return true; } -void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::ZooKeeperPtr zk) +PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::stepEntry(Entry entry, zkutil::ZooKeeperPtr zk) { switch (entry.state.value) { - case EntryState::DONE: - break; - + case EntryState::DONE: [[fallthrough]]; case EntryState::CANCELLED: - break; + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't stepEntry after terminal state. This is a bug."); case EntryState::TODO: { - /// State transition. - Entry entry_copy = entry; - entry_copy.state = EntryState::SYNC_SOURCE; - entry_copy.update_time = std::time(nullptr); - zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + if (entry.rollback) + { + removePins(entry, zk); + entry.state = EntryState::CANCELLED; + return entry; + } + /// The forward transition happens implicitly when task is created by `StorageReplicatedMergeTree::movePartitionToShard`. + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected entry state ({}) in stepEntry. This is a bug.", entry.state.toString()); } - break; case EntryState::SYNC_SOURCE: { + if (entry.rollback) { - /// Log entry. - Coordination::Requests ops; - ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); - - ReplicatedMergeTreeLogEntryData log_entry; - log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS; - log_entry.create_time = std::time(nullptr); - log_entry.source_replica = storage.replica_name; - ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); - ops.emplace_back(zkutil::makeCreateRequest( - zookeeper_path + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential)); - - Coordination::Responses responses; - Coordination::Error rc = zk->tryMulti(ops, responses); - zkutil::KeeperMultiException::check(rc, ops, responses); - - String log_znode_path = dynamic_cast(*responses.back()).path_created; - log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - - /// This wait in background schedule pool is useless. It'd be - /// better to have some notification which will call `step` - /// function when all replicated will finish. TODO. - storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, -1); + entry.state = EntryState::TODO; + return entry; } - + else { - /// State transition. 
- Entry entry_copy = entry; - entry_copy.state = EntryState::SYNC_DESTINATION; - entry_copy.update_time = std::time(nullptr); - zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + ReplicatedMergeTreeLogEntryData sync_source_log_entry; + + String sync_source_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString()); + Coordination::Stat sync_source_log_entry_stat; + String sync_source_log_entry_str; + if (zk->tryGet(sync_source_log_entry_barrier_path, sync_source_log_entry_str, &sync_source_log_entry_stat)) + { + LOG_DEBUG(log, "Log entry was already created will check the existing one."); + + sync_source_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_source_log_entry_str, sync_source_log_entry_stat); + } + else + { + /// Log entry. + Coordination::Requests ops; + ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); + + sync_source_log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS; + sync_source_log_entry.log_entry_id = sync_source_log_entry_barrier_path; + sync_source_log_entry.create_time = std::time(nullptr); + sync_source_log_entry.source_replica = storage.replica_name; + + ops.emplace_back(zkutil::makeCreateRequest(sync_source_log_entry_barrier_path, sync_source_log_entry.toString(), -1)); + ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); + ops.emplace_back(zkutil::makeCreateRequest( + zookeeper_path + "/log/log-", sync_source_log_entry.toString(), zkutil::CreateMode::PersistentSequential)); + + Coordination::Responses responses; + Coordination::Error rc = zk->tryMulti(ops, responses); + zkutil::KeeperMultiException::check(rc, ops, responses); + + String log_znode_path = dynamic_cast(*responses.back()).path_created; + sync_source_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); + } + + Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(zookeeper_path, sync_source_log_entry, 1); + if (!unwaited.empty()) + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); + + entry.state = EntryState::SYNC_DESTINATION; + return entry; } } - break; case EntryState::SYNC_DESTINATION: { + if (entry.rollback) { - /// Log entry. - Coordination::Requests ops; - ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); - - ReplicatedMergeTreeLogEntryData log_entry; - log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS; - log_entry.create_time = std::time(nullptr); - log_entry.source_replica = storage.replica_name; - log_entry.source_shard = zookeeper_path; - - ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); - ops.emplace_back(zkutil::makeCreateRequest( - entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential)); - - Coordination::Responses responses; - Coordination::Error rc = zk->tryMulti(ops, responses); - zkutil::KeeperMultiException::check(rc, ops, responses); - - String log_znode_path = dynamic_cast(*responses.back()).path_created; - log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - - storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1); - } - - { - /// State transition. 
Entry entry_copy = entry; - entry_copy.state = EntryState::DESTINATION_FETCH; - entry_copy.update_time = std::time(nullptr); - zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + entry_copy.state = EntryState::SYNC_SOURCE; + return entry_copy; + } + else + { + ReplicatedMergeTreeLogEntryData sync_destination_log_entry; + + String sync_destination_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString()); + Coordination::Stat sync_destination_log_entry_stat; + String sync_destination_log_entry_str; + if (zk->tryGet(sync_destination_log_entry_barrier_path, sync_destination_log_entry_str, &sync_destination_log_entry_stat)) + { + LOG_DEBUG(log, "Log entry was already created will check the existing one."); + + sync_destination_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_destination_log_entry_str, sync_destination_log_entry_stat); + } + else + { + /// Log entry. + Coordination::Requests ops; + ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); + + sync_destination_log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS; + sync_destination_log_entry.log_entry_id = sync_destination_log_entry_barrier_path; + sync_destination_log_entry.create_time = std::time(nullptr); + sync_destination_log_entry.source_replica = storage.replica_name; + sync_destination_log_entry.source_shard = zookeeper_path; + + ops.emplace_back(zkutil::makeCreateRequest(sync_destination_log_entry_barrier_path, sync_destination_log_entry.toString(), -1)); + ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); + ops.emplace_back(zkutil::makeCreateRequest( + entry.to_shard + "/log/log-", sync_destination_log_entry.toString(), zkutil::CreateMode::PersistentSequential)); + + Coordination::Responses responses; + Coordination::Error rc = zk->tryMulti(ops, responses); + zkutil::KeeperMultiException::check(rc, ops, responses); + + String log_znode_path = dynamic_cast(*responses.back()).path_created; + sync_destination_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); + } + + Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, sync_destination_log_entry, 1); + if (!unwaited.empty()) + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); + + entry.state = EntryState::DESTINATION_FETCH; + return entry; } } - break; case EntryState::DESTINATION_FETCH: { - /// Make sure table structure doesn't change when there are part movements in progress. + + if (entry.rollback) { - Coordination::Requests ops; - ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); + // TODO(nv): Do we want to cleanup fetched data on the destination? + // Maybe leave it there and make sure a background cleanup will take + // care of it sometime later. - /// Log entry. 
- ReplicatedMergeTreeLogEntryData log_entry; - log_entry.type = ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD; - log_entry.create_time = std::time(nullptr); - log_entry.new_part_name = entry.part_name; - log_entry.source_replica = storage.replica_name; - log_entry.source_shard = zookeeper_path; - ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); - ops.emplace_back(zkutil::makeCreateRequest( - entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential)); - - Coordination::Responses responses; - Coordination::Error rc = zk->tryMulti(ops, responses); - zkutil::KeeperMultiException::check(rc, ops, responses); - - String log_znode_path = dynamic_cast(*responses.back()).path_created; - log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - - storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1); + entry.state = EntryState::SYNC_DESTINATION; + return entry; } - + else { - /// State transition. - Entry entry_copy = entry; - entry_copy.state = EntryState::DESTINATION_ATTACH; - entry_copy.update_time = std::time(nullptr); - zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + /// Note: Table structure shouldn't be changed while there are part movements in progress. + + ReplicatedMergeTreeLogEntryData fetch_log_entry; + + String fetch_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString()); + Coordination::Stat fetch_log_entry_stat; + String fetch_log_entry_str; + if (zk->tryGet(fetch_log_entry_barrier_path, fetch_log_entry_str, &fetch_log_entry_stat)) + { + LOG_DEBUG(log, "Log entry was already created will check the existing one."); + + fetch_log_entry = *ReplicatedMergeTreeLogEntry::parse(fetch_log_entry_str, fetch_log_entry_stat); + } + else + { + Coordination::Requests ops; + ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); + + fetch_log_entry.type = ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD; + fetch_log_entry.log_entry_id = fetch_log_entry_barrier_path; + fetch_log_entry.create_time = std::time(nullptr); + fetch_log_entry.new_part_name = entry.part_name; + fetch_log_entry.source_replica = storage.replica_name; + fetch_log_entry.source_shard = zookeeper_path; + ops.emplace_back(zkutil::makeCreateRequest(fetch_log_entry_barrier_path, fetch_log_entry.toString(), -1)); + ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); + ops.emplace_back(zkutil::makeCreateRequest( + entry.to_shard + "/log/log-", fetch_log_entry.toString(), zkutil::CreateMode::PersistentSequential)); + + Coordination::Responses responses; + Coordination::Error rc = zk->tryMulti(ops, responses); + zkutil::KeeperMultiException::check(rc, ops, responses); + + String log_znode_path = dynamic_cast(*responses.back()).path_created; + fetch_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); + } + + Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, fetch_log_entry, 1); + if (!unwaited.empty()) + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); + + entry.state = EntryState::DESTINATION_ATTACH; + return entry; } } - break; case EntryState::DESTINATION_ATTACH: { - /// There is a chance that attach on destination will fail and this task will be left in the queue forever. 
+ String attach_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString()); + + if (entry.rollback) { + Coordination::Stat attach_log_entry_stat; + String attach_log_entry_str; + if (!zk->tryGet(attach_log_entry_barrier_path, attach_log_entry_str, &attach_log_entry_stat)) + { + LOG_DEBUG(log, "Log entry for DESTINATION_ATTACH not found. Not sending DROP_RANGE log entry."); + + // ATTACH_PART wasn't issued, nothing to revert. + entry.state = EntryState::DESTINATION_FETCH; + return entry; + } + else + { + // Need to remove ATTACH_PART from the queue or drop data. + // Similar to `StorageReplicatedMergeTree::dropPart` w/o extra + // checks as we know drop shall be possible. + ReplicatedMergeTreeLogEntryData attach_rollback_log_entry; + + String attach_rollback_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString() + "_rollback"); + Coordination::Stat attach_rollback_log_entry_stat; + String attach_rollback_log_entry_str; + if (zk->tryGet(attach_rollback_log_entry_barrier_path, attach_rollback_log_entry_str, &attach_rollback_log_entry_stat)) + { + LOG_DEBUG(log, "Log entry was already created will check the existing one."); + + attach_rollback_log_entry = *ReplicatedMergeTreeLogEntry::parse(attach_rollback_log_entry_str, attach_rollback_log_entry_stat); + } + else + { + const auto attach_log_entry = ReplicatedMergeTreeLogEntry::parse(attach_log_entry_str, attach_log_entry_stat); + + Coordination::Requests ops; + ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); + + auto drop_part_info = MergeTreePartInfo::fromPartName(attach_log_entry->new_part_name, storage.format_version); + + storage.getClearBlocksInPartitionOps( + ops, *zk, drop_part_info.partition_id, drop_part_info.min_block, drop_part_info.max_block); + size_t clear_block_ops_size = ops.size(); + + attach_rollback_log_entry.type = ReplicatedMergeTreeLogEntryData::DROP_RANGE; + attach_rollback_log_entry.log_entry_id = attach_rollback_log_entry_barrier_path; + attach_rollback_log_entry.source_replica = storage.replica_name; + attach_rollback_log_entry.source_shard = zookeeper_path; + + attach_rollback_log_entry.new_part_name = getPartNamePossiblyFake(storage.format_version, drop_part_info); + attach_rollback_log_entry.create_time = time(nullptr); + + ops.emplace_back(zkutil::makeCreateRequest(attach_rollback_log_entry_barrier_path, attach_rollback_log_entry.toString(), -1)); + ops.emplace_back(zkutil::makeCreateRequest( + entry.to_shard + "/log/log-", attach_rollback_log_entry.toString(), zkutil::CreateMode::PersistentSequential)); + ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); + Coordination::Responses responses; + Coordination::Error rc = zk->tryMulti(ops, responses); + zkutil::KeeperMultiException::check(rc, ops, responses); + + String log_znode_path + = dynamic_cast(*responses[clear_block_ops_size]).path_created; + attach_rollback_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); + } + + Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, attach_rollback_log_entry, 1); + if (!unwaited.empty()) + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); + + entry.state = EntryState::DESTINATION_FETCH; + return entry; + } + } + else + { + /// There is a chance that attach on destination will fail and this task will be left in 
the queue forever. + Coordination::Requests ops; ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); auto part = storage.getActiveContainingPart(entry.part_name); + /// Allocating block number in other replicas zookeeper path /// TODO Maybe we can do better. - auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zk, "", entry.to_shard); - auto block_number = block_number_lock->getNumber(); + auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zk, attach_log_entry_barrier_path, entry.to_shard); - auto part_info = part->info; - part_info.min_block = block_number; - part_info.max_block = block_number; - part_info.level = 0; - part_info.mutation = 0; - - /// Attach log entry (all replicas already fetched part) ReplicatedMergeTreeLogEntryData log_entry; - log_entry.type = ReplicatedMergeTreeLogEntryData::ATTACH_PART; - log_entry.part_checksum = part->checksums.getTotalChecksumHex(); - log_entry.create_time = std::time(nullptr); - log_entry.new_part_name = part_info.getPartName(); - ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); - ops.emplace_back(zkutil::makeCreateRequest( - entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential)); - Coordination::Responses responses; - Coordination::Error rc = zk->tryMulti(ops, responses); - zkutil::KeeperMultiException::check(rc, ops, responses); + if (block_number_lock) + { + auto block_number = block_number_lock->getNumber(); - String log_znode_path = dynamic_cast(*responses.back()).path_created; - log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + auto part_info = part->info; + part_info.min_block = block_number; + part_info.max_block = block_number; + part_info.level = 0; + part_info.mutation = 0; - storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1); - } + /// Attach log entry (all replicas already fetched part) + log_entry.type = ReplicatedMergeTreeLogEntryData::ATTACH_PART; + log_entry.log_entry_id = attach_log_entry_barrier_path; + log_entry.part_checksum = part->checksums.getTotalChecksumHex(); + log_entry.create_time = std::time(nullptr); + log_entry.new_part_name = part_info.getPartName(); - { - /// State transition. 
- Entry entry_copy = entry; - entry_copy.state = EntryState::SOURCE_DROP_PRE_DELAY; - entry_copy.update_time = std::time(nullptr); - zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + ops.emplace_back(zkutil::makeCreateRequest(attach_log_entry_barrier_path, log_entry.toString(), -1)); + ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); + ops.emplace_back(zkutil::makeCreateRequest( + entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential)); + + Coordination::Responses responses; + Coordination::Error rc = zk->tryMulti(ops, responses); + zkutil::KeeperMultiException::check(rc, ops, responses); + + String log_znode_path = dynamic_cast(*responses.back()).path_created; + log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); + } + else + { + LOG_DEBUG(log, "Log entry was already created will check the existing one."); + + Coordination::Stat stat; + String log_entry_str = zk->get(attach_log_entry_barrier_path, &stat); + log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, stat); + } + + Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 1); + if (!unwaited.empty()) + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); + + + entry.dst_part_name = log_entry.new_part_name; + entry.state = EntryState::SOURCE_DROP_PRE_DELAY; + return entry; } } - break; case EntryState::SOURCE_DROP_PRE_DELAY: { - std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds)); - - /// State transition. - Entry entry_copy = entry; - entry_copy.state = EntryState::SOURCE_DROP; - entry_copy.update_time = std::time(nullptr); - zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + if (entry.rollback) + { + entry.state = EntryState::DESTINATION_ATTACH; + return entry; + } + else + { + std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds)); + entry.state = EntryState::SOURCE_DROP; + return entry; + } } - break; case EntryState::SOURCE_DROP: { + if (entry.rollback) + throw Exception(ErrorCodes::LOGICAL_ERROR, "It is not possible to rollback from this state. This is a bug."); + else { - ReplicatedMergeTreeLogEntry log_entry; - if (storage.dropPartImpl(zk, entry.part_name, log_entry, false, false)) - storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, -1); - } + // Can't use dropPartImpl directly as we need additional zk ops to remember the log entry + // for subsequent retries. - { - /// State transition. 
- Entry entry_copy = entry; - entry_copy.state = EntryState::SOURCE_DROP_POST_DELAY; - entry_copy.update_time = std::time(nullptr); - zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + ReplicatedMergeTreeLogEntryData source_drop_log_entry; + + String source_drop_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString()); + Coordination::Stat source_drop_log_entry_stat; + String source_drop_log_entry_str; + if (zk->tryGet(source_drop_log_entry_barrier_path, source_drop_log_entry_str, &source_drop_log_entry_stat)) + { + LOG_DEBUG(log, "Log entry was already created will check the existing one."); + + source_drop_log_entry = *ReplicatedMergeTreeLogEntry::parse(source_drop_log_entry_str, source_drop_log_entry_stat); + } + else + { + auto source_drop_part_info = MergeTreePartInfo::fromPartName(entry.part_name, storage.format_version); + + Coordination::Requests ops; + ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version)); + + storage.getClearBlocksInPartitionOps(ops, *zk, source_drop_part_info.partition_id, source_drop_part_info.min_block, source_drop_part_info.max_block); + + source_drop_log_entry.type = ReplicatedMergeTreeLogEntryData::DROP_RANGE; + source_drop_log_entry.log_entry_id = source_drop_log_entry_barrier_path; + source_drop_log_entry.create_time = std::time(nullptr); + source_drop_log_entry.new_part_name = getPartNamePossiblyFake(storage.format_version, source_drop_part_info); + source_drop_log_entry.source_replica = storage.replica_name; + + ops.emplace_back(zkutil::makeCreateRequest(source_drop_log_entry_barrier_path, source_drop_log_entry.toString(), -1)); + ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); + ops.emplace_back(zkutil::makeCreateRequest( + zookeeper_path + "/log/log-", source_drop_log_entry.toString(), zkutil::CreateMode::PersistentSequential)); + + Coordination::Responses responses; + Coordination::Error rc = zk->tryMulti(ops, responses); + zkutil::KeeperMultiException::check(rc, ops, responses); + + String log_znode_path = dynamic_cast(*responses.back()).path_created; + source_drop_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path); + } + + Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(zookeeper_path, source_drop_log_entry, 1); + if (!unwaited.empty()) + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited)); + + entry.state = EntryState::SOURCE_DROP_POST_DELAY; + return entry; } } - break; case EntryState::SOURCE_DROP_POST_DELAY: { - std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds)); - - /// State transition. - Entry entry_copy = entry; - entry_copy.state = EntryState::REMOVE_UUID_PIN; - entry_copy.update_time = std::time(nullptr); - zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + if (entry.rollback) + throw Exception(ErrorCodes::LOGICAL_ERROR, "It is not possible to rollback from this state. This is a bug."); + else + { + std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds)); + entry.state = EntryState::REMOVE_UUID_PIN; + return entry; + } } - break; case EntryState::REMOVE_UUID_PIN: { + if (entry.rollback) + throw Exception(ErrorCodes::LOGICAL_ERROR, "It is not possible to rollback from this state. 
This is a bug."); + else { - PinnedPartUUIDs src_pins; - PinnedPartUUIDs dst_pins; + removePins(entry, zk); - { - String s = zk->get(zookeeper_path + "/pinned_part_uuids", &src_pins.stat); - src_pins.fromString(s); - } - - { - String s = zk->get(entry.to_shard + "/pinned_part_uuids", &dst_pins.stat); - dst_pins.fromString(s); - } - - src_pins.part_uuids.erase(entry.part_uuid); - dst_pins.part_uuids.erase(entry.part_uuid); - - Coordination::Requests ops; - ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/pinned_part_uuids", src_pins.toString(), src_pins.stat.version)); - ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/pinned_part_uuids", dst_pins.toString(), dst_pins.stat.version)); - - zk->multi(ops); + entry.state = EntryState::DONE; + return entry; } - - /// State transition. - Entry entry_copy = entry; - entry_copy.state = EntryState::DONE; - entry_copy.update_time = std::time(nullptr); - zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); } - break; + } + + __builtin_unreachable(); +} + +void PartMovesBetweenShardsOrchestrator::removePins(const Entry & entry, zkutil::ZooKeeperPtr zk) +{ + PinnedPartUUIDs src_pins; + PinnedPartUUIDs dst_pins; + + { + String s = zk->get(zookeeper_path + "/pinned_part_uuids", &src_pins.stat); + src_pins.fromString(s); + } + + { + String s = zk->get(entry.to_shard + "/pinned_part_uuids", &dst_pins.stat); + dst_pins.fromString(s); + } + + dst_pins.part_uuids.erase(entry.part_uuid); + src_pins.part_uuids.erase(entry.part_uuid); + + Coordination::Requests ops; + ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/pinned_part_uuids", src_pins.toString(), src_pins.stat.version)); + ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/pinned_part_uuids", dst_pins.toString(), dst_pins.stat.version)); + + zk->multi(ops); +} + +CancellationCode PartMovesBetweenShardsOrchestrator::killPartMoveToShard(const UUID & task_uuid) +{ + while (true) + { + auto entry = getEntryByUUID(task_uuid); + + // If the task is in this state or any that follows it is too late to rollback + // since we can't be sure if the source data still exists. + auto not_possible_to_rollback_after_state = EntryState(EntryState::SOURCE_DROP); + if (entry.state.value >= not_possible_to_rollback_after_state.value) + { + LOG_DEBUG(log, "Can't kill move part between shards entry {} ({}) after state {}. Current state: {}.", + toString(entry.task_uuid), entry.znode_name, not_possible_to_rollback_after_state.toString(), entry.state.toString()); + return CancellationCode::CancelCannotBeSent; + } + + LOG_TRACE(log, "Will try to mark move part between shards entry {} ({}) for rollback.", + toString(entry.task_uuid), entry.znode_name); + + auto zk = storage.getZooKeeper(); + + // State transition. + entry.rollback = true; + entry.update_time = std::time(nullptr); + entry.num_tries = 0; + entry.last_exception_msg = ""; + + auto code = zk->trySet(entry.znode_path, entry.toString(), entry.version); + if (code == Coordination::Error::ZOK) + { + // Orchestrator will process it in background. + return CancellationCode::CancelSent; + } + else if (code == Coordination::Error::ZBADVERSION) + { + /// Node was updated meanwhile. We must re-read it and repeat all the actions. + continue; + } + else + throw Coordination::Exception(code, entry.znode_path); } } -std::vector PartMovesBetweenShardsOrchestrator::getEntries() const +std::vector PartMovesBetweenShardsOrchestrator::getEntries() { + // Force sync. Also catches parsing errors. 
+ syncStateFromZK(); + std::lock_guard lock(state_mutex); - std::vector res; + return entries; +} - for (const auto & e : entries) - res.push_back(e.second); +PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::getEntryByUUID(const UUID & task_uuid) +{ + /// Need latest state in case user tries to kill a move observed on a different replica. + syncStateFromZK(); - return res; + std::lock_guard lock(state_mutex); + for (auto const & entry : entries) + { + if (entry.task_uuid == task_uuid) + return entry; + } + + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Task with id {} not found", toString(task_uuid)); } String PartMovesBetweenShardsOrchestrator::Entry::toString() const @@ -431,12 +714,19 @@ String PartMovesBetweenShardsOrchestrator::Entry::toString() const json.set(JSON_KEY_PART_NAME, part_name); json.set(JSON_KEY_PART_UUID, DB::toString(part_uuid)); json.set(JSON_KEY_TO_SHARD, to_shard); + json.set(JSON_KEY_DST_PART_NAME, dst_part_name); json.set(JSON_KEY_STATE, state.toString()); + json.set(JSON_KEY_ROLLBACK, DB::toString(rollback)); json.set(JSON_KEY_LAST_EX_MSG, last_exception_msg); + json.set(JSON_KEY_NUM_TRIES, DB::toString(num_tries)); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); - json.stringify(oss); + + // Always escape unicode to make last_exception_msg json safe. + // It may contain random binary data when exception is a parsing error + // of unexpected contents. + Poco::JSON::Stringifier::stringify(json, oss, 0, -1, Poco::JSON_WRAP_STRINGS | Poco::JSON_ESCAPE_UNICODE); return oss.str(); } @@ -452,8 +742,11 @@ void PartMovesBetweenShardsOrchestrator::Entry::fromString(const String & buf) part_name = json->getValue(JSON_KEY_PART_NAME); part_uuid = parseFromString(json->getValue(JSON_KEY_PART_UUID)); to_shard = json->getValue(JSON_KEY_TO_SHARD); + dst_part_name = json->getValue(JSON_KEY_DST_PART_NAME); state.value = EntryState::fromString(json->getValue(JSON_KEY_STATE)); + rollback = json->getValue(JSON_KEY_ROLLBACK); last_exception_msg = json->getValue(JSON_KEY_LAST_EX_MSG); + num_tries = json->getValue(JSON_KEY_NUM_TRIES); } } diff --git a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h index 9e54ae8a8ed..38b6a076748 100644 --- a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h +++ b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace DB { @@ -18,14 +19,37 @@ namespace ErrorCodes class StorageReplicatedMergeTree; -/// Cross shard part movement workflow orchestration. +/** + * Cross shard part movement workflow orchestration. + * + * TODO(nv): + * * Usage of `format_version` when acting on the behalf of the remote shard. + * There needs to be sort of an API to coordinate with remote replicas. + * * Only one movement at a time can be coordinated. This can easily be fixed + * by cycling through different tasks and checking their status with a + * priority queue and back-off for failing tasks + * `min(backoff * num_tries, max_backoff)`. + */ class PartMovesBetweenShardsOrchestrator { public: struct EntryState { + // State transitions are linear. When a kill query is issued a rollback + // flag is set and transitions order is reversed. + // + // SOURCE_DROP is a critical state after which rollback is not possible + // and we must ensure that the task can always succeed after that. + // + // Similar for rollback. 
It should always be possible to roll back before + // the SOURCE_DROP state and the rollback should terminate. + // + // Note: This is fragile. If you change the states please add an entry to + // the changelog about forward/backward compatibility. Better not to have + // any active move tasks while doing upgrade/downgrade operations. enum Value { + CANCELLED, TODO, SYNC_SOURCE, SYNC_DESTINATION, @@ -36,7 +60,6 @@ public: SOURCE_DROP_POST_DELAY, REMOVE_UUID_PIN, DONE, - CANCELLED, }; EntryState(): value(TODO) {} @@ -95,10 +118,14 @@ public: String part_name; UUID part_uuid; String to_shard; + String dst_part_name; EntryState state; + bool rollback = false; + /// Reset on successful transitions. String last_exception_msg; + UInt64 num_tries = 0; String znode_name; @@ -120,27 +147,31 @@ private: static constexpr auto JSON_KEY_PART_NAME = "part_name"; static constexpr auto JSON_KEY_PART_UUID = "part_uuid"; static constexpr auto JSON_KEY_TO_SHARD = "to_shard"; + static constexpr auto JSON_KEY_DST_PART_NAME = "dst_part_name"; static constexpr auto JSON_KEY_STATE = "state"; + static constexpr auto JSON_KEY_ROLLBACK = "rollback"; static constexpr auto JSON_KEY_LAST_EX_MSG = "last_exception"; + static constexpr auto JSON_KEY_NUM_TRIES = "num_tries"; public: - PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_); + explicit PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_); void start() { task->activateAndSchedule(); } void wakeup() { task->schedule(); } void shutdown(); - void fetchStateFromZK(); + CancellationCode killPartMoveToShard(const UUID & task_uuid); - /// We could have one thread per Entry and worry about concurrency issues. - /// Or we could have a single thread trying to run one step at a time. - bool step(); - - std::vector<Entry> getEntries() const; + std::vector<Entry> getEntries(); private: void run(); - void stepEntry(const Entry & entry, zkutil::ZooKeeperPtr zk); + bool step(); + Entry stepEntry(Entry entry, zkutil::ZooKeeperPtr zk); + + Entry getEntryByUUID(const UUID & task_uuid); + void removePins(const Entry & entry, zkutil::ZooKeeperPtr zk); + void syncStateFromZK(); private: StorageReplicatedMergeTree & storage; @@ -153,7 +184,7 @@ private: BackgroundSchedulePool::TaskHolder task; mutable std::mutex state_mutex; - std::map<String, Entry> entries; + std::vector<Entry> entries; public: String entries_znode_path; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 335302e5dda..03bef4581c1 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4669,7 +4669,7 @@ void StorageReplicatedMergeTree::alter( /// If new version returns ordinary name, else returns part name containing the first and last month of the month /// NOTE: use it in pair with getFakePartCoveringAllPartsInPartition(...)
-static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info) +String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info) { if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) { @@ -6568,6 +6568,7 @@ void StorageReplicatedMergeTree::movePartitionToShard( dst_pins.part_uuids.insert(part->uuid); PartMovesBetweenShardsOrchestrator::Entry part_move_entry; + part_move_entry.state = PartMovesBetweenShardsOrchestrator::EntryState::SYNC_SOURCE; part_move_entry.create_time = std::time(nullptr); part_move_entry.update_time = part_move_entry.create_time; part_move_entry.task_uuid = UUIDHelpers::generateV4(); @@ -6591,10 +6592,13 @@ void StorageReplicatedMergeTree::movePartitionToShard( String task_znode_path = dynamic_cast(*responses.back()).path_created; LOG_DEBUG(log, "Created task for part movement between shards at " + task_znode_path); - /// Force refresh local state. This will make the task immediately visible in `system.part_moves_between_shards` table. - part_moves_between_shards_orchestrator.fetchStateFromZK(); + /// TODO(nv): Nice to have support for `replication_alter_partitions_sync`. + /// For now use the system.part_moves_between_shards table for status. +} - // TODO: Add support for `replication_alter_partitions_sync`. +CancellationCode StorageReplicatedMergeTree::killPartMoveToShard(const UUID & task_uuid) +{ + return part_moves_between_shards_orchestrator.killPartMoveToShard(task_uuid); } void StorageReplicatedMergeTree::getCommitPartOps( diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index e76a54ac5ec..9eaf3c318dd 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -682,6 +682,7 @@ private: void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override; void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override; void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, ContextPtr query_context) override; + CancellationCode killPartMoveToShard(const UUID & task_uuid) override; void fetchPartition( const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, @@ -745,6 +746,8 @@ protected: bool allow_renaming_); }; +String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info); + /** There are three places for each part, where it should be * 1. In the RAM, data_parts, all_data_parts. diff --git a/src/Storages/System/StorageSystemPartMovesBetweenShards.cpp b/src/Storages/System/StorageSystemPartMovesBetweenShards.cpp index 70bc473e241..c397392e9fb 100644 --- a/src/Storages/System/StorageSystemPartMovesBetweenShards.cpp +++ b/src/Storages/System/StorageSystemPartMovesBetweenShards.cpp @@ -30,10 +30,12 @@ NamesAndTypesList StorageSystemPartMovesBetweenShards::getNamesAndTypes() { "part_name", std::make_shared() }, { "part_uuid", std::make_shared() }, { "to_shard", std::make_shared() }, + { "dst_part_name", std::make_shared() }, /// Processing status of item. 
{ "update_time", std::make_shared() }, { "state", std::make_shared() }, + { "rollback", std::make_shared() }, { "num_tries", std::make_shared() }, { "last_exception", std::make_shared() }, }; @@ -122,11 +124,13 @@ void StorageSystemPartMovesBetweenShards::fillData(MutableColumns & res_columns, res_columns[col_num++]->insert(entry.part_name); res_columns[col_num++]->insert(entry.part_uuid); res_columns[col_num++]->insert(entry.to_shard); + res_columns[col_num++]->insert(entry.dst_part_name); /// Processing status of item. res_columns[col_num++]->insert(entry.update_time); res_columns[col_num++]->insert(entry.state.toString()); - res_columns[col_num++]->insert(0); + res_columns[col_num++]->insert(entry.rollback); + res_columns[col_num++]->insert(entry.num_tries); res_columns[col_num++]->insert(entry.last_exception_msg); } } diff --git a/tests/integration/test_part_moves_between_shards/test.py b/tests/integration/test_part_moves_between_shards/test.py index 00407f95389..ed7640e5f9e 100644 --- a/tests/integration/test_part_moves_between_shards/test.py +++ b/tests/integration/test_part_moves_between_shards/test.py @@ -1,32 +1,38 @@ -import random -import time - import pytest +import random +import threading +import time from helpers.client import QueryRuntimeException from helpers.cluster import ClickHouseCluster from helpers.test_tools import TSV +transient_ch_errors = [23, 32, 210] + cluster = ClickHouseCluster(__file__) s0r0 = cluster.add_instance( 's0r0', main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'], + stay_alive=True, with_zookeeper=True) s0r1 = cluster.add_instance( 's0r1', main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'], + stay_alive=True, with_zookeeper=True) s1r0 = cluster.add_instance( 's1r0', main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'], + stay_alive=True, with_zookeeper=True) s1r1 = cluster.add_instance( 's1r1', main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'], + stay_alive=True, with_zookeeper=True) @@ -43,12 +49,14 @@ def test_move(started_cluster): for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]): for replica_ix, r in enumerate(rs): r.query(""" + DROP TABLE IF EXISTS test_move; CREATE TABLE test_move(v UInt64) ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/test_move', '{}') ORDER BY tuple() - """.format(shard_ix, replica_ix)) + """.format(shard_ix, r.name)) s0r0.query("SYSTEM STOP MERGES test_move") + s0r1.query("SYSTEM STOP MERGES test_move") s0r0.query("INSERT INTO test_move VALUES (1)") s0r0.query("INSERT INTO test_move VALUES (2)") @@ -63,14 +71,7 @@ def test_move(started_cluster): s0r0.query("SYSTEM START MERGES test_move") s0r0.query("OPTIMIZE TABLE test_move FINAL") - while True: - time.sleep(3) - - print(s0r0.query("SELECT * FROM system.part_moves_between_shards")) - - # Eventually. - if "DONE" == s0r0.query("SELECT state FROM system.part_moves_between_shards WHERE table = 'test_move'").strip(): - break + wait_for_state("DONE", s0r0, "test_move") for n in [s0r0, s0r1]: assert "1" == n.query("SELECT count() FROM test_move").strip() @@ -81,14 +82,7 @@ def test_move(started_cluster): # Move part back s1r0.query("ALTER TABLE test_move MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/test_move'") - while True: - time.sleep(3) - - print(s1r0.query("SELECT * FROM system.part_moves_between_shards")) - - # Eventually. 
- if "DONE" == s1r0.query("SELECT state FROM system.part_moves_between_shards WHERE table = 'test_move'").strip(): - break + wait_for_state("DONE", s1r0, "test_move") for n in [s0r0, s0r1]: assert "2" == n.query("SELECT count() FROM test_move").strip() @@ -101,17 +95,20 @@ def test_deduplication_while_move(started_cluster): for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]): for replica_ix, r in enumerate(rs): r.query(""" + DROP TABLE IF EXISTS test_deduplication; CREATE TABLE test_deduplication(v UInt64) ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/test_deduplication', '{}') ORDER BY tuple() - """.format(shard_ix, replica_ix)) + """.format(shard_ix, r.name)) r.query(""" - CREATE TABLE t_d AS test_deduplication + DROP TABLE IF EXISTS test_deduplication_d; + CREATE TABLE test_deduplication_d AS test_deduplication ENGINE Distributed('test_cluster', '', test_deduplication) """) s0r0.query("SYSTEM STOP MERGES test_deduplication") + s0r1.query("SYSTEM STOP MERGES test_deduplication") s0r0.query("INSERT INTO test_deduplication VALUES (1)") s0r0.query("INSERT INTO test_deduplication VALUES (2)") @@ -120,7 +117,8 @@ def test_deduplication_while_move(started_cluster): assert "2" == s0r0.query("SELECT count() FROM test_deduplication").strip() assert "0" == s1r0.query("SELECT count() FROM test_deduplication").strip() - s0r0.query("ALTER TABLE test_deduplication MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/test_deduplication'") + s0r0.query( + "ALTER TABLE test_deduplication MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/test_deduplication'") s0r0.query("SYSTEM START MERGES test_deduplication") expected = """ @@ -128,32 +126,363 @@ def test_deduplication_while_move(started_cluster): 2 """ - # Verify that we get consisntent result at all times while the part is moving from one shard to another. 
- while "DONE" != s0r0.query("SELECT state FROM system.part_moves_between_shards WHERE table = 'test_deduplication' ORDER BY create_time DESC LIMIT 1").strip(): + def deduplication_invariant_test(): n = random.choice(list(started_cluster.instances.values())) + assert TSV( + n.query("SELECT * FROM test_deduplication_d ORDER BY v", + settings={"allow_experimental_query_deduplication": 1}) + ) == TSV(expected) - assert TSV(n.query("SELECT * FROM t_d ORDER BY v", settings={ - "allow_experimental_query_deduplication": 1 - })) == TSV(expected) + deduplication_invariant = ConcurrentInvariant(deduplication_invariant_test) + deduplication_invariant.start() + + wait_for_state("DONE", s0r0, "test_deduplication") + + deduplication_invariant.stop_and_assert_no_exception() + + +def test_part_move_step_by_step(started_cluster): + for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]): + for replica_ix, r in enumerate(rs): + r.query(""" + DROP TABLE IF EXISTS test_part_move_step_by_step; + CREATE TABLE test_part_move_step_by_step(v UInt64) + ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/test_part_move_step_by_step', '{}') + ORDER BY tuple() + """.format(shard_ix, r.name)) + + r.query(""" + DROP TABLE IF EXISTS test_part_move_step_by_step_d; + CREATE TABLE test_part_move_step_by_step_d AS test_part_move_step_by_step + ENGINE Distributed('test_cluster', currentDatabase(), test_part_move_step_by_step) + """) + + s0r0.query("SYSTEM STOP MERGES test_part_move_step_by_step") + s0r1.query("SYSTEM STOP MERGES test_part_move_step_by_step") + + s0r0.query("INSERT INTO test_part_move_step_by_step VALUES (1)") + s0r0.query("INSERT INTO test_part_move_step_by_step VALUES (2)") + s0r1.query("SYSTEM SYNC REPLICA test_part_move_step_by_step", timeout=20) + + assert "2" == s0r0.query("SELECT count() FROM test_part_move_step_by_step").strip() + assert "0" == s1r0.query("SELECT count() FROM test_part_move_step_by_step").strip() + + expected = """ +1 +2 +""" + + def deduplication_invariant_test(): + n = random.choice(list(started_cluster.instances.values())) + try: + assert TSV( + n.query("SELECT * FROM test_part_move_step_by_step_d ORDER BY v", + settings={"allow_experimental_query_deduplication": 1}) + ) == TSV(expected) + except QueryRuntimeException as e: + # ignore transient errors that are caused by us restarting nodes + if e.returncode not in transient_ch_errors: + raise e + + deduplication_invariant = ConcurrentInvariant(deduplication_invariant_test) + deduplication_invariant.start() + + # Stop a source replica to prevent SYNC_SOURCE succeeding. + s0r1.stop_clickhouse() + + s0r0.query( + "ALTER TABLE test_part_move_step_by_step MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/test_part_move_step_by_step'") + + # Should hang on SYNC_SOURCE until all source replicas acknowledge new pinned UUIDs. + wait_for_state("SYNC_SOURCE", s0r0, "test_part_move_step_by_step", "Some replicas haven\\'t processed event") + deduplication_invariant.assert_no_exception() + + # Start all replicas in source shard but stop a replica in destination shard + # to prevent SYNC_DESTINATION succeeding. + s1r1.stop_clickhouse() + s0r1.start_clickhouse() + + # After SYNC_SOURCE step no merges will be assigned. 
+ s0r0.query("SYSTEM START MERGES test_part_move_step_by_step; OPTIMIZE TABLE test_part_move_step_by_step;") + s0r1.query("SYSTEM START MERGES test_part_move_step_by_step; OPTIMIZE TABLE test_part_move_step_by_step;") + + wait_for_state("SYNC_DESTINATION", s0r0, "test_part_move_step_by_step", "Some replicas haven\\'t processed event") + deduplication_invariant.assert_no_exception() + + # Start previously stopped replica in destination shard to let SYNC_DESTINATION + # succeed. + # Stop the other replica in destination shard to prevent DESTINATION_FETCH succeed. + s1r0.stop_clickhouse() + s1r1.start_clickhouse() + wait_for_state("DESTINATION_FETCH", s0r0, "test_part_move_step_by_step", "Some replicas haven\\'t processed event") + deduplication_invariant.assert_no_exception() + + # Start previously stopped replica in destination shard to let DESTINATION_FETCH + # succeed. + # Stop the other replica in destination shard to prevent DESTINATION_ATTACH succeed. + s1r1.stop_clickhouse() + s1r0.start_clickhouse() + wait_for_state("DESTINATION_ATTACH", s0r0, "test_part_move_step_by_step", "Some replicas haven\\'t processed event") + deduplication_invariant.assert_no_exception() + + # Start all replicas in destination shard to let DESTINATION_ATTACH succeed. + # Stop a source replica to prevent SOURCE_DROP succeeding. + s0r0.stop_clickhouse() + s1r1.start_clickhouse() + wait_for_state("SOURCE_DROP", s0r1, "test_part_move_step_by_step", "Some replicas haven\\'t processed event") + deduplication_invariant.assert_no_exception() + + s0r0.start_clickhouse() + wait_for_state("DONE", s0r1, "test_part_move_step_by_step") + deduplication_invariant.assert_no_exception() + + # No hung tasks in replication queue. Would timeout otherwise. + for instance in started_cluster.instances.values(): + instance.query("SYSTEM SYNC REPLICA test_part_move_step_by_step") + + assert "1" == s0r0.query("SELECT count() FROM test_part_move_step_by_step").strip() + assert "1" == s1r0.query("SELECT count() FROM test_part_move_step_by_step").strip() + + deduplication_invariant.stop_and_assert_no_exception() + + +def test_part_move_step_by_step_kill(started_cluster): + for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]): + for replica_ix, r in enumerate(rs): + r.query(""" + DROP TABLE IF EXISTS test_part_move_step_by_step_kill; + CREATE TABLE test_part_move_step_by_step_kill(v UInt64) + ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/test_part_move_step_by_step_kill', '{}') + ORDER BY tuple() + """.format(shard_ix, r.name)) + + r.query(""" + DROP TABLE IF EXISTS test_part_move_step_by_step_kill_d; + CREATE TABLE test_part_move_step_by_step_kill_d AS test_part_move_step_by_step_kill + ENGINE Distributed('test_cluster', currentDatabase(), test_part_move_step_by_step_kill) + """) + + s0r0.query("SYSTEM STOP MERGES test_part_move_step_by_step_kill") + s0r1.query("SYSTEM STOP MERGES test_part_move_step_by_step_kill") + + s0r0.query("INSERT INTO test_part_move_step_by_step_kill VALUES (1)") + s0r0.query("INSERT INTO test_part_move_step_by_step_kill VALUES (2)") + s0r1.query("SYSTEM SYNC REPLICA test_part_move_step_by_step_kill", timeout=20) + + assert "2" == s0r0.query("SELECT count() FROM test_part_move_step_by_step_kill").strip() + assert "0" == s1r0.query("SELECT count() FROM test_part_move_step_by_step_kill").strip() + + expected = """ +1 +2 +""" + + def deduplication_invariant_test(): + n = random.choice(list(started_cluster.instances.values())) + try: + assert TSV( + n.query("SELECT * FROM 
+    # Stop a source replica to prevent SYNC_SOURCE succeeding.
+    s0r1.stop_clickhouse()
+
+    s0r0.query(
+        "ALTER TABLE test_part_move_step_by_step_kill MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/test_part_move_step_by_step_kill'")
+
+    # Should hang on SYNC_SOURCE until all source replicas acknowledge new pinned UUIDs.
+    wait_for_state("SYNC_SOURCE", s0r0, "test_part_move_step_by_step_kill", "Some replicas haven\\'t processed event")
+    deduplication_invariant.assert_no_exception()
+
+    # Start all replicas in the source shard but stop a replica in the destination
+    # shard to prevent SYNC_DESTINATION succeeding.
+    s1r1.stop_clickhouse()
+    s0r1.start_clickhouse()
+
+    # After the SYNC_SOURCE step no merges will be assigned.
+    s0r0.query("SYSTEM START MERGES test_part_move_step_by_step_kill; OPTIMIZE TABLE test_part_move_step_by_step_kill;")
+    s0r1.query("SYSTEM START MERGES test_part_move_step_by_step_kill; OPTIMIZE TABLE test_part_move_step_by_step_kill;")
+
+    wait_for_state("SYNC_DESTINATION", s0r0, "test_part_move_step_by_step_kill",
+                   "Some replicas haven\\'t processed event")
+    deduplication_invariant.assert_no_exception()
+
+    # Start the previously stopped replica in the destination shard to let
+    # SYNC_DESTINATION succeed.
+    # Stop the other replica in the destination shard to prevent DESTINATION_FETCH
+    # succeeding.
+    s1r0.stop_clickhouse()
+    s1r1.start_clickhouse()
+    wait_for_state("DESTINATION_FETCH", s0r0, "test_part_move_step_by_step_kill",
+                   "Some replicas haven\\'t processed event")
+    deduplication_invariant.assert_no_exception()
+
+    # Start the previously stopped replica in the destination shard to let
+    # DESTINATION_FETCH succeed.
+    # Stop the other replica in the destination shard to prevent DESTINATION_ATTACH
+    # succeeding.
+    s1r1.stop_clickhouse()
+    s1r0.start_clickhouse()
+    wait_for_state("DESTINATION_ATTACH", s0r0, "test_part_move_step_by_step_kill",
+                   "Some replicas haven\\'t processed event")
+    deduplication_invariant.assert_no_exception()
+
+    # Rollback here.
+    s0r0.query("""
+        KILL PART_MOVE_TO_SHARD
+        WHERE task_uuid = (SELECT task_uuid FROM system.part_moves_between_shards WHERE table = 'test_part_move_step_by_step_kill')
+    """)
+
+    wait_for_state("DESTINATION_ATTACH", s0r0, "test_part_move_step_by_step_kill",
+                   assert_exception_msg="Some replicas haven\\'t processed event",
+                   assert_rollback=True)
+
+    s1r1.start_clickhouse()
+
+    wait_for_state("CANCELLED", s0r0, "test_part_move_step_by_step_kill", assert_rollback=True)
+    deduplication_invariant.assert_no_exception()
+
+    # No hung tasks in the replication queue. Would timeout otherwise.
+    for instance in started_cluster.instances.values():
+        instance.query("SYSTEM SYNC REPLICA test_part_move_step_by_step_kill")
+
+    assert "2" == s0r0.query("SELECT count() FROM test_part_move_step_by_step_kill").strip()
+    assert "0" == s1r0.query("SELECT count() FROM test_part_move_step_by_step_kill").strip()
+
+    deduplication_invariant.stop_and_assert_no_exception()
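+
+# For reference, the forward path of a move task as exercised above is:
+#     SYNC_SOURCE -> SYNC_DESTINATION -> DESTINATION_FETCH -> DESTINATION_ATTACH -> SOURCE_DROP -> DONE
+# After KILL PART_MOVE_TO_SHARD the task unwinds with the rollback bit set
+# (seen above as DESTINATION_ATTACH with rollback=1) until it lands in the
+# terminal CANCELLED state.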


 def test_move_not_permitted(started_cluster):
+    # Verify that part-compatibility invariants are enforced.
+
+    # Tests are executed in order. Make sure the cluster is up even if a
+    # previous test failed.
+    s0r0.start_clickhouse()
+    s1r0.start_clickhouse()
+
     for ix, n in enumerate([s0r0, s1r0]):
-        n.query("DROP TABLE IF EXISTS not_permitted")
         n.query("""
-        CREATE TABLE not_permitted(v_{} UInt64)
-        ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/not_permitted', 'r')
-        ORDER BY tuple()
-        """.format(ix, ix))
+        DROP TABLE IF EXISTS not_permitted_columns;
+
+        CREATE TABLE not_permitted_columns(v_{ix} UInt64)
+        ENGINE ReplicatedMergeTree('/clickhouse/shard_{ix}/tables/not_permitted_columns', 'r')
+        ORDER BY tuple();
+        """.format(ix=ix))

-    s0r0.query("INSERT INTO not_permitted VALUES (1)")
+        partition = "date"
+        if ix > 0:
+            partition = "v"

-    with pytest.raises(QueryRuntimeException) as exc:
-        s0r0.query("ALTER TABLE not_permitted MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/not_permitted'")
+        n.query("""
+        DROP TABLE IF EXISTS not_permitted_partition;
+        CREATE TABLE not_permitted_partition(date Date, v UInt64)
+        ENGINE ReplicatedMergeTree('/clickhouse/shard_{ix}/tables/not_permitted_partition', 'r')
+        PARTITION BY ({partition})
+        ORDER BY tuple();
+        """.format(ix=ix, partition=partition))

-    assert "DB::Exception: Table columns structure in ZooKeeper is different from local table structure." in str(exc.value)
+    s0r0.query("INSERT INTO not_permitted_columns VALUES (1)")
+    s0r0.query("INSERT INTO not_permitted_partition VALUES ('2021-09-03', 1)")

-    with pytest.raises(QueryRuntimeException) as exc:
-        s0r0.query("ALTER TABLE not_permitted MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/not_permitted'")
+    with pytest.raises(QueryRuntimeException, match="DB::Exception: Source and destination are the same"):
+        s0r0.query("ALTER TABLE not_permitted_columns MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/not_permitted_columns'")

-    assert "DB::Exception: Source and destination are the same" in str(exc.value)
+    with pytest.raises(QueryRuntimeException, match="DB::Exception: Table columns structure in ZooKeeper is different from local table structure."):
+        s0r0.query("ALTER TABLE not_permitted_columns MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/not_permitted_columns'")
+
+    with pytest.raises(QueryRuntimeException, match="DB::Exception: Existing table metadata in ZooKeeper differs in partition key expression."):
+        s0r0.query("ALTER TABLE not_permitted_partition MOVE PART '20210903_0_0_0' TO SHARD '/clickhouse/shard_1/tables/not_permitted_partition'")
+
+
+def wait_for_state(desired_state, instance, test_table, assert_exception_msg=None, assert_rollback=False):
+    last_debug_print_time = time.time()
+
+    print("Waiting to reach state: {}".format(desired_state))
+    if assert_exception_msg:
+        print("  with exception contents: {}".format(assert_exception_msg))
+    if assert_rollback:
+        print("  and rollback: {}".format(assert_rollback))
+
+    while True:
+        tasks = TSV.toMat(instance.query(
+            "SELECT state, num_tries, last_exception, rollback FROM system.part_moves_between_shards WHERE table = '{}'".format(
+                test_table)))
+        assert len(tasks) == 1, "only one task expected in this test"
+
+        if time.time() - last_debug_print_time > 30:
+            last_debug_print_time = time.time()
+            print("Current state: ", tasks)
+
+        [state, num_tries, last_exception, rollback] = tasks[0]
+
+        if state == desired_state:
+            if assert_exception_msg and int(num_tries) < 3:
+                # Let the task be retried a few times when expecting an exception,
+                # to make sure the exception is persistent and the code doesn't
+                # accidentally continue to run when we expect it not to.
+                continue
+
+            if assert_exception_msg:
+                assert assert_exception_msg in last_exception
+
+            if assert_rollback:
+                assert int(rollback) == 1, "rollback bit isn't set"
+
+            break
+        elif state in ["DONE", "CANCELLED"]:
+            raise Exception("Reached terminal state {}, but was waiting for {}".format(state, desired_state))
+
+        time.sleep(0.1)
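+
+
+# Repeatedly runs invariant_test in a background thread and records the first
+# exception, so a test can assert the invariant held while replicas were being
+# restarted.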
+class ConcurrentInvariant:
+    def __init__(self, invariant_test, loop_sleep=0.1):
+        self.invariant_test = invariant_test
+        self.loop_sleep = loop_sleep
+
+        self.started = False
+        self.exiting = False
+        self.exception = None
+        self.thread = threading.Thread(target=self._loop)
+
+    def start(self):
+        if self.started:
+            raise Exception('invariant thread already started')
+
+        self.started = True
+        self.thread.start()
+
+    def stop_and_assert_no_exception(self):
+        self._assert_started()
+
+        self.exiting = True
+        self.thread.join()
+
+        if self.exception:
+            raise self.exception
+
+    def assert_no_exception(self):
+        self._assert_started()
+
+        if self.exception:
+            raise self.exception
+
+    def _loop(self):
+        try:
+            while not self.exiting:
+                self.invariant_test()
+                time.sleep(self.loop_sleep)
+        except Exception as e:
+            self.exiting = True
+            self.exception = e
+
+    def _assert_started(self):
+        if not self.started:
+            raise Exception('invariant thread not started, forgot to call start?')
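+
+
+# Illustrative usage, mirroring the tests above ("check_invariant" is a
+# hypothetical callable that raises on violation):
+#
+#   inv = ConcurrentInvariant(check_invariant)
+#   inv.start()
+#   ...restart replicas, move parts...
+#   inv.assert_no_exception()            # probe mid-test without stopping
+#   inv.stop_and_assert_no_exception()   # join the thread and re-raise failures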