Add a test for a TTL bug in zero copy replication

alesapin 2022-01-14 18:44:10 +03:00
parent 1254225061
commit bf803472b1
6 changed files with 113 additions and 14 deletions

View File

@@ -200,7 +200,7 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
auto settings = data->getSettings();
auto part = moving_part.part;
auto disk = moving_part.reserved_space->getDisk();
- LOG_DEBUG(log, "Cloning part {} from {} to {}", part->name, part->volume->getDisk()->getName(), disk->getName());
+ LOG_DEBUG(log, "Cloning part {} from '{}' to '{}'", part->name, part->volume->getDisk()->getName(), disk->getName());
const String directory_to_move = "moving";
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
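Note for context (not part of this commit): the zero-copy clone branch above is taken only when the destination disk supports zero-copy replication and the table-level MergeTree setting allow_remote_fs_zero_copy_replication is enabled. A minimal sketch of opting a test table into that path, assuming a ClickHouseCluster instance like the node1/node2/node3 fixtures defined later in this diff and the s3_and_default policy added below:

# Illustrative sketch only -- not part of this commit. `node` is assumed to be a
# ClickHouseCluster instance and 's3_and_default' the policy from configs/s3.xml below.
def create_zero_copy_table(node, replica_name):
    # The SETTINGS clause enables the branch guarded by
    # disk->supportZeroCopyReplication() && allow_remote_fs_zero_copy_replication.
    node.query(
        """
        CREATE TABLE zero_copy_example (id UInt32, value String)
        ENGINE = ReplicatedMergeTree('/clickhouse/tables/zero_copy_example', '{replica}')
        ORDER BY id
        SETTINGS storage_policy = 's3_and_default',
                 allow_remote_fs_zero_copy_replication = 1
        """.format(replica=replica_name)
    )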

View File

@@ -7128,11 +7128,11 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
}
- bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String & table_uuid, const String & part_name,
+ bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name,
const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings,
Poco::Logger * logger, const String & zookeeper_path_old)
{
- boost::replace_all(id, "/", "_");
+ boost::replace_all(part_id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk->getType(), table_uuid, part_name, zookeeper_path_old);
@@ -7140,13 +7140,16 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String &
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
- String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / id;
- String zookeeper_node = fs::path(zookeeper_part_uniq_node) / replica_name_;
+ String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / part_id;
- LOG_TRACE(logger, "Remove zookeeper lock {}", zookeeper_node);
+ /// Delete our replica node for part from zookeeper (we are not interested in it anymore)
+ String zookeeper_part_replica_node = fs::path(zookeeper_part_uniq_node) / replica_name_;
- zookeeper_ptr->tryRemove(zookeeper_node);
+ LOG_TRACE(logger, "Remove zookeeper lock {}", zookeeper_part_replica_node);
+ zookeeper_ptr->tryRemove(zookeeper_part_replica_node);
/// Check, maybe we were the last replica and can remove part forever
Strings children;
zookeeper_ptr->tryGetChildren(zookeeper_part_uniq_node, children);
@@ -7157,9 +7160,9 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String &
continue;
}
- auto e = zookeeper_ptr->tryRemove(zookeeper_part_uniq_node);
+ auto error_code = zookeeper_ptr->tryRemove(zookeeper_part_uniq_node);
- LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_uniq_node, e != Coordination::Error::ZNOTEMPTY);
+ LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_uniq_node, error_code != Coordination::Error::ZNOTEMPTY);
/// Even when we have lock with same part name, but with different uniq, we can remove files on S3
children.clear();
@@ -7168,9 +7171,9 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String &
if (children.empty())
{
/// Cleanup after last uniq removing
- e = zookeeper_ptr->tryRemove(zookeeper_part_node);
+ error_code = zookeeper_ptr->tryRemove(zookeeper_part_node);
- LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_node, e != Coordination::Error::ZNOTEMPTY);
+ LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_node, error_code != Coordination::Error::ZNOTEMPTY);
}
else
{
@@ -7213,7 +7216,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
- return best_replica;
+ return "";
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk_type, getTableSharedID(), part.name,
zookeeper_path);
@@ -7251,7 +7254,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
LOG_TRACE(log, "Found zookeper active replicas for part {}: {}", part.name, active_replicas.size());
if (active_replicas.empty())
- return best_replica;
+ return "";
/** You must select the best (most relevant) replica.
* This is a replica with the maximum `log_pointer`, then with the minimum `queue` size.

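For readers unfamiliar with the zero-copy locking scheme that unlockSharedDataByID walks, here is a minimal standalone sketch of the same flow in Python using kazoo. It is not part of this commit, and the node layout (a per-part-id node holding one child per replica) is illustrative only, not the exact path produced by getZeroCopyPartPath:

# Standalone sketch, not part of this commit. The ZooKeeper path layout is illustrative.
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NotEmptyError

def unlock_shared_data_by_id(zk, zc_zookeeper_path, part_id, replica_name):
    """Return True if no other replica still holds the zero-copy lock for this part id."""
    zookeeper_part_uniq_node = f"{zc_zookeeper_path}/{part_id}"
    zookeeper_part_replica_node = f"{zookeeper_part_uniq_node}/{replica_name}"

    # Delete our replica node for the part (analogue of tryRemove above).
    try:
        zk.delete(zookeeper_part_replica_node)
    except NoNodeError:
        pass

    # If other replicas still hold the lock, the shared blobs must be kept.
    try:
        if zk.get_children(zookeeper_part_uniq_node):
            return False
    except NoNodeError:
        pass

    # We were the last holder: clean up the per-id node as well.
    try:
        zk.delete(zookeeper_part_uniq_node)
    except (NoNodeError, NotEmptyError):
        pass
    return True

if __name__ == "__main__":
    zk = KazooClient(hosts="127.0.0.1:2181")  # assumed local ZooKeeper
    zk.start()
    print(unlock_shared_data_by_id(zk, "/clickhouse/zero_copy/example_table/all_0_0_0",
                                   "0123-abcd", "replica1"))
    zk.stop()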
View File

@@ -243,7 +243,7 @@ public:
/// Unlock shared data part in zookeeper by part id
/// Return true if data unlocked
/// Return false if data is still used by another node
- static bool unlockSharedDataByID(String id, const String & table_uuid, const String & part_name, const String & replica_name_,
+ static bool unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_,
DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger,
const String & zookeeper_path_old);

View File

@@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@@ -0,0 +1,26 @@
<clickhouse>
    <storage_configuration>
        <disks>
            <s3_disk>
                <type>s3</type>
                <endpoint>http://minio1:9001/root/data/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
            </s3_disk>
        </disks>
        <policies>
            <s3_and_default>
                <volumes>
                    <main>
                        <disk>default</disk>
                    </main>
                    <external>
                        <disk>s3_disk</disk>
                    </external>
                </volumes>
            </s3_and_default>
        </policies>
    </storage_configuration>
</clickhouse>
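A quick way to confirm this policy is visible to the server is to query system.storage_policies. The helper below is hypothetical, not part of this commit, and assumes the policy_name/volume_name/volume_priority/disks columns of recent releases; `node` is any of the cluster instances defined in the test below:

# Hypothetical sanity check, not part of this commit.
def assert_s3_policy_loaded(node):
    rows = node.query(
        "SELECT volume_name, disks FROM system.storage_policies "
        "WHERE policy_name = 's3_and_default' ORDER BY volume_priority"
    ).strip().splitlines()
    # Expect two volumes: 'main' on the default disk and 'external' on s3_disk.
    assert len(rows) == 2, rows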

View File

@@ -0,0 +1,69 @@
#!/usr/bin/env python3
import time

import pytest
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", main_configs=["configs/s3.xml"], with_minio=True, with_zookeeper=True)
node2 = cluster.add_instance("node2", main_configs=["configs/s3.xml"], with_minio=True, with_zookeeper=True)
node3 = cluster.add_instance("node3", main_configs=["configs/s3.xml"], with_minio=True, with_zookeeper=True)


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def test_ttl_move_and_s3(started_cluster):
    # Three replicas of the same table; the TTL moves every part to s3_disk.
    for i, node in enumerate([node1, node2, node3]):
        node.query(
            """
            CREATE TABLE s3_test_with_ttl (date DateTime, id UInt32, value String)
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/s3_test', '{}')
            ORDER BY id
            PARTITION BY id
            TTL date TO DISK 's3_disk'
            SETTINGS storage_policy='s3_and_default'
            """.format(i))

    # Keep the parts on the default disk while inserting.
    node1.query("SYSTEM STOP MOVES s3_test_with_ttl")
    node2.query("SYSTEM STOP MOVES s3_test_with_ttl")

    # 30 single-row parts of roughly 1 MB each, alternating between the two writers.
    for i in range(30):
        if i % 2 == 0:
            node = node1
        else:
            node = node2

        node.query(f"INSERT INTO s3_test_with_ttl SELECT now() + 5, {i}, randomPrintableASCII(1048570)")

    node1.query("SYSTEM SYNC REPLICA s3_test_with_ttl")
    node2.query("SYSTEM SYNC REPLICA s3_test_with_ttl")
    node3.query("SYSTEM SYNC REPLICA s3_test_with_ttl")

    assert node1.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n"
    assert node2.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n"

    # Re-enable moves: the TTL (now() + 5) expires shortly, so background moves
    # relocate the parts to s3_disk on both replicas.
    node1.query("SYSTEM START MOVES s3_test_with_ttl")
    node2.query("SYSTEM START MOVES s3_test_with_ttl")

    assert node1.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n"
    assert node2.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n"

    time.sleep(5)
    print(node1.query("SELECT * FROM system.parts WHERE table = 's3_test_with_ttl' FORMAT Vertical"))

    # With zero-copy replication the replicas are expected to share one copy of
    # each part's files in S3; the exact object count guards against duplicated
    # uploads or prematurely removed objects.
    minio = cluster.minio_client
    objects = minio.list_objects(cluster.minio_bucket, 'data/', recursive=True)
    counter = 0
    for obj in objects:
        print("Objectname:", obj.object_name, "metadata:", obj.metadata)
        counter += 1
    print("Total objects", counter)

    assert counter == 300
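The fixed time.sleep(5) above leaves a window where background TTL moves may not have finished on slow CI machines. A possible hardening, not part of this commit, would be to poll the bucket until the object count stops changing and only then run the final assertion:

# Possible hardening sketch, not part of this commit.
import time

def count_s3_objects(cluster):
    # Same listing the test uses for its final assertion.
    return sum(1 for _ in cluster.minio_client.list_objects(
        cluster.minio_bucket, "data/", recursive=True))

def wait_for_stable_object_count(cluster, timeout=120, settle=10):
    # Return the object count once it has not changed for `settle` seconds,
    # or the last observed value if `timeout` is exceeded.
    deadline = time.time() + timeout
    last_count = count_s3_objects(cluster)
    last_change = time.time()
    while time.time() < deadline:
        time.sleep(1)
        current = count_s3_objects(cluster)
        if current != last_count:
            last_count, last_change = current, time.time()
        elif time.time() - last_change >= settle:
            break
    return last_count

The test could then assert wait_for_stable_object_count(cluster) == 300 instead of sleeping for a fixed interval.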