diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupArray.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupArray.h
index bc8d0fc5184..fb935a782a0 100644
--- a/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupArray.h
+++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionGroupArray.h
@@ -15,7 +15,7 @@ namespace DB
 
 struct AggregateFunctionGroupArrayData
 {
-    Array value;
+    Array value;    /// TODO Add a MemoryTracker
 };
 
diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h
index 1c12f2d48bb..e6b3c09327c 100644
--- a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h
+++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h
@@ -20,7 +20,7 @@ template
 struct AggregateFunctionQuantileData
 {
     typedef ReservoirSampler Sample;
-    Sample sample;
+    Sample sample;    /// TODO Add a MemoryTracker
 };
 
diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h
index 86b8cd78389..024cc11b078 100644
--- a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h
+++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h
@@ -2,6 +2,8 @@
 
 #include
 
+#include <DB/Common/MemoryTracker.h>
+
 #include
 #include
 
@@ -324,6 +326,9 @@ private:
 
     void toLarge()
     {
+        if (current_memory_tracker)
+            current_memory_tracker->alloc(sizeof(detail::QuantileTimingLarge));
+
        /// While copying the data out of tiny, we must not set large yet (otherwise it would overwrite part of the data).
        detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;
 
@@ -343,7 +348,12 @@ public:
     ~QuantileTiming()
     {
         if (isLarge())
+        {
             delete large;
+
+            if (current_memory_tracker)
+                current_memory_tracker->free(sizeof(detail::QuantileTimingLarge));
+        }
     }
 
     void insert(UInt64 x)
@@ -405,6 +415,10 @@ public:
         if (!isLarge())
         {
             tiny.count = TINY_MAX_ELEMS + 1;
+
+            if (current_memory_tracker)
+                current_memory_tracker->alloc(sizeof(detail::QuantileTimingLarge));
+
             large = new detail::QuantileTimingLarge;
         }
 
@@ -424,6 +438,10 @@ public:
         if (!isLarge())
         {
             tiny.count = TINY_MAX_ELEMS + 1;
+
+            if (current_memory_tracker)
+                current_memory_tracker->alloc(sizeof(detail::QuantileTimingLarge));
+
             large = new detail::QuantileTimingLarge;
         }
 
diff --git a/dbms/include/DB/Common/Arena.h b/dbms/include/DB/Common/Arena.h
index 42af26f3baa..e81c7ad9c74 100644
--- a/dbms/include/DB/Common/Arena.h
+++ b/dbms/include/DB/Common/Arena.h
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include <DB/Common/MemoryTracker.h>
 
 
 namespace DB
@@ -37,6 +38,9 @@ private:
             ProfileEvents::increment(ProfileEvents::ArenaAllocChunks);
             ProfileEvents::increment(ProfileEvents::ArenaAllocBytes, size_);
 
+            if (current_memory_tracker)
+                current_memory_tracker->alloc(size_);
+
             begin = allocate(size_);
             pos = begin;
             end = begin + size_;
@@ -47,6 +51,9 @@ private:
         {
             deallocate(begin, size());
 
+            if (current_memory_tracker)
+                current_memory_tracker->free(size());
+
             if (prev)
                 delete prev;
         }
diff --git a/dbms/include/DB/Common/HashTable/HashTable.h b/dbms/include/DB/Common/HashTable/HashTable.h
index 12c7453d1bd..231a0a14ff6 100644
--- a/dbms/include/DB/Common/HashTable/HashTable.h
+++ b/dbms/include/DB/Common/HashTable/HashTable.h
@@ -264,6 +264,7 @@ protected:
 
     void alloc()
     {
+        // TODO If an exception is thrown here, free will later be called with the wrong size.
         buf = reinterpret_cast(Allocator::alloc(bufSizeBytes()));
     }
 
diff --git a/dbms/include/DB/Common/HashTable/HashTableAllocator.h b/dbms/include/DB/Common/HashTable/HashTableAllocator.h
index 9d7cfcce1b8..3906c351c03 100644
--- a/dbms/include/DB/Common/HashTable/HashTableAllocator.h
+++ b/dbms/include/DB/Common/HashTable/HashTableAllocator.h
@@ -4,6 +4,7 @@
 
 #include
 #include
+#include <DB/Common/MemoryTracker.h>
 
 #include
 #include
 
@@ -31,6 +32,9 @@ public:
     /// Allocate a chunk of memory and fill it with zeros.
     void * alloc(size_t size)
     {
+        if (current_memory_tracker)
+            current_memory_tracker->alloc(size);
+
         void * buf;
 
         if (size >= MMAP_THRESHOLD)
@@ -63,6 +67,9 @@
         {
             ::free(buf);
         }
+
+        if (current_memory_tracker)
+            current_memory_tracker->free(size);
     }
 
     /** Increase the size of a chunk of memory.
@@ -72,6 +79,9 @@
       */
     void * realloc(void * buf, size_t old_size, size_t new_size)
     {
+        if (current_memory_tracker)
+            current_memory_tracker->realloc(old_size, new_size);
+
         if (old_size < MMAP_THRESHOLD && new_size < MMAP_THRESHOLD)
         {
             buf = ::realloc(buf, new_size);
@@ -96,7 +106,6 @@
             buf = new_buf;
         }
 
-
         return buf;
     }
 };
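Every allocator touched above follows the same three-hook convention: report to the tracker before acquiring memory (so the limit check can veto the allocation), report a realloc with both the old and the new size, and report a free only after the memory has actually been released. Below is a minimal sketch of that convention, assuming only the current_memory_tracker thread-local introduced by the MemoryTracker.h header added further down in this diff; the TrackedBuffer class itself is a hypothetical illustration, not part of the patch.

    #include <cstdlib>
    #include <DB/Common/MemoryTracker.h>

    /// Hypothetical illustration (not part of the patch): a buffer that charges
    /// its memory to the query's MemoryTracker, if one is installed for this thread.
    class TrackedBuffer
    {
        char * data = nullptr;
        size_t size = 0;

    public:
        explicit TrackedBuffer(size_t size_) : size(size_)
        {
            /// May throw MEMORY_LIMIT_EXCEEDED before any memory is acquired.
            if (current_memory_tracker)
                current_memory_tracker->alloc(size);

            data = static_cast<char *>(malloc(size));
        }

        void resize(size_t new_size)
        {
            if (current_memory_tracker)
                current_memory_tracker->realloc(size, new_size);

            data = static_cast<char *>(realloc(data, new_size));
            size = new_size;
        }

        ~TrackedBuffer()
        {
            free(data);

            /// Reported after the memory has actually been released.
            if (current_memory_tracker)
                current_memory_tracker->free(size);
        }
    };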
diff --git a/dbms/include/DB/Common/MemoryTracker.h b/dbms/include/DB/Common/MemoryTracker.h
new file mode 100644
index 00000000000..85e30472899
--- /dev/null
+++ b/dbms/include/DB/Common/MemoryTracker.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include
+
+
+/** Tracks memory consumption.
+  * Throws an exception if consumption would become greater than some limit value.
+  * A single object may be used concurrently from different threads.
+  */
+class MemoryTracker
+{
+    Int32 amount = 0;
+    Int32 peak = 0;
+    Int32 limit = 0;
+
+public:
+    MemoryTracker(Int32 limit_) : limit(limit_) {}
+
+    ~MemoryTracker();
+
+    /** Call these functions before the corresponding memory operations.
+      */
+    void alloc(Int32 size);
+
+    void realloc(Int32 old_size, Int32 new_size)
+    {
+        alloc(new_size - old_size);
+    }
+
+    /** This function, in contrast, should be called after the memory has been freed.
+      */
+    void free(Int32 size)
+    {
+        __sync_sub_and_fetch(&amount, size);
+    }
+
+    Int32 get() const
+    {
+        return amount;
+    }
+};
+
+
+/** It is rather hard to pass a MemoryTracker object through to every place where significant amounts of memory are allocated.
+  * Therefore, a thread-local pointer to the MemoryTracker in use is kept instead (or nullptr if it should not be used).
+  * This pointer is set whenever memory consumption should be tracked in the given thread.
+  * Thus, it only needs to be passed to every thread that processes a single query.
+  */
+extern __thread MemoryTracker * current_memory_tracker;
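current_memory_tracker is a per-thread pointer, so it does not propagate into worker threads by itself. The stream and aggregator changes below all use the same pattern: the scheduling thread captures its own current_memory_tracker, passes it to the worker as an argument, and the worker installs it as its first action, so allocations in both threads are charged to the same query. A minimal sketch of that pattern, using std::thread purely for illustration (the classes below use boost::threadpool and boost::bind):

    #include <thread>
    #include <DB/Common/MemoryTracker.h>

    /// Hypothetical sketch of the propagation pattern used by the stream classes below.
    void scheduleTrackedWork()
    {
        /// Capture the tracker of the query thread that schedules the work.
        MemoryTracker * memory_tracker = current_memory_tracker;

        std::thread worker([memory_tracker]
        {
            /// Install the parent's tracker before doing any allocations.
            current_memory_tracker = memory_tracker;

            /// ... memory allocated here now counts towards the query's limit ...
        });

        worker.join();
    }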
diff --git a/dbms/include/DB/Common/PODArray.h b/dbms/include/DB/Common/PODArray.h
index fc6844b29d6..62e7463f7e9 100644
--- a/dbms/include/DB/Common/PODArray.h
+++ b/dbms/include/DB/Common/PODArray.h
@@ -12,6 +12,7 @@
 
 #include
 #include
+#include <DB/Common/MemoryTracker.h>
 
 #include
 #include
 
@@ -88,6 +89,10 @@ private:
         }
 
         size_t bytes_to_alloc = to_size(n);
+
+        if (current_memory_tracker)
+            current_memory_tracker->alloc(bytes_to_alloc);
+
         c_start = c_end = Allocator::allocate(bytes_to_alloc);
         c_end_of_storage = c_start + bytes_to_alloc;
     }
@@ -101,6 +106,9 @@ private:
             ::free(c_start);
         else
             Allocator::deallocate(c_start, storage_size());
+
+        if (current_memory_tracker)
+            current_memory_tracker->free(storage_size());
     }
 
     void realloc(size_t n)
@@ -117,6 +125,9 @@ private:
         char * old_c_start = c_start;
         char * old_c_end_of_storage = c_end_of_storage;
 
+        if (current_memory_tracker)
+            current_memory_tracker->realloc(storage_size(), bytes_to_alloc);
+
         if (use_libc_realloc)
         {
             c_start = reinterpret_cast(::realloc(c_start, bytes_to_alloc));
diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h
index c4f0d3f81ef..935689fd402 100644
--- a/dbms/include/DB/Core/ErrorCodes.h
+++ b/dbms/include/DB/Core/ErrorCodes.h
@@ -248,6 +248,7 @@ namespace ErrorCodes
         FORMAT_VERSION_TOO_OLD,
         CANNOT_MUNMAP,
         CANNOT_MREMAP,
+        MEMORY_LIMIT_EXCEEDED,
 
         POCO_EXCEPTION = 1000,
         STD_EXCEPTION,
diff --git a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h
index c5129bd18bb..acf8d8c3f36 100644
--- a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h
+++ b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h
@@ -89,7 +89,7 @@ protected:
         /// If no calculation has been done yet, compute the first block synchronously
         if (!started)
         {
-            calculate();
+            calculate(current_memory_tracker);
             started = true;
         }
         else    /// If a calculation is already in progress, wait for the result
@@ -113,13 +113,15 @@ protected:
     void next()
     {
         ready.reset();
-        pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this));
+        pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this, current_memory_tracker));
     }
 
 
     /// Calculations that can be run in a separate thread
-    void calculate()
+    void calculate(MemoryTracker * memory_tracker)
     {
+        current_memory_tracker = memory_tracker;
+
         try
         {
             block = children.back()->read();
diff --git a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h
index 804ddfd04c7..239af6a3b16 100644
--- a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h
+++ b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h
@@ -76,7 +76,8 @@ protected:
             for (size_t i = 0, size = many_data.size(); i < size; ++i)
             {
                 many_data[i] = new AggregatedDataVariants;
-                pool.schedule(boost::bind(&ParallelAggregatingBlockInputStream::calculate, this, boost::ref(children[i]), boost::ref(*many_data[i]), boost::ref(exceptions[i])));
+                pool.schedule(boost::bind(&ParallelAggregatingBlockInputStream::calculate, this,
+                    boost::ref(children[i]), boost::ref(*many_data[i]), boost::ref(exceptions[i]), current_memory_tracker));
             }
             pool.wait();
 
@@ -97,8 +98,10 @@ private:
     boost::threadpool::pool pool;
 
 
     /// Calculations that are run in a separate thread
-    void calculate(BlockInputStreamPtr & input, AggregatedDataVariants & data, ExceptionPtr & exception)
+    void calculate(BlockInputStreamPtr & input, AggregatedDataVariants & data,
+        ExceptionPtr & exception, MemoryTracker * memory_tracker)
     {
+        current_memory_tracker = memory_tracker;
+
         try
         {
             aggregator->execute(input, data);
diff --git a/dbms/include/DB/DataStreams/UnionBlockInputStream.h b/dbms/include/DB/DataStreams/UnionBlockInputStream.h
index dcd57371aa7..35acb5b0a35 100644
--- a/dbms/include/DB/DataStreams/UnionBlockInputStream.h
+++ b/dbms/include/DB/DataStreams/UnionBlockInputStream.h
@@ -148,7 +148,7 @@ protected:
         threads_data.resize(max_threads);
         for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
         {
-            it->runnable = new Thread(*this);
+            it->runnable = new Thread(*this, current_memory_tracker);
             it->thread = new Poco::Thread;
             it->thread->start(*it->runnable);
         }
@@ -202,7 +202,10 @@ private:
     class Thread : public Poco::Runnable
     {
     public:
-        Thread(UnionBlockInputStream & parent_) : parent(parent_) {}
+        Thread(UnionBlockInputStream & parent_, MemoryTracker * memory_tracker) : parent(parent_)
+        {
+            current_memory_tracker = memory_tracker;
+        }
 
         void run()
         {
diff --git a/dbms/include/DB/IO/BufferWithOwnMemory.h b/dbms/include/DB/IO/BufferWithOwnMemory.h
index 3791463c4bb..144acdfd853 100644
--- a/dbms/include/DB/IO/BufferWithOwnMemory.h
+++ b/dbms/include/DB/IO/BufferWithOwnMemory.h
@@ -3,6 +3,7 @@
 
 #include
 #include
+#include <DB/Common/MemoryTracker.h>
 
 #include
 #include
 
@@ -87,6 +88,9 @@ private:
         ProfileEvents::increment(ProfileEvents::IOBufferAllocs);
         ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, m_capacity);
 
+        if (current_memory_tracker)
+            current_memory_tracker->alloc(m_capacity);
+
         if (!alignment)
         {
             m_data = reinterpret_cast(malloc(m_capacity));
@@ -105,8 +109,13 @@ private:
 
     void dealloc()
     {
-        if (m_data)
-            free(reinterpret_cast(m_data));
+        if (!m_data)
+            return;
+
+        free(reinterpret_cast(m_data));
+
+        if (current_memory_tracker)
+            current_memory_tracker->free(m_capacity);
     }
 };
 
diff --git a/dbms/include/DB/Interpreters/Limits.h b/dbms/include/DB/Interpreters/Limits.h
index 03b71be6eb2..1fe7d7f6424 100644
--- a/dbms/include/DB/Interpreters/Limits.h
+++ b/dbms/include/DB/Interpreters/Limits.h
@@ -72,7 +72,10 @@ struct Limits
     /** Limits on the maximum size of the state remembered while executing DISTINCT. */ \
     M(SettingUInt64, max_rows_in_distinct, 0) \
     M(SettingUInt64, max_bytes_in_distinct, 0) \
-    M(SettingOverflowMode, distinct_overflow_mode, OverflowMode::THROW)
+    M(SettingOverflowMode, distinct_overflow_mode, OverflowMode::THROW) \
+    \
+    /** Maximum memory usage while processing a query. 0 - unlimited. */ \
+    M(SettingUInt64, max_memory_usage, 0) \
 
 #define DECLARE(TYPE, NAME, DEFAULT) \
     TYPE NAME {DEFAULT};
diff --git a/dbms/include/DB/Interpreters/ProcessList.h b/dbms/include/DB/Interpreters/ProcessList.h
index 8792ab1eab8..09db7c14a2a 100644
--- a/dbms/include/DB/Interpreters/ProcessList.h
+++ b/dbms/include/DB/Interpreters/ProcessList.h
@@ -10,6 +10,7 @@
 #include
 #include
 #include
+#include <DB/Common/MemoryTracker.h>
 #include
 
 
@@ -34,15 +35,26 @@ public:
 
         Stopwatch watch;
 
-        volatile size_t rows_processed;
-        volatile size_t bytes_processed;
+        volatile size_t rows_processed = 0;
+        volatile size_t bytes_processed = 0;
 
-        bool is_cancelled;
+        MemoryTracker memory_tracker;
+
+        bool is_cancelled = false;
 
-        Element(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_)
-            : query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), rows_processed(0), bytes_processed(0),
-            is_cancelled(false) {}
+        Element(const String & query_, const String & user_,
+            const String & query_id_, const Poco::Net::IPAddress & ip_address_,
+            size_t max_memory_usage)
+            : query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), memory_tracker(max_memory_usage)
+        {
+            current_memory_tracker = &memory_tracker;
+        }
+
+        ~Element()
+        {
+            current_memory_tracker = nullptr;
+        }
 
         bool update(size_t rows, size_t bytes) volatile
         {
@@ -113,7 +125,7 @@ public:
       * If the time runs out, throw an exception.
       */
     EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
-        size_t max_wait_milliseconds = DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS, bool replace_running_query = false)
+        size_t max_memory_usage = 0, size_t max_wait_milliseconds = DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS, bool replace_running_query = false)
     {
         EntryPtr res;
 
@@ -134,7 +146,7 @@ public:
                 {
                     if (!replace_running_query)
                         throw Exception("Query with id = " + query_id_ + " is already running.",
-                            ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
+                            ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
 
                     element->second->is_cancelled = true;
                     /// If the query is being cancelled, its entry is removed from the map at the moment of cancellation.
                     queries->second.erase(element);
@@ -144,7 +156,7 @@ public:
 
             ++cur_size;
 
-            res = new Entry(*this, cont.insert(cont.end(), Element(query_, user_, query_id_, ip_address_)));
+            res = new Entry(*this, cont.emplace(cont.end(), query_, user_, query_id_, ip_address_, max_memory_usage));
 
             if (!query_id_.empty())
                 user_to_queries[user_][query_id_] = &res->get();
diff --git a/dbms/include/DB/Interpreters/SplittingAggregator.h b/dbms/include/DB/Interpreters/SplittingAggregator.h
index b4d7fd118d9..c52c78509e8 100644
--- a/dbms/include/DB/Interpreters/SplittingAggregator.h
+++ b/dbms/include/DB/Interpreters/SplittingAggregator.h
@@ -91,9 +91,9 @@ private:
     /// For more precise control of max_rows_to_group_by.
     size_t size_of_all_results;
 
-    void calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception);
-    void aggregateThread(Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception);
-    void convertToBlockThread(AggregatedDataVariants & data_variant, Block & block, bool final, ExceptionPtr & exception);
+    void calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception, MemoryTracker * memory_tracker);
+    void aggregateThread(Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception, MemoryTracker * memory_tracker);
+    void convertToBlockThread(AggregatedDataVariants & data_variant, Block & block, bool final, ExceptionPtr & exception, MemoryTracker * memory_tracker);
 };
diff --git a/dbms/include/DB/Interpreters/executeQuery.h b/dbms/include/DB/Interpreters/executeQuery.h
index cd7c532935a..e776f8dfed9 100644
--- a/dbms/include/DB/Interpreters/executeQuery.h
+++ b/dbms/include/DB/Interpreters/executeQuery.h
@@ -12,11 +12,11 @@ namespace DB
 /** Parses and executes a query.
   */
 void executeQuery(
-    ReadBuffer & istr,                  /// Where to read the query from (and the data for an INSERT, if any)
-    WriteBuffer & ostr,                 /// Where to write the result
-    Context & context,                  /// Databases, tables, data types, table engines, functions, aggregate functions...
-    BlockInputStreamPtr & query_plan,   /// A description of how the query was executed may be written here
-    bool internal = false,              /// If true, the query was spawned by another query and does not need to be registered in the ProcessList.
+    ReadBuffer & istr,                    /// Where to read the query from (and the data for an INSERT, if any)
+    WriteBuffer & ostr,                   /// Where to write the result
+    Context & context,                    /// Databases, tables, data types, table engines, functions, aggregate functions...
+    BlockInputStreamPtr & query_plan,     /// A description of how the query was executed may be written here
+    bool internal = false,                /// If true, the query was spawned by another query and does not need to be registered in the ProcessList.
     QueryProcessingStage::Enum stage = QueryProcessingStage::Complete);    /// Up to which stage the SELECT query should be executed.
 
@@ -30,7 +30,7 @@ void executeQuery(
   *  - then read the result from BlockIO::in;
   *
   * If the query does not involve writing data or returning a result, then out and in,
-  *  respectively, will be equal to NULL.
+  *  respectively, will be equal to nullptr.
   *
   * The parsing and formatting part of the query (the FORMAT section) must be handled separately.
   */
diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp
new file mode 100644
index 00000000000..eb8097bcb88
--- /dev/null
+++ b/dbms/src/Common/MemoryTracker.cpp
@@ -0,0 +1,31 @@
+#include
+#include
+#include
+#include
+
+#include
+
+
+MemoryTracker::~MemoryTracker()
+{
+    LOG_DEBUG(&Logger::get("MemoryTracker"), "Peak memory usage for query: " << peak << " bytes.");
+}
+
+void MemoryTracker::alloc(Int32 size)
+{
+    Int32 will_be = __sync_add_and_fetch(&amount, size);
+
+    if (unlikely(limit && will_be > limit))
+    {
+        free(size);
+        throw DB::Exception("Memory limit exceeded: would use " + DB::toString(will_be) + " bytes"
+            " (attempt to allocate chunk of " + DB::toString(size) + " bytes)"
+            ", maximum: " + DB::toString(limit) + " bytes", DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
+    }
+
+    if (will_be > peak)
+        peak = will_be;
+}
+
+
+__thread MemoryTracker * current_memory_tracker = nullptr;
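MemoryTracker::alloc() first bumps the counter with an atomic add; if the new total would exceed the limit, it rolls the increment back via free() before throwing, so the tracker stays consistent and keeps an accurate count even after a rejected allocation. A small hypothetical demonstration of those semantics (not part of the patch; the catch uses std::exception, which DB::Exception ultimately derives from):

    #include <iostream>
    #include <exception>
    #include <DB/Common/MemoryTracker.h>

    int main()
    {
        MemoryTracker tracker(1000);    /// Limit of 1000 bytes for this "query".

        tracker.alloc(600);             /// OK: tracked amount becomes 600.

        try
        {
            tracker.alloc(500);         /// Would reach 1100 > 1000: throws MEMORY_LIMIT_EXCEEDED.
        }
        catch (const std::exception & e)
        {
            /// The rejected increment was rolled back, so the tracker still reads 600.
            std::cerr << e.what() << "\n"
                << "tracked amount after the failed alloc: " << tracker.get() << "\n";
        }

        tracker.free(600);              /// Balance the successful allocation.
        return 0;
    }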
diff --git a/dbms/src/Interpreters/SplittingAggregator.cpp b/dbms/src/Interpreters/SplittingAggregator.cpp
index bc76d027010..bf80db339f0 100644
--- a/dbms/src/Interpreters/SplittingAggregator.cpp
+++ b/dbms/src/Interpreters/SplittingAggregator.cpp
@@ -102,7 +102,8 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
                 boost::ref(block),
                 rows * thread_no / threads,
                 rows * (thread_no + 1) / threads,
-                boost::ref(exceptions[thread_no])));
+                boost::ref(exceptions[thread_no]),
+                current_memory_tracker));
 
         pool.wait();
 
@@ -120,7 +121,8 @@
                 boost::ref(block),
                 boost::ref(*results[thread_no]),
                 thread_no,
-                boost::ref(exceptions[thread_no])));
+                boost::ref(exceptions[thread_no]),
+                current_memory_tracker));
 
         pool.wait();
 
@@ -158,7 +160,8 @@ void SplittingAggregator::convertToBlocks(ManyAggregatedDataVariants & data_vari
                 boost::ref(*data_variants[thread_no]),
                 boost::ref(blocks[thread_no]),
                 final,
-                boost::ref(exceptions[thread_no])));
+                boost::ref(exceptions[thread_no]),
+                current_memory_tracker));
 
         pool.wait();
 
@@ -166,8 +169,10 @@ void SplittingAggregator::convertToBlocks(ManyAggregatedDataVariants & data_vari
 }
 
 
-void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception)
+void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception, MemoryTracker * memory_tracker)
 {
+    current_memory_tracker = memory_tracker;
+
     try
     {
         if (method == AggregatedDataVariants::KEY_64)
@@ -237,8 +242,11 @@ void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, siz
 }
 
 
-void SplittingAggregator::aggregateThread(Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception)
+void SplittingAggregator::aggregateThread(
+    Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception, MemoryTracker * memory_tracker)
 {
+    current_memory_tracker = memory_tracker;
+
     try
     {
         result.aggregator = this;
@@ -419,8 +427,11 @@ void SplittingAggregator::aggregateThread(Block & block, AggregatedDataVariants
 }
 
 
-void SplittingAggregator::convertToBlockThread(AggregatedDataVariants & data_variant, Block & block, bool final, ExceptionPtr & exception)
+void SplittingAggregator::convertToBlockThread(
+    AggregatedDataVariants & data_variant, Block & block, bool final, ExceptionPtr & exception, MemoryTracker * memory_tracker)
 {
+    current_memory_tracker = memory_tracker;
+
     try
     {
         block = convertToBlock(data_variant, final);
diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp
index 320b12e269a..7b37baf8ef4 100644
--- a/dbms/src/Interpreters/executeQuery.cpp
+++ b/dbms/src/Interpreters/executeQuery.cpp
@@ -89,8 +89,10 @@ void executeQuery(
     {
         process_list_entry = context.getProcessList().insert(
             query, context.getUser(), context.getCurrentQueryId(), context.getIPAddress(),
+            context.getSettingsRef().limits.max_memory_usage,
             context.getSettingsRef().queue_max_wait_ms.totalMilliseconds(),
             context.getSettingsRef().replace_running_query);
+
         context.setProcessListElement(&process_list_entry->get());
     }
 
@@ -159,8 +161,10 @@ BlockIO executeQuery(
     {
         process_list_entry = context.getProcessList().insert(
             query, context.getUser(), context.getCurrentQueryId(), context.getIPAddress(),
+            context.getSettingsRef().limits.max_memory_usage,
             context.getSettingsRef().queue_max_wait_ms.totalMilliseconds(),
             context.getSettingsRef().replace_running_query);
+
         context.setProcessListElement(&process_list_entry->get());
     }
 
diff --git a/dbms/src/Storages/StorageSystemProcesses.cpp b/dbms/src/Storages/StorageSystemProcesses.cpp
index 5ddaaca0a03..da4ba7de8f7 100644
--- a/dbms/src/Storages/StorageSystemProcesses.cpp
+++ b/dbms/src/Storages/StorageSystemProcesses.cpp
@@ -12,13 +12,14 @@ namespace DB
 StorageSystemProcesses::StorageSystemProcesses(const std::string & name_, const Context & context_)
     : name(name_), context(context_)
 {
-    columns.push_back(NameAndTypePair("user", new DataTypeString));
-    columns.push_back(NameAndTypePair("address", new DataTypeString));
-    columns.push_back(NameAndTypePair("elapsed", new DataTypeFloat64));
-    columns.push_back(NameAndTypePair("rows_read", new DataTypeUInt64));
-    columns.push_back(NameAndTypePair("bytes_read", new DataTypeUInt64));
-    columns.push_back(NameAndTypePair("query", new DataTypeString));
-    columns.push_back(NameAndTypePair("query_id", new DataTypeString));
+    columns.push_back(NameAndTypePair("user",         new DataTypeString));
+    columns.push_back(NameAndTypePair("address",      new DataTypeString));
+    columns.push_back(NameAndTypePair("elapsed",      new DataTypeFloat64));
+    columns.push_back(NameAndTypePair("rows_read",    new DataTypeUInt64));
+    columns.push_back(NameAndTypePair("bytes_read",   new DataTypeUInt64));
+    columns.push_back(NameAndTypePair("memory_usage", new DataTypeUInt64));
+    columns.push_back(NameAndTypePair("query",        new DataTypeString));
+    columns.push_back(NameAndTypePair("query_id",     new DataTypeString));
 }
 
 StoragePtr StorageSystemProcesses::create(const std::string & name_, const Context & context_)
@@ -66,6 +67,12 @@ BlockInputStreams StorageSystemProcesses::read(
     col_bytes_read.column = new ColumnUInt64;
     block.insert(col_bytes_read);
 
+    ColumnWithNameAndType col_memory_usage;
+    col_memory_usage.name = "memory_usage";
+    col_memory_usage.type = new DataTypeUInt64;
+    col_memory_usage.column = new ColumnUInt64;
+    block.insert(col_memory_usage);
+
     ColumnWithNameAndType col_query;
     col_query.name = "query";
     col_query.type = new DataTypeString;
@@ -90,6 +97,7 @@ BlockInputStreams StorageSystemProcesses::read(
         col_elapsed.column->insert(it->watch.elapsedSeconds());
         col_rows_read.column->insert(rows_read);
         col_bytes_read.column->insert(bytes_read);
+        col_memory_usage.column->insert(UInt64(it->memory_tracker.get()));
         col_query.column->insert(it->query);
         col_query_id.column->insert(it->query_id);
     }