Merge pull request #48855 from ClickHouse/keeper-operation-create-if-not-exists

Implement `createIfNotExists` in Keeper natively
This commit is contained in:
Antonio Andelic 2023-09-26 16:04:51 +02:00 committed by GitHub
commit e7c8363f6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 124 additions and 20 deletions

View File

@ -52,6 +52,21 @@ function configure()
| sed "s|<snapshot_distance>100000</snapshot_distance>|<snapshot_distance>10000</snapshot_distance>|" \
> /etc/clickhouse-server/config.d/keeper_port.xml.tmp
sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-server/config.d/keeper_port.xml
function randomize_config_boolean_value {
    # Replace the current 0/1 value of the <$1> element in the Keeper config
    # with a randomly chosen 0 or 1.
    #   $1 - name of the boolean XML element to randomize
    local keeper_config="/etc/clickhouse-server/config.d/keeper_port.xml"
    local keeper_config_tmp="${keeper_config}.tmp"

    # `value` is not declared local, so it remains visible after the call.
    value=$((RANDOM % 2))

    # Write the rewritten config to a temporary file first, then move it over the real one.
    sudo cat "$keeper_config" \
        | sed "s|<$1>[01]</$1>|<$1>$value</$1>|" \
        > "$keeper_config_tmp"
    sudo mv "$keeper_config_tmp" "$keeper_config"
}
# Randomize all Keeper feature flags
randomize_config_boolean_value filtered_list
randomize_config_boolean_value multi_read
randomize_config_boolean_value check_not_exists
randomize_config_boolean_value create_if_not_exists
sudo chown clickhouse /etc/clickhouse-server/config.d/keeper_port.xml
sudo chgrp clickhouse /etc/clickhouse-server/config.d/keeper_port.xml

View File

@ -60,11 +60,19 @@ install_packages previous_release_package_folder
# available for dump via clickhouse-local
configure
function remove_keeper_config()
{
    # Delete every line of the Keeper config that matches <$1>$2</$1>.
    #   $1 - name of the XML element to remove
    #   $2 - value of the element; it is inserted into the sed pattern as-is,
    #        so it may be a regex fragment such as "[01]"
    local keeper_config="/etc/clickhouse-server/config.d/keeper_port.xml"
    local keeper_config_tmp="${keeper_config}.tmp"

    # Write the filtered config to a temporary file first, then move it over the real one.
    sudo cat "$keeper_config" \
        | sed "/<$1>$2<\/$1>/d" \
        > "$keeper_config_tmp"
    sudo mv "$keeper_config_tmp" "$keeper_config"
}
# async_replication setting doesn't exist on some older versions
sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
| sed "/<async_replication>1<\/async_replication>/d" \
> /etc/clickhouse-server/config.d/keeper_port.xml.tmp
sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-server/config.d/keeper_port.xml
remove_keeper_config "async_replication" "1"
# create_if_not_exists feature flag doesn't exist on some older versions
remove_keeper_config "create_if_not_exists" "[01]"
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
@ -89,10 +97,10 @@ sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-server/config.d/keeper_port.xml
# async_replication setting doesn't exist on some older versions
sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
| sed "/<async_replication>1<\/async_replication>/d" \
> /etc/clickhouse-server/config.d/keeper_port.xml.tmp
sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-server/config.d/keeper_port.xml
remove_keeper_config "async_replication" "1"
# create_if_not_exists feature flag doesn't exist on some older versions
remove_keeper_config "create_if_not_exists" "[01]"
# But we still need default disk because some tables loaded only into it
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml \

View File

@ -212,6 +212,9 @@ struct CreateRequest : virtual Request
bool is_sequential = false;
ACLs acls;
/// If true, creating a node that already exists succeeds as a no-op instead of failing with ZNODEEXISTS
bool not_exists = false;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }

View File

@ -29,7 +29,7 @@ using EventPtr = std::shared_ptr<Poco::Event>;
template <typename R>
using AsyncResponses = std::vector<std::pair<std::string, std::future<R>>>;
Coordination::RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode);
Coordination::RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode, bool ignore_if_exists = false);
Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version);
Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version);
Coordination::RequestPtr makeCheckRequest(const std::string & path, int version);

View File

@ -1,4 +1,6 @@
#include "ZooKeeper.h"
#include "Coordination/KeeperConstants.h"
#include "Coordination/KeeperFeatureFlags.h"
#include "ZooKeeperImpl.h"
#include "KeeperException.h"
#include "TestKeeper.h"
@ -353,6 +355,33 @@ void ZooKeeper::createIfNotExists(const std::string & path, const std::string &
void ZooKeeper::createAncestors(const std::string & path)
{
size_t pos = 1;
if (isFeatureEnabled(DB::KeeperFeatureFlag::CREATE_IF_NOT_EXISTS))
{
Coordination::Requests create_ops;
while (true)
{
pos = path.find('/', pos);
if (pos == std::string::npos)
break;
auto request = makeCreateRequest(path.substr(0, pos), "", CreateMode::Persistent, true);
create_ops.emplace_back(request);
++pos;
}
Coordination::Responses responses;
Coordination::Error code = multiImpl(create_ops, responses);
if (code == Coordination::Error::ZOK)
return;
throw KeeperException::fromPath(code, path);
}
std::string data;
std::string path_created; // Ignored
std::vector<std::string> pending_nodes;
@ -1333,13 +1362,14 @@ void KeeperMultiException::check(
}
Coordination::RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode)
/// Builds a create request for `path` holding `data`.
/// `create_mode` is translated into the ephemeral/sequential flags of the request;
/// `ignore_if_exists` is stored in the request's `not_exists` flag.
Coordination::RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode, bool ignore_if_exists)
{
    /// Both flags are set for EphemeralSequential.
    const bool ephemeral = create_mode == CreateMode::Ephemeral || create_mode == CreateMode::EphemeralSequential;
    const bool sequential = create_mode == CreateMode::PersistentSequential || create_mode == CreateMode::EphemeralSequential;

    auto create_request = std::make_shared<Coordination::CreateRequest>();
    create_request->path = path;
    create_request->data = data;
    create_request->is_ephemeral = ephemeral;
    create_request->is_sequential = sequential;
    create_request->not_exists = ignore_if_exists;
    return create_request;
}

View File

@ -1,4 +1,5 @@
#include "Common/ZooKeeper/IKeeper.h"
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
@ -695,7 +696,6 @@ void ZooKeeperMultiResponse::writeImpl(WriteBuffer & out) const
ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperHeartbeatResponse>()); }
ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSyncResponse>()); }
ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperAuthResponse>()); }
ZooKeeperResponsePtr ZooKeeperCreateRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCreateResponse>()); }
ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperRemoveResponse>()); }
ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperExistsResponse>()); }
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetResponse>()); }
@ -704,6 +704,13 @@ ZooKeeperResponsePtr ZooKeeperReconfigRequest::makeResponse() const { return set
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperListResponse>()); }
ZooKeeperResponsePtr ZooKeeperSimpleListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSimpleListResponse>()); }
/// Creates the response object matching this request: a plain create response,
/// or the CreateIfNotExists variant when `not_exists` is set.
ZooKeeperResponsePtr ZooKeeperCreateRequest::makeResponse() const
{
    if (!not_exists)
        return setTime(std::make_shared<ZooKeeperCreateResponse>());

    return setTime(std::make_shared<ZooKeeperCreateIfNotExistsResponse>());
}
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const
{
if (not_exists)
@ -977,7 +984,7 @@ void registerZooKeeperRequest(ZooKeeperRequestFactory & factory)
res->operation_type = ZooKeeperMultiRequest::OperationType::Read;
else if constexpr (num == OpNum::Multi)
res->operation_type = ZooKeeperMultiRequest::OperationType::Write;
else if constexpr (num == OpNum::CheckNotExists)
else if constexpr (num == OpNum::CheckNotExists || num == OpNum::CreateIfNotExists)
res->not_exists = true;
return res;
@ -1001,6 +1008,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::Reconfig, ZooKeeperReconfigRequest>(*this);
registerZooKeeperRequest<OpNum::Multi, ZooKeeperMultiRequest>(*this);
registerZooKeeperRequest<OpNum::MultiRead, ZooKeeperMultiRequest>(*this);
registerZooKeeperRequest<OpNum::CreateIfNotExists, ZooKeeperCreateRequest>(*this);
registerZooKeeperRequest<OpNum::SessionID, ZooKeeperSessionIDRequest>(*this);
registerZooKeeperRequest<OpNum::GetACL, ZooKeeperGetACLRequest>(*this);
registerZooKeeperRequest<OpNum::SetACL, ZooKeeperSetACLRequest>(*this);

View File

@ -230,7 +230,7 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
ZooKeeperCreateRequest() = default;
explicit ZooKeeperCreateRequest(const CreateRequest & base) : CreateRequest(base) {}
OpNum getOpNum() const override { return OpNum::Create; }
OpNum getOpNum() const override { return not_exists ? OpNum::CreateIfNotExists : OpNum::Create; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
@ -243,7 +243,7 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
struct ZooKeeperCreateResponse : CreateResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
@ -256,6 +256,12 @@ struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
void fillLogElements(LogElements & elems, size_t idx) const override;
};
/// Response to a CreateIfNotExists request.
/// Inherits everything from the regular create response; only the reported operation number differs.
struct ZooKeeperCreateIfNotExistsResponse : ZooKeeperCreateResponse
{
    using ZooKeeperCreateResponse::ZooKeeperCreateResponse;

    OpNum getOpNum() const override { return OpNum::CreateIfNotExists; }
};
struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
{
ZooKeeperRemoveRequest() = default;

View File

@ -22,6 +22,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::Reconfig),
static_cast<int32_t>(OpNum::Multi),
static_cast<int32_t>(OpNum::MultiRead),
static_cast<int32_t>(OpNum::CreateIfNotExists),
static_cast<int32_t>(OpNum::Auth),
static_cast<int32_t>(OpNum::SessionID),
static_cast<int32_t>(OpNum::SetACL),

View File

@ -38,6 +38,7 @@ enum class OpNum : int32_t
// CH Keeper specific operations
FilteredList = 500,
CheckNotExists = 501,
CreateIfNotExists = 502,
SessionID = 997, /// Special internal request
};

View File

@ -17,4 +17,5 @@ const String keeper_system_path = "/keeper";
const String keeper_api_version_path = keeper_system_path + "/api_version";
const String keeper_api_feature_flags_path = keeper_system_path + "/feature_flags";
const String keeper_config_path = keeper_system_path + "/config";
}

View File

@ -11,6 +11,7 @@ enum class KeeperFeatureFlag : size_t
FILTERED_LIST = 0,
MULTI_READ,
CHECK_NOT_EXISTS,
CREATE_IF_NOT_EXISTS,
};
class KeeperFeatureFlags

View File

@ -956,6 +956,9 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
std::string path_created = request.path;
if (request.is_sequential)
{
if (request.not_exists)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
auto seq_num = parent_node->seq_num;
std::stringstream seq_num_str; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
@ -974,7 +977,12 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
}
if (storage.uncommitted_state.getNode(path_created))
{
if (zk_request->getOpNum() == Coordination::OpNum::CreateIfNotExists)
return new_deltas;
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNODEEXISTS}};
}
if (getBaseNodeName(path_created).size == 0)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
@ -1031,6 +1039,13 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperCreateResponse & response = dynamic_cast<Coordination::ZooKeeperCreateResponse &>(*response_ptr);
if (storage.uncommitted_state.deltas.begin()->zxid != zxid)
{
response.path_created = zk_request->getPath();
response.error = Coordination::Error::ZOK;
return response_ptr;
}
if (const auto result = storage.commit(zxid); result != Coordination::Error::ZOK)
{
response.error = result;
@ -1764,6 +1779,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
switch (sub_zk_request->getOpNum())
{
case Coordination::OpNum::Create:
case Coordination::OpNum::CreateIfNotExists:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequestProcessor>(sub_zk_request));
break;
@ -2030,6 +2046,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory()
registerKeeperRequestProcessor<Coordination::OpNum::Check, KeeperStorageCheckRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Multi, KeeperStorageMultiRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::MultiRead, KeeperStorageMultiRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::CreateIfNotExists, KeeperStorageCreateRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::SetACL, KeeperStorageSetACLRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::GetACL, KeeperStorageGetACLRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::CheckNotExists, KeeperStorageCheckRequestProcessor>(*this);

View File

@ -89,6 +89,7 @@ NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{"SessionID", static_cast<Int16>(Coordination::OpNum::SessionID)},
{"FilteredList", static_cast<Int16>(Coordination::OpNum::FilteredList)},
{"CheckNotExists", static_cast<Int16>(Coordination::OpNum::CheckNotExists)},
{"CreateIfNotExists", static_cast<Int16>(Coordination::OpNum::CreateIfNotExists)},
});
auto error_enum = getCoordinationErrorCodesEnumType();

View File

@ -3,7 +3,7 @@
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<create_snapshot_on_exit>true</create_snapshot_on_exit>
<create_snapshot_on_exit>1</create_snapshot_on_exit>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
@ -32,7 +32,10 @@
</raft_configuration>
<feature_flags>
<filtered_list>1</filtered_list>
<multi_read>1</multi_read>
<check_not_exists>1</check_not_exists>
<create_if_not_exists>1</create_if_not_exists>
</feature_flags>
</keeper_server>
</clickhouse>

View File

@ -124,8 +124,8 @@ else
fi
# We randomize creating the snapshot on exit for Keeper to test out using older snapshots
create_snapshot_on_exit=$(($RANDOM % 2))
sed --follow-symlinks -i "s|<create_snapshot_on_exit>true</create_snapshot_on_exit>|<create_snapshot_on_exit>$create_snapshot_on_exit</create_snapshot_on_exit>|" $DEST_SERVER_PATH/config.d/keeper_port.xml
value=$(($RANDOM % 2))
sed --follow-symlinks -i "s|<create_snapshot_on_exit>[01]</create_snapshot_on_exit>|<create_snapshot_on_exit>$value</create_snapshot_on_exit>|" $DEST_SERVER_PATH/config.d/keeper_port.xml
if [[ -n "$USE_POLYMORPHIC_PARTS" ]] && [[ "$USE_POLYMORPHIC_PARTS" -eq 1 ]]; then
ln -sf $SRC_PATH/config.d/polymorphic_parts.xml $DEST_SERVER_PATH/config.d/

View File

@ -84,11 +84,20 @@ def test_keeper_feature_flags(started_cluster):
[("filtered_list", 1), ("multi_read", 1), ("check_not_exists", 0)]
)
feature_flags = [("multi_read", 0), ("check_not_exists", 1)]
feature_flags = [
("multi_read", 0),
("check_not_exists", 1),
("create_if_not_exists", 1),
]
restart_clickhouse(feature_flags)
assert_feature_flags(feature_flags + [("filtered_list", 1)])
feature_flags = [("multi_read", 0), ("check_not_exists", 0), ("filtered_list", 0)]
feature_flags = [
("multi_read", 0),
("check_not_exists", 0),
("filtered_list", 0),
("create_if_not_exists", 0),
]
restart_clickhouse(feature_flags)
assert_feature_flags(feature_flags)

View File

@ -1,2 +1,2 @@
default ::1 9181 0 0 0 1 1 ['FILTERED_LIST','MULTI_READ','CHECK_NOT_EXISTS']
default ::1 9181 0 0 0 1 1 ['FILTERED_LIST','MULTI_READ','CHECK_NOT_EXISTS','CREATE_IF_NOT_EXISTS']
zookeeper2 ::1 9181 0 0 0 1