Fix lock during fetch

This commit is contained in:
alesapin 2022-02-14 12:20:27 +03:00
parent beb4400978
commit b2886a429b
6 changed files with 56 additions and 12 deletions

View File

@ -771,6 +771,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.",
part_name, part_id, disk->getName());
data.lockSharedDataTemporary(part_name, part_id, disk);
static const String TMP_PREFIX = "tmp-fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
@ -835,7 +837,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);
new_data_part->storage.lockSharedData(*new_data_part);
data.lockSharedData(*new_data_part, /* replace_existing_lock = */ true);
LOG_DEBUG(log, "Download of part {} unique id {} metadata onto disk {} finished.",
part_name, part_id, disk->getName());

View File

@ -63,7 +63,7 @@ private:
class Fetcher final : private boost::noncopyable
{
public:
explicit Fetcher(MergeTreeData & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}
explicit Fetcher(StorageReplicatedMergeTree & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
MergeTreeData::MutableDataPartPtr fetchPart(
@ -129,7 +129,7 @@ private:
PooledReadWriteBufferFromHTTP & in,
ThrottlerPtr throttler);
MergeTreeData & data;
StorageReplicatedMergeTree & data;
Poco::Logger * log;
};

View File

@ -876,7 +876,7 @@ public:
/// Lock part in zookeeper for shared data in several nodes
/// Overridden in StorageReplicatedMergeTree
virtual void lockSharedData(const IMergeTreeDataPart &) const {}
virtual void lockSharedData(const IMergeTreeDataPart &, bool = false) const {}
/// Unlock shared data part in zookeeper
/// Overridden in StorageReplicatedMergeTree

View File

@ -7168,7 +7168,31 @@ void StorageReplicatedMergeTree::createTableSharedID()
}
void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part) const
void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const
{
if (!disk || !disk->supportZeroCopyReplication())
return;
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return;
String id = part_id;
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(),
part_name, zookeeper_path);
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
LOG_TRACE(log, "Set zookeeper temporary ephemeral lock {}", zookeeper_node);
createZeroCopyLockNode(zookeeper, zookeeper_node, zkutil::CreateMode::Ephemeral, false);
}
}
void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock) const
{
if (!part.volume || !part.isStoredOnDisk())
return;
@ -7190,8 +7214,9 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part)
{
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
LOG_TRACE(log, "Set zookeeper lock {}", zookeeper_node);
createZeroCopyLockNode(zookeeper, zookeeper_node);
LOG_TRACE(log, "Set zookeeper persistent lock {}", zookeeper_node);
createZeroCopyLockNode(zookeeper, zookeeper_node, zkutil::CreateMode::Persistent, replace_existing_lock);
}
}
@ -7677,7 +7702,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
}
void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node)
void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode, bool replace_existing)
{
/// In rare case other replica can remove path between createAncestors and createIfNotExists
/// So we make up to 5 attempts
@ -7687,8 +7712,22 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperP
try
{
zookeeper->createAncestors(zookeeper_node);
zookeeper->createIfNotExists(zookeeper_node, "lock");
break;
if (replace_existing && zookeeper->exists(zookeeper_node))
{
Coordination::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_node, -1));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
Coordination::Responses responses;
auto error = zookeeper->tryMulti(ops, responses);
if (error == Coordination::Error::ZOK)
break;
}
else
{
auto error = zookeeper->tryCreate(zookeeper_node, "", mode);
if (error == Coordination::Error::ZOK || error == Coordination::Error::ZNODEEXISTS)
break;
}
}
catch (const zkutil::KeeperException & e)
{

View File

@ -231,7 +231,9 @@ public:
bool executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
/// Lock part in zookeeper for use shared data in several nodes
void lockSharedData(const IMergeTreeDataPart & part) const override;
void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock) const override;
void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
/// Unlock shared data part in zookeeper
/// Return true if data unlocked
@ -758,7 +760,7 @@ private:
static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, DiskType disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old);
static void createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node);
static void createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode = zkutil::CreateMode::Persistent, bool replace_exising_lock = false);
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed) override;

View File

@ -1,3 +1,4 @@
-- Tags: no-s3-storage
DROP TABLE IF EXISTS sparse_tuple;
CREATE TABLE sparse_tuple (id UInt64, t Tuple(a UInt64, s String))