mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
Merge 57db5cf24c
into 44b4bd38b9
This commit is contained in:
commit
661714c8d7
@ -341,7 +341,10 @@ Coordination::Error ZooKeeper::tryGetChildren(
|
||||
const EventPtr & watch,
|
||||
Coordination::ListRequestType list_request_type)
|
||||
{
|
||||
return tryGetChildrenWatch(path, res, stat,
|
||||
return tryGetChildrenWatch(
|
||||
path,
|
||||
res,
|
||||
stat,
|
||||
watch ? std::make_shared<Coordination::WatchCallback>(callbackForEvent(watch)) : Coordination::WatchCallbackPtr{},
|
||||
list_request_type);
|
||||
}
|
||||
@ -975,11 +978,14 @@ void ZooKeeper::removeRecursive(const std::string & path, uint32_t remove_nodes_
|
||||
|
||||
Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit)
|
||||
{
|
||||
if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE))
|
||||
const auto fallback_method = [&]
|
||||
{
|
||||
tryRemoveChildrenRecursive(path);
|
||||
return tryRemove(path);
|
||||
}
|
||||
};
|
||||
|
||||
if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE))
|
||||
return fallback_method();
|
||||
|
||||
auto promise = std::make_shared<std::promise<Coordination::RemoveRecursiveResponse>>();
|
||||
auto future = promise->get_future();
|
||||
@ -998,6 +1004,10 @@ Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path, uint
|
||||
}
|
||||
|
||||
auto response = future.get();
|
||||
|
||||
if (response.error == Coordination::Error::ZNOTEMPTY) /// limit was too low, try without RemoveRecursive request
|
||||
return fallback_method();
|
||||
|
||||
return response.error;
|
||||
}
|
||||
|
||||
|
@ -486,13 +486,13 @@ public:
|
||||
/// Remove the node with the subtree.
|
||||
/// If Keeper supports RemoveRecursive operation then it will be performed atomically.
|
||||
/// Otherwise if someone concurrently adds or removes a node in the subtree, the result is undefined.
|
||||
void removeRecursive(const std::string & path, uint32_t remove_nodes_limit = 100);
|
||||
void removeRecursive(const std::string & path, uint32_t remove_nodes_limit = 1000);
|
||||
|
||||
/// Same as removeRecursive but in case if Keeper does not supports RemoveRecursive and
|
||||
/// if someone concurrently removes a node in the subtree, this will not cause errors.
|
||||
/// For instance, you can call this method twice concurrently for the same node and the end
|
||||
/// result would be the same as for the single call.
|
||||
Coordination::Error tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit = 100);
|
||||
Coordination::Error tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit = 1000);
|
||||
|
||||
/// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself.
|
||||
/// Node defined as RemoveException will not be deleted.
|
||||
|
@ -767,6 +767,11 @@ size_t ZooKeeperMultiRequest::sizeImpl() const
|
||||
}
|
||||
|
||||
void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
|
||||
{
|
||||
readImpl(in, /*request_validator=*/{});
|
||||
}
|
||||
|
||||
void ZooKeeperMultiRequest::readImpl(ReadBuffer & in, RequestValidator request_validator)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
@ -788,6 +793,8 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
|
||||
|
||||
ZooKeeperRequestPtr request = ZooKeeperRequestFactory::instance().get(op_num);
|
||||
request->readImpl(in);
|
||||
if (request_validator)
|
||||
request_validator(*request);
|
||||
requests.push_back(request);
|
||||
|
||||
if (in.eof())
|
||||
|
@ -570,6 +570,9 @@ struct ZooKeeperMultiRequest final : MultiRequest<ZooKeeperRequestPtr>, ZooKeepe
|
||||
void writeImpl(WriteBuffer & out) const override;
|
||||
size_t sizeImpl() const override;
|
||||
void readImpl(ReadBuffer & in) override;
|
||||
|
||||
using RequestValidator = std::function<void(const ZooKeeperRequest &)>;
|
||||
void readImpl(ReadBuffer & in, RequestValidator request_validator);
|
||||
std::string toStringImpl(bool short_format) const override;
|
||||
|
||||
ZooKeeperResponsePtr makeResponse() const override;
|
||||
|
@ -514,7 +514,13 @@ void KeeperContext::initializeFeatureFlags(const Poco::Util::AbstractConfigurati
|
||||
feature_flags.disableFeatureFlag(feature_flag.value());
|
||||
}
|
||||
|
||||
if (feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ))
|
||||
feature_flags.enableFeatureFlag(KeeperFeatureFlag::FILTERED_LIST);
|
||||
else
|
||||
system_nodes_with_data[keeper_api_version_path] = toString(static_cast<uint8_t>(KeeperApiVersion::ZOOKEEPER_COMPATIBLE));
|
||||
|
||||
system_nodes_with_data[keeper_api_feature_flags_path] = feature_flags.getFeatureFlags();
|
||||
|
||||
}
|
||||
|
||||
feature_flags.logFlags(getLogger("KeeperContext"));
|
||||
@ -569,6 +575,25 @@ const CoordinationSettings & KeeperContext::getCoordinationSettings() const
|
||||
return *coordination_settings;
|
||||
}
|
||||
|
||||
bool KeeperContext::isOperationSupported(Coordination::OpNum operation) const
|
||||
{
|
||||
switch (operation)
|
||||
{
|
||||
case Coordination::OpNum::FilteredList:
|
||||
return feature_flags.isEnabled(KeeperFeatureFlag::FILTERED_LIST);
|
||||
case Coordination::OpNum::MultiRead:
|
||||
return feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ);
|
||||
case Coordination::OpNum::CreateIfNotExists:
|
||||
return feature_flags.isEnabled(KeeperFeatureFlag::CREATE_IF_NOT_EXISTS);
|
||||
case Coordination::OpNum::CheckNotExists:
|
||||
return feature_flags.isEnabled(KeeperFeatureFlag::CHECK_NOT_EXISTS);
|
||||
case Coordination::OpNum::RemoveRecursive:
|
||||
return feature_flags.isEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE);
|
||||
default:
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t KeeperContext::lastCommittedIndex() const
|
||||
{
|
||||
return last_committed_log_idx.load(std::memory_order_relaxed);
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
#include <Coordination/KeeperFeatureFlags.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperConstants.h>
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <cstdint>
|
||||
@ -103,6 +104,7 @@ public:
|
||||
return precommit_sleep_probability_for_testing;
|
||||
}
|
||||
|
||||
bool isOperationSupported(Coordination::OpNum operation) const;
|
||||
private:
|
||||
/// local disk defined using path or disk name
|
||||
using Storage = std::variant<DiskPtr, std::string>;
|
||||
|
@ -1,5 +1,4 @@
|
||||
#include <Server/KeeperTCPHandler.h>
|
||||
#include "Common/ZooKeeper/ZooKeeperConstants.h"
|
||||
|
||||
#if USE_NURAFT
|
||||
|
||||
@ -19,6 +18,8 @@
|
||||
# include <Common/NetException.h>
|
||||
# include <Common/PipeFDs.h>
|
||||
# include <Common/Stopwatch.h>
|
||||
# include <Common/ZooKeeper/ZooKeeperCommon.h>
|
||||
# include <Common/ZooKeeper/ZooKeeperConstants.h>
|
||||
# include <Common/ZooKeeper/ZooKeeperIO.h>
|
||||
# include <Common/logger_useful.h>
|
||||
# include <Common/setThreadName.h>
|
||||
@ -63,6 +64,7 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
|
||||
extern const int TIMEOUT_EXCEEDED;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
struct PollResult
|
||||
@ -637,7 +639,23 @@ std::pair<Coordination::OpNum, Coordination::XID> KeeperTCPHandler::receiveReque
|
||||
|
||||
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
|
||||
request->xid = xid;
|
||||
request->readImpl(read_buffer);
|
||||
|
||||
auto request_validator = [&](const Coordination::ZooKeeperRequest & current_request)
|
||||
{
|
||||
if (!keeper_dispatcher->getKeeperContext()->isOperationSupported(current_request.getOpNum()))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported operation: {}", current_request.getOpNum());
|
||||
};
|
||||
|
||||
if (auto * multi_request = dynamic_cast<Coordination::ZooKeeperMultiRequest *>(request.get()))
|
||||
{
|
||||
multi_request->readImpl(read_buffer, request_validator);
|
||||
}
|
||||
else
|
||||
{
|
||||
request->readImpl(read_buffer);
|
||||
request_validator(*request);
|
||||
}
|
||||
|
||||
|
||||
if (!keeper_dispatcher->putRequest(request, session_id, use_xid_64))
|
||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id);
|
||||
|
@ -7,6 +7,7 @@ import os.path as p
|
||||
import platform
|
||||
import pprint
|
||||
import pwd
|
||||
import random
|
||||
import re
|
||||
import shlex
|
||||
import shutil
|
||||
@ -1650,6 +1651,8 @@ class ClickHouseCluster:
|
||||
minio_certs_dir=None,
|
||||
minio_data_dir=None,
|
||||
use_keeper=True,
|
||||
keeper_randomize_feature_flags=True,
|
||||
keeper_required_feature_flags=[],
|
||||
main_config_name="config.xml",
|
||||
users_config_name="users.xml",
|
||||
copy_common_configs=True,
|
||||
@ -1682,6 +1685,8 @@ class ClickHouseCluster:
|
||||
if not env_variables:
|
||||
env_variables = {}
|
||||
self.use_keeper = use_keeper
|
||||
self.keeper_randomize_feature_flags = keeper_randomize_feature_flags
|
||||
self.keeper_required_feature_flags = keeper_required_feature_flags
|
||||
|
||||
# Code coverage files will be placed in database directory
|
||||
# (affect only WITH_COVERAGE=1 build)
|
||||
@ -2828,15 +2833,51 @@ class ClickHouseCluster:
|
||||
|
||||
if self.use_keeper: # TODO: remove hardcoded paths from here
|
||||
for i in range(1, 4):
|
||||
current_keeper_config_dir = os.path.join(
|
||||
f"{self.keeper_instance_dir_prefix}{i}", "config"
|
||||
)
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
self.keeper_config_dir, f"keeper_config{i}.xml"
|
||||
),
|
||||
os.path.join(
|
||||
self.keeper_instance_dir_prefix + f"{i}", "config"
|
||||
),
|
||||
current_keeper_config_dir,
|
||||
)
|
||||
|
||||
extra_configs_dir = os.path.join(
|
||||
current_keeper_config_dir, f"keeper_config{i}.d"
|
||||
)
|
||||
os.mkdir(extra_configs_dir)
|
||||
feature_flags_config = os.path.join(
|
||||
extra_configs_dir, "feature_flags.yaml"
|
||||
)
|
||||
|
||||
indentation = 4 * " "
|
||||
|
||||
def get_feature_flag_value(feature_flag):
|
||||
if not self.keeper_randomize_feature_flags:
|
||||
return 1
|
||||
|
||||
if feature_flag in self.keeper_required_feature_flags:
|
||||
return 1
|
||||
|
||||
return random.randint(0, 1)
|
||||
|
||||
with open(feature_flags_config, "w") as ff_config:
|
||||
ff_config.write("keeper_server:\n")
|
||||
ff_config.write(f"{indentation}feature_flags:\n")
|
||||
indentation *= 2
|
||||
|
||||
for feature_flag in [
|
||||
"filtered_list",
|
||||
"multi_read",
|
||||
"check_not_exists",
|
||||
"create_if_not_exists",
|
||||
"remove_recursive",
|
||||
]:
|
||||
ff_config.write(
|
||||
f"{indentation}{feature_flag}: {get_feature_flag_value(feature_flag)}\n"
|
||||
)
|
||||
|
||||
run_and_check(self.base_zookeeper_cmd + common_opts, env=self.env)
|
||||
self.up_called = True
|
||||
|
||||
|
@ -13,6 +13,7 @@ node = cluster.add_instance(
|
||||
main_configs=["configs/enable_keeper_map.xml"],
|
||||
user_configs=["configs/keeper_retries.xml"],
|
||||
with_zookeeper=True,
|
||||
keeper_required_feature_flags=["multi_read"],
|
||||
stay_alive=True,
|
||||
)
|
||||
|
||||
|
@ -20,6 +20,7 @@ node1 = cluster.add_instance(
|
||||
main_configs=["configs/config.xml"],
|
||||
user_configs=["configs/users.xml"],
|
||||
with_zookeeper=True,
|
||||
keeper_required_feature_flags=["multi_read", "create_if_not_exists"],
|
||||
macros={"shard": "shard1", "replica": "1"},
|
||||
stay_alive=True,
|
||||
)
|
||||
@ -28,6 +29,7 @@ node2 = cluster.add_instance(
|
||||
main_configs=["configs/config.xml"],
|
||||
user_configs=["configs/users.xml"],
|
||||
with_zookeeper=True,
|
||||
keeper_required_feature_flags=["multi_read", "create_if_not_exists"],
|
||||
macros={"shard": "shard1", "replica": "2"},
|
||||
)
|
||||
nodes = [node1, node2]
|
||||
|
@ -59,6 +59,9 @@ instance = cluster.add_instance(
|
||||
user_configs=["configs/users.xml"],
|
||||
with_kafka=True,
|
||||
with_zookeeper=True, # For Replicated Table
|
||||
keeper_required_feature_flags=[
|
||||
"create_if_not_exists"
|
||||
], # new Kafka doesn't work without this feature
|
||||
macros={
|
||||
"kafka_broker": "kafka1",
|
||||
"kafka_topic_old": KAFKA_TOPIC_OLD,
|
||||
|
@ -99,6 +99,7 @@ def started_cluster():
|
||||
with_minio=True,
|
||||
with_azurite=True,
|
||||
with_zookeeper=True,
|
||||
keeper_required_feature_flags=["create_if_not_exists"],
|
||||
main_configs=[
|
||||
"configs/zookeeper.xml",
|
||||
"configs/s3queue_log.xml",
|
||||
@ -110,6 +111,7 @@ def started_cluster():
|
||||
user_configs=["configs/users.xml"],
|
||||
with_minio=True,
|
||||
with_zookeeper=True,
|
||||
keeper_required_feature_flags=["create_if_not_exists"],
|
||||
main_configs=[
|
||||
"configs/s3queue_log.xml",
|
||||
],
|
||||
@ -118,6 +120,7 @@ def started_cluster():
|
||||
cluster.add_instance(
|
||||
"old_instance",
|
||||
with_zookeeper=True,
|
||||
keeper_required_feature_flags=["create_if_not_exists"],
|
||||
image="clickhouse/clickhouse-server",
|
||||
tag="23.12",
|
||||
stay_alive=True,
|
||||
@ -127,6 +130,7 @@ def started_cluster():
|
||||
cluster.add_instance(
|
||||
"node1",
|
||||
with_zookeeper=True,
|
||||
keeper_required_feature_flags=["create_if_not_exists"],
|
||||
stay_alive=True,
|
||||
main_configs=[
|
||||
"configs/zookeeper.xml",
|
||||
@ -137,6 +141,7 @@ def started_cluster():
|
||||
cluster.add_instance(
|
||||
"node2",
|
||||
with_zookeeper=True,
|
||||
keeper_required_feature_flags=["create_if_not_exists"],
|
||||
stay_alive=True,
|
||||
main_configs=[
|
||||
"configs/zookeeper.xml",
|
||||
@ -149,6 +154,7 @@ def started_cluster():
|
||||
user_configs=["configs/users.xml"],
|
||||
with_minio=True,
|
||||
with_zookeeper=True,
|
||||
keeper_required_feature_flags=["create_if_not_exists"],
|
||||
main_configs=[
|
||||
"configs/s3queue_log.xml",
|
||||
"configs/merge_tree.xml",
|
||||
@ -158,6 +164,7 @@ def started_cluster():
|
||||
cluster.add_instance(
|
||||
"instance_24.5",
|
||||
with_zookeeper=True,
|
||||
keeper_required_feature_flags=["create_if_not_exists"],
|
||||
image="clickhouse/clickhouse-server",
|
||||
tag="24.5",
|
||||
stay_alive=True,
|
||||
@ -170,6 +177,7 @@ def started_cluster():
|
||||
cluster.add_instance(
|
||||
"node_cloud_mode",
|
||||
with_zookeeper=True,
|
||||
keeper_required_feature_flags=["create_if_not_exists"],
|
||||
stay_alive=True,
|
||||
main_configs=[
|
||||
"configs/zookeeper.xml",
|
||||
|
Loading…
Reference in New Issue
Block a user