ZooKeeper: fixed error [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-04-05 07:31:04 +03:00
parent e00e81c3e9
commit d3408d45a8
2 changed files with 42 additions and 25 deletions

View File

@ -711,12 +711,26 @@ void ZooKeeper::sendThread()
std::chrono::duration_cast<std::chrono::milliseconds>(next_heartbeat_time - now).count(),
operation_timeout.totalMilliseconds());
RequestPtr request;
if (requests.tryPop(request, max_wait))
RequestInfo info;
if (requests_queue.tryPop(info, max_wait))
{
request->write(*out);
{
CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest);
std::lock_guard lock(operations_mutex);
operations[info.request->xid] = info;
}
if (request->xid == close_xid)
if (info.watch)
{
info.request->has_watch = true;
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
std::lock_guard lock(watches_mutex);
watches[info.request->getPath()].emplace_back(std::move(info.watch));
}
info.request->write(*out);
if (info.request->xid == close_xid)
break;
}
}
@ -740,9 +754,24 @@ void ZooKeeper::sendThread()
}
/// Drain queue
RequestPtr request;
while (requests.tryPop(request))
;
RequestInfo info;
while (requests_queue.tryPop(info))
{
if (info.callback)
{
ResponsePtr response = info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
info.callback(*response);
}
if (info.watch)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
info.watch(response);
}
}
}
@ -1004,7 +1033,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
{
RequestInfo & request_info = op.second;
ResponsePtr response = request_info.request->makeResponse();
response->error = ZCONNECTIONLOSS;
response->error = ZSESSIONEXPIRED;
if (request_info.callback)
request_info.callback(*response);
}
@ -1256,21 +1285,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
{
CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest);
std::lock_guard lock(operations_mutex);
operations[info.request->xid] = info;
}
if (info.watch)
{
info.request->has_watch = true;
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
std::lock_guard lock(watches_mutex);
watches[info.request->getPath()].emplace_back(std::move(info.watch));
}
if (!requests.tryPush(info.request, operation_timeout.totalMilliseconds()))
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
}

View File

@ -439,6 +439,8 @@ public:
using CheckCallback = std::function<void(const CheckResponse &)>;
using MultiCallback = std::function<void(const MultiResponse &)>;
/// If the method will throw exception, callbacks won't be called.
/// After the method is executed successfully, you must wait for callbacks.
void create(
const String & path,
@ -573,9 +575,9 @@ private:
clock::time_point time;
};
using RequestsQueue = ConcurrentBoundedQueue<RequestPtr>;
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests{1};
RequestsQueue requests_queue{1};
void pushRequest(RequestInfo && request);
using Operations = std::map<XID, RequestInfo>;