allow several <graphite> targets (#603)

* allow several <graphite> targets

* fix

* fix

* Adjustable parts

* changelog version

* fix

* changelog

* Style fixes

* attachSystemTables

* config describe

* fixes

* fixes
This commit is contained in:
proller 2017-03-21 22:08:09 +03:00 committed by alexey-milovidov
parent daefb877df
commit 670e98fa92
17 changed files with 213 additions and 97 deletions

View File

@ -1,4 +1,10 @@
## [1.1.54189](https://github.com/yandex/Clickhouse/tree/v1.1.54189-testing) (2017-03-17)
[Full Changelog](https://github.com/yandex/Clickhouse/compare/v1.1.54188-stable...v1.1.54189-testing)
- Config: Allow define several graphite blocks, graphite.interval=60 option added. use_graphite option deleted.
## [1.1.54181](https://github.com/yandex/Clickhouse/tree/v1.1.54181-testing) (2017-03-10)
[Full Changelog](https://github.com/yandex/Clickhouse/compare/v1.1.54165-stable...v1.1.54181-testing)

View File

@ -0,0 +1,16 @@
#pragma once
#include <string>
#include <vector>
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
}
namespace DB
{
/// get all internal key names for given key
std::vector<std::string> getMultipleKeysFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name);
}

View File

@ -0,0 +1,13 @@
#pragma once
#include <DB/Databases/IDatabase.h>
namespace DB
{
class Context;
class AsynchronousMetrics;
void attachSystemTablesServer(DatabasePtr system_database, Context * global_context, bool has_zookeeper);
void attachSystemTablesLocal(DatabasePtr system_database);
void attachSystemTablesAsync(DatabasePtr system_database, AsynchronousMetrics & async_metrics);
}

View File

@ -1,13 +0,0 @@
#pragma once
#include <DB/Databases/IDatabase.h>
namespace DB
{
class Context;
class AsynchronousMetrics;
void attach_system_tables_server(DatabasePtr system_database, Context * global_context, bool has_zookeeper);
void attach_system_tables_local(DatabasePtr system_database);
void attach_system_tables_async(DatabasePtr system_database, AsynchronousMetrics & async_metrics);
}

View File

@ -0,0 +1,21 @@
#include <DB/Common/getMultipleKeysFromConfig.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <DB/Common/StringUtils.h>
namespace DB
{
std::vector<std::string> getMultipleKeysFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name)
{
std::vector<std::string> values;
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(root, config_keys);
for (const auto & key : config_keys)
{
if (key != name && !(startsWith(key.data(), name + "[") && endsWith(key.data(), "]")))
continue;
values.emplace_back(key);
}
return values;
}
}

View File

@ -5,7 +5,7 @@
#include <Poco/Util/OptionCallback.h>
#include <Poco/String.h>
#include <DB/Databases/DatabaseOrdinary.h>
#include <DB/Storages/System/attach_system_tables.h>
#include <DB/Storages/System/attachSystemTables.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/executeQuery.h>
@ -379,7 +379,7 @@ void LocalServer::attachSystemTables()
context->addDatabase("system", system_database);
}
attach_system_tables_local(system_database);
attachSystemTablesLocal(system_database);
}

View File

@ -1,15 +1,15 @@
#include "MetricsTransmitter.h"
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <daemon/BaseDaemon.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Common/Exception.h>
#include <DB/Common/setThreadName.h>
#include <DB/Interpreters/AsynchronousMetrics.h>
namespace DB
{
MetricsTransmitter::~MetricsTransmitter()
{
try
@ -32,13 +32,19 @@ MetricsTransmitter::~MetricsTransmitter()
void MetricsTransmitter::run()
{
setThreadName("MetricsTransmit");
auto & config = Poco::Util::Application::instance().config();
auto interval = config.getInt(config_name + ".interval", 60);
/// Next minute at 00 seconds. To avoid time drift and transmit values exactly each minute.
const auto get_next_minute = []
{
return std::chrono::time_point_cast<std::chrono::minutes, std::chrono::system_clock>(
std::chrono::system_clock::now() + std::chrono::minutes(1));
const std::string thread_name = "MericsTrns " + std::to_string(interval) + "s";
setThreadName(thread_name.c_str());
const auto get_next_time = [](size_t seconds) {
/// To avoid time drift and transmit values exactly each interval:
/// next time aligned to system seconds
/// (60s -> every minute at 00 seconds, 5s -> every minute:[00, 05, 15 ... 55]s, 3600 -> every hour:00:00
return std::chrono::system_clock::time_point(
(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) / seconds) * seconds
+ std::chrono::seconds(seconds));
};
std::vector<ProfileEvents::Count> prev_counters(ProfileEvents::end());
@ -47,7 +53,7 @@ void MetricsTransmitter::run()
while (true)
{
if (cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
if (cond.wait_until(lock, get_next_time(interval), [this] { return quit; }))
break;
transmit(prev_counters);
@ -57,35 +63,46 @@ void MetricsTransmitter::run()
void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_counters)
{
auto & config = Poco::Util::Application::instance().config();
auto async_metrics_values = async_metrics.getValues();
GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size());
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed);
const auto counter_increment = counter - prev_counters[i];
prev_counters[i] = counter;
std::string key {ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))};
key_vals.emplace_back(profile_events_path_prefix + key, counter_increment);
if (config.getBool(config_name + ".events", true))
{
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed);
const auto counter_increment = counter - prev_counters[i];
prev_counters[i] = counter;
std::string key{ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))};
key_vals.emplace_back(profile_events_path_prefix + key, counter_increment);
}
}
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
if (config.getBool(config_name + ".metrics", true))
{
const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
{
const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
std::string key {CurrentMetrics::getDescription(static_cast<CurrentMetrics::Metric>(i))};
key_vals.emplace_back(current_metrics_path_prefix + key, value);
std::string key{CurrentMetrics::getDescription(static_cast<CurrentMetrics::Metric>(i))};
key_vals.emplace_back(current_metrics_path_prefix + key, value);
}
}
for (const auto & name_value : async_metrics_values)
if (config.getBool(config_name + ".asynchronous_metrics", true))
{
key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second);
for (const auto & name_value : async_metrics_values)
{
key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second);
}
}
BaseDaemon::instance().writeToGraphite(key_vals);
if (key_vals.size())
BaseDaemon::instance().writeToGraphite(key_vals, config_name);
}
}

View File

@ -1,16 +1,15 @@
#pragma once
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <DB/Common/ProfileEvents.h>
namespace DB
{
class AsynchronousMetrics;
/** Automatically sends
@ -22,7 +21,10 @@ class AsynchronousMetrics;
class MetricsTransmitter
{
public:
MetricsTransmitter(const AsynchronousMetrics & async_metrics_) : async_metrics(async_metrics_) {}
MetricsTransmitter(const AsynchronousMetrics & async_metrics, const std::string & config_name)
: async_metrics{async_metrics}, config_name{config_name}
{
}
~MetricsTransmitter();
private:
@ -30,15 +32,15 @@ private:
void transmit(std::vector<ProfileEvents::Count> & prev_counters);
const AsynchronousMetrics & async_metrics;
const std::string config_name;
bool quit = false;
std::mutex mutex;
std::condition_variable cond;
std::thread thread {&MetricsTransmitter::run, this};
std::thread thread{&MetricsTransmitter::run, this};
static constexpr auto profile_events_path_prefix = "ClickHouse.ProfileEvents.";
static constexpr auto current_metrics_path_prefix = "ClickHouse.Metrics.";
static constexpr auto asynchronous_metrics_path_prefix = "ClickHouse.AsynchronousMetrics.";
};
}

View File

@ -13,9 +13,11 @@
#include <common/ErrorHandlers.h>
#include <ext/scope_guard.hpp>
#include <zkutil/ZooKeeper.h>
#include <zkutil/ZooKeeperNodeCache.h>
#include <DB/Common/Macros.h>
#include <DB/Common/StringUtils.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Common/getMultipleKeysFromConfig.h>
#include <DB/Databases/DatabaseOrdinary.h>
#include <DB/IO/HTTPCommon.h>
#include <DB/Interpreters/AsynchronousMetrics.h>
@ -23,7 +25,7 @@
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Storages/MergeTree/ReshardingWorker.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/System/attach_system_tables.h>
#include <DB/Storages/System/attachSystemTables.h>
#include "ConfigReloader.h"
#include "HTTPHandler.h"
#include "InterserverIOHTTPHandler.h"
@ -32,6 +34,7 @@
#include "StatusFile.h"
#include "TCPHandler.h"
namespace DB
{
namespace ErrorCodes
@ -221,7 +224,7 @@ int Server::main(const std::vector<std::string> & args)
{
auto old_configuration = loaded_config.configuration;
loaded_config = ConfigProcessor().loadConfigWithZooKeeperIncludes(
config_path, main_config_zk_node_cache, /* fallback_to_preprocessed = */ true);
config_path, main_config_zk_node_cache, /* fallback_to_preprocessed = */ true);
config().removeConfiguration(old_configuration.get());
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
}
@ -321,11 +324,11 @@ int Server::main(const std::vector<std::string> & args)
/// Initialize main config reloader.
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
auto main_config_reloader = std::make_unique<ConfigReloader>(
config_path, include_from_path,
std::move(main_config_zk_node_cache),
[&](ConfigurationPtr config) { global_context->setClustersConfig(config); },
/* already_loaded = */ true);
auto main_config_reloader = std::make_unique<ConfigReloader>(config_path,
include_from_path,
std::move(main_config_zk_node_cache),
[&](ConfigurationPtr config) { global_context->setClustersConfig(config); },
/* already_loaded = */ true);
/// Initialize users config reloader.
std::string users_config_path = config().getString("users_config", config_path);
@ -337,11 +340,11 @@ int Server::main(const std::vector<std::string> & args)
if (Poco::File(config_dir + users_config_path).exists())
users_config_path = config_dir + users_config_path;
}
auto users_config_reloader = std::make_unique<ConfigReloader>(
users_config_path, include_from_path,
zkutil::ZooKeeperNodeCache([&] { return global_context->getZooKeeper(); }),
[&](ConfigurationPtr config) { global_context->setUsersConfig(config); },
/* already_loaded = */ false);
auto users_config_reloader = std::make_unique<ConfigReloader>(users_config_path,
include_from_path,
zkutil::ZooKeeperNodeCache([&] { return global_context->getZooKeeper(); }),
[&](ConfigurationPtr config) { global_context->setUsersConfig(config); },
/* already_loaded = */ false);
/// Limit on total number of coucurrently executed queries.
global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0));
@ -385,7 +388,7 @@ int Server::main(const std::vector<std::string> & args)
DatabasePtr system_database = global_context->getDatabase("system");
attach_system_tables_server(system_database, global_context.get(), has_zookeeper);
attachSystemTablesServer(system_database, global_context.get(), has_zookeeper);
bool has_resharding_worker = false;
if (has_zookeeper && config().has("resharding"))
@ -425,12 +428,8 @@ int Server::main(const std::vector<std::string> & args)
std::vector<std::unique_ptr<Poco::Net::TCPServer>> servers;
std::vector<std::string> listen_hosts;
Poco::Util::AbstractConfiguration::Keys config_keys;
config().keys("", config_keys);
for (const auto & key : config_keys)
for (const auto & key : DB::getMultipleKeysFromConfig(config(), "", "listen_host"))
{
if (!startsWith(key.data(), "listen_host"))
continue;
listen_hosts.emplace_back(config().getString(key));
}
@ -585,10 +584,14 @@ int Server::main(const std::vector<std::string> & args)
/// This object will periodically calculate some metrics.
AsynchronousMetrics async_metrics(*global_context);
attach_system_tables_async(system_database, async_metrics);
attachSystemTablesAsync(system_database, async_metrics);
std::vector<std::unique_ptr<MetricsTransmitter>> metrics_transmitters;
for (const auto & graphite_key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))
{
metrics_transmitters.emplace_back(std::make_unique<MetricsTransmitter>(async_metrics, graphite_key));
}
const auto metrics_transmitter
= config().getBool("use_graphite", true) ? std::make_unique<MetricsTransmitter>(async_metrics) : nullptr;
waitForTerminationRequest();
}

View File

@ -1,3 +0,0 @@
<yandex>
<use_graphite replace="replace">false</use_graphite>
</yandex>

View File

@ -138,15 +138,36 @@
<builtin_dictionaries_reload_interval>3600</builtin_dictionaries_reload_interval>
<!-- Sending data to Graphite for monitoring. -->
<use_graphite>false</use_graphite>
<!-- Uncomment if use_graphite.
<!-- Sending data to Graphite for monitoring. Several sections can be defined. -->
<!--
interval - send every X second
root_path - prefix for keys
metrics - send data from table system.metrics
events - send data from table system.events
asynchronous_metrics - send data from table system.asynchronous_metrics
-->
<!--
<graphite>
<host>localhost</host>
<port>42000</port>
<root_path>one_min</root_path>
<timeout>0.1</timeout>
<interval>60</interval>
<root_path>one_min</root_path>
<metrics>true<metrics>
<events>true<events>
<asynchronous_metrics>true<asynchronous_metrics>
</graphite>
<graphite>
<host>localhost</host>
<port>42000</port>
<timeout>0.1</timeout>
<interval>1</interval>
<root_path>one_sec</root_path>
<metrics>true<metrics>
<events>true<events>
<asynchronous_metrics>false<asynchronous_metrics>
</graphite>
-->

View File

@ -1,4 +1,4 @@
#include <DB/Storages/System/attach_system_tables.h>
#include <DB/Storages/System/attachSystemTables.h>
#include <DB/Storages/System/StorageSystemAsynchronousMetrics.h>
#include <DB/Storages/System/StorageSystemBuildOptions.h>
@ -23,7 +23,7 @@
namespace DB
{
void attach_system_tables_local(DatabasePtr system_database)
void attachSystemTablesLocal(DatabasePtr system_database)
{
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
@ -37,9 +37,9 @@ void attach_system_tables_local(DatabasePtr system_database)
system_database->attachTable("build_options", StorageSystemBuildOptions::create("build_options"));
}
void attach_system_tables_server(DatabasePtr system_database, Context * global_context, bool has_zookeeper)
void attachSystemTablesServer(DatabasePtr system_database, Context * global_context, bool has_zookeeper)
{
attach_system_tables_local(system_database);
attachSystemTablesLocal(system_database);
system_database->attachTable("parts", StorageSystemParts::create("parts"));
system_database->attachTable("processes", StorageSystemProcesses::create("processes"));
system_database->attachTable("metrics", StorageSystemMetrics::create("metrics"));
@ -54,7 +54,7 @@ void attach_system_tables_server(DatabasePtr system_database, Context * global_c
system_database->attachTable("zookeeper", StorageSystemZooKeeper::create("zookeeper"));
}
void attach_system_tables_async(DatabasePtr system_database, AsynchronousMetrics & async_metrics)
void attachSystemTablesAsync(DatabasePtr system_database, AsynchronousMetrics & async_metrics)
{
system_database->attachTable("asynchronous_metrics", StorageSystemAsynchronousMetrics::create("asynchronous_metrics", async_metrics));
}

View File

@ -15,6 +15,8 @@
## 1. Форматирование
0. Большую часть форматирования сделает автоматически clang-format. Инструкция для подключения clang-format в kdevelop описана в файле format_sources
1. Отступы табами. Настройте среду разработки так, чтобы таб был по ширине равен четырём символам.
2. Открывающая фигурная скобка на новой, отдельной строке. (Закрывающая - тоже.)

View File

@ -1,2 +1,2 @@
# Settings -> Configure KDevelop -> Source Formatter -> C++ ; Custom Script Formatter ; Kdevelop: kdev_format_source
# Settings -> Configure KDevelop -> Source Formatter -> [C++, C] ; Custom Script Formatter ; Kdevelop: kdev_format_source
*.c *.cpp *.h *.hpp: mv $TMPFILE $TMPFILE.tmp; cat $TMPFILE.tmp | clang-format -style=file -assume-filename=`pwd`/.clang-format > $TMPFILE

View File

@ -85,5 +85,29 @@ std::ostream & operator<<(std::ostream & stream, const std::pair<K, V> & what)
return stream;
}
#include <ratio>
template <std::intmax_t Num, std::intmax_t Denom>
std::ostream & operator<<(std::ostream & stream, const std::ratio<Num, Denom> & what)
{
stream << "ratio<Num=" << Num << ", Denom=" << Denom << ">";
return stream;
}
#include <chrono>
template <class clock, class duration>
std::ostream & operator<<(std::ostream & stream, const std::chrono::duration<clock, duration> & what)
{
stream << "chrono::duration<clock=" << clock() << ", duration=" << duration() << ">{" << what.count() << "}";
return stream;
}
template <class clock, class duration>
std::ostream & operator<<(std::ostream & stream, const std::chrono::time_point<clock, duration> & what)
{
stream << "chrono::time_point{" << what.time_since_epoch() << "}";
return stream;
}
// TODO: add more types

View File

@ -107,18 +107,27 @@ public:
/// root_path по умолчанию one_min
/// key - лучше группировать по смыслу. Например "meminfo.cached" или "meminfo.free", "meminfo.total"
template <class T>
void writeToGraphite(const std::string & key, const T & value, time_t timestamp = 0, const std::string & custom_root_path = "")
void writeToGraphite(const std::string & key, const T & value, const std::string & config_name = "graphite", time_t timestamp = 0, const std::string & custom_root_path = "")
{
graphite_writer->write(key, value, timestamp, custom_root_path);
auto writer = getGraphiteWriter(config_name);
if (writer)
writer->write(key, value, timestamp, custom_root_path);
}
template <class T>
void writeToGraphite(const GraphiteWriter::KeyValueVector<T> & key_vals, time_t timestamp = 0, const std::string & custom_root_path = "")
void writeToGraphite(const GraphiteWriter::KeyValueVector<T> & key_vals, const std::string & config_name = "graphite", time_t timestamp = 0, const std::string & custom_root_path = "")
{
graphite_writer->write(key_vals, timestamp, custom_root_path);
auto writer = getGraphiteWriter(config_name);
if (writer)
writer->write(key_vals, timestamp, custom_root_path);
}
GraphiteWriter * getGraphiteWriter() { return graphite_writer.get(); }
GraphiteWriter * getGraphiteWriter(const std::string & config_name = "graphite")
{
if (graphite_writers.count(config_name))
return graphite_writers[config_name].get();
return nullptr;
}
std::experimental::optional<size_t> getLayer() const
{
@ -196,7 +205,7 @@ protected:
Poco::AutoPtr<Poco::FileChannel> error_log_file;
Poco::AutoPtr<Poco::SyslogChannel> syslog_channel;
std::unique_ptr<GraphiteWriter> graphite_writer;
std::map<std::string, std::unique_ptr<GraphiteWriter>> graphite_writers;
std::experimental::optional<size_t> layer;

View File

@ -7,7 +7,6 @@
#include <sys/fcntl.h>
#include <sys/time.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <cxxabi.h>
@ -17,16 +16,13 @@
#define _XOPEN_SOURCE
#endif
#include <ucontext.h>
#include <typeinfo>
#include <common/logger_useful.h>
#include <common/ErrorHandlers.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <iostream>
#include <memory>
#include <Poco/Observer.h>
#include <Poco/Logger.h>
#include <Poco/AutoPtr.h>
@ -48,13 +44,12 @@
#include <Poco/NumberFormatter.h>
#include <Poco/Condition.h>
#include <Poco/SyslogChannel.h>
#include <DB/Common/Exception.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
#include <DB/IO/ReadBufferFromFileDescriptor.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Common/getMultipleKeysFromConfig.h>
#include <DB/Common/ClickHouseRevision.h>
#include <daemon/OwnPatternFormatter.h>
@ -792,7 +787,10 @@ void BaseDaemon::initialize(Application& self)
signal_listener.reset(new SignalListener(*this));
signal_listener_thread.start(*signal_listener);
graphite_writer.reset(new GraphiteWriter("graphite"));
for (const auto & key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))
{
graphite_writers.emplace(key, std::make_unique<GraphiteWriter>(key));
}
}
void BaseDaemon::logRevision() const