dbms: ProfileEventsTransmitter corrections. [#METR-10463]

ProfileEventsTransmitter thread wakings are now properly aligned with minutes' boundaries.
Style and logic corrections.
This commit is contained in:
Andrey Mironov 2014-07-22 12:44:52 +04:00
parent de6633990a
commit 9bb88d4db2

View File

@ -22,11 +22,12 @@
#include <thread> #include <thread>
#include <atomic> #include <atomic>
#include <condition_variable>
namespace namespace
{ {
/** Automatically sends ProfileEvents to Graphite every minute /** Automatically sends difference of ProfileEvents to Graphite at beginning of every minute
*/ */
class ProfileEventsTransmitter class ProfileEventsTransmitter
{ {
@ -35,7 +36,13 @@ public:
{ {
try try
{ {
quit.store(true, std::memory_order_relaxed); {
std::lock_guard<std::mutex> lock{mutex};
quit = true;
}
cond.notify_one();
thread.join(); thread.join();
} }
catch (...) catch (...)
@ -47,52 +54,53 @@ public:
private: private:
void run() void run()
{ {
while (!quit.load(std::memory_order_relaxed)) const auto get_next_minute = [] {
return std::chrono::time_point_cast<std::chrono::minutes, std::chrono::system_clock>(
std::chrono::system_clock::now() + std::chrono::minutes(1)
);
};
std::unique_lock<std::mutex> lock{mutex};
while (true)
{ {
std::this_thread::sleep_until(std::chrono::system_clock::now() + std::chrono::minutes(1)); const auto quit = cond.wait_until(lock, get_next_minute(), [this] {
return this->quit;
});
if (quit)
break;
transmitCounters(); transmitCounters();
} }
} }
void transmitCounters() void transmitCounters()
{ {
decltype(prev_counters) counters; GraphiteWriter::KeyValueVector<size_t> key_vals{};
std::copy(std::begin(prev_counters), std::end(prev_counters), counters);
std::transform(
std::begin(counters), std::end(counters), ProfileEvents::counters,
prev_counters, [] (size_t& prev, size_t& current)
{
prev = current - prev;
return prev;
}
);
auto key_vals = GraphiteWriter::KeyValueVector<size_t>{};
key_vals.reserve(ProfileEvents::END); key_vals.reserve(ProfileEvents::END);
for (auto i = 0; i < ProfileEvents::END; ++i) for (auto i = 0; i < ProfileEvents::END; ++i)
{ {
key_vals.push_back({ const auto counter_increment = ProfileEvents::counters[i] - prev_counters[i];
descriptionToKey(ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))), prev_counters[i] = ProfileEvents::counters[i];
counters[i]
}); std::string key{ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))};
std::replace(std::begin(key), std::end(key), ' ', '_');
key_vals.emplace_back(event_path_prefix + key, counter_increment);
} }
Daemon::instance().writeToGraphite(key_vals); Daemon::instance().writeToGraphite(key_vals);
} }
std::string descriptionToKey(std::string desc) decltype(ProfileEvents::counters) prev_counters{};
{ bool quit = false;
std::transform(std::begin(desc), std::end(desc), std::begin(desc), std::mutex mutex;
[] (const char c) { return c == ' ' ? '_' : std::tolower(c); } std::condition_variable cond;
);
return desc;
}
decltype(ProfileEvents::counters) prev_counters{0};
std::atomic<bool> quit;
std::thread thread{&ProfileEventsTransmitter::run, this}; std::thread thread{&ProfileEventsTransmitter::run, this};
static constexpr auto event_path_prefix = "ClickHouse.ProfileEvents.";
}; };
} }