bugfix client logs + some tests

This commit is contained in:
NIKITA MIKHAILOV 2019-07-10 15:19:17 +03:00
parent 21ad247df3
commit 865606b83c
6 changed files with 105 additions and 29 deletions

View File

@ -29,6 +29,11 @@
#include <DataTypes/DataTypeLowCardinality.h> #include <DataTypes/DataTypeLowCardinality.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
<<<<<<< HEAD
=======
#include <iostream>
>>>>>>> aa54091152... bugfix client logs + some tests
#include "TCPHandler.h" #include "TCPHandler.h"
@ -55,7 +60,7 @@ void TCPHandler::runImpl()
ThreadStatus thread_status; ThreadStatus thread_status;
connection_context = server.context(); connection_context = server.context();
connection_context.makeSessionContext(); connection_context.setSessionContext(connection_context);
Settings global_settings = connection_context.getSettings(); Settings global_settings = connection_context.getSettings();
@ -171,13 +176,13 @@ void TCPHandler::runImpl()
send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace; send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;
/// Should we send internal logs to client? /// Should we send internal logs to client?
const auto client_logs_level = query_context->getSettingsRef().send_logs_level.value; const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
&& client_logs_level != LogsLevel::none) && client_logs_level.value != LogsLevel::none)
{ {
state.logs_queue = std::make_shared<InternalTextLogsQueue>(); state.logs_queue = std::make_shared<InternalTextLogsQueue>();
state.logs_queue->max_priority = Poco::Logger::parseLevel(query_context->getSettingsRef().send_logs_level.toString()); state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level); CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level.value);
} }
query_context->setExternalTablesInitializer([&global_settings, this] (Context & context) query_context->setExternalTablesInitializer([&global_settings, this] (Context & context)

View File

@ -64,7 +64,7 @@ public:
UInt32 master_thread_number = 0; UInt32 master_thread_number = 0;
Int32 master_thread_os_id = -1; Int32 master_thread_os_id = -1;
LogsLevel client_logs_level = LogsLevel::none; std::atomic<LogsLevel> client_logs_level = LogsLevel::none;
String query; String query;
}; };
@ -134,7 +134,12 @@ public:
return thread_state == Died ? nullptr : logs_queue_ptr.lock(); return thread_state == Died ? nullptr : logs_queue_ptr.lock();
} }
<<<<<<< HEAD
void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, LogsLevel client_logs_level); void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, LogsLevel client_logs_level);
=======
void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
LogsLevel client_logs_level);
>>>>>>> aa54091152... bugfix client logs + some tests
/// Sets query context for current thread and its thread group /// Sets query context for current thread and its thread group
/// NOTE: query_context have to be alive until detachQuery() is called /// NOTE: query_context have to be alive until detachQuery() is called

View File

@ -20,13 +20,13 @@ Block InternalTextLogsQueue::getSampleBlock()
{ {
return Block { return Block {
{std::make_shared<DataTypeDateTime>(), "event_time"}, {std::make_shared<DataTypeDateTime>(), "event_time"},
{std::make_shared<DataTypeUInt32>(), "event_time_microseconds"}, {std::make_shared<DataTypeUInt32>(), "event_time_microseconds"},
{std::make_shared<DataTypeString>(), "host_name"}, {std::make_shared<DataTypeString>(), "host_name"},
{std::make_shared<DataTypeString>(), "query_id"}, {std::make_shared<DataTypeString>(), "query_id"},
{std::make_shared<DataTypeUInt32>(), "thread_number"}, {std::make_shared<DataTypeUInt32>(), "thread_number"},
{std::make_shared<DataTypeInt8>(), "priority"}, {std::make_shared<DataTypeInt8>(), "priority"},
{std::make_shared<DataTypeString>(), "source"}, {std::make_shared<DataTypeString>(), "source"},
{std::make_shared<DataTypeString>(), "text"} {std::make_shared<DataTypeString>(), "text"}
}; };
} }

View File

@ -0,0 +1,12 @@
<Debug>
<Trace>
<Debug>
<Information>
<Debug>
*****
<Information>

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
cp /dev/null 00965_send_logs_level_concurrent_queries_first.tmp
cp /dev/null 00965_send_logs_level_concurrent_queries_second.tmp
clickhouse-client --send_logs_level="trace" --query="SELECT * from numbers(100000);" >> /dev/null 2>> 00965_send_logs_level_concurrent_queries_first.tmp &
clickhouse-client --send_logs_level="information" --query="SELECT * from numbers(100000);" >> /dev/null 2>> 00965_send_logs_level_concurrent_queries_second.tmp
sleep 2
awk '{ print $8 }' 00965_send_logs_level_concurrent_queries_first.tmp
echo "*****"
awk '{ print $8 }' 00965_send_logs_level_concurrent_queries_second.tmp

View File

@ -5,46 +5,83 @@
#include <sstream> #include <sstream>
#include <atomic> #include <atomic>
#include <Poco/Logger.h> #include <Poco/Logger.h>
#include <Poco/Message.h>
#include <Core/SettingsCommon.h> #include <Core/SettingsCommon.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <iostream>
#ifndef QUERY_PREVIEW_LENGTH #ifndef QUERY_PREVIEW_LENGTH
#define QUERY_PREVIEW_LENGTH 160 #define QUERY_PREVIEW_LENGTH 160
#endif #endif
using Poco::Logger; using Poco::Logger;
using Poco::Message;
using DB::LogsLevel; using DB::LogsLevel;
using DB::CurrentThread; using DB::CurrentThread;
/// Logs a message to a specified logger with that level. /// Logs a message to a specified logger with that level.
#define LOG_TRACE(logger, message) do { \ #define LOG_TRACE(logger, message) do { \
if ((logger)->trace() || CurrentThread::getGroup()->client_logs_level >= LogsLevel::trace) {\ const bool is_clients_log = (CurrentThread::getGroup() != nullptr) && (CurrentThread::getGroup()->client_logs_level >= LogsLevel::trace); \
std::stringstream oss_internal_rare; \ if (is_clients_log) {\
std::cerr << "CLIENTS LOG TRACE" << std::endl; \
}\
if ((logger)->trace() || is_clients_log) {\
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \ oss_internal_rare << message; \
(logger)->trace(oss_internal_rare.str());}} while(false) if (is_clients_log) {\
(logger)->force_log(oss_internal_rare.str(), Message::PRIO_TRACE); \
} else { \
(logger)->trace(oss_internal_rare.str()); \
}}} while(false)
#define LOG_DEBUG(logger, message) do { \ #define LOG_DEBUG(logger, message) do { \
if ((logger)->debug() || CurrentThread::getGroup()->client_logs_level >= LogsLevel::debug) {\ const bool is_clients_log = (CurrentThread::getGroup() != nullptr) && (CurrentThread::getGroup()->client_logs_level >= LogsLevel::debug); \
std::stringstream oss_internal_rare; \ if (is_clients_log) {\
std::cerr << "CLIENTS LOG DEBUG" << std::endl; \
}\
if ((logger)->debug() || is_clients_log) {\
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \ oss_internal_rare << message; \
(logger)->debug(oss_internal_rare.str());}} while(false) if (is_clients_log) {\
(logger)->force_log(oss_internal_rare.str(), Message::PRIO_DEBUG); \
} else { \
(logger)->debug(oss_internal_rare.str()); \
}}} while(false)
#define LOG_INFO(logger, message) do { \ #define LOG_INFO(logger, message) do { \
if ((logger)->information() || CurrentThread::getGroup()->client_logs_level >= LogsLevel::information) {\ const bool is_clients_log = (CurrentThread::getGroup() != nullptr) && (CurrentThread::getGroup()->client_logs_level >= LogsLevel::information); \
std::stringstream oss_internal_rare; \ if (is_clients_log) {\
std::cerr << "CLIENTS LOG INFORMATION" << std::endl; \
}\
if ((logger)->information() || is_clients_log) {\
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \ oss_internal_rare << message; \
(logger)->information(oss_internal_rare.str());}} while(false) if (is_clients_log) {\
(logger)->force_log(oss_internal_rare.str(), Message::PRIO_INFORMATION); \
} else { \
(logger)->information(oss_internal_rare.str()); \
}}} while(false)
#define LOG_WARNING(logger, message) do { \ #define LOG_WARNING(logger, message) do { \
if ((logger)->warning() || CurrentThread::getGroup()->client_logs_level >= LogsLevel::warning) {\ const bool is_clients_log = (CurrentThread::getGroup() != nullptr) && (CurrentThread::getGroup()->client_logs_level >= LogsLevel::warning); \
std::stringstream oss_internal_rare; \ if ((logger)->warning() || is_clients_log) {\
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \ oss_internal_rare << message; \
(logger)->warning(oss_internal_rare.str());}} while(false) if (is_clients_log) {\
(logger)->force_log(oss_internal_rare.str(), Message::PRIO_WARNING); \
} else { \
(logger)->warning(oss_internal_rare.str()); \
}}} while(false)
#define LOG_ERROR(logger, message) do { \ #define LOG_ERROR(logger, message) do { \
if ((logger)->error() || CurrentThread::getGroup()->client_logs_level >= LogsLevel::error) {\ const bool is_clients_log = (CurrentThread::getGroup() != nullptr) && (CurrentThread::getGroup()->client_logs_level >= LogsLevel::error); \
std::stringstream oss_internal_rare; \ if ((logger)->error() || is_clients_log) {\
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \ oss_internal_rare << message; \
(logger)->error(oss_internal_rare.str());}} while(false) if (is_clients_log) {\
(logger)->force_log(oss_internal_rare.str(), Message::PRIO_ERROR); \
} else { \
(logger)->error(oss_internal_rare.str()); \
}}} while(false)