Separate compilation of CurrentMetrics; Separated AsynchronousMetrics from CurrentMetrics; Comments [#METR-23237].

This commit is contained in:
Alexey Milovidov 2016-10-24 07:06:27 +03:00
parent de479eb36a
commit ea852bb9d8
40 changed files with 523 additions and 222 deletions

View File

@ -371,7 +371,7 @@ add_library (dbms
include/DB/Interpreters/ClusterProxy/DescribeQueryConstructor.h
include/DB/Interpreters/ClusterProxy/AlterQueryConstructor.h
include/DB/Interpreters/ClusterProxy/Query.h
include/DB/Interpreters/ActiveMetrics.h
include/DB/Interpreters/AsynchronousMetrics.h
include/DB/Common/Allocator.h
include/DB/Common/CombinedCardinalityEstimator.h
include/DB/Common/ExternalTable.h
@ -542,6 +542,7 @@ add_library (dbms
include/DB/Storages/StorageStripeLog.h
include/DB/Storages/System/StorageSystemEvents.h
include/DB/Storages/System/StorageSystemMetrics.h
include/DB/Storages/System/StorageSystemAsynchronousMetrics.h
include/DB/Storages/System/StorageSystemTables.h
include/DB/Storages/MergeTree/MarkRange.h
include/DB/Storages/MergeTree/MergeTreeDataMerger.h
@ -648,6 +649,7 @@ add_library (dbms
src/Storages/System/StorageSystemProcesses.cpp
src/Storages/System/StorageSystemEvents.cpp
src/Storages/System/StorageSystemMetrics.cpp
src/Storages/System/StorageSystemAsynchronousMetrics.cpp
src/Storages/System/StorageSystemMerges.cpp
src/Storages/System/StorageSystemSettings.cpp
src/Storages/System/StorageSystemZooKeeper.cpp
@ -853,7 +855,7 @@ add_library (dbms
src/Interpreters/ClusterProxy/DescribeQueryConstructor.cpp
src/Interpreters/ClusterProxy/AlterQueryConstructor.cpp
src/Interpreters/ClusterProxy/Query.cpp
src/Interpreters/ActiveMetrics.cpp
src/Interpreters/AsynchronousMetrics.cpp
src/Functions/FunctionFactory.cpp
src/Functions/FunctionsArithmetic.cpp

View File

@ -1,94 +1,45 @@
#pragma once
#include <stddef.h>
#include <cstdint>
#include <utility>
#include <atomic>
/** Позволяет считать количество одновременно происходящих событий или текущее значение какой-либо метрики.
* - для высокоуровневого профайлинга.
/** Allows to count number of simultaneously happening processes or current value of some metric.
* - for high-level profiling.
*
* Также смотрите ProfileEvents.h
* В ProfileEvents считается общее количество произошедших (точечных) событий - например, сколько раз были выполнены запросы.
* В CurrentMetrics считается количество одновременных событий - например, сколько сейчас одновременно выполняется запросов,
* или текущее значение метрики - например, величина отставания реплики в секундах.
* See also ProfileEvents.h
* ProfileEvents counts number of happened events - for example, how many times queries was executed.
* CurrentMetrics counts number of simultaneously happening events - for example, number of currently executing queries, right now,
* or just current value of some metric - for example, replica delay in seconds.
*
* CurrentMetrics are updated instantly and are correct for any point in time.
* For periodically (asynchronously) updated metrics, see AsynchronousMetrics.h
*/
#define APPLY_FOR_METRICS(M) \
M(Query) \
M(Merge) \
M(ReplicatedFetch) \
M(ReplicatedSend) \
M(ReplicatedChecks) \
M(BackgroundPoolTask) \
M(DiskSpaceReservedForMerge) \
M(DistributedSend) \
M(QueryPreempted) \
M(TCPConnection) \
M(HTTPConnection) \
M(InterserverConnection) \
M(OpenFileForRead) \
M(OpenFileForWrite) \
M(Read) \
M(Write) \
M(SendExternalTables) \
M(QueryThread) \
M(ReadonlyReplica) \
M(MemoryTracking) \
M(MarkCacheBytes) \
M(MarkCacheFiles) \
M(UncompressedCacheBytes) \
M(UncompressedCacheCells) \
M(ReplicasMaxQueueSize) \
M(ReplicasMaxInsertsInQueue) \
M(ReplicasMaxMergesInQueue) \
M(ReplicasSumQueueSize) \
M(ReplicasSumInsertsInQueue) \
M(ReplicasSumMergesInQueue) \
M(ReplicasMaxAbsoluteDelay) \
M(ReplicasMaxRelativeDelay) \
M(MaxPartCountForPartition) \
\
M(END)
namespace CurrentMetrics
{
/// Виды метрик.
enum Metric
{
#define M(NAME) NAME,
APPLY_FOR_METRICS(M)
#undef M
};
/// Получить текстовое описание метрики по его enum-у.
inline const char * getDescription(Metric event)
{
static const char * descriptions[] =
{
#define M(NAME) #NAME,
APPLY_FOR_METRICS(M)
#undef M
};
return descriptions[event];
}
/// Metric identifier (index in array).
using Metric = size_t;
using Value = int64_t;
/// Счётчики - текущие значения метрик.
extern std::atomic<Value> values[END];
/// Get text description of metric by identifier. Returns statically allocated string.
const char * getDescription(Metric event);
/// Metric identifier -> current value of metric.
extern std::atomic<Value> values[];
/// Выставить значение указанной метрики.
/// Get index just after last metric identifier.
Metric end();
/// Set value of specified metric.
inline void set(Metric metric, Value value)
{
values[metric] = value;
}
/// Прибавить величину к значению указанной метрики. Вы затем должны вычесть величину самостоятельно. Или см. ниже class Increment.
/// Add value for specified metric. You must subtract value later; or see class Increment below.
inline void add(Metric metric, Value value = 1)
{
values[metric] += value;
@ -99,7 +50,7 @@ namespace CurrentMetrics
add(metric, -value);
}
/// На время жизни объекта, увеличивает указанное значение на указанную величину.
/// For lifetime of object, add amout for specified metric. Then subtract.
class Increment
{
private:
@ -141,7 +92,7 @@ namespace CurrentMetrics
amount = new_amount;
}
/// Уменьшить значение раньше вызова деструктора.
/// Subtract value before destructor.
void destroy()
{
*what -= amount;
@ -149,6 +100,3 @@ namespace CurrentMetrics
}
};
}
#undef APPLY_FOR_METRICS

View File

@ -15,7 +15,7 @@ namespace ProfileEvents
using Event = size_t;
using Count = size_t;
/// Get text description of event by identifier. Returs statically allocated string.
/// Get text description of event by identifier. Returns statically allocated string.
const char * getDescription(Event event);
/// Counters - how many times each event happened.

View File

@ -8,6 +8,11 @@
#include <DB/Common/ThreadPool.h>
namespace CurrentMetrics
{
extern const Metric QueryThread;
}
namespace DB
{

View File

@ -13,17 +13,22 @@
#include <DB/Common/CurrentMetrics.h>
/** Позволяет обработать множество источников блоков параллельно, используя указанное количество потоков.
* Вынимает из любого доступного источника блок и передаёт его на обработку в предоставленный handler.
/** Allows to process multiple block input streams (sources) in parallel, using specified number of threads.
* Reads (pulls) blocks from any available source and passes it to specified handler.
*
* Устроено так:
* - есть набор источников, из которых можно вынимать блоки;
* - есть набор потоков, которые могут одновременно вынимать блоки из разных источников;
* - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников;
* - когда поток берёт источник для обработки, он удаляет его из очереди источников,
* вынимает из него блок, и затем кладёт источник обратно в очередь источников;
* Implemented in following way:
* - there are multiple input sources to read blocks from;
* - there are multiple threads, that could simultaneously read blocks from different sources;
* - "available" sources (that are not read in any thread right now) are put in queue of sources;
* - when thread take a source to read from, it removes source from queue of sources,
* then read block from source and then put source back to queue of available sources.
*/
namespace CurrentMetrics
{
extern const Metric QueryThread;
}
namespace DB
{

View File

@ -12,6 +12,12 @@
#include <unistd.h>
#include <fcntl.h>
namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
}
namespace DB
{

View File

@ -11,6 +11,11 @@ namespace ProfileEvents
extern const Event FileOpen;
}
namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
}
namespace DB
{

View File

@ -25,6 +25,11 @@ namespace ProfileEvents
extern const Event Seek;
}
namespace CurrentMetrics
{
extern const Metric Read;
}
namespace DB
{

View File

@ -11,6 +11,12 @@
#include <unistd.h>
#include <fcntl.h>
namespace CurrentMetrics
{
extern const Metric OpenFileForWrite;
}
namespace DB
{
@ -32,12 +38,10 @@ public:
int getFD() const override { return fd; }
private:
///
void nextImpl() override;
///
off_t doSeek(off_t off, int whence) override;
///
void doTruncate(off_t length) override;
/// Если в буфере ещё остались данные - запишем их.
void flush();
/// Ждать окончания текущей асинхронной задачи.

View File

@ -15,6 +15,11 @@ namespace ProfileEvents
extern const Event FileOpen;
}
namespace CurrentMetrics
{
extern const Metric OpenFileForWrite;
}
namespace DB
{

View File

@ -19,6 +19,11 @@ namespace ProfileEvents
extern const Event WriteBufferFromFileDescriptorWriteBytes;
}
namespace CurrentMetrics
{
extern const Metric Write;
}
namespace DB
{

View File

@ -1,39 +0,0 @@
#pragma once
#include <thread>
#include <mutex>
#include <condition_variable>
namespace DB
{
class Context;
/** Periodically (each minute, starting at 30 seconds offset)
* calculates and updates some metrics (see CurrentMetrics.h),
* that are not updated automatically (so, need to be actively calculated).
*/
class ActiveMetrics
{
public:
ActiveMetrics(Context & context_)
: context(context_), thread([this] { run(); })
{
}
~ActiveMetrics();
private:
Context & context;
bool quit {false};
std::mutex mutex;
std::condition_variable cond;
std::thread thread;
void run();
void update();
};
}

View File

@ -0,0 +1,54 @@
#pragma once
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <string>
namespace DB
{
class Context;
/** Periodically (each minute, starting at 30 seconds offset)
* calculates and updates some metrics,
* that are not updated automatically (so, need to be asynchronously calculated).
*/
class AsynchronousMetrics
{
public:
AsynchronousMetrics(Context & context_)
: context(context_), thread([this] { run(); })
{
}
~AsynchronousMetrics();
using Value = double;
using Container = std::unordered_map<std::string, Value>;
/// Returns copy of all values.
Container getValues() const;
private:
Context & context;
bool quit {false};
std::mutex wait_mutex;
std::condition_variable wait_cond;
Container container;
mutable std::mutex container_mutex;
std::thread thread;
void run();
void update();
void set(const std::string & name, Value value);
};
}

View File

@ -17,15 +17,20 @@
#include <DB/Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric Query;
}
namespace DB
{
/** Список исполняющихся в данный момент запросов.
* Также реализует ограничение на их количество.
/** List of currently executing queries.
* Also implements limit on their number.
*/
/** Информационная составляющая элемента списка процессов.
* Для вывода в SHOW PROCESSLIST. Не содержит никаких сложных объектов, которые что-то делают при копировании или в деструкторах.
/** Information of process list element.
* To output in SHOW PROCESSLIST query. Does not contain any complex objects, that do something on copy or destructor.
*/
struct ProcessInfo
{
@ -42,7 +47,7 @@ struct ProcessInfo
};
/// Запрос и данные о его выполнении.
/// Query and information about its execution.
struct ProcessListElement
{
String query;
@ -63,7 +68,7 @@ struct ProcessListElement
bool is_cancelled = false;
/// Здесь могут быть зарегистрированы временные таблицы. Изменять под mutex-ом.
/// Temporary tables could be registered here. Modify under mutex.
Tables temporary_tables;
@ -91,7 +96,7 @@ struct ProcessListElement
progress.incrementPiecewiseAtomically(value);
if (priority_handle)
priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Можно сделать настраиваемым таймаут.
priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Could make timeout customizable.
return !is_cancelled;
}
@ -114,14 +119,14 @@ struct ProcessListElement
};
/// Данные о запросах одного пользователя.
/// Data about queries for one user.
struct ProcessListForUser
{
/// Query_id -> ProcessListElement *
using QueryToElement = std::unordered_map<String, ProcessListElement *>;
QueryToElement queries;
/// Ограничение и счётчик памяти на все одновременно выполняющиеся запросы одного пользователя.
/// Limit and counter for memory of all simultaneously running queries of single user.
MemoryTracker user_memory_tracker;
};
@ -129,7 +134,7 @@ struct ProcessListForUser
class ProcessList;
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
/// Keeps iterator to process list and removes element in destructor.
class ProcessListEntry
{
private:
@ -158,7 +163,7 @@ public:
using Element = ProcessListElement;
using Entry = ProcessListEntry;
/// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем.
/// list, for iterators not to invalidate. NOTE: could replace with cyclic buffer, but not worth.
using Container = std::list<Element>;
using Info = std::vector<ProcessInfo>;
/// User -> queries
@ -166,15 +171,15 @@ public:
private:
mutable std::mutex mutex;
mutable Poco::Condition have_space; /// Количество одновременно выполняющихся запросов стало меньше максимального.
mutable Poco::Condition have_space; /// Number of currently running queries has become less than maximum.
Container cont;
size_t cur_size; /// В C++03 std::list::size не O(1).
size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение.
size_t cur_size; /// In C++03 or C++11 and old ABI, std::list::size is not O(1).
size_t max_size; /// 0 means no limit. Otherwise, when limit exceeded, an exception is thrown.
UserToQueries user_to_queries;
QueryPriorities priorities;
/// Ограничение и счётчик памяти на все одновременно выполняющиеся запросы.
/// Limit and counter for memory of all simultaneously running queries.
MemoryTracker total_memory_tracker;
public:
@ -182,17 +187,17 @@ public:
using EntryPtr = std::shared_ptr<ProcessListEntry>;
/** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
* Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени.
* Если времени не хватило - кинуть исключение.
/** Register running query. Returns refcounted object, that will remove element from list in destructor.
* If too much running queries - wait for not more than specified (see settings) amount of time.
* If timeout is passed - throw an exception.
*/
EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
UInt16 port_, const Settings & settings);
/// Количество одновременно выполняющихся запросов.
/// Number of currently executing queries.
size_t size() const { return cur_size; }
/// Получить текущее состояние списка запросов.
/// Get current state of process list.
Info getInfo() const
{
std::lock_guard<std::mutex> lock(mutex);
@ -211,10 +216,10 @@ public:
max_size = max_size_;
}
/// Зарегистрировать временную таблицу. Потом её можно будет получить по query_id и по названию.
/// Register temporary table. Then it is accessible by query_id and name.
void addTemporaryTable(ProcessListElement & elem, const String & table_name, StoragePtr storage);
/// Найти временную таблицу по query_id и по названию. Замечание: плохо работает, если есть разные запросы с одним query_id.
/// Find temporary table by query_id and name. NOTE: doesn't work fine if there are many queries with same query_id.
StoragePtr tryGetTemporaryTable(const String & query_id, const String & table_name) const;
};

View File

@ -8,17 +8,26 @@
#include <DB/Common/CurrentMetrics.h>
/** Реализует приоритеты запросов.
* Позволяет приостанавливать выполнение запроса, если выполняется хотя бы один более приоритетный запрос.
namespace CurrentMetrics
{
extern const Metric QueryPreempted;
}
namespace DB
{
/** Implements query priorities in very primitive way.
* Allows to freeze query execution if at least one query of higher priority is executed.
*
* Величина приоритета - целое число, чем меньше - тем больше приоритет.
* Priority value is integer, smaller means higher priority.
*
* Приоритет 0 считается особенным - запросы с таким приоритетом выполняются всегда,
* не зависят от других запросов и не влияют на другие запросы.
* То есть 0 означает - не использовать приоритеты.
* Priority 0 is special - queries with that priority is always executed,
* not depends on other queries and not affect other queries.
* Thus 0 means - don't use priorities.
*
* NOTE Возможности сделать лучше:
* - реализовать ограничение на максимальное количество запросов с таким приоритетом.
* NOTE Possibilities for improvement:
* - implement limit on maximum number of running queries with same priority.
*/
class QueryPriorities
{
@ -30,7 +39,7 @@ private:
using Count = int;
/// Количество выполняющихся сейчас запросов с заданным приоритетом.
/// Number of currently running queries for each priority.
using Container = std::map<Priority, Count>;
std::mutex mutex;
@ -38,8 +47,8 @@ private:
Container container;
/** Если есть более приоритетные запросы - спать, пока они не перестанут быть или не истечёт таймаут.
* Возвращает true, если более приоритетные запросы исчезли на момент возврата из функции, false, если истёк таймаут.
/** If there are higher priority queries - sleep until they are finish or timeout happens.
* Returns true, if higher priority queries has finished at return of function, false, if timout exceeded.
*/
template <typename Duration>
bool waitIfNeed(Priority priority, Duration timeout)
@ -103,8 +112,8 @@ public:
using Handle = std::shared_ptr<HandleImpl>;
/** Зарегистрировать, что запрос с заданным приоритетом выполняется.
* Возвращается объект, в деструкторе которого, запись о запросе удаляется.
/** Register query with specified priority.
* Returns an object that remove record in destructor.
*/
Handle insert(Priority priority)
{
@ -117,3 +126,5 @@ public:
return std::make_shared<HandleImpl>(*this, *it);
}
};
}

View File

@ -18,6 +18,11 @@
#include <condition_variable>
namespace CurrentMetrics
{
extern const Metric DistributedSend;
}
namespace DB
{

View File

@ -10,6 +10,12 @@
#include <DB/Common/formatReadable.h>
#include <DB/Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric DiskSpaceReservedForMerge;
}
namespace DB
{
@ -20,10 +26,10 @@ namespace ErrorCodes
}
/** Узнает количество свободного места в файловой системе.
* Можно "резервировать" место, чтобы разные операции могли согласованно планировать использование диска.
* Резервирования не разделяются по файловым системам.
* Вместо этого при запросе свободного места считается, что все резервирования сделаны в той же файловой системе.
/** Determines amount of free space in filesystem.
* Could "reserve" space, for different operations to plan disk space usage.
* Reservations are not separated for different filesystems,
* instead it is assumed, that all reservations are done within same filesystem.
*/
class DiskSpaceMonitor
{
@ -61,7 +67,7 @@ public:
}
}
/// Изменить количество зарезервированного места. При увеличении не делается проверка, что места достаточно.
/// Change amount of reserved space. When new_size is greater than before, availability of free space is not checked.
void update(size_t new_size)
{
std::lock_guard<std::mutex> lock(DiskSpaceMonitor::mutex);
@ -99,7 +105,7 @@ public:
size_t res = fs.f_bfree * fs.f_bsize;
/// Зарезервируем дополнительно 30 МБ. Когда я тестировал, statvfs показывал на несколько мегабайт больше свободного места, чем df.
/// Heuristic by Michael Kolupaev: reserve 30 MB more, because statvfs shows few megabytes more space than df.
res -= std::min(res, 30 * (1ul << 20));
std::lock_guard<std::mutex> lock(mutex);
@ -124,7 +130,7 @@ public:
return reservation_count;
}
/// Если места (приблизительно) недостаточно, бросает исключение.
/// If not enough (approximately) space, throw an exception.
static ReservationPtr reserve(const std::string & path, size_t size)
{
size_t free_bytes = getUnreservedFreeSpace(path);

View File

@ -7,6 +7,12 @@
#include <mutex>
#include <atomic>
namespace CurrentMetrics
{
extern const Metric Merge;
}
namespace DB
{

View File

@ -0,0 +1,44 @@
#pragma once
#include <ext/shared_ptr_helper.hpp>
#include <DB/Storages/IStorage.h>
namespace DB
{
class AsynchronousMetrics;
/** Implements system table asynchronous_metrics, which allows to get values of periodically (asynchronously) updated metrics.
*/
class StorageSystemAsynchronousMetrics : private ext::shared_ptr_helper<StorageSystemAsynchronousMetrics>, public IStorage
{
friend class ext::shared_ptr_helper<StorageSystemAsynchronousMetrics>;
public:
static StoragePtr create(const std::string & name_, const AsynchronousMetrics & async_metrics_);
std::string getName() const override { return "SystemAsynchronousMetrics"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
private:
const std::string name;
NamesAndTypesList columns;
const AsynchronousMetrics & async_metrics;
StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_);
};
}

View File

@ -24,6 +24,11 @@
#include <DB/Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric SendExternalTables;
}
namespace DB
{
@ -286,7 +291,7 @@ void Connection::sendQuery(const String & query, const String & query_id_, UInt6
block_in.reset();
block_out.reset();
/// Если версия сервера достаточно новая и стоит флаг, отправляем пустой блок, символизируя конец передачи данных.
/// If server version is new enough, send empty block which meand end of data.
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES && !with_pending_data)
{
sendData(Block());

View File

@ -1,7 +1,52 @@
#include <DB/Common/CurrentMetrics.h>
/// Available metrics. Add something here as you wish.
#define APPLY_FOR_METRICS(M) \
M(Query) \
M(Merge) \
M(ReplicatedFetch) \
M(ReplicatedSend) \
M(ReplicatedChecks) \
M(BackgroundPoolTask) \
M(DiskSpaceReservedForMerge) \
M(DistributedSend) \
M(QueryPreempted) \
M(TCPConnection) \
M(HTTPConnection) \
M(InterserverConnection) \
M(OpenFileForRead) \
M(OpenFileForWrite) \
M(Read) \
M(Write) \
M(SendExternalTables) \
M(QueryThread) \
M(ReadonlyReplica) \
M(MemoryTracking) \
namespace CurrentMetrics
{
std::atomic<Value> values[END] {}; /// Глобальная переменная - инициализируется нулями.
#define M(NAME) extern const Metric NAME = __COUNTER__;
APPLY_FOR_METRICS(M)
#undef M
constexpr Metric END = __COUNTER__;
std::atomic<Value> values[END] {}; /// Global variable, initialized by zeros.
const char * getDescription(Metric event)
{
static const char * descriptions[] =
{
#define M(NAME) #NAME,
APPLY_FOR_METRICS(M)
#undef M
};
return descriptions[event];
}
Metric end() { return END; }
}
#undef APPLY_FOR_METRICS

View File

@ -9,12 +9,17 @@
#include <DB/Common/MemoryTracker.h>
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
}
namespace DB
{
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
}

View File

@ -4,6 +4,12 @@
#include <DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
namespace CurrentMetrics
{
extern const Metric QueryThread;
}
namespace DB
{

View File

@ -16,6 +16,11 @@ namespace ProfileEvents
extern const Event ReadBufferAIOReadBytes;
}
namespace CurrentMetrics
{
extern const Metric Read;
}
namespace DB
{

View File

@ -13,6 +13,11 @@ namespace ProfileEvents
extern const Event WriteBufferAIOWriteBytes;
}
namespace CurrentMetrics
{
extern const Metric Write;
}
namespace DB
{
@ -34,10 +39,10 @@ namespace ErrorCodes
/// Примечание: выделяется дополнительная страница, которая содежрит те данные, которые
/// не влезают в основной буфер.
WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
char * existing_memory_)
: WriteBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
flush_buffer(BufferWithOwnMemory<WriteBuffer>(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
filename(filename_)
char * existing_memory_)
: WriteBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
flush_buffer(BufferWithOwnMemory<WriteBuffer>(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
filename(filename_)
{
/// Исправить информацию о размере буферов, чтобы дополнительные страницы не касались базового класса BufferBase.
this->buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);

View File

@ -29,6 +29,11 @@ namespace ProfileEvents
extern const Event ExternalAggregationUncompressedBytes;
}
namespace CurrentMetrics
{
extern const Metric QueryThread;
}
namespace DB
{

View File

@ -1,4 +1,4 @@
#include <DB/Interpreters/ActiveMetrics.h>
#include <DB/Interpreters/AsynchronousMetrics.h>
#include <DB/Common/Exception.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
@ -13,16 +13,16 @@
namespace DB
{
ActiveMetrics::~ActiveMetrics()
AsynchronousMetrics::~AsynchronousMetrics()
{
try
{
{
std::lock_guard<std::mutex> lock{mutex};
std::lock_guard<std::mutex> lock{wait_mutex};
quit = true;
}
cond.notify_one();
wait_cond.notify_one();
thread.join();
}
catch (...)
@ -32,11 +32,25 @@ ActiveMetrics::~ActiveMetrics()
}
void ActiveMetrics::run()
AsynchronousMetrics::Container AsynchronousMetrics::getValues() const
{
setThreadName("ActiveMetrics");
std::lock_guard<std::mutex> lock{container_mutex};
return container;
}
std::unique_lock<std::mutex> lock{mutex};
void AsynchronousMetrics::set(const std::string & name, Value value)
{
std::lock_guard<std::mutex> lock{container_mutex};
container[name] = value;
}
void AsynchronousMetrics::run()
{
setThreadName("AsyncMetrics");
std::unique_lock<std::mutex> lock{wait_mutex};
/// Next minute + 30 seconds. To be distant with moment of transmission of metrics, see MetricsTransmitter.
const auto get_next_minute = []
@ -47,7 +61,7 @@ void ActiveMetrics::run()
while (true)
{
if (cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
if (wait_cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
break;
try
@ -78,21 +92,21 @@ static void calculateMaxAndSum(Max & max, Sum & sum, T x)
}
void ActiveMetrics::update()
void AsynchronousMetrics::update()
{
{
if (auto mark_cache = context.getMarkCache())
{
CurrentMetrics::set(CurrentMetrics::MarkCacheBytes, mark_cache->weight());
CurrentMetrics::set(CurrentMetrics::MarkCacheFiles, mark_cache->count());
set("MarkCacheBytes", mark_cache->weight());
set("MarkCacheFiles", mark_cache->count());
}
}
{
if (auto uncompressed_cache = context.getUncompressedCache())
{
CurrentMetrics::set(CurrentMetrics::UncompressedCacheBytes, uncompressed_cache->weight());
CurrentMetrics::set(CurrentMetrics::UncompressedCacheCells, uncompressed_cache->count());
set("UncompressedCacheBytes", uncompressed_cache->weight());
set("UncompressedCacheCells", uncompressed_cache->count());
}
}
@ -156,18 +170,18 @@ void ActiveMetrics::update()
}
}
CurrentMetrics::set(CurrentMetrics::ReplicasMaxQueueSize, max_queue_size);
CurrentMetrics::set(CurrentMetrics::ReplicasMaxInsertsInQueue, max_inserts_in_queue);
CurrentMetrics::set(CurrentMetrics::ReplicasMaxMergesInQueue, max_merges_in_queue);
set("ReplicasMaxQueueSize", max_queue_size);
set("ReplicasMaxInsertsInQueue", max_inserts_in_queue);
set("ReplicasMaxMergesInQueue", max_merges_in_queue);
CurrentMetrics::set(CurrentMetrics::ReplicasSumQueueSize, sum_queue_size);
CurrentMetrics::set(CurrentMetrics::ReplicasSumInsertsInQueue, sum_inserts_in_queue);
CurrentMetrics::set(CurrentMetrics::ReplicasSumMergesInQueue, sum_merges_in_queue);
set("ReplicasSumQueueSize", sum_queue_size);
set("ReplicasSumInsertsInQueue", sum_inserts_in_queue);
set("ReplicasSumMergesInQueue", sum_merges_in_queue);
CurrentMetrics::set(CurrentMetrics::ReplicasMaxAbsoluteDelay, max_absolute_delay);
CurrentMetrics::set(CurrentMetrics::ReplicasMaxRelativeDelay, max_relative_delay);
set("ReplicasMaxAbsoluteDelay", max_absolute_delay);
set("ReplicasMaxRelativeDelay", max_relative_delay);
CurrentMetrics::set(CurrentMetrics::MaxPartCountForPartition, max_part_count_for_partition);
set("MaxPartCountForPartition", max_part_count_for_partition);
}
/// Add more metrics as you wish.

View File

@ -5,6 +5,11 @@
#include "Server.h"
namespace CurrentMetrics
{
extern const Metric HTTPConnection;
}
namespace DB
{
@ -21,7 +26,7 @@ public:
struct Output
{
std::shared_ptr<WriteBufferFromHTTPServerResponse> out;
/// Используется для выдачи ответа. Равен либо out, либо CompressedWriteBuffer(*out), в зависимости от настроек.
/// Used for sending response. Points to 'out', or to CompressedWriteBuffer(*out), depending on settings.
std::shared_ptr<WriteBuffer> out_maybe_compressed;
};
@ -38,7 +43,7 @@ private:
Logger * log;
/// Функция также инициализирует used_output.
/// Also initializes 'used_output'.
void processQuery(
Poco::Net::HTTPServerRequest & request,
HTMLForm & params,

View File

@ -4,6 +4,11 @@
#include <DB/Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric InterserverConnection;
}
namespace DB
{

View File

@ -3,6 +3,7 @@
#include <daemon/BaseDaemon.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Interpreters/AsynchronousMetrics.h>
namespace DB
@ -39,6 +40,8 @@ void MetricsTransmitter::run()
std::chrono::system_clock::now() + std::chrono::minutes(1));
};
ProfileEvents::Count prev_counters[ProfileEvents::end()] {};
std::unique_lock<std::mutex> lock{mutex};
while (true)
@ -46,27 +49,29 @@ void MetricsTransmitter::run()
if (cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
break;
transmit();
transmit(prev_counters);
}
}
void MetricsTransmitter::transmit()
void MetricsTransmitter::transmit(ProfileEvents::Count * prev_counters)
{
auto async_metrics_values = async_metrics.getValues();
GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
key_vals.reserve(ProfileEvents::end() + CurrentMetrics::END);
key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size());
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed);
const auto counter_increment = counter - prev_counters[i].load(std::memory_order_relaxed);
prev_counters[i].store(counter, std::memory_order_relaxed);
const auto counter_increment = counter - prev_counters[i];
prev_counters[i] = counter;
std::string key {ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))};
key_vals.emplace_back(profile_events_path_prefix + key, counter_increment);
}
for (size_t i = 0; i < CurrentMetrics::END; ++i)
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
{
const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
@ -74,6 +79,11 @@ void MetricsTransmitter::transmit()
key_vals.emplace_back(current_metrics_path_prefix + key, value);
}
for (const auto & name_value : async_metrics_values)
{
key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second);
}
BaseDaemon::instance().writeToGraphite(key_vals);
}

View File

@ -10,23 +10,25 @@
namespace DB
{
class AsynchronousMetrics;
/** Automatically sends
* - difference of ProfileEvents;
* - values of CurrentMetrics;
* - values of ActiveMetrics;
* - values of AsynchronousMetrics;
* to Graphite at beginning of every minute.
*/
class MetricsTransmitter
{
public:
MetricsTransmitter(const AsynchronousMetrics & async_metrics_) : async_metrics(async_metrics_) {}
~MetricsTransmitter();
private:
void run();
void transmit();
void transmit(ProfileEvents::Count * prev_counters);
/// Values of ProfileEvents counters at previous iteration (or zeros at first time).
decltype(ProfileEvents::counters) prev_counters{};
const AsynchronousMetrics & async_metrics;
bool quit = false;
std::mutex mutex;
@ -35,7 +37,7 @@ private:
static constexpr auto profile_events_path_prefix = "ClickHouse.ProfileEvents.";
static constexpr auto current_metrics_path_prefix = "ClickHouse.Metrics.";
static constexpr auto active_metrics_path_prefix = "ClickHouse.ActiveMetrics.";
static constexpr auto asynchronous_metrics_path_prefix = "ClickHouse.AsynchronousMetrics.";
};
}

View File

@ -20,7 +20,7 @@
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/ActiveMetrics.h>
#include <DB/Interpreters/AsynchronousMetrics.h>
#include <DB/Storages/System/StorageSystemNumbers.h>
#include <DB/Storages/System/StorageSystemTables.h>
@ -39,6 +39,7 @@
#include <DB/Storages/System/StorageSystemFunctions.h>
#include <DB/Storages/System/StorageSystemClusters.h>
#include <DB/Storages/System/StorageSystemMetrics.h>
#include <DB/Storages/System/StorageSystemAsynchronousMetrics.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/ReshardingWorker.h>
#include <DB/Databases/DatabaseOrdinary.h>
@ -379,10 +380,6 @@ int Server::main(const std::vector<std::string> & args)
);
{
const auto metrics_transmitter = config().getBool("use_graphite", true)
? std::make_unique<MetricsTransmitter>()
: nullptr;
const std::string listen_host = config().getString("listen_host", "::");
Poco::Timespan keep_alive_timeout(config().getInt("keep_alive_timeout", 10), 0);
@ -505,7 +502,13 @@ int Server::main(const std::vector<std::string> & args)
}
/// This object will periodically calculate some metrics.
ActiveMetrics active_metrics(*global_context);
AsynchronousMetrics async_metrics(*global_context);
system_database->attachTable("asynchronous_metrics", StorageSystemAsynchronousMetrics::create("asynchronous_metrics", async_metrics));
const auto metrics_transmitter = config().getBool("use_graphite", true)
? std::make_unique<MetricsTransmitter>(async_metrics)
: nullptr;
waitForTerminationRequest();
}

View File

@ -14,6 +14,12 @@
#include "Server.h"
namespace CurrentMetrics
{
extern const Metric TCPConnection;
}
namespace DB
{

View File

@ -7,6 +7,12 @@
#include <random>
namespace CurrentMetrics
{
extern const Metric BackgroundPoolTask;
}
namespace DB
{

View File

@ -3,6 +3,12 @@
#include <DB/Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric ReplicatedSend;
extern const Metric ReplicatedFetch;
}
namespace DB
{

View File

@ -15,6 +15,11 @@
#include <DB/Common/escapeForFileName.h>
namespace CurrentMetrics
{
extern const Metric ReplicatedChecks;
}
namespace DB
{

View File

@ -13,6 +13,12 @@ namespace ProfileEvents
extern const Event ReplicaPartialShutdown;
}
namespace CurrentMetrics
{
extern const Metric ReadonlyReplica;
}
namespace DB
{

View File

@ -7,6 +7,13 @@
#include <DB/IO/InterserverWriteBuffer.h>
namespace CurrentMetrics
{
extern const Metric ReplicatedSend;
extern const Metric ReplicatedFetch;
}
namespace DB
{

View File

@ -0,0 +1,68 @@
#include <DB/Interpreters/AsynchronousMetrics.h>
#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Storages/System/StorageSystemAsynchronousMetrics.h>
namespace DB
{
StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_)
: name(name_),
columns
{
{"metric", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeFloat64>()},
},
async_metrics(async_metrics_)
{
}
StoragePtr StorageSystemAsynchronousMetrics::create(const std::string & name_, const AsynchronousMetrics & async_metrics_)
{
return make_shared(name_, async_metrics_);
}
BlockInputStreams StorageSystemAsynchronousMetrics::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
Block block;
ColumnWithTypeAndName col_metric;
col_metric.name = "metric";
col_metric.type = std::make_shared<DataTypeString>();
col_metric.column = std::make_shared<ColumnString>();
block.insert(col_metric);
ColumnWithTypeAndName col_value;
col_value.name = "value";
col_value.type = std::make_shared<DataTypeFloat64>();
col_value.column = std::make_shared<ColumnFloat64>();
block.insert(col_value);
auto async_metrics_values = async_metrics.getValues();
for (const auto & name_value : async_metrics_values)
{
col_metric.column->insert(name_value.first);
col_value.column->insert(name_value.second);
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block));
}
}

View File

@ -52,7 +52,7 @@ BlockInputStreams StorageSystemMetrics::read(
col_value.column = std::make_shared<ColumnInt64>();
block.insert(col_value);
for (size_t i = 0; i < CurrentMetrics::END; ++i)
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
{
auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);