ClickHouse/dbms/src/Interpreters/QueryLog.cpp

280 lines
8.4 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <statdaemons/Stopwatch.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/parseQuery.h>
#include <DB/Parsers/ExpressionElementParsers.h>
#include <DB/Parsers/ASTRenameQuery.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/Interpreters/InterpreterCreateQuery.h>
#include <DB/Interpreters/InterpreterRenameQuery.h>
#include <DB/Interpreters/QueryLog.h>
namespace DB
{
/** Prepares the table for the query log and starts the thread that writes to it.
  *
  * If a table with the requested name already exists but its structure differs from
  * the one returned by createBlock(), the existing table is renamed to <table_name>_<N>,
  * where N is the smallest non-negative number for which no such table exists,
  * and a fresh table is created in its place.
  */
QueryLog::QueryLog(Context & context_, const String & database_name_, const String & table_name_, size_t flush_interval_milliseconds_)
    : context(context_), database_name(database_name_), table_name(table_name_), flush_interval_milliseconds(flush_interval_milliseconds_)
{
    data.reserve(DBMS_QUERY_LOG_QUEUE_SIZE);

    {
        const String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);

        /// The context mutex is held for the whole check / rename / create sequence.
        Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());

        table = context.tryGetTable(database_name, table_name);

        if (table)
        {
            const Block expected = createBlock();
            const Block actual = table->getSampleBlockNonMaterialized();

            if (blocksHaveEqualStructure(actual, expected))
            {
                LOG_DEBUG(log, "Will use existing table " << description << " for query log.");
            }
            else
            {
                /// Rename the existing table: look for the first free name of the form <table_name>_<N>.
                int suffix = 0;
                String renamed_table_name = table_name + "_" + toString(suffix);
                while (context.isTableExist(database_name, renamed_table_name))
                {
                    ++suffix;
                    renamed_table_name = table_name + "_" + toString(suffix);
                }

                ASTRenameQuery * rename_query = new ASTRenameQuery;
                ASTPtr rename_holder = rename_query;

                ASTRenameQuery::Element rename_element;
                rename_element.from.database = database_name;
                rename_element.from.table = table_name;
                rename_element.to.database = database_name;
                rename_element.to.table = renamed_table_name;
                rename_query->elements.emplace_back(rename_element);

                LOG_DEBUG(log, "Existing table " << description << " for query log has obsolete or different structure."
                    " Renaming it to " << backQuoteIfNeed(rename_element.to.table));

                InterpreterRenameQuery(rename_holder, context).execute();

                /// The required table will be created below.
                table = nullptr;
            }
        }

        if (!table)
        {
            /// Create the table.
            LOG_DEBUG(log, "Creating new table " << description << " for query log.");

            ASTCreateQuery * create_query = new ASTCreateQuery;
            ASTPtr create_holder = create_query;

            create_query->database = database_name;
            create_query->table = table_name;

            /// Column definitions are taken from the sample block.
            Block sample = createBlock();
            create_query->columns = InterpreterCreateQuery::formatColumns(sample.getColumnsList());

            /// The storage engine is given as text and parsed into an AST.
            String engine = "MergeTree(event_date, event_time, 8192)";
            ParserFunction engine_parser;
            const char * engine_begin = engine.data();
            const char * engine_end = engine_begin + engine.size();
            create_query->storage = parseQuery(engine_parser, engine_begin, engine_end, "ENGINE to create table for query log");

            InterpreterCreateQuery(create_holder, context).execute();

            table = context.getTable(database_name, table_name);
        }
    }

    /// Started last, after the table has been obtained.
    saving_thread = std::thread([this] { threadFunction(); });
}
/** Stops the saving thread and waits for it to finish.
  * threadFunction() flushes the accumulated records when it receives the SHUTDOWN element.
  */
QueryLog::~QueryLog()
{
    /// Tell the thread that it has to finish by sending a special element through the queue.
    QueryLogElement shutdown_request;
    shutdown_request.type = QueryLogElement::SHUTDOWN;
    queue.push(shutdown_request);

    saving_thread.join();
}
void QueryLog::threadFunction()
{
Stopwatch time_after_last_write;
bool first = true;
while (true)
{
try
{
if (first)
{
time_after_last_write.restart();
first = false;
}
QueryLogElement element;
bool has_element = false;
if (data.empty())
{
queue.pop(element);
has_element = true;
}
else
{
size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
if (milliseconds_elapsed < flush_interval_milliseconds)
has_element = queue.tryPop(element, flush_interval_milliseconds - milliseconds_elapsed);
}
if (has_element)
{
if (element.type == QueryLogElement::SHUTDOWN)
{
flush();
break;
}
else
data.push_back(element);
}
size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
if (milliseconds_elapsed >= flush_interval_milliseconds)
{
/// Записываем данные в таблицу.
flush();
time_after_last_write.restart();
}
}
catch (...)
{
/// В случае ошибки теряем накопленные записи, чтобы не блокироваться.
data.clear();
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
/** Creates an empty block with the structure of the query log table.
  * flush() fills the columns by position, so the order of columns here
  * has to stay in sync with the order of insertions there.
  */
Block QueryLog::createBlock()
{
    Block sample_block
    {
        /// Kind of the record and its time.
        {new ColumnUInt8,           new DataTypeUInt8,              "type"},
        {new ColumnUInt16,          new DataTypeDate,               "event_date"},
        {new ColumnUInt32,          new DataTypeDateTime,           "event_time"},
        {new ColumnUInt32,          new DataTypeDateTime,           "query_start_time"},
        {new ColumnUInt64,          new DataTypeUInt64,             "query_duration_ms"},

        /// Numeric counters.
        {new ColumnUInt64,          new DataTypeUInt64,             "read_rows"},
        {new ColumnUInt64,          new DataTypeUInt64,             "read_bytes"},
        {new ColumnUInt64,          new DataTypeUInt64,             "result_rows"},
        {new ColumnUInt64,          new DataTypeUInt64,             "result_bytes"},
        {new ColumnUInt64,          new DataTypeUInt64,             "memory_usage"},

        /// Query text and error information.
        {new ColumnString,          new DataTypeString,             "query"},
        {new ColumnString,          new DataTypeString,             "exception"},
        {new ColumnString,          new DataTypeString,             "stack_trace"},

        /// Client information. The address is stored as 16 bytes.
        {new ColumnUInt8,           new DataTypeUInt8,              "interface"},
        {new ColumnUInt8,           new DataTypeUInt8,              "http_method"},
        {new ColumnFixedString(16), new DataTypeFixedString(16),    "ip_address"},
        {new ColumnString,          new DataTypeString,             "user"},
        {new ColumnString,          new DataTypeString,             "query_id"},
    };

    return sample_block;
}
/** Writes all accumulated records to the query log table as one block and clears the buffer.
  *
  * Does nothing when no records are accumulated. This happens when the SHUTDOWN element
  * arrives while the buffer is empty; without the check a block with zero rows
  * would be sent to the table.
  *
  * Exceptions are not propagated: they are logged, and the buffer is cleared anyway,
  * so the records of a failed write are lost.
  */
void QueryLog::flush()
{
    /// Nothing to write: do not open an output stream just for an empty block.
    if (data.empty())
        return;

    try
    {
        LOG_TRACE(log, "Flushing query log");

        auto & date_lut = DateLUT::instance();

        Block block = createBlock();

        for (const QueryLogElement & elem : data)
        {
            /// The address is always stored as 16 bytes.
            char ipv6_binary[16];
            if (Poco::Net::IPAddress::IPv6 == elem.ip_address.family())
            {
                memcpy(ipv6_binary, elem.ip_address.addr(), 16);
            }
            else if (Poco::Net::IPAddress::IPv4 == elem.ip_address.family())
            {
                /// Convert to an IPv4-mapped IPv6 address: 10 zero bytes, then FF FF, then the 4 bytes of the IPv4 address.
                memset(ipv6_binary, 0, 10);
                ipv6_binary[10] = '\xFF';
                ipv6_binary[11] = '\xFF';
                memcpy(&ipv6_binary[12], elem.ip_address.addr(), 4);
            }
            else
                memset(ipv6_binary, 0, 16);

            /// Columns are filled by position; the order is the same as in createBlock().
            size_t i = 0;

            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.type));
            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(date_lut.toDayNum(elem.event_time)));
            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.event_time));
            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.query_start_time));
            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.query_duration_ms));

            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.read_rows));
            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.read_bytes));
            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.result_rows));
            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.result_bytes));
            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.memory_usage));

            block.unsafeGetByPosition(i++).column.get()->insertData(elem.query.data(), elem.query.size());
            block.unsafeGetByPosition(i++).column.get()->insertData(elem.exception.data(), elem.exception.size());
            block.unsafeGetByPosition(i++).column.get()->insertData(elem.stack_trace.data(), elem.stack_trace.size());

            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.interface));
            block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.http_method));
            block.unsafeGetByPosition(i++).column.get()->insertData(ipv6_binary, 16);
            block.unsafeGetByPosition(i++).column.get()->insertData(elem.user.data(), elem.user.size());
            block.unsafeGetByPosition(i++).column.get()->insertData(elem.query_id.data(), elem.query_id.size());
        }

        BlockOutputStreamPtr stream = table->write(nullptr);

        stream->writePrefix();
        stream->write(block);
        stream->writeSuffix();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    /// In case of an error the accumulated records are cleared as well, in order not to block.
    data.clear();
}
}