ClickHouse/dbms/include/DB/Interpreters/ProcessList.h
Alexey Milovidov 54d3af930c Merge
2016-01-13 04:06:56 +03:00

172 lines
5.6 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#pragma once
#include <map>
#include <list>
#include <memory>
#include <Poco/Mutex.h>
#include <Poco/Condition.h>
#include <Poco/Net/IPAddress.h>
#include <DB/Common/Stopwatch.h>
#include <DB/Core/Defines.h>
#include <DB/Core/Progress.h>
#include <DB/Common/Exception.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/QueryPriorities.h>
#include <DB/Storages/IStorage.h>
namespace DB
{
/** Список исполняющихся в данный момент запросов.
* Также реализует ограничение на их количество.
*/
/// Запрос и данные о его выполнении.
struct ProcessListElement
{
String query;
String user;
String query_id;
Poco::Net::IPAddress ip_address;
Stopwatch watch;
Progress progress;
MemoryTracker memory_tracker;
QueryPriorities::Handle priority_handle;
bool is_cancelled = false;
/// Здесь могут быть зарегистрированы временные таблицы. Изменять под mutex-ом.
Tables temporary_tables;
ProcessListElement(const String & query_, const String & user_,
const String & query_id_, const Poco::Net::IPAddress & ip_address_,
size_t max_memory_usage, double memory_tracker_fault_probability,
QueryPriorities::Handle && priority_handle_)
: query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), memory_tracker(max_memory_usage),
priority_handle(std::move(priority_handle_))
{
current_memory_tracker = &memory_tracker;
if (memory_tracker_fault_probability)
memory_tracker.setFaultProbability(memory_tracker_fault_probability);
}
~ProcessListElement()
{
current_memory_tracker = nullptr;
}
bool update(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
if (priority_handle)
priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Можно сделать настраиваемым таймаут.
return !is_cancelled;
}
};
/// Данные о запросах одного пользователя.
struct ProcessListForUser
{
/// Query_id -> ProcessListElement *
using QueryToElement = std::unordered_map<String, ProcessListElement *>;
QueryToElement queries;
};
class ProcessList;
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
class ProcessListEntry
{
private:
using Container = std::list<ProcessListElement>;
ProcessList & parent;
Container::iterator it;
public:
ProcessListEntry(ProcessList & parent_, Container::iterator it_)
: parent(parent_), it(it_) {}
~ProcessListEntry();
ProcessListElement * operator->() { return &*it; }
const ProcessListElement * operator->() const { return &*it; }
ProcessListElement & get() { return *it; }
const ProcessListElement & get() const { return *it; }
};
class ProcessList
{
friend class ProcessListEntry;
public:
using Element = ProcessListElement;
using Entry = ProcessListEntry;
/// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем.
using Container = std::list<Element>;
/// Query_id -> Element *
using QueryToElement = std::unordered_map<String, Element *>;
/// User -> Query_id -> Element *
using UserToQueries = std::unordered_map<String, QueryToElement>;
private:
mutable Poco::FastMutex mutex;
mutable Poco::Condition have_space; /// Количество одновременно выполняющихся запросов стало меньше максимального.
Container cont;
size_t cur_size; /// В C++03 std::list::size не O(1).
size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение.
UserToQueries user_to_queries;
QueryPriorities priorities;
public:
ProcessList(size_t max_size_ = 0) : cur_size(0), max_size(max_size_) {}
typedef std::shared_ptr<ProcessListEntry> EntryPtr;
/** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
* Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени.
* Если времени не хватило - кинуть исключение.
*/
EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
const Settings & settings);
/// Количество одновременно выполняющихся запросов.
size_t size() const { return cur_size; }
/// Получить текущее состояние (копию) списка запросов.
Container get() const
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return cont;
}
void setMaxSize(size_t max_size_)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
max_size = max_size_;
}
/// Зарегистрировать временную таблицу. Потом её можно будет получить по query_id и по названию.
void addTemporaryTable(ProcessListElement & elem, const String & table_name, StoragePtr storage);
/// Найти временную таблицу по query_id и по названию. Замечание: плохо работает, если есть разные запросы с одним query_id.
StoragePtr tryGetTemporaryTable(const String & query_id, const String & table_name) const;
};
}