Remove the link between TaskHandles and ZooKeeper

This commit is contained in:
Silviu Caragea 2018-03-22 14:34:42 +02:00
parent 6629b03af9
commit a2dc16a582
8 changed files with 26 additions and 31 deletions

View File

@ -118,6 +118,17 @@ void BackgroundSchedulePool::TaskInfo::execute()
LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Executing " << name << " took " << milliseconds << " ms."); LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Executing " << name << " took " << milliseconds << " ms.");
} }
zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
{
return [t=shared_from_this()](zkutil::ZooKeeper &, int, int, const char *) mutable {
if (t)
{
t->schedule();
t.reset(); /// The event is set only once, even if the callback can fire multiple times due to session events.
}
};
}
// BackgroundSchedulePool // BackgroundSchedulePool

View File

@ -11,7 +11,7 @@
#include <map> #include <map>
#include <functional> #include <functional>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <Common/ZooKeeper/Types.h>
namespace DB namespace DB
{ {
@ -55,6 +55,10 @@ public:
void deactivate(); void deactivate();
void activate(); void activate();
/// get zkutil::WatchCallback needed for zookeeper callbacks.
zkutil::WatchCallback getWatchCallback();
private: private:
friend class TaskNotification; friend class TaskNotification;
friend class BackgroundSchedulePool; friend class BackgroundSchedulePool;

View File

@ -137,7 +137,7 @@ private:
return; return;
} }
if (!zookeeper.exists(path + "/" + *(it - 1), nullptr, task_handle)) if (!zookeeper.exists(path + "/" + *(it - 1), nullptr, task_handle->getWatchCallback()))
task_handle->schedule(); task_handle->schedule();
success = true; success = true;

View File

@ -5,7 +5,6 @@
#include <vector> #include <vector>
#include <zookeeper.h> #include <zookeeper.h>
#include <Poco/Event.h> #include <Poco/Event.h>
#include <Common/BackgroundSchedulePool.h>
namespace zkutil namespace zkutil
@ -191,7 +190,6 @@ namespace CreateMode
} }
using EventPtr = std::shared_ptr<Poco::Event>; using EventPtr = std::shared_ptr<Poco::Event>;
using TaskHandlePtr = DB::BackgroundSchedulePool::TaskHandle; /// TODO Need to remove this dependency.
class ZooKeeper; class ZooKeeper;

View File

@ -204,23 +204,6 @@ WatchCallback ZooKeeper::callbackForEvent(const EventPtr & event)
return callback; return callback;
} }
WatchCallback ZooKeeper::callbackForTaskHandle(const TaskHandlePtr & task)
{
WatchCallback callback;
if (task)
{
callback = [t=task](ZooKeeper &, int, int, const char *) mutable
{
if (t)
{
t->schedule();
t.reset(); /// The event is set only once, even if the callback can fire multiple times due to session events.
}
};
}
return callback;
}
WatchContext * ZooKeeper::createContext(WatchCallback && callback) WatchContext * ZooKeeper::createContext(WatchCallback && callback)
{ {
if (callback) if (callback)
@ -468,9 +451,9 @@ bool ZooKeeper::exists(const std::string & path, Stat * stat_, const EventPtr &
return existsWatch(path, stat_, callbackForEvent(watch)); return existsWatch(path, stat_, callbackForEvent(watch));
} }
bool ZooKeeper::exists(const std::string & path, Stat * stat, const TaskHandlePtr & watch) bool ZooKeeper::exists(const std::string & path, Stat * stat, const WatchCallback & watch_callback)
{ {
return existsWatch(path, stat, callbackForTaskHandle(watch)); return existsWatch(path, stat, watch_callback);
} }
bool ZooKeeper::existsWatch(const std::string & path, Stat * stat_, const WatchCallback & watch_callback) bool ZooKeeper::existsWatch(const std::string & path, Stat * stat_, const WatchCallback & watch_callback)
@ -528,11 +511,11 @@ std::string ZooKeeper::get(const std::string & path, Stat * stat, const EventPtr
throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code); throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
} }
std::string ZooKeeper::get(const std::string & path, Stat * stat, const TaskHandlePtr & watch) std::string ZooKeeper::get(const std::string & path, Stat * stat, const WatchCallback & watch_callback)
{ {
int code; int code;
std::string res; std::string res;
if (tryGetWatch(path, res, stat, callbackForTaskHandle(watch), &code)) if (tryGetWatch(path, res, stat, watch_callback, &code))
return res; return res;
else else
throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code); throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);

View File

@ -8,11 +8,11 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <unistd.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event CannotRemoveEphemeralNode; extern const Event CannotRemoveEphemeralNode;
@ -160,11 +160,11 @@ public:
int32_t tryRemoveEphemeralNodeWithRetries(const std::string & path, int32_t version = -1, size_t * attempt = nullptr); int32_t tryRemoveEphemeralNodeWithRetries(const std::string & path, int32_t version = -1, size_t * attempt = nullptr);
bool exists(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr); bool exists(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
bool exists(const std::string & path, Stat * stat, const TaskHandlePtr & watch); bool exists(const std::string & path, Stat * stat, const WatchCallback & watch_callback);
bool existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback); bool existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback);
std::string get(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr); std::string get(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
std::string get(const std::string & path, Stat * stat, const TaskHandlePtr & watch); std::string get(const std::string & path, Stat * stat, const WatchCallback & watch_callback);
/// Doesn't not throw in the following cases: /// Doesn't not throw in the following cases:
/// * The node doesn't exist. Returns false in this case. /// * The node doesn't exist. Returns false in this case.
@ -377,7 +377,6 @@ private:
void tryRemoveChildrenRecursive(const std::string & path); void tryRemoveChildrenRecursive(const std::string & path);
static WatchCallback callbackForEvent(const EventPtr & event); static WatchCallback callbackForEvent(const EventPtr & event);
static WatchCallback callbackForTaskHandle(const TaskHandlePtr & task);
WatchContext * createContext(WatchCallback && callback); WatchContext * createContext(WatchCallback && callback);
static void destroyContext(WatchContext * context); static void destroyContext(WatchContext * context);
static void processCallback(zhandle_t * zh, int type, int state, const char * path, void * watcher_ctx); static void processCallback(zhandle_t * zh, int type, int state, const char * path, void * watcher_ctx);

View File

@ -58,7 +58,7 @@ void ReplicatedMergeTreeAlterThread::run()
auto zookeeper = storage.getZooKeeper(); auto zookeeper = storage.getZooKeeper();
zkutil::Stat stat; zkutil::Stat stat;
const String columns_str = zookeeper->get(storage.zookeeper_path + "/columns", &stat, task_handle); const String columns_str = zookeeper->get(storage.zookeeper_path + "/columns", &stat, task_handle->getWatchCallback());
auto columns_in_zk = ColumnsDescription::parse(columns_str); auto columns_in_zk = ColumnsDescription::parse(columns_str);
bool changed_version = (stat.version != storage.columns_version); bool changed_version = (stat.version != storage.columns_version);

View File

@ -402,7 +402,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B
if (next_update_event) if (next_update_event)
{ {
if (zookeeper->exists(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_event)) if (zookeeper->exists(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_event->getWatchCallback()))
next_update_event->schedule(); next_update_event->schedule();
} }