Merge pull request #29219 from evillique/log_lz4_streaming

Add LZ4 stream compression of logs
This commit is contained in:
Nikolay Degterinsky 2021-11-17 21:22:57 +03:00 committed by GitHub
commit fc9ef14a73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 80 additions and 6 deletions

View File

@@ -63,6 +63,9 @@
#include <Common/Elf.h> #include <Common/Elf.h>
#include <filesystem> #include <filesystem>
#include <loggers/OwnFormattingChannel.h>
#include <loggers/OwnPatternFormatter.h>
#include <Common/config_version.h> #include <Common/config_version.h>
#if defined(OS_DARWIN) #if defined(OS_DARWIN)
@@ -1001,6 +1004,14 @@ void BaseDaemon::setupWatchdog()
memcpy(argv0, new_process_name, std::min(strlen(new_process_name), original_process_name.size())); memcpy(argv0, new_process_name, std::min(strlen(new_process_name), original_process_name.size()));
} }
/// If streaming compression of logs is used then we write watchdog logs to cerr
if (config().getRawString("logger.stream_compress", "false") == "true")
{
Poco::AutoPtr<OwnPatternFormatter> pf = new OwnPatternFormatter;
Poco::AutoPtr<DB::OwnFormattingChannel> log = new DB::OwnFormattingChannel(pf, new Poco::ConsoleChannel(std::cerr));
logger().setChannel(log);
}
logger().information(fmt::format("Will watch for the process with pid {}", pid)); logger().information(fmt::format("Will watch for the process with pid {}", pid));
/// Forward signals to the child process. /// Forward signals to the child process.

View File

@@ -62,7 +62,13 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
if (!log_path.empty()) if (!log_path.empty())
{ {
createDirectory(log_path); createDirectory(log_path);
std::cerr << "Logging " << log_level_string << " to " << log_path << std::endl;
std::string ext;
if (config.getRawString("logger.stream_compress", "false") == "true")
ext = ".lz4";
std::cerr << "Logging " << log_level_string << " to " << log_path << ext << std::endl;
auto log_level = Poco::Logger::parseLevel(log_level_string); auto log_level = Poco::Logger::parseLevel(log_level_string);
if (log_level > max_log_level) if (log_level > max_log_level)
{ {
@@ -75,6 +81,7 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
log_file->setProperty(Poco::FileChannel::PROP_ROTATION, config.getRawString("logger.size", "100M")); log_file->setProperty(Poco::FileChannel::PROP_ROTATION, config.getRawString("logger.size", "100M"));
log_file->setProperty(Poco::FileChannel::PROP_ARCHIVE, "number"); log_file->setProperty(Poco::FileChannel::PROP_ARCHIVE, "number");
log_file->setProperty(Poco::FileChannel::PROP_COMPRESS, config.getRawString("logger.compress", "true")); log_file->setProperty(Poco::FileChannel::PROP_COMPRESS, config.getRawString("logger.compress", "true"));
log_file->setProperty(Poco::FileChannel::PROP_STREAMCOMPRESS, config.getRawString("logger.stream_compress", "false"));
log_file->setProperty(Poco::FileChannel::PROP_PURGECOUNT, config.getRawString("logger.count", "1")); log_file->setProperty(Poco::FileChannel::PROP_PURGECOUNT, config.getRawString("logger.count", "1"));
log_file->setProperty(Poco::FileChannel::PROP_FLUSH, config.getRawString("logger.flush", "true")); log_file->setProperty(Poco::FileChannel::PROP_FLUSH, config.getRawString("logger.flush", "true"));
log_file->setProperty(Poco::FileChannel::PROP_ROTATEONOPEN, config.getRawString("logger.rotateOnOpen", "false")); log_file->setProperty(Poco::FileChannel::PROP_ROTATEONOPEN, config.getRawString("logger.rotateOnOpen", "false"));
@@ -100,13 +107,18 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
max_log_level = errorlog_level; max_log_level = errorlog_level;
} }
std::cerr << "Logging errors to " << errorlog_path << std::endl; std::string ext;
if (config.getRawString("logger.stream_compress", "false") == "true")
ext = ".lz4";
std::cerr << "Logging errors to " << errorlog_path << ext << std::endl;
error_log_file = new Poco::FileChannel; error_log_file = new Poco::FileChannel;
error_log_file->setProperty(Poco::FileChannel::PROP_PATH, fs::weakly_canonical(errorlog_path)); error_log_file->setProperty(Poco::FileChannel::PROP_PATH, fs::weakly_canonical(errorlog_path));
error_log_file->setProperty(Poco::FileChannel::PROP_ROTATION, config.getRawString("logger.size", "100M")); error_log_file->setProperty(Poco::FileChannel::PROP_ROTATION, config.getRawString("logger.size", "100M"));
error_log_file->setProperty(Poco::FileChannel::PROP_ARCHIVE, "number"); error_log_file->setProperty(Poco::FileChannel::PROP_ARCHIVE, "number");
error_log_file->setProperty(Poco::FileChannel::PROP_COMPRESS, config.getRawString("logger.compress", "true")); error_log_file->setProperty(Poco::FileChannel::PROP_COMPRESS, config.getRawString("logger.compress", "true"));
error_log_file->setProperty(Poco::FileChannel::PROP_STREAMCOMPRESS, config.getRawString("logger.stream_compress", "false"));
error_log_file->setProperty(Poco::FileChannel::PROP_PURGECOUNT, config.getRawString("logger.count", "1")); error_log_file->setProperty(Poco::FileChannel::PROP_PURGECOUNT, config.getRawString("logger.count", "1"));
error_log_file->setProperty(Poco::FileChannel::PROP_FLUSH, config.getRawString("logger.flush", "true")); error_log_file->setProperty(Poco::FileChannel::PROP_FLUSH, config.getRawString("logger.flush", "true"));
error_log_file->setProperty(Poco::FileChannel::PROP_ROTATEONOPEN, config.getRawString("logger.rotateOnOpen", "false")); error_log_file->setProperty(Poco::FileChannel::PROP_ROTATEONOPEN, config.getRawString("logger.rotateOnOpen", "false"));

2
contrib/poco vendored

@@ -1 +1 @@
Subproject commit 173fb31717837d366152c508619b09dcf11786da Subproject commit 258b9ba6cd245ff88e9346f75c43464c403f329d

View File

@@ -51,6 +51,7 @@ if (USE_INTERNAL_POCO_LIBRARY)
"${LIBRARY_DIR}/Foundation/src/Channel.cpp" "${LIBRARY_DIR}/Foundation/src/Channel.cpp"
"${LIBRARY_DIR}/Foundation/src/Checksum.cpp" "${LIBRARY_DIR}/Foundation/src/Checksum.cpp"
"${LIBRARY_DIR}/Foundation/src/Clock.cpp" "${LIBRARY_DIR}/Foundation/src/Clock.cpp"
"${LIBRARY_DIR}/Foundation/src/CompressedLogFile.cpp"
"${LIBRARY_DIR}/Foundation/src/Condition.cpp" "${LIBRARY_DIR}/Foundation/src/Condition.cpp"
"${LIBRARY_DIR}/Foundation/src/Configurable.cpp" "${LIBRARY_DIR}/Foundation/src/Configurable.cpp"
"${LIBRARY_DIR}/Foundation/src/ConsoleChannel.cpp" "${LIBRARY_DIR}/Foundation/src/ConsoleChannel.cpp"
@@ -222,7 +223,7 @@ if (USE_INTERNAL_POCO_LIBRARY)
POCO_OS_FAMILY_UNIX POCO_OS_FAMILY_UNIX
) )
target_include_directories (_poco_foundation SYSTEM PUBLIC "${LIBRARY_DIR}/Foundation/include") target_include_directories (_poco_foundation SYSTEM PUBLIC "${LIBRARY_DIR}/Foundation/include")
target_link_libraries (_poco_foundation PRIVATE Poco::Foundation::PCRE ${ZLIB_LIBRARIES}) target_link_libraries (_poco_foundation PRIVATE Poco::Foundation::PCRE ${ZLIB_LIBRARIES} lz4)
else () else ()
add_library (Poco::Foundation UNKNOWN IMPORTED GLOBAL) add_library (Poco::Foundation UNKNOWN IMPORTED GLOBAL)

View File

@@ -19,6 +19,7 @@ RUN apt-get update \
sqlite3 \ sqlite3 \
curl \ curl \
tar \ tar \
lz4 \
krb5-user \ krb5-user \
iproute2 \ iproute2 \
lsof \ lsof \

View File

@@ -4,8 +4,8 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__, name="aggregate_fixed_key") cluster = ClickHouseCluster(__file__, name="aggregate_fixed_key")
node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server', tag='21.3', with_installed_binary=True) node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server', tag='21.3', with_installed_binary=True)
node2 = cluster.add_instance('node2', with_zookeeper=True, image='yandex/clickhouse-server') node2 = cluster.add_instance('node2', with_zookeeper=True)
node3 = cluster.add_instance('node3', with_zookeeper=True, image='yandex/clickhouse-server') node3 = cluster.add_instance('node3', with_zookeeper=True)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")

View File

@@ -0,0 +1,5 @@
<yandex>
<logger>
<stream_compress>true</stream_compress>
</logger>
</yandex>

View File

@@ -0,0 +1,44 @@
import pytest
import time
from helpers.cluster import ClickHouseCluster
# Single-node cluster. The instance loads configs/logs.xml (which sets
# logger.stream_compress = true), and stay_alive=True lets the tests
# stop/start the server explicitly without the harness tearing it down.
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/logs.xml'], stay_alive=True)
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once for the whole module; always shut it down on exit."""
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def check_log_file():
    """Verify the server log was written as a valid LZ4 stream and is smaller than its decoded form."""
    # The log file must carry the .lz4 extension when stream compression is on.
    assert node.file_exists("/var/log/clickhouse-server/clickhouse-server.log.lz4")
    # Compressed size on disk, in bytes.
    size_on_disk = int(node.exec_in_container(["bash", "-c", "du -b /var/log/clickhouse-server/clickhouse-server.log.lz4 | awk {' print $1 '}"], user='root'))
    # `lz4 -t` test-decompresses the whole file; its report must mention no errors.
    test_report = node.exec_in_container(["bash", "-c", "lz4 -t /var/log/clickhouse-server/clickhouse-server.log.lz4 2>&1"], user='root')
    assert test_report.count('Error') == 0, test_report
    # Decoded byte count is the 4th whitespace-separated token of the lz4 report.
    size_decoded = int(test_report.split()[3])
    assert 0 < size_on_disk < size_decoded, test_report
def test_concatenation(started_cluster):
    # Two clean restarts: presumably each run appends its own LZ4 frame to the
    # same log file, and the concatenated frames must still decompress as one
    # valid stream — TODO confirm against the Poco CompressedLogFile change.
    node.stop_clickhouse()
    node.start_clickhouse()
    node.stop_clickhouse()
    check_log_file()
def test_incomplete_rotation(started_cluster):
    # kill=True simulates a crash, so the last LZ4 frame in the log may be
    # truncated; after a restart and clean stop the file must still pass
    # `lz4 -t` — NOTE(review): this relies on the writer recovering from an
    # incomplete trailing frame on reopen; verify against the vendored Poco code.
    node.stop_clickhouse(kill=True)
    node.start_clickhouse()
    node.stop_clickhouse()
    check_log_file()