#include "InternalTextLogsQueue.h" #include #include #include #include #include #include namespace DB { InternalTextLogsQueue::InternalTextLogsQueue() : ConcurrentBoundedQueue(std::numeric_limits::max()), max_priority(Poco::Message::Priority::PRIO_INFORMATION) {} Block InternalTextLogsQueue::getSampleBlock() { return Block { {std::make_shared(), "event_time"}, {std::make_shared(), "event_time_microseconds"}, {std::make_shared(), "host_name"}, {std::make_shared(), "query_id"}, {std::make_shared(), "thread_id"}, {std::make_shared(), "priority"}, {std::make_shared(), "source"}, {std::make_shared(), "text"} }; } MutableColumns InternalTextLogsQueue::getSampleColumns() { static Block sample_block = getSampleBlock(); return sample_block.cloneEmptyColumns(); } void InternalTextLogsQueue::pushBlock(Block && log_block) { static Block sample_block = getSampleBlock(); if (blocksHaveEqualStructure(sample_block, log_block)) (void)(emplace(log_block.mutateColumns())); else LOG_WARNING(&Poco::Logger::get("InternalTextLogsQueue"), "Log block have different structure"); } const char * InternalTextLogsQueue::getPriorityName(int priority) { /// See Poco::Message::Priority static constexpr const char * const PRIORITIES[] = { "Unknown", "Fatal", "Critical", "Error", "Warning", "Notice", "Information", "Debug", "Trace" }; return (priority >= 1 && priority <= 8) ? PRIORITIES[priority] : PRIORITIES[0]; } bool InternalTextLogsQueue::isNeeded(int priority, const String & source) const { bool is_needed = priority <= max_priority; if (is_needed && source_regexp) is_needed = re2::RE2::PartialMatch(source, *source_regexp); return is_needed; } void InternalTextLogsQueue::setSourceRegexp(const String & regexp) { source_regexp = std::make_unique(regexp); } }