From ea852bb9d86bbf66662ed618424a5cfbed6ca578 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 24 Oct 2016 07:06:27 +0300 Subject: [PATCH] Separate compilation of CurrentMetrics; Separated AsynchronousMetrics from CurrentMetrics; Comments [#METR-23237]. --- dbms/CMakeLists.txt | 6 +- dbms/include/DB/Common/CurrentMetrics.h | 98 +++++-------------- dbms/include/DB/Common/ProfileEvents.h | 2 +- .../AsynchronousBlockInputStream.h | 5 + .../DB/DataStreams/ParallelInputsProcessor.h | 21 ++-- dbms/include/DB/IO/ReadBufferAIO.h | 6 ++ dbms/include/DB/IO/ReadBufferFromFile.h | 5 + .../DB/IO/ReadBufferFromFileDescriptor.h | 5 + dbms/include/DB/IO/WriteBufferAIO.h | 10 +- dbms/include/DB/IO/WriteBufferFromFile.h | 5 + .../DB/IO/WriteBufferFromFileDescriptor.h | 5 + dbms/include/DB/Interpreters/ActiveMetrics.h | 39 -------- .../DB/Interpreters/AsynchronousMetrics.h | 54 ++++++++++ dbms/include/DB/Interpreters/ProcessList.h | 49 +++++----- .../include/DB/Interpreters/QueryPriorities.h | 37 ++++--- .../Storages/Distributed/DirectoryMonitor.h | 5 + .../DB/Storages/MergeTree/DiskSpaceMonitor.h | 20 ++-- .../include/DB/Storages/MergeTree/MergeList.h | 6 ++ .../System/StorageSystemAsynchronousMetrics.h | 44 +++++++++ dbms/src/Client/Connection.cpp | 7 +- dbms/src/Common/CurrentMetrics.cpp | 47 ++++++++- dbms/src/Common/MemoryTracker.cpp | 13 ++- ...regatedMemoryEfficientBlockInputStream.cpp | 6 ++ dbms/src/IO/ReadBufferAIO.cpp | 5 + dbms/src/IO/WriteBufferAIO.cpp | 13 ++- dbms/src/Interpreters/Aggregator.cpp | 5 + ...iveMetrics.cpp => AsynchronousMetrics.cpp} | 58 ++++++----- dbms/src/Server/HTTPHandler.h | 9 +- dbms/src/Server/InterserverIOHTTPHandler.h | 5 + dbms/src/Server/MetricsTransmitter.cpp | 22 +++-- dbms/src/Server/MetricsTransmitter.h | 12 ++- dbms/src/Server/Server.cpp | 15 +-- dbms/src/Server/TCPHandler.h | 6 ++ .../MergeTree/BackgroundProcessingPool.cpp | 6 ++ .../Storages/MergeTree/DataPartsExchange.cpp | 6 ++ .../MergeTree/MergeTreePartChecker.cpp | 5 + .../ReplicatedMergeTreeRestartingThread.cpp | 6 ++ .../MergeTree/ShardedPartitionUploader.cpp | 7 ++ .../StorageSystemAsynchronousMetrics.cpp | 68 +++++++++++++ .../Storages/System/StorageSystemMetrics.cpp | 2 +- 40 files changed, 523 insertions(+), 222 deletions(-) delete mode 100644 dbms/include/DB/Interpreters/ActiveMetrics.h create mode 100644 dbms/include/DB/Interpreters/AsynchronousMetrics.h create mode 100644 dbms/include/DB/Storages/System/StorageSystemAsynchronousMetrics.h rename dbms/src/Interpreters/{ActiveMetrics.cpp => AsynchronousMetrics.cpp} (70%) create mode 100644 dbms/src/Storages/System/StorageSystemAsynchronousMetrics.cpp diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index c73ad646c02..e635fe06502 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -371,7 +371,7 @@ add_library (dbms include/DB/Interpreters/ClusterProxy/DescribeQueryConstructor.h include/DB/Interpreters/ClusterProxy/AlterQueryConstructor.h include/DB/Interpreters/ClusterProxy/Query.h - include/DB/Interpreters/ActiveMetrics.h + include/DB/Interpreters/AsynchronousMetrics.h include/DB/Common/Allocator.h include/DB/Common/CombinedCardinalityEstimator.h include/DB/Common/ExternalTable.h @@ -542,6 +542,7 @@ add_library (dbms include/DB/Storages/StorageStripeLog.h include/DB/Storages/System/StorageSystemEvents.h include/DB/Storages/System/StorageSystemMetrics.h + include/DB/Storages/System/StorageSystemAsynchronousMetrics.h include/DB/Storages/System/StorageSystemTables.h include/DB/Storages/MergeTree/MarkRange.h include/DB/Storages/MergeTree/MergeTreeDataMerger.h @@ -648,6 +649,7 @@ add_library (dbms src/Storages/System/StorageSystemProcesses.cpp src/Storages/System/StorageSystemEvents.cpp src/Storages/System/StorageSystemMetrics.cpp + src/Storages/System/StorageSystemAsynchronousMetrics.cpp src/Storages/System/StorageSystemMerges.cpp src/Storages/System/StorageSystemSettings.cpp src/Storages/System/StorageSystemZooKeeper.cpp @@ -853,7 +855,7 @@ add_library (dbms src/Interpreters/ClusterProxy/DescribeQueryConstructor.cpp src/Interpreters/ClusterProxy/AlterQueryConstructor.cpp src/Interpreters/ClusterProxy/Query.cpp - src/Interpreters/ActiveMetrics.cpp + src/Interpreters/AsynchronousMetrics.cpp src/Functions/FunctionFactory.cpp src/Functions/FunctionsArithmetic.cpp diff --git a/dbms/include/DB/Common/CurrentMetrics.h b/dbms/include/DB/Common/CurrentMetrics.h index e06681ae5b1..720e07218dc 100644 --- a/dbms/include/DB/Common/CurrentMetrics.h +++ b/dbms/include/DB/Common/CurrentMetrics.h @@ -1,94 +1,45 @@ #pragma once +#include #include #include #include -/** Позволяет считать количество одновременно происходящих событий или текущее значение какой-либо метрики. - * - для высокоуровневого профайлинга. +/** Allows to count number of simultaneously happening processes or current value of some metric. + * - for high-level profiling. * - * Также смотрите ProfileEvents.h - * В ProfileEvents считается общее количество произошедших (точечных) событий - например, сколько раз были выполнены запросы. - * В CurrentMetrics считается количество одновременных событий - например, сколько сейчас одновременно выполняется запросов, - * или текущее значение метрики - например, величина отставания реплики в секундах. + * See also ProfileEvents.h + * ProfileEvents counts number of happened events - for example, how many times queries was executed. + * CurrentMetrics counts number of simultaneously happening events - for example, number of currently executing queries, right now, + * or just current value of some metric - for example, replica delay in seconds. + * + * CurrentMetrics are updated instantly and are correct for any point in time. + * For periodically (asynchronously) updated metrics, see AsynchronousMetrics.h */ -#define APPLY_FOR_METRICS(M) \ - M(Query) \ - M(Merge) \ - M(ReplicatedFetch) \ - M(ReplicatedSend) \ - M(ReplicatedChecks) \ - M(BackgroundPoolTask) \ - M(DiskSpaceReservedForMerge) \ - M(DistributedSend) \ - M(QueryPreempted) \ - M(TCPConnection) \ - M(HTTPConnection) \ - M(InterserverConnection) \ - M(OpenFileForRead) \ - M(OpenFileForWrite) \ - M(Read) \ - M(Write) \ - M(SendExternalTables) \ - M(QueryThread) \ - M(ReadonlyReplica) \ - M(MemoryTracking) \ - M(MarkCacheBytes) \ - M(MarkCacheFiles) \ - M(UncompressedCacheBytes) \ - M(UncompressedCacheCells) \ - M(ReplicasMaxQueueSize) \ - M(ReplicasMaxInsertsInQueue) \ - M(ReplicasMaxMergesInQueue) \ - M(ReplicasSumQueueSize) \ - M(ReplicasSumInsertsInQueue) \ - M(ReplicasSumMergesInQueue) \ - M(ReplicasMaxAbsoluteDelay) \ - M(ReplicasMaxRelativeDelay) \ - M(MaxPartCountForPartition) \ - \ - M(END) - namespace CurrentMetrics { - /// Виды метрик. - enum Metric - { - #define M(NAME) NAME, - APPLY_FOR_METRICS(M) - #undef M - }; - - - /// Получить текстовое описание метрики по его enum-у. - inline const char * getDescription(Metric event) - { - static const char * descriptions[] = - { - #define M(NAME) #NAME, - APPLY_FOR_METRICS(M) - #undef M - }; - - return descriptions[event]; - } - - + /// Metric identifier (index in array). + using Metric = size_t; using Value = int64_t; - /// Счётчики - текущие значения метрик. - extern std::atomic values[END]; + /// Get text description of metric by identifier. Returns statically allocated string. + const char * getDescription(Metric event); + /// Metric identifier -> current value of metric. + extern std::atomic values[]; - /// Выставить значение указанной метрики. + /// Get index just after last metric identifier. + Metric end(); + + /// Set value of specified metric. inline void set(Metric metric, Value value) { values[metric] = value; } - /// Прибавить величину к значению указанной метрики. Вы затем должны вычесть величину самостоятельно. Или см. ниже class Increment. + /// Add value for specified metric. You must subtract value later; or see class Increment below. inline void add(Metric metric, Value value = 1) { values[metric] += value; @@ -99,7 +50,7 @@ namespace CurrentMetrics add(metric, -value); } - /// На время жизни объекта, увеличивает указанное значение на указанную величину. + /// For lifetime of object, add amout for specified metric. Then subtract. class Increment { private: @@ -141,7 +92,7 @@ namespace CurrentMetrics amount = new_amount; } - /// Уменьшить значение раньше вызова деструктора. + /// Subtract value before destructor. void destroy() { *what -= amount; @@ -149,6 +100,3 @@ namespace CurrentMetrics } }; } - - -#undef APPLY_FOR_METRICS diff --git a/dbms/include/DB/Common/ProfileEvents.h b/dbms/include/DB/Common/ProfileEvents.h index 7ba6e9e1561..06610398795 100644 --- a/dbms/include/DB/Common/ProfileEvents.h +++ b/dbms/include/DB/Common/ProfileEvents.h @@ -15,7 +15,7 @@ namespace ProfileEvents using Event = size_t; using Count = size_t; - /// Get text description of event by identifier. Returs statically allocated string. + /// Get text description of event by identifier. Returns statically allocated string. const char * getDescription(Event event); /// Counters - how many times each event happened. diff --git a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h index 2e5c21df836..417770edaef 100644 --- a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h +++ b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h @@ -8,6 +8,11 @@ #include +namespace CurrentMetrics +{ + extern const Metric QueryThread; +} + namespace DB { diff --git a/dbms/include/DB/DataStreams/ParallelInputsProcessor.h b/dbms/include/DB/DataStreams/ParallelInputsProcessor.h index afbd22ec073..cccceea0d60 100644 --- a/dbms/include/DB/DataStreams/ParallelInputsProcessor.h +++ b/dbms/include/DB/DataStreams/ParallelInputsProcessor.h @@ -13,17 +13,22 @@ #include -/** Позволяет обработать множество источников блоков параллельно, используя указанное количество потоков. - * Вынимает из любого доступного источника блок и передаёт его на обработку в предоставленный handler. +/** Allows to process multiple block input streams (sources) in parallel, using specified number of threads. + * Reads (pulls) blocks from any available source and passes it to specified handler. * - * Устроено так: - * - есть набор источников, из которых можно вынимать блоки; - * - есть набор потоков, которые могут одновременно вынимать блоки из разных источников; - * - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников; - * - когда поток берёт источник для обработки, он удаляет его из очереди источников, - * вынимает из него блок, и затем кладёт источник обратно в очередь источников; + * Implemented in following way: + * - there are multiple input sources to read blocks from; + * - there are multiple threads, that could simultaneously read blocks from different sources; + * - "available" sources (that are not read in any thread right now) are put in queue of sources; + * - when thread take a source to read from, it removes source from queue of sources, + * then read block from source and then put source back to queue of available sources. */ +namespace CurrentMetrics +{ + extern const Metric QueryThread; +} + namespace DB { diff --git a/dbms/include/DB/IO/ReadBufferAIO.h b/dbms/include/DB/IO/ReadBufferAIO.h index 4270197c01c..cabd5989524 100644 --- a/dbms/include/DB/IO/ReadBufferAIO.h +++ b/dbms/include/DB/IO/ReadBufferAIO.h @@ -12,6 +12,12 @@ #include #include + +namespace CurrentMetrics +{ + extern const Metric OpenFileForRead; +} + namespace DB { diff --git a/dbms/include/DB/IO/ReadBufferFromFile.h b/dbms/include/DB/IO/ReadBufferFromFile.h index a4ee5a1c14b..3a9bdbc788c 100644 --- a/dbms/include/DB/IO/ReadBufferFromFile.h +++ b/dbms/include/DB/IO/ReadBufferFromFile.h @@ -11,6 +11,11 @@ namespace ProfileEvents extern const Event FileOpen; } +namespace CurrentMetrics +{ + extern const Metric OpenFileForRead; +} + namespace DB { diff --git a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h index dc2f124e93b..d419f61195a 100644 --- a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h @@ -25,6 +25,11 @@ namespace ProfileEvents extern const Event Seek; } +namespace CurrentMetrics +{ + extern const Metric Read; +} + namespace DB { diff --git a/dbms/include/DB/IO/WriteBufferAIO.h b/dbms/include/DB/IO/WriteBufferAIO.h index 60d6ee7fdf7..d0f040fa6f3 100644 --- a/dbms/include/DB/IO/WriteBufferAIO.h +++ b/dbms/include/DB/IO/WriteBufferAIO.h @@ -11,6 +11,12 @@ #include #include + +namespace CurrentMetrics +{ + extern const Metric OpenFileForWrite; +} + namespace DB { @@ -32,12 +38,10 @@ public: int getFD() const override { return fd; } private: - /// void nextImpl() override; - /// off_t doSeek(off_t off, int whence) override; - /// void doTruncate(off_t length) override; + /// Если в буфере ещё остались данные - запишем их. void flush(); /// Ждать окончания текущей асинхронной задачи. diff --git a/dbms/include/DB/IO/WriteBufferFromFile.h b/dbms/include/DB/IO/WriteBufferFromFile.h index 506090c1cf5..bde37721e51 100644 --- a/dbms/include/DB/IO/WriteBufferFromFile.h +++ b/dbms/include/DB/IO/WriteBufferFromFile.h @@ -15,6 +15,11 @@ namespace ProfileEvents extern const Event FileOpen; } +namespace CurrentMetrics +{ + extern const Metric OpenFileForWrite; +} + namespace DB { diff --git a/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h b/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h index ccc4ba0d00b..248ecdf8a20 100644 --- a/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h @@ -19,6 +19,11 @@ namespace ProfileEvents extern const Event WriteBufferFromFileDescriptorWriteBytes; } +namespace CurrentMetrics +{ + extern const Metric Write; +} + namespace DB { diff --git a/dbms/include/DB/Interpreters/ActiveMetrics.h b/dbms/include/DB/Interpreters/ActiveMetrics.h deleted file mode 100644 index aad4a3b93fe..00000000000 --- a/dbms/include/DB/Interpreters/ActiveMetrics.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include -#include -#include - - -namespace DB -{ - -class Context; - - -/** Periodically (each minute, starting at 30 seconds offset) - * calculates and updates some metrics (see CurrentMetrics.h), - * that are not updated automatically (so, need to be actively calculated). - */ -class ActiveMetrics -{ -public: - ActiveMetrics(Context & context_) - : context(context_), thread([this] { run(); }) - { - } - - ~ActiveMetrics(); - -private: - Context & context; - bool quit {false}; - std::mutex mutex; - std::condition_variable cond; - std::thread thread; - - void run(); - void update(); -}; - -} diff --git a/dbms/include/DB/Interpreters/AsynchronousMetrics.h b/dbms/include/DB/Interpreters/AsynchronousMetrics.h new file mode 100644 index 00000000000..8c653374c7d --- /dev/null +++ b/dbms/include/DB/Interpreters/AsynchronousMetrics.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include +#include +#include +#include + + +namespace DB +{ + +class Context; + + +/** Periodically (each minute, starting at 30 seconds offset) + * calculates and updates some metrics, + * that are not updated automatically (so, need to be asynchronously calculated). + */ +class AsynchronousMetrics +{ +public: + AsynchronousMetrics(Context & context_) + : context(context_), thread([this] { run(); }) + { + } + + ~AsynchronousMetrics(); + + using Value = double; + using Container = std::unordered_map; + + /// Returns copy of all values. + Container getValues() const; + +private: + Context & context; + + bool quit {false}; + std::mutex wait_mutex; + std::condition_variable wait_cond; + + Container container; + mutable std::mutex container_mutex; + + std::thread thread; + + void run(); + void update(); + + void set(const std::string & name, Value value); +}; + +} diff --git a/dbms/include/DB/Interpreters/ProcessList.h b/dbms/include/DB/Interpreters/ProcessList.h index 500421344b8..cc95b1631a0 100644 --- a/dbms/include/DB/Interpreters/ProcessList.h +++ b/dbms/include/DB/Interpreters/ProcessList.h @@ -17,15 +17,20 @@ #include +namespace CurrentMetrics +{ + extern const Metric Query; +} + namespace DB { -/** Список исполняющихся в данный момент запросов. - * Также реализует ограничение на их количество. +/** List of currently executing queries. + * Also implements limit on their number. */ -/** Информационная составляющая элемента списка процессов. - * Для вывода в SHOW PROCESSLIST. Не содержит никаких сложных объектов, которые что-то делают при копировании или в деструкторах. +/** Information of process list element. + * To output in SHOW PROCESSLIST query. Does not contain any complex objects, that do something on copy or destructor. */ struct ProcessInfo { @@ -42,7 +47,7 @@ struct ProcessInfo }; -/// Запрос и данные о его выполнении. +/// Query and information about its execution. struct ProcessListElement { String query; @@ -63,7 +68,7 @@ struct ProcessListElement bool is_cancelled = false; - /// Здесь могут быть зарегистрированы временные таблицы. Изменять под mutex-ом. + /// Temporary tables could be registered here. Modify under mutex. Tables temporary_tables; @@ -91,7 +96,7 @@ struct ProcessListElement progress.incrementPiecewiseAtomically(value); if (priority_handle) - priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Можно сделать настраиваемым таймаут. + priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Could make timeout customizable. return !is_cancelled; } @@ -114,14 +119,14 @@ struct ProcessListElement }; -/// Данные о запросах одного пользователя. +/// Data about queries for one user. struct ProcessListForUser { /// Query_id -> ProcessListElement * using QueryToElement = std::unordered_map; QueryToElement queries; - /// Ограничение и счётчик памяти на все одновременно выполняющиеся запросы одного пользователя. + /// Limit and counter for memory of all simultaneously running queries of single user. MemoryTracker user_memory_tracker; }; @@ -129,7 +134,7 @@ struct ProcessListForUser class ProcessList; -/// Держит итератор на список, и удаляет элемент из списка в деструкторе. +/// Keeps iterator to process list and removes element in destructor. class ProcessListEntry { private: @@ -158,7 +163,7 @@ public: using Element = ProcessListElement; using Entry = ProcessListEntry; - /// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем. + /// list, for iterators not to invalidate. NOTE: could replace with cyclic buffer, but not worth. using Container = std::list; using Info = std::vector; /// User -> queries @@ -166,15 +171,15 @@ public: private: mutable std::mutex mutex; - mutable Poco::Condition have_space; /// Количество одновременно выполняющихся запросов стало меньше максимального. + mutable Poco::Condition have_space; /// Number of currently running queries has become less than maximum. Container cont; - size_t cur_size; /// В C++03 std::list::size не O(1). - size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение. + size_t cur_size; /// In C++03 or C++11 and old ABI, std::list::size is not O(1). + size_t max_size; /// 0 means no limit. Otherwise, when limit exceeded, an exception is thrown. UserToQueries user_to_queries; QueryPriorities priorities; - /// Ограничение и счётчик памяти на все одновременно выполняющиеся запросы. + /// Limit and counter for memory of all simultaneously running queries. MemoryTracker total_memory_tracker; public: @@ -182,17 +187,17 @@ public: using EntryPtr = std::shared_ptr; - /** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении. - * Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени. - * Если времени не хватило - кинуть исключение. + /** Register running query. Returns refcounted object, that will remove element from list in destructor. + * If too much running queries - wait for not more than specified (see settings) amount of time. + * If timeout is passed - throw an exception. */ EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_, UInt16 port_, const Settings & settings); - /// Количество одновременно выполняющихся запросов. + /// Number of currently executing queries. size_t size() const { return cur_size; } - /// Получить текущее состояние списка запросов. + /// Get current state of process list. Info getInfo() const { std::lock_guard lock(mutex); @@ -211,10 +216,10 @@ public: max_size = max_size_; } - /// Зарегистрировать временную таблицу. Потом её можно будет получить по query_id и по названию. + /// Register temporary table. Then it is accessible by query_id and name. void addTemporaryTable(ProcessListElement & elem, const String & table_name, StoragePtr storage); - /// Найти временную таблицу по query_id и по названию. Замечание: плохо работает, если есть разные запросы с одним query_id. + /// Find temporary table by query_id and name. NOTE: doesn't work fine if there are many queries with same query_id. StoragePtr tryGetTemporaryTable(const String & query_id, const String & table_name) const; }; diff --git a/dbms/include/DB/Interpreters/QueryPriorities.h b/dbms/include/DB/Interpreters/QueryPriorities.h index 7e8f3a5817d..f2e83b90212 100644 --- a/dbms/include/DB/Interpreters/QueryPriorities.h +++ b/dbms/include/DB/Interpreters/QueryPriorities.h @@ -8,17 +8,26 @@ #include -/** Реализует приоритеты запросов. - * Позволяет приостанавливать выполнение запроса, если выполняется хотя бы один более приоритетный запрос. +namespace CurrentMetrics +{ + extern const Metric QueryPreempted; +} + + +namespace DB +{ + +/** Implements query priorities in very primitive way. + * Allows to freeze query execution if at least one query of higher priority is executed. * - * Величина приоритета - целое число, чем меньше - тем больше приоритет. + * Priority value is integer, smaller means higher priority. * - * Приоритет 0 считается особенным - запросы с таким приоритетом выполняются всегда, - * не зависят от других запросов и не влияют на другие запросы. - * То есть 0 означает - не использовать приоритеты. + * Priority 0 is special - queries with that priority is always executed, + * not depends on other queries and not affect other queries. + * Thus 0 means - don't use priorities. * - * NOTE Возможности сделать лучше: - * - реализовать ограничение на максимальное количество запросов с таким приоритетом. + * NOTE Possibilities for improvement: + * - implement limit on maximum number of running queries with same priority. */ class QueryPriorities { @@ -30,7 +39,7 @@ private: using Count = int; - /// Количество выполняющихся сейчас запросов с заданным приоритетом. + /// Number of currently running queries for each priority. using Container = std::map; std::mutex mutex; @@ -38,8 +47,8 @@ private: Container container; - /** Если есть более приоритетные запросы - спать, пока они не перестанут быть или не истечёт таймаут. - * Возвращает true, если более приоритетные запросы исчезли на момент возврата из функции, false, если истёк таймаут. + /** If there are higher priority queries - sleep until they are finish or timeout happens. + * Returns true, if higher priority queries has finished at return of function, false, if timout exceeded. */ template bool waitIfNeed(Priority priority, Duration timeout) @@ -103,8 +112,8 @@ public: using Handle = std::shared_ptr; - /** Зарегистрировать, что запрос с заданным приоритетом выполняется. - * Возвращается объект, в деструкторе которого, запись о запросе удаляется. + /** Register query with specified priority. + * Returns an object that remove record in destructor. */ Handle insert(Priority priority) { @@ -117,3 +126,5 @@ public: return std::make_shared(*this, *it); } }; + +} diff --git a/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h b/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h index 50bfec1def7..fca11d9458d 100644 --- a/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h +++ b/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h @@ -18,6 +18,11 @@ #include +namespace CurrentMetrics +{ + extern const Metric DistributedSend; +} + namespace DB { diff --git a/dbms/include/DB/Storages/MergeTree/DiskSpaceMonitor.h b/dbms/include/DB/Storages/MergeTree/DiskSpaceMonitor.h index c402e9510ba..f444d76ca16 100644 --- a/dbms/include/DB/Storages/MergeTree/DiskSpaceMonitor.h +++ b/dbms/include/DB/Storages/MergeTree/DiskSpaceMonitor.h @@ -10,6 +10,12 @@ #include #include + +namespace CurrentMetrics +{ + extern const Metric DiskSpaceReservedForMerge; +} + namespace DB { @@ -20,10 +26,10 @@ namespace ErrorCodes } -/** Узнает количество свободного места в файловой системе. - * Можно "резервировать" место, чтобы разные операции могли согласованно планировать использование диска. - * Резервирования не разделяются по файловым системам. - * Вместо этого при запросе свободного места считается, что все резервирования сделаны в той же файловой системе. +/** Determines amount of free space in filesystem. + * Could "reserve" space, for different operations to plan disk space usage. + * Reservations are not separated for different filesystems, + * instead it is assumed, that all reservations are done within same filesystem. */ class DiskSpaceMonitor { @@ -61,7 +67,7 @@ public: } } - /// Изменить количество зарезервированного места. При увеличении не делается проверка, что места достаточно. + /// Change amount of reserved space. When new_size is greater than before, availability of free space is not checked. void update(size_t new_size) { std::lock_guard lock(DiskSpaceMonitor::mutex); @@ -99,7 +105,7 @@ public: size_t res = fs.f_bfree * fs.f_bsize; - /// Зарезервируем дополнительно 30 МБ. Когда я тестировал, statvfs показывал на несколько мегабайт больше свободного места, чем df. + /// Heuristic by Michael Kolupaev: reserve 30 MB more, because statvfs shows few megabytes more space than df. res -= std::min(res, 30 * (1ul << 20)); std::lock_guard lock(mutex); @@ -124,7 +130,7 @@ public: return reservation_count; } - /// Если места (приблизительно) недостаточно, бросает исключение. + /// If not enough (approximately) space, throw an exception. static ReservationPtr reserve(const std::string & path, size_t size) { size_t free_bytes = getUnreservedFreeSpace(path); diff --git a/dbms/include/DB/Storages/MergeTree/MergeList.h b/dbms/include/DB/Storages/MergeTree/MergeList.h index 9d9f1cec04e..9ce478e482f 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeList.h +++ b/dbms/include/DB/Storages/MergeTree/MergeList.h @@ -7,6 +7,12 @@ #include #include + +namespace CurrentMetrics +{ + extern const Metric Merge; +} + namespace DB { diff --git a/dbms/include/DB/Storages/System/StorageSystemAsynchronousMetrics.h b/dbms/include/DB/Storages/System/StorageSystemAsynchronousMetrics.h new file mode 100644 index 00000000000..71741071944 --- /dev/null +++ b/dbms/include/DB/Storages/System/StorageSystemAsynchronousMetrics.h @@ -0,0 +1,44 @@ +#pragma once + +#include + +#include + + +namespace DB +{ + +class AsynchronousMetrics; + +/** Implements system table asynchronous_metrics, which allows to get values of periodically (asynchronously) updated metrics. + */ +class StorageSystemAsynchronousMetrics : private ext::shared_ptr_helper, public IStorage +{ +friend class ext::shared_ptr_helper; + +public: + static StoragePtr create(const std::string & name_, const AsynchronousMetrics & async_metrics_); + + std::string getName() const override { return "SystemAsynchronousMetrics"; } + std::string getTableName() const override { return name; } + + const NamesAndTypesList & getColumnsListImpl() const override { return columns; } + + BlockInputStreams read( + const Names & column_names, + ASTPtr query, + const Context & context, + const Settings & settings, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size = DEFAULT_BLOCK_SIZE, + unsigned threads = 1) override; + +private: + const std::string name; + NamesAndTypesList columns; + const AsynchronousMetrics & async_metrics; + + StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_); +}; + +} diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 4a57ee84074..dc7f63bcdbe 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -24,6 +24,11 @@ #include +namespace CurrentMetrics +{ + extern const Metric SendExternalTables; +} + namespace DB { @@ -286,7 +291,7 @@ void Connection::sendQuery(const String & query, const String & query_id_, UInt6 block_in.reset(); block_out.reset(); - /// Если версия сервера достаточно новая и стоит флаг, отправляем пустой блок, символизируя конец передачи данных. + /// If server version is new enough, send empty block which meand end of data. if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES && !with_pending_data) { sendData(Block()); diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index d3653b59af7..aaba82b06e2 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -1,7 +1,52 @@ #include +/// Available metrics. Add something here as you wish. +#define APPLY_FOR_METRICS(M) \ + M(Query) \ + M(Merge) \ + M(ReplicatedFetch) \ + M(ReplicatedSend) \ + M(ReplicatedChecks) \ + M(BackgroundPoolTask) \ + M(DiskSpaceReservedForMerge) \ + M(DistributedSend) \ + M(QueryPreempted) \ + M(TCPConnection) \ + M(HTTPConnection) \ + M(InterserverConnection) \ + M(OpenFileForRead) \ + M(OpenFileForWrite) \ + M(Read) \ + M(Write) \ + M(SendExternalTables) \ + M(QueryThread) \ + M(ReadonlyReplica) \ + M(MemoryTracking) \ + + namespace CurrentMetrics { - std::atomic values[END] {}; /// Глобальная переменная - инициализируется нулями. + #define M(NAME) extern const Metric NAME = __COUNTER__; + APPLY_FOR_METRICS(M) + #undef M + constexpr Metric END = __COUNTER__; + + std::atomic values[END] {}; /// Global variable, initialized by zeros. + + const char * getDescription(Metric event) + { + static const char * descriptions[] = + { + #define M(NAME) #NAME, + APPLY_FOR_METRICS(M) + #undef M + }; + + return descriptions[event]; + } + + Metric end() { return END; } } + +#undef APPLY_FOR_METRICS diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index 2328021394d..696ccfdd862 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -9,12 +9,17 @@ #include +namespace CurrentMetrics +{ + extern const Metric MemoryTracking; +} + namespace DB { -namespace ErrorCodes -{ - extern const int MEMORY_LIMIT_EXCEEDED; -} + namespace ErrorCodes + { + extern const int MEMORY_LIMIT_EXCEEDED; + } } diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index b0f2c299c56..be844f70115 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -4,6 +4,12 @@ #include +namespace CurrentMetrics +{ + extern const Metric QueryThread; +} + + namespace DB { diff --git a/dbms/src/IO/ReadBufferAIO.cpp b/dbms/src/IO/ReadBufferAIO.cpp index e3b7a59c9b0..8610abd1df5 100644 --- a/dbms/src/IO/ReadBufferAIO.cpp +++ b/dbms/src/IO/ReadBufferAIO.cpp @@ -16,6 +16,11 @@ namespace ProfileEvents extern const Event ReadBufferAIOReadBytes; } +namespace CurrentMetrics +{ + extern const Metric Read; +} + namespace DB { diff --git a/dbms/src/IO/WriteBufferAIO.cpp b/dbms/src/IO/WriteBufferAIO.cpp index 620de3ed2f4..105492af3b2 100644 --- a/dbms/src/IO/WriteBufferAIO.cpp +++ b/dbms/src/IO/WriteBufferAIO.cpp @@ -13,6 +13,11 @@ namespace ProfileEvents extern const Event WriteBufferAIOWriteBytes; } +namespace CurrentMetrics +{ + extern const Metric Write; +} + namespace DB { @@ -34,10 +39,10 @@ namespace ErrorCodes /// Примечание: выделяется дополнительная страница, которая содежрит те данные, которые /// не влезают в основной буфер. WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_, - char * existing_memory_) - : WriteBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE), - flush_buffer(BufferWithOwnMemory(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)), - filename(filename_) + char * existing_memory_) + : WriteBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE), + flush_buffer(BufferWithOwnMemory(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)), + filename(filename_) { /// Исправить информацию о размере буферов, чтобы дополнительные страницы не касались базового класса BufferBase. this->buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE); diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 3560239a2e2..e837c97e285 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -29,6 +29,11 @@ namespace ProfileEvents extern const Event ExternalAggregationUncompressedBytes; } +namespace CurrentMetrics +{ + extern const Metric QueryThread; +} + namespace DB { diff --git a/dbms/src/Interpreters/ActiveMetrics.cpp b/dbms/src/Interpreters/AsynchronousMetrics.cpp similarity index 70% rename from dbms/src/Interpreters/ActiveMetrics.cpp rename to dbms/src/Interpreters/AsynchronousMetrics.cpp index 2c5df33163e..f8884e00e49 100644 --- a/dbms/src/Interpreters/ActiveMetrics.cpp +++ b/dbms/src/Interpreters/AsynchronousMetrics.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include #include @@ -13,16 +13,16 @@ namespace DB { -ActiveMetrics::~ActiveMetrics() +AsynchronousMetrics::~AsynchronousMetrics() { try { { - std::lock_guard lock{mutex}; + std::lock_guard lock{wait_mutex}; quit = true; } - cond.notify_one(); + wait_cond.notify_one(); thread.join(); } catch (...) @@ -32,11 +32,25 @@ ActiveMetrics::~ActiveMetrics() } -void ActiveMetrics::run() +AsynchronousMetrics::Container AsynchronousMetrics::getValues() const { - setThreadName("ActiveMetrics"); + std::lock_guard lock{container_mutex}; + return container; +} - std::unique_lock lock{mutex}; + +void AsynchronousMetrics::set(const std::string & name, Value value) +{ + std::lock_guard lock{container_mutex}; + container[name] = value; +} + + +void AsynchronousMetrics::run() +{ + setThreadName("AsyncMetrics"); + + std::unique_lock lock{wait_mutex}; /// Next minute + 30 seconds. To be distant with moment of transmission of metrics, see MetricsTransmitter. const auto get_next_minute = [] @@ -47,7 +61,7 @@ void ActiveMetrics::run() while (true) { - if (cond.wait_until(lock, get_next_minute(), [this] { return quit; })) + if (wait_cond.wait_until(lock, get_next_minute(), [this] { return quit; })) break; try @@ -78,21 +92,21 @@ static void calculateMaxAndSum(Max & max, Sum & sum, T x) } -void ActiveMetrics::update() +void AsynchronousMetrics::update() { { if (auto mark_cache = context.getMarkCache()) { - CurrentMetrics::set(CurrentMetrics::MarkCacheBytes, mark_cache->weight()); - CurrentMetrics::set(CurrentMetrics::MarkCacheFiles, mark_cache->count()); + set("MarkCacheBytes", mark_cache->weight()); + set("MarkCacheFiles", mark_cache->count()); } } { if (auto uncompressed_cache = context.getUncompressedCache()) { - CurrentMetrics::set(CurrentMetrics::UncompressedCacheBytes, uncompressed_cache->weight()); - CurrentMetrics::set(CurrentMetrics::UncompressedCacheCells, uncompressed_cache->count()); + set("UncompressedCacheBytes", uncompressed_cache->weight()); + set("UncompressedCacheCells", uncompressed_cache->count()); } } @@ -156,18 +170,18 @@ void ActiveMetrics::update() } } - CurrentMetrics::set(CurrentMetrics::ReplicasMaxQueueSize, max_queue_size); - CurrentMetrics::set(CurrentMetrics::ReplicasMaxInsertsInQueue, max_inserts_in_queue); - CurrentMetrics::set(CurrentMetrics::ReplicasMaxMergesInQueue, max_merges_in_queue); + set("ReplicasMaxQueueSize", max_queue_size); + set("ReplicasMaxInsertsInQueue", max_inserts_in_queue); + set("ReplicasMaxMergesInQueue", max_merges_in_queue); - CurrentMetrics::set(CurrentMetrics::ReplicasSumQueueSize, sum_queue_size); - CurrentMetrics::set(CurrentMetrics::ReplicasSumInsertsInQueue, sum_inserts_in_queue); - CurrentMetrics::set(CurrentMetrics::ReplicasSumMergesInQueue, sum_merges_in_queue); + set("ReplicasSumQueueSize", sum_queue_size); + set("ReplicasSumInsertsInQueue", sum_inserts_in_queue); + set("ReplicasSumMergesInQueue", sum_merges_in_queue); - CurrentMetrics::set(CurrentMetrics::ReplicasMaxAbsoluteDelay, max_absolute_delay); - CurrentMetrics::set(CurrentMetrics::ReplicasMaxRelativeDelay, max_relative_delay); + set("ReplicasMaxAbsoluteDelay", max_absolute_delay); + set("ReplicasMaxRelativeDelay", max_relative_delay); - CurrentMetrics::set(CurrentMetrics::MaxPartCountForPartition, max_part_count_for_partition); + set("MaxPartCountForPartition", max_part_count_for_partition); } /// Add more metrics as you wish. diff --git a/dbms/src/Server/HTTPHandler.h b/dbms/src/Server/HTTPHandler.h index f4ebb06df0b..1259f935bc0 100644 --- a/dbms/src/Server/HTTPHandler.h +++ b/dbms/src/Server/HTTPHandler.h @@ -5,6 +5,11 @@ #include "Server.h" +namespace CurrentMetrics +{ + extern const Metric HTTPConnection; +} + namespace DB { @@ -21,7 +26,7 @@ public: struct Output { std::shared_ptr out; - /// Используется для выдачи ответа. Равен либо out, либо CompressedWriteBuffer(*out), в зависимости от настроек. + /// Used for sending response. Points to 'out', or to CompressedWriteBuffer(*out), depending on settings. std::shared_ptr out_maybe_compressed; }; @@ -38,7 +43,7 @@ private: Logger * log; - /// Функция также инициализирует used_output. + /// Also initializes 'used_output'. void processQuery( Poco::Net::HTTPServerRequest & request, HTMLForm & params, diff --git a/dbms/src/Server/InterserverIOHTTPHandler.h b/dbms/src/Server/InterserverIOHTTPHandler.h index 28dea53ee79..1b8e95dba32 100644 --- a/dbms/src/Server/InterserverIOHTTPHandler.h +++ b/dbms/src/Server/InterserverIOHTTPHandler.h @@ -4,6 +4,11 @@ #include +namespace CurrentMetrics +{ + extern const Metric InterserverConnection; +} + namespace DB { diff --git a/dbms/src/Server/MetricsTransmitter.cpp b/dbms/src/Server/MetricsTransmitter.cpp index 523e5ce894c..a21e45da243 100644 --- a/dbms/src/Server/MetricsTransmitter.cpp +++ b/dbms/src/Server/MetricsTransmitter.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB @@ -39,6 +40,8 @@ void MetricsTransmitter::run() std::chrono::system_clock::now() + std::chrono::minutes(1)); }; + ProfileEvents::Count prev_counters[ProfileEvents::end()] {}; + std::unique_lock lock{mutex}; while (true) @@ -46,27 +49,29 @@ void MetricsTransmitter::run() if (cond.wait_until(lock, get_next_minute(), [this] { return quit; })) break; - transmit(); + transmit(prev_counters); } } -void MetricsTransmitter::transmit() +void MetricsTransmitter::transmit(ProfileEvents::Count * prev_counters) { + auto async_metrics_values = async_metrics.getValues(); + GraphiteWriter::KeyValueVector key_vals{}; - key_vals.reserve(ProfileEvents::end() + CurrentMetrics::END); + key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size()); for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) { const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed); - const auto counter_increment = counter - prev_counters[i].load(std::memory_order_relaxed); - prev_counters[i].store(counter, std::memory_order_relaxed); + const auto counter_increment = counter - prev_counters[i]; + prev_counters[i] = counter; std::string key {ProfileEvents::getDescription(static_cast(i))}; key_vals.emplace_back(profile_events_path_prefix + key, counter_increment); } - for (size_t i = 0; i < CurrentMetrics::END; ++i) + for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) { const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed); @@ -74,6 +79,11 @@ void MetricsTransmitter::transmit() key_vals.emplace_back(current_metrics_path_prefix + key, value); } + for (const auto & name_value : async_metrics_values) + { + key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second); + } + BaseDaemon::instance().writeToGraphite(key_vals); } diff --git a/dbms/src/Server/MetricsTransmitter.h b/dbms/src/Server/MetricsTransmitter.h index 3b54a621d8d..f8ee42d7f8e 100644 --- a/dbms/src/Server/MetricsTransmitter.h +++ b/dbms/src/Server/MetricsTransmitter.h @@ -10,23 +10,25 @@ namespace DB { +class AsynchronousMetrics; + /** Automatically sends * - difference of ProfileEvents; * - values of CurrentMetrics; - * - values of ActiveMetrics; + * - values of AsynchronousMetrics; * to Graphite at beginning of every minute. */ class MetricsTransmitter { public: + MetricsTransmitter(const AsynchronousMetrics & async_metrics_) : async_metrics(async_metrics_) {} ~MetricsTransmitter(); private: void run(); - void transmit(); + void transmit(ProfileEvents::Count * prev_counters); - /// Values of ProfileEvents counters at previous iteration (or zeros at first time). - decltype(ProfileEvents::counters) prev_counters{}; + const AsynchronousMetrics & async_metrics; bool quit = false; std::mutex mutex; @@ -35,7 +37,7 @@ private: static constexpr auto profile_events_path_prefix = "ClickHouse.ProfileEvents."; static constexpr auto current_metrics_path_prefix = "ClickHouse.Metrics."; - static constexpr auto active_metrics_path_prefix = "ClickHouse.ActiveMetrics."; + static constexpr auto asynchronous_metrics_path_prefix = "ClickHouse.AsynchronousMetrics."; }; } diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 45e17d7945f..767618d52d7 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -20,7 +20,7 @@ #include #include -#include +#include #include #include @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -379,10 +380,6 @@ int Server::main(const std::vector & args) ); { - const auto metrics_transmitter = config().getBool("use_graphite", true) - ? std::make_unique() - : nullptr; - const std::string listen_host = config().getString("listen_host", "::"); Poco::Timespan keep_alive_timeout(config().getInt("keep_alive_timeout", 10), 0); @@ -505,7 +502,13 @@ int Server::main(const std::vector & args) } /// This object will periodically calculate some metrics. - ActiveMetrics active_metrics(*global_context); + AsynchronousMetrics async_metrics(*global_context); + + system_database->attachTable("asynchronous_metrics", StorageSystemAsynchronousMetrics::create("asynchronous_metrics", async_metrics)); + + const auto metrics_transmitter = config().getBool("use_graphite", true) + ? std::make_unique(async_metrics) + : nullptr; waitForTerminationRequest(); } diff --git a/dbms/src/Server/TCPHandler.h b/dbms/src/Server/TCPHandler.h index 7fe6631e7d9..8d601e039c0 100644 --- a/dbms/src/Server/TCPHandler.h +++ b/dbms/src/Server/TCPHandler.h @@ -14,6 +14,12 @@ #include "Server.h" +namespace CurrentMetrics +{ + extern const Metric TCPConnection; +} + + namespace DB { diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp index 6dc826a200f..f1102346342 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp @@ -7,6 +7,12 @@ #include + +namespace CurrentMetrics +{ + extern const Metric BackgroundPoolTask; +} + namespace DB { diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index f4070eedb4b..236a0070b42 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -3,6 +3,12 @@ #include +namespace CurrentMetrics +{ + extern const Metric ReplicatedSend; + extern const Metric ReplicatedFetch; +} + namespace DB { diff --git a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp index c5dbec7e689..bd4634701f8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp @@ -15,6 +15,11 @@ #include +namespace CurrentMetrics +{ + extern const Metric ReplicatedChecks; +} + namespace DB { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 7d340ee6e1f..beae7374d9d 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -13,6 +13,12 @@ namespace ProfileEvents extern const Event ReplicaPartialShutdown; } +namespace CurrentMetrics +{ + extern const Metric ReadonlyReplica; +} + + namespace DB { diff --git a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp index 5c9129d8f69..4797d2f3939 100644 --- a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp +++ b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp @@ -7,6 +7,13 @@ #include + +namespace CurrentMetrics +{ + extern const Metric ReplicatedSend; + extern const Metric ReplicatedFetch; +} + namespace DB { diff --git a/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.cpp b/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.cpp new file mode 100644 index 00000000000..80dfb5d9cd9 --- /dev/null +++ b/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.cpp @@ -0,0 +1,68 @@ +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + + +StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_) + : name(name_), + columns + { + {"metric", std::make_shared()}, + {"value", std::make_shared()}, + }, + async_metrics(async_metrics_) +{ +} + +StoragePtr StorageSystemAsynchronousMetrics::create(const std::string & name_, const AsynchronousMetrics & async_metrics_) +{ + return make_shared(name_, async_metrics_); +} + + +BlockInputStreams StorageSystemAsynchronousMetrics::read( + const Names & column_names, + ASTPtr query, + const Context & context, + const Settings & settings, + QueryProcessingStage::Enum & processed_stage, + const size_t max_block_size, + const unsigned threads) +{ + check(column_names); + processed_stage = QueryProcessingStage::FetchColumns; + + Block block; + + ColumnWithTypeAndName col_metric; + col_metric.name = "metric"; + col_metric.type = std::make_shared(); + col_metric.column = std::make_shared(); + block.insert(col_metric); + + ColumnWithTypeAndName col_value; + col_value.name = "value"; + col_value.type = std::make_shared(); + col_value.column = std::make_shared(); + block.insert(col_value); + + auto async_metrics_values = async_metrics.getValues(); + + for (const auto & name_value : async_metrics_values) + { + col_metric.column->insert(name_value.first); + col_value.column->insert(name_value.second); + } + + return BlockInputStreams(1, std::make_shared(block)); +} + + +} diff --git a/dbms/src/Storages/System/StorageSystemMetrics.cpp b/dbms/src/Storages/System/StorageSystemMetrics.cpp index 5cbfd4c8ee1..64610d9e5a9 100644 --- a/dbms/src/Storages/System/StorageSystemMetrics.cpp +++ b/dbms/src/Storages/System/StorageSystemMetrics.cpp @@ -52,7 +52,7 @@ BlockInputStreams StorageSystemMetrics::read( col_value.column = std::make_shared(); block.insert(col_value); - for (size_t i = 0; i < CurrentMetrics::END; ++i) + for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) { auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);