// ClickHouse/src/Interpreters/AsynchronousInsertQueue.h
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <Core/Settings.h>
#include <Poco/Logger.h>
#include <future>
namespace DB
{
2021-09-09 16:10:53 +00:00
/// A queue, that stores data for insert queries and periodically flushes it to tables.
/// The data is grouped by table, format and settings of insert query.
2021-08-31 02:16:02 +00:00
class AsynchronousInsertQueue : public WithContext
2021-03-04 11:10:21 +00:00
{
2021-08-31 02:16:02 +00:00
public:
using Milliseconds = std::chrono::milliseconds;
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_);
2021-08-31 02:16:02 +00:00
~AsynchronousInsertQueue();
struct PushResult
{
enum Status
{
OK,
TOO_MUCH_DATA,
};
Status status;
/// Future that allows to wait until the query is flushed.
std::future<void> future;
/// Read buffer that contains extracted
/// from query data in case of too much data.
std::unique_ptr<ReadBuffer> insert_data_buffer;
};
PushResult push(ASTPtr query, ContextPtr query_context);
size_t getPoolSize() const { return pool_size; }
2021-08-31 02:16:02 +00:00
private:
struct InsertQuery
{
public:
2021-08-31 02:16:02 +00:00
ASTPtr query;
String query_str;
2021-08-31 02:16:02 +00:00
Settings settings;
UInt128 hash;
2021-08-31 02:16:02 +00:00
2021-09-04 00:57:05 +00:00
InsertQuery(const ASTPtr & query_, const Settings & settings_);
InsertQuery(const InsertQuery & other);
InsertQuery & operator=(const InsertQuery & other);
2021-08-31 02:16:02 +00:00
bool operator==(const InsertQuery & other) const;
private:
UInt128 calculateHash() const;
2021-08-31 02:16:02 +00:00
};
struct UserMemoryTrackerSwitcher
{
explicit UserMemoryTrackerSwitcher(MemoryTracker * new_tracker)
{
auto * thread_tracker = CurrentThread::getMemoryTracker();
prev_untracked_memory = current_thread->untracked_memory;
prev_memory_tracker_parent = thread_tracker->getParent();
current_thread->untracked_memory = 0;
thread_tracker->setParent(new_tracker);
}
~UserMemoryTrackerSwitcher()
{
CurrentThread::flushUntrackedMemory();
auto * thread_tracker = CurrentThread::getMemoryTracker();
current_thread->untracked_memory = prev_untracked_memory;
thread_tracker->setParent(prev_memory_tracker_parent);
}
MemoryTracker * prev_memory_tracker_parent;
Int64 prev_untracked_memory;
};
2021-08-31 02:16:02 +00:00
struct InsertData
{
struct Entry
2021-04-19 14:51:26 +00:00
{
2021-08-31 02:16:02 +00:00
public:
2023-05-02 18:50:14 +00:00
String bytes;
2021-09-10 10:24:09 +00:00
const String query_id;
MemoryTracker * const user_memory_tracker;
const std::chrono::time_point<std::chrono::system_clock> create_time;
2021-09-10 10:24:09 +00:00
Entry(String && bytes_, String && query_id_, MemoryTracker * user_memory_tracker_);
2021-08-31 02:16:02 +00:00
void finish(std::exception_ptr exception_ = nullptr);
std::future<void> getFuture() { return promise.get_future(); }
bool isFinished() const { return finished; }
2021-08-31 02:16:02 +00:00
private:
std::promise<void> promise;
std::atomic_bool finished = false;
2021-04-19 14:51:26 +00:00
};
~InsertData()
{
auto it = entries.begin();
// Entries must be destroyed in context of user who runs async insert.
// Each entry in the list may correspond to a different user,
// so we need to switch current thread's MemoryTracker parent on each iteration.
while (it != entries.end())
{
UserMemoryTrackerSwitcher switcher((*it)->user_memory_tracker);
it = entries.erase(it);
}
}
2021-08-31 02:16:02 +00:00
using EntryPtr = std::shared_ptr<Entry>;
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
std::list<EntryPtr> entries;
2021-03-04 11:10:21 +00:00
size_t size_in_bytes = 0;
size_t query_number = 0;
2021-08-31 02:16:02 +00:00
};
2021-08-27 21:29:10 +00:00
2021-08-31 02:16:02 +00:00
using InsertDataPtr = std::unique_ptr<InsertData>;
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
struct Container
{
InsertQuery key;
2021-08-31 02:16:02 +00:00
InsertDataPtr data;
};
2021-03-04 11:10:21 +00:00
2022-08-26 20:29:26 +00:00
/// Ordered container
/// Key is a timestamp of the first insert into batch.
/// Used to detect for how long the batch is active, so we can dump it by timer.
using Queue = std::map<std::chrono::steady_clock::time_point, Container>;
using QueueIterator = Queue::iterator;
using QueueIteratorByKey = std::unordered_map<UInt128, QueueIterator>;
2021-03-04 11:10:21 +00:00
struct QueueShard
{
mutable std::mutex mutex;
mutable std::condition_variable are_tasks_available;
2022-09-14 20:31:19 +00:00
Queue queue;
QueueIteratorByKey iterators;
};
2022-08-26 20:29:26 +00:00
const size_t pool_size;
std::vector<QueueShard> queue_shards;
2021-04-19 14:51:26 +00:00
2021-08-31 02:16:02 +00:00
/// Logic and events behind queue are as follows:
/// - async_insert_busy_timeout_ms:
/// if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
/// grow for a long period of time and users will be able to select new data in deterministic manner.
2022-08-26 12:21:30 +00:00
///
/// During processing incoming INSERT queries we can also check whether the maximum size of data in buffer is reached
/// (async_insert_max_data_size setting). If so, then again we dump the data.
2021-03-04 11:10:21 +00:00
2022-09-26 15:17:34 +00:00
std::atomic<bool> shutdown{false};
/// Dump the data only inside this pool.
ThreadPool pool;
2021-03-04 11:10:21 +00:00
/// Uses async_insert_busy_timeout_ms and processBatchDeadlines()
std::vector<ThreadFromGlobalPool> dump_by_first_update_threads;
2021-04-19 14:51:26 +00:00
Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue");
void processBatchDeadlines(size_t shard_num);
void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
2021-08-31 02:16:02 +00:00
static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);
2021-09-01 23:18:09 +00:00
template <typename E>
static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);
2021-09-01 23:18:09 +00:00
public:
auto getQueueLocked(size_t shard_num) const
2021-09-03 16:46:09 +00:00
{
auto & shard = queue_shards[shard_num];
std::unique_lock lock(shard.mutex);
return std::make_pair(std::ref(shard.queue), std::move(lock));
2021-09-03 16:46:09 +00:00
}
2021-03-04 11:10:21 +00:00
};
}