diff --git a/src/Backups/BackupsWorker.cpp b/src/Backups/BackupsWorker.cpp index 363aaae9c8d..106aa89082d 100644 --- a/src/Backups/BackupsWorker.cpp +++ b/src/Backups/BackupsWorker.cpp @@ -1171,7 +1171,7 @@ void BackupsWorker::waitAll() for (const auto & id : current_operations) wait(id, /* rethrow_exception= */ false); - backup_log->flush(backup_log->getLastLogIndex()); + backup_log->flush(backup_log->getLastLogIndex(), /* should_prepare_tables_anyway */ false); LOG_INFO(log, "Backups and restores finished"); } @@ -1223,6 +1223,8 @@ void BackupsWorker::cancelAll(bool wait_) for (const auto & id : current_operations) wait(id, /* rethrow_exception= */ false); + backup_log->flush(backup_log->getLastLogIndex(), /* should_prepare_tables_anyway */ false); + LOG_INFO(log, "Backups and restores finished or stopped"); } diff --git a/src/Common/SystemLogBase.cpp b/src/Common/SystemLogBase.cpp index a35a46c49cc..127c8862a35 100644 --- a/src/Common/SystemLogBase.cpp +++ b/src/Common/SystemLogBase.cpp @@ -249,15 +249,15 @@ SystemLogBase::Index SystemLogBase::getLastLogIndex() } template -void SystemLogBase::notifyFlush(Index expected_flushed_index) +void SystemLogBase::notifyFlush(Index expected_flushed_index, bool should_prepare_tables_anyway) { - queue->notifyFlush(expected_flushed_index, /* should_prepare_tables_anyway */ true); + queue->notifyFlush(expected_flushed_index, should_prepare_tables_anyway); } template -void SystemLogBase::flush(Index expected_flushed_index) +void SystemLogBase::flush(Index expected_flushed_index, bool should_prepare_tables_anyway) { - queue->waitFlush(expected_flushed_index, /* should_prepare_tables_anyway */ true); + queue->waitFlush(expected_flushed_index, should_prepare_tables_anyway); } template diff --git a/src/Common/SystemLogBase.h b/src/Common/SystemLogBase.h index c359287a73f..0d7b04d5c57 100644 --- a/src/Common/SystemLogBase.h +++ b/src/Common/SystemLogBase.h @@ -64,9 +64,9 @@ public: /// Thereby all the records whose index is less than the flashed index are flushed already. virtual Index getLastLogIndex() = 0; /// Call this method to wake up the flush thread and flush the data in the background. It is non blocking call - virtual void notifyFlush(Index expected_flushed_index) = 0; + virtual void notifyFlush(Index expected_flushed_index, bool should_prepare_tables_anyway) = 0; /// Call this method to wait intill the logs are flushed up to expected_flushed_index. It is blocking call. - virtual void flush(Index expected_flushed_index) = 0; + virtual void flush(Index expected_flushed_index, bool should_prepare_tables_anyway) = 0; virtual void prepareTable() = 0; @@ -198,10 +198,10 @@ public: Index getLastLogIndex() override; - void notifyFlush(Index expected_flushed_index) override; + void notifyFlush(Index expected_flushed_index, bool should_prepare_tables_anyway) override; /// Flush data in the buffer to disk. Block the thread until the data is stored on disk. - void flush(Index expected_flushed_index) override; + void flush(Index expected_flushed_index, bool should_prepare_tables_anyway) override; /// Handles crash, flushes log without blocking if notify_flush_on_crash is set void handleCrash() override; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 5413b568068..3051ed3e567 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -99,6 +99,7 @@ #include #include #include +#include #include #include #include @@ -618,7 +619,7 @@ struct ContextSharedPart : boost::noncopyable /** After system_logs have been shut down it is guaranteed that no system table gets created or written to. * Note that part changes at shutdown won't be logged to part log. */ - SHUTDOWN(log, "system logs", system_logs, shutdown()); + SHUTDOWN(log, "system logs", system_logs, flushAndShutdown()); LOG_TRACE(log, "Shutting down database catalog"); DatabaseCatalog::shutdown(); @@ -4312,13 +4313,13 @@ std::shared_ptr Context::getBlobStorageLog() const return shared->system_logs->blob_storage_log; } -std::vector Context::getSystemLogs() const +SystemLogs Context::getSystemLogs() const { SharedLockGuard lock(shared->mutex); if (!shared->system_logs) return {}; - return shared->system_logs->logs; + return *shared->system_logs; } std::optional Context::getDashboards() const diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index d5e35c3e4b3..3da4f124553 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -48,6 +48,8 @@ namespace DB class ASTSelectQuery; +class SystemLogs; + struct ContextSharedPart; class ContextAccess; class ContextAccessWrapper; @@ -1150,7 +1152,7 @@ public: std::shared_ptr getBackupLog() const; std::shared_ptr getBlobStorageLog() const; - std::vector getSystemLogs() const; + SystemLogs getSystemLogs() const; using Dashboards = std::vector>; std::optional getDashboards() const; diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 9b483bac25c..ef6d1040c5e 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -710,21 +710,8 @@ BlockIO InterpreterSystemQuery::execute() case Type::FLUSH_LOGS: { getContext()->checkAccess(AccessType::SYSTEM_FLUSH_LOGS); - - auto logs = getContext()->getSystemLogs(); - - std::vector> commands; - commands.reserve(logs.size()); - for (auto * system_log : logs) - { - auto current_index = system_log->getLastLogIndex(); - /// The data is started to being flushed in the background after notifyFlush call - system_log->notifyFlush(current_index); - commands.emplace_back([system_log, current_index] { system_log->flush(current_index); }); - } - - /// The data is flashing in the background, we need to wait until it is done - executeCommandsAndThrowIfError(commands); + auto system_logs = getContext()->getSystemLogs(); + system_logs.flush(true); break; } case Type::STOP_LISTEN: diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index 9d07184a0e5..9b58da3f545 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -50,6 +50,7 @@ #include + namespace DB { @@ -312,56 +313,13 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf azure_queue_log = createSystemLog(global_context, "system", "azure_queue_log", config, "azure_queue_log", "Contains logging entries with the information files processes by S3Queue engine."); blob_storage_log = createSystemLog(global_context, "system", "blob_storage_log", config, "blob_storage_log", "Contains logging entries with information about various blob storage operations such as uploads and deletes."); - if (query_log) - logs.emplace_back(query_log.get()); - if (query_thread_log) - logs.emplace_back(query_thread_log.get()); - if (part_log) - logs.emplace_back(part_log.get()); - if (trace_log) - logs.emplace_back(trace_log.get()); - if (crash_log) - logs.emplace_back(crash_log.get()); - if (text_log) - logs.emplace_back(text_log.get()); - if (metric_log) - logs.emplace_back(metric_log.get()); - if (error_log) - logs.emplace_back(error_log.get()); - if (asynchronous_metric_log) - logs.emplace_back(asynchronous_metric_log.get()); - if (opentelemetry_span_log) - logs.emplace_back(opentelemetry_span_log.get()); - if (query_views_log) - logs.emplace_back(query_views_log.get()); - if (zookeeper_log) - logs.emplace_back(zookeeper_log.get()); if (session_log) - { - logs.emplace_back(session_log.get()); global_context->addWarningMessage("Table system.session_log is enabled. It's unreliable and may contain garbage. Do not use it for any kind of security monitoring."); - } - if (transactions_info_log) - logs.emplace_back(transactions_info_log.get()); - if (processors_profile_log) - logs.emplace_back(processors_profile_log.get()); - if (filesystem_cache_log) - logs.emplace_back(filesystem_cache_log.get()); - if (filesystem_read_prefetches_log) - logs.emplace_back(filesystem_read_prefetches_log.get()); - if (asynchronous_insert_log) - logs.emplace_back(asynchronous_insert_log.get()); - if (backup_log) - logs.emplace_back(backup_log.get()); - if (s3_queue_log) - logs.emplace_back(s3_queue_log.get()); - if (blob_storage_log) - logs.emplace_back(blob_storage_log.get()); bool should_prepare = global_context->getServerSettings().prepare_system_log_tables_on_startup; try { - for (auto & log : logs) + for (auto & log : getAllLogs()) { log->startup(); if (should_prepare) @@ -395,20 +353,56 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf } } - -SystemLogs::~SystemLogs() +std::vector SystemLogs::getAllLogs() const { +/// NOLINTBEGIN(bugprone-macro-parentheses) +#define GET_RAW_POINTERS(log_type, member, descr) \ + member.get(), \ + + std::vector result = { + LIST_OF_ALL_SYSTEM_LOGS(GET_RAW_POINTERS) + }; +#undef GET_RAW_POINTERS +/// NOLINTEND(bugprone-macro-parentheses) + + auto last_it = std::remove(result.begin(), result.end(), nullptr); + result.erase(last_it, result.end()); + + return result; +} + +void SystemLogs::flush(bool should_prepare_tables_anyway) +{ + auto logs = getAllLogs(); + std::vector logs_indexes(logs.size(), 0); + + for (size_t i = 0; i < logs.size(); ++i) + { + auto last_log_index = logs[i]->getLastLogIndex(); + logs_indexes[i] = last_log_index; + logs[i]->notifyFlush(last_log_index, should_prepare_tables_anyway); + } + + for (size_t i = 0; i < logs.size(); ++i) + logs[i]->flush(logs_indexes[i], should_prepare_tables_anyway); +} + +void SystemLogs::flushAndShutdown() +{ + flush(/* should_prepare_tables_anyway */ false); shutdown(); } void SystemLogs::shutdown() { + auto logs = getAllLogs(); for (auto & log : logs) log->shutdown(); } void SystemLogs::handleCrash() { + auto logs = getAllLogs(); for (auto & log : logs) log->handleCrash(); } diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index 0ac468b15ec..093be203282 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -5,6 +5,32 @@ #include #include +#include + +#define LIST_OF_ALL_SYSTEM_LOGS(M) \ + M(QueryLog, query_log, "Used to log queries.") \ + M(QueryThreadLog, query_thread_log, "Used to log query threads.") \ + M(PartLog, part_log, "Used to log operations with parts.") \ + M(TraceLog, trace_log, "Used to log traces from query profiler.") \ + M(CrashLog, crash_log, "Used to log server crashes.") \ + M(TextLog, text_log, "Used to log all text messages.") \ + M(MetricLog, metric_log, "Used to log all metrics.") \ + M(ErrorLog, error_log, "Used to log errors.") \ + M(FilesystemCacheLog, filesystem_cache_log, "") \ + M(FilesystemReadPrefetchesLog, filesystem_read_prefetches_log, "") \ + M(ObjectStorageQueueLog, s3_queue_log, "") \ + M(ObjectStorageQueueLog, azure_queue_log, "") \ + M(AsynchronousMetricLog, asynchronous_metric_log, "Metrics from system.asynchronous_metrics") \ + M(OpenTelemetrySpanLog, opentelemetry_span_log, "OpenTelemetry trace spans.") \ + M(QueryViewsLog, query_views_log, "Used to log queries of materialized and live views.") \ + M(ZooKeeperLog, zookeeper_log, "Used to log all actions of ZooKeeper client.") \ + M(SessionLog, session_log, "Login, LogOut and Login failure events.") \ + M(TransactionsInfoLog, transactions_info_log, "Events related to transactions.") \ + M(ProcessorsProfileLog, processors_profile_log, "Used to log processors profiling") \ + M(AsynchronousInsertLog, asynchronous_insert_log, "") \ + M(BackupLog, backup_log, "Backup and restore events") \ + M(BlobStorageLog, blob_storage_log, "Log blob storage operations") \ + namespace DB { @@ -34,71 +60,39 @@ namespace DB }; */ -class QueryLog; -class QueryThreadLog; -class PartLog; -class TextLog; -class TraceLog; -class CrashLog; -class ErrorLog; -class MetricLog; -class AsynchronousMetricLog; -class OpenTelemetrySpanLog; -class QueryViewsLog; -class ZooKeeperLog; -class SessionLog; -class TransactionsInfoLog; -class ProcessorsProfileLog; -class FilesystemCacheLog; -class FilesystemReadPrefetchesLog; -class AsynchronousInsertLog; -class BackupLog; -class ObjectStorageQueueLog; -class BlobStorageLog; +/// NOLINTBEGIN(bugprone-macro-parentheses) +#define FORWARD_DECLARATION(log_type, member, descr) \ + class log_type; \ + +LIST_OF_ALL_SYSTEM_LOGS(FORWARD_DECLARATION) +#undef FORWARD_DECLARATION +/// NOLINTEND(bugprone-macro-parentheses) + /// System logs should be destroyed in destructor of the last Context and before tables, /// because SystemLog destruction makes insert query while flushing data into underlying tables -struct SystemLogs +class SystemLogs { +public: + SystemLogs() = default; SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config); - ~SystemLogs(); + SystemLogs(const SystemLogs & other) = default; + void flush(bool should_prepare_tables_anyway); + void flushAndShutdown(); void shutdown(); void handleCrash(); - std::shared_ptr query_log; /// Used to log queries. - std::shared_ptr query_thread_log; /// Used to log query threads. - std::shared_ptr part_log; /// Used to log operations with parts - std::shared_ptr trace_log; /// Used to log traces from query profiler - std::shared_ptr crash_log; /// Used to log server crashes. - std::shared_ptr text_log; /// Used to log all text messages. - std::shared_ptr metric_log; /// Used to log all metrics. - std::shared_ptr error_log; /// Used to log errors. - std::shared_ptr filesystem_cache_log; - std::shared_ptr filesystem_read_prefetches_log; - std::shared_ptr s3_queue_log; - std::shared_ptr azure_queue_log; - /// Metrics from system.asynchronous_metrics. - std::shared_ptr asynchronous_metric_log; - /// OpenTelemetry trace spans. - std::shared_ptr opentelemetry_span_log; - /// Used to log queries of materialized and live views - std::shared_ptr query_views_log; - /// Used to log all actions of ZooKeeper client - std::shared_ptr zookeeper_log; - /// Login, LogOut and Login failure events - std::shared_ptr session_log; - /// Events related to transactions - std::shared_ptr transactions_info_log; - /// Used to log processors profiling - std::shared_ptr processors_profile_log; - std::shared_ptr asynchronous_insert_log; - /// Backup and restore events - std::shared_ptr backup_log; - /// Log blob storage operations - std::shared_ptr blob_storage_log; +/// NOLINTBEGIN(bugprone-macro-parentheses) +#define PUBLIC_MEMBERS(log_type, member, descr) \ + std::shared_ptr member; \ - std::vector logs; + LIST_OF_ALL_SYSTEM_LOGS(PUBLIC_MEMBERS) +#undef PUBLIC_MEMBERS +/// NOLINTEND(bugprone-macro-parentheses) + +private: + std::vector getAllLogs() const; }; struct SystemLogSettings