Implement ReplicatedMergeTree move to operation

This commit is contained in:
Guillaume Tassery 2019-08-28 10:24:17 +02:00
parent 6f5969e1b5
commit 62e302d689

View File

@ -4974,9 +4974,175 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
waitForAllReplicasToProcessLogEntry(entry);
}
void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & /*source_table*/, const ASTPtr & /*partition*/, const Context & /*context*/)
void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{
// TODO: Implement
auto dest_table_storage = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(dest_table);
if (!dest_table_storage)
throw Exception("Table " + this->getTableName() + " supports attachPartitionFrom only for MergeTree family of table engines."
" Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED);
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch;
MergeTreeData & src_data = dest_table_storage->checkStructureAndGetMergeTreeData(this);
String partition_id = getPartitionIDFromQuery(partition, context);
DataPartsVector src_all_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
DataPartsVector src_parts;
MutableDataPartsVector dst_parts;
Strings block_id_paths;
Strings part_checksums;
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts");
static const String TMP_PREFIX = "tmp_replace_from_";
auto zookeeper = getZooKeeper();
/// Firstly, generate last block number and compute drop_range
/// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block.
/// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop.
MergeTreePartInfo drop_range;
drop_range.partition_id = partition_id;
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
drop_range.min_block = 0;
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
if (drop_range.getBlocksCount() > 1)
{
/// We have to prohibit merges in drop_range, since new merge log entry appeared after this REPLACE FROM entry
/// could produce new merged part instead in place of just deleted parts.
/// It is better to prohibit them on leader replica (like DROP PARTITION makes),
/// but it is inconvenient for a user since he could actually use source table from this replica.
/// Therefore prohibit merges on the initializer server now and on the remaining servers when log entry will be executed.
/// It does not provides strong guarantees, but is suitable for intended use case (assume merges are quite rare).
{
std::lock_guard merge_selecting_lock(merge_selecting_mutex);
queue.disableMergesInRange(drop_range_fake_part_name);
}
}
for (size_t i = 0; i < src_all_parts.size(); ++i)
{
/// We also make some kind of deduplication to avoid duplicated parts in case of ATTACH PARTITION
/// Assume that merges in the partition are quite rare
/// Save deduplication block ids with special prefix replace_partition
auto & src_part = src_all_parts[i];
if (!dest_table_storage->canReplacePartition(src_part))
throw Exception(
"Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR);
String hash_hex = src_part->checksums.getTotalChecksumHex();
String block_id_path = "" ;
auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path);
if (!lock)
{
LOG_INFO(log, "Part " << src_part->name << " (hash " << hash_hex << ") has been already attached");
continue;
}
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto dst_part = dest_table_storage->cloneAndLoadDataPart(src_part, TMP_PREFIX, dst_part_info);
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
}
ReplicatedMergeTreeLogEntryData entry;
{
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
entry_replace.drop_range_part_name = drop_range_fake_part_name;
entry_replace.from_database = src_data.database_name;
entry_replace.from_table = src_data.table_name;
for (const auto & part : src_parts)
entry_replace.src_part_names.emplace_back(part->name);
for (const auto & part : dst_parts)
entry_replace.new_part_names.emplace_back(part->name);
for (const String & checksum : part_checksums)
entry_replace.part_names_checksums.emplace_back(checksum);
entry_replace.columns_version = columns_version;
}
/// We are almost ready to commit changes, remove fetches and merges from drop range
queue.removePartProducingOpsInRange(zookeeper, drop_range, entry);
/// Remove deduplication block_ids of replacing parts
dest_table_storage->clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
Coordination::Responses op_results;
try
{
Coordination::Requests ops;
for (size_t i = 0; i < dst_parts.size(); ++i)
{
dest_table_storage->getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
ephemeral_locks[i].getUnlockOps(ops);
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
{
/// It is unnecessary to add parts to working set until we commit log entry
zookeeper->multi(ops);
ops.clear();
}
}
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
{
Transaction transaction(*dest_table_storage);
auto data_parts_lock = lockParts();
for (MutableDataPartPtr & part : dst_parts)
dest_table_storage->renameTempPartAndReplace(part, nullptr, &transaction, data_parts_lock);
transaction.commit(&data_parts_lock);
parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);
}
op_results = zookeeper->multi(ops);
PartLog::addNewParts(global_context, dst_parts, watch.elapsed());
}
catch (...)
{
PartLog::addNewParts(global_context, dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
throw;
}
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
/// Forcibly remove replaced parts from ZooKeeper
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
/// Speedup removing of replaced parts from filesystem
parts_to_remove.clear();
cleanup_thread.wakeup();
/// If necessary, wait until the operation is performed on all replicas.
if (context.getSettingsRef().replication_alter_partitions_sync > 1)
waitForAllReplicasToProcessLogEntry(entry);
}
void StorageReplicatedMergeTree::getCommitPartOps(