diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e9449b0b0b..41ce8b0d4d1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -108,6 +108,7 @@ include_directories (${ClickHouse_SOURCE_DIR}/contrib/libsparsehash/) include_directories (${ClickHouse_SOURCE_DIR}/contrib/libre2/) include_directories (${ClickHouse_BINARY_DIR}/contrib/libre2/) include_directories (${ClickHouse_SOURCE_DIR}/contrib/libzookeeper/include/) +include_directories (${ClickHouse_SOURCE_DIR}/contrib/libtcmalloc/include/) include_directories (${ClickHouse_SOURCE_DIR}/contrib/libpoco/Foundation/include/) include_directories (${ClickHouse_SOURCE_DIR}/contrib/libpoco/Util/include/) include_directories (${ClickHouse_SOURCE_DIR}/contrib/libpoco/Net/include/) diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 3564c84a796..9d2c0351ade 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -15,6 +15,10 @@ else() set (LINK_MONGOCLIENT libmongoclient.a libssl.a libcrypto.a libboost_thread.a) endif() +if (DISABLE_LIBTCMALLOC) + add_definitions(-D NO_TCMALLOC) +endif() + add_library(string_utils include/DB/Common/StringUtils.h @@ -371,6 +375,7 @@ add_library (dbms include/DB/Interpreters/ClusterProxy/DescribeQueryConstructor.h include/DB/Interpreters/ClusterProxy/AlterQueryConstructor.h include/DB/Interpreters/ClusterProxy/Query.h + include/DB/Interpreters/AsynchronousMetrics.h include/DB/Common/Allocator.h include/DB/Common/CombinedCardinalityEstimator.h include/DB/Common/ExternalTable.h @@ -541,6 +546,7 @@ add_library (dbms include/DB/Storages/StorageStripeLog.h include/DB/Storages/System/StorageSystemEvents.h include/DB/Storages/System/StorageSystemMetrics.h + include/DB/Storages/System/StorageSystemAsynchronousMetrics.h include/DB/Storages/System/StorageSystemTables.h include/DB/Storages/MergeTree/MarkRange.h include/DB/Storages/MergeTree/MergeTreeDataMerger.h @@ -647,6 +653,7 @@ add_library (dbms src/Storages/System/StorageSystemProcesses.cpp src/Storages/System/StorageSystemEvents.cpp src/Storages/System/StorageSystemMetrics.cpp + src/Storages/System/StorageSystemAsynchronousMetrics.cpp src/Storages/System/StorageSystemMerges.cpp src/Storages/System/StorageSystemSettings.cpp src/Storages/System/StorageSystemZooKeeper.cpp @@ -852,6 +859,7 @@ add_library (dbms src/Interpreters/ClusterProxy/DescribeQueryConstructor.cpp src/Interpreters/ClusterProxy/AlterQueryConstructor.cpp src/Interpreters/ClusterProxy/Query.cpp + src/Interpreters/AsynchronousMetrics.cpp src/Functions/FunctionFactory.cpp src/Functions/FunctionsArithmetic.cpp diff --git a/dbms/include/DB/Common/Arena.h b/dbms/include/DB/Common/Arena.h index 652eb8e9de0..e6b68a6ff5f 100644 --- a/dbms/include/DB/Common/Arena.h +++ b/dbms/include/DB/Common/Arena.h @@ -10,22 +10,28 @@ #include +namespace ProfileEvents +{ + extern const Event ArenaAllocChunks; + extern const Event ArenaAllocBytes; +} + namespace DB { -/** Пул, в который можно складывать что-нибудь. Например, короткие строки. - * Сценарий использования: - * - складываем много строк и запоминаем их адреса; - * - адреса остаются валидными в течение жизни пула; - * - при уничтожении пула, вся память освобождается; - * - память выделяется и освобождается большими кусками; - * - удаление части данных не предусмотрено; +/** Memory pool to append something. For example, short strings. 
+ * Usage scenario:
+ * - put a lot of strings into the pool, keep their addresses;
+ * - addresses remain valid during lifetime of pool;
+ * - at destruction of pool, all memory is freed;
+ * - memory is allocated and freed by large chunks;
+ * - freeing parts of data is not possible (but look at ArenaWithFreeLists if you need);
  */
 class Arena : private boost::noncopyable
 {
 private:
-	/// Непрерывный кусок памяти и указатель на свободное место в нём. Односвязный список.
+	/// Contiguous chunk of memory and pointer to free space inside it. Member of single-linked list.
 	struct Chunk : private Allocator	/// empty base optimization
 	{
 		char * begin;
@@ -59,7 +65,7 @@ private:
 	size_t growth_factor;
 	size_t linear_growth_threshold;

-	/// Последний непрерывный кусок памяти.
+	/// Last contiguous chunk of memory.
 	Chunk * head;
 	size_t size_in_bytes;

@@ -68,7 +74,8 @@ private:
 		return (s + 4096 - 1) / 4096 * 4096;
 	}

-	/// Если размер чанка меньше linear_growth_threshold, то рост экспоненциальный, иначе - линейный, для уменьшения потребления памяти.
+	/// If chunk size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth
+	/// (to not allocate too much excessive memory).
 	size_t nextSize(size_t min_next_size) const
 	{
 		size_t size_after_grow = 0;
@@ -84,7 +91,7 @@ private:
 		return roundUpToPageSize(size_after_grow);
 	}

-	/// Добавить следующий непрерывный кусок памяти размера не меньше заданного.
+	/// Add next contiguous chunk of memory with size not less than specified.
 	void NO_INLINE addChunk(size_t min_size)
 	{
 		head = new Chunk(nextSize(min_size), head);
@@ -103,7 +110,7 @@ public:
 		delete head;
 	}

-	/// Получить кусок памяти, без выравнивания.
+	/// Get piece of memory, without alignment.
 	char * alloc(size_t size)
 	{
 		if (unlikely(head->pos + size > head->end))
@@ -114,17 +121,17 @@ public:
 		return res;
 	}

-	/** Отменить только что сделанное выделение памяти.
-	  * Нужно передать размер не больше того, который был только что выделен.
+	/** Rollback just performed allocation.
+	  * Must pass size not more than was just allocated.
 	  */
 	void rollback(size_t size)
 	{
 		head->pos -= size;
 	}

-	/** Начать или расширить непрерывный кусок памяти.
-	  * begin - текущее начало куска памяти, если его надо расширить, или nullptr, если его надо начать.
-	  * Если в чанке не хватило места - скопировать существующие данные в новый кусок памяти и изменить значение begin.
+	/** Begin or expand allocation of contiguous piece of memory.
+	  * 'begin' - current begin of piece of memory, if it needs to be expanded, or nullptr, if it needs to be started.
+	  * If there is no space in chunk to expand current piece of memory - then copy all piece to new chunk and change value of 'begin'.
 	  */
 	char * allocContinue(size_t size, char const *& begin)
 	{
@@ -148,7 +155,7 @@ public:
 		return res;
 	}

-	/// Вставить строку без выравнивания.
+	/// Insert string without alignment.
 	const char * insert(const char * data, size_t size)
 	{
 		char * res = alloc(size);
@@ -156,7 +163,7 @@ public:
 		return res;
 	}

-	/// Размер выделенного пула в байтах
+	/// Size of all allocated chunks in bytes.
 	size_t size() const
 	{
 		return size_in_bytes;
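Arena's contract (stable addresses, bulk free at destruction) is easiest to see in code. A minimal usage sketch, under the assumption that Arena is default-constructible (its constructor is not shown in this diff); collectStrings is a hypothetical caller:

    #include <DB/Common/Arena.h>
    #include <string>
    #include <vector>

    void collectStrings(const std::vector<std::string> & input)
    {
        DB::Arena pool;
        std::vector<const char *> addresses;

        /// insert() copies the bytes into the current chunk and returns a stable address.
        for (const auto & s : input)
            addresses.push_back(pool.insert(s.data(), s.size()));

        /// All addresses stay valid here; everything is released at once when 'pool' is destroyed.
    }
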
diff --git a/dbms/include/DB/Common/CurrentMetrics.h b/dbms/include/DB/Common/CurrentMetrics.h
index 3544845ab15..720e07218dc 100644
--- a/dbms/include/DB/Common/CurrentMetrics.h
+++ b/dbms/include/DB/Common/CurrentMetrics.h
@@ -1,81 +1,45 @@
 #pragma once

+#include
 #include
 #include
 #include

-/** Позволяет считать количество одновременно происходящих событий или текущее значение какой-либо метрики.
- *  - для высокоуровневого профайлинга.
+/** Allows to count number of simultaneously happening processes or current value of some metric.
+ *  - for high-level profiling.
  *
- * Также смотрите ProfileEvents.h
- * В ProfileEvents считается общее количество произошедших (точечных) событий - например, сколько раз были выполнены запросы.
- * В CurrentMetrics считается количество одновременных событий - например, сколько сейчас одновременно выполняется запросов,
- * или текущее значение метрики - например, величина отставания реплики в секундах.
+ * See also ProfileEvents.h
+ * ProfileEvents counts number of happened events - for example, how many times queries were executed.
+ * CurrentMetrics counts number of simultaneously happening events - for example, number of currently executing queries, right now,
+ * or just current value of some metric - for example, replica delay in seconds.
+ *
+ * CurrentMetrics are updated instantly and are correct for any point in time.
+ * For periodically (asynchronously) updated metrics, see AsynchronousMetrics.h
  */

-#define APPLY_FOR_METRICS(M) \
-	M(Query) \
-	M(Merge) \
-	M(ReplicatedFetch) \
-	M(ReplicatedSend) \
-	M(ReplicatedChecks) \
-	M(BackgroundPoolTask) \
-	M(DiskSpaceReservedForMerge) \
-	M(DistributedSend) \
-	M(QueryPreempted) \
-	M(TCPConnection) \
-	M(HTTPConnection) \
-	M(InterserverConnection) \
-	M(OpenFileForRead) \
-	M(OpenFileForWrite) \
-	M(Read) \
-	M(Write) \
-	M(SendExternalTables) \
-	M(QueryThread) \
-	M(ReadonlyReplica) \
-	M(MemoryTracking) \
-	\
-	M(END)
-
 namespace CurrentMetrics
 {
-	/// Виды метрик.
-	enum Metric
-	{
-	#define M(NAME) NAME,
-		APPLY_FOR_METRICS(M)
-	#undef M
-	};
-
-
-	/// Получить текстовое описание метрики по его enum-у.
-	inline const char * getDescription(Metric event)
-	{
-		static const char * descriptions[] =
-		{
-		#define M(NAME) #NAME,
-			APPLY_FOR_METRICS(M)
-		#undef M
-		};
-
-		return descriptions[event];
-	}
-
-
+	/// Metric identifier (index in array).
+	using Metric = size_t;
 	using Value = int64_t;

-	/// Счётчики - текущие значения метрик.
-	extern std::atomic<Value> values[END];
+	/// Get text description of metric by identifier. Returns statically allocated string.
+	const char * getDescription(Metric event);

+	/// Metric identifier -> current value of metric.
+	extern std::atomic<Value> values[];

-	/// Выставить значение указанной метрики.
+	/// Get index just after last metric identifier.
+	Metric end();
+
+	/// Set value of specified metric.
 	inline void set(Metric metric, Value value)
 	{
 		values[metric] = value;
 	}

-	/// Прибавить величину к значению указанной метрики. Вы затем должны вычесть величину самостоятельно. Или см. ниже class Increment.
+	/// Add value for specified metric. You must subtract value later; or see class Increment below.
 	inline void add(Metric metric, Value value = 1)
 	{
 		values[metric] += value;
@@ -86,7 +50,7 @@ namespace CurrentMetrics
 		add(metric, -value);
 	}

-	/// На время жизни объекта, увеличивает указанное значение на указанную величину.
+	/// For lifetime of object, add amount for specified metric. Then subtract.
 	class Increment
 	{
 	private:
@@ -128,7 +92,7 @@ namespace CurrentMetrics
 		amount = new_amount;
 	}

-	/// Уменьшить значение раньше вызова деструктора.
+	/// Subtract value before destructor.
void destroy() { *what -= amount; @@ -136,6 +100,3 @@ namespace CurrentMetrics } }; } - - -#undef APPLY_FOR_METRICS diff --git a/dbms/include/DB/Common/PoolWithFailoverBase.h b/dbms/include/DB/Common/PoolWithFailoverBase.h index f259dbb29a6..febde6bf3de 100644 --- a/dbms/include/DB/Common/PoolWithFailoverBase.h +++ b/dbms/include/DB/Common/PoolWithFailoverBase.h @@ -18,6 +18,12 @@ namespace ErrorCodes } } +namespace ProfileEvents +{ + extern const Event DistributedConnectionFailTry; + extern const Event DistributedConnectionFailAtAll; +} + namespace { diff --git a/dbms/include/DB/Common/ProfileEvents.h b/dbms/include/DB/Common/ProfileEvents.h index 39dd24f4bfe..06610398795 100644 --- a/dbms/include/DB/Common/ProfileEvents.h +++ b/dbms/include/DB/Common/ProfileEvents.h @@ -4,136 +4,29 @@ #include -/** Позволяет считать количество различных событий, произошедших в программе - * - для высокоуровневого профайлинга. +/** Implements global counters for various events happening in the application + * - for high level profiling. + * See .cpp for list of events. */ -#define APPLY_FOR_EVENTS(M) \ - M(Query) \ - M(SelectQuery) \ - M(InsertQuery) \ - M(FileOpen) \ - M(Seek) \ - M(ReadBufferFromFileDescriptorRead) \ - M(ReadBufferFromFileDescriptorReadBytes) \ - M(WriteBufferFromFileDescriptorWrite) \ - M(WriteBufferFromFileDescriptorWriteBytes) \ - M(ReadBufferAIORead) \ - M(ReadBufferAIOReadBytes) \ - M(WriteBufferAIOWrite) \ - M(WriteBufferAIOWriteBytes) \ - M(ReadCompressedBytes) \ - M(CompressedReadBufferBlocks) \ - M(CompressedReadBufferBytes) \ - M(UncompressedCacheHits) \ - M(UncompressedCacheMisses) \ - M(UncompressedCacheWeightLost) \ - M(IOBufferAllocs) \ - M(IOBufferAllocBytes) \ - M(ArenaAllocChunks) \ - M(ArenaAllocBytes) \ - M(FunctionExecute) \ - M(MarkCacheHits) \ - M(MarkCacheMisses) \ - M(CreatedReadBufferOrdinary) \ - M(CreatedReadBufferAIO) \ - M(CreatedWriteBufferOrdinary) \ - M(CreatedWriteBufferAIO) \ - \ - M(ReplicatedPartFetches) \ - M(ReplicatedPartFailedFetches) \ - M(ObsoleteReplicatedParts) \ - M(ReplicatedPartMerges) \ - M(ReplicatedPartFetchesOfMerged) \ - M(ReplicatedPartChecks) \ - M(ReplicatedPartChecksFailed) \ - M(ReplicatedDataLoss) \ - \ - M(DelayedInserts) \ - M(RejectedInserts) \ - M(DelayedInsertsMilliseconds) \ - M(SynchronousMergeOnInsert) \ - \ - M(ZooKeeperInit) \ - M(ZooKeeperTransactions) \ - M(ZooKeeperGetChildren) \ - M(ZooKeeperCreate) \ - M(ZooKeeperRemove) \ - M(ZooKeeperExists) \ - M(ZooKeeperGet) \ - M(ZooKeeperSet) \ - M(ZooKeeperMulti) \ - M(ZooKeeperExceptions) \ - \ - M(DistributedConnectionFailTry) \ - M(DistributedConnectionFailAtAll) \ - \ - M(CompileAttempt) \ - M(CompileSuccess) \ - \ - M(ExternalSortWritePart) \ - M(ExternalSortMerge) \ - M(ExternalAggregationWritePart) \ - M(ExternalAggregationMerge) \ - M(ExternalAggregationCompressedBytes) \ - M(ExternalAggregationUncompressedBytes) \ - \ - M(SlowRead) \ - M(ReadBackoff) \ - \ - M(ReplicaYieldLeadership) \ - M(ReplicaPartialShutdown) \ - \ - M(SelectedParts) \ - M(SelectedRanges) \ - M(SelectedMarks) \ - \ - M(MergedRows) \ - M(MergedUncompressedBytes) \ - \ - M(MergeTreeDataWriterRows) \ - M(MergeTreeDataWriterUncompressedBytes) \ - M(MergeTreeDataWriterCompressedBytes) \ - M(MergeTreeDataWriterBlocks) \ - M(MergeTreeDataWriterBlocksAlreadySorted) \ - \ - M(END) - namespace ProfileEvents { - /// Виды событий. - enum Event - { - #define M(NAME) NAME, - APPLY_FOR_EVENTS(M) - #undef M - }; + /// Event identifier (index in array). 
+	using Event = size_t;
+	using Count = size_t;

+	/// Get text description of event by identifier. Returns statically allocated string.
+	const char * getDescription(Event event);

-	/// Получить текстовое описание события по его enum-у.
-	inline const char * getDescription(Event event)
-	{
-		static const char * descriptions[] =
-		{
-		#define M(NAME) #NAME,
-			APPLY_FOR_EVENTS(M)
-		#undef M
-		};
+	/// Counters - how many times each event happened.
+	extern std::atomic<Count> counters[];

-		return descriptions[event];
-	}
-
-
-	/// Счётчики - сколько раз каждое из событий произошло.
-	extern std::atomic counters[END];
-
-
-	/// Увеличить счётчик события. Потокобезопасно.
-	inline void increment(Event event, size_t amount = 1)
+	/// Increment a counter for event. Thread-safe.
+	inline void increment(Event event, Count amount = 1)
 	{
 		counters[event] += amount;
 	}
+
+	/// Get index just after last event identifier.
+	Event end();
 }
-
-
-#undef APPLY_FOR_EVENTS
diff --git a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h
index 2e5c21df836..417770edaef 100644
--- a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h
+++ b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h
@@ -8,6 +8,11 @@
 #include

+namespace CurrentMetrics
+{
+	extern const Metric QueryThread;
+}
+
 namespace DB
 {
diff --git a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h
index 6f7217a14f8..8a9a244544e 100644
--- a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h
+++ b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h
@@ -11,6 +11,12 @@
 #include

+namespace ProfileEvents
+{
+	extern const Event ExternalAggregationMerge;
+}
+
+
 namespace DB
 {
diff --git a/dbms/include/DB/DataStreams/ParallelInputsProcessor.h b/dbms/include/DB/DataStreams/ParallelInputsProcessor.h
index afbd22ec073..cccceea0d60 100644
--- a/dbms/include/DB/DataStreams/ParallelInputsProcessor.h
+++ b/dbms/include/DB/DataStreams/ParallelInputsProcessor.h
@@ -13,17 +13,22 @@
 #include

-/** Позволяет обработать множество источников блоков параллельно, используя указанное количество потоков.
- * Вынимает из любого доступного источника блок и передаёт его на обработку в предоставленный handler.
+/** Allows to process multiple block input streams (sources) in parallel, using specified number of threads.
+ * Reads (pulls) blocks from any available source and passes it to specified handler.
  *
- * Устроено так:
- * - есть набор источников, из которых можно вынимать блоки;
- * - есть набор потоков, которые могут одновременно вынимать блоки из разных источников;
- * - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников;
- * - когда поток берёт источник для обработки, он удаляет его из очереди источников,
- *   вынимает из него блок, и затем кладёт источник обратно в очередь источников;
+ * Implemented in following way:
+ * - there are multiple input sources to read blocks from;
+ * - there are multiple threads, that could simultaneously read blocks from different sources;
+ * - "available" sources (that are not read in any thread right now) are put in queue of sources;
+ * - when a thread takes a source to read from, it removes the source from queue of sources,
+ *   then reads a block from it and then puts the source back to queue of available sources.
  */
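The refactoring above turns Event and Metric into plain array indices whose definitions live in .cpp files, so each translation unit forward-declares just the counters it touches, as PoolWithFailoverBase.h and the stream headers above do. A minimal sketch of the resulting pattern, using the Query event/metric declared elsewhere in this diff; Increment is assumed constructible from a Metric, as its RAII use later in this diff implies:

    #include <DB/Common/ProfileEvents.h>
    #include <DB/Common/CurrentMetrics.h>

    namespace ProfileEvents  { extern const Event  Query; }  /// defined once in ProfileEvents.cpp
    namespace CurrentMetrics { extern const Metric Query; }  /// defined once in CurrentMetrics.cpp

    void executeQuery()
    {
        ProfileEvents::increment(ProfileEvents::Query);           /// total queries ever started
        CurrentMetrics::Increment running{CurrentMetrics::Query}; /// queries running right now
        /// ... run the query; the metric is decremented when 'running' leaves scope.
    }
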

+namespace CurrentMetrics
+{
+	extern const Metric QueryThread;
+}
+
 namespace DB
 {
diff --git a/dbms/include/DB/Functions/FunctionsStringSearch.h b/dbms/include/DB/Functions/FunctionsStringSearch.h
index 9c37329ddd1..29bb908647d 100644
--- a/dbms/include/DB/Functions/FunctionsStringSearch.h
+++ b/dbms/include/DB/Functions/FunctionsStringSearch.h
@@ -24,6 +24,12 @@
 #include

+namespace ProfileEvents
+{
+	extern const Event RegexpCreated;
+}
+
+
 namespace DB
 {

@@ -445,6 +451,7 @@ namespace Regexps
 			if (no_capture)
 				flags |= OptimizedRegularExpression::RE_NO_CAPTURE;

+			ProfileEvents::increment(ProfileEvents::RegexpCreated);
 			return new Regexp{createRegexp(pattern, flags)};
 		});
 	}
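The RegexpCreated increment above sits inside a memoizing factory, so it counts actual compilations, not lookups. A sketch of that pattern with a simplified cache; CompiledPattern and getPattern are hypothetical stand-ins for the Regexps machinery:

    #include <DB/Common/ProfileEvents.h>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>

    namespace ProfileEvents { extern const Event RegexpCreated; }

    struct CompiledPattern { std::string pattern; /* ... compiled state ... */ };

    std::shared_ptr<CompiledPattern> getPattern(const std::string & pattern)
    {
        static std::mutex mutex;
        static std::map<std::string, std::weak_ptr<CompiledPattern>> cache;

        std::lock_guard<std::mutex> lock(mutex);
        if (auto res = cache[pattern].lock())
            return res;                                           /// hit: nothing is counted

        ProfileEvents::increment(ProfileEvents::RegexpCreated);   /// counted only on actual compilation
        auto res = std::make_shared<CompiledPattern>(CompiledPattern{pattern});
        cache[pattern] = res;
        return res;
    }
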
diff --git a/dbms/include/DB/IO/BufferWithOwnMemory.h b/dbms/include/DB/IO/BufferWithOwnMemory.h
index 672046c8282..2fd84d7a182 100644
--- a/dbms/include/DB/IO/BufferWithOwnMemory.h
+++ b/dbms/include/DB/IO/BufferWithOwnMemory.h
@@ -9,13 +9,20 @@
 #include

+namespace ProfileEvents
+{
+	extern const Event IOBufferAllocs;
+	extern const Event IOBufferAllocBytes;
+}
+
+
 namespace DB
 {

-/** Замена std::vector для использования в буферах.
- * Отличается тем, что не делает лишний memset. (И почти ничего не делает.)
- * Также можно попросить выделять выровненный кусок памяти.
+/** Replacement for std::vector to use in buffers.
+ * Differs in that it doesn't do unneeded memset. (And also tries to do as little as possible.)
+ * Also allows to allocate aligned piece of memory (to use with O_DIRECT, for example).
  */
 struct Memory : boost::noncopyable, Allocator
 {
@@ -26,7 +33,7 @@

 	Memory() {}

-	/// Если alignment != 0, то будет выделяться память, выровненная на alignment.
+	/// If alignment != 0, then allocate memory aligned to specified value.
 	Memory(size_t size_, size_t alignment_ = 0) : m_capacity(size_), m_size(m_capacity), alignment(alignment_)
 	{
 		alloc();
@@ -73,8 +80,7 @@
 		else
 		{
 			new_size = align(new_size, alignment);
-			/// @todo pointer to void can be converted to pointer to any type with static_cast by ISO C++, reinterpret_cast has no advantages
-			m_data = reinterpret_cast(Allocator::realloc(m_data, m_capacity, new_size, alignment));
+			m_data = static_cast<char *>(Allocator::realloc(m_data, m_capacity, new_size, alignment));
 			m_capacity = new_size;
 			m_size = m_capacity;
 		}
@@ -101,8 +107,7 @@ private:
 		ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, m_capacity);

 		size_t new_capacity = align(m_capacity, alignment);
-		/// @todo pointer to void can be converted to pointer to any type with static_cast by ISO C++, reinterpret_cast has no advantages
-		m_data = reinterpret_cast(Allocator::alloc(new_capacity, alignment));
+		m_data = static_cast<char *>(Allocator::alloc(new_capacity, alignment));
 		m_capacity = new_capacity;
 		m_size = m_capacity;
 	}
@@ -112,15 +117,14 @@ private:
 		if (!m_data)
 			return;

-		/// @todo pointer to any type can be implicitly converted to pointer to void, no cast required
-		Allocator::free(reinterpret_cast(m_data), m_capacity);
-		m_data = nullptr;	/// Чтобы избежать double free, если последующий вызов alloc кинет исключение.
+		Allocator::free(m_data, m_capacity);
+		m_data = nullptr;	/// To avoid double free if next alloc will throw an exception.
 	}
 };


-/** Буфер, который может сам владеть своим куском памяти для работы.
- * Аргумент шаблона - ReadBuffer или WriteBuffer
+/** Buffer that could own its working memory.
+ * Template parameter: ReadBuffer or WriteBuffer
  */
 template <typename Base>
 class BufferWithOwnMemory : public Base
 {
 protected:
 	Memory memory;
 public:
-	/// Если передать не-NULL existing_memory, то буфер не будет создавать свой кусок памяти, а будет использовать существующий (и не будет им владеть).
+	/// If non-nullptr 'existing_memory' is passed, then buffer will not create its own memory and will use existing_memory without ownership.
 	BufferWithOwnMemory(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
 		: Base(nullptr, 0), memory(existing_memory ? 0 : size, alignment)
 	{
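The alignment parameter of Memory exists for O_DIRECT-style I/O, where buffers must be aligned to the device block size. A sketch of what the contract gives you; the 512-byte block size is an assumption, and m_data is accessed directly only because the diff shows Memory as a plain struct:

    #include <DB/IO/BufferWithOwnMemory.h>
    #include <cassert>
    #include <cstdint>

    void alignedBufferExample()
    {
        constexpr size_t block_size = 512;          /// assumption: device sector size
        DB::Memory buf(1024 * 1024, block_size);    /// (size, alignment) constructor shown above
        assert(reinterpret_cast<uintptr_t>(buf.m_data) % block_size == 0);
    }
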
diff --git a/dbms/include/DB/IO/CompressedReadBufferBase.h b/dbms/include/DB/IO/CompressedReadBufferBase.h
index 6a05d485240..7c8b8b9f8d1 100644
--- a/dbms/include/DB/IO/CompressedReadBufferBase.h
+++ b/dbms/include/DB/IO/CompressedReadBufferBase.h
@@ -21,6 +21,13 @@
 #include

+namespace ProfileEvents
+{
+	extern const Event ReadCompressedBytes;
+	extern const Event CompressedReadBufferBlocks;
+	extern const Event CompressedReadBufferBytes;
+}
+
 namespace DB
 {
@@ -33,27 +40,31 @@ namespace ErrorCodes
 }

+/** Basic functionality for implementation of
+ * CompressedReadBuffer, CompressedReadBufferFromFile and CachedCompressedReadBuffer.
+ */
 class CompressedReadBufferBase
 {
 protected:
 	ReadBuffer * compressed_in;

-	/// Если в буфере compressed_in помещается целый сжатый блок - используем его. Иначе - копируем данные по кусочкам в own_compressed_buffer.
+	/// If 'compressed_in' buffer has whole compressed block - then use it. Otherwise copy parts of data to 'own_compressed_buffer'.
 	PODArray<char> own_compressed_buffer{COMPRESSED_BLOCK_HEADER_SIZE};
+	/// Points to memory, holding compressed block.
 	char * compressed_buffer = nullptr;

 #ifdef USE_QUICKLZ
 	std::unique_ptr qlz_state;
 #else
-	void * fixed_size_padding = nullptr;
+	void * fixed_size_padding = nullptr;	/// ABI compatibility for USE_QUICKLZ
 #endif

-	/// Не проверять чексуммы.
+	/// Do not verify checksums on decompression.
 	bool disable_checksum = false;

-	/// Прочитать сжатые данные в compressed_buffer. Достать из их заголовка размер разжатых данных. Проверить чексумму.
-	/// Возвращает количество прочитанных байт.
+	/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Verify checksum if needed.
+	/// Returns number of compressed bytes read.
 	size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum)
 	{
 		if (compressed_in->eof())
@@ -65,7 +76,7 @@
 		own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE);
 		compressed_in->readStrict(&own_compressed_buffer[0], COMPRESSED_BLOCK_HEADER_SIZE);

-		UInt8 method = own_compressed_buffer[0];	/// См. CompressedWriteBuffer.h
+		UInt8 method = own_compressed_buffer[0];	/// See CompressedWriteBuffer.h

 		size_t & size_compressed = size_compressed_without_checksum;
@@ -91,7 +102,7 @@
 		ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum));

-		/// Находится ли сжатый блок целиком в буфере compressed_in?
+		/// Is whole compressed block located in 'compressed_in' buffer?
 		if (compressed_in->offset() >= COMPRESSED_BLOCK_HEADER_SIZE &&
 			compressed_in->position() + size_compressed - COMPRESSED_BLOCK_HEADER_SIZE <= compressed_in->buffer().end())
 		{
@@ -117,7 +128,7 @@ protected:
 		ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
 		ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);

-		UInt8 method = compressed_buffer[0];	/// См. CompressedWriteBuffer.h
+		UInt8 method = compressed_buffer[0];	/// See CompressedWriteBuffer.h

 		if (method < 0x80)
 		{
@@ -149,15 +160,15 @@
 	}

 public:
-	/// compressed_in можно инициализировать отложенно, но до первого вызова readCompressedData.
+	/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
 	CompressedReadBufferBase(ReadBuffer * in = nullptr)
 		: compressed_in(in)
 	{
 	}

-	/** Не проверять чексуммы.
-	  * Может использоваться, например, в тех случаях, когда сжатые данные пишет клиент,
-	  * который не умеет вычислять чексуммы, и вместо этого заполняет их нулями или чем угодно.
+	/** Disable checksums.
+	  * For example, may be used when compressed data is generated by a client
+	  * that cannot calculate checksums and fills them with zeros instead.
 	  */
 	void disableChecksumming()
 	{
diff --git a/dbms/include/DB/IO/ReadBufferAIO.h b/dbms/include/DB/IO/ReadBufferAIO.h
index 4270197c01c..cabd5989524 100644
--- a/dbms/include/DB/IO/ReadBufferAIO.h
+++ b/dbms/include/DB/IO/ReadBufferAIO.h
@@ -12,6 +12,12 @@
 #include
 #include

+
+namespace CurrentMetrics
+{
+	extern const Metric OpenFileForRead;
+}
+
 namespace DB
 {
diff --git a/dbms/include/DB/IO/ReadBufferFromFile.h b/dbms/include/DB/IO/ReadBufferFromFile.h
index c859aa05294..3a9bdbc788c 100644
--- a/dbms/include/DB/IO/ReadBufferFromFile.h
+++ b/dbms/include/DB/IO/ReadBufferFromFile.h
@@ -6,6 +6,16 @@
 #include

+namespace ProfileEvents
+{
+	extern const Event FileOpen;
+}
+
+namespace CurrentMetrics
+{
+	extern const Metric OpenFileForRead;
+}
+
 namespace DB
 {
@@ -17,7 +27,8 @@ namespace ErrorCodes
 }

-/** Принимает имя файла. Самостоятельно открывает и закрывает файл.
+/** Accepts path to file and opens it, or pre-opened file descriptor.
+ * Closes file by itself (thus "owns" a file descriptor).
  */
 class ReadBufferFromFile : public ReadBufferFromFileDescriptor
 {
@@ -38,7 +49,7 @@ public:
 			throwFromErrno("Cannot open file " + file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
 	}

-	/// Использовать уже открытый файл.
+	/// Use pre-opened file descriptor.
 	ReadBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
 		char * existing_memory = nullptr, size_t alignment = 0)
 		: ReadBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")")
@@ -53,7 +64,7 @@
 			::close(fd);
 	}

-	/// Закрыть файл раньше вызова деструктора.
+	/// Close file before destruction of object.
void close() { if (0 != ::close(fd)) diff --git a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h index 01fcaa2e96d..d419f61195a 100644 --- a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h @@ -18,6 +18,18 @@ #include +namespace ProfileEvents +{ + extern const Event ReadBufferFromFileDescriptorRead; + extern const Event ReadBufferFromFileDescriptorReadBytes; + extern const Event Seek; +} + +namespace CurrentMetrics +{ + extern const Metric Read; +} + namespace DB { @@ -29,13 +41,13 @@ namespace ErrorCodes extern const int CANNOT_SELECT; } -/** Работает с готовым файловым дескриптором. Не открывает и не закрывает файл. +/** Use ready file descriptor. Does not open or close a file. */ class ReadBufferFromFileDescriptor : public ReadBufferFromFileBase { protected: int fd; - off_t pos_in_file; /// Какому сдвигу в файле соответствует working_buffer.end(). + off_t pos_in_file; /// What offset in file corresponds to working_buffer.end(). bool nextImpl() override { @@ -85,7 +97,7 @@ protected: return true; } - /// Имя или описание файла + /// Name or some description of file. std::string getFileName() const override { return "(fd = " + toString(fd) + ")"; @@ -108,7 +120,7 @@ public: private: - /// Если offset такой маленький, что мы не выйдем за пределы буфера, настоящий seek по файлу не делается. + /// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen. off_t doSeek(off_t offset, int whence) override { off_t new_pos = offset; @@ -117,13 +129,13 @@ private: else if (whence != SEEK_SET) throw Exception("ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - /// Никуда не сдвинулись. + /// Position is unchanged. if (new_pos + (working_buffer.end() - pos) == pos_in_file) return new_pos; if (hasPendingData() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast(working_buffer.size())) { - /// Остались в пределах буфера. + /// Position is still inside buffer. pos = working_buffer.begin() + (new_pos - (pos_in_file - working_buffer.size())); return new_pos; } @@ -141,7 +153,7 @@ private: } - /// При условии, что файловый дескриптор позволяет использовать select, проверяет в течение таймаута, есть ли данные для чтения. + /// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout. bool poll(size_t timeout_microseconds) { fd_set fds; diff --git a/dbms/include/DB/IO/UncompressedCache.h b/dbms/include/DB/IO/UncompressedCache.h index 7159cf49a37..e2b12f42837 100644 --- a/dbms/include/DB/IO/UncompressedCache.h +++ b/dbms/include/DB/IO/UncompressedCache.h @@ -7,6 +7,13 @@ #include +namespace ProfileEvents +{ + extern const Event UncompressedCacheHits; + extern const Event UncompressedCacheMisses; + extern const Event UncompressedCacheWeightLost; +} + namespace DB { @@ -26,7 +33,7 @@ struct UncompressedSizeWeightFunction }; -/** Кэш разжатых блоков для CachedCompressedReadBuffer. thread-safe. +/** Cache of decompressed blocks for implementation of CachedCompressedReadBuffer. thread-safe. */ class UncompressedCache : public LRUCache { @@ -37,7 +44,7 @@ public: UncompressedCache(size_t max_size_in_bytes) : Base(max_size_in_bytes) {} - /// Посчитать ключ от пути к файлу и смещения. + /// Calculate key from path to file and offset. 
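The hash() just below derives the cache key from file path plus offset of the compressed block, so a single UncompressedCache serves every file. A sketch of a consumer; the get/set calls assume an LRUCache-style API, which this diff does not show, and readAndDecompressBlock is a hypothetical helper:

    void readThroughCache(DB::UncompressedCache & cache, const std::string & path_to_file, size_t offset)
    {
        auto key = DB::UncompressedCache::hash(path_to_file, offset);

        auto block = cache.get(key);                            /// assumption: LRUCache-style get
        if (!block)
        {
            block = readAndDecompressBlock(path_to_file, offset);  /// hypothetical helper
            cache.set(key, block);                              /// assumption: LRUCache-style set
        }
    }
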
static UInt128 hash(const String & path_to_file, size_t offset) { UInt128 key; diff --git a/dbms/include/DB/IO/WriteBufferAIO.h b/dbms/include/DB/IO/WriteBufferAIO.h index 60d6ee7fdf7..d0f040fa6f3 100644 --- a/dbms/include/DB/IO/WriteBufferAIO.h +++ b/dbms/include/DB/IO/WriteBufferAIO.h @@ -11,6 +11,12 @@ #include #include + +namespace CurrentMetrics +{ + extern const Metric OpenFileForWrite; +} + namespace DB { @@ -32,12 +38,10 @@ public: int getFD() const override { return fd; } private: - /// void nextImpl() override; - /// off_t doSeek(off_t off, int whence) override; - /// void doTruncate(off_t length) override; + /// Если в буфере ещё остались данные - запишем их. void flush(); /// Ждать окончания текущей асинхронной задачи. diff --git a/dbms/include/DB/IO/WriteBufferFromFile.h b/dbms/include/DB/IO/WriteBufferFromFile.h index 601b69e7de0..bde37721e51 100644 --- a/dbms/include/DB/IO/WriteBufferFromFile.h +++ b/dbms/include/DB/IO/WriteBufferFromFile.h @@ -10,6 +10,16 @@ #include +namespace ProfileEvents +{ + extern const Event FileOpen; +} + +namespace CurrentMetrics +{ + extern const Metric OpenFileForWrite; +} + namespace DB { @@ -21,7 +31,8 @@ namespace ErrorCodes } -/** Принимает имя файла. Самостоятельно открывает и закрывает файл. +/** Accepts path to file and opens it, or pre-opened file descriptor. + * Closes file by himself (thus "owns" a file descriptor). */ class WriteBufferFromFile : public WriteBufferFromFileDescriptor { @@ -42,7 +53,7 @@ public: throwFromErrno("Cannot open file " + file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); } - /// Использовать уже открытый файл. + /// Use pre-opened file descriptor. WriteBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, mode_t mode = 0666, char * existing_memory = nullptr, size_t alignment = 0) : WriteBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")") @@ -66,7 +77,7 @@ public: ::close(fd); } - /// Закрыть файл раньше вызова деструктора. + /// Close file before destruction of object. void close() { next(); @@ -78,17 +89,6 @@ public: metric_increment.destroy(); } - /** fsync() transfers ("flushes") all modified in-core data of (i.e., modified buffer cache pages for) the file - * referred to by the file descriptor fd to the disk device (or other permanent storage device) - * so that all changed information can be retrieved even after the system crashed or was rebooted. - * This includes writing through or flushing a disk cache if present. The call blocks until the device - * reports that the transfer has completed. It also flushes metadata information associated with the file (see stat(2)). - * - man fsync */ - void sync() override - { - fsync(fd); - } - std::string getFileName() const override { return file_name; diff --git a/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h b/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h index 157659a57d8..248ecdf8a20 100644 --- a/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h @@ -13,6 +13,17 @@ #include +namespace ProfileEvents +{ + extern const Event WriteBufferFromFileDescriptorWrite; + extern const Event WriteBufferFromFileDescriptorWriteBytes; +} + +namespace CurrentMetrics +{ + extern const Metric Write; +} + namespace DB { @@ -24,7 +35,7 @@ namespace ErrorCodes extern const int CANNOT_TRUNCATE_FILE; } -/** Работает с готовым файловым дескриптором. Не открывает и не закрывает файл. 
+/** Use ready file descriptor. Does not open or close a file.
  */
 class WriteBufferFromFileDescriptor : public WriteBufferFromFileBase
 {
@@ -57,7 +68,7 @@ protected:
 		ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written);
 	}

-	/// Имя или описание файла
+	/// Name or some description of file.
 	virtual std::string getFileName() const override
 	{
 		return "(fd = " + toString(fd) + ")";
@@ -67,8 +78,8 @@ public:
 	WriteBufferFromFileDescriptor(int fd_ = -1, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
 		: WriteBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_) {}

-	/** Можно вызывать для инициализации, если нужный fd не был передан в конструктор.
-	  * Менять fd во время работы нельзя.
+	/** Could be called for initialization if required 'fd' was not passed to constructor.
+	  * It's not possible to change 'fd' during work.
 	  */
 	void setFD(int fd_)
 	{
@@ -100,10 +111,10 @@ public:

 	void sync() override
 	{
-		/// Если в буфере ещё остались данные - запишем их.
+		/// If buffer has pending data - write it.
 		next();

-		/// Попросим ОС сбросить данные на диск.
+		/// Request OS to sync data with storage medium.
 		int res = fsync(fd);
 		if (-1 == res)
 			throwFromErrno("Cannot fsync " + getFileName(), ErrorCodes::CANNOT_FSYNC);
diff --git a/dbms/include/DB/Interpreters/AsynchronousMetrics.h b/dbms/include/DB/Interpreters/AsynchronousMetrics.h
new file mode 100644
index 00000000000..8c653374c7d
--- /dev/null
+++ b/dbms/include/DB/Interpreters/AsynchronousMetrics.h
@@ -0,0 +1,54 @@
+#pragma once
+
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <unordered_map>
+#include <string>
+
+
+namespace DB
+{
+
+class Context;
+
+
+/** Periodically (each minute, starting at 30 seconds offset)
+  * calculates and updates some metrics,
+  * that are not updated automatically (so, need to be asynchronously calculated).
+  */
+class AsynchronousMetrics
+{
+public:
+	AsynchronousMetrics(Context & context_)
+		: context(context_), thread([this] { run(); })
+	{
+	}
+
+	~AsynchronousMetrics();
+
+	using Value = double;
+	using Container = std::unordered_map<std::string, Value>;
+
+	/// Returns copy of all values.
+	Container getValues() const;
+
+private:
+	Context & context;
+
+	bool quit {false};
+	std::mutex wait_mutex;
+	std::condition_variable wait_cond;
+
+	Container container;
+	mutable std::mutex container_mutex;
+
+	std::thread thread;
+
+	void run();
+	void update();
+
+	void set(const std::string & name, Value value);
+};
+
+}
diff --git a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h
index 09e16e9019f..1e6adec4943 100644
--- a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h
+++ b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h
@@ -99,7 +99,13 @@ private:

 	/** Оставить в каждом запросе цепочки UNION ALL только нужные столбцы секции SELECT.
 	  * Однако, если используется хоть один DISTINCT в цепочке, то все столбцы считаются нужными,
-	  * так как иначе DISTINCT работал бы по-другому.
+	  * так как иначе DISTINCT работал бы по-другому.
+	  *
+	  * Always leave arrayJoin, because it changes number of rows.
+	  *
+	  * TODO If query doesn't have GROUP BY, but has aggregate functions,
+	  * then leave at least one aggregate function,
+	  * so that the fact of aggregation is not lost.
*/ void rewriteExpressionList(const Names & required_column_names); diff --git a/dbms/include/DB/Interpreters/ProcessList.h b/dbms/include/DB/Interpreters/ProcessList.h index 500421344b8..cc95b1631a0 100644 --- a/dbms/include/DB/Interpreters/ProcessList.h +++ b/dbms/include/DB/Interpreters/ProcessList.h @@ -17,15 +17,20 @@ #include +namespace CurrentMetrics +{ + extern const Metric Query; +} + namespace DB { -/** Список исполняющихся в данный момент запросов. - * Также реализует ограничение на их количество. +/** List of currently executing queries. + * Also implements limit on their number. */ -/** Информационная составляющая элемента списка процессов. - * Для вывода в SHOW PROCESSLIST. Не содержит никаких сложных объектов, которые что-то делают при копировании или в деструкторах. +/** Information of process list element. + * To output in SHOW PROCESSLIST query. Does not contain any complex objects, that do something on copy or destructor. */ struct ProcessInfo { @@ -42,7 +47,7 @@ struct ProcessInfo }; -/// Запрос и данные о его выполнении. +/// Query and information about its execution. struct ProcessListElement { String query; @@ -63,7 +68,7 @@ struct ProcessListElement bool is_cancelled = false; - /// Здесь могут быть зарегистрированы временные таблицы. Изменять под mutex-ом. + /// Temporary tables could be registered here. Modify under mutex. Tables temporary_tables; @@ -91,7 +96,7 @@ struct ProcessListElement progress.incrementPiecewiseAtomically(value); if (priority_handle) - priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Можно сделать настраиваемым таймаут. + priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Could make timeout customizable. return !is_cancelled; } @@ -114,14 +119,14 @@ struct ProcessListElement }; -/// Данные о запросах одного пользователя. +/// Data about queries for one user. struct ProcessListForUser { /// Query_id -> ProcessListElement * using QueryToElement = std::unordered_map; QueryToElement queries; - /// Ограничение и счётчик памяти на все одновременно выполняющиеся запросы одного пользователя. + /// Limit and counter for memory of all simultaneously running queries of single user. MemoryTracker user_memory_tracker; }; @@ -129,7 +134,7 @@ struct ProcessListForUser class ProcessList; -/// Держит итератор на список, и удаляет элемент из списка в деструкторе. +/// Keeps iterator to process list and removes element in destructor. class ProcessListEntry { private: @@ -158,7 +163,7 @@ public: using Element = ProcessListElement; using Entry = ProcessListEntry; - /// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем. + /// list, for iterators not to invalidate. NOTE: could replace with cyclic buffer, but not worth. using Container = std::list; using Info = std::vector; /// User -> queries @@ -166,15 +171,15 @@ public: private: mutable std::mutex mutex; - mutable Poco::Condition have_space; /// Количество одновременно выполняющихся запросов стало меньше максимального. + mutable Poco::Condition have_space; /// Number of currently running queries has become less than maximum. Container cont; - size_t cur_size; /// В C++03 std::list::size не O(1). - size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение. + size_t cur_size; /// In C++03 or C++11 and old ABI, std::list::size is not O(1). + size_t max_size; /// 0 means no limit. Otherwise, when limit exceeded, an exception is thrown. 
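Stepping back to the AsynchronousMetrics class added earlier in this diff: since getValues() returns a copy of the map, consumers (such as the new StorageSystemAsynchronousMetrics registered in dbms/CMakeLists.txt) can iterate without holding any lock. A minimal sketch; the printing loop is illustrative only:

    #include <DB/Interpreters/AsynchronousMetrics.h>
    #include <iostream>

    void dumpAsynchronousMetrics(const DB::AsynchronousMetrics & async_metrics)
    {
        /// getValues() copies the container under 'container_mutex', so no locking is needed here.
        for (const auto & name_value : async_metrics.getValues())
            std::cout << name_value.first << '\t' << name_value.second << '\n';
    }
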
 	UserToQueries user_to_queries;
 	QueryPriorities priorities;

-	/// Ограничение и счётчик памяти на все одновременно выполняющиеся запросы.
+	/// Limit and counter for memory of all simultaneously running queries.
 	MemoryTracker total_memory_tracker;

 public:
@@ -182,17 +187,17 @@ public:
 	using EntryPtr = std::shared_ptr<ProcessListEntry>;

-	/** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
-	  * Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени.
-	  * Если времени не хватило - кинуть исключение.
+	/** Register running query. Returns refcounted object, that will remove element from list in destructor.
+	  * If there are too many running queries - wait for not more than specified (see settings) amount of time.
+	  * If timeout passes - throw an exception.
 	  */
 	EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
 		UInt16 port_, const Settings & settings);

-	/// Количество одновременно выполняющихся запросов.
+	/// Number of currently executing queries.
 	size_t size() const { return cur_size; }

-	/// Получить текущее состояние списка запросов.
+	/// Get current state of process list.
 	Info getInfo() const
 	{
 		std::lock_guard<std::mutex> lock(mutex);
@@ -211,10 +216,10 @@ public:
 		max_size = max_size_;
 	}

-	/// Зарегистрировать временную таблицу. Потом её можно будет получить по query_id и по названию.
+	/// Register temporary table. Then it is accessible by query_id and name.
 	void addTemporaryTable(ProcessListElement & elem, const String & table_name, StoragePtr storage);

-	/// Найти временную таблицу по query_id и по названию. Замечание: плохо работает, если есть разные запросы с одним query_id.
+	/// Find temporary table by query_id and name. NOTE: doesn't work fine if there are many queries with same query_id.
 	StoragePtr tryGetTemporaryTable(const String & query_id, const String & table_name) const;
 };
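A sketch of how the pieces of ProcessList fit together: insert() registers the query and returns the refcounted entry, removal happens in ~ProcessListEntry even on exceptions, and the element's priority_handle makes the query yield to more important ones, as updateProgress() above shows. The surrounding function is hypothetical; the insert() signature is the one declared above:

    void runQuery(DB::ProcessList & process_list, const DB::Settings & settings)
    {
        DB::ProcessList::EntryPtr entry = process_list.insert(
            "SELECT 1", "default", "query_id_1",
            Poco::Net::IPAddress("127.0.0.1"), 9000, settings);

        /// ... execution reports progress through the element; updateProgress() calls
        /// priority_handle->waitIfNeed(std::chrono::seconds(1)) and returns false if cancelled.

        /// Destroying 'entry' removes the query from the list and lets 'have_space' waiters proceed.
    }
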
diff --git a/dbms/include/DB/Interpreters/QueryPriorities.h b/dbms/include/DB/Interpreters/QueryPriorities.h
index 7e8f3a5817d..f2e83b90212 100644
--- a/dbms/include/DB/Interpreters/QueryPriorities.h
+++ b/dbms/include/DB/Interpreters/QueryPriorities.h
@@ -8,17 +8,26 @@
 #include

-/** Реализует приоритеты запросов.
- * Позволяет приостанавливать выполнение запроса, если выполняется хотя бы один более приоритетный запрос.
+namespace CurrentMetrics
+{
+	extern const Metric QueryPreempted;
+}
+
+
+namespace DB
+{
+
+/** Implements query priorities in very primitive way.
+ * Allows to freeze query execution if at least one query of higher priority is executed.
  *
- * Величина приоритета - целое число, чем меньше - тем больше приоритет.
+ * Priority value is integer, smaller means higher priority.
  *
- * Приоритет 0 считается особенным - запросы с таким приоритетом выполняются всегда,
- * не зависят от других запросов и не влияют на другие запросы.
- * То есть 0 означает - не использовать приоритеты.
+ * Priority 0 is special - queries with that priority are always executed,
+ * do not depend on other queries and do not affect other queries.
+ * Thus 0 means - don't use priorities.
  *
- * NOTE Возможности сделать лучше:
- * - реализовать ограничение на максимальное количество запросов с таким приоритетом.
+ * NOTE Possibilities for improvement:
+ * - implement limit on maximum number of running queries with same priority.
  */
 class QueryPriorities
 {
@@ -30,7 +39,7 @@ private:

 	using Count = int;

-	/// Количество выполняющихся сейчас запросов с заданным приоритетом.
+	/// Number of currently running queries for each priority.
 	using Container = std::map<Priority, Count>;

 	std::mutex mutex;
@@ -38,8 +47,8 @@ private:

 	Container container;

-	/** Если есть более приоритетные запросы - спать, пока они не перестанут быть или не истечёт таймаут.
-	  * Возвращает true, если более приоритетные запросы исчезли на момент возврата из функции, false, если истёк таймаут.
+	/** If there are higher priority queries - sleep until they finish or timeout happens.
+	  * Returns true, if higher priority queries have finished at return of function, false, if timeout exceeded.
 	  */
 	template <typename Duration>
 	bool waitIfNeed(Priority priority, Duration timeout)
 	{
@@ -103,8 +112,8 @@ public:

 	using Handle = std::shared_ptr;

-	/** Зарегистрировать, что запрос с заданным приоритетом выполняется.
-	  * Возвращается объект, в деструкторе которого, запись о запросе удаляется.
+	/** Register query with specified priority.
+	  * Returns an object that removes the record in destructor.
 	  */
 	Handle insert(Priority priority)
 	{
@@ -117,3 +126,5 @@
 		return std::make_shared(*this, *it);
 	}
 };
+
+}
diff --git a/dbms/include/DB/Parsers/ASTSelectQuery.h b/dbms/include/DB/Parsers/ASTSelectQuery.h
index e2df9063846..9ecdabddd32 100644
--- a/dbms/include/DB/Parsers/ASTSelectQuery.h
+++ b/dbms/include/DB/Parsers/ASTSelectQuery.h
@@ -33,7 +33,7 @@ public:
 	void renameColumns(const ASTSelectQuery & source);

 	/// Переписывает select_expression_list, чтобы вернуть только необходимые столбцы в правильном порядке.
-	void rewriteSelectExpressionList(const Names & column_names);
+	void rewriteSelectExpressionList(const Names & required_column_names);

 	bool isUnionAllHead() const { return (prev_union_all == nullptr) && next_union_all != nullptr; }
diff --git a/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h b/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h
index 50bfec1def7..fca11d9458d 100644
--- a/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h
+++ b/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h
@@ -18,6 +18,11 @@
 #include

+namespace CurrentMetrics
+{
+	extern const Metric DistributedSend;
+}
+
 namespace DB
 {
diff --git a/dbms/include/DB/Storages/MarkCache.h b/dbms/include/DB/Storages/MarkCache.h
index 95561c02975..de13145c436 100644
--- a/dbms/include/DB/Storages/MarkCache.h
+++ b/dbms/include/DB/Storages/MarkCache.h
@@ -9,22 +9,28 @@
 #include

+namespace ProfileEvents
+{
+	extern const Event MarkCacheHits;
+	extern const Event MarkCacheMisses;
+}

 namespace DB
 {

-/// Оценка количества байтов, занимаемых засечками в кеше.
+/// Estimate of number of bytes in cache for marks.
 struct MarksWeightFunction
 {
 	size_t operator()(const MarksInCompressedFile & marks) const
 	{
-		/// Можно еще добавить порядка 100 байт на накладные расходы вектора и кеша.
+		/// NOTE Could add extra 100 bytes for overhead of std::vector, cache structures and allocator.
 		return marks.size() * sizeof(MarkInCompressedFile);
 	}
 };


-/** Кэш засечек в столбце из StorageMergeTree.
+/** Cache of 'marks' for StorageMergeTree.
+ * Marks are an index structure that addresses ranges in column file, corresponding to ranges of primary key.
  */
 class MarkCache : public LRUCache
 {
@@ -35,7 +41,7 @@ public:
 	MarkCache(size_t max_size_in_bytes, const Delay & expiration_delay)
 		: Base(max_size_in_bytes, expiration_delay) {}

-	/// Посчитать ключ от пути к файлу и смещения.
+	/// Calculate key from path to file.
static UInt128 hash(const String & path_to_file) { UInt128 key; diff --git a/dbms/include/DB/Storages/MergeTree/DiskSpaceMonitor.h b/dbms/include/DB/Storages/MergeTree/DiskSpaceMonitor.h index c402e9510ba..f444d76ca16 100644 --- a/dbms/include/DB/Storages/MergeTree/DiskSpaceMonitor.h +++ b/dbms/include/DB/Storages/MergeTree/DiskSpaceMonitor.h @@ -10,6 +10,12 @@ #include #include + +namespace CurrentMetrics +{ + extern const Metric DiskSpaceReservedForMerge; +} + namespace DB { @@ -20,10 +26,10 @@ namespace ErrorCodes } -/** Узнает количество свободного места в файловой системе. - * Можно "резервировать" место, чтобы разные операции могли согласованно планировать использование диска. - * Резервирования не разделяются по файловым системам. - * Вместо этого при запросе свободного места считается, что все резервирования сделаны в той же файловой системе. +/** Determines amount of free space in filesystem. + * Could "reserve" space, for different operations to plan disk space usage. + * Reservations are not separated for different filesystems, + * instead it is assumed, that all reservations are done within same filesystem. */ class DiskSpaceMonitor { @@ -61,7 +67,7 @@ public: } } - /// Изменить количество зарезервированного места. При увеличении не делается проверка, что места достаточно. + /// Change amount of reserved space. When new_size is greater than before, availability of free space is not checked. void update(size_t new_size) { std::lock_guard lock(DiskSpaceMonitor::mutex); @@ -99,7 +105,7 @@ public: size_t res = fs.f_bfree * fs.f_bsize; - /// Зарезервируем дополнительно 30 МБ. Когда я тестировал, statvfs показывал на несколько мегабайт больше свободного места, чем df. + /// Heuristic by Michael Kolupaev: reserve 30 MB more, because statvfs shows few megabytes more space than df. res -= std::min(res, 30 * (1ul << 20)); std::lock_guard lock(mutex); @@ -124,7 +130,7 @@ public: return reservation_count; } - /// Если места (приблизительно) недостаточно, бросает исключение. + /// If not enough (approximately) space, throw an exception. 
 	static ReservationPtr reserve(const std::string & path, size_t size)
 	{
 		size_t free_bytes = getUnreservedFreeSpace(path);
diff --git a/dbms/include/DB/Storages/MergeTree/MergeList.h b/dbms/include/DB/Storages/MergeTree/MergeList.h
index 9d9f1cec04e..9ce478e482f 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeList.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeList.h
@@ -7,6 +7,12 @@
 #include
 #include

+
+namespace CurrentMetrics
+{
+	extern const Metric Merge;
+}
+
 namespace DB
 {
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h
index 79e02f60358..a098ba1397f 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h
@@ -4,6 +4,12 @@
 #include
 #include

+
+namespace ProfileEvents
+{
+	extern const Event SynchronousMergeOnInsert;
+}
+
 namespace DB
 {
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h
index 6c8469b8538..1a1d64317b4 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h
@@ -6,6 +6,12 @@
 #include

+namespace ProfileEvents
+{
+	extern const Event SlowRead;
+	extern const Event ReadBackoff;
+}
+
 namespace DB
 {
@@ -55,21 +61,21 @@ using MergeTreeReadTaskPtr = std::unique_ptr;
 class MergeTreeReadPool
 {
 public:
-	/** Пул может динамически уменьшать количество потоков, если чтения происходят медленно.
-	  * Настройки порогов для такого уменьшения.
+	/** Pool could dynamically lower (backoff) number of threads, if read operations are too slow.
+	  * Settings for that backoff.
 	  */
 	struct BackoffSettings
 	{
-		/// Обращать внимания только на чтения, занявшие не меньше такого количества времени. Если выставлено в 0 - значит backoff выключен.
+		/// Pay attention only to reads, that took at least this amount of time. If set to 0 - backoff is disabled.
 		size_t min_read_latency_ms = 1000;
-		/// Считать события, когда пропускная способность меньше стольки байт в секунду.
+		/// Count events, when read throughput is less than specified bytes per second.
 		size_t max_throughput = 1048576;
-		/// Не обращать внимания на событие, если от предыдущего прошло меньше стольки-то времени.
+		/// Do not pay attention to event, if not enough time passed since previous event.
 		size_t min_interval_between_events_ms = 1000;
-		/// Количество событий, после которого количество потоков будет уменьшено.
+		/// Number of events to do backoff - to lower number of threads in pool.
 		size_t min_events = 2;

-		/// Константы выше приведены лишь в качестве примера.
+		/// Constants above are just an example.
 		BackoffSettings(const Settings & settings)
 			: min_read_latency_ms(settings.read_backoff_min_latency_ms.totalMilliseconds()),
 			max_throughput(settings.read_backoff_max_throughput),
@@ -84,7 +90,7 @@
 	BackoffSettings backoff_settings;

 private:
-	/** Состояние для отслеживания скорости чтений.
+	/** State to track number of slow reads.
 	  */
 	struct BackoffState
 	{
@@ -118,7 +124,7 @@
 	{
 		const std::lock_guard<std::mutex> lock{mutex};

-		/// Если количество потоков было уменьшено из-за backoff, то не будем отдавать задачи для более чем backoff_state.current_threads потоков.
+		/// If number of threads was lowered due to backoff, then will assign work only for maximum 'backoff_state.current_threads' threads.
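The check immediately below is the consumer side of the backoff: getTask() refuses to feed threads beyond backoff_state.current_threads. The producer side, profileFeedback(), classifies reads using the BackoffSettings fields above; its body is elided from this diff, so the following is an illustrative paraphrase only:

    /// Illustrative paraphrase of the slow-read classification, not the actual implementation.
    bool isSlowRead(const DB::MergeTreeReadPool::BackoffSettings & s, size_t latency_ms, size_t bytes_read)
    {
        if (s.min_read_latency_ms == 0 || latency_ms < s.min_read_latency_ms)
            return false;                                  /// backoff disabled, or read was fast enough

        double bytes_per_second = bytes_read * 1000.0 / latency_ms;
        return bytes_per_second < s.max_throughput;        /// slow read: counts toward min_events
    }
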
if (thread >= backoff_state.current_threads) return nullptr; @@ -138,23 +144,24 @@ public: auto & part = parts[part_idx]; auto & marks_in_part = thread_tasks.sum_marks_in_parts.back(); - /// Берём весь кусок, если он достаточно мал + /// Get whole part to read if it is small enough. auto need_marks = std::min(marks_in_part, min_marks_to_read); - /// Не будем оставлять в куске слишком мало строк. + /// Do not leave too little rows in part for next time. if (marks_in_part > need_marks && marks_in_part - need_marks < min_marks_to_read) need_marks = marks_in_part; MarkRanges ranges_to_get_from_part; - /// Возьмем весь кусок, если он достаточно мал. + /// Get whole part to read if it is small enough. if (marks_in_part <= need_marks) { const auto marks_to_get_from_range = marks_in_part; - /** Отрезки уже перечислены справа налево, reverse изначально сделан в MergeTreeDataSelectExecutor и - * поддержан в fillPerThreadInfo. */ + /** Ranges are in right-to-left order, because 'reverse' was done in MergeTreeDataSelectExecutor + * and that order is supported in 'fillPerThreadInfo'. + */ ranges_to_get_from_part = thread_task.ranges; marks_in_part -= marks_to_get_from_range; @@ -167,7 +174,7 @@ public: } else { - /// Цикл по отрезкам куска. + /// Loop through part ranges. while (need_marks > 0 && !thread_task.ranges.empty()) { auto & range = thread_task.ranges.back(); @@ -187,9 +194,9 @@ public: need_marks -= marks_to_get_from_range; } - /** Перечислим справа налево, чтобы MergeTreeThreadBlockInputStream забирал - * отрезки с помощью .pop_back() (их порядок был сменен на "слева направо" - * из-за .pop_back() в этой ветке). */ + /** Change order to right-to-left, for MergeTreeThreadBlockInputStream to get ranges with .pop_back() + * (order was changed to left-to-right due to .pop_back() above). + */ std::reverse(std::begin(ranges_to_get_from_part), std::end(ranges_to_get_from_part)); } @@ -199,9 +206,9 @@ public: per_part_remove_prewhere_column[part_idx], per_part_should_reorder[part_idx]); } - /** Каждый обработчик задач может вызвать этот метод, передав в него информацию о скорости чтения. - * Если скорость чтения слишком низкая, то пул может принять решение уменьшить число потоков - не отдавать больше задач в некоторые потоки. - * Это позволяет бороться с чрезмерной нагрузкой на дисковую подсистему в случаях, когда чтения осуществляются не из page cache. + /** Each worker could call this method and pass information about read performance. + * If read performance is too low, pool could decide to lower number of threads: do not assign more tasks to several threads. + * This allows to overcome excessive load to disk subsystem, when reads are not from page cache. */ void profileFeedback(const ReadBufferFromFileBase::ProfileInfo info) { @@ -254,9 +261,9 @@ private: { auto & part = parts[i]; - /// Посчитаем засечки для каждого куска. + /// Read marks for every data part. size_t sum_marks = 0; - /// Отрезки уже перечислены справа налево, reverse в MergeTreeDataSelectExecutor. + /// Ranges are in right-to-left order, due to 'reverse' in MergeTreeDataSelectExecutor. for (const auto & range : part.ranges) sum_marks += range.end - range.begin; @@ -291,8 +298,9 @@ private: const NameSet pre_name_set{ std::begin(required_pre_column_names), std::end(required_pre_column_names) }; - /** Если выражение в PREWHERE - не столбец таблицы, не нужно отдавать наружу столбец с ним - * (от storage ожидают получить только столбцы таблицы). 
*/ + /** If expression in PREWHERE is not table column, then no need to return column with it to caller + * (because storage is expected only to read table columns). + */ per_part_remove_prewhere_column.push_back(0 == pre_name_set.count(prewhere_column_name)); Names post_column_names; @@ -309,8 +317,9 @@ private: if (check_columns) { - /** Под part->columns_lock проверим, что все запрошенные столбцы в куске того же типа, что в таблице. - * Это может быть не так во время ALTER MODIFY. */ + /** Under part->columns_lock check that all requested columns in part are of same type that in table. + * This could be violated during ALTER MODIFY. + */ if (!required_pre_column_names.empty()) data.check(part.data_part->columns, required_pre_column_names); if (!required_column_names.empty()) @@ -351,12 +360,12 @@ private: RangesInDataPart & part = parts.back(); size_t & marks_in_part = per_part_sum_marks.back(); - /// Не будем брать из куска слишком мало строк. + /// Do not get too few rows from part. if (marks_in_part >= min_marks_for_concurrent_read && need_marks < min_marks_for_concurrent_read) need_marks = min_marks_for_concurrent_read; - /// Не будем оставлять в куске слишком мало строк. + /// Do not leave too few rows in part for next time. if (marks_in_part > need_marks && marks_in_part - need_marks < min_marks_for_concurrent_read) need_marks = marks_in_part; @@ -364,10 +373,10 @@ private: MarkRanges ranges_to_get_from_part; size_t marks_in_ranges = need_marks; - /// Возьмем весь кусок, если он достаточно мал. + /// Get whole part to read if it is small enough. if (marks_in_part <= need_marks) { - /// Оставим отрезки перечисленными справа налево для удобства использования .pop_back() в .getTask() + /// Leave ranges in right-to-left order for convenience to use .pop_back() in .getTask() ranges_to_get_from_part = part.ranges; marks_in_ranges = marks_in_part; @@ -377,7 +386,7 @@ private: } else { - /// Цикл по отрезкам куска. + /// Loop through part ranges. while (need_marks > 0) { if (part.ranges.empty()) @@ -396,9 +405,9 @@ private: part.ranges.pop_back(); } - /** Вновь перечислим отрезки справа налево, чтобы .getTask() мог забирать их - * с помощью .pop_back() (их порядок был сменен на "слева направо" - * из-за .pop_back() в этой ветке). */ + /** Change order to right-to-left, for getTask() to get ranges with .pop_back() + * (order was changed to left-to-right due to .pop_back() above). + */ std::reverse(std::begin(ranges_to_get_from_part), std::end(ranges_to_get_from_part)); } @@ -410,10 +419,12 @@ private: } } + /** Если некоторых запрошенных столбцов нет в куске, - * то выясняем, какие столбцы может быть необходимо дополнительно прочитать, - * чтобы можно было вычислить DEFAULT выражение для этих столбцов. - * Добавляет их в columns. */ + * то выясняем, какие столбцы может быть необходимо дополнительно прочитать, + * чтобы можно было вычислить DEFAULT выражение для этих столбцов. + * Добавляет их в columns. + */ NameSet injectRequiredColumns(const MergeTreeData::DataPartPtr & part, Names & columns) const { NameSet required_columns{std::begin(columns), std::end(columns)}; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h index 03f67a28eb1..a389ef509c0 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h @@ -9,65 +9,65 @@ namespace DB { -/** Тонкие настройки работы MergeTree. - * Могут быть загружены из конфига. 
+/** Advanced settings of MergeTree. + * Could be loaded from config. */ struct MergeTreeSettings { - /** Настройки слияний. */ + /** Merge settings. */ - /// Опеределяет, насколько разбалансированные объединения мы готовы делать. - /// Чем больше, тем более разбалансированные. Желательно, чтобы было больше, чем 1 / max_parts_to_merge_at_once. + /// Determines how unbalanced merges we are willing to do. + /// Bigger values mean more unbalanced merges. It is advisable for it to be greater than 1 / max_parts_to_merge_at_once. double size_ratio_coefficient_to_merge_parts = 0.25; - /// Сколько за раз сливать кусков. - /// Трудоемкость выбора кусков O(N * max_parts_to_merge_at_once). + /// How many parts could be merged at once. + /// Complexity of parts selection is O(N * max_parts_to_merge_at_once). size_t max_parts_to_merge_at_once = 10; - /// Но пока суммарный размер кусков слишком маленький (меньше такого количества байт), можно сливать и больше кусков за раз. - /// Это сделано, чтобы быстрее сливать очень уж маленькие куски, которых может быстро накопиться много. + /// But while total size of parts is too small (less than this number of bytes), we could merge more parts at once. + /// This is intended to allow quicker merging of very small parts, which can accumulate too quickly. size_t merge_more_parts_if_sum_bytes_is_less_than = 100 * 1024 * 1024; size_t max_parts_to_merge_at_once_if_small = 100; - /// Куски настолько большого размера объединять нельзя вообще. + /// Parts larger than this many bytes cannot be merged at all. size_t max_bytes_to_merge_parts = 10ULL * 1024 * 1024 * 1024; - /// Не больше половины потоков одновременно могут выполнять слияния, в которых участвует хоть один кусок хотя бы такого размера. + /// No more than half of the threads may simultaneously execute merges involving at least one part larger than this size in bytes. size_t max_bytes_to_merge_parts_small = 250 * 1024 * 1024; - /// Куски настолько большого размера в сумме, объединять нельзя вообще. + /// Parts whose total size in bytes is larger than this cannot be merged at all. size_t max_sum_bytes_to_merge_parts = 25ULL * 1024 * 1024 * 1024; - /// Во столько раз ночью увеличиваем коэффициент. + /// How many times we increase the coefficient at night. size_t merge_parts_at_night_inc = 10; - /// Сколько заданий на слияние кусков разрешено одновременно иметь в очереди ReplicatedMergeTree. + /// How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue. size_t max_replicated_merges_in_queue = 6; - /// Через сколько секунд удалять ненужные куски. + /// How many seconds to keep obsolete parts. time_t old_parts_lifetime = 8 * 60; - /// Через сколько секунд удалять tmp_-директории. + /// How many seconds to keep tmp_-directories. time_t temporary_directories_lifetime = 86400; - /** Настройки вставок. */ + /** Insert settings. */ - /// Если в таблице хотя бы столько активных кусков, искусственно замедлять вставки в таблицу. + /// If table contains at least that many active parts, artificially slow down inserts into table. size_t parts_to_delay_insert = 150; - /// Если в таблице хотя бы столько активных кусков, выдавать ошибку 'Too much parts ...' + /// If there are at least this many active parts, throw 'Too much parts ...' exception. size_t parts_to_throw_insert = 300; - /// Насколько секунд можно максимально задерживать вставку в таблицу типа MergeTree, если в ней много недомердженных кусков. + /// Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts.
size_t max_delay_to_insert = 200; - /** Настройки репликации. */ + /** Replication settings. */ - /// Для скольки последних блоков хранить хеши в ZooKeeper. + /// For how many last blocks to keep hashes in ZooKeeper. size_t replicated_deduplication_window = 100; - /// Хранить примерно столько последних записей в логе в ZooKeeper, даже если они никому уже не нужны. - /// Не влияет на работу таблиц; используется только чтобы успеть посмотреть на лог в ZooKeeper глазами прежде, чем его очистят. + /// Keep about this number of last records in ZooKeeper log, even if they are obsolete. + /// It doesn't affect work of tables: used only to have a chance to look at the ZooKeeper log before it is cleaned. size_t replicated_logs_to_keep = 100; /// After specified amount of time passed after replication log entry creation @@ -77,35 +77,38 @@ struct MergeTreeSettings time_t prefer_fetch_merged_part_time_threshold = 3600; size_t prefer_fetch_merged_part_size_threshold = 10ULL * 1024 * 1024 * 1024; - /// Настройки минимального количества битых данных, при котором отказываться автоматически их удалять. + /// Maximum number of broken parts; if there are more, deny their automatic deletion. size_t max_suspicious_broken_parts = 10; - /// Не выполнять ALTER, если количество файлов для модификации (удаления, добавления) больше указанного. + /// Do not apply ALTER if number of files for modification (deletion, addition) is more than this. size_t max_files_to_modify_in_alter_columns = 50; - /// Не выполнять ALTER, если количество файлов для удаления больше указанного. + /// Do not apply ALTER if number of files for deletion is more than this. size_t max_files_to_remove_in_alter_columns = 10; - /// Максимальное количество ошибок при загрузке кусков, при котором ReplicatedMergeTree соглашается запускаться. + /// Maximum number of errors during parts loading with which ReplicatedMergeTree is still allowed to start. size_t replicated_max_unexpected_parts = 3; size_t replicated_max_unexpectedly_merged_parts = 2; size_t replicated_max_missing_obsolete_parts = 5; size_t replicated_max_missing_active_parts = 20; - /// Если отношение количества ошибок к общему количеству кусков меньше указанного значения, то всё-равно можно запускаться. + /// If the ratio of wrong parts to total number of parts is less than this, allow to start anyway. double replicated_max_ratio_of_wrong_parts = 0.05; - /** Настройки проверки отставания реплик. */ + /// In seconds. + size_t zookeeper_session_expiration_check_period = 60; - /// Периодичность для проверки отставания и сравнения его с другими репликами. + /** Replica delay check settings. */ + + /// Period to check replication delay and compare it with other replicas. size_t check_delay_period = 60; - /// Минимальное отставание от других реплик, при котором нужно уступить лидерство. Здесь и далее, если 0 - не ограничено. + /// Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited. size_t min_relative_delay_to_yield_leadership = 120; - /// Минимальное отставание от других реплик, при котором нужно закрыться от запросов и не выдавать Ok для проверки статуса. + /// Minimal delay from other replicas to close, stop serving requests and not return Ok during status check. size_t min_relative_delay_to_close = 300; - /// Минимальное абсолютное отставание, при котором нужно закрыться от запросов и не выдавать Ok для проверки статуса. + /// Minimal absolute delay to close, stop serving requests and not return Ok during status check.
size_t min_absolute_delay_to_close = 0; @@ -143,6 +146,7 @@ struct MergeTreeSettings SET_SIZE_T(replicated_max_missing_obsolete_parts); SET_SIZE_T(replicated_max_missing_active_parts); SET_DOUBLE(replicated_max_ratio_of_wrong_parts); + SET_SIZE_T(zookeeper_session_expiration_check_period); SET_SIZE_T(check_delay_period); SET_SIZE_T(min_relative_delay_to_yield_leadership); SET_SIZE_T(min_relative_delay_to_close); diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h index bb0ada1de15..60fc3e950bb 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -157,7 +157,7 @@ public: * Если в процессе обработки было исключение - сохраняет его в entry. * Возвращает true, если в процессе обработки не было исключений. */ - bool processEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry, const std::function func); + bool processEntry(std::function get_zookeeper, LogEntryPtr & entry, const std::function func); /// Будет ли кусок в будущем слит в более крупный (или мерджи кусков в данном диапазоне запрещены)? bool partWillBeMergedOrMergesDisabled(const String & part_name) const; diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index b9d3fa3eab1..934be72df63 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -488,19 +488,6 @@ private: using ReplicaToSpaceInfo = std::map; - struct PartitionMergeLockInfo - { - PartitionMergeLockInfo(const std::string & fake_part_name_) - : fake_part_name(fake_part_name_), ref_count(1) - { - } - - std::string fake_part_name; - unsigned int ref_count; - }; - - using PartitionToMergeLock = std::map; - /** Проверяет, что структуры локальной и реплицируемых таблиц совпадают. */ void enforceShardsConsistency(const WeightedZooKeeperPaths & weighted_zookeeper_paths); @@ -513,9 +500,6 @@ private: /** Проверяет, что имеется достаточно свободного места локально и на всех репликах. */ bool checkSpaceForResharding(const ReplicaToSpaceInfo & replica_to_space_info, size_t partition_size) const; - - std::mutex mutex_partition_to_merge_lock; - PartitionToMergeLock partition_to_merge_lock; }; diff --git a/dbms/include/DB/Storages/System/StorageSystemAsynchronousMetrics.h b/dbms/include/DB/Storages/System/StorageSystemAsynchronousMetrics.h new file mode 100644 index 00000000000..71741071944 --- /dev/null +++ b/dbms/include/DB/Storages/System/StorageSystemAsynchronousMetrics.h @@ -0,0 +1,44 @@ +#pragma once + +#include + +#include + + +namespace DB +{ + +class AsynchronousMetrics; + +/** Implements system table asynchronous_metrics, which allows getting values of periodically (asynchronously) updated metrics.
+ */ +class StorageSystemAsynchronousMetrics : private ext::shared_ptr_helper, public IStorage +{ +friend class ext::shared_ptr_helper; + +public: + static StoragePtr create(const std::string & name_, const AsynchronousMetrics & async_metrics_); + + std::string getName() const override { return "SystemAsynchronousMetrics"; } + std::string getTableName() const override { return name; } + + const NamesAndTypesList & getColumnsListImpl() const override { return columns; } + + BlockInputStreams read( + const Names & column_names, + ASTPtr query, + const Context & context, + const Settings & settings, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size = DEFAULT_BLOCK_SIZE, + unsigned threads = 1) override; + +private: + const std::string name; + NamesAndTypesList columns; + const AsynchronousMetrics & async_metrics; + + StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_); +}; + +} diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 4a57ee84074..dc7f63bcdbe 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -24,6 +24,11 @@ #include +namespace CurrentMetrics +{ + extern const Metric SendExternalTables; +} + namespace DB { @@ -286,7 +291,7 @@ void Connection::sendQuery(const String & query, const String & query_id_, UInt6 block_in.reset(); block_out.reset(); - /// Если версия сервера достаточно новая и стоит флаг, отправляем пустой блок, символизируя конец передачи данных. + /// If server version is new enough and there is no pending data, send an empty block which means the end of data transfer. if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES && !with_pending_data) { sendData(Block()); diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index d3653b59af7..d78b5266fee 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -1,7 +1,56 @@ #include +/// Available metrics. Add something here as you wish. +#define APPLY_FOR_METRICS(M) \ + M(Query) \ + M(Merge) \ + M(ReplicatedFetch) \ + M(ReplicatedSend) \ + M(ReplicatedChecks) \ + M(BackgroundPoolTask) \ + M(DiskSpaceReservedForMerge) \ + M(DistributedSend) \ + M(QueryPreempted) \ + M(TCPConnection) \ + M(HTTPConnection) \ + M(InterserverConnection) \ + M(OpenFileForRead) \ + M(OpenFileForWrite) \ + M(Read) \ + M(Write) \ + M(SendExternalTables) \ + M(QueryThread) \ + M(ReadonlyReplica) \ + M(LeaderReplica) \ + M(MemoryTracking) \ + M(LeaderElection) \ + M(EphemeralNode) \ + M(ZooKeeperWatch) \ + + namespace CurrentMetrics { - std::atomic values[END] {}; /// Глобальная переменная - инициализируется нулями. + #define M(NAME) extern const Metric NAME = __COUNTER__; + APPLY_FOR_METRICS(M) + #undef M + constexpr Metric END = __COUNTER__; + + std::atomic values[END] {}; /// Global variable, initialized with zeros.
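A note on the registration pattern just introduced: APPLY_FOR_METRICS together with __COUNTER__ assigns every metric a dense zero-based index at compile time, and values[END] provides one atomic cell per metric, so any code can enumerate the whole set through end(), values[] and the getDescription() defined just below. A minimal sketch of such a consumer (the dumpCurrentMetrics helper is illustrative, not part of this patch):

    #include <iostream>
    #include <DB/Common/CurrentMetrics.h>

    /// Print every registered metric together with its current value.
    void dumpCurrentMetrics()
    {
        for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
        {
            const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
            std::cout << CurrentMetrics::getDescription(static_cast<CurrentMetrics::Metric>(i)) << '\t' << value << '\n';
        }
    }

This is the same loop shape that MetricsTransmitter::transmit() uses further below.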
+ + const char * getDescription(Metric event) + { + static const char * descriptions[] = + { + #define M(NAME) #NAME, + APPLY_FOR_METRICS(M) + #undef M + }; + + return descriptions[event]; + } + + Metric end() { return END; } } + +#undef APPLY_FOR_METRICS diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index 2328021394d..696ccfdd862 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -9,12 +9,17 @@ #include +namespace CurrentMetrics +{ + extern const Metric MemoryTracking; +} + namespace DB { -namespace ErrorCodes -{ - extern const int MEMORY_LIMIT_EXCEEDED; -} + namespace ErrorCodes + { + extern const int MEMORY_LIMIT_EXCEEDED; + } } diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index ee6542737af..5f0cb2a7a4f 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -1,7 +1,124 @@ #include +/// Available events. Add something here as you wish. +#define APPLY_FOR_EVENTS(M) \ + M(Query) \ + M(SelectQuery) \ + M(InsertQuery) \ + M(FileOpen) \ + M(Seek) \ + M(ReadBufferFromFileDescriptorRead) \ + M(ReadBufferFromFileDescriptorReadBytes) \ + M(WriteBufferFromFileDescriptorWrite) \ + M(WriteBufferFromFileDescriptorWriteBytes) \ + M(ReadBufferAIORead) \ + M(ReadBufferAIOReadBytes) \ + M(WriteBufferAIOWrite) \ + M(WriteBufferAIOWriteBytes) \ + M(ReadCompressedBytes) \ + M(CompressedReadBufferBlocks) \ + M(CompressedReadBufferBytes) \ + M(UncompressedCacheHits) \ + M(UncompressedCacheMisses) \ + M(UncompressedCacheWeightLost) \ + M(IOBufferAllocs) \ + M(IOBufferAllocBytes) \ + M(ArenaAllocChunks) \ + M(ArenaAllocBytes) \ + M(FunctionExecute) \ + M(MarkCacheHits) \ + M(MarkCacheMisses) \ + M(CreatedReadBufferOrdinary) \ + M(CreatedReadBufferAIO) \ + M(CreatedWriteBufferOrdinary) \ + M(CreatedWriteBufferAIO) \ + \ + M(ReplicatedPartFetches) \ + M(ReplicatedPartFailedFetches) \ + M(ObsoleteReplicatedParts) \ + M(ReplicatedPartMerges) \ + M(ReplicatedPartFetchesOfMerged) \ + M(ReplicatedPartChecks) \ + M(ReplicatedPartChecksFailed) \ + M(ReplicatedDataLoss) \ + \ + M(DelayedInserts) \ + M(RejectedInserts) \ + M(DelayedInsertsMilliseconds) \ + M(SynchronousMergeOnInsert) \ + \ + M(ZooKeeperInit) \ + M(ZooKeeperTransactions) \ + M(ZooKeeperGetChildren) \ + M(ZooKeeperCreate) \ + M(ZooKeeperRemove) \ + M(ZooKeeperExists) \ + M(ZooKeeperGet) \ + M(ZooKeeperSet) \ + M(ZooKeeperMulti) \ + M(ZooKeeperExceptions) \ + \ + M(DistributedConnectionFailTry) \ + M(DistributedConnectionFailAtAll) \ + \ + M(CompileAttempt) \ + M(CompileSuccess) \ + \ + M(ExternalSortWritePart) \ + M(ExternalSortMerge) \ + M(ExternalAggregationWritePart) \ + M(ExternalAggregationMerge) \ + M(ExternalAggregationCompressedBytes) \ + M(ExternalAggregationUncompressedBytes) \ + \ + M(SlowRead) \ + M(ReadBackoff) \ + \ + M(ReplicaYieldLeadership) \ + M(ReplicaPartialShutdown) \ + \ + M(SelectedParts) \ + M(SelectedRanges) \ + M(SelectedMarks) \ + \ + M(MergedRows) \ + M(MergedUncompressedBytes) \ + \ + M(MergeTreeDataWriterRows) \ + M(MergeTreeDataWriterUncompressedBytes) \ + M(MergeTreeDataWriterCompressedBytes) \ + M(MergeTreeDataWriterBlocks) \ + M(MergeTreeDataWriterBlocksAlreadySorted) \ + \ + M(ObsoleteEphemeralNode) \ + M(CannotRemoveEphemeralNode) \ + M(LeaderElectionAcquiredLeadership) \ + \ + M(RegexpCreated) \ + namespace ProfileEvents { - std::atomic counters[END] {}; /// Глобальная переменная - инициализируется нулями. 
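The many small hunks that follow all apply one recipe: instead of including a global enum of every counter, each translation unit forward-declares only the events and metrics it actually touches, and the definitions are generated once in ProfileEvents.cpp / CurrentMetrics.cpp. A sketch of what a user of a counter looks like (onRegexpCreated is illustrative; the increment() helper is assumed to exist in ProfileEvents.h):

    #include <DB/Common/ProfileEvents.h>

    namespace ProfileEvents
    {
        /// Declaration only; the definition is generated by APPLY_FOR_EVENTS in ProfileEvents.cpp.
        extern const Event RegexpCreated;
    }

    void onRegexpCreated()
    {
        /// Bump the global counter; cheap enough for hot paths.
        ProfileEvents::increment(ProfileEvents::RegexpCreated);
    }

With this scheme, adding a new event touches only ProfileEvents.cpp and the file that uses it, rather than recompiling everything that includes a header with the full list.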
+ #define M(NAME) extern const Event NAME = __COUNTER__; + APPLY_FOR_EVENTS(M) + #undef M + constexpr Event END = __COUNTER__; + + std::atomic counters[END] {}; /// Global variable, initialized with zeros. + + const char * getDescription(Event event) + { + static const char * descriptions[] = + { + #define M(NAME) #NAME, + APPLY_FOR_EVENTS(M) + #undef M + }; + + return descriptions[event]; + } + + Event end() { return END; } } + +#undef APPLY_FOR_EVENTS diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp index 8b143f9be38..358ef7d5ed9 100644 --- a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp @@ -3,6 +3,11 @@ #include +namespace ProfileEvents +{ + extern const Event ExternalAggregationMerge; +} + namespace DB { diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index bc5e5559842..3ca1c0f10a8 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -6,6 +6,12 @@ #include +namespace ProfileEvents +{ + extern const Event ExternalSortWritePart; + extern const Event ExternalSortMerge; +} + namespace DB { diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index b0f2c299c56..be844f70115 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -4,6 +4,12 @@ #include +namespace CurrentMetrics +{ + extern const Metric QueryThread; +} + + namespace DB { diff --git a/dbms/src/IO/ReadBufferAIO.cpp b/dbms/src/IO/ReadBufferAIO.cpp index 968867f9159..8610abd1df5 100644 --- a/dbms/src/IO/ReadBufferAIO.cpp +++ b/dbms/src/IO/ReadBufferAIO.cpp @@ -9,6 +9,18 @@ #include +namespace ProfileEvents +{ + extern const Event FileOpen; + extern const Event ReadBufferAIORead; + extern const Event ReadBufferAIOReadBytes; +} + +namespace CurrentMetrics +{ + extern const Metric Read; +} + namespace DB { diff --git a/dbms/src/IO/WriteBufferAIO.cpp b/dbms/src/IO/WriteBufferAIO.cpp index 0c015d0109e..105492af3b2 100644 --- a/dbms/src/IO/WriteBufferAIO.cpp +++ b/dbms/src/IO/WriteBufferAIO.cpp @@ -5,6 +5,19 @@ #include #include + +namespace ProfileEvents +{ + extern const Event FileOpen; + extern const Event WriteBufferAIOWrite; + extern const Event WriteBufferAIOWriteBytes; +} + +namespace CurrentMetrics +{ + extern const Metric Write; +} + namespace DB { @@ -26,10 +39,10 @@ namespace ErrorCodes /// Примечание: выделяется дополнительная страница, которая содежрит те данные, которые /// не влезают в основной буфер.
WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_, - char * existing_memory_) - : WriteBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE), - flush_buffer(BufferWithOwnMemory(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)), - filename(filename_) + char * existing_memory_) + : WriteBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE), + flush_buffer(BufferWithOwnMemory(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)), + filename(filename_) { /// Исправить информацию о размере буферов, чтобы дополнительные страницы не касались базового класса BufferBase. this->buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE); diff --git a/dbms/src/IO/createReadBufferFromFileBase.cpp b/dbms/src/IO/createReadBufferFromFileBase.cpp index 67626496f43..ac50e326b6a 100644 --- a/dbms/src/IO/createReadBufferFromFileBase.cpp +++ b/dbms/src/IO/createReadBufferFromFileBase.cpp @@ -3,6 +3,13 @@ #include #include + +namespace ProfileEvents +{ + extern const Event CreatedReadBufferOrdinary; + extern const Event CreatedReadBufferAIO; +} + namespace DB { diff --git a/dbms/src/IO/createWriteBufferFromFileBase.cpp b/dbms/src/IO/createWriteBufferFromFileBase.cpp index fa463ade50e..ac2906d6c97 100644 --- a/dbms/src/IO/createWriteBufferFromFileBase.cpp +++ b/dbms/src/IO/createWriteBufferFromFileBase.cpp @@ -3,6 +3,13 @@ #include #include + +namespace ProfileEvents +{ + extern const Event CreatedWriteBufferOrdinary; + extern const Event CreatedWriteBufferAIO; +} + namespace DB { diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index c2ec7a4c24f..e837c97e285 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -22,6 +22,18 @@ #include +namespace ProfileEvents +{ + extern const Event ExternalAggregationWritePart; + extern const Event ExternalAggregationCompressedBytes; + extern const Event ExternalAggregationUncompressedBytes; +} + +namespace CurrentMetrics +{ + extern const Metric QueryThread; +} + namespace DB { diff --git a/dbms/src/Interpreters/AsynchronousMetrics.cpp b/dbms/src/Interpreters/AsynchronousMetrics.cpp new file mode 100644 index 00000000000..00b2ec84d43 --- /dev/null +++ b/dbms/src/Interpreters/AsynchronousMetrics.cpp @@ -0,0 +1,231 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef NO_TCMALLOC + #include + + /// Initializing malloc extension in global constructor as required. + struct MallocExtensionInitializer + { + MallocExtensionInitializer() + { + MallocExtension::Initialize(); + } + } malloc_extension_initializer; +#endif + + +namespace DB +{ + +AsynchronousMetrics::~AsynchronousMetrics() +{ + try + { + { + std::lock_guard lock{wait_mutex}; + quit = true; + } + + wait_cond.notify_one(); + thread.join(); + } + catch (...) + { + DB::tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + + +AsynchronousMetrics::Container AsynchronousMetrics::getValues() const +{ + std::lock_guard lock{container_mutex}; + return container; +} + + +void AsynchronousMetrics::set(const std::string & name, Value value) +{ + std::lock_guard lock{container_mutex}; + container[name] = value; +} + + +void AsynchronousMetrics::run() +{ + setThreadName("AsyncMetrics"); + + std::unique_lock lock{wait_mutex}; + + /// Next minute + 30 seconds. 
To be distant in time from the moment of transmission of metrics (see MetricsTransmitter). + const auto get_next_minute = [] + { + return std::chrono::time_point_cast( + std::chrono::system_clock::now() + std::chrono::minutes(1)) + std::chrono::seconds(30); + }; + + while (true) + { + if (wait_cond.wait_until(lock, get_next_minute(), [this] { return quit; })) + break; + + try + { + update(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +} + + +template +static void calculateMax(Max & max, T x) +{ + if (Max(x) > max) + max = x; +} + +template +static void calculateMaxAndSum(Max & max, Sum & sum, T x) +{ + sum += x; + if (Max(x) > max) + max = x; +} + + +void AsynchronousMetrics::update() +{ + { + if (auto mark_cache = context.getMarkCache()) + { + set("MarkCacheBytes", mark_cache->weight()); + set("MarkCacheFiles", mark_cache->count()); + } + } + + { + if (auto uncompressed_cache = context.getUncompressedCache()) + { + set("UncompressedCacheBytes", uncompressed_cache->weight()); + set("UncompressedCacheCells", uncompressed_cache->count()); + } + } + + { + auto databases = context.getDatabases(); + + size_t max_queue_size = 0; + size_t max_inserts_in_queue = 0; + size_t max_merges_in_queue = 0; + + size_t sum_queue_size = 0; + size_t sum_inserts_in_queue = 0; + size_t sum_merges_in_queue = 0; + + size_t max_absolute_delay = 0; + size_t max_relative_delay = 0; + + size_t max_part_count_for_partition = 0; + + for (const auto & db : databases) + { + for (auto iterator = db.second->getIterator(); iterator->isValid(); iterator->next()) + { + auto & table = iterator->table(); + StorageMergeTree * table_merge_tree = typeid_cast(table.get()); + StorageReplicatedMergeTree * table_replicated_merge_tree = typeid_cast(table.get()); + + if (table_replicated_merge_tree) + { + StorageReplicatedMergeTree::Status status; + table_replicated_merge_tree->getStatus(status, false); + + calculateMaxAndSum(max_queue_size, sum_queue_size, status.queue.queue_size); + calculateMaxAndSum(max_inserts_in_queue, sum_inserts_in_queue, status.queue.inserts_in_queue); + calculateMaxAndSum(max_merges_in_queue, sum_merges_in_queue, status.queue.merges_in_queue); + + try + { + time_t absolute_delay = 0; + time_t relative_delay = 0; + table_replicated_merge_tree->getReplicaDelays(absolute_delay, relative_delay); + + calculateMax(max_absolute_delay, absolute_delay); + calculateMax(max_relative_delay, relative_delay); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__, + "Cannot get replica delay for table: " + backQuoteIfNeed(db.first) + "."
+ backQuoteIfNeed(iterator->name())); + } + + calculateMax(max_part_count_for_partition, table_replicated_merge_tree->getData().getMaxPartsCountForMonth()); + if (auto unreplicated_data = table_replicated_merge_tree->getUnreplicatedData()) + calculateMax(max_part_count_for_partition, unreplicated_data->getMaxPartsCountForMonth()); + } + + if (table_merge_tree) + { + calculateMax(max_part_count_for_partition, table_merge_tree->getData().getMaxPartsCountForMonth()); + } + } + } + + set("ReplicasMaxQueueSize", max_queue_size); + set("ReplicasMaxInsertsInQueue", max_inserts_in_queue); + set("ReplicasMaxMergesInQueue", max_merges_in_queue); + + set("ReplicasSumQueueSize", sum_queue_size); + set("ReplicasSumInsertsInQueue", sum_inserts_in_queue); + set("ReplicasSumMergesInQueue", sum_merges_in_queue); + + set("ReplicasMaxAbsoluteDelay", max_absolute_delay); + set("ReplicasMaxRelativeDelay", max_relative_delay); + + set("MaxPartCountForPartition", max_part_count_for_partition); + } + +#ifndef NO_TCMALLOC + { + /// tcmalloc related metrics. Remove if you switch to a different allocator. + + MallocExtension & malloc_extension = *MallocExtension::instance(); + + auto malloc_metrics = + { + "generic.current_allocated_bytes", + "generic.heap_size", + "tcmalloc.current_total_thread_cache_bytes", + "tcmalloc.central_cache_free_bytes", + "tcmalloc.transfer_cache_free_bytes", + "tcmalloc.thread_cache_free_bytes", + "tcmalloc.pageheap_free_bytes", + "tcmalloc.pageheap_unmapped_bytes", + }; + + for (auto malloc_metric : malloc_metrics) + { + size_t value = 0; + if (malloc_extension.GetNumericProperty(malloc_metric, &value)) + set(malloc_metric, value); + } + } +#endif + + /// Add more metrics as you wish. +} + + +} diff --git a/dbms/src/Interpreters/Compiler.cpp b/dbms/src/Interpreters/Compiler.cpp index a58394819a6..3e2ebc58dde 100644 --- a/dbms/src/Interpreters/Compiler.cpp +++ b/dbms/src/Interpreters/Compiler.cpp @@ -14,6 +14,12 @@ #include +namespace ProfileEvents +{ + extern const Event CompileAttempt; + extern const Event CompileSuccess; +} + namespace DB { diff --git a/dbms/src/Interpreters/ExpressionActions.cpp b/dbms/src/Interpreters/ExpressionActions.cpp index 01db370d110..fc8cb73f7c8 100644 --- a/dbms/src/Interpreters/ExpressionActions.cpp +++ b/dbms/src/Interpreters/ExpressionActions.cpp @@ -8,6 +8,11 @@ #include +namespace ProfileEvents +{ + extern const Event FunctionExecute; +} + namespace DB { diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index 4f588537cc0..89b951d7b01 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -15,6 +15,12 @@ #include #include + +namespace ProfileEvents +{ + extern const Event InsertQuery; +} + namespace DB { diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 23d9d814797..449909c416d 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -35,6 +35,11 @@ #include +namespace ProfileEvents +{ + extern const Event SelectQuery; +} + namespace DB { diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index f2faed4c41e..97f217d1420 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -20,6 +20,11 @@ #include +namespace ProfileEvents +{ + extern const Event Query; +} + namespace DB { diff --git 
a/dbms/src/Server/ConfigReloader.cpp b/dbms/src/Server/ConfigReloader.cpp index 4c56937b0a6..75716c6dec2 100644 --- a/dbms/src/Server/ConfigReloader.cpp +++ b/dbms/src/Server/ConfigReloader.cpp @@ -43,13 +43,17 @@ ConfigReloader::~ConfigReloader() { try { - LOG_DEBUG(log, "ConfigReloader::~ConfigReloader()"); - quit = true; + { + std::lock_guard lock{mutex}; + quit = true; + } + + cond.notify_one(); thread.join(); } catch (...) { - tryLogCurrentException("~ConfigReloader"); + DB::tryLogCurrentException(__PRETTY_FUNCTION__); } } @@ -58,9 +62,13 @@ void ConfigReloader::run() { setThreadName("ConfigReloader"); - while (!quit) + std::unique_lock lock{mutex}; + + while (true) { - std::this_thread::sleep_for(reload_interval); + if (cond.wait_for(lock, reload_interval, [this] { return quit; })) + break; + reloadIfNewer(false, false); } } diff --git a/dbms/src/Server/ConfigReloader.h b/dbms/src/Server/ConfigReloader.h index 73483d33946..b55a138fc62 100644 --- a/dbms/src/Server/ConfigReloader.h +++ b/dbms/src/Server/ConfigReloader.h @@ -5,7 +5,8 @@ #include #include #include -#include +#include +#include #include @@ -95,7 +96,9 @@ private: FilesChangesTracker last_main_config_files; FilesChangesTracker last_users_config_files; - std::atomic quit{false}; + bool quit {false}; + std::mutex mutex; + std::condition_variable cond; std::thread thread; Poco::Logger * log = &Logger::get("ConfigReloader"); diff --git a/dbms/src/Server/HTTPHandler.h b/dbms/src/Server/HTTPHandler.h index f4ebb06df0b..1259f935bc0 100644 --- a/dbms/src/Server/HTTPHandler.h +++ b/dbms/src/Server/HTTPHandler.h @@ -5,6 +5,11 @@ #include "Server.h" +namespace CurrentMetrics +{ + extern const Metric HTTPConnection; +} + namespace DB { @@ -21,7 +26,7 @@ public: struct Output { std::shared_ptr out; - /// Используется для выдачи ответа. Равен либо out, либо CompressedWriteBuffer(*out), в зависимости от настроек. + /// Used for sending response. Points to 'out', or to CompressedWriteBuffer(*out), depending on settings. std::shared_ptr out_maybe_compressed; }; @@ -38,7 +43,7 @@ private: Logger * log; - /// Функция также инициализирует used_output. + /// Also initializes 'used_output'. void processQuery( Poco::Net::HTTPServerRequest & request, HTMLForm & params, diff --git a/dbms/src/Server/InterserverIOHTTPHandler.h b/dbms/src/Server/InterserverIOHTTPHandler.h index 28dea53ee79..1b8e95dba32 100644 --- a/dbms/src/Server/InterserverIOHTTPHandler.h +++ b/dbms/src/Server/InterserverIOHTTPHandler.h @@ -4,6 +4,11 @@ #include +namespace CurrentMetrics +{ + extern const Metric InterserverConnection; +} + namespace DB { diff --git a/dbms/src/Server/MetricsTransmitter.cpp b/dbms/src/Server/MetricsTransmitter.cpp index 252daeefe44..a21e45da243 100644 --- a/dbms/src/Server/MetricsTransmitter.cpp +++ b/dbms/src/Server/MetricsTransmitter.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB @@ -23,7 +24,7 @@ MetricsTransmitter::~MetricsTransmitter() } catch (...) { - DB::tryLogCurrentException(__FUNCTION__); + DB::tryLogCurrentException(__PRETTY_FUNCTION__); } } @@ -32,12 +33,15 @@ void MetricsTransmitter::run() { setThreadName("MetricsTransmit"); - const auto get_next_minute = [] { + /// Next minute at 00 seconds. To avoid time drift and transmit values exactly each minute. 
+ const auto get_next_minute = [] + { return std::chrono::time_point_cast( - std::chrono::system_clock::now() + std::chrono::minutes(1) - ); + std::chrono::system_clock::now() + std::chrono::minutes(1)); }; + ProfileEvents::Count prev_counters[ProfileEvents::end()] {}; + std::unique_lock lock{mutex}; while (true) @@ -45,32 +49,39 @@ void MetricsTransmitter::run() if (cond.wait_until(lock, get_next_minute(), [this] { return quit; })) break; - transmit(); + transmit(prev_counters); } } -void MetricsTransmitter::transmit() +void MetricsTransmitter::transmit(ProfileEvents::Count * prev_counters) { - GraphiteWriter::KeyValueVector key_vals{}; - key_vals.reserve(ProfileEvents::END + CurrentMetrics::END); + auto async_metrics_values = async_metrics.getValues(); - for (size_t i = 0; i < ProfileEvents::END; ++i) + GraphiteWriter::KeyValueVector key_vals{}; + key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size()); + + for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) { const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed); - const auto counter_increment = counter - prev_counters[i].load(std::memory_order_relaxed); - prev_counters[i].store(counter, std::memory_order_relaxed); + const auto counter_increment = counter - prev_counters[i]; + prev_counters[i] = counter; std::string key {ProfileEvents::getDescription(static_cast(i))}; - key_vals.emplace_back(event_path_prefix + key, counter_increment); + key_vals.emplace_back(profile_events_path_prefix + key, counter_increment); } - for (size_t i = 0; i < CurrentMetrics::END; ++i) + for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) { const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed); std::string key {CurrentMetrics::getDescription(static_cast(i))}; - key_vals.emplace_back(metrics_path_prefix + key, value); + key_vals.emplace_back(current_metrics_path_prefix + key, value); + } + + for (const auto & name_value : async_metrics_values) + { + key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second); } BaseDaemon::instance().writeToGraphite(key_vals); diff --git a/dbms/src/Server/MetricsTransmitter.h b/dbms/src/Server/MetricsTransmitter.h index ffb01d83140..f8ee42d7f8e 100644 --- a/dbms/src/Server/MetricsTransmitter.h +++ b/dbms/src/Server/MetricsTransmitter.h @@ -10,30 +10,34 @@ namespace DB { +class AsynchronousMetrics; + /** Automatically sends * - difference of ProfileEvents; * - values of CurrentMetrics; + * - values of AsynchronousMetrics; * to Graphite at beginning of every minute. */ class MetricsTransmitter { public: + MetricsTransmitter(const AsynchronousMetrics & async_metrics_) : async_metrics(async_metrics_) {} ~MetricsTransmitter(); private: void run(); - void transmit(); + void transmit(ProfileEvents::Count * prev_counters); - /// Значения счётчиков при предыдущей отправке (или нули, если ни разу не отправляли). 
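ConfigReloader above, MetricsTransmitter here, and AsynchronousMetrics earlier in this patch now share one shutdown idiom: a plain bool guarded by a mutex plus a condition variable, instead of std::atomic combined with sleep_for. A condensed, self-contained sketch of the idiom (PeriodicTask and doWork are illustrative names):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    class PeriodicTask
    {
    public:
        ~PeriodicTask()
        {
            {
                std::lock_guard<std::mutex> lock{mutex};    /// 'quit' is written only under the mutex
                quit = true;
            }
            cond.notify_one();    /// wake the worker immediately instead of waiting out the sleep
            thread.join();
        }

    private:
        void run()
        {
            std::unique_lock<std::mutex> lock{mutex};
            while (true)
            {
                /// wait_for returns true only when the predicate fired, i.e. on shutdown;
                /// a plain timeout simply loops around and does the periodic work.
                if (cond.wait_for(lock, std::chrono::seconds(60), [this] { return quit; }))
                    break;
                doWork();
            }
        }

        void doWork() {}    /// stands in for reloadIfNewer() / transmit() / update()

        bool quit = false;
        std::mutex mutex;
        std::condition_variable cond;
        std::thread thread{&PeriodicTask::run, this};    /// declared last, so it starts after the rest is initialized
    };

Compared with the old sleep_for loop, destruction wakes the worker promptly, and the predicate cannot miss the flag because quit is never written outside the mutex.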
- decltype(ProfileEvents::counters) prev_counters{}; + const AsynchronousMetrics & async_metrics; bool quit = false; std::mutex mutex; std::condition_variable cond; std::thread thread {&MetricsTransmitter::run, this}; - static constexpr auto event_path_prefix = "ClickHouse.ProfileEvents."; - static constexpr auto metrics_path_prefix = "ClickHouse.Metrics."; + static constexpr auto profile_events_path_prefix = "ClickHouse.ProfileEvents."; + static constexpr auto current_metrics_path_prefix = "ClickHouse.Metrics."; + static constexpr auto asynchronous_metrics_path_prefix = "ClickHouse.AsynchronousMetrics."; }; } diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 2bedcdfc8ce..767618d52d7 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -38,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -360,8 +362,6 @@ int Server::main(const std::vector & args) } SCOPE_EXIT( - LOG_DEBUG(log, "Closed all connections."); - /** Ask to cancel background jobs all table engines, * and also query_log. * It is important to do early, not in destructor of Context, because @@ -380,10 +380,6 @@ int Server::main(const std::vector & args) ); { - const auto metrics_transmitter = config().getBool("use_graphite", true) - ? std::make_unique() - : nullptr; - const std::string listen_host = config().getString("listen_host", "::"); Poco::Timespan keep_alive_timeout(config().getInt("keep_alive_timeout", 10), 0); @@ -480,12 +476,14 @@ int Server::main(const std::vector & args) LOG_DEBUG(log, "Waiting for current connections to close."); - config_reloader.reset(); - is_cancelled = true; http_server->stop(); tcp_server->stop(); + + LOG_DEBUG(log, "Closed all connections."); + + config_reloader.reset(); ); /// try to load dictionaries immediately, throw on error and die @@ -496,14 +494,23 @@ int Server::main(const std::vector & args) global_context->tryCreateDictionaries(); global_context->tryCreateExternalDictionaries(); } - - waitForTerminationRequest(); } catch (...) { LOG_ERROR(log, "Caught exception while loading dictionaries."); throw; } + + /// This object will periodically calculate some metrics. + AsynchronousMetrics async_metrics(*global_context); + + system_database->attachTable("asynchronous_metrics", StorageSystemAsynchronousMetrics::create("asynchronous_metrics", async_metrics)); + + const auto metrics_transmitter = config().getBool("use_graphite", true) + ? 
std::make_unique(async_metrics) + : nullptr; + + waitForTerminationRequest(); } return Application::EXIT_OK; diff --git a/dbms/src/Server/TCPHandler.h b/dbms/src/Server/TCPHandler.h index 7fe6631e7d9..8d601e039c0 100644 --- a/dbms/src/Server/TCPHandler.h +++ b/dbms/src/Server/TCPHandler.h @@ -14,6 +14,12 @@ #include "Server.h" +namespace CurrentMetrics +{ + extern const Metric TCPConnection; +} + + namespace DB { diff --git a/dbms/src/Server/config.xml b/dbms/src/Server/config.xml index 07eafced170..647bc87838c 100644 --- a/dbms/src/Server/config.xml +++ b/dbms/src/Server/config.xml @@ -147,7 +147,7 @@ 10000000000 0.01 - + zstd --> diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp index 6dc826a200f..f1102346342 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp @@ -7,6 +7,12 @@ #include + +namespace CurrentMetrics +{ + extern const Metric BackgroundPoolTask; +} + namespace DB { diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index f4070eedb4b..236a0070b42 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -3,6 +3,12 @@ #include +namespace CurrentMetrics +{ + extern const Metric ReplicatedSend; + extern const Metric ReplicatedFetch; +} + namespace DB { diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 7daa39a642a..68c533b79b6 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -25,6 +25,13 @@ #include +namespace ProfileEvents +{ + extern const Event RejectedInserts; + extern const Event DelayedInserts; + extern const Event DelayedInsertsMilliseconds; +} + namespace DB { diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 87d17708f9d..8123b174464 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -16,6 +16,13 @@ #include #include + +namespace ProfileEvents +{ + extern const Event MergedRows; + extern const Event MergedUncompressedBytes; +} + namespace DB { diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index c8cac1fa67b..45a89921bf5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -36,6 +36,14 @@ #include +namespace ProfileEvents +{ + extern const Event SelectedParts; + extern const Event SelectedRanges; + extern const Event SelectedMarks; +} + + namespace DB { diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index bf1b36e3346..3bf6049c3ee 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -4,6 +4,16 @@ #include #include + +namespace ProfileEvents +{ + extern const Event MergeTreeDataWriterBlocks; + extern const Event MergeTreeDataWriterBlocksAlreadySorted; + extern const Event MergeTreeDataWriterRows; + extern const Event MergeTreeDataWriterUncompressedBytes; + extern const Event MergeTreeDataWriterCompressedBytes; +} + namespace DB { diff --git a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp 
b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp index c5dbec7e689..bd4634701f8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp @@ -15,6 +15,11 @@ #include +namespace CurrentMetrics +{ + extern const Metric ReplicatedChecks; +} + namespace DB { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index 0636db23c18..d00828e5ced 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -4,6 +4,13 @@ #include +namespace ProfileEvents +{ + extern const Event ReplicatedPartChecks; + extern const Event ReplicatedPartChecksFailed; + extern const Event ReplicatedDataLoss; +} + namespace DB { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 1c9a36100fa..e18bc711226 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -577,14 +577,17 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP } -bool ReplicatedMergeTreeQueue::processEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry, const std::function func) +bool ReplicatedMergeTreeQueue::processEntry( + std::function get_zookeeper, + LogEntryPtr & entry, + const std::function func) { std::exception_ptr saved_exception; try { if (func(entry)) - remove(zookeeper, entry); + remove(get_zookeeper(), entry); } catch (...) { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 8ebae41fea0..f56cc9245f8 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -7,6 +7,19 @@ #include +namespace ProfileEvents +{ + extern const Event ReplicaYieldLeadership; + extern const Event ReplicaPartialShutdown; +} + +namespace CurrentMetrics +{ + extern const Metric ReadonlyReplica; + extern const Metric LeaderReplica; +} + + namespace DB { @@ -37,10 +50,10 @@ void ReplicatedMergeTreeRestartingThread::run() constexpr auto retry_period_ms = 10 * 1000; /// Периодичность проверки истечения сессии в ZK. - time_t check_period_ms = 60 * 1000; + Int64 check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period * 1000; /// Периодичность проверки величины отставания реплики. - if (check_period_ms > static_cast(storage.data.settings.check_delay_period) * 1000) + if (check_period_ms > static_cast(storage.data.settings.check_delay_period) * 1000) check_period_ms = storage.data.settings.check_delay_period * 1000; setThreadName("ReplMTRestart"); @@ -107,7 +120,9 @@ void ReplicatedMergeTreeRestartingThread::run() time_t relative_delay = 0; storage.getReplicaDelays(absolute_delay, relative_delay); - LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << "."); + + if (absolute_delay) + LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". 
Relative delay: " << relative_delay << "."); prev_time_of_check_delay = current_time; @@ -123,6 +138,7 @@ void ReplicatedMergeTreeRestartingThread::run() if (storage.is_leader_node) { storage.is_leader_node = false; + CurrentMetrics::sub(CurrentMetrics::LeaderReplica); if (storage.merge_selecting_thread.joinable()) storage.merge_selecting_thread.join(); @@ -187,7 +203,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() storage.zookeeper_path + "/leader_election", *storage.current_zookeeper, /// current_zookeeper живёт в течение времени жизни leader_election, /// так как до изменения current_zookeeper, объект leader_election уничтожается в методе partialShutdown. - [this] { storage.becomeLeader(); }, + [this] { storage.becomeLeader(); CurrentMetrics::add(CurrentMetrics::LeaderReplica); }, storage.replica_name); /// Все, что выше, может бросить KeeperException, если что-то не так с ZK. @@ -358,6 +374,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() if (storage.is_leader_node) { storage.is_leader_node = false; + CurrentMetrics::sub(CurrentMetrics::LeaderReplica); if (storage.merge_selecting_thread.joinable()) storage.merge_selecting_thread.join(); } diff --git a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp index 5c9129d8f69..4797d2f3939 100644 --- a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp +++ b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp @@ -7,6 +7,13 @@ #include + +namespace CurrentMetrics +{ + extern const Metric ReplicatedSend; + extern const Metric ReplicatedFetch; +} + namespace DB { diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 6e845af2cf4..88a9586ea48 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -47,6 +47,14 @@ #include +namespace ProfileEvents +{ + extern const Event ReplicatedPartMerges; + extern const Event ReplicatedPartFailedFetches; + extern const Event ReplicatedPartFetchesOfMerged; + extern const Event ObsoleteReplicatedParts; + extern const Event ReplicatedPartFetches; +} namespace DB { @@ -1029,8 +1037,6 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context) { - auto zookeeper = getZooKeeper(); - if (entry.type == LogEntry::DROP_RANGE) { executeDropRange(entry); @@ -1045,7 +1051,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name); /// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper. Проверим, что он там есть. - if (containing_part && zookeeper->exists(replica_path + "/parts/" + containing_part->name)) + if (containing_part && getZooKeeper()->exists(replica_path + "/parts/" + containing_part->name)) { if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)) LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " - part already exists."); @@ -1057,7 +1063,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist."); /// Возможно, этот кусок нам не нужен, так как при записи с кворумом, кворум пофейлился (см. ниже про /quorum/failed_parts). 
- if (entry.quorum && zookeeper->exists(zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name)) + if (entry.quorum && getZooKeeper()->exists(zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name)) { LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " because quorum for that part was failed."); return true; /// NOTE Удаление из virtual_parts не делается, но оно нужно только для мерджей. @@ -1270,6 +1276,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro * Это позволит проследить, что реплики не стали активными. */ + auto zookeeper = getZooKeeper(); + Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); zkutil::Ops ops; @@ -1408,11 +1416,9 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry) { - auto zookeeper = getZooKeeper(); - LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << "."); - queue.removeGetsAndMergesInRange(zookeeper, entry.new_part_name); + queue.removeGetsAndMergesInRange(getZooKeeper(), entry.new_part_name); LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts."); size_t removed_parts = 0; @@ -1437,7 +1443,7 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr zkutil::Ops ops; removePartFromZooKeeper(part->name, ops); - auto code = zookeeper->tryMulti(ops); + auto code = getZooKeeper()->tryMulti(ops); /// Если кусок уже удалён (например, потому что он так и не был добавлен в ZK из-за сбоя, /// см. ReplicatedMergeTreeBlockOutputStream), то всё Ок. @@ -1455,8 +1461,6 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeTree::LogEntry & entry) { - auto zookeeper = getZooKeeper(); - String source_path = (entry.attach_unreplicated ? "unreplicated/" : "detached/") + entry.source_part_name; LOG_INFO(log, "Attaching part " << entry.source_part_name << " from " << source_path << " as " << entry.new_part_name); @@ -1482,7 +1486,7 @@ bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeT LOG_WARNING(log, "Unreplicated part " << entry.source_part_name << " is already detached"); } - zookeeper->multi(ops); + getZooKeeper()->multi(ops); /// NOTE: Не можем использовать renameTempPartAndAdd, потому что кусок не временный - если что-то пойдет не так, его не нужно удалять. 
part->renameTo(entry.new_part_name); @@ -1551,7 +1555,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p time_t prev_attempt_time = entry->last_attempt_time; - bool res = queue.processEntry(getZooKeeper(), entry, [&](LogEntryPtr & entry) + bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry) { try { @@ -2046,15 +2050,13 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin currently_fetching_parts.erase(part_name); ); - auto zookeeper = getZooKeeper(); - LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_path); TableStructureReadLockPtr table_lock; if (!to_detached) table_lock = lockStructure(true); - ReplicatedMergeTreeAddress address(zookeeper->get(replica_path + "/host")); + ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host")); MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart( part_name, replica_path, address.host, address.replication_port, to_detached); @@ -2073,7 +2075,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin MergeTreeData::Transaction transaction; auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction); - zookeeper->multi(ops); + getZooKeeper()->multi(ops); transaction.commit(); /** Если для этого куска отслеживается кворум, то надо его обновить. @@ -2334,8 +2336,6 @@ bool StorageReplicatedMergeTree::optimize(const String & partition, bool final, assertNotReadonly(); - auto zookeeper = getZooKeeper(); - if (!is_leader_node) throw Exception("Method OPTIMIZE for ReplicatedMergeTree could be called only on leader replica", ErrorCodes::NOT_IMPLEMENTED); @@ -2386,7 +2386,6 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, { assertNotReadonly(); - auto zookeeper = getZooKeeper(); auto merge_blocker = merger.cancel(); auto unreplicated_merge_blocker = unreplicated_merger ? unreplicated_merger->cancel() : MergeTreeDataMerger::Blocker(); @@ -2425,7 +2424,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, }.toString(); /// Делаем ALTER. - zookeeper->set(zookeeper_path + "/columns", new_columns_str, -1, &stat); + getZooKeeper()->set(zookeeper_path + "/columns", new_columns_str, -1, &stat); new_columns_version = stat.version; } @@ -2435,7 +2434,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, /// Ждем, пока все реплики обновят данные. /// Подпишемся на изменения столбцов, чтобы перестать ждать, если кто-то еще сделает ALTER. - if (!zookeeper->exists(zookeeper_path + "/columns", &stat, alter_query_event)) + if (!getZooKeeper()->exists(zookeeper_path + "/columns", &stat, alter_query_event)) throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE); if (stat.version != new_columns_version) @@ -2445,7 +2444,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, return; } - Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); + Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas"); std::set inactive_replicas; std::set timed_out_replicas; @@ -2459,7 +2458,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, while (!shutdown_called) { /// Реплика может быть неактивной. 
- if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) + if (!getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) { LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query." " ALTER will be done asynchronously when replica becomes active."); @@ -2471,7 +2470,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, String replica_columns_str; /// Реплику могли успеть удалить. - if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat)) + if (!getZooKeeper()->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat)) { LOG_WARNING(log, replica << " was removed"); break; @@ -2482,7 +2481,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, if (replica_columns_str == new_columns_str) break; - if (!zookeeper->exists(zookeeper_path + "/columns", &stat)) + if (!getZooKeeper()->exists(zookeeper_path + "/columns", &stat)) throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE); if (stat.version != new_columns_version) @@ -2492,7 +2491,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, return; } - if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event)) + if (!getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event)) { LOG_WARNING(log, replica << " was removed"); break; @@ -2617,24 +2616,23 @@ void StorageReplicatedMergeTree::dropPartition( assertNotReadonly(); - auto zookeeper = getZooKeeper(); String month_name = MergeTreeData::getMonthName(field); if (!is_leader_node) { /// Проксируем запрос в лидера. - auto live_replicas = zookeeper->getChildren(zookeeper_path + "/leader_election"); + auto live_replicas = getZooKeeper()->getChildren(zookeeper_path + "/leader_election"); if (live_replicas.empty()) throw Exception("No active replicas", ErrorCodes::NO_ACTIVE_REPLICAS); std::sort(live_replicas.begin(), live_replicas.end()); - const auto leader = zookeeper->get(zookeeper_path + "/leader_election/" + live_replicas.front()); + const auto leader = getZooKeeper()->get(zookeeper_path + "/leader_election/" + live_replicas.front()); if (leader == replica_name) throw Exception("Leader was suddenly changed or logical error.", ErrorCodes::LEADERSHIP_CHANGED); - ReplicatedMergeTreeAddress leader_address(zookeeper->get(zookeeper_path + "/replicas/" + leader + "/host")); + ReplicatedMergeTreeAddress leader_address(getZooKeeper()->get(zookeeper_path + "/replicas/" + leader + "/host")); auto new_query = query->clone(); auto & alter = typeid_cast(*new_query); @@ -2700,7 +2698,7 @@ void StorageReplicatedMergeTree::dropPartition( entry.source_replica = replica_name; entry.new_part_name = fake_part_name; entry.detach = detach; - String log_znode_path = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential); + String log_znode_path = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential); entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); entry.create_time = time(0); @@ -2719,7 +2717,6 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie { assertNotReadonly(); - auto zookeeper = getZooKeeper(); String partition; if (attach_part) @@ -2804,12 +2801,12 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const 
@@ -2804,12 +2801,12 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
         entry.create_time = time(0);

         ops.push_back(new zkutil::Op::Create(
-            zookeeper_path + "/log/log-", entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
+            zookeeper_path + "/log/log-", entry.toString(), getZooKeeper()->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
     }

     LOG_DEBUG(log, "Adding attaches to log");

-    zookeeper->multi(ops);
+    getZooKeeper()->multi(ops);

     /// If needed, wait for the operation to complete on this replica or on all replicas.
     if (settings.replication_alter_partitions_sync != 0)
@@ -2833,26 +2830,28 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie

 void StorageReplicatedMergeTree::drop()
 {
-    auto zookeeper = tryGetZooKeeper();
-
-    if (is_readonly || !zookeeper)
-        throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
-
-    shutdown();
-
-    if (zookeeper->expired())
-        throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
-
-    LOG_INFO(log, "Removing replica " << replica_path);
-    replica_is_active_node = nullptr;
-    zookeeper->tryRemoveRecursive(replica_path);
-
-    /// Check that zookeeper_path exists: another replica could have removed it right after the previous line.
-    Strings replicas;
-    if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == ZOK && replicas.empty())
     {
-        LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
-        zookeeper->tryRemoveRecursive(zookeeper_path);
+        auto zookeeper = tryGetZooKeeper();
+
+        if (is_readonly || !zookeeper)
+            throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
+
+        shutdown();
+
+        if (zookeeper->expired())
+            throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
+
+        LOG_INFO(log, "Removing replica " << replica_path);
+        replica_is_active_node = nullptr;
+        zookeeper->tryRemoveRecursive(replica_path);
+
+        /// Check that zookeeper_path exists: another replica could have removed it right after the previous line.
+        Strings replicas;
+        if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == ZOK && replicas.empty())
+        {
+            LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
+            zookeeper->tryRemoveRecursive(zookeeper_path);
+        }
     }

     data.dropAllData();
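The drop() body above is re-indented into a nested block so that the shared_ptr obtained from tryGetZooKeeper() is released as soon as the ZooKeeper-side removals are finished, before local data is dropped; presumably this avoids holding a reference to the session longer than necessary. The scoping idea in isolation, with a hypothetical Session type that is not from the diff:

#include <iostream>
#include <memory>

struct Session
{
    ~Session() { std::cout << "session handle released\n"; }
};

void drop_table()
{
    {
        auto session = std::make_shared<Session>();
        /// ... all ZooKeeper-side removals happen while the handle is held ...
    }   /// the shared_ptr is destroyed here, before local cleanup starts

    std::cout << "dropping local data\n";
}

int main()
{
    drop_table();
}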
@@ -2883,8 +2882,7 @@ bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
             return true;
     }

-    auto zookeeper = getZooKeeper();
-    bool res = zookeeper->exists(path);
+    bool res = getZooKeeper()->exists(path);

     if (res)
     {
@@ -2925,10 +2923,9 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const

 void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry)
 {
-    auto zookeeper = getZooKeeper();
     LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name);

-    Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
+    Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
     for (const String & replica : replicas)
         waitForReplicaToProcessLogEntry(replica, entry);
@@ -2938,8 +2935,6 @@ void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Repli

 void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
 {
-    auto zookeeper = getZooKeeper();
-
     String entry_str = entry.toString();
     String log_node_name;

@@ -2974,7 +2969,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
         {
             zkutil::EventPtr event = std::make_shared<Poco::Event>();

-            String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
+            String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
             if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
                 break;
@@ -2987,9 +2982,9 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
          * looking for a node with the same contents. And if we don't find it, the replica has already pulled this entry into its queue.
          */

-        String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");
+        String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");

-        Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log");
+        Strings log_entries = getZooKeeper()->getChildren(zookeeper_path + "/log");
         UInt64 log_index = 0;
         bool found = false;

@@ -3001,7 +2996,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
                 continue;

             String log_entry_str;
-            bool exists = zookeeper->tryGet(zookeeper_path + "/log/" + log_entry_name, log_entry_str);
+            bool exists = getZooKeeper()->tryGet(zookeeper_path + "/log/" + log_entry_name, log_entry_str);
             if (exists && entry_str == log_entry_str)
             {
                 found = true;
@@ -3019,7 +3014,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
             {
                 zkutil::EventPtr event = std::make_shared<Poco::Event>();

-                String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
+                String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
                 if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
                     break;
@@ -3040,13 +3035,13 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
      * Therefore, we search by comparing contents.
      */

-    Strings queue_entries = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/queue");
+    Strings queue_entries = getZooKeeper()->getChildren(zookeeper_path + "/replicas/" + replica + "/queue");
     String queue_entry_to_wait_for;

     for (const String & entry_name : queue_entries)
     {
         String queue_entry_str;
-        bool exists = zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str);
+        bool exists = getZooKeeper()->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str);
         if (exists && queue_entry_str == entry_str)
         {
             queue_entry_to_wait_for = entry_name;
@@ -3064,7 +3059,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
     LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue");

     /// Third, wait until the entry disappears from the replica's queue.
-    zookeeper->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for);
+    getZooKeeper()->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for);
 }

@@ -3201,8 +3196,6 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t

 void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings)
 {
-    auto zookeeper = getZooKeeper();
-
     String partition_str = MergeTreeData::getMonthName(partition);

     String from = from_;
@@ -3219,47 +3212,53 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
         if (startsWith(dir_it.name(), partition_str))
             throw Exception("Detached partition " + partition_str + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);

-    /// List of replicas of the source shard.
-    zkutil::Strings replicas = zookeeper->getChildren(from + "/replicas");
-
-    /// Keep only active replicas.
+    zkutil::Strings replicas;
     zkutil::Strings active_replicas;
-    active_replicas.reserve(replicas.size());
-
-    for (const String & replica : replicas)
-        if (zookeeper->exists(from + "/replicas/" + replica + "/is_active"))
-            active_replicas.push_back(replica);
-
-    if (active_replicas.empty())
-        throw Exception("No active replicas for shard " + from, ErrorCodes::NO_ACTIVE_REPLICAS);
-
-    /** We need to choose the best (most up-to-date) replica:
-     * the one with the maximum log_pointer, then with the minimum queue size.
-     * NOTE This is not quite the best criterion. It makes no sense for downloading old partitions,
-     * and it would be nice to be able to choose the replica closest by network.
-     * NOTE Of course, there are data races here. They could be resolved by retries.
-     */
-    Int64 max_log_pointer = -1;
-    UInt64 min_queue_size = std::numeric_limits<UInt64>::max();
     String best_replica;

-    for (const String & replica : active_replicas)
     {
-        String current_replica_path = from + "/replicas/" + replica;
+        auto zookeeper = getZooKeeper();

-        String log_pointer_str = zookeeper->get(current_replica_path + "/log_pointer");
-        Int64 log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
+        /// List of replicas of the source shard.
+        replicas = zookeeper->getChildren(from + "/replicas");

-        zkutil::Stat stat;
-        zookeeper->get(current_replica_path + "/queue", &stat);
-        size_t queue_size = stat.numChildren;
+        /// Keep only active replicas.
+        active_replicas.reserve(replicas.size());

-        if (log_pointer > max_log_pointer
-            || (log_pointer == max_log_pointer && queue_size < min_queue_size))
+        for (const String & replica : replicas)
+            if (zookeeper->exists(from + "/replicas/" + replica + "/is_active"))
+                active_replicas.push_back(replica);
+
+        if (active_replicas.empty())
+            throw Exception("No active replicas for shard " + from, ErrorCodes::NO_ACTIVE_REPLICAS);
+
+        /** We need to choose the best (most up-to-date) replica:
+         * the one with the maximum log_pointer, then with the minimum queue size.
+         * NOTE This is not quite the best criterion. It makes no sense for downloading old partitions,
+         * and it would be nice to be able to choose the replica closest by network.
+         * NOTE Of course, there are data races here. They could be resolved by retries.
+         */
+        Int64 max_log_pointer = -1;
+        UInt64 min_queue_size = std::numeric_limits<UInt64>::max();
+
+        for (const String & replica : active_replicas)
         {
-            max_log_pointer = log_pointer;
-            min_queue_size = queue_size;
-            best_replica = replica;
+            String current_replica_path = from + "/replicas/" + replica;
+
+            String log_pointer_str = zookeeper->get(current_replica_path + "/log_pointer");
+            Int64 log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
+
+            zkutil::Stat stat;
+            zookeeper->get(current_replica_path + "/queue", &stat);
+            size_t queue_size = stat.numChildren;
+
+            if (log_pointer > max_log_pointer
+                || (log_pointer == max_log_pointer && queue_size < min_queue_size))
+            {
+                max_log_pointer = log_pointer;
+                min_queue_size = queue_size;
+                best_replica = replica;
+            }
         }
     }
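The selection criterion in the rewritten fetchPartition above (maximum log_pointer first, minimum queue size as the tie-breaker) is easier to read outside of the ZooKeeper plumbing. A self-contained restatement; ReplicaInfo and pickBestReplica are illustrative names, not from the codebase:

#include <cstdint>
#include <limits>
#include <string>
#include <vector>

struct ReplicaInfo
{
    std::string name;
    int64_t log_pointer = 0;   /// how far the replica has read the shared log
    uint64_t queue_size = 0;   /// how many entries it has not yet processed
};

std::string pickBestReplica(const std::vector<ReplicaInfo> & active_replicas)
{
    int64_t max_log_pointer = -1;
    uint64_t min_queue_size = std::numeric_limits<uint64_t>::max();
    std::string best;

    for (const auto & r : active_replicas)
    {
        if (r.log_pointer > max_log_pointer
            || (r.log_pointer == max_log_pointer && r.queue_size < min_queue_size))
        {
            max_log_pointer = r.log_pointer;
            min_queue_size = r.queue_size;
            best = r.name;
        }
    }
    return best;   /// empty if there are no active replicas
}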
@@ -3288,7 +3287,7 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
         if (try_no >= 5)
             throw Exception("Too much retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MUCH_RETRIES_TO_FETCH_PARTS);

-        Strings parts = zookeeper->getChildren(best_replica_path + "/parts");
+        Strings parts = getZooKeeper()->getChildren(best_replica_path + "/parts");
         ActiveDataPartSet active_parts_set(parts);
         Strings parts_to_fetch;
@@ -3675,9 +3674,10 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths
     local_space_info.factor = 1.1;
     local_space_info.available_size = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);

-    auto zookeeper = getZooKeeper();
     for (const auto & weighted_path : weighted_zookeeper_paths)
     {
+        auto zookeeper = getZooKeeper();
+
         const auto & path = weighted_path.first;
         UInt64 weight = weighted_path.second;
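The new file below reads from AsynchronousMetrics, whose header this PR adds (dbms/include/DB/Interpreters/AsynchronousMetrics.h, not shown in this excerpt). A plausible minimal shape of the interface the storage relies on: a background thread fills a map of name/value pairs and getValues() returns a snapshot under a lock. The details here are assumptions, not the actual header:

#include <map>
#include <mutex>
#include <string>

class AsynchronousMetrics
{
public:
    using Value = double;
    using Container = std::map<std::string, Value>;

    /// Returns a consistent snapshot of metrics computed by a background thread.
    Container getValues() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return values;
    }

private:
    mutable std::mutex mutex;
    Container values;   /// written periodically by the background thread
};

Under this reading, read() in the file below just copies the snapshot into a two-column block (metric String, value Float64).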
diff --git a/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.cpp b/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.cpp
new file mode 100644
index 00000000000..80dfb5d9cd9
--- /dev/null
+++ b/dbms/src/Storages/System/StorageSystemAsynchronousMetrics.cpp
@@ -0,0 +1,68 @@
+#include <DB/Columns/ColumnString.h>
+#include <DB/Columns/ColumnsNumber.h>
+#include <DB/DataTypes/DataTypeString.h>
+#include <DB/DataTypes/DataTypesNumber.h>
+#include <DB/DataStreams/OneBlockInputStream.h>
+#include <DB/Storages/System/StorageSystemAsynchronousMetrics.h>
+
+
+namespace DB
+{
+
+
+StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_)
+    : name(name_),
+    columns
+    {
+        {"metric", std::make_shared<DataTypeString>()},
+        {"value", std::make_shared<DataTypeFloat64>()},
+    },
+    async_metrics(async_metrics_)
+{
+}
+
+StoragePtr StorageSystemAsynchronousMetrics::create(const std::string & name_, const AsynchronousMetrics & async_metrics_)
+{
+    return make_shared(name_, async_metrics_);
+}
+
+
+BlockInputStreams StorageSystemAsynchronousMetrics::read(
+    const Names & column_names,
+    ASTPtr query,
+    const Context & context,
+    const Settings & settings,
+    QueryProcessingStage::Enum & processed_stage,
+    const size_t max_block_size,
+    const unsigned threads)
+{
+    check(column_names);
+    processed_stage = QueryProcessingStage::FetchColumns;
+
+    Block block;
+
+    ColumnWithTypeAndName col_metric;
+    col_metric.name = "metric";
+    col_metric.type = std::make_shared<DataTypeString>();
+    col_metric.column = std::make_shared<ColumnString>();
+    block.insert(col_metric);
+
+    ColumnWithTypeAndName col_value;
+    col_value.name = "value";
+    col_value.type = std::make_shared<DataTypeFloat64>();
+    col_value.column = std::make_shared<ColumnFloat64>();
+    block.insert(col_value);
+
+    auto async_metrics_values = async_metrics.getValues();
+
+    for (const auto & name_value : async_metrics_values)
+    {
+        col_metric.column->insert(name_value.first);
+        col_value.column->insert(name_value.second);
+    }
+
+    return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block));
+}
+
+
+}
diff --git a/dbms/src/Storages/System/StorageSystemEvents.cpp b/dbms/src/Storages/System/StorageSystemEvents.cpp
index a8b995f701f..475664c9ddf 100644
--- a/dbms/src/Storages/System/StorageSystemEvents.cpp
+++ b/dbms/src/Storages/System/StorageSystemEvents.cpp
@@ -52,7 +52,7 @@ BlockInputStreams StorageSystemEvents::read(
     col_value.column = std::make_shared<ColumnUInt64>();
     block.insert(col_value);

-    for (size_t i = 0; i < ProfileEvents::END; ++i)
+    for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
     {
         UInt64 value = ProfileEvents::counters[i];
diff --git a/dbms/src/Storages/System/StorageSystemMetrics.cpp b/dbms/src/Storages/System/StorageSystemMetrics.cpp
index 5cbfd4c8ee1..64610d9e5a9 100644
--- a/dbms/src/Storages/System/StorageSystemMetrics.cpp
+++ b/dbms/src/Storages/System/StorageSystemMetrics.cpp
@@ -52,7 +52,7 @@ BlockInputStreams StorageSystemMetrics::read(
     col_value.column = std::make_shared<ColumnInt64>();
     block.insert(col_value);

-    for (size_t i = 0; i < CurrentMetrics::END; ++i)
+    for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
     {
         auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
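The two one-line hunks above replace the compile-time ProfileEvents::END / CurrentMetrics::END bound with a runtime end(). This matches the declaration style visible elsewhere in this diff: translation units declare `extern const Event` / `extern const Metric`, the definitions and the counters array live in one central place, and end() exposes the total count. A simplified model of that registry; the layout is assumed, not the actual ClickHouse source (only ArenaAllocChunks and ArenaAllocBytes are real event names from this diff):

#include <atomic>
#include <cstddef>

namespace ProfileEvents
{
    using Event = size_t;
    using Count = size_t;

    /// Central definitions; other files only see `extern const Event ...`.
    const Event ArenaAllocChunks = 0;
    const Event ArenaAllocBytes = 1;
    const Event NUM_EVENTS = 2;          /// hypothetical sentinel

    std::atomic<Count> counters[NUM_EVENTS];

    /// Runtime bound, usable from any translation unit.
    inline Event end() { return NUM_EVENTS; }

    inline void increment(Event event, Count amount = 1)
    {
        counters[event].fetch_add(amount, std::memory_order_relaxed);
    }
}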
diff --git a/libs/libcommon/include/ext/shared_ptr_helper.hpp b/libs/libcommon/include/ext/shared_ptr_helper.hpp
index 4678fe09e55..e18a0ee8818 100644
--- a/libs/libcommon/include/ext/shared_ptr_helper.hpp
+++ b/libs/libcommon/include/ext/shared_ptr_helper.hpp
@@ -5,39 +5,58 @@
 namespace ext
 {

-/**
- * Class AllocateShared allow to make std::shared_ptr from T with private constructor.
- * Derive you T class from shared_ptr_helper, define him as friend and call allocate_shared()/make_shared() method.
-**/
-template <class T>
+/** Class shared_ptr_helper allows to make std::shared_ptr from a T with a private constructor.
+ * Derive your class T from shared_ptr_helper<T>, declare it as friend and call the allocate_shared()/make_shared() method.
+ */
+template <typename T>
 class shared_ptr_helper
 {
 protected:
-typedef typename std::remove_const<T>::type TNoConst;
+    typedef typename std::remove_const<T>::type TNoConst;

-    template <class TAlloc>
+    template <typename TAlloc>
     struct Deleter
     {
-        void operator()(typename TAlloc::value_type * ptr)
-        {
-            std::allocator_traits<TAlloc>::destroy(alloc, ptr);
-        }
-        TAlloc alloc;
+        void operator()(typename TAlloc::value_type * ptr)
+        {
+            using AllocTraits = std::allocator_traits<TAlloc>;
+            ptr->~TNoConst();
+            AllocTraits::deallocate(alloc, ptr, 1);
+        }
+
+        TAlloc alloc;
     };

-/// see std::allocate_shared
-template <class TAlloc, class ... TArgs>
-static std::shared_ptr<T> allocate_shared(const TAlloc & alloc, TArgs && ... args)
-{
-    TAlloc alloc_copy(alloc);
-    return std::shared_ptr<T>(new (std::allocator_traits<TAlloc>::allocate(alloc_copy, 1)) TNoConst(std::forward<TArgs>(args)...), Deleter<TAlloc>(), alloc_copy);
-}
+    /// see std::allocate_shared
+    template <typename TAlloc, typename... TArgs>
+    static std::shared_ptr<T> allocate_shared(const TAlloc & alloc, TArgs &&... args)
+    {
+        using AllocTraits = std::allocator_traits<TAlloc>;
+        TAlloc alloc_copy(alloc);
+
+        auto ptr = AllocTraits::allocate(alloc_copy, 1);
+
+        try
+        {
+            new (ptr) TNoConst(std::forward<TArgs>(args)...);
+        }
+        catch (...)
+        {
+            AllocTraits::deallocate(alloc_copy, ptr, 1);
+            throw;
+        }
+
+        return std::shared_ptr<T>(
+            ptr,
+            Deleter<TAlloc>(),
+            alloc_copy);
+    }

-template <class ... TArgs>
-static std::shared_ptr<T> make_shared(TArgs && ... args)
-{
-    return allocate_shared(std::allocator<TNoConst>(), std::forward<TArgs>(args)...);
-}
+    template <typename... TArgs>
+    static std::shared_ptr<T> make_shared(TArgs &&... args)
+    {
+        return allocate_shared(std::allocator<TNoConst>(), std::forward<TArgs>(args)...);
+    }
 };

 }
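How the helper above is meant to be used, per its comment: derive from shared_ptr_helper<T>, befriend it so it can reach the private constructor, and expose a create() that calls the inherited make_shared(). This is the same shape as StorageSystemAsynchronousMetrics::create() earlier in this diff; MyStorage is an illustrative name:

#include <memory>
#include <string>
#include <ext/shared_ptr_helper.hpp>

class MyStorage : public ext::shared_ptr_helper<MyStorage>
{
    /// The helper performs the placement-new, so it needs access to the private constructor.
    friend class ext::shared_ptr_helper<MyStorage>;

public:
    static std::shared_ptr<MyStorage> create(const std::string & name)
    {
        return make_shared(name);   /// inherited protected static method
    }

private:
    explicit MyStorage(const std::string & name_) : name(name_) {}

    std::string name;
};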
diff --git a/libs/libzkutil/include/zkutil/KeeperException.h b/libs/libzkutil/include/zkutil/KeeperException.h
index e9613c9c331..3c07053b371 100644
--- a/libs/libzkutil/include/zkutil/KeeperException.h
+++ b/libs/libzkutil/include/zkutil/KeeperException.h
@@ -12,6 +12,11 @@ namespace DB
     }
 }

+namespace ProfileEvents
+{
+    extern const Event ZooKeeperExceptions;
+}
+
 namespace zkutil
 {

@@ -54,7 +59,7 @@ public:
     {
         return code == ZCONNECTIONLOSS || code == ZOPERATIONTIMEOUT;
     }
-
+
     const int32_t code;

 private:
diff --git a/libs/libzkutil/include/zkutil/LeaderElection.h b/libs/libzkutil/include/zkutil/LeaderElection.h
index 1d53ef53b0b..9e153da4824 100644
--- a/libs/libzkutil/include/zkutil/LeaderElection.h
+++ b/libs/libzkutil/include/zkutil/LeaderElection.h
@@ -4,6 +4,19 @@
 #include <zkutil/ZooKeeper.h>
 #include <functional>
 #include <memory>
+#include <common/logger_useful.h>
+
+
+namespace ProfileEvents
+{
+    extern const Event ObsoleteEphemeralNode;
+    extern const Event LeaderElectionAcquiredLeadership;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric LeaderElection;
+}

 namespace zkutil
@@ -17,6 +30,11 @@ public:
     using LeadershipHandler = std::function<void()>;

     /** handler is called when this instance becomes the leader.
+     *
+     * identifier - if not empty, must uniquely (within the same path) identify a participant of the leader election.
+     * It means that different participants of the leader election have different identifiers,
+     * and the existence of more than one ephemeral node with the same identifier indicates an error
+     * (see cleanOldEphemeralNodes).
      */
     LeaderElection(const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
         : path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
@@ -48,6 +66,8 @@ private:
     std::atomic<bool> shutdown {false};
     zkutil::EventPtr event = std::make_shared<Poco::Event>();

+    CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};
+
     void createNode()
     {
         shutdown = false;
@@ -56,9 +76,41 @@ private:
         std::string node_path = node->getPath();
         node_name = node_path.substr(node_path.find_last_of('/') + 1);

+        cleanOldEphemeralNodes();
+
         thread = std::thread(&LeaderElection::threadFunction, this);
     }

+    void cleanOldEphemeralNodes()
+    {
+        if (identifier.empty())
+            return;
+
+        /** If there are nodes with the same identifier, remove them.
+         * Such nodes could still be alive after a failed removal attempt,
+         * if there was a temporary communication failure that lasted longer than the session timeout,
+         * but the ZK session is still alive for an unknown reason, and someone still holds that ZK session.
+         * See the comments in the destructor of EphemeralNodeHolder.
+         */
+        Strings brothers = zookeeper.getChildren(path);
+        for (const auto & brother : brothers)
+        {
+            if (brother == node_name)
+                continue;
+
+            std::string brother_path = path + "/" + brother;
+            std::string brother_identifier = zookeeper.get(brother_path);
+
+            if (brother_identifier == identifier)
+            {
+                ProfileEvents::increment(ProfileEvents::ObsoleteEphemeralNode);
+                LOG_WARNING(&Logger::get("LeaderElection"), "Found obsolete ephemeral node for identifier "
+                    + identifier + ", removing: " + brother_path);
+                zookeeper.tryRemoveWithRetries(brother_path);
+            }
+        }
+    }
+
     void releaseNode()
     {
         shutdown = true;
@@ -84,6 +136,7 @@ private:

         if (it == children.begin())
         {
+            ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
             handler();
             return;
         }
diff --git a/libs/libzkutil/include/zkutil/ZooKeeper.h b/libs/libzkutil/include/zkutil/ZooKeeper.h
index 69457c59870..71e1aade795 100644
--- a/libs/libzkutil/include/zkutil/ZooKeeper.h
+++ b/libs/libzkutil/include/zkutil/ZooKeeper.h
@@ -9,6 +9,19 @@
 #include
 #include
 #include
+#include <DB/Common/ProfileEvents.h>
+#include <DB/Common/CurrentMetrics.h>
+
+
+namespace ProfileEvents
+{
+    extern const Event CannotRemoveEphemeralNode;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric EphemeralNode;
+}

 namespace zkutil
@@ -432,13 +445,15 @@ public:
         }
         catch (const KeeperException & e)
         {
-            LOG_ERROR(zookeeper.log, "~EphemeralNodeHolder(): " << e.displayText());
+            ProfileEvents::increment(ProfileEvents::CannotRemoveEphemeralNode);
+            DB::tryLogCurrentException(__PRETTY_FUNCTION__);
         }
     }

 private:
     std::string path;
     ZooKeeper & zookeeper;
+    CurrentMetrics::Increment metric_increment{CurrentMetrics::EphemeralNode};
 };

 using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;
diff --git a/libs/libzkutil/src/ZooKeeper.cpp b/libs/libzkutil/src/ZooKeeper.cpp
index aa0bcf6d4d6..bc5d100cea2 100644
--- a/libs/libzkutil/src/ZooKeeper.cpp
+++ b/libs/libzkutil/src/ZooKeeper.cpp
@@ -5,6 +5,25 @@
 #include

+namespace ProfileEvents
+{
+    extern const Event ZooKeeperInit;
+    extern const Event ZooKeeperTransactions;
+    extern const Event ZooKeeperCreate;
+    extern const Event ZooKeeperRemove;
+    extern const Event ZooKeeperExists;
+    extern const Event ZooKeeperMulti;
+    extern const Event ZooKeeperGet;
+    extern const Event ZooKeeperSet;
+    extern const Event ZooKeeperGetChildren;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric ZooKeeperWatch;
+}
+
+
 namespace zkutil
 {

@@ -29,6 +48,7 @@ struct WatchWithEvent
     /// Exists for the entire lifetime of WatchWithEvent.
     ZooKeeper & zk;
     EventPtr event;
+    CurrentMetrics::Increment metric_increment{CurrentMetrics::ZooKeeperWatch};

     WatchWithEvent(ZooKeeper & zk_, EventPtr event_) : zk(zk_), event(event_) {}
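The CurrentMetrics::Increment members added in the last few hunks (LeaderElection, EphemeralNodeHolder, WatchWithEvent) are an RAII pattern: the metric goes up when the object is created and back down when it is destroyed, so the metric always equals the number of such objects alive right now. A minimal re-implementation of the idea, not the actual CurrentMetrics class:

#include <atomic>
#include <cstdint>

/// Simplified stand-in for CurrentMetrics::Increment.
struct MetricIncrement
{
    explicit MetricIncrement(std::atomic<int64_t> & metric_) : metric(metric_)
    {
        metric.fetch_add(1, std::memory_order_relaxed);
    }

    ~MetricIncrement()
    {
        metric.fetch_sub(1, std::memory_order_relaxed);
    }

    std::atomic<int64_t> & metric;
};

std::atomic<int64_t> ephemeral_nodes{0};

/// As a member, the increment lives exactly as long as the enclosing object,
/// so `ephemeral_nodes` tracks the number of live holders.
struct EphemeralNodeHolderLike
{
    MetricIncrement metric_increment{ephemeral_nodes};
};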