Make method StorageReplicatedMergeTree::fetchExistsPart

This commit is contained in:
Anton Ivashkin 2021-03-09 20:49:50 +03:00
parent 265d293934
commit aff13c0c52
2 changed files with 146 additions and 49 deletions

View File

@ -1988,8 +1988,7 @@ bool StorageReplicatedMergeTree::executeFetchShared(
try
{
if (!fetchPart(new_part_name, metadata_snapshot, zookeeper_path + "/replicas/" + source_replica, false, 0,
nullptr, true, disk, path))
if (!fetchExistsPart(new_part_name, metadata_snapshot, zookeeper_path + "/replicas/" + source_replica, disk, path))
return false;
}
catch (Exception & e)
@ -3546,8 +3545,7 @@ bool StorageReplicatedMergeTree::partIsLastQuorumPart(const MergeTreePartInfo &
}
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot,
const String & source_replica_path, bool to_detached, size_t quorum, zkutil::ZooKeeper::Ptr zookeeper_, bool replace_exists,
DiskPtr replaced_disk, String replaced_part_path)
const String & source_replica_path, bool to_detached, size_t quorum, zkutil::ZooKeeper::Ptr zookeeper_)
{
auto zookeeper = zookeeper_ ? zookeeper_ : getZooKeeper();
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
@ -3598,7 +3596,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
DataPartPtr part_to_clone;
if (!replace_exists)
{
/// If the desired part is a result of a part mutation, try to find the source part and compare
/// its checksums to the checksums of the desired part. If they match, we can just clone the local part.
@ -3658,8 +3655,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
return fetcher.fetchPart(
metadata_snapshot, part_name, source_replica_path,
address.host, address.replication_port,
timeouts, user_password.first, user_password.second, interserver_scheme, to_detached, "", true,
replace_exists ? replaced_disk : nullptr);
timeouts, user_password.first, user_password.second, interserver_scheme, to_detached, "", true);
};
}
@ -3669,51 +3665,41 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
if (!to_detached)
{
if (replace_exists)
Transaction transaction(*this);
renameTempPartAndReplace(part, nullptr, &transaction);
replaced_parts = checkPartChecksumsAndCommit(transaction, part);
/** If a quorum is tracked for this part, you must update it.
* If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
*/
if (quorum)
{
if (part->volume->getDisk()->getName() != replaced_disk->getName())
throw Exception("Part " + part->name + " fetched on wrong disk " + part->volume->getDisk()->getName(), ErrorCodes::LOGICAL_ERROR);
replaced_disk->removeFileIfExists(replaced_part_path);
replaced_disk->moveDirectory(part->getFullRelativePath(), replaced_part_path);
/// Check if this quorum insert is parallel or not
if (zookeeper->exists(zookeeper_path + "/quorum/parallel/" + part_name))
updateQuorum(part_name, true);
else if (zookeeper->exists(zookeeper_path + "/quorum/status"))
updateQuorum(part_name, false);
}
else
/// merged parts that are still inserted with quorum. if it only contains one block, it hasn't been merged before
if (part_info.level != 0 || part_info.mutation != 0)
{
Transaction transaction(*this);
renameTempPartAndReplace(part, nullptr, &transaction);
replaced_parts = checkPartChecksumsAndCommit(transaction, part);
/** If a quorum is tracked for this part, you must update it.
* If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
*/
if (quorum)
Strings quorum_parts = zookeeper->getChildren(zookeeper_path + "/quorum/parallel");
for (const String & quorum_part : quorum_parts)
{
/// Check if this quorum insert is parallel or not
if (zookeeper->exists(zookeeper_path + "/quorum/parallel/" + part_name))
updateQuorum(part_name, true);
else if (zookeeper->exists(zookeeper_path + "/quorum/status"))
updateQuorum(part_name, false);
auto quorum_part_info = MergeTreePartInfo::fromPartName(quorum_part, format_version);
if (part_info.contains(quorum_part_info))
updateQuorum(quorum_part, true);
}
}
/// merged parts that are still inserted with quorum. if it only contains one block, it hasn't been merged before
if (part_info.level != 0 || part_info.mutation != 0)
{
Strings quorum_parts = zookeeper->getChildren(zookeeper_path + "/quorum/parallel");
for (const String & quorum_part : quorum_parts)
{
auto quorum_part_info = MergeTreePartInfo::fromPartName(quorum_part, format_version);
if (part_info.contains(quorum_part_info))
updateQuorum(quorum_part, true);
}
}
merge_selecting_task->schedule();
merge_selecting_task->schedule();
for (const auto & replaced_part : replaced_parts)
{
LOG_DEBUG(log, "Part {} is rendered obsolete by fetching part {}", replaced_part->name, part_name);
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
}
for (const auto & replaced_part : replaced_parts)
{
LOG_DEBUG(log, "Part {} is rendered obsolete by fetching part {}", replaced_part->name, part_name);
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
}
write_part_log({});
@ -3753,6 +3739,109 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
}
bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot,
const String & source_replica_path, DiskPtr replaced_disk, String replaced_part_path)
{
auto zookeeper = getZooKeeper();
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
if (auto part = getPartIfExists(part_info, {IMergeTreeDataPart::State::Outdated, IMergeTreeDataPart::State::Deleting}))
{
LOG_DEBUG(log, "Part {} should be deleted after previous attempt before fetch", part->name);
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
cleanup_thread.wakeup();
return false;
}
{
std::lock_guard lock(currently_fetching_parts_mutex);
if (!currently_fetching_parts.insert(part_name).second)
{
LOG_DEBUG(log, "Part {} is already fetching right now", part_name);
return false;
}
}
if (part_name != "foo")
return false;
SCOPE_EXIT
({
std::lock_guard lock(currently_fetching_parts_mutex);
currently_fetching_parts.erase(part_name);
});
LOG_DEBUG(log, "Fetching part {} from {}", part_name, source_replica_path);
TableLockHolder table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
/// Logging
Stopwatch stopwatch;
MutableDataPartPtr part;
DataPartsVector replaced_parts;
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
writePartLog(
PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(),
part_name, part, replaced_parts, nullptr);
};
std::function<MutableDataPartPtr()> get_part;
{
ReplicatedMergeTreeAddress address(zookeeper->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto user_password = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
get_part = [&, address, timeouts, user_password, interserver_scheme]()
{
if (interserver_scheme != address.scheme)
throw Exception("Interserver schemes are different: '" + interserver_scheme
+ "' != '" + address.scheme + "', can't fetch part from " + address.host,
ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH);
return fetcher.fetchPart(
metadata_snapshot, part_name, source_replica_path,
address.host, address.replication_port,
timeouts, user_password.first, user_password.second, interserver_scheme, false, "", true,
replaced_disk);
};
}
try
{
part = get_part();
if (part->volume->getDisk()->getName() != replaced_disk->getName())
throw Exception("Part " + part->name + " fetched on wrong disk " + part->volume->getDisk()->getName(), ErrorCodes::LOGICAL_ERROR);
replaced_disk->removeFileIfExists(replaced_part_path);
replaced_disk->moveDirectory(part->getFullRelativePath(), replaced_part_path);
}
catch (const Exception & e)
{
/// The same part is being written right now (but probably it's not committed yet).
/// We will check the need for fetch later.
if (e.code() == ErrorCodes::DIRECTORY_ALREADY_EXISTS)
return false;
throw;
}
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException());
throw;
}
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part {} from {}", part_name, source_replica_path);
return true;
}
void StorageReplicatedMergeTree::startup()
{
if (is_readonly)

View File

@ -532,10 +532,18 @@ private:
const String & replica_path,
bool to_detached,
size_t quorum,
zkutil::ZooKeeper::Ptr zookeeper_ = nullptr,
bool replace_exists = false,
DiskPtr replaced_disk = nullptr,
String replaced_part_path = "");
zkutil::ZooKeeper::Ptr zookeeper_ = nullptr);
/** Download the specified part from the specified replica and put it in place of an existing local part directory.
  * Used to replace a local part with the same s3-shared part in hybrid storage.
  * The part is fetched onto replaced_disk and then moved to replaced_part_path on that disk,
  * after whatever is stored at that path has been removed.
  * Returns false if the fetch was not performed, e.g. the part is already being fetched right now.
  * Throws if the part ends up on a disk other than replaced_disk.
  */
bool fetchExistsPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & replica_path,
DiskPtr replaced_disk,
String replaced_part_path);
/// Required only to avoid races between executeLogEntry and fetchPartition
std::unordered_set<String> currently_fetching_parts;