Compare commits

...

18 Commits

Author SHA1 Message Date
Antonio Andelic
dec4d3b6c0
Merge 57db5cf24c into 4e56c026cd 2024-11-21 04:16:05 +08:00
Antonio Andelic
57db5cf24c Randomize correctly 2024-11-19 13:39:54 +01:00
Antonio Andelic
459fa898ed Merge branch 'master' into randomize-keeper-feature-flasgs-keeper 2024-11-19 10:00:41 +01:00
Antonio Andelic
0d875ecf5c
Always randomize in private 2024-11-04 12:56:14 +01:00
Antonio Andelic
6698212b5a Fix test 2024-10-31 13:39:41 +01:00
Antonio Andelic
c787838cb2 Merge branch 'master' into randomize-keeper-feature-flasgs-keeper 2024-10-31 12:01:31 +01:00
Antonio Andelic
eb020f1c4b Fix RemoveRecursive 2024-10-29 09:05:31 +01:00
Antonio Andelic
1a40df4d0c Merge branch 'master' into randomize-keeper-feature-flasgs-keeper 2024-10-28 12:07:38 +01:00
Antonio Andelic
4380c6035d Merge branch 'master' into randomize-keeper-feature-flasgs-keeper 2024-10-15 16:51:36 +02:00
Antonio Andelic
5145281088 Correct randomization 2024-10-15 16:51:32 +02:00
Antonio Andelic
35fa4c43e4 More fixes 2024-10-10 19:39:28 +02:00
robot-clickhouse
293e076493 Automatic style fix 2024-10-10 14:03:18 +00:00
Antonio Andelic
8b92603c6d Fix old version 2024-10-10 15:52:56 +02:00
Antonio Andelic
fb14f6e029 Fix MultiRead 2024-10-10 15:52:37 +02:00
robot-clickhouse
e1f37ec2bb Automatic style fix 2024-10-10 07:54:28 +00:00
Antonio Andelic
cc0ef6104f Fix MultiRead 2024-10-10 09:45:42 +02:00
robot-clickhouse
46ce65e66e Automatic style fix 2024-10-09 16:21:09 +00:00
Antonio Andelic
e048893b85 Randomize feature flags in integration test 2024-10-09 18:11:50 +02:00
12 changed files with 130 additions and 10 deletions

View File

@ -341,7 +341,10 @@ Coordination::Error ZooKeeper::tryGetChildren(
const EventPtr & watch,
Coordination::ListRequestType list_request_type)
{
return tryGetChildrenWatch(path, res, stat,
return tryGetChildrenWatch(
path,
res,
stat,
watch ? std::make_shared<Coordination::WatchCallback>(callbackForEvent(watch)) : Coordination::WatchCallbackPtr{},
list_request_type);
}
@ -975,11 +978,14 @@ void ZooKeeper::removeRecursive(const std::string & path, uint32_t remove_nodes_
Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit)
{
if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE))
const auto fallback_method = [&]
{
tryRemoveChildrenRecursive(path);
return tryRemove(path);
}
};
if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE))
return fallback_method();
auto promise = std::make_shared<std::promise<Coordination::RemoveRecursiveResponse>>();
auto future = promise->get_future();
@ -998,6 +1004,10 @@ Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path, uint
}
auto response = future.get();
if (response.error == Coordination::Error::ZNOTEMPTY) /// limit was too low, try without RemoveRecursive request
return fallback_method();
return response.error;
}

View File

@ -486,13 +486,13 @@ public:
/// Remove the node with the subtree.
/// If Keeper supports RemoveRecursive operation then it will be performed atomically.
/// Otherwise if someone concurrently adds or removes a node in the subtree, the result is undefined.
void removeRecursive(const std::string & path, uint32_t remove_nodes_limit = 100);
void removeRecursive(const std::string & path, uint32_t remove_nodes_limit = 1000);
/// Same as removeRecursive but in case if Keeper does not supports RemoveRecursive and
/// if someone concurrently removes a node in the subtree, this will not cause errors.
/// For instance, you can call this method twice concurrently for the same node and the end
/// result would be the same as for the single call.
Coordination::Error tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit = 100);
Coordination::Error tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit = 1000);
/// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself.
/// Node defined as RemoveException will not be deleted.

View File

@ -767,6 +767,11 @@ size_t ZooKeeperMultiRequest::sizeImpl() const
}
void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
{
readImpl(in, /*request_validator=*/{});
}
void ZooKeeperMultiRequest::readImpl(ReadBuffer & in, RequestValidator request_validator)
{
while (true)
{
@ -788,6 +793,8 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
ZooKeeperRequestPtr request = ZooKeeperRequestFactory::instance().get(op_num);
request->readImpl(in);
if (request_validator)
request_validator(*request);
requests.push_back(request);
if (in.eof())

View File

@ -570,6 +570,9 @@ struct ZooKeeperMultiRequest final : MultiRequest<ZooKeeperRequestPtr>, ZooKeepe
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
using RequestValidator = std::function<void(const ZooKeeperRequest &)>;
void readImpl(ReadBuffer & in, RequestValidator request_validator);
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;

View File

@ -514,7 +514,13 @@ void KeeperContext::initializeFeatureFlags(const Poco::Util::AbstractConfigurati
feature_flags.disableFeatureFlag(feature_flag.value());
}
if (feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ))
feature_flags.enableFeatureFlag(KeeperFeatureFlag::FILTERED_LIST);
else
system_nodes_with_data[keeper_api_version_path] = toString(static_cast<uint8_t>(KeeperApiVersion::ZOOKEEPER_COMPATIBLE));
system_nodes_with_data[keeper_api_feature_flags_path] = feature_flags.getFeatureFlags();
}
feature_flags.logFlags(getLogger("KeeperContext"));
@ -569,6 +575,25 @@ const CoordinationSettings & KeeperContext::getCoordinationSettings() const
return *coordination_settings;
}
bool KeeperContext::isOperationSupported(Coordination::OpNum operation) const
{
switch (operation)
{
case Coordination::OpNum::FilteredList:
return feature_flags.isEnabled(KeeperFeatureFlag::FILTERED_LIST);
case Coordination::OpNum::MultiRead:
return feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ);
case Coordination::OpNum::CreateIfNotExists:
return feature_flags.isEnabled(KeeperFeatureFlag::CREATE_IF_NOT_EXISTS);
case Coordination::OpNum::CheckNotExists:
return feature_flags.isEnabled(KeeperFeatureFlag::CHECK_NOT_EXISTS);
case Coordination::OpNum::RemoveRecursive:
return feature_flags.isEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE);
default:
return true;
}
}
uint64_t KeeperContext::lastCommittedIndex() const
{
return last_committed_log_idx.load(std::memory_order_relaxed);

View File

@ -1,6 +1,7 @@
#pragma once
#include <Coordination/KeeperFeatureFlags.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <atomic>
#include <condition_variable>
#include <cstdint>
@ -103,6 +104,7 @@ public:
return precommit_sleep_probability_for_testing;
}
bool isOperationSupported(Coordination::OpNum operation) const;
private:
/// local disk defined using path or disk name
using Storage = std::variant<DiskPtr, std::string>;

View File

@ -1,5 +1,4 @@
#include <Server/KeeperTCPHandler.h>
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#if USE_NURAFT
@ -19,6 +18,8 @@
# include <Common/NetException.h>
# include <Common/PipeFDs.h>
# include <Common/Stopwatch.h>
# include <Common/ZooKeeper/ZooKeeperCommon.h>
# include <Common/ZooKeeper/ZooKeeperConstants.h>
# include <Common/ZooKeeper/ZooKeeperIO.h>
# include <Common/logger_useful.h>
# include <Common/setThreadName.h>
@ -63,6 +64,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
extern const int TIMEOUT_EXCEEDED;
extern const int BAD_ARGUMENTS;
}
struct PollResult
@ -637,7 +639,23 @@ std::pair<Coordination::OpNum, Coordination::XID> KeeperTCPHandler::receiveReque
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request->xid = xid;
request->readImpl(read_buffer);
auto request_validator = [&](const Coordination::ZooKeeperRequest & current_request)
{
if (!keeper_dispatcher->getKeeperContext()->isOperationSupported(current_request.getOpNum()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported operation: {}", current_request.getOpNum());
};
if (auto * multi_request = dynamic_cast<Coordination::ZooKeeperMultiRequest *>(request.get()))
{
multi_request->readImpl(read_buffer, request_validator);
}
else
{
request->readImpl(read_buffer);
request_validator(*request);
}
if (!keeper_dispatcher->putRequest(request, session_id, use_xid_64))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id);

View File

@ -7,6 +7,7 @@ import os.path as p
import platform
import pprint
import pwd
import random
import re
import shlex
import shutil
@ -1650,6 +1651,8 @@ class ClickHouseCluster:
minio_certs_dir=None,
minio_data_dir=None,
use_keeper=True,
keeper_randomize_feature_flags=True,
keeper_required_feature_flags=[],
main_config_name="config.xml",
users_config_name="users.xml",
copy_common_configs=True,
@ -1682,6 +1685,8 @@ class ClickHouseCluster:
if not env_variables:
env_variables = {}
self.use_keeper = use_keeper
self.keeper_randomize_feature_flags = keeper_randomize_feature_flags
self.keeper_required_feature_flags = keeper_required_feature_flags
# Code coverage files will be placed in database directory
# (affect only WITH_COVERAGE=1 build)
@ -2828,15 +2833,51 @@ class ClickHouseCluster:
if self.use_keeper: # TODO: remove hardcoded paths from here
for i in range(1, 4):
current_keeper_config_dir = os.path.join(
f"{self.keeper_instance_dir_prefix}{i}", "config"
)
shutil.copy(
os.path.join(
self.keeper_config_dir, f"keeper_config{i}.xml"
),
os.path.join(
self.keeper_instance_dir_prefix + f"{i}", "config"
),
current_keeper_config_dir,
)
extra_configs_dir = os.path.join(
current_keeper_config_dir, f"keeper_config{i}.d"
)
os.mkdir(extra_configs_dir)
feature_flags_config = os.path.join(
extra_configs_dir, "feature_flags.yaml"
)
indentation = 4 * " "
def get_feature_flag_value(feature_flag):
if not self.keeper_randomize_feature_flags:
return 1
if feature_flag in self.keeper_required_feature_flags:
return 1
return random.randint(0, 1)
with open(feature_flags_config, "w") as ff_config:
ff_config.write("keeper_server:\n")
ff_config.write(f"{indentation}feature_flags:\n")
indentation *= 2
for feature_flag in [
"filtered_list",
"multi_read",
"check_not_exists",
"create_if_not_exists",
"remove_recursive",
]:
ff_config.write(
f"{indentation}{feature_flag}: {get_feature_flag_value(feature_flag)}\n"
)
run_and_check(self.base_zookeeper_cmd + common_opts, env=self.env)
self.up_called = True

View File

@ -13,6 +13,7 @@ node = cluster.add_instance(
main_configs=["configs/enable_keeper_map.xml"],
user_configs=["configs/keeper_retries.xml"],
with_zookeeper=True,
keeper_required_feature_flags=["multi_read"],
stay_alive=True,
)

View File

@ -20,6 +20,7 @@ node1 = cluster.add_instance(
main_configs=["configs/config.xml"],
user_configs=["configs/users.xml"],
with_zookeeper=True,
keeper_required_feature_flags=["multi_read", "create_if_not_exists"],
macros={"shard": "shard1", "replica": "1"},
stay_alive=True,
)
@ -28,6 +29,7 @@ node2 = cluster.add_instance(
main_configs=["configs/config.xml"],
user_configs=["configs/users.xml"],
with_zookeeper=True,
keeper_required_feature_flags=["multi_read", "create_if_not_exists"],
macros={"shard": "shard1", "replica": "2"},
)
nodes = [node1, node2]

View File

@ -59,6 +59,9 @@ instance = cluster.add_instance(
user_configs=["configs/users.xml"],
with_kafka=True,
with_zookeeper=True, # For Replicated Table
keeper_required_feature_flags=[
"create_if_not_exists"
], # new Kafka doesn't work without this feature
macros={
"kafka_broker": "kafka1",
"kafka_topic_old": KAFKA_TOPIC_OLD,

View File

@ -99,6 +99,7 @@ def started_cluster():
with_minio=True,
with_azurite=True,
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
main_configs=[
"configs/zookeeper.xml",
"configs/s3queue_log.xml",
@ -110,6 +111,7 @@ def started_cluster():
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
main_configs=[
"configs/s3queue_log.xml",
],
@ -118,6 +120,7 @@ def started_cluster():
cluster.add_instance(
"old_instance",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
image="clickhouse/clickhouse-server",
tag="23.12",
stay_alive=True,
@ -127,6 +130,7 @@ def started_cluster():
cluster.add_instance(
"node1",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
stay_alive=True,
main_configs=[
"configs/zookeeper.xml",
@ -137,6 +141,7 @@ def started_cluster():
cluster.add_instance(
"node2",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
stay_alive=True,
main_configs=[
"configs/zookeeper.xml",
@ -149,6 +154,7 @@ def started_cluster():
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
main_configs=[
"configs/s3queue_log.xml",
"configs/merge_tree.xml",
@ -158,6 +164,7 @@ def started_cluster():
cluster.add_instance(
"instance_24.5",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
image="clickhouse/clickhouse-server",
tag="24.5",
stay_alive=True,
@ -170,6 +177,7 @@ def started_cluster():
cluster.add_instance(
"node_cloud_mode",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
stay_alive=True,
main_configs=[
"configs/zookeeper.xml",