mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Fix S3 Zero-Copy replication for hybrid storage
This commit is contained in:
parent
1f5052a7c5
commit
f87435213e
@ -206,18 +206,18 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
|
||||
/// Try to fetch part from S3 without copy and fallback to default copy
|
||||
/// if it's not possible
|
||||
moving_part.part->assertOnDisk();
|
||||
String path_to_clone = data->getRelativeDataPath() + directory_to_move + '/';
|
||||
String path_to_clone = data->getRelativeDataPath() + directory_to_move + "/";
|
||||
String relative_path = part->relative_path;
|
||||
if (disk->exists(path_to_clone + relative_path))
|
||||
{
|
||||
LOG_WARNING(log, "Path " + fullPath(disk, path_to_clone + relative_path) + " already exists. Will remove it and clone again.");
|
||||
disk->removeRecursive(path_to_clone + relative_path + '/');
|
||||
disk->removeRecursive(path_to_clone + relative_path + "/");
|
||||
}
|
||||
disk->createDirectories(path_to_clone);
|
||||
bool is_fetched = data->tryToFetchIfShared(*part, disk, path_to_clone + "/" + part->name);
|
||||
if (!is_fetched)
|
||||
part->volume->getDisk()->copy(data->getRelativeDataPath() + relative_path, disk, path_to_clone);
|
||||
part->volume->getDisk()->removeFileIfExists(path_to_clone + '/' + IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
|
||||
part->volume->getDisk()->copy(data->getRelativeDataPath() + relative_path + "/", disk, path_to_clone);
|
||||
part->volume->getDisk()->removeFileIfExists(path_to_clone + "/" + IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -17,6 +17,16 @@
|
||||
</main>
|
||||
</volumes>
|
||||
</s3>
|
||||
<hybrid>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>default</disk>
|
||||
</main>
|
||||
<external>
|
||||
<disk>s31</disk>
|
||||
</external>
|
||||
</volumes>
|
||||
</hybrid>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
|
||||
|
@ -88,3 +88,46 @@ def test_s3_zero_copy_replication(cluster, policy):
|
||||
node1.query("DROP TABLE IF EXISTS s3_test NO DELAY")
|
||||
node2.query("DROP TABLE IF EXISTS s3_test NO DELAY")
|
||||
|
||||
|
||||
def test_s3_zero_copy_on_hybrid_storage(cluster):
    """Check zero-copy replication when a part is moved from a local disk to S3.

    The table uses the 'hybrid' storage policy, so freshly inserted parts are
    expected on the 'default' disk. The partition is then moved to disk 's31'
    first on node1 and afterwards on node2; the second move must not add any
    new objects to S3.

    :param cluster: cluster fixture exposing ``instances["node1"]`` and
        ``instances["node2"]``.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

    select_data = "SELECT * FROM hybrid_test ORDER BY id FORMAT Values"
    select_disks = "SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values"
    move_to_s3 = "ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'"
    expected_data = "(0,'data'),(1,'data')"

    try:
        # Use a ZooKeeper path of its own so this table cannot collide with
        # replicas registered for the 's3_test' table.
        node1.query(
            """
            CREATE TABLE hybrid_test ON CLUSTER test_cluster (id UInt32, value String)
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/hybrid_test', '{}')
            ORDER BY id
            SETTINGS storage_policy='hybrid'
            """
            .format('{replica}')
        )

        node1.query("INSERT INTO hybrid_test VALUES (0,'data'),(1,'data')")

        # Wait for node2 to actually fetch the part instead of sleeping for a
        # fixed interval, which is racy on a slow machine.
        node2.query("SYSTEM SYNC REPLICA hybrid_test")

        assert node1.query(select_data) == expected_data
        assert node2.query(select_data) == expected_data

        assert node1.query(select_disks) == "('all','default')"
        assert node2.query(select_disks) == "('all','default')"

        node1.query(move_to_s3)

        # Only node1 has moved its part so far.
        assert node1.query(select_disks) == "('all','s31')"
        assert node2.query(select_disks) == "('all','default')"

        # Total objects in S3
        s3_objects = get_large_objects_count(cluster, 0)

        node2.query(move_to_s3)

        assert node1.query(select_disks) == "('all','s31')"
        assert node2.query(select_disks) == "('all','s31')"

        # Check that moving the partition on node2 created no new objects on S3
        assert get_large_objects_count(cluster, 0) == s3_objects

        assert node1.query(select_data) == expected_data
        assert node2.query(select_data) == expected_data
    finally:
        # Always clean up so a failure here does not leak the table into
        # subsequent tests or reruns.
        node1.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
        node2.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
|
||||
|
Loading…
Reference in New Issue
Block a user