// ClickHouse/dbms/include/DB/Interpreters/ProcessList.h
//
// 227 lines
// 5.9 KiB
// C++

#pragma once
#include <chrono>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>
#include <Poco/Condition.h>
#include <Poco/Net/IPAddress.h>
#include <DB/Common/Stopwatch.h>
#include <DB/Core/Defines.h>
#include <DB/Core/Progress.h>
#include <DB/Common/Exception.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/QueryPriorities.h>
#include <DB/Storages/IStorage.h>
#include <DB/Common/CurrentMetrics.h>
/// Metric referenced by ProcessListElement::num_queries; it is only declared here,
/// the definition lives in another translation unit.
namespace CurrentMetrics
{
    extern const Metric Query;
}
namespace DB
{
/** This header declares the list of currently executing queries (class ProcessList).
  * The list also enforces a limit on the number of such queries.
  */
/** Plain snapshot of a single process list element.
  * Meant for the output of the SHOW PROCESSLIST query. Contains only simple values:
  * no member does anything non-trivial when the struct is copied or destroyed.
  *
  * The field order is significant: ProcessListElement::getInfo() fills the struct
  * field by field in exactly this order.
  */
struct ProcessInfo
{
    /// Identification of the query, copied from the process list element.
    String query;
    String user;
    String query_id;

    /// Address and port stored with the process list element.
    Poco::Net::IPAddress ip_address;
    UInt16 port;

    /// Reading of the element's stopwatch at the moment the snapshot was taken.
    double elapsed_seconds;

    /// Progress counters at the moment the snapshot was taken.
    size_t rows;
    size_t bytes;
    size_t total_rows;

    /// Value reported by the element's memory tracker.
    Int64 memory_usage;
};
/// A running query together with the bookkeeping kept about its execution.
struct ProcessListElement
{
    String query;
    String user;
    String query_id;

    Poco::Net::IPAddress ip_address;
    UInt16 port;

    /// Source of ProcessInfo::elapsed_seconds.
    Stopwatch watch;

    /// Accumulated by update().
    Progress progress;

    /// Constructed with the max_memory_usage limit passed to the constructor.
    MemoryTracker memory_tracker;

    /// May be empty; update() waits on it only when it is set.
    QueryPriorities::Handle priority_handle;

    /// Bound to the CurrentMetrics::Query metric for as long as this element exists.
    CurrentMetrics::Increment num_queries {CurrentMetrics::Query};

    /// Once set, update() starts returning false.
    /// NOTE(review): plain bool; if it is written from another thread than the one calling
    /// update(), the access is unsynchronized - confirm against the callers.
    bool is_cancelled = false;

    /// Temporary tables could be registered here. Modify under mutex.
    Tables temporary_tables;

    /** Stores the query description, sets up the per-query memory tracker and makes it
      * the current one (current_memory_tracker is declared outside this header).
      * A zero memory_tracker_fault_probability leaves the tracker's fault probability untouched.
      */
    ProcessListElement(const String & query_, const String & user_,
        const String & query_id_, const Poco::Net::IPAddress & ip_address_,
        UInt16 port_, size_t max_memory_usage, double memory_tracker_fault_probability,
        QueryPriorities::Handle && priority_handle_)
        : query(query_)
        , user(user_)
        , query_id(query_id_)
        , ip_address(ip_address_)
        , port(port_)
        , memory_tracker(max_memory_usage)
        , priority_handle(std::move(priority_handle_))
    {
        memory_tracker.setDescription("(for query)");
        current_memory_tracker = &memory_tracker;

        if (memory_tracker_fault_probability != 0.0)
            memory_tracker.setFaultProbability(memory_tracker_fault_probability);
    }

    /// Detaches the current memory tracker, which the constructor pointed at this element.
    ~ProcessListElement()
    {
        current_memory_tracker = nullptr;
    }

    /** Adds `value` to the accumulated progress and, when a priority handle is present,
      * lets it wait (the timeout passed is one second).
      * Returns false if the query has been cancelled, true otherwise.
      */
    bool update(const Progress & value)
    {
        progress.incrementPiecewiseAtomically(value);

        if (priority_handle)
            priority_handle->waitIfNeed(std::chrono::seconds(1));    /// NOTE Could make timeout customizable.

        const bool keep_running = !is_cancelled;
        return keep_running;
    }

    /// Builds a plain-value snapshot of this element.
    ProcessInfo getInfo() const
    {
        ProcessInfo info;

        info.query = query;
        info.user = user;
        info.query_id = query_id;
        info.ip_address = ip_address;
        info.port = port;
        info.elapsed_seconds = watch.elapsedSeconds();
        info.rows = progress.rows;
        info.bytes = progress.bytes;
        info.total_rows = progress.total_rows;
        info.memory_usage = memory_tracker.get();

        return info;
    }
};
/// Per-user bookkeeping: the user's queries and a memory tracker for that user.
struct ProcessListForUser
{
    /// Maps query_id to a non-owning pointer to the corresponding ProcessListElement.
    using QueryToElement = std::unordered_map<String, ProcessListElement *>;

    QueryToElement queries;

    /// Limit and counter for memory of all simultaneously running queries of single user.
    MemoryTracker user_memory_tracker;
};
class ProcessList;

/** Keeps an iterator into the process list and removes the element in the destructor
  * (the destructor is defined out of line).
  *
  * Because destruction removes the element the iterator points to, two entries must never
  * refer to the same element; the class is therefore non-copyable. Share an entry through
  * a smart pointer instead of copying it.
  */
class ProcessListEntry
{
private:
    using Container = std::list<ProcessListElement>;

    ProcessList & parent;
    Container::iterator it;

public:
    /// `it_` must point to an element of the container that belongs to `parent_`.
    ProcessListEntry(ProcessList & parent_, Container::iterator it_)
        : parent(parent_), it(it_) {}

    /// Copying would make two destructors remove the same list element.
    ProcessListEntry(const ProcessListEntry &) = delete;
    ProcessListEntry & operator=(const ProcessListEntry &) = delete;

    ~ProcessListEntry();

    /// Access to the element this entry refers to.
    ProcessListElement * operator->() { return &*it; }
    const ProcessListElement * operator->() const { return &*it; }

    ProcessListElement & get() { return *it; }
    const ProcessListElement & get() const { return *it; }
};
/** Registry of currently executing queries.
  * Can also limit how many of them exist at once (see max_size).
  */
class ProcessList
{
    /// Entries work with the private state of the list.
    friend class ProcessListEntry;

public:
    using Element = ProcessListElement;
    using Entry = ProcessListEntry;

    /// list, for iterators not to invalidate. NOTE: could replace with cyclic buffer, but not worth.
    using Container = std::list<Element>;
    using Info = std::vector<ProcessInfo>;

    /// User -> queries
    using UserToQueries = std::unordered_map<String, ProcessListForUser>;

private:
    mutable std::mutex mutex;
    mutable Poco::Condition have_space;    /// Number of currently running queries has become less than maximum.

    Container cont;
    size_t cur_size = 0;    /// In C++03 or C++11 and old ABI, std::list::size is not O(1).
    size_t max_size;        /// 0 means no limit. Otherwise, when limit exceeded, an exception is thrown.

    UserToQueries user_to_queries;
    QueryPriorities priorities;

    /// Limit and counter for memory of all simultaneously running queries.
    MemoryTracker total_memory_tracker;

public:
    /// `max_size_` equal to 0 (the default) means that the number of queries is not limited.
    ProcessList(size_t max_size_ = 0) : max_size(max_size_) {}

    using EntryPtr = std::shared_ptr<ProcessListEntry>;

    /** Register running query. Returns refcounted object, that will remove element from list in destructor.
      * If there are too many running queries - wait for not more than specified (see settings) amount of time.
      * If timeout is passed - throw an exception.
      */
    EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
        UInt16 port_, const Settings & settings);

    /// Number of currently executing queries.
    /// NOTE(review): reads cur_size without taking the mutex - confirm that callers tolerate this.
    size_t size() const { return cur_size; }

    /// Snapshot of all elements, collected while holding the mutex.
    Info getInfo() const
    {
        std::lock_guard<std::mutex> guard(mutex);

        Info snapshot;
        snapshot.reserve(cur_size);

        for (const auto & element : cont)
            snapshot.push_back(element.getInfo());

        return snapshot;
    }

    /// Replaces the limit on the number of queries; done under the mutex.
    void setMaxSize(size_t max_size_)
    {
        std::lock_guard<std::mutex> guard(mutex);
        max_size = max_size_;
    }

    /// Register temporary table. Then it is accessible by query_id and name.
    void addTemporaryTable(ProcessListElement & elem, const String & table_name, StoragePtr storage);

    /// Find temporary table by query_id and name. NOTE: doesn't work fine if there are many queries with same query_id.
    StoragePtr tryGetTemporaryTable(const String & query_id, const String & table_name) const;
};
}