/// NOTE(review): the 17 #include directives below carry no header names in this copy of the file
/// (everything that was inside angle brackets appears to have been lost). Restore them from
/// version control before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include


namespace DB
{


/** Prepares the destination table for the query log and starts the background saving thread.
  *
  * - If a table `database_name_.table_name_` exists and its structure equals createBlock(), it is reused.
  * - If it exists but the structure differs, it is renamed to `<table_name>_<N>`, where N is the
  *   first non-negative integer for which no such table exists, and a fresh table is created.
  * - If it does not exist, it is created with engine MergeTree(event_date, event_time, 8192).
  *
  * @param context_                     context used to look up, rename and create tables
  * @param database_name_               database holding the query log table
  * @param table_name_                  name of the query log table
  * @param flush_interval_milliseconds_ interval used by threadFunction() to decide when to flush
  */
QueryLog::QueryLog(Context & context_, const String & database_name_, const String & table_name_, size_t flush_interval_milliseconds_)
    : context(context_), database_name(database_name_), table_name(table_name_), flush_interval_milliseconds(flush_interval_milliseconds_)
{
    data.reserve(DBMS_QUERY_LOG_QUEUE_SIZE);

    {
        String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);

        /// The lock object lives until the end of this inner scope, i.e. across the check, rename and create steps.
        /// NOTE(review): the template argument of Poco::ScopedLock is missing in this copy of the file.
        Poco::ScopedLock lock(context.getMutex());

        table = context.tryGetTable(database_name, table_name);

        if (table)
        {
            const Block expected = createBlock();
            const Block actual = table->getSampleBlockNonMaterialized();

            if (!blocksHaveEqualStructure(actual, expected))
            {
                /// Rename the existing table.
                int suffix = 0;
                while (context.isTableExist(database_name, table_name + "_" + toString(suffix)))
                    ++suffix;

                /// Ownership of the raw pointer is handed to `holder` right away;
                /// `rename` is kept only to fill in the typed fields.
                ASTRenameQuery * rename = new ASTRenameQuery;
                ASTPtr holder = rename;

                ASTRenameQuery::Table from;
                from.database = database_name;
                from.table = table_name;

                ASTRenameQuery::Table to;
                to.database = database_name;
                to.table = table_name + "_" + toString(suffix);

                ASTRenameQuery::Element elem;
                elem.from = from;
                elem.to = to;

                rename->elements.emplace_back(elem);

                LOG_DEBUG(log, "Existing table " << description << " for query log has obsolete or different structure."
                    " Renaming it to " << backQuoteIfNeed(to.table));

                InterpreterRenameQuery(holder, context).execute();

                /// The required table will be created below.
                table = nullptr;
            }
            else
                LOG_DEBUG(log, "Will use existing table " << description << " for query log.");
        }

        if (!table)
        {
            /// Create the table.
            LOG_DEBUG(log, "Creating new table " << description << " for query log.");

            ASTCreateQuery * create = new ASTCreateQuery;
            ASTPtr holder = create;

            create->database = database_name;
            create->table = table_name;

            /// The column list of the new table is derived from the same block that flush() fills.
            Block sample = createBlock();
            create->columns = InterpreterCreateQuery::formatColumns(sample.getColumnsList());

            /// The storage engine is given as text and parsed into an AST node.
            String engine = "MergeTree(event_date, event_time, 8192)";
            ParserFunction engine_parser;

            create->storage = parseQuery(engine_parser, engine.data(), engine.data() + engine.size(), "ENGINE to create table for query log");

            InterpreterCreateQuery(holder, context).execute();

            table = context.getTable(database_name, table_name);
        }
    }

    /// The thread is started only after the table has been prepared.
    saving_thread = std::thread([this] { threadFunction(); });
}


/** Stops the saving thread: pushes a SHUTDOWN element into the queue and joins the thread.
  * threadFunction() calls flush() when it receives SHUTDOWN, before leaving its loop.
  */
QueryLog::~QueryLog()
{
    /// Tell the thread that it must finish.
    QueryLogElement elem;
    elem.type = QueryLogElement::SHUTDOWN;
    queue.push(elem);

    saving_thread.join();
}


/** Body of the saving thread.
  * Moves elements from `queue` into the `data` buffer and calls flush() once at least
  * `flush_interval_milliseconds` have passed since the stopwatch was last restarted,
  * or when a SHUTDOWN element is received (after which the loop ends).
  * Any exception thrown inside an iteration is logged, the buffer is dropped, and the loop continues.
  */
void QueryLog::threadFunction()
{
    Stopwatch time_after_last_write;
    bool first = true;

    while (true)
    {
        try
        {
            /// Start measuring from the first iteration of the loop rather than from construction of the stopwatch.
            if (first)
            {
                time_after_last_write.restart();
                first = false;
            }

            QueryLogElement element;
            bool has_element = false;

            if (data.empty())
            {
                /// Nothing is buffered, so there is no flush deadline to honour.
                /// Presumably pop() blocks until an element arrives — confirm against the queue implementation.
                queue.pop(element);
                has_element = true;
            }
            else
            {
                /// Presumably elapsed() returns nanoseconds, making this a conversion to milliseconds — TODO confirm.
                size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
                /// Take another element only if the flush deadline has not passed yet;
                /// the second argument of tryPop is the time remaining until that deadline.
                if (milliseconds_elapsed < flush_interval_milliseconds)
                    has_element = queue.tryPop(element, flush_interval_milliseconds - milliseconds_elapsed);
            }

            if (has_element)
            {
                if (element.type == QueryLogElement::SHUTDOWN)
                {
                    /// Write out whatever is buffered and leave the loop.
                    flush();
                    break;
                }
                else
                    data.push_back(element);
            }

            size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
            if (milliseconds_elapsed >= flush_interval_milliseconds)
            {
                /// Write the data to the table.
                flush();
                time_after_last_write.restart();
            }
        }
        catch (...)
        {
            /// On error, drop the accumulated records so as not to block.
            data.clear();
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
}


/** Builds a block with the columns of the query log table: column object, data type and name for each.
  * The constructor uses it both to compare against an existing table and to create a new one;
  * flush() fills it by position, so the order here must stay in sync with the indices used in flush().
  */
Block QueryLog::createBlock()
{
    return
    {
        {new ColumnUInt8,           new DataTypeUInt8,              "type"},
        {new ColumnUInt16,          new DataTypeDate,               "event_date"},
        {new ColumnUInt32,          new DataTypeDateTime,           "event_time"},
        {new ColumnUInt32,          new DataTypeDateTime,           "query_start_time"},
        {new ColumnUInt64,          new DataTypeUInt64,             "query_duration_ms"},

        {new ColumnUInt64,          new DataTypeUInt64,             "read_rows"},
        {new ColumnUInt64,          new DataTypeUInt64,             "read_bytes"},

        {new ColumnUInt64,          new DataTypeUInt64,             "result_rows"},
        {new ColumnUInt64,          new DataTypeUInt64,             "result_bytes"},

        {new ColumnString,          new DataTypeString,             "query"},
        {new ColumnString,          new DataTypeString,             "exception"},
        {new ColumnString,          new DataTypeString,             "stack_trace"},

        {new ColumnUInt8,           new DataTypeUInt8,              "interface"},
        {new ColumnUInt8,           new DataTypeUInt8,              "http_method"},
        {new ColumnFixedString(16), new DataTypeFixedString(16),    "ip_address"},
        {new ColumnString,          new DataTypeString,             "user"},
        {new ColumnString,          new DataTypeString,             "query_id"},
    };
}


/** Converts the buffered elements into a block and writes it to the table, then clears the buffer.
  * Exceptions are caught and logged here; the buffer is cleared whether or not the write succeeded.
  *
  * NOTE(review): there is no check for an empty buffer, and threadFunction() calls flush() on SHUTDOWN
  * even when nothing is buffered, so a block with zero rows can be passed to the table — consider an early return.
  * NOTE(review): every static_cast below has no target type in this copy of the file (template arguments lost).
  */
void QueryLog::flush()
{
    try
    {
        LOG_TRACE(log, "Flushing query log");

        DateLUT & date_lut = DateLUT::instance();

        Block block = createBlock();

        /// Column positions correspond to the order of columns in createBlock().
        for (const QueryLogElement & elem : data)
        {
            block.unsafeGetByPosition(0).column.get()->insert(static_cast(elem.type));
            /// event_date is derived from event_time.
            block.unsafeGetByPosition(1).column.get()->insert(static_cast(date_lut.toDayNum(elem.event_time)));
            block.unsafeGetByPosition(2).column.get()->insert(static_cast(elem.event_time));
            block.unsafeGetByPosition(3).column.get()->insert(static_cast(elem.query_start_time));
            block.unsafeGetByPosition(4).column.get()->insert(static_cast(elem.query_duration_ms));

            block.unsafeGetByPosition(5).column.get()->insert(static_cast(elem.read_rows));
            block.unsafeGetByPosition(6).column.get()->insert(static_cast(elem.read_bytes));

            block.unsafeGetByPosition(7).column.get()->insert(static_cast(elem.result_rows));
            block.unsafeGetByPosition(8).column.get()->insert(static_cast(elem.result_bytes));

            block.unsafeGetByPosition(9).column.get()->insertData(elem.query.data(), elem.query.size());
            block.unsafeGetByPosition(10).column.get()->insertData(elem.exception.data(), elem.exception.size());
            block.unsafeGetByPosition(11).column.get()->insertData(elem.stack_trace.data(), elem.stack_trace.size());

            block.unsafeGetByPosition(12).column.get()->insert(static_cast(elem.interface));
            block.unsafeGetByPosition(13).column.get()->insert(static_cast(elem.http_method));

            /// The address is always stored as 16 bytes, matching the FixedString(16) column.
            char ipv6_binary[16];
            if (Poco::Net::IPAddress::IPv6 == elem.ip_address.family())
            {
                memcpy(ipv6_binary, elem.ip_address.addr(), 16);
            }
            else if (Poco::Net::IPAddress::IPv4 == elem.ip_address.family())
            {
                /// Convert to an IPv4-mapped IPv6 address: 10 zero bytes, two 0xFF bytes, then the 4 address bytes.
                memset(ipv6_binary, 0, 10);
                ipv6_binary[10] = '\xFF';
                ipv6_binary[11] = '\xFF';
                memcpy(&ipv6_binary[12], elem.ip_address.addr(), 4);
            }
            else
                /// Any other address family is stored as all zeros.
                memset(ipv6_binary, 0, 16);

            block.unsafeGetByPosition(14).column.get()->insertData(ipv6_binary, 16);

            block.unsafeGetByPosition(15).column.get()->insertData(elem.user.data(), elem.user.size());
            block.unsafeGetByPosition(16).column.get()->insertData(elem.query_id.data(), elem.query_id.size());
        }

        BlockOutputStreamPtr stream = table->write(nullptr);

        stream->writePrefix();
        stream->write(block);
        stream->writeSuffix();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    /// On error we also clear the accumulated records, so as not to block.
    data.clear();
}

}