dbms: tracking amount of memory usage per query [#METR-11015].

This commit is contained in:
Alexey Milovidov 2014-05-04 02:57:43 +04:00
parent 4f1852c399
commit 0abc554493
21 changed files with 226 additions and 44 deletions

View File

@ -15,7 +15,7 @@ namespace DB
struct AggregateFunctionGroupArrayData
{
Array value;
Array value; /// TODO Добавить MemoryTracker
};

View File

@ -20,7 +20,7 @@ template <typename ArgumentFieldType>
struct AggregateFunctionQuantileData
{
typedef ReservoirSampler<ArgumentFieldType, ReservoirSamplerOnEmpty::RETURN_NAN_OR_ZERO> Sample;
Sample sample;
Sample sample; /// TODO Добавить MemoryTracker
};

View File

@ -2,6 +2,8 @@
#include <limits>
#include <DB/Common/MemoryTracker.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
@ -324,6 +326,9 @@ private:
void toLarge()
{
if (current_memory_tracker)
current_memory_tracker->alloc(sizeof(detail::QuantileTimingLarge));
/// На время копирования данных из tiny, устанавливать значение large ещё нельзя (иначе оно перезатрёт часть данных).
detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;
@ -343,7 +348,12 @@ public:
~QuantileTiming()
{
if (isLarge())
{
delete large;
if (current_memory_tracker)
current_memory_tracker->free(sizeof(detail::QuantileTimingLarge));
}
}
void insert(UInt64 x)
@ -405,6 +415,10 @@ public:
if (!isLarge())
{
tiny.count = TINY_MAX_ELEMS + 1;
if (current_memory_tracker)
current_memory_tracker->alloc(sizeof(detail::QuantileTimingLarge));
large = new detail::QuantileTimingLarge;
}
@ -424,6 +438,10 @@ public:
if (!isLarge())
{
tiny.count = TINY_MAX_ELEMS + 1;
if (current_memory_tracker)
current_memory_tracker->alloc(sizeof(detail::QuantileTimingLarge));
large = new detail::QuantileTimingLarge;
}

View File

@ -6,6 +6,7 @@
#include <Poco/SharedPtr.h>
#include <Yandex/likely.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Common/MemoryTracker.h>
namespace DB
@ -37,6 +38,9 @@ private:
ProfileEvents::increment(ProfileEvents::ArenaAllocChunks);
ProfileEvents::increment(ProfileEvents::ArenaAllocBytes, size_);
if (current_memory_tracker)
current_memory_tracker->alloc(size_);
begin = allocate(size_);
pos = begin;
end = begin + size_;
@ -47,6 +51,9 @@ private:
{
deallocate(begin, size());
if (current_memory_tracker)
current_memory_tracker->free(size());
if (prev)
delete prev;
}

View File

@ -264,6 +264,7 @@ protected:
void alloc()
{
// TODO Если здесь исключение, то free будет вызывана от неправильного размера.
buf = reinterpret_cast<Cell *>(Allocator::alloc(bufSizeBytes()));
}

View File

@ -4,6 +4,7 @@
#include <string.h>
#include <sys/mman.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
@ -31,6 +32,9 @@ public:
/// Выделить кусок памяти и заполнить его нулями.
void * alloc(size_t size)
{
if (current_memory_tracker)
current_memory_tracker->alloc(size);
void * buf;
if (size >= MMAP_THRESHOLD)
@ -63,6 +67,9 @@ public:
{
::free(buf);
}
if (current_memory_tracker)
current_memory_tracker->free(size);
}
/** Увеличить размер куска памяти.
@ -72,6 +79,9 @@ public:
*/
void * realloc(void * buf, size_t old_size, size_t new_size)
{
if (current_memory_tracker)
current_memory_tracker->realloc(old_size, new_size);
if (old_size < MMAP_THRESHOLD && new_size < MMAP_THRESHOLD)
{
buf = ::realloc(buf, new_size);
@ -96,7 +106,6 @@ public:
buf = new_buf;
}
return buf;
}
};

View File

@ -0,0 +1,49 @@
#pragma once
#include <Yandex/Common.h>
/** Отслеживает потребление памяти.
* Кидает исключение, если оно стало бы больше некоторого предельного значения.
* Один объект может использоваться одновременно в разных потоках.
*/
class MemoryTracker
{
Int32 amount = 0;
Int32 peak = 0;
Int32 limit = 0;
public:
MemoryTracker(Int32 limit_) : limit(limit_) {}
~MemoryTracker();
/** Вызывайте эти функции перед соответствующими операциями с памятью.
*/
void alloc(Int32 size);
void realloc(Int32 old_size, Int32 new_size)
{
alloc(new_size - old_size);
}
/** А эту функцию имеет смысл вызывать после освобождения памяти.
*/
void free(Int32 size)
{
__sync_sub_and_fetch(&amount, size);
}
Int32 get() const
{
return amount;
}
};
/** Объект MemoryTracker довольно трудно протащить во все места, где выделяются существенные объёмы памяти.
* Поэтому, используется thread-local указатель на используемый MemoryTracker или nullptr, если его не нужно использовать.
* Этот указатель выставляется, когда в данном потоке следует отслеживать потребление памяти.
* Таким образом, его нужно всего-лишь протащить во все потоки, в которых обрабатывается один запрос.
*/
extern __thread MemoryTracker * current_memory_tracker;

View File

@ -12,6 +12,7 @@
#include <Yandex/likely.h>
#include <Yandex/strong_typedef.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
@ -88,6 +89,10 @@ private:
}
size_t bytes_to_alloc = to_size(n);
if (current_memory_tracker)
current_memory_tracker->alloc(bytes_to_alloc);
c_start = c_end = Allocator::allocate(bytes_to_alloc);
c_end_of_storage = c_start + bytes_to_alloc;
}
@ -101,6 +106,9 @@ private:
::free(c_start);
else
Allocator::deallocate(c_start, storage_size());
if (current_memory_tracker)
current_memory_tracker->free(storage_size());
}
void realloc(size_t n)
@ -117,6 +125,9 @@ private:
char * old_c_start = c_start;
char * old_c_end_of_storage = c_end_of_storage;
if (current_memory_tracker)
current_memory_tracker->realloc(storage_size(), bytes_to_alloc);
if (use_libc_realloc)
{
c_start = reinterpret_cast<char *>(::realloc(c_start, bytes_to_alloc));

View File

@ -248,6 +248,7 @@ namespace ErrorCodes
FORMAT_VERSION_TOO_OLD,
CANNOT_MUNMAP,
CANNOT_MREMAP,
MEMORY_LIMIT_EXCEEDED,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -89,7 +89,7 @@ protected:
/// Если вычислений ещё не было - вычислим первый блок синхронно
if (!started)
{
calculate();
calculate(current_memory_tracker);
started = true;
}
else /// Если вычисления уже идут - подождём результата
@ -113,13 +113,15 @@ protected:
void next()
{
ready.reset();
pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this));
pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this, current_memory_tracker));
}
/// Вычисления, которые могут выполняться в отдельном потоке
void calculate()
void calculate(MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
try
{
block = children.back()->read();

View File

@ -76,7 +76,8 @@ protected:
for (size_t i = 0, size = many_data.size(); i < size; ++i)
{
many_data[i] = new AggregatedDataVariants;
pool.schedule(boost::bind(&ParallelAggregatingBlockInputStream::calculate, this, boost::ref(children[i]), boost::ref(*many_data[i]), boost::ref(exceptions[i])));
pool.schedule(boost::bind(&ParallelAggregatingBlockInputStream::calculate, this,
boost::ref(children[i]), boost::ref(*many_data[i]), boost::ref(exceptions[i]), current_memory_tracker));
}
pool.wait();
@ -97,8 +98,10 @@ private:
boost::threadpool::pool pool;
/// Вычисления, которые выполняются в отдельном потоке
void calculate(BlockInputStreamPtr & input, AggregatedDataVariants & data, ExceptionPtr & exception)
void calculate(BlockInputStreamPtr & input, AggregatedDataVariants & data, ExceptionPtr & exception, MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
try
{
aggregator->execute(input, data);

View File

@ -148,7 +148,7 @@ protected:
threads_data.resize(max_threads);
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
{
it->runnable = new Thread(*this);
it->runnable = new Thread(*this, current_memory_tracker);
it->thread = new Poco::Thread;
it->thread->start(*it->runnable);
}
@ -202,7 +202,10 @@ private:
class Thread : public Poco::Runnable
{
public:
Thread(UnionBlockInputStream & parent_) : parent(parent_) {}
Thread(UnionBlockInputStream & parent_, MemoryTracker * memory_tracker) : parent(parent_)
{
current_memory_tracker = memory_tracker;
}
void run()
{

View File

@ -3,6 +3,7 @@
#include <boost/noncopyable.hpp>
#include <DB/Common/ProfileEvents.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
@ -87,6 +88,9 @@ private:
ProfileEvents::increment(ProfileEvents::IOBufferAllocs);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, m_capacity);
if (current_memory_tracker)
current_memory_tracker->alloc(m_capacity);
if (!alignment)
{
m_data = reinterpret_cast<char *>(malloc(m_capacity));
@ -105,8 +109,13 @@ private:
void dealloc()
{
if (m_data)
free(reinterpret_cast<void *>(m_data));
if (!m_data)
return;
free(reinterpret_cast<void *>(m_data));
if (current_memory_tracker)
current_memory_tracker->free(m_capacity);
}
};

View File

@ -72,7 +72,10 @@ struct Limits
/** Ограничения для максимального размера запоминаемого состояния при выполнении DISTINCT. */ \
M(SettingUInt64, max_rows_in_distinct, 0) \
M(SettingUInt64, max_bytes_in_distinct, 0) \
M(SettingOverflowMode<false>, distinct_overflow_mode, OverflowMode::THROW)
M(SettingOverflowMode<false>, distinct_overflow_mode, OverflowMode::THROW) \
\
/** Максимальное использование памяти при обработке запроса. 0 - не ограничено. */ \
M(SettingUInt64, max_memory_usage, 0) \
#define DECLARE(TYPE, NAME, DEFAULT) \
TYPE NAME {DEFAULT};

View File

@ -10,6 +10,7 @@
#include <DB/Core/Defines.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/IO/WriteHelpers.h>
@ -34,15 +35,26 @@ public:
Stopwatch watch;
volatile size_t rows_processed;
volatile size_t bytes_processed;
volatile size_t rows_processed = 0;
volatile size_t bytes_processed = 0;
bool is_cancelled;
MemoryTracker memory_tracker;
bool is_cancelled = false;
Element(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_)
: query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), rows_processed(0), bytes_processed(0),
is_cancelled(false) {}
Element(const String & query_, const String & user_,
const String & query_id_, const Poco::Net::IPAddress & ip_address_,
size_t max_memory_usage)
: query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), memory_tracker(max_memory_usage)
{
current_memory_tracker = &memory_tracker;
}
~Element()
{
current_memory_tracker = nullptr;
}
bool update(size_t rows, size_t bytes) volatile
{
@ -113,7 +125,7 @@ public:
* Если времени не хватило - кинуть исключение.
*/
EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
size_t max_wait_milliseconds = DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS, bool replace_running_query = false)
size_t max_memory_usage = 0, size_t max_wait_milliseconds = DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS, bool replace_running_query = false)
{
EntryPtr res;
@ -134,7 +146,7 @@ public:
{
if (!replace_running_query)
throw Exception("Query with id = " + query_id_ + " is already running.",
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
element->second->is_cancelled = true;
/// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
queries->second.erase(element);
@ -144,7 +156,7 @@ public:
++cur_size;
res = new Entry(*this, cont.insert(cont.end(), Element(query_, user_, query_id_, ip_address_)));
res = new Entry(*this, cont.emplace(cont.end(), query_, user_, query_id_, ip_address_, max_memory_usage));
if (!query_id_.empty())
user_to_queries[user_][query_id_] = &res->get();

View File

@ -91,9 +91,9 @@ private:
/// Для более точного контроля max_rows_to_group_by.
size_t size_of_all_results;
void calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception);
void aggregateThread(Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception);
void convertToBlockThread(AggregatedDataVariants & data_variant, Block & block, bool final, ExceptionPtr & exception);
void calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception, MemoryTracker * memory_tracker);
void aggregateThread(Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception, MemoryTracker * memory_tracker);
void convertToBlockThread(AggregatedDataVariants & data_variant, Block & block, bool final, ExceptionPtr & exception, MemoryTracker * memory_tracker);
};

View File

@ -12,11 +12,11 @@ namespace DB
/** Парсит и исполняет запрос.
*/
void executeQuery(
ReadBuffer & istr, /// Откуда читать запрос (а также данные для INSERT-а, если есть)
WriteBuffer & ostr, /// Куда писать результат
Context & context, /// БД, таблицы, типы данных, движки таблиц, функции, агрегатные функции...
BlockInputStreamPtr & query_plan, /// Сюда может быть записано описание, как выполнялся запрос
bool internal = false, /// Если true - значит запрос порождён из другого запроса, и не нужно его регистировать в ProcessList-е.
ReadBuffer & istr, /// Откуда читать запрос (а также данные для INSERT-а, если есть)
WriteBuffer & ostr, /// Куда писать результат
Context & context, /// БД, таблицы, типы данных, движки таблиц, функции, агрегатные функции...
BlockInputStreamPtr & query_plan, /// Сюда может быть записано описание, как выполнялся запрос
bool internal = false, /// Если true - значит запрос порождён из другого запроса, и не нужно его регистировать в ProcessList-е.
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete); /// До какой стадии выполнять SELECT запрос.
@ -30,7 +30,7 @@ void executeQuery(
* - затем читайте результат из BlockIO::in;
*
* Если запрос не предполагает записи данных или возврата результата, то out и in,
* соответственно, будут равны NULL.
* соответственно, будут равны nullptr.
*
* Часть запроса по парсингу и форматированию (секция FORMAT) необходимо выполнить отдельно.
*/

View File

@ -0,0 +1,31 @@
#include <Yandex/likely.h>
#include <Yandex/logger_useful.h>
#include <DB/Core/Exception.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Common/MemoryTracker.h>
MemoryTracker::~MemoryTracker()
{
LOG_DEBUG(&Logger::get("MemoryTracker"), "Peak memory usage for query: " << peak << " bytes.");
}
void MemoryTracker::alloc(Int32 size)
{
Int32 will_be = __sync_add_and_fetch(&amount, size);
if (unlikely(limit && will_be > limit))
{
free(size);
throw DB::Exception("Memory limit exceeded: would use " + DB::toString(will_be) + " bytes"
" (attempt to allocate chunk of " + DB::toString(size) + " bytes)"
", maximum: " + DB::toString(limit) + " bytes", DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
if (will_be > peak)
peak = will_be;
}
__thread MemoryTracker * current_memory_tracker = nullptr;

View File

@ -102,7 +102,8 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
boost::ref(block),
rows * thread_no / threads,
rows * (thread_no + 1) / threads,
boost::ref(exceptions[thread_no])));
boost::ref(exceptions[thread_no]),
current_memory_tracker));
pool.wait();
@ -120,7 +121,8 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
boost::ref(block),
boost::ref(*results[thread_no]),
thread_no,
boost::ref(exceptions[thread_no])));
boost::ref(exceptions[thread_no]),
current_memory_tracker));
pool.wait();
@ -158,7 +160,8 @@ void SplittingAggregator::convertToBlocks(ManyAggregatedDataVariants & data_vari
boost::ref(*data_variants[thread_no]),
boost::ref(blocks[thread_no]),
final,
boost::ref(exceptions[thread_no])));
boost::ref(exceptions[thread_no]),
current_memory_tracker));
pool.wait();
@ -166,8 +169,10 @@ void SplittingAggregator::convertToBlocks(ManyAggregatedDataVariants & data_vari
}
void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception)
void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception, MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
try
{
if (method == AggregatedDataVariants::KEY_64)
@ -237,8 +242,11 @@ void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, siz
}
void SplittingAggregator::aggregateThread(Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception)
void SplittingAggregator::aggregateThread(
Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception, MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
try
{
result.aggregator = this;
@ -419,8 +427,11 @@ void SplittingAggregator::aggregateThread(Block & block, AggregatedDataVariants
}
void SplittingAggregator::convertToBlockThread(AggregatedDataVariants & data_variant, Block & block, bool final, ExceptionPtr & exception)
void SplittingAggregator::convertToBlockThread(
AggregatedDataVariants & data_variant, Block & block, bool final, ExceptionPtr & exception, MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
try
{
block = convertToBlock(data_variant, final);

View File

@ -89,8 +89,10 @@ void executeQuery(
{
process_list_entry = context.getProcessList().insert(
query, context.getUser(), context.getCurrentQueryId(), context.getIPAddress(),
context.getSettingsRef().limits.max_memory_usage,
context.getSettingsRef().queue_max_wait_ms.totalMilliseconds(),
context.getSettingsRef().replace_running_query);
context.setProcessListElement(&process_list_entry->get());
}
@ -159,8 +161,10 @@ BlockIO executeQuery(
{
process_list_entry = context.getProcessList().insert(
query, context.getUser(), context.getCurrentQueryId(), context.getIPAddress(),
context.getSettingsRef().limits.max_memory_usage,
context.getSettingsRef().queue_max_wait_ms.totalMilliseconds(),
context.getSettingsRef().replace_running_query);
context.setProcessListElement(&process_list_entry->get());
}

View File

@ -12,13 +12,14 @@ namespace DB
StorageSystemProcesses::StorageSystemProcesses(const std::string & name_, const Context & context_)
: name(name_), context(context_)
{
columns.push_back(NameAndTypePair("user", new DataTypeString));
columns.push_back(NameAndTypePair("address", new DataTypeString));
columns.push_back(NameAndTypePair("elapsed", new DataTypeFloat64));
columns.push_back(NameAndTypePair("rows_read", new DataTypeUInt64));
columns.push_back(NameAndTypePair("bytes_read", new DataTypeUInt64));
columns.push_back(NameAndTypePair("query", new DataTypeString));
columns.push_back(NameAndTypePair("query_id", new DataTypeString));
columns.push_back(NameAndTypePair("user", new DataTypeString));
columns.push_back(NameAndTypePair("address", new DataTypeString));
columns.push_back(NameAndTypePair("elapsed", new DataTypeFloat64));
columns.push_back(NameAndTypePair("rows_read", new DataTypeUInt64));
columns.push_back(NameAndTypePair("bytes_read", new DataTypeUInt64));
columns.push_back(NameAndTypePair("memory_usage", new DataTypeUInt64));
columns.push_back(NameAndTypePair("query", new DataTypeString));
columns.push_back(NameAndTypePair("query_id", new DataTypeString));
}
StoragePtr StorageSystemProcesses::create(const std::string & name_, const Context & context_)
@ -66,6 +67,12 @@ BlockInputStreams StorageSystemProcesses::read(
col_bytes_read.column = new ColumnUInt64;
block.insert(col_bytes_read);
ColumnWithNameAndType col_memory_usage;
col_memory_usage.name = "memory_usage";
col_memory_usage.type = new DataTypeUInt64;
col_memory_usage.column = new ColumnUInt64;
block.insert(col_memory_usage);
ColumnWithNameAndType col_query;
col_query.name = "query";
col_query.type = new DataTypeString;
@ -90,6 +97,7 @@ BlockInputStreams StorageSystemProcesses::read(
col_elapsed.column->insert(it->watch.elapsedSeconds());
col_rows_read.column->insert(rows_read);
col_bytes_read.column->insert(bytes_read);
col_memory_usage.column->insert(UInt64(it->memory_tracker.get()));
col_query.column->insert(it->query);
col_query_id.column->insert(it->query_id);
}