ClickHouse/dbms/include/DB/Interpreters/ProcessList.h

160 lines
5.3 KiB
C
Raw Normal View History

#pragma once
#include <map>
#include <list>
#include <memory>
#include <Poco/Mutex.h>
#include <Poco/Condition.h>
#include <Poco/Net/IPAddress.h>
#include <statdaemons/Stopwatch.h>
#include <DB/Core/Defines.h>
#include <DB/Core/Progress.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/QueryPriorities.h>
#include <DB/Storages/IStorage.h>
namespace DB
{
/** Список исполняющихся в данный момент запросов.
* Также реализует ограничение на их количество.
*/
2015-04-16 06:12:35 +00:00
/// Запрос и данные о его выполнении.
struct ProcessListElement
{
2015-04-16 06:12:35 +00:00
String query;
String user;
String query_id;
Poco::Net::IPAddress ip_address;
2014-09-10 11:34:26 +00:00
2015-04-16 06:12:35 +00:00
Stopwatch watch;
2015-04-16 06:12:35 +00:00
Progress progress;
2015-04-16 06:12:35 +00:00
MemoryTracker memory_tracker;
QueryPriorities::Handle priority_handle;
2015-04-16 06:12:35 +00:00
bool is_cancelled = false;
2014-09-10 11:34:26 +00:00
/// Здесь могут быть зарегистрированы временные таблицы. Изменять под mutex-ом.
Tables temporary_tables;
2015-04-16 06:12:35 +00:00
ProcessListElement(const String & query_, const String & user_,
const String & query_id_, const Poco::Net::IPAddress & ip_address_,
size_t max_memory_usage, QueryPriorities::Handle && priority_handle_)
: query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), memory_tracker(max_memory_usage),
priority_handle(std::move(priority_handle_))
2015-04-16 06:12:35 +00:00
{
current_memory_tracker = &memory_tracker;
}
2015-04-16 06:12:35 +00:00
~ProcessListElement()
{
current_memory_tracker = nullptr;
}
2015-04-16 06:12:35 +00:00
bool update(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
if (priority_handle)
priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Можно сделать настраиваемым таймаут.
2015-04-16 06:12:35 +00:00
return !is_cancelled;
}
};
class ProcessList;
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
class ProcessListEntry
{
private:
using Container = std::list<ProcessListElement>;
ProcessList & parent;
Container::iterator it;
public:
ProcessListEntry(ProcessList & parent_, Container::iterator it_)
: parent(parent_), it(it_) {}
~ProcessListEntry();
ProcessListElement * operator->() { return &*it; }
const ProcessListElement * operator->() const { return &*it; }
ProcessListElement & get() { return *it; }
const ProcessListElement & get() const { return *it; }
};
2015-04-16 06:12:35 +00:00
class ProcessList
{
friend class ProcessListEntry;
2015-04-16 06:12:35 +00:00
public:
using Element = ProcessListElement;
using Entry = ProcessListEntry;
/// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем.
using Container = std::list<Element>;
/// Query_id -> Element *
using QueryToElement = std::unordered_map<String, Element *>;
/// User -> Query_id -> Element *
using UserToQueries = std::unordered_map<String, QueryToElement>;
private:
2013-09-14 05:14:22 +00:00
mutable Poco::FastMutex mutex;
mutable Poco::Condition have_space; /// Количество одновременно выполняющихся запросов стало меньше максимального.
2014-09-10 11:34:26 +00:00
Container cont;
size_t cur_size; /// В C++03 std::list::size не O(1).
size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение.
UserToQueries user_to_queries;
QueryPriorities priorities;
public:
ProcessList(size_t max_size_ = 0) : cur_size(0), max_size(max_size_) {}
typedef std::shared_ptr<ProcessListEntry> EntryPtr;
/** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
* Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени.
* Если времени не хватило - кинуть исключение.
*/
EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
const Settings & settings);
/// Количество одновременно выполняющихся запросов.
2014-09-12 16:05:29 +00:00
size_t size() const { return cur_size; }
/// Получить текущее состояние (копию) списка запросов.
Container get() const
{
2014-09-12 16:05:29 +00:00
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return cont;
}
void setMaxSize(size_t max_size_)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
max_size = max_size_;
}
/// Зарегистрировать временную таблицу. Потом её можно будет получить по query_id и по названию.
void addTemporaryTable(ProcessListElement & elem, const String & table_name, StoragePtr storage);
/// Найти временную таблицу по query_id и по названию. Замечание: плохо работает, если есть разные запросы с одним query_id.
StoragePtr tryGetTemporaryTable(const String & query_id, const String & table_name) const;
};
}