From cb223d360a146b9533f7d865c56a66c4d2fa8bc3 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 30 Jun 2015 02:54:33 +0300 Subject: [PATCH] dbms: QueryLog: development [#METR-16946]. --- dbms/src/Interpreters/QueryLog.cpp | 14 +++--- dbms/src/Interpreters/executeQuery.cpp | 59 ++++++++++++++------------ dbms/src/Server/TCPHandler.cpp | 4 -- 3 files changed, 40 insertions(+), 37 deletions(-) diff --git a/dbms/src/Interpreters/QueryLog.cpp b/dbms/src/Interpreters/QueryLog.cpp index 77ba058de38..8a317ae5d10 100644 --- a/dbms/src/Interpreters/QueryLog.cpp +++ b/dbms/src/Interpreters/QueryLog.cpp @@ -190,6 +190,8 @@ Block QueryLog::createBlock() {new ColumnUInt64, new DataTypeUInt64, "result_bytes"}, {new ColumnString, new DataTypeString, "query"}, + {new ColumnString, new DataTypeString, "exception"}, + {new ColumnString, new DataTypeString, "stack_trace"}, {new ColumnUInt8, new DataTypeUInt8, "interface"}, {new ColumnUInt8, new DataTypeUInt8, "http_method"}, @@ -225,9 +227,11 @@ void QueryLog::flush() block.unsafeGetByPosition(8).column.get()->insert(static_cast(elem.result_bytes)); block.unsafeGetByPosition(9).column.get()->insertData(elem.query.data(), elem.query.size()); + block.unsafeGetByPosition(10).column.get()->insertData(elem.exception.data(), elem.exception.size()); + block.unsafeGetByPosition(11).column.get()->insertData(elem.stack_trace.data(), elem.stack_trace.size()); - block.unsafeGetByPosition(10).column.get()->insert(static_cast(elem.interface)); - block.unsafeGetByPosition(11).column.get()->insert(static_cast(elem.http_method)); + block.unsafeGetByPosition(12).column.get()->insert(static_cast(elem.interface)); + block.unsafeGetByPosition(13).column.get()->insert(static_cast(elem.http_method)); char ipv6_binary[16]; if (Poco::Net::IPAddress::IPv6 == elem.ip_address.family()) @@ -245,10 +249,10 @@ void QueryLog::flush() else memset(ipv6_binary, 0, 16); - block.unsafeGetByPosition(12).column.get()->insertData(ipv6_binary, 16); + block.unsafeGetByPosition(14).column.get()->insertData(ipv6_binary, 16); - block.unsafeGetByPosition(13).column.get()->insertData(elem.user.data(), elem.user.size()); - block.unsafeGetByPosition(14).column.get()->insertData(elem.query_id.data(), elem.query_id.size()); + block.unsafeGetByPosition(15).column.get()->insertData(elem.user.data(), elem.user.size()); + block.unsafeGetByPosition(16).column.get()->insertData(elem.query_id.data(), elem.query_id.size()); } BlockOutputStreamPtr stream = table->write(nullptr); diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index c66aa7ecb57..6d5c2076396 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -50,6 +50,8 @@ static std::tuple executeQueryImpl( bool internal, QueryProcessingStage::Enum stage) { + /// TODO Логгировать здесь эксепшены, возникающие до начала выполнения запроса. + ProfileEvents::increment(ProfileEvents::Query); ParserQuery parser; @@ -107,7 +109,23 @@ static std::tuple executeQueryImpl( BlockIO res; - /// Всё, что связано с логгированием запросов. + try + { + auto interpreter = InterpreterFactory::get(ast, context, stage); + res = interpreter->execute(); + + /// Держим элемент списка процессов до конца обработки запроса. + res.process_list_entry = process_list_entry; + } + catch (...) + { + quota.addError(current_time); /// TODO Было бы лучше добавить ещё в exception_callback + throw; + } + + quota.addQuery(current_time); + + /// Всё, что связано с логом запросов. { QueryLogElement elem; @@ -142,7 +160,8 @@ static std::tuple executeQueryImpl( { const BlockStreamProfileInfo & info = profiling_stream->getInfo(); - elem.query_duration_ms = info.total_stopwatch.elapsed() / 1000000; + double elapsed_seconds = info.total_stopwatch.elapsed(); /// TODO этот Stopwatch - coarse, использовать другой + elem.query_duration_ms = elapsed_seconds * 1000; stream.getLeafRowsBytes(elem.read_rows, elem.read_bytes); /// TODO неверно для распределённых запросов? @@ -153,9 +172,9 @@ static std::tuple executeQueryImpl( { LOG_INFO(&Logger::get("executeQuery"), std::fixed << std::setprecision(3) << "Read " << elem.read_rows << " rows, " - << formatReadableSizeWithBinarySuffix(elem.read_bytes) << " in " << elem.query_duration_ms / 1000.0 << " sec., " - << static_cast(elem.read_rows * 1000.0 / elem.query_duration_ms) << " rows/sec., " - << formatReadableSizeWithBinarySuffix(elem.read_bytes * 1000.0 / elem.query_duration_ms) << "/sec."); + << formatReadableSizeWithBinarySuffix(elem.read_bytes) << " in " << elapsed_seconds << " sec., " + << static_cast(elem.read_rows / elapsed_seconds) << " rows/sec., " + << formatReadableSizeWithBinarySuffix(elem.read_bytes / elapsed_seconds) << "/sec."); } } @@ -190,30 +209,14 @@ static std::tuple executeQueryImpl( if (log_queries) context.getQueryLog().add(elem); }; - } - try - { - auto interpreter = InterpreterFactory::get(ast, context, stage); - res = interpreter->execute(); - - /// Держим элемент списка процессов до конца обработки запроса. - res.process_list_entry = process_list_entry; - } - catch (...) - { - quota.addError(current_time); - throw; - } - - quota.addQuery(current_time); - - if (res.in) - { - std::stringstream log_str; - log_str << "Query pipeline:\n"; - res.in->dumpTree(log_str); - LOG_DEBUG(&Logger::get("executeQuery"), log_str.str()); + if (!internal && res.in) + { + std::stringstream log_str; + log_str << "Query pipeline:\n"; + res.in->dumpTree(log_str); + LOG_DEBUG(&Logger::get("executeQuery"), log_str.str()); + } } return std::make_tuple(ast, res); diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 196ebf08afa..8c9231c94f1 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -283,10 +283,6 @@ void TCPHandler::processOrdinaryQuery() AsynchronousBlockInputStream async_in(state.io.in); async_in.readPrefix(); - std::stringstream query_pipeline; - async_in.dumpTree(query_pipeline); - LOG_DEBUG(log, "Query pipeline:\n" << query_pipeline.rdbuf()); - while (true) { Block block;