ClickHouse/base/loggers/OwnSplitChannel.cpp

118 lines
3.5 KiB
C++
Raw Normal View History

#include "OwnSplitChannel.h"
#include <iostream>
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
2019-07-22 13:54:08 +00:00
#include <Interpreters/TextLog.h>
#include <sys/time.h>
#include <Poco/Message.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
2020-02-02 02:35:47 +00:00
#include <common/getThreadId.h>
#include <Common/SensitiveDataMasker.h>
namespace DB
{
/// Entry point for a log message.
/// Drops the message right away when nobody could consume it through the checks below,
/// masks sensitive data in its text if a masker is installed, and then hands the
/// (possibly rewritten) message over to logSplit().
void OwnSplitChannel::log(const Poco::Message & msg)
{
    auto thread_queue = CurrentThread::getInternalTextLogsQueue();

    /// With no child channels, the only consumer checked here is the per-thread queue:
    /// bail out if it is absent or the message priority exceeds the queue's limit.
    if (channels.empty())
    {
        if (thread_queue == nullptr || msg.getPriority() > thread_queue->max_priority)
            return;
    }

    if (auto * data_masker = SensitiveDataMasker::getInstance())
    {
        /// Work on a copy of the text; the incoming message itself is const.
        auto masked_text = msg.getText();
        const auto replaced_count = data_masker->wipeSensitiveData(masked_text);

        if (replaced_count > 0)
        {
            /// Something was masked: continue with a copy of the message carrying the modified text.
            logSplit({msg, masked_text});
            return;
        }
    }

    /// Nothing was masked (or there is no masker): forward the message untouched.
    logSplit(msg);
}
/// Fans a message out to every sink, in this order:
///  1. all registered child channels,
///  2. the current thread's internal text logs queue ("TCP queue"),
///  3. the system.text_log table.
/// Sinks 2 and 3 only receive the message if it is not too noisy for their priority limit.
void OwnSplitChannel::logSplit(const Poco::Message & msg)
{
    ExtendedLogMessage msg_ext = ExtendedLogMessage::getFrom(msg);

    /// Log data to child channels
    for (auto & [plain_channel, extended_channel] : channels)
    {
        if (extended_channel)
            extended_channel->logExtended(msg_ext); // extended child
        else
            plain_channel->log(msg); // ordinary child
    }

    auto thread_queue = CurrentThread::getInternalTextLogsQueue();

    /// Log to "TCP queue" if message is not too noisy
    if (thread_queue && msg.getPriority() <= thread_queue->max_priority)
    {
        MutableColumns row = InternalTextLogsQueue::getSampleColumns();

        /// Values are inserted column by column; the order below must stay as is.
        size_t column_idx = 0;
        row[column_idx++]->insert(msg_ext.time_seconds);
        row[column_idx++]->insert(msg_ext.time_microseconds);
        row[column_idx++]->insert(DNSResolver::instance().getHostName());
        row[column_idx++]->insert(msg_ext.query_id);
        row[column_idx++]->insert(msg_ext.thread_id);
        row[column_idx++]->insert(Int64(msg.getPriority()));
        row[column_idx++]->insert(msg.getSource());
        row[column_idx++]->insert(msg.getText());

        thread_queue->emplace(std::move(row));
    }

    /// Also log to system.text_log table, if message is not too noisy
    const auto table_priority_limit = text_log_max_priority.load(std::memory_order_relaxed);
    if (table_priority_limit && msg.getPriority() <= table_priority_limit)
    {
        TextLogElement elem;

        elem.event_time = msg_ext.time_seconds;
        elem.microseconds = msg_ext.time_microseconds;

        elem.thread_name = getThreadName();
        elem.thread_id = msg_ext.thread_id;

        elem.query_id = msg_ext.query_id;

        elem.message = msg.getText();
        elem.logger_name = msg.getSource();
        elem.level = msg.getPriority();

        /// The source file may be absent (null pointer); leave the field at its default then.
        if (msg.getSourceFile() != nullptr)
            elem.source_file = msg.getSourceFile();

        elem.source_line = msg.getSourceLine();

        /// Hold the mutex only while upgrading the weak pointer;
        /// the element itself is added outside of the critical section.
        std::shared_ptr<TextLog> table_sink{};
        {
            std::lock_guard<std::mutex> guard(text_log_mutex);
            table_sink = text_log.lock();
        }

        if (table_sink)
            table_sink->add(elem);
    }
}
2019-06-20 07:17:21 +00:00
/// Registers a child channel.
/// Alongside the channel itself we remember whether it is an ExtendedLogChannel
/// (non-null second element), so that logSplit() can pick the right call without casting again.
void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel)
{
    /// Take the raw pointer before the channel is handed over to the container.
    auto * extended_channel = dynamic_cast<ExtendedLogChannel *>(channel.get());
    channels.emplace_back(std::move(channel), extended_channel);
}
/// Attaches the text log that logSplit() writes to and sets the priority limit for it.
/// `log` — the text log instance; it is stored in `text_log`, which logSplit() upgrades via lock().
/// `max_priority` — messages with a priority value above this limit are not written;
///                  a value of 0 disables writing altogether (see the check in logSplit()).
void OwnSplitChannel::addTextLog(std::shared_ptr<DB::TextLog> log, int max_priority)
{
    /// `text_log` is guarded by the mutex; the limit is an atomic that readers load without it.
    std::lock_guard<std::mutex> guard(text_log_mutex);

    text_log = log;
    text_log_max_priority.store(max_priority, std::memory_order_relaxed);
}
}