dbms: CurrentMetrics: development [#METR-19596].

This commit is contained in:
Alexey Milovidov 2016-01-21 04:47:28 +03:00
parent 30e559636d
commit 0f3e163c4e
32 changed files with 170 additions and 29 deletions

View File

@ -1,6 +1,6 @@
#pragma once
#include <stddef.h>
#include <cstdint>
#include <utility>
@ -15,6 +15,25 @@
#define APPLY_FOR_METRICS(M) \
M(Query) \
M(Merge) \
M(ReplicatedFetch) \
M(ReplicatedSend) \
M(ReplicatedChecks) \
M(BackgroundPoolTask) \
M(DiskSpaceReservedForMerge) \
M(DistributedSend) \
M(QueryPreempted) \
M(TCPConnection) \
M(HTTPConnection) \
M(InterserverConnection) \
M(OpenFileForRead) \
M(OpenFileForWrite) \
M(Read) \
M(Write) \
M(SendExternalTables) \
M(QueryThread) \
M(ReadonlyReplica) \
M(MemoryTracking) \
\
M(END)
@ -43,31 +62,44 @@ namespace CurrentMetrics
}
using Value = int64_t;
/// Счётчики - текущие значения метрик.
extern size_t values[END];
extern Value values[END];
/// Выставить значение указанной метрики.
inline void set(Metric metric, size_t value)
inline void set(Metric metric, Value value)
{
values[metric] = value;
}
/// Прибавить величину к значению указанной метрики. Вы затем должны вычесть величину самостоятельно. Или см. ниже class Increment.
inline void add(Metric metric, Value value = 1)
{
__sync_fetch_and_add(&values[metric], value);
}
inline void sub(Metric metric, Value value = 1)
{
add(metric, -value);
}
/// На время жизни объекта, увеличивает указанное значение на указанную величину.
class Increment
{
private:
size_t * what;
size_t amount;
Value * what;
Value amount;
Increment(size_t * what, size_t amount)
Increment(Value * what, Value amount)
: what(what), amount(amount)
{
__sync_fetch_and_add(what, amount);
}
public:
Increment(Metric metric, size_t amount = 1)
Increment(Metric metric, Value amount = 1)
: Increment(&values[metric], amount) {}
~Increment()
@ -88,6 +120,19 @@ namespace CurrentMetrics
old.what = nullptr;
return *this;
}
void changeTo(Value new_amount)
{
__sync_fetch_and_add(what, new_amount - amount);
amount = new_amount;
}
/// Уменьшить значение раньше вызова деструктора.
void destroy()
{
__sync_fetch_and_sub(what, amount);
what = nullptr;
}
};
}

View File

@ -81,6 +81,8 @@
M(ReadBackoff) \
\
M(ReplicaYieldLeadership) \
M(ReplicaPartialShutdown) \
M(ReplicaPermanentlyReadonly) \
\
M(END)

View File

@ -6,6 +6,7 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -123,6 +124,7 @@ protected:
{
setThreadName("AsyncBlockInput");
current_memory_tracker = memory_tracker;
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
try
{

View File

@ -10,6 +10,7 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
/** Позволяет обработать множество источников блоков параллельно, используя указанное количество потоков.
@ -172,6 +173,7 @@ private:
std::exception_ptr exception;
setThreadName("ParalInputsProc");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
try
{

View File

@ -5,6 +5,7 @@
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/Core/Defines.h>
#include <DB/Common/AIO.h>
#include <DB/Common/CurrentMetrics.h>
#include <string>
#include <limits>
@ -95,6 +96,8 @@ private:
bool is_aio = false;
/// Асинхронная операция завершилась неудачно?
bool aio_failed = false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead};
};
}

View File

@ -3,6 +3,7 @@
#include <fcntl.h>
#include <DB/IO/ReadBufferFromFileDescriptor.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -22,6 +23,7 @@ class ReadBufferFromFile : public ReadBufferFromFileDescriptor
{
private:
std::string file_name;
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead};
public:
ReadBufferFromFile(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
@ -58,6 +60,7 @@ public:
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
fd = -1;
metric_increment.destroy();
}
virtual std::string getFileName()

View File

@ -10,6 +10,7 @@
#include <DB/Common/Stopwatch.h>
#include <DB/Common/Exception.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/IO/ReadBufferFromFileBase.h>
#include <DB/IO/ReadBuffer.h>
@ -47,7 +48,11 @@ protected:
if (profile_callback)
watch.emplace(clock_type);
ssize_t res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
ssize_t res = 0;
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
}
if (!res)
break;

View File

@ -5,6 +5,7 @@
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/Core/Defines.h>
#include <DB/Common/AIO.h>
#include <DB/Common/CurrentMetrics.h>
#include <string>
#include <unistd.h>
@ -87,6 +88,8 @@ private:
bool is_pending_write = false;
/// Асинхронная операция завершилась неудачно?
bool aio_failed = false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForWrite};
};
}

View File

@ -5,6 +5,7 @@
#include <fcntl.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
@ -26,6 +27,7 @@ class WriteBufferFromFile : public WriteBufferFromFileDescriptor
{
private:
std::string file_name;
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForWrite};
public:
WriteBufferFromFile(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, mode_t mode = 0666,
@ -73,6 +75,7 @@ public:
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
fd = -1;
metric_increment.destroy();
}
/** fsync() transfers ("flushes") all modified in-core data of (i.e., modified buffer cache pages for) the file

View File

@ -5,6 +5,7 @@
#include <DB/Common/Exception.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/IO/WriteBufferFromFileBase.h>
#include <DB/IO/WriteBuffer.h>
@ -40,7 +41,11 @@ protected:
{
ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWrite);
ssize_t res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
ssize_t res = 0;
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Write};
res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
}
if ((-1 == res || 0 == res) && errno != EINTR)
throwFromErrno("Cannot write to file " + getFileName(), ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);

View File

@ -5,6 +5,7 @@
#include <condition_variable>
#include <memory>
#include <chrono>
#include <DB/Common/CurrentMetrics.h>
/** Реализует приоритеты запросов.
@ -67,6 +68,7 @@ private:
if (!found)
return true;
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryPreempted};
if (std::cv_status::timeout == condvar.wait_for(lock, timeout))
return false;
}

View File

@ -3,6 +3,7 @@
#include <DB/DataStreams/RemoteBlockOutputStream.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Storages/StorageDistributed.h>
#include <DB/IO/ReadBufferFromFile.h>
@ -181,6 +182,8 @@ private:
try
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
ReadBufferFromFile in{file_path};
std::string insert_query;

View File

@ -6,6 +6,7 @@
#include <DB/Common/Exception.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Common/formatReadable.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
{
@ -73,13 +74,16 @@ public:
return size;
}
private:
Reservation(size_t size_) : size(size_)
Reservation(size_t size_)
: size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size)
{
Poco::ScopedLock<Poco::FastMutex> lock(DiskSpaceMonitor::mutex);
DiskSpaceMonitor::reserved_bytes += size;
++DiskSpaceMonitor::reservation_count;
}
size_t size;
CurrentMetrics::Increment metric_increment;
};
typedef Poco::SharedPtr<Reservation> ReservationPtr;

View File

@ -1,6 +1,7 @@
#pragma once
#include <DB/Common/Stopwatch.h>
#include <DB/Common/CurrentMetrics.h>
#include <memory>
#include <list>
#include <mutex>
@ -42,6 +43,8 @@ class MergeListEntry
using container_t = std::list<MergeInfo>;
container_t::iterator it;
CurrentMetrics::Increment num_merges {CurrentMetrics::Merge};
public:
MergeListEntry(const MergeListEntry &) = delete;
MergeListEntry & operator=(const MergeListEntry &) = delete;

View File

@ -21,6 +21,7 @@
#include <DB/Client/Connection.h>
#include <DB/Common/NetException.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -366,6 +367,8 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
size_t maybe_compressed_out_bytes = maybe_compressed_out ? maybe_compressed_out->count() : 0;
size_t rows = 0;
CurrentMetrics::Increment metric_increment{CurrentMetrics::SendExternalTables};
for (auto & elem : data)
{
elem.first->readPrefix();

View File

@ -3,5 +3,5 @@
namespace CurrentMetrics
{
size_t values[END]; /// Глобальная переменная - инициализируется нулями.
Value values[END]; /// Глобальная переменная - инициализируется нулями.
}

View File

@ -1,6 +1,7 @@
#include <common/likely.h>
#include <common/logger_useful.h>
#include <DB/Common/Exception.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Common/formatReadable.h>
#include <DB/IO/WriteHelpers.h>
#include <iomanip>
@ -21,6 +22,9 @@ MemoryTracker::~MemoryTracker()
{
if (peak)
logPeakMemoryUsage();
if (amount && !next)
CurrentMetrics::sub(CurrentMetrics::MemoryTracking, amount);
}
@ -36,6 +40,9 @@ void MemoryTracker::alloc(Int64 size)
{
Int64 will_be = __sync_add_and_fetch(&amount, size);
if (!next)
CurrentMetrics::add(CurrentMetrics::MemoryTracking, size);
/// Используется непотокобезопасный генератор случайных чисел. Совместное распределение в разных потоках не будет равномерным.
/// В данном случае, это нормально.
if (unlikely(fault_probability && drand48() < fault_probability))
@ -82,11 +89,16 @@ void MemoryTracker::free(Int64 size)
if (next)
next->free(size);
else
CurrentMetrics::sub(CurrentMetrics::MemoryTracking, size);
}
void MemoryTracker::reset()
{
if (!next)
CurrentMetrics::sub(CurrentMetrics::MemoryTracking, amount);
amount = 0;
peak = 0;
limit = 0;

View File

@ -1,5 +1,6 @@
#include <future>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
@ -111,6 +112,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
{
current_memory_tracker = memory_tracker;
setThreadName("MergeAggReadThr");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
child->readPrefix();
});
reading_pool->schedule([&task] { task(); });
@ -227,6 +229,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker
{
setThreadName("MergeAggMergThr");
current_memory_tracker = memory_tracker;
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
try
{
@ -397,6 +400,7 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
{
current_memory_tracker = memory_tracker;
setThreadName("MergeAggReadThr");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
read_from_input(input);
});
auto & task = tasks.back();

View File

@ -173,6 +173,8 @@ off_t ReadBufferAIO::doSeek(off_t off, int whence)
void ReadBufferAIO::synchronousRead()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
prepare();
bytes_read = ::pread(fd, buffer_begin, region_aligned_size, region_aligned_begin);
@ -207,6 +209,8 @@ bool ReadBufferAIO::waitForAIOCompletion()
if (is_eof || !is_pending_read)
return false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
bytes_read = future_bytes_read.get();
is_pending_read = false;

View File

@ -164,6 +164,8 @@ bool WriteBufferAIO::waitForAIOCompletion()
if (!is_pending_write)
return false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::Write};
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
{
if (errno != EINTR)

View File

@ -1633,6 +1633,7 @@ private:
{
current_memory_tracker = memory_tracker;
setThreadName("MergingAggregtd");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
try
{

View File

@ -1,6 +1,7 @@
#pragma once
#include <DB/IO/WriteBufferFromHTTPServerResponse.h>
#include <DB/Common/CurrentMetrics.h>
#include "Server.h"
@ -33,6 +34,8 @@ public:
private:
Server & server;
CurrentMetrics::Increment metric_increment{CurrentMetrics::HTTPConnection};
Logger * log;
/// Функция также инициализирует used_output.

View File

@ -1,6 +1,7 @@
#pragma once
#include "Server.h"
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -19,7 +20,7 @@ public:
private:
Server & server;
CurrentMetrics::Increment metric_increment{CurrentMetrics::InterserverConnection};
Logger * log;
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);

View File

@ -52,7 +52,7 @@ void MetricsTransmitter::run()
void MetricsTransmitter::transmit()
{
GraphiteWriter::KeyValueVector<size_t> key_vals{};
GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
key_vals.reserve(ProfileEvents::END + CurrentMetrics::END);
for (size_t i = 0; i < ProfileEvents::END; ++i)

View File

@ -1,6 +1,7 @@
#pragma once
#include "Server.h"
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -23,6 +24,7 @@ private:
Server & server;
Logger * log;
const String profile;
CurrentMetrics::Increment metric_increment{CurrentMetrics::HTTPConnection};
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
};

View File

@ -34,6 +34,7 @@
#include <DB/Storages/System/StorageSystemColumns.h>
#include <DB/Storages/System/StorageSystemFunctions.h>
#include <DB/Storages/System/StorageSystemClusters.h>
#include <DB/Storages/System/StorageSystemMetrics.h>
#include <zkutil/ZooKeeper.h>
@ -303,6 +304,7 @@ int Server::main(const std::vector<std::string> & args)
global_context->addTable("system", "processes", StorageSystemProcesses::create("processes"));
global_context->addTable("system", "settings", StorageSystemSettings::create("settings"));
global_context->addTable("system", "events", StorageSystemEvents::create("events"));
global_context->addTable("system", "metrics", StorageSystemMetrics::create("metrics"));
global_context->addTable("system", "merges", StorageSystemMerges::create("merges"));
global_context->addTable("system", "replicas", StorageSystemReplicas::create("replicas"));
global_context->addTable("system", "replication_queue", StorageSystemReplicationQueue::create("replication_queue"));

View File

@ -9,6 +9,7 @@
#include <DB/DataStreams/BlockIO.h>
#include <DB/Common/Stopwatch.h>
#include <DB/Common/CurrentMetrics.h>
#include "Server.h"
@ -98,6 +99,8 @@ private:
/// На данный момент, поддерживается одновременное выполнение только одного запроса в соединении.
QueryState state;
CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection};
void runImpl();

View File

@ -1,5 +1,6 @@
#include <DB/Common/Exception.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/IO/WriteHelpers.h>
#include <common/logger_useful.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
@ -160,12 +161,16 @@ void BackgroundProcessingPool::threadFunction()
if (task->removed)
continue;
Context context(*this, counters_diff);
bool done_work = task->function(context);
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundPoolTask};
/// Если задача сделала полезную работу, то она сможет выполняться в следующий раз хоть сразу.
/// Если нет - добавляем задержку перед повторным исполнением.
task->next_time_to_execute = time(0) + (done_work ? 0 : sleep_seconds);
Context context(*this, counters_diff);
bool done_work = task->function(context);
/// Если задача сделала полезную работу, то она сможет выполняться в следующий раз хоть сразу.
/// Если нет - добавляем задержку перед повторным исполнением.
task->next_time_to_execute = time(0) + (done_work ? 0 : sleep_seconds);
}
}
catch (...)
{

View File

@ -11,6 +11,7 @@
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/HashingReadBuffer.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -264,6 +265,8 @@ void MergeTreePartChecker::checkDataPart(
const DataTypes & primary_key_data_types,
MergeTreeData::DataPart::Checksums * out_checksums)
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedChecks};
if (!path.empty() && path.back() != '/')
path += "/";

View File

@ -1,5 +1,6 @@
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -27,6 +28,8 @@ void ReplicatedMergeTreePartsServer::processQuery(const Poco::Net::HTMLForm & pa
Poco::ScopedReadRWLock part_lock(part->columns_lock);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};
/// Список файлов возьмем из списка контрольных сумм.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Добавим файлы, которых нет в списке контрольных сумм.
@ -98,6 +101,8 @@ MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart(
part_file.remove(true);
}
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
part_file.createDirectory();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);

View File

@ -70,6 +70,8 @@ void ReplicatedMergeTreeRestartingThread::run()
else
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
if (!storage.is_readonly)
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
storage.is_readonly = true;
partialShutdown();
}
@ -98,6 +100,8 @@ void ReplicatedMergeTreeRestartingThread::run()
break;
}
if (storage.is_readonly)
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
storage.is_readonly = false;
first_time = false;
need_restart = false;
@ -118,7 +122,7 @@ void ReplicatedMergeTreeRestartingThread::run()
}
catch (...)
{
tryLogCurrentException("__PRETTY_FUNCTION__", "Cannot get replica delays");
tryLogCurrentException(__PRETTY_FUNCTION__, "Cannot get replica delays");
error = true;
}
@ -338,6 +342,8 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
void ReplicatedMergeTreeRestartingThread::partialShutdown()
{
ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);
storage.leader_election = nullptr;
storage.shutdown_called = true;
storage.shutdown_event.set();
@ -378,7 +384,10 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
void ReplicatedMergeTreeRestartingThread::goReadOnlyPermanently()
{
LOG_INFO(log, "Going to readonly mode");
ProfileEvents::increment(ProfileEvents::ReplicaPermanentlyReadonly);
if (!storage.is_readonly)
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
storage.is_readonly = true;
stop();

View File

@ -14,8 +14,8 @@ StorageSystemMetrics::StorageSystemMetrics(const std::string & name_)
: name(name_),
columns
{
{"event", new DataTypeString},
{"value", new DataTypeUInt64},
{"metric", new DataTypeString},
{"value", new DataTypeInt64},
}
{
}
@ -48,19 +48,16 @@ BlockInputStreams StorageSystemMetrics::read(
ColumnWithTypeAndName col_value;
col_value.name = "value";
col_value.type = new DataTypeUInt64;
col_value.column = new ColumnUInt64;
col_value.type = new DataTypeInt64;
col_value.column = new ColumnInt64;
block.insert(col_value);
for (size_t i = 0; i < CurrentMetrics::END; ++i)
{
UInt64 value = CurrentMetrics::values[i];
auto value = CurrentMetrics::values[i];
if (0 != value)
{
col_metric.column->insert(String(CurrentMetrics::getDescription(CurrentMetrics::Metric(i))));
col_value.column->insert(value);
}
col_metric.column->insert(String(CurrentMetrics::getDescription(CurrentMetrics::Metric(i))));
col_value.column->insert(value);
}
return BlockInputStreams(1, new OneBlockInputStream(block));