dbms: QueryLog: development [#METR-16946].

This commit is contained in:
Alexey Milovidov 2015-06-30 02:54:33 +03:00
parent b5fd5fe490
commit cb223d360a
3 changed files with 40 additions and 37 deletions

View File

@ -190,6 +190,8 @@ Block QueryLog::createBlock()
{new ColumnUInt64, new DataTypeUInt64, "result_bytes"},
{new ColumnString, new DataTypeString, "query"},
{new ColumnString, new DataTypeString, "exception"},
{new ColumnString, new DataTypeString, "stack_trace"},
{new ColumnUInt8, new DataTypeUInt8, "interface"},
{new ColumnUInt8, new DataTypeUInt8, "http_method"},
@ -225,9 +227,11 @@ void QueryLog::flush()
block.unsafeGetByPosition(8).column.get()->insert(static_cast<UInt64>(elem.result_bytes));
block.unsafeGetByPosition(9).column.get()->insertData(elem.query.data(), elem.query.size());
block.unsafeGetByPosition(10).column.get()->insertData(elem.exception.data(), elem.exception.size());
block.unsafeGetByPosition(11).column.get()->insertData(elem.stack_trace.data(), elem.stack_trace.size());
block.unsafeGetByPosition(10).column.get()->insert(static_cast<UInt64>(elem.interface));
block.unsafeGetByPosition(11).column.get()->insert(static_cast<UInt64>(elem.http_method));
block.unsafeGetByPosition(12).column.get()->insert(static_cast<UInt64>(elem.interface));
block.unsafeGetByPosition(13).column.get()->insert(static_cast<UInt64>(elem.http_method));
char ipv6_binary[16];
if (Poco::Net::IPAddress::IPv6 == elem.ip_address.family())
@ -245,10 +249,10 @@ void QueryLog::flush()
else
memset(ipv6_binary, 0, 16);
block.unsafeGetByPosition(12).column.get()->insertData(ipv6_binary, 16);
block.unsafeGetByPosition(14).column.get()->insertData(ipv6_binary, 16);
block.unsafeGetByPosition(13).column.get()->insertData(elem.user.data(), elem.user.size());
block.unsafeGetByPosition(14).column.get()->insertData(elem.query_id.data(), elem.query_id.size());
block.unsafeGetByPosition(15).column.get()->insertData(elem.user.data(), elem.user.size());
block.unsafeGetByPosition(16).column.get()->insertData(elem.query_id.data(), elem.query_id.size());
}
BlockOutputStreamPtr stream = table->write(nullptr);

View File

@ -50,6 +50,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
bool internal,
QueryProcessingStage::Enum stage)
{
/// TODO Логгировать здесь эксепшены, возникающие до начала выполнения запроса.
ProfileEvents::increment(ProfileEvents::Query);
ParserQuery parser;
@ -107,7 +109,23 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
BlockIO res;
/// Всё, что связано с логгированием запросов.
try
{
auto interpreter = InterpreterFactory::get(ast, context, stage);
res = interpreter->execute();
/// Держим элемент списка процессов до конца обработки запроса.
res.process_list_entry = process_list_entry;
}
catch (...)
{
quota.addError(current_time); /// TODO Было бы лучше добавить ещё в exception_callback
throw;
}
quota.addQuery(current_time);
/// Всё, что связано с логом запросов.
{
QueryLogElement elem;
@ -142,7 +160,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{
const BlockStreamProfileInfo & info = profiling_stream->getInfo();
elem.query_duration_ms = info.total_stopwatch.elapsed() / 1000000;
double elapsed_seconds = info.total_stopwatch.elapsed(); /// TODO этот Stopwatch - coarse, использовать другой
elem.query_duration_ms = elapsed_seconds * 1000;
stream.getLeafRowsBytes(elem.read_rows, elem.read_bytes); /// TODO неверно для распределённых запросов?
@ -153,9 +172,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{
LOG_INFO(&Logger::get("executeQuery"), std::fixed << std::setprecision(3)
<< "Read " << elem.read_rows << " rows, "
<< formatReadableSizeWithBinarySuffix(elem.read_bytes) << " in " << elem.query_duration_ms / 1000.0 << " sec., "
<< static_cast<size_t>(elem.read_rows * 1000.0 / elem.query_duration_ms) << " rows/sec., "
<< formatReadableSizeWithBinarySuffix(elem.read_bytes * 1000.0 / elem.query_duration_ms) << "/sec.");
<< formatReadableSizeWithBinarySuffix(elem.read_bytes) << " in " << elapsed_seconds << " sec., "
<< static_cast<size_t>(elem.read_rows / elapsed_seconds) << " rows/sec., "
<< formatReadableSizeWithBinarySuffix(elem.read_bytes / elapsed_seconds) << "/sec.");
}
}
@ -190,30 +209,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (log_queries)
context.getQueryLog().add(elem);
};
}
try
{
auto interpreter = InterpreterFactory::get(ast, context, stage);
res = interpreter->execute();
/// Держим элемент списка процессов до конца обработки запроса.
res.process_list_entry = process_list_entry;
}
catch (...)
{
quota.addError(current_time);
throw;
}
quota.addQuery(current_time);
if (res.in)
{
std::stringstream log_str;
log_str << "Query pipeline:\n";
res.in->dumpTree(log_str);
LOG_DEBUG(&Logger::get("executeQuery"), log_str.str());
if (!internal && res.in)
{
std::stringstream log_str;
log_str << "Query pipeline:\n";
res.in->dumpTree(log_str);
LOG_DEBUG(&Logger::get("executeQuery"), log_str.str());
}
}
return std::make_tuple(ast, res);

View File

@ -283,10 +283,6 @@ void TCPHandler::processOrdinaryQuery()
AsynchronousBlockInputStream async_in(state.io.in);
async_in.readPrefix();
std::stringstream query_pipeline;
async_in.dumpTree(query_pipeline);
LOG_DEBUG(log, "Query pipeline:\n" << query_pipeline.rdbuf());
while (true)
{
Block block;