Add test for s3 disk restore

Antonio Andelic 2023-05-25 16:01:40 +00:00
parent bb77441acb
commit 6a8a21e09a
12 changed files with 212 additions and 12 deletions

View File

@ -616,7 +616,8 @@ Changelog::Changelog(Poco::Logger * log_, LogFileSettings log_file_settings, Kee
    load_from_disk(disk);

    auto current_log_disk = getCurrentLogDisk();
-   load_from_disk(current_log_disk);
+   if (disk != current_log_disk)
+       load_from_disk(current_log_disk);

    if (existing_changelogs.empty())
        LOG_WARNING(log, "No logs exists in {}. It's Ok if it's the first run of clickhouse-keeper.", disk->getPath());

View File

@ -612,7 +612,7 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::bu
    disk->removeFile(tmp_snapshot_file_name);

-   existing_snapshots.emplace(up_to_log_idx, snapshot_file_name);
+   existing_snapshots.emplace(up_to_log_idx, SnapshotFileInfo{snapshot_file_name, disk});
    removeOutdatedSnapshotsIfNeeded();

    return {snapshot_file_name, disk};
@ -750,7 +750,7 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStor
    disk->removeFile(tmp_snapshot_file_name);

-   existing_snapshots.emplace(up_to_log_idx, snapshot_file_name);
+   existing_snapshots.emplace(up_to_log_idx, SnapshotFileInfo{snapshot_file_name, disk});
    removeOutdatedSnapshotsIfNeeded();

    return {snapshot_file_name, disk};

View File

@ -0,0 +1,26 @@
<clickhouse>
    <storage_configuration>
        <disks>
            <snapshot_s3_plain1>
                <type>s3_plain</type>
                <endpoint>http://minio1:9001/root/data/snapshots1/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
            </snapshot_s3_plain1>

            <snapshot_s3_plain2>
                <type>s3_plain</type>
                <endpoint>http://minio1:9001/root/data/snapshots2/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
            </snapshot_s3_plain2>

            <snapshot_s3_plain3>
                <type>s3_plain</type>
                <endpoint>http://minio1:9001/root/data/snapshots3/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
            </snapshot_s3_plain3>
        </disks>
    </storage_configuration>
</clickhouse>
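For reference, one way to confirm from an integration test that these disks were picked up is to query system.disks on a node started with this config. This is a minimal sketch, not part of this commit; it assumes the keeper nodes also serve the regular SQL interface and that system.disks lists every disk declared under storage_configuration (the helper name below is illustrative):

# Hypothetical check, not part of this commit: the three s3_plain disks from
# disk_s3_storage.xml should be visible on any node that loads that file.
def assert_s3_snapshot_disks_registered(node):
    # system.disks reports all configured disks; keep only the snapshot ones.
    result = node.query(
        "SELECT name FROM system.disks WHERE name LIKE 'snapshot_s3_plain%' ORDER BY name"
    )
    assert result.splitlines() == [
        "snapshot_s3_plain1",
        "snapshot_s3_plain2",
        "snapshot_s3_plain3",
    ]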

View File

@ -2,8 +2,6 @@
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>1</server_id>
-       <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
-       <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -2,8 +2,6 @@
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>2</server_id>
-       <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
-       <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -2,8 +2,6 @@
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>3</server_id>
-       <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
-       <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -0,0 +1,5 @@
<clickhouse>
    <keeper_server>
        <snapshot_storage_disk>snapshot_s3_plain1</snapshot_storage_disk>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,5 @@
<clickhouse>
    <keeper_server>
        <snapshot_storage_disk>snapshot_s3_plain2</snapshot_storage_disk>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,5 @@
<clickhouse>
    <keeper_server>
        <snapshot_storage_disk>snapshot_s3_plain3</snapshot_storage_disk>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,6 @@
<clickhouse>
    <keeper_server>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
    </keeper_server>
</clickhouse>

View File

@ -9,13 +9,19 @@ import time
cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance(
-    "node1", main_configs=["configs/enable_keeper1.xml"], stay_alive=True
+    "node1",
+    main_configs=["configs/enable_keeper1.xml", "configs/local_storage_path.xml"],
+    stay_alive=True,
)
node2 = cluster.add_instance(
-    "node2", main_configs=["configs/enable_keeper2.xml"], stay_alive=True
+    "node2",
+    main_configs=["configs/enable_keeper2.xml", "configs/local_storage_path.xml"],
+    stay_alive=True,
)
node3 = cluster.add_instance(
-    "node3", main_configs=["configs/enable_keeper3.xml"], stay_alive=True
+    "node3",
+    main_configs=["configs/enable_keeper3.xml", "configs/local_storage_path.xml"],
+    stay_alive=True,
)

from kazoo.client import KazooClient, KazooState

View File

@ -0,0 +1,152 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
import random
import string
import os
import time

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance(
    "node1",
    main_configs=[
        "configs/enable_keeper1.xml",
        "configs/disk_s3_storage.xml",
        "configs/keeper1_snapshot_disk.xml",
    ],
    stay_alive=True,
    with_minio=True,
)
node2 = cluster.add_instance(
    "node2",
    main_configs=[
        "configs/enable_keeper2.xml",
        "configs/disk_s3_storage.xml",
        "configs/keeper2_snapshot_disk.xml",
    ],
    stay_alive=True,
    with_minio=True,
)
node3 = cluster.add_instance(
    "node3",
    main_configs=[
        "configs/enable_keeper3.xml",
        "configs/disk_s3_storage.xml",
        "configs/keeper3_snapshot_disk.xml",
    ],
    stay_alive=True,
    with_minio=True,
)

from kazoo.client import KazooClient, KazooState


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()

        yield cluster

    finally:
        cluster.shutdown()


def get_fake_zk(nodename, timeout=30.0):
    _fake_zk_instance = KazooClient(
        hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout
    )
    _fake_zk_instance.start()
    return _fake_zk_instance


def stop_zk(zk):
    try:
        if zk:
            zk.stop()
            zk.close()
    except:
        pass


def test_recover_from_snapshot_with_disk_s3(started_cluster):
    try:
        node1_zk = node2_zk = node3_zk = None
        node1_zk = get_fake_zk("node1")
        node2_zk = get_fake_zk("node2")
        node3_zk = get_fake_zk("node3")

        node1_zk.create("/test_snapshot_multinode_recover", "somedata".encode())

        node2_zk.sync("/test_snapshot_multinode_recover")
        node3_zk.sync("/test_snapshot_multinode_recover")

        assert node1_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata"
        assert node2_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata"
        assert node3_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata"

        node3.stop_clickhouse(kill=True)

        # at least we will have 2 snapshots
        for i in range(435):
            node1_zk.create(
                "/test_snapshot_multinode_recover" + str(i),
                ("somedata" + str(i)).encode(),
            )

        for i in range(435):
            if i % 10 == 0:
                node1_zk.delete("/test_snapshot_multinode_recover" + str(i))
    finally:
        for zk in [node1_zk, node2_zk, node3_zk]:
            stop_zk(zk)

    # stale node should recover from leader's snapshot
    # with some sanitizers can start longer than 5 seconds
    node3.start_clickhouse(20)
    keeper_utils.wait_until_connected(cluster, node3)
    print("Restarted")

    try:
        node1_zk = node2_zk = node3_zk = None
        node1_zk = get_fake_zk("node1")
        node2_zk = get_fake_zk("node2")
        node3_zk = get_fake_zk("node3")

        node1_zk.sync("/test_snapshot_multinode_recover")
        node2_zk.sync("/test_snapshot_multinode_recover")
        node3_zk.sync("/test_snapshot_multinode_recover")

        assert node1_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata"
        assert node2_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata"
        assert node3_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata"

        for i in range(435):
            if i % 10 != 0:
                assert (
                    node1_zk.get("/test_snapshot_multinode_recover" + str(i))[0]
                    == ("somedata" + str(i)).encode()
                )
                assert (
                    node2_zk.get("/test_snapshot_multinode_recover" + str(i))[0]
                    == ("somedata" + str(i)).encode()
                )
                assert (
                    node3_zk.get("/test_snapshot_multinode_recover" + str(i))[0]
                    == ("somedata" + str(i)).encode()
                )
            else:
                assert (
                    node1_zk.exists("/test_snapshot_multinode_recover" + str(i)) is None
                )
                assert (
                    node2_zk.exists("/test_snapshot_multinode_recover" + str(i)) is None
                )
                assert (
                    node3_zk.exists("/test_snapshot_multinode_recover" + str(i)) is None
                )
    finally:
        for zk in [node1_zk, node2_zk, node3_zk]:
            stop_zk(zk)
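Since the snapshots now go to the s3_plain disks, a natural follow-up is to assert that objects actually appear under the MinIO prefixes configured in disk_s3_storage.xml. The sketch below is not part of this commit; it assumes the cluster helper exposes a minio.Minio client as cluster.minio_client and reuses the "root" bucket and "data/snapshotsN/" prefixes from the endpoints above:

# Hypothetical extra check, not part of this commit: each node's snapshot
# prefix should contain at least one object once snapshots have been taken.
def assert_snapshots_on_s3(started_cluster):
    minio = started_cluster.minio_client  # assumed helper: a minio.Minio instance
    for prefix in ("data/snapshots1/", "data/snapshots2/", "data/snapshots3/"):
        objects = list(minio.list_objects("root", prefix=prefix, recursive=True))
        assert len(objects) > 0, f"no snapshot objects under {prefix}"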