allow arbitrary callbacks in ZooKeeperNodeChange [#CLICKHOUSE-3859]

This commit is contained in:
Alexey Zatelepin 2018-10-17 20:23:10 +03:00
parent a0a9aad777
commit ce1ead5b88
7 changed files with 46 additions and 21 deletions

View File

@ -114,12 +114,13 @@ int Server::main(const std::vector<std::string> & /*args*/)
bool has_zookeeper = config().has("zookeeper");
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });
zkutil::EventPtr main_config_zk_changed_event = std::make_shared<Poco::Event>();
if (loaded_config.has_zk_includes)
{
auto old_configuration = loaded_config.configuration;
ConfigProcessor config_processor(config_path);
loaded_config = config_processor.loadConfigWithZooKeeperIncludes(
main_config_zk_node_cache, /* fallback_to_preprocessed = */ true);
main_config_zk_node_cache, main_config_zk_changed_event, /* fallback_to_preprocessed = */ true);
config_processor.savePreprocessedConfig(loaded_config);
config().removeConfiguration(old_configuration.get());
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
@ -267,6 +268,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto main_config_reloader = std::make_unique<ConfigReloader>(config_path,
include_from_path,
std::move(main_config_zk_node_cache),
main_config_zk_changed_event,
[&](ConfigurationPtr config)
{
buildLoggers(*config);
@ -288,6 +290,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto users_config_reloader = std::make_unique<ConfigReloader>(users_config_path,
include_from_path,
zkutil::ZooKeeperNodeCache([&] { return global_context->getZooKeeper(); }),
std::make_shared<Poco::Event>(),
[&](ConfigurationPtr config) { global_context->setUsersConfig(config); },
/* already_loaded = */ false);

View File

@ -234,6 +234,7 @@ void ConfigProcessor::doIncludesRecursive(
XMLDocumentPtr include_from,
Node * node,
zkutil::ZooKeeperNodeCache * zk_node_cache,
const zkutil::EventPtr & zk_changed_event,
std::unordered_set<std::string> & contributing_zk_paths)
{
if (node->nodeType() == Node::TEXT_NODE)
@ -352,7 +353,7 @@ void ConfigProcessor::doIncludesRecursive(
XMLDocumentPtr zk_document;
auto get_zk_node = [&](const std::string & name) -> const Node *
{
zkutil::ZooKeeperNodeCache::GetResult result = zk_node_cache->get(name);
zkutil::ZooKeeperNodeCache::GetResult result = zk_node_cache->get(name, zk_changed_event);
if (!result.exists)
return nullptr;
@ -383,13 +384,13 @@ void ConfigProcessor::doIncludesRecursive(
}
if (included_something)
doIncludesRecursive(config, include_from, node, zk_node_cache, contributing_zk_paths);
doIncludesRecursive(config, include_from, node, zk_node_cache, zk_changed_event, contributing_zk_paths);
else
{
NodeListPtr children = node->childNodes();
Node * child = nullptr;
for (size_t i = 0; (child = children->item(i)); ++i)
doIncludesRecursive(config, include_from, child, zk_node_cache, contributing_zk_paths);
doIncludesRecursive(config, include_from, child, zk_node_cache, zk_changed_event, contributing_zk_paths);
}
}
@ -433,7 +434,8 @@ ConfigProcessor::Files ConfigProcessor::getConfigMergeFiles(const std::string &
XMLDocumentPtr ConfigProcessor::processConfig(
bool * has_zk_includes,
zkutil::ZooKeeperNodeCache * zk_node_cache)
zkutil::ZooKeeperNodeCache * zk_node_cache,
const zkutil::EventPtr & zk_changed_event)
{
XMLDocumentPtr config = dom_parser.parse(path);
@ -476,7 +478,7 @@ XMLDocumentPtr ConfigProcessor::processConfig(
include_from = dom_parser.parse(include_from_path);
}
doIncludesRecursive(config, include_from, getRootNode(config.get()), zk_node_cache, contributing_zk_paths);
doIncludesRecursive(config, include_from, getRootNode(config.get()), zk_node_cache, zk_changed_event, contributing_zk_paths);
}
catch (Poco::Exception & e)
{
@ -525,6 +527,7 @@ ConfigProcessor::LoadedConfig ConfigProcessor::loadConfig(bool allow_zk_includes
ConfigProcessor::LoadedConfig ConfigProcessor::loadConfigWithZooKeeperIncludes(
zkutil::ZooKeeperNodeCache & zk_node_cache,
const zkutil::EventPtr & zk_changed_event,
bool fallback_to_preprocessed)
{
XMLDocumentPtr config_xml;
@ -532,7 +535,7 @@ ConfigProcessor::LoadedConfig ConfigProcessor::loadConfigWithZooKeeperIncludes(
bool processed_successfully = false;
try
{
config_xml = processConfig(&has_zk_includes, &zk_node_cache);
config_xml = processConfig(&has_zk_includes, &zk_node_cache, zk_changed_event);
processed_successfully = true;
}
catch (const Poco::Exception & ex)

View File

@ -22,6 +22,7 @@
namespace zkutil
{
class ZooKeeperNodeCache;
using EventPtr = std::shared_ptr<Poco::Event>;
}
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
@ -58,7 +59,8 @@ public:
/// 5) (Yandex.Metrika-specific) Substitute "<layer/>" with "<layer>layer number from the hostname</layer>".
XMLDocumentPtr processConfig(
bool * has_zk_includes = nullptr,
zkutil::ZooKeeperNodeCache * zk_node_cache = nullptr);
zkutil::ZooKeeperNodeCache * zk_node_cache = nullptr,
const zkutil::EventPtr & zk_changed_event = nullptr);
/// loadConfig* functions apply processConfig and create Poco::Util::XMLConfiguration.
@ -83,6 +85,7 @@ public:
/// processing, load the configuration from the preprocessed file.
LoadedConfig loadConfigWithZooKeeperIncludes(
zkutil::ZooKeeperNodeCache & zk_node_cache,
const zkutil::EventPtr & zk_changed_event,
bool fallback_to_preprocessed = false);
void savePreprocessedConfig(const LoadedConfig & loaded_config);
@ -125,5 +128,6 @@ private:
XMLDocumentPtr include_from,
Poco::XML::Node * node,
zkutil::ZooKeeperNodeCache * zk_node_cache,
const zkutil::EventPtr & zk_changed_event,
std::unordered_set<std::string> & contributing_zk_paths);
};

View File

@ -16,10 +16,12 @@ ConfigReloader::ConfigReloader(
const std::string & path_,
const std::string & include_from_path_,
zkutil::ZooKeeperNodeCache && zk_node_cache_,
const zkutil::EventPtr & zk_changed_event_,
Updater && updater_,
bool already_loaded)
: path(path_), include_from_path(include_from_path_)
, zk_node_cache(std::move(zk_node_cache_))
, zk_changed_event(zk_changed_event_)
, updater(std::move(updater_))
{
if (!already_loaded)
@ -38,7 +40,7 @@ ConfigReloader::~ConfigReloader()
try
{
quit = true;
zk_node_cache.getChangedEvent().set();
zk_changed_event->set();
if (thread.joinable())
thread.join();
@ -58,7 +60,7 @@ void ConfigReloader::run()
{
try
{
bool zk_changed = zk_node_cache.getChangedEvent().tryWait(std::chrono::milliseconds(reload_interval).count());
bool zk_changed = zk_changed_event->tryWait(std::chrono::milliseconds(reload_interval).count());
if (quit)
return;
@ -88,7 +90,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
loaded_config = config_processor.loadConfig(/* allow_zk_includes = */ true);
if (loaded_config.has_zk_includes)
loaded_config = config_processor.loadConfigWithZooKeeperIncludes(
zk_node_cache, fallback_to_preprocessed);
zk_node_cache, zk_changed_event, fallback_to_preprocessed);
}
catch (...)
{

View File

@ -34,6 +34,7 @@ public:
const std::string & path,
const std::string & include_from_path,
zkutil::ZooKeeperNodeCache && zk_node_cache,
const zkutil::EventPtr & zk_changed_event,
Updater && updater,
bool already_loaded);
@ -72,6 +73,7 @@ private:
std::string include_from_path;
FilesChangesTracker files;
zkutil::ZooKeeperNodeCache zk_node_cache;
zkutil::EventPtr zk_changed_event = std::make_shared<Poco::Event>();
Updater updater;

View File

@ -9,7 +9,16 @@ ZooKeeperNodeCache::ZooKeeperNodeCache(GetZooKeeper get_zookeeper_)
{
}
ZooKeeperNodeCache::GetResult ZooKeeperNodeCache::get(const std::string & path)
ZooKeeperNodeCache::GetResult ZooKeeperNodeCache::get(const std::string & path, EventPtr watch_event)
{
Coordination::WatchCallback watch_callback;
if (watch_event)
watch_callback = [watch_event](const Coordination::WatchResponse &) { watch_event->set(); };
return get(path, watch_callback);
}
ZooKeeperNodeCache::GetResult ZooKeeperNodeCache::get(const std::string & path, Coordination::WatchCallback caller_watch_callback)
{
zkutil::ZooKeeperPtr zookeeper;
std::unordered_set<std::string> invalidated_paths;
@ -38,7 +47,7 @@ ZooKeeperNodeCache::GetResult ZooKeeperNodeCache::get(const std::string & path)
if (cache_it != node_cache.end())
return cache_it->second;
auto watch_callback = [context=context](const Coordination::WatchResponse & response)
auto watch_callback = [=](const Coordination::WatchResponse & response)
{
if (!(response.type != Coordination::SESSION || response.state == Coordination::EXPIRED_SESSION))
return;
@ -56,8 +65,8 @@ ZooKeeperNodeCache::GetResult ZooKeeperNodeCache::get(const std::string & path)
changed = true;
}
}
if (changed)
context->changed_event.set();
if (changed && caller_watch_callback)
caller_watch_callback(response);
};
GetResult result;

View File

@ -22,8 +22,13 @@ namespace zkutil
/// This class allows querying the contents of ZooKeeper nodes and caching the results.
/// Watches are set for cached nodes and for nodes that were nonexistent at the time of query.
/// After a watch fires, a notification is generated for the change event.
/// After a watch fires, the callback or event that was passed by the user is notified.
///
/// NOTE: methods of this class are not thread-safe.
///
/// Intended use case: if you need one thread to watch changes in several nodes.
/// If instead you use simple a watch event for this, watches will accumulate for nodes that do not change
/// or change rarely.
class ZooKeeperNodeCache
{
public:
@ -39,17 +44,14 @@ public:
Coordination::Stat stat;
};
GetResult get(const std::string & path);
Poco::Event & getChangedEvent() { return context->changed_event; }
GetResult get(const std::string & path, EventPtr watch_event);
GetResult get(const std::string & path, Coordination::WatchCallback watch_callback);
private:
GetZooKeeper get_zookeeper;
struct Context
{
Poco::Event changed_event;
std::mutex mutex;
zkutil::ZooKeeperPtr zookeeper;
std::unordered_set<std::string> invalidated_paths;