Merge pull request #6103 from nikitamikhaylov/system_text_log

Allow to write ClickHouse text logs into system table.
This commit is contained in:
alexey-milovidov 2019-08-04 17:51:34 +03:00 committed by GitHub
commit a0599214ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 272 additions and 22 deletions

View File

@ -39,6 +39,7 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/DNSCacheUpdater.h>
#include <Interpreters/SystemLog.cpp>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/System/attachSystemTables.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
@ -54,6 +55,7 @@
#include "Common/config_version.h"
#include "MySQLHandlerFactory.h"
#if defined(__linux__)
#include <Common/hasLinuxCapability.h>
#include <sys/mman.h>
@ -271,12 +273,15 @@ int Server::main(const std::vector<std::string> & /*args*/)
* table engines could use Context on destroy.
*/
LOG_INFO(log, "Shutting down storages.");
if (text_log)
text_log->shutdown();
global_context->shutdown();
LOG_DEBUG(log, "Shutted down storages.");
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.
* At this moment, no one could own shared part of Context.
*/
text_log.reset();
global_context.reset();
LOG_DEBUG(log, "Destroyed global context.");
@ -408,6 +413,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
main_config_zk_changed_event,
[&](ConfigurationPtr config)
{
setTextLog(text_log);
buildLoggers(*config, logger());
global_context->setClustersConfig(config);
global_context->setMacros(std::make_unique<Macros>(*config, "macros"));
@ -493,11 +499,15 @@ int Server::main(const std::vector<std::string> & /*args*/)
format_schema_path.createDirectories();
LOG_INFO(log, "Loading metadata from " + path);
/// Create text_log instance
text_log = createSystemLog<TextLog>(*global_context, "system", "text_log", global_context->getConfigRef(), "text_log");
try
{
loadMetadataSystem(*global_context);
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->initializeSystemLogs(text_log);
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(*global_context->getDatabase("system"), has_zookeeper);
/// Then, load remaining databases

View File

@ -57,6 +57,7 @@ protected:
private:
std::unique_ptr<Context> global_context;
std::shared_ptr<TextLog> text_log;
};
}

View File

@ -0,0 +1,7 @@
<yandex>
<text_log>
<database>system</database>
<table>text_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</text_log>
</yandex>

View File

@ -322,6 +322,14 @@
</part_log>
-->
<!-- Uncomment to write text log into table.
Text log contains all information from usual server log but stores it in structured and efficient way.
<text_log>
<database>system</database>
<table>text_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</text_log>
-->
<!-- Parameters for embedded dictionaries, used in Yandex.Metrica.
See https://clickhouse.yandex/docs/en/dicts/internal_dicts/

View File

@ -1645,10 +1645,11 @@ Compiler & Context::getCompiler()
}
void Context::initializeSystemLogs()
void Context::initializeSystemLogs(std::shared_ptr<TextLog> text_log)
{
auto lock = getLock();
shared->system_logs.emplace(*global_context, getConfigRef());
shared->system_logs->text_log = text_log;
}
bool Context::hasTraceCollector()
@ -1706,11 +1707,22 @@ std::shared_ptr<TraceLog> Context::getTraceLog()
auto lock = getLock();
if (!shared->system_logs || !shared->system_logs->trace_log)
return nullptr;
return {};
return shared->system_logs->trace_log;
}
std::shared_ptr<TextLog> Context::getTextLog()
{
auto lock = getLock();
if (!shared->system_logs)
if (auto log = shared->system_logs->text_log.lock())
return log;
return {};
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{

View File

@ -62,6 +62,7 @@ class Clusters;
class QueryLog;
class QueryThreadLog;
class PartLog;
class TextLog;
class TraceLog;
struct MergeTreeSettings;
class IDatabase;
@ -423,7 +424,7 @@ public:
Compiler & getCompiler();
/// Call after initialization before using system logs. Call for global context.
void initializeSystemLogs();
void initializeSystemLogs(std::shared_ptr<TextLog> text_log);
void initializeTraceCollector();
bool hasTraceCollector();
@ -431,8 +432,8 @@ public:
/// Nullptr if the query log is not ready for this moment.
std::shared_ptr<QueryLog> getQueryLog();
std::shared_ptr<QueryThreadLog> getQueryThreadLog();
std::shared_ptr<TraceLog> getTraceLog();
std::shared_ptr<TextLog> getTextLog();
/// Returns an object used to log opertaions with parts if it possible.
/// Provide table name to make required cheks.

View File

@ -15,6 +15,7 @@
#include <Interpreters/PartLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/TextLog.h>
#include <Databases/IDatabase.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -230,7 +231,8 @@ BlockIO InterpreterSystemQuery::execute()
[&] () { if (auto query_log = context.getQueryLog()) query_log->flush(); },
[&] () { if (auto part_log = context.getPartLog("")) part_log->flush(); },
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); },
[&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(); }
[&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(); },
[&] () { if (auto text_log = context.getTextLog()) text_log->flush(); }
);
break;
case Type::STOP_LISTEN_QUERIES:

View File

@ -2,6 +2,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -56,7 +57,6 @@ SystemLogs::~SystemLogs()
shutdown();
}
void SystemLogs::shutdown()
{
if (query_log)

View File

@ -59,9 +59,9 @@ class Context;
class QueryLog;
class QueryThreadLog;
class PartLog;
class TextLog;
class TraceLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
struct SystemLogs
@ -75,6 +75,8 @@ struct SystemLogs
std::shared_ptr<QueryThreadLog> query_thread_log; /// Used to log query threads.
std::shared_ptr<PartLog> part_log; /// Used to log operations with parts
std::shared_ptr<TraceLog> trace_log; /// Used to log traces from query profiler
std::weak_ptr<TextLog> text_log; /// Used to log all text. We use weak_ptr, because this log is
/// a server's field.
String part_log_database;
};

View File

@ -0,0 +1,77 @@
#include <Interpreters/TextLog.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeString.h>
#include <Common/ClickHouseRevision.h>
#include <array>
namespace DB
{
template <> struct NearestFieldTypeImpl<Message::Priority> { using Type = UInt64; };
Block TextLogElement::createBlock()
{
auto priority_datatype = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"Fatal", static_cast<Int8>(Message::PRIO_FATAL)},
{"Critical", static_cast<Int8>(Message::PRIO_CRITICAL)},
{"Error", static_cast<Int8>(Message::PRIO_ERROR)},
{"Warning", static_cast<Int8>(Message::PRIO_WARNING)},
{"Notice", static_cast<Int8>(Message::PRIO_NOTICE)},
{"Information", static_cast<Int8>(Message::PRIO_INFORMATION)},
{"Debug", static_cast<Int8>(Message::PRIO_DEBUG)},
{"Trace", static_cast<Int8>(Message::PRIO_TRACE)}
});
return
{
{std::make_shared<DataTypeDate>(), "event_date"},
{std::make_shared<DataTypeDateTime>(), "event_time"},
{std::make_shared<DataTypeUInt32>(), "microseconds"},
{std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "thread_name"},
{std::make_shared<DataTypeUInt32>(), "thread_number"},
{std::make_shared<DataTypeUInt32>(), "os_thread_id"},
{std::move(priority_datatype), "level"},
{std::make_shared<DataTypeString>(), "query_id"},
{std::make_shared<DataTypeString>(), "logger_name"},
{std::make_shared<DataTypeString>(), "message"},
{std::make_shared<DataTypeUInt32>(), "revision"},
{std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "source_file"},
{std::make_shared<DataTypeUInt64>(), "source_line"}
};
}
void TextLogElement::appendToBlock(Block & block) const
{
MutableColumns columns = block.mutateColumns();
size_t i = 0;
columns[i++]->insert(DateLUT::instance().toDayNum(event_time));
columns[i++]->insert(event_time);
columns[i++]->insert(microseconds);
columns[i++]->insertData(thread_name.data(), thread_name.size());
columns[i++]->insert(thread_number);
columns[i++]->insert(os_thread_id);
columns[i++]->insert(level);
columns[i++]->insert(query_id);
columns[i++]->insert(logger_name);
columns[i++]->insert(message);
columns[i++]->insert(ClickHouseRevision::get());
columns[i++]->insert(source_file);
columns[i++]->insert(source_line);
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <Interpreters/SystemLog.h>
namespace DB
{
using Poco::Message;
struct TextLogElement
{
time_t event_time{};
UInt32 microseconds;
String thread_name;
UInt32 os_thread_id;
UInt32 thread_number;
Message::Priority level = Message::PRIO_TRACE;
String query_id;
String logger_name;
String message;
String source_file;
UInt64 source_line;
static std::string name() { return "TextLog"; }
static Block createBlock();
void appendToBlock(Block & block) const;
};
class TextLog : public SystemLog<TextLogElement>
{
using SystemLog<TextLogElement>::SystemLog;
};
}

View File

@ -1,2 +1,7 @@
<yandex>
<text_log>
<database>system</database>
<table>text_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</text_log>
</yandex>

View File

@ -0,0 +1,2 @@
6103
1

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
${CLICKHOUSE_CLIENT} --query="SELECT 6103"
for (( i=1; i <= 50; i++ ))
do
${CLICKHOUSE_CLIENT} --query="SYSTEM FLUSH LOGS"
sleep 0.1;
if [[ $($CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" -d "SELECT count() > 0 FROM system.text_log WHERE position(system.text_log.message, 'SELECT 6103') > 0") == 1 ]]; then echo 1; exit; fi;
done;

View File

@ -19,6 +19,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
ln -s /usr/share/clickhouse-test/config/zookeeper.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \

View File

@ -45,6 +45,7 @@ chmod 777 -R /var/log/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/zookeeper.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \

View File

@ -39,6 +39,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
ln -s /usr/share/clickhouse-test/config/zookeeper.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/query_masking_rules.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \

View File

@ -47,6 +47,7 @@ chmod 777 -R /var/log/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/zookeeper.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \

View File

@ -20,17 +20,23 @@ using DB::CurrentThread;
/// Logs a message to a specified logger with that level.
#define LOG_SIMPLE(logger, message, priority, PRIORITY) do \
{ \
const bool is_clients_log = (CurrentThread::getGroup() != nullptr) && \
(CurrentThread::getGroup()->client_logs_level >= (priority)); \
if ((logger)->is((PRIORITY)) || is_clients_log) { \
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \
if (auto channel = (logger)->getChannel()) { \
channel->log(Message((logger)->name(), oss_internal_rare.str(), (PRIORITY))); \
} \
} \
#define LOG_SIMPLE(logger, message, priority, PRIORITY) do \
{ \
const bool is_clients_log = (CurrentThread::getGroup() != nullptr) && \
(CurrentThread::getGroup()->client_logs_level >= (priority)); \
if ((logger)->is((PRIORITY)) || is_clients_log) { \
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \
if (auto channel = (logger)->getChannel()) { \
std::string file_function; \
file_function += __FILE__; \
file_function += ", "; \
file_function += __PRETTY_FUNCTION__; \
Message poco_message((logger)->name(), oss_internal_rare.str(), \
(PRIORITY), file_function.c_str(), __LINE__); \
channel->log(poco_message); \
} \
} \
} while (false)

View File

@ -5,13 +5,13 @@
#include <Poco/Util/AbstractConfiguration.h>
#include "OwnFormattingChannel.h"
#include "OwnPatternFormatter.h"
#include "OwnSplitChannel.h"
#include <Poco/ConsoleChannel.h>
#include <Poco/File.h>
#include <Poco/Logger.h>
#include <Poco/Net/RemoteSyslogChannel.h>
#include <Poco/Path.h>
// TODO: move to libcommon
static std::string createDirectory(const std::string & file)
{
@ -22,18 +22,28 @@ static std::string createDirectory(const std::string & file)
return path.toString();
};
void Loggers::setTextLog(std::shared_ptr<DB::TextLog> log)
{
text_log = log;
}
void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Logger & logger /*_root*/, const std::string & cmd_name)
{
if (split)
if (auto log = text_log.lock())
split->addTextLog(log);
auto current_logger = config.getString("logger", "");
if (config_logger == current_logger)
return;
config_logger = current_logger;
bool is_daemon = config.getBool("application.runAsDaemon", false);
/// Split logs to ordinary log, error log, syslog and console.
/// Use extended interface of Channel for more comprehensive logging.
Poco::AutoPtr<DB::OwnSplitChannel> split = new DB::OwnSplitChannel;
split = new DB::OwnSplitChannel();
auto log_level = config.getString("logger.level", "trace");
const auto log_path = config.getString("logger.log", "");

View File

@ -3,6 +3,8 @@
#include <Poco/AutoPtr.h>
#include <Poco/FileChannel.h>
#include <Poco/Util/Application.h>
#include <Interpreters/TextLog.h>
#include "OwnSplitChannel.h"
namespace Poco::Util
{
@ -23,6 +25,8 @@ public:
return layer; /// layer setted in inheritor class BaseDaemonApplication.
}
void setTextLog(std::shared_ptr<DB::TextLog> log);
protected:
std::optional<size_t> layer;
@ -33,4 +37,7 @@ private:
Poco::AutoPtr<Poco::Channel> syslog_channel;
/// Previous value of logger element in config. It is used to reinitialize loggers whenever the value changed.
std::string config_logger;
std::weak_ptr<DB::TextLog> text_log;
Poco::AutoPtr<DB::OwnSplitChannel> split;
};

View File

@ -3,6 +3,7 @@
#include <iostream>
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/TextLog.h>
#include <sys/time.h>
#include <Poco/Message.h>
#include <Common/CurrentThread.h>
@ -48,7 +49,37 @@ void OwnSplitChannel::log(const Poco::Message & msg)
logs_queue->emplace(std::move(columns));
}
/// TODO: Also log to system.internal_text_log table
/// Also log to system.text_log table
TextLogElement elem;
elem.event_time = msg_ext.time_seconds;
elem.microseconds = msg_ext.time_microseconds;
elem.thread_name = getThreadName();
elem.thread_number = msg_ext.thread_number;
try
{
elem.os_thread_id = CurrentThread::get().os_thread_id;
} catch (...)
{
elem.os_thread_id = 0;
}
elem.query_id = msg_ext.query_id;
elem.message = msg.getText();
elem.logger_name = msg.getSource();
elem.level = msg.getPriority();
if (msg.getSourceFile() != nullptr)
elem.source_file = msg.getSourceFile();
elem.source_line = msg.getSourceLine();
std::lock_guard<std::mutex> lock(text_log_mutex);
if (auto log = text_log.lock())
log->add(elem);
}
void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel)
@ -56,5 +87,10 @@ void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel)
channels.emplace_back(std::move(channel), dynamic_cast<ExtendedLogChannel *>(channel.get()));
}
void OwnSplitChannel::addTextLog(std::shared_ptr<DB::TextLog> log)
{
std::lock_guard<std::mutex> lock(text_log_mutex);
text_log = log;
}
}

View File

@ -3,6 +3,7 @@
#include <Poco/AutoPtr.h>
#include <Poco/Channel.h>
#include "ExtendedLogChannel.h"
#include <Interpreters/TextLog.h>
namespace DB
@ -19,11 +20,16 @@ public:
/// Adds a child channel
void addChannel(Poco::AutoPtr<Poco::Channel> channel);
void addTextLog(std::shared_ptr<DB::TextLog> log);
private:
using ChannelPtr = Poco::AutoPtr<Poco::Channel>;
/// Handler and its pointer casted to extended interface
using ExtendedChannelPtrPair = std::pair<ChannelPtr, ExtendedLogChannel *>;
std::vector<ExtendedChannelPtrPair> channels;
std::mutex text_log_mutex;
std::weak_ptr<DB::TextLog> text_log;
};
}