Add automatic DROP DNS CACHE, update of SYSTEM queries. [#CLICKHOUSE-3645]

This commit is contained in:
Vitaliy Lyudvichenko 2018-03-26 17:12:07 +03:00 committed by alexey-milovidov
parent 76468d8d89
commit 8fd72a6777
13 changed files with 262 additions and 38 deletions

View File

@ -138,7 +138,9 @@
M(RWLockAcquiredReadLocks) \ M(RWLockAcquiredReadLocks) \
M(RWLockAcquiredWriteLocks) \ M(RWLockAcquiredWriteLocks) \
M(RWLockReadersWaitMilliseconds) \ M(RWLockReadersWaitMilliseconds) \
M(RWLockWritersWaitMilliseconds) M(RWLockWritersWaitMilliseconds) \
\
M(NetworkErrors)
namespace ProfileEvents namespace ProfileEvents
{ {

View File

@ -1406,9 +1406,28 @@ std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name
void Context::reloadClusterConfig() void Context::reloadClusterConfig()
{ {
std::lock_guard<std::mutex> lock(shared->clusters_mutex); while (true)
auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef(); {
shared->clusters = std::make_unique<Clusters>(config, settings); ConfigurationPtr cluster_config;
{
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
cluster_config = shared->clusters_config;
}
auto & config = cluster_config ? *cluster_config : getConfigRef();
auto new_clusters = std::make_unique<Clusters>(config, settings);
{
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
if (shared->clusters_config.get() == cluster_config.get())
{
shared->clusters = std::move(new_clusters);
return;
}
/// Clusters config has been suddenly changed, recompute clusters
}
}
} }

View File

@ -39,6 +39,7 @@
#include <random> #include <random>
#include <pcg_random.hpp> #include <pcg_random.hpp>
#include <Poco/Net/NetException.h>
namespace DB namespace DB
@ -95,7 +96,7 @@ struct HostID
{ {
return DB::isLocalAddress(Poco::Net::SocketAddress(host_name, port), clickhouse_port); return DB::isLocalAddress(Poco::Net::SocketAddress(host_name, port), clickhouse_port);
} }
catch (const Poco::Exception & e) catch (const Poco::Net::NetException & e)
{ {
/// Avoid "Host not found" exceptions /// Avoid "Host not found" exceptions
return false; return false;

View File

@ -0,0 +1,116 @@
#include "DNSCacheUpdater.h"
#include <Common/DNSCache.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Common/ProfileEvents.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>
namespace ProfileEvents
{
extern Event NetworkErrors;
}
namespace DB
{
using BackgroundProcessingPoolTaskInfo = BackgroundProcessingPool::TaskInfo;
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int ALL_CONNECTION_TRIES_FAILED;
}
DNSCacheUpdater::DNSCacheUpdater(Context & context_)
: context(context_), pool(context_.getBackgroundPool())
{
task_handle = pool.addTask([this] () { return run(); });
}
bool DNSCacheUpdater::run()
{
/// TODO: Ensusre that we get global counter (not thread local)
auto num_current_network_exceptions = ProfileEvents::counters[ProfileEvents::NetworkErrors].load(std::memory_order_relaxed);
if (num_current_network_exceptions >= last_num_network_erros + min_errors_to_update_cache
&& time(nullptr) > last_update_time + min_update_period_seconds)
{
try
{
LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"), "Updating DNS cache");
DNSCache::instance().drop();
context.reloadClusterConfig();
last_num_network_erros = num_current_network_exceptions;
last_update_time = time(nullptr);
return true;
}
catch (...)
{
/// Do not increment ProfileEvents::NetworkErrors twice
if (isNetworkError())
return false;
throw;
}
}
/// According to BackgroundProcessingPool logic, if task has done work, it could be executed again immediately.
return false;
}
DNSCacheUpdater::~DNSCacheUpdater()
{
if (task_handle)
pool.removeTask(task_handle);
task_handle.reset();
}
bool DNSCacheUpdater::incrementNetworkErrors()
{
if (isNetworkError())
{
ProfileEvents::increment(ProfileEvents::NetworkErrors);
return true;
}
return false;
}
bool DNSCacheUpdater::isNetworkError()
{
try
{
throw;
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::TIMEOUT_EXCEEDED || e.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
return true;
}
catch (Poco::Net::DNSException & e)
{
return true;
}
catch (Poco::TimeoutException & e)
{
return true;
}
catch (...)
{
/// Do nothing
}
return false;
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <memory>
namespace DB
{
class Context;
class BackgroundProcessingPool;
class BackgroundProcessingPoolTaskInfo;
/// Add a task to BackgroundProcessingPool that watch for ProfileEvents::NetworkErrors and updates DNS cache if it has increased
class DNSCacheUpdater
{
public:
DNSCacheUpdater(Context & context);
~DNSCacheUpdater();
/// Call it inside catch section
/// Returns true if it is a network error
static bool isNetworkError();
/// Checks if it is a network error and increments ProfileEvents::NetworkErrors
static bool incrementNetworkErrors();
private:
bool run();
Context & context;
BackgroundProcessingPool & pool;
std::shared_ptr<BackgroundProcessingPoolTaskInfo> task_handle;
size_t last_num_network_erros = 0;
time_t last_update_time = 0;
static constexpr size_t min_errors_to_update_cache = 3;
static constexpr time_t min_update_period_seconds = 10;
};
}

View File

@ -63,6 +63,10 @@ BlockIO InterpreterSystemQuery::execute()
using Type = ASTSystemQuery::Type; using Type = ASTSystemQuery::Type;
/// Use global context with fresh system profile settings
Context system_context = context.getGlobalContext();
system_context.setSetting("profile", context.getSystemProfileName());
switch (query.type) switch (query.type)
{ {
case Type::SHUTDOWN: case Type::SHUTDOWN:
@ -76,29 +80,32 @@ BlockIO InterpreterSystemQuery::execute()
case Type::DROP_DNS_CACHE: case Type::DROP_DNS_CACHE:
DNSCache::instance().drop(); DNSCache::instance().drop();
/// Reinitialize clusters to update their resolved_addresses /// Reinitialize clusters to update their resolved_addresses
context.reloadClusterConfig(); system_context.reloadClusterConfig();
break; break;
case Type::DROP_MARK_CACHE: case Type::DROP_MARK_CACHE:
context.dropMarkCache(); system_context.dropMarkCache();
break; break;
case Type::DROP_UNCOMPRESSED_CACHE: case Type::DROP_UNCOMPRESSED_CACHE:
context.dropUncompressedCache(); system_context.dropUncompressedCache();
break; break;
case Type::RELOAD_DICTIONARY: case Type::RELOAD_DICTIONARY:
context.getExternalDictionaries().reloadDictionary(query.target_dictionary); system_context.getExternalDictionaries().reloadDictionary(query.target_dictionary);
break; break;
case Type::RELOAD_DICTIONARIES: case Type::RELOAD_DICTIONARIES:
{ {
auto status = getOverallExecutionStatusOfCommands( auto status = getOverallExecutionStatusOfCommands(
[&] { context.getExternalDictionaries().reload(); }, [&] { system_context.getExternalDictionaries().reload(); },
[&] { context.getEmbeddedDictionaries().reload(); } [&] { system_context.getEmbeddedDictionaries().reload(); }
); );
if (status.code != 0) if (status.code != 0)
throw Exception(status.message, status.code); throw Exception(status.message, status.code);
break; break;
} }
case Type::RELOAD_EMBEDDED_DICTIONARIES:
system_context.getEmbeddedDictionaries().reload();
break;
case Type::RELOAD_CONFIG: case Type::RELOAD_CONFIG:
context.reloadConfig(); system_context.reloadConfig();
break; break;
case Type::STOP_LISTEN_QUERIES: case Type::STOP_LISTEN_QUERIES:
case Type::START_LISTEN_QUERIES: case Type::START_LISTEN_QUERIES:

View File

@ -23,6 +23,7 @@
#include <Interpreters/ProcessList.h> #include <Interpreters/ProcessList.h>
#include <Interpreters/QueryLog.h> #include <Interpreters/QueryLog.h>
#include <Interpreters/executeQuery.h> #include <Interpreters/executeQuery.h>
#include "DNSCacheUpdater.h"
namespace ProfileEvents namespace ProfileEvents
@ -377,6 +378,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (!internal) if (!internal)
onExceptionBeforeStart(query, context, current_time); onExceptionBeforeStart(query, context, current_time);
DNSCacheUpdater::incrementNetworkErrors();
throw; throw;
} }

View File

@ -39,6 +39,8 @@ const char * ASTSystemQuery::typeToString(Type type)
return "RELOAD DICTIONARY"; return "RELOAD DICTIONARY";
case Type::RELOAD_DICTIONARIES: case Type::RELOAD_DICTIONARIES:
return "RELOAD DICTIONARIES"; return "RELOAD DICTIONARIES";
case Type::RELOAD_EMBEDDED_DICTIONARIES:
return "RELOAD EMBEDDED DICTIONARIES";
case Type::RELOAD_CONFIG: case Type::RELOAD_CONFIG:
return "RELOAD CONFIG"; return "RELOAD CONFIG";
case Type::STOP_MERGES: case Type::STOP_MERGES:

View File

@ -24,6 +24,7 @@ public:
SYNC_REPLICA, SYNC_REPLICA,
RELOAD_DICTIONARY, RELOAD_DICTIONARY,
RELOAD_DICTIONARIES, RELOAD_DICTIONARIES,
RELOAD_EMBEDDED_DICTIONARIES,
RELOAD_CONFIG, RELOAD_CONFIG,
STOP_MERGES, STOP_MERGES,
START_MERGES, START_MERGES,

View File

@ -40,6 +40,9 @@
#if Poco_NetSSL_FOUND #if Poco_NetSSL_FOUND
#include <Poco/Net/Context.h> #include <Poco/Net/Context.h>
#include <Poco/Net/SecureServerSocket.h> #include <Poco/Net/SecureServerSocket.h>
#include <Interpreters/DNSCacheUpdater.h>
#endif #endif
namespace CurrentMetrics namespace CurrentMetrics
@ -322,6 +325,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setDDLWorker(std::make_shared<DDLWorker>(ddl_zookeeper_path, *global_context, &config(), "distributed_ddl")); global_context->setDDLWorker(std::make_shared<DDLWorker>(ddl_zookeeper_path, *global_context, &config(), "distributed_ddl"));
} }
/// Initialize a watcher updating DNS cache in case of network errors
DNSCacheUpdater dns_cache_updater(*global_context);
{ {
Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0); Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0);

View File

@ -6,6 +6,7 @@
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h> #include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Interpreters/DNSCacheUpdater.h>
#include <pcg_random.hpp> #include <pcg_random.hpp>
#include <random> #include <random>
@ -25,7 +26,7 @@ constexpr double BackgroundProcessingPool::sleep_seconds;
constexpr double BackgroundProcessingPool::sleep_seconds_random_part; constexpr double BackgroundProcessingPool::sleep_seconds_random_part;
void BackgroundProcessingPool::TaskInfo::wake() void BackgroundProcessingPoolTaskInfo::wake()
{ {
if (removed) if (removed)
return; return;
@ -36,7 +37,7 @@ void BackgroundProcessingPool::TaskInfo::wake()
std::unique_lock<std::mutex> lock(pool.tasks_mutex); std::unique_lock<std::mutex> lock(pool.tasks_mutex);
auto next_time_to_execute = iterator->first; auto next_time_to_execute = iterator->first;
TaskHandle this_task_handle = iterator->second; auto this_task_handle = iterator->second;
/// If this task was done nothing at previous time and it has to sleep, then cancel sleep time. /// If this task was done nothing at previous time and it has to sleep, then cancel sleep time.
if (next_time_to_execute > current_time) if (next_time_to_execute > current_time)
@ -180,6 +181,7 @@ void BackgroundProcessingPool::threadFunction()
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
DNSCacheUpdater::incrementNetworkErrors();
} }
if (shutdown) if (shutdown)

View File

@ -16,6 +16,9 @@
namespace DB namespace DB
{ {
class BackgroundProcessingPool;
class BackgroundProcessingPoolTaskInfo;
/** Using a fixed number of threads, perform an arbitrary number of tasks in an infinite loop. /** Using a fixed number of threads, perform an arbitrary number of tasks in an infinite loop.
* In this case, one task can run simultaneously from different threads. * In this case, one task can run simultaneously from different threads.
* Designed for tasks that perform continuous background work (for example, merge). * Designed for tasks that perform continuous background work (for example, merge).
@ -27,29 +30,7 @@ class BackgroundProcessingPool
public: public:
/// Returns true, if some useful work was done. In that case, thread will not sleep before next run of this task. /// Returns true, if some useful work was done. In that case, thread will not sleep before next run of this task.
using Task = std::function<bool()>; using Task = std::function<bool()>;
using TaskInfo = BackgroundProcessingPoolTaskInfo;
class TaskInfo
{
public:
/// Wake up any thread.
void wake();
TaskInfo(BackgroundProcessingPool & pool_, const Task & function_) : pool(pool_), function(function_) {}
private:
friend class BackgroundProcessingPool;
BackgroundProcessingPool & pool;
Task function;
/// Read lock is hold when task is executed.
std::shared_mutex rwlock;
std::atomic<bool> removed {false};
std::multimap<Poco::Timestamp, std::shared_ptr<TaskInfo>>::iterator iterator;
};
using TaskHandle = std::shared_ptr<TaskInfo>; using TaskHandle = std::shared_ptr<TaskInfo>;
@ -65,7 +46,9 @@ public:
~BackgroundProcessingPool(); ~BackgroundProcessingPool();
private: protected:
friend class BackgroundProcessingPoolTaskInfo;
using Tasks = std::multimap<Poco::Timestamp, TaskHandle>; /// key is desired next time to execute (priority). using Tasks = std::multimap<Poco::Timestamp, TaskHandle>; /// key is desired next time to execute (priority).
using Threads = std::vector<std::thread>; using Threads = std::vector<std::thread>;
@ -87,4 +70,27 @@ private:
using BackgroundProcessingPoolPtr = std::shared_ptr<BackgroundProcessingPool>; using BackgroundProcessingPoolPtr = std::shared_ptr<BackgroundProcessingPool>;
class BackgroundProcessingPoolTaskInfo
{
public:
/// Wake up any thread.
void wake();
BackgroundProcessingPoolTaskInfo(BackgroundProcessingPool & pool_, const BackgroundProcessingPool::Task & function_)
: pool(pool_), function(function_) {}
protected:
friend class BackgroundProcessingPool;
BackgroundProcessingPool & pool;
BackgroundProcessingPool::Task function;
/// Read lock is hold when task is executed.
std::shared_mutex rwlock;
std::atomic<bool> removed {false};
std::multimap<Poco::Timestamp, std::shared_ptr<BackgroundProcessingPoolTaskInfo>>::iterator iterator;
};
} }

View File

@ -58,6 +58,7 @@ def test_DROP_DNS_CACHE(started_cluster):
instance = cluster.instances['ch1'] instance = cluster.instances['ch1']
instance.exec_in_container(['bash', '-c', 'echo 127.255.255.255 lost_host > /etc/hosts'], privileged=True, user='root') instance.exec_in_container(['bash', '-c', 'echo 127.255.255.255 lost_host > /etc/hosts'], privileged=True, user='root')
instance.query("SYSTEM DROP DNS CACHE")
with pytest.raises(QueryRuntimeException): with pytest.raises(QueryRuntimeException):
instance.query("SELECT * FROM remote('lost_host', 'system', 'one')") instance.query("SELECT * FROM remote('lost_host', 'system', 'one')")
@ -74,6 +75,22 @@ def test_DROP_DNS_CACHE(started_cluster):
assert TSV(instance.query("SELECT DISTINCT host_name, host_address FROM system.clusters WHERE cluster='lost_host_cluster'")) == TSV("lost_host\t127.0.0.1\n") assert TSV(instance.query("SELECT DISTINCT host_name, host_address FROM system.clusters WHERE cluster='lost_host_cluster'")) == TSV("lost_host\t127.0.0.1\n")
def test_automatic_DROP_DNS_CACHE(started_cluster):
instance = cluster.instances['ch1']
instance.exec_in_container(['bash', '-c', 'echo 127.255.255.255 lost_host > /etc/hosts'], privileged=True, user='root')
instance.query("SYSTEM DROP DNS CACHE")
for i in xrange(5):
with pytest.raises(QueryRuntimeException):
instance.query("SELECT * FROM remote('lost_host', 'system', 'one')")
time.sleep(20)
# DNS cache should be automatically updated after this delay
instance.query("SELECT * FROM remote('lost_host', 'system', 'one')")
def test_RELOAD_CONFIG_AND_MACROS(started_cluster): def test_RELOAD_CONFIG_AND_MACROS(started_cluster):
macros = "<yandex><macros><mac>ro</mac></macros></yandex>" macros = "<yandex><macros><mac>ro</mac></macros></yandex>"