Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-09-20 00:30:49 +00:00)
Merge pull request #47631 from ClickHouse/fix_wait_for_zero_copy_lock
Fix wait for zero copy lock during move
Commit: efa0622a84
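
In short: tryCreateZeroCopyExclusiveLock() used to return std::nullopt both when the lock could not be created at all (no ZooKeeper session, shutdown, readonly replica) and when another replica already held it, so a caller could not tell a transient conflict from a hard failure. After this commit the conflict case returns an unlocked ZeroCopyLock object, and the part-move path can wait for the foreign lock to disappear. A minimal sketch of the caller-side contract, condensed from the hunks below (postpone() and doMove() are placeholder names, not functions from the codebase):

    auto lock = tryCreateZeroCopyExclusiveLock(part_name, disk);
    if (!lock)
    {
        /// nullopt: no ZooKeeper / shutdown / readonly -- retry later with backoff.
        postpone();
    }
    else if (lock->isLocked())
    {
        /// We own the zero-copy lock: safe to move the part.
        doMove();
    }
    else
    {
        /// Another replica holds the lock: wait for it to disappear (or postpone).
        waitZeroCopyLockToDisappear(*lock, /* milliseconds_to_wait = */ 3000);
    }
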
@@ -41,6 +41,16 @@ ZooKeeperLock::~ZooKeeperLock()
     }
 }
 
+bool ZooKeeperLock::isLocked() const
+{
+    return locked;
+}
+
+const std::string & ZooKeeperLock::getLockPath() const
+{
+    return lock_path;
+}
+
 void ZooKeeperLock::unlock()
 {
     if (!locked)

@@ -37,6 +37,8 @@ public:
 
     void unlock();
     bool tryLock();
+    bool isLocked() const;
+    const std::string & getLockPath() const;
 
 private:
     zkutil::ZooKeeperPtr zookeeper;

@@ -218,7 +218,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
 
         zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);
 
-        if (!zero_copy_lock)
+        if (!zero_copy_lock || !zero_copy_lock->isLocked())
         {
             LOG_DEBUG(log, "Merge of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name);
             /// Don't check for missing part -- it's missing because other replica still not

@@ -7484,7 +7484,7 @@ MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts,
     if (moving_tagger->parts_to_move.empty())
         return MovePartsOutcome::NothingToMove;
 
-    return moveParts(moving_tagger);
+    return moveParts(moving_tagger, true);
 }
 
 MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove()

@@ -7539,7 +7539,7 @@ MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::checkPartsForMove(co
     return std::make_shared<CurrentlyMovingPartsTagger>(std::move(parts_to_move), *this);
 }
 
-MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger)
+MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, bool wait_for_move_if_zero_copy)
 {
     LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size());
 

@@ -7588,21 +7588,41 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
             auto disk = moving_part.reserved_space->getDisk();
             if (supportsReplication() && disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
             {
-                /// If we acquired lock than let's try to move. After one
-                /// replica will actually move the part from disk to some
-                /// zero-copy storage other replicas will just fetch
-                /// metainformation.
-                if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock)
+                /// This loop is not endless, if shutdown called/connection failed/replica became readonly
+                /// we will return true from waitZeroCopyLock and createZeroCopyLock will return nullopt.
+                while (true)
                 {
-                    cloned_part = parts_mover.clonePart(moving_part);
-                    parts_mover.swapClonedPart(cloned_part);
-                }
-                else
-                {
-                    /// Move will be retried but with backoff.
-                    LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name);
-                    result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
-                    continue;
+                    /// If we acquired lock than let's try to move. After one
+                    /// replica will actually move the part from disk to some
+                    /// zero-copy storage other replicas will just fetch
+                    /// metainformation.
+                    if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock)
+                    {
+                        if (lock->isLocked())
+                        {
+                            cloned_part = parts_mover.clonePart(moving_part);
+                            parts_mover.swapClonedPart(cloned_part);
+                            break;
+                        }
+                        else if (wait_for_move_if_zero_copy)
+                        {
+                            LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name);
+                            /// Wait and checks not only for timeout but also for shutdown and so on.
+                            while (!waitZeroCopyLockToDisappear(*lock, 3000))
+                            {
+                                LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name);
+                            }
+                        }
+                        else
+                            break;
+                    }
+                    else
+                    {
+                        /// Move will be retried but with backoff.
+                        LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name);
+                        result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
+                        break;
+                    }
                 }
             }
             else /// Ordinary move as it should be

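Why the while (true) above cannot spin forever: every path through the body either breaks (lock acquired and part moved; lock impossible to create, so the move is postponed; waiting disabled, so the other replica's move is trusted) or blocks in waitZeroCopyLockToDisappear(), which itself gives up on shutdown, readonly or a lost ZooKeeper session -- after which the next tryCreateZeroCopyExclusiveLock() yields nullopt and the postpone branch breaks. A condensed view of the control flow (illustrative sketch, not the committed code; moveAndSwapPart() stands in for clonePart() + swapClonedPart()):

    while (true)
    {
        auto lock = tryCreateZeroCopyExclusiveLock(part_name, disk);
        if (!lock)                              /// no ZooKeeper / shutdown / readonly
        {
            result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
            break;
        }
        if (lock->isLocked())                   /// we won the race: do the move
        {
            moveAndSwapPart();
            break;
        }
        if (!wait_for_move_if_zero_copy)        /// background move: back off instead of waiting
            break;
        waitZeroCopyLockToDisappear(*lock, 3000);   /// manual MOVE: wait, then retry the lock
    }
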
@@ -1456,7 +1456,7 @@ private:
     using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>;
 
     /// Move selected parts to corresponding disks
-    MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger);
+    MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, bool wait_for_move_if_zero_copy=false);
 
     /// Select parts for move and disks for them. Used in background moving processes.
     CurrentlyMovingPartsTaggerPtr selectPartsForMove();

@@ -1511,6 +1511,7 @@ private:
     /// Create zero-copy exclusive lock for part and disk. Useful for coordination of
     /// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
     virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }
+    virtual bool waitZeroCopyLockToDisappear(const ZeroCopyLock &, size_t) { return false; }
 
     /// Remove parts from disk calling part->remove(). Can do it in parallel in case of big set of parts and enabled settings.
     /// If we fail to remove some part and throw_on_error equal to `true` will throw an exception on the first failed part.

@@ -127,7 +127,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
 
         zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);
 
-        if (!zero_copy_lock)
+        if (!zero_copy_lock || !zero_copy_lock->isLocked())
        {
             LOG_DEBUG(log, "Mutation of part {} started by some other replica, will wait it and mutated merged part", entry.new_part_name);
             return PrepareResult{

@@ -14,6 +14,7 @@ struct ZeroCopyLock
 {
     ZeroCopyLock(const zkutil::ZooKeeperPtr & zookeeper, const std::string & lock_path, const std::string & lock_message);
 
+    bool isLocked() const { return lock->isLocked(); }
     /// Actual lock
     std::unique_ptr<zkutil::ZooKeeperLock> lock;
 };

@@ -8599,11 +8599,37 @@ std::optional<String> StorageReplicatedMergeTree::getZeroCopyPartPath(const Stri
     return getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(), part_name, zookeeper_path)[0];
 }
 
+bool StorageReplicatedMergeTree::waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait)
+{
+    if (lock.isLocked())
+        return true;
+
+    if (partial_shutdown_called.load(std::memory_order_relaxed))
+        return true;
+
+    auto lock_path = lock.lock->getLockPath();
+    zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
+    if (!zookeeper)
+        return true;
+
+    Stopwatch time_waiting;
+    const auto & stop_waiting = [&]()
+    {
+        bool timeout_exceeded = milliseconds_to_wait < time_waiting.elapsedMilliseconds();
+        return partial_shutdown_called.load(std::memory_order_relaxed) || is_readonly.load(std::memory_order_relaxed) || timeout_exceeded;
+    };
+
+    return zookeeper->waitForDisappear(lock_path, stop_waiting);
+}
+
 std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk)
 {
     if (!disk || !disk->supportZeroCopyReplication())
         return std::nullopt;
 
+    if (partial_shutdown_called.load(std::memory_order_relaxed) || is_readonly.load(std::memory_order_relaxed))
+        return std::nullopt;
+
     zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
     if (!zookeeper)
         return std::nullopt;

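Two details worth noting in waitZeroCopyLockToDisappear(): it returns true immediately when we ourselves hold the lock (nothing to wait for) or when waiting is already pointless, and otherwise delegates to zkutil's waitForDisappear() with a stop predicate. Per the doc comment added in the StorageReplicatedMergeTree.h hunk further down, true means the node disappeared or waiting should stop (shutdown/readonly/timeout, etc.). The predicate is the usual bounded-wait pattern; a stripped-down sketch with placeholder condition names:

    Stopwatch time_waiting;
    auto stop_waiting = [&]
    {
        /// Abandon the wait as soon as any exit condition holds.
        bool timeout_exceeded = deadline_ms < time_waiting.elapsedMilliseconds();
        return shutting_down || read_only || timeout_exceeded;
    };
    /// Blocks until the lock node vanishes or stop_waiting fires.
    zookeeper->waitForDisappear(lock_path, stop_waiting);
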
@@ -8616,10 +8642,8 @@ std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusi
 
     /// Create actual lock
     ZeroCopyLock lock(zookeeper, zc_zookeeper_path, replica_name);
-    if (lock.lock->tryLock())
-        return lock;
-    else
-        return std::nullopt;
+    lock.lock->tryLock();
+    return lock;
 }
 
 String StorageReplicatedMergeTree::findReplicaHavingPart(

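The two-line replacement above is the heart of the fix: tryLock() is still attempted, but a failed attempt no longer collapses into std::nullopt -- the ZeroCopyLock is returned either way and the caller inspects it. A short recap using only members visible in this diff:

    ZeroCopyLock lock(zookeeper, zc_zookeeper_path, replica_name);
    lock.lock->tryLock();   /// result ignored: may fail if another replica holds the node
    return lock;            /// caller checks lock.isLocked(); when it is false,
                            /// lock.lock->getLockPath() names the node to watch
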
@@ -866,9 +866,14 @@ private:
     std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk);
 
     /// Create ephemeral lock in zookeeper for part and disk which support zero copy replication.
-    /// If somebody already holding the lock -- return std::nullopt.
+    /// If no connection to zookeeper, shutdown, readonly -- return std::nullopt.
+    /// If somebody already holding the lock -- return unlocked ZeroCopyLock object (not std::nullopt).
     std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override;
 
+    /// Wait for ephemral lock to disappear. Return true if table shutdown/readonly/timeout exceeded, etc.
+    /// Or if node actually disappeared.
+    bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override;
+
     void startupImpl(bool from_attach_thread);
 };
 

@@ -5,6 +5,7 @@ import random
 import string
 import time
 
+from multiprocessing.dummy import Pool
 import pytest
 from helpers.cluster import ClickHouseCluster
 

@ -102,3 +103,79 @@ SETTINGS index_granularity = 8192, storage_policy = 's3'"""
|
||||
assert part_to_disk["20230102_0_0_0"] == "s3"
|
||||
assert part_to_disk["20230109_0_0_0"] == "s3"
|
||||
assert part_to_disk["20230116_0_0_0"] == "default"
|
||||
|
||||
|
||||
def test_concurrent_move_to_s3(started_cluster):
|
||||
node1 = cluster.instances["node1"]
|
||||
node2 = cluster.instances["node2"]
|
||||
|
||||
node1.query(
|
||||
"""
|
||||
CREATE TABLE test_concurrent_move (EventDate Date, CounterID UInt32)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_concurrent_move', 'r1')
|
||||
PARTITION BY CounterID
|
||||
ORDER BY (CounterID, EventDate)
|
||||
SETTINGS index_granularity = 8192, storage_policy = 's3'"""
|
||||
)
|
||||
|
||||
node2.query(
|
||||
"""
|
||||
CREATE TABLE test_concurrent_move (EventDate Date, CounterID UInt32)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_concurrent_move', 'r2')
|
||||
PARTITION BY CounterID
|
||||
ORDER BY (CounterID, EventDate)
|
||||
SETTINGS index_granularity = 8192, storage_policy = 's3'"""
|
||||
)
|
||||
partitions = range(10)
|
||||
|
||||
for i in partitions:
|
||||
node1.query(
|
||||
f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number), {i} from system.numbers limit 20"
|
||||
)
|
||||
node1.query(
|
||||
f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
|
||||
)
|
||||
node1.query(
|
||||
f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
|
||||
)
|
||||
node1.query(
|
||||
f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
|
||||
)
|
||||
|
||||
node2.query("SYSTEM SYNC REPLICA test_concurrent_move")
|
||||
|
||||
# check that we can move parts concurrently without exceptions
|
||||
p = Pool(3)
|
||||
for i in partitions:
|
||||
|
||||
def move_partition_to_s3(node):
|
||||
node.query(
|
||||
f"ALTER TABLE test_concurrent_move MOVE PARTITION '{i}' TO DISK 's3'"
|
||||
)
|
||||
|
||||
j1 = p.apply_async(move_partition_to_s3, (node1,))
|
||||
j2 = p.apply_async(move_partition_to_s3, (node2,))
|
||||
j1.get()
|
||||
j2.get()
|
||||
|
||||
def get_part_to_disk(query_result):
|
||||
part_to_disk = {}
|
||||
for row in query_result.strip().split("\n"):
|
||||
disk, part = row.split("\t")
|
||||
part_to_disk[part] = disk
|
||||
return part_to_disk
|
||||
|
||||
part_to_disk = get_part_to_disk(
|
||||
node1.query(
|
||||
"SELECT disk_name, name FROM system.parts where table = 'test_concurrent_move' and active"
|
||||
)
|
||||
)
|
||||
|
||||
assert all([value == "s3" for value in part_to_disk.values()])
|
||||
|
||||
part_to_disk = get_part_to_disk(
|
||||
node2.query(
|
||||
"SELECT disk_name, name FROM system.parts where table = 'test_concurrent_move' and active"
|
||||
)
|
||||
)
|
||||
assert all([value == "s3" for value in part_to_disk.values()])
|
||||
|
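
The integration test exercises exactly the fixed scenario: both replicas issue ALTER TABLE ... MOVE PARTITION ... TO DISK 's3' for the same partitions in parallel, so one replica always runs into the other's zero-copy lock. Since movePartsToSpace() now calls moveParts(moving_tagger, true), the losing replica waits for the lock to disappear instead of failing, both queries complete without exceptions, and the final assertions confirm that every active part ends up on the s3 disk on both nodes.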