#include "InternalTextLogsQueue.h"
/// NOTE(review): the six includes below have no header name in this copy of the file.
/// Angle-bracket contents appear to have been stripped here and in the template
/// argument lists further down; restore them from version control before building.
#include
#include
#include
#include
#include
#include

namespace DB
{

/// Constructs the queue.
/// The base ConcurrentBoundedQueue receives the maximum value of a numeric type as its
/// constructor argument (presumably the capacity bound - confirm against the base class),
/// and max_priority starts at Poco::Message::Priority::PRIO_INFORMATION.
/// NOTE(review): the template arguments of ConcurrentBoundedQueue and std::numeric_limits
/// are missing in this copy of the file.
InternalTextLogsQueue::InternalTextLogsQueue()
    : ConcurrentBoundedQueue(std::numeric_limits::max()),
      max_priority(Poco::Message::Priority::PRIO_INFORMATION) {}

/// Builds the Block that describes the layout of one batch of log rows.
/// It has eight columns, in this order: event_time, event_time_microseconds, host_name,
/// query_id, thread_number, priority, source, text.
/// NOTE(review): the data type passed to every std::make_shared call is missing in this
/// copy of the file, so the column types cannot be read from here.
Block InternalTextLogsQueue::getSampleBlock()
{
    return Block {
        {std::make_shared(), "event_time"},
        {std::make_shared(), "event_time_microseconds"},
        {std::make_shared(), "host_name"},
        {std::make_shared(), "query_id"},
        {std::make_shared(), "thread_number"},
        {std::make_shared(), "priority"},
        {std::make_shared(), "source"},
        {std::make_shared(), "text"}
    };
}

/// Returns the result of cloneEmptyColumns() called on the sample block.
MutableColumns InternalTextLogsQueue::getSampleColumns()
{
    /// Function-local static: the sample block is built on the first call and reused afterwards.
    static Block sample_block = getSampleBlock();
    return sample_block.cloneEmptyColumns();
}

/// Puts the columns of log_block into the queue if the block's structure equals that of
/// the sample block; otherwise nothing is queued and a warning is logged.
void InternalTextLogsQueue::pushBlock(Block && log_block)
{
    /// Separate function-local static from the one in getSampleColumns(); built on the first call.
    static Block sample_block = getSampleBlock();

    if (blocksHaveEqualStructure(sample_block, log_block))
        /// The value returned by emplace(), if any, is not inspected here.
        emplace(log_block.mutateColumns());
    else
        LOG_WARNING(&Poco::Logger::get("InternalTextLogsQueue"), "Log block have different structure");
}

/// Maps a numeric priority to its name.
/// Values 1..8 yield "Fatal" .. "Trace"; any other value yields "Unknown".
/// The returned pointer refers to a string literal with static storage duration.
const char * InternalTextLogsQueue::getPriorityName(int priority)
{
    /// See Poco::Message::Priority
    /// The table is indexed directly by the numeric priority; slot 0 is the fallback name.
    static const char * PRIORITIES [] = {
        "Unknown",
        "Fatal",
        "Critical",
        "Error",
        "Warning",
        "Notice",
        "Information",
        "Debug",
        "Trace"
    };

    /// The bounds 1 and 8 are the first and last non-fallback slots of the table above.
    return (priority >= 1 && priority <= 8) ? PRIORITIES[priority] : PRIORITIES[0];
}

}