Small refactoring of SystemLogs

This commit is contained in:
Alexey Milovidov 2020-04-13 04:33:05 +03:00
parent 4a83e85893
commit c7eaaaf7fe
4 changed files with 84 additions and 51 deletions

View File

@ -70,6 +70,13 @@ void MetricLog::stopCollectMetric()
}
void MetricLog::shutdown()
{
stopCollectMetric();
stopFlushThread();
}
inline UInt64 time_in_milliseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::milliseconds>(timepoint.time_since_epoch()).count();

View File

@ -34,6 +34,8 @@ class MetricLog : public SystemLog<MetricLogElement>
using SystemLog<MetricLogElement>::SystemLog;
public:
void shutdown() override;
/// Launches a background thread to collect metrics with interval
void startCollectMetric(size_t collect_interval_milliseconds_);

View File

@ -83,6 +83,19 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
size_t collect_interval_milliseconds = config.getUInt64("metric_log.collect_interval_milliseconds");
metric_log->startCollectMetric(collect_interval_milliseconds);
}
if (query_log)
logs.emplace_back(query_log.get());
if (query_thread_log)
logs.emplace_back(query_thread_log.get());
if (part_log)
logs.emplace_back(part_log.get());
if (trace_log)
logs.emplace_back(trace_log.get());
if (text_log)
logs.emplace_back(text_log.get());
if (metric_log)
logs.emplace_back(metric_log.get());
}
@ -93,21 +106,8 @@ SystemLogs::~SystemLogs()
void SystemLogs::shutdown()
{
if (query_log)
query_log->shutdown();
if (query_thread_log)
query_thread_log->shutdown();
if (part_log)
part_log->shutdown();
if (trace_log)
trace_log->shutdown();
if (text_log)
text_log->shutdown();
if (metric_log)
{
metric_log->stopCollectMetric();
metric_log->shutdown();
}
for (auto & log : logs)
log->shutdown();
}
}

View File

@ -2,6 +2,9 @@
#include <thread>
#include <atomic>
#include <memory>
#include <vector>
#include <condition_variable>
#include <boost/noncopyable.hpp>
#include <common/logger_useful.h>
@ -59,13 +62,20 @@ namespace ErrorCodes
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576
class Context;
class QueryLog;
class QueryThreadLog;
class PartLog;
class TextLog;
class TraceLog;
class MetricLog;
class ISystemLog
{
public:
virtual String getName() = 0;
virtual ASTPtr getCreateTableQuery() = 0;
virtual void flush() = 0;
virtual void shutdown() = 0;
virtual ~ISystemLog() = default;
};
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
@ -82,11 +92,13 @@ struct SystemLogs
std::shared_ptr<TraceLog> trace_log; /// Used to log traces from query profiler
std::shared_ptr<TextLog> text_log; /// Used to log all text messages.
std::shared_ptr<MetricLog> metric_log; /// Used to log all metrics.
std::vector<ISystemLog *> logs;
};
template <typename LogElement>
class SystemLog : private boost::noncopyable
class SystemLog : public ISystemLog, private boost::noncopyable
{
public:
using Self = SystemLog;
@ -106,18 +118,28 @@ public:
const String & storage_def_,
size_t flush_interval_milliseconds_);
~SystemLog();
/** Append a record into log.
* Writing to table will be done asynchronously and in case of failure, record could be lost.
*/
void add(const LogElement & element);
void stopFlushThread();
/// Flush data in the buffer to disk
void flush();
void flush() override;
/// Stop the background flush thread before destructor. No more data will be written.
void shutdown();
void shutdown() override
{
stopFlushThread();
}
String getName() override
{
return LogElement::name();
}
ASTPtr getCreateTableQuery() override;
protected:
Logger * log;
@ -250,7 +272,7 @@ void SystemLog<LogElement>::flush()
template <typename LogElement>
void SystemLog<LogElement>::shutdown()
void SystemLog<LogElement>::stopFlushThread()
{
{
std::unique_lock lock(mutex);
@ -270,13 +292,6 @@ void SystemLog<LogElement>::shutdown()
}
template <typename LogElement>
SystemLog<LogElement>::~SystemLog()
{
shutdown();
}
template <typename LogElement>
void SystemLog<LogElement>::savingThreadFunction()
{
@ -399,7 +414,7 @@ void SystemLog<LogElement>::prepareTable()
rename->elements.emplace_back(elem);
LOG_DEBUG(log, "Existing table " << description << " for system log has obsolete or different structure."
" Renaming it to " << backQuoteIfNeed(to.table));
" Renaming it to " << backQuoteIfNeed(to.table));
InterpreterRenameQuery(rename, context).execute();
@ -415,22 +430,7 @@ void SystemLog<LogElement>::prepareTable()
/// Create the table.
LOG_DEBUG(log, "Creating new table " << description << " for " + LogElement::name());
auto create = std::make_shared<ASTCreateQuery>();
create->database = table_id.database_name;
create->table = table_id.table_name;
Block sample = LogElement::createBlock();
auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(sample.getNamesAndTypesList()));
create->set(create->columns_list, new_columns_list);
ParserStorage storage_parser;
ASTPtr storage_ast = parseQuery(
storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
"Storage to create table for " + LogElement::name(), 0);
create->set(create->storage, storage_ast);
auto create = getCreateTableQuery();
InterpreterCreateQuery interpreter(create, context);
interpreter.setInternal(true);
@ -442,4 +442,28 @@ void SystemLog<LogElement>::prepareTable()
is_prepared = true;
}
template <typename LogElement>
ASTPtr SystemLog<LogElement>::getCreateTableQuery()
{
auto create = std::make_shared<ASTCreateQuery>();
create->database = table_id.database_name;
create->table = table_id.table_name;
Block sample = LogElement::createBlock();
auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(sample.getNamesAndTypesList()));
create->set(create->columns_list, new_columns_list);
ParserStorage storage_parser;
ASTPtr storage_ast = parseQuery(
storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
"Storage to create table for " + LogElement::name(), 0);
create->set(create->storage, storage_ast);
return create;
}
}