Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-24 00:22:29 +00:00)
fix test_hdfs_zero_copy_replication_move[tiered_copy-2]
This commit is contained in:
parent 5047c758f4
commit b4e6689bf9
@@ -344,17 +344,6 @@ void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
}


-void IDiskRemote::removeFileIfExists(const String & path)
-{
-    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
-    if (fs::exists(fs::path(metadata_path) / path))
-    {
-        removeMeta(path, fs_paths_keeper);
-        removeFromRemoteFS(fs_paths_keeper);
-    }
-}
-
-
void IDiskRemote::removeSharedFile(const String & path, bool keep_in_remote_fs)
{
    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
@@ -364,6 +353,18 @@ void IDiskRemote::removeSharedFile(const String & path, bool keep_in_remote_fs)
}


+void IDiskRemote::removeSharedFileIfExists(const String & path, bool keep_in_remote_fs)
+{
+    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
+    if (fs::exists(fs::path(metadata_path) / path))
+    {
+        removeMeta(path, fs_paths_keeper);
+        if (!keep_in_remote_fs)
+            removeFromRemoteFS(fs_paths_keeper);
+    }
+}
+
+
void IDiskRemote::removeSharedRecursive(const String & path, bool keep_in_remote_fs)
{
    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
@@ -83,12 +83,14 @@ public:

    void removeFile(const String & path) override { removeSharedFile(path, false); }

-    void removeFileIfExists(const String & path) override;
+    void removeFileIfExists(const String & path) override { removeSharedFileIfExists(path, false); }

    void removeRecursive(const String & path) override { removeSharedRecursive(path, false); }

    void removeSharedFile(const String & path, bool keep_in_remote_fs) override;

+    void removeSharedFileIfExists(const String & path, bool keep_in_remote_fs) override;
+
    void removeSharedRecursive(const String & path, bool keep_in_remote_fs) override;

    void listFiles(const String & path, std::vector<String> & file_names) override;
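For orientation, a minimal sketch of the delegation pattern this header hunk establishes: the plain remove* entry points forward to the shared-aware removeShared* variants with keep_in_remote_fs=false, and the *IfExists variant only differs in tolerating a missing metadata file. This is not the real IDiskRemote code; the class and helper names below are invented for illustration.

#include <filesystem>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

/// Toy stand-in for a remote-backed disk: a local metadata file points at blobs
/// in S3/HDFS that may be shared with other replicas (zero-copy replication).
class RemoteDiskSketch
{
public:
    explicit RemoteDiskSketch(std::string metadata_path_) : metadata_path(std::move(metadata_path_)) {}

    /// Non-shared entry points always drop the remote blobs as well.
    void removeFile(const std::string & path) { removeSharedFile(path, /*keep_in_remote_fs=*/ false); }
    void removeFileIfExists(const std::string & path) { removeSharedFileIfExists(path, /*keep_in_remote_fs=*/ false); }

    void removeSharedFile(const std::string & path, bool keep_in_remote_fs)
    {
        dropMetadata(path);
        if (!keep_in_remote_fs)
            dropRemoteObjects(path);
    }

    /// Same as removeSharedFile, but a missing metadata file is not an error.
    void removeSharedFileIfExists(const std::string & path, bool keep_in_remote_fs)
    {
        if (!fs::exists(fs::path(metadata_path) / path))
            return;
        dropMetadata(path);
        if (!keep_in_remote_fs)
            dropRemoteObjects(path);
    }

private:
    void dropMetadata(const std::string & path) { std::cout << "drop metadata for " << path << '\n'; }
    void dropRemoteObjects(const std::string & path) { std::cout << "drop remote blobs for " << path << '\n'; }

    std::string metadata_path;
};

int main()
{
    RemoteDiskSketch disk("/tmp");
    disk.removeFileIfExists("checksums.txt");          /// no-op unless /tmp/checksums.txt exists
    disk.removeSharedFileIfExists("data.bin", true);   /// would drop metadata only, keeping shared blobs
}

The keep_in_remote_fs flag is what lets zero-copy replication drop one replica's local metadata while leaving the shared S3/HDFS objects in place for other replicas.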
@@ -43,7 +43,6 @@ namespace ErrorCodes
    extern const int S3_ERROR;
    extern const int INCORRECT_PART_TYPE;
    extern const int ZERO_COPY_REPLICATION_ERROR;
    extern const int NO_RESERVATIONS_PROVIDED;
}

namespace DataPartsExchange
@@ -1103,7 +1103,7 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_

std::optional<bool> IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
{
-    /// NOTE: It's needed for S3 zero-copy replication
+    /// NOTE: It's needed for zero-copy replication
    if (force_keep_shared_data)
        return true;

@@ -2751,15 +2751,12 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)

    original_active_part->force_keep_shared_data = false;

-    auto original_disk_type = original_active_part->volume->getDisk()->getType();
-    if (original_disk_type == DiskType::Type::S3 || original_disk_type == DiskType::Type::HDFS)
+    if (original_active_part->volume->getDisk()->supportZeroCopyReplication() &&
+        part_copy->volume->getDisk()->supportZeroCopyReplication() &&
+        original_active_part->getUniqueId() == part_copy->getUniqueId())
    {
-        if (part_copy->volume->getDisk()->getType() == original_disk_type
-            && original_active_part->getUniqueId() == part_copy->getUniqueId())
-        {
-            /// May be when several volumes use the same S3 storage
-            original_active_part->force_keep_shared_data = true;
-        }
+        /// May be when several volumes use the same S3/HDFS storage
+        original_active_part->force_keep_shared_data = true;
    }

    modifyPartState(original_active_part, DataPartState::DeleteOnDestroy);
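The hunk above swaps a hard-coded disk-type whitelist (S3 or HDFS) for a capability check on both disks. A rough, hypothetical sketch of the new decision; DiskSketch and shouldKeepSharedData are invented names, not the real MergeTreeData API:

#include <iostream>
#include <string>

/// Invented minimal stand-in for the real IDisk interface.
struct DiskSketch
{
    bool zero_copy_capable = false;
    bool supportZeroCopyReplication() const { return zero_copy_capable; }
};

/// Keep the shared remote data only when both disks can do zero-copy replication
/// and both parts point at the same remote objects (same unique id), e.g. two
/// volumes backed by the same S3/HDFS storage.
bool shouldKeepSharedData(const DiskSketch & old_disk, const DiskSketch & new_disk,
                          const std::string & old_id, const std::string & new_id)
{
    return old_disk.supportZeroCopyReplication()
        && new_disk.supportZeroCopyReplication()
        && old_id == new_id;
}

int main()
{
    DiskSketch hdfs_volume{true};
    DiskSketch local_volume{false};
    std::cout << shouldKeepSharedData(hdfs_volume, hdfs_volume, "part-42", "part-42") << '\n';   /// 1
    std::cout << shouldKeepSharedData(hdfs_volume, local_volume, "part-42", "part-42") << '\n';  /// 0
}

Compared with the old check, this also covers any future disk type that reports supportZeroCopyReplication(), not just S3 and HDFS.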
@@ -7310,17 +7310,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
    if (!zookeeper)
        return best_replica;

-    String zero_copy;
-    if (disk_type == DiskType::Type::S3)
-    {
-        zero_copy = "zero_copy_s3";
-    }
-    else if (disk_type == DiskType::Type::HDFS)
-    {
-        zero_copy = "zero_copy_hdfs";
-    }
-    else
-        return best_replica;
+    String zero_copy = fmt::format("zero_copy_{}", DiskType::toString(disk_type));
    String zookeeper_part_node = fs::path(zookeeper_path) / zero_copy / "shared" / part.name;

    Strings ids;
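The getSharedDataReplica hunk above collapses the per-disk-type if/else into one formatted ZooKeeper node name. A small illustrative sketch, assuming DiskType::toString yields "s3"/"hdfs"; DiskKind and zeroCopyZooKeeperNode below are made-up names, while the real code uses fmt::format("zero_copy_{}", DiskType::toString(disk_type)) as shown in the diff:

#include <iostream>
#include <string>

/// Invented enum standing in for DiskType::Type.
enum class DiskKind { S3, HDFS };

std::string toString(DiskKind kind)
{
    return kind == DiskKind::S3 ? "s3" : "hdfs";
}

/// One expression replaces the old per-type branching, so any additional
/// zero-copy-capable disk type gets a node name without another branch here.
std::string zeroCopyZooKeeperNode(DiskKind kind)
{
    return "zero_copy_" + toString(kind);
}

int main()
{
    std::cout << zeroCopyZooKeeperNode(DiskKind::S3) << '\n';    /// zero_copy_s3
    std::cout << zeroCopyZooKeeperNode(DiskKind::HDFS) << '\n';  /// zero_copy_hdfs
}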
@@ -34,6 +34,7 @@ sudo -H pip install \
    protobuf \
    psycopg2-binary \
    pymongo \
    pytz \
    pytest \
    pytest-timeout \
    redis \
@@ -49,6 +49,19 @@ def cluster():
        cluster.shutdown()


+@pytest.fixture(autouse=True)
+def cleanup_after_test(cluster):
+    try:
+        yield
+    finally:
+        for instance in cluster.instances.values():
+            instance.query("DROP TABLE IF EXISTS hdfs_test NO DELAY")
+            instance.query("DROP TABLE IF EXISTS single_node_move_test NO DELAY")
+            instance.query("DROP TABLE IF EXISTS move_test NO DELAY")
+            instance.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
+            instance.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
+
+
def test_hdfs_zero_copy_replication_insert(cluster):
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
@@ -75,8 +88,6 @@ def test_hdfs_zero_copy_replication_insert(cluster):

    wait_for_hdfs_objects(cluster, "/clickhouse1", SHARDS * FILES_OVERHEAD_PER_TABLE + FILES_OVERHEAD_PER_PART_COMPACT)

-    node1.query("DROP TABLE IF EXISTS hdfs_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS hdfs_test NO DELAY")


@pytest.mark.parametrize(
@@ -88,7 +99,6 @@ def test_hdfs_zero_copy_replication_insert(cluster):
def test_hdfs_zero_copy_replication_single_move(cluster, storage_policy, init_objects):
    node1 = cluster.instances["node1"]

-    node1.query("DROP TABLE IF EXISTS single_node_move_test NO DELAY")
    node1.query(
        Template("""
            CREATE TABLE single_node_move_test (dt DateTime, id Int64)
@@ -110,8 +120,6 @@ def test_hdfs_zero_copy_replication_single_move(cluster, storage_policy, init_objects):
    node1.query("ALTER TABLE single_node_move_test MOVE PARTITION ID 'all' TO VOLUME 'main'")
    assert node1.query("SELECT id FROM single_node_move_test ORDER BY dt FORMAT Values") == "(10),(11)"

-    node1.query("DROP TABLE IF EXISTS single_node_move_test NO DELAY")
-

@pytest.mark.parametrize(
    ("storage_policy", "init_objects"),
@@ -123,8 +131,6 @@ def test_hdfs_zero_copy_replication_move(cluster, storage_policy, init_objects):
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

-    node1.query("DROP TABLE IF EXISTS move_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS move_test NO DELAY")
    node1.query(
        Template("""
            CREATE TABLE move_test ON CLUSTER test_cluster (dt DateTime, id Int64)
@@ -151,9 +157,6 @@ def test_hdfs_zero_copy_replication_move(cluster, storage_policy, init_objects):
    assert node2.query("SELECT id FROM move_test ORDER BY dt FORMAT Values") == "(10),(11)"
    wait_for_hdfs_objects(cluster, "/clickhouse1", init_objects + FILES_OVERHEAD_PER_PART_COMPACT)

-    node1.query("DROP TABLE IF EXISTS move_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS move_test NO DELAY")
-

@pytest.mark.parametrize(
    ("storage_policy"), ["hybrid", "tiered", "tiered_copy"]
@@ -162,8 +165,6 @@ def test_hdfs_zero_copy_with_ttl_move(cluster, storage_policy):
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

-    node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
    node1.query(
        Template("""
            CREATE TABLE ttl_move_test ON CLUSTER test_cluster (dt DateTime, id Int64)
@@ -185,17 +186,11 @@ def test_hdfs_zero_copy_with_ttl_move(cluster, storage_policy):
    assert node1.query("SELECT id FROM ttl_move_test ORDER BY id FORMAT Values") == "(10),(11)"
    assert node2.query("SELECT id FROM ttl_move_test ORDER BY id FORMAT Values") == "(10),(11)"

-    node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
-

def test_hdfs_zero_copy_with_ttl_delete(cluster):
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

-    node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
-
    node1.query(
        """
        CREATE TABLE ttl_delete_test ON CLUSTER test_cluster (dt DateTime, id Int64)
@@ -216,6 +211,3 @@ def test_hdfs_zero_copy_with_ttl_delete(cluster):
    assert node2.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
    assert node1.query("SELECT id FROM ttl_delete_test ORDER BY id FORMAT Values") == "(11)"
    assert node2.query("SELECT id FROM ttl_delete_test ORDER BY id FORMAT Values") == "(11)"
-
-    node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")