Fix nasty bug

This commit is contained in:
alesapin 2021-04-16 21:31:23 +03:00
parent 7ed32dddf0
commit 690045c67f
7 changed files with 68 additions and 30 deletions

2
contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit d2feb5978b979729a07c3ca76eaa4ab94cef4ceb
Subproject commit 377f8e77491d9f66ce8e32e88aae19dffe8dc4d7

View File

@ -205,6 +205,13 @@ void ZooKeeperSetRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in);
}
String ZooKeeperSetRequest::toString() const
{
WriteBufferFromOwnString result;
result << "type: Set " << "xid: " << xid << " path: " << path << "data: " << data << '\n';
return result.str();
}
void ZooKeeperSetResponse::readImpl(ReadBuffer & in)
{
Coordination::read(stat, in);
@ -459,12 +466,14 @@ void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(internal_id, out);
Coordination::write(session_timeout_ms, out);
Coordination::write(server_id, out);
}
void ZooKeeperSessionIDRequest::readImpl(ReadBuffer & in)
{
Coordination::read(internal_id, in);
Coordination::read(session_timeout_ms, in);
Coordination::read(server_id, in);
}
Coordination::ZooKeeperResponsePtr ZooKeeperSessionIDRequest::makeResponse() const
@ -476,12 +485,14 @@ void ZooKeeperSessionIDResponse::readImpl(ReadBuffer & in)
{
Coordination::read(internal_id, in);
Coordination::read(session_id, in);
Coordination::read(server_id, in);
}
void ZooKeeperSessionIDResponse::writeImpl(WriteBuffer & out) const
{
Coordination::write(internal_id, out);
Coordination::write(session_id, out);
Coordination::write(server_id, out);
}
void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator)

View File

@ -61,6 +61,7 @@ struct ZooKeeperRequest : virtual Request
virtual ZooKeeperResponsePtr makeResponse() const = 0;
virtual bool isReadRequest() const = 0;
virtual String toString() const { return "not implemented"; }
};
using ZooKeeperRequestPtr = std::shared_ptr<ZooKeeperRequest>;
@ -274,6 +275,7 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return SetRequest::bytesSize() + sizeof(xid); }
String toString() const override;
};
struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
@ -396,6 +398,8 @@ struct ZooKeeperSessionIDRequest final : ZooKeeperRequest
{
int64_t internal_id;
int64_t session_timeout_ms;
/// Who requested this session
int32_t server_id;
Coordination::OpNum getOpNum() const override { return OpNum::SessionID; }
String getPath() const override { return {}; }
@ -412,6 +416,8 @@ struct ZooKeeperSessionIDResponse final : ZooKeeperResponse
{
int64_t internal_id;
int64_t session_id;
/// Who requested this session
int32_t server_id;
void readImpl(ReadBuffer & in) override;

View File

@ -17,7 +17,7 @@ using RaftAppendResult = nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buff
class KeeperServer
{
private:
int server_id;
const int server_id;
CoordinationSettingsPtr coordination_settings;
@ -72,6 +72,8 @@ public:
void waitInit();
void shutdown();
int getServerID() const { return server_id; }
};
}

View File

@ -104,6 +104,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
std::shared_ptr<Coordination::ZooKeeperSessionIDResponse> response = std::make_shared<Coordination::ZooKeeperSessionIDResponse>();
response->internal_id = session_id_request.internal_id;
response->session_id = session_id;
response->server_id = session_id_request.server_id;
KeeperStorage::ResponseForSession response_for_session;
response_for_session.session_id = -1;
@ -121,7 +122,6 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
}
}
last_committed_idx = log_idx;
return nullptr;
}

View File

@ -63,6 +63,8 @@ void KeeperStorageDispatcher::requestThread()
current_batch.emplace_back(request);
/// Waiting until previous append will be successful, or batch is big enough
/// has_result == false && get_result_code == OK means that our request still not processed.
/// Sometimes NuRaft set errorcode without setting result, so we check both here.
while (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK) && current_batch.size() <= max_batch_size)
{
/// Trying to get batch requests as fast as possible
@ -74,9 +76,12 @@ void KeeperStorageDispatcher::requestThread()
has_read_request = true;
break;
}
else
{
current_batch.emplace_back(request);
}
}
if (shutdown_called)
break;
@ -95,12 +100,12 @@ void KeeperStorageDispatcher::requestThread()
/// Process collected write requests batch
if (!current_batch.empty())
{
prev_result = server->putRequestBatch(current_batch);
auto result = server->putRequestBatch(current_batch);
if (prev_result)
if (result)
{
if (has_read_request) /// If we will execute read request next, than we have to process result now
forceWaitAndProcessResult(std::move(prev_result), std::move(current_batch));
forceWaitAndProcessResult(std::move(result), std::move(current_batch));
}
else
{
@ -108,6 +113,7 @@ void KeeperStorageDispatcher::requestThread()
}
prev_batch = current_batch;
prev_result = result;
}
/// Read request always goes after write batch (last request)
@ -181,8 +187,9 @@ void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::SessionID)
{
const Coordination::ZooKeeperSessionIDResponse & session_id_resp = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
/// Nobody waits for this session id
if (!new_session_id_response_callback.count(session_id_resp.internal_id))
if (session_id_resp.server_id != server->getServerID() || !new_session_id_response_callback.count(session_id_resp.internal_id))
return;
auto callback = new_session_id_response_callback[session_id_resp.internal_id];
@ -196,6 +203,7 @@ void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination
return;
session_writer->second(response);
/// Session closed, no more writes
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
{
@ -394,7 +402,7 @@ void KeeperStorageDispatcher::forceWaitAndProcessResult(RaftAppendResult && resu
result->get();
/// If we get some errors, than send them to clients
if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
if (!result->get_accepted() || result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
addErrorResponses(std::move(requests_for_sessions), Coordination::Error::ZOPERATIONTIMEOUT);
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
addErrorResponses(std::move(requests_for_sessions), Coordination::Error::ZRUNTIMEINCONSISTENCY);
@ -408,6 +416,8 @@ int64_t KeeperStorageDispatcher::getSessionID(long session_timeout_ms)
std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
request->internal_id = internal_session_id_counter.fetch_add(1);
request->session_timeout_ms = session_timeout_ms;
request->server_id = server->getServerID();
request_info.request = request;
request_info.session_id = -1;
@ -415,16 +425,23 @@ int64_t KeeperStorageDispatcher::getSessionID(long session_timeout_ms)
auto future = promise->get_future();
{
std::lock_guard lock(session_to_response_callback_mutex);
new_session_id_response_callback[request->internal_id] = [promise] (const Coordination::ZooKeeperResponsePtr & response)
new_session_id_response_callback[request->internal_id] = [promise, internal_id = request->internal_id] (const Coordination::ZooKeeperResponsePtr & response)
{
if (response->getOpNum() != Coordination::OpNum::SessionID)
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response of type {} instead of SessionID response", Coordination::toString(response->getOpNum()))));
auto session_id_response = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
if (session_id_response.internal_id != internal_id)
{
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response with internal id {} instead of {}", session_id_response.internal_id, internal_id)));
}
if (response->error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException("SessionID request failed with error", response->error)));
promise->set_value(dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response).session_id);
promise->set_value(session_id_response.session_id);
};
}

View File

@ -128,22 +128,24 @@
(c/exec :rm :-rf pid-file-path)
(c/exec :rm :-rf data-dir)
(c/exec :rm :-rf logs-dir)
(c/exec :rm :-rf configs-dir)))));db/LogFiles
;(log-files [_ test node]
; (c/su
(c/exec :rm :-rf configs-dir)))
db/LogFiles
(log-files [_ test node]
(c/su
;(if (cu/exists? pid-file-path)
;(do
; (info node "Collecting traces")
; (collect-traces test node))
;(info node "Pid files doesn't exists"))
; (kill-clickhouse! node test)
; (if (cu/exists? coordination-data-dir)
; (do
; (info node "Coordination files exists, going to compress")
; (c/cd data-dir
; (c/exec :tar :czf "coordination.tar.gz" "coordination")))))
; (let [common-logs [stderr-file (str logs-dir "/clickhouse-server.log") (str data-dir "/coordination.tar.gz")]
; gdb-log (str logs-dir "/gdb.log")]
; (if (cu/exists? (str logs-dir "/gdb.log"))
; (conj common-logs gdb-log)
; common-logs)))))
(kill-clickhouse! node test)
(if (cu/exists? coordination-data-dir)
(do
(info node "Coordination files exists, going to compress")
(c/cd data-dir
(c/exec :tar :czf "coordination.tar.gz" "coordination")))))
(let [common-logs [stderr-file (str logs-dir "/clickhouse-server.log") (str data-dir "/coordination.tar.gz")]
gdb-log (str logs-dir "/gdb.log")]
(if (cu/exists? (str logs-dir "/gdb.log"))
(conj common-logs gdb-log)
common-logs)))))