This commit is contained in:
Alexander Tokmakov 2021-07-13 22:01:43 +03:00
parent f0ed6be269
commit 590615df66
4 changed files with 16 additions and 16 deletions

View File

@ -587,11 +587,9 @@ void ZooKeeperMultiRequest::createLogElements(LogElements & elems) const
elems.back().requests_size = requests.size(); elems.back().requests_size = requests.size();
for (const auto & request : requests) for (const auto & request : requests)
{ {
auto * req = dynamic_cast<ZooKeeperRequest *>(request.get()); auto & req = dynamic_cast<ZooKeeperRequest &>(*request);
assert(req); assert(!req.xid || req.xid == xid);
assert(!req->xid || req->xid == xid); req.createLogElements(elems);
req->xid = xid;
req->createLogElements(elems);
} }
} }
@ -662,13 +660,12 @@ void ZooKeeperMultiResponse::fillLogElements(LogElements & elems, size_t idx) co
ZooKeeperResponse::fillLogElements(elems, idx); ZooKeeperResponse::fillLogElements(elems, idx);
for (const auto & response : responses) for (const auto & response : responses)
{ {
auto * resp = dynamic_cast<ZooKeeperResponse *>(response.get()); auto & resp = dynamic_cast<ZooKeeperResponse &>(*response);
assert(resp); assert(!resp.xid || resp.xid == xid);
assert(!resp->xid || resp->xid == xid); assert(!resp.zxid || resp.zxid == zxid);
assert(!resp->zxid || resp->zxid == zxid); resp.xid = xid;
resp->xid = xid; resp.zxid = zxid;
resp->zxid = zxid; resp.fillLogElements(elems, ++idx);
resp->fillLogElements(elems, ++idx);
} }
} }

View File

@ -1008,6 +1008,12 @@ void ZooKeeper::pushRequest(RequestInfo && info)
throw Exception("xid equal to close_xid", Error::ZSESSIONEXPIRED); throw Exception("xid equal to close_xid", Error::ZSESSIONEXPIRED);
if (info.request->xid < 0) if (info.request->xid < 0)
throw Exception("XID overflow", Error::ZSESSIONEXPIRED); throw Exception("XID overflow", Error::ZSESSIONEXPIRED);
if (auto * multi_request = dynamic_cast<ZooKeeperMultiRequest *>(info.request.get()))
{
for (auto & request : multi_request->requests)
dynamic_cast<ZooKeeperRequest &>(*request).xid = multi_request->xid;
}
} }
/// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls /// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls

View File

@ -31,10 +31,6 @@ SRCS(
MySQL/PacketsProtocolText.cpp MySQL/PacketsProtocolText.cpp
MySQL/PacketsReplication.cpp MySQL/PacketsReplication.cpp
NamesAndTypes.cpp NamesAndTypes.cpp
PostgreSQL/Connection.cpp
PostgreSQL/PoolWithFailover.cpp
PostgreSQL/Utils.cpp
PostgreSQL/insertPostgreSQLValue.cpp
PostgreSQLProtocol.cpp PostgreSQLProtocol.cpp
QueryProcessingStage.cpp QueryProcessingStage.cpp
Settings.cpp Settings.cpp

View File

@ -1,5 +1,6 @@
drop table if exists rmt; drop table if exists rmt;
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n; create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n;
system sync replica rmt;
insert into rmt values (1); insert into rmt values (1);
insert into rmt values (1); insert into rmt values (1);
system flush logs; system flush logs;