#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { /** Allow to store structured log in system table. * * Logging is asynchronous. Data is put into queue from where it will be read by separate thread. * That thread inserts log into a table with no more than specified periodicity. */ /** Structure of log, template parameter. * Structure could change on server version update. * If on first write, existing table has different structure, * then it get renamed (put aside) and new table is created. */ /* Example: struct LogElement { /// default constructor must be available /// fields static std::string name(); static Block createBlock(); void appendToBlock(Block & block) const; }; */ #define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576 class Context; class QueryLog; class QueryThreadLog; class PartLog; class TextLog; class TraceLog; class MetricLog; /// System logs should be destroyed in destructor of the last Context and before tables, /// because SystemLog destruction makes insert query while flushing data into underlying tables struct SystemLogs { SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config); ~SystemLogs(); void shutdown(); std::shared_ptr query_log; /// Used to log queries. std::shared_ptr query_thread_log; /// Used to log query threads. std::shared_ptr part_log; /// Used to log operations with parts std::shared_ptr trace_log; /// Used to log traces from query profiler std::shared_ptr text_log; /// Used to log all text messages. std::shared_ptr metric_log; /// Used to log all metrics. String part_log_database; }; template class SystemLog : private boost::noncopyable { public: using Self = SystemLog; /** Parameter: table name where to write log. * If table is not exists, then it get created with specified engine. * If it already exists, then its structure is checked to be compatible with structure of log record. * If it is compatible, then existing table will be used. * If not - then existing table will be renamed to same name but with suffix '_N' at end, * where N - is a minimal number from 1, for that table with corresponding name doesn't exist yet; * and new table get created - as if previous table was not exist. */ SystemLog( Context & context_, const String & database_name_, const String & table_name_, const String & storage_def_, size_t flush_interval_milliseconds_); ~SystemLog(); /** Append a record into log. * Writing to table will be done asynchronously and in case of failure, record could be lost. */ void add(const LogElement & element); /// Flush data in the buffer to disk void flush(); /// Stop the background flush thread before destructor. No more data will be written. void shutdown(); protected: Context & context; const String database_name; const String table_name; const String storage_def; StoragePtr table; const size_t flush_interval_milliseconds; std::atomic is_shutdown{false}; enum class EntryType { LOG_ELEMENT = 0, AUTO_FLUSH, FORCE_FLUSH, SHUTDOWN, }; using QueueItem = std::pair; /// Queue is bounded. But its size is quite large to not block in all normal cases. ConcurrentBoundedQueue queue {DBMS_SYSTEM_LOG_QUEUE_SIZE}; /** Data that was pulled from queue. Data is accumulated here before enough time passed. * It's possible to implement double-buffering, but we assume that insertion into table is faster * than accumulation of large amount of log records (for example, for query log - processing of large amount of queries). */ std::vector data; Logger * log; /** In this thread, data is pulled from 'queue' and stored in 'data', and then written into table. */ ThreadFromGlobalPool saving_thread; void threadFunction(); /** Creates new table if it does not exist. * Renames old table if its structure is not suitable. * This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created. */ bool is_prepared = false; void prepareTable(); std::mutex flush_mutex; std::mutex condvar_mutex; std::condition_variable flush_condvar; bool force_flushing = false; /// flushImpl can be executed only in saving_thread. void flushImpl(EntryType reason); }; template SystemLog::SystemLog(Context & context_, const String & database_name_, const String & table_name_, const String & storage_def_, size_t flush_interval_milliseconds_) : context(context_), database_name(database_name_), table_name(table_name_), storage_def(storage_def_), flush_interval_milliseconds(flush_interval_milliseconds_) { log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")"); data.reserve(DBMS_SYSTEM_LOG_QUEUE_SIZE); saving_thread = ThreadFromGlobalPool([this] { threadFunction(); }); } template void SystemLog::add(const LogElement & element) { if (is_shutdown) return; /// Without try we could block here in case of queue overflow. if (!queue.tryPush({EntryType::LOG_ELEMENT, element})) LOG_ERROR(log, "SystemLog queue is full"); } template void SystemLog::flush() { if (is_shutdown) return; std::lock_guard flush_lock(flush_mutex); force_flushing = true; /// Tell thread to execute extra flush. queue.push({EntryType::FORCE_FLUSH, {}}); /// Wait for flush being finished. std::unique_lock lock(condvar_mutex); while (force_flushing) flush_condvar.wait(lock); } template void SystemLog::shutdown() { bool old_val = false; if (!is_shutdown.compare_exchange_strong(old_val, true)) return; /// Tell thread to shutdown. queue.push({EntryType::SHUTDOWN, {}}); saving_thread.join(); } template SystemLog::~SystemLog() { shutdown(); } template void SystemLog::threadFunction() { setThreadName("SystemLogFlush"); Stopwatch time_after_last_write; bool first = true; while (true) { try { if (first) { time_after_last_write.restart(); first = false; } QueueItem element; bool has_element = false; /// data.size() is increased only in this function /// TODO: get rid of data and queue duality if (data.empty()) { queue.pop(element); has_element = true; } else { size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000; if (milliseconds_elapsed < flush_interval_milliseconds) has_element = queue.tryPop(element, flush_interval_milliseconds - milliseconds_elapsed); } if (has_element) { if (element.first == EntryType::SHUTDOWN) { /// NOTE: MergeTree engine can write data even it is already in shutdown state. flushImpl(element.first); break; } else if (element.first == EntryType::FORCE_FLUSH) { flushImpl(element.first); time_after_last_write.restart(); continue; } else data.push_back(element.second); } size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000; if (milliseconds_elapsed >= flush_interval_milliseconds) { /// Write data to a table. flushImpl(EntryType::AUTO_FLUSH); time_after_last_write.restart(); } } catch (...) { /// In case of exception we lost accumulated data - to avoid locking. data.clear(); tryLogCurrentException(__PRETTY_FUNCTION__); } } } template void SystemLog::flushImpl(EntryType reason) { try { if ((reason == EntryType::AUTO_FLUSH || reason == EntryType::SHUTDOWN) && data.empty()) return; LOG_TRACE(log, "Flushing system log"); /// We check for existence of the table and create it as needed at every flush. /// This is done to allow user to drop the table at any moment (new empty table will be created automatically). /// BTW, flush method is called from single thread. prepareTable(); Block block = LogElement::createBlock(); for (const LogElement & elem : data) elem.appendToBlock(block); /// Clear queue early, because insertion to the table could lead to generation of more log entrites /// and pushing them to already full queue will lead to deadlock. data.clear(); /// We write to table indirectly, using InterpreterInsertQuery. /// This is needed to support DEFAULT-columns in table. std::unique_ptr insert = std::make_unique(); insert->database = database_name; insert->table = table_name; ASTPtr query_ptr(insert.release()); InterpreterInsertQuery interpreter(query_ptr, context); BlockIO io = interpreter.execute(); io.out->writePrefix(); io.out->write(block); io.out->writeSuffix(); } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); /// In case of exception, also clean accumulated data - to avoid locking. data.clear(); } if (reason == EntryType::FORCE_FLUSH) { std::lock_guard lock(condvar_mutex); force_flushing = false; flush_condvar.notify_one(); } } template void SystemLog::prepareTable() { String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name); table = context.tryGetTable(database_name, table_name); if (table) { const Block expected = LogElement::createBlock(); const Block actual = table->getSampleBlockNonMaterialized(); if (!blocksHaveEqualStructure(actual, expected)) { /// Rename the existing table. int suffix = 0; while (context.isTableExist(database_name, table_name + "_" + toString(suffix))) ++suffix; auto rename = std::make_shared(); ASTRenameQuery::Table from; from.database = database_name; from.table = table_name; ASTRenameQuery::Table to; to.database = database_name; to.table = table_name + "_" + toString(suffix); ASTRenameQuery::Element elem; elem.from = from; elem.to = to; rename->elements.emplace_back(elem); LOG_DEBUG(log, "Existing table " << description << " for system log has obsolete or different structure." " Renaming it to " << backQuoteIfNeed(to.table)); InterpreterRenameQuery(rename, context).execute(); /// The required table will be created. table = nullptr; } else if (!is_prepared) LOG_DEBUG(log, "Will use existing table " << description << " for " + LogElement::name()); } if (!table) { /// Create the table. LOG_DEBUG(log, "Creating new table " << description << " for " + LogElement::name()); auto create = std::make_shared(); create->database = database_name; create->table = table_name; Block sample = LogElement::createBlock(); auto new_columns_list = std::make_shared(); new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(sample.getNamesAndTypesList())); create->set(create->columns_list, new_columns_list); ParserStorage storage_parser; ASTPtr storage_ast = parseQuery( storage_parser, storage_def.data(), storage_def.data() + storage_def.size(), "Storage to create table for " + LogElement::name(), 0); create->set(create->storage, storage_ast); InterpreterCreateQuery interpreter(create, context); interpreter.setInternal(true); interpreter.execute(); table = context.getTable(database_name, table_name); } is_prepared = true; } }