zkutil: fixed error with async. interface [#METR-14296].

This commit is contained in:
Alexey Milovidov 2014-12-14 07:38:11 +03:00
parent 52304c68e7
commit 40d28a78e1
3 changed files with 36 additions and 14 deletions

View File

@ -183,16 +183,35 @@ public:
private:
using Task = std::packaged_task<Result (TaskParams...)>;
using TaskPtr = std::unique_ptr<Task>;
using TaskPtrPtr = std::unique_ptr<TaskPtr>;
TaskPtr task;
/** Всё очень сложно.
*
* В асинхронном интерфейсе libzookeeper, функция (например, zoo_aget)
* принимает указатель на свободную функцию-коллбэк и void* указатель на данные.
* Указатель на данные потом передаётся в callback.
* Это значит, что мы должны сами обеспечить, чтобы данные жили во время работы этой функции и до конца работы callback-а,
* и не можем просто так передать владение данными внутрь функции.
* Для этого, мы засовываем данные в объект Future, который возвращается пользователю. Данные будут жить, пока живёт объект Future.
* Данные засунуты в unique_ptr, чтобы при возврате объекта Future из функции, их адрес (который передаётся в libzookeeper) не менялся.
*
* Вторая проблема состоит в том, что после того, как std::promise был удовлетворён, и пользователь получил результат из std::future,
* объект Future может быть уничтожен, при чём раньше, чем завершит работу в другом потоке функция, которая удовлетворяет promise.
* См. http://stackoverflow.com/questions/10843304/race-condition-in-pthread-once
* Чтобы этого избежать, используется второй unique_ptr. Внутри callback-а, void* данные преобразуются в unique_ptr, и
* перемещаются в локальную переменную unique_ptr, чтобы продлить время жизни данных.
*/
TaskPtrPtr task;
std::future<Result> future;
template <typename... Args>
Future(Args &&... args) : task(new Task(std::forward<Args>(args)...)) {}
Future(Args &&... args) : task(new TaskPtr(new Task(std::forward<Args>(args)...))), future((*task)->get_future()) {}
public:
Result get()
{
return task->get_future().get();
return future.get();
}
};

View File

@ -623,8 +623,8 @@ ZooKeeper::GetFuture ZooKeeper::asyncGet(const std::string & path)
impl, path.c_str(), 0,
[] (int rc, const char * value, int value_len, const Stat * stat, const void * data)
{
auto & task = const_cast<GetFuture::Task &>(*static_cast<const GetFuture::Task *>(data));
task(rc, value, value_len, stat);
GetFuture::TaskPtr owned_task = std::move(const_cast<GetFuture::TaskPtr &>(*static_cast<const GetFuture::TaskPtr *>(data)));
(*owned_task)(rc, value, value_len, stat);
},
future.task.get());
@ -652,8 +652,8 @@ ZooKeeper::TryGetFuture ZooKeeper::asyncTryGet(const std::string & path)
impl, path.c_str(), 0,
[] (int rc, const char * value, int value_len, const Stat * stat, const void * data)
{
auto & task = const_cast<TryGetFuture::Task &>(*static_cast<const TryGetFuture::Task *>(data));
task(rc, value, value_len, stat);
TryGetFuture::TaskPtr owned_task = std::move(const_cast<TryGetFuture::TaskPtr &>(*static_cast<const TryGetFuture::TaskPtr *>(data)));
(*owned_task)(rc, value, value_len, stat);
},
future.task.get());
@ -681,8 +681,8 @@ ZooKeeper::ExistsFuture ZooKeeper::asyncExists(const std::string & path)
impl, path.c_str(), 0,
[] (int rc, const Stat * stat, const void * data)
{
auto & task = const_cast<ExistsFuture::Task &>(*static_cast<const ExistsFuture::Task *>(data));
task(rc, stat);
ExistsFuture::TaskPtr owned_task = std::move(const_cast<ExistsFuture::TaskPtr &>(*static_cast<const ExistsFuture::TaskPtr *>(data)));
(*owned_task)(rc, stat);
},
future.task.get());
@ -715,8 +715,9 @@ ZooKeeper::GetChildrenFuture ZooKeeper::asyncGetChildren(const std::string & pat
impl, path.c_str(), 0,
[] (int rc, const String_vector * strings, const void * data)
{
auto & task = const_cast<GetChildrenFuture::Task &>(*static_cast<const GetChildrenFuture::Task *>(data));
task(rc, strings);
GetChildrenFuture::TaskPtr owned_task =
std::move(const_cast<GetChildrenFuture::TaskPtr &>(*static_cast<const GetChildrenFuture::TaskPtr *>(data)));
(*owned_task)(rc, strings);
},
future.task.get());

View File

@ -1,4 +1,5 @@
#include <zkutil/ZooKeeper.h>
#include <DB/IO/ReadHelpers.h>
int main(int argc, char ** argv)
@ -8,16 +9,17 @@ try
auto nodes = zookeeper.getChildren("/tmp");
size_t num_threads = DB::parse<size_t>(argv[1]);
std::vector<std::thread> threads;
for (size_t i = 0; i < 4; ++i)
for (size_t i = 0; i < num_threads; ++i)
{
threads.emplace_back([&]
{
while (true)
{
std::vector<zkutil::ZooKeeper::GetFuture> futures;
std::vector<zkutil::ZooKeeper::TryGetFuture> futures;
for (auto & node : nodes)
futures.push_back(zookeeper.asyncGet("/tmp/" + node));
futures.push_back(zookeeper.asyncTryGet("/tmp/" + node));
for (auto & future : futures)
std::cerr << (future.get().value.empty() ? ',' : '.');