diff --git a/src/Interpreters/MetricLog.cpp b/src/Interpreters/MetricLog.cpp index 5622e0c65b0..bd898170705 100644 --- a/src/Interpreters/MetricLog.cpp +++ b/src/Interpreters/MetricLog.cpp @@ -70,6 +70,13 @@ void MetricLog::stopCollectMetric() } +void MetricLog::shutdown() +{ + stopCollectMetric(); + stopFlushThread(); +} + + inline UInt64 time_in_milliseconds(std::chrono::time_point timepoint) { return std::chrono::duration_cast(timepoint.time_since_epoch()).count(); diff --git a/src/Interpreters/MetricLog.h b/src/Interpreters/MetricLog.h index c55bad2c12f..a90ce923494 100644 --- a/src/Interpreters/MetricLog.h +++ b/src/Interpreters/MetricLog.h @@ -34,6 +34,8 @@ class MetricLog : public SystemLog using SystemLog::SystemLog; public: + void shutdown() override; + /// Launches a background thread to collect metrics with interval void startCollectMetric(size_t collect_interval_milliseconds_); diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index a78342f8b17..fc0f2f98125 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -83,6 +83,19 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi size_t collect_interval_milliseconds = config.getUInt64("metric_log.collect_interval_milliseconds"); metric_log->startCollectMetric(collect_interval_milliseconds); } + + if (query_log) + logs.emplace_back(query_log.get()); + if (query_thread_log) + logs.emplace_back(query_thread_log.get()); + if (part_log) + logs.emplace_back(part_log.get()); + if (trace_log) + logs.emplace_back(trace_log.get()); + if (text_log) + logs.emplace_back(text_log.get()); + if (metric_log) + logs.emplace_back(metric_log.get()); } @@ -93,21 +106,8 @@ SystemLogs::~SystemLogs() void SystemLogs::shutdown() { - if (query_log) - query_log->shutdown(); - if (query_thread_log) - query_thread_log->shutdown(); - if (part_log) - part_log->shutdown(); - if (trace_log) - trace_log->shutdown(); - if (text_log) - text_log->shutdown(); - if (metric_log) - { - metric_log->stopCollectMetric(); - metric_log->shutdown(); - } + for (auto & log : logs) + log->shutdown(); } } diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index 87da342ae1f..7c8dc1606f7 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -2,6 +2,9 @@ #include #include +#include +#include + #include #include #include @@ -59,13 +62,20 @@ namespace ErrorCodes #define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576 + class Context; -class QueryLog; -class QueryThreadLog; -class PartLog; -class TextLog; -class TraceLog; -class MetricLog; + + +class ISystemLog +{ +public: + virtual String getName() = 0; + virtual ASTPtr getCreateTableQuery() = 0; + virtual void flush() = 0; + virtual void shutdown() = 0; + virtual ~ISystemLog() = default; +}; + /// System logs should be destroyed in destructor of the last Context and before tables, /// because SystemLog destruction makes insert query while flushing data into underlying tables @@ -82,11 +92,13 @@ struct SystemLogs std::shared_ptr trace_log; /// Used to log traces from query profiler std::shared_ptr text_log; /// Used to log all text messages. std::shared_ptr metric_log; /// Used to log all metrics. + + std::vector logs; }; template -class SystemLog : private boost::noncopyable +class SystemLog : public ISystemLog, private boost::noncopyable { public: using Self = SystemLog; @@ -106,18 +118,28 @@ public: const String & storage_def_, size_t flush_interval_milliseconds_); - ~SystemLog(); - /** Append a record into log. * Writing to table will be done asynchronously and in case of failure, record could be lost. */ void add(const LogElement & element); + void stopFlushThread(); + /// Flush data in the buffer to disk - void flush(); + void flush() override; /// Stop the background flush thread before destructor. No more data will be written. - void shutdown(); + void shutdown() override + { + stopFlushThread(); + } + + String getName() override + { + return LogElement::name(); + } + + ASTPtr getCreateTableQuery() override; protected: Logger * log; @@ -250,7 +272,7 @@ void SystemLog::flush() template -void SystemLog::shutdown() +void SystemLog::stopFlushThread() { { std::unique_lock lock(mutex); @@ -270,13 +292,6 @@ void SystemLog::shutdown() } -template -SystemLog::~SystemLog() -{ - shutdown(); -} - - template void SystemLog::savingThreadFunction() { @@ -399,7 +414,7 @@ void SystemLog::prepareTable() rename->elements.emplace_back(elem); LOG_DEBUG(log, "Existing table " << description << " for system log has obsolete or different structure." - " Renaming it to " << backQuoteIfNeed(to.table)); + " Renaming it to " << backQuoteIfNeed(to.table)); InterpreterRenameQuery(rename, context).execute(); @@ -415,22 +430,7 @@ void SystemLog::prepareTable() /// Create the table. LOG_DEBUG(log, "Creating new table " << description << " for " + LogElement::name()); - auto create = std::make_shared(); - - create->database = table_id.database_name; - create->table = table_id.table_name; - - Block sample = LogElement::createBlock(); - - auto new_columns_list = std::make_shared(); - new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(sample.getNamesAndTypesList())); - create->set(create->columns_list, new_columns_list); - - ParserStorage storage_parser; - ASTPtr storage_ast = parseQuery( - storage_parser, storage_def.data(), storage_def.data() + storage_def.size(), - "Storage to create table for " + LogElement::name(), 0); - create->set(create->storage, storage_ast); + auto create = getCreateTableQuery(); InterpreterCreateQuery interpreter(create, context); interpreter.setInternal(true); @@ -442,4 +442,28 @@ void SystemLog::prepareTable() is_prepared = true; } + +template +ASTPtr SystemLog::getCreateTableQuery() +{ + auto create = std::make_shared(); + + create->database = table_id.database_name; + create->table = table_id.table_name; + + Block sample = LogElement::createBlock(); + + auto new_columns_list = std::make_shared(); + new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(sample.getNamesAndTypesList())); + create->set(create->columns_list, new_columns_list); + + ParserStorage storage_parser; + ASTPtr storage_ast = parseQuery( + storage_parser, storage_def.data(), storage_def.data() + storage_def.size(), + "Storage to create table for " + LogElement::name(), 0); + create->set(create->storage, storage_ast); + + return create; +} + }