#pragma once #include #include #include #include #include namespace DB { /// A queue, that stores data for insert queries and periodically flushes it to tables. /// The data is grouped by table, format and settings of insert query. class AsynchronousInsertQueue : public WithContext { public: using Milliseconds = std::chrono::milliseconds; AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_); ~AsynchronousInsertQueue(); std::future push(ASTPtr query, ContextPtr query_context); size_t getPoolSize() const { return pool_size; } private: struct InsertQuery { public: ASTPtr query; String query_str; Settings settings; UInt128 hash; InsertQuery(const ASTPtr & query_, const Settings & settings_); InsertQuery(const InsertQuery & other); InsertQuery & operator=(const InsertQuery & other); bool operator==(const InsertQuery & other) const; private: UInt128 calculateHash() const; }; struct InsertData { struct Entry { public: const String bytes; const String query_id; const std::chrono::time_point create_time; Entry(String && bytes_, String && query_id_); void finish(std::exception_ptr exception_ = nullptr); std::future getFuture() { return promise.get_future(); } bool isFinished() const { return finished; } private: std::promise promise; std::atomic_bool finished = false; }; using EntryPtr = std::shared_ptr; std::list entries; size_t size_in_bytes = 0; size_t query_number = 0; }; using InsertDataPtr = std::unique_ptr; struct Container { InsertQuery key; InsertDataPtr data; }; /// Ordered container /// Key is a timestamp of the first insert into batch. /// Used to detect for how long the batch is active, so we can dump it by timer. using Queue = std::map; using QueueIterator = Queue::iterator; using QueueIteratorByKey = std::unordered_map; struct QueueShard { mutable std::mutex mutex; mutable std::condition_variable are_tasks_available; Queue queue; QueueIteratorByKey iterators; }; const size_t pool_size; std::vector queue_shards; /// Logic and events behind queue are as follows: /// - async_insert_busy_timeout_ms: /// if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't /// grow for a long period of time and users will be able to select new data in deterministic manner. /// /// During processing incoming INSERT queries we can also check whether the maximum size of data in buffer is reached /// (async_insert_max_data_size setting). If so, then again we dump the data. std::atomic shutdown{false}; /// Dump the data only inside this pool. ThreadPool pool; /// Uses async_insert_busy_timeout_ms and processBatchDeadlines() std::vector dump_by_first_update_threads; Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue"); void processBatchDeadlines(size_t shard_num); void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context); static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context); template static void finishWithException(const ASTPtr & query, const std::list & entries, const E & exception); public: auto getQueueLocked(size_t shard_num) const { auto & shard = queue_shards[shard_num]; std::unique_lock lock(shard.mutex); return std::make_pair(std::ref(shard.queue), std::move(lock)); } }; }