Fix timeouts

This commit is contained in:
alesapin 2020-11-11 16:55:28 +03:00
parent 66236d6ebb
commit d83c68fca8
13 changed files with 47 additions and 33 deletions

View File

@ -195,17 +195,17 @@ Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port
|| code == EAI_ADDRFAMILY
#endif
)
{
LOG_ERROR(log, "Cannot resolve listen_host ({}), error {}: {}. "
"If it is an IPv6 address and your host has disabled IPv6, then consider to "
"specify IPv4 address to listen in <listen_host> element of configuration "
"file. Example: <listen_host>0.0.0.0</listen_host>",
host, e.code(), e.message());
}
{
LOG_ERROR(log, "Cannot resolve listen_host ({}), error {}: {}. "
"If it is an IPv6 address and your host has disabled IPv6, then consider to "
"specify IPv4 address to listen in <listen_host> element of configuration "
"file. Example: <listen_host>0.0.0.0</listen_host>",
host, e.code(), e.message());
}
throw;
}
return socket_address;
throw;
}
return socket_address;
}
Poco::Net::SocketAddress Server::socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure)

View File

@ -6,10 +6,22 @@
#include <common/logger_useful.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
namespace zkutil
{
using namespace DB;
static String parentPath(const String & path)
{
auto rslash_pos = path.rfind('/');

View File

@ -19,7 +19,7 @@ class TestKeeperStorage
public:
Poco::Timespan operation_timeout{10000};
Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000};
std::atomic<int64_t> session_id_counter{0};
struct Node
@ -85,5 +85,5 @@ public:
return zxid.fetch_add(1);
}
};
}

View File

@ -129,8 +129,8 @@ struct ZooKeeperArgs
std::vector<std::string> hosts_strings;
session_timeout_ms = DEFAULT_SESSION_TIMEOUT;
operation_timeout_ms = DEFAULT_OPERATION_TIMEOUT;
session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
implementation = "zookeeper";
for (const auto & key : keys)
{

View File

@ -11,6 +11,7 @@
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <unistd.h>
@ -28,9 +29,6 @@ namespace CurrentMetrics
namespace zkutil
{
const UInt32 DEFAULT_SESSION_TIMEOUT = 30000;
const UInt32 DEFAULT_OPERATION_TIMEOUT = 10000;
/// Preferred size of multi() command (in number of ops)
constexpr size_t MULTI_BATCH_SIZE = 100;
@ -53,8 +51,8 @@ public:
using Ptr = std::shared_ptr<ZooKeeper>;
ZooKeeper(const std::string & hosts_, const std::string & identity_ = "",
int32_t session_timeout_ms_ = DEFAULT_SESSION_TIMEOUT,
int32_t operation_timeout_ms_ = DEFAULT_OPERATION_TIMEOUT,
int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS,
int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS,
const std::string & chroot_ = "",
const std::string & implementation_ = "zookeeper");

View File

@ -17,13 +17,11 @@ void ZooKeeperResponse::write(WriteBuffer & out) const
{
/// Excessive copy to calculate length.
WriteBufferFromOwnString buf;
//LOG_DEBUG(&Poco::Logger::get("LOG"), "WRITING {}", xid);
Coordination::write(xid, buf);
Coordination::write(zxid, buf);
Coordination::write(error, buf);
if (error == Error::ZOK)
writeImpl(buf);
//LOG_DEBUG(&Poco::Logger::get("LOG"), "BUFFER LENGTH {}", buf.str().length());
Coordination::write(buf.str(), out);
out.next();
}
@ -426,7 +424,7 @@ ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::m
void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator)
{
if (!op_num_to_request.try_emplace(op_num, creator).second)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Request with op num {} already registered", op_num);
throw Coordination::Exception("Request type " + toString(op_num) + " already registered", Coordination::Error::ZRUNTIMEINCONSISTENCY);
}
std::shared_ptr<ZooKeeperRequest> ZooKeeperRequest::read(ReadBuffer & in)
@ -478,5 +476,5 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::Check, ZooKeeperCheckRequest>(*this);
registerZooKeeperRequest<OpNum::Multi, ZooKeeperMultiRequest>(*this);
}
}

View File

@ -41,7 +41,7 @@ static constexpr int32_t PASSWORD_LENGTH = 16;
/// ZooKeeper has 1 MB node size and serialization limit by default,
/// but it can be raised up, so we have a slightly larger limit on our side.
static constexpr int32_t MAX_STRING_OR_ARRAY_SIZE = 1 << 28; /// 256 MiB
static constexpr int32_t DEFAULT_SESSION_TIMEOUT = 30000;
static constexpr int32_t DEFAULT_OPERATION_TIMEOUT = 10000;
static constexpr int32_t DEFAULT_SESSION_TIMEOUT_MS = 30000;
static constexpr int32_t DEFAULT_OPERATION_TIMEOUT_MS = 10000;
}

View File

@ -16,7 +16,7 @@ void write(int32_t x, WriteBuffer & out)
void write(OpNum x, WriteBuffer & out)
{
write(static_cast<int32_t>(x), out);
write(static_cast<int32_t>(x), out);
}
void write(bool x, WriteBuffer & out)

View File

@ -78,7 +78,11 @@ SRCS(
WeakHash.cpp
ZooKeeper/IKeeper.cpp
ZooKeeper/TestKeeper.cpp
ZooKeeper/TestKeeperStorage.cpp
ZooKeeper/ZooKeeper.cpp
ZooKeeper/ZooKeeperCommon.cpp
ZooKeeper/ZooKeeperConstants.cpp
ZooKeeper/ZooKeeperIO.cpp
ZooKeeper/ZooKeeperImpl.cpp
ZooKeeper/ZooKeeperNodeCache.cpp
checkStackSize.cpp

View File

@ -25,7 +25,7 @@ void TestKeeperTCPHandler::sendHandshake()
Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *out);
Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, *out);
Coordination::write(Coordination::DEFAULT_SESSION_TIMEOUT, *out);
Coordination::write(Coordination::DEFAULT_SESSION_TIMEOUT_MS, *out);
Coordination::write(test_keeper_storage->getSessionID(), *out);
std::array<char, Coordination::PASSWORD_LENGTH> passwd{};
Coordination::write(passwd, *out);
@ -105,7 +105,7 @@ void TestKeeperTCPHandler::runImpl()
while (true)
{
using namespace std::chrono_literals;
while(!responses.empty())
while (!responses.empty())
{
if (responses.front().wait_for(10ms) != std::future_status::ready)
break;

View File

@ -22,8 +22,8 @@ public:
, log(&Poco::Logger::get("TestKeeperTCPHandler"))
, global_context(server.context())
, test_keeper_storage(global_context.getTestKeeperStorage())
, operation_timeout(Coordination::DEFAULT_OPERATION_TIMEOUT)
, session_timeout(Coordination::DEFAULT_SESSION_TIMEOUT)
, operation_timeout(0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000)
, session_timeout(0, Coordination::DEFAULT_SESSION_TIMEOUT_MS * 1000)
{
}

View File

@ -21,6 +21,7 @@ SRCS(
ReplicasStatusHandler.cpp
StaticRequestHandler.cpp
TCPHandler.cpp
TestKeeperTCPHandler.cpp
WebUIRequestHandler.cpp
)

View File

@ -135,7 +135,7 @@ void testMultiRequest(zkutil::ZooKeeper & zk)
zk.multi(requests);
std::terminate();
}
catch(...)
catch (...)
{
std::cerr << "Got exception on multy request (it's ok)\n";
}
@ -143,7 +143,8 @@ void testMultiRequest(zkutil::ZooKeeper & zk)
checkEq(zk, "/data/multirequest", "bbb");
}
int main(int argc, char *argv[]) {
int main(int argc, char *argv[])
{
if (argc != 2)
{
@ -167,7 +168,7 @@ int main(int argc, char *argv[]) {
testCreateSetWatchEvent(zk);
testCreateListWatchEvent(zk);
}
catch(...)
catch (...)
{
zk.tryRemoveRecursive("/data");
throw;