dbms: added ProfileEvents system [#METR-2944].

This commit is contained in:
Alexey Milovidov 2014-01-03 08:20:13 +00:00
parent af3d957de1
commit fd414800d5
13 changed files with 207 additions and 3 deletions

View File

@ -0,0 +1,62 @@
#pragma once
#include <stddef.h>
/** Позволяет считать количество различных событий, произошедших в программе
* - для высокоуровневого профайлинга.
*/
#define APPLY_FOR_EVENTS(M) \
M(Query, "Queries") \
M(SelectQuery, "Select queries") \
M(InsertQuery, "Insert queries") \
M(FileOpen, "File opens") \
M(Seek, "Seeks") \
M(ReadBufferFromFileDescriptorRead, "ReadBufferFromFileDescriptor reads") \
M(ReadCompressedBytes, "Read compressed bytes") \
M(CompressedReadBufferBlocks, "Read decompressed blocks") \
M(CompressedReadBufferBytes, "Read decompressed bytes") \
M(UncompressedCacheHits, "Uncompressed cache hits") \
M(UncompressedCacheMisses, "Uncompressed cache misses") \
\
M(END, "")
namespace ProfileEvents
{
/// Виды событий.
enum Event
{
#define M(NAME, DESCRIPTION) NAME,
APPLY_FOR_EVENTS(M)
#undef M
};
/// Получить текстовое описание события по его enum-у.
inline const char * getDescription(Event event)
{
static const char * descriptions[] =
{
#define M(NAME, DESCRIPTION) DESCRIPTION,
APPLY_FOR_EVENTS(M)
#undef M
};
return descriptions[event];
}
/// Счётчики - сколько раз каждое из событий произошло.
extern size_t counters[Event::END];
/// Увеличить счётчик события. Потокобезопасно.
inline void increment(Event event, size_t amount = 1)
{
__sync_fetch_and_add(&counters[event], amount);
}
}
#undef APPLY_FOR_EVENTS

View File

@ -7,6 +7,7 @@
#include <lz4/lz4.h>
#include <DB/Common/PODArray.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBuffer.h>
@ -50,6 +51,8 @@ private:
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum));
size_decompressed = qlz_size_decompressed(&own_compressed_buffer[0]);
/// Находится ли сжатый блок целиком в буфере in?
@ -74,6 +77,9 @@ private:
void decompress(char * to, size_t size_decompressed)
{
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
/// Старший бит первого байта определяет использованный метод сжатия.
if ((compressed_buffer[0] & 0x80) == 0)
{

View File

@ -20,6 +20,8 @@ public:
char * existing_memory = NULL, size_t alignment = 0)
: ReadBufferFromFileDescriptor(-1, buf_size, existing_memory, alignment), file_name(file_name_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
fd = open(file_name.c_str(), O_RDONLY);
if (-1 == fd)

View File

@ -3,6 +3,8 @@
#include <unistd.h>
#include <errno.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
@ -27,6 +29,8 @@ protected:
size_t bytes_read = 0;
while (!bytes_read)
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
ssize_t res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
if (!res)
break;
@ -84,6 +88,8 @@ public:
}
else
{
ProfileEvents::increment(ProfileEvents::Seek);
pos = working_buffer.end();
off_t res = lseek(fd, new_pos, SEEK_SET);
if (-1 == res)

View File

@ -6,6 +6,7 @@
#include <Poco/Mutex.h>
#include <DB/Common/SipHash.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/Interpreters/AggregationCommon.h>
@ -68,11 +69,13 @@ public:
if (cell && cell->key == key)
{
ProfileEvents::increment(ProfileEvents::UncompressedCacheHits);
++hits;
return cell;
}
else
{
ProfileEvents::increment(ProfileEvents::UncompressedCacheMisses);
++misses;
return NULL;
}

View File

@ -4,6 +4,8 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
@ -22,6 +24,8 @@ public:
char * existing_memory = NULL, size_t alignment = 0)
: WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, alignment), file_name(file_name_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
fd = open(file_name.c_str(), flags == -1 ? O_WRONLY | O_TRUNC | O_CREAT : flags, mode);
if (-1 == fd)

View File

@ -0,0 +1,42 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Storages/IStorage.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
using Poco::SharedPtr;
/** Реализует системную таблицу events, которая позволяет получить информацию для профайлинга.
*/
class StorageSystemEvents : public IStorage
{
public:
static StoragePtr create(const std::string & name_);
std::string getName() const { return "SystemEvents"; }
std::string getTableName() const { return name; }
const NamesAndTypesList & getColumnsList() const { return columns; }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
private:
const std::string name;
NamesAndTypesList columns;
StorageSystemEvents(const std::string & name_);
};
}

View File

@ -0,0 +1,7 @@
#include <DB/Common/ProfileEvents.h>
namespace ProfileEvents
{
size_t counters[Event::END]; /// Глобальная переменная - инициализируется нулями.
}

View File

@ -19,7 +19,7 @@ namespace DB
InterpreterInsertQuery::InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_)
{
ProfileEvents::increment(ProfileEvents::InsertQuery);
}

View File

@ -29,6 +29,8 @@ namespace DB
void InterpreterSelectQuery::init(BlockInputStreamPtr input_)
{
ProfileEvents::increment(ProfileEvents::SelectQuery);
if (settings.limits.max_subquery_depth && subquery_depth > settings.limits.max_subquery_depth)
throw Exception("Too deep subqueries. Maximum: " + toString(settings.limits.max_subquery_depth),
ErrorCodes::TOO_DEEP_SUBQUERIES);

View File

@ -1,3 +1,5 @@
#include <DB/Common/ProfileEvents.h>
#include <DB/Parsers/formatAST.h>
#include <DB/DataStreams/BlockIO.h>
@ -25,6 +27,8 @@ void executeQuery(
bool internal,
QueryProcessingStage::Enum stage)
{
ProfileEvents::increment(ProfileEvents::Query);
ParserQuery parser;
ASTPtr ast;
std::string expected;
@ -112,6 +116,8 @@ BlockIO executeQuery(
bool internal,
QueryProcessingStage::Enum stage)
{
ProfileEvents::increment(ProfileEvents::Query);
ParserQuery parser;
ASTPtr ast;
std::string expected;

View File

@ -7,6 +7,7 @@
#include <DB/Storages/StorageSystemTables.h>
#include <DB/Storages/StorageSystemDatabases.h>
#include <DB/Storages/StorageSystemProcesses.h>
#include <DB/Storages/StorageSystemEvents.h>
#include <DB/Storages/StorageSystemOne.h>
#include "Server.h"
@ -126,8 +127,9 @@ int Server::main(const std::vector<std::string> & args)
global_context->addTable("system", "one", StorageSystemOne::create("one"));
global_context->addTable("system", "numbers", StorageSystemNumbers::create("numbers"));
global_context->addTable("system", "tables", StorageSystemTables::create("tables", *global_context));
global_context->addTable("system", "databases", StorageSystemDatabases::create("databases", *global_context));
global_context->addTable("system", "processes", StorageSystemProcesses::create("processes", *global_context));
global_context->addTable("system", "databases", StorageSystemDatabases::create("databases", *global_context));
global_context->addTable("system", "processes", StorageSystemProcesses::create("processes", *global_context));
global_context->addTable("system", "events", StorageSystemEvents::create("events"));
global_context->setCurrentDatabase(config.getString("default_database", "default"));

View File

@ -0,0 +1,62 @@
#include <DB/Common/ProfileEvents.h>
#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Storages/StorageSystemEvents.h>
namespace DB
{
StorageSystemEvents::StorageSystemEvents(const std::string & name_)
: name(name_)
{
columns.push_back(NameAndTypePair("event", new DataTypeString));
columns.push_back(NameAndTypePair("value", new DataTypeUInt64));
}
StoragePtr StorageSystemEvents::create(const std::string & name_)
{
return (new StorageSystemEvents(name_))->thisPtr();
}
BlockInputStreams StorageSystemEvents::read(
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
Block block;
ColumnWithNameAndType col_event;
col_event.name = "event";
col_event.type = new DataTypeString;
col_event.column = new ColumnString;
block.insert(col_event);
ColumnWithNameAndType col_value;
col_value.name = "value";
col_value.type = new DataTypeUInt64;
col_value.column = new ColumnUInt64;
block.insert(col_value);
for (size_t i = 0; i < ProfileEvents::END; ++i)
{
UInt64 value = ProfileEvents::counters[i];
if (0 != value)
{
col_event.column->insert(String(ProfileEvents::getDescription(ProfileEvents::Event(i))));
col_value.column->insert(value);
}
}
return BlockInputStreams(1, new OneBlockInputStream(block));
}
}