Better errors and better config

This commit is contained in:
alesapin 2020-11-25 16:19:09 +03:00
parent f3555ad2d4
commit fb86eaf6fc
4 changed files with 43 additions and 32 deletions

View File

@ -755,7 +755,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
for (const auto & listen_host : listen_hosts)
{
/// TCP TestKeeper
createServer(listen_host, "test_keeper_tcp_port", listen_try, [&](UInt16 port)
createServer(listen_host, "test_keeper_server.tcp_port", listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);

View File

@ -10,10 +10,14 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
extern const int BAD_ARGUMENTS;
}
}
namespace zkutil
@ -360,7 +364,7 @@ struct TestKeeperStorageListRequest final : public TestKeeperStorageRequest
{
auto path_prefix = request.path;
if (path_prefix.empty())
throw Coordination::Exception("Logical error: path cannot be empty", Coordination::Error::ZSESSIONEXPIRED);
throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR);
if (path_prefix.back() != '/')
path_prefix += '/';
@ -417,26 +421,27 @@ struct TestKeeperStorageMultiRequest final : public TestKeeperStorageRequest
Coordination::ZooKeeperMultiRequest & request = dynamic_cast<Coordination::ZooKeeperMultiRequest &>(*zk_request);
concrete_requests.reserve(request.requests.size());
for (const auto & sub_zk_request : request.requests)
for (const auto & sub_request : request.requests)
{
if (dynamic_cast<const Coordination::ZooKeeperCreateRequest *>(sub_zk_request.get()))
auto sub_zk_request = dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_request);
if (sub_zk_request->getOpNum() == Coordination::OpNum::Create)
{
concrete_requests.push_back(std::make_shared<TestKeeperStorageCreateRequest>(dynamic_pointer_cast<Coordination::ZooKeeperCreateRequest>(sub_zk_request)));
concrete_requests.push_back(std::make_shared<TestKeeperStorageCreateRequest>(sub_zk_request));
}
else if (dynamic_cast<const Coordination::ZooKeeperRemoveRequest *>(sub_zk_request.get()))
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Remove)
{
concrete_requests.push_back(std::make_shared<TestKeeperStorageRemoveRequest>(dynamic_pointer_cast<Coordination::ZooKeeperRemoveRequest>(sub_zk_request)));
concrete_requests.push_back(std::make_shared<TestKeeperStorageRemoveRequest>(sub_zk_request));
}
else if (dynamic_cast<const Coordination::ZooKeeperSetRequest *>(sub_zk_request.get()))
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Set)
{
concrete_requests.push_back(std::make_shared<TestKeeperStorageSetRequest>(dynamic_pointer_cast<Coordination::ZooKeeperSetRequest>(sub_zk_request)));
concrete_requests.push_back(std::make_shared<TestKeeperStorageSetRequest>(sub_zk_request));
}
else if (dynamic_cast<const Coordination::ZooKeeperCheckRequest *>(sub_zk_request.get()))
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Check)
{
concrete_requests.push_back(std::make_shared<TestKeeperStorageCheckRequest>(dynamic_pointer_cast<Coordination::ZooKeeperCheckRequest>(sub_zk_request)));
concrete_requests.push_back(std::make_shared<TestKeeperStorageCheckRequest>(sub_zk_request));
}
else
throw Coordination::Exception("Illegal command as part of multi ZooKeeper request", Coordination::Error::ZBADARGUMENTS);
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum());
}
}
@ -490,7 +495,7 @@ struct TestKeeperStorageCloseRequest final : public TestKeeperStorageRequest
using TestKeeperStorageRequest::TestKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container &, TestKeeperStorage::Ephemerals &, int64_t, int64_t) const override
{
throw Coordination::Exception("Called process on close request", Coordination::Error::ZRUNTIMEINCONSISTENCY);
throw DB::Exception("Called process on close request", ErrorCodes::LOGICAL_ERROR);
}
};
@ -512,7 +517,7 @@ void TestKeeperStorage::processingThread()
break;
auto zk_request = info.request->zk_request;
if (dynamic_cast<const Coordination::ZooKeeperCloseRequest *>(zk_request.get()))
if (zk_request->getOpNum() == Coordination::OpNum::Close)
{
auto it = ephemerals.find(info.session_id);
if (it != ephemerals.end())
@ -533,13 +538,13 @@ void TestKeeperStorage::processingThread()
{
if (response->error == Coordination::Error::ZOK)
{
auto & watches_type = dynamic_cast<const Coordination::ZooKeeperListRequest *>(zk_request.get())
auto & watches_type = zk_request->getOpNum() == Coordination::OpNum::List
? list_watches
: watches;
watches_type[zk_request->getPath()].emplace_back(std::move(info.watch_callback));
}
else if (response->error == Coordination::Error::ZNONODE && dynamic_cast<const Coordination::ZooKeeperExistsRequest *>(zk_request.get()))
else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists)
{
watches[zk_request->getPath()].emplace_back(std::move(info.watch_callback));
}
@ -657,7 +662,7 @@ public:
{
auto it = op_num_to_request.find(zk_request->getOpNum());
if (it == op_num_to_request.end())
throw Coordination::Exception("Unknown operation type " + toString(zk_request->getOpNum()), Coordination::Error::ZBADARGUMENTS);
throw DB::Exception("Unknown operation type " + toString(zk_request->getOpNum()), ErrorCodes::LOGICAL_ERROR);
return it->second(zk_request);
}
@ -705,7 +710,7 @@ void TestKeeperStorage::putCloseRequest(const Coordination::ZooKeeperRequestPtr
request_info.session_id = session_id;
std::lock_guard lock(push_request_mutex);
if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
@ -719,7 +724,7 @@ void TestKeeperStorage::putRequest(const Coordination::ZooKeeperRequestPtr & req
request_info.response_callback = callback;
std::lock_guard lock(push_request_mutex);
if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
void TestKeeperStorage::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback, ResponseCallback watch_callback)
@ -735,7 +740,7 @@ void TestKeeperStorage::putRequest(const Coordination::ZooKeeperRequestPtr & req
std::lock_guard lock(push_request_mutex);
if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
TestKeeperStorage::~TestKeeperStorage()

View File

@ -11,6 +11,8 @@
#include <common/logger_useful.h>
#include <chrono>
#include <Common/PipeFDs.h>
#include <Poco/Util/AbstractConfiguration.h>
#ifdef POCO_HAVE_FD_EPOLL
#include <sys/epoll.h>
@ -23,8 +25,8 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SYSTEM_ERROR;
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
}
struct SocketInterruptablePollWrapper
@ -149,8 +151,8 @@ TestKeeperTCPHandler::TestKeeperTCPHandler(IServer & server_, const Poco::Net::S
, log(&Poco::Logger::get("TestKeeperTCPHandler"))
, global_context(server.context())
, test_keeper_storage(global_context.getTestKeeperStorage())
, operation_timeout(0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000)
, session_timeout(0, Coordination::DEFAULT_SESSION_TIMEOUT_MS * 1000)
, operation_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, session_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000)
, session_id(test_keeper_storage->getSessionID())
, poll_wrapper(std::make_unique<SocketInterruptablePollWrapper>(socket_))
{
@ -183,23 +185,23 @@ void TestKeeperTCPHandler::receiveHandshake()
Coordination::read(handshake_length, *in);
if (handshake_length != Coordination::CLIENT_HANDSHAKE_LENGTH)
throw Exception("Unexpected handshake length received: " + toString(handshake_length), ErrorCodes::LOGICAL_ERROR);
throw Exception("Unexpected handshake length received: " + toString(handshake_length), ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
Coordination::read(protocol_version, *in);
if (protocol_version != Coordination::ZOOKEEPER_PROTOCOL_VERSION)
throw Exception("Unexpected protocol version: " + toString(protocol_version), ErrorCodes::LOGICAL_ERROR);
throw Exception("Unexpected protocol version: " + toString(protocol_version), ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
Coordination::read(last_zxid_seen, *in);
if (last_zxid_seen != 0)
throw Exception("Non zero last_zxid_seen is not supported", ErrorCodes::LOGICAL_ERROR);
throw Exception("Non zero last_zxid_seen is not supported", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
Coordination::read(timeout, *in);
Coordination::read(previous_session_id, *in);
if (previous_session_id != 0)
throw Exception("Non zero previous session id is not supported", ErrorCodes::LOGICAL_ERROR);
throw Exception("Non zero previous session id is not supported", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
Coordination::read(passwd, *in);
}

View File

@ -1,3 +1,7 @@
<yandex>
<test_keeper_tcp_port>9181</test_keeper_tcp_port>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
</test_keeper_server>
</yandex>