2015-06-26 19:23:25 +00:00
|
|
|
|
#include <statdaemons/Stopwatch.h>
|
|
|
|
|
#include <DB/Parsers/ASTCreateQuery.h>
|
|
|
|
|
#include <DB/Parsers/parseQuery.h>
|
|
|
|
|
#include <DB/Parsers/ExpressionElementParsers.h>
|
|
|
|
|
#include <DB/Parsers/ASTRenameQuery.h>
|
|
|
|
|
#include <DB/Parsers/formatAST.h>
|
|
|
|
|
#include <DB/Columns/ColumnsNumber.h>
|
|
|
|
|
#include <DB/Columns/ColumnString.h>
|
|
|
|
|
#include <DB/Columns/ColumnFixedString.h>
|
|
|
|
|
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
|
|
|
|
#include <DB/DataTypes/DataTypeDateTime.h>
|
|
|
|
|
#include <DB/DataTypes/DataTypeDate.h>
|
|
|
|
|
#include <DB/DataTypes/DataTypeString.h>
|
|
|
|
|
#include <DB/DataTypes/DataTypeFixedString.h>
|
|
|
|
|
#include <DB/Interpreters/InterpreterCreateQuery.h>
|
|
|
|
|
#include <DB/Interpreters/InterpreterRenameQuery.h>
|
|
|
|
|
#include <DB/Interpreters/QueryLog.h>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
QueryLog::QueryLog(Context & context_, const String & database_name_, const String & table_name_, size_t flush_interval_milliseconds_)
|
|
|
|
|
: context(context_), database_name(database_name_), table_name(table_name_), flush_interval_milliseconds(flush_interval_milliseconds_)
|
|
|
|
|
{
|
|
|
|
|
data.reserve(DBMS_QUERY_LOG_QUEUE_SIZE);
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);
|
|
|
|
|
|
|
|
|
|
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
|
|
|
|
|
|
|
|
|
|
table = context.tryGetTable(database_name, table_name);
|
|
|
|
|
|
|
|
|
|
if (table)
|
|
|
|
|
{
|
|
|
|
|
const Block expected = createBlock();
|
|
|
|
|
const Block actual = table->getSampleBlockNonMaterialized();
|
|
|
|
|
|
|
|
|
|
if (!blocksHaveEqualStructure(actual, expected))
|
|
|
|
|
{
|
|
|
|
|
/// Переименовываем существующую таблицу.
|
|
|
|
|
int suffix = 0;
|
|
|
|
|
while (context.isTableExist(database_name, table_name + "_" + toString(suffix)))
|
|
|
|
|
++suffix;
|
|
|
|
|
|
|
|
|
|
ASTRenameQuery * rename = new ASTRenameQuery;
|
|
|
|
|
ASTPtr holder = rename;
|
|
|
|
|
|
|
|
|
|
ASTRenameQuery::Table from;
|
|
|
|
|
from.database = database_name;
|
|
|
|
|
from.table = table_name;
|
|
|
|
|
|
|
|
|
|
ASTRenameQuery::Table to;
|
|
|
|
|
to.database = database_name;
|
|
|
|
|
to.table = table_name + "_" + toString(suffix);
|
|
|
|
|
|
|
|
|
|
ASTRenameQuery::Element elem;
|
|
|
|
|
elem.from = from;
|
|
|
|
|
elem.to = to;
|
|
|
|
|
|
|
|
|
|
rename->elements.emplace_back(elem);
|
|
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Existing table " << description << " for query log has obsolete or different structure."
|
|
|
|
|
" Renaming it to " << backQuoteIfNeed(to.table));
|
|
|
|
|
|
|
|
|
|
InterpreterRenameQuery(holder, context).execute();
|
|
|
|
|
|
|
|
|
|
/// Нужная таблица будет создана.
|
|
|
|
|
table = nullptr;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
LOG_DEBUG(log, "Will use existing table " << description << " for query log.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!table)
|
|
|
|
|
{
|
|
|
|
|
/// Создаём таблицу.
|
|
|
|
|
LOG_DEBUG(log, "Creating new table " << description << " for query log.");
|
|
|
|
|
|
|
|
|
|
ASTCreateQuery * create = new ASTCreateQuery;
|
|
|
|
|
ASTPtr holder = create;
|
|
|
|
|
|
|
|
|
|
create->database = database_name;
|
|
|
|
|
create->table = table_name;
|
|
|
|
|
|
|
|
|
|
Block sample = createBlock();
|
|
|
|
|
create->columns = InterpreterCreateQuery::formatColumns(sample.getColumnsList());
|
|
|
|
|
|
|
|
|
|
String engine = "MergeTree(event_date, event_time, 8192)";
|
|
|
|
|
ParserFunction engine_parser;
|
|
|
|
|
|
|
|
|
|
create->storage = parseQuery(engine_parser, engine.data(), engine.data() + engine.size(), "ENGINE to create table for query log");
|
|
|
|
|
|
|
|
|
|
InterpreterCreateQuery(holder, context).execute();
|
|
|
|
|
|
|
|
|
|
table = context.getTable(database_name, table_name);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
saving_thread = std::thread([this] { threadFunction(); });
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
QueryLog::~QueryLog()
|
|
|
|
|
{
|
|
|
|
|
/// Говорим потоку, что надо завершиться.
|
|
|
|
|
QueryLogElement elem;
|
|
|
|
|
elem.type = QueryLogElement::SHUTDOWN;
|
|
|
|
|
queue.push(elem);
|
|
|
|
|
|
|
|
|
|
saving_thread.join();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void QueryLog::threadFunction()
|
|
|
|
|
{
|
|
|
|
|
Stopwatch time_after_last_write;
|
|
|
|
|
bool first = true;
|
|
|
|
|
|
|
|
|
|
while (true)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
if (first)
|
|
|
|
|
{
|
|
|
|
|
time_after_last_write.restart();
|
|
|
|
|
first = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QueryLogElement element;
|
|
|
|
|
bool has_element = false;
|
|
|
|
|
|
|
|
|
|
if (data.empty())
|
|
|
|
|
{
|
|
|
|
|
queue.pop(element);
|
|
|
|
|
has_element = true;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
|
|
|
|
|
if (milliseconds_elapsed < flush_interval_milliseconds)
|
|
|
|
|
has_element = queue.tryPop(element, flush_interval_milliseconds - milliseconds_elapsed);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (has_element)
|
|
|
|
|
{
|
|
|
|
|
if (element.type == QueryLogElement::SHUTDOWN)
|
|
|
|
|
{
|
|
|
|
|
flush();
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
data.push_back(element);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
|
|
|
|
|
if (milliseconds_elapsed >= flush_interval_milliseconds)
|
|
|
|
|
{
|
|
|
|
|
/// Записываем данные в таблицу.
|
|
|
|
|
flush();
|
|
|
|
|
time_after_last_write.restart();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
/// В случае ошибки теряем накопленные записи, чтобы не блокироваться.
|
|
|
|
|
data.clear();
|
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Block QueryLog::createBlock()
|
|
|
|
|
{
|
|
|
|
|
return
|
|
|
|
|
{
|
|
|
|
|
{new ColumnUInt8, new DataTypeUInt8, "type"},
|
|
|
|
|
{new ColumnUInt16, new DataTypeDate, "event_date"},
|
|
|
|
|
{new ColumnUInt32, new DataTypeDateTime, "event_time"},
|
|
|
|
|
{new ColumnUInt32, new DataTypeDateTime, "query_start_time"},
|
|
|
|
|
{new ColumnUInt64, new DataTypeUInt64, "query_duration_ms"},
|
|
|
|
|
|
|
|
|
|
{new ColumnUInt64, new DataTypeUInt64, "read_rows"},
|
|
|
|
|
{new ColumnUInt64, new DataTypeUInt64, "read_bytes"},
|
|
|
|
|
|
|
|
|
|
{new ColumnUInt64, new DataTypeUInt64, "result_rows"},
|
|
|
|
|
{new ColumnUInt64, new DataTypeUInt64, "result_bytes"},
|
|
|
|
|
|
2015-07-01 05:18:54 +00:00
|
|
|
|
{new ColumnUInt64, new DataTypeUInt64, "memory_usage"},
|
|
|
|
|
|
2015-06-26 19:23:25 +00:00
|
|
|
|
{new ColumnString, new DataTypeString, "query"},
|
2015-06-29 23:54:33 +00:00
|
|
|
|
{new ColumnString, new DataTypeString, "exception"},
|
|
|
|
|
{new ColumnString, new DataTypeString, "stack_trace"},
|
2015-06-26 19:23:25 +00:00
|
|
|
|
|
|
|
|
|
{new ColumnUInt8, new DataTypeUInt8, "interface"},
|
|
|
|
|
{new ColumnUInt8, new DataTypeUInt8, "http_method"},
|
|
|
|
|
{new ColumnFixedString(16), new DataTypeFixedString(16), "ip_address"},
|
|
|
|
|
{new ColumnString, new DataTypeString, "user"},
|
|
|
|
|
{new ColumnString, new DataTypeString, "query_id"},
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void QueryLog::flush()
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
LOG_TRACE(log, "Flushing query log");
|
|
|
|
|
|
2015-07-07 23:11:30 +00:00
|
|
|
|
const auto & date_lut = DateLUT::instance();
|
2015-06-26 19:23:25 +00:00
|
|
|
|
|
|
|
|
|
Block block = createBlock();
|
|
|
|
|
|
|
|
|
|
for (const QueryLogElement & elem : data)
|
|
|
|
|
{
|
|
|
|
|
char ipv6_binary[16];
|
|
|
|
|
if (Poco::Net::IPAddress::IPv6 == elem.ip_address.family())
|
|
|
|
|
{
|
|
|
|
|
memcpy(ipv6_binary, elem.ip_address.addr(), 16);
|
|
|
|
|
}
|
|
|
|
|
else if (Poco::Net::IPAddress::IPv4 == elem.ip_address.family())
|
|
|
|
|
{
|
|
|
|
|
/// Преобразуем в IPv6-mapped адрес.
|
|
|
|
|
memset(ipv6_binary, 0, 10);
|
|
|
|
|
ipv6_binary[10] = '\xFF';
|
|
|
|
|
ipv6_binary[11] = '\xFF';
|
|
|
|
|
memcpy(&ipv6_binary[12], elem.ip_address.addr(), 4);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
memset(ipv6_binary, 0, 16);
|
|
|
|
|
|
2015-07-01 05:18:54 +00:00
|
|
|
|
size_t i = 0;
|
|
|
|
|
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.type));
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(date_lut.toDayNum(elem.event_time)));
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.event_time));
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.query_start_time));
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.query_duration_ms));
|
|
|
|
|
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.read_rows));
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.read_bytes));
|
|
|
|
|
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.result_rows));
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.result_bytes));
|
|
|
|
|
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.memory_usage));
|
|
|
|
|
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insertData(elem.query.data(), elem.query.size());
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insertData(elem.exception.data(), elem.exception.size());
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insertData(elem.stack_trace.data(), elem.stack_trace.size());
|
|
|
|
|
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.interface));
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(elem.http_method));
|
|
|
|
|
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insertData(ipv6_binary, 16);
|
2015-06-26 19:23:25 +00:00
|
|
|
|
|
2015-07-01 05:18:54 +00:00
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insertData(elem.user.data(), elem.user.size());
|
|
|
|
|
block.unsafeGetByPosition(i++).column.get()->insertData(elem.query_id.data(), elem.query_id.size());
|
2015-06-26 19:23:25 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
BlockOutputStreamPtr stream = table->write(nullptr);
|
|
|
|
|
|
|
|
|
|
stream->writePrefix();
|
|
|
|
|
stream->write(block);
|
|
|
|
|
stream->writeSuffix();
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
|
}
|
|
|
|
|
|
2015-06-26 20:48:10 +00:00
|
|
|
|
/// В случае ошибки тоже очищаем накопленные записи, чтобы не блокироваться.
|
2015-06-26 19:23:25 +00:00
|
|
|
|
data.clear();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|