mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 18:42:26 +00:00
Revert to the version with path
This commit is contained in:
parent
48db20d5a1
commit
cfc741030f
@ -117,6 +117,7 @@ enum KeeperApiVersion : uint8_t
|
||||
};
|
||||
|
||||
inline constexpr auto current_keeper_api_version = KeeperApiVersion::V1;
|
||||
inline constexpr auto * keeper_api_version_path = "/keeper-api-version";
|
||||
|
||||
struct Request;
|
||||
using RequestPtr = std::shared_ptr<Request>;
|
||||
|
@ -661,20 +661,6 @@ void ZooKeeperSessionIDResponse::writeImpl(WriteBuffer & out) const
|
||||
Coordination::write(server_id, out);
|
||||
}
|
||||
|
||||
Coordination::ZooKeeperResponsePtr ZooKeeperApiVersionRequest::makeResponse() const
|
||||
{
|
||||
return std::make_shared<ZooKeeperApiVersionResponse>();
|
||||
}
|
||||
|
||||
void ZooKeeperApiVersionResponse::readImpl(ReadBuffer & in)
|
||||
{
|
||||
Coordination::read(api_version, in);
|
||||
}
|
||||
|
||||
void ZooKeeperApiVersionResponse::writeImpl(WriteBuffer & out) const
|
||||
{
|
||||
Coordination::write(api_version, out);
|
||||
}
|
||||
|
||||
void ZooKeeperRequest::createLogElements(LogElements & elems) const
|
||||
{
|
||||
@ -910,7 +896,6 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
|
||||
registerZooKeeperRequest<OpNum::GetACL, ZooKeeperGetACLRequest>(*this);
|
||||
registerZooKeeperRequest<OpNum::SetACL, ZooKeeperSetACLRequest>(*this);
|
||||
registerZooKeeperRequest<OpNum::FilteredList, ZooKeeperFilteredListRequest>(*this);
|
||||
registerZooKeeperRequest<OpNum::ApiVersion, ZooKeeperApiVersionRequest>(*this);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -535,28 +535,6 @@ struct ZooKeeperSessionIDResponse final : ZooKeeperResponse
|
||||
Coordination::OpNum getOpNum() const override { return OpNum::SessionID; }
|
||||
};
|
||||
|
||||
struct ZooKeeperApiVersionRequest final : ZooKeeperRequest
|
||||
{
|
||||
Coordination::OpNum getOpNum() const override { return OpNum::ApiVersion; }
|
||||
String getPath() const override { return {}; }
|
||||
void writeImpl(WriteBuffer &) const override {}
|
||||
void readImpl(ReadBuffer &) override {}
|
||||
|
||||
Coordination::ZooKeeperResponsePtr makeResponse() const override;
|
||||
bool isReadRequest() const override { return false; }
|
||||
};
|
||||
|
||||
struct ZooKeeperApiVersionResponse final : ZooKeeperResponse
|
||||
{
|
||||
int64_t api_version;
|
||||
|
||||
void readImpl(ReadBuffer & in) override;
|
||||
|
||||
void writeImpl(WriteBuffer & out) const override;
|
||||
|
||||
Coordination::OpNum getOpNum() const override { return OpNum::ApiVersion; }
|
||||
};
|
||||
|
||||
class ZooKeeperRequestFactory final : private boost::noncopyable
|
||||
{
|
||||
|
||||
|
@ -25,7 +25,6 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
|
||||
static_cast<int32_t>(OpNum::SetACL),
|
||||
static_cast<int32_t>(OpNum::GetACL),
|
||||
static_cast<int32_t>(OpNum::FilteredList),
|
||||
static_cast<int32_t>(OpNum::ApiVersion),
|
||||
};
|
||||
|
||||
std::string toString(OpNum op_num)
|
||||
@ -68,8 +67,6 @@ std::string toString(OpNum op_num)
|
||||
return "GetACL";
|
||||
case OpNum::FilteredList:
|
||||
return "FilteredList";
|
||||
case OpNum::ApiVersion:
|
||||
return "ApiVersion";
|
||||
}
|
||||
int32_t raw_op = static_cast<int32_t>(op_num);
|
||||
throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED);
|
||||
|
@ -35,7 +35,6 @@ enum class OpNum : int32_t
|
||||
|
||||
// CH Keeper specific operations
|
||||
FilteredList = 500,
|
||||
ApiVersion = 501,
|
||||
|
||||
SessionID = 997, /// Special internal request
|
||||
};
|
||||
|
@ -49,12 +49,6 @@ void write(const std::string & s, WriteBuffer & out)
|
||||
out.write(s.data(), s.size());
|
||||
}
|
||||
|
||||
void write(std::string_view s, WriteBuffer & out)
|
||||
{
|
||||
write(static_cast<int32_t>(s.size()), out);
|
||||
out.write(s.data(), s.size());
|
||||
}
|
||||
|
||||
void write(const ACL & acl, WriteBuffer & out)
|
||||
{
|
||||
write(acl.permissions, out);
|
||||
|
@ -26,7 +26,6 @@ void write(uint8_t x, WriteBuffer & out);
|
||||
void write(OpNum x, WriteBuffer & out);
|
||||
void write(bool x, WriteBuffer & out);
|
||||
void write(const std::string & s, WriteBuffer & out);
|
||||
void write(std::string_view s, WriteBuffer & out);
|
||||
void write(const ACL & acl, WriteBuffer & out);
|
||||
void write(const Stat & stat, WriteBuffer & out);
|
||||
void write(const Error & x, WriteBuffer & out);
|
||||
|
@ -1,3 +1,4 @@
|
||||
#include "Common/ZooKeeper/IKeeper.h"
|
||||
#include <Common/ZooKeeper/ZooKeeperCommon.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperImpl.h>
|
||||
#include <Common/Exception.h>
|
||||
@ -6,6 +7,7 @@
|
||||
#include <Common/ZooKeeper/ZooKeeperIO.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <Common/logger_useful.h>
|
||||
@ -346,23 +348,18 @@ ZooKeeper::ZooKeeper(
|
||||
|
||||
connect(nodes, connection_timeout);
|
||||
|
||||
if (!initApiVersion())
|
||||
{
|
||||
// We failed to get the version, let's reconnect in case
|
||||
// the connection became faulty
|
||||
socket.close();
|
||||
connect(nodes, connection_timeout);
|
||||
}
|
||||
|
||||
if (!auth_scheme.empty())
|
||||
sendAuth(auth_scheme, auth_data);
|
||||
|
||||
send_thread = ThreadFromGlobalPool([this] { sendThread(); });
|
||||
receive_thread = ThreadFromGlobalPool([this] { receiveThread(); });
|
||||
|
||||
initApiVersion();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
|
||||
}
|
||||
|
||||
|
||||
void ZooKeeper::connect(
|
||||
const Nodes & nodes,
|
||||
Poco::Timespan connection_timeout)
|
||||
@ -1069,40 +1066,29 @@ Coordination::KeeperApiVersion ZooKeeper::getApiVersion()
|
||||
return keeper_api_version;
|
||||
}
|
||||
|
||||
bool ZooKeeper::initApiVersion()
|
||||
void ZooKeeper::initApiVersion()
|
||||
{
|
||||
try
|
||||
auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
|
||||
auto future = promise->get_future();
|
||||
|
||||
auto callback = [promise](const Coordination::GetResponse & response) mutable
|
||||
{
|
||||
ZooKeeperApiVersionRequest request;
|
||||
request.write(*out);
|
||||
promise->set_value(response);
|
||||
};
|
||||
|
||||
if (!in->poll(operation_timeout.totalMilliseconds()))
|
||||
{
|
||||
LOG_ERROR(&Poco::Logger::get("ZooKeeper"), "Failed to get version: timeout");
|
||||
return false;
|
||||
}
|
||||
get(Coordination::keeper_api_version_path, std::move(callback), {});
|
||||
if (future.wait_for(std::chrono::milliseconds(operation_timeout.totalMilliseconds())) != std::future_status::ready)
|
||||
return;
|
||||
|
||||
ZooKeeperApiVersionResponse response;
|
||||
auto response = future.get();
|
||||
|
||||
int32_t length;
|
||||
XID xid;
|
||||
int64_t zxid;
|
||||
Error err;
|
||||
read(length);
|
||||
read(xid);
|
||||
read(zxid);
|
||||
read(err);
|
||||
if (response.error != Coordination::Error::ZOK)
|
||||
return;
|
||||
|
||||
response.readImpl(*in);
|
||||
|
||||
keeper_api_version = static_cast<KeeperApiVersion>(response.api_version);
|
||||
return true;
|
||||
}
|
||||
catch (const DB::Exception & e)
|
||||
{
|
||||
LOG_ERROR(&Poco::Logger::get("ZooKeeper"), "Failed to get version: {}", e.message());
|
||||
return false;
|
||||
}
|
||||
uint8_t keeper_version{0};
|
||||
DB::ReadBufferFromOwnString buf(response.data);
|
||||
DB::readIntText(keeper_version, buf);
|
||||
keeper_api_version = static_cast<Coordination::KeeperApiVersion>(keeper_version);
|
||||
}
|
||||
|
||||
|
||||
|
@ -277,7 +277,7 @@ private:
|
||||
|
||||
void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false);
|
||||
|
||||
bool initApiVersion();
|
||||
void initApiVersion();
|
||||
|
||||
CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
|
||||
std::shared_ptr<ZooKeeperLog> zk_log;
|
||||
|
@ -37,7 +37,7 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
|
||||
}
|
||||
|
||||
|
||||
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv";
|
||||
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr";
|
||||
|
||||
KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
|
||||
: server_id(NOT_EXIST)
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
#include <Coordination/KeeperDispatcher.h>
|
||||
#include <Server/KeeperTCPHandler.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Poco/Environment.h>
|
||||
#include <Poco/Path.h>
|
||||
@ -133,9 +132,6 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
|
||||
FourLetterCommandPtr recovery_command = std::make_shared<RecoveryCommand>(keeper_dispatcher);
|
||||
factory.registerCommand(recovery_command);
|
||||
|
||||
FourLetterCommandPtr api_version_command = std::make_shared<ApiVersionCommand>(keeper_dispatcher);
|
||||
factory.registerCommand(api_version_command);
|
||||
|
||||
factory.initializeAllowList(keeper_dispatcher);
|
||||
factory.setInitialize(true);
|
||||
}
|
||||
@ -251,8 +247,6 @@ String MonitorCommand::run()
|
||||
print(ret, "synced_followers", keeper_info.synced_follower_count);
|
||||
}
|
||||
|
||||
print(ret, "api_version", static_cast<uint64_t>(Coordination::current_keeper_api_version));
|
||||
|
||||
return ret.str();
|
||||
}
|
||||
|
||||
@ -469,9 +463,4 @@ String RecoveryCommand::run()
|
||||
return "ok";
|
||||
}
|
||||
|
||||
String ApiVersionCommand::run()
|
||||
{
|
||||
return toString(static_cast<uint8_t>(Coordination::current_keeper_api_version));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -315,16 +315,4 @@ struct RecoveryCommand : public IFourLetterCommand
|
||||
String run() override;
|
||||
~RecoveryCommand() override = default;
|
||||
};
|
||||
|
||||
struct ApiVersionCommand : public IFourLetterCommand
|
||||
{
|
||||
explicit ApiVersionCommand(KeeperDispatcher & keeper_dispatcher_)
|
||||
: IFourLetterCommand(keeper_dispatcher_)
|
||||
{
|
||||
}
|
||||
|
||||
String name() override { return "apiv"; }
|
||||
String run() override;
|
||||
~ApiVersionCommand() override = default;
|
||||
};
|
||||
}
|
||||
|
@ -266,20 +266,6 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
|
||||
if (!requests_queue->push(std::move(request_info)))
|
||||
throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
|
||||
}
|
||||
else if (request->getOpNum() == Coordination::OpNum::ApiVersion)
|
||||
{
|
||||
auto response = std::make_shared<Coordination::ZooKeeperApiVersionResponse>();
|
||||
response->api_version = Coordination::current_keeper_api_version;
|
||||
LOG_DEBUG(log, "Returning api version {}", response->api_version);
|
||||
|
||||
KeeperStorage::ResponseForSession response_for_session;
|
||||
response_for_session.session_id = session_id;
|
||||
response_for_session.response = std::move(response);
|
||||
{
|
||||
if (!responses_queue.tryPush(std::move(response_for_session), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
|
||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Could not push response with API version into responses queue");
|
||||
}
|
||||
}
|
||||
else if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
|
||||
{
|
||||
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
|
@ -843,6 +843,9 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
|
||||
{
|
||||
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
|
||||
|
||||
if (request.path == Coordination::keeper_api_version_path)
|
||||
return {};
|
||||
|
||||
if (!storage.uncommitted_state.getNode(request.path))
|
||||
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
|
||||
|
||||
@ -865,6 +868,16 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
|
||||
}
|
||||
}
|
||||
|
||||
// We cannot store the node because the result should be connected to the binary itself
|
||||
// this way we avoid incorrect results when we read a snapshot from older Keeper that can have
|
||||
// lower API version
|
||||
if (request.path == Coordination::keeper_api_version_path)
|
||||
{
|
||||
response.data = std::to_string(static_cast<uint8_t>(Coordination::current_keeper_api_version));
|
||||
response.error = Coordination::Error::ZOK;
|
||||
return response_ptr;
|
||||
}
|
||||
|
||||
auto & container = storage.container;
|
||||
auto node_it = container.find(request.path);
|
||||
if (node_it == container.end())
|
||||
@ -911,6 +924,12 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
|
||||
|
||||
std::vector<KeeperStorage::Delta> new_deltas;
|
||||
|
||||
if (request.path == Coordination::keeper_api_version_path)
|
||||
{
|
||||
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to delete an internal Keeper path ({}) which is not allowed", Coordination::keeper_api_version_path);
|
||||
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
|
||||
}
|
||||
|
||||
const auto update_parent_pzxid = [&]()
|
||||
{
|
||||
auto parent_path = parentPath(request.path);
|
||||
@ -1057,6 +1076,12 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
|
||||
|
||||
std::vector<KeeperStorage::Delta> new_deltas;
|
||||
|
||||
if (request.path == Coordination::keeper_api_version_path)
|
||||
{
|
||||
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to update an internal Keeper path ({}) which is not allowed", Coordination::keeper_api_version_path);
|
||||
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
|
||||
}
|
||||
|
||||
if (!storage.uncommitted_state.getNode(request.path))
|
||||
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
|
||||
|
||||
@ -1318,6 +1343,12 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
|
||||
{
|
||||
Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request);
|
||||
|
||||
if (request.path == Coordination::keeper_api_version_path)
|
||||
{
|
||||
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to update an internal Keeper path ({}) which is not allowed", Coordination::keeper_api_version_path);
|
||||
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
|
||||
}
|
||||
|
||||
auto & uncommitted_state = storage.uncommitted_state;
|
||||
if (!uncommitted_state.getNode(request.path))
|
||||
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
|
||||
|
@ -2120,6 +2120,20 @@ TEST_P(CoordinationTest, TestDurableState)
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(CoordinationTest, TestCurrentApiVersion)
|
||||
{
|
||||
using namespace Coordination;
|
||||
KeeperStorage storage{500, "", true};
|
||||
auto request = std::make_shared<ZooKeeperGetRequest>();
|
||||
request->path = Coordination::keeper_api_version_path;
|
||||
auto responses = storage.processRequest(request, 0, std::nullopt, true, true);
|
||||
const auto & get_response = getSingleResponse<ZooKeeperGetResponse>(responses);
|
||||
uint8_t keeper_version{0};
|
||||
DB::ReadBufferFromOwnString buf(get_response.data);
|
||||
DB::readIntText(keeper_version, buf);
|
||||
EXPECT_EQ(keeper_version, current_keeper_api_version);
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
|
||||
CoordinationTest,
|
||||
::testing::ValuesIn(std::initializer_list<CompressionParam>{
|
||||
|
Loading…
Reference in New Issue
Block a user