Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-14 19:45:11 +00:00)
Fix two bugs and add test

parent ec2c860a39
commit a95a5f6fb6
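The two fixes, both in the zero-copy replication path, as the hunks below show: DataPartStorageOnDiskBase::remove now materializes can_remove_description via can_remove_callback before returning early when the part directory is already gone, so the part is still unlocked from zero-copy; and StorageReplicatedMergeTree::createZeroCopyLockNode stops relying on try/catch around the Keeper request, branching on the tryMulti error code instead, so that an ephemeral lock colliding with a stale persistent lock (possible after local part loss) replaces it rather than failing. Debug logging is added around part commit, download, and lock creation, and a new stress test toggles replicated sends and fetches under concurrent inserts.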
@@ -574,6 +574,9 @@ void DataPartStorageOnDiskBase::remove(
         if (e.code() == ErrorCodes::FILE_DOESNT_EXIST)
         {
             LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
+            /// We will never touch this part again, so unlocking it from zero-copy
+            if (!can_remove_description)
+                can_remove_description.emplace(can_remove_callback());
             return;
         }
         throw;
@@ -584,6 +587,10 @@ void DataPartStorageOnDiskBase::remove(
         {
             LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. "
                 "Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
+            /// We will never touch this part again, so unlocking it from zero-copy
+            if (!can_remove_description)
+                can_remove_description.emplace(can_remove_callback());
+
             return;
         }
         throw;
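In both hunks above, the early return previously skipped can_remove_callback, so the caller never learned the part could be unlocked from zero-copy replication. A minimal standalone sketch of the pattern (all names below are invented for illustration; this is not ClickHouse's API):

#include <functional>
#include <iostream>
#include <optional>

struct CanRemoveDescription { bool can_remove = false; };

/// Toy analogue of remove(): the callback that records whether the part may be
/// unlocked from zero-copy must run even on the early-return path.
void removePart(bool directory_exists,
                std::optional<CanRemoveDescription> & description,
                const std::function<CanRemoveDescription()> & can_remove_callback)
{
    if (!directory_exists)
    {
        /// The fix: materialize the description before bailing out.
        if (!description)
            description.emplace(can_remove_callback());
        return;
    }
    /// ... the normal removal path consults the description too ...
}

int main()
{
    std::optional<CanRemoveDescription> description;
    removePart(false, description, [] { return CanRemoveDescription{true}; });
    std::cout << std::boolalpha << description.has_value() << "\n"; /// prints: true
}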
@@ -947,6 +947,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
         new_data_part->checksums.checkEqual(data_checksums, false);
+        LOG_DEBUG(log, "Download of part {} onto disk {} finished.", part_name, disk->getName());
     }
 
     if (zero_copy_temporary_lock_holder)
         zero_copy_temporary_lock_holder->setAlreadyRemoved();
 
@@ -1474,6 +1474,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
 
     while (true)
     {
+        LOG_DEBUG(log, "Committing part {} to zookeeper", part->name);
         Coordination::Requests ops;
         NameSet absent_part_paths_on_replicas;
 
@@ -1506,7 +1507,10 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
         Coordination::Responses responses;
         Coordination::Error e = zookeeper->tryMulti(ops, responses);
         if (e == Coordination::Error::ZOK)
+        {
+            LOG_DEBUG(log, "Part {} committed to zookeeper", part->name);
             return transaction.commit();
+        }
 
         if (e == Coordination::Error::ZNODEEXISTS)
         {
@@ -8136,6 +8140,7 @@ zkutil::EphemeralNodeHolderPtr StorageReplicatedMergeTree::lockSharedDataTempora
     createZeroCopyLockNode(
         std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), zookeeper_node, zkutil::CreateMode::Ephemeral, false);
 
+    LOG_TRACE(log, "Zookeeper temporary ephemeral lock {} created", zookeeper_node);
     return zkutil::EphemeralNodeHolder::existing(zookeeper_node, *zookeeper);
 }
 
@@ -8144,6 +8149,7 @@ void StorageReplicatedMergeTree::lockSharedData(
     bool replace_existing_lock,
     std::optional<HardlinkedFiles> hardlinked_files) const
 {
+    LOG_DEBUG(log, "Trying to create zero-copy lock for part {}", part.name);
     auto zookeeper = tryGetZooKeeper();
     if (zookeeper)
         return lockSharedData(part, std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), replace_existing_lock, hardlinked_files);
@@ -9035,30 +9041,49 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
     bool created = false;
     for (int attempts = 5; attempts > 0; --attempts)
     {
-        try
-        {
-            Coordination::Requests ops;
-            Coordination::Responses responses;
-            getZeroCopyLockNodeCreaetOps(zookeeper, zookeeper_node, ops, mode, replace_existing_lock, path_to_set_hardlinked_files, hardlinked_files);
-            auto error = zookeeper->tryMulti(ops, responses);
-            if (error == Coordination::Error::ZOK)
-            {
-                created = true;
-                break;
-            }
-            else if (error == Coordination::Error::ZNONODE && mode != zkutil::CreateMode::Persistent)
-            {
-                throw Exception(ErrorCodes::NOT_FOUND_NODE,
-                    "Cannot create ephemeral zero copy lock {} because part was unlocked from zookeeper", zookeeper_node);
-            }
-        }
-        catch (const zkutil::KeeperException & e)
-        {
-            if (e.code == Coordination::Error::ZNONODE)
-                continue;
-
-            throw;
-        }
+        Coordination::Requests ops;
+        Coordination::Responses responses;
+        getZeroCopyLockNodeCreaetOps(zookeeper, zookeeper_node, ops, mode, replace_existing_lock, path_to_set_hardlinked_files, hardlinked_files);
+        auto error = zookeeper->tryMulti(ops, responses);
+        if (error == Coordination::Error::ZOK)
+        {
+            created = true;
+            break;
+        }
+        else if (mode == zkutil::CreateMode::Persistent)
+        {
+            if (error == Coordination::Error::ZNONODE)
+                continue;
+
+            if (error == Coordination::Error::ZNODEEXISTS)
+            {
+                size_t failed_op = zkutil::getFailedOpIndex(error, responses);
+                /// Part was locked before, unfortunately it's possible during moves
+                if (ops[failed_op]->getPath() == zookeeper_node)
+                {
+                    created = true;
+                    break;
+                }
+                continue;
+            }
+        }
+        else if (mode == zkutil::CreateMode::Ephemeral)
+        {
+            /// It is super rare case when we had part, but it was lost and we were unable to unlock it from keeper.
+            /// Now we are trying to fetch it from other replica and unlocking.
+            if (error == Coordination::Error::ZNODEEXISTS)
+            {
+                size_t failed_op = zkutil::getFailedOpIndex(error, responses);
+                if (ops[failed_op]->getPath() == zookeeper_node)
+                {
+                    LOG_WARNING(&Poco::Logger::get("ZeroCopyLocks"), "Replacing persistent lock with ephemeral for path {}. It can happen only in case of local part loss", zookeeper_node);
+                    replace_existing_lock = true;
+                    continue;
+                }
+            }
+        }
+
+        zkutil::KeeperMultiException::check(error, ops, responses);
     }
 
     if (!created)
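The shape of the fixed loop can be shown in isolation. Below is a self-contained sketch with a toy in-memory "keeper" standing in for ZooKeeper; it collapses the multi-op request (and the getFailedOpIndex bookkeeping) into a single create, and every name in it is invented for the sketch:

#include <iostream>
#include <map>
#include <stdexcept>
#include <string>

enum class Error { OK, NodeExists, NoNode };
enum class Mode { Persistent, Ephemeral };

/// Toy stand-in for (Zoo)Keeper: one node per path, created atomically.
struct ToyKeeper
{
    std::map<std::string, Mode> nodes;

    /// Analogue of the tryMulti call: returns an error code instead of throwing.
    Error tryCreate(const std::string & path, Mode mode, bool replace_existing)
    {
        if (nodes.count(path) && !replace_existing)
            return Error::NodeExists;
        nodes[path] = mode;
        return Error::OK;
    }
};

/// Mirrors the structure of the fixed loop: branch on the returned error code,
/// and let an ephemeral lock replace a stale lock left by a lost local part.
bool createLockNode(ToyKeeper & keeper, const std::string & path, Mode mode)
{
    bool replace_existing = false;
    for (int attempts = 5; attempts > 0; --attempts)
    {
        Error error = keeper.tryCreate(path, mode, replace_existing);
        if (error == Error::OK)
            return true;

        if (mode == Mode::Ephemeral && error == Error::NodeExists)
        {
            std::cout << "replacing stale lock at " << path << "\n";
            replace_existing = true;
            continue;
        }

        /// Anything else is unexpected here, akin to KeeperMultiException::check.
        throw std::runtime_error("cannot create lock node");
    }
    return false;
}

int main()
{
    ToyKeeper keeper;
    keeper.nodes["/zero_copy/part_0_0"] = Mode::Persistent; /// stale leftover lock
    std::cout << std::boolalpha
              << createLockNode(keeper, "/zero_copy/part_0_0", Mode::Ephemeral) << "\n";
}

The key behavioral change mirrored here: ZNODEEXISTS during ephemeral lock creation is no longer fatal; the stale lock is replaced and the attempt retried.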
tests/queries/0_stateless/02725_start_stop_fetches.sh (new executable file, 76 lines)
@@ -0,0 +1,76 @@
+#!/usr/bin/env bash
+# Tags: race, zookeeper, no-parallel, no-upgrade-check, no-replicated-database
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+set -e
+
+NUM_REPLICAS=5
+
+for i in $(seq 1 $NUM_REPLICAS); do
+    $CLICKHOUSE_CLIENT -n -q "
+        DROP TABLE IF EXISTS r$i SYNC;
+        CREATE TABLE r$i (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r', 'r$i') ORDER BY x SETTINGS replicated_deduplication_window = 1, allow_remote_fs_zero_copy_replication = 1;
+    "
+done
+
+function thread {
+    while true; do
+        REPLICA=$(($RANDOM % 5 + 1))
+        $CLICKHOUSE_CLIENT --query "INSERT INTO r$REPLICA SELECT rand()"
+    done
+}
+
+function nemesis_thread1 {
+    while true; do
+        REPLICA=$(($RANDOM % 5 + 1))
+        $CLICKHOUSE_CLIENT --query "SYSTEM STOP REPLICATED SENDS r$REPLICA"
+        sleep 0.5
+        $CLICKHOUSE_CLIENT --query "SYSTEM START REPLICATED SENDS r$REPLICA"
+    done
+}
+
+function nemesis_thread2 {
+    while true; do
+        REPLICA=$(($RANDOM % 5 + 1))
+        $CLICKHOUSE_CLIENT --query "SYSTEM STOP FETCHES r$REPLICA"
+        sleep 0.5
+        $CLICKHOUSE_CLIENT --query "SYSTEM START FETCHES r$REPLICA"
+    done
+}
+
+
+
+export -f thread
+export -f nemesis_thread1
+export -f nemesis_thread2
+
+TIMEOUT=20
+
+timeout $TIMEOUT bash -c thread >/dev/null &
+timeout $TIMEOUT bash -c thread >/dev/null &
+timeout $TIMEOUT bash -c thread >/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread1 >/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread1 >/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread1 >/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread2 >/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread2 >/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread2 >/dev/null &
+
+wait
+
+
+for i in $(seq 1 $NUM_REPLICAS); do
+    $CLICKHOUSE_CLIENT -q "SYSTEM START FETCHES r$i"
+    $CLICKHOUSE_CLIENT -q "SYSTEM START REPLICATED SENDS r$i"
+done
+
+for i in $(seq 1 $NUM_REPLICAS); do
+    $CLICKHOUSE_CLIENT --max_execution_time 60 -q "SYSTEM SYNC REPLICA r$i PULL"
+done
+
+for i in $(seq 1 $NUM_REPLICAS); do
+    $CLICKHOUSE_CLIENT -q "DROP TABLE r$i"
+done
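As an aside, a stateless test like this is typically run through the test runner from a source checkout, roughly:

    tests/clickhouse-test 02725_start_stop_fetches

assuming a server configured for the test environment is reachable. The tags steer scheduling: zookeeper requires a (Zoo)Keeper instance, no-parallel keeps the test from running alongside others, and no-upgrade-check / no-replicated-database skip it in those CI configurations.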