Fix nasty serialization bug

alesapin 2021-03-03 15:21:21 +03:00
parent 1707e7f1c3
commit 37fc5faa6f
5 changed files with 138 additions and 4 deletions

View File

@@ -36,6 +36,12 @@ namespace
    return std::string{"snapshot_"} + std::to_string(up_to_log_idx) + ".bin";
}

+std::string getBaseName(const String & path)
+{
+    size_t basename_start = path.rfind('/');
+    return std::string{&path[basename_start + 1], path.length() - basename_start - 1};
+}

String parentPath(const String & path)
{
    auto rslash_pos = path.rfind('/');
@@ -95,6 +101,7 @@ void NuKeeperStorageSnapshot::serialize(const NuKeeperStorageSnapshot & snapshot
    {
        const auto & path = it->key;
        const auto & node = it->value;
        Coordination::write(path, out);
        writeNode(node, out);
    }
@@ -138,12 +145,13 @@ SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & stora
        current_size++;
    }

    for (const auto & itr : storage.container)
    {
        if (itr.key != "/")
        {
            auto parent_path = parentPath(itr.key);
-            storage.container.updateValue(parent_path, [&path = itr.key] (NuKeeperStorage::Node & value) { value.children.insert(path); });
+            storage.container.updateValue(parent_path, [&path = itr.key] (NuKeeperStorage::Node & value) { value.children.insert(getBaseName(path)); });
        }
    }
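The last hunk above is the heart of the fix: when a snapshot is deserialized, every node is registered in its parent's children set, and the old code inserted the node's full path instead of its base name, so the restored tree disagreed with what a ZooKeeper-style getChildren call returns. A minimal Python model of the two helpers and the fix (hypothetical names, mirroring parentPath/getBaseName above):

# Minimal model of the path helpers from the hunk above.
def parent_path(path):
    pos = path.rfind('/')
    return path[:pos] if pos > 0 else '/'

def get_base_name(path):
    return path[path.rfind('/') + 1:]

# Rebuilding parent -> children links after deserialization.
container = {'/': set(), '/zk': set(), '/zk/a': set(), '/zk/b': set()}
for path in container:
    if path != '/':
        # The buggy code inserted the full path:
        #     container[parent_path(path)].add(path)   # -> {'/zk/a', '/zk/b'}
        # The fix inserts only the base name:
        container[parent_path(path)].add(get_base_name(path))

assert container['/'] == {'zk'}
assert container['/zk'] == {'a', 'b'}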

View File

@@ -26,7 +26,7 @@ private:
    List list;
    IndexMap map;
-    bool snapshot_mode;
+    bool snapshot_mode{false};

public:
@@ -113,10 +113,9 @@ public:
            auto list_itr = it->second;
            auto elem_copy = *(list_itr);
            list_itr->active_in_map = false;
-            map.erase(it);
            updater(elem_copy.value);
            auto itr = list.insert(list.end(), elem_copy);
+            map.erase(it);
            map.emplace(itr->key, itr);
            return itr;
        }
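This file fixes two smaller issues: snapshot_mode now has an explicit {false} initializer instead of being left to chance, and in the snapshot-mode branch of updateValue the stale map entry is erased only after the updated copy has been appended to the list. A rough Python sketch of that copy-on-write update, under the simplifying assumption that the index maps keys to list positions (the real IndexMap stores list iterators):

# Sketch: in snapshot mode, entries are never mutated in place. The old
# list entry is marked inactive, an updated copy is appended, and the
# index is repointed, so a snapshot walking the list stays consistent.
class Entry:
    def __init__(self, key, value):
        self.key = key
        self.value = value
        self.active_in_map = True

class SnapshotableTable:
    def __init__(self):
        self.entries = []           # append-only while a snapshot is alive
        self.index = {}             # key -> position of the active entry
        self.snapshot_mode = False  # explicit default, as in the fix

    def update_value(self, key, updater):
        pos = self.index[key]
        if self.snapshot_mode:
            old = self.entries[pos]
            copy = Entry(old.key, old.value)
            old.active_in_map = False
            updater(copy)
            self.entries.append(copy)
            # Mirrors the reorder above: drop the stale mapping only
            # after the copy is safely in the list, then remap the key.
            del self.index[key]
            self.index[key] = len(self.entries) - 1
        else:
            updater(self.entries[pos])
        return self.index[key]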

View File

@@ -0,0 +1,24 @@
<yandex>
    <test_keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>1</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <snapshot_distance>10</snapshot_distance>
            <reserved_log_items>5</reserved_log_items>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>localhost</hostname>
                <port>44444</port>
            </server>
        </raft_configuration>
    </test_keeper_server>
</yandex>
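This is presumably the configs/enable_test_keeper.xml referenced by the test below: it runs the embedded test keeper on TCP port 9181 and, via snapshot_distance, takes a snapshot roughly every 10 log entries, so the hundred creates in the test are more than enough to exercise the snapshot path. Any ZooKeeper client can talk to that port; a minimal kazoo session, mirroring what the test does:

from kazoo.client import KazooClient

# Connect to the test keeper configured above (tcp_port 9181).
zk = KazooClient(hosts="localhost:9181")
zk.start()
zk.create("/smoke", b"value")
assert zk.get("/smoke")[0] == b"value"
zk.stop()
zk.close()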

View File

@@ -0,0 +1,12 @@
<yandex>
    <shutdown_wait_unfinished>3</shutdown_wait_unfinished>
    <logger>
        <level>trace</level>
        <log>/var/log/clickhouse-server/log.log</log>
        <errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
        <size>1000M</size>
        <count>10</count>
        <stderr>/var/log/clickhouse-server/stderr.log</stderr>
        <stdout>/var/log/clickhouse-server/stdout.log</stdout>
    </logger>
</yandex>

View File

@@ -0,0 +1,91 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from kazoo.client import KazooClient, KazooState

cluster = ClickHouseCluster(__file__)
# clickhouse itself will use external zookeeper
node = cluster.add_instance('node', main_configs=['configs/enable_test_keeper.xml', 'configs/logs_conf.xml'], stay_alive=True, with_zookeeper=True)


def random_string(length):
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))


def create_random_path(prefix="", depth=1):
    if depth == 0:
        return prefix
    return create_random_path(os.path.join(prefix, random_string(3)), depth - 1)


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def get_connection_zk(nodename, timeout=30.0):
    _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)

    def reset_listener(state):
        nonlocal _fake_zk_instance
        print("Fake zk callback called for state", state)
        if state != KazooState.CONNECTED:
            _fake_zk_instance._reset()

    _fake_zk_instance.add_listener(reset_listener)
    _fake_zk_instance.start()
    return _fake_zk_instance


def test_state_after_restart(started_cluster):
    try:
        node_zk = None
        node_zk2 = None
        node_zk = get_connection_zk("node")

        node_zk.create("/test_state_after_restart", b"somevalue")
        strs = []
        for i in range(100):
            strs.append(random_string(123).encode())
            node_zk.create("/test_state_after_restart/node" + str(i), strs[i])

        existing_children = []
        for i in range(100):
            if i % 7 == 0:
                node_zk.delete("/test_state_after_restart/node" + str(i))
            else:
                existing_children.append("node" + str(i))

        node.restart_clickhouse(kill=True)

        node_zk2 = get_connection_zk("node")

        assert node_zk2.get("/test_state_after_restart")[0] == b"somevalue"

        for i in range(100):
            if i % 7 == 0:
                assert node_zk2.exists("/test_state_after_restart/node" + str(i)) is None
            else:
                assert len(node_zk2.get("/test_state_after_restart/node" + str(i))[0]) == 123
                assert node_zk2.get("/test_state_after_restart/node" + str(i))[0] == strs[i]

        assert list(sorted(existing_children)) == list(sorted(node_zk2.get_children("/test_state_after_restart")))
    finally:
        try:
            if node_zk is not None:
                node_zk.stop()
                node_zk.close()
            if node_zk2 is not None:
                node_zk2.stop()
                node_zk2.close()
        except:
            pass