rework Context::getSystemLogs, add system logs flush at shutdown

This commit is contained in:
Sema Checherinda 2024-08-01 16:58:17 +02:00
parent 00efd5fc0d
commit e3290c7820
8 changed files with 109 additions and 129 deletions

View File

@ -1171,7 +1171,7 @@ void BackupsWorker::waitAll()
for (const auto & id : current_operations) for (const auto & id : current_operations)
wait(id, /* rethrow_exception= */ false); wait(id, /* rethrow_exception= */ false);
backup_log->flush(backup_log->getLastLogIndex()); backup_log->flush(backup_log->getLastLogIndex(), /* should_prepare_tables_anyway */ false);
LOG_INFO(log, "Backups and restores finished"); LOG_INFO(log, "Backups and restores finished");
} }
@ -1223,6 +1223,8 @@ void BackupsWorker::cancelAll(bool wait_)
for (const auto & id : current_operations) for (const auto & id : current_operations)
wait(id, /* rethrow_exception= */ false); wait(id, /* rethrow_exception= */ false);
backup_log->flush(backup_log->getLastLogIndex(), /* should_prepare_tables_anyway */ false);
LOG_INFO(log, "Backups and restores finished or stopped"); LOG_INFO(log, "Backups and restores finished or stopped");
} }

View File

@ -249,15 +249,15 @@ SystemLogBase<LogElement>::Index SystemLogBase<LogElement>::getLastLogIndex()
} }
template <typename LogElement> template <typename LogElement>
void SystemLogBase<LogElement>::notifyFlush(Index expected_flushed_index) void SystemLogBase<LogElement>::notifyFlush(Index expected_flushed_index, bool should_prepare_tables_anyway)
{ {
queue->notifyFlush(expected_flushed_index, /* should_prepare_tables_anyway */ true); queue->notifyFlush(expected_flushed_index, should_prepare_tables_anyway);
} }
template <typename LogElement> template <typename LogElement>
void SystemLogBase<LogElement>::flush(Index expected_flushed_index) void SystemLogBase<LogElement>::flush(Index expected_flushed_index, bool should_prepare_tables_anyway)
{ {
queue->waitFlush(expected_flushed_index, /* should_prepare_tables_anyway */ true); queue->waitFlush(expected_flushed_index, should_prepare_tables_anyway);
} }
template <typename LogElement> template <typename LogElement>

View File

@ -64,9 +64,9 @@ public:
/// Thereby all the records whose index is less than the flushed index are flushed already. /// Thereby all the records whose index is less than the flushed index are flushed already.
virtual Index getLastLogIndex() = 0; virtual Index getLastLogIndex() = 0;
/// Call this method to wake up the flush thread and flush the data in the background. It is a non-blocking call. /// Call this method to wake up the flush thread and flush the data in the background. It is a non-blocking call.
virtual void notifyFlush(Index expected_flushed_index) = 0; virtual void notifyFlush(Index expected_flushed_index, bool should_prepare_tables_anyway) = 0;
/// Call this method to wait until the logs are flushed up to expected_flushed_index. It is a blocking call. /// Call this method to wait until the logs are flushed up to expected_flushed_index. It is a blocking call.
virtual void flush(Index expected_flushed_index) = 0; virtual void flush(Index expected_flushed_index, bool should_prepare_tables_anyway) = 0;
virtual void prepareTable() = 0; virtual void prepareTable() = 0;
@ -198,10 +198,10 @@ public:
Index getLastLogIndex() override; Index getLastLogIndex() override;
void notifyFlush(Index expected_flushed_index) override; void notifyFlush(Index expected_flushed_index, bool should_prepare_tables_anyway) override;
/// Flush data in the buffer to disk. Block the thread until the data is stored on disk. /// Flush data in the buffer to disk. Block the thread until the data is stored on disk.
void flush(Index expected_flushed_index) override; void flush(Index expected_flushed_index, bool should_prepare_tables_anyway) override;
/// Handles crash, flushes log without blocking if notify_flush_on_crash is set /// Handles crash, flushes log without blocking if notify_flush_on_crash is set
void handleCrash() override; void handleCrash() override;

View File

@ -99,6 +99,7 @@
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/RemoteHostFilter.h> #include <Common/RemoteHostFilter.h>
#include <Common/HTTPHeaderFilter.h> #include <Common/HTTPHeaderFilter.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h> #include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/AsynchronousInsertQueue.h> #include <Interpreters/AsynchronousInsertQueue.h>
#include <Interpreters/DatabaseCatalog.h> #include <Interpreters/DatabaseCatalog.h>
@ -618,7 +619,7 @@ struct ContextSharedPart : boost::noncopyable
/** After system_logs have been shut down it is guaranteed that no system table gets created or written to. /** After system_logs have been shut down it is guaranteed that no system table gets created or written to.
* Note that part changes at shutdown won't be logged to part log. * Note that part changes at shutdown won't be logged to part log.
*/ */
SHUTDOWN(log, "system logs", system_logs, shutdown()); SHUTDOWN(log, "system logs", system_logs, flushAndShutdown());
LOG_TRACE(log, "Shutting down database catalog"); LOG_TRACE(log, "Shutting down database catalog");
DatabaseCatalog::shutdown(); DatabaseCatalog::shutdown();
@ -4312,13 +4313,13 @@ std::shared_ptr<BlobStorageLog> Context::getBlobStorageLog() const
return shared->system_logs->blob_storage_log; return shared->system_logs->blob_storage_log;
} }
std::vector<ISystemLog *> Context::getSystemLogs() const SystemLogs Context::getSystemLogs() const
{ {
SharedLockGuard lock(shared->mutex); SharedLockGuard lock(shared->mutex);
if (!shared->system_logs) if (!shared->system_logs)
return {}; return {};
return shared->system_logs->logs; return *shared->system_logs;
} }
std::optional<Context::Dashboards> Context::getDashboards() const std::optional<Context::Dashboards> Context::getDashboards() const

View File

@ -48,6 +48,8 @@ namespace DB
class ASTSelectQuery; class ASTSelectQuery;
class SystemLogs;
struct ContextSharedPart; struct ContextSharedPart;
class ContextAccess; class ContextAccess;
class ContextAccessWrapper; class ContextAccessWrapper;
@ -1150,7 +1152,7 @@ public:
std::shared_ptr<BackupLog> getBackupLog() const; std::shared_ptr<BackupLog> getBackupLog() const;
std::shared_ptr<BlobStorageLog> getBlobStorageLog() const; std::shared_ptr<BlobStorageLog> getBlobStorageLog() const;
std::vector<ISystemLog *> getSystemLogs() const; SystemLogs getSystemLogs() const;
using Dashboards = std::vector<std::map<String, String>>; using Dashboards = std::vector<std::map<String, String>>;
std::optional<Dashboards> getDashboards() const; std::optional<Dashboards> getDashboards() const;

View File

@ -710,21 +710,8 @@ BlockIO InterpreterSystemQuery::execute()
case Type::FLUSH_LOGS: case Type::FLUSH_LOGS:
{ {
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_LOGS); getContext()->checkAccess(AccessType::SYSTEM_FLUSH_LOGS);
auto system_logs = getContext()->getSystemLogs();
auto logs = getContext()->getSystemLogs(); system_logs.flush(true);
std::vector<std::function<void()>> commands;
commands.reserve(logs.size());
for (auto * system_log : logs)
{
auto current_index = system_log->getLastLogIndex();
/// The data is started to being flushed in the background after notifyFlush call
system_log->notifyFlush(current_index);
commands.emplace_back([system_log, current_index] { system_log->flush(current_index); });
}
/// The data is flashing in the background, we need to wait until it is done
executeCommandsAndThrowIfError(commands);
break; break;
} }
case Type::STOP_LISTEN: case Type::STOP_LISTEN:

View File

@ -50,6 +50,7 @@
#include <fmt/core.h> #include <fmt/core.h>
namespace DB namespace DB
{ {
@ -312,56 +313,13 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
azure_queue_log = createSystemLog<ObjectStorageQueueLog>(global_context, "system", "azure_queue_log", config, "azure_queue_log", "Contains logging entries with the information files processes by S3Queue engine."); azure_queue_log = createSystemLog<ObjectStorageQueueLog>(global_context, "system", "azure_queue_log", config, "azure_queue_log", "Contains logging entries with the information files processes by S3Queue engine.");
blob_storage_log = createSystemLog<BlobStorageLog>(global_context, "system", "blob_storage_log", config, "blob_storage_log", "Contains logging entries with information about various blob storage operations such as uploads and deletes."); blob_storage_log = createSystemLog<BlobStorageLog>(global_context, "system", "blob_storage_log", config, "blob_storage_log", "Contains logging entries with information about various blob storage operations such as uploads and deletes.");
if (query_log)
logs.emplace_back(query_log.get());
if (query_thread_log)
logs.emplace_back(query_thread_log.get());
if (part_log)
logs.emplace_back(part_log.get());
if (trace_log)
logs.emplace_back(trace_log.get());
if (crash_log)
logs.emplace_back(crash_log.get());
if (text_log)
logs.emplace_back(text_log.get());
if (metric_log)
logs.emplace_back(metric_log.get());
if (error_log)
logs.emplace_back(error_log.get());
if (asynchronous_metric_log)
logs.emplace_back(asynchronous_metric_log.get());
if (opentelemetry_span_log)
logs.emplace_back(opentelemetry_span_log.get());
if (query_views_log)
logs.emplace_back(query_views_log.get());
if (zookeeper_log)
logs.emplace_back(zookeeper_log.get());
if (session_log) if (session_log)
{
logs.emplace_back(session_log.get());
global_context->addWarningMessage("Table system.session_log is enabled. It's unreliable and may contain garbage. Do not use it for any kind of security monitoring."); global_context->addWarningMessage("Table system.session_log is enabled. It's unreliable and may contain garbage. Do not use it for any kind of security monitoring.");
}
if (transactions_info_log)
logs.emplace_back(transactions_info_log.get());
if (processors_profile_log)
logs.emplace_back(processors_profile_log.get());
if (filesystem_cache_log)
logs.emplace_back(filesystem_cache_log.get());
if (filesystem_read_prefetches_log)
logs.emplace_back(filesystem_read_prefetches_log.get());
if (asynchronous_insert_log)
logs.emplace_back(asynchronous_insert_log.get());
if (backup_log)
logs.emplace_back(backup_log.get());
if (s3_queue_log)
logs.emplace_back(s3_queue_log.get());
if (blob_storage_log)
logs.emplace_back(blob_storage_log.get());
bool should_prepare = global_context->getServerSettings().prepare_system_log_tables_on_startup; bool should_prepare = global_context->getServerSettings().prepare_system_log_tables_on_startup;
try try
{ {
for (auto & log : logs) for (auto & log : getAllLogs())
{ {
log->startup(); log->startup();
if (should_prepare) if (should_prepare)
@ -395,20 +353,56 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
} }
} }
std::vector<ISystemLog *> SystemLogs::getAllLogs() const
SystemLogs::~SystemLogs()
{ {
/// NOLINTBEGIN(bugprone-macro-parentheses)
#define GET_RAW_POINTERS(log_type, member, descr) \
member.get(), \
std::vector<ISystemLog *> result = {
LIST_OF_ALL_SYSTEM_LOGS(GET_RAW_POINTERS)
};
#undef GET_RAW_POINTERS
/// NOLINTEND(bugprone-macro-parentheses)
auto last_it = std::remove(result.begin(), result.end(), nullptr);
result.erase(last_it, result.end());
return result;
}
void SystemLogs::flush(bool should_prepare_tables_anyway)
{
auto logs = getAllLogs();
std::vector<ISystemLog::Index> logs_indexes(logs.size(), 0);
for (size_t i = 0; i < logs.size(); ++i)
{
auto last_log_index = logs[i]->getLastLogIndex();
logs_indexes[i] = last_log_index;
logs[i]->notifyFlush(last_log_index, should_prepare_tables_anyway);
}
for (size_t i = 0; i < logs.size(); ++i)
logs[i]->flush(logs_indexes[i], should_prepare_tables_anyway);
}
void SystemLogs::flushAndShutdown()
{
flush(/* should_prepare_tables_anyway */ false);
shutdown(); shutdown();
} }
void SystemLogs::shutdown() void SystemLogs::shutdown()
{ {
auto logs = getAllLogs();
for (auto & log : logs) for (auto & log : logs)
log->shutdown(); log->shutdown();
} }
void SystemLogs::handleCrash() void SystemLogs::handleCrash()
{ {
auto logs = getAllLogs();
for (auto & log : logs) for (auto & log : logs)
log->handleCrash(); log->handleCrash();
} }

View File

@ -5,6 +5,32 @@
#include <Parsers/IAST.h> #include <Parsers/IAST.h>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <vector>
/// X-macro listing every system log exactly once.
/// The callback M is invoked as M(log_type, member, descr) for each entry:
///   log_type - class name of the log,
///   member   - identifier used for that log (e.g. as a member name),
///   descr    - description string literal, may be empty.
/// Define M, expand the list, then #undef M.
/// The last entry has no trailing line continuation, so the definition ends with the list
/// and cannot absorb whatever line happens to follow it.
#define LIST_OF_ALL_SYSTEM_LOGS(M) \
    M(QueryLog,                    query_log,                      "Used to log queries.") \
    M(QueryThreadLog,              query_thread_log,               "Used to log query threads.") \
    M(PartLog,                     part_log,                       "Used to log operations with parts.") \
    M(TraceLog,                    trace_log,                      "Used to log traces from query profiler.") \
    M(CrashLog,                    crash_log,                      "Used to log server crashes.") \
    M(TextLog,                     text_log,                       "Used to log all text messages.") \
    M(MetricLog,                   metric_log,                     "Used to log all metrics.") \
    M(ErrorLog,                    error_log,                      "Used to log errors.") \
    M(FilesystemCacheLog,          filesystem_cache_log,           "") \
    M(FilesystemReadPrefetchesLog, filesystem_read_prefetches_log, "") \
    M(ObjectStorageQueueLog,       s3_queue_log,                   "") \
    M(ObjectStorageQueueLog,       azure_queue_log,                "") \
    M(AsynchronousMetricLog,       asynchronous_metric_log,        "Metrics from system.asynchronous_metrics") \
    M(OpenTelemetrySpanLog,        opentelemetry_span_log,         "OpenTelemetry trace spans.") \
    M(QueryViewsLog,               query_views_log,                "Used to log queries of materialized and live views.") \
    M(ZooKeeperLog,                zookeeper_log,                  "Used to log all actions of ZooKeeper client.") \
    M(SessionLog,                  session_log,                    "Login, LogOut and Login failure events.") \
    M(TransactionsInfoLog,         transactions_info_log,          "Events related to transactions.") \
    M(ProcessorsProfileLog,        processors_profile_log,         "Used to log processors profiling") \
    M(AsynchronousInsertLog,       asynchronous_insert_log,        "") \
    M(BackupLog,                   backup_log,                     "Backup and restore events") \
    M(BlobStorageLog,              blob_storage_log,               "Log blob storage operations")
namespace DB namespace DB
{ {
@ -34,71 +60,39 @@ namespace DB
}; };
*/ */
class QueryLog; /// NOLINTBEGIN(bugprone-macro-parentheses)
class QueryThreadLog; #define FORWARD_DECLARATION(log_type, member, descr) \
class PartLog; class log_type; \
class TextLog;
class TraceLog; LIST_OF_ALL_SYSTEM_LOGS(FORWARD_DECLARATION)
class CrashLog; #undef FORWARD_DECLARATION
class ErrorLog; /// NOLINTEND(bugprone-macro-parentheses)
class MetricLog;
class AsynchronousMetricLog;
class OpenTelemetrySpanLog;
class QueryViewsLog;
class ZooKeeperLog;
class SessionLog;
class TransactionsInfoLog;
class ProcessorsProfileLog;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class AsynchronousInsertLog;
class BackupLog;
class ObjectStorageQueueLog;
class BlobStorageLog;
/// System logs should be destroyed in destructor of the last Context and before tables, /// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables /// because SystemLog destruction makes insert query while flushing data into underlying tables
struct SystemLogs class SystemLogs
{ {
public:
SystemLogs() = default;
SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config); SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config);
~SystemLogs(); SystemLogs(const SystemLogs & other) = default;
void flush(bool should_prepare_tables_anyway);
void flushAndShutdown();
void shutdown(); void shutdown();
void handleCrash(); void handleCrash();
std::shared_ptr<QueryLog> query_log; /// Used to log queries. /// NOLINTBEGIN(bugprone-macro-parentheses)
std::shared_ptr<QueryThreadLog> query_thread_log; /// Used to log query threads. #define PUBLIC_MEMBERS(log_type, member, descr) \
std::shared_ptr<PartLog> part_log; /// Used to log operations with parts std::shared_ptr<log_type> member; \
std::shared_ptr<TraceLog> trace_log; /// Used to log traces from query profiler
std::shared_ptr<CrashLog> crash_log; /// Used to log server crashes.
std::shared_ptr<TextLog> text_log; /// Used to log all text messages.
std::shared_ptr<MetricLog> metric_log; /// Used to log all metrics.
std::shared_ptr<ErrorLog> error_log; /// Used to log errors.
std::shared_ptr<FilesystemCacheLog> filesystem_cache_log;
std::shared_ptr<FilesystemReadPrefetchesLog> filesystem_read_prefetches_log;
std::shared_ptr<ObjectStorageQueueLog> s3_queue_log;
std::shared_ptr<ObjectStorageQueueLog> azure_queue_log;
/// Metrics from system.asynchronous_metrics.
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans.
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
/// Used to log queries of materialized and live views
std::shared_ptr<QueryViewsLog> query_views_log;
/// Used to log all actions of ZooKeeper client
std::shared_ptr<ZooKeeperLog> zookeeper_log;
/// Login, LogOut and Login failure events
std::shared_ptr<SessionLog> session_log;
/// Events related to transactions
std::shared_ptr<TransactionsInfoLog> transactions_info_log;
/// Used to log processors profiling
std::shared_ptr<ProcessorsProfileLog> processors_profile_log;
std::shared_ptr<AsynchronousInsertLog> asynchronous_insert_log;
/// Backup and restore events
std::shared_ptr<BackupLog> backup_log;
/// Log blob storage operations
std::shared_ptr<BlobStorageLog> blob_storage_log;
std::vector<ISystemLog *> logs; LIST_OF_ALL_SYSTEM_LOGS(PUBLIC_MEMBERS)
#undef PUBLIC_MEMBERS
/// NOLINTEND(bugprone-macro-parentheses)
private:
std::vector<ISystemLog *> getAllLogs() const;
}; };
struct SystemLogSettings struct SystemLogSettings