Merge branch 'master' of github.com:ClickHouse/ClickHouse into bit_on_bitset

This commit is contained in:
Guillaume Tassery 2020-02-13 13:24:00 +01:00
commit 1642c7bb97
21 changed files with 410 additions and 205 deletions

View File

@ -52,12 +52,12 @@ IncludeCategories:
ReflowComments: false
AlignEscapedNewlinesLeft: false
AlignEscapedNewlines: DontAlign
AlignTrailingComments: true
# Not changed:
AccessModifierOffset: -4
AlignConsecutiveAssignments: false
AlignOperands: false
AlignTrailingComments: false
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false

View File

@ -1,12 +1,17 @@
#include <cstdlib>
#include "MemoryTracker.h"
#include <common/likely.h>
#include <common/logger_useful.h>
#include <IO/WriteHelpers.h>
#include "Common/TraceCollector.h"
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>
#include <common/likely.h>
#include <common/logger_useful.h>
#include <ext/singleton.h>
#include <atomic>
#include <cmath>
#include <cstdlib>
namespace DB
@ -73,7 +78,7 @@ void MemoryTracker::alloc(Int64 size)
return;
/** Using memory_order_relaxed means that if allocations are done simultaneously,
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* So, we allow over-allocations.
*/
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
@ -81,7 +86,8 @@ void MemoryTracker::alloc(Int64 size)
if (metric != CurrentMetrics::end())
CurrentMetrics::add(metric, size);
Int64 current_limit = limit.load(std::memory_order_relaxed);
Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);
/// Using non-thread-safe random number generator. Joint distribution in different threads would not be uniform.
/// In this case, it doesn't matter.
@ -98,12 +104,19 @@ void MemoryTracker::alloc(Int64 size)
message << " " << description;
message << ": fault injected. Would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
if (unlikely(current_limit && will_be > current_limit))
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
auto no_track = blocker.cancel();
ext::Singleton<DB::TraceCollector>()->collect(size);
setOrRaiseProfilerLimit(current_profiler_limit + Int64(std::ceil((will_be - current_profiler_limit) / profiler_step)) * profiler_step);
}
if (unlikely(current_hard_limit && will_be > current_hard_limit))
{
free(size);
@ -116,7 +129,7 @@ void MemoryTracker::alloc(Int64 size)
message << " " << description;
message << " exceeded: would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
@ -174,7 +187,8 @@ void MemoryTracker::resetCounters()
{
amount.store(0, std::memory_order_relaxed);
peak.store(0, std::memory_order_relaxed);
limit.store(0, std::memory_order_relaxed);
hard_limit.store(0, std::memory_order_relaxed);
profiler_limit.store(0, std::memory_order_relaxed);
}
@ -187,11 +201,20 @@ void MemoryTracker::reset()
}
void MemoryTracker::setOrRaiseLimit(Int64 value)
void MemoryTracker::setOrRaiseHardLimit(Int64 value)
{
/// This is just atomic set to maximum.
Int64 old_value = limit.load(std::memory_order_relaxed);
while (old_value < value && !limit.compare_exchange_weak(old_value, value))
Int64 old_value = hard_limit.load(std::memory_order_relaxed);
while (old_value < value && !hard_limit.compare_exchange_weak(old_value, value))
;
}
void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
{
/// This is just atomic set to maximum.
Int64 old_value = profiler_limit.load(std::memory_order_relaxed);
while (old_value < value && !profiler_limit.compare_exchange_weak(old_value, value))
;
}
@ -207,7 +230,7 @@ namespace CurrentMemoryTracker
if (untracked > untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be usefull for enlarge Exception message in rethrow logic.
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = untracked;
untracked = 0;
memory_tracker->alloc(tmp);
@ -218,10 +241,7 @@ namespace CurrentMemoryTracker
void realloc(Int64 old_size, Int64 new_size)
{
Int64 addition = new_size - old_size;
if (addition > 0)
alloc(addition);
else
free(-addition);
addition > 0 ? alloc(addition) : free(-addition);
}
void free(Int64 size)

View File

@ -15,7 +15,10 @@ class MemoryTracker
{
std::atomic<Int64> amount {0};
std::atomic<Int64> peak {0};
std::atomic<Int64> limit {0};
std::atomic<Int64> hard_limit {0};
std::atomic<Int64> profiler_limit {0};
Int64 profiler_step = 0;
/// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
double fault_probability = 0;
@ -32,7 +35,6 @@ class MemoryTracker
public:
MemoryTracker(VariableContext level_ = VariableContext::Thread) : level(level_) {}
MemoryTracker(Int64 limit_, VariableContext level_ = VariableContext::Thread) : limit(limit_), level(level_) {}
MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread) : parent(parent_), level(level_) {}
~MemoryTracker();
@ -66,21 +68,22 @@ public:
return peak.load(std::memory_order_relaxed);
}
void setLimit(Int64 limit_)
{
limit.store(limit_, std::memory_order_relaxed);
}
/** Set limit if it was not set.
* Otherwise, set limit to new value, if new value is greater than previous limit.
*/
void setOrRaiseLimit(Int64 value);
void setOrRaiseHardLimit(Int64 value);
void setOrRaiseProfilerLimit(Int64 value);
void setFaultProbability(double value)
{
fault_probability = value;
}
void setProfilerStep(Int64 value)
{
profiler_step = value;
}
/// next should be changed only once: from nullptr to some value.
/// NOTE: It is not true in MergeListElement
void setParent(MemoryTracker * elem)

View File

@ -1,92 +1,38 @@
#include "QueryProfiler.h"
#include <random>
#include <common/phdr_cache.h>
#include <common/config_common.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/thread_local_rng.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <Common/Exception.h>
#include <Common/StackTrace.h>
#include <Common/TraceCollector.h>
#include <Common/thread_local_rng.h>
#include <common/StringRef.h>
#include <common/config_common.h>
#include <common/logger_useful.h>
#include <common/phdr_cache.h>
#include <ext/singleton.h>
#include <random>
namespace ProfileEvents
{
extern const Event QueryProfilerSignalOverruns;
}
namespace DB
{
extern LazyPipeFDs trace_pipe;
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
#if defined(OS_LINUX)
thread_local size_t write_trace_iteration = 0;
#endif
void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * info, void * context)
void writeTraceInfo(TraceType trace_type, int /* sig */, siginfo_t * info, void * context)
{
int overrun_count = 0;
#if defined(OS_LINUX)
/// Quickly drop if signal handler is called too frequently.
/// Otherwise we may end up infinitelly processing signals instead of doing any useful work.
++write_trace_iteration;
if (info && info->si_overrun > 0)
{
/// But pass with some frequency to avoid drop of all traces.
if (write_trace_iteration % info->si_overrun == 0)
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun);
}
else
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun + 1);
return;
}
}
if (info)
overrun_count = info->si_overrun;
#else
UNUSED(info);
#endif
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TimerType) + // timer type
sizeof(UInt64); // thread_id
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(trace_pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
UInt64 thread_id = CurrentThread::get().thread_id;
const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);
writeChar(false, out);
writeStringBinary(query_id, out);
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);
writePODBinary(timer_type, out);
writePODBinary(thread_id, out);
out.next();
ext::Singleton<TraceCollector>()->collect(trace_type, stack_trace, overrun_count);
}
[[maybe_unused]] const UInt32 TIMER_PRECISION = 1e9;
@ -135,11 +81,11 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const
sev.sigev_notify = SIGEV_THREAD_ID;
sev.sigev_signo = pause_signal;
#if defined(__FreeBSD__)
# if defined(__FreeBSD__)
sev._sigev_un._threadid = thread_id;
#else
# else
sev._sigev_un._tid = thread_id;
#endif
# endif
if (timer_create(clock_type, &sev, &timer_id))
{
/// In Google Cloud Run, the function "timer_create" is implemented incorrectly as of 2020-01-25.
@ -206,7 +152,7 @@ QueryProfilerReal::QueryProfilerReal(const UInt64 thread_id, const UInt32 period
void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Real, sig, info, context);
writeTraceInfo(TraceType::REAL_TIME, sig, info, context);
}
QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)
@ -215,7 +161,7 @@ QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)
void QueryProfilerCpu::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Cpu, sig, info, context);
writeTraceInfo(TraceType::CPU_TIME, sig, info, context);
}
}

View File

@ -15,12 +15,6 @@ namespace Poco
namespace DB
{
enum class TimerType : UInt8
{
Real,
Cpu,
};
/**
* Query profiler implementation for selected thread.
*

View File

@ -1,25 +1,38 @@
#include "TraceCollector.h"
#include <Core/Field.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/TraceLog.h>
#include <Poco/Logger.h>
#include <Common/Exception.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Common/Exception.h>
#include <Interpreters/TraceLog.h>
#include <unistd.h>
#include <fcntl.h>
namespace ProfileEvents
{
extern const Event QueryProfilerSignalOverruns;
}
namespace DB
{
LazyPipeFDs trace_pipe;
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
thread_local size_t write_trace_iteration = 0;
}
namespace ErrorCodes
{
@ -27,20 +40,15 @@ namespace ErrorCodes
extern const int THREAD_IS_NOT_JOINABLE;
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
: log(&Poco::Logger::get("TraceCollector"))
, trace_log(trace_log_)
TraceCollector::TraceCollector()
{
if (trace_log == nullptr)
throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE);
trace_pipe.open();
pipe.open();
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
trace_pipe.setNonBlocking();
trace_pipe.tryIncreaseSize(1 << 20);
pipe.setNonBlocking();
pipe.tryIncreaseSize(1 << 20);
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
@ -48,14 +56,101 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
TraceCollector::~TraceCollector()
{
if (!thread.joinable())
LOG_ERROR(log, "TraceCollector thread is malformed and cannot be joined");
LOG_ERROR(&Poco::Logger::get("TraceCollector"), "TraceCollector thread is malformed and cannot be joined");
else
{
TraceCollector::notifyToStop();
stop();
thread.join();
}
trace_pipe.close();
pipe.close();
}
void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trace, int overrun_count)
{
/// Quickly drop if signal handler is called too frequently.
/// Otherwise we may end up infinitelly processing signals instead of doing any useful work.
++write_trace_iteration;
if (overrun_count)
{
/// But pass with some frequency to avoid drop of all traces.
if (write_trace_iteration % overrun_count == 0)
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count);
}
else
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count + 1);
return;
}
}
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TraceType) + // trace type
sizeof(UInt64) + // thread_id
sizeof(UInt64); // size
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
auto thread_id = CurrentThread::get().thread_id;
writeChar(false, out);
writeStringBinary(query_id, out);
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);
writePODBinary(trace_type, out);
writePODBinary(thread_id, out);
writePODBinary(UInt64(0), out);
out.next();
}
void TraceCollector::collect(UInt64 size)
{
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TraceType) + // trace type
sizeof(UInt64) + // thread_id
sizeof(UInt64); // size
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
auto thread_id = CurrentThread::get().thread_id;
writeChar(false, out);
writeStringBinary(query_id, out);
const auto & stack_trace = StackTrace();
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);
writePODBinary(TraceType::MEMORY, out);
writePODBinary(thread_id, out);
writePODBinary(size, out);
out.next();
}
/**
@ -68,16 +163,16 @@ TraceCollector::~TraceCollector()
* NOTE: TraceCollector will NOT stop immediately as there may be some data left in the pipe
* before stop message.
*/
void TraceCollector::notifyToStop()
void TraceCollector::stop()
{
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]);
WriteBufferFromFileDescriptor out(pipe.fds_rw[1]);
writeChar(true, out);
out.next();
}
void TraceCollector::run()
{
ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]);
ReadBufferFromFileDescriptor in(pipe.fds_rw[0]);
while (true)
{
@ -89,27 +184,33 @@ void TraceCollector::run()
std::string query_id;
readStringBinary(query_id, in);
UInt8 size = 0;
readIntBinary(size, in);
UInt8 trace_size = 0;
readIntBinary(trace_size, in);
Array trace;
trace.reserve(size);
trace.reserve(trace_size);
for (size_t i = 0; i < size; i++)
for (size_t i = 0; i < trace_size; i++)
{
uintptr_t addr = 0;
readPODBinary(addr, in);
trace.emplace_back(UInt64(addr));
}
TimerType timer_type;
readPODBinary(timer_type, in);
TraceType trace_type;
readPODBinary(trace_type, in);
UInt64 thread_id;
readPODBinary(thread_id, in);
TraceLogElement element{std::time(nullptr), timer_type, thread_id, query_id, trace};
trace_log->add(element);
UInt64 size;
readPODBinary(size, in);
if (trace_log)
{
TraceLogElement element{std::time(nullptr), trace_type, thread_id, query_id, trace, size};
trace_log->add(element);
}
}
}

View File

@ -1,7 +1,10 @@
#pragma once
#include "Common/PipeFDs.h"
#include <Common/ThreadPool.h>
class StackTrace;
namespace Poco
{
class Logger;
@ -12,21 +15,31 @@ namespace DB
class TraceLog;
enum class TraceType : UInt8
{
REAL_TIME,
CPU_TIME,
MEMORY,
};
class TraceCollector
{
public:
TraceCollector();
~TraceCollector();
void setTraceLog(const std::shared_ptr<TraceLog> & trace_log_) { trace_log = trace_log_; }
void collect(TraceType type, const StackTrace & stack_trace, int overrun_count = 0);
void collect(UInt64 size);
private:
Poco::Logger * log;
std::shared_ptr<TraceLog> trace_log;
ThreadFromGlobalPool thread;
LazyPipeFDs pipe;
void run();
static void notifyToStop();
public:
TraceCollector(std::shared_ptr<TraceLog> & trace_log_);
~TraceCollector();
void stop();
};
}

View File

@ -1,14 +1,16 @@
#if defined(OS_LINUX)
#include <malloc.h>
#elif defined(OS_DARWIN)
#include <malloc/malloc.h>
#endif
#include <new>
#include <common/config_common.h>
#include <common/memory.h>
#include <Common/MemoryTracker.h>
#include <iostream>
#include <new>
#if defined(OS_LINUX)
# include <malloc.h>
#elif defined(OS_DARWIN)
# include <malloc/malloc.h>
#endif
/// Replace default new/delete with memory tracking versions.
/// @sa https://en.cppreference.com/w/cpp/memory/new/operator_new
/// https://en.cppreference.com/w/cpp/memory/new/operator_delete
@ -29,7 +31,7 @@ ALWAYS_INLINE void trackMemory(std::size_t size)
#endif
}
ALWAYS_INLINE bool trackMemoryNoExept(std::size_t size) noexcept
ALWAYS_INLINE bool trackMemoryNoExcept(std::size_t size) noexcept
{
try
{
@ -54,11 +56,11 @@ ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [
#else
if (size)
CurrentMemoryTracker::free(size);
#ifdef _GNU_SOURCE
# ifdef _GNU_SOURCE
/// It's innaccurate resource free for sanitizers. malloc_usable_size() result is greater or equal to allocated size.
else
CurrentMemoryTracker::free(malloc_usable_size(ptr));
#endif
# endif
#endif
}
catch (...)
@ -83,14 +85,14 @@ void * operator new[](std::size_t size)
void * operator new(std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExept(size)))
if (likely(Memory::trackMemoryNoExcept(size)))
return Memory::newNoExept(size);
return nullptr;
}
void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExept(size)))
if (likely(Memory::trackMemoryNoExcept(size)))
return Memory::newNoExept(size);
return nullptr;
}

View File

@ -332,6 +332,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.", 0) \
M(SettingUInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.", 0) \
M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Maximum memory usage for processing all concurrently running queries on the server. Zero means unlimited.", 0) \
M(SettingUInt64, memory_profiler_step, 0, "Every number of bytes the memory profiler will dump the allocating stacktrace. Zero means disabled memory profiler.", 0) \
\
M(SettingUInt64, max_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for a query. Zero means unlimited.", 0) \
M(SettingUInt64, max_network_bytes, 0, "The maximum number of bytes (compressed) to receive or transmit over the network for execution of the query.", 0) \

View File

@ -1,3 +1,4 @@
#include "Common/quoteString.h"
#include <Common/typeid_cast.h>
#include <Common/PODArray.h>
#include <Core/Row.h>
@ -334,7 +335,7 @@ void ActionsMatcher::visit(const ASTIdentifier & identifier, const ASTPtr & ast,
found = true;
if (found)
throw Exception("Column " + column_name.get(ast) + " is not under aggregate function and not in GROUP BY.",
throw Exception("Column " + backQuote(column_name.get(ast)) + " is not under aggregate function and not in GROUP BY",
ErrorCodes::NOT_AN_AGGREGATE);
/// Special check for WITH statement alias. Add alias action to be able to use this alias.

View File

@ -57,6 +57,7 @@
#include <Common/TraceCollector.h>
#include <common/logger_useful.h>
#include <Common/RemoteHostFilter.h>
#include <ext/singleton.h>
namespace ProfileEvents
{
@ -168,7 +169,6 @@ struct ContextShared
RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
std::unique_ptr<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
class SessionKeyHash
@ -299,13 +299,7 @@ struct ContextShared
schedule_pool.reset();
ddl_worker.reset();
/// Stop trace collector if any
trace_collector.reset();
}
bool hasTraceCollector()
{
return trace_collector != nullptr;
ext::Singleton<TraceCollector>::reset();
}
void initializeTraceCollector(std::shared_ptr<TraceLog> trace_log)
@ -313,7 +307,7 @@ struct ContextShared
if (trace_log == nullptr)
return;
trace_collector = std::make_unique<TraceCollector>(trace_log);
ext::Singleton<TraceCollector>()->setTraceLog(trace_log);
}
};
@ -1693,11 +1687,6 @@ void Context::initializeSystemLogs()
shared->system_logs.emplace(*global_context, getConfigRef());
}
bool Context::hasTraceCollector()
{
return shared->hasTraceCollector();
}
void Context::initializeTraceCollector()
{
shared->initializeTraceCollector(getTraceLog());

View File

@ -181,12 +181,12 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
/// You should specify this value in configuration for default profile,
/// not for specific users, sessions or queries,
/// because this setting is effectively global.
total_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_all_queries);
total_memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage_for_all_queries);
total_memory_tracker.setDescription("(total)");
/// Track memory usage for all simultaneously running queries from single user.
user_process_list.user_memory_tracker.setParent(&total_memory_tracker);
user_process_list.user_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_user);
user_process_list.user_memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage_for_user);
user_process_list.user_memory_tracker.setDescription("(for user)");
/// Actualize thread group info
@ -198,7 +198,9 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
thread_group->query = process_it->query;
/// Set query-level memory trackers
thread_group->memory_tracker.setOrRaiseLimit(process_it->max_memory_usage);
thread_group->memory_tracker.setOrRaiseHardLimit(process_it->max_memory_usage);
thread_group->memory_tracker.setOrRaiseProfilerLimit(settings.memory_profiler_step);
thread_group->memory_tracker.setProfilerStep(settings.memory_profiler_step);
thread_group->memory_tracker.setDescription("(for query)");
if (process_it->memory_tracker_fault_probability)
thread_group->memory_tracker.setFaultProbability(process_it->memory_tracker_fault_probability);

View File

@ -1,12 +1,15 @@
#include <Common/ThreadStatus.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryThreadLog.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/Exception.h>
#include <Common/QueryProfiler.h>
#include <Interpreters/Context.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/ProcessList.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/TraceCollector.h>
#include <ext/singleton.h>
#if defined(OS_LINUX)
# include <Common/hasLinuxCapability.h>
@ -153,7 +156,7 @@ void ThreadStatus::finalizePerformanceCounters()
void ThreadStatus::initQueryProfiler()
{
/// query profilers are useless without trace collector
if (!global_context || !global_context->hasTraceCollector())
if (!global_context || !ext::Singleton<TraceCollector>::isInitialized())
return;
const auto & settings = query_context->getSettingsRef();

View File

@ -9,11 +9,12 @@
using namespace DB;
using TimerDataType = TraceLogElement::TimerDataType;
using TraceDataType = TraceLogElement::TraceDataType;
const TimerDataType::Values TraceLogElement::timer_values = {
{"Real", static_cast<UInt8>(TimerType::Real)},
{"CPU", static_cast<UInt8>(TimerType::Cpu)}
const TraceDataType::Values TraceLogElement::trace_values = {
{"Real", static_cast<UInt8>(TraceType::REAL_TIME)},
{"CPU", static_cast<UInt8>(TraceType::CPU_TIME)},
{"Memory", static_cast<UInt8>(TraceType::MEMORY)},
};
Block TraceLogElement::createBlock()
@ -23,10 +24,11 @@ Block TraceLogElement::createBlock()
{std::make_shared<DataTypeDate>(), "event_date"},
{std::make_shared<DataTypeDateTime>(), "event_time"},
{std::make_shared<DataTypeUInt32>(), "revision"},
{std::make_shared<TimerDataType>(timer_values), "timer_type"},
{std::make_shared<TraceDataType>(trace_values), "trace_type"},
{std::make_shared<DataTypeUInt64>(), "thread_id"},
{std::make_shared<DataTypeString>(), "query_id"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "trace"}
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "trace"},
{std::make_shared<DataTypeUInt64>(), "size"},
};
}
@ -39,10 +41,11 @@ void TraceLogElement::appendToBlock(Block & block) const
columns[i++]->insert(DateLUT::instance().toDayNum(event_time));
columns[i++]->insert(event_time);
columns[i++]->insert(ClickHouseRevision::get());
columns[i++]->insert(static_cast<UInt8>(timer_type));
columns[i++]->insert(static_cast<UInt8>(trace_type));
columns[i++]->insert(thread_id);
columns[i++]->insertData(query_id.data(), query_id.size());
columns[i++]->insert(trace);
columns[i++]->insert(size);
block.setColumns(std::move(columns));
}

View File

@ -1,23 +1,26 @@
#pragma once
#include <Common/QueryProfiler.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/SystemLog.h>
#include <Common/QueryProfiler.h>
#include <Common/TraceCollector.h>
namespace DB
{
struct TraceLogElement
{
using TimerDataType = DataTypeEnum8;
static const TimerDataType::Values timer_values;
using TraceDataType = DataTypeEnum8;
static const TraceDataType::Values trace_values;
time_t event_time{};
TimerType timer_type{};
UInt64 thread_id{};
String query_id{};
Array trace{};
time_t event_time;
TraceType trace_type;
UInt64 thread_id;
String query_id;
Array trace;
UInt64 size; /// Allocation size in bytes for |TraceType::MEMORY|
static std::string name() { return "TraceLog"; }
static Block createBlock();

View File

@ -7,7 +7,21 @@ RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends \
p7zip-full bash curl git moreutils ncdu wget psmisc python3 python3-pip tzdata tree python3-dev g++ \
bash \
curl \
g++ \
git \
moreutils \
ncdu \
p7zip-full \
psmisc \
python3 \
python3-dev \
python3-pip \
tree \
tzdata \
vim \
wget \
&& pip3 --no-cache-dir install clickhouse_driver \
&& apt-get purge --yes python3-dev g++ \
&& apt-get autoremove --yes \

View File

@ -147,8 +147,8 @@ params['test_part'] = (
anchor = nextTableAnchor(),
caption = 'Tests times',
header = table_header(['Test', 'Wall clock time, s', 'Total client time, s',
'Number of queries',
'Number of short queries',
'Total queries',
'Ignored short queries',
'Longest query<br>(sum for all runs), s',
'Avg wall clock time<br>(sum for all runs), s',
'Shortest query<br>(sum for all runs), s']),

View File

@ -114,16 +114,16 @@ Features:
- SQL data modeling support for relational mapping of data.
### Looker
[Looker](https://looker.com) is a data platform and business intelligence tool with support for 50+ database dialects including ClickHouse. Looker is available as a SaaS platform and self-hosted. Users
can use Looker via the browser to explore data, build visualizations and dashboards, schedule reports, and share their
insights with colleagues. Looker provides a rich set of tools to embed these features in other applications, and an API
[Looker](https://looker.com) is a data platform and business intelligence tool with support for 50+ database dialects including ClickHouse. Looker is available as a SaaS platform and self-hosted. Users can use Looker via the browser to explore data, build visualizations and dashboards, schedule reports, and share their insights with colleagues. Looker provides a rich set of tools to embed these features in other applications, and an API
to integrate data with other applications.
Features:
- Designed around ease of use and self-service for end users.
- Easy and agile development using LookML, a language which supports currated
[Data Modeling](https://looker.com/platform/data-modeling) to support report writers and end users.
- Powerful workflow integration via Looker's [Data Actions](https://looker.com/platform/actions).
[How to configure ClickHouse in Looker.](https://docs.looker.com/setup-and-management/database-config/clickhouse)
[Original article](https://clickhouse.tech/docs/en/interfaces/third-party/gui/) <!--hide-->

View File

@ -117,3 +117,19 @@
- Моделирование данных с помощью SQL для их реляционного отображения.
[Оригинальная статья](https://clickhouse.tech/docs/ru/interfaces/third-party/gui/) <!--hide-->
### Looker
[Looker](https://looker.com) — платформа для обработки данных и бизнес-аналитики. Поддерживает более 50 диалектов баз данных, включая ClickHouse. Looker можно установить самостоятельно или воспользоваться готовой платформой SaaS.
Просмотр данных, построение отображений и дашбордов, планирование отчётов и обмен данными с коллегами доступны с помощью браузера. Также, Looker предоставляет ряд инструментов, позволяющих встраивать сервис в другие приложения и API для обмена данными.
Основные возможности:
- Язык LookML, поддерживающий [моделирование данных](https://looker.com/platform/data-modeling).
- Интеграция с различными системами с помощью [Data Actions](https://looker.com/platform/actions).
- Инструменты для встраивания сервиса в приложения.
- API.
[Как сконфигурировать ClickHouse в Looker.](https://docs.looker.com/setup-and-management/database-config/clickhouse)
[Original article](https://clickhouse.tech/docs/ru/interfaces/third-party/gui/) <!--hide-->

View File

@ -0,0 +1,43 @@
#pragma once
#include <memory>
namespace ext {
template <class T>
class Singleton
{
public:
Singleton()
{
if (!instance)
instance.reset(new T);
}
template <typename ... Args>
Singleton(const Args & ... args)
{
instance.reset(new T(args...));
/// TODO: throw exception on double-creation.
}
T * operator->()
{
return instance.get();
}
static bool isInitialized()
{
return !!instance;
}
static void reset()
{
instance.reset();
}
private:
inline static std::unique_ptr<T> instance{};
};
}

View File

@ -2019,6 +2019,57 @@ var results =
[0.006, 0.005, 0.005]
]
},
{
"system": "AWS a1.4xlarge (Graviton) 16 vCPU, 2.3 GHz, 32 GiB RAM, EBS",
"time": "2020-02-13 00:00:00",
"result":
[
[0.012, 0.003, 0.003],
[0.073, 0.031, 0.031],
[0.098, 0.053, 0.053],
[0.209, 0.139, 0.141],
[0.251, 0.200, 0.202],
[0.662, 0.439, 0.436],
[0.062, 0.041, 0.041],
[0.040, 0.033, 0.032],
[3.379, 0.720, 0.722],
[0.934, 0.847, 0.845],
[0.436, 0.379, 0.377],
[0.500, 0.417, 0.430],
[1.536, 1.381, 1.373],
[1.956, 1.832, 1.855],
[1.527, 1.458, 1.466],
[1.613, 1.576, 1.581],
[3.644, 3.490, 3.530],
[2.143, 1.982, 1.965],
[7.808, 7.617, 7.764],
[0.390, 0.179, 0.168],
[8.797, 2.308, 2.257],
[10.138, 2.533, 2.517],
[19.626, 5.738, 5.707],
[20.183, 2.195, 2.156],
[1.841, 0.577, 0.578],
[0.535, 0.479, 0.476],
[1.830, 0.578, 0.577],
[8.786, 2.521, 2.524],
[7.364, 2.941, 2.926],
[3.373, 3.186, 3.203],
[1.641, 1.213, 1.209],
[4.890, 1.964, 1.913],
[10.442, 10.410, 10.427],
[11.183, 7.431, 7.402],
[11.175, 7.460, 7.487],
[2.317, 2.232, 2.221],
[0.473, 0.406, 0.418],
[0.201, 0.187, 0.183],
[0.193, 0.144, 0.160],
[0.901, 0.811, 0.836],
[0.090, 0.046, 0.041],
[0.053, 0.032, 0.033],
[0.015, 0.012, 0.012]
]
},
];
</script>