Add config for pinning api version

This commit is contained in:
Antonio Andelic 2023-06-09 13:18:13 +00:00
parent d4602f7e1f
commit 07582d56f3
11 changed files with 169 additions and 6 deletions

View File

@ -486,7 +486,7 @@ String RecoveryCommand::run()
String ApiVersionCommand::run()
{
return toString(static_cast<uint8_t>(Coordination::current_keeper_api_version));
return toString(static_cast<uint8_t>(Coordination::latest_keeper_api_version));
}
String CreateSnapshotCommand::run()

View File

@ -13,7 +13,7 @@ enum class KeeperApiVersion : uint8_t
WITH_CHECK_NOT_EXISTS,
};
inline constexpr auto current_keeper_api_version = KeeperApiVersion::WITH_CHECK_NOT_EXISTS;
inline constexpr auto latest_keeper_api_version = KeeperApiVersion::WITH_CHECK_NOT_EXISTS;
const std::string keeper_system_path = "/keeper";
const std::string keeper_api_version_path = keeper_system_path + "/api_version";
@ -21,7 +21,7 @@ const std::string keeper_api_version_path = keeper_system_path + "/api_version";
using PathWithData = std::pair<std::string_view, std::string>;
const std::vector<PathWithData> child_system_paths_with_data
{
{keeper_api_version_path, toString(static_cast<uint8_t>(current_keeper_api_version))}
{keeper_api_version_path, toString(static_cast<uint8_t>(latest_keeper_api_version))}
};
}

View File

@ -0,0 +1,37 @@
#include <Coordination/KeeperContext.h>
#include <Coordination/KeeperConstants.h>
#include <Core/SettingsEnums.h>
#include <Common/ErrorCodes.h>
#include <Common/logger_useful.h>
#include <Core/SettingsFields.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
DECLARE_SETTING_ENUM(KeeperApiVersion);
IMPLEMENT_SETTING_ENUM(KeeperApiVersion, ErrorCodes::BAD_ARGUMENTS,
{{"ZOOKEEPER_COMPATIBLE", KeeperApiVersion::ZOOKEEPER_COMPATIBLE},
{"WITH_FILTERED_LIST", KeeperApiVersion::WITH_FILTERED_LIST},
{"WITH_MULTI_READ", KeeperApiVersion::WITH_MULTI_READ},
{"WITH_CHECK_NOT_EXISTS", KeeperApiVersion::WITH_CHECK_NOT_EXISTS}});
void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config)
{
for (const auto & [path, data] : child_system_paths_with_data)
system_nodes_with_data[std::string{path}] = data;
if (config.has("keeper_server.api_version"))
{
auto version_string = config.getString("keeper_server.api_version");
auto api_version = SettingFieldKeeperApiVersionTraits::fromString(version_string);
LOG_INFO(&Poco::Logger::get("KeeperContext"), "API version override used: {}", version_string);
system_nodes_with_data[keeper_api_version_path] = toString(static_cast<uint8_t>(api_version));
}
}
}

View File

@ -1,10 +1,14 @@
#pragma once
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
struct KeeperContext
{
void initialize(const Poco::Util::AbstractConfiguration & config);
enum class Phase : uint8_t
{
INIT,
@ -16,6 +20,8 @@ struct KeeperContext
bool ignore_system_path_on_startup{false};
bool digest_enabled{true};
std::unordered_map<std::string, std::string> system_nodes_with_data;
};
using KeeperContextPtr = std::shared_ptr<KeeperContext>;

View File

@ -119,6 +119,8 @@ KeeperServer::KeeperServer(
if (coordination_settings->quorum_reads)
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
keeper_context->initialize(config);
keeper_context->digest_enabled = config.getBool("keeper_server.digest_enabled", false);
keeper_context->ignore_system_path_on_startup = config.getBool("keeper_server.ignore_system_path_on_startup", false);

View File

@ -185,7 +185,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
}
/// Serialize data tree
writeBinary(snapshot.snapshot_container_size - child_system_paths_with_data.size(), out);
writeBinary(snapshot.snapshot_container_size - keeper_context->system_nodes_with_data.size(), out);
size_t counter = 0;
for (auto it = snapshot.begin; counter < snapshot.snapshot_container_size; ++counter)
{

View File

@ -283,7 +283,7 @@ void KeeperStorage::initializeSystemNodes()
}
// insert child system nodes
for (const auto & [path, data] : child_system_paths_with_data)
for (const auto & [path, data] : keeper_context->system_nodes_with_data)
{
assert(keeper_api_version_path.starts_with(keeper_system_path));
Node child_system_node;

View File

@ -2357,7 +2357,7 @@ TEST_P(CoordinationTest, TestCurrentApiVersion)
uint8_t keeper_version{0};
DB::ReadBufferFromOwnString buf(get_response.data);
DB::readIntText(keeper_version, buf);
EXPECT_EQ(keeper_version, static_cast<uint8_t>(current_keeper_api_version));
EXPECT_EQ(keeper_version, static_cast<uint8_t>(latest_keeper_api_version));
}
TEST_P(CoordinationTest, TestSystemNodeModify)

View File

@ -0,0 +1,31 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<snapshot_distance>10</snapshot_distance>
<reserved_log_items>5</reserved_log_items>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<!-- For instant start in single node configuration -->
<heart_beat_interval_ms>0</heart_beat_interval_ms>
<election_timeout_lower_bound_ms>0</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>0</election_timeout_upper_bound_ms>
</coordination_settings>
<!-- API VERSION -->
<raft_configuration>
<server>
<id>1</id>
<hostname>localhost</hostname>
<port>9234</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,87 @@
#!/usr/bin/env python3
import pytest
import os
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
from kazoo.client import KazooClient, KazooState
CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
cluster = ClickHouseCluster(__file__)
# clickhouse itself will use external zookeeper
node = cluster.add_instance(
"node",
main_configs=["configs/enable_keeper.xml"],
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_connection_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(
hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout
)
_fake_zk_instance.start()
return _fake_zk_instance
def restart_clickhouse(api_version=None, expect_fail=True):
node.stop_clickhouse()
node.copy_file_to_container(
os.path.join(CURRENT_TEST_DIR, "configs/enable_keeper.xml"),
"/etc/clickhouse-server/config.d/enable_keeper.xml",
)
if api_version:
node.replace_in_config(
"/etc/clickhouse-server/config.d/enable_keeper.xml",
"<!-- API VERSION -->",
f"<api_version>{api_version}<\\/api_version>",
)
node.start_clickhouse(retry_start=not expect_fail)
keeper_utils.wait_until_connected(cluster, node)
def test_keeper_api_version(started_cluster):
restart_clickhouse()
def assert_version(string_version, version_number):
node.wait_for_log_line(
f"Detected server's API version: {string_version}", look_behind_lines=1000
)
try:
node_zk = get_connection_zk(node.name)
assert node_zk.get("/keeper/api_version")[0] == str(version_number).encode()
finally:
if node_zk:
node_zk.stop()
node_zk.close()
assert_version("WITH_CHECK_NOT_EXISTS", 3)
for i, version in enumerate(
[
"ZOOKEEPER_COMPATIBLE",
"WITH_FILTERED_LIST",
"WITH_MULTI_READ",
"WITH_CHECK_NOT_EXISTS",
]
):
restart_clickhouse(version)
assert_version(version, i)
with pytest.raises(Exception):
restart_clickhouse("INVALID_VERSION", expect_fail=True)