Revert "Revert "Merge pull request #38953 from ClickHouse/add-allocation-ptr-to-trace-log"
This commit is contained in:
parent
6863cd152f
commit
2081408c15
647
src/AggregateFunctions/AggregateFunctionFlameGraph.cpp
Normal file
647
src/AggregateFunctions/AggregateFunctionFlameGraph.cpp
Normal file
@@ -0,0 +1,647 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <Common/HashTable/HashMap.h>
#include <Common/SymbolIndex.h>
#include <Common/ArenaAllocator.h>
#include <Core/Settings.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <filesystem>

namespace DB
{

namespace ErrorCodes
{
    extern const int FUNCTION_NOT_ALLOWED;
    extern const int NOT_IMPLEMENTED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}

struct AggregateFunctionFlameGraphTree
{
    struct ListNode;

    struct TreeNode
    {
        TreeNode * parent = nullptr;
        ListNode * children = nullptr;
        UInt64 ptr = 0;
        size_t allocated = 0;
    };

    struct ListNode
    {
        ListNode * next = nullptr;
        TreeNode * child = nullptr;
    };

    TreeNode root;

    static ListNode * createChild(TreeNode * parent, UInt64 ptr, Arena * arena)
    {
        ListNode * list_node = reinterpret_cast<ListNode *>(arena->alloc(sizeof(ListNode)));
        TreeNode * tree_node = reinterpret_cast<TreeNode *>(arena->alloc(sizeof(TreeNode)));

        list_node->child = tree_node;
        list_node->next = nullptr;

        tree_node->parent = parent;
        tree_node->children = nullptr;
        tree_node->ptr = ptr;
        tree_node->allocated = 0;

        return list_node;
    }

    TreeNode * find(const UInt64 * stack, size_t stack_size, Arena * arena)
    {
        TreeNode * node = &root;
        for (size_t i = 0; i < stack_size; ++i)
        {
            UInt64 ptr = stack[i];
            if (ptr == 0)
                break;

            if (!node->children)
            {
                node->children = createChild(node, ptr, arena);
                node = node->children->child;
            }
            else
            {
                ListNode * list = node->children;
                while (list->child->ptr != ptr && list->next)
                    list = list->next;

                if (list->child->ptr != ptr)
                {
                    list->next = createChild(node, ptr, arena);
                    list = list->next;
                }

                node = list->child;
            }
        }

        return node;
    }

    static void append(DB::PaddedPODArray<UInt64> & values, DB::PaddedPODArray<UInt64> & offsets, std::vector<UInt64> & frame)
    {
        UInt64 prev = offsets.empty() ? 0 : offsets.back();
        offsets.push_back(prev + frame.size());
        for (UInt64 val : frame)
            values.push_back(val);
    }

    struct Trace
    {
        using Frames = std::vector<UInt64>;

        Frames frames;

        /// The total number of bytes allocated for traces with the same prefix.
        size_t allocated_total = 0;
        /// This counter is relevant in case we want to filter out traces with a small amount of bytes.
        /// It shows the total number of bytes for *filtered* traces with the same prefix.
        /// This is the value which is used in the flamegraph.
        size_t allocated_self = 0;
    };

    using Traces = std::vector<Trace>;

    Traces dump(size_t max_depth, size_t min_bytes) const
    {
        Traces traces;
        Trace::Frames frames;
        std::vector<size_t> allocated_total;
        std::vector<size_t> allocated_self;
        std::vector<ListNode *> nodes;

        nodes.push_back(root.children);
        allocated_total.push_back(root.allocated);
        allocated_self.push_back(root.allocated);

        while (!nodes.empty())
        {
            if (nodes.back() == nullptr)
            {
                traces.push_back({frames, allocated_total.back(), allocated_self.back()});

                nodes.pop_back();
                allocated_total.pop_back();
                allocated_self.pop_back();

                /// We don't have the root's frame, so frames are empty in the end.
                if (!frames.empty())
                    frames.pop_back();

                continue;
            }

            TreeNode * current = nodes.back()->child;
            nodes.back() = nodes.back()->next;

            bool enough_bytes = current->allocated >= min_bytes;
            bool enough_depth = max_depth == 0 || nodes.size() < max_depth;

            if (enough_bytes)
            {
                frames.push_back(current->ptr);
                allocated_self.back() -= current->allocated;

                if (enough_depth)
                {
                    allocated_total.push_back(current->allocated);
                    allocated_self.push_back(current->allocated);
                    nodes.push_back(current->children);
                }
                else
                {
                    traces.push_back({frames, current->allocated, current->allocated});
                    frames.pop_back();
                }
            }
        }

        return traces;
    }
};

static void insertData(DB::PaddedPODArray<UInt8> & chars, DB::PaddedPODArray<UInt64> & offsets, const char * pos, size_t length)
{
    const size_t old_size = chars.size();
    const size_t new_size = old_size + length + 1;

    chars.resize(new_size);
    if (length)
        memcpy(chars.data() + old_size, pos, length);
    chars[old_size + length] = 0;
    offsets.push_back(new_size);
}

/// Split str by line feed and write each line as a separate row to ColumnString.
static void fillColumn(DB::PaddedPODArray<UInt8> & chars, DB::PaddedPODArray<UInt64> & offsets, const std::string & str)
{
    size_t start = 0;
    size_t end = 0;
    size_t size = str.size();

    while (end < size)
    {
        if (str[end] == '\n')
        {
            insertData(chars, offsets, str.data() + start, end - start);
            start = end + 1;
        }

        ++end;
    }

    if (start < end)
        insertData(chars, offsets, str.data() + start, end - start);
}

void dumpFlameGraph(
    const AggregateFunctionFlameGraphTree::Traces & traces,
    DB::PaddedPODArray<UInt8> & chars,
    DB::PaddedPODArray<UInt64> & offsets)
{
    DB::WriteBufferFromOwnString out;

    std::unordered_map<uintptr_t, size_t> mapping;

#if defined(__ELF__) && !defined(OS_FREEBSD)
    auto symbol_index_ptr = DB::SymbolIndex::instance();
    const DB::SymbolIndex & symbol_index = *symbol_index_ptr;
#endif

    for (const auto & trace : traces)
    {
        if (trace.allocated_self == 0)
            continue;

        for (size_t i = 0; i < trace.frames.size(); ++i)
        {
            if (i)
                out << ";";

            const void * ptr = reinterpret_cast<const void *>(trace.frames[i]);

#if defined(__ELF__) && !defined(OS_FREEBSD)
            if (const auto * symbol = symbol_index.findSymbol(ptr))
                writeString(demangle(symbol->name), out);
            else
                DB::writePointerHex(ptr, out);
#else
            DB::writePointerHex(ptr, out);
#endif
        }

        out << ' ' << trace.allocated_self << "\n";
    }

    fillColumn(chars, offsets, out.str());
}

struct AggregateFunctionFlameGraphData
{
    struct Entry
    {
        AggregateFunctionFlameGraphTree::TreeNode * trace;
        UInt64 size;
        Entry * next = nullptr;
    };

    struct Pair
    {
        Entry * allocation = nullptr;
        Entry * deallocation = nullptr;
    };

    using Entries = HashMap<UInt64, Pair>;

    AggregateFunctionFlameGraphTree tree;
    Entries entries;
    Entry * free_list = nullptr;

    Entry * alloc(Arena * arena)
    {
        if (free_list)
        {
            auto * res = free_list;
            free_list = free_list->next;
            return res;
        }

        return reinterpret_cast<Entry *>(arena->alloc(sizeof(Entry)));
    }

    void release(Entry * entry)
    {
        entry->next = free_list;
        free_list = entry;
    }

    static void track(Entry * allocation)
    {
        auto * node = allocation->trace;
        while (node)
        {
            node->allocated += allocation->size;
            node = node->parent;
        }
    }

    static void untrack(Entry * allocation)
    {
        auto * node = allocation->trace;
        while (node)
        {
            node->allocated -= allocation->size;
            node = node->parent;
        }
    }

    static Entry * tryFindMatchAndRemove(Entry *& list, UInt64 size)
    {
        if (!list)
            return nullptr;

        if (list->size == size)
        {
            Entry * entry = list;
            list = list->next;
            return entry;
        }
        else
        {
            Entry * parent = list;
            while (parent->next && parent->next->size != size)
                parent = parent->next;

            if (parent->next && parent->next->size == size)
            {
                Entry * entry = parent->next;
                parent->next = entry->next;
                return entry;
            }

            return nullptr;
        }
    }

    void add(UInt64 ptr, Int64 size, const UInt64 * stack, size_t stack_size, Arena * arena)
    {
        /// If ptr is zero, only track allocations.
        if (ptr == 0)
        {
            if (size > 0)
            {
                auto * node = tree.find(stack, stack_size, arena);
                Entry entry{.trace = node, .size = UInt64(size)};
                track(&entry);
            }

            return;
        }

        auto & place = entries[ptr];
        if (size > 0)
        {
            if (auto * deallocation = tryFindMatchAndRemove(place.deallocation, size))
            {
                release(deallocation);
            }
            else
            {
                auto * node = tree.find(stack, stack_size, arena);

                auto * allocation = alloc(arena);
                allocation->size = UInt64(size);
                allocation->trace = node;

                track(allocation);

                allocation->next = place.allocation;
                place.allocation = allocation;
            }
        }
        else if (size < 0)
        {
            UInt64 abs_size = -size;
            if (auto * allocation = tryFindMatchAndRemove(place.allocation, abs_size))
            {
                untrack(allocation);
                release(allocation);
            }
            else
            {
                auto * deallocation = alloc(arena);
                deallocation->size = abs_size;

                deallocation->next = place.deallocation;
                place.deallocation = deallocation;
            }
        }
    }

    void merge(const AggregateFunctionFlameGraphTree & other_tree, Arena * arena)
    {
        AggregateFunctionFlameGraphTree::Trace::Frames frames;
        std::vector<AggregateFunctionFlameGraphTree::ListNode *> nodes;

        nodes.push_back(other_tree.root.children);

        while (!nodes.empty())
        {
            if (nodes.back() == nullptr)
            {
                nodes.pop_back();

                /// We don't have the root's frame, so frames are empty in the end.
                if (!frames.empty())
                    frames.pop_back();

                continue;
            }

            AggregateFunctionFlameGraphTree::TreeNode * current = nodes.back()->child;
            nodes.back() = nodes.back()->next;

            frames.push_back(current->ptr);

            if (current->children)
                nodes.push_back(current->children);
            else
            {
                if (current->allocated)
                    add(0, current->allocated, frames.data(), frames.size(), arena);

                frames.pop_back();
            }
        }
    }

    void merge(const AggregateFunctionFlameGraphData & other, Arena * arena)
    {
        AggregateFunctionFlameGraphTree::Trace::Frames frames;
        for (const auto & entry : other.entries)
        {
            for (auto * allocation = entry.value.second.allocation; allocation; allocation = allocation->next)
            {
                frames.clear();
                const auto * node = allocation->trace;
                while (node->ptr)
                {
                    frames.push_back(node->ptr);
                    node = node->parent;
                }

                std::reverse(frames.begin(), frames.end());
                add(entry.value.first, allocation->size, frames.data(), frames.size(), arena);
                untrack(allocation);
            }

            for (auto * deallocation = entry.value.second.deallocation; deallocation; deallocation = deallocation->next)
            {
                add(entry.value.first, -Int64(deallocation->size), nullptr, 0, arena);
            }
        }

        merge(other.tree, arena);
    }

    void dumpFlameGraph(
        DB::PaddedPODArray<UInt8> & chars,
        DB::PaddedPODArray<UInt64> & offsets,
        size_t max_depth, size_t min_bytes) const
    {
        DB::dumpFlameGraph(tree.dump(max_depth, min_bytes), chars, offsets);
    }
};

/// Aggregate function which builds a flamegraph using the list of stacktraces.
/// The output is an array of strings which can be used by the flamegraph.pl utility.
/// See https://github.com/brendangregg/FlameGraph
///
/// Syntax: flameGraph(traces, [size = 1], [ptr = 0])
/// - trace : Array(UInt64), a stacktrace
/// - size : Int64, an allocation size (for memory profiling)
/// - ptr : UInt64, an allocation address
/// If ptr != 0, flameGraph will match allocations (size > 0) and deallocations (size < 0) with the same size and ptr.
/// Only allocations which were not freed are shown. Unmatched deallocations are ignored.
///
/// Usage:
///
/// * Build a flamegraph based on CPU query profiler
/// set query_profiler_cpu_time_period_ns=10000000;
/// SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
/// clickhouse client --allow_introspection_functions=1
///     -q "select arrayJoin(flameGraph(arrayReverse(trace))) from system.trace_log where trace_type = 'CPU' and query_id = 'xxx'"
///     | ~/dev/FlameGraph/flamegraph.pl > flame_cpu.svg
///
/// * Build a flamegraph based on memory query profiler, showing all allocations
/// set memory_profiler_sample_probability=1, max_untracked_memory=1;
/// SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
/// clickhouse client --allow_introspection_functions=1
///     -q "select arrayJoin(flameGraph(trace, size)) from system.trace_log where trace_type = 'MemorySample' and query_id = 'xxx'"
///     | ~/dev/FlameGraph/flamegraph.pl --countname=bytes --color=mem > flame_mem.svg
///
/// * Build a flamegraph based on memory query profiler, showing allocations which were not deallocated in query context
/// set memory_profiler_sample_probability=1, max_untracked_memory=1, use_uncompressed_cache=1, merge_tree_max_rows_to_use_cache=100000000000, merge_tree_max_bytes_to_use_cache=1000000000000;
/// SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
/// clickhouse client --allow_introspection_functions=1
///     -q "select arrayJoin(flameGraph(trace, size, ptr)) from system.trace_log where trace_type = 'MemorySample' and query_id = 'xxx'"
///     | ~/dev/FlameGraph/flamegraph.pl --countname=bytes --color=mem > flame_mem_untracked.svg
///
/// * Build a flamegraph based on memory query profiler, showing active allocations at a fixed point in time
/// set memory_profiler_sample_probability=1, max_untracked_memory=1;
/// SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
/// 1. Memory usage per second
/// select event_time, m, formatReadableSize(max(s) as m) from (select event_time, sum(size) over (order by event_time) as s from system.trace_log where query_id = 'xxx' and trace_type = 'MemorySample') group by event_time order by event_time;
/// 2. Find a time point with maximal memory usage
/// select argMax(event_time, s), max(s) from (select event_time, sum(size) over (order by event_time) as s from system.trace_log where query_id = 'xxx' and trace_type = 'MemorySample');
/// 3. Fix active allocations at that point of time
/// clickhouse client --allow_introspection_functions=1
///     -q "select arrayJoin(flameGraph(trace, size, ptr)) from (select * from system.trace_log where trace_type = 'MemorySample' and query_id = 'xxx' and event_time <= 'yyy' order by event_time)"
///     | ~/dev/FlameGraph/flamegraph.pl --countname=bytes --color=mem > flame_mem_time_point_pos.svg
/// 4. Find deallocations after that point of time
/// clickhouse client --allow_introspection_functions=1
///     -q "select arrayJoin(flameGraph(trace, -size, ptr)) from (select * from system.trace_log where trace_type = 'MemorySample' and query_id = 'xxx' and event_time > 'yyy' order by event_time desc)"
///     | ~/dev/FlameGraph/flamegraph.pl --countname=bytes --color=mem > flame_mem_time_point_neg.svg
class AggregateFunctionFlameGraph final : public IAggregateFunctionDataHelper<AggregateFunctionFlameGraphData, AggregateFunctionFlameGraph>
{
public:
    explicit AggregateFunctionFlameGraph(const DataTypes & argument_types_)
        : IAggregateFunctionDataHelper<AggregateFunctionFlameGraphData, AggregateFunctionFlameGraph>(argument_types_, {}, createResultType())
    {}

    String getName() const override { return "flameGraph"; }

    static DataTypePtr createResultType()
    {
        return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
    }

    bool allocatesMemoryInArena() const override { return true; }

    void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
    {
        const auto * trace = typeid_cast<const ColumnArray *>(columns[0]);

        const auto & trace_offsets = trace->getOffsets();
        const auto & trace_values = typeid_cast<const ColumnUInt64 *>(&trace->getData())->getData();
        UInt64 prev_offset = 0;
        if (row_num)
            prev_offset = trace_offsets[row_num - 1];
        UInt64 trace_size = trace_offsets[row_num] - prev_offset;

        Int64 allocated = 1;
        if (argument_types.size() >= 2)
        {
            const auto & sizes = typeid_cast<const ColumnInt64 *>(columns[1])->getData();
            allocated = sizes[row_num];
        }

        UInt64 ptr = 0;
        if (argument_types.size() >= 3)
        {
            const auto & ptrs = typeid_cast<const ColumnUInt64 *>(columns[2])->getData();
            ptr = ptrs[row_num];
        }

        this->data(place).add(ptr, allocated, trace_values.data() + prev_offset, trace_size, arena);
    }

    void addManyDefaults(
        AggregateDataPtr __restrict /*place*/,
        const IColumn ** /*columns*/,
        size_t /*length*/,
        Arena * /*arena*/) const override
    {
    }

    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
    {
        this->data(place).merge(this->data(rhs), arena);
    }

    void serialize(ConstAggregateDataPtr __restrict, WriteBuffer &, std::optional<size_t> /* version */) const override
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization for function flameGraph is not implemented.");
    }

    void deserialize(AggregateDataPtr __restrict, ReadBuffer &, std::optional<size_t> /* version */, Arena *) const override
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Deserialization for function flameGraph is not implemented.");
    }

    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
    {
        auto & array = assert_cast<ColumnArray &>(to);
        auto & str = assert_cast<ColumnString &>(array.getData());

        this->data(place).dumpFlameGraph(str.getChars(), str.getOffsets(), 0, 0);

        array.getOffsets().push_back(str.size());
    }
};

static void check(const std::string & name, const DataTypes & argument_types, const Array & params)
{
    assertNoParameters(name, params);

    if (argument_types.empty() || argument_types.size() > 3)
        throw Exception(
            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
            "Aggregate function {} requires 1 to 3 arguments : trace, [size = 1], [ptr = 0]",
            name);

    auto ptr_type = std::make_shared<DataTypeUInt64>();
    auto trace_type = std::make_shared<DataTypeArray>(ptr_type);
    auto size_type = std::make_shared<DataTypeInt64>();

    if (!argument_types[0]->equals(*trace_type))
        throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
            "First argument (trace) for function {} must be Array(UInt64), but it has type {}",
            name, argument_types[0]->getName());

    if (argument_types.size() >= 2 && !argument_types[1]->equals(*size_type))
        throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
            "Second argument (size) for function {} must be Int64, but it has type {}",
            name, argument_types[1]->getName());

    if (argument_types.size() >= 3 && !argument_types[2]->equals(*ptr_type))
        throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
            "Third argument (ptr) for function {} must be UInt64, but it has type {}",
            name, argument_types[2]->getName());
}

AggregateFunctionPtr createAggregateFunctionFlameGraph(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings * settings)
{
    if (!settings->allow_introspection_functions)
        throw Exception(ErrorCodes::FUNCTION_NOT_ALLOWED,
            "Introspection functions are disabled, because setting 'allow_introspection_functions' is set to 0");

    check(name, argument_types, params);
    return std::make_shared<AggregateFunctionFlameGraph>(argument_types);
}

void registerAggregateFunctionFlameGraph(AggregateFunctionFactory & factory)
{
    AggregateFunctionProperties properties = { .returns_default_when_only_null = true, .is_order_dependent = true };

    factory.registerFunction("flameGraph", { createAggregateFunctionFlameGraph, properties });
}

}
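The rows produced by dumpFlameGraph() above are in the folded-stack format that flamegraph.pl consumes: frames joined by ';', a space, then the self-allocated byte count. A minimal, self-contained sketch of the same aggregation idea — hypothetical frame names, with a plain std::map standing in for the arena-backed trie keyed by frame pointers:

    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    int main()
    {
        /// Hypothetical samples: (stack from root to leaf, allocated bytes).
        std::vector<std::pair<std::vector<std::string>, size_t>> samples =
        {
            {{"main", "executeQuery", "Arena::alloc"}, 64},
            {{"main", "executeQuery", "Arena::alloc"}, 32},
            {{"main", "read"}, 16},
        };

        /// Aggregate bytes per unique stack, as the pointer-keyed trie does above.
        std::map<std::string, size_t> folded;
        for (const auto & [stack, bytes] : samples)
        {
            std::string key;
            for (const auto & frame : stack)
                key += (key.empty() ? "" : ";") + frame;
            folded[key] += bytes;
        }

        /// Same row shape that dumpFlameGraph() writes: "a;b;c <bytes>".
        for (const auto & [key, bytes] : folded)
            std::cout << key << ' ' << bytes << '\n';
    }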
@@ -74,6 +74,7 @@ void registerAggregateFunctionExponentialMovingAverage(AggregateFunctionFactory &);
 void registerAggregateFunctionSparkbar(AggregateFunctionFactory &);
 void registerAggregateFunctionIntervalLengthSum(AggregateFunctionFactory &);
 void registerAggregateFunctionAnalysisOfVariance(AggregateFunctionFactory &);
+void registerAggregateFunctionFlameGraph(AggregateFunctionFactory &);

 class AggregateFunctionCombinatorFactory;
 void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);

@@ -160,6 +161,7 @@ void registerAggregateFunctions()
     registerAggregateFunctionExponentialMovingAverage(factory);
     registerAggregateFunctionSparkbar(factory);
     registerAggregateFunctionAnalysisOfVariance(factory);
+    registerAggregateFunctionFlameGraph(factory);

     registerWindowFunctions(factory);
 }
16  src/Common/AllocationTrace.h  Normal file
@@ -0,0 +1,16 @@
#pragma once
#include <cstddef>

/// This is a structure which is returned by MemoryTracker.
/// Methods onAlloc/onFree should be called after the actual memory allocation if it succeeds.
/// For now, it will only collect the allocation trace with sample_probability.
struct AllocationTrace
{
    AllocationTrace() = default;
    explicit AllocationTrace(double sample_probability_);

    void onAlloc(void * ptr, size_t size) const;
    void onFree(void * ptr, size_t size) const;

    double sample_probability = 0;
};
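The call protocol implied by this header — reserve through the tracker first, perform the real allocation, then report the concrete pointer — is exactly what the Allocator hunks below implement. A hedged sketch of a hypothetical wrapper built on this interface (tracked_malloc/tracked_free are not part of the patch):

    #include <cstdlib>

    #include <Common/AllocationTrace.h>
    #include <Common/CurrentMemoryTracker.h>

    /// Hypothetical helpers, for illustration only.
    void * tracked_malloc(size_t size)
    {
        auto trace = CurrentMemoryTracker::alloc(static_cast<Int64>(size)); /// accounting first (may throw)
        void * ptr = malloc(size);                                          /// the actual allocation
        trace.onAlloc(ptr, size);                                           /// sampled MemorySample event
        return ptr;
    }

    void tracked_free(void * ptr, size_t size)
    {
        free(ptr);
        auto trace = CurrentMemoryTracker::free(static_cast<Int64>(size));
        trace.onFree(ptr, size);  /// uses only the address value, never dereferences
    }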
@@ -92,8 +92,10 @@ public:
     void * alloc(size_t size, size_t alignment = 0)
     {
         checkSize(size);
-        CurrentMemoryTracker::alloc(size);
-        return allocNoTrack(size, alignment);
+        auto trace = CurrentMemoryTracker::alloc(size);
+        void * ptr = allocNoTrack(size, alignment);
+        trace.onAlloc(ptr, size);
+        return ptr;
     }

     /// Free memory range.
@@ -103,7 +105,8 @@ public:
         {
             checkSize(size);
             freeNoTrack(buf, size);
-            CurrentMemoryTracker::free(size);
+            auto trace = CurrentMemoryTracker::free(size);
+            trace.onFree(buf, size);
         }
         catch (...)
         {
@@ -129,13 +132,16 @@ public:
             && alignment <= MALLOC_MIN_ALIGNMENT)
         {
             /// Resize malloc'd memory region with no special alignment requirement.
-            CurrentMemoryTracker::realloc(old_size, new_size);
+            auto trace = CurrentMemoryTracker::realloc(old_size, new_size);
+            trace.onFree(buf, old_size);

             void * new_buf = ::realloc(buf, new_size);
             if (nullptr == new_buf)
                 DB::throwFromErrno(fmt::format("Allocator: Cannot realloc from {} to {}.", ReadableSize(old_size), ReadableSize(new_size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);

             buf = new_buf;
+            trace.onAlloc(buf, new_size);

             if constexpr (clear_memory)
                 if (new_size > old_size)
                     memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
@@ -143,7 +149,8 @@ public:
         else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD)
         {
             /// Resize mmap'd memory region.
-            CurrentMemoryTracker::realloc(old_size, new_size);
+            auto trace = CurrentMemoryTracker::realloc(old_size, new_size);
+            trace.onFree(buf, old_size);

             // On apple and freebsd self-implemented mremap used (common/mremap.h)
             buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE,
@@ -152,14 +159,17 @@ public:
                 DB::throwFromErrno(fmt::format("Allocator: Cannot mremap memory chunk from {} to {}.",
                     ReadableSize(old_size), ReadableSize(new_size)), DB::ErrorCodes::CANNOT_MREMAP);

+            trace.onAlloc(buf, new_size);
             /// No need for zero-fill, because mmap guarantees it.
         }
         else if (new_size < MMAP_THRESHOLD)
         {
             /// Small allocs that require a copy. Assume there's enough memory in system. Call CurrentMemoryTracker once.
-            CurrentMemoryTracker::realloc(old_size, new_size);
+            auto trace = CurrentMemoryTracker::realloc(old_size, new_size);
+            trace.onFree(buf, old_size);

             void * new_buf = allocNoTrack(new_size, alignment);
+            trace.onAlloc(new_buf, new_size);
             memcpy(new_buf, buf, std::min(old_size, new_size));
             freeNoTrack(buf, old_size);
             buf = new_buf;
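All three realloc branches above share one ordering: a single CurrentMemoryTracker::realloc() accounts the size delta, while the trace reports the entire old region as freed and the entire new region as allocated, so the ptr/size pairs recorded in trace_log remain matchable by flameGraph(trace, size, ptr). Condensed, with doRealloc() as a hypothetical stand-in for the branch-specific resize (::realloc, mremap, or allocate-copy-free):

    #include <Common/CurrentMemoryTracker.h>

    void * doRealloc(void * buf, size_t old_size, size_t new_size); /// stand-in, sketch only

    void * reallocTracked(void * buf, size_t old_size, size_t new_size)
    {
        auto trace = CurrentMemoryTracker::realloc(static_cast<Int64>(old_size), static_cast<Int64>(new_size));
        trace.onFree(buf, old_size);   /// old region ends at its old address
        buf = doRealloc(buf, old_size, new_size);
        trace.onAlloc(buf, new_size);  /// new region begins (address may change)
        return buf;
    }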
@@ -30,21 +30,24 @@ struct AllocatorWithMemoryTracking
             throw std::bad_alloc();

         size_t bytes = n * sizeof(T);
-        CurrentMemoryTracker::alloc(bytes);
+        auto trace = CurrentMemoryTracker::alloc(bytes);

         T * p = static_cast<T *>(malloc(bytes));
         if (!p)
             throw std::bad_alloc();

+        trace.onAlloc(p, bytes);
+
         return p;
     }

     void deallocate(T * p, size_t n) noexcept
     {
-        free(p);
-
         size_t bytes = n * sizeof(T);
-        CurrentMemoryTracker::free(bytes);
+
+        free(p);
+        auto trace = CurrentMemoryTracker::free(bytes);
+        trace.onFree(p, bytes);
     }
 };
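One subtlety in the reordered deallocate(): trace.onFree(p, bytes) now runs after free(p). This is safe because AllocationTrace treats the pointer purely as a value — it hashes the address to make the sampling decision (see shouldTrackAllocation() in the MemoryTracker.cpp hunk further down) and records it, but never dereferences it. A sketch of the property being relied on:

    #include <cstdint>

    /// A freed pointer may still be used as a number, just never dereferenced.
    bool decideFromAddressOnly(void * freed_ptr)
    {
        uintptr_t key = reinterpret_cast<uintptr_t>(freed_ptr); /// well-defined after free()
        return (key % 100) == 0; /// any pure function of the value is fine
    }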
@@ -37,7 +37,7 @@ MemoryTracker * getMemoryTracker()

 using DB::current_thread;

-void CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
+AllocationTrace CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
 {
 #ifdef MEMORY_TRACKER_DEBUG_CHECKS
     if (unlikely(memory_tracker_always_throw_logical_error_on_allocation))
@@ -55,8 +55,9 @@ void CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)

             if (will_be > current_thread->untracked_memory_limit)
             {
-                memory_tracker->allocImpl(will_be, throw_if_memory_exceeded);
+                auto res = memory_tracker->allocImpl(will_be, throw_if_memory_exceeded);
                 current_thread->untracked_memory = 0;
+                return res;
             }
             else
             {
@@ -68,36 +69,40 @@ void CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
         /// total_memory_tracker only, ignore untracked_memory
         else
         {
-            memory_tracker->allocImpl(size, throw_if_memory_exceeded);
+            return memory_tracker->allocImpl(size, throw_if_memory_exceeded);
         }
+
+        return AllocationTrace(memory_tracker->getSampleProbability());
     }
+
+    return AllocationTrace(0);
 }

 void CurrentMemoryTracker::check()
 {
     if (auto * memory_tracker = getMemoryTracker())
-        memory_tracker->allocImpl(0, true);
+        std::ignore = memory_tracker->allocImpl(0, true);
 }

-void CurrentMemoryTracker::alloc(Int64 size)
+AllocationTrace CurrentMemoryTracker::alloc(Int64 size)
 {
     bool throw_if_memory_exceeded = true;
-    allocImpl(size, throw_if_memory_exceeded);
+    return allocImpl(size, throw_if_memory_exceeded);
 }

-void CurrentMemoryTracker::allocNoThrow(Int64 size)
+AllocationTrace CurrentMemoryTracker::allocNoThrow(Int64 size)
 {
     bool throw_if_memory_exceeded = false;
-    allocImpl(size, throw_if_memory_exceeded);
+    return allocImpl(size, throw_if_memory_exceeded);
 }

-void CurrentMemoryTracker::realloc(Int64 old_size, Int64 new_size)
+AllocationTrace CurrentMemoryTracker::realloc(Int64 old_size, Int64 new_size)
 {
     Int64 addition = new_size - old_size;
-    addition > 0 ? alloc(addition) : free(-addition);
+    return addition > 0 ? alloc(addition) : free(-addition);
 }

-void CurrentMemoryTracker::free(Int64 size)
+AllocationTrace CurrentMemoryTracker::free(Int64 size)
 {
     if (auto * memory_tracker = getMemoryTracker())
     {
@@ -106,15 +111,20 @@ void CurrentMemoryTracker::free(Int64 size)
             current_thread->untracked_memory -= size;
             if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
             {
-                memory_tracker->free(-current_thread->untracked_memory);
+                Int64 untracked_memory = current_thread->untracked_memory;
                 current_thread->untracked_memory = 0;
+                return memory_tracker->free(-untracked_memory);
             }
         }
         /// total_memory_tracker only, ignore untracked_memory
         else
         {
-            memory_tracker->free(size);
+            return memory_tracker->free(size);
         }
-    }
+
+        return AllocationTrace(memory_tracker->getSampleProbability());
+    }
+
+    return AllocationTrace(0);
 }
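The thread-local untracked_memory counter batches small operations: the atomic tracker hierarchy is touched only once the accumulated delta crosses untracked_memory_limit, and only that flush returns a tracker-derived trace. A standalone model of the free() path, assuming a hypothetical 4096-byte limit:

    #include <cstdint>
    #include <cstdio>

    int main()
    {
        /// Stand-alone model of the batching in CurrentMemoryTracker::free.
        int64_t untracked_memory = 0;
        const int64_t untracked_memory_limit = 4096; /// assumed value

        for (int64_t size : {1000, 1000, 1000, 2000})
        {
            untracked_memory -= size;
            if (untracked_memory < -untracked_memory_limit)
            {
                /// Flush: one call into the tracker hierarchy for the whole batch.
                std::printf("memory_tracker->free(%lld)\n", static_cast<long long>(-untracked_memory));
                untracked_memory = 0;
            }
        }
        /// Prints exactly once: memory_tracker->free(5000)
    }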
@@ -1,19 +1,20 @@
 #pragma once

 #include <base/types.h>
+#include <Common/AllocationTrace.h>

 /// Convenience methods, that use current thread's memory_tracker if it is available.
 struct CurrentMemoryTracker
 {
     /// Call the following functions before calling of corresponding operations with memory allocators.
-    static void alloc(Int64 size);
-    static void allocNoThrow(Int64 size);
-    static void realloc(Int64 old_size, Int64 new_size);
+    [[nodiscard]] static AllocationTrace alloc(Int64 size);
+    [[nodiscard]] static AllocationTrace allocNoThrow(Int64 size);
+    [[nodiscard]] static AllocationTrace realloc(Int64 old_size, Int64 new_size);

     /// This function should be called after memory deallocation.
-    static void free(Int64 size);
+    [[nodiscard]] static AllocationTrace free(Int64 size);
     static void check();

 private:
-    static void allocImpl(Int64 size, bool throw_if_memory_exceeded);
+    [[nodiscard]] static AllocationTrace allocImpl(Int64 size, bool throw_if_memory_exceeded);
 };
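Marking the new signatures [[nodiscard]] makes the protocol hard to misuse: every caller must either report the returned trace or drop it explicitly, which is what the std::ignore usages elsewhere in this patch do. A hypothetical call site, assuming these headers:

    #include <tuple>

    #include <Common/CurrentMemoryTracker.h>

    void exampleCallSite(void * ptr, Int64 bytes)  /// illustration only
    {
        auto trace = CurrentMemoryTracker::alloc(bytes); /// normal path: keep it...
        trace.onAlloc(ptr, bytes);                       /// ...and report the pointer

        std::ignore = CurrentMemoryTracker::free(bytes); /// deliberate, visible drop
        /// CurrentMemoryTracker::free(bytes);           /// would warn: unused result
    }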
@@ -57,7 +57,8 @@ public:
         }

         /// Do not count guard page in memory usage.
-        CurrentMemoryTracker::alloc(num_pages * page_size);
+        auto trace = CurrentMemoryTracker::alloc(num_pages * page_size);
+        trace.onAlloc(vp, num_pages * page_size);

         boost::context::stack_context sctx;
         sctx.size = num_bytes;
@@ -77,6 +78,7 @@ public:
         ::munmap(vp, sctx.size);

         /// Do not count guard page in memory usage.
-        CurrentMemoryTracker::free(sctx.size - page_size);
+        auto trace = CurrentMemoryTracker::free(sctx.size - page_size);
+        trace.onFree(vp, sctx.size - page_size);
     }
 };
@@ -1,6 +1,7 @@
 #include "MemoryTracker.h"

 #include <IO/WriteHelpers.h>
+#include <Common/SipHash.h>
 #include <Common/VariableContext.h>
 #include <Common/TraceSender.h>
 #include <Common/Exception.h>
@@ -82,6 +83,53 @@ inline std::string_view toDescription(OvercommitResult result)
     }
 }

+bool shouldTrackAllocation(DB::Float64 probability, void * ptr)
+{
+    return sipHash64(uintptr_t(ptr)) < std::numeric_limits<uint64_t>::max() * probability;
+}
+
+AllocationTrace updateAllocationTrace(AllocationTrace trace, const std::optional<double> & sample_probability)
+{
+    if (unlikely(sample_probability))
+        return AllocationTrace(*sample_probability);
+
+    return trace;
+}
+
+AllocationTrace getAllocationTrace(std::optional<double> & sample_probability)
+{
+    if (unlikely(sample_probability))
+        return AllocationTrace(*sample_probability);
+
+    return AllocationTrace(0);
+}
+
+}
+
+AllocationTrace::AllocationTrace(double sample_probability_) : sample_probability(sample_probability_) {}
+
+void AllocationTrace::onAlloc(void * ptr, size_t size) const
+{
+    if (likely(sample_probability == 0))
+        return;
+
+    if (sample_probability < 1 && !shouldTrackAllocation(sample_probability, ptr))
+        return;
+
+    MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
+    DB::TraceSender::send(DB::TraceType::MemorySample, StackTrace(), {.size = Int64(size), .ptr = ptr});
+}
+
+void AllocationTrace::onFree(void * ptr, size_t size) const
+{
+    if (likely(sample_probability == 0))
+        return;
+
+    if (sample_probability < 1 && !shouldTrackAllocation(sample_probability, ptr))
+        return;
+
+    MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
+    DB::TraceSender::send(DB::TraceType::MemorySample, StackTrace(), {.size = -Int64(size), .ptr = ptr});
 }

 namespace ProfileEvents
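shouldTrackAllocation() is the heart of the change: sampling becomes a deterministic function of the allocation address instead of a std::bernoulli_distribution draw at allocation time (that draw is deleted in a later hunk). Because the decision depends only on ptr, an allocation and its later deallocation are either both sent to trace_log or both skipped — which is what lets flameGraph(trace, size, ptr) cancel them out. A self-contained sketch of the property, with std::hash standing in for sipHash64:

    #include <cstdint>
    #include <cstdio>
    #include <functional>
    #include <limits>

    /// Same shape as shouldTrackAllocation(); std::hash stands in for sipHash64.
    bool shouldTrack(double probability, void * ptr)
    {
        uint64_t h = std::hash<void *>{}(ptr);
        return h < double(std::numeric_limits<uint64_t>::max()) * probability;
    }

    int main()
    {
        int object;
        void * p = &object;

        /// The alloc-side and free-side decisions for one address always agree.
        bool on_alloc = shouldTrack(0.01, p);
        bool on_free = shouldTrack(0.01, p);
        std::printf("%d %d\n", on_alloc, on_free); /// always "0 0" or "1 1"
    }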
@@ -135,7 +183,7 @@ void MemoryTracker::logMemoryUsage(Int64 current) const
 }


-void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker)
+AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker)
 {
     if (size < 0)
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
@@ -154,9 +202,14 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker)

         /// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent.
         if (auto * loaded_next = parent.load(std::memory_order_relaxed))
-            loaded_next->allocImpl(size, throw_if_memory_exceeded,
-                level == VariableContext::Process ? this : query_tracker);
-        return;
+        {
+            MemoryTracker * tracker = level == VariableContext::Process ? this : query_tracker;
+            return updateAllocationTrace(
+                loaded_next->allocImpl(size, throw_if_memory_exceeded, tracker),
+                sample_probability);
+        }
+
+        return getAllocationTrace(sample_probability);
     }

     /** Using memory_order_relaxed means that if allocations are done simultaneously,
@@ -183,14 +236,6 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker)
         allocation_traced = true;
     }

-    std::bernoulli_distribution sample(sample_probability);
-    if (unlikely(sample_probability > 0.0 && sample(thread_local_rng)))
-    {
-        MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
-        DB::TraceSender::send(DB::TraceType::MemorySample, StackTrace(), {.size = size});
-        allocation_traced = true;
-    }
-
     std::bernoulli_distribution fault(fault_probability);
     if (unlikely(fault_probability > 0.0 && fault(thread_local_rng)))
     {
@@ -309,16 +354,22 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker)
     }

     if (auto * loaded_next = parent.load(std::memory_order_relaxed))
-        loaded_next->allocImpl(size, throw_if_memory_exceeded,
-            level == VariableContext::Process ? this : query_tracker);
+    {
+        MemoryTracker * tracker = level == VariableContext::Process ? this : query_tracker;
+        return updateAllocationTrace(
+            loaded_next->allocImpl(size, throw_if_memory_exceeded, tracker),
+            sample_probability);
+    }
+
+    return getAllocationTrace(sample_probability);
 }

 void MemoryTracker::adjustWithUntrackedMemory(Int64 untracked_memory)
 {
     if (untracked_memory > 0)
-        allocImpl(untracked_memory, /*throw_if_memory_exceeded*/ false);
+        std::ignore = allocImpl(untracked_memory, /*throw_if_memory_exceeded*/ false);
     else
-        free(-untracked_memory);
+        std::ignore = free(-untracked_memory);
 }

 bool MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
@@ -337,8 +388,7 @@ bool MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
         return false;
 }

-
-void MemoryTracker::free(Int64 size)
+AllocationTrace MemoryTracker::free(Int64 size)
 {
     if (MemoryTrackerBlockerInThread::isBlocked(level))
     {
@@ -353,15 +403,9 @@ void MemoryTracker::free(Int64 size)

         /// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent.
         if (auto * loaded_next = parent.load(std::memory_order_relaxed))
-            loaded_next->free(size);
-        return;
-    }
+            return updateAllocationTrace(loaded_next->free(size), sample_probability);

-    std::bernoulli_distribution sample(sample_probability);
-    if (unlikely(sample_probability > 0.0 && sample(thread_local_rng)))
-    {
-        MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
-        DB::TraceSender::send(DB::TraceType::MemorySample, StackTrace(), {.size = -size});
-    }
+        return getAllocationTrace(sample_probability);
+    }

     Int64 accounted_size = size;
@@ -389,12 +433,15 @@ void MemoryTracker::free(Int64 size)
     if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed))
         overcommit_tracker_ptr->tryContinueQueryExecutionAfterFree(accounted_size);

+    AllocationTrace res = getAllocationTrace(sample_probability);
     if (auto * loaded_next = parent.load(std::memory_order_relaxed))
-        loaded_next->free(size);
+        res = updateAllocationTrace(loaded_next->free(size), sample_probability);

     auto metric_loaded = metric.load(std::memory_order_relaxed);
     if (metric_loaded != CurrentMetrics::end())
         CurrentMetrics::sub(metric_loaded, accounted_size);
+
+    return res;
 }


@@ -478,3 +525,14 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
     while ((value == 0 || old_value < value) && !profiler_limit.compare_exchange_weak(old_value, value))
         ;
 }
+
+double MemoryTracker::getSampleProbability()
+{
+    if (sample_probability)
+        return *sample_probability;
+
+    if (auto * loaded_next = parent.load(std::memory_order_relaxed))
+        return loaded_next->getSampleProbability();
+
+    return 0;
+}
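Together with the header change below that turns sample_probability into std::optional<double>, getSampleProbability() distinguishes "explicitly set to 0" from "not set": an unset tracker defers to the nearest ancestor that was configured (e.g. a per-query memory_profiler_sample_probability), falling back to 0. A minimal model of that resolution:

    #include <optional>

    struct Tracker  /// stand-in for MemoryTracker, sketch only
    {
        std::optional<double> sample_probability; /// unset means "inherit"
        Tracker * parent = nullptr;

        double getSampleProbability() const
        {
            if (sample_probability)
                return *sample_probability;
            return parent ? parent->getSampleProbability() : 0;
        }
    };

    int main()
    {
        Tracker process;  /// server level: not configured
        Tracker query;    /// query level
        Tracker thread;   /// thread level
        query.parent = &process;
        thread.parent = &query;

        query.sample_probability = 1.0; /// set at the query level only
        /// The thread tracker inherits the query-level probability.
        return thread.getSampleProbability() == 1.0 ? 0 : 1;
    }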
@@ -2,9 +2,11 @@

 #include <atomic>
 #include <chrono>
+#include <optional>
 #include <base/types.h>
 #include <Common/CurrentMetrics.h>
 #include <Common/VariableContext.h>
+#include <Common/AllocationTrace.h>

 #if !defined(NDEBUG)
 #define MEMORY_TRACKER_DEBUG_CHECKS
@@ -65,7 +67,7 @@ private:
     double fault_probability = 0;

     /// To randomly sample allocations and deallocations in trace_log.
-    double sample_probability = 0;
+    std::optional<double> sample_probability;

     /// Singly-linked list. All information will be passed to subsequent memory trackers also (it allows to implement trackers hierarchy).
     /// In terms of tree nodes it is the list of parents. Lifetime of these trackers should "include" lifetime of current tracker.
@@ -90,8 +92,8 @@ private:

     /// allocImpl(...) and free(...) should not be used directly
     friend struct CurrentMemoryTracker;
-    void allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker = nullptr);
-    void free(Int64 size);
+    [[nodiscard]] AllocationTrace allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker = nullptr);
+    [[nodiscard]] AllocationTrace free(Int64 size);
 public:

     static constexpr auto USAGE_EVENT_NAME = "MemoryTrackerUsage";
@@ -146,6 +148,8 @@ public:
         sample_probability = value;
     }

+    double getSampleProbability();
+
     void setProfilerStep(Int64 value)
     {
         profiler_step = value;
@@ -28,4 +28,5 @@ public:
     }

     friend class MemoryTracker;
+    friend struct AllocationTrace;
 };
@@ -33,6 +33,7 @@ void TraceSender::send(TraceType trace_type, const StackTrace & stack_trace, Extras extras)
         + sizeof(TraceType) /// trace type
         + sizeof(UInt64) /// thread_id
         + sizeof(Int64) /// size
+        + sizeof(void *) /// ptr
         + sizeof(ProfileEvents::Event) /// event
         + sizeof(ProfileEvents::Count); /// increment

@@ -74,6 +75,7 @@ void TraceSender::send(TraceType trace_type, const StackTrace & stack_trace, Extras extras)
     writePODBinary(trace_type, out);
     writePODBinary(thread_id, out);
     writePODBinary(extras.size, out);
+    writePODBinary(UInt64(extras.ptr), out);
     writePODBinary(extras.event, out);
     writePODBinary(extras.increment, out);

@@ -28,8 +28,9 @@ class TraceSender
 public:
     struct Extras
     {
-        /// size - for memory tracing is the amount of memory allocated; for other trace types it is 0.
+        /// size, ptr - for memory tracing, the amount of memory allocated and its address; for other trace types both are 0.
         Int64 size{};
+        void * ptr = nullptr;
         /// Event type and increment for 'ProfileEvent' trace type; for other trace types defaults.
         ProfileEvents::Event event{ProfileEvents::end()};
         ProfileEvents::Count increment{};
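Both TraceSender hunks must change in lockstep: the byte-count arithmetic and the writePODBinary() sequence define the fixed record layout that the collector on the other end of the trace pipe decodes, so adding ptr in one place but not the other would desynchronize the stream. A reader-side view of the fields touched here (an assumed sketch, not the real decoder; integer widths are stand-ins for the real enum types, and surrounding fields are omitted):

    #include <cstdint>

    struct TraceRecordTail
    {
        uint8_t trace_type;   /// TraceType
        uint64_t thread_id;
        int64_t size;         /// allocation size; 0 for non-memory trace types
        uint64_t ptr;         /// new field: allocation address, 0 when absent
        uint64_t event;       /// ProfileEvents::Event
        uint64_t increment;   /// ProfileEvents::Count
    };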
@ -9,7 +9,11 @@ extern "C" void * clickhouse_malloc(size_t size)
|
|||||||
{
|
{
|
||||||
void * res = malloc(size);
|
void * res = malloc(size);
|
||||||
if (res)
|
if (res)
|
||||||
Memory::trackMemory(size);
|
{
|
||||||
|
AllocationTrace trace;
|
||||||
|
size_t actual_size = Memory::trackMemory(size, trace);
|
||||||
|
trace.onAlloc(res, actual_size);
|
||||||
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -17,17 +21,29 @@ extern "C" void * clickhouse_calloc(size_t number_of_members, size_t size)
 {
     void * res = calloc(number_of_members, size);
     if (res)
-        Memory::trackMemory(number_of_members * size);
+    {
+        AllocationTrace trace;
+        size_t actual_size = Memory::trackMemory(number_of_members * size, trace);
+        trace.onAlloc(res, actual_size);
+    }
     return res;
 }
 
 extern "C" void * clickhouse_realloc(void * ptr, size_t size)
 {
     if (ptr)
-        Memory::untrackMemory(ptr);
+    {
+        AllocationTrace trace;
+        size_t actual_size = Memory::untrackMemory(ptr, trace);
+        trace.onFree(ptr, actual_size);
+    }
     void * res = realloc(ptr, size);
     if (res)
-        Memory::trackMemory(size);
+    {
+        AllocationTrace trace;
+        size_t actual_size = Memory::trackMemory(size, trace);
+        trace.onAlloc(res, actual_size);
+    }
     return res;
 }
 
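realloc is tracked as a free of the old block followed by an allocation of the new one. The untrack happens before the call, while the old pointer is still guaranteed valid; note that if realloc then fails, the still-live old block has already been untracked, an imprecision the code accepts. A sketch of the same shape (stand-in helpers, as above):

    #include <cstdlib>

    /// Stand-ins mirroring the helpers used by the diff; illustrative only.
    struct AllocationTrace
    {
        void onAlloc(void *, size_t) const {}
        void onFree(void *, size_t) const {}
    };
    static size_t trackMemory(size_t size, AllocationTrace &) { return size; }
    static size_t untrackMemory(void *, AllocationTrace &, size_t size = 0) { return size; }

    void * traced_realloc(void * ptr, size_t size)
    {
        if (ptr)
        {
            AllocationTrace trace;
            size_t old_size = untrackMemory(ptr, trace);  /// old block: report as freed
            trace.onFree(ptr, old_size);
        }
        void * res = std::realloc(ptr, size);
        if (res)
        {
            AllocationTrace trace;
            size_t actual_size = trackMemory(size, trace);  /// new block: report as allocated
            trace.onAlloc(res, actual_size);
        }
        return res;
    }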
@@ -42,7 +58,9 @@ extern "C" void * clickhouse_reallocarray(void * ptr, size_t number_of_members,
 
 extern "C" void clickhouse_free(void * ptr)
 {
-    Memory::untrackMemory(ptr);
+    AllocationTrace trace;
+    size_t actual_size = Memory::untrackMemory(ptr, trace);
+    trace.onFree(ptr, actual_size);
     free(ptr);
 }
 
@@ -50,6 +68,10 @@ extern "C" int clickhouse_posix_memalign(void ** memptr, size_t alignment, size_
 {
     int res = posix_memalign(memptr, alignment, size);
     if (res == 0)
-        Memory::trackMemory(size);
+    {
+        AllocationTrace trace;
+        size_t actual_size = Memory::trackMemory(size, trace);
+        trace.onAlloc(*memptr, actual_size);
+    }
     return res;
 }
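Unlike the other wrappers, posix_memalign signals success through its return code and delivers the block through the out-parameter, which is why the traced pointer here is *memptr rather than a returned value. A minimal usage sketch:

    #include <cstdio>
    #include <cstdlib>

    int main()
    {
        void * block = nullptr;
        /// 0 means success; the pointer arrives via the out-parameter.
        if (posix_memalign(&block, 64, 1024) == 0)
        {
            std::printf("64-byte-aligned block at %p\n", block);
            std::free(block);
        }
        return 0;
    }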
@@ -112,16 +112,19 @@ inline ALWAYS_INLINE size_t getActualAllocationSize(size_t size, TAlign... align
 
 template <std::same_as<std::align_val_t>... TAlign>
 requires DB::OptionalArgument<TAlign...>
-inline ALWAYS_INLINE void trackMemory(std::size_t size, TAlign... align)
+inline ALWAYS_INLINE size_t trackMemory(std::size_t size, AllocationTrace & trace, TAlign... align)
 {
     std::size_t actual_size = getActualAllocationSize(size, align...);
-    CurrentMemoryTracker::allocNoThrow(actual_size);
+    trace = CurrentMemoryTracker::allocNoThrow(actual_size);
+    return actual_size;
 }
 
 template <std::same_as<std::align_val_t>... TAlign>
 requires DB::OptionalArgument<TAlign...>
-inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0, TAlign... align [[maybe_unused]]) noexcept
+inline ALWAYS_INLINE size_t untrackMemory(void * ptr [[maybe_unused]], AllocationTrace & trace, std::size_t size [[maybe_unused]] = 0, TAlign... align [[maybe_unused]]) noexcept
 {
+    std::size_t actual_size = 0;
+
     try
     {
 #if USE_JEMALLOC
@@ -130,23 +133,26 @@ inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t
         if (likely(ptr != nullptr))
         {
             if constexpr (sizeof...(TAlign) == 1)
-                CurrentMemoryTracker::free(sallocx(ptr, MALLOCX_ALIGN(alignToSizeT(align...))));
+                actual_size = sallocx(ptr, MALLOCX_ALIGN(alignToSizeT(align...)));
             else
-                CurrentMemoryTracker::free(sallocx(ptr, 0));
+                actual_size = sallocx(ptr, 0);
         }
 #else
         if (size)
-            CurrentMemoryTracker::free(size);
+            actual_size = size;
 #    if defined(_GNU_SOURCE)
         /// It's inaccurate resource free for sanitizers. malloc_usable_size() result is greater or equal to allocated size.
         else
-            CurrentMemoryTracker::free(malloc_usable_size(ptr));
+            actual_size = malloc_usable_size(ptr);
 #    endif
 #endif
+        trace = CurrentMemoryTracker::free(actual_size);
     }
     catch (...)
     {
     }
+
+    return actual_size;
 }
 
 }
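untrackMemory now has to produce a size even when the caller did not supply one: with jemalloc it asks sallocx for the exact allocated size; otherwise it uses the size from sized delete when available, falling back to malloc_usable_size, which may over-report, as the in-code comment notes. A hedged sketch of that fallback chain:

    #include <cstddef>
    #if defined(_GNU_SOURCE)
    #include <malloc.h>
    #endif

    /// Best-effort size for a block being freed: an explicitly known size wins;
    /// otherwise ask the allocator. Illustrative of the #else branch above;
    /// the jemalloc branch would call sallocx() instead.
    size_t bestKnownSize(void * ptr, size_t declared_size) noexcept
    {
        if (declared_size)
            return declared_size;           /// exact: came from sized delete
    #if defined(_GNU_SOURCE)
        return malloc_usable_size(ptr);     /// >= the requested size; approximate
    #else
        (void)ptr;
        return 0;                           /// nothing better available
    #endif
    }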
@@ -50,50 +50,74 @@ static struct InitializeJemallocZoneAllocatorForOSX
 
 void * operator new(std::size_t size)
 {
-    Memory::trackMemory(size);
-    return Memory::newImpl(size);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::trackMemory(size, trace);
+    void * ptr = Memory::newImpl(size);
+    trace.onAlloc(ptr, actual_size);
+    return ptr;
 }
 
 void * operator new(std::size_t size, std::align_val_t align)
 {
-    Memory::trackMemory(size, align);
-    return Memory::newImpl(size, align);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::trackMemory(size, trace, align);
+    void * ptr = Memory::newImpl(size, align);
+    trace.onAlloc(ptr, actual_size);
+    return ptr;
 }
 
 void * operator new[](std::size_t size)
 {
-    Memory::trackMemory(size);
-    return Memory::newImpl(size);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::trackMemory(size, trace);
+    void * ptr = Memory::newImpl(size);
+    trace.onAlloc(ptr, actual_size);
+    return ptr;
 }
 
 void * operator new[](std::size_t size, std::align_val_t align)
 {
-    Memory::trackMemory(size, align);
-    return Memory::newImpl(size, align);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::trackMemory(size, trace, align);
+    void * ptr = Memory::newImpl(size, align);
+    trace.onAlloc(ptr, actual_size);
+    return ptr;
 }
 
 void * operator new(std::size_t size, const std::nothrow_t &) noexcept
 {
-    Memory::trackMemory(size);
-    return Memory::newNoExept(size);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::trackMemory(size, trace);
+    void * ptr = Memory::newNoExept(size);
+    trace.onAlloc(ptr, actual_size);
+    return ptr;
 }
 
 void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
 {
-    Memory::trackMemory(size);
-    return Memory::newNoExept(size);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::trackMemory(size, trace);
+    void * ptr = Memory::newNoExept(size);
+    trace.onAlloc(ptr, actual_size);
+    return ptr;
 }
 
 void * operator new(std::size_t size, std::align_val_t align, const std::nothrow_t &) noexcept
 {
-    Memory::trackMemory(size, align);
-    return Memory::newNoExept(size, align);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::trackMemory(size, trace, align);
+    void * ptr = Memory::newNoExept(size, align);
+    trace.onAlloc(ptr, actual_size);
+    return ptr;
 }
 
 void * operator new[](std::size_t size, std::align_val_t align, const std::nothrow_t &) noexcept
 {
-    Memory::trackMemory(size, align);
-    return Memory::newNoExept(size, align);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::trackMemory(size, trace, align);
+    void * ptr = Memory::newNoExept(size, align);
+    trace.onAlloc(ptr, actual_size);
+    return ptr;
 }
 
 /// delete
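Each overload now follows the same three steps: account (obtaining the sampling decision and the actual size), allocate, then report the freshly obtained pointer. Accounting still precedes allocation, as it did before this change; only the reporting needs the pointer and therefore comes after. Condensed into one stand-alone sketch (stand-in helpers, not the real Memory:: API):

    #include <cstddef>
    #include <new>

    struct AllocationTrace { void onAlloc(void *, std::size_t) const {} };
    static std::size_t trackMemory(std::size_t size, AllocationTrace &) { return size; }

    void * traced_new(std::size_t size)
    {
        AllocationTrace trace;
        std::size_t actual_size = trackMemory(size, trace);  /// 1. account + decide sampling
        void * ptr = ::operator new(size);                   /// 2. allocate
        trace.onAlloc(ptr, actual_size);                     /// 3. report ptr + actual size
        return ptr;
    }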
@@ -109,48 +133,64 @@ void * operator new[](std::size_t size, std::align_val_t align, const std::nothr
 
 void operator delete(void * ptr) noexcept
 {
-    Memory::untrackMemory(ptr);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::untrackMemory(ptr, trace);
+    trace.onFree(ptr, actual_size);
     Memory::deleteImpl(ptr);
 }
 
 void operator delete(void * ptr, std::align_val_t align) noexcept
 {
-    Memory::untrackMemory(ptr, 0, align);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::untrackMemory(ptr, trace, 0, align);
+    trace.onFree(ptr, actual_size);
     Memory::deleteImpl(ptr);
 }
 
 void operator delete[](void * ptr) noexcept
 {
-    Memory::untrackMemory(ptr);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::untrackMemory(ptr, trace);
+    trace.onFree(ptr, actual_size);
     Memory::deleteImpl(ptr);
 }
 
 void operator delete[](void * ptr, std::align_val_t align) noexcept
 {
-    Memory::untrackMemory(ptr, 0, align);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::untrackMemory(ptr, trace, 0, align);
+    trace.onFree(ptr, actual_size);
     Memory::deleteImpl(ptr);
 }
 
 void operator delete(void * ptr, std::size_t size) noexcept
 {
-    Memory::untrackMemory(ptr, size);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::untrackMemory(ptr, trace, size);
+    trace.onFree(ptr, actual_size);
     Memory::deleteSized(ptr, size);
 }
 
 void operator delete(void * ptr, std::size_t size, std::align_val_t align) noexcept
 {
-    Memory::untrackMemory(ptr, size, align);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::untrackMemory(ptr, trace, size, align);
+    trace.onFree(ptr, actual_size);
     Memory::deleteSized(ptr, size, align);
 }
 
 void operator delete[](void * ptr, std::size_t size) noexcept
 {
-    Memory::untrackMemory(ptr, size);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::untrackMemory(ptr, trace, size);
+    trace.onFree(ptr, actual_size);
     Memory::deleteSized(ptr, size);
 }
 
 void operator delete[](void * ptr, std::size_t size, std::align_val_t align) noexcept
 {
-    Memory::untrackMemory(ptr, size, align);
+    AllocationTrace trace;
+    std::size_t actual_size = Memory::untrackMemory(ptr, trace, size, align);
+    trace.onFree(ptr, actual_size);
     Memory::deleteSized(ptr, size, align);
 }
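The release path is the mirror image: untrack first, while the pointer is still valid and its actual size can still be computed, report the free, and only then return the block to the allocator. A matching sketch:

    #include <cstddef>
    #include <new>

    struct AllocationTrace { void onFree(void *, std::size_t) const {} };
    static std::size_t untrackMemory(void *, AllocationTrace &, std::size_t size = 0) noexcept { return size; }

    void traced_delete(void * ptr, std::size_t size) noexcept
    {
        AllocationTrace trace;
        std::size_t actual_size = untrackMemory(ptr, trace, size);  /// size known: sized delete
        trace.onFree(ptr, actual_size);
        ::operator delete(ptr, size);  /// release last, after all bookkeeping
    }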
@@ -104,6 +104,9 @@ void TraceCollector::run()
         Int64 size;
         readPODBinary(size, in);
 
+        UInt64 ptr;
+        readPODBinary(ptr, in);
+
         ProfileEvents::Event event;
         readPODBinary(event, in);
 
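This is the read side of the record layout established in the TraceSender hunks: the new ptr field is consumed between size and event, exactly where the sender wrote it. A compact sketch of the symmetry:

    #include <cstdint>
    #include <cstring>

    /// Reader-side counterpart of the writeRecord sketch above: same fields, same order.
    const char * readSizeAndPtr(const char * p, int64_t & size, uint64_t & ptr)
    {
        std::memcpy(&size, p, sizeof(size)); p += sizeof(size);
        std::memcpy(&ptr, p, sizeof(ptr));   p += sizeof(ptr);  /// new field
        return p;
    }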
@@ -119,7 +122,7 @@ void TraceCollector::run()
 
         UInt64 time = static_cast<UInt64>(ts.tv_sec * 1000000000LL + ts.tv_nsec);
         UInt64 time_in_microseconds = static_cast<UInt64>((ts.tv_sec * 1000000LL) + (ts.tv_nsec / 1000));
-        TraceLogElement element{time_t(time / 1000000000), time_in_microseconds, time, trace_type, thread_id, query_id, trace, size, event, increment};
+        TraceLogElement element{time_t(time / 1000000000), time_in_microseconds, time, trace_type, thread_id, query_id, trace, size, ptr, event, increment};
         trace_log->add(element);
     }
 }
@@ -38,6 +38,7 @@ NamesAndTypesList TraceLogElement::getNamesAndTypes()
         {"query_id", std::make_shared<DataTypeString>()},
         {"trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
         {"size", std::make_shared<DataTypeInt64>()},
+        {"ptr", std::make_shared<DataTypeUInt64>()},
         {"event", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
         {"increment", std::make_shared<DataTypeInt64>()},
     };
@@ -57,6 +58,7 @@ void TraceLogElement::appendToBlock(MutableColumns & columns) const
     columns[i++]->insertData(query_id.data(), query_id.size());
     columns[i++]->insert(trace);
     columns[i++]->insert(size);
+    columns[i++]->insert(ptr);
 
     String event_name;
     if (event != ProfileEvents::end())
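These two TraceLog hunks carry an implicit invariant: the order of entries in getNamesAndTypes() must match the order of insertions in appendToBlock(), since the columns are addressed by a running index. Adding ptr after size in one place but not the other would silently shift every later column. A toy check of that invariant:

    #include <cassert>
    #include <string>
    #include <vector>

    int main()
    {
        /// Schema order (getNamesAndTypes) vs. insertion order (appendToBlock).
        std::vector<std::string> schema = {"trace", "size", "ptr", "event", "increment"};
        std::vector<std::string> writes = {"trace", "size", "ptr", "event", "increment"};
        assert(schema == writes);  /// a mismatch would corrupt the log rows
        return 0;
    }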
@@ -27,8 +27,10 @@ struct TraceLogElement
     UInt64 thread_id{};
     String query_id{};
     Array trace{};
-    /// Allocation size in bytes for TraceType::Memory.
+    /// Allocation size in bytes for TraceType::Memory and TraceType::MemorySample.
     Int64 size{};
+    /// Allocation ptr for TraceType::MemorySample.
+    UInt64 ptr{};
     /// ProfileEvent for TraceType::ProfileEvent.
     ProfileEvents::Event event{ProfileEvents::end()};
     /// Increment of profile event for TraceType::ProfileEvent.
@@ -88,6 +88,10 @@ MergeListElement::MergeListElement(
     /// thread_group::memory_tracker, but MemoryTrackerThreadSwitcher will reset parent).
     memory_tracker.setProfilerStep(settings.memory_profiler_step);
     memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
+    /// Specify sample probability also for current thread to track more deallocations.
+    if (auto * thread_memory_tracker = DB::CurrentThread::getMemoryTracker())
+        thread_memory_tracker->setSampleProbability(settings.memory_profiler_sample_probability);
+
     memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator);
     if (settings.memory_tracker_fault_probability > 0.0)
         memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);
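The merge-list tracker and the current thread's tracker are distinct nodes in the memory-tracker hierarchy; setting the probability on both means deallocations observed through the thread tracker get sampled too, which is the point of the added comment. A hedged toy model of probability lookup through a tracker chain (the real MemoryTracker logic differs in detail):

    #include <optional>

    struct Tracker
    {
        std::optional<double> sample_probability;
        const Tracker * parent = nullptr;

        /// First explicitly set probability along the chain wins.
        double effectiveProbability() const
        {
            if (sample_probability)
                return *sample_probability;
            return parent ? parent->effectiveProbability() : 0.0;
        }
    };

    int main()
    {
        Tracker merge_tracker;
        Tracker thread_tracker;
        merge_tracker.sample_probability = 0.01;   /// memory_profiler_sample_probability
        thread_tracker.sample_probability = 0.01;  /// also set on the current thread
        return thread_tracker.effectiveProbability() > 0.0 ? 0 : 1;
    }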