Using different ZooKeeper library (development) [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-03-24 23:11:46 +03:00
parent ae9633c1e5
commit 58f32f98d7
6 changed files with 16 additions and 14 deletions

View File

@ -93,7 +93,7 @@ TEST(zkutil, multi_async)
EXPECT_ANY_THROW
(
std::vector<zkutil::ZooKeeper::MultiFuture> futures;
std::vector<std::future<zkutil::MultiResponse>> futures;
for (size_t i = 0; i < 10000; ++i)
{

View File

@ -23,7 +23,7 @@ try
{
while (true)
{
std::vector<zkutil::ZooKeeper::TryGetFuture> futures;
std::vector<std::future<zkutil::GetResponse>> futures;
for (auto & node : nodes)
futures.push_back(zookeeper.asyncTryGet("/tmp/" + node));

View File

@ -1660,7 +1660,7 @@ protected:
output = io_insert.out;
}
using ExistsFuture = zkutil::ZooKeeper::ExistsFuture;
using ExistsFuture = std::future<zkutil::ExistsResponse>;
auto future_is_dirty_checker = std::make_unique<ExistsFuture>(zookeeper->asyncExists(is_dirty_flag_path));
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
@ -1686,7 +1686,7 @@ protected:
throw;
}
if (status.exists)
if (status.error != ZooKeeperImpl::ZooKeeper::ZNONODE)
throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
}

View File

@ -53,7 +53,7 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
std::sort(children.begin(), children.end());
std::vector<std::pair<String, zkutil::ZooKeeper::GetFuture>> futures;
std::vector<std::pair<String, std::future<zkutil::GetResponse>>> futures;
futures.reserve(children.size());
for (const String & child : children)
@ -61,8 +61,9 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
for (auto & future : futures)
{
zkutil::ZooKeeper::ValueAndStat res = future.second.get();
LogEntryPtr entry = LogEntry::parse(res.value, res.stat);
zkutil::GetResponse res = future.second.get();
LogEntryPtr entry = LogEntry::parse(res.data, res.stat);
entry->znode_name = future.first;
insertUnlocked(entry, min_unprocessed_insert_time_changed, lock);
@ -308,7 +309,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
LOG_DEBUG(log, "Pulling " << (end - begin) << " entries to queue: " << *begin << " - " << *last);
std::vector<std::pair<String, zkutil::ZooKeeper::GetFuture>> futures;
std::vector<std::pair<String, std::future<zkutil::GetResponse>>> futures;
futures.reserve(end - begin);
for (auto it = begin; it != end; ++it)
@ -324,11 +325,12 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
for (auto & future : futures)
{
zkutil::ZooKeeper::ValueAndStat res = future.second.get();
copied_entries.emplace_back(LogEntry::parse(res.value, res.stat));
zkutil::GetResponse res = future.second.get();
copied_entries.emplace_back(LogEntry::parse(res.data, res.stat));
ops.emplace_back(zkutil::makeCreateRequest(
replica_path + "/queue/queue-", res.value, zkutil::CreateMode::PersistentSequential));
replica_path + "/queue/queue-", res.data, zkutil::CreateMode::PersistentSequential));
const auto & entry = *copied_entries.back();
if (entry.type == LogEntry::GET_PART)

View File

@ -312,7 +312,7 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
* This is possible only when session in ZooKeeper expires.
*/
String data;
Stat stat;
zkutil::Stat stat;
bool has_is_active = zookeeper->tryGet(is_active_path, data, &stat);
if (has_is_active && data == active_node_identifier)
{

View File

@ -131,7 +131,7 @@ BlockInputStreams StorageSystemZooKeeper::read(
if (path_part == "/")
path_part.clear();
std::vector<zkutil::ZooKeeper::TryGetFuture> futures;
std::vector<std::future<zkutil::GetResponse>> futures;
futures.reserve(nodes.size());
for (const String & node : nodes)
futures.push_back(zookeeper->asyncTryGet(path_part + '/' + node));
@ -141,7 +141,7 @@ BlockInputStreams StorageSystemZooKeeper::read(
for (size_t i = 0, size = nodes.size(); i < size; ++i)
{
auto res = futures[i].get();
if (!res.exists)
if (res.error == ZooKeeperImpl::ZooKeeper::ZNONODE)
continue; /// Node was deleted meanwhile.
const zkutil::Stat & stat = res.stat;