mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Fix zero-copy-replication on encrypted disks.
This commit is contained in:
parent
e428ed5543
commit
6b71cb1c35
@ -360,6 +360,19 @@ SyncGuardPtr DiskEncrypted::getDirectorySyncGuard(const String & path) const
|
||||
return delegate->getDirectorySyncGuard(wrapped_path);
|
||||
}
|
||||
|
||||
std::unordered_map<String, String> DiskEncrypted::getSerializedMetadata(const std::vector<String> & paths) const
|
||||
{
|
||||
std::vector<String> wrapped_paths;
|
||||
wrapped_paths.reserve(paths.size());
|
||||
for (const auto & path : paths)
|
||||
wrapped_paths.emplace_back(wrappedPath(path));
|
||||
auto metadata = delegate->getSerializedMetadata(wrapped_paths);
|
||||
std::unordered_map<String, String> res;
|
||||
for (size_t i = 0; i != paths.size(); ++i)
|
||||
res.emplace(paths[i], metadata.at(wrapped_paths.at(i)));
|
||||
return res;
|
||||
}
|
||||
|
||||
void DiskEncrypted::applyNewSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
ContextPtr /*context*/,
|
||||
|
@ -225,6 +225,11 @@ public:
|
||||
return delegate->getUniqueId(wrapped_path);
|
||||
}
|
||||
|
||||
bool checkUniqueId(const String & id) const override
|
||||
{
|
||||
return delegate->checkUniqueId(id);
|
||||
}
|
||||
|
||||
void onFreeze(const String & path) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
@ -276,6 +281,8 @@ public:
|
||||
return delegate->getMetadataStorage();
|
||||
}
|
||||
|
||||
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & paths) const override;
|
||||
|
||||
DiskPtr getDelegateDiskIfExists() const override
|
||||
{
|
||||
return delegate;
|
||||
|
@ -0,0 +1,16 @@
|
||||
<clickhouse>
|
||||
<remote_servers>
|
||||
<cluster>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>node2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</cluster>
|
||||
</remote_servers>
|
||||
</clickhouse>
|
@ -0,0 +1,49 @@
|
||||
<clickhouse>
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<disk_s3>
|
||||
<type>s3</type>
|
||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
</disk_s3>
|
||||
<disk_s3_encrypted>
|
||||
<type>encrypted</type>
|
||||
<disk>disk_s3</disk>
|
||||
<key>1234567812345678</key>
|
||||
<path>encrypted/</path>
|
||||
</disk_s3_encrypted>
|
||||
<disk_s3_encrypted_with_diff_keys>
|
||||
<type>encrypted</type>
|
||||
<disk>disk_s3</disk>
|
||||
<path>encrypted_with_diff_keys/</path>
|
||||
</disk_s3_encrypted_with_diff_keys>
|
||||
</disks>
|
||||
<policies>
|
||||
<s3_policy>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>disk_s3</disk>
|
||||
</main>
|
||||
<external>
|
||||
<disk>disk_s3_encrypted</disk>
|
||||
</external>
|
||||
</volumes>
|
||||
</s3_policy>
|
||||
<s3_encrypted_policy>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>disk_s3_encrypted</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</s3_encrypted_policy>
|
||||
<s3_encrypted_policy_with_diff_keys>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>disk_s3_encrypted_with_diff_keys</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</s3_encrypted_policy_with_diff_keys>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
</clickhouse>
|
@ -0,0 +1,9 @@
|
||||
<clickhouse>
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<disk_s3_encrypted_with_diff_keys>
|
||||
<key>1111111111111111</key>
|
||||
</disk_s3_encrypted_with_diff_keys>
|
||||
</disks>
|
||||
</storage_configuration>
|
||||
</clickhouse>
|
@ -0,0 +1,9 @@
|
||||
<clickhouse>
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<disk_s3_encrypted_with_diff_keys>
|
||||
<key>2222222222222222</key>
|
||||
</disk_s3_encrypted_with_diff_keys>
|
||||
</disks>
|
||||
</storage_configuration>
|
||||
</clickhouse>
|
92
tests/integration/test_encrypted_disk_replication/test.py
Normal file
92
tests/integration/test_encrypted_disk_replication/test.py
Normal file
@ -0,0 +1,92 @@
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import TSV
|
||||
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node1 = cluster.add_instance(
|
||||
"node1",
|
||||
main_configs=[
|
||||
"configs/cluster.xml",
|
||||
"configs/disk_s3_encrypted.xml",
|
||||
"configs/disk_s3_encrypted_node1.xml",
|
||||
],
|
||||
macros={"replica": "node1"},
|
||||
with_zookeeper=True,
|
||||
with_minio=True,
|
||||
)
|
||||
|
||||
node2 = cluster.add_instance(
|
||||
"node2",
|
||||
main_configs=[
|
||||
"configs/cluster.xml",
|
||||
"configs/disk_s3_encrypted.xml",
|
||||
"configs/disk_s3_encrypted_node2.xml",
|
||||
],
|
||||
macros={"replica": "node2"},
|
||||
with_zookeeper=True,
|
||||
with_minio=True,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def cleanup_after_test():
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
node1.query("DROP TABLE IF EXISTS encrypted_test ON CLUSTER 'cluster' NO DELAY")
|
||||
|
||||
|
||||
def create_table(
|
||||
zero_copy_replication=False, storage_policy="s3_encrypted_policy_with_diff_keys"
|
||||
):
|
||||
engine = "ReplicatedMergeTree('/clickhouse/tables/encrypted_test/', '{replica}')"
|
||||
|
||||
settings = f"storage_policy='{storage_policy}'"
|
||||
if zero_copy_replication:
|
||||
settings += ", allow_remote_fs_zero_copy_replication=true"
|
||||
|
||||
node1.query(
|
||||
f"""
|
||||
CREATE TABLE encrypted_test ON CLUSTER 'cluster' (
|
||||
id Int64,
|
||||
data String
|
||||
) ENGINE={engine}
|
||||
ORDER BY id
|
||||
SETTINGS {settings}
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def check_replication():
|
||||
node1.query("INSERT INTO encrypted_test VALUES (0, 'a'), (1, 'b')")
|
||||
node2.query("INSERT INTO encrypted_test VALUES (2, 'c'), (3, 'd')")
|
||||
|
||||
node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' encrypted_test")
|
||||
|
||||
select_query = "SELECT * FROM encrypted_test ORDER BY id"
|
||||
|
||||
assert node1.query(select_query) == TSV([[0, "a"], [1, "b"], [2, "c"], [3, "d"]])
|
||||
assert node2.query(select_query) == TSV([[0, "a"], [1, "b"], [2, "c"], [3, "d"]])
|
||||
|
||||
|
||||
def test_replication():
|
||||
create_table(
|
||||
zero_copy_replication=False, storage_policy="s3_encrypted_policy_with_diff_keys"
|
||||
)
|
||||
check_replication()
|
||||
|
||||
|
||||
def test_zero_copy_replication():
|
||||
create_table(zero_copy_replication=True, storage_policy="s3_encrypted_policy")
|
||||
check_replication()
|
Loading…
Reference in New Issue
Block a user