Move Zookeeper lock for S3 shared part in IMergeTreeDataPart

This commit is contained in:
Anton Ivashkin 2020-10-09 17:24:10 +03:00
parent 766dbfd2be
commit 9272ed06b4
4 changed files with 69 additions and 52 deletions

View File

@ -7,7 +7,6 @@
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <Common/FileSyncGuard.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <IO/HTTPCommon.h>
#include <IO/createReadBufferFromFileBase.h>
@ -240,17 +239,7 @@ void Service::sendPartS3Metadata(const MergeTreeData::DataPartPtr & part, WriteB
if (disk->getType() != "s3")
throw Exception("S3 disk is not S3 anymore", ErrorCodes::LOGICAL_ERROR);
String id = disk->getUniqueId(part->getFullRelativePath() + "checksums.txt");
if (id.empty())
throw Exception("Can't lock part on S3 storage", ErrorCodes::LOGICAL_ERROR);
String zookeeper_node = zookeeper_path + "/zero_copy_s3/" + id + "/" + replica_name;
LOG_TRACE(log, "Set zookeeper lock {}", id);
zookeeper->createAncestors(zookeeper_node);
zookeeper->createIfNotExists(zookeeper_node, "lock");
part->lockSharedData(zookeeper_path, replica_name, zookeeper);
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
@ -629,19 +618,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);
String id = disk->getUniqueId(new_data_part->getFullRelativePath() + "checksums.txt");
if (id.empty())
throw Exception("Can't lock part on S3 storage", ErrorCodes::LOGICAL_ERROR);
String zookeeper_node = zookeeper_path + "/zero_copy_s3/" + id + "/" + replica_name;
LOG_TRACE(log, "Set zookeeper lock {}", id);
zookeeper->createAncestors(zookeeper_node);
zookeeper->createIfNotExists(zookeeper_node, "lock");
new_data_part->lockSharedData(zookeeper_path, replica_name, zookeeper);
return new_data_part;
}

View File

@ -12,6 +12,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/FileSyncGuard.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <common/JSON.h>
#include <common/logger_useful.h>
#include <Compression/getCompressionCodecForFile.h>
@ -1079,6 +1080,56 @@ bool IMergeTreeDataPart::checkAllTTLCalculated(const StorageMetadataPtr & metada
return true;
}
void IMergeTreeDataPart::lockSharedData(const String & zookeeper_path, const String & replica_name, zkutil::ZooKeeperPtr zookeeper) const
{
auto disk = volume->getDisk();
if (disk->getType() != "s3")
return;
String id = disk->getUniqueId(getFullRelativePath() + "checksums.txt");
if (id.empty())
throw Exception("Can't lock part on S3 storage", ErrorCodes::LOGICAL_ERROR);
String zookeeper_node = zookeeper_path + "/zero_copy_s3/" + id + "/" + replica_name;
LOG_TRACE(storage.log, "Set zookeeper lock {}", id);
zookeeper->createAncestors(zookeeper_node);
zookeeper->createIfNotExists(zookeeper_node, "lock");
}
bool IMergeTreeDataPart::unlockSharedData(const String & zookeeper_path, const String & replica_name, zkutil::ZooKeeperPtr zookeeper) const
{
auto disk = volume->getDisk();
if (disk->getType() != "s3")
return true;
String id = disk->getUniqueId(getFullRelativePath() + "checksums.txt");
if (id.empty())
return true;
String zookeeper_part_node = zookeeper_path + "/zero_copy_s3/" + id;
String zookeeper_node = zookeeper_part_node + "/" + replica_name;
LOG_TRACE(storage.log, "Remove zookeeper lock for {}", id);
zookeeper->remove(zookeeper_node);
Strings children;
zookeeper->tryGetChildren(zookeeper_part_node, children);
if (!children.empty())
{
LOG_TRACE(storage.log, "Found zookeper locks for {}", id);
}
return children.empty();
}
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::COMPACT);
@ -1095,3 +1146,4 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part)
}
}

View File

@ -22,6 +22,12 @@
#include <shared_mutex>
namespace zkutil
{
class ZooKeeper;
using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
}
namespace DB
{
@ -349,6 +355,14 @@ public:
/// part creation (using alter query with materialize_ttl setting).
bool checkAllTTLCalculated(const StorageMetadataPtr & metadata_snapshot) const;
/// Lock part in zookeeper for use common S3 data in several nodes
void lockSharedData(const String & zookeeper_path, const String & replica_name, zkutil::ZooKeeperPtr zookeeper) const;
/// Unlock common S3 data part in zookeeper
/// Return true if data unlocked
/// Return false if data is still used by another node
bool unlockSharedData(const String & zookeeper_path, const String & replica_name, zkutil::ZooKeeperPtr zookeeper) const;
protected:
/// Total size of all columns, calculated once in calcuateColumnSizesOnDisk

View File

@ -5111,33 +5111,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
try
{
bool keep_s3 = false;
auto disk = part->volume->getDisk();
if (disk->getType() == "s3")
{
String id = disk->getUniqueId(part->getFullRelativePath() + "checksums.txt");
if (!id.empty())
{
String zookeeper_part_node = zookeeper_path + "/zero_copy_s3/" + id;
String zookeeper_node = zookeeper_part_node + "/" + replica_name;
LOG_TRACE(log, "Remove zookeeper lock for {}", id);
zookeeper->remove(zookeeper_node);
Strings children;
zookeeper->tryGetChildren(zookeeper_part_node, children);
if (!children.empty())
{
LOG_TRACE(log, "Found zookeper locks for {}", id);
keep_s3 = true;
}
}
}
bool keep_s3 = !part->unlockSharedData(zookeeper_path, replica_name, zookeeper);
part->remove(keep_s3);
}
catch (...)