Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Alexey Milovidov 2016-10-25 01:00:27 +03:00
commit c9ec7b18e3
81 changed files with 1524 additions and 597 deletions

View File

@ -108,6 +108,7 @@ include_directories (${ClickHouse_SOURCE_DIR}/contrib/libsparsehash/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libre2/)
include_directories (${ClickHouse_BINARY_DIR}/contrib/libre2/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libzookeeper/include/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libtcmalloc/include/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libpoco/Foundation/include/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libpoco/Util/include/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libpoco/Net/include/)

View File

@ -15,6 +15,10 @@ else()
set (LINK_MONGOCLIENT libmongoclient.a libssl.a libcrypto.a libboost_thread.a)
endif()
if (DISABLE_LIBTCMALLOC)
add_definitions(-D NO_TCMALLOC)
endif()
add_library(string_utils
include/DB/Common/StringUtils.h
@ -371,6 +375,7 @@ add_library (dbms
include/DB/Interpreters/ClusterProxy/DescribeQueryConstructor.h
include/DB/Interpreters/ClusterProxy/AlterQueryConstructor.h
include/DB/Interpreters/ClusterProxy/Query.h
include/DB/Interpreters/AsynchronousMetrics.h
include/DB/Common/Allocator.h
include/DB/Common/CombinedCardinalityEstimator.h
include/DB/Common/ExternalTable.h
@ -541,6 +546,7 @@ add_library (dbms
include/DB/Storages/StorageStripeLog.h
include/DB/Storages/System/StorageSystemEvents.h
include/DB/Storages/System/StorageSystemMetrics.h
include/DB/Storages/System/StorageSystemAsynchronousMetrics.h
include/DB/Storages/System/StorageSystemTables.h
include/DB/Storages/MergeTree/MarkRange.h
include/DB/Storages/MergeTree/MergeTreeDataMerger.h
@ -647,6 +653,7 @@ add_library (dbms
src/Storages/System/StorageSystemProcesses.cpp
src/Storages/System/StorageSystemEvents.cpp
src/Storages/System/StorageSystemMetrics.cpp
src/Storages/System/StorageSystemAsynchronousMetrics.cpp
src/Storages/System/StorageSystemMerges.cpp
src/Storages/System/StorageSystemSettings.cpp
src/Storages/System/StorageSystemZooKeeper.cpp
@ -852,6 +859,7 @@ add_library (dbms
src/Interpreters/ClusterProxy/DescribeQueryConstructor.cpp
src/Interpreters/ClusterProxy/AlterQueryConstructor.cpp
src/Interpreters/ClusterProxy/Query.cpp
src/Interpreters/AsynchronousMetrics.cpp
src/Functions/FunctionFactory.cpp
src/Functions/FunctionsArithmetic.cpp

View File

@ -10,22 +10,28 @@
#include <DB/Common/Allocator.h>
namespace ProfileEvents
{
extern const Event ArenaAllocChunks;
extern const Event ArenaAllocBytes;
}
namespace DB
{
/** Пул, в который можно складывать что-нибудь. Например, короткие строки.
* Сценарий использования:
* - складываем много строк и запоминаем их адреса;
* - адреса остаются валидными в течение жизни пула;
* - при уничтожении пула, вся память освобождается;
* - память выделяется и освобождается большими кусками;
* - удаление части данных не предусмотрено;
/** Memory pool to append something. For example, short strings.
* Usage scenario:
* - put lot of strings inside pool, keep their addresses;
* - addresses remain valid during lifetime of pool;
* - at destruction of pool, all memory is freed;
* - memory is allocated and freed by large chunks;
* - freeing parts of data is not possible (but look at ArenaWithFreeLists if you need);
*/
class Arena : private boost::noncopyable
{
private:
/// Непрерывный кусок памяти и указатель на свободное место в нём. Односвязный список.
/// Contiguous chunk of memory and pointer to free space inside it. Member of single-linked list.
struct Chunk : private Allocator<false> /// empty base optimization
{
char * begin;
@ -59,7 +65,7 @@ private:
size_t growth_factor;
size_t linear_growth_threshold;
/// Последний непрерывный кусок памяти.
/// Last contiguous chunk of memory.
Chunk * head;
size_t size_in_bytes;
@ -68,7 +74,8 @@ private:
return (s + 4096 - 1) / 4096 * 4096;
}
/// Если размер чанка меньше linear_growth_threshold, то рост экспоненциальный, иначе - линейный, для уменьшения потребления памяти.
/// If chunks size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth
/// (to not allocate too much excessive memory).
size_t nextSize(size_t min_next_size) const
{
size_t size_after_grow = 0;
@ -84,7 +91,7 @@ private:
return roundUpToPageSize(size_after_grow);
}
/// Добавить следующий непрерывный кусок памяти размера не меньше заданного.
/// Add next contiguous chunk of memory with size not less than specified.
void NO_INLINE addChunk(size_t min_size)
{
head = new Chunk(nextSize(min_size), head);
@ -103,7 +110,7 @@ public:
delete head;
}
/// Получить кусок памяти, без выравнивания.
/// Get piece of memory, without alignment.
char * alloc(size_t size)
{
if (unlikely(head->pos + size > head->end))
@ -114,17 +121,17 @@ public:
return res;
}
/** Отменить только что сделанное выделение памяти.
* Нужно передать размер не больше того, который был только что выделен.
/** Rollback just performed allocation.
* Must pass size not more that was just allocated.
*/
void rollback(size_t size)
{
head->pos -= size;
}
/** Начать или расширить непрерывный кусок памяти.
* begin - текущее начало куска памяти, если его надо расширить, или nullptr, если его надо начать.
* Если в чанке не хватило места - скопировать существующие данные в новый кусок памяти и изменить значение begin.
/** Begin or expand allocation of contiguous piece of memory.
* 'begin' - current begin of piece of memory, if it need to be expanded, or nullptr, if it need to be started.
* If there is no space in chunk to expand current piece of memory - then copy all piece to new chunk and change value of 'begin'.
*/
char * allocContinue(size_t size, char const *& begin)
{
@ -148,7 +155,7 @@ public:
return res;
}
/// Вставить строку без выравнивания.
/// Insert string without alignment.
const char * insert(const char * data, size_t size)
{
char * res = alloc(size);
@ -156,7 +163,7 @@ public:
return res;
}
/// Размер выделенного пула в байтах
/// Size of chunks in bytes.
size_t size() const
{
return size_in_bytes;

View File

@ -1,81 +1,45 @@
#pragma once
#include <stddef.h>
#include <cstdint>
#include <utility>
#include <atomic>
/** Позволяет считать количество одновременно происходящих событий или текущее значение какой-либо метрики.
* - для высокоуровневого профайлинга.
/** Allows to count number of simultaneously happening processes or current value of some metric.
* - for high-level profiling.
*
* Также смотрите ProfileEvents.h
* В ProfileEvents считается общее количество произошедших (точечных) событий - например, сколько раз были выполнены запросы.
* В CurrentMetrics считается количество одновременных событий - например, сколько сейчас одновременно выполняется запросов,
* или текущее значение метрики - например, величина отставания реплики в секундах.
* See also ProfileEvents.h
* ProfileEvents counts number of happened events - for example, how many times queries was executed.
* CurrentMetrics counts number of simultaneously happening events - for example, number of currently executing queries, right now,
* or just current value of some metric - for example, replica delay in seconds.
*
* CurrentMetrics are updated instantly and are correct for any point in time.
* For periodically (asynchronously) updated metrics, see AsynchronousMetrics.h
*/
#define APPLY_FOR_METRICS(M) \
M(Query) \
M(Merge) \
M(ReplicatedFetch) \
M(ReplicatedSend) \
M(ReplicatedChecks) \
M(BackgroundPoolTask) \
M(DiskSpaceReservedForMerge) \
M(DistributedSend) \
M(QueryPreempted) \
M(TCPConnection) \
M(HTTPConnection) \
M(InterserverConnection) \
M(OpenFileForRead) \
M(OpenFileForWrite) \
M(Read) \
M(Write) \
M(SendExternalTables) \
M(QueryThread) \
M(ReadonlyReplica) \
M(MemoryTracking) \
\
M(END)
namespace CurrentMetrics
{
/// Виды метрик.
enum Metric
{
#define M(NAME) NAME,
APPLY_FOR_METRICS(M)
#undef M
};
/// Получить текстовое описание метрики по его enum-у.
inline const char * getDescription(Metric event)
{
static const char * descriptions[] =
{
#define M(NAME) #NAME,
APPLY_FOR_METRICS(M)
#undef M
};
return descriptions[event];
}
/// Metric identifier (index in array).
using Metric = size_t;
using Value = int64_t;
/// Счётчики - текущие значения метрик.
extern std::atomic<Value> values[END];
/// Get text description of metric by identifier. Returns statically allocated string.
const char * getDescription(Metric event);
/// Metric identifier -> current value of metric.
extern std::atomic<Value> values[];
/// Выставить значение указанной метрики.
/// Get index just after last metric identifier.
Metric end();
/// Set value of specified metric.
inline void set(Metric metric, Value value)
{
values[metric] = value;
}
/// Прибавить величину к значению указанной метрики. Вы затем должны вычесть величину самостоятельно. Или см. ниже class Increment.
/// Add value for specified metric. You must subtract value later; or see class Increment below.
inline void add(Metric metric, Value value = 1)
{
values[metric] += value;
@ -86,7 +50,7 @@ namespace CurrentMetrics
add(metric, -value);
}
/// На время жизни объекта, увеличивает указанное значение на указанную величину.
/// For lifetime of object, add amout for specified metric. Then subtract.
class Increment
{
private:
@ -128,7 +92,7 @@ namespace CurrentMetrics
amount = new_amount;
}
/// Уменьшить значение раньше вызова деструктора.
/// Subtract value before destructor.
void destroy()
{
*what -= amount;
@ -136,6 +100,3 @@ namespace CurrentMetrics
}
};
}
#undef APPLY_FOR_METRICS

View File

@ -18,6 +18,12 @@ namespace ErrorCodes
}
}
namespace ProfileEvents
{
extern const Event DistributedConnectionFailTry;
extern const Event DistributedConnectionFailAtAll;
}
namespace
{

View File

@ -4,136 +4,29 @@
#include <atomic>
/** Позволяет считать количество различных событий, произошедших в программе
* - для высокоуровневого профайлинга.
/** Implements global counters for various events happening in the application
* - for high level profiling.
* See .cpp for list of events.
*/
#define APPLY_FOR_EVENTS(M) \
M(Query) \
M(SelectQuery) \
M(InsertQuery) \
M(FileOpen) \
M(Seek) \
M(ReadBufferFromFileDescriptorRead) \
M(ReadBufferFromFileDescriptorReadBytes) \
M(WriteBufferFromFileDescriptorWrite) \
M(WriteBufferFromFileDescriptorWriteBytes) \
M(ReadBufferAIORead) \
M(ReadBufferAIOReadBytes) \
M(WriteBufferAIOWrite) \
M(WriteBufferAIOWriteBytes) \
M(ReadCompressedBytes) \
M(CompressedReadBufferBlocks) \
M(CompressedReadBufferBytes) \
M(UncompressedCacheHits) \
M(UncompressedCacheMisses) \
M(UncompressedCacheWeightLost) \
M(IOBufferAllocs) \
M(IOBufferAllocBytes) \
M(ArenaAllocChunks) \
M(ArenaAllocBytes) \
M(FunctionExecute) \
M(MarkCacheHits) \
M(MarkCacheMisses) \
M(CreatedReadBufferOrdinary) \
M(CreatedReadBufferAIO) \
M(CreatedWriteBufferOrdinary) \
M(CreatedWriteBufferAIO) \
\
M(ReplicatedPartFetches) \
M(ReplicatedPartFailedFetches) \
M(ObsoleteReplicatedParts) \
M(ReplicatedPartMerges) \
M(ReplicatedPartFetchesOfMerged) \
M(ReplicatedPartChecks) \
M(ReplicatedPartChecksFailed) \
M(ReplicatedDataLoss) \
\
M(DelayedInserts) \
M(RejectedInserts) \
M(DelayedInsertsMilliseconds) \
M(SynchronousMergeOnInsert) \
\
M(ZooKeeperInit) \
M(ZooKeeperTransactions) \
M(ZooKeeperGetChildren) \
M(ZooKeeperCreate) \
M(ZooKeeperRemove) \
M(ZooKeeperExists) \
M(ZooKeeperGet) \
M(ZooKeeperSet) \
M(ZooKeeperMulti) \
M(ZooKeeperExceptions) \
\
M(DistributedConnectionFailTry) \
M(DistributedConnectionFailAtAll) \
\
M(CompileAttempt) \
M(CompileSuccess) \
\
M(ExternalSortWritePart) \
M(ExternalSortMerge) \
M(ExternalAggregationWritePart) \
M(ExternalAggregationMerge) \
M(ExternalAggregationCompressedBytes) \
M(ExternalAggregationUncompressedBytes) \
\
M(SlowRead) \
M(ReadBackoff) \
\
M(ReplicaYieldLeadership) \
M(ReplicaPartialShutdown) \
\
M(SelectedParts) \
M(SelectedRanges) \
M(SelectedMarks) \
\
M(MergedRows) \
M(MergedUncompressedBytes) \
\
M(MergeTreeDataWriterRows) \
M(MergeTreeDataWriterUncompressedBytes) \
M(MergeTreeDataWriterCompressedBytes) \
M(MergeTreeDataWriterBlocks) \
M(MergeTreeDataWriterBlocksAlreadySorted) \
\
M(END)
namespace ProfileEvents
{
/// Виды событий.
enum Event
{
#define M(NAME) NAME,
APPLY_FOR_EVENTS(M)
#undef M
};
/// Event identifier (index in array).
using Event = size_t;
using Count = size_t;
/// Get text description of event by identifier. Returns statically allocated string.
const char * getDescription(Event event);
/// Получить текстовое описание события по его enum-у.
inline const char * getDescription(Event event)
{
static const char * descriptions[] =
{
#define M(NAME) #NAME,
APPLY_FOR_EVENTS(M)
#undef M
};
/// Counters - how many times each event happened.
extern std::atomic<Count> counters[];
return descriptions[event];
}
/// Счётчики - сколько раз каждое из событий произошло.
extern std::atomic<size_t> counters[END];
/// Увеличить счётчик события. Потокобезопасно.
inline void increment(Event event, size_t amount = 1)
/// Increment a counter for event. Thread-safe.
inline void increment(Event event, Count amount = 1)
{
counters[event] += amount;
}
/// Get index just after last event identifier.
Event end();
}
#undef APPLY_FOR_EVENTS

View File

@ -8,6 +8,11 @@
#include <DB/Common/ThreadPool.h>
namespace CurrentMetrics
{
extern const Metric QueryThread;
}
namespace DB
{

View File

@ -11,6 +11,12 @@
#include <common/ClickHouseRevision.h>
namespace ProfileEvents
{
extern const Event ExternalAggregationMerge;
}
namespace DB
{

View File

@ -13,17 +13,22 @@
#include <DB/Common/CurrentMetrics.h>
/** Позволяет обработать множество источников блоков параллельно, используя указанное количество потоков.
* Вынимает из любого доступного источника блок и передаёт его на обработку в предоставленный handler.
/** Allows to process multiple block input streams (sources) in parallel, using specified number of threads.
* Reads (pulls) blocks from any available source and passes it to specified handler.
*
* Устроено так:
* - есть набор источников, из которых можно вынимать блоки;
* - есть набор потоков, которые могут одновременно вынимать блоки из разных источников;
* - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников;
* - когда поток берёт источник для обработки, он удаляет его из очереди источников,
* вынимает из него блок, и затем кладёт источник обратно в очередь источников;
* Implemented in following way:
* - there are multiple input sources to read blocks from;
* - there are multiple threads, that could simultaneously read blocks from different sources;
* - "available" sources (that are not read in any thread right now) are put in queue of sources;
* - when thread take a source to read from, it removes source from queue of sources,
* then read block from source and then put source back to queue of available sources.
*/
namespace CurrentMetrics
{
extern const Metric QueryThread;
}
namespace DB
{

View File

@ -24,6 +24,12 @@
#include <ext/range.hpp>
namespace ProfileEvents
{
extern const Event RegexpCreated;
}
namespace DB
{
@ -445,6 +451,7 @@ namespace Regexps
if (no_capture)
flags |= OptimizedRegularExpression::RE_NO_CAPTURE;
ProfileEvents::increment(ProfileEvents::RegexpCreated);
return new Regexp{createRegexp<like>(pattern, flags)};
});
}

View File

@ -9,13 +9,20 @@
#include <DB/Core/Defines.h>
namespace ProfileEvents
{
extern const Event IOBufferAllocs;
extern const Event IOBufferAllocBytes;
}
namespace DB
{
/** Замена std::vector<char> для использования в буферах.
* Отличается тем, что не делает лишний memset. (И почти ничего не делает.)
* Также можно попросить выделять выровненный кусок памяти.
/** Replacement for std::vector<char> to use in buffers.
* Differs in that is doesn't do unneeded memset. (And also tries to do as little as possible.)
* Also allows to allocate aligned piece of memory (to use with O_DIRECT, for example).
*/
struct Memory : boost::noncopyable, Allocator<false>
{
@ -26,7 +33,7 @@ struct Memory : boost::noncopyable, Allocator<false>
Memory() {}
/// Если alignment != 0, то будет выделяться память, выровненная на alignment.
/// If alignment != 0, then allocate memory aligned to specified value.
Memory(size_t size_, size_t alignment_ = 0) : m_capacity(size_), m_size(m_capacity), alignment(alignment_)
{
alloc();
@ -73,8 +80,7 @@ struct Memory : boost::noncopyable, Allocator<false>
else
{
new_size = align(new_size, alignment);
/// @todo pointer to void can be converted to pointer to any type with static_cast by ISO C++, reinterpret_cast has no advantages
m_data = reinterpret_cast<char *>(Allocator::realloc(m_data, m_capacity, new_size, alignment));
m_data = static_cast<char *>(Allocator::realloc(m_data, m_capacity, new_size, alignment));
m_capacity = new_size;
m_size = m_capacity;
}
@ -101,8 +107,7 @@ private:
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, m_capacity);
size_t new_capacity = align(m_capacity, alignment);
/// @todo pointer to void can be converted to pointer to any type with static_cast by ISO C++, reinterpret_cast has no advantages
m_data = reinterpret_cast<char *>(Allocator::alloc(new_capacity, alignment));
m_data = static_cast<char *>(Allocator::alloc(new_capacity, alignment));
m_capacity = new_capacity;
m_size = m_capacity;
}
@ -112,15 +117,14 @@ private:
if (!m_data)
return;
/// @todo pointer to any type can be implicitly converted to pointer to void, no cast required
Allocator::free(reinterpret_cast<void *>(m_data), m_capacity);
m_data = nullptr; /// Чтобы избежать double free, если последующий вызов alloc кинет исключение.
Allocator::free(m_data, m_capacity);
m_data = nullptr; /// To avoid double free if next alloc will throw an exception.
}
};
/** Буфер, который может сам владеть своим куском памяти для работы.
* Аргумент шаблона - ReadBuffer или WriteBuffer
/** Buffer that could own its working memory.
* Template parameter: ReadBuffer or WriteBuffer
*/
template <typename Base>
class BufferWithOwnMemory : public Base
@ -128,7 +132,7 @@ class BufferWithOwnMemory : public Base
protected:
Memory memory;
public:
/// Если передать не-NULL existing_memory, то буфер не будет создавать свой кусок памяти, а будет использовать существующий (и не будет им владеть).
/// If non-nullptr 'existing_memory' is passed, then buffer will not create its own memory and will use existing_memory without ownership.
BufferWithOwnMemory(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: Base(nullptr, 0), memory(existing_memory ? 0 : size, alignment)
{

View File

@ -21,6 +21,13 @@
#include <DB/IO/WriteHelpers.h>
namespace ProfileEvents
{
extern const Event ReadCompressedBytes;
extern const Event CompressedReadBufferBlocks;
extern const Event CompressedReadBufferBytes;
}
namespace DB
{
@ -33,27 +40,31 @@ namespace ErrorCodes
}
/** Basic functionality for implementation of
* CompressedReadBuffer, CompressedReadBufferFromFile and CachedCompressedReadBuffer.
*/
class CompressedReadBufferBase
{
protected:
ReadBuffer * compressed_in;
/// Если в буфере compressed_in помещается целый сжатый блок - используем его. Иначе - копируем данные по кусочкам в own_compressed_buffer.
/// If 'compressed_in' buffer has whole compressed block - than use it. Otherwise copy parts of data to 'own_compressed_buffer'.
PODArray<char> own_compressed_buffer{COMPRESSED_BLOCK_HEADER_SIZE};
/// Points to memory, holding compressed block.
char * compressed_buffer = nullptr;
#ifdef USE_QUICKLZ
std::unique_ptr<qlz_state_decompress> qlz_state;
#else
void * fixed_size_padding = nullptr;
void * fixed_size_padding = nullptr; /// ABI compatibility for USE_QUICKLZ
#endif
/// Не проверять чексуммы.
/// Don't checksum on decompressing.
bool disable_checksum = false;
/// Прочитать сжатые данные в compressed_buffer. Достать из их заголовка размер разжатых данных. Проверить чексумму.
/// Возвращает количество прочитанных байт.
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
/// Returns number of compressed bytes read.
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum)
{
if (compressed_in->eof())
@ -65,7 +76,7 @@ protected:
own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE);
compressed_in->readStrict(&own_compressed_buffer[0], COMPRESSED_BLOCK_HEADER_SIZE);
UInt8 method = own_compressed_buffer[0]; /// См. CompressedWriteBuffer.h
UInt8 method = own_compressed_buffer[0]; /// See CompressedWriteBuffer.h
size_t & size_compressed = size_compressed_without_checksum;
@ -91,7 +102,7 @@ protected:
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum));
/// Находится ли сжатый блок целиком в буфере compressed_in?
/// Is whole compressed block located in 'compressed_in' buffer?
if (compressed_in->offset() >= COMPRESSED_BLOCK_HEADER_SIZE &&
compressed_in->position() + size_compressed - COMPRESSED_BLOCK_HEADER_SIZE <= compressed_in->buffer().end())
{
@ -117,7 +128,7 @@ protected:
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
UInt8 method = compressed_buffer[0]; /// См. CompressedWriteBuffer.h
UInt8 method = compressed_buffer[0]; /// See CompressedWriteBuffer.h
if (method < 0x80)
{
@ -149,15 +160,15 @@ protected:
}
public:
/// compressed_in можно инициализировать отложенно, но до первого вызова readCompressedData.
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
CompressedReadBufferBase(ReadBuffer * in = nullptr)
: compressed_in(in)
{
}
/** Не проверять чексуммы.
* Может использоваться, например, в тех случаях, когда сжатые данные пишет клиент,
* который не умеет вычислять чексуммы, и вместо этого заполняет их нулями или чем угодно.
/** Disable checksums.
* For example, may be used when
* compressed data is generated by client, that cannot calculate checksums, and fill checksums with zeros instead.
*/
void disableChecksumming()
{

View File

@ -12,6 +12,12 @@
#include <unistd.h>
#include <fcntl.h>
namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
}
namespace DB
{

View File

@ -6,6 +6,16 @@
#include <DB/Common/CurrentMetrics.h>
namespace ProfileEvents
{
extern const Event FileOpen;
}
namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
}
namespace DB
{
@ -17,7 +27,8 @@ namespace ErrorCodes
}
/** Принимает имя файла. Самостоятельно открывает и закрывает файл.
/** Accepts path to file and opens it, or pre-opened file descriptor.
* Closes file by himself (thus "owns" a file descriptor).
*/
class ReadBufferFromFile : public ReadBufferFromFileDescriptor
{
@ -38,7 +49,7 @@ public:
throwFromErrno("Cannot open file " + file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
}
/// Использовать уже открытый файл.
/// Use pre-opened file descriptor.
ReadBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")")
@ -53,7 +64,7 @@ public:
::close(fd);
}
/// Закрыть файл раньше вызова деструктора.
/// Close file before destruction of object.
void close()
{
if (0 != ::close(fd))

View File

@ -18,6 +18,18 @@
#include <DB/IO/BufferWithOwnMemory.h>
namespace ProfileEvents
{
extern const Event ReadBufferFromFileDescriptorRead;
extern const Event ReadBufferFromFileDescriptorReadBytes;
extern const Event Seek;
}
namespace CurrentMetrics
{
extern const Metric Read;
}
namespace DB
{
@ -29,13 +41,13 @@ namespace ErrorCodes
extern const int CANNOT_SELECT;
}
/** Работает с готовым файловым дескриптором. Не открывает и не закрывает файл.
/** Use ready file descriptor. Does not open or close a file.
*/
class ReadBufferFromFileDescriptor : public ReadBufferFromFileBase
{
protected:
int fd;
off_t pos_in_file; /// Какому сдвигу в файле соответствует working_buffer.end().
off_t pos_in_file; /// What offset in file corresponds to working_buffer.end().
bool nextImpl() override
{
@ -85,7 +97,7 @@ protected:
return true;
}
/// Имя или описание файла
/// Name or some description of file.
std::string getFileName() const override
{
return "(fd = " + toString(fd) + ")";
@ -108,7 +120,7 @@ public:
private:
/// Если offset такой маленький, что мы не выйдем за пределы буфера, настоящий seek по файлу не делается.
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
off_t doSeek(off_t offset, int whence) override
{
off_t new_pos = offset;
@ -117,13 +129,13 @@ private:
else if (whence != SEEK_SET)
throw Exception("ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
/// Никуда не сдвинулись.
/// Position is unchanged.
if (new_pos + (working_buffer.end() - pos) == pos_in_file)
return new_pos;
if (hasPendingData() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast<off_t>(working_buffer.size()))
{
/// Остались в пределах буфера.
/// Position is still inside buffer.
pos = working_buffer.begin() + (new_pos - (pos_in_file - working_buffer.size()));
return new_pos;
}
@ -141,7 +153,7 @@ private:
}
/// При условии, что файловый дескриптор позволяет использовать select, проверяет в течение таймаута, есть ли данные для чтения.
/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
bool poll(size_t timeout_microseconds)
{
fd_set fds;

View File

@ -7,6 +7,13 @@
#include <DB/Interpreters/AggregationCommon.h>
namespace ProfileEvents
{
extern const Event UncompressedCacheHits;
extern const Event UncompressedCacheMisses;
extern const Event UncompressedCacheWeightLost;
}
namespace DB
{
@ -26,7 +33,7 @@ struct UncompressedSizeWeightFunction
};
/** Кэш разжатых блоков для CachedCompressedReadBuffer. thread-safe.
/** Cache of decompressed blocks for implementation of CachedCompressedReadBuffer. thread-safe.
*/
class UncompressedCache : public LRUCache<UInt128, UncompressedCacheCell, UInt128TrivialHash, UncompressedSizeWeightFunction>
{
@ -37,7 +44,7 @@ public:
UncompressedCache(size_t max_size_in_bytes)
: Base(max_size_in_bytes) {}
/// Посчитать ключ от пути к файлу и смещения.
/// Calculate key from path to file and offset.
static UInt128 hash(const String & path_to_file, size_t offset)
{
UInt128 key;

View File

@ -11,6 +11,12 @@
#include <unistd.h>
#include <fcntl.h>
namespace CurrentMetrics
{
extern const Metric OpenFileForWrite;
}
namespace DB
{
@ -32,12 +38,10 @@ public:
int getFD() const override { return fd; }
private:
///
void nextImpl() override;
///
off_t doSeek(off_t off, int whence) override;
///
void doTruncate(off_t length) override;
/// Если в буфере ещё остались данные - запишем их.
void flush();
/// Ждать окончания текущей асинхронной задачи.

View File

@ -10,6 +10,16 @@
#include <DB/IO/WriteBufferFromFileDescriptor.h>
namespace ProfileEvents
{
extern const Event FileOpen;
}
namespace CurrentMetrics
{
extern const Metric OpenFileForWrite;
}
namespace DB
{
@ -21,7 +31,8 @@ namespace ErrorCodes
}
/** Принимает имя файла. Самостоятельно открывает и закрывает файл.
/** Accepts path to file and opens it, or pre-opened file descriptor.
* Closes file by himself (thus "owns" a file descriptor).
*/
class WriteBufferFromFile : public WriteBufferFromFileDescriptor
{
@ -42,7 +53,7 @@ public:
throwFromErrno("Cannot open file " + file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
}
/// Использовать уже открытый файл.
/// Use pre-opened file descriptor.
WriteBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, mode_t mode = 0666,
char * existing_memory = nullptr, size_t alignment = 0)
: WriteBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")")
@ -66,7 +77,7 @@ public:
::close(fd);
}
/// Закрыть файл раньше вызова деструктора.
/// Close file before destruction of object.
void close()
{
next();
@ -78,17 +89,6 @@ public:
metric_increment.destroy();
}
/** fsync() transfers ("flushes") all modified in-core data of (i.e., modified buffer cache pages for) the file
* referred to by the file descriptor fd to the disk device (or other permanent storage device)
* so that all changed information can be retrieved even after the system crashed or was rebooted.
* This includes writing through or flushing a disk cache if present. The call blocks until the device
* reports that the transfer has completed. It also flushes metadata information associated with the file (see stat(2)).
* - man fsync */
void sync() override
{
fsync(fd);
}
std::string getFileName() const override
{
return file_name;

View File

@ -13,6 +13,17 @@
#include <DB/IO/BufferWithOwnMemory.h>
namespace ProfileEvents
{
extern const Event WriteBufferFromFileDescriptorWrite;
extern const Event WriteBufferFromFileDescriptorWriteBytes;
}
namespace CurrentMetrics
{
extern const Metric Write;
}
namespace DB
{
@ -24,7 +35,7 @@ namespace ErrorCodes
extern const int CANNOT_TRUNCATE_FILE;
}
/** Работает с готовым файловым дескриптором. Не открывает и не закрывает файл.
/** Use ready file descriptor. Does not open or close a file.
*/
class WriteBufferFromFileDescriptor : public WriteBufferFromFileBase
{
@ -57,7 +68,7 @@ protected:
ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written);
}
/// Имя или описание файла
/// Name or some description of file.
virtual std::string getFileName() const override
{
return "(fd = " + toString(fd) + ")";
@ -67,8 +78,8 @@ public:
WriteBufferFromFileDescriptor(int fd_ = -1, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: WriteBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_) {}
/** Можно вызывать для инициализации, если нужный fd не был передан в конструктор.
* Менять fd во время работы нельзя.
/** Could be used before initialization if needed 'fd' was not passed to constructor.
* It's not possible to change 'fd' during work.
*/
void setFD(int fd_)
{
@ -100,10 +111,10 @@ public:
void sync() override
{
/// Если в буфере ещё остались данные - запишем их.
/// If buffer has pending data - write it.
next();
/// Попросим ОС сбросить данные на диск.
/// Request OS to sync data with storage medium.
int res = fsync(fd);
if (-1 == res)
throwFromErrno("Cannot fsync " + getFileName(), ErrorCodes::CANNOT_FSYNC);

View File

@ -0,0 +1,54 @@
#pragma once
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <string>
namespace DB
{
class Context;
/** Periodically (each minute, starting at 30 seconds offset)
* calculates and updates some metrics,
* that are not updated automatically (so, need to be asynchronously calculated).
*/
class AsynchronousMetrics
{
public:
AsynchronousMetrics(Context & context_)
: context(context_), thread([this] { run(); })
{
}
~AsynchronousMetrics();
using Value = double;
using Container = std::unordered_map<std::string, Value>;
/// Returns copy of all values.
Container getValues() const;
private:
Context & context;
bool quit {false};
std::mutex wait_mutex;
std::condition_variable wait_cond;
Container container;
mutable std::mutex container_mutex;
std::thread thread;
void run();
void update();
void set(const std::string & name, Value value);
};
}

View File

@ -99,7 +99,13 @@ private:
/** Оставить в каждом запросе цепочки UNION ALL только нужные столбцы секции SELECT.
* Однако, если используется хоть один DISTINCT в цепочке, то все столбцы считаются нужными,
* так как иначе DISTINCT работал бы по-другому.
* так как иначе DISTINCT работал бы по-другому.
*
* Always leave arrayJoin, because it changes number of rows.
*
* TODO If query doesn't have GROUP BY, but have aggregate functions,
* then leave at least one aggregate function,
* In order that fact of aggregation has not been lost.
*/
void rewriteExpressionList(const Names & required_column_names);

View File

@ -17,15 +17,20 @@
#include <DB/Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric Query;
}
namespace DB
{
/** Список исполняющихся в данный момент запросов.
* Также реализует ограничение на их количество.
/** List of currently executing queries.
* Also implements limit on their number.
*/
/** Информационная составляющая элемента списка процессов.
* Для вывода в SHOW PROCESSLIST. Не содержит никаких сложных объектов, которые что-то делают при копировании или в деструкторах.
/** Information of process list element.
* To output in SHOW PROCESSLIST query. Does not contain any complex objects, that do something on copy or destructor.
*/
struct ProcessInfo
{
@ -42,7 +47,7 @@ struct ProcessInfo
};
/// Запрос и данные о его выполнении.
/// Query and information about its execution.
struct ProcessListElement
{
String query;
@ -63,7 +68,7 @@ struct ProcessListElement
bool is_cancelled = false;
/// Здесь могут быть зарегистрированы временные таблицы. Изменять под mutex-ом.
/// Temporary tables could be registered here. Modify under mutex.
Tables temporary_tables;
@ -91,7 +96,7 @@ struct ProcessListElement
progress.incrementPiecewiseAtomically(value);
if (priority_handle)
priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Можно сделать настраиваемым таймаут.
priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Could make timeout customizable.
return !is_cancelled;
}
@ -114,14 +119,14 @@ struct ProcessListElement
};
/// Данные о запросах одного пользователя.
/// Data about queries for one user.
struct ProcessListForUser
{
/// Query_id -> ProcessListElement *
using QueryToElement = std::unordered_map<String, ProcessListElement *>;
QueryToElement queries;
/// Ограничение и счётчик памяти на все одновременно выполняющиеся запросы одного пользователя.
/// Limit and counter for memory of all simultaneously running queries of single user.
MemoryTracker user_memory_tracker;
};
@ -129,7 +134,7 @@ struct ProcessListForUser
class ProcessList;
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
/// Keeps iterator to process list and removes element in destructor.
class ProcessListEntry
{
private:
@ -158,7 +163,7 @@ public:
using Element = ProcessListElement;
using Entry = ProcessListEntry;
/// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем.
/// list, for iterators not to invalidate. NOTE: could replace with cyclic buffer, but not worth.
using Container = std::list<Element>;
using Info = std::vector<ProcessInfo>;
/// User -> queries
@ -166,15 +171,15 @@ public:
private:
mutable std::mutex mutex;
mutable Poco::Condition have_space; /// Количество одновременно выполняющихся запросов стало меньше максимального.
mutable Poco::Condition have_space; /// Number of currently running queries has become less than maximum.
Container cont;
size_t cur_size; /// В C++03 std::list::size не O(1).
size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение.
size_t cur_size; /// In C++03 or C++11 and old ABI, std::list::size is not O(1).
size_t max_size; /// 0 means no limit. Otherwise, when limit exceeded, an exception is thrown.
UserToQueries user_to_queries;
QueryPriorities priorities;
/// Ограничение и счётчик памяти на все одновременно выполняющиеся запросы.
/// Limit and counter for memory of all simultaneously running queries.
MemoryTracker total_memory_tracker;
public:
@ -182,17 +187,17 @@ public:
using EntryPtr = std::shared_ptr<ProcessListEntry>;
/** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
* Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени.
* Если времени не хватило - кинуть исключение.
/** Register running query. Returns refcounted object, that will remove element from list in destructor.
* If too much running queries - wait for not more than specified (see settings) amount of time.
* If timeout is passed - throw an exception.
*/
EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
UInt16 port_, const Settings & settings);
/// Количество одновременно выполняющихся запросов.
/// Number of currently executing queries.
size_t size() const { return cur_size; }
/// Получить текущее состояние списка запросов.
/// Get current state of process list.
Info getInfo() const
{
std::lock_guard<std::mutex> lock(mutex);
@ -211,10 +216,10 @@ public:
max_size = max_size_;
}
/// Зарегистрировать временную таблицу. Потом её можно будет получить по query_id и по названию.
/// Register temporary table. Then it is accessible by query_id and name.
void addTemporaryTable(ProcessListElement & elem, const String & table_name, StoragePtr storage);
/// Найти временную таблицу по query_id и по названию. Замечание: плохо работает, если есть разные запросы с одним query_id.
/// Find temporary table by query_id and name. NOTE: doesn't work fine if there are many queries with same query_id.
StoragePtr tryGetTemporaryTable(const String & query_id, const String & table_name) const;
};

View File

@ -8,17 +8,26 @@
#include <DB/Common/CurrentMetrics.h>
/** Реализует приоритеты запросов.
* Позволяет приостанавливать выполнение запроса, если выполняется хотя бы один более приоритетный запрос.
namespace CurrentMetrics
{
extern const Metric QueryPreempted;
}
namespace DB
{
/** Implements query priorities in very primitive way.
* Allows to freeze query execution if at least one query of higher priority is executed.
*
* Величина приоритета - целое число, чем меньше - тем больше приоритет.
* Priority value is integer, smaller means higher priority.
*
* Приоритет 0 считается особенным - запросы с таким приоритетом выполняются всегда,
* не зависят от других запросов и не влияют на другие запросы.
* То есть 0 означает - не использовать приоритеты.
* Priority 0 is special - queries with that priority is always executed,
* not depends on other queries and not affect other queries.
* Thus 0 means - don't use priorities.
*
* NOTE Возможности сделать лучше:
* - реализовать ограничение на максимальное количество запросов с таким приоритетом.
* NOTE Possibilities for improvement:
* - implement limit on maximum number of running queries with same priority.
*/
class QueryPriorities
{
@ -30,7 +39,7 @@ private:
using Count = int;
/// Количество выполняющихся сейчас запросов с заданным приоритетом.
/// Number of currently running queries for each priority.
using Container = std::map<Priority, Count>;
std::mutex mutex;
@ -38,8 +47,8 @@ private:
Container container;
/** Если есть более приоритетные запросы - спать, пока они не перестанут быть или не истечёт таймаут.
* Возвращает true, если более приоритетные запросы исчезли на момент возврата из функции, false, если истёк таймаут.
/** If there are higher priority queries - sleep until they are finish or timeout happens.
* Returns true, if higher priority queries has finished at return of function, false, if timout exceeded.
*/
template <typename Duration>
bool waitIfNeed(Priority priority, Duration timeout)
@ -103,8 +112,8 @@ public:
using Handle = std::shared_ptr<HandleImpl>;
/** Зарегистрировать, что запрос с заданным приоритетом выполняется.
* Возвращается объект, в деструкторе которого, запись о запросе удаляется.
/** Register query with specified priority.
* Returns an object that remove record in destructor.
*/
Handle insert(Priority priority)
{
@ -117,3 +126,5 @@ public:
return std::make_shared<HandleImpl>(*this, *it);
}
};
}

View File

@ -33,7 +33,7 @@ public:
void renameColumns(const ASTSelectQuery & source);
/// Переписывает select_expression_list, чтобы вернуть только необходимые столбцы в правильном порядке.
void rewriteSelectExpressionList(const Names & column_names);
void rewriteSelectExpressionList(const Names & required_column_names);
bool isUnionAllHead() const { return (prev_union_all == nullptr) && next_union_all != nullptr; }

View File

@ -18,6 +18,11 @@
#include <condition_variable>
namespace CurrentMetrics
{
extern const Metric DistributedSend;
}
namespace DB
{

View File

@ -9,22 +9,28 @@
#include <DB/DataStreams/MarkInCompressedFile.h>
namespace ProfileEvents
{
extern const Event MarkCacheHits;
extern const Event MarkCacheMisses;
}
namespace DB
{
/// Оценка количества байтов, занимаемых засечками в кеше.
/// Estimate of number of bytes in cache for marks.
struct MarksWeightFunction
{
size_t operator()(const MarksInCompressedFile & marks) const
{
/// Можно еще добавить порядка 100 байт на накладные расходы вектора и кеша.
/// NOTE Could add extra 100 bytes for overhead of std::vector, cache structures and allocator.
return marks.size() * sizeof(MarkInCompressedFile);
}
};
/** Кэш засечек в столбце из StorageMergeTree.
/** Cache of 'marks' for StorageMergeTree.
* Marks is an index structure that addresses ranges in column file, corresponding to ranges of primary key.
*/
class MarkCache : public LRUCache<UInt128, MarksInCompressedFile, UInt128TrivialHash, MarksWeightFunction>
{
@ -35,7 +41,7 @@ public:
MarkCache(size_t max_size_in_bytes, const Delay & expiration_delay)
: Base(max_size_in_bytes, expiration_delay) {}
/// Посчитать ключ от пути к файлу и смещения.
/// Calculate key from path to file and offset.
static UInt128 hash(const String & path_to_file)
{
UInt128 key;

View File

@ -10,6 +10,12 @@
#include <DB/Common/formatReadable.h>
#include <DB/Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric DiskSpaceReservedForMerge;
}
namespace DB
{
@ -20,10 +26,10 @@ namespace ErrorCodes
}
/** Узнает количество свободного места в файловой системе.
* Можно "резервировать" место, чтобы разные операции могли согласованно планировать использование диска.
* Резервирования не разделяются по файловым системам.
* Вместо этого при запросе свободного места считается, что все резервирования сделаны в той же файловой системе.
/** Determines amount of free space in filesystem.
* Could "reserve" space, for different operations to plan disk space usage.
* Reservations are not separated for different filesystems,
* instead it is assumed, that all reservations are done within same filesystem.
*/
class DiskSpaceMonitor
{
@ -61,7 +67,7 @@ public:
}
}
/// Изменить количество зарезервированного места. При увеличении не делается проверка, что места достаточно.
/// Change amount of reserved space. When new_size is greater than before, availability of free space is not checked.
void update(size_t new_size)
{
std::lock_guard<std::mutex> lock(DiskSpaceMonitor::mutex);
@ -99,7 +105,7 @@ public:
size_t res = fs.f_bfree * fs.f_bsize;
/// Зарезервируем дополнительно 30 МБ. Когда я тестировал, statvfs показывал на несколько мегабайт больше свободного места, чем df.
/// Heuristic by Michael Kolupaev: reserve 30 MB more, because statvfs shows few megabytes more space than df.
res -= std::min(res, 30 * (1ul << 20));
std::lock_guard<std::mutex> lock(mutex);
@ -124,7 +130,7 @@ public:
return reservation_count;
}
/// Если места (приблизительно) недостаточно, бросает исключение.
/// If not enough (approximately) space, throw an exception.
static ReservationPtr reserve(const std::string & path, size_t size)
{
size_t free_bytes = getUnreservedFreeSpace(path);

View File

@ -7,6 +7,12 @@
#include <mutex>
#include <atomic>
namespace CurrentMetrics
{
extern const Metric Merge;
}
namespace DB
{

View File

@ -4,6 +4,12 @@
#include <DB/DataStreams/IBlockOutputStream.h>
#include <iomanip>
namespace ProfileEvents
{
extern const Event SynchronousMergeOnInsert;
}
namespace DB
{

View File

@ -6,6 +6,12 @@
#include <mutex>
namespace ProfileEvents
{
extern const Event SlowRead;
extern const Event ReadBackoff;
}
namespace DB
{
@ -55,21 +61,21 @@ using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
class MergeTreeReadPool
{
public:
/** Пул может динамически уменьшать количество потоков, если чтения происходят медленно.
* Настройки порогов для такого уменьшения.
/** Pull could dynamically lower (backoff) number of threads, if read operation are too slow.
* Settings for that backoff.
*/
struct BackoffSettings
{
/// Обращать внимания только на чтения, занявшие не меньше такого количества времени. Если выставлено в 0 - значит backoff выключен.
/// Pay attention only to reads, that took at least this amount of time. If set to 0 - means backoff is disabled.
size_t min_read_latency_ms = 1000;
/// Считать события, когда пропускная способность меньше стольки байт в секунду.
/// Count events, when read throughput is less than specified bytes per second.
size_t max_throughput = 1048576;
/// Не обращать внимания на событие, если от предыдущего прошло меньше стольки-то времени.
/// Do not pay attention to event, if not enough time passed since previous event.
size_t min_interval_between_events_ms = 1000;
/// Количество событий, после которого количество потоков будет уменьшено.
/// Number of events to do backoff - to lower number of threads in pool.
size_t min_events = 2;
/// Константы выше приведены лишь в качестве примера.
/// Constants above is just an example.
BackoffSettings(const Settings & settings)
: min_read_latency_ms(settings.read_backoff_min_latency_ms.totalMilliseconds()),
max_throughput(settings.read_backoff_max_throughput),
@ -84,7 +90,7 @@ public:
BackoffSettings backoff_settings;
private:
/** Состояние для отслеживания скорости чтений.
/** State to track numbers of slow reads.
*/
struct BackoffState
{
@ -118,7 +124,7 @@ public:
{
const std::lock_guard<std::mutex> lock{mutex};
/// Если количество потоков было уменьшено из-за backoff, то не будем отдавать задачи для более чем backoff_state.current_threads потоков.
/// If number of threads was lowered due to backoff, then will assign work only for maximum 'backoff_state.current_threads' threads.
if (thread >= backoff_state.current_threads)
return nullptr;
@ -138,23 +144,24 @@ public:
auto & part = parts[part_idx];
auto & marks_in_part = thread_tasks.sum_marks_in_parts.back();
/// Берём весь кусок, если он достаточно мал
/// Get whole part to read if it is small enough.
auto need_marks = std::min(marks_in_part, min_marks_to_read);
/// Не будем оставлять в куске слишком мало строк.
/// Do not leave too little rows in part for next time.
if (marks_in_part > need_marks &&
marks_in_part - need_marks < min_marks_to_read)
need_marks = marks_in_part;
MarkRanges ranges_to_get_from_part;
/// Возьмем весь кусок, если он достаточно мал.
/// Get whole part to read if it is small enough.
if (marks_in_part <= need_marks)
{
const auto marks_to_get_from_range = marks_in_part;
/** Отрезки уже перечислены справа налево, reverse изначально сделан в MergeTreeDataSelectExecutor и
* поддержан в fillPerThreadInfo. */
/** Ranges are in right-to-left order, because 'reverse' was done in MergeTreeDataSelectExecutor
* and that order is supported in 'fillPerThreadInfo'.
*/
ranges_to_get_from_part = thread_task.ranges;
marks_in_part -= marks_to_get_from_range;
@ -167,7 +174,7 @@ public:
}
else
{
/// Цикл по отрезкам куска.
/// Loop through part ranges.
while (need_marks > 0 && !thread_task.ranges.empty())
{
auto & range = thread_task.ranges.back();
@ -187,9 +194,9 @@ public:
need_marks -= marks_to_get_from_range;
}
/** Перечислим справа налево, чтобы MergeTreeThreadBlockInputStream забирал
* отрезки с помощью .pop_back() (их порядок был сменен на "слева направо"
* из-за .pop_back() в этой ветке). */
/** Change order to right-to-left, for MergeTreeThreadBlockInputStream to get ranges with .pop_back()
* (order was changed to left-to-right due to .pop_back() above).
*/
std::reverse(std::begin(ranges_to_get_from_part), std::end(ranges_to_get_from_part));
}
@ -199,9 +206,9 @@ public:
per_part_remove_prewhere_column[part_idx], per_part_should_reorder[part_idx]);
}
/** Каждый обработчик задач может вызвать этот метод, передав в него информацию о скорости чтения.
* Если скорость чтения слишком низкая, то пул может принять решение уменьшить число потоков - не отдавать больше задач в некоторые потоки.
* Это позволяет бороться с чрезмерной нагрузкой на дисковую подсистему в случаях, когда чтения осуществляются не из page cache.
/** Each worker could call this method and pass information about read performance.
* If read performance is too low, pool could decide to lower number of threads: do not assign more tasks to several threads.
* This allows to overcome excessive load to disk subsystem, when reads are not from page cache.
*/
void profileFeedback(const ReadBufferFromFileBase::ProfileInfo info)
{
@ -254,9 +261,9 @@ private:
{
auto & part = parts[i];
/// Посчитаем засечки для каждого куска.
/// Read marks for every data part.
size_t sum_marks = 0;
/// Отрезки уже перечислены справа налево, reverse в MergeTreeDataSelectExecutor.
/// Ranges are in right-to-left order, due to 'reverse' in MergeTreeDataSelectExecutor.
for (const auto & range : part.ranges)
sum_marks += range.end - range.begin;
@ -291,8 +298,9 @@ private:
const NameSet pre_name_set{
std::begin(required_pre_column_names), std::end(required_pre_column_names)
};
/** Если выражение в PREWHERE - не столбец таблицы, не нужно отдавать наружу столбец с ним
* (от storage ожидают получить только столбцы таблицы). */
/** If expression in PREWHERE is not table column, then no need to return column with it to caller
* (because storage is expected only to read table columns).
*/
per_part_remove_prewhere_column.push_back(0 == pre_name_set.count(prewhere_column_name));
Names post_column_names;
@ -309,8 +317,9 @@ private:
if (check_columns)
{
/** Под part->columns_lock проверим, что все запрошенные столбцы в куске того же типа, что в таблице.
* Это может быть не так во время ALTER MODIFY. */
/** Under part->columns_lock check that all requested columns in part are of same type that in table.
* This could be violated during ALTER MODIFY.
*/
if (!required_pre_column_names.empty())
data.check(part.data_part->columns, required_pre_column_names);
if (!required_column_names.empty())
@ -351,12 +360,12 @@ private:
RangesInDataPart & part = parts.back();
size_t & marks_in_part = per_part_sum_marks.back();
/// Не будем брать из куска слишком мало строк.
/// Do not get too few rows from part.
if (marks_in_part >= min_marks_for_concurrent_read &&
need_marks < min_marks_for_concurrent_read)
need_marks = min_marks_for_concurrent_read;
/// Не будем оставлять в куске слишком мало строк.
/// Do not leave too few rows in part for next time.
if (marks_in_part > need_marks &&
marks_in_part - need_marks < min_marks_for_concurrent_read)
need_marks = marks_in_part;
@ -364,10 +373,10 @@ private:
MarkRanges ranges_to_get_from_part;
size_t marks_in_ranges = need_marks;
/// Возьмем весь кусок, если он достаточно мал.
/// Get whole part to read if it is small enough.
if (marks_in_part <= need_marks)
{
/// Оставим отрезки перечисленными справа налево для удобства использования .pop_back() в .getTask()
/// Leave ranges in right-to-left order for convenience to use .pop_back() in .getTask()
ranges_to_get_from_part = part.ranges;
marks_in_ranges = marks_in_part;
@ -377,7 +386,7 @@ private:
}
else
{
/// Цикл по отрезкам куска.
/// Loop through part ranges.
while (need_marks > 0)
{
if (part.ranges.empty())
@ -396,9 +405,9 @@ private:
part.ranges.pop_back();
}
/** Вновь перечислим отрезки справа налево, чтобы .getTask() мог забирать их
* с помощью .pop_back() (их порядок был сменен на "слева направо"
* из-за .pop_back() в этой ветке). */
/** Change order to right-to-left, for getTask() to get ranges with .pop_back()
* (order was changed to left-to-right due to .pop_back() above).
*/
std::reverse(std::begin(ranges_to_get_from_part), std::end(ranges_to_get_from_part));
}
@ -410,10 +419,12 @@ private:
}
}
/** Если некоторых запрошенных столбцов нет в куске,
* то выясняем, какие столбцы может быть необходимо дополнительно прочитать,
* чтобы можно было вычислить DEFAULT выражение для этих столбцов.
* Добавляет их в columns. */
* то выясняем, какие столбцы может быть необходимо дополнительно прочитать,
* чтобы можно было вычислить DEFAULT выражение для этих столбцов.
* Добавляет их в columns.
*/
NameSet injectRequiredColumns(const MergeTreeData::DataPartPtr & part, Names & columns) const
{
NameSet required_columns{std::begin(columns), std::end(columns)};

View File

@ -9,65 +9,65 @@ namespace DB
{
/** Тонкие настройки работы MergeTree.
* Могут быть загружены из конфига.
/** Advanced settings of MergeTree.
* Could be loaded from config.
*/
struct MergeTreeSettings
{
/** Настройки слияний. */
/** Merge settings. */
/// Опеределяет, насколько разбалансированные объединения мы готовы делать.
/// Чем больше, тем более разбалансированные. Желательно, чтобы было больше, чем 1 / max_parts_to_merge_at_once.
/// Determines how unbalanced merges we could do.
/// Bigger values for more unbalanced merges. It is advisable to be more than 1 / max_parts_to_merge_at_once.
double size_ratio_coefficient_to_merge_parts = 0.25;
/// Сколько за раз сливать кусков.
/// Трудоемкость выбора кусков O(N * max_parts_to_merge_at_once).
/// How many parts could be merges at once.
/// Labour coefficient of parts selection O(N * max_parts_to_merge_at_once).
size_t max_parts_to_merge_at_once = 10;
/// Но пока суммарный размер кусков слишком маленький (меньше такого количества байт), можно сливать и больше кусков за раз.
/// Это сделано, чтобы быстрее сливать очень уж маленькие куски, которых может быстро накопиться много.
/// But while total size of parts is too small(less than this number of bytes), we could merge more parts at once.
/// This is intentionally to allow quicker merge of too small parts, which could be accumulated too quickly.
size_t merge_more_parts_if_sum_bytes_is_less_than = 100 * 1024 * 1024;
size_t max_parts_to_merge_at_once_if_small = 100;
/// Куски настолько большого размера объединять нельзя вообще.
/// Parts of more than this bytes couldn't be merged at all.
size_t max_bytes_to_merge_parts = 10ULL * 1024 * 1024 * 1024;
/// Не больше половины потоков одновременно могут выполнять слияния, в которых участвует хоть один кусок хотя бы такого размера.
/// No more than half of threads could execute merge of parts, if at least one part more than this size in bytes.
size_t max_bytes_to_merge_parts_small = 250 * 1024 * 1024;
/// Куски настолько большого размера в сумме, объединять нельзя вообще.
/// Parts more than this size in bytes deny to merge at all.
size_t max_sum_bytes_to_merge_parts = 25ULL * 1024 * 1024 * 1024;
/// Во столько раз ночью увеличиваем коэффициент.
/// How much times we increase the coefficient at night.
size_t merge_parts_at_night_inc = 10;
/// Сколько заданий на слияние кусков разрешено одновременно иметь в очереди ReplicatedMergeTree.
/// How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue.
size_t max_replicated_merges_in_queue = 6;
/// Через сколько секунд удалять ненужные куски.
/// How many seconds to keep obsolete parts.
time_t old_parts_lifetime = 8 * 60;
/// Через сколько секунд удалять tmp_-директории.
/// How many seconds to keep tmp_-directories.
time_t temporary_directories_lifetime = 86400;
/** Настройки вставок. */
/** Inserts settings. */
/// Если в таблице хотя бы столько активных кусков, искусственно замедлять вставки в таблицу.
/// If table contains at least that many active parts, artificially slow down insert into table.
size_t parts_to_delay_insert = 150;
/// Если в таблице хотя бы столько активных кусков, выдавать ошибку 'Too much parts ...'
/// If more than this number active parts, throw 'Too much parts ...' exception
size_t parts_to_throw_insert = 300;
/// Насколько секунд можно максимально задерживать вставку в таблицу типа MergeTree, если в ней много недомердженных кусков.
/// Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts.
size_t max_delay_to_insert = 200;
/** Настройки репликации. */
/** Replication settings. */
/// Для скольки последних блоков хранить хеши в ZooKeeper.
/// How many last blocks of hashes should be kept in ZooKeeper.
size_t replicated_deduplication_window = 100;
/// Хранить примерно столько последних записей в логе в ZooKeeper, даже если они никому уже не нужны.
/// Не влияет на работу таблиц; используется только чтобы успеть посмотреть на лог в ZooKeeper глазами прежде, чем его очистят.
/// Keep about this number of last records in ZooKeeper log, even if they are obsolete.
/// It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.
size_t replicated_logs_to_keep = 100;
/// After specified amount of time passed after replication log entry creation
@ -77,35 +77,38 @@ struct MergeTreeSettings
time_t prefer_fetch_merged_part_time_threshold = 3600;
size_t prefer_fetch_merged_part_size_threshold = 10ULL * 1024 * 1024 * 1024;
/// Настройки минимального количества битых данных, при котором отказываться автоматически их удалять.
/// Max broken parts, if more - deny automatic deletion.
size_t max_suspicious_broken_parts = 10;
/// Не выполнять ALTER, если количество файлов для модификации (удаления, добавления) больше указанного.
/// Not apply ALTER if number of files for modification(deletion, addition) more than this.
size_t max_files_to_modify_in_alter_columns = 50;
/// Не выполнять ALTER, если количество файлов для удаления больше указанного.
/// Not apply ALTER, if number of files for deletion more than this.
size_t max_files_to_remove_in_alter_columns = 10;
/// Максимальное количество ошибок при загрузке кусков, при котором ReplicatedMergeTree соглашается запускаться.
/// Maximum number of errors during parts loading, while ReplicatedMergeTree still allowed to start.
size_t replicated_max_unexpected_parts = 3;
size_t replicated_max_unexpectedly_merged_parts = 2;
size_t replicated_max_missing_obsolete_parts = 5;
size_t replicated_max_missing_active_parts = 20;
/// Если отношение количества ошибок к общему количеству кусков меньше указанного значения, то всё-равно можно запускаться.
/// If ration of wrong parts to total number of parts is less than this - allow to start anyway.
double replicated_max_ratio_of_wrong_parts = 0.05;
/** Настройки проверки отставания реплик. */
/// In seconds.
size_t zookeeper_session_expiration_check_period = 60;
/// Периодичность для проверки отставания и сравнения его с другими репликами.
/** Check delay of replicas settings. */
/// Period to check replication delay and compare with other replicas.
size_t check_delay_period = 60;
/// Минимальное отставание от других реплик, при котором нужно уступить лидерство. Здесь и далее, если 0 - не ограничено.
/// Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited.
size_t min_relative_delay_to_yield_leadership = 120;
/// Минимальное отставание от других реплик, при котором нужно закрыться от запросов и не выдавать Ok для проверки статуса.
/// Minimal delay from other replicas to close, stop serving requests and not return Ok during status check.
size_t min_relative_delay_to_close = 300;
/// Минимальное абсолютное отставание, при котором нужно закрыться от запросов и не выдавать Ok для проверки статуса.
/// Minimal absolute delay to close, stop serving requests and not return Ok during status check.
size_t min_absolute_delay_to_close = 0;
@ -143,6 +146,7 @@ struct MergeTreeSettings
SET_SIZE_T(replicated_max_missing_obsolete_parts);
SET_SIZE_T(replicated_max_missing_active_parts);
SET_DOUBLE(replicated_max_ratio_of_wrong_parts);
SET_SIZE_T(zookeeper_session_expiration_check_period);
SET_SIZE_T(check_delay_period);
SET_SIZE_T(min_relative_delay_to_yield_leadership);
SET_SIZE_T(min_relative_delay_to_close);

View File

@ -157,7 +157,7 @@ public:
* Если в процессе обработки было исключение - сохраняет его в entry.
* Возвращает true, если в процессе обработки не было исключений.
*/
bool processEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func);
bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func);
/// Будет ли кусок в будущем слит в более крупный (или мерджи кусков в данном диапазоне запрещены)?
bool partWillBeMergedOrMergesDisabled(const String & part_name) const;

View File

@ -488,19 +488,6 @@ private:
using ReplicaToSpaceInfo = std::map<std::string, ReplicaSpaceInfo>;
struct PartitionMergeLockInfo
{
PartitionMergeLockInfo(const std::string & fake_part_name_)
: fake_part_name(fake_part_name_), ref_count(1)
{
}
std::string fake_part_name;
unsigned int ref_count;
};
using PartitionToMergeLock = std::map<std::string, PartitionMergeLockInfo>;
/** Проверяет, что структуры локальной и реплицируемых таблиц совпадают.
*/
void enforceShardsConsistency(const WeightedZooKeeperPaths & weighted_zookeeper_paths);
@ -513,9 +500,6 @@ private:
/** Проверяет, что имеется достаточно свободного места локально и на всех репликах.
*/
bool checkSpaceForResharding(const ReplicaToSpaceInfo & replica_to_space_info, size_t partition_size) const;
std::mutex mutex_partition_to_merge_lock;
PartitionToMergeLock partition_to_merge_lock;
};

View File

@ -0,0 +1,44 @@
#pragma once
#include <ext/shared_ptr_helper.hpp>
#include <DB/Storages/IStorage.h>
namespace DB
{
class AsynchronousMetrics;
/** Implements system table asynchronous_metrics, which allows to get values of periodically (asynchronously) updated metrics.
*/
class StorageSystemAsynchronousMetrics : private ext::shared_ptr_helper<StorageSystemAsynchronousMetrics>, public IStorage
{
friend class ext::shared_ptr_helper<StorageSystemAsynchronousMetrics>;
public:
static StoragePtr create(const std::string & name_, const AsynchronousMetrics & async_metrics_);
std::string getName() const override { return "SystemAsynchronousMetrics"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
private:
const std::string name;
NamesAndTypesList columns;
const AsynchronousMetrics & async_metrics;
StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_);
};
}

View File

@ -24,6 +24,11 @@
#include <DB/Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric SendExternalTables;
}
namespace DB
{
@ -286,7 +291,7 @@ void Connection::sendQuery(const String & query, const String & query_id_, UInt6
block_in.reset();
block_out.reset();
/// Если версия сервера достаточно новая и стоит флаг, отправляем пустой блок, символизируя конец передачи данных.
/// If server version is new enough, send empty block which meand end of data.
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES && !with_pending_data)
{
sendData(Block());

View File

@ -1,7 +1,56 @@
#include <DB/Common/CurrentMetrics.h>
/// Available metrics. Add something here as you wish.
#define APPLY_FOR_METRICS(M) \
M(Query) \
M(Merge) \
M(ReplicatedFetch) \
M(ReplicatedSend) \
M(ReplicatedChecks) \
M(BackgroundPoolTask) \
M(DiskSpaceReservedForMerge) \
M(DistributedSend) \
M(QueryPreempted) \
M(TCPConnection) \
M(HTTPConnection) \
M(InterserverConnection) \
M(OpenFileForRead) \
M(OpenFileForWrite) \
M(Read) \
M(Write) \
M(SendExternalTables) \
M(QueryThread) \
M(ReadonlyReplica) \
M(LeaderReplica) \
M(MemoryTracking) \
M(LeaderElection) \
M(EphemeralNode) \
M(ZooKeeperWatch) \
namespace CurrentMetrics
{
std::atomic<Value> values[END] {}; /// Глобальная переменная - инициализируется нулями.
#define M(NAME) extern const Metric NAME = __COUNTER__;
APPLY_FOR_METRICS(M)
#undef M
constexpr Metric END = __COUNTER__;
std::atomic<Value> values[END] {}; /// Global variable, initialized by zeros.
const char * getDescription(Metric event)
{
static const char * descriptions[] =
{
#define M(NAME) #NAME,
APPLY_FOR_METRICS(M)
#undef M
};
return descriptions[event];
}
Metric end() { return END; }
}
#undef APPLY_FOR_METRICS

View File

@ -9,12 +9,17 @@
#include <DB/Common/MemoryTracker.h>
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
}
namespace DB
{
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
}

View File

@ -1,7 +1,124 @@
#include <DB/Common/ProfileEvents.h>
/// Available events. Add something here as you wish.
#define APPLY_FOR_EVENTS(M) \
M(Query) \
M(SelectQuery) \
M(InsertQuery) \
M(FileOpen) \
M(Seek) \
M(ReadBufferFromFileDescriptorRead) \
M(ReadBufferFromFileDescriptorReadBytes) \
M(WriteBufferFromFileDescriptorWrite) \
M(WriteBufferFromFileDescriptorWriteBytes) \
M(ReadBufferAIORead) \
M(ReadBufferAIOReadBytes) \
M(WriteBufferAIOWrite) \
M(WriteBufferAIOWriteBytes) \
M(ReadCompressedBytes) \
M(CompressedReadBufferBlocks) \
M(CompressedReadBufferBytes) \
M(UncompressedCacheHits) \
M(UncompressedCacheMisses) \
M(UncompressedCacheWeightLost) \
M(IOBufferAllocs) \
M(IOBufferAllocBytes) \
M(ArenaAllocChunks) \
M(ArenaAllocBytes) \
M(FunctionExecute) \
M(MarkCacheHits) \
M(MarkCacheMisses) \
M(CreatedReadBufferOrdinary) \
M(CreatedReadBufferAIO) \
M(CreatedWriteBufferOrdinary) \
M(CreatedWriteBufferAIO) \
\
M(ReplicatedPartFetches) \
M(ReplicatedPartFailedFetches) \
M(ObsoleteReplicatedParts) \
M(ReplicatedPartMerges) \
M(ReplicatedPartFetchesOfMerged) \
M(ReplicatedPartChecks) \
M(ReplicatedPartChecksFailed) \
M(ReplicatedDataLoss) \
\
M(DelayedInserts) \
M(RejectedInserts) \
M(DelayedInsertsMilliseconds) \
M(SynchronousMergeOnInsert) \
\
M(ZooKeeperInit) \
M(ZooKeeperTransactions) \
M(ZooKeeperGetChildren) \
M(ZooKeeperCreate) \
M(ZooKeeperRemove) \
M(ZooKeeperExists) \
M(ZooKeeperGet) \
M(ZooKeeperSet) \
M(ZooKeeperMulti) \
M(ZooKeeperExceptions) \
\
M(DistributedConnectionFailTry) \
M(DistributedConnectionFailAtAll) \
\
M(CompileAttempt) \
M(CompileSuccess) \
\
M(ExternalSortWritePart) \
M(ExternalSortMerge) \
M(ExternalAggregationWritePart) \
M(ExternalAggregationMerge) \
M(ExternalAggregationCompressedBytes) \
M(ExternalAggregationUncompressedBytes) \
\
M(SlowRead) \
M(ReadBackoff) \
\
M(ReplicaYieldLeadership) \
M(ReplicaPartialShutdown) \
\
M(SelectedParts) \
M(SelectedRanges) \
M(SelectedMarks) \
\
M(MergedRows) \
M(MergedUncompressedBytes) \
\
M(MergeTreeDataWriterRows) \
M(MergeTreeDataWriterUncompressedBytes) \
M(MergeTreeDataWriterCompressedBytes) \
M(MergeTreeDataWriterBlocks) \
M(MergeTreeDataWriterBlocksAlreadySorted) \
\
M(ObsoleteEphemeralNode) \
M(CannotRemoveEphemeralNode) \
M(LeaderElectionAcquiredLeadership) \
\
M(RegexpCreated) \
namespace ProfileEvents
{
std::atomic<size_t> counters[END] {}; /// Глобальная переменная - инициализируется нулями.
#define M(NAME) extern const Event NAME = __COUNTER__;
APPLY_FOR_EVENTS(M)
#undef M
constexpr Event END = __COUNTER__;
std::atomic<Count> counters[END] {}; /// Global variable, initialized by zeros.
const char * getDescription(Event event)
{
static const char * descriptions[] =
{
#define M(NAME) #NAME,
APPLY_FOR_EVENTS(M)
#undef M
};
return descriptions[event];
}
Event end() { return END; }
}
#undef APPLY_FOR_EVENTS

View File

@ -3,6 +3,11 @@
#include <DB/DataStreams/AggregatingBlockInputStream.h>
namespace ProfileEvents
{
extern const Event ExternalAggregationMerge;
}
namespace DB
{

View File

@ -6,6 +6,12 @@
#include <DB/IO/CompressedWriteBuffer.h>
namespace ProfileEvents
{
extern const Event ExternalSortWritePart;
extern const Event ExternalSortMerge;
}
namespace DB
{

View File

@ -4,6 +4,12 @@
#include <DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
namespace CurrentMetrics
{
extern const Metric QueryThread;
}
namespace DB
{

View File

@ -9,6 +9,18 @@
#include <experimental/optional>
namespace ProfileEvents
{
extern const Event FileOpen;
extern const Event ReadBufferAIORead;
extern const Event ReadBufferAIOReadBytes;
}
namespace CurrentMetrics
{
extern const Metric Read;
}
namespace DB
{

View File

@ -5,6 +5,19 @@
#include <sys/types.h>
#include <sys/stat.h>
namespace ProfileEvents
{
extern const Event FileOpen;
extern const Event WriteBufferAIOWrite;
extern const Event WriteBufferAIOWriteBytes;
}
namespace CurrentMetrics
{
extern const Metric Write;
}
namespace DB
{
@ -26,10 +39,10 @@ namespace ErrorCodes
/// Примечание: выделяется дополнительная страница, которая содежрит те данные, которые
/// не влезают в основной буфер.
WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
char * existing_memory_)
: WriteBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
flush_buffer(BufferWithOwnMemory<WriteBuffer>(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
filename(filename_)
char * existing_memory_)
: WriteBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
flush_buffer(BufferWithOwnMemory<WriteBuffer>(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
filename(filename_)
{
/// Исправить информацию о размере буферов, чтобы дополнительные страницы не касались базового класса BufferBase.
this->buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);

View File

@ -3,6 +3,13 @@
#include <DB/IO/ReadBufferAIO.h>
#include <DB/Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event CreatedReadBufferOrdinary;
extern const Event CreatedReadBufferAIO;
}
namespace DB
{

View File

@ -3,6 +3,13 @@
#include <DB/IO/WriteBufferAIO.h>
#include <DB/Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event CreatedWriteBufferOrdinary;
extern const Event CreatedWriteBufferAIO;
}
namespace DB
{

View File

@ -22,6 +22,18 @@
#include <common/ClickHouseRevision.h>
namespace ProfileEvents
{
extern const Event ExternalAggregationWritePart;
extern const Event ExternalAggregationCompressedBytes;
extern const Event ExternalAggregationUncompressedBytes;
}
namespace CurrentMetrics
{
extern const Metric QueryThread;
}
namespace DB
{

View File

@ -0,0 +1,231 @@
#include <DB/Interpreters/AsynchronousMetrics.h>
#include <DB/Common/Exception.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Storages/MarkCache.h>
#include <DB/Storages/StorageMergeTree.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/IO/UncompressedCache.h>
#include <DB/Databases/IDatabase.h>
#include <chrono>
#ifndef NO_TCMALLOC
#include <gperftools/malloc_extension.h>
/// Initializing malloc extension in global constructor as required.
struct MallocExtensionInitializer
{
MallocExtensionInitializer()
{
MallocExtension::Initialize();
}
} malloc_extension_initializer;
#endif
namespace DB
{
AsynchronousMetrics::~AsynchronousMetrics()
{
try
{
{
std::lock_guard<std::mutex> lock{wait_mutex};
quit = true;
}
wait_cond.notify_one();
thread.join();
}
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
AsynchronousMetrics::Container AsynchronousMetrics::getValues() const
{
std::lock_guard<std::mutex> lock{container_mutex};
return container;
}
void AsynchronousMetrics::set(const std::string & name, Value value)
{
std::lock_guard<std::mutex> lock{container_mutex};
container[name] = value;
}
void AsynchronousMetrics::run()
{
setThreadName("AsyncMetrics");
std::unique_lock<std::mutex> lock{wait_mutex};
/// Next minute + 30 seconds. To be distant with moment of transmission of metrics, see MetricsTransmitter.
const auto get_next_minute = []
{
return std::chrono::time_point_cast<std::chrono::minutes, std::chrono::system_clock>(
std::chrono::system_clock::now() + std::chrono::minutes(1)) + std::chrono::seconds(30);
};
while (true)
{
if (wait_cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
break;
try
{
update();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
template <typename Max, typename T>
static void calculateMax(Max & max, T x)
{
if (Max(x) > max)
max = x;
}
template <typename Max, typename Sum, typename T>
static void calculateMaxAndSum(Max & max, Sum & sum, T x)
{
sum += x;
if (Max(x) > max)
max = x;
}
void AsynchronousMetrics::update()
{
{
if (auto mark_cache = context.getMarkCache())
{
set("MarkCacheBytes", mark_cache->weight());
set("MarkCacheFiles", mark_cache->count());
}
}
{
if (auto uncompressed_cache = context.getUncompressedCache())
{
set("UncompressedCacheBytes", uncompressed_cache->weight());
set("UncompressedCacheCells", uncompressed_cache->count());
}
}
{
auto databases = context.getDatabases();
size_t max_queue_size = 0;
size_t max_inserts_in_queue = 0;
size_t max_merges_in_queue = 0;
size_t sum_queue_size = 0;
size_t sum_inserts_in_queue = 0;
size_t sum_merges_in_queue = 0;
size_t max_absolute_delay = 0;
size_t max_relative_delay = 0;
size_t max_part_count_for_partition = 0;
for (const auto & db : databases)
{
for (auto iterator = db.second->getIterator(); iterator->isValid(); iterator->next())
{
auto & table = iterator->table();
StorageMergeTree * table_merge_tree = typeid_cast<StorageMergeTree *>(table.get());
StorageReplicatedMergeTree * table_replicated_merge_tree = typeid_cast<StorageReplicatedMergeTree *>(table.get());
if (table_replicated_merge_tree)
{
StorageReplicatedMergeTree::Status status;
table_replicated_merge_tree->getStatus(status, false);
calculateMaxAndSum(max_queue_size, sum_queue_size, status.queue.queue_size);
calculateMaxAndSum(max_inserts_in_queue, sum_inserts_in_queue, status.queue.inserts_in_queue);
calculateMaxAndSum(max_merges_in_queue, sum_merges_in_queue, status.queue.merges_in_queue);
try
{
time_t absolute_delay = 0;
time_t relative_delay = 0;
table_replicated_merge_tree->getReplicaDelays(absolute_delay, relative_delay);
calculateMax(max_absolute_delay, absolute_delay);
calculateMax(max_relative_delay, relative_delay);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__,
"Cannot get replica delay for table: " + backQuoteIfNeed(db.first) + "." + backQuoteIfNeed(iterator->name()));
}
calculateMax(max_part_count_for_partition, table_replicated_merge_tree->getData().getMaxPartsCountForMonth());
if (auto unreplicated_data = table_replicated_merge_tree->getUnreplicatedData())
calculateMax(max_part_count_for_partition, unreplicated_data->getMaxPartsCountForMonth());
}
if (table_merge_tree)
{
calculateMax(max_part_count_for_partition, table_merge_tree->getData().getMaxPartsCountForMonth());
}
}
}
set("ReplicasMaxQueueSize", max_queue_size);
set("ReplicasMaxInsertsInQueue", max_inserts_in_queue);
set("ReplicasMaxMergesInQueue", max_merges_in_queue);
set("ReplicasSumQueueSize", sum_queue_size);
set("ReplicasSumInsertsInQueue", sum_inserts_in_queue);
set("ReplicasSumMergesInQueue", sum_merges_in_queue);
set("ReplicasMaxAbsoluteDelay", max_absolute_delay);
set("ReplicasMaxRelativeDelay", max_relative_delay);
set("MaxPartCountForPartition", max_part_count_for_partition);
}
#ifndef NO_TCMALLOC
{
/// tcmalloc related metrics. Remove if you switch to different allocator.
MallocExtension & malloc_extension = *MallocExtension::instance();
auto malloc_metrics =
{
"generic.current_allocated_bytes",
"generic.heap_size",
"tcmalloc.current_total_thread_cache_bytes",
"tcmalloc.central_cache_free_bytes",
"tcmalloc.transfer_cache_free_bytes",
"tcmalloc.thread_cache_free_bytes",
"tcmalloc.pageheap_free_bytes",
"tcmalloc.pageheap_unmapped_bytes",
};
for (auto malloc_metric : malloc_metrics)
{
size_t value = 0;
if (malloc_extension.GetNumericProperty(malloc_metric, &value))
set(malloc_metric, value);
}
}
#endif
/// Add more metrics as you wish.
}
}

View File

@ -14,6 +14,12 @@
#include <DB/Interpreters/Compiler.h>
namespace ProfileEvents
{
extern const Event CompileAttempt;
extern const Event CompileSuccess;
}
namespace DB
{

View File

@ -8,6 +8,11 @@
#include <set>
namespace ProfileEvents
{
extern const Event FunctionExecute;
}
namespace DB
{

View File

@ -15,6 +15,12 @@
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
namespace ProfileEvents
{
extern const Event InsertQuery;
}
namespace DB
{

View File

@ -35,6 +35,11 @@
#include <DB/Core/Field.h>
namespace ProfileEvents
{
extern const Event SelectQuery;
}
namespace DB
{

View File

@ -20,6 +20,11 @@
#include <DB/Interpreters/executeQuery.h>
namespace ProfileEvents
{
extern const Event Query;
}
namespace DB
{

View File

@ -43,13 +43,17 @@ ConfigReloader::~ConfigReloader()
{
try
{
LOG_DEBUG(log, "ConfigReloader::~ConfigReloader()");
quit = true;
{
std::lock_guard<std::mutex> lock{mutex};
quit = true;
}
cond.notify_one();
thread.join();
}
catch (...)
{
tryLogCurrentException("~ConfigReloader");
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -58,9 +62,13 @@ void ConfigReloader::run()
{
setThreadName("ConfigReloader");
while (!quit)
std::unique_lock<std::mutex> lock{mutex};
while (true)
{
std::this_thread::sleep_for(reload_interval);
if (cond.wait_for(lock, reload_interval, [this] { return quit; }))
break;
reloadIfNewer(false, false);
}
}

View File

@ -5,7 +5,8 @@
#include <time.h>
#include <string>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <list>
@ -95,7 +96,9 @@ private:
FilesChangesTracker last_main_config_files;
FilesChangesTracker last_users_config_files;
std::atomic<bool> quit{false};
bool quit {false};
std::mutex mutex;
std::condition_variable cond;
std::thread thread;
Poco::Logger * log = &Logger::get("ConfigReloader");

View File

@ -5,6 +5,11 @@
#include "Server.h"
namespace CurrentMetrics
{
extern const Metric HTTPConnection;
}
namespace DB
{
@ -21,7 +26,7 @@ public:
struct Output
{
std::shared_ptr<WriteBufferFromHTTPServerResponse> out;
/// Используется для выдачи ответа. Равен либо out, либо CompressedWriteBuffer(*out), в зависимости от настроек.
/// Used for sending response. Points to 'out', or to CompressedWriteBuffer(*out), depending on settings.
std::shared_ptr<WriteBuffer> out_maybe_compressed;
};
@ -38,7 +43,7 @@ private:
Logger * log;
/// Функция также инициализирует used_output.
/// Also initializes 'used_output'.
void processQuery(
Poco::Net::HTTPServerRequest & request,
HTMLForm & params,

View File

@ -4,6 +4,11 @@
#include <DB/Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric InterserverConnection;
}
namespace DB
{

View File

@ -3,6 +3,7 @@
#include <daemon/BaseDaemon.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Interpreters/AsynchronousMetrics.h>
namespace DB
@ -23,7 +24,7 @@ MetricsTransmitter::~MetricsTransmitter()
}
catch (...)
{
DB::tryLogCurrentException(__FUNCTION__);
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -32,12 +33,15 @@ void MetricsTransmitter::run()
{
setThreadName("MetricsTransmit");
const auto get_next_minute = [] {
/// Next minute at 00 seconds. To avoid time drift and transmit values exactly each minute.
const auto get_next_minute = []
{
return std::chrono::time_point_cast<std::chrono::minutes, std::chrono::system_clock>(
std::chrono::system_clock::now() + std::chrono::minutes(1)
);
std::chrono::system_clock::now() + std::chrono::minutes(1));
};
ProfileEvents::Count prev_counters[ProfileEvents::end()] {};
std::unique_lock<std::mutex> lock{mutex};
while (true)
@ -45,32 +49,39 @@ void MetricsTransmitter::run()
if (cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
break;
transmit();
transmit(prev_counters);
}
}
void MetricsTransmitter::transmit()
void MetricsTransmitter::transmit(ProfileEvents::Count * prev_counters)
{
GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
key_vals.reserve(ProfileEvents::END + CurrentMetrics::END);
auto async_metrics_values = async_metrics.getValues();
for (size_t i = 0; i < ProfileEvents::END; ++i)
GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size());
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed);
const auto counter_increment = counter - prev_counters[i].load(std::memory_order_relaxed);
prev_counters[i].store(counter, std::memory_order_relaxed);
const auto counter_increment = counter - prev_counters[i];
prev_counters[i] = counter;
std::string key {ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))};
key_vals.emplace_back(event_path_prefix + key, counter_increment);
key_vals.emplace_back(profile_events_path_prefix + key, counter_increment);
}
for (size_t i = 0; i < CurrentMetrics::END; ++i)
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
{
const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
std::string key {CurrentMetrics::getDescription(static_cast<CurrentMetrics::Metric>(i))};
key_vals.emplace_back(metrics_path_prefix + key, value);
key_vals.emplace_back(current_metrics_path_prefix + key, value);
}
for (const auto & name_value : async_metrics_values)
{
key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second);
}
BaseDaemon::instance().writeToGraphite(key_vals);

View File

@ -10,30 +10,34 @@
namespace DB
{
class AsynchronousMetrics;
/** Automatically sends
* - difference of ProfileEvents;
* - values of CurrentMetrics;
* - values of AsynchronousMetrics;
* to Graphite at beginning of every minute.
*/
class MetricsTransmitter
{
public:
MetricsTransmitter(const AsynchronousMetrics & async_metrics_) : async_metrics(async_metrics_) {}
~MetricsTransmitter();
private:
void run();
void transmit();
void transmit(ProfileEvents::Count * prev_counters);
/// Значения счётчиков при предыдущей отправке (или нули, если ни разу не отправляли).
decltype(ProfileEvents::counters) prev_counters{};
const AsynchronousMetrics & async_metrics;
bool quit = false;
std::mutex mutex;
std::condition_variable cond;
std::thread thread {&MetricsTransmitter::run, this};
static constexpr auto event_path_prefix = "ClickHouse.ProfileEvents.";
static constexpr auto metrics_path_prefix = "ClickHouse.Metrics.";
static constexpr auto profile_events_path_prefix = "ClickHouse.ProfileEvents.";
static constexpr auto current_metrics_path_prefix = "ClickHouse.Metrics.";
static constexpr auto asynchronous_metrics_path_prefix = "ClickHouse.AsynchronousMetrics.";
};
}

View File

@ -20,6 +20,7 @@
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/AsynchronousMetrics.h>
#include <DB/Storages/System/StorageSystemNumbers.h>
#include <DB/Storages/System/StorageSystemTables.h>
@ -38,6 +39,7 @@
#include <DB/Storages/System/StorageSystemFunctions.h>
#include <DB/Storages/System/StorageSystemClusters.h>
#include <DB/Storages/System/StorageSystemMetrics.h>
#include <DB/Storages/System/StorageSystemAsynchronousMetrics.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/ReshardingWorker.h>
#include <DB/Databases/DatabaseOrdinary.h>
@ -360,8 +362,6 @@ int Server::main(const std::vector<std::string> & args)
}
SCOPE_EXIT(
LOG_DEBUG(log, "Closed all connections.");
/** Ask to cancel background jobs all table engines,
* and also query_log.
* It is important to do early, not in destructor of Context, because
@ -380,10 +380,6 @@ int Server::main(const std::vector<std::string> & args)
);
{
const auto metrics_transmitter = config().getBool("use_graphite", true)
? std::make_unique<MetricsTransmitter>()
: nullptr;
const std::string listen_host = config().getString("listen_host", "::");
Poco::Timespan keep_alive_timeout(config().getInt("keep_alive_timeout", 10), 0);
@ -480,12 +476,14 @@ int Server::main(const std::vector<std::string> & args)
LOG_DEBUG(log, "Waiting for current connections to close.");
config_reloader.reset();
is_cancelled = true;
http_server->stop();
tcp_server->stop();
LOG_DEBUG(log, "Closed all connections.");
config_reloader.reset();
);
/// try to load dictionaries immediately, throw on error and die
@ -496,14 +494,23 @@ int Server::main(const std::vector<std::string> & args)
global_context->tryCreateDictionaries();
global_context->tryCreateExternalDictionaries();
}
waitForTerminationRequest();
}
catch (...)
{
LOG_ERROR(log, "Caught exception while loading dictionaries.");
throw;
}
/// This object will periodically calculate some metrics.
AsynchronousMetrics async_metrics(*global_context);
system_database->attachTable("asynchronous_metrics", StorageSystemAsynchronousMetrics::create("asynchronous_metrics", async_metrics));
const auto metrics_transmitter = config().getBool("use_graphite", true)
? std::make_unique<MetricsTransmitter>(async_metrics)
: nullptr;
waitForTerminationRequest();
}
return Application::EXIT_OK;

View File

@ -14,6 +14,12 @@
#include "Server.h"
namespace CurrentMetrics
{
extern const Metric TCPConnection;
}
namespace DB
{

View File

@ -147,7 +147,7 @@
<min_part_size>10000000000</min_part_size> <!- - Min part size in bytes. - ->
<min_part_size_ratio>0.01</min_part_size_ratio> <!- - Min size of part relative to whole table size. - ->
<!- - Какой метод сжатия выбрать. - ->
<!- - What compression method to use. - ->
<method>zstd</method> <!- - Keep in mind that zstd compression library is highly experimental. - ->
</case>
-->

View File

@ -7,6 +7,12 @@
#include <random>
namespace CurrentMetrics
{
extern const Metric BackgroundPoolTask;
}
namespace DB
{

View File

@ -3,6 +3,12 @@
#include <DB/Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric ReplicatedSend;
extern const Metric ReplicatedFetch;
}
namespace DB
{

View File

@ -25,6 +25,13 @@
#include <thread>
namespace ProfileEvents
{
extern const Event RejectedInserts;
extern const Event DelayedInserts;
extern const Event DelayedInsertsMilliseconds;
}
namespace DB
{

View File

@ -16,6 +16,13 @@
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/Common/Increment.h>
namespace ProfileEvents
{
extern const Event MergedRows;
extern const Event MergedUncompressedBytes;
}
namespace DB
{

View File

@ -36,6 +36,14 @@
#include <DB/Common/VirtualColumnUtils.h>
namespace ProfileEvents
{
extern const Event SelectedParts;
extern const Event SelectedRanges;
extern const Event SelectedMarks;
}
namespace DB
{

View File

@ -4,6 +4,16 @@
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/IO/HashingWriteBuffer.h>
namespace ProfileEvents
{
extern const Event MergeTreeDataWriterBlocks;
extern const Event MergeTreeDataWriterBlocksAlreadySorted;
extern const Event MergeTreeDataWriterRows;
extern const Event MergeTreeDataWriterUncompressedBytes;
extern const Event MergeTreeDataWriterCompressedBytes;
}
namespace DB
{

View File

@ -15,6 +15,11 @@
#include <DB/Common/escapeForFileName.h>
namespace CurrentMetrics
{
extern const Metric ReplicatedChecks;
}
namespace DB
{

View File

@ -4,6 +4,13 @@
#include <DB/Common/setThreadName.h>
namespace ProfileEvents
{
extern const Event ReplicatedPartChecks;
extern const Event ReplicatedPartChecksFailed;
extern const Event ReplicatedDataLoss;
}
namespace DB
{

View File

@ -577,14 +577,17 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP
}
bool ReplicatedMergeTreeQueue::processEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func)
bool ReplicatedMergeTreeQueue::processEntry(
std::function<zkutil::ZooKeeperPtr()> get_zookeeper,
LogEntryPtr & entry,
const std::function<bool(LogEntryPtr &)> func)
{
std::exception_ptr saved_exception;
try
{
if (func(entry))
remove(zookeeper, entry);
remove(get_zookeeper(), entry);
}
catch (...)
{

View File

@ -7,6 +7,19 @@
#include <DB/Common/randomSeed.h>
namespace ProfileEvents
{
extern const Event ReplicaYieldLeadership;
extern const Event ReplicaPartialShutdown;
}
namespace CurrentMetrics
{
extern const Metric ReadonlyReplica;
extern const Metric LeaderReplica;
}
namespace DB
{
@ -37,10 +50,10 @@ void ReplicatedMergeTreeRestartingThread::run()
constexpr auto retry_period_ms = 10 * 1000;
/// Периодичность проверки истечения сессии в ZK.
time_t check_period_ms = 60 * 1000;
Int64 check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period * 1000;
/// Периодичность проверки величины отставания реплики.
if (check_period_ms > static_cast<time_t>(storage.data.settings.check_delay_period) * 1000)
if (check_period_ms > static_cast<Int64>(storage.data.settings.check_delay_period) * 1000)
check_period_ms = storage.data.settings.check_delay_period * 1000;
setThreadName("ReplMTRestart");
@ -107,7 +120,9 @@ void ReplicatedMergeTreeRestartingThread::run()
time_t relative_delay = 0;
storage.getReplicaDelays(absolute_delay, relative_delay);
LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".");
if (absolute_delay)
LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".");
prev_time_of_check_delay = current_time;
@ -123,6 +138,7 @@ void ReplicatedMergeTreeRestartingThread::run()
if (storage.is_leader_node)
{
storage.is_leader_node = false;
CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
if (storage.merge_selecting_thread.joinable())
storage.merge_selecting_thread.join();
@ -187,7 +203,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.zookeeper_path + "/leader_election",
*storage.current_zookeeper, /// current_zookeeper живёт в течение времени жизни leader_election,
/// так как до изменения current_zookeeper, объект leader_election уничтожается в методе partialShutdown.
[this] { storage.becomeLeader(); },
[this] { storage.becomeLeader(); CurrentMetrics::add(CurrentMetrics::LeaderReplica); },
storage.replica_name);
/// Все, что выше, может бросить KeeperException, если что-то не так с ZK.
@ -358,6 +374,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
if (storage.is_leader_node)
{
storage.is_leader_node = false;
CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
if (storage.merge_selecting_thread.joinable())
storage.merge_selecting_thread.join();
}

View File

@ -7,6 +7,13 @@
#include <DB/IO/InterserverWriteBuffer.h>
namespace CurrentMetrics
{
extern const Metric ReplicatedSend;
extern const Metric ReplicatedFetch;
}
namespace DB
{

View File

@ -47,6 +47,14 @@
#include <future>
namespace ProfileEvents
{
extern const Event ReplicatedPartMerges;
extern const Event ReplicatedPartFailedFetches;
extern const Event ReplicatedPartFetchesOfMerged;
extern const Event ObsoleteReplicatedParts;
extern const Event ReplicatedPartFetches;
}
namespace DB
{
@ -1029,8 +1037,6 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev
bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context)
{
auto zookeeper = getZooKeeper();
if (entry.type == LogEntry::DROP_RANGE)
{
executeDropRange(entry);
@ -1045,7 +1051,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name);
/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper. Проверим, что он там есть.
if (containing_part && zookeeper->exists(replica_path + "/parts/" + containing_part->name))
if (containing_part && getZooKeeper()->exists(replica_path + "/parts/" + containing_part->name))
{
if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " - part already exists.");
@ -1057,7 +1063,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist.");
/// Возможно, этот кусок нам не нужен, так как при записи с кворумом, кворум пофейлился (см. ниже про /quorum/failed_parts).
if (entry.quorum && zookeeper->exists(zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name))
if (entry.quorum && getZooKeeper()->exists(zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name))
{
LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " because quorum for that part was failed.");
return true; /// NOTE Удаление из virtual_parts не делается, но оно нужно только для мерджей.
@ -1270,6 +1276,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
* Это позволит проследить, что реплики не стали активными.
*/
auto zookeeper = getZooKeeper();
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
zkutil::Ops ops;
@ -1408,11 +1416,9 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry)
{
auto zookeeper = getZooKeeper();
LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << ".");
queue.removeGetsAndMergesInRange(zookeeper, entry.new_part_name);
queue.removeGetsAndMergesInRange(getZooKeeper(), entry.new_part_name);
LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts.");
size_t removed_parts = 0;
@ -1437,7 +1443,7 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
zkutil::Ops ops;
removePartFromZooKeeper(part->name, ops);
auto code = zookeeper->tryMulti(ops);
auto code = getZooKeeper()->tryMulti(ops);
/// Если кусок уже удалён (например, потому что он так и не был добавлен в ZK из-за сбоя,
/// см. ReplicatedMergeTreeBlockOutputStream), то всё Ок.
@ -1455,8 +1461,6 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeTree::LogEntry & entry)
{
auto zookeeper = getZooKeeper();
String source_path = (entry.attach_unreplicated ? "unreplicated/" : "detached/") + entry.source_part_name;
LOG_INFO(log, "Attaching part " << entry.source_part_name << " from " << source_path << " as " << entry.new_part_name);
@ -1482,7 +1486,7 @@ bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeT
LOG_WARNING(log, "Unreplicated part " << entry.source_part_name << " is already detached");
}
zookeeper->multi(ops);
getZooKeeper()->multi(ops);
/// NOTE: Не можем использовать renameTempPartAndAdd, потому что кусок не временный - если что-то пойдет не так, его не нужно удалять.
part->renameTo(entry.new_part_name);
@ -1551,7 +1555,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p
time_t prev_attempt_time = entry->last_attempt_time;
bool res = queue.processEntry(getZooKeeper(), entry, [&](LogEntryPtr & entry)
bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry)
{
try
{
@ -2046,15 +2050,13 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
currently_fetching_parts.erase(part_name);
);
auto zookeeper = getZooKeeper();
LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_path);
TableStructureReadLockPtr table_lock;
if (!to_detached)
table_lock = lockStructure(true);
ReplicatedMergeTreeAddress address(zookeeper->get(replica_path + "/host"));
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(
part_name, replica_path, address.host, address.replication_port, to_detached);
@ -2073,7 +2075,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
MergeTreeData::Transaction transaction;
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
zookeeper->multi(ops);
getZooKeeper()->multi(ops);
transaction.commit();
/** Если для этого куска отслеживается кворум, то надо его обновить.
@ -2334,8 +2336,6 @@ bool StorageReplicatedMergeTree::optimize(const String & partition, bool final,
assertNotReadonly();
auto zookeeper = getZooKeeper();
if (!is_leader_node)
throw Exception("Method OPTIMIZE for ReplicatedMergeTree could be called only on leader replica", ErrorCodes::NOT_IMPLEMENTED);
@ -2386,7 +2386,6 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
{
assertNotReadonly();
auto zookeeper = getZooKeeper();
auto merge_blocker = merger.cancel();
auto unreplicated_merge_blocker = unreplicated_merger ?
unreplicated_merger->cancel() : MergeTreeDataMerger::Blocker();
@ -2425,7 +2424,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
}.toString();
/// Делаем ALTER.
zookeeper->set(zookeeper_path + "/columns", new_columns_str, -1, &stat);
getZooKeeper()->set(zookeeper_path + "/columns", new_columns_str, -1, &stat);
new_columns_version = stat.version;
}
@ -2435,7 +2434,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
/// Ждем, пока все реплики обновят данные.
/// Подпишемся на изменения столбцов, чтобы перестать ждать, если кто-то еще сделает ALTER.
if (!zookeeper->exists(zookeeper_path + "/columns", &stat, alter_query_event))
if (!getZooKeeper()->exists(zookeeper_path + "/columns", &stat, alter_query_event))
throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (stat.version != new_columns_version)
@ -2445,7 +2444,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
return;
}
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
std::set<String> inactive_replicas;
std::set<String> timed_out_replicas;
@ -2459,7 +2458,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
while (!shutdown_called)
{
/// Реплика может быть неактивной.
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
if (!getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
{
LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query."
" ALTER will be done asynchronously when replica becomes active.");
@ -2471,7 +2470,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
String replica_columns_str;
/// Реплику могли успеть удалить.
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat))
if (!getZooKeeper()->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat))
{
LOG_WARNING(log, replica << " was removed");
break;
@ -2482,7 +2481,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
if (replica_columns_str == new_columns_str)
break;
if (!zookeeper->exists(zookeeper_path + "/columns", &stat))
if (!getZooKeeper()->exists(zookeeper_path + "/columns", &stat))
throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (stat.version != new_columns_version)
@ -2492,7 +2491,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
return;
}
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event))
if (!getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event))
{
LOG_WARNING(log, replica << " was removed");
break;
@ -2617,24 +2616,23 @@ void StorageReplicatedMergeTree::dropPartition(
assertNotReadonly();
auto zookeeper = getZooKeeper();
String month_name = MergeTreeData::getMonthName(field);
if (!is_leader_node)
{
/// Проксируем запрос в лидера.
auto live_replicas = zookeeper->getChildren(zookeeper_path + "/leader_election");
auto live_replicas = getZooKeeper()->getChildren(zookeeper_path + "/leader_election");
if (live_replicas.empty())
throw Exception("No active replicas", ErrorCodes::NO_ACTIVE_REPLICAS);
std::sort(live_replicas.begin(), live_replicas.end());
const auto leader = zookeeper->get(zookeeper_path + "/leader_election/" + live_replicas.front());
const auto leader = getZooKeeper()->get(zookeeper_path + "/leader_election/" + live_replicas.front());
if (leader == replica_name)
throw Exception("Leader was suddenly changed or logical error.", ErrorCodes::LEADERSHIP_CHANGED);
ReplicatedMergeTreeAddress leader_address(zookeeper->get(zookeeper_path + "/replicas/" + leader + "/host"));
ReplicatedMergeTreeAddress leader_address(getZooKeeper()->get(zookeeper_path + "/replicas/" + leader + "/host"));
auto new_query = query->clone();
auto & alter = typeid_cast<ASTAlterQuery &>(*new_query);
@ -2700,7 +2698,7 @@ void StorageReplicatedMergeTree::dropPartition(
entry.source_replica = replica_name;
entry.new_part_name = fake_part_name;
entry.detach = detach;
String log_znode_path = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
String log_znode_path = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
entry.create_time = time(0);
@ -2719,7 +2717,6 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
{
assertNotReadonly();
auto zookeeper = getZooKeeper();
String partition;
if (attach_part)
@ -2804,12 +2801,12 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
entry.create_time = time(0);
ops.push_back(new zkutil::Op::Create(
zookeeper_path + "/log/log-", entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
zookeeper_path + "/log/log-", entry.toString(), getZooKeeper()->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
}
LOG_DEBUG(log, "Adding attaches to log");
zookeeper->multi(ops);
getZooKeeper()->multi(ops);
/// Если надо - дожидаемся выполнения операции на себе или на всех репликах.
if (settings.replication_alter_partitions_sync != 0)
@ -2833,26 +2830,28 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
void StorageReplicatedMergeTree::drop()
{
auto zookeeper = tryGetZooKeeper();
if (is_readonly || !zookeeper)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
shutdown();
if (zookeeper->expired())
throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
LOG_INFO(log, "Removing replica " << replica_path);
replica_is_active_node = nullptr;
zookeeper->tryRemoveRecursive(replica_path);
/// Проверяем, что zookeeper_path существует: его могла удалить другая реплика после выполнения предыдущей строки.
Strings replicas;
if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == ZOK && replicas.empty())
{
LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
zookeeper->tryRemoveRecursive(zookeeper_path);
auto zookeeper = tryGetZooKeeper();
if (is_readonly || !zookeeper)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
shutdown();
if (zookeeper->expired())
throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
LOG_INFO(log, "Removing replica " << replica_path);
replica_is_active_node = nullptr;
zookeeper->tryRemoveRecursive(replica_path);
/// Проверяем, что zookeeper_path существует: его могла удалить другая реплика после выполнения предыдущей строки.
Strings replicas;
if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == ZOK && replicas.empty())
{
LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
zookeeper->tryRemoveRecursive(zookeeper_path);
}
}
data.dropAllData();
@ -2883,8 +2882,7 @@ bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
return true;
}
auto zookeeper = getZooKeeper();
bool res = zookeeper->exists(path);
bool res = getZooKeeper()->exists(path);
if (res)
{
@ -2925,10 +2923,9 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry)
{
auto zookeeper = getZooKeeper();
LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name);
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
waitForReplicaToProcessLogEntry(replica, entry);
@ -2938,8 +2935,6 @@ void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Repli
void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
{
auto zookeeper = getZooKeeper();
String entry_str = entry.toString();
String log_node_name;
@ -2974,7 +2969,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
{
zkutil::EventPtr event = std::make_shared<Poco::Event>();
String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
break;
@ -2987,9 +2982,9 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
* ища ноду с таким же содержимым. И если мы её не найдём - значит реплика уже взяла эту запись в свою queue.
*/
String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");
String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");
Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log");
Strings log_entries = getZooKeeper()->getChildren(zookeeper_path + "/log");
UInt64 log_index = 0;
bool found = false;
@ -3001,7 +2996,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
continue;
String log_entry_str;
bool exists = zookeeper->tryGet(zookeeper_path + "/log/" + log_entry_name, log_entry_str);
bool exists = getZooKeeper()->tryGet(zookeeper_path + "/log/" + log_entry_name, log_entry_str);
if (exists && entry_str == log_entry_str)
{
found = true;
@ -3019,7 +3014,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
{
zkutil::EventPtr event = std::make_shared<Poco::Event>();
String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
break;
@ -3040,13 +3035,13 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
* Поэтому, ищем путём сравнения содержимого.
*/
Strings queue_entries = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/queue");
Strings queue_entries = getZooKeeper()->getChildren(zookeeper_path + "/replicas/" + replica + "/queue");
String queue_entry_to_wait_for;
for (const String & entry_name : queue_entries)
{
String queue_entry_str;
bool exists = zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str);
bool exists = getZooKeeper()->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str);
if (exists && queue_entry_str == entry_str)
{
queue_entry_to_wait_for = entry_name;
@ -3064,7 +3059,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue");
/// Третье - дождемся, пока запись исчезнет из очереди реплики.
zookeeper->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for);
getZooKeeper()->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for);
}
@ -3201,8 +3196,6 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings)
{
auto zookeeper = getZooKeeper();
String partition_str = MergeTreeData::getMonthName(partition);
String from = from_;
@ -3219,47 +3212,53 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
if (startsWith(dir_it.name(), partition_str))
throw Exception("Detached partition " + partition_str + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
/// Список реплик шарда-источника.
zkutil::Strings replicas = zookeeper->getChildren(from + "/replicas");
/// Оставим только активные реплики.
zkutil::Strings replicas;
zkutil::Strings active_replicas;
active_replicas.reserve(replicas.size());
for (const String & replica : replicas)
if (zookeeper->exists(from + "/replicas/" + replica + "/is_active"))
active_replicas.push_back(replica);
if (active_replicas.empty())
throw Exception("No active replicas for shard " + from, ErrorCodes::NO_ACTIVE_REPLICAS);
/** Надо выбрать лучшую (наиболее актуальную) реплику.
* Это реплика с максимальным log_pointer, затем с минимальным размером queue.
* NOTE Это не совсем лучший критерий. Для скачивания старых партиций это не имеет смысла,
* и было бы неплохо уметь выбирать реплику, ближайшую по сети.
* NOTE Разумеется, здесь есть data race-ы. Можно решить ретраями.
*/
Int64 max_log_pointer = -1;
UInt64 min_queue_size = std::numeric_limits<UInt64>::max();
String best_replica;
for (const String & replica : active_replicas)
{
String current_replica_path = from + "/replicas/" + replica;
auto zookeeper = getZooKeeper();
String log_pointer_str = zookeeper->get(current_replica_path + "/log_pointer");
Int64 log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
/// Список реплик шарда-источника.
replicas = zookeeper->getChildren(from + "/replicas");
zkutil::Stat stat;
zookeeper->get(current_replica_path + "/queue", &stat);
size_t queue_size = stat.numChildren;
/// Оставим только активные реплики.
active_replicas.reserve(replicas.size());
if (log_pointer > max_log_pointer
|| (log_pointer == max_log_pointer && queue_size < min_queue_size))
for (const String & replica : replicas)
if (zookeeper->exists(from + "/replicas/" + replica + "/is_active"))
active_replicas.push_back(replica);
if (active_replicas.empty())
throw Exception("No active replicas for shard " + from, ErrorCodes::NO_ACTIVE_REPLICAS);
/** Надо выбрать лучшую (наиболее актуальную) реплику.
* Это реплика с максимальным log_pointer, затем с минимальным размером queue.
* NOTE Это не совсем лучший критерий. Для скачивания старых партиций это не имеет смысла,
* и было бы неплохо уметь выбирать реплику, ближайшую по сети.
* NOTE Разумеется, здесь есть data race-ы. Можно решить ретраями.
*/
Int64 max_log_pointer = -1;
UInt64 min_queue_size = std::numeric_limits<UInt64>::max();
for (const String & replica : active_replicas)
{
max_log_pointer = log_pointer;
min_queue_size = queue_size;
best_replica = replica;
String current_replica_path = from + "/replicas/" + replica;
String log_pointer_str = zookeeper->get(current_replica_path + "/log_pointer");
Int64 log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
zkutil::Stat stat;
zookeeper->get(current_replica_path + "/queue", &stat);
size_t queue_size = stat.numChildren;
if (log_pointer > max_log_pointer
|| (log_pointer == max_log_pointer && queue_size < min_queue_size))
{
max_log_pointer = log_pointer;
min_queue_size = queue_size;
best_replica = replica;
}
}
}
@ -3288,7 +3287,7 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
if (try_no >= 5)
throw Exception("Too much retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MUCH_RETRIES_TO_FETCH_PARTS);
Strings parts = zookeeper->getChildren(best_replica_path + "/parts");
Strings parts = getZooKeeper()->getChildren(best_replica_path + "/parts");
ActiveDataPartSet active_parts_set(parts);
Strings parts_to_fetch;
@ -3675,9 +3674,10 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths
local_space_info.factor = 1.1;
local_space_info.available_size = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
auto zookeeper = getZooKeeper();
for (const auto & weighted_path : weighted_zookeeper_paths)
{
auto zookeeper = getZooKeeper();
const auto & path = weighted_path.first;
UInt64 weight = weighted_path.second;

View File

@ -0,0 +1,68 @@
#include <DB/Interpreters/AsynchronousMetrics.h>
#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Storages/System/StorageSystemAsynchronousMetrics.h>
namespace DB
{
StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const std::string & name_, const AsynchronousMetrics & async_metrics_)
: name(name_),
columns
{
{"metric", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeFloat64>()},
},
async_metrics(async_metrics_)
{
}
StoragePtr StorageSystemAsynchronousMetrics::create(const std::string & name_, const AsynchronousMetrics & async_metrics_)
{
return make_shared(name_, async_metrics_);
}
BlockInputStreams StorageSystemAsynchronousMetrics::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
Block block;
ColumnWithTypeAndName col_metric;
col_metric.name = "metric";
col_metric.type = std::make_shared<DataTypeString>();
col_metric.column = std::make_shared<ColumnString>();
block.insert(col_metric);
ColumnWithTypeAndName col_value;
col_value.name = "value";
col_value.type = std::make_shared<DataTypeFloat64>();
col_value.column = std::make_shared<ColumnFloat64>();
block.insert(col_value);
auto async_metrics_values = async_metrics.getValues();
for (const auto & name_value : async_metrics_values)
{
col_metric.column->insert(name_value.first);
col_value.column->insert(name_value.second);
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block));
}
}

View File

@ -52,7 +52,7 @@ BlockInputStreams StorageSystemEvents::read(
col_value.column = std::make_shared<ColumnUInt64>();
block.insert(col_value);
for (size_t i = 0; i < ProfileEvents::END; ++i)
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
UInt64 value = ProfileEvents::counters[i];

View File

@ -52,7 +52,7 @@ BlockInputStreams StorageSystemMetrics::read(
col_value.column = std::make_shared<ColumnInt64>();
block.insert(col_value);
for (size_t i = 0; i < CurrentMetrics::END; ++i)
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
{
auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);

View File

@ -5,39 +5,58 @@
namespace ext
{
/**
* Class AllocateShared allow to make std::shared_ptr<T> from T with private constructor.
* Derive you T class from shared_ptr_helper<T>, define him as friend and call allocate_shared()/make_shared() method.
**/
template <class T>
/** Class AllocateShared allow to make std::shared_ptr<T> from T with private constructor.
* Derive your T class from shared_ptr_helper<T>, define him as friend and call allocate_shared()/make_shared() method.
*/
template <typename T>
class shared_ptr_helper
{
protected:
typedef typename std::remove_const<T>::type TNoConst;
typedef typename std::remove_const<T>::type TNoConst;
template <class TAlloc>
template <typename TAlloc>
struct Deleter
{
void operator()(typename TAlloc::value_type * ptr)
{
std::allocator_traits<TAlloc>::destroy(alloc, ptr);
}
TAlloc alloc;
void operator()(typename TAlloc::value_type * ptr)
{
using AllocTraits = std::allocator_traits<TAlloc>;
ptr->~TNoConst();
AllocTraits::deallocate(alloc, ptr, 1);
}
TAlloc alloc;
};
///see std::allocate_shared
template <class TAlloc, class ... TArgs>
static std::shared_ptr<T> allocate_shared(const TAlloc & alloc, TArgs && ... args)
{
TAlloc alloc_copy(alloc);
return std::shared_ptr<TNoConst>(new (std::allocator_traits<TAlloc>::allocate(alloc_copy, 1)) TNoConst(std::forward<TArgs>(args)...), Deleter<TAlloc>(), alloc_copy);
}
/// see std::allocate_shared
template <typename TAlloc, typename ... TArgs>
static std::shared_ptr<T> allocate_shared(const TAlloc & alloc, TArgs &&... args)
{
using AllocTraits = std::allocator_traits<TAlloc>;
TAlloc alloc_copy(alloc);
template <class ... TArgs>
static std::shared_ptr<T> make_shared(TArgs && ... args)
{
return allocate_shared(std::allocator<TNoConst>(), std::forward<TArgs>(args)...);
}
auto ptr = AllocTraits::allocate(alloc_copy, 1);
try
{
new (ptr) TNoConst(std::forward<TArgs>(args)...);
}
catch (...)
{
AllocTraits::deallocate(alloc_copy, ptr, 1);
throw;
}
return std::shared_ptr<TNoConst>(
ptr,
Deleter<TAlloc>(),
alloc_copy);
}
template <typename ... TArgs>
static std::shared_ptr<T> make_shared(TArgs &&... args)
{
return allocate_shared(std::allocator<TNoConst>(), std::forward<TArgs>(args)...);
}
};
}

View File

@ -12,6 +12,11 @@ namespace DB
}
}
namespace ProfileEvents
{
extern const Event ZooKeeperExceptions;
}
namespace zkutil
{
@ -54,7 +59,7 @@ public:
{
return code == ZCONNECTIONLOSS || code == ZOPERATIONTIMEOUT;
}
const int32_t code;
private:

View File

@ -4,6 +4,19 @@
#include <functional>
#include <memory>
#include <common/logger_useful.h>
#include <DB/Common/CurrentMetrics.h>
namespace ProfileEvents
{
extern const Event ObsoleteEphemeralNode;
extern const Event LeaderElectionAcquiredLeadership;
}
namespace CurrentMetrics
{
extern const Metric LeaderElection;
}
namespace zkutil
@ -17,6 +30,11 @@ public:
using LeadershipHandler = std::function<void()>;
/** handler is called when this instance become leader.
*
* identifier - if not empty, must uniquely (within same path) identify participant of leader election.
* It means that different participants of leader election have different identifiers
* and existence of more than one ephemeral node with same identifier indicates an error
* (see cleanOldEphemeralNodes).
*/
LeaderElection(const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
: path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
@ -48,6 +66,8 @@ private:
std::atomic<bool> shutdown {false};
zkutil::EventPtr event = std::make_shared<Poco::Event>();
CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};
void createNode()
{
shutdown = false;
@ -56,9 +76,41 @@ private:
std::string node_path = node->getPath();
node_name = node_path.substr(node_path.find_last_of('/') + 1);
cleanOldEphemeralNodes();
thread = std::thread(&LeaderElection::threadFunction, this);
}
void cleanOldEphemeralNodes()
{
if (identifier.empty())
return;
/** If there are nodes with same identifier, remove them.
* Such nodes could still be alive after failed attempt of removal,
* if it was temporary communication failure, that was continued for more than session timeout,
* but ZK session is still alive for unknown reason, and someone still holds that ZK session.
* See comments in destructor of EphemeralNodeHolder.
*/
Strings brothers = zookeeper.getChildren(path);
for (const auto & brother : brothers)
{
if (brother == node_name)
continue;
std::string brother_path = path + "/" + brother;
std::string brother_identifier = zookeeper.get(brother_path);
if (brother_identifier == identifier)
{
ProfileEvents::increment(ProfileEvents::ObsoleteEphemeralNode);
LOG_WARNING(&Logger::get("LeaderElection"), "Found obsolete ephemeral node for identifier "
+ identifier + ", removing: " + brother_path);
zookeeper.tryRemoveWithRetries(brother_path);
}
}
}
void releaseNode()
{
shutdown = true;
@ -84,6 +136,7 @@ private:
if (it == children.begin())
{
ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
handler();
return;
}

View File

@ -9,6 +9,19 @@
#include <mutex>
#include <string>
#include <common/logger_useful.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Common/CurrentMetrics.h>
namespace ProfileEvents
{
extern const Event CannotRemoveEphemeralNode;
}
namespace CurrentMetrics
{
extern const Metric EphemeralNode;
}
namespace zkutil
@ -432,13 +445,15 @@ public:
}
catch (const KeeperException & e)
{
LOG_ERROR(zookeeper.log, "~EphemeralNodeHolder(): " << e.displayText());
ProfileEvents::increment(ProfileEvents::CannotRemoveEphemeralNode);
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
private:
std::string path;
ZooKeeper & zookeeper;
CurrentMetrics::Increment metric_increment{CurrentMetrics::EphemeralNode};
};
using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;

View File

@ -5,6 +5,25 @@
#include <DB/Common/StringUtils.h>
namespace ProfileEvents
{
extern const Event ZooKeeperInit;
extern const Event ZooKeeperTransactions;
extern const Event ZooKeeperCreate;
extern const Event ZooKeeperRemove;
extern const Event ZooKeeperExists;
extern const Event ZooKeeperMulti;
extern const Event ZooKeeperGet;
extern const Event ZooKeeperSet;
extern const Event ZooKeeperGetChildren;
}
namespace CurrentMetrics
{
extern const Metric ZooKeeperWatch;
}
namespace zkutil
{
@ -29,6 +48,7 @@ struct WatchWithEvent
/// существует все время существования WatchWithEvent
ZooKeeper & zk;
EventPtr event;
CurrentMetrics::Increment metric_increment{CurrentMetrics::ZooKeeperWatch};
WatchWithEvent(ZooKeeper & zk_, EventPtr event_) : zk(zk_), event(event_) {}