some changes to log all text logs

This commit is contained in:
Nikita Mikhaylov 2019-07-30 17:04:18 +03:00
parent 45c5dd8e61
commit e06b3b17b3
11 changed files with 79 additions and 49 deletions

View File

@ -52,6 +52,10 @@
#include "Common/config_version.h"
#include "MySQLHandlerFactory.h"
#include <Interpreters/TextLog.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/SystemLog.cpp>
#if defined(__linux__)
#include <Common/hasLinuxCapability.h>
#include <sys/mman.h>
@ -167,6 +171,8 @@ void Server::defineOptions(Poco::Util::OptionSet & _options)
BaseDaemon::defineOptions(_options);
}
int Server::main(const std::vector<std::string> & /*args*/)
{
Logger * log = &logger();
@ -174,6 +180,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
ThreadStatus thread_status;
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
@ -269,12 +276,15 @@ int Server::main(const std::vector<std::string> & /*args*/)
* table engines could use Context on destroy.
*/
LOG_INFO(log, "Shutting down storages.");
if (text_log)
text_log->shutdown();
global_context->shutdown();
LOG_DEBUG(log, "Shutted down storages.");
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.
* At this moment, no one could own shared part of Context.
*/
text_log.reset();
global_context.reset();
LOG_DEBUG(log, "Destroyed global context.");
@ -397,6 +407,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros"));
/// Create text_log instance
text_log = createSystemLog<TextLog>(*global_context, "system", "text_log", global_context->getConfigRef(), "text_log");
/// Initialize main config reloader.
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
auto main_config_reloader = std::make_unique<ConfigReloader>(config_path,
@ -406,6 +419,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
main_config_zk_changed_event,
[&](ConfigurationPtr config)
{
setTextLog(text_log);
buildLoggers(*config, logger());
global_context->setClustersConfig(config);
global_context->setMacros(std::make_unique<Macros>(*config, "macros"));
@ -491,6 +505,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
format_schema_path.createDirectories();
LOG_INFO(log, "Loading metadata from " + path);
try
{
loadMetadataSystem(*global_context);

View File

@ -57,6 +57,7 @@ protected:
private:
std::unique_ptr<Context> global_context;
std::shared_ptr<TextLog> text_log;
};
}

View File

@ -1692,18 +1692,6 @@ std::shared_ptr<PartLog> Context::getPartLog(const String & part_database)
return shared->system_logs->part_log;
}
std::shared_ptr<TextLog> Context::getTextLog()
{
auto lock = getLock();
if (!shared->system_logs || !shared->system_logs->text_log)
return {};
return shared->system_logs->text_log;
}
std::shared_ptr<TraceLog> Context::getTraceLog()
{
auto lock = getLock();

View File

@ -428,7 +428,6 @@ public:
/// Nullptr if the query log is not ready for this moment.
std::shared_ptr<QueryLog> getQueryLog();
std::shared_ptr<QueryThreadLog> getQueryThreadLog();
std::shared_ptr<TextLog> getTextLog();
std::shared_ptr<TraceLog> getTraceLog();
/// Returns an object used to log opertaions with parts if it possible.

View File

@ -46,7 +46,6 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
query_log = createSystemLog<QueryLog>(global_context, "system", "query_log", config, "query_log");
query_thread_log = createSystemLog<QueryThreadLog>(global_context, "system", "query_thread_log", config, "query_thread_log");
part_log = createSystemLog<PartLog>(global_context, "system", "part_log", config, "part_log");
text_log = createSystemLog<TextLog>(global_context, "system", "text_log", config, "text_log");
trace_log = createSystemLog<TraceLog>(global_context, "system", "trace_log", config, "trace_log");
part_log_database = config.getString("part_log.database", "system");
@ -58,7 +57,6 @@ SystemLogs::~SystemLogs()
shutdown();
}
void SystemLogs::shutdown()
{
if (query_log)
@ -67,8 +65,6 @@ void SystemLogs::shutdown()
query_thread_log->shutdown();
if (part_log)
part_log->shutdown();
if (text_log)
text_log->shutdown();
if (trace_log)
trace_log->shutdown();
}

View File

@ -62,7 +62,6 @@ class PartLog;
class TextLog;
class TraceLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
struct SystemLogs
@ -75,7 +74,6 @@ struct SystemLogs
std::shared_ptr<QueryLog> query_log; /// Used to log queries.
std::shared_ptr<QueryThreadLog> query_thread_log; /// Used to log query threads.
std::shared_ptr<PartLog> part_log; /// Used to log operations with parts
std::shared_ptr<TextLog> text_log; /// Used to save all text logs.
std::shared_ptr<TraceLog> trace_log; /// Used to log traces from query profiler
String part_log_database;
@ -309,6 +307,8 @@ void SystemLog<LogElement>::threadFunction()
}
template <typename LogElement>
void SystemLog<LogElement>::flushImpl(EntryType reason)
{
@ -442,3 +442,5 @@ void SystemLog<LogElement>::prepareTable()
}
}

View File

@ -28,7 +28,8 @@ using DB::CurrentThread;
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \
if (auto channel = (logger)->getChannel()) { \
channel->log(Message((logger)->name(), oss_internal_rare.str(), (PRIORITY))); \
channel->log(Message((logger)->name(), oss_internal_rare.str(), \
(PRIORITY), __FILE__, __LINE__)); \
} \
} \
} while (false)

View File

@ -5,13 +5,13 @@
#include <Poco/Util/AbstractConfiguration.h>
#include "OwnFormattingChannel.h"
#include "OwnPatternFormatter.h"
#include "OwnSplitChannel.h"
#include <Poco/ConsoleChannel.h>
#include <Poco/File.h>
#include <Poco/Logger.h>
#include <Poco/Net/RemoteSyslogChannel.h>
#include <Poco/Path.h>
// TODO: move to libcommon
static std::string createDirectory(const std::string & file)
{
@ -22,18 +22,26 @@ static std::string createDirectory(const std::string & file)
return path.toString();
};
void Loggers::setTextLog(std::shared_ptr<DB::TextLog> log) {
text_log = log;
}
void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Logger & logger /*_root*/, const std::string & cmd_name)
{
if (split && !text_log.expired()) {
split->addTextLog(text_log);
}
auto current_logger = config.getString("logger", "");
if (config_logger == current_logger)
if (config_logger == current_logger) {
return;
}
config_logger = current_logger;
bool is_daemon = config.getBool("application.runAsDaemon", false);
/// Split logs to ordinary log, error log, syslog and console.
/// Use extended interface of Channel for more comprehensive logging.
Poco::AutoPtr<DB::OwnSplitChannel> split = new DB::OwnSplitChannel;
split = new DB::OwnSplitChannel();
auto log_level = config.getString("logger.level", "trace");
const auto log_path = config.getString("logger.log", "");

View File

@ -3,6 +3,8 @@
#include <Poco/AutoPtr.h>
#include <Poco/FileChannel.h>
#include <Poco/Util/Application.h>
#include <Interpreters/TextLog.h>
#include "OwnSplitChannel.h"
namespace Poco::Util
{
@ -23,6 +25,8 @@ public:
return layer; /// layer setted in inheritor class BaseDaemonApplication.
}
void setTextLog(std::shared_ptr<DB::TextLog> log);
protected:
std::optional<size_t> layer;
@ -33,4 +37,7 @@ private:
Poco::AutoPtr<Poco::Channel> syslog_channel;
/// Previous value of logger element in config. It is used to reinitialize loggers whenever the value changed.
std::string config_logger;
std::weak_ptr<DB::TextLog> text_log;
Poco::AutoPtr<DB::OwnSplitChannel> split;
};

View File

@ -49,34 +49,36 @@ void OwnSplitChannel::log(const Poco::Message & msg)
logs_queue->emplace(std::move(columns));
}
/// Also log to system.internal_text_log table
ThreadGroupStatusPtr thread_group = CurrentThread::getGroup();
if (thread_group && thread_group->global_context)
/// Also log to system.text_log table
TextLogElement elem;
elem.event_time = msg_ext.time_seconds;
elem.microseconds = msg_ext.time_microseconds;
elem.thread_name = getThreadName();
elem.thread_number = msg_ext.thread_number;
try
{
if (auto text_log = thread_group->global_context->getTextLog())
{
TextLogElement elem;
elem.event_time = msg_ext.time_seconds;
elem.microseconds = msg_ext.time_microseconds;
elem.thread_name = getThreadName();
elem.thread_number = msg_ext.thread_number;
elem.os_thread_id = CurrentThread::get().os_thread_id;
elem.query_id = msg_ext.query_id;
elem.message = msg.getText();
elem.logger_name = msg.getSource();
elem.level = msg.getPriority();
if (msg.getSourceFile() != nullptr)
elem.source_file = msg.getSourceFile();
elem.source_line = msg.getSourceLine();
text_log->add(elem);
}
elem.os_thread_id = CurrentThread::get().os_thread_id;
} catch (...)
{
elem.os_thread_id = 0;
}
elem.query_id = msg_ext.query_id;
elem.message = msg.getText();
elem.logger_name = msg.getSource();
elem.level = msg.getPriority();
if (msg.getSourceFile() != nullptr)
elem.source_file = msg.getSourceFile();
elem.source_line = msg.getSourceLine();
if (auto log = text_log.lock())
log->add(elem);
}
void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel)
@ -84,5 +86,9 @@ void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel)
channels.emplace_back(std::move(channel), dynamic_cast<ExtendedLogChannel *>(channel.get()));
}
void OwnSplitChannel::addTextLog(std::weak_ptr<DB::TextLog> log)
{
text_log = log;
}
}

View File

@ -3,6 +3,7 @@
#include <Poco/AutoPtr.h>
#include <Poco/Channel.h>
#include "ExtendedLogChannel.h"
#include <Interpreters/TextLog.h>
namespace DB
@ -13,17 +14,23 @@ namespace DB
class OwnSplitChannel : public Poco::Channel
{
public:
OwnSplitChannel() = default;
/// Makes an extended message from msg and passes it to the client logs queue and child (if possible)
void log(const Poco::Message & msg) override;
/// Adds a child channel
void addChannel(Poco::AutoPtr<Poco::Channel> channel);
void addTextLog(std::weak_ptr<DB::TextLog> log);
private:
using ChannelPtr = Poco::AutoPtr<Poco::Channel>;
/// Handler and its pointer casted to extended interface
using ExtendedChannelPtrPair = std::pair<ChannelPtr, ExtendedLogChannel *>;
std::vector<ExtendedChannelPtrPair> channels;
std::weak_ptr<DB::TextLog> text_log;
};
}