Send logs via gRPC protocol too.

This commit is contained in:
Vitaly Baranov 2020-10-24 03:37:57 +03:00
parent 218d9ea3e8
commit 4f0405af93
3 changed files with 137 additions and 5 deletions

View File

@ -1,11 +1,14 @@
#include "GRPCServer.h"
#if USE_GRPC
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/CurrentThread.h>
#include <Common/SettingsChanges.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/executeQuery.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromString.h>
@ -79,6 +82,8 @@ namespace
str.append(str.empty() ? "" : ", ").append("extremes");
if (result.has_progress())
str.append(str.empty() ? "" : ", ").append("progress");
if (result.logs_size())
str.append(str.empty() ? "" : ", ").append("logs: ").append(std::to_string(result.logs_size())).append(" entries");
if (result.has_exception())
str.append(str.empty() ? "" : ", ").append("exception");
return str;
@ -195,6 +200,7 @@ namespace
void finishQuery();
void onException(const Exception & exception);
void onFatalError();
void close();
void readQueryInfo();
@ -202,6 +208,7 @@ namespace
void addProgressToResult();
void addTotalsToResult(const Block & totals);
void addExtremesToResult(const Block & extremes);
void addLogsToResult();
void sendResult();
void throwIfFailedToSendResult();
void sendException(const Exception & exception);
@ -223,6 +230,7 @@ namespace
BlockIO io;
Progress progress;
InternalTextLogsQueuePtr logs_queue;
GRPCQueryInfo query_info; /// We reuse the same messages multiple times.
GRPCResult result;
@ -357,7 +365,16 @@ namespace
query_context->applySettingsChanges(settings_changes);
const Settings & settings = query_context->getSettingsRef();
/// Prepare for sending exceptions and logs.
send_exception_with_stacktrace = query_context->getSettingsRef().calculate_text_stack_trace;
const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
if (client_logs_level != LogsLevel::none)
{
logs_queue = std::make_shared<InternalTextLogsQueue>();
logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
CurrentThread::attachInternalTextLogsQueue(logs_queue, client_logs_level);
CurrentThread::setFatalErrorCallback([this]{ onFatalError(); });
}
/// Set the current database if specified.
if (!query_info.database().empty())
@ -480,7 +497,7 @@ namespace
"Only the following fields can be set: input_data, next_query_info",
ErrorCodes::INVALID_GRPC_QUERY_INFO);
}
LOG_DEBUG(log, "Received extra QueryInfo with input data: {} bytes", query_info.input_data().size());
LOG_DEBUG(log, "Received extra QueryInfo: input_data: {} bytes", query_info.input_data().size());
need_input_data_from_query_info = true;
}
@ -548,8 +565,10 @@ namespace
after_send_progress.restart();
}
addLogsToResult();
bool has_output = write_buffer->offset();
if (has_output || result.has_progress())
if (has_output || result.has_progress() || result.logs_size())
sendResult();
throwIfFailedToSendResult();
@ -587,8 +606,10 @@ namespace
after_send_progress.restart();
}
addLogsToResult();
bool has_output = write_buffer->offset();
if (has_output || result.has_progress())
if (has_output || result.has_progress() || result.logs_size())
sendResult();
throwIfFailedToSendResult();
@ -605,6 +626,7 @@ namespace
finalize = true;
io.onFinish();
query_scope->logPeakMemoryUsage();
addLogsToResult();
sendResult();
close();
@ -624,6 +646,16 @@ namespace
if (responder && !responder_finished)
{
try
{
/// Try to send logs to client, but it could be risky too.
addLogsToResult();
}
catch (...)
{
LOG_WARNING(log, "Couldn't send logs to client");
}
try
{
sendException(exception);
@ -637,6 +669,22 @@ namespace
close();
}
void Call::onFatalError()
{
if (responder && !responder_finished)
{
try
{
finalize = true;
addLogsToResult();
sendResult();
}
catch (...)
{
}
}
}
void Call::close()
{
responder.reset();
@ -715,6 +763,53 @@ namespace
stream->writeSuffix();
}
void Call::addLogsToResult()
{
if (!logs_queue)
return;
static_assert(::clickhouse::grpc::LOG_NONE == 0);
static_assert(::clickhouse::grpc::LOG_FATAL == static_cast<int>(Poco::Message::PRIO_FATAL));
static_assert(::clickhouse::grpc::LOG_CRITICAL == static_cast<int>(Poco::Message::PRIO_CRITICAL));
static_assert(::clickhouse::grpc::LOG_ERROR == static_cast<int>(Poco::Message::PRIO_ERROR));
static_assert(::clickhouse::grpc::LOG_WARNING == static_cast<int>(Poco::Message::PRIO_WARNING));
static_assert(::clickhouse::grpc::LOG_NOTICE == static_cast<int>(Poco::Message::PRIO_NOTICE));
static_assert(::clickhouse::grpc::LOG_INFORMATION == static_cast<int>(Poco::Message::PRIO_INFORMATION));
static_assert(::clickhouse::grpc::LOG_DEBUG == static_cast<int>(Poco::Message::PRIO_DEBUG));
static_assert(::clickhouse::grpc::LOG_TRACE == static_cast<int>(Poco::Message::PRIO_TRACE));
MutableColumns columns;
while (logs_queue->tryPop(columns))
{
if (columns.empty() || columns[0]->empty())
continue;
const auto & column_time = typeid_cast<const ColumnUInt32 &>(*columns[0]);
const auto & column_time_microseconds = typeid_cast<const ColumnUInt32 &>(*columns[1]);
const auto & column_query_id = typeid_cast<const ColumnString &>(*columns[3]);
const auto & column_thread_id = typeid_cast<const ColumnUInt64 &>(*columns[4]);
const auto & column_level = typeid_cast<const ColumnInt8 &>(*columns[5]);
const auto & column_source = typeid_cast<const ColumnString &>(*columns[6]);
const auto & column_text = typeid_cast<const ColumnString &>(*columns[7]);
size_t num_rows = column_time.size();
for (size_t row = 0; row != num_rows; ++row)
{
auto & log_entry = *result.add_logs();
log_entry.set_time(column_time.getElement(row));
log_entry.set_time_microseconds(column_time_microseconds.getElement(row));
StringRef query_id = column_query_id.getDataAt(row);
log_entry.set_query_id(query_id.data, query_id.size);
log_entry.set_thread_id(column_thread_id.getElement(row));
log_entry.set_level(static_cast<::clickhouse::grpc::LogsLevel>(column_level.getElement(row)));
StringRef source = column_source.getDataAt(row);
log_entry.set_source(source.data, source.size);
StringRef text = column_text.getDataAt(row);
log_entry.set_text(text.data, text.size);
}
}
}
void Call::sendResult()
{
/// gRPC doesn't allow to write anything to a finished responder.

View File

@ -16,6 +16,28 @@ message QueryInfo {
bool next_query_info = 11;
}
enum LogsLevel {
LOG_NONE = 0;
LOG_FATAL = 1;
LOG_CRITICAL = 2;
LOG_ERROR = 3;
LOG_WARNING = 4;
LOG_NOTICE = 5;
LOG_INFORMATION = 6;
LOG_DEBUG = 7;
LOG_TRACE = 8;
}
message LogEntry {
uint32 time = 1;
uint32 time_microseconds = 2;
uint64 thread_id = 3;
string query_id = 4;
LogsLevel level = 5;
string source = 6;
string text = 7;
}
message Progress {
uint64 read_rows = 1;
uint64 read_bytes = 2;
@ -35,8 +57,9 @@ message Result {
string output = 1;
string totals = 2;
string extremes = 3;
Progress progress = 4;
Exception exception = 5;
repeated LogEntry logs = 4;
Progress progress = 5;
Exception exception = 6;
}
service ClickHouse {

View File

@ -84,6 +84,14 @@ def query_and_get_extremes(*args, **kwargs):
extremes += result.extremes
return extremes
def query_and_get_logs(*args, **kwargs):
logs = ""
for result in query_no_errors(*args, **kwargs):
for log_entry in result.logs:
#print(log_entry)
logs += log_entry.text + "\n"
return logs
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
cluster.start()
@ -164,3 +172,9 @@ def test_errors_handling():
query("CREATE TABLE t (a UInt8) ENGINE = Memory")
e = query_and_get_error("CREATE TABLE t (a UInt8) ENGINE = Memory")
assert "Table default.t already exists" in e.display_text
def test_logs():
logs = query_and_get_logs("SELECT 1", settings={'send_logs_level':'debug'})
assert "SELECT 1" in logs
assert "Read 1 rows" in logs
assert "Peak memory usage" in logs