#include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int BAD_ARGUMENTS; } namespace { constexpr size_t DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS = 7500; constexpr size_t DEFAULT_METRIC_LOG_COLLECT_INTERVAL_MILLISECONDS = 1000; /// Creates a system log with MergeTree engine using parameters from config template std::shared_ptr createSystemLog( Context & context, const String & default_database_name, const String & default_table_name, const Poco::Util::AbstractConfiguration & config, const String & config_prefix) { if (!config.has(config_prefix)) return {}; String database = config.getString(config_prefix + ".database", default_database_name); String table = config.getString(config_prefix + ".table", default_table_name); if (database != default_database_name) { /// System tables must be loaded before other tables, but loading order is undefined for all databases except `system` LOG_ERROR(&Poco::Logger::get("SystemLog"), "Custom database name for a system table specified in config." " Table `{}` will be created in `system` database instead of `{}`", table, database); database = default_database_name; } String engine; if (config.has(config_prefix + ".engine")) { if (config.has(config_prefix + ".partition_by")) throw Exception("If 'engine' is specified for system table, " "PARTITION BY parameters should be specified directly inside 'engine' and 'partition_by' setting doesn't make sense", ErrorCodes::BAD_ARGUMENTS); if (config.has(config_prefix + ".ttl")) throw Exception("If 'engine' is specified for system table, " "TTL parameters should be specified directly inside 'engine' and 'ttl' setting doesn't make sense", ErrorCodes::BAD_ARGUMENTS); engine = config.getString(config_prefix + ".engine"); } else { String partition_by = config.getString(config_prefix + ".partition_by", "toYYYYMM(event_date)"); engine = "ENGINE = MergeTree"; if (!partition_by.empty()) engine += " PARTITION BY (" + partition_by + ")"; String ttl = config.getString(config_prefix + ".ttl", ""); if (!ttl.empty()) engine += " TTL " + ttl; engine += " ORDER BY (event_date, event_time)"; } // Validate engine definition grammatically to prevent some configuration errors ParserStorage storage_parser; parseQuery(storage_parser, engine.data(), engine.data() + engine.size(), "Storage to create table for " + config_prefix, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); size_t flush_interval_milliseconds = config.getUInt64(config_prefix + ".flush_interval_milliseconds", DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS); return std::make_shared(context, database, table, engine, flush_interval_milliseconds); } } SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config) { query_log = createSystemLog(global_context, "system", "query_log", config, "query_log"); query_thread_log = createSystemLog(global_context, "system", "query_thread_log", config, "query_thread_log"); part_log = createSystemLog(global_context, "system", "part_log", config, "part_log"); trace_log = createSystemLog(global_context, "system", "trace_log", config, "trace_log"); crash_log = createSystemLog(global_context, "system", "crash_log", config, "crash_log"); text_log = createSystemLog(global_context, "system", "text_log", config, "text_log"); metric_log = createSystemLog(global_context, "system", "metric_log", config, "metric_log"); asynchronous_metric_log = createSystemLog( global_context, "system", "asynchronous_metric_log", config, "asynchronous_metric_log"); opentelemetry_span_log = createSystemLog( global_context, "system", "opentelemetry_span_log", config, "opentelemetry_span_log"); if (query_log) logs.emplace_back(query_log.get()); if (query_thread_log) logs.emplace_back(query_thread_log.get()); if (part_log) logs.emplace_back(part_log.get()); if (trace_log) logs.emplace_back(trace_log.get()); if (crash_log) logs.emplace_back(crash_log.get()); if (text_log) logs.emplace_back(text_log.get()); if (metric_log) logs.emplace_back(metric_log.get()); if (asynchronous_metric_log) logs.emplace_back(asynchronous_metric_log.get()); if (opentelemetry_span_log) logs.emplace_back(opentelemetry_span_log.get()); try { for (auto & log : logs) log->startup(); } catch (...) { /// join threads shutdown(); throw; } if (metric_log) { size_t collect_interval_milliseconds = config.getUInt64("metric_log.collect_interval_milliseconds", DEFAULT_METRIC_LOG_COLLECT_INTERVAL_MILLISECONDS); metric_log->startCollectMetric(collect_interval_milliseconds); } if (crash_log) { CrashLog::initialize(crash_log); } } SystemLogs::~SystemLogs() { shutdown(); } void SystemLogs::shutdown() { for (auto & log : logs) log->shutdown(); } }