Merge pull request #22378 from ianton-ru/s3_zero_copy_replication

Fix S3 Zero-Copy replication for hybrid storage
This commit is contained in:
alexey-milovidov 2021-03-31 04:15:47 +03:00 committed by GitHub
commit a21fe0b492
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 4 deletions

View File

@ -206,18 +206,18 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
/// Try to fetch part from S3 without copy and fallback to default copy
/// if it's not possible
moving_part.part->assertOnDisk();
String path_to_clone = data->getRelativeDataPath() + directory_to_move + '/';
String path_to_clone = data->getRelativeDataPath() + directory_to_move + "/";
String relative_path = part->relative_path;
if (disk->exists(path_to_clone + relative_path))
{
LOG_WARNING(log, "Path " + fullPath(disk, path_to_clone + relative_path) + " already exists. Will remove it and clone again.");
disk->removeRecursive(path_to_clone + relative_path + '/');
disk->removeRecursive(path_to_clone + relative_path + "/");
}
disk->createDirectories(path_to_clone);
bool is_fetched = data->tryToFetchIfShared(*part, disk, path_to_clone + "/" + part->name);
if (!is_fetched)
part->volume->getDisk()->copy(data->getRelativeDataPath() + relative_path, disk, path_to_clone);
part->volume->getDisk()->removeFileIfExists(path_to_clone + '/' + IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
part->volume->getDisk()->copy(data->getRelativeDataPath() + relative_path + "/", disk, path_to_clone);
part->volume->getDisk()->removeFileIfExists(path_to_clone + "/" + IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
}
else
{

View File

@ -17,6 +17,16 @@
</main>
</volumes>
</s3>
<hybrid>
<volumes>
<main>
<disk>default</disk>
</main>
<external>
<disk>s31</disk>
</external>
</volumes>
</hybrid>
</policies>
</storage_configuration>

View File

@ -88,3 +88,46 @@ def test_s3_zero_copy_replication(cluster, policy):
node1.query("DROP TABLE IF EXISTS s3_test NO DELAY")
node2.query("DROP TABLE IF EXISTS s3_test NO DELAY")
def test_s3_zero_copy_on_hybrid_storage(cluster):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query(
"""
CREATE TABLE hybrid_test ON CLUSTER test_cluster (id UInt32, value String)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/s3_test', '{}')
ORDER BY id
SETTINGS storage_policy='hybrid'
"""
.format('{replica}')
)
node1.query("INSERT INTO hybrid_test VALUES (0,'data'),(1,'data')")
time.sleep(1)
assert node1.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
assert node2.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"
node1.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'")
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','s31')"
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"
# Total objects in S3
s3_objects = get_large_objects_count(cluster, 0)
node2.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'")
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','s31')"
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','s31')"
# Check that after moving partition on node2 no new obects on s3
assert get_large_objects_count(cluster, 0) == s3_objects
assert node1.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
assert node2.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"