Less wrong

This commit is contained in:
Alexey Milovidov 2020-04-20 02:11:41 +03:00
parent 39c6c9a5d7
commit 237db26688
4 changed files with 31 additions and 24 deletions

View File

@ -30,8 +30,7 @@ std::shared_ptr<TSystemLog> createSystemLog(
const String & default_database_name, const String & default_database_name,
const String & default_table_name, const String & default_table_name,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix)
bool lazy_load)
{ {
if (!config.has(config_prefix)) if (!config.has(config_prefix))
return {}; return {};
@ -64,7 +63,7 @@ std::shared_ptr<TSystemLog> createSystemLog(
size_t flush_interval_milliseconds = config.getUInt64(config_prefix + ".flush_interval_milliseconds", DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS); size_t flush_interval_milliseconds = config.getUInt64(config_prefix + ".flush_interval_milliseconds", DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS);
return std::make_shared<TSystemLog>(context, database, table, engine, flush_interval_milliseconds, lazy_load); return std::make_shared<TSystemLog>(context, database, table, engine, flush_interval_milliseconds);
} }
} }
@ -72,14 +71,12 @@ std::shared_ptr<TSystemLog> createSystemLog(
SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config) SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config)
{ {
bool lazy_load = config.getBool("system_tables_lazy_load", true); query_log = createSystemLog<QueryLog>(global_context, "system", "query_log", config, "query_log");
query_thread_log = createSystemLog<QueryThreadLog>(global_context, "system", "query_thread_log", config, "query_thread_log");
query_log = createSystemLog<QueryLog>(global_context, "system", "query_log", config, "query_log", lazy_load); part_log = createSystemLog<PartLog>(global_context, "system", "part_log", config, "part_log");
query_thread_log = createSystemLog<QueryThreadLog>(global_context, "system", "query_thread_log", config, "query_thread_log", lazy_load); trace_log = createSystemLog<TraceLog>(global_context, "system", "trace_log", config, "trace_log");
part_log = createSystemLog<PartLog>(global_context, "system", "part_log", config, "part_log", lazy_load); text_log = createSystemLog<TextLog>(global_context, "system", "text_log", config, "text_log");
trace_log = createSystemLog<TraceLog>(global_context, "system", "trace_log", config, "trace_log", lazy_load); metric_log = createSystemLog<MetricLog>(global_context, "system", "metric_log", config, "metric_log");
text_log = createSystemLog<TextLog>(global_context, "system", "text_log", config, "text_log", lazy_load);
metric_log = createSystemLog<MetricLog>(global_context, "system", "metric_log", config, "metric_log", lazy_load);
if (metric_log) if (metric_log)
{ {
@ -99,6 +96,14 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
logs.emplace_back(text_log.get()); logs.emplace_back(text_log.get());
if (metric_log) if (metric_log)
logs.emplace_back(metric_log.get()); logs.emplace_back(metric_log.get());
bool lazy_load = config.getBool("system_tables_lazy_load", true);
for (auto & log : logs)
{
if (!lazy_load)
log->prepareTable();
log->startup();
}
} }

View File

@ -73,6 +73,8 @@ public:
virtual String getName() = 0; virtual String getName() = 0;
virtual ASTPtr getCreateTableQuery() = 0; virtual ASTPtr getCreateTableQuery() = 0;
virtual void flush() = 0; virtual void flush() = 0;
virtual void prepareTable() = 0;
virtual void startup() = 0;
virtual void shutdown() = 0; virtual void shutdown() = 0;
virtual ~ISystemLog() = default; virtual ~ISystemLog() = default;
}; };
@ -117,8 +119,7 @@ public:
const String & database_name_, const String & database_name_,
const String & table_name_, const String & table_name_,
const String & storage_def_, const String & storage_def_,
size_t flush_interval_milliseconds_, size_t flush_interval_milliseconds_);
bool lazy_load);
/** Append a record into log. /** Append a record into log.
* Writing to table will be done asynchronously and in case of failure, record could be lost. * Writing to table will be done asynchronously and in case of failure, record could be lost.
@ -130,6 +131,9 @@ public:
/// Flush data in the buffer to disk /// Flush data in the buffer to disk
void flush() override; void flush() override;
/// Start the background thread.
void startup() override;
/// Stop the background flush thread before destructor. No more data will be written. /// Stop the background flush thread before destructor. No more data will be written.
void shutdown() override void shutdown() override
{ {
@ -178,7 +182,7 @@ private:
* Renames old table if its structure is not suitable. * Renames old table if its structure is not suitable.
* This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created. * This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created.
*/ */
void prepareTable(); void prepareTable() override;
/// flushImpl can be executed only in saving_thread. /// flushImpl can be executed only in saving_thread.
void flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end); void flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end);
@ -190,8 +194,7 @@ SystemLog<LogElement>::SystemLog(Context & context_,
const String & database_name_, const String & database_name_,
const String & table_name_, const String & table_name_,
const String & storage_def_, const String & storage_def_,
size_t flush_interval_milliseconds_, size_t flush_interval_milliseconds_)
bool lazy_load)
: context(context_) : context(context_)
, table_id(database_name_, table_name_) , table_id(database_name_, table_name_)
, storage_def(storage_def_), , storage_def(storage_def_),
@ -199,12 +202,12 @@ SystemLog<LogElement>::SystemLog(Context & context_,
{ {
assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE); assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE);
log = &Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")"); log = &Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")");
}
if (!lazy_load)
{
prepareTable();
}
template <typename LogElement>
void SystemLog<LogElement>::startup()
{
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); }); saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
} }

View File

@ -72,9 +72,9 @@ void TextLogElement::appendToBlock(Block & block) const
TextLog::TextLog(Context & context_, const String & database_name_, TextLog::TextLog(Context & context_, const String & database_name_,
const String & table_name_, const String & storage_def_, const String & table_name_, const String & storage_def_,
size_t flush_interval_milliseconds_, bool lazy_load) size_t flush_interval_milliseconds_)
: SystemLog<TextLogElement>(context_, database_name_, table_name_, : SystemLog<TextLogElement>(context_, database_name_, table_name_,
storage_def_, flush_interval_milliseconds_, lazy_load) storage_def_, flush_interval_milliseconds_)
{ {
// SystemLog methods may write text logs, so we disable logging for the text // SystemLog methods may write text logs, so we disable logging for the text
// log table to avoid recursion. // log table to avoid recursion.

View File

@ -36,8 +36,7 @@ public:
const String & database_name_, const String & database_name_,
const String & table_name_, const String & table_name_,
const String & storage_def_, const String & storage_def_,
size_t flush_interval_milliseconds_, size_t flush_interval_milliseconds_);
bool lazy_load);
}; };
} }