Merge pull request #33534 from azat/fwd-decl

RFC: Split headers, move SystemLog into module, more forward declarations
This commit is contained in:
Kruglov Pavel 2022-01-18 17:22:49 +03:00 committed by GitHub
commit 2295a07066
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
75 changed files with 1022 additions and 838 deletions

View File

@ -2,7 +2,7 @@
#include <base/scope_guard.h>
#include <base/logger_useful.h>
#include <Common/MemoryTracker.h>
#include <Common/LockMemoryExceptionInThread.h>
/// Same as SCOPE_EXIT() but block the MEMORY_LIMIT_EXCEEDED errors.
///
@ -12,8 +12,7 @@
///
/// NOTE: it should be used with caution.
#define SCOPE_EXIT_MEMORY(...) SCOPE_EXIT( \
MemoryTracker::LockExceptionInThread \
lock_memory_tracker(VariableContext::Global); \
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); \
__VA_ARGS__; \
)
@ -57,8 +56,7 @@
#define SCOPE_EXIT_MEMORY_SAFE(...) SCOPE_EXIT( \
try \
{ \
MemoryTracker::LockExceptionInThread \
lock_memory_tracker(VariableContext::Global); \
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); \
__VA_ARGS__; \
} \
catch (...) \

View File

@ -51,6 +51,7 @@
#include <Common/getExecutablePath.h>
#include <Common/getHashOfLoadedBinary.h>
#include <Common/Elf.h>
#include <Common/setThreadName.h>
#include <filesystem>
#include <loggers/OwnFormattingChannel.h>

View File

@ -10,6 +10,8 @@
#include <Poco/Message.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <base/getThreadId.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/IO.h>
@ -57,7 +59,7 @@ void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)
/// but let's log it into the stderr at least.
catch (...)
{
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
const std::string & exception_message = getCurrentExceptionMessage(true);
const std::string & message = msg.getText();

View File

@ -28,6 +28,7 @@
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/UseSSL.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTInsertQuery.h>
#include <base/ErrorHandlers.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>

View File

@ -584,7 +584,6 @@ if (ENABLE_TESTS AND USE_GTEST)
dbms
clickhouse_common_config
clickhouse_common_zookeeper
clickhouse_common_config
string_utils)
add_check(unit_tests_dbms)

View File

@ -8,7 +8,6 @@ set (SRCS
)
add_library(clickhouse_common_config ${SRCS})
target_link_libraries(clickhouse_common_config
PUBLIC
clickhouse_common_zookeeper
@ -18,9 +17,17 @@ target_link_libraries(clickhouse_common_config
string_utils
)
if (USE_YAML_CPP)
target_link_libraries(clickhouse_common_config
add_library(clickhouse_common_config_no_zookeeper_log ${SRCS})
target_link_libraries(clickhouse_common_config_no_zookeeper_log
PUBLIC
clickhouse_common_zookeeper_no_log
common
Poco::XML
PRIVATE
yaml-cpp
string_utils
)
if (USE_YAML_CPP)
target_link_libraries(clickhouse_common_config PRIVATE yaml-cpp)
target_link_libraries(clickhouse_common_config_no_zookeeper_log PRIVATE yaml-cpp)
endif()

View File

@ -15,6 +15,7 @@
#include <Common/formatReadable.h>
#include <Common/filesystemHelpers.h>
#include <Common/ErrorCodes.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <filesystem>
#include <Common/config_version.h>
@ -175,7 +176,7 @@ void tryLogCurrentException(const char * log_name, const std::string & start_of_
///
/// And in this case the exception will not be logged, so let's block the
/// MemoryTracker until the exception will be logged.
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
/// Poco::Logger::get can allocate memory too
tryLogCurrentExceptionImpl(&Poco::Logger::get(log_name), start_of_message);
@ -188,7 +189,7 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
///
/// And in this case the exception will not be logged, so let's block the
/// MemoryTracker until the exception will be logged.
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
tryLogCurrentExceptionImpl(logger, start_of_message);
}

View File

@ -0,0 +1,20 @@
#include <Common/LockMemoryExceptionInThread.h>
/// LockMemoryExceptionInThread
thread_local uint64_t LockMemoryExceptionInThread::counter = 0;
thread_local VariableContext LockMemoryExceptionInThread::level = VariableContext::Global;
thread_local bool LockMemoryExceptionInThread::block_fault_injections = false;
LockMemoryExceptionInThread::LockMemoryExceptionInThread(VariableContext level_, bool block_fault_injections_)
: previous_level(level)
, previous_block_fault_injections(block_fault_injections)
{
++counter;
level = level_;
block_fault_injections = block_fault_injections_;
}
LockMemoryExceptionInThread::~LockMemoryExceptionInThread()
{
--counter;
level = previous_level;
block_fault_injections = previous_block_fault_injections;
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Common/VariableContext.h>
/// To be able to avoid MEMORY_LIMIT_EXCEEDED Exception in destructors:
/// - either configured memory limit reached
/// - or fault injected
///
/// So this will simply ignore the configured memory limit (and avoid fault injection).
///
/// NOTE: exception will be silently ignored, no message in log
/// (since logging from MemoryTracker::alloc() is tricky)
///
/// NOTE: MEMORY_LIMIT_EXCEEDED Exception implicitly blocked if
/// stack unwinding is currently in progress in this thread (to avoid
/// std::terminate()), so you don't need to use it in this case explicitly.
struct LockMemoryExceptionInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
static thread_local bool block_fault_injections;
VariableContext previous_level;
bool previous_block_fault_injections;
public:
/// level_ - block in level and above
/// block_fault_injections_ - block in fault injection too
explicit LockMemoryExceptionInThread(VariableContext level_ = VariableContext::User, bool block_fault_injections_ = true);
~LockMemoryExceptionInThread();
LockMemoryExceptionInThread(const LockMemoryExceptionInThread &) = delete;
LockMemoryExceptionInThread & operator=(const LockMemoryExceptionInThread &) = delete;
static bool isBlocked(VariableContext current_level, bool fault_injection)
{
return counter > 0 && current_level >= level && (!fault_injection || block_fault_injections);
}
};

View File

@ -1,12 +1,14 @@
#include "MemoryTracker.h"
#include <IO/WriteHelpers.h>
#include "Common/TraceCollector.h"
#include <Interpreters/TraceCollector.h>
#include <Common/Exception.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/formatReadable.h>
#include <base/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/thread_local_rng.h>
#include <base/logger_useful.h>
#include <atomic>
#include <cmath>
@ -34,7 +36,7 @@ namespace
/// noexcept(false)) will cause std::terminate()
bool inline memoryTrackerCanThrow(VariableContext level, bool fault_injection)
{
return !MemoryTracker::LockExceptionInThread::isBlocked(level, fault_injection) && !std::uncaught_exceptions();
return !LockMemoryExceptionInThread::isBlocked(level, fault_injection) && !std::uncaught_exceptions();
}
}
@ -55,41 +57,6 @@ namespace ProfileEvents
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
// BlockerInThread
thread_local uint64_t MemoryTracker::BlockerInThread::counter = 0;
thread_local VariableContext MemoryTracker::BlockerInThread::level = VariableContext::Global;
MemoryTracker::BlockerInThread::BlockerInThread(VariableContext level_)
: previous_level(level)
{
++counter;
level = level_;
}
MemoryTracker::BlockerInThread::~BlockerInThread()
{
--counter;
level = previous_level;
}
/// LockExceptionInThread
thread_local uint64_t MemoryTracker::LockExceptionInThread::counter = 0;
thread_local VariableContext MemoryTracker::LockExceptionInThread::level = VariableContext::Global;
thread_local bool MemoryTracker::LockExceptionInThread::block_fault_injections = false;
MemoryTracker::LockExceptionInThread::LockExceptionInThread(VariableContext level_, bool block_fault_injections_)
: previous_level(level)
, previous_block_fault_injections(block_fault_injections)
{
++counter;
level = level_;
block_fault_injections = block_fault_injections_;
}
MemoryTracker::LockExceptionInThread::~LockExceptionInThread()
{
--counter;
level = previous_level;
block_fault_injections = previous_block_fault_injections;
}
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
@ -133,9 +100,9 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (size < 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
if (BlockerInThread::isBlocked(level))
if (MemoryTrackerBlockerInThread::isBlocked(level))
{
/// Since the BlockerInThread should respect the level, we should go to the next parent.
/// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->allocImpl(size, throw_if_memory_exceeded);
return;
@ -184,7 +151,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true) && throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
const auto * description = description_ptr.load(std::memory_order_relaxed);
@ -203,7 +170,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
bool allocation_traced = false;
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
allocation_traced = true;
@ -212,7 +179,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
std::bernoulli_distribution sample(sample_probability);
if (unlikely(sample_probability && sample(thread_local_rng)))
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
allocation_traced = true;
}
@ -220,7 +187,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
const auto * description = description_ptr.load(std::memory_order_relaxed);
throw DB::Exception(
@ -237,7 +204,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
bool log_memory_usage = true;
peak_updated = updatePeak(will_be, log_memory_usage);
}
@ -249,7 +216,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (peak_updated && allocation_traced)
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemoryPeak, StackTrace(), will_be);
}
@ -288,9 +255,9 @@ bool MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
void MemoryTracker::free(Int64 size)
{
if (BlockerInThread::isBlocked(level))
if (MemoryTrackerBlockerInThread::isBlocked(level))
{
/// Since the BlockerInThread should respect the level, we should go to the next parent.
/// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->free(size);
return;
@ -299,7 +266,7 @@ void MemoryTracker::free(Int64 size)
std::bernoulli_distribution sample(sample_probability);
if (unlikely(sample_probability && sample(thread_local_rng)))
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), -size);
}

View File

@ -31,6 +31,9 @@ extern thread_local bool memory_tracker_always_throw_logical_error_on_allocation
/** Tracks memory consumption.
* It throws an exception if amount of consumed memory become greater than certain limit.
* The same memory tracker could be simultaneously used in different threads.
*
* @see LockMemoryExceptionInThread
* @see MemoryTrackerBlockerInThread
*/
class MemoryTracker
{
@ -167,64 +170,6 @@ public:
/// Prints info about peak memory consumption into log.
void logPeakMemoryUsage() const;
/// To be able to temporarily stop memory tracking from current thread.
struct BlockerInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
VariableContext previous_level;
public:
/// level_ - block in level and above
explicit BlockerInThread(VariableContext level_ = VariableContext::User);
~BlockerInThread();
BlockerInThread(const BlockerInThread &) = delete;
BlockerInThread & operator=(const BlockerInThread &) = delete;
static bool isBlocked(VariableContext current_level)
{
return counter > 0 && current_level >= level;
}
};
/// To be able to avoid MEMORY_LIMIT_EXCEEDED Exception in destructors:
/// - either configured memory limit reached
/// - or fault injected
///
/// So this will simply ignore the configured memory limit (and avoid fault injection).
///
/// NOTE: exception will be silently ignored, no message in log
/// (since logging from MemoryTracker::alloc() is tricky)
///
/// NOTE: MEMORY_LIMIT_EXCEEDED Exception implicitly blocked if
/// stack unwinding is currently in progress in this thread (to avoid
/// std::terminate()), so you don't need to use it in this case explicitly.
struct LockExceptionInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
static thread_local bool block_fault_injections;
VariableContext previous_level;
bool previous_block_fault_injections;
public:
/// level_ - block in level and above
/// block_fault_injections_ - block in fault injection too
explicit LockExceptionInThread(VariableContext level_ = VariableContext::User, bool block_fault_injections_ = true);
~LockExceptionInThread();
LockExceptionInThread(const LockExceptionInThread &) = delete;
LockExceptionInThread & operator=(const LockExceptionInThread &) = delete;
static bool isBlocked(VariableContext current_level, bool fault_injection)
{
return counter > 0 && current_level >= level && (!fault_injection || block_fault_injections);
}
};
};
extern MemoryTracker total_memory_tracker;

View File

@ -0,0 +1,16 @@
#include <Common/MemoryTrackerBlockerInThread.h>
// MemoryTrackerBlockerInThread
thread_local uint64_t MemoryTrackerBlockerInThread::counter = 0;
thread_local VariableContext MemoryTrackerBlockerInThread::level = VariableContext::Global;
MemoryTrackerBlockerInThread::MemoryTrackerBlockerInThread(VariableContext level_)
: previous_level(level)
{
++counter;
level = level_;
}
MemoryTrackerBlockerInThread::~MemoryTrackerBlockerInThread()
{
--counter;
level = previous_level;
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <Common/VariableContext.h>
/// To be able to temporarily stop memory tracking from current thread.
struct MemoryTrackerBlockerInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
VariableContext previous_level;
public:
/// level_ - block in level and above
explicit MemoryTrackerBlockerInThread(VariableContext level_ = VariableContext::User);
~MemoryTrackerBlockerInThread();
MemoryTrackerBlockerInThread(const MemoryTrackerBlockerInThread &) = delete;
MemoryTrackerBlockerInThread & operator=(const MemoryTrackerBlockerInThread &) = delete;
static bool isBlocked(VariableContext current_level)
{
return counter > 0 && current_level >= level;
}
};

View File

@ -1,9 +1,9 @@
#include "QueryProfiler.h"
#include <IO/WriteHelpers.h>
#include <Interpreters/TraceCollector.h>
#include <Common/Exception.h>
#include <Common/StackTrace.h>
#include <Common/TraceCollector.h>
#include <Common/thread_local_rng.h>
#include <base/logger_useful.h>
#include <base/phdr_cache.h>

View File

@ -4,6 +4,7 @@
#include <Common/ThreadStatus.h>
#include <base/errnoToString.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <base/getThreadId.h>
@ -11,6 +12,7 @@
#include <csignal>
#include <mutex>
#include <sys/mman.h>
namespace DB

View File

@ -1,187 +0,0 @@
#include "TraceCollector.h"
#include <Core/Field.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/TraceLog.h>
#include <Poco/Logger.h>
#include <Common/Exception.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <Common/setThreadName.h>
#include <base/logger_useful.h>
namespace DB
{
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
///
/// And it cannot be large, since otherwise it will not fit into PIPE_BUF.
/// The performance test query ids can be surprisingly long like
/// `aggregating_merge_tree_simple_aggregate_function_string.query100.profile100`,
/// so make some allowance for them as well.
constexpr size_t QUERY_ID_MAX_LEN = 128;
static_assert(QUERY_ID_MAX_LEN <= std::numeric_limits<uint8_t>::max());
}
LazyPipeFDs pipe;
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> trace_log_)
: trace_log(std::move(trace_log_))
{
pipe.open();
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
pipe.setNonBlockingWrite();
pipe.tryIncreaseSize(1 << 20);
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
TraceCollector::~TraceCollector()
{
if (!thread.joinable())
LOG_ERROR(&Poco::Logger::get("TraceCollector"), "TraceCollector thread is malformed and cannot be joined");
else
stop();
pipe.close();
}
void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trace, Int64 size)
{
constexpr size_t buf_size = sizeof(char) /// TraceCollector stop flag
+ sizeof(UInt8) /// String size
+ QUERY_ID_MAX_LEN /// Maximum query_id length
+ sizeof(UInt8) /// Number of stack frames
+ sizeof(StackTrace::FramePointers) /// Collected stack trace, maximum capacity
+ sizeof(TraceType) /// trace type
+ sizeof(UInt64) /// thread_id
+ sizeof(Int64); /// size
/// Write should be atomic to avoid overlaps
/// (since recursive collect() is possible)
static_assert(PIPE_BUF >= 512);
static_assert(buf_size <= 512, "Only write of PIPE_BUF to pipe is atomic and the minimal known PIPE_BUF across supported platforms is 512");
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id;
UInt64 thread_id;
if (CurrentThread::isInitialized())
{
query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
thread_id = CurrentThread::get().thread_id;
}
else
{
thread_id = MainThreadStatus::get()->thread_id;
}
writeChar(false, out); /// true if requested to stop the collecting thread.
writeBinary(static_cast<uint8_t>(query_id.size), out);
out.write(query_id.data, query_id.size);
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFramePointers()[i], out);
writePODBinary(trace_type, out);
writePODBinary(thread_id, out);
writePODBinary(size, out);
out.next();
}
/** Sends TraceCollector stop message
*
* Each sequence of data for TraceCollector thread starts with a boolean flag.
* If this flag is true, TraceCollector must stop reading trace_pipe and exit.
* This function sends flag with a true value to stop TraceCollector gracefully.
*/
void TraceCollector::stop()
{
WriteBufferFromFileDescriptor out(pipe.fds_rw[1]);
writeChar(true, out);
out.next();
thread.join();
}
void TraceCollector::run()
{
setThreadName("TraceCollector");
ReadBufferFromFileDescriptor in(pipe.fds_rw[0]);
while (true)
{
char is_last;
readChar(is_last, in);
if (is_last)
break;
std::string query_id;
UInt8 query_id_size = 0;
readBinary(query_id_size, in);
query_id.resize(query_id_size);
in.read(query_id.data(), query_id_size);
UInt8 trace_size = 0;
readIntBinary(trace_size, in);
Array trace;
trace.reserve(trace_size);
for (size_t i = 0; i < trace_size; ++i)
{
uintptr_t addr = 0;
readPODBinary(addr, in);
trace.emplace_back(UInt64(addr));
}
TraceType trace_type;
readPODBinary(trace_type, in);
UInt64 thread_id;
readPODBinary(thread_id, in);
Int64 size;
readPODBinary(size, in);
if (trace_log)
{
// time and time_in_microseconds are both being constructed from the same timespec so that the
// times will be equal up to the precision of a second.
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
UInt64 time = UInt64(ts.tv_sec * 1000000000LL + ts.tv_nsec);
UInt64 time_in_microseconds = UInt64((ts.tv_sec * 1000000LL) + (ts.tv_nsec / 1000));
TraceLogElement element{time_t(time / 1000000000), time_in_microseconds, time, trace_type, thread_id, query_id, trace, size};
trace_log->add(element);
}
}
}
}

View File

@ -1,46 +0,0 @@
#pragma once
#include "Common/PipeFDs.h"
#include <Common/ThreadPool.h>
class StackTrace;
namespace Poco
{
class Logger;
}
namespace DB
{
class TraceLog;
enum class TraceType : uint8_t
{
Real,
CPU,
Memory,
MemorySample,
MemoryPeak,
};
class TraceCollector
{
public:
TraceCollector(std::shared_ptr<TraceLog> trace_log_);
~TraceCollector();
/// Collect a stack trace. This method is signal safe.
/// Precondition: the TraceCollector object must be created.
/// size - for memory tracing is the amount of memory allocated; for other trace types it is 0.
static void collect(TraceType trace_type, const StackTrace & stack_trace, Int64 size);
private:
std::shared_ptr<TraceLog> trace_log;
ThreadFromGlobalPool thread;
void run();
void stop();
};
}

View File

@ -0,0 +1,78 @@
#include <Common/TraceSender.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <IO/WriteHelpers.h>
#include <Common/StackTrace.h>
#include <Common/CurrentThread.h>
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
///
/// And it cannot be large, since otherwise it will not fit into PIPE_BUF.
/// The performance test query ids can be surprisingly long like
/// `aggregating_merge_tree_simple_aggregate_function_string.query100.profile100`,
/// so make some allowance for them as well.
constexpr size_t QUERY_ID_MAX_LEN = 128;
static_assert(QUERY_ID_MAX_LEN <= std::numeric_limits<uint8_t>::max());
}
namespace DB
{
LazyPipeFDs TraceSender::pipe;
void TraceSender::send(TraceType trace_type, const StackTrace & stack_trace, Int64 size)
{
constexpr size_t buf_size = sizeof(char) /// TraceCollector stop flag
+ sizeof(UInt8) /// String size
+ QUERY_ID_MAX_LEN /// Maximum query_id length
+ sizeof(UInt8) /// Number of stack frames
+ sizeof(StackTrace::FramePointers) /// Collected stack trace, maximum capacity
+ sizeof(TraceType) /// trace type
+ sizeof(UInt64) /// thread_id
+ sizeof(Int64); /// size
/// Write should be atomic to avoid overlaps
/// (since recursive collect() is possible)
static_assert(PIPE_BUF >= 512);
static_assert(buf_size <= 512, "Only write of PIPE_BUF to pipe is atomic and the minimal known PIPE_BUF across supported platforms is 512");
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id;
UInt64 thread_id;
if (CurrentThread::isInitialized())
{
query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
thread_id = CurrentThread::get().thread_id;
}
else
{
thread_id = MainThreadStatus::get()->thread_id;
}
writeChar(false, out); /// true if requested to stop the collecting thread.
writeBinary(static_cast<uint8_t>(query_id.size), out);
out.write(query_id.data, query_id.size);
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFramePointers()[i], out);
writePODBinary(trace_type, out);
writePODBinary(thread_id, out);
writePODBinary(size, out);
out.next();
}
}

36
src/Common/TraceSender.h Normal file
View File

@ -0,0 +1,36 @@
#pragma once
#include <Common/PipeFDs.h>
#include <base/types.h>
class StackTrace;
class TraceCollector;
namespace DB
{
enum class TraceType : uint8_t
{
Real,
CPU,
Memory,
MemorySample,
MemoryPeak,
};
/// This is the second part of TraceCollector, that sends stacktrace to the pipe.
/// It has been split out to avoid dependency from interpreters part.
class TraceSender
{
public:
/// Collect a stack trace. This method is signal safe.
/// Precondition: the TraceCollector object must be created.
/// size - for memory tracing is the amount of memory allocated; for other trace types it is 0.
static void send(TraceType trace_type, const StackTrace & stack_trace, Int64 size);
private:
friend class TraceCollector;
static LazyPipeFDs pipe;
};
}

View File

@ -2,9 +2,32 @@ include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
add_headers_and_sources(clickhouse_common_zookeeper .)
# for clickhouse server
add_library(clickhouse_common_zookeeper ${clickhouse_common_zookeeper_headers} ${clickhouse_common_zookeeper_sources})
target_compile_definitions (clickhouse_common_zookeeper PRIVATE -DZOOKEEPER_LOG)
target_link_libraries (clickhouse_common_zookeeper
PUBLIC
clickhouse_common_io
common
PRIVATE
string_utils
)
# To avoid circular dependency from interpreters.
if (OS_DARWIN)
target_link_libraries (clickhouse_common_zookeeper PRIVATE -Wl,-undefined,dynamic_lookup)
else()
target_link_libraries (clickhouse_common_zookeeper PRIVATE -Wl,--unresolved-symbols=ignore-all)
endif()
target_link_libraries (clickhouse_common_zookeeper PUBLIC clickhouse_common_io common PRIVATE string_utils)
# for examples -- no logging (to avoid extra dependencies)
add_library(clickhouse_common_zookeeper_no_log ${clickhouse_common_zookeeper_headers} ${clickhouse_common_zookeeper_sources})
target_link_libraries (clickhouse_common_zookeeper_no_log
PUBLIC
clickhouse_common_io
common
PRIVATE
string_utils
)
if (ENABLE_EXAMPLES)
add_subdirectory(examples)

View File

@ -1,5 +1,6 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>

View File

@ -1230,6 +1230,7 @@ void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
std::atomic_store(&zk_log, std::move(zk_log_));
}
#ifdef ZOOKEEPER_LOG
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize)
{
auto maybe_zk_log = std::atomic_load(&zk_log);
@ -1271,5 +1272,9 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const
maybe_zk_log->add(elem);
}
}
#else
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr &, const ZooKeeperResponsePtr &, bool)
{}
#endif
}

View File

@ -1,14 +1,14 @@
add_executable(zkutil_test_commands zkutil_test_commands.cpp)
target_link_libraries(zkutil_test_commands PRIVATE clickhouse_common_zookeeper)
target_link_libraries(zkutil_test_commands PRIVATE clickhouse_common_zookeeper_no_log)
add_executable(zkutil_test_commands_new_lib zkutil_test_commands_new_lib.cpp)
target_link_libraries(zkutil_test_commands_new_lib PRIVATE clickhouse_common_zookeeper string_utils)
target_link_libraries(zkutil_test_commands_new_lib PRIVATE clickhouse_common_zookeeper_no_log string_utils)
add_executable(zkutil_test_async zkutil_test_async.cpp)
target_link_libraries(zkutil_test_async PRIVATE clickhouse_common_zookeeper)
target_link_libraries(zkutil_test_async PRIVATE clickhouse_common_zookeeper_no_log)
add_executable (zk_many_watches_reconnect zk_many_watches_reconnect.cpp)
target_link_libraries (zk_many_watches_reconnect PRIVATE clickhouse_common_zookeeper clickhouse_common_config)
target_link_libraries (zk_many_watches_reconnect PRIVATE clickhouse_common_zookeeper_no_log clickhouse_common_config)
add_executable (zookeeper_impl zookeeper_impl.cpp)
target_link_libraries (zookeeper_impl PRIVATE clickhouse_common_zookeeper)
target_link_libraries (zookeeper_impl PRIVATE clickhouse_common_zookeeper_no_log)

View File

@ -9,6 +9,8 @@
#include <Common/getMaxFileDescriptorCount.h>
#include <Common/StringUtils/StringUtils.h>
#include <Coordination/Keeper4LWInfo.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <unistd.h>

View File

@ -1,16 +1,18 @@
#include <Coordination/KeeperStorage.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/setThreadName.h>
#include <mutex>
#include <functional>
#include <base/logger_useful.h>
#include <Common/StringUtils/StringUtils.h>
#include <sstream>
#include <iomanip>
#include <Common/hex.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <Poco/SHA1Engine.h>
#include <Poco/Base64Encoder.h>
#include <boost/algorithm/string.hpp>
#include <Common/hex.h>
#include <sstream>
#include <iomanip>
#include <mutex>
#include <functional>
#include <base/logger_useful.h>
namespace DB
{

View File

@ -6,6 +6,7 @@
#include <Coordination/SessionExpiryQueue.h>
#include <Coordination/ACLMap.h>
#include <Coordination/SnapshotableHashTable.h>
#include <IO/WriteBufferFromString.h>
#include <unordered_map>
#include <unordered_set>
#include <vector>

View File

@ -16,6 +16,7 @@
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/MemorySanitizer.h>
#include <Common/CurrentMetrics.h>
#include <Common/HashTable/HashMap.h>
#include <IO/AIO.h>
#include <IO/BufferWithOwnMemory.h>

View File

@ -8,7 +8,7 @@
#include <string.h>
#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <IO/BufferBase.h>
@ -116,7 +116,7 @@ public:
return;
/// finalize() is often called from destructors.
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
LockMemoryExceptionInThread lock(VariableContext::Global);
try
{
finalizeImpl();

View File

@ -3,6 +3,8 @@
#include <Interpreters/SystemLog.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <vector>
#include <atomic>

View File

@ -63,6 +63,7 @@
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/Session.h>
#include <Interpreters/TraceCollector.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/UncompressedCache.h>
#include <IO/MMappedFileCache.h>
@ -74,7 +75,6 @@
#include <Common/Config/AbstractConfigurationComparison.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ShellCommand.h>
#include <Common/TraceCollector.h>
#include <base/logger_useful.h>
#include <base/EnumReflection.h>
#include <Common/RemoteHostFilter.h>

View File

@ -1,6 +1,8 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
/// Call this function on crash.

View File

@ -1,5 +1,6 @@
#include <Interpreters/IInterpreter.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/Context.h>
namespace DB
{

View File

@ -3,6 +3,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
{

View File

@ -13,6 +13,7 @@
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTAssignment.h>
#include <Parsers/ASTIdentifier_fwd.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Storages/AlterCommands.h>
#include <Storages/IStorage.h>
#include <Storages/LiveView/LiveViewCommands.h>

View File

@ -39,6 +39,7 @@
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Access/Common/AccessRightsElement.h>

View File

@ -6,6 +6,7 @@
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/TableOverrideUtils.h>
#include <Formats/FormatFactory.h>
@ -15,6 +16,7 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Storages/StorageView.h>
#include <Processors/QueryPlan/QueryPlan.h>

View File

@ -16,6 +16,7 @@ limitations under the License. */
#include <Interpreters/Context.h>
#include <Access/Common/AccessFlags.h>
#include <QueryPipeline/StreamLocalLimits.h>
#include <Storages/IStorage.h>
namespace DB

View File

@ -15,7 +15,7 @@ limitations under the License. */
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/SelectQueryInfo.h>
namespace DB

View File

@ -3,6 +3,8 @@
#include <Interpreters/SystemLog.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <vector>
#include <atomic>

View File

@ -8,8 +8,10 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeUUID.h>
#include <Interpreters/Context.h>
#include <Common/hex.h>
#include <Common/CurrentThread.h>
namespace DB

View File

@ -1,6 +1,8 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
namespace DB
{

View File

@ -1,6 +1,8 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
namespace DB

View File

@ -1,6 +1,8 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Core/Settings.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>

View File

@ -2,6 +2,8 @@
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
namespace ProfileEvents

View File

@ -9,6 +9,8 @@
#include <Core/SettingsEnums.h>
#include <Core/Types.h>
#include <Core/UUID.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Interpreters/SystemLog.h>
#include <base/types.h>

View File

@ -21,6 +21,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
#include <Access/SettingsProfilesInfo.h>
#include <Interpreters/Context.h>
#include <cassert>

View File

@ -3,6 +3,9 @@
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
#include <Access/Common/AuthenticationData.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Columns/IColumn.h>
namespace DB
{

View File

@ -10,17 +10,38 @@
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTRenameQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTInsertQuery.h>
#include <Storages/IStorage.h>
#include <Common/setThreadName.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <IO/WriteHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <base/logger_useful.h>
#include <base/scope_guard.h>
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
namespace
@ -96,6 +117,9 @@ std::shared_ptr<TSystemLog> createSystemLog(
}
///
/// ISystemLog
///
ASTPtr ISystemLog::getCreateTableQueryClean(const StorageID & table_id, ContextPtr context)
{
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
@ -111,7 +135,40 @@ ASTPtr ISystemLog::getCreateTableQueryClean(const StorageID & table_id, ContextP
return old_ast;
}
void ISystemLog::stopFlushThread()
{
{
std::lock_guard lock(mutex);
if (!saving_thread.joinable())
{
return;
}
if (is_shutdown)
{
return;
}
is_shutdown = true;
/// Tell thread to shutdown.
flush_event.notify_all();
}
saving_thread.join();
}
void ISystemLog::startup()
{
std::lock_guard lock(mutex);
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
}
///
/// SystemLogs
///
SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config)
{
query_log = createSystemLog<QueryLog>(global_context, "system", "query_log", config, "query_log");
@ -193,4 +250,392 @@ void SystemLogs::shutdown()
log->shutdown();
}
///
/// SystemLog
///
template <typename LogElement>
SystemLog<LogElement>::SystemLog(
ContextPtr context_,
const String & database_name_,
const String & table_name_,
const String & storage_def_,
size_t flush_interval_milliseconds_)
: WithContext(context_)
, table_id(database_name_, table_name_)
, storage_def(storage_def_)
, create_query(serializeAST(*getCreateTableQuery()))
, flush_interval_milliseconds(flush_interval_milliseconds_)
{
assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE);
log = &Poco::Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")");
}
static thread_local bool recursive_add_call = false;
template <typename LogElement>
void SystemLog<LogElement>::add(const LogElement & element)
{
/// It is possible that the method will be called recursively.
/// Better to drop these events to avoid complications.
if (recursive_add_call)
return;
recursive_add_call = true;
SCOPE_EXIT({ recursive_add_call = false; });
/// Memory can be allocated while resizing on queue.push_back.
/// The size of allocation can be in order of a few megabytes.
/// But this should not be accounted for query memory usage.
/// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flacky.
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
/// Should not log messages under mutex.
bool queue_is_half_full = false;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
if (queue.size() == DBMS_SYSTEM_LOG_QUEUE_SIZE / 2)
{
queue_is_half_full = true;
// The queue more than half full, time to flush.
// We only check for strict equality, because messages are added one
// by one, under exclusive lock, so we will see each message count.
// It is enough to only wake the flushing thread once, after the message
// count increases past half available size.
const uint64_t queue_end = queue_front_index + queue.size();
if (requested_flush_up_to < queue_end)
requested_flush_up_to = queue_end;
flush_event.notify_all();
}
if (queue.size() >= DBMS_SYSTEM_LOG_QUEUE_SIZE)
{
// Ignore all further entries until the queue is flushed.
// Log a message about that. Don't spam it -- this might be especially
// problematic in case of trace log. Remember what the front index of the
// queue was when we last logged the message. If it changed, it means the
// queue was flushed, and we can log again.
if (queue_front_index != logged_queue_full_at_index)
{
logged_queue_full_at_index = queue_front_index;
// TextLog sets its logger level to 0, so this log is a noop and
// there is no recursive logging.
lock.unlock();
LOG_ERROR(log, "Queue is full for system log '{}' at {}", demangle(typeid(*this).name()), queue_front_index);
}
return;
}
queue.push_back(element);
}
if (queue_is_half_full)
LOG_INFO(log, "Queue is half full for system log '{}'.", demangle(typeid(*this).name()));
}
template <typename LogElement>
void SystemLog<LogElement>::shutdown()
{
stopFlushThread();
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
table->flushAndShutdown();
}
template <typename LogElement>
void SystemLog<LogElement>::flush(bool force)
{
uint64_t this_thread_requested_offset;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
this_thread_requested_offset = queue_front_index + queue.size();
// Publish our flush request, taking care not to overwrite the requests
// made by other threads.
is_force_prepare_tables |= force;
requested_flush_up_to = std::max(requested_flush_up_to,
this_thread_requested_offset);
flush_event.notify_all();
}
LOG_DEBUG(log, "Requested flush up to offset {}",
this_thread_requested_offset);
// Use an arbitrary timeout to avoid endless waiting. 60s proved to be
// too fast for our parallel functional tests, probably because they
// heavily load the disk.
const int timeout_seconds = 180;
std::unique_lock lock(mutex);
bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds),
[&] { return flushed_up_to >= this_thread_requested_offset
&& !is_force_prepare_tables; });
if (!result)
{
throw Exception("Timeout exceeded (" + toString(timeout_seconds) + " s) while flushing system log '" + demangle(typeid(*this).name()) + "'.",
ErrorCodes::TIMEOUT_EXCEEDED);
}
}
template <typename LogElement>
void SystemLog<LogElement>::savingThreadFunction()
{
setThreadName("SystemLogFlush");
std::vector<LogElement> to_flush;
bool exit_this_thread = false;
while (!exit_this_thread)
{
try
{
// The end index (exclusive, like std end()) of the messages we are
// going to flush.
uint64_t to_flush_end = 0;
// Should we prepare table even if there are no new messages.
bool should_prepare_tables_anyway = false;
{
std::unique_lock lock(mutex);
flush_event.wait_for(lock,
std::chrono::milliseconds(flush_interval_milliseconds),
[&] ()
{
return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables;
}
);
queue_front_index += queue.size();
to_flush_end = queue_front_index;
// Swap with existing array from previous flush, to save memory
// allocations.
to_flush.resize(0);
queue.swap(to_flush);
should_prepare_tables_anyway = is_force_prepare_tables;
exit_this_thread = is_shutdown;
}
if (to_flush.empty())
{
if (should_prepare_tables_anyway)
{
prepareTable();
LOG_TRACE(log, "Table created (force)");
std::lock_guard lock(mutex);
is_force_prepare_tables = false;
flush_event.notify_all();
}
}
else
{
flushImpl(to_flush, to_flush_end);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
LOG_TRACE(log, "Terminating");
}
template <typename LogElement>
void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end)
{
try
{
LOG_TRACE(log, "Flushing system log, {} entries to flush up to offset {}",
to_flush.size(), to_flush_end);
/// We check for existence of the table and create it as needed at every
/// flush. This is done to allow user to drop the table at any moment
/// (new empty table will be created automatically). BTW, flush method
/// is called from single thread.
prepareTable();
ColumnsWithTypeAndName log_element_columns;
auto log_element_names_and_types = LogElement::getNamesAndTypes();
for (const auto & name_and_type : log_element_names_and_types)
log_element_columns.emplace_back(name_and_type.type, name_and_type.name);
Block block(std::move(log_element_columns));
MutableColumns columns = block.mutateColumns();
for (const auto & elem : to_flush)
elem.appendToBlock(columns);
block.setColumns(std::move(columns));
/// We write to table indirectly, using InterpreterInsertQuery.
/// This is needed to support DEFAULT-columns in table.
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->table_id = table_id;
ASTPtr query_ptr(insert.release());
// we need query context to do inserts to target table with MV containing subqueries or joins
auto insert_context = Context::createCopy(context);
insert_context->makeQueryContext();
InterpreterInsertQuery interpreter(query_ptr, insert_context);
BlockIO io = interpreter.execute();
PushingPipelineExecutor executor(io.pipeline);
executor.start();
executor.push(block);
executor.finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
{
std::lock_guard lock(mutex);
flushed_up_to = to_flush_end;
is_force_prepare_tables = false;
flush_event.notify_all();
}
LOG_TRACE(log, "Flushed system log up to offset {}", to_flush_end);
}
template <typename LogElement>
void SystemLog<LogElement>::prepareTable()
{
String description = table_id.getNameForLogs();
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
{
if (old_create_query.empty())
{
old_create_query = serializeAST(*getCreateTableQueryClean(table_id, getContext()));
if (old_create_query.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty CREATE QUERY for {}", backQuoteIfNeed(table_id.table_name));
}
if (old_create_query != create_query)
{
/// Rename the existing table.
int suffix = 0;
while (DatabaseCatalog::instance().isTableExist(
{table_id.database_name, table_id.table_name + "_" + toString(suffix)}, getContext()))
++suffix;
auto rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Table from;
from.database = table_id.database_name;
from.table = table_id.table_name;
ASTRenameQuery::Table to;
to.database = table_id.database_name;
to.table = table_id.table_name + "_" + toString(suffix);
ASTRenameQuery::Element elem;
elem.from = from;
elem.to = to;
rename->elements.emplace_back(elem);
LOG_DEBUG(
log,
"Existing table {} for system log has obsolete or different structure. Renaming it to {}.\nOld: {}\nNew: {}\n.",
description,
backQuoteIfNeed(to.table),
old_create_query,
create_query);
auto query_context = Context::createCopy(context);
query_context->makeQueryContext();
InterpreterRenameQuery(rename, query_context).execute();
/// The required table will be created.
table = nullptr;
}
else if (!is_prepared)
LOG_DEBUG(log, "Will use existing table {} for {}", description, LogElement::name());
}
if (!table)
{
/// Create the table.
LOG_DEBUG(log, "Creating new table {} for {}", description, LogElement::name());
auto query_context = Context::createCopy(context);
query_context->makeQueryContext();
auto create_query_ast = getCreateTableQuery();
InterpreterCreateQuery interpreter(create_query_ast, query_context);
interpreter.setInternal(true);
interpreter.execute();
table = DatabaseCatalog::instance().getTable(table_id, getContext());
old_create_query.clear();
}
is_prepared = true;
}
template <typename LogElement>
ASTPtr SystemLog<LogElement>::getCreateTableQuery()
{
auto create = std::make_shared<ASTCreateQuery>();
create->setDatabase(table_id.database_name);
create->setTable(table_id.table_name);
auto ordinary_columns = LogElement::getNamesAndTypes();
auto alias_columns = LogElement::getNamesAndAliases();
auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(ordinary_columns, alias_columns));
create->set(create->columns_list, new_columns_list);
ParserStorage storage_parser;
ASTPtr storage_ast = parseQuery(
storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
"Storage to create table for " + LogElement::name(), 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
create->set(create->storage, storage_ast);
return create;
}
template class SystemLog<AsynchronousMetricLogElement>;
template class SystemLog<CrashLogElement>;
template class SystemLog<MetricLogElement>;
template class SystemLog<OpenTelemetrySpanLogElement>;
template class SystemLog<PartLogElement>;
template class SystemLog<QueryLogElement>;
template class SystemLog<QueryThreadLogElement>;
template class SystemLog<QueryViewsLogElement>;
template class SystemLog<SessionLogElement>;
template class SystemLog<TraceLogElement>;
template class SystemLog<ZooKeeperLogElement>;
template class SystemLog<TextLogElement>;
}

View File

@ -4,32 +4,27 @@
#include <atomic>
#include <memory>
#include <vector>
#include <condition_variable>
#include <boost/noncopyable.hpp>
#include <base/logger_useful.h>
#include <base/scope_guard.h>
#include <base/types.h>
#include <Core/Defines.h>
#include <Storages/IStorage.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTRenameQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Common/setThreadName.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/StorageID.h>
#include <Parsers/IAST_fwd.h>
#include <Common/ThreadPool.h>
#include <IO/WriteHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace Poco
{
class Logger;
namespace Util
{
class AbstractConfiguration;
}
}
namespace DB
{
@ -58,14 +53,6 @@ namespace DB
};
*/
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576
class QueryLog;
class QueryThreadLog;
class PartLog;
@ -87,15 +74,33 @@ public:
//// force -- force table creation (used for SYSTEM FLUSH LOGS)
virtual void flush(bool force = false) = 0;
virtual void prepareTable() = 0;
virtual void startup() = 0;
/// Start the background thread.
virtual void startup();
/// Stop the background flush thread before destructor. No more data will be written.
virtual void shutdown() = 0;
virtual ~ISystemLog() = default;
virtual void savingThreadFunction() = 0;
/// returns CREATE TABLE query, but with removed:
/// - UUID
/// - SETTINGS (for MergeTree)
/// That way it can be used to compare with the SystemLog::getCreateTableQuery()
static ASTPtr getCreateTableQueryClean(const StorageID & table_id, ContextPtr context);
protected:
ThreadFromGlobalPool saving_thread;
/// Data shared between callers of add()/flush()/shutdown(), and the saving thread
std::mutex mutex;
bool is_shutdown = false;
std::condition_variable flush_event;
void stopFlushThread();
};
@ -156,23 +161,10 @@ public:
*/
void add(const LogElement & element);
void stopFlushThread();
void shutdown() override;
/// Flush data in the buffer to disk
void flush(bool force = false) override;
/// Start the background thread.
void startup() override;
/// Stop the background flush thread before destructor. No more data will be written.
void shutdown() override
{
stopFlushThread();
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
table->flushAndShutdown();
}
void flush(bool force) override;
String getName() override
{
@ -192,10 +184,7 @@ private:
String old_create_query;
bool is_prepared = false;
const size_t flush_interval_milliseconds;
ThreadFromGlobalPool saving_thread;
/* Data shared between callers of add()/flush()/shutdown(), and the saving thread */
std::mutex mutex;
// Queue is bounded. But its size is quite large to not block in all normal cases.
std::vector<LogElement> queue;
// An always-incrementing index of the first message currently in the queue.
@ -203,10 +192,8 @@ private:
// can wait until a particular message is flushed. This is used to implement
// synchronous log flushing for SYSTEM FLUSH LOGS.
uint64_t queue_front_index = 0;
bool is_shutdown = false;
// A flag that says we must create the tables even if the queue is empty.
bool is_force_prepare_tables = false;
std::condition_variable flush_event;
// Requested to flush logs up to this index, exclusive
uint64_t requested_flush_up_to = 0;
// Flushed log up to this index, exclusive
@ -214,7 +201,7 @@ private:
// Logged overflow message at this queue front index
uint64_t logged_queue_full_at_index = -1;
void savingThreadFunction();
void savingThreadFunction() override;
/** Creates new table if it does not exist.
* Renames old table if its structure is not suitable.
@ -226,403 +213,4 @@ private:
void flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end);
};
template <typename LogElement>
SystemLog<LogElement>::SystemLog(
ContextPtr context_,
const String & database_name_,
const String & table_name_,
const String & storage_def_,
size_t flush_interval_milliseconds_)
: WithContext(context_)
, table_id(database_name_, table_name_)
, storage_def(storage_def_)
, create_query(serializeAST(*getCreateTableQuery()))
, flush_interval_milliseconds(flush_interval_milliseconds_)
{
assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE);
log = &Poco::Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")");
}
template <typename LogElement>
void SystemLog<LogElement>::startup()
{
std::lock_guard lock(mutex);
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
}
static thread_local bool recursive_add_call = false;
template <typename LogElement>
void SystemLog<LogElement>::add(const LogElement & element)
{
/// It is possible that the method will be called recursively.
/// Better to drop these events to avoid complications.
if (recursive_add_call)
return;
recursive_add_call = true;
SCOPE_EXIT({ recursive_add_call = false; });
/// Memory can be allocated while resizing on queue.push_back.
/// The size of allocation can be in order of a few megabytes.
/// But this should not be accounted for query memory usage.
/// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flacky.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
/// Should not log messages under mutex.
bool queue_is_half_full = false;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
if (queue.size() == DBMS_SYSTEM_LOG_QUEUE_SIZE / 2)
{
queue_is_half_full = true;
// The queue more than half full, time to flush.
// We only check for strict equality, because messages are added one
// by one, under exclusive lock, so we will see each message count.
// It is enough to only wake the flushing thread once, after the message
// count increases past half available size.
const uint64_t queue_end = queue_front_index + queue.size();
if (requested_flush_up_to < queue_end)
requested_flush_up_to = queue_end;
flush_event.notify_all();
}
if (queue.size() >= DBMS_SYSTEM_LOG_QUEUE_SIZE)
{
// Ignore all further entries until the queue is flushed.
// Log a message about that. Don't spam it -- this might be especially
// problematic in case of trace log. Remember what the front index of the
// queue was when we last logged the message. If it changed, it means the
// queue was flushed, and we can log again.
if (queue_front_index != logged_queue_full_at_index)
{
logged_queue_full_at_index = queue_front_index;
// TextLog sets its logger level to 0, so this log is a noop and
// there is no recursive logging.
lock.unlock();
LOG_ERROR(log, "Queue is full for system log '{}' at {}", demangle(typeid(*this).name()), queue_front_index);
}
return;
}
queue.push_back(element);
}
if (queue_is_half_full)
LOG_INFO(log, "Queue is half full for system log '{}'.", demangle(typeid(*this).name()));
}
template <typename LogElement>
void SystemLog<LogElement>::flush(bool force)
{
uint64_t this_thread_requested_offset;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
this_thread_requested_offset = queue_front_index + queue.size();
// Publish our flush request, taking care not to overwrite the requests
// made by other threads.
is_force_prepare_tables |= force;
requested_flush_up_to = std::max(requested_flush_up_to,
this_thread_requested_offset);
flush_event.notify_all();
}
LOG_DEBUG(log, "Requested flush up to offset {}",
this_thread_requested_offset);
// Use an arbitrary timeout to avoid endless waiting. 60s proved to be
// too fast for our parallel functional tests, probably because they
// heavily load the disk.
const int timeout_seconds = 180;
std::unique_lock lock(mutex);
bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds),
[&] { return flushed_up_to >= this_thread_requested_offset
&& !is_force_prepare_tables; });
if (!result)
{
throw Exception("Timeout exceeded (" + toString(timeout_seconds) + " s) while flushing system log '" + demangle(typeid(*this).name()) + "'.",
ErrorCodes::TIMEOUT_EXCEEDED);
}
}
template <typename LogElement>
void SystemLog<LogElement>::stopFlushThread()
{
{
std::lock_guard lock(mutex);
if (!saving_thread.joinable())
{
return;
}
if (is_shutdown)
{
return;
}
is_shutdown = true;
/// Tell thread to shutdown.
flush_event.notify_all();
}
saving_thread.join();
}
template <typename LogElement>
void SystemLog<LogElement>::savingThreadFunction()
{
setThreadName("SystemLogFlush");
std::vector<LogElement> to_flush;
bool exit_this_thread = false;
while (!exit_this_thread)
{
try
{
// The end index (exclusive, like std end()) of the messages we are
// going to flush.
uint64_t to_flush_end = 0;
// Should we prepare table even if there are no new messages.
bool should_prepare_tables_anyway = false;
{
std::unique_lock lock(mutex);
flush_event.wait_for(lock,
std::chrono::milliseconds(flush_interval_milliseconds),
[&] ()
{
return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables;
}
);
queue_front_index += queue.size();
to_flush_end = queue_front_index;
// Swap with existing array from previous flush, to save memory
// allocations.
to_flush.resize(0);
queue.swap(to_flush);
should_prepare_tables_anyway = is_force_prepare_tables;
exit_this_thread = is_shutdown;
}
if (to_flush.empty())
{
if (should_prepare_tables_anyway)
{
prepareTable();
LOG_TRACE(log, "Table created (force)");
std::lock_guard lock(mutex);
is_force_prepare_tables = false;
flush_event.notify_all();
}
}
else
{
flushImpl(to_flush, to_flush_end);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
LOG_TRACE(log, "Terminating");
}
template <typename LogElement>
void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end)
{
try
{
LOG_TRACE(log, "Flushing system log, {} entries to flush up to offset {}",
to_flush.size(), to_flush_end);
/// We check for existence of the table and create it as needed at every
/// flush. This is done to allow user to drop the table at any moment
/// (new empty table will be created automatically). BTW, flush method
/// is called from single thread.
prepareTable();
ColumnsWithTypeAndName log_element_columns;
auto log_element_names_and_types = LogElement::getNamesAndTypes();
for (auto name_and_type : log_element_names_and_types)
log_element_columns.emplace_back(name_and_type.type, name_and_type.name);
Block block(std::move(log_element_columns));
MutableColumns columns = block.mutateColumns();
for (const auto & elem : to_flush)
elem.appendToBlock(columns);
block.setColumns(std::move(columns));
/// We write to table indirectly, using InterpreterInsertQuery.
/// This is needed to support DEFAULT-columns in table.
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->table_id = table_id;
ASTPtr query_ptr(insert.release());
// we need query context to do inserts to target table with MV containing subqueries or joins
auto insert_context = Context::createCopy(context);
insert_context->makeQueryContext();
InterpreterInsertQuery interpreter(query_ptr, insert_context);
BlockIO io = interpreter.execute();
PushingPipelineExecutor executor(io.pipeline);
executor.start();
executor.push(block);
executor.finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
{
std::lock_guard lock(mutex);
flushed_up_to = to_flush_end;
is_force_prepare_tables = false;
flush_event.notify_all();
}
LOG_TRACE(log, "Flushed system log up to offset {}", to_flush_end);
}
template <typename LogElement>
void SystemLog<LogElement>::prepareTable()
{
String description = table_id.getNameForLogs();
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
{
if (old_create_query.empty())
{
old_create_query = serializeAST(*getCreateTableQueryClean(table_id, getContext()));
if (old_create_query.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty CREATE QUERY for {}", backQuoteIfNeed(table_id.table_name));
}
if (old_create_query != create_query)
{
/// Rename the existing table.
int suffix = 0;
while (DatabaseCatalog::instance().isTableExist(
{table_id.database_name, table_id.table_name + "_" + toString(suffix)}, getContext()))
++suffix;
auto rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Table from;
from.database = table_id.database_name;
from.table = table_id.table_name;
ASTRenameQuery::Table to;
to.database = table_id.database_name;
to.table = table_id.table_name + "_" + toString(suffix);
ASTRenameQuery::Element elem;
elem.from = from;
elem.to = to;
rename->elements.emplace_back(elem);
LOG_DEBUG(
log,
"Existing table {} for system log has obsolete or different structure. Renaming it to {}.\nOld: {}\nNew: {}\n.",
description,
backQuoteIfNeed(to.table),
old_create_query,
create_query);
auto query_context = Context::createCopy(context);
query_context->makeQueryContext();
InterpreterRenameQuery(rename, query_context).execute();
/// The required table will be created.
table = nullptr;
}
else if (!is_prepared)
LOG_DEBUG(log, "Will use existing table {} for {}", description, LogElement::name());
}
if (!table)
{
/// Create the table.
LOG_DEBUG(log, "Creating new table {} for {}", description, LogElement::name());
auto query_context = Context::createCopy(context);
query_context->makeQueryContext();
auto create_query_ast = getCreateTableQuery();
InterpreterCreateQuery interpreter(create_query_ast, query_context);
interpreter.setInternal(true);
interpreter.execute();
table = DatabaseCatalog::instance().getTable(table_id, getContext());
old_create_query.clear();
}
is_prepared = true;
}
template <typename LogElement>
ASTPtr SystemLog<LogElement>::getCreateTableQuery()
{
auto create = std::make_shared<ASTCreateQuery>();
create->setDatabase(table_id.database_name);
create->setTable(table_id.table_name);
auto ordinary_columns = LogElement::getNamesAndTypes();
auto alias_columns = LogElement::getNamesAndAliases();
auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(ordinary_columns, alias_columns));
create->set(create->columns_list, new_columns_list);
ParserStorage storage_parser;
ASTPtr storage_ast = parseQuery(
storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
"Storage to create table for " + LogElement::name(), 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
create->set(create->storage, storage_ast);
return create;
}
}

View File

@ -8,6 +8,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <base/logger_useful.h>
#include <array>

View File

@ -1,5 +1,9 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Poco/Message.h>
namespace DB
{

View File

@ -7,6 +7,7 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Interpreters/TraceCollector.h>
#include <Parsers/formatAST.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
@ -14,7 +15,8 @@
#include <Common/QueryProfiler.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/TraceCollector.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <base/errnoToString.h>
#if defined(OS_LINUX)
@ -341,7 +343,7 @@ void ThreadStatus::finalizeQueryProfiler()
void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
{
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
LockMemoryExceptionInThread lock(VariableContext::Global);
if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
{

View File

@ -0,0 +1,114 @@
#include "TraceCollector.h"
#include <Core/Field.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/TraceLog.h>
#include <Poco/Logger.h>
#include <Common/setThreadName.h>
#include <base/logger_useful.h>
namespace DB
{
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> trace_log_)
: trace_log(std::move(trace_log_))
{
TraceSender::pipe.open();
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
TraceSender::pipe.setNonBlockingWrite();
TraceSender::pipe.tryIncreaseSize(1 << 20);
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
TraceCollector::~TraceCollector()
{
if (!thread.joinable())
LOG_ERROR(&Poco::Logger::get("TraceCollector"), "TraceCollector thread is malformed and cannot be joined");
else
stop();
TraceSender::pipe.close();
}
/** Sends TraceCollector stop message
*
* Each sequence of data for TraceCollector thread starts with a boolean flag.
* If this flag is true, TraceCollector must stop reading trace_pipe and exit.
* This function sends flag with a true value to stop TraceCollector gracefully.
*/
void TraceCollector::stop()
{
WriteBufferFromFileDescriptor out(TraceSender::pipe.fds_rw[1]);
writeChar(true, out);
out.next();
thread.join();
}
void TraceCollector::run()
{
setThreadName("TraceCollector");
ReadBufferFromFileDescriptor in(TraceSender::pipe.fds_rw[0]);
while (true)
{
char is_last;
readChar(is_last, in);
if (is_last)
break;
std::string query_id;
UInt8 query_id_size = 0;
readBinary(query_id_size, in);
query_id.resize(query_id_size);
in.read(query_id.data(), query_id_size);
UInt8 trace_size = 0;
readIntBinary(trace_size, in);
Array trace;
trace.reserve(trace_size);
for (size_t i = 0; i < trace_size; ++i)
{
uintptr_t addr = 0;
readPODBinary(addr, in);
trace.emplace_back(UInt64(addr));
}
TraceType trace_type;
readPODBinary(trace_type, in);
UInt64 thread_id;
readPODBinary(thread_id, in);
Int64 size;
readPODBinary(size, in);
if (trace_log)
{
// time and time_in_microseconds are both being constructed from the same timespec so that the
// times will be equal up to the precision of a second.
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
UInt64 time = UInt64(ts.tv_sec * 1000000000LL + ts.tv_nsec);
UInt64 time_in_microseconds = UInt64((ts.tv_sec * 1000000LL) + (ts.tv_nsec / 1000));
TraceLogElement element{time_t(time / 1000000000), time_in_microseconds, time, trace_type, thread_id, query_id, trace, size};
trace_log->add(element);
}
}
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Common/TraceSender.h>
class StackTrace;
namespace Poco
{
class Logger;
}
namespace DB
{
class TraceLog;
class TraceCollector
{
public:
TraceCollector(std::shared_ptr<TraceLog> trace_log_);
~TraceCollector();
static inline void collect(TraceType trace_type, const StackTrace & stack_trace, Int64 size)
{
return TraceSender::send(trace_type, stack_trace, size);
}
private:
std::shared_ptr<TraceLog> trace_log;
ThreadFromGlobalPool thread;
void run();
void stop();
};
}

View File

@ -3,8 +3,10 @@
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/TraceCollector.h>
#include <Common/QueryProfiler.h>
#include <Common/TraceCollector.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
namespace DB

View File

@ -1,5 +1,6 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>

View File

@ -2,6 +2,7 @@
#include <Common/PODArray.h>
#include <Common/typeid_cast.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <IO/WriteBufferFromFile.h>
@ -29,6 +30,7 @@
#include <Parsers/ParserQuery.h>
#include <Parsers/queryNormalization.h>
#include <Parsers/queryToString.h>
#include <Parsers/formatAST.h>
#include <Formats/FormatFactory.h>
#include <Storages/StorageInput.h>
@ -38,6 +40,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/NormalizeSelectWithUnionQueryVisitor.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ProcessList.h>
@ -193,7 +196,7 @@ static void setExceptionStackTrace(QueryLogElement & elem)
{
/// Disable memory tracker for stack trace.
/// Because if exception is "Memory limit (for query) exceed", then we probably can't allocate another one string.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
try
{

View File

@ -15,10 +15,13 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/CurrentMetrics.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <base/JSON.h>
#include <base/logger_useful.h>
#include <Compression/getCompressionCodecForFile.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ExpressionElementParsers.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeAggregateFunction.h>
@ -615,7 +618,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
/// Memory should not be limited during ATTACH TABLE query.
/// This is already true at the server startup but must be also ensured for manual table ATTACH.
/// Motivation: memory for index is shared between queries - not belong to the query itself.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
loadUUID();
loadColumns(require_columns_checksums);

View File

@ -2,8 +2,9 @@
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Common/CurrentMetrics.h>
#include <base/getThreadId.h>
#include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h>
#include <base/getThreadId.h>
namespace DB

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <utility>
@ -184,7 +185,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
* And otherwise it will look like excessively growing memory consumption in context of query.
* (observed in long INSERT SELECTs)
*/
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (const auto & granule : granules_to_write)

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <IO/ReadBufferFromFile.h>
#include <utility>
@ -47,7 +48,7 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
{
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
size_t file_size = disk->getFileSize(mrk_path);
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);

View File

@ -1,13 +1,14 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/CurrentMetrics.h>
#include <Parsers/formatAST.h>
namespace DB

View File

@ -9,6 +9,7 @@
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/formatAST.h>
#include <Core/Defines.h>
#include <Interpreters/InterpreterSelectQuery.h>

View File

@ -13,7 +13,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
@ -467,7 +467,7 @@ static void appendBlock(const Block & from, Block & to)
MutableColumnPtr last_col;
try
{
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
if (to.rows() == 0)
{
@ -496,7 +496,7 @@ static void appendBlock(const Block & from, Block & to)
/// In case of rollback, it is better to ignore memory limits instead of abnormal server termination.
/// So ignore any memory limits, even global (since memory tracking has drift).
MemoryTracker::BlockerInThread temporarily_ignore_any_memory_limits(VariableContext::Global);
MemoryTrackerBlockerInThread temporarily_ignore_any_memory_limits(VariableContext::Global);
try
{
@ -924,7 +924,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
}
auto destination_metadata_snapshot = table->getInMemoryMetadataPtr();
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = destination_id;

View File

@ -33,6 +33,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <Parsers/parseQuery.h>
@ -44,6 +45,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/JoinedTables.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
@ -55,6 +57,7 @@
#include <Interpreters/getTableExpressions.h>
#include <Functions/IFunction.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>

View File

@ -11,6 +11,7 @@
#include <QueryPipeline/Pipe.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Parsers/ASTCreateQuery.h>
namespace DB

View File

@ -17,6 +17,7 @@
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/formatAST.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/AlterCommands.h>

View File

@ -36,12 +36,14 @@
#include <Databases/DatabaseOnDisk.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sources/RemoteSource.h>

View File

@ -9,6 +9,8 @@
#include <Interpreters/Context.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/QueryAliasesVisitor.h>
#include <Interpreters/QueryNormalizer.h>
@ -21,7 +23,9 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/formatAST.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Sources/BlocksSource.h>

View File

@ -1,2 +1,2 @@
add_executable (config-processor config-processor.cpp)
target_link_libraries(config-processor PRIVATE clickhouse_common_config)
target_link_libraries(config-processor PRIVATE clickhouse_common_config_no_zookeeper_log)

View File

@ -1,2 +1,2 @@
add_executable(keeper-bench Generator.cpp Runner.cpp Stats.cpp main.cpp)
target_link_libraries(keeper-bench PRIVATE clickhouse_common_zookeeper)
target_link_libraries(keeper-bench PRIVATE clickhouse_common_zookeeper_no_log)

View File

@ -1,2 +1,2 @@
add_executable(clickhouse-zookeeper-cli zookeeper-cli.cpp)
target_link_libraries(clickhouse-zookeeper-cli PRIVATE clickhouse_common_zookeeper)
target_link_libraries(clickhouse-zookeeper-cli PRIVATE clickhouse_common_zookeeper_no_log)

View File

@ -1,2 +1,2 @@
add_executable (zookeeper-dump-tree main.cpp ${SRCS})
target_link_libraries(zookeeper-dump-tree PRIVATE clickhouse_common_zookeeper clickhouse_common_io boost::program_options)
target_link_libraries(zookeeper-dump-tree PRIVATE clickhouse_common_zookeeper_no_log clickhouse_common_io boost::program_options)

View File

@ -1,2 +1,2 @@
add_executable (zookeeper-remove-by-list main.cpp ${SRCS})
target_link_libraries(zookeeper-remove-by-list PRIVATE clickhouse_common_zookeeper boost::program_options)
target_link_libraries(zookeeper-remove-by-list PRIVATE clickhouse_common_zookeeper_no_log boost::program_options)

View File

@ -1,2 +1,2 @@
add_executable(zk-test main.cpp)
target_link_libraries(zk-test PRIVATE clickhouse_common_zookeeper)
target_link_libraries(zk-test PRIVATE clickhouse_common_zookeeper_no_log)