Merge remote-tracking branch 'origin/master' into s3queue-fix-ordered-mode

This commit is contained in:
kssenii 2024-06-04 13:52:45 +02:00
commit 080c6037ab
4 changed files with 39 additions and 17 deletions

View File

@ -637,6 +637,9 @@ void TestKeeper::finalize(const String &)
expired = true; expired = true;
} }
/// Signal request_queue to wake up processing thread without waiting for timeout
requests_queue.finish();
processing_thread.join(); processing_thread.join();
try try

View File

@ -1,5 +1,4 @@
#include "ZooKeeper.h" #include "ZooKeeper.h"
#include "Coordination/KeeperConstants.h"
#include "Coordination/KeeperFeatureFlags.h" #include "Coordination/KeeperFeatureFlags.h"
#include "ZooKeeperImpl.h" #include "ZooKeeperImpl.h"
#include "KeeperException.h" #include "KeeperException.h"
@ -376,11 +375,14 @@ void ZooKeeper::createAncestors(const std::string & path)
} }
Coordination::Responses responses; Coordination::Responses responses;
Coordination::Error code = multiImpl(create_ops, responses, /*check_session_valid*/ false); const auto & [code, failure_reason] = multiImpl(create_ops, responses, /*check_session_valid*/ false);
if (code == Coordination::Error::ZOK) if (code == Coordination::Error::ZOK)
return; return;
if (!failure_reason.empty())
throw KeeperException::fromMessage(code, failure_reason);
throw KeeperException::fromPath(code, path); throw KeeperException::fromPath(code, path);
} }
@ -676,17 +678,19 @@ Coordination::Error ZooKeeper::trySet(const std::string & path, const std::strin
} }
Coordination::Error ZooKeeper::multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses, bool check_session_valid) std::pair<Coordination::Error, std::string>
ZooKeeper::multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses, bool check_session_valid)
{ {
if (requests.empty()) if (requests.empty())
return Coordination::Error::ZOK; return {Coordination::Error::ZOK, ""};
std::future<Coordination::MultiResponse> future_result; std::future<Coordination::MultiResponse> future_result;
Coordination::Requests requests_with_check_session;
if (check_session_valid) if (check_session_valid)
{ {
Coordination::Requests new_requests = requests; requests_with_check_session = requests;
addCheckSessionOp(new_requests); addCheckSessionOp(requests_with_check_session);
future_result = asyncTryMultiNoThrow(new_requests); future_result = asyncTryMultiNoThrow(requests_with_check_session);
} }
else else
{ {
@ -696,7 +700,7 @@ Coordination::Error ZooKeeper::multiImpl(const Coordination::Requests & requests
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready) if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{ {
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::Multi, requests[0]->getPath())); impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::Multi, requests[0]->getPath()));
return Coordination::Error::ZOPERATIONTIMEOUT; return {Coordination::Error::ZOPERATIONTIMEOUT, ""};
} }
else else
{ {
@ -704,11 +708,14 @@ Coordination::Error ZooKeeper::multiImpl(const Coordination::Requests & requests
Coordination::Error code = response.error; Coordination::Error code = response.error;
responses = response.responses; responses = response.responses;
std::string reason;
if (check_session_valid) if (check_session_valid)
{ {
if (code != Coordination::Error::ZOK && !Coordination::isHardwareError(code) && getFailedOpIndex(code, responses) == requests.size()) if (code != Coordination::Error::ZOK && !Coordination::isHardwareError(code) && getFailedOpIndex(code, responses) == requests.size())
{ {
impl->finalize(fmt::format("Session was killed: {}", requests.back()->getPath())); reason = fmt::format("Session was killed: {}", requests_with_check_session.back()->getPath());
impl->finalize(reason);
code = Coordination::Error::ZSESSIONMOVED; code = Coordination::Error::ZSESSIONMOVED;
} }
responses.pop_back(); responses.pop_back();
@ -717,23 +724,33 @@ Coordination::Error ZooKeeper::multiImpl(const Coordination::Requests & requests
chassert(code == Coordination::Error::ZOK || Coordination::isHardwareError(code) || responses.back()->error != Coordination::Error::ZOK); chassert(code == Coordination::Error::ZOK || Coordination::isHardwareError(code) || responses.back()->error != Coordination::Error::ZOK);
} }
return code; return {code, std::move(reason)};
} }
} }
Coordination::Responses ZooKeeper::multi(const Coordination::Requests & requests, bool check_session_valid) Coordination::Responses ZooKeeper::multi(const Coordination::Requests & requests, bool check_session_valid)
{ {
Coordination::Responses responses; Coordination::Responses responses;
Coordination::Error code = multiImpl(requests, responses, check_session_valid); const auto & [code, failure_reason] = multiImpl(requests, responses, check_session_valid);
if (!failure_reason.empty())
throw KeeperException::fromMessage(code, failure_reason);
KeeperMultiException::check(code, requests, responses); KeeperMultiException::check(code, requests, responses);
return responses; return responses;
} }
Coordination::Error ZooKeeper::tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses, bool check_session_valid) Coordination::Error ZooKeeper::tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses, bool check_session_valid)
{ {
Coordination::Error code = multiImpl(requests, responses, check_session_valid); const auto & [code, failure_reason] = multiImpl(requests, responses, check_session_valid);
if (code != Coordination::Error::ZOK && !Coordination::isUserError(code)) if (code != Coordination::Error::ZOK && !Coordination::isUserError(code))
{
if (!failure_reason.empty())
throw KeeperException::fromMessage(code, failure_reason);
throw KeeperException(code); throw KeeperException(code);
}
return code; return code;
} }
@ -1346,7 +1363,7 @@ Coordination::Error ZooKeeper::tryMultiNoThrow(const Coordination::Requests & re
{ {
try try
{ {
return multiImpl(requests, responses, check_session_valid); return multiImpl(requests, responses, check_session_valid).first;
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {

View File

@ -2,10 +2,8 @@
#include "Types.h" #include "Types.h"
#include <Poco/Util/LayeredConfiguration.h> #include <Poco/Util/LayeredConfiguration.h>
#include <unordered_set>
#include <future> #include <future>
#include <memory> #include <memory>
#include <mutex>
#include <string> #include <string>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
@ -18,7 +16,6 @@
#include <Common/thread_local_rng.h> #include <Common/thread_local_rng.h>
#include <Coordination/KeeperFeatureFlags.h> #include <Coordination/KeeperFeatureFlags.h>
#include <unistd.h> #include <unistd.h>
#include <random>
namespace ProfileEvents namespace ProfileEvents
@ -644,7 +641,11 @@ private:
Coordination::Stat * stat, Coordination::Stat * stat,
Coordination::WatchCallbackPtr watch_callback, Coordination::WatchCallbackPtr watch_callback,
Coordination::ListRequestType list_request_type); Coordination::ListRequestType list_request_type);
Coordination::Error multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses, bool check_session_valid);
/// returns error code with optional reason
std::pair<Coordination::Error, std::string>
multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses, bool check_session_valid);
Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback); Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);
Coordination::Error syncImpl(const std::string & path, std::string & returned_path); Coordination::Error syncImpl(const std::string & path, std::string & returned_path);

View File

@ -17,6 +17,7 @@
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/getRandomASCIIString.h> #include <Common/getRandomASCIIString.h>
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <numeric>
namespace ProfileEvents namespace ProfileEvents