Handle logs from rocksdb by ClickHouse internal logging

Someone may not know about /path/to/rocksdb/LOG, and besides it requires
an access to data dir, instead of logs dir.

v2: fix use-after-free due to destruction order (https://s3.amazonaws.com/clickhouse-test-reports/64856/8cefc1a1ba5ddfdda033694a14e1f1847d497092/stateless_tests__asan__[2_4]/stderr.log)

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2024-06-05 18:32:04 +02:00
parent afba7bb4ed
commit 833f031721
3 changed files with 75 additions and 3 deletions

View File

@ -1561,6 +1561,7 @@
<rocksdb> <rocksdb>
<options> <options>
<max_background_jobs>8</max_background_jobs> <max_background_jobs>8</max_background_jobs>
<info_log_level>DEBUG_LEVEL</info_log_level>
</options> </options>
<column_family_options> <column_family_options>
<num_levels>2</num_levels> <num_levels>2</num_levels>

View File

@ -25,6 +25,7 @@
#include <Poco/Logger.h> #include <Poco/Logger.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Common/Logger.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Storages/AlterCommands.h> #include <Storages/AlterCommands.h>
@ -42,6 +43,7 @@
#include <cstddef> #include <cstddef>
#include <filesystem> #include <filesystem>
#include <memory>
#include <utility> #include <utility>
@ -185,11 +187,11 @@ StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
bool read_only_) bool read_only_)
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_->getGlobalContext()) , WithContext(context_->getGlobalContext())
, log(getLogger(fmt::format("StorageEmbeddedRocksDB ({})", getStorageID().getNameForLogs())))
, primary_key{primary_key_} , primary_key{primary_key_}
, rocksdb_dir(std::move(rocksdb_dir_)) , rocksdb_dir(std::move(rocksdb_dir_))
, ttl(ttl_) , ttl(ttl_)
, read_only(read_only_) , read_only(read_only_)
, log(getLogger(fmt::format("StorageEmbeddedRocksDB ({})", getStorageID().getNameForLogs())))
{ {
setInMemoryMetadata(metadata_); setInMemoryMetadata(metadata_);
setSettings(std::move(settings_)); setSettings(std::move(settings_));
@ -352,6 +354,72 @@ bool StorageEmbeddedRocksDB::optimize(
return true; return true;
} }
static_assert(rocksdb::DEBUG_LEVEL == 0);
static_assert(rocksdb::HEADER_LEVEL == 5);
static constexpr std::array<std::pair<DB::LogsLevel, Poco::Message::Priority>, 6> rocksdb_logger_map = {
std::make_pair(DB::LogsLevel::debug, Poco::Message::Priority::PRIO_DEBUG),
std::make_pair(DB::LogsLevel::information, Poco::Message::Priority::PRIO_INFORMATION),
std::make_pair(DB::LogsLevel::warning, Poco::Message::Priority::PRIO_WARNING),
std::make_pair(DB::LogsLevel::error, Poco::Message::Priority::PRIO_ERROR),
std::make_pair(DB::LogsLevel::fatal, Poco::Message::Priority::PRIO_FATAL),
/// Same as default logger does for HEADER_LEVEL
std::make_pair(DB::LogsLevel::information, Poco::Message::Priority::PRIO_INFORMATION),
};
class StorageEmbeddedRocksDBLogger : public rocksdb::Logger
{
public:
explicit StorageEmbeddedRocksDBLogger(const rocksdb::InfoLogLevel log_level, LoggerRawPtr log_)
: rocksdb::Logger(log_level)
, log(log_)
{}
void Logv(const char * format, va_list ap) override
__attribute__((format(printf, 2, 0)))
{
Logv(rocksdb::InfoLogLevel::DEBUG_LEVEL, format, ap);
}
void Logv(const rocksdb::InfoLogLevel log_level, const char * format, va_list ap) override
__attribute__((format(printf, 3, 0)))
{
if (log_level < GetInfoLogLevel())
return;
auto level = rocksdb_logger_map[log_level];
/// stack buffer was enough
{
va_list backup_ap;
va_copy(backup_ap, ap);
std::array<char, 1024> stack;
if (vsnprintf(stack.data(), stack.size(), format, backup_ap) < static_cast<int>(stack.size()))
{
va_end(backup_ap);
LOG_IMPL(log, level.first, level.second, "{}", stack.data());
return;
}
va_end(backup_ap);
}
/// let's try with a bigger dynamic buffer (but not too huge, since
/// some of rocksdb internal code has also such a limitation, i..e
/// HdfsLogger)
{
va_list backup_ap;
va_copy(backup_ap, ap);
static constexpr int buffer_size = 30000;
std::unique_ptr<char[]> buffer(new char[buffer_size]);
if (vsnprintf(buffer.get(), buffer_size, format, backup_ap) >= buffer_size)
buffer[buffer_size - 1] = 0;
va_end(backup_ap);
LOG_IMPL(log, level.first, level.second, "{}", buffer.get());
}
}
private:
LoggerRawPtr log;
};
void StorageEmbeddedRocksDB::initDB() void StorageEmbeddedRocksDB::initDB()
{ {
rocksdb::Status status; rocksdb::Status status;
@ -448,6 +516,7 @@ void StorageEmbeddedRocksDB::initDB()
} }
} }
merged.info_log = std::make_shared<StorageEmbeddedRocksDBLogger>(merged.info_log_level, log.get());
merged.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); merged.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
if (ttl > 0) if (ttl > 0)

View File

@ -114,17 +114,19 @@ public:
private: private:
SinkToStoragePtr getSink(ContextPtr context, const StorageMetadataPtr & metadata_snapshot); SinkToStoragePtr getSink(ContextPtr context, const StorageMetadataPtr & metadata_snapshot);
LoggerPtr log;
MultiVersion<RocksDBSettings> storage_settings; MultiVersion<RocksDBSettings> storage_settings;
const String primary_key; const String primary_key;
using RocksDBPtr = std::unique_ptr<rocksdb::DB>; using RocksDBPtr = std::unique_ptr<rocksdb::DB>;
RocksDBPtr rocksdb_ptr; RocksDBPtr rocksdb_ptr;
mutable SharedMutex rocksdb_ptr_mx; mutable SharedMutex rocksdb_ptr_mx;
String rocksdb_dir; String rocksdb_dir;
Int32 ttl; Int32 ttl;
bool read_only; bool read_only;
void initDB(); void initDB();
LoggerPtr log;
}; };
} }