zkutil: Async interface: development [#METR-2944].

This commit is contained in:
Alexey Milovidov 2014-10-17 00:05:26 +04:00
parent c1966ab09a
commit ef82933bb3
4 changed files with 190 additions and 39 deletions

View File

@ -140,15 +140,21 @@ BlockInputStreams StorageSystemZooKeeper::read(
if (path == "/")
path_part.clear();
std::vector<zkutil::ZooKeeper::TryGetFuture> futures;
futures.reserve(nodes.size());
for (const String & node : nodes)
futures.push_back(zookeeper->asyncTryGet(path_part + '/' + node));
for (size_t i = 0, size = nodes.size(); i < size; ++i)
{
String value;
zkutil::Stat stat;
if (!zookeeper->tryGet(path_part + '/' + node, value, &stat))
auto res = futures[i].get();
if (!res.exists)
continue; /// Ноду успели удалить.
col_name.column->insert(node);
col_value.column->insert(value);
const zkutil::Stat & stat = res.stat;
col_name.column->insert(nodes[i]);
col_value.column->insert(res.value);
col_czxid.column->insert(stat.czxid);
col_mzxid.column->insert(stat.mzxid);
col_ctime.column->insert(UInt64(stat.ctime / 1000));

View File

@ -157,15 +157,77 @@ public:
void tryRemoveRecursive(const std::string & path);
/** Асинхронный интерфейс (не доделано). */
/** Асинхронный интерфейс (реализовано небольшое подмножество операций).
*
* Использование:
*
* // Эти вызовы не блокируются.
* auto future1 = zk.asyncGet("/path1");
* auto future2 = zk.asyncGet("/path2");
* ...
*
* // Эти вызовы могут заблокироваться до выполнения операции.
* auto result1 = future1.get();
* auto result2 = future2.get();
*
* future не должна быть уничтожена до получения результата.
* Результат обязательно необходимо получать.
*/
template <typename Result, typename... TaskParams>
class Future
{
friend class ZooKeeper;
private:
using Task = std::packaged_task<Result (TaskParams...)>;
using TaskPtr = std::unique_ptr<Task>;
TaskPtr task;
template <typename... Args>
Future(Args &&... args) : task(new Task(std::forward<Args>(args)...)) {}
public:
Result get()
{
return task->get_future().get();
}
};
struct ValueAndStat
{
std::string value;
Stat stat;
};
typedef std::packaged_task<ValueAndStat (int rc, const char * value, int value_len, const Stat * stat)> GetTask;
std::unique_ptr<GetTask> asyncGet(const std::string & path, EventPtr watch = nullptr);
using GetFuture = Future<ValueAndStat, int, const char *, int, const Stat *>;
GetFuture asyncGet(const std::string & path);
struct ValueAndStatAndExists
{
std::string value;
Stat stat;
bool exists;
};
using TryGetFuture = Future<ValueAndStatAndExists, int, const char *, int, const Stat *>;
TryGetFuture asyncTryGet(const std::string & path);
struct StatAndExists
{
Stat stat;
bool exists;
};
using ExistsFuture = Future<StatAndExists, int, const Stat *>;
ExistsFuture asyncExists(const std::string & path);
using GetChildrenFuture = Future<Strings, int, const String_vector *>;
GetChildrenFuture asyncGetChildren(const std::string & path);
static std::string error2string(int32_t code);

View File

@ -350,34 +350,6 @@ int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Stat * s
return code;
}
std::unique_ptr<ZooKeeper::GetTask> ZooKeeper::asyncGet(const std::string & path, EventPtr watch)
{
std::unique_ptr<GetTask> task { new GetTask(
[path] (int rc, const char * value, int value_len, const Stat * stat)
{
if (rc != ZOK)
throw KeeperException(rc, path);
return ValueAndStat{ {value, size_t(value_len)}, *stat };
})};
int32_t code = zoo_aget(
impl, path.c_str(), !watch.isNull(),
[] (int rc, const char * value, int value_len, const Stat * stat, const void * data)
{
auto & task = const_cast<GetTask &>(*reinterpret_cast<const GetTask *>(data));
task(rc, value, value_len, stat);
},
task.get());
if (code != ZOK)
throw KeeperException(code, path);
return task;
}
std::string ZooKeeper::get(const std::string & path, Stat * stat, EventPtr watch)
{
std::string res;
@ -617,4 +589,114 @@ int64_t ZooKeeper::getClientID()
return zoo_client_id(impl)->client_id;
}
ZooKeeper::GetFuture ZooKeeper::asyncGet(const std::string & path)
{
GetFuture future {
[path] (int rc, const char * value, int value_len, const Stat * stat)
{
if (rc != ZOK)
throw KeeperException(rc, path);
return ValueAndStat{ {value, size_t(value_len)}, *stat };
}};
int32_t code = zoo_aget(
impl, path.c_str(), 0,
[] (int rc, const char * value, int value_len, const Stat * stat, const void * data)
{
auto & task = const_cast<GetFuture::Task &>(*reinterpret_cast<const GetFuture::Task *>(data));
task(rc, value, value_len, stat);
},
future.task.get());
if (code != ZOK)
throw KeeperException(code, path);
return future;
}
ZooKeeper::TryGetFuture ZooKeeper::asyncTryGet(const std::string & path)
{
TryGetFuture future {
[path] (int rc, const char * value, int value_len, const Stat * stat)
{
if (rc != ZOK && rc != ZNONODE)
throw KeeperException(rc, path);
return ValueAndStatAndExists{ {value, size_t(value_len)}, *stat, rc != ZNONODE };
}};
int32_t code = zoo_aget(
impl, path.c_str(), 0,
[] (int rc, const char * value, int value_len, const Stat * stat, const void * data)
{
auto & task = const_cast<TryGetFuture::Task &>(*reinterpret_cast<const TryGetFuture::Task *>(data));
task(rc, value, value_len, stat);
},
future.task.get());
if (code != ZOK)
throw KeeperException(code, path);
return future;
}
ZooKeeper::ExistsFuture ZooKeeper::asyncExists(const std::string & path)
{
ExistsFuture future {
[path] (int rc, const Stat * stat)
{
if (rc != ZOK && rc != ZNONODE)
throw KeeperException(rc, path);
return StatAndExists{ *stat, rc != ZNONODE };
}};
int32_t code = zoo_aexists(
impl, path.c_str(), 0,
[] (int rc, const Stat * stat, const void * data)
{
auto & task = const_cast<ExistsFuture::Task &>(*reinterpret_cast<const ExistsFuture::Task *>(data));
task(rc, stat);
},
future.task.get());
if (code != ZOK)
throw KeeperException(code, path);
return future;
}
ZooKeeper::GetChildrenFuture ZooKeeper::asyncGetChildren(const std::string & path)
{
GetChildrenFuture future {
[path] (int rc, const String_vector * strings)
{
if (rc != ZOK)
throw KeeperException(rc, path);
Strings res;
res.resize(strings->count);
for (int i = 0; i < strings->count; ++i)
res[i] = std::string(strings->data[i]);
return res;
}};
int32_t code = zoo_aget_children(
impl, path.c_str(), 0,
[] (int rc, const String_vector * strings, const void * data)
{
auto & task = const_cast<GetChildrenFuture::Task &>(*reinterpret_cast<const GetChildrenFuture::Task *>(data));
task(rc, strings);
},
future.task.get());
if (code != ZOK)
throw KeeperException(code, path);
return future;
}
}

View File

@ -6,11 +6,12 @@ try
{
zkutil::ZooKeeper zookeeper{"localhost:2181"};
auto task = zookeeper.asyncGet(argc <= 1 ? "/" : argv[1]);
auto future = task->get_future();
auto future = zookeeper.asyncGetChildren(argc <= 1 ? "/" : argv[1]);
auto res = future.get();
std::cerr << res.value << ", " << res.stat.numChildren << '\n';
for (const auto & child : res)
std::cerr << child << '\n';
return 0;
}
catch (const Poco::Exception & e)