This commit is contained in:
Evgeniy Gatov 2016-01-21 15:55:17 +03:00
commit d231814588
39 changed files with 447 additions and 37 deletions

View File

@ -0,0 +1,140 @@
#pragma once
#include <cstdint>
#include <utility>
/** Позволяет считать количество одновременно происходящих событий или текущее значение какой-либо метрики.
* - для высокоуровневого профайлинга.
*
* Также смотрите ProfileEvents.h
* В ProfileEvents считается общее количество произошедших (точечных) событий - например, сколько раз были выполнены запросы.
* В CurrentMetrics считается количество одновременных событий - например, сколько сейчас одновременно выполняется запросов,
* или текущее значение метрики - например, величина отставания реплики в секундах.
*/
#define APPLY_FOR_METRICS(M) \
M(Query) \
M(Merge) \
M(ReplicatedFetch) \
M(ReplicatedSend) \
M(ReplicatedChecks) \
M(BackgroundPoolTask) \
M(DiskSpaceReservedForMerge) \
M(DistributedSend) \
M(QueryPreempted) \
M(TCPConnection) \
M(HTTPConnection) \
M(InterserverConnection) \
M(OpenFileForRead) \
M(OpenFileForWrite) \
M(Read) \
M(Write) \
M(SendExternalTables) \
M(QueryThread) \
M(ReadonlyReplica) \
M(MemoryTracking) \
\
M(END)
namespace CurrentMetrics
{
/// Виды метрик.
enum Metric
{
#define M(NAME) NAME,
APPLY_FOR_METRICS(M)
#undef M
};
/// Получить текстовое описание метрики по его enum-у.
inline const char * getDescription(Metric event)
{
static const char * descriptions[] =
{
#define M(NAME) #NAME,
APPLY_FOR_METRICS(M)
#undef M
};
return descriptions[event];
}
using Value = int64_t;
/// Счётчики - текущие значения метрик.
extern Value values[END];
/// Выставить значение указанной метрики.
inline void set(Metric metric, Value value)
{
values[metric] = value;
}
/// Прибавить величину к значению указанной метрики. Вы затем должны вычесть величину самостоятельно. Или см. ниже class Increment.
inline void add(Metric metric, Value value = 1)
{
__sync_fetch_and_add(&values[metric], value);
}
inline void sub(Metric metric, Value value = 1)
{
add(metric, -value);
}
/// На время жизни объекта, увеличивает указанное значение на указанную величину.
class Increment
{
private:
Value * what;
Value amount;
Increment(Value * what, Value amount)
: what(what), amount(amount)
{
__sync_fetch_and_add(what, amount);
}
public:
Increment(Metric metric, Value amount = 1)
: Increment(&values[metric], amount) {}
~Increment()
{
if (what)
__sync_fetch_and_sub(what, amount);
}
Increment(Increment && old)
{
*this = std::move(old);
}
Increment & operator= (Increment && old)
{
what = old.what;
amount = old.amount;
old.what = nullptr;
return *this;
}
void changeTo(Value new_amount)
{
__sync_fetch_and_add(what, new_amount - amount);
amount = new_amount;
}
/// Уменьшить значение раньше вызова деструктора.
void destroy()
{
__sync_fetch_and_sub(what, amount);
what = nullptr;
}
};
}
#undef APPLY_FOR_METRICS

View File

@ -80,6 +80,10 @@
M(SlowRead) \
M(ReadBackoff) \
\
M(ReplicaYieldLeadership) \
M(ReplicaPartialShutdown) \
M(ReplicaPermanentlyReadonly) \
\
M(END)
namespace ProfileEvents

View File

@ -6,6 +6,7 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -123,6 +124,7 @@ protected:
{
setThreadName("AsyncBlockInput");
current_memory_tracker = memory_tracker;
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
try
{

View File

@ -10,6 +10,7 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
/** Позволяет обработать множество источников блоков параллельно, используя указанное количество потоков.
@ -172,6 +173,7 @@ private:
std::exception_ptr exception;
setThreadName("ParalInputsProc");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
try
{

View File

@ -5,6 +5,7 @@
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/Core/Defines.h>
#include <DB/Common/AIO.h>
#include <DB/Common/CurrentMetrics.h>
#include <string>
#include <limits>
@ -95,6 +96,8 @@ private:
bool is_aio = false;
/// Асинхронная операция завершилась неудачно?
bool aio_failed = false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead};
};
}

View File

@ -3,6 +3,7 @@
#include <fcntl.h>
#include <DB/IO/ReadBufferFromFileDescriptor.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -22,6 +23,7 @@ class ReadBufferFromFile : public ReadBufferFromFileDescriptor
{
private:
std::string file_name;
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead};
public:
ReadBufferFromFile(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
@ -58,6 +60,7 @@ public:
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
fd = -1;
metric_increment.destroy();
}
virtual std::string getFileName()

View File

@ -10,6 +10,7 @@
#include <DB/Common/Stopwatch.h>
#include <DB/Common/Exception.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/IO/ReadBufferFromFileBase.h>
#include <DB/IO/ReadBuffer.h>
@ -47,7 +48,11 @@ protected:
if (profile_callback)
watch.emplace(clock_type);
ssize_t res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
ssize_t res = 0;
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
}
if (!res)
break;

View File

@ -5,6 +5,7 @@
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/Core/Defines.h>
#include <DB/Common/AIO.h>
#include <DB/Common/CurrentMetrics.h>
#include <string>
#include <unistd.h>
@ -87,6 +88,8 @@ private:
bool is_pending_write = false;
/// Асинхронная операция завершилась неудачно?
bool aio_failed = false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForWrite};
};
}

View File

@ -5,6 +5,7 @@
#include <fcntl.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
@ -26,6 +27,7 @@ class WriteBufferFromFile : public WriteBufferFromFileDescriptor
{
private:
std::string file_name;
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForWrite};
public:
WriteBufferFromFile(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, mode_t mode = 0666,
@ -73,6 +75,7 @@ public:
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
fd = -1;
metric_increment.destroy();
}
/** fsync() transfers ("flushes") all modified in-core data of (i.e., modified buffer cache pages for) the file

View File

@ -5,6 +5,7 @@
#include <DB/Common/Exception.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/IO/WriteBufferFromFileBase.h>
#include <DB/IO/WriteBuffer.h>
@ -40,7 +41,11 @@ protected:
{
ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWrite);
ssize_t res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
ssize_t res = 0;
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Write};
res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
}
if ((-1 == res || 0 == res) && errno != EINTR)
throwFromErrno("Cannot write to file " + getFileName(), ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);

View File

@ -14,6 +14,7 @@
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/QueryPriorities.h>
#include <DB/Storages/IStorage.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -23,6 +24,23 @@ namespace DB
* Также реализует ограничение на их количество.
*/
/** Информационная составляющая элемента списка процессов.
* Для вывода в SHOW PROCESSLIST. Не содержит никаких сложных объектов, которые что-то делают при копировании или в деструкторах.
*/
struct ProcessInfo
{
String query;
String user;
String query_id;
Poco::Net::IPAddress ip_address;
double elapsed_seconds;
size_t rows;
size_t bytes;
size_t total_rows;
Int64 memory_usage;
};
/// Запрос и данные о его выполнении.
struct ProcessListElement
{
@ -39,6 +57,8 @@ struct ProcessListElement
QueryPriorities::Handle priority_handle;
CurrentMetrics::Increment num_queries {CurrentMetrics::Query};
bool is_cancelled = false;
/// Здесь могут быть зарегистрированы временные таблицы. Изменять под mutex-ом.
@ -73,6 +93,21 @@ struct ProcessListElement
return !is_cancelled;
}
ProcessInfo getInfo() const
{
return ProcessInfo{
.query = query,
.user = user,
.query_id = query_id,
.ip_address = ip_address,
.elapsed_seconds = watch.elapsedSeconds(),
.rows = progress.rows,
.bytes = progress.bytes,
.total_rows = progress.total_rows,
.memory_usage = memory_tracker.get(),
};
}
};
@ -122,6 +157,7 @@ public:
/// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем.
using Container = std::list<Element>;
using Info = std::vector<ProcessInfo>;
/// User -> queries
using UserToQueries = std::unordered_map<String, ProcessListForUser>;
@ -153,11 +189,17 @@ public:
/// Количество одновременно выполняющихся запросов.
size_t size() const { return cur_size; }
/// Получить текущее состояние (копию) списка запросов.
Container get() const
/// Получить текущее состояние списка запросов.
Info getInfo() const
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return cont;
Info res;
res.reserve(cur_size);
for (const auto & elem : cont)
res.emplace_back(elem.getInfo());
return res;
}
void setMaxSize(size_t max_size_)

View File

@ -5,6 +5,7 @@
#include <condition_variable>
#include <memory>
#include <chrono>
#include <DB/Common/CurrentMetrics.h>
/** Реализует приоритеты запросов.
@ -67,6 +68,7 @@ private:
if (!found)
return true;
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryPreempted};
if (std::cv_status::timeout == condvar.wait_for(lock, timeout))
return false;
}

View File

@ -3,6 +3,7 @@
#include <DB/DataStreams/RemoteBlockOutputStream.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Storages/StorageDistributed.h>
#include <DB/IO/ReadBufferFromFile.h>
@ -181,6 +182,8 @@ private:
try
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
ReadBufferFromFile in{file_path};
std::string insert_query;

View File

@ -6,6 +6,7 @@
#include <DB/Common/Exception.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Common/formatReadable.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
{
@ -73,13 +74,16 @@ public:
return size;
}
private:
Reservation(size_t size_) : size(size_)
Reservation(size_t size_)
: size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size)
{
Poco::ScopedLock<Poco::FastMutex> lock(DiskSpaceMonitor::mutex);
DiskSpaceMonitor::reserved_bytes += size;
++DiskSpaceMonitor::reservation_count;
}
size_t size;
CurrentMetrics::Increment metric_increment;
};
typedef Poco::SharedPtr<Reservation> ReservationPtr;

View File

@ -1,6 +1,7 @@
#pragma once
#include <DB/Common/Stopwatch.h>
#include <DB/Common/CurrentMetrics.h>
#include <memory>
#include <list>
#include <mutex>
@ -42,6 +43,8 @@ class MergeListEntry
using container_t = std::list<MergeInfo>;
container_t::iterator it;
CurrentMetrics::Increment num_merges {CurrentMetrics::Merge};
public:
MergeListEntry(const MergeListEntry &) = delete;
MergeListEntry & operator=(const MergeListEntry &) = delete;

View File

@ -1,16 +1,11 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Storages/IStorage.h>
namespace DB
{
using Poco::SharedPtr;
/** Реализует системную таблицу events, которая позволяет получить информацию для профайлинга.
*/
class StorageSystemEvents : public IStorage

View File

@ -0,0 +1,37 @@
#pragma once
#include <DB/Storages/IStorage.h>
namespace DB
{
/** Реализует системную таблицу metrics, которая позволяет получить информацию о работе сервера.
*/
class StorageSystemMetrics : public IStorage
{
public:
static StoragePtr create(const std::string & name_);
std::string getName() const override { return "SystemMetrics"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
private:
const std::string name;
NamesAndTypesList columns;
StorageSystemMetrics(const std::string & name_);
};
}

View File

@ -21,6 +21,7 @@
#include <DB/Client/Connection.h>
#include <DB/Common/NetException.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -366,6 +367,8 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
size_t maybe_compressed_out_bytes = maybe_compressed_out ? maybe_compressed_out->count() : 0;
size_t rows = 0;
CurrentMetrics::Increment metric_increment{CurrentMetrics::SendExternalTables};
for (auto & elem : data)
{
elem.first->readPrefix();

View File

@ -0,0 +1,7 @@
#include <DB/Common/CurrentMetrics.h>
namespace CurrentMetrics
{
Value values[END]; /// Глобальная переменная - инициализируется нулями.
}

View File

@ -1,6 +1,7 @@
#include <common/likely.h>
#include <common/logger_useful.h>
#include <DB/Common/Exception.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Common/formatReadable.h>
#include <DB/IO/WriteHelpers.h>
#include <iomanip>
@ -21,6 +22,9 @@ MemoryTracker::~MemoryTracker()
{
if (peak)
logPeakMemoryUsage();
if (amount && !next)
CurrentMetrics::sub(CurrentMetrics::MemoryTracking, amount);
}
@ -36,6 +40,9 @@ void MemoryTracker::alloc(Int64 size)
{
Int64 will_be = __sync_add_and_fetch(&amount, size);
if (!next)
CurrentMetrics::add(CurrentMetrics::MemoryTracking, size);
/// Используется непотокобезопасный генератор случайных чисел. Совместное распределение в разных потоках не будет равномерным.
/// В данном случае, это нормально.
if (unlikely(fault_probability && drand48() < fault_probability))
@ -82,11 +89,16 @@ void MemoryTracker::free(Int64 size)
if (next)
next->free(size);
else
CurrentMetrics::sub(CurrentMetrics::MemoryTracking, size);
}
void MemoryTracker::reset()
{
if (!next)
CurrentMetrics::sub(CurrentMetrics::MemoryTracking, amount);
amount = 0;
peak = 0;
limit = 0;

View File

@ -1,5 +1,6 @@
#include <future>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
@ -111,6 +112,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
{
current_memory_tracker = memory_tracker;
setThreadName("MergeAggReadThr");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
child->readPrefix();
});
reading_pool->schedule([&task] { task(); });
@ -227,6 +229,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker
{
setThreadName("MergeAggMergThr");
current_memory_tracker = memory_tracker;
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
try
{
@ -397,6 +400,7 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
{
current_memory_tracker = memory_tracker;
setThreadName("MergeAggReadThr");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
read_from_input(input);
});
auto & task = tasks.back();

View File

@ -173,6 +173,8 @@ off_t ReadBufferAIO::doSeek(off_t off, int whence)
void ReadBufferAIO::synchronousRead()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
prepare();
bytes_read = ::pread(fd, buffer_begin, region_aligned_size, region_aligned_begin);
@ -207,6 +209,8 @@ bool ReadBufferAIO::waitForAIOCompletion()
if (is_eof || !is_pending_read)
return false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
bytes_read = future_bytes_read.get();
is_pending_read = false;

View File

@ -164,6 +164,8 @@ bool WriteBufferAIO::waitForAIOCompletion()
if (!is_pending_write)
return false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::Write};
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
{
if (errno != EINTR)

View File

@ -1633,6 +1633,7 @@ private:
{
current_memory_tracker = memory_tracker;
setThreadName("MergingAggregtd");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
try
{

View File

@ -189,7 +189,8 @@ void Compiler::compile(
" -I /usr/share/clickhouse/headers/dbms/include/"
" -I /usr/share/clickhouse/headers/contrib/libcityhash/"
" -I /usr/share/clickhouse/headers/contrib/libdouble-conversion/"
" -I /usr/share/clickhouse/headers/contrib/libcpuid/include/"
" -I /usr/share/clickhouse/headers/contrib/libpoco/Foundation/include/"
" -I /usr/share/clickhouse/headers/contrib/libpoco/Util/include/"
" -I /usr/share/clickhouse/headers/libs/libcommon/include/"
" -I /usr/share/clickhouse/headers/libs/libmysqlxx/include/"
" " << additional_compiler_flags <<

View File

@ -1,6 +1,7 @@
#pragma once
#include <DB/IO/WriteBufferFromHTTPServerResponse.h>
#include <DB/Common/CurrentMetrics.h>
#include "Server.h"
@ -33,6 +34,8 @@ public:
private:
Server & server;
CurrentMetrics::Increment metric_increment{CurrentMetrics::HTTPConnection};
Logger * log;
/// Функция также инициализирует used_output.

View File

@ -1,6 +1,7 @@
#pragma once
#include "Server.h"
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -19,7 +20,7 @@ public:
private:
Server & server;
CurrentMetrics::Increment metric_increment{CurrentMetrics::InterserverConnection};
Logger * log;
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);

View File

@ -2,6 +2,7 @@
#include <daemon/Daemon.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -29,7 +30,7 @@ MetricsTransmitter::~MetricsTransmitter()
void MetricsTransmitter::run()
{
setThreadName("ProfileEventsTx");
setThreadName("MetricsTransmit");
const auto get_next_minute = [] {
return std::chrono::time_point_cast<std::chrono::minutes, std::chrono::system_clock>(
@ -44,15 +45,15 @@ void MetricsTransmitter::run()
if (cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
break;
transmitCounters();
transmit();
}
}
void MetricsTransmitter::transmitCounters()
void MetricsTransmitter::transmit()
{
GraphiteWriter::KeyValueVector<size_t> key_vals{};
key_vals.reserve(ProfileEvents::END);
GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
key_vals.reserve(ProfileEvents::END + CurrentMetrics::END);
for (size_t i = 0; i < ProfileEvents::END; ++i)
{
@ -60,10 +61,18 @@ void MetricsTransmitter::transmitCounters()
const auto counter_increment = counter - prev_counters[i];
prev_counters[i] = counter;
std::string key{ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))};
std::string key {ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))};
key_vals.emplace_back(event_path_prefix + key, counter_increment);
}
for (size_t i = 0; i < CurrentMetrics::END; ++i)
{
const auto value = CurrentMetrics::values[i];
std::string key {CurrentMetrics::getDescription(static_cast<CurrentMetrics::Metric>(i))};
key_vals.emplace_back(metrics_path_prefix + key, value);
}
Daemon::instance().writeToGraphite(key_vals);
}

View File

@ -10,7 +10,10 @@
namespace DB
{
/** Automatically sends difference of ProfileEvents to Graphite at beginning of every minute
/** Automatically sends
* - difference of ProfileEvents;
* - values of CurrentMetrics;
* to Graphite at beginning of every minute.
*/
class MetricsTransmitter
{
@ -19,7 +22,7 @@ public:
private:
void run();
void transmitCounters();
void transmit();
/// Значения счётчиков при предыдущей отправке (или нули, если ни разу не отправляли).
decltype(ProfileEvents::counters) prev_counters{};
@ -30,6 +33,7 @@ private:
std::thread thread {&MetricsTransmitter::run, this};
static constexpr auto event_path_prefix = "ClickHouse.ProfileEvents.";
static constexpr auto metrics_path_prefix = "ClickHouse.Metrics.";
};
}

View File

@ -1,6 +1,7 @@
#pragma once
#include "Server.h"
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -23,6 +24,7 @@ private:
Server & server;
Logger * log;
const String profile;
CurrentMetrics::Increment metric_increment{CurrentMetrics::HTTPConnection};
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
};

View File

@ -34,6 +34,7 @@
#include <DB/Storages/System/StorageSystemColumns.h>
#include <DB/Storages/System/StorageSystemFunctions.h>
#include <DB/Storages/System/StorageSystemClusters.h>
#include <DB/Storages/System/StorageSystemMetrics.h>
#include <zkutil/ZooKeeper.h>
@ -303,6 +304,7 @@ int Server::main(const std::vector<std::string> & args)
global_context->addTable("system", "processes", StorageSystemProcesses::create("processes"));
global_context->addTable("system", "settings", StorageSystemSettings::create("settings"));
global_context->addTable("system", "events", StorageSystemEvents::create("events"));
global_context->addTable("system", "metrics", StorageSystemMetrics::create("metrics"));
global_context->addTable("system", "merges", StorageSystemMerges::create("merges"));
global_context->addTable("system", "replicas", StorageSystemReplicas::create("replicas"));
global_context->addTable("system", "replication_queue", StorageSystemReplicationQueue::create("replication_queue"));

View File

@ -9,6 +9,7 @@
#include <DB/DataStreams/BlockIO.h>
#include <DB/Common/Stopwatch.h>
#include <DB/Common/CurrentMetrics.h>
#include "Server.h"
@ -98,6 +99,8 @@ private:
/// На данный момент, поддерживается одновременное выполнение только одного запроса в соединении.
QueryState state;
CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection};
void runImpl();

View File

@ -1,5 +1,6 @@
#include <DB/Common/Exception.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/IO/WriteHelpers.h>
#include <common/logger_useful.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
@ -160,12 +161,16 @@ void BackgroundProcessingPool::threadFunction()
if (task->removed)
continue;
Context context(*this, counters_diff);
bool done_work = task->function(context);
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundPoolTask};
/// Если задача сделала полезную работу, то она сможет выполняться в следующий раз хоть сразу.
/// Если нет - добавляем задержку перед повторным исполнением.
task->next_time_to_execute = time(0) + (done_work ? 0 : sleep_seconds);
Context context(*this, counters_diff);
bool done_work = task->function(context);
/// Если задача сделала полезную работу, то она сможет выполняться в следующий раз хоть сразу.
/// Если нет - добавляем задержку перед повторным исполнением.
task->next_time_to_execute = time(0) + (done_work ? 0 : sleep_seconds);
}
}
catch (...)
{

View File

@ -11,6 +11,7 @@
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/HashingReadBuffer.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -264,6 +265,8 @@ void MergeTreePartChecker::checkDataPart(
const DataTypes & primary_key_data_types,
MergeTreeData::DataPart::Checksums * out_checksums)
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedChecks};
if (!path.empty() && path.back() != '/')
path += "/";

View File

@ -1,5 +1,6 @@
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Common/CurrentMetrics.h>
namespace DB
@ -27,6 +28,8 @@ void ReplicatedMergeTreePartsServer::processQuery(const Poco::Net::HTMLForm & pa
Poco::ScopedReadRWLock part_lock(part->columns_lock);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};
/// Список файлов возьмем из списка контрольных сумм.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Добавим файлы, которых нет в списке контрольных сумм.
@ -98,6 +101,8 @@ MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart(
part_file.remove(true);
}
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
part_file.createDirectory();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);

View File

@ -70,6 +70,8 @@ void ReplicatedMergeTreeRestartingThread::run()
else
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
if (!storage.is_readonly)
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
storage.is_readonly = true;
partialShutdown();
}
@ -98,6 +100,8 @@ void ReplicatedMergeTreeRestartingThread::run()
break;
}
if (storage.is_readonly)
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
storage.is_readonly = false;
first_time = false;
need_restart = false;
@ -118,7 +122,7 @@ void ReplicatedMergeTreeRestartingThread::run()
}
catch (...)
{
tryLogCurrentException("__PRETTY_FUNCTION__", "Cannot get replica delays");
tryLogCurrentException(__PRETTY_FUNCTION__, "Cannot get replica delays");
error = true;
}
@ -134,6 +138,8 @@ void ReplicatedMergeTreeRestartingThread::run()
LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
<< storage.data.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
need_restart = true;
continue;
}
@ -336,6 +342,8 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
void ReplicatedMergeTreeRestartingThread::partialShutdown()
{
ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);
storage.leader_election = nullptr;
storage.shutdown_called = true;
storage.shutdown_event.set();
@ -376,7 +384,10 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
void ReplicatedMergeTreeRestartingThread::goReadOnlyPermanently()
{
LOG_INFO(log, "Going to readonly mode");
ProfileEvents::increment(ProfileEvents::ReplicaPermanentlyReadonly);
if (!storage.is_readonly)
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
storage.is_readonly = true;
stop();

View File

@ -1040,7 +1040,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
/// Проверяем, что пока мы собирали версии, не ожила реплика с нужным куском.
replica = findReplicaHavingPart(entry.new_part_name, true);
/// Также за это время могла быть создана совсем новая реплика. Но если на старых не появится куска, то на новой его тоже не может быть.
/// Также за это время могла быть создана совсем новая реплика.
/// Но если на старых не появится куска, то на новой его тоже не может быть.
if (replica.empty())
{
@ -1062,6 +1063,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
throw Exception("Logical error: log entry with quorum for part covering more than one block number",
ErrorCodes::LOGICAL_ERROR);
zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers/" + partition_str, "");
auto acl = zookeeper->getDefaultACL();
ops.push_back(new zkutil::Op::Create(
@ -1094,7 +1097,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
else if (code == ZBADVERSION || code == ZNONODE || code == ZNODEEXISTS)
{
LOG_DEBUG(log, "State was changed or isn't expected when trying to mark quorum for part "
<< entry.new_part_name << " as failed.");
<< entry.new_part_name << " as failed. Code: " << zerror(code));
}
else
throw zkutil::KeeperException(code);
@ -1103,7 +1106,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
{
LOG_WARNING(log, "No active replica has part " << entry.new_part_name
<< ", but that part needs quorum and /quorum/status contains entry about another part " << quorum_entry.part_name
<< ". It means that part was successfully written to " << entry.quorum << " replicas, but then all of them goes offline."
<< ". It means that part was successfully written to " << entry.quorum
<< " replicas, but then all of them goes offline."
<< " Or it is a bug.");
}
}
@ -1920,7 +1924,6 @@ void StorageReplicatedMergeTree::searchForMissingPart(const String & part_name)
const auto partition_str = part_name.substr(0, 6);
for (auto i = part_info.left; i <= part_info.right; ++i)
{
zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers", "");
zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers/" + partition_str, "");
AbandonableLockInZooKeeper::createAbandonedIfNotExists(
zookeeper_path + "/nonincrement_block_numbers/" + partition_str + "/block-" + padIndex(i),

View File

@ -0,0 +1,67 @@
#include <DB/Common/CurrentMetrics.h>
#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Storages/System/StorageSystemMetrics.h>
namespace DB
{
StorageSystemMetrics::StorageSystemMetrics(const std::string & name_)
: name(name_),
columns
{
{"metric", new DataTypeString},
{"value", new DataTypeInt64},
}
{
}
StoragePtr StorageSystemMetrics::create(const std::string & name_)
{
return (new StorageSystemMetrics(name_))->thisPtr();
}
BlockInputStreams StorageSystemMetrics::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
Block block;
ColumnWithTypeAndName col_metric;
col_metric.name = "metric";
col_metric.type = new DataTypeString;
col_metric.column = new ColumnString;
block.insert(col_metric);
ColumnWithTypeAndName col_value;
col_value.name = "value";
col_value.type = new DataTypeInt64;
col_value.column = new ColumnInt64;
block.insert(col_value);
for (size_t i = 0; i < CurrentMetrics::END; ++i)
{
auto value = CurrentMetrics::values[i];
col_metric.column->insert(String(CurrentMetrics::getDescription(CurrentMetrics::Metric(i))));
col_value.column->insert(value);
}
return BlockInputStreams(1, new OneBlockInputStream(block));
}
}

View File

@ -54,15 +54,17 @@ BlockInputStreams StorageSystemProcesses::read(
ColumnWithTypeAndName col_query{new ColumnString, new DataTypeString, "query"};
ColumnWithTypeAndName col_query_id{new ColumnString, new DataTypeString, "query_id"};
for (const auto & process : context.getProcessList().get())
ProcessList::Info info = context.getProcessList().getInfo();
for (const auto & process : info)
{
col_user.column->insert(process.user);
col_address.column->insert(process.ip_address.toString());
col_elapsed.column->insert(process.watch.elapsedSeconds());
col_rows_read.column->insert(process.progress.rows);
col_bytes_read.column->insert(process.progress.bytes);
col_total_rows_approx.column->insert(process.progress.total_rows);
col_memory_usage.column->insert(static_cast<UInt64>(process.memory_tracker.get()));
col_elapsed.column->insert(process.elapsed_seconds);
col_rows_read.column->insert(process.rows);
col_bytes_read.column->insert(process.bytes);
col_total_rows_approx.column->insert(process.total_rows);
col_memory_usage.column->insert(static_cast<UInt64>(process.memory_usage));
col_query.column->insert(process.query);
col_query_id.column->insert(process.query_id);
}