Merge pull request #61132 from ClickHouse/more_memory_checks

Throw memory limit exceptions to avoid OOM in some places
This commit is contained in:
alesapin 2024-03-12 11:48:15 +01:00 committed by GitHub
commit 40a992e1db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 16 additions and 3 deletions

View File

@ -3,6 +3,7 @@
#include <Processors/Port.h>
#include <Processors/IProcessor.h>
#include <Common/SharedMutex.h>
#include <Common/AllocatorWithMemoryTracking.h>
#include <mutex>
#include <queue>
#include <stack>
@ -117,7 +118,11 @@ public:
}
};
using Queue = std::queue<Node *>;
/// This queue can grow a lot and lead to OOM. That is why we use non-default
/// allocator for container which throws exceptions in operator new
using DequeWithMemoryTracker = std::deque<ExecutingGraph::Node *, AllocatorWithMemoryTracking<ExecutingGraph::Node *>>;
using Queue = std::queue<ExecutingGraph::Node *, DequeWithMemoryTracker>;
using NodePtr = std::unique_ptr<Node>;
using Nodes = std::vector<NodePtr>;
Nodes nodes;

View File

@ -47,7 +47,10 @@ class ExecutorTasks
public:
using Stack = std::stack<UInt64>;
using Queue = std::queue<ExecutingGraph::Node *>;
/// This queue can grow a lot and lead to OOM. That is why we use non-default
/// allocator for container which throws exceptions in operator new
using DequeWithMemoryTracker = std::deque<ExecutingGraph::Node *, AllocatorWithMemoryTracking<ExecutingGraph::Node *>>;
using Queue = std::queue<ExecutingGraph::Node *, DequeWithMemoryTracker>;
void finish();
bool isFinished() const { return finished; }

View File

@ -5,7 +5,9 @@
#include <Common/EventCounter.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ConcurrencyControl.h>
#include <Common/AllocatorWithMemoryTracking.h>
#include <deque>
#include <queue>
#include <mutex>
#include <memory>
@ -90,7 +92,10 @@ private:
ReadProgressCallbackPtr read_progress_callback;
using Queue = std::queue<ExecutingGraph::Node *>;
/// This queue can grow a lot and lead to OOM. That is why we use non-default
/// allocator for container which throws exceptions in operator new
using DequeWithMemoryTracker = std::deque<ExecutingGraph::Node *, AllocatorWithMemoryTracking<ExecutingGraph::Node *>>;
using Queue = std::queue<ExecutingGraph::Node *, DequeWithMemoryTracker>;
void initializeExecution(size_t num_threads, bool concurrency_control); /// Initialize executor contexts and task_queue.
void finalizeExecution(); /// Check all processors are finished.