ClickHouse/src/IO/AsynchronousInsertionQueue.h

84 lines
3.0 KiB
C++
Raw Normal View History

2021-03-04 11:10:21 +00:00
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Common/RWLock.h>
#include <Common/ThreadPool.h>
2021-03-17 14:11:47 +00:00
#include <Core/Settings.h>
2021-03-04 11:10:21 +00:00
#include <unordered_map>
namespace DB
{
class ASTInsertQuery;
struct BlockIO;
class AsynchronousInsertQueue
{
public:
2021-04-19 14:51:26 +00:00
/// Using structure to allow and benefit from designated initialization and not mess with a positional arguments in ctor.
struct Timeout
{
size_t busy, stale; /// in seconds
};
AsynchronousInsertQueue(size_t pool_size, size_t max_data_size, const Timeout & timeouts);
~AsynchronousInsertQueue();
2021-03-04 11:10:21 +00:00
bool push(ASTInsertQuery * query, const Settings & settings);
void push(ASTInsertQuery * query, BlockIO && io, const Settings & settings);
private:
2021-03-17 14:11:47 +00:00
struct InsertQuery
{
ASTPtr query;
Settings settings;
};
2021-03-04 11:10:21 +00:00
struct InsertData;
struct InsertQueryHash
{
std::size_t operator () (const InsertQuery &) const;
};
struct InsertQueryEquality
{
bool operator () (const InsertQuery &, const InsertQuery &) const;
};
2021-03-17 14:11:47 +00:00
/// Logic and events behind queue are as follows:
/// - reset_timeout: if queue is empty for some time, then we delete the queue and free all associated resources, e.g. tables.
2021-04-19 14:51:26 +00:00
/// - busy_timeout: if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
2021-04-15 06:32:45 +00:00
/// grow for a long period of time and users will be able to select new data in deterministic manner.
2021-03-17 14:11:47 +00:00
/// - stale_timeout: if queue is stale for too long, then we dump the data too, so that users will be able to select the last
/// piece of inserted data.
/// - access_timeout: also we have to check if user still has access to the tables periodically, and if the access is lost, then
/// we dump pending data and delete queue immediately.
/// - max_data_size: if the maximum size of data is reached, then again we dump the data.
2021-04-19 14:51:26 +00:00
2021-03-04 11:10:21 +00:00
using Queue = std::unordered_map<InsertQuery, std::shared_ptr<InsertData>, InsertQueryHash, InsertQueryEquality>;
using QueueIterator = Queue::iterator;
2021-03-17 14:11:47 +00:00
const size_t max_data_size; /// in bytes
2021-04-19 14:51:26 +00:00
const std::chrono::seconds busy_timeout, stale_timeout;
2021-03-04 11:10:21 +00:00
RWLock lock;
2021-03-17 14:11:47 +00:00
std::unique_ptr<Queue> queue;
2021-03-04 11:10:21 +00:00
2021-04-19 14:51:26 +00:00
std::atomic<bool> shutdown{false};
2021-03-17 14:11:47 +00:00
ThreadPool pool; /// dump the data only inside this pool.
2021-04-19 14:51:26 +00:00
ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck()
ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck()
2021-03-04 11:10:21 +00:00
/// TODO: ThreadFromGlobalPool remove_empty_thread, check_access_thread;
2021-04-19 14:51:26 +00:00
void busyCheck();
void staleCheck();
2021-03-04 11:10:21 +00:00
void pushImpl(ASTInsertQuery * query, QueueIterator & it); /// use only under lock
2021-03-17 14:11:47 +00:00
static void processData(std::shared_ptr<InsertData> data);
2021-03-04 11:10:21 +00:00
};
}