diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index 3740ac3f4c8..cfb31ab2a41 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -30,8 +30,7 @@ std::shared_ptr createSystemLog( const String & default_database_name, const String & default_table_name, const Poco::Util::AbstractConfiguration & config, - const String & config_prefix, - bool lazy_load) + const String & config_prefix) { if (!config.has(config_prefix)) return {}; @@ -64,7 +63,7 @@ std::shared_ptr createSystemLog( size_t flush_interval_milliseconds = config.getUInt64(config_prefix + ".flush_interval_milliseconds", DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS); - return std::make_shared(context, database, table, engine, flush_interval_milliseconds, lazy_load); + return std::make_shared(context, database, table, engine, flush_interval_milliseconds); } } @@ -72,14 +71,12 @@ std::shared_ptr createSystemLog( SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config) { - bool lazy_load = config.getBool("system_tables_lazy_load", true); - - query_log = createSystemLog(global_context, "system", "query_log", config, "query_log", lazy_load); - query_thread_log = createSystemLog(global_context, "system", "query_thread_log", config, "query_thread_log", lazy_load); - part_log = createSystemLog(global_context, "system", "part_log", config, "part_log", lazy_load); - trace_log = createSystemLog(global_context, "system", "trace_log", config, "trace_log", lazy_load); - text_log = createSystemLog(global_context, "system", "text_log", config, "text_log", lazy_load); - metric_log = createSystemLog(global_context, "system", "metric_log", config, "metric_log", lazy_load); + query_log = createSystemLog(global_context, "system", "query_log", config, "query_log"); + query_thread_log = createSystemLog(global_context, "system", "query_thread_log", config, "query_thread_log"); + part_log = createSystemLog(global_context, "system", "part_log", config, "part_log"); + trace_log = createSystemLog(global_context, "system", "trace_log", config, "trace_log"); + text_log = createSystemLog(global_context, "system", "text_log", config, "text_log"); + metric_log = createSystemLog(global_context, "system", "metric_log", config, "metric_log"); if (metric_log) { @@ -99,6 +96,14 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi logs.emplace_back(text_log.get()); if (metric_log) logs.emplace_back(metric_log.get()); + + bool lazy_load = config.getBool("system_tables_lazy_load", true); + for (auto & log : logs) + { + if (!lazy_load) + log->prepareTable(); + log->startup(); + } } diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index ceffba67b50..066956ed53a 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -73,6 +73,8 @@ public: virtual String getName() = 0; virtual ASTPtr getCreateTableQuery() = 0; virtual void flush() = 0; + virtual void prepareTable() = 0; + virtual void startup() = 0; virtual void shutdown() = 0; virtual ~ISystemLog() = default; }; @@ -117,8 +119,7 @@ public: const String & database_name_, const String & table_name_, const String & storage_def_, - size_t flush_interval_milliseconds_, - bool lazy_load); + size_t flush_interval_milliseconds_); /** Append a record into log. * Writing to table will be done asynchronously and in case of failure, record could be lost. @@ -130,6 +131,9 @@ public: /// Flush data in the buffer to disk void flush() override; + /// Start the background thread. + void startup() override; + /// Stop the background flush thread before destructor. No more data will be written. void shutdown() override { @@ -178,7 +182,7 @@ private: * Renames old table if its structure is not suitable. * This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created. */ - void prepareTable(); + void prepareTable() override; /// flushImpl can be executed only in saving_thread. void flushImpl(const std::vector & to_flush, uint64_t to_flush_end); @@ -190,8 +194,7 @@ SystemLog::SystemLog(Context & context_, const String & database_name_, const String & table_name_, const String & storage_def_, - size_t flush_interval_milliseconds_, - bool lazy_load) + size_t flush_interval_milliseconds_) : context(context_) , table_id(database_name_, table_name_) , storage_def(storage_def_), @@ -199,12 +202,12 @@ SystemLog::SystemLog(Context & context_, { assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE); log = &Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")"); +} - if (!lazy_load) - { - prepareTable(); - } +template +void SystemLog::startup() +{ saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); }); } diff --git a/src/Interpreters/TextLog.cpp b/src/Interpreters/TextLog.cpp index 8ecf6f029d2..b5f1d987b91 100644 --- a/src/Interpreters/TextLog.cpp +++ b/src/Interpreters/TextLog.cpp @@ -72,9 +72,9 @@ void TextLogElement::appendToBlock(Block & block) const TextLog::TextLog(Context & context_, const String & database_name_, const String & table_name_, const String & storage_def_, - size_t flush_interval_milliseconds_, bool lazy_load) + size_t flush_interval_milliseconds_) : SystemLog(context_, database_name_, table_name_, - storage_def_, flush_interval_milliseconds_, lazy_load) + storage_def_, flush_interval_milliseconds_) { // SystemLog methods may write text logs, so we disable logging for the text // log table to avoid recursion. diff --git a/src/Interpreters/TextLog.h b/src/Interpreters/TextLog.h index aaf64974e05..73c38429662 100644 --- a/src/Interpreters/TextLog.h +++ b/src/Interpreters/TextLog.h @@ -36,8 +36,7 @@ public: const String & database_name_, const String & table_name_, const String & storage_def_, - size_t flush_interval_milliseconds_, - bool lazy_load); + size_t flush_interval_milliseconds_); }; }