DROP PART and clear blocks in a single zookeeper op

This commit is contained in:
alesapin 2020-11-02 20:30:53 +03:00
parent 364607d87d
commit dac6705995
5 changed files with 38 additions and 31 deletions

View File

@ -2642,6 +2642,14 @@ void MergeTreeData::checkPartitionCanBeDropped(const ASTPtr & partition)
global_context.checkPartitionCanBeDropped(table_id.database_name, table_id.table_name, partition_size);
}
void MergeTreeData::checkPartCanBeDropped(const ASTPtr & part_ast)
{
String part_name = part_ast->as<ASTLiteral &>().value.safeGet<String>();
auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Committed});
if (!part)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in commited state", part_name);
}
void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String & name, bool moving_part, const Context & context)
{
String partition_id;

View File

@ -560,6 +560,8 @@ public:
void checkPartitionCanBeDropped(const ASTPtr & partition) override;
void checkPartCanBeDropped(const ASTPtr & part);
size_t getColumnCompressedSize(const std::string & name) const
{
auto lock = lockParts();

View File

@ -1117,9 +1117,7 @@ Pipe StorageMergeTree::alterPartition(
{
case PartitionCommand::DROP_PARTITION:
if (command.part)
{
/// TODO(nv) what would be a good check here?
}
checkPartCanBeDropped(command.partition);
else
checkPartitionCanBeDropped(command.partition);
dropPartition(command.partition, command.detach, command.part, query_context);

View File

@ -4191,9 +4191,7 @@ Pipe StorageReplicatedMergeTree::alterPartition(
{
case PartitionCommand::DROP_PARTITION:
if (command.part)
{
/// TODO(nv) what to check here? it is possible to drop a big partition by dropping small parts...
}
checkPartCanBeDropped(command.partition);
else
checkPartitionCanBeDropped(command.partition);
dropPartition(command.partition, command.detach, command.part, query_context);
@ -5479,8 +5477,8 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(
}
void StorageReplicatedMergeTree::clearBlocksInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num)
void StorageReplicatedMergeTree::getClearBlocksInPartitionOps(
Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num)
{
Strings blocks;
if (Coordination::Error::ZOK != zookeeper.tryGetChildren(zookeeper_path + "/blocks", blocks))
@ -5497,7 +5495,6 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
}
}
zkutil::AsyncResponses<Coordination::RemoveResponse> to_delete_futures;
for (auto & pair : get_futures)
{
const String & path = pair.first;
@ -5510,23 +5507,25 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
Int64 block_num = 0;
bool parsed = tryReadIntText(block_num, buf) && buf.eof();
if (!parsed || (min_block_num <= block_num && block_num <= max_block_num))
to_delete_futures.emplace_back(path, zookeeper.asyncTryRemove(path));
ops.emplace_back(zkutil::makeRemoveRequest(path, -1));
}
}
void StorageReplicatedMergeTree::clearBlocksInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num)
{
Coordination::Requests delete_requests;
getClearBlocksInPartitionOps(delete_requests, zookeeper, partition_id, min_block_num, max_block_num);
Coordination::Responses delete_responses;
auto code = zookeeper.tryMulti(delete_requests, delete_responses);
if (code != Coordination::Error::ZOK)
{
for (size_t i = 0; i < delete_requests.size(); ++i)
if (delete_responses[i]->error != Coordination::Error::ZOK)
LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", delete_requests[i]->getPath(), Coordination::errorMessage(delete_responses[i]->error));
}
for (auto & pair : to_delete_futures)
{
const String & path = pair.first;
Coordination::Error rc = pair.second.get().error;
if (rc == Coordination::Error::ZNOTEMPTY)
{
/// Can happen if there are leftover block nodes with children created by previous server versions.
zookeeper.removeRecursive(path);
}
else if (rc != Coordination::Error::ZOK)
LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, Coordination::errorMessage(rc));
}
LOG_TRACE(log, "Deleted {} deduplication block IDs in partition ID {}", to_delete_futures.size(), partition_id);
LOG_TRACE(log, "Deleted {} deduplication block IDs in partition ID {}", delete_requests.size(), partition_id);
}
void StorageReplicatedMergeTree::replacePartitionFrom(
@ -6053,22 +6052,22 @@ bool StorageReplicatedMergeTree::dropPart(
{
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper);
/// TODO(nv) It is possible that part does not exist on replica which executes this statement.
/// Also, it possible for the part to not exist on any replicas, replica which created log entries for the part disappeared.
auto part = data_parts_by_info.find(part_info);
if (part == data_parts_by_info.end())
throw Exception("Part " + part_name + " not found locally, won't try to drop it.", ErrorCodes::NOT_IMPLEMENTED);
auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Committed});
/// TODO(nv) get ops and commit together w/ log entry?
clearBlocksInPartition(*zookeeper, part_info.partition_id, part_info.min_block, part_info.max_block);
if (!part)
throw Exception("Part " + part_name + " not found locally, won't try to drop it.", ErrorCodes::NOT_IMPLEMENTED);
/// There isn't a lot we can do otherwise. Can't cancel merges because it is possible that a replica already
/// finished the merge.
if (partIsAssignedToBackgroundOperation(*part))
if (partIsAssignedToBackgroundOperation(part))
throw Exception("Part " + part_name
+ " is currently participating in a background operation (mutation/merge)"
+ ", try again later.", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
Coordination::Requests ops;
getClearBlocksInPartitionOps(ops, *zookeeper, part_info.partition_id, part_info.min_block, part_info.max_block);
size_t clean_block_ops_size = ops.size();
/// If `part_name` is result of a recent merge and source parts are still available then
/// DROP_RANGE with detach will move this part together with source parts to `detached/` dir.
entry.type = LogEntry::DROP_RANGE;
@ -6077,7 +6076,6 @@ bool StorageReplicatedMergeTree::dropPart(
entry.detach = detach;
entry.create_time = time(nullptr);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/log", merge_pred.getVersion())); /// Make sure no new events were added to the log.
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); /// Just update version.
@ -6092,7 +6090,7 @@ bool StorageReplicatedMergeTree::dropPart(
else
zkutil::KeeperMultiException::check(rc, ops, responses);
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses[1]).path_created;
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses[clean_block_ops_size + 1]).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
return true;

View File

@ -535,6 +535,7 @@ private:
std::mutex existing_nodes_cache_mutex;
bool existsNodeCached(const std::string & path);
void getClearBlocksInPartitionOps(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
/// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range.
void clearBlocksInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);