mirror of https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Rewrite part movement between shards logic and add kill support
See https://github.com/ClickHouse/ClickHouse/pull/24585#issuecomment-857735081 for extra context about the current implementation.
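For a sense of the user-facing change, a minimal usage sketch (the UUID value below is made up for illustration; the statement and the system table columns are the ones added or extended by this commit):

    -- Ask the orchestrator to roll back an in-flight move; cancellation happens in the background.
    KILL PART_MOVE_TO_SHARD WHERE task_uuid = '7e2b5f5f-0000-4c1e-8e2a-000000000000'

    -- Follow task progress; rollback, num_tries and last_exception are new columns.
    SELECT task_uuid, state, rollback, num_tries, last_exception
    FROM system.part_moves_between_shards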
This commit is contained in:
parent
0381c634d4
commit
9a02061d9c
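For orientation while reading the diff, the forward order of EntryState transitions introduced here is:

    TODO -> SYNC_SOURCE -> SYNC_DESTINATION -> DESTINATION_FETCH -> DESTINATION_ATTACH
         -> SOURCE_DROP_PRE_DELAY -> SOURCE_DROP -> SOURCE_DROP_POST_DELAY -> REMOVE_UUID_PIN -> DONE

A kill sets the rollback flag on the task and the same steps are undone in reverse until the entry reaches CANCELLED; from SOURCE_DROP onwards rollback is refused, because the source data may already be gone.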
@@ -106,7 +106,7 @@ enum class AccessType
                                                           (anyone can kill his own queries) */\
     \
     M(MOVE_PARTITION_BETWEEN_SHARDS, "", GLOBAL, ALL) /* required to be able to move a part/partition to a table
-                                                         identified by it's ZooKeeper path */\
+                                                         identified by its ZooKeeper path */\
     \
     M(CREATE_USER, "", GLOBAL, ACCESS_MANAGEMENT) \
     M(ALTER_USER, "", GLOBAL, ACCESS_MANAGEMENT) \
@@ -31,6 +31,7 @@ namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
     extern const int ACCESS_DENIED;
+    extern const int NOT_IMPLEMENTED;
 }
@@ -290,6 +291,72 @@ BlockIO InterpreterKillQueryQuery::execute()

             break;
         }
+        case ASTKillQueryQuery::Type::PartMoveToShard:
+        {
+            if (query.sync)
+                throw Exception(ErrorCodes::NOT_IMPLEMENTED, "SYNC modifier is not supported for this statement.");
+
+            Block moves_block = getSelectResult(
+                "database, table, task_name, task_uuid, part_name, to_shard, state",
+                "system.part_moves_between_shards");
+
+            if (!moves_block)
+                return res_io;
+
+            const ColumnString & database_col = typeid_cast<const ColumnString &>(*moves_block.getByName("database").column);
+            const ColumnString & table_col = typeid_cast<const ColumnString &>(*moves_block.getByName("table").column);
+            const ColumnUUID & task_uuid_col = typeid_cast<const ColumnUUID &>(*moves_block.getByName("task_uuid").column);
+
+            auto header = moves_block.cloneEmpty();
+            header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"});
+
+            MutableColumns res_columns = header.cloneEmptyColumns();
+            auto table_id = StorageID::createEmpty();
+            AccessRightsElements required_access_rights;
+            auto access = getContext()->getAccess();
+            bool access_denied = false;
+
+            for (size_t i = 0; i < moves_block.rows(); ++i)
+            {
+                table_id = StorageID{database_col.getDataAt(i).toString(), table_col.getDataAt(i).toString()};
+                auto task_uuid = get<UUID>(task_uuid_col[i]);
+
+                CancellationCode code = CancellationCode::Unknown;
+
+                if (!query.test)
+                {
+                    auto storage = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
+                    if (!storage)
+                        code = CancellationCode::NotFound;
+                    else
+                    {
+                        ASTAlterCommand alter_command{};
+                        alter_command.type = ASTAlterCommand::MOVE_PARTITION;
+                        alter_command.move_destination_type = DataDestinationType::SHARD;
+                        required_access_rights = InterpreterAlterQuery::getRequiredAccessForCommand(
+                            alter_command, table_id.database_name, table_id.table_name);
+                        if (!access->isGranted(required_access_rights))
+                        {
+                            access_denied = true;
+                            continue;
+                        }
+                        code = storage->killPartMoveToShard(task_uuid);
+                    }
+                }
+
+                insertResultRow(i, code, moves_block, header, res_columns);
+            }
+
+            if (res_columns[0]->empty() && access_denied)
+                throw Exception(
+                    "Not allowed to kill move partition. To execute this query it's necessary to have the grant " + required_access_rights.toString(),
+                    ErrorCodes::ACCESS_DENIED);
+
+            res_io.in = std::make_shared<OneBlockInputStream>(header.cloneWithColumns(std::move(res_columns)));
+
+            break;
+        }
     }

     return res_io;
@@ -11,8 +11,20 @@ String ASTKillQueryQuery::getID(char delim) const

 void ASTKillQueryQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
 {
-    settings.ostr << (settings.hilite ? hilite_keyword : "") << "KILL "
-        << (type == Type::Query ? "QUERY" : "MUTATION");
+    settings.ostr << (settings.hilite ? hilite_keyword : "") << "KILL ";
+
+    switch (type)
+    {
+        case Type::Query:
+            settings.ostr << "QUERY";
+            break;
+        case Type::Mutation:
+            settings.ostr << "MUTATION";
+            break;
+        case Type::PartMoveToShard:
+            settings.ostr << "PART_MOVE_TO_SHARD";
+            break;
+    }

     formatOnCluster(settings);
@@ -13,6 +13,7 @@ public:
     {
         Query,           /// KILL QUERY
         Mutation,        /// KILL MUTATION
+        PartMoveToShard, /// KILL PART_MOVE_TO_SHARD
     };

     Type type = Type::Query;
@@ -17,6 +17,7 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
     ParserKeyword p_kill{"KILL"};
     ParserKeyword p_query{"QUERY"};
     ParserKeyword p_mutation{"MUTATION"};
+    ParserKeyword p_part_move_to_shard{"PART_MOVE_TO_SHARD"};
     ParserKeyword p_on{"ON"};
     ParserKeyword p_test{"TEST"};
     ParserKeyword p_sync{"SYNC"};
@@ -31,6 +32,8 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
         query->type = ASTKillQueryQuery::Type::Query;
     else if (p_mutation.ignore(pos, expected))
         query->type = ASTKillQueryQuery::Type::Mutation;
+    else if (p_part_move_to_shard.ignore(pos, expected))
+        query->type = ASTKillQueryQuery::Type::PartMoveToShard;
     else
         return false;
@@ -460,6 +460,12 @@ public:
         throw Exception("Mutations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
     }

+    /// Cancel a part move to shard.
+    virtual CancellationCode killPartMoveToShard(const UUID & /*task_uuid*/)
+    {
+        throw Exception("Part moves between shards are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
+    }
+
     /** If the table have to do some complicated work on startup,
       * that must be postponed after creation of table object
       * (like launching some background threads),
@@ -1,13 +1,20 @@
 #include <Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h>
 #include <Storages/MergeTree/PinnedPartUUIDs.h>
 #include <Storages/StorageReplicatedMergeTree.h>
-#include <boost/range/adaptor/map.hpp>
+#include <Poco/JSON/JSON.h>
+#include <Poco/JSON/Object.h>
+#include <Poco/JSON/Parser.h>

 namespace DB
 {

 namespace ErrorCodes
 {
+    extern const int BAD_ARGUMENTS;
     extern const int LOGICAL_ERROR;
+    extern const int TIMEOUT_EXCEEDED;
 }

 PartMovesBetweenShardsOrchestrator::PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_)
     : storage(storage_)
     , zookeeper_path(storage.zookeeper_path)
@@ -27,16 +34,17 @@ void PartMovesBetweenShardsOrchestrator::run()
     if (need_stop)
         return;

-    auto sleep_ms = 10;
+    /// Don't poll ZooKeeper too often.
+    auto sleep_ms = 3 * 1000;

     try
     {
-        fetchStateFromZK();
+        syncStateFromZK();

         /// Schedule for immediate re-execution as likely there is more work
         /// to be done.
         if (step())
-            fetchStateFromZK();
-        else
-            sleep_ms = 3 * 1000;
+            task->schedule();
     }
     catch (...)
     {
@@ -54,11 +62,11 @@ void PartMovesBetweenShardsOrchestrator::shutdown()
     LOG_TRACE(log, "PartMovesBetweenShardsOrchestrator thread finished");
 }

-void PartMovesBetweenShardsOrchestrator::fetchStateFromZK()
+void PartMovesBetweenShardsOrchestrator::syncStateFromZK()
 {
     std::lock_guard lock(state_mutex);

-    entries.clear();
+    std::vector<Entry> new_entries;

     auto zk = storage.getZooKeeper();

@@ -76,8 +84,11 @@ void PartMovesBetweenShardsOrchestrator::fetchStateFromZK()
         e.version = stat.version;
         e.znode_name = task_name;

-        entries[task_name] = std::move(e);
+        new_entries.push_back(std::move(e));
     }
+
+    // Replace in-memory state.
+    entries = new_entries;
 }

 bool PartMovesBetweenShardsOrchestrator::step()
@@ -93,7 +104,7 @@ bool PartMovesBetweenShardsOrchestrator::step()
     {
         std::lock_guard lock(state_mutex);

-        for (auto const & entry : entries | boost::adaptors::map_values)
+        for (auto const & entry : entries)
         {
             if (entry.state.value == EntryState::DONE || entry.state.value == EntryState::CANCELLED)
                 continue;
@@ -128,11 +139,21 @@ bool PartMovesBetweenShardsOrchestrator::step()
         throw;
     }

+    LOG_DEBUG(log, "stepEntry on task {} from state {} (rollback: {}), try: {}",
+        entry_to_process->znode_name,
+        entry_to_process->state.toString(),
+        entry_to_process->rollback,
+        entry_to_process->num_tries);
+
     try
     {
         /// Use the same ZooKeeper connection. If we'd lost the lock then connection
         /// will become expired and all consequent operations will fail.
-        stepEntry(entry_to_process.value(), zk);
+        Entry new_entry = stepEntry(entry_to_process.value(), zk);
+        new_entry.last_exception_msg = "";
+        new_entry.num_tries = 0;
+        new_entry.update_time = std::time(nullptr);
+        zk->set(new_entry.znode_path, new_entry.toString(), new_entry.version);
     }
     catch (...)
     {
@@ -140,6 +161,7 @@ bool PartMovesBetweenShardsOrchestrator::step()

         Entry entry_copy = entry_to_process.value();
         entry_copy.last_exception_msg = getCurrentExceptionMessage(false);
+        entry_copy.num_tries += 1;
         entry_copy.update_time = std::time(nullptr);
         zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
@@ -149,276 +171,537 @@ bool PartMovesBetweenShardsOrchestrator::step()
     return true;
 }

-void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::ZooKeeperPtr zk)
+PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::stepEntry(Entry entry, zkutil::ZooKeeperPtr zk)
 {
     switch (entry.state.value)
     {
-        case EntryState::DONE:
-            break;
-
+        case EntryState::DONE: [[fallthrough]];
         case EntryState::CANCELLED:
-            break;
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't stepEntry after terminal state. This is a bug.");

         case EntryState::TODO:
         {
-            /// State transition.
-            Entry entry_copy = entry;
-            entry_copy.state = EntryState::SYNC_SOURCE;
-            entry_copy.update_time = std::time(nullptr);
-            zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
+            if (entry.rollback)
+            {
+                removePins(entry, zk);
+                entry.state = EntryState::CANCELLED;
+                return entry;
+            }
+            /// The forward transition happens implicitly when task is created by `StorageReplicatedMergeTree::movePartitionToShard`.
+            else
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected entry state ({}) in stepEntry. This is a bug.", entry.state.toString());
        }
        break;
        case EntryState::SYNC_SOURCE:
        {
+            if (entry.rollback)
+            {
-                /// Log entry.
-                Coordination::Requests ops;
-                ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version));
-
-                ReplicatedMergeTreeLogEntryData log_entry;
-                log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS;
-                log_entry.create_time = std::time(nullptr);
-                log_entry.source_replica = storage.replica_name;
-                ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1));
-                ops.emplace_back(zkutil::makeCreateRequest(
-                    zookeeper_path + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
-
-                Coordination::Responses responses;
-                Coordination::Error rc = zk->tryMulti(ops, responses);
-                zkutil::KeeperMultiException::check(rc, ops, responses);
-
-                String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
-                log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
-
-                /// This wait in background schedule pool is useless. It'd be
-                /// better to have some notification which will call `step`
-                /// function when all replicated will finish. TODO.
-                storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, -1);
+                entry.state = EntryState::TODO;
+                return entry;
+            }
+            else
+            {
-                /// State transition.
-                Entry entry_copy = entry;
-                entry_copy.state = EntryState::SYNC_DESTINATION;
-                entry_copy.update_time = std::time(nullptr);
-                zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
+                ReplicatedMergeTreeLogEntryData sync_source_log_entry;
+
+                String sync_source_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString());
+                Coordination::Stat sync_source_log_entry_stat;
+                String sync_source_log_entry_str;
+                if (zk->tryGet(sync_source_log_entry_barrier_path, sync_source_log_entry_str, &sync_source_log_entry_stat))
+                {
+                    LOG_DEBUG(log, "Log entry was already created will check the existing one.");
+
+                    sync_source_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_source_log_entry_str, sync_source_log_entry_stat);
+                }
+                else
+                {
+                    /// Log entry.
+                    Coordination::Requests ops;
+                    ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version));
+
+                    sync_source_log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS;
+                    sync_source_log_entry.log_entry_id = sync_source_log_entry_barrier_path;
+                    sync_source_log_entry.create_time = std::time(nullptr);
+                    sync_source_log_entry.source_replica = storage.replica_name;
+
+                    ops.emplace_back(zkutil::makeCreateRequest(sync_source_log_entry_barrier_path, sync_source_log_entry.toString(), -1));
+                    ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1));
+                    ops.emplace_back(zkutil::makeCreateRequest(
+                        zookeeper_path + "/log/log-", sync_source_log_entry.toString(), zkutil::CreateMode::PersistentSequential));
+
+                    Coordination::Responses responses;
+                    Coordination::Error rc = zk->tryMulti(ops, responses);
+                    zkutil::KeeperMultiException::check(rc, ops, responses);
+
+                    String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
+                    sync_source_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
+
+                    LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path);
+                }
+
+                Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(zookeeper_path, sync_source_log_entry, 1);
+                if (!unwaited.empty())
+                    throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited));
+
+                entry.state = EntryState::SYNC_DESTINATION;
+                return entry;
+            }
        }
        break;
        case EntryState::SYNC_DESTINATION:
        {
+            if (entry.rollback)
            {
-                /// Log entry.
-                Coordination::Requests ops;
-                ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version));
-
-                ReplicatedMergeTreeLogEntryData log_entry;
-                log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS;
-                log_entry.create_time = std::time(nullptr);
-                log_entry.source_replica = storage.replica_name;
-                log_entry.source_shard = zookeeper_path;
-
-                ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1));
-                ops.emplace_back(zkutil::makeCreateRequest(
-                    entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
-
-                Coordination::Responses responses;
-                Coordination::Error rc = zk->tryMulti(ops, responses);
-                zkutil::KeeperMultiException::check(rc, ops, responses);
-
-                String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
-                log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
-
-                storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1);
-            }
-
-            {
-                /// State transition.
                 Entry entry_copy = entry;
-                entry_copy.state = EntryState::DESTINATION_FETCH;
                 entry_copy.update_time = std::time(nullptr);
-                zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
+                entry_copy.state = EntryState::SYNC_SOURCE;
+                return entry_copy;
            }
+            else
+            {
+                ReplicatedMergeTreeLogEntryData sync_destination_log_entry;
+
+                String sync_destination_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString());
+                Coordination::Stat sync_destination_log_entry_stat;
+                String sync_destination_log_entry_str;
+                if (zk->tryGet(sync_destination_log_entry_barrier_path, sync_destination_log_entry_str, &sync_destination_log_entry_stat))
+                {
+                    LOG_DEBUG(log, "Log entry was already created will check the existing one.");
+
+                    sync_destination_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_destination_log_entry_str, sync_destination_log_entry_stat);
+                }
+                else
+                {
+                    /// Log entry.
+                    Coordination::Requests ops;
+                    ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version));
+
+                    sync_destination_log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS;
+                    sync_destination_log_entry.log_entry_id = sync_destination_log_entry_barrier_path;
+                    sync_destination_log_entry.create_time = std::time(nullptr);
+                    sync_destination_log_entry.source_replica = storage.replica_name;
+                    sync_destination_log_entry.source_shard = zookeeper_path;
+
+                    ops.emplace_back(zkutil::makeCreateRequest(sync_destination_log_entry_barrier_path, sync_destination_log_entry.toString(), -1));
+                    ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1));
+                    ops.emplace_back(zkutil::makeCreateRequest(
+                        entry.to_shard + "/log/log-", sync_destination_log_entry.toString(), zkutil::CreateMode::PersistentSequential));
+
+                    Coordination::Responses responses;
+                    Coordination::Error rc = zk->tryMulti(ops, responses);
+                    zkutil::KeeperMultiException::check(rc, ops, responses);
+
+                    String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
+                    sync_destination_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
+
+                    LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path);
+                }
+
+                Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, sync_destination_log_entry, 1);
+                if (!unwaited.empty())
+                    throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited));
+
+                entry.state = EntryState::DESTINATION_FETCH;
+                return entry;
+            }
        }
        break;
        case EntryState::DESTINATION_FETCH:
        {
-            /// Make sure table structure doesn't change when there are part movements in progress.
-
+            if (entry.rollback)
+            {
-                Coordination::Requests ops;
-                ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version));
+                // TODO(nv): Do we want to cleanup fetched data on the destination?
+                //   Maybe leave it there and make sure a background cleanup will take
+                //   care of it sometime later.

-                /// Log entry.
-                ReplicatedMergeTreeLogEntryData log_entry;
-                log_entry.type = ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD;
-                log_entry.create_time = std::time(nullptr);
-                log_entry.new_part_name = entry.part_name;
-                log_entry.source_replica = storage.replica_name;
-                log_entry.source_shard = zookeeper_path;
-                ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1));
-                ops.emplace_back(zkutil::makeCreateRequest(
-                    entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
-
-                Coordination::Responses responses;
-                Coordination::Error rc = zk->tryMulti(ops, responses);
-                zkutil::KeeperMultiException::check(rc, ops, responses);
-
-                String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
-                log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
-
-                storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1);
+                entry.state = EntryState::SYNC_DESTINATION;
+                return entry;
+            }
+            else
+            {
-                /// State transition.
-                Entry entry_copy = entry;
-                entry_copy.state = EntryState::DESTINATION_ATTACH;
-                entry_copy.update_time = std::time(nullptr);
-                zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
+                /// Note: Table structure shouldn't be changed while there are part movements in progress.
+
+                ReplicatedMergeTreeLogEntryData fetch_log_entry;
+
+                String fetch_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString());
+                Coordination::Stat fetch_log_entry_stat;
+                String fetch_log_entry_str;
+                if (zk->tryGet(fetch_log_entry_barrier_path, fetch_log_entry_str, &fetch_log_entry_stat))
+                {
+                    LOG_DEBUG(log, "Log entry was already created will check the existing one.");
+
+                    fetch_log_entry = *ReplicatedMergeTreeLogEntry::parse(fetch_log_entry_str, fetch_log_entry_stat);
+                }
+                else
+                {
+                    Coordination::Requests ops;
+                    ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version));
+
+                    fetch_log_entry.type = ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD;
+                    fetch_log_entry.log_entry_id = fetch_log_entry_barrier_path;
+                    fetch_log_entry.create_time = std::time(nullptr);
+                    fetch_log_entry.new_part_name = entry.part_name;
+                    fetch_log_entry.source_replica = storage.replica_name;
+                    fetch_log_entry.source_shard = zookeeper_path;
+                    ops.emplace_back(zkutil::makeCreateRequest(fetch_log_entry_barrier_path, fetch_log_entry.toString(), -1));
+                    ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1));
+                    ops.emplace_back(zkutil::makeCreateRequest(
+                        entry.to_shard + "/log/log-", fetch_log_entry.toString(), zkutil::CreateMode::PersistentSequential));
+
+                    Coordination::Responses responses;
+                    Coordination::Error rc = zk->tryMulti(ops, responses);
+                    zkutil::KeeperMultiException::check(rc, ops, responses);
+
+                    String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
+                    fetch_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
+
+                    LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path);
+                }
+
+                Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, fetch_log_entry, 1);
+                if (!unwaited.empty())
+                    throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited));
+
+                entry.state = EntryState::DESTINATION_ATTACH;
+                return entry;
+            }
        }
        break;
        case EntryState::DESTINATION_ATTACH:
        {
-            /// There is a chance that attach on destination will fail and this task will be left in the queue forever.
+            String attach_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString());

+            if (entry.rollback)
+            {
+                Coordination::Stat attach_log_entry_stat;
+                String attach_log_entry_str;
+                if (!zk->tryGet(attach_log_entry_barrier_path, attach_log_entry_str, &attach_log_entry_stat))
+                {
+                    LOG_DEBUG(log, "Log entry for DESTINATION_ATTACH not found. Not sending DROP_RANGE log entry.");
+
+                    // ATTACH_PART wasn't issued, nothing to revert.
+                    entry.state = EntryState::DESTINATION_FETCH;
+                    return entry;
+                }
+                else
+                {
+                    // Need to remove ATTACH_PART from the queue or drop data.
+                    // Similar to `StorageReplicatedMergeTree::dropPart` w/o extra
+                    // checks as we know drop shall be possible.
+                    ReplicatedMergeTreeLogEntryData attach_rollback_log_entry;
+
+                    String attach_rollback_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString() + "_rollback");
+                    Coordination::Stat attach_rollback_log_entry_stat;
+                    String attach_rollback_log_entry_str;
+                    if (zk->tryGet(attach_rollback_log_entry_barrier_path, attach_rollback_log_entry_str, &attach_rollback_log_entry_stat))
+                    {
+                        LOG_DEBUG(log, "Log entry was already created will check the existing one.");
+
+                        attach_rollback_log_entry = *ReplicatedMergeTreeLogEntry::parse(attach_rollback_log_entry_str, attach_rollback_log_entry_stat);
+                    }
+                    else
+                    {
+                        const auto attach_log_entry = ReplicatedMergeTreeLogEntry::parse(attach_log_entry_str, attach_log_entry_stat);
+
+                        Coordination::Requests ops;
+                        ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version));
+
+                        auto drop_part_info = MergeTreePartInfo::fromPartName(attach_log_entry->new_part_name, storage.format_version);
+
+                        storage.getClearBlocksInPartitionOps(
+                            ops, *zk, drop_part_info.partition_id, drop_part_info.min_block, drop_part_info.max_block);
+                        size_t clear_block_ops_size = ops.size();
+
+                        attach_rollback_log_entry.type = ReplicatedMergeTreeLogEntryData::DROP_RANGE;
+                        attach_rollback_log_entry.log_entry_id = attach_rollback_log_entry_barrier_path;
+                        attach_rollback_log_entry.source_replica = storage.replica_name;
+                        attach_rollback_log_entry.source_shard = zookeeper_path;
+
+                        attach_rollback_log_entry.new_part_name = getPartNamePossiblyFake(storage.format_version, drop_part_info);
+                        attach_rollback_log_entry.create_time = time(nullptr);
+
+                        ops.emplace_back(zkutil::makeCreateRequest(attach_rollback_log_entry_barrier_path, attach_rollback_log_entry.toString(), -1));
+                        ops.emplace_back(zkutil::makeCreateRequest(
+                            entry.to_shard + "/log/log-", attach_rollback_log_entry.toString(), zkutil::CreateMode::PersistentSequential));
+                        ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1));
+                        Coordination::Responses responses;
+                        Coordination::Error rc = zk->tryMulti(ops, responses);
+                        zkutil::KeeperMultiException::check(rc, ops, responses);
+
+                        String log_znode_path
+                            = dynamic_cast<const Coordination::CreateResponse &>(*responses[clear_block_ops_size]).path_created;
+                        attach_rollback_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
+
+                        LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path);
+                    }
+
+                    Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, attach_rollback_log_entry, 1);
+                    if (!unwaited.empty())
+                        throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited));
+
+                    entry.state = EntryState::DESTINATION_FETCH;
+                    return entry;
+                }
+            }
+            else
+            {
+                /// There is a chance that attach on destination will fail and this task will be left in the queue forever.
+
                Coordination::Requests ops;
                ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version));

                auto part = storage.getActiveContainingPart(entry.part_name);

                /// Allocating block number in other replicas zookeeper path
                /// TODO Maybe we can do better.
-                auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zk, "", entry.to_shard);
-                auto block_number = block_number_lock->getNumber();
+                auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zk, attach_log_entry_barrier_path, entry.to_shard);

-                auto part_info = part->info;
-                part_info.min_block = block_number;
-                part_info.max_block = block_number;
-                part_info.level = 0;
-                part_info.mutation = 0;
-
-                /// Attach log entry (all replicas already fetched part)
                ReplicatedMergeTreeLogEntryData log_entry;
-                log_entry.type = ReplicatedMergeTreeLogEntryData::ATTACH_PART;
-                log_entry.part_checksum = part->checksums.getTotalChecksumHex();
-                log_entry.create_time = std::time(nullptr);
-                log_entry.new_part_name = part_info.getPartName();
-                ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1));
-                ops.emplace_back(zkutil::makeCreateRequest(
-                    entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
-
-                Coordination::Responses responses;
-                Coordination::Error rc = zk->tryMulti(ops, responses);
-                zkutil::KeeperMultiException::check(rc, ops, responses);
-
-                String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
-                log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
-
-                storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1);
-            }
-
-            {
-                /// State transition.
-                Entry entry_copy = entry;
-                entry_copy.state = EntryState::SOURCE_DROP_PRE_DELAY;
-                entry_copy.update_time = std::time(nullptr);
-                zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
+                if (block_number_lock)
+                {
+                    auto block_number = block_number_lock->getNumber();
+
+                    auto part_info = part->info;
+                    part_info.min_block = block_number;
+                    part_info.max_block = block_number;
+                    part_info.level = 0;
+                    part_info.mutation = 0;
+
+                    /// Attach log entry (all replicas already fetched part)
+                    log_entry.type = ReplicatedMergeTreeLogEntryData::ATTACH_PART;
+                    log_entry.log_entry_id = attach_log_entry_barrier_path;
+                    log_entry.part_checksum = part->checksums.getTotalChecksumHex();
+                    log_entry.create_time = std::time(nullptr);
+                    log_entry.new_part_name = part_info.getPartName();
+
+                    ops.emplace_back(zkutil::makeCreateRequest(attach_log_entry_barrier_path, log_entry.toString(), -1));
+                    ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1));
+                    ops.emplace_back(zkutil::makeCreateRequest(
+                        entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
+
+                    Coordination::Responses responses;
+                    Coordination::Error rc = zk->tryMulti(ops, responses);
+                    zkutil::KeeperMultiException::check(rc, ops, responses);
+
+                    String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
+                    log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
+
+                    LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path);
+                }
+                else
+                {
+                    LOG_DEBUG(log, "Log entry was already created will check the existing one.");
+
+                    Coordination::Stat stat;
+                    String log_entry_str = zk->get(attach_log_entry_barrier_path, &stat);
+                    log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, stat);
+                }
+
+                Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 1);
+                if (!unwaited.empty())
+                    throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited));
+
+                entry.dst_part_name = log_entry.new_part_name;
+                entry.state = EntryState::SOURCE_DROP_PRE_DELAY;
+                return entry;
+            }
        }
        break;
        case EntryState::SOURCE_DROP_PRE_DELAY:
        {
-            std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds));
-
-            /// State transition.
-            Entry entry_copy = entry;
-            entry_copy.state = EntryState::SOURCE_DROP;
-            entry_copy.update_time = std::time(nullptr);
-            zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
+            if (entry.rollback)
+            {
+                entry.state = EntryState::DESTINATION_ATTACH;
+                return entry;
+            }
+            else
+            {
+                std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds));
+                entry.state = EntryState::SOURCE_DROP;
+                return entry;
+            }
        }
        break;
        case EntryState::SOURCE_DROP:
        {
+            if (entry.rollback)
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "It is not possible to rollback from this state. This is a bug.");
+            else
            {
-                ReplicatedMergeTreeLogEntry log_entry;
-                if (storage.dropPartImpl(zk, entry.part_name, log_entry, false, false))
-                    storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, -1);
-            }
-
-            {
-                /// State transition.
-                Entry entry_copy = entry;
-                entry_copy.state = EntryState::SOURCE_DROP_POST_DELAY;
-                entry_copy.update_time = std::time(nullptr);
-                zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
+                // Can't use dropPartImpl directly as we need additional zk ops to remember the log entry
+                // for subsequent retries.
+
+                ReplicatedMergeTreeLogEntryData source_drop_log_entry;
+
+                String source_drop_log_entry_barrier_path = fs::path(entry.znode_path) / ("log_" + entry.state.toString());
+                Coordination::Stat source_drop_log_entry_stat;
+                String source_drop_log_entry_str;
+                if (zk->tryGet(source_drop_log_entry_barrier_path, source_drop_log_entry_str, &source_drop_log_entry_stat))
+                {
+                    LOG_DEBUG(log, "Log entry was already created will check the existing one.");
+
+                    source_drop_log_entry = *ReplicatedMergeTreeLogEntry::parse(source_drop_log_entry_str, source_drop_log_entry_stat);
+                }
+                else
+                {
+                    auto source_drop_part_info = MergeTreePartInfo::fromPartName(entry.part_name, storage.format_version);
+
+                    Coordination::Requests ops;
+                    ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version));
+
+                    storage.getClearBlocksInPartitionOps(ops, *zk, source_drop_part_info.partition_id, source_drop_part_info.min_block, source_drop_part_info.max_block);
+
+                    source_drop_log_entry.type = ReplicatedMergeTreeLogEntryData::DROP_RANGE;
+                    source_drop_log_entry.log_entry_id = source_drop_log_entry_barrier_path;
+                    source_drop_log_entry.create_time = std::time(nullptr);
+                    source_drop_log_entry.new_part_name = getPartNamePossiblyFake(storage.format_version, source_drop_part_info);
+                    source_drop_log_entry.source_replica = storage.replica_name;
+
+                    ops.emplace_back(zkutil::makeCreateRequest(source_drop_log_entry_barrier_path, source_drop_log_entry.toString(), -1));
+                    ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1));
+                    ops.emplace_back(zkutil::makeCreateRequest(
+                        zookeeper_path + "/log/log-", source_drop_log_entry.toString(), zkutil::CreateMode::PersistentSequential));
+
+                    Coordination::Responses responses;
+                    Coordination::Error rc = zk->tryMulti(ops, responses);
+                    zkutil::KeeperMultiException::check(rc, ops, responses);
+
+                    String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
+                    source_drop_log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
+
+                    LOG_DEBUG(log, "Pushed log entry: {}", log_znode_path);
+                }
+
+                Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(zookeeper_path, source_drop_log_entry, 1);
+                if (!unwaited.empty())
+                    throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Some replicas haven't processed event: {}, will retry later.", toString(unwaited));
+
+                entry.state = EntryState::SOURCE_DROP_POST_DELAY;
+                return entry;
            }
        }
        break;
        case EntryState::SOURCE_DROP_POST_DELAY:
        {
-            std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds));
-
-            /// State transition.
-            Entry entry_copy = entry;
-            entry_copy.state = EntryState::REMOVE_UUID_PIN;
-            entry_copy.update_time = std::time(nullptr);
-            zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
+            if (entry.rollback)
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "It is not possible to rollback from this state. This is a bug.");
+            else
+            {
+                std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds));
+                entry.state = EntryState::REMOVE_UUID_PIN;
+                return entry;
+            }
        }
        break;
        case EntryState::REMOVE_UUID_PIN:
        {
+            if (entry.rollback)
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "It is not possible to rollback from this state. This is a bug.");
+            else
+            {
-                PinnedPartUUIDs src_pins;
-                PinnedPartUUIDs dst_pins;
+                removePins(entry, zk);

-                {
-                    String s = zk->get(zookeeper_path + "/pinned_part_uuids", &src_pins.stat);
-                    src_pins.fromString(s);
-                }
-
-                {
-                    String s = zk->get(entry.to_shard + "/pinned_part_uuids", &dst_pins.stat);
-                    dst_pins.fromString(s);
-                }
-
-                src_pins.part_uuids.erase(entry.part_uuid);
-                dst_pins.part_uuids.erase(entry.part_uuid);
-
-                Coordination::Requests ops;
-                ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/pinned_part_uuids", src_pins.toString(), src_pins.stat.version));
-                ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/pinned_part_uuids", dst_pins.toString(), dst_pins.stat.version));
-
-                zk->multi(ops);
+                entry.state = EntryState::DONE;
+                return entry;
+            }
-
-            /// State transition.
-            Entry entry_copy = entry;
-            entry_copy.state = EntryState::DONE;
-            entry_copy.update_time = std::time(nullptr);
-            zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
        }
        break;
    }

+    __builtin_unreachable();
 }
+void PartMovesBetweenShardsOrchestrator::removePins(const Entry & entry, zkutil::ZooKeeperPtr zk)
+{
+    PinnedPartUUIDs src_pins;
+    PinnedPartUUIDs dst_pins;
+
+    {
+        String s = zk->get(zookeeper_path + "/pinned_part_uuids", &src_pins.stat);
+        src_pins.fromString(s);
+    }
+
+    {
+        String s = zk->get(entry.to_shard + "/pinned_part_uuids", &dst_pins.stat);
+        dst_pins.fromString(s);
+    }
+
+    dst_pins.part_uuids.erase(entry.part_uuid);
+    src_pins.part_uuids.erase(entry.part_uuid);
+
+    Coordination::Requests ops;
+    ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/pinned_part_uuids", src_pins.toString(), src_pins.stat.version));
+    ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/pinned_part_uuids", dst_pins.toString(), dst_pins.stat.version));
+
+    zk->multi(ops);
+}
+CancellationCode PartMovesBetweenShardsOrchestrator::killPartMoveToShard(const UUID & task_uuid)
+{
+    while (true)
+    {
+        auto entry = getEntryByUUID(task_uuid);
+
+        // If the task is in this state or any that follows it is too late to rollback
+        // since we can't be sure if the source data still exists.
+        auto not_possible_to_rollback_after_state = EntryState(EntryState::SOURCE_DROP);
+        if (entry.state.value >= not_possible_to_rollback_after_state.value)
+        {
+            LOG_DEBUG(log, "Can't kill move part between shards entry {} ({}) after state {}. Current state: {}.",
+                      toString(entry.task_uuid), entry.znode_name, not_possible_to_rollback_after_state.toString(), entry.state.toString());
+            return CancellationCode::CancelCannotBeSent;
+        }
+
+        LOG_TRACE(log, "Will try to mark move part between shards entry {} ({}) for rollback.",
+                  toString(entry.task_uuid), entry.znode_name);
+
+        auto zk = storage.getZooKeeper();
+
+        // State transition.
+        entry.rollback = true;
+        entry.update_time = std::time(nullptr);
+        entry.num_tries = 0;
+        entry.last_exception_msg = "";
+
+        auto code = zk->trySet(entry.znode_path, entry.toString(), entry.version);
+        if (code == Coordination::Error::ZOK)
+        {
+            // Orchestrator will process it in background.
+            return CancellationCode::CancelSent;
+        }
+        else if (code == Coordination::Error::ZBADVERSION)
+        {
+            /// Node was updated meanwhile. We must re-read it and repeat all the actions.
+            continue;
+        }
+        else
+            throw Coordination::Exception(code, entry.znode_path);
+    }
+}
-std::vector<PartMovesBetweenShardsOrchestrator::Entry> PartMovesBetweenShardsOrchestrator::getEntries() const
+std::vector<PartMovesBetweenShardsOrchestrator::Entry> PartMovesBetweenShardsOrchestrator::getEntries()
 {
+    // Force sync. Also catches parsing errors.
+    syncStateFromZK();
+
     std::lock_guard lock(state_mutex);

-    std::vector<Entry> res;
-
-    for (const auto & e : entries)
-        res.push_back(e.second);
-
-    return res;
+    return entries;
+}
+
+PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::getEntryByUUID(const UUID & task_uuid)
+{
+    /// Need latest state in case user tries to kill a move observed on a different replica.
+    syncStateFromZK();
+
+    std::lock_guard lock(state_mutex);
+    for (auto const & entry : entries)
+    {
+        if (entry.task_uuid == task_uuid)
+            return entry;
+    }
+
+    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Task with id {} not found", toString(task_uuid));
 }
@@ -431,12 +714,19 @@ String PartMovesBetweenShardsOrchestrator::Entry::toString() const
     json.set(JSON_KEY_PART_NAME, part_name);
     json.set(JSON_KEY_PART_UUID, DB::toString(part_uuid));
     json.set(JSON_KEY_TO_SHARD, to_shard);
+    json.set(JSON_KEY_DST_PART_NAME, dst_part_name);
     json.set(JSON_KEY_STATE, state.toString());
+    json.set(JSON_KEY_ROLLBACK, DB::toString(rollback));
     json.set(JSON_KEY_LAST_EX_MSG, last_exception_msg);
+    json.set(JSON_KEY_NUM_TRIES, DB::toString(num_tries));

     std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
     oss.exceptions(std::ios::failbit);
-    json.stringify(oss);
+
+    // Always escape unicode to make last_exception_msg json safe.
+    // It may contain random binary data when exception is a parsing error
+    // of unexpected contents.
+    Poco::JSON::Stringifier::stringify(json, oss, 0, -1, Poco::JSON_WRAP_STRINGS | Poco::JSON_ESCAPE_UNICODE);

     return oss.str();
 }
@@ -452,8 +742,11 @@ void PartMovesBetweenShardsOrchestrator::Entry::fromString(const String & buf)
     part_name = json->getValue<std::string>(JSON_KEY_PART_NAME);
     part_uuid = parseFromString<UUID>(json->getValue<std::string>(JSON_KEY_PART_UUID));
     to_shard = json->getValue<std::string>(JSON_KEY_TO_SHARD);
+    dst_part_name = json->getValue<std::string>(JSON_KEY_DST_PART_NAME);
     state.value = EntryState::fromString(json->getValue<std::string>(JSON_KEY_STATE));
+    rollback = json->getValue<bool>(JSON_KEY_ROLLBACK);
     last_exception_msg = json->getValue<std::string>(JSON_KEY_LAST_EX_MSG);
+    num_tries = json->getValue<UInt64>(JSON_KEY_NUM_TRIES);
 }

 }
@@ -7,6 +7,7 @@
 #include <Core/UUID.h>
 #include <Core/BackgroundSchedulePool.h>
 #include <IO/WriteHelpers.h>
+#include <Interpreters/CancellationCode.h>

 namespace DB
 {
@@ -18,14 +19,37 @@ namespace ErrorCodes

 class StorageReplicatedMergeTree;

-/// Cross shard part movement workflow orchestration.
+/**
+ * Cross shard part movement workflow orchestration.
+ *
+ * TODO(nv):
+ *  * Usage of `format_version` when acting on the behalf of the remote shard.
+ *    There needs to be sort of an API to coordinate with remote replicas.
+ *  * Only one movement at a time can be coordinated. This can easily be fixed
+ *    by cycling through different tasks and checking their status with a
+ *    priority queue and back-off for failing tasks
+ *    `min(backoff * num_tries, max_backoff)`.
+ */
 class PartMovesBetweenShardsOrchestrator
 {
 public:
     struct EntryState
     {
+        // State transitions are linear. When a kill query is issued a rollback
+        // flag is set and transitions order is reversed.
+        //
+        // SOURCE_DROP is a critical state after which rollback is not possible
+        // and we must ensure that the task can always succeed after that.
+        //
+        // Similar for rollback. It should be always possible to rollback before
+        // SOURCE_DROP state and it should terminate.
+        //
+        // Note: This is fragile. If you change the states please add entry to
+        // changelog about forward/backward compatibility. Better not to have
+        // any active move tasks while doing upgrade/downgrade operations.
         enum Value
         {
+            CANCELLED,
             TODO,
             SYNC_SOURCE,
             SYNC_DESTINATION,
@@ -36,7 +60,6 @@ public:
             SOURCE_DROP_POST_DELAY,
             REMOVE_UUID_PIN,
             DONE,
-            CANCELLED,
         };

         EntryState(): value(TODO) {}
@@ -95,10 +118,14 @@ public:
         String part_name;
         UUID part_uuid;
         String to_shard;
+        String dst_part_name;

         EntryState state;
+        bool rollback = false;

+        /// Reset on successful transitions.
         String last_exception_msg;
+        UInt64 num_tries = 0;

         String znode_name;
@@ -120,27 +147,31 @@ private:
     static constexpr auto JSON_KEY_PART_NAME = "part_name";
     static constexpr auto JSON_KEY_PART_UUID = "part_uuid";
     static constexpr auto JSON_KEY_TO_SHARD = "to_shard";
+    static constexpr auto JSON_KEY_DST_PART_NAME = "dst_part_name";
     static constexpr auto JSON_KEY_STATE = "state";
+    static constexpr auto JSON_KEY_ROLLBACK = "rollback";
     static constexpr auto JSON_KEY_LAST_EX_MSG = "last_exception";
+    static constexpr auto JSON_KEY_NUM_TRIES = "num_tries";

 public:
-    PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_);
+    explicit PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_);

     void start() { task->activateAndSchedule(); }
     void wakeup() { task->schedule(); }
     void shutdown();

-    void fetchStateFromZK();
+    CancellationCode killPartMoveToShard(const UUID & task_uuid);

     /// We could have one thread per Entry and worry about concurrency issues.
     /// Or we could have a single thread trying to run one step at a time.
-    bool step();
-
-    std::vector<Entry> getEntries() const;
+    std::vector<Entry> getEntries();

 private:
     void run();
-    void stepEntry(const Entry & entry, zkutil::ZooKeeperPtr zk);
+    bool step();
+    Entry stepEntry(Entry entry, zkutil::ZooKeeperPtr zk);
+
+    Entry getEntryByUUID(const UUID & task_uuid);
+    void removePins(const Entry & entry, zkutil::ZooKeeperPtr zk);
+    void syncStateFromZK();

 private:
     StorageReplicatedMergeTree & storage;
@@ -153,7 +184,7 @@ private:
     BackgroundSchedulePool::TaskHolder task;

     mutable std::mutex state_mutex;
-    std::map<String, Entry> entries;
+    std::vector<Entry> entries;

 public:
     String entries_znode_path;
@@ -4669,7 +4669,7 @@ void StorageReplicatedMergeTree::alter(

 /// If new version returns ordinary name, else returns part name containing the first and last month of the month
 /// NOTE: use it in pair with getFakePartCoveringAllPartsInPartition(...)
-static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info)
+String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info)
 {
     if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
     {
@@ -6568,6 +6568,7 @@ void StorageReplicatedMergeTree::movePartitionToShard(
     dst_pins.part_uuids.insert(part->uuid);

     PartMovesBetweenShardsOrchestrator::Entry part_move_entry;
+    part_move_entry.state = PartMovesBetweenShardsOrchestrator::EntryState::SYNC_SOURCE;
     part_move_entry.create_time = std::time(nullptr);
     part_move_entry.update_time = part_move_entry.create_time;
     part_move_entry.task_uuid = UUIDHelpers::generateV4();
@@ -6591,10 +6592,13 @@ void StorageReplicatedMergeTree::movePartitionToShard(
     String task_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
     LOG_DEBUG(log, "Created task for part movement between shards at " + task_znode_path);

-    /// Force refresh local state. This will make the task immediately visible in `system.part_moves_between_shards` table.
-    part_moves_between_shards_orchestrator.fetchStateFromZK();
+    /// TODO(nv): Nice to have support for `replication_alter_partitions_sync`.
+    /// For now use the system.part_moves_between_shards table for status.
 }

-// TODO: Add support for `replication_alter_partitions_sync`.
+CancellationCode StorageReplicatedMergeTree::killPartMoveToShard(const UUID & task_uuid)
+{
+    return part_moves_between_shards_orchestrator.killPartMoveToShard(task_uuid);
+}

 void StorageReplicatedMergeTree::getCommitPartOps(
@@ -682,6 +682,7 @@ private:
     void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
     void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;
     void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, ContextPtr query_context) override;
+    CancellationCode killPartMoveToShard(const UUID & task_uuid) override;
     void fetchPartition(
         const ASTPtr & partition,
         const StorageMetadataPtr & metadata_snapshot,
@@ -745,6 +746,8 @@ protected:
         bool allow_renaming_);
 };

+String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);
+

 /** There are three places for each part, where it should be
   * 1. In the RAM, data_parts, all_data_parts.
@@ -30,10 +30,12 @@ NamesAndTypesList StorageSystemPartMovesBetweenShards::getNamesAndTypes()
         { "part_name", std::make_shared<DataTypeString>() },
         { "part_uuid", std::make_shared<DataTypeUUID>() },
         { "to_shard", std::make_shared<DataTypeString>() },
+        { "dst_part_name", std::make_shared<DataTypeString>() },

         /// Processing status of item.
         { "update_time", std::make_shared<DataTypeDateTime>() },
         { "state", std::make_shared<DataTypeString>() },
         { "rollback", std::make_shared<DataTypeUInt8>() },
+        { "num_tries", std::make_shared<DataTypeUInt32>() },
         { "last_exception", std::make_shared<DataTypeString>() },
     };
@@ -122,11 +124,13 @@ void StorageSystemPartMovesBetweenShards::fillData(MutableColumns & res_columns,
         res_columns[col_num++]->insert(entry.part_name);
         res_columns[col_num++]->insert(entry.part_uuid);
         res_columns[col_num++]->insert(entry.to_shard);
+        res_columns[col_num++]->insert(entry.dst_part_name);

         /// Processing status of item.
         res_columns[col_num++]->insert(entry.update_time);
         res_columns[col_num++]->insert(entry.state.toString());
-        res_columns[col_num++]->insert(0);
+        res_columns[col_num++]->insert(entry.rollback);
+        res_columns[col_num++]->insert(entry.num_tries);
         res_columns[col_num++]->insert(entry.last_exception_msg);
     }
 }
@ -1,32 +1,38 @@
|
||||
import random
|
||||
import time
|
||||
|
||||
import pytest
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import TSV
|
||||
|
||||
transient_ch_errors = [23, 32, 210]
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
s0r0 = cluster.add_instance(
|
||||
's0r0',
|
||||
main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
|
||||
stay_alive=True,
|
||||
with_zookeeper=True)
|
||||
|
||||
s0r1 = cluster.add_instance(
|
||||
's0r1',
|
||||
main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
|
||||
stay_alive=True,
|
||||
with_zookeeper=True)
|
||||
|
||||
s1r0 = cluster.add_instance(
|
||||
's1r0',
|
||||
main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
|
||||
stay_alive=True,
|
||||
with_zookeeper=True)
|
||||
|
||||
s1r1 = cluster.add_instance(
|
||||
's1r1',
|
||||
main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
|
||||
stay_alive=True,
|
||||
with_zookeeper=True)
|
||||
|
||||
|
||||
@ -43,12 +49,14 @@ def test_move(started_cluster):
|
||||
for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]):
|
||||
for replica_ix, r in enumerate(rs):
|
||||
r.query("""
|
||||
DROP TABLE IF EXISTS test_move;
|
||||
CREATE TABLE test_move(v UInt64)
|
||||
ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/test_move', '{}')
|
||||
ORDER BY tuple()
|
||||
""".format(shard_ix, replica_ix))
|
||||
""".format(shard_ix, r.name))
|
||||
|
||||
s0r0.query("SYSTEM STOP MERGES test_move")
|
||||
s0r1.query("SYSTEM STOP MERGES test_move")
|
||||
|
||||
s0r0.query("INSERT INTO test_move VALUES (1)")
|
||||
s0r0.query("INSERT INTO test_move VALUES (2)")
|
||||
@ -63,14 +71,7 @@ def test_move(started_cluster):
|
||||
s0r0.query("SYSTEM START MERGES test_move")
|
||||
s0r0.query("OPTIMIZE TABLE test_move FINAL")
|
||||
|
||||
while True:
|
||||
time.sleep(3)
|
||||
|
||||
print(s0r0.query("SELECT * FROM system.part_moves_between_shards"))
|
||||
|
||||
# Eventually.
|
||||
if "DONE" == s0r0.query("SELECT state FROM system.part_moves_between_shards WHERE table = 'test_move'").strip():
|
||||
break
|
||||
wait_for_state("DONE", s0r0, "test_move")
|
||||
|
||||
for n in [s0r0, s0r1]:
|
||||
assert "1" == n.query("SELECT count() FROM test_move").strip()
|
||||
@ -81,14 +82,7 @@ def test_move(started_cluster):
|
||||
# Move part back
|
||||
s1r0.query("ALTER TABLE test_move MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/test_move'")
|
||||
|
||||
while True:
|
||||
time.sleep(3)
|
||||
|
||||
print(s1r0.query("SELECT * FROM system.part_moves_between_shards"))
|
||||
|
||||
# Eventually.
|
||||
if "DONE" == s1r0.query("SELECT state FROM system.part_moves_between_shards WHERE table = 'test_move'").strip():
|
||||
break
|
||||
wait_for_state("DONE", s1r0, "test_move")
|
||||
|
||||
for n in [s0r0, s0r1]:
|
||||
assert "2" == n.query("SELECT count() FROM test_move").strip()
|
||||
@ -101,17 +95,20 @@ def test_deduplication_while_move(started_cluster):
    for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]):
        for replica_ix, r in enumerate(rs):
            r.query("""
            DROP TABLE IF EXISTS test_deduplication;
            CREATE TABLE test_deduplication(v UInt64)
            ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/test_deduplication', '{}')
            ORDER BY tuple()
            """.format(shard_ix, replica_ix))
            """.format(shard_ix, r.name))

            r.query("""
            CREATE TABLE t_d AS test_deduplication
            DROP TABLE IF EXISTS test_deduplication_d;
            CREATE TABLE test_deduplication_d AS test_deduplication
            ENGINE Distributed('test_cluster', '', test_deduplication)
            """)

    s0r0.query("SYSTEM STOP MERGES test_deduplication")
    s0r1.query("SYSTEM STOP MERGES test_deduplication")

    s0r0.query("INSERT INTO test_deduplication VALUES (1)")
    s0r0.query("INSERT INTO test_deduplication VALUES (2)")
@ -120,7 +117,8 @@ def test_deduplication_while_move(started_cluster):
    assert "2" == s0r0.query("SELECT count() FROM test_deduplication").strip()
    assert "0" == s1r0.query("SELECT count() FROM test_deduplication").strip()

    s0r0.query("ALTER TABLE test_deduplication MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/test_deduplication'")
    s0r0.query(
        "ALTER TABLE test_deduplication MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/test_deduplication'")
    s0r0.query("SYSTEM START MERGES test_deduplication")

    expected = """
@ -128,32 +126,363 @@ def test_deduplication_while_move(started_cluster):
2
"""

    # Verify that we get a consistent result at all times while the part is moving from one shard to another.
    while "DONE" != s0r0.query("SELECT state FROM system.part_moves_between_shards WHERE table = 'test_deduplication' ORDER BY create_time DESC LIMIT 1").strip():
    def deduplication_invariant_test():
        n = random.choice(list(started_cluster.instances.values()))
        assert TSV(
            n.query("SELECT * FROM test_deduplication_d ORDER BY v",
                    settings={"allow_experimental_query_deduplication": 1})
        ) == TSV(expected)

        assert TSV(n.query("SELECT * FROM t_d ORDER BY v", settings={
            "allow_experimental_query_deduplication": 1
        })) == TSV(expected)

    deduplication_invariant = ConcurrentInvariant(deduplication_invariant_test)
    deduplication_invariant.start()

    wait_for_state("DONE", s0r0, "test_deduplication")

    deduplication_invariant.stop_and_assert_no_exception()
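
While a part is in flight it can briefly exist on both shards, and the allow_experimental_query_deduplication setting is what keeps a Distributed read from counting it twice. A hedged sketch of the same check in isolation (node choice arbitrary, expected count taken from the two inserts above):

    # Sketch only: without the setting a distributed read may observe the
    # moving part on both shards; with it the result should stay stable.
    n = random.choice(list(started_cluster.instances.values()))
    assert "2" == n.query(
        "SELECT count() FROM test_deduplication_d",
        settings={"allow_experimental_query_deduplication": 1}).strip()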


def test_part_move_step_by_step(started_cluster):
    for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]):
        for replica_ix, r in enumerate(rs):
            r.query("""
            DROP TABLE IF EXISTS test_part_move_step_by_step;
            CREATE TABLE test_part_move_step_by_step(v UInt64)
            ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/test_part_move_step_by_step', '{}')
            ORDER BY tuple()
            """.format(shard_ix, r.name))

            r.query("""
            DROP TABLE IF EXISTS test_part_move_step_by_step_d;
            CREATE TABLE test_part_move_step_by_step_d AS test_part_move_step_by_step
            ENGINE Distributed('test_cluster', currentDatabase(), test_part_move_step_by_step)
            """)

    s0r0.query("SYSTEM STOP MERGES test_part_move_step_by_step")
    s0r1.query("SYSTEM STOP MERGES test_part_move_step_by_step")

    s0r0.query("INSERT INTO test_part_move_step_by_step VALUES (1)")
    s0r0.query("INSERT INTO test_part_move_step_by_step VALUES (2)")
    s0r1.query("SYSTEM SYNC REPLICA test_part_move_step_by_step", timeout=20)

    assert "2" == s0r0.query("SELECT count() FROM test_part_move_step_by_step").strip()
    assert "0" == s1r0.query("SELECT count() FROM test_part_move_step_by_step").strip()

    expected = """
1
2
"""

    def deduplication_invariant_test():
        n = random.choice(list(started_cluster.instances.values()))
        try:
            assert TSV(
                n.query("SELECT * FROM test_part_move_step_by_step_d ORDER BY v",
                        settings={"allow_experimental_query_deduplication": 1})
            ) == TSV(expected)
        except QueryRuntimeException as e:
            # Ignore transient errors that are caused by us restarting nodes.
            if e.returncode not in transient_ch_errors:
                raise e

    deduplication_invariant = ConcurrentInvariant(deduplication_invariant_test)
    deduplication_invariant.start()

    # Stop a source replica to prevent SYNC_SOURCE succeeding.
    s0r1.stop_clickhouse()

    s0r0.query(
        "ALTER TABLE test_part_move_step_by_step MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/test_part_move_step_by_step'")

    # Should hang on SYNC_SOURCE until all source replicas acknowledge new pinned UUIDs.
    wait_for_state("SYNC_SOURCE", s0r0, "test_part_move_step_by_step", "Some replicas haven\\'t processed event")
    deduplication_invariant.assert_no_exception()

    # Start all replicas in the source shard but stop a replica in the
    # destination shard to prevent SYNC_DESTINATION succeeding.
    s1r1.stop_clickhouse()
    s0r1.start_clickhouse()

    # After the SYNC_SOURCE step no merges will be assigned.
    s0r0.query("SYSTEM START MERGES test_part_move_step_by_step; OPTIMIZE TABLE test_part_move_step_by_step;")
    s0r1.query("SYSTEM START MERGES test_part_move_step_by_step; OPTIMIZE TABLE test_part_move_step_by_step;")

    wait_for_state("SYNC_DESTINATION", s0r0, "test_part_move_step_by_step", "Some replicas haven\\'t processed event")
    deduplication_invariant.assert_no_exception()

    # Start the previously stopped replica in the destination shard to let
    # SYNC_DESTINATION succeed.
    # Stop the other replica in the destination shard to prevent DESTINATION_FETCH from succeeding.
    s1r0.stop_clickhouse()
    s1r1.start_clickhouse()
    wait_for_state("DESTINATION_FETCH", s0r0, "test_part_move_step_by_step", "Some replicas haven\\'t processed event")
    deduplication_invariant.assert_no_exception()

    # Start the previously stopped replica in the destination shard to let
    # DESTINATION_FETCH succeed.
    # Stop the other replica in the destination shard to prevent DESTINATION_ATTACH from succeeding.
    s1r1.stop_clickhouse()
    s1r0.start_clickhouse()
    wait_for_state("DESTINATION_ATTACH", s0r0, "test_part_move_step_by_step", "Some replicas haven\\'t processed event")
    deduplication_invariant.assert_no_exception()

    # Start all replicas in the destination shard to let DESTINATION_ATTACH succeed.
    # Stop a source replica to prevent SOURCE_DROP succeeding.
    s0r0.stop_clickhouse()
    s1r1.start_clickhouse()
    wait_for_state("SOURCE_DROP", s0r1, "test_part_move_step_by_step", "Some replicas haven\\'t processed event")
    deduplication_invariant.assert_no_exception()

    s0r0.start_clickhouse()
    wait_for_state("DONE", s0r1, "test_part_move_step_by_step")
    deduplication_invariant.assert_no_exception()

    # No hung tasks in the replication queue. Would time out otherwise.
    for instance in started_cluster.instances.values():
        instance.query("SYSTEM SYNC REPLICA test_part_move_step_by_step")

    assert "1" == s0r0.query("SELECT count() FROM test_part_move_step_by_step").strip()
    assert "1" == s1r0.query("SELECT count() FROM test_part_move_step_by_step").strip()

    deduplication_invariant.stop_and_assert_no_exception()
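
The test above walks the move task through its full state machine, SYNC_SOURCE → SYNC_DESTINATION → DESTINATION_FETCH → DESTINATION_ATTACH → SOURCE_DROP → DONE, holding it at each step by stopping whichever replica's acknowledgement is needed next. A hedged sketch of watching that progression by hand (output illustrative):

    # Columns as read by wait_for_state() at the end of this file.
    print(s0r0.query(
        "SELECT state, num_tries, last_exception, rollback"
        " FROM system.part_moves_between_shards"
        " WHERE table = 'test_part_move_step_by_step'"))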


def test_part_move_step_by_step_kill(started_cluster):
    for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]):
        for replica_ix, r in enumerate(rs):
            r.query("""
            DROP TABLE IF EXISTS test_part_move_step_by_step_kill;
            CREATE TABLE test_part_move_step_by_step_kill(v UInt64)
            ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/test_part_move_step_by_step_kill', '{}')
            ORDER BY tuple()
            """.format(shard_ix, r.name))

            r.query("""
            DROP TABLE IF EXISTS test_part_move_step_by_step_kill_d;
            CREATE TABLE test_part_move_step_by_step_kill_d AS test_part_move_step_by_step_kill
            ENGINE Distributed('test_cluster', currentDatabase(), test_part_move_step_by_step_kill)
            """)

    s0r0.query("SYSTEM STOP MERGES test_part_move_step_by_step_kill")
    s0r1.query("SYSTEM STOP MERGES test_part_move_step_by_step_kill")

    s0r0.query("INSERT INTO test_part_move_step_by_step_kill VALUES (1)")
    s0r0.query("INSERT INTO test_part_move_step_by_step_kill VALUES (2)")
    s0r1.query("SYSTEM SYNC REPLICA test_part_move_step_by_step_kill", timeout=20)

    assert "2" == s0r0.query("SELECT count() FROM test_part_move_step_by_step_kill").strip()
    assert "0" == s1r0.query("SELECT count() FROM test_part_move_step_by_step_kill").strip()

    expected = """
1
2
"""

    def deduplication_invariant_test():
        n = random.choice(list(started_cluster.instances.values()))
        try:
            assert TSV(
                n.query("SELECT * FROM test_part_move_step_by_step_kill_d ORDER BY v",
                        settings={
                            "allow_experimental_query_deduplication": 1
                        })
            ) == TSV(expected)
        except QueryRuntimeException as e:
            # Ignore transient errors that are caused by us restarting nodes.
            if e.returncode not in transient_ch_errors:
                raise e

    deduplication_invariant = ConcurrentInvariant(deduplication_invariant_test)
    deduplication_invariant.start()

    # Stop a source replica to prevent SYNC_SOURCE succeeding.
    s0r1.stop_clickhouse()

    s0r0.query(
        "ALTER TABLE test_part_move_step_by_step_kill MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/test_part_move_step_by_step_kill'")

    # Should hang on SYNC_SOURCE until all source replicas acknowledge new pinned UUIDs.
    wait_for_state("SYNC_SOURCE", s0r0, "test_part_move_step_by_step_kill", "Some replicas haven\\'t processed event")
    deduplication_invariant.assert_no_exception()

    # Start all replicas in the source shard but stop a replica in the
    # destination shard to prevent SYNC_DESTINATION succeeding.
    s1r1.stop_clickhouse()
    s0r1.start_clickhouse()

    # After the SYNC_SOURCE step no merges will be assigned.
    s0r0.query("SYSTEM START MERGES test_part_move_step_by_step_kill; OPTIMIZE TABLE test_part_move_step_by_step_kill;")
    s0r1.query("SYSTEM START MERGES test_part_move_step_by_step_kill; OPTIMIZE TABLE test_part_move_step_by_step_kill;")

    wait_for_state("SYNC_DESTINATION", s0r0, "test_part_move_step_by_step_kill",
                   "Some replicas haven\\'t processed event")
    deduplication_invariant.assert_no_exception()

    # Start the previously stopped replica in the destination shard to let
    # SYNC_DESTINATION succeed.
    # Stop the other replica in the destination shard to prevent DESTINATION_FETCH from succeeding.
    s1r0.stop_clickhouse()
    s1r1.start_clickhouse()
    wait_for_state("DESTINATION_FETCH", s0r0, "test_part_move_step_by_step_kill",
                   "Some replicas haven\\'t processed event")

    # Start the previously stopped replica in the destination shard to let
    # DESTINATION_FETCH succeed.
    # Stop the other replica in the destination shard to prevent DESTINATION_ATTACH from succeeding.
    s1r1.stop_clickhouse()
    s1r0.start_clickhouse()
    wait_for_state("DESTINATION_ATTACH", s0r0, "test_part_move_step_by_step_kill",
                   "Some replicas haven\\'t processed event")
    deduplication_invariant.assert_no_exception()

    # Rollback here.
    s0r0.query("""
        KILL PART_MOVE_TO_SHARD
        WHERE task_uuid = (SELECT task_uuid FROM system.part_moves_between_shards WHERE table = 'test_part_move_step_by_step_kill')
    """)

    wait_for_state("DESTINATION_ATTACH", s0r0, "test_part_move_step_by_step_kill",
                   assert_exception_msg="Some replicas haven\\'t processed event",
                   assert_rollback=True)

    s1r1.start_clickhouse()

    wait_for_state("CANCELLED", s0r0, "test_part_move_step_by_step_kill", assert_rollback=True)
    deduplication_invariant.assert_no_exception()

    # No hung tasks in the replication queue. Would time out otherwise.
    for instance in started_cluster.instances.values():
        instance.query("SYSTEM SYNC REPLICA test_part_move_step_by_step_kill")

    assert "2" == s0r0.query("SELECT count() FROM test_part_move_step_by_step_kill").strip()
    assert "0" == s1r0.query("SELECT count() FROM test_part_move_step_by_step_kill").strip()

    deduplication_invariant.stop_and_assert_no_exception()
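
KILL PART_MOVE_TO_SHARD selects tasks from system.part_moves_between_shards with its WHERE clause and marks them for cancellation; the state machine then rolls back through the already-executed steps, which is why the test asserts the rollback bit before reaching CANCELLED. SYNC is not supported for this statement. A hedged sketch assuming the TEST modifier behaves as it does for other KILL statements (report matching tasks without cancelling them):

    # Dry-run sketch: show which move tasks would be cancelled.
    print(s0r0.query("""
        KILL PART_MOVE_TO_SHARD
        WHERE table = 'test_part_move_step_by_step_kill'
        TEST
    """))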


def test_move_not_permitted(started_cluster):
    # Verify that invariants for part compatibility are checked.

    # Tests are executed in order. Make sure the cluster is up if a previous
    # test failed.
    s0r0.start_clickhouse()
    s1r0.start_clickhouse()

    for ix, n in enumerate([s0r0, s1r0]):
        n.query("DROP TABLE IF EXISTS not_permitted")
        n.query("""
        CREATE TABLE not_permitted(v_{} UInt64)
        ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/not_permitted', 'r')
        ORDER BY tuple()
        """.format(ix, ix))

    s0r0.query("INSERT INTO not_permitted VALUES (1)")

    with pytest.raises(QueryRuntimeException) as exc:
        s0r0.query("ALTER TABLE not_permitted MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/not_permitted'")

    assert "DB::Exception: Table columns structure in ZooKeeper is different from local table structure." in str(exc.value)

    with pytest.raises(QueryRuntimeException) as exc:
        s0r0.query("ALTER TABLE not_permitted MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/not_permitted'")

    assert "DB::Exception: Source and destination are the same" in str(exc.value)
    for ix, n in enumerate([s0r0, s1r0]):
        n.query("""
        DROP TABLE IF EXISTS not_permitted_columns;
        CREATE TABLE not_permitted_columns(v_{ix} UInt64)
        ENGINE ReplicatedMergeTree('/clickhouse/shard_{ix}/tables/not_permitted_columns', 'r')
        ORDER BY tuple();
        """.format(ix=ix))

        partition = "date"
        if ix > 0:
            partition = "v"

        n.query("""
        DROP TABLE IF EXISTS not_permitted_partition;
        CREATE TABLE not_permitted_partition(date Date, v UInt64)
        ENGINE ReplicatedMergeTree('/clickhouse/shard_{ix}/tables/not_permitted_partition', 'r')
        PARTITION BY ({partition})
        ORDER BY tuple();
        """.format(ix=ix, partition=partition))

    s0r0.query("INSERT INTO not_permitted_columns VALUES (1)")
    s0r0.query("INSERT INTO not_permitted_partition VALUES ('2021-09-03', 1)")

    with pytest.raises(QueryRuntimeException, match="DB::Exception: Source and destination are the same"):
        s0r0.query("ALTER TABLE not_permitted_columns MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/not_permitted_columns'")

    with pytest.raises(QueryRuntimeException, match="DB::Exception: Table columns structure in ZooKeeper is different from local table structure."):
        s0r0.query("ALTER TABLE not_permitted_columns MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/not_permitted_columns'")

    with pytest.raises(QueryRuntimeException, match="DB::Exception: Existing table metadata in ZooKeeper differs in partition key expression."):
        s0r0.query("ALTER TABLE not_permitted_partition MOVE PART '20210903_0_0_0' TO SHARD '/clickhouse/shard_1/tables/not_permitted_partition'")


def wait_for_state(desired_state, instance, test_table, assert_exception_msg=None, assert_rollback=False):
    last_debug_print_time = time.time()

    print("Waiting to reach state: {}".format(desired_state))
    if assert_exception_msg:
        print(" with exception contents: {}".format(assert_exception_msg))
    if assert_rollback:
        print(" and rollback: {}".format(assert_rollback))

    while True:
        tasks = TSV.toMat(instance.query(
            "SELECT state, num_tries, last_exception, rollback FROM system.part_moves_between_shards WHERE table = '{}'".format(
                test_table)))
        assert len(tasks) == 1, "only one task expected in this test"

        if time.time() - last_debug_print_time > 30:
            last_debug_print_time = time.time()
            print("Current state: ", tasks)

        [state, num_tries, last_exception, rollback] = tasks[0]

        if state == desired_state:
            if assert_exception_msg and int(num_tries) < 3:
                # Let the task be retried a few times when expecting an exception
                # to make sure the exception is persistent and the code doesn't
                # accidentally continue to run when we expect it not to.
                continue

            if assert_exception_msg:
                assert assert_exception_msg in last_exception

            if assert_rollback:
                assert int(rollback) == 1, "rollback bit isn't set"

            break
        elif state in ["DONE", "CANCELLED"]:
            raise Exception("Reached terminal state {}, but was waiting for {}".format(state, desired_state))

        time.sleep(0.1)
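
wait_for_state blocks until the single expected task reports the desired state, optionally insisting that the failure message is persistent (at least 3 retries) and that the rollback bit is set. A typical call, as used throughout the tests above:

    wait_for_state("SYNC_SOURCE", s0r0, "test_move",
                   assert_exception_msg="Some replicas haven\\'t processed event")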


class ConcurrentInvariant:
    def __init__(self, invariant_test, loop_sleep=0.1):
        self.invariant_test = invariant_test
        self.loop_sleep = loop_sleep

        self.started = False
        self.exiting = False
        self.exception = None
        self.thread = threading.Thread(target=self._loop)

    def start(self):
        if self.started:
            raise Exception('invariant thread already started')

        self.started = True
        self.thread.start()

    def stop_and_assert_no_exception(self):
        self._assert_started()

        self.exiting = True
        self.thread.join()

        if self.exception:
            raise self.exception

    def assert_no_exception(self):
        self._assert_started()

        if self.exception:
            raise self.exception

    def _loop(self):
        try:
            while not self.exiting:
                self.invariant_test()
                time.sleep(self.loop_sleep)
        except Exception as e:
            self.exiting = True
            self.exception = e

    def _assert_started(self):
        if not self.started:
            raise Exception('invariant thread not started, forgot to call start?')
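
ConcurrentInvariant runs the supplied check in a background thread until told to stop, capturing the first exception and re-raising it on the main thread. A minimal usage sketch (the invariant shown is hypothetical):

    def invariant():
        assert "1" == s0r0.query("SELECT 1").strip()

    inv = ConcurrentInvariant(invariant, loop_sleep=0.5)
    inv.start()
    # ... drive the scenario under test ...
    inv.assert_no_exception()           # spot-check mid-scenario
    inv.stop_and_assert_no_exception()  # always stop, or the thread keeps looping

Note that _loop flips self.exiting on failure, so a broken invariant also stops the thread instead of re-raising the same assertion on every iteration.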