Fix global trace collector

This commit is contained in:
Antonio Andelic 2024-06-06 11:11:08 +02:00
parent ce244e126d
commit 1d3cf17053
6 changed files with 105 additions and 60 deletions

View File

@ -773,7 +773,51 @@ try
LOG_INFO(log, "Available CPU instruction sets: {}", cpu_info);
#endif
bool will_have_trace_collector = hasPHDRCache() && config().has("trace_log");
bool has_trace_collector = false;
/// Disable it if we collect test coverage information, because it will work extremely slow.
#if !WITH_COVERAGE
/// Profilers cannot work reliably with any other libunwind or without PHDR cache.
if (hasPHDRCache() && config().has("trace_log"))
{
has_trace_collector = true;
/// Set up server-wide memory profiler (for total memory tracker).
if (server_settings.total_memory_profiler_step)
{
total_memory_tracker.setProfilerStep(server_settings.total_memory_profiler_step);
}
if (server_settings.total_memory_tracker_sample_probability > 0.0)
{
total_memory_tracker.setSampleProbability(server_settings.total_memory_tracker_sample_probability);
}
if (server_settings.total_memory_profiler_sample_min_allocation_size)
{
total_memory_tracker.setSampleMinAllocationSize(server_settings.total_memory_profiler_sample_min_allocation_size);
}
if (server_settings.total_memory_profiler_sample_max_allocation_size)
{
total_memory_tracker.setSampleMaxAllocationSize(server_settings.total_memory_profiler_sample_max_allocation_size);
}
}
#endif
/// Describe multiple reasons when query profiler cannot work.
#if WITH_COVERAGE
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they work extremely slow with test coverage.");
#endif
#if defined(SANITIZER)
LOG_INFO(log, "Query Profiler disabled because they cannot work under sanitizers"
" when two different stack unwinding methods will interfere with each other.");
#endif
if (!hasPHDRCache())
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they require PHDR cache to be created"
" (otherwise the function 'dl_iterate_phdr' is not lock free and not async-signal safe).");
// Initialize global thread pool. Do it before we fetch configs from zookeeper
// nodes (`from_zk`), because ZooKeeper interface uses the pool. We will
@ -782,8 +826,12 @@ try
server_settings.max_thread_pool_size,
server_settings.max_thread_pool_free_size,
server_settings.thread_pool_queue_size,
will_have_trace_collector ? server_settings.global_profiler_real_time_period_ns : 0,
will_have_trace_collector ? server_settings.global_profiler_cpu_time_period_ns : 0);
has_trace_collector ? server_settings.global_profiler_real_time_period_ns : 0,
has_trace_collector ? server_settings.global_profiler_cpu_time_period_ns : 0);
if (has_trace_collector)
global_context->createTraceCollector();
/// Wait for all threads to avoid possible use-after-free (for example logging objects can be already destroyed).
SCOPE_EXIT({
Stopwatch watch;
@ -1950,52 +1998,9 @@ try
LOG_DEBUG(log, "Loaded metadata.");
/// Init trace collector only after trace_log system table was created
/// Disable it if we collect test coverage information, because it will work extremely slow.
#if !WITH_COVERAGE
/// Profilers cannot work reliably with any other libunwind or without PHDR cache.
if (hasPHDRCache())
{
if (has_trace_collector)
global_context->initializeTraceCollector();
/// Set up server-wide memory profiler (for total memory tracker).
if (server_settings.total_memory_profiler_step)
{
total_memory_tracker.setProfilerStep(server_settings.total_memory_profiler_step);
}
if (server_settings.total_memory_tracker_sample_probability > 0.0)
{
total_memory_tracker.setSampleProbability(server_settings.total_memory_tracker_sample_probability);
}
if (server_settings.total_memory_profiler_sample_min_allocation_size)
{
total_memory_tracker.setSampleMinAllocationSize(server_settings.total_memory_profiler_sample_min_allocation_size);
}
if (server_settings.total_memory_profiler_sample_max_allocation_size)
{
total_memory_tracker.setSampleMaxAllocationSize(server_settings.total_memory_profiler_sample_max_allocation_size);
}
}
#endif
/// Describe multiple reasons when query profiler cannot work.
#if WITH_COVERAGE
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they work extremely slow with test coverage.");
#endif
#if defined(SANITIZER)
LOG_INFO(log, "Query Profiler disabled because they cannot work under sanitizers"
" when two different stack unwinding methods will interfere with each other.");
#endif
if (!hasPHDRCache())
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they require PHDR cache to be created"
" (otherwise the function 'dl_iterate_phdr' is not lock free and not async-signal safe).");
#if defined(OS_LINUX)
auto tasks_stats_provider = TasksStatsCounters::findBestAvailableProvider();
if (tasks_stats_provider == TasksStatsCounters::MetricsProvider::None)

View File

@ -228,9 +228,9 @@ void Timer::cleanup()
#endif
template <typename ProfilerImpl>
QueryProfilerBase<ProfilerImpl>::QueryProfilerBase([[maybe_unused]] UInt64 thread_id, [[maybe_unused]] int clock_type, [[maybe_unused]] UInt32 period, [[maybe_unused]] int pause_signal_)
: log(getLogger("QueryProfiler"))
, pause_signal(pause_signal_)
QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(
[[maybe_unused]] UInt64 thread_id, [[maybe_unused]] int clock_type, [[maybe_unused]] UInt32 period, [[maybe_unused]] int pause_signal_)
: log(getLogger("QueryProfiler")), pause_signal(pause_signal_)
{
#if defined(SANITIZER)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QueryProfiler disabled because they cannot work under sanitizers");

View File

@ -740,12 +740,18 @@ struct ContextSharedPart : boost::noncopyable
void initializeTraceCollector(std::shared_ptr<TraceLog> trace_log)
{
if (!trace_log)
return;
if (!trace_collector.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "TraceCollector needs to be first created before initialization");
trace_collector->initialize(trace_log);
}
void createTraceCollector()
{
if (hasTraceCollector())
return;
trace_collector.emplace(std::move(trace_log));
trace_collector.emplace();
}
void addWarningMessage(const String & message) TSA_REQUIRES(mutex)
@ -3891,6 +3897,11 @@ void Context::initializeSystemLogs()
});
}
void Context::createTraceCollector()
{
shared->createTraceCollector();
}
void Context::initializeTraceCollector()
{
shared->initializeTraceCollector(getTraceLog());

View File

@ -1077,6 +1077,8 @@ public:
void initializeSystemLogs();
/// Call after initialization before using trace collector.
void createTraceCollector();
void initializeTraceCollector();
/// Call after unexpected crash happen.

View File

@ -1,5 +1,4 @@
#include "TraceCollector.h"
#include <Interpreters/TraceCollector.h>
#include <Core/Field.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
@ -14,8 +13,12 @@
namespace DB
{
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> trace_log_)
: trace_log(std::move(trace_log_))
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
TraceCollector::TraceCollector()
{
TraceSender::pipe.open();
@ -28,6 +31,23 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> trace_log_)
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
void TraceCollector::initialize(std::shared_ptr<TraceLog> trace_log_)
{
if (is_trace_log_initialized)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "TraceCollector is already initialized");
trace_log_ptr = trace_log_;
is_trace_log_initialized.store(true, std::memory_order_release);
}
std::shared_ptr<TraceLog> TraceCollector::getTraceLog()
{
if (!is_trace_log_initialized.load(std::memory_order_acquire))
return nullptr;
return trace_log_ptr;
}
void TraceCollector::tryClosePipe()
{
try
@ -120,7 +140,7 @@ void TraceCollector::run()
ProfileEvents::Count increment;
readPODBinary(increment, in);
if (trace_log)
if (auto trace_log = getTraceLog())
{
// time and time_in_microseconds are both being constructed from the same timespec so that the
// times will be equal up to the precision of a second.

View File

@ -1,4 +1,5 @@
#pragma once
#include <atomic>
#include <Common/ThreadPool.h>
class StackTrace;
@ -16,11 +17,17 @@ class TraceLog;
class TraceCollector
{
public:
explicit TraceCollector(std::shared_ptr<TraceLog> trace_log_);
explicit TraceCollector();
~TraceCollector();
void initialize(std::shared_ptr<TraceLog> trace_log_);
private:
std::shared_ptr<TraceLog> trace_log;
std::shared_ptr<TraceLog> getTraceLog();
std::atomic<bool> is_trace_log_initialized = false;
std::shared_ptr<TraceLog> trace_log_ptr;
ThreadFromGlobalPool thread;
void tryClosePipe();