ClickHouse/src/Interpreters/SystemLog.cpp
Alexey Milovidov cfef7ba692 Whitespace
2020-06-13 04:35:49 +03:00

130 lines
4.4 KiB
C++

#include <Interpreters/SystemLog.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/MetricLog.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
constexpr size_t DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS = 7500;
/// Creates a system log with MergeTree engine using parameters from config
template <typename TSystemLog>
std::shared_ptr<TSystemLog> createSystemLog(
Context & context,
const String & default_database_name,
const String & default_table_name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix)
{
if (!config.has(config_prefix))
return {};
String database = config.getString(config_prefix + ".database", default_database_name);
String table = config.getString(config_prefix + ".table", default_table_name);
if (database != default_database_name)
{
/// System tables must be loaded before other tables, but loading order is undefined for all databases except `system`
LOG_ERROR(&Poco::Logger::get("SystemLog"), "Custom database name for a system table specified in config. Table `{}` will be created in `system` database instead of `{}`", table, database);
database = default_database_name;
}
String engine;
if (config.has(config_prefix + ".engine"))
{
if (config.has(config_prefix + ".partition_by"))
throw Exception("If 'engine' is specified for system table, PARTITION BY parameters should be specified directly inside 'engine' and 'partition_by' setting doesn't make sense", ErrorCodes::BAD_ARGUMENTS);
engine = config.getString(config_prefix + ".engine");
}
else
{
String partition_by = config.getString(config_prefix + ".partition_by", "toYYYYMM(event_date)");
engine = "ENGINE = MergeTree PARTITION BY (" + partition_by + ") ORDER BY (event_date, event_time)"
"SETTINGS min_bytes_for_wide_part = '10M'"; /// Use polymorphic parts for log tables by default
}
size_t flush_interval_milliseconds = config.getUInt64(config_prefix + ".flush_interval_milliseconds", DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS);
return std::make_shared<TSystemLog>(context, database, table, engine, flush_interval_milliseconds);
}
}
SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config)
{
query_log = createSystemLog<QueryLog>(global_context, "system", "query_log", config, "query_log");
query_thread_log = createSystemLog<QueryThreadLog>(global_context, "system", "query_thread_log", config, "query_thread_log");
part_log = createSystemLog<PartLog>(global_context, "system", "part_log", config, "part_log");
trace_log = createSystemLog<TraceLog>(global_context, "system", "trace_log", config, "trace_log");
text_log = createSystemLog<TextLog>(global_context, "system", "text_log", config, "text_log");
metric_log = createSystemLog<MetricLog>(global_context, "system", "metric_log", config, "metric_log");
asynchronous_metric_log = createSystemLog<AsynchronousMetricLog>(
global_context, "system", "asynchronous_metric_log", config,
"asynchronous_metric_log");
if (query_log)
logs.emplace_back(query_log.get());
if (query_thread_log)
logs.emplace_back(query_thread_log.get());
if (part_log)
logs.emplace_back(part_log.get());
if (trace_log)
logs.emplace_back(trace_log.get());
if (text_log)
logs.emplace_back(text_log.get());
if (metric_log)
logs.emplace_back(metric_log.get());
if (asynchronous_metric_log)
logs.emplace_back(asynchronous_metric_log.get());
try
{
for (auto & log : logs)
log->startup();
}
catch (...)
{
/// join threads
shutdown();
throw;
}
if (metric_log)
{
size_t collect_interval_milliseconds = config.getUInt64("metric_log.collect_interval_milliseconds");
metric_log->startCollectMetric(collect_interval_milliseconds);
}
}
SystemLogs::~SystemLogs()
{
shutdown();
}
void SystemLogs::shutdown()
{
for (auto & log : logs)
log->shutdown();
}
}