2019-06-14 14:00:37 +00:00
|
|
|
#include "OwnSplitChannel.h"
|
2021-09-30 19:46:12 +00:00
|
|
|
#include "OwnFormattingChannel.h"
|
2018-06-15 17:32:35 +00:00
|
|
|
|
2019-06-14 14:00:37 +00:00
|
|
|
#include <iostream>
|
2018-06-15 17:32:35 +00:00
|
|
|
#include <Core/Block.h>
|
2019-06-14 14:00:37 +00:00
|
|
|
#include <Interpreters/InternalTextLogsQueue.h>
|
2019-07-22 13:54:08 +00:00
|
|
|
#include <Interpreters/TextLog.h>
|
2021-05-12 18:18:15 +00:00
|
|
|
#include <IO/WriteBufferFromFileDescriptor.h>
|
2019-06-14 14:00:37 +00:00
|
|
|
#include <sys/time.h>
|
2018-06-15 17:32:35 +00:00
|
|
|
#include <Poco/Message.h>
|
2019-06-14 14:00:37 +00:00
|
|
|
#include <Common/CurrentThread.h>
|
|
|
|
#include <Common/DNSResolver.h>
|
2022-01-10 19:01:41 +00:00
|
|
|
#include <Common/setThreadName.h>
|
2022-01-10 19:39:10 +00:00
|
|
|
#include <Common/LockMemoryExceptionInThread.h>
|
2021-10-02 07:13:14 +00:00
|
|
|
#include <base/getThreadId.h>
|
2019-09-06 17:48:27 +00:00
|
|
|
#include <Common/SensitiveDataMasker.h>
|
2021-06-02 05:01:09 +00:00
|
|
|
#include <Common/IO.h>
|
2018-06-15 17:32:35 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
/// Entry point for a single log message.
/// Drops the message early when nobody would consume it, otherwise masks
/// sensitive data (when a masker instance exists) and hands the result to tryLogSplit().
void OwnSplitChannel::log(const Poco::Message & msg)
{
    auto logs_queue = CurrentThread::getInternalTextLogsQueue();

    /// With no child channels, the message is only useful if the current thread
    /// has a logs queue whose max_priority admits this message.
    /// NOTE(review): this check does not look at text_log, although logSplit() also
    /// writes there — confirm that skipping it here is intended.
    if (channels.empty())
    {
        const bool queue_wants_message = logs_queue != nullptr && msg.getPriority() <= logs_queue->max_priority;
        if (!queue_wants_message)
            return;
    }

    auto * masker = SensitiveDataMasker::getInstance();
    if (masker)
    {
        /// Work on a copy: the masker rewrites the text in place.
        auto masked_text = msg.getText();
        if (masker->wipeSensitiveData(masked_text) > 0)
        {
            /// Something was masked: continue with a copy of the original message carrying the modified text.
            tryLogSplit({msg, masked_text});
            return;
        }
    }

    /// Nothing was masked (or there is no masker): forward the message untouched.
    tryLogSplit(msg);
}
|
|
|
|
|
|
|
|
|
2021-05-12 18:18:15 +00:00
|
|
|
/// Calls logSplit() and swallows any exception it throws, reporting the failure to stderr instead.
void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)
{
    try
    {
        logSplit(msg);
    }
    catch (...)
    {
        /// It is better to catch the errors here in order to avoid
        /// breaking some functionality because of an unexpected "File not
        /// found" (or similar) error.
        ///
        /// For example, StorageDistributedDirectoryMonitor will mark a batch
        /// as broken; some MergeTree code can also be affected.
        ///
        /// Also note that we cannot log the exception here with the regular
        /// tryLogCurrentException(), since that would lead to recursion,
        /// but let's write it to stderr at least.
        LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);

        const std::string failure_description = getCurrentExceptionMessage(true);
        const std::string & original_text = msg.getText();

        /// NOTE: errors are ignored, since nothing can be done.
        writeRetry(STDERR_FILENO, "Cannot add message to the log: ");
        writeRetry(STDERR_FILENO, original_text.data(), original_text.size());
        writeRetry(STDERR_FILENO, "\n");
        writeRetry(STDERR_FILENO, failure_description.data(), failure_description.size());
        writeRetry(STDERR_FILENO, "\n");
    }
}
|
|
|
|
|
2019-06-20 07:17:21 +00:00
|
|
|
/// Delivers one message to every destination this channel knows about:
///  1. all registered child channels;
///  2. the current thread's internal text logs queue ("TCP queue");
///  3. the text log object registered via addTextLog().
/// Destinations 2 and 3 are skipped when the message priority exceeds their limit.
void OwnSplitChannel::logSplit(const Poco::Message & msg)
{
    ExtendedLogMessage extended = ExtendedLogMessage::getFrom(msg);

    /// Log data to child channels
    for (auto & named_channel : channels)
    {
        auto & sinks = named_channel.second;
        if (!sinks.second)
            sinks.first->log(msg); // ordinary child
        else
            sinks.second->logExtended(extended); // extended child
    }

    auto queue = CurrentThread::getInternalTextLogsQueue();

    /// Log to "TCP queue" if message is not too noisy
    if (queue && msg.getPriority() <= queue->max_priority)
    {
        MutableColumns row = InternalTextLogsQueue::getSampleColumns();

        /// Values are appended one column at a time; the order presumably has to
        /// follow the layout returned by getSampleColumns() — confirm before reordering.
        size_t pos = 0;
        row[pos++]->insert(extended.time_seconds);
        row[pos++]->insert(extended.time_microseconds);
        row[pos++]->insert(DNSResolver::instance().getHostName());
        row[pos++]->insert(extended.query_id);
        row[pos++]->insert(extended.thread_id);
        row[pos++]->insert(Int64(msg.getPriority()));
        row[pos++]->insert(msg.getSource());
        row[pos++]->insert(msg.getText());

        /// The result of the push is deliberately not acted upon.
        [[maybe_unused]] bool pushed = queue->emplace(std::move(row));
    }

    /// Also log to system.text_log table, if message is not too noisy.
    /// A loaded limit of zero skips this destination entirely.
    auto priority_limit = text_log_max_priority.load(std::memory_order_relaxed);
    if (priority_limit && msg.getPriority() <= priority_limit)
    {
        TextLogElement entry;

        entry.event_time = extended.time_seconds;
        entry.event_time_microseconds = extended.time_in_microseconds;
        entry.microseconds = extended.time_microseconds;

        entry.thread_name = getThreadName();
        entry.thread_id = extended.thread_id;

        entry.query_id = extended.query_id;

        entry.message = msg.getText();
        entry.logger_name = msg.getSource();
        entry.level = msg.getPriority();

        /// The source file pointer may be null; leave the field untouched in that case.
        if (const char * source_path = msg.getSourceFile())
            entry.source_file = source_path;

        entry.source_line = msg.getSourceLine();

        /// Hold the mutex only while obtaining a strong reference; add() runs outside the lock.
        std::shared_ptr<TextLog> target{};
        {
            std::lock_guard<std::mutex> guard(text_log_mutex);
            target = text_log.lock();
        }

        if (target)
            target->add(entry);
    }
}
|
|
|
|
|
2019-06-20 07:17:21 +00:00
|
|
|
|
2021-09-30 19:46:12 +00:00
|
|
|
/// Registers a child channel under the given name.
/// Alongside the owning pointer, the channel is stored pre-cast to ExtendedLogChannel
/// (null when the channel is not of that type).
void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel, const std::string & name)
{
    /// Take the extended-interface pointer before `channel` is handed over to std::move(),
    /// so the result does not depend on how the pair constructor receives its arguments.
    auto * extended_channel = dynamic_cast<ExtendedLogChannel *>(channel.get());
    channels.emplace(name, ExtendedChannelPtrPair(std::move(channel), extended_channel));
}
|
|
|
|
|
2020-01-23 20:19:51 +00:00
|
|
|
/// Registers the text log destination and the priority limit used by logSplit().
/// Both are updated while holding text_log_mutex.
void OwnSplitChannel::addTextLog(std::shared_ptr<DB::TextLog> log, int max_priority)
{
    std::lock_guard guard{text_log_mutex};

    text_log = log;
    text_log_max_priority.store(max_priority, std::memory_order_relaxed);
}
|
2018-06-15 17:32:35 +00:00
|
|
|
|
2021-09-30 19:46:12 +00:00
|
|
|
void OwnSplitChannel::setLevel(const std::string & name, int level)
|
|
|
|
{
|
|
|
|
auto it = channels.find(name);
|
|
|
|
if (it != channels.end())
|
|
|
|
{
|
|
|
|
if (auto * channel = dynamic_cast<DB::OwnFormattingChannel *>(it->second.first.get()))
|
|
|
|
channel->setLevel(level);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-15 17:32:35 +00:00
|
|
|
}
|