diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp index 80f738d63bf..d4246d253d8 100644 --- a/src/Server/GRPCServer.cpp +++ b/src/Server/GRPCServer.cpp @@ -1,11 +1,14 @@ #include "GRPCServer.h" #if USE_GRPC +#include +#include #include #include #include #include #include +#include #include #include #include @@ -79,6 +82,8 @@ namespace str.append(str.empty() ? "" : ", ").append("extremes"); if (result.has_progress()) str.append(str.empty() ? "" : ", ").append("progress"); + if (result.logs_size()) + str.append(str.empty() ? "" : ", ").append("logs: ").append(std::to_string(result.logs_size())).append(" entries"); if (result.has_exception()) str.append(str.empty() ? "" : ", ").append("exception"); return str; @@ -195,6 +200,7 @@ namespace void finishQuery(); void onException(const Exception & exception); + void onFatalError(); void close(); void readQueryInfo(); @@ -202,6 +208,7 @@ namespace void addProgressToResult(); void addTotalsToResult(const Block & totals); void addExtremesToResult(const Block & extremes); + void addLogsToResult(); void sendResult(); void throwIfFailedToSendResult(); void sendException(const Exception & exception); @@ -223,6 +230,7 @@ namespace BlockIO io; Progress progress; + InternalTextLogsQueuePtr logs_queue; GRPCQueryInfo query_info; /// We reuse the same messages multiple times. GRPCResult result; @@ -357,7 +365,16 @@ namespace query_context->applySettingsChanges(settings_changes); const Settings & settings = query_context->getSettingsRef(); + /// Prepare for sending exceptions and logs. send_exception_with_stacktrace = query_context->getSettingsRef().calculate_text_stack_trace; + const auto client_logs_level = query_context->getSettingsRef().send_logs_level; + if (client_logs_level != LogsLevel::none) + { + logs_queue = std::make_shared(); + logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString()); + CurrentThread::attachInternalTextLogsQueue(logs_queue, client_logs_level); + CurrentThread::setFatalErrorCallback([this]{ onFatalError(); }); + } /// Set the current database if specified. if (!query_info.database().empty()) @@ -480,7 +497,7 @@ namespace "Only the following fields can be set: input_data, next_query_info", ErrorCodes::INVALID_GRPC_QUERY_INFO); } - LOG_DEBUG(log, "Received extra QueryInfo with input data: {} bytes", query_info.input_data().size()); + LOG_DEBUG(log, "Received extra QueryInfo: input_data: {} bytes", query_info.input_data().size()); need_input_data_from_query_info = true; } @@ -548,8 +565,10 @@ namespace after_send_progress.restart(); } + addLogsToResult(); + bool has_output = write_buffer->offset(); - if (has_output || result.has_progress()) + if (has_output || result.has_progress() || result.logs_size()) sendResult(); throwIfFailedToSendResult(); @@ -587,8 +606,10 @@ namespace after_send_progress.restart(); } + addLogsToResult(); + bool has_output = write_buffer->offset(); - if (has_output || result.has_progress()) + if (has_output || result.has_progress() || result.logs_size()) sendResult(); throwIfFailedToSendResult(); @@ -605,6 +626,7 @@ namespace finalize = true; io.onFinish(); query_scope->logPeakMemoryUsage(); + addLogsToResult(); sendResult(); close(); @@ -624,6 +646,16 @@ namespace if (responder && !responder_finished) { + try + { + /// Try to send logs to client, but it could be risky too. + addLogsToResult(); + } + catch (...) + { + LOG_WARNING(log, "Couldn't send logs to client"); + } + try { sendException(exception); @@ -637,6 +669,22 @@ namespace close(); } + void Call::onFatalError() + { + if (responder && !responder_finished) + { + try + { + finalize = true; + addLogsToResult(); + sendResult(); + } + catch (...) + { + } + } + } + void Call::close() { responder.reset(); @@ -715,6 +763,53 @@ namespace stream->writeSuffix(); } + void Call::addLogsToResult() + { + if (!logs_queue) + return; + + static_assert(::clickhouse::grpc::LOG_NONE == 0); + static_assert(::clickhouse::grpc::LOG_FATAL == static_cast(Poco::Message::PRIO_FATAL)); + static_assert(::clickhouse::grpc::LOG_CRITICAL == static_cast(Poco::Message::PRIO_CRITICAL)); + static_assert(::clickhouse::grpc::LOG_ERROR == static_cast(Poco::Message::PRIO_ERROR)); + static_assert(::clickhouse::grpc::LOG_WARNING == static_cast(Poco::Message::PRIO_WARNING)); + static_assert(::clickhouse::grpc::LOG_NOTICE == static_cast(Poco::Message::PRIO_NOTICE)); + static_assert(::clickhouse::grpc::LOG_INFORMATION == static_cast(Poco::Message::PRIO_INFORMATION)); + static_assert(::clickhouse::grpc::LOG_DEBUG == static_cast(Poco::Message::PRIO_DEBUG)); + static_assert(::clickhouse::grpc::LOG_TRACE == static_cast(Poco::Message::PRIO_TRACE)); + + MutableColumns columns; + while (logs_queue->tryPop(columns)) + { + if (columns.empty() || columns[0]->empty()) + continue; + + const auto & column_time = typeid_cast(*columns[0]); + const auto & column_time_microseconds = typeid_cast(*columns[1]); + const auto & column_query_id = typeid_cast(*columns[3]); + const auto & column_thread_id = typeid_cast(*columns[4]); + const auto & column_level = typeid_cast(*columns[5]); + const auto & column_source = typeid_cast(*columns[6]); + const auto & column_text = typeid_cast(*columns[7]); + size_t num_rows = column_time.size(); + + for (size_t row = 0; row != num_rows; ++row) + { + auto & log_entry = *result.add_logs(); + log_entry.set_time(column_time.getElement(row)); + log_entry.set_time_microseconds(column_time_microseconds.getElement(row)); + StringRef query_id = column_query_id.getDataAt(row); + log_entry.set_query_id(query_id.data, query_id.size); + log_entry.set_thread_id(column_thread_id.getElement(row)); + log_entry.set_level(static_cast<::clickhouse::grpc::LogsLevel>(column_level.getElement(row))); + StringRef source = column_source.getDataAt(row); + log_entry.set_source(source.data, source.size); + StringRef text = column_text.getDataAt(row); + log_entry.set_text(text.data, text.size); + } + } + } + void Call::sendResult() { /// gRPC doesn't allow to write anything to a finished responder. diff --git a/src/Server/grpc_protos/clickhouse_grpc.proto b/src/Server/grpc_protos/clickhouse_grpc.proto index 61ccaebf8b4..0fa01645825 100644 --- a/src/Server/grpc_protos/clickhouse_grpc.proto +++ b/src/Server/grpc_protos/clickhouse_grpc.proto @@ -16,6 +16,28 @@ message QueryInfo { bool next_query_info = 11; } +enum LogsLevel { + LOG_NONE = 0; + LOG_FATAL = 1; + LOG_CRITICAL = 2; + LOG_ERROR = 3; + LOG_WARNING = 4; + LOG_NOTICE = 5; + LOG_INFORMATION = 6; + LOG_DEBUG = 7; + LOG_TRACE = 8; +} + +message LogEntry { + uint32 time = 1; + uint32 time_microseconds = 2; + uint64 thread_id = 3; + string query_id = 4; + LogsLevel level = 5; + string source = 6; + string text = 7; +} + message Progress { uint64 read_rows = 1; uint64 read_bytes = 2; @@ -35,8 +57,9 @@ message Result { string output = 1; string totals = 2; string extremes = 3; - Progress progress = 4; - Exception exception = 5; + repeated LogEntry logs = 4; + Progress progress = 5; + Exception exception = 6; } service ClickHouse { diff --git a/tests/integration/test_grpc_protocol/test.py b/tests/integration/test_grpc_protocol/test.py index 71e47e7fe14..0f1e448bf22 100644 --- a/tests/integration/test_grpc_protocol/test.py +++ b/tests/integration/test_grpc_protocol/test.py @@ -84,6 +84,14 @@ def query_and_get_extremes(*args, **kwargs): extremes += result.extremes return extremes +def query_and_get_logs(*args, **kwargs): + logs = "" + for result in query_no_errors(*args, **kwargs): + for log_entry in result.logs: + #print(log_entry) + logs += log_entry.text + "\n" + return logs + @pytest.fixture(scope="module", autouse=True) def start_cluster(): cluster.start() @@ -164,3 +172,9 @@ def test_errors_handling(): query("CREATE TABLE t (a UInt8) ENGINE = Memory") e = query_and_get_error("CREATE TABLE t (a UInt8) ENGINE = Memory") assert "Table default.t already exists" in e.display_text + +def test_logs(): + logs = query_and_get_logs("SELECT 1", settings={'send_logs_level':'debug'}) + assert "SELECT 1" in logs + assert "Read 1 rows" in logs + assert "Peak memory usage" in logs