#pragma once

#include <Core/Settings.h>
#include <Parsers/IAST_fwd.h>
#include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h>
#include <Common/ThreadPool.h>
#include <Poco/Logger.h>

#include <future>

namespace DB
{

/// A queue that stores data for insert queries and periodically flushes it to tables.
/// The data is grouped by table, format and settings of the insert query.
class AsynchronousInsertQueue : public WithContext
{
public:
    using Milliseconds = std::chrono::milliseconds;

    AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_, bool flush_on_shutdown_);
    ~AsynchronousInsertQueue();

    struct PushResult
    {
        enum Status
        {
            OK,
            TOO_MUCH_DATA,
        };

        Status status;

        /// Future that allows waiting until the data is flushed.
        std::future<void> future;

        /// Read buffer with the data extracted from the query,
        /// returned when the status is TOO_MUCH_DATA.
        std::unique_ptr<ReadBuffer> insert_data_buffer;
    };

    void flushAll();
    PushResult push(ASTPtr query, ContextPtr query_context);
    size_t getPoolSize() const { return pool_size; }

private:
    struct InsertQuery
    {
    public:
        ASTPtr query;
        String query_str;
        Settings settings;
        UInt128 hash;

        InsertQuery(const ASTPtr & query_, const Settings & settings_);
        InsertQuery(const InsertQuery & other);
        InsertQuery & operator=(const InsertQuery & other);
        bool operator==(const InsertQuery & other) const;

    private:
        UInt128 calculateHash() const;
    };

    struct UserMemoryTrackerSwitcher
    {
        explicit UserMemoryTrackerSwitcher(MemoryTracker * new_tracker)
        {
            auto * thread_tracker = CurrentThread::getMemoryTracker();

            prev_untracked_memory = current_thread->untracked_memory;
            prev_memory_tracker_parent = thread_tracker->getParent();

            current_thread->untracked_memory = 0;
            thread_tracker->setParent(new_tracker);
        }

        ~UserMemoryTrackerSwitcher()
        {
            CurrentThread::flushUntrackedMemory();
            auto * thread_tracker = CurrentThread::getMemoryTracker();

            current_thread->untracked_memory = prev_untracked_memory;
            thread_tracker->setParent(prev_memory_tracker_parent);
        }

        MemoryTracker * prev_memory_tracker_parent;
        Int64 prev_untracked_memory;
    };

    struct InsertData
    {
        struct Entry
        {
        public:
            String bytes;
            const String query_id;
            MemoryTracker * const user_memory_tracker;
            const std::chrono::time_point<std::chrono::system_clock> create_time;

            Entry(String && bytes_, String && query_id_, MemoryTracker * user_memory_tracker_);

            void finish(std::exception_ptr exception_ = nullptr);
            std::future<void> getFuture() { return promise.get_future(); }
            bool isFinished() const { return finished; }

        private:
            std::promise<void> promise;
            std::atomic_bool finished = false;
        };

        ~InsertData()
        {
            auto it = entries.begin();
            // Entries must be destroyed in the context of the user who ran the async insert.
            // Each entry in the list may correspond to a different user,
            // so we need to switch the current thread's MemoryTracker parent on each iteration.
            while (it != entries.end())
            {
                UserMemoryTrackerSwitcher switcher((*it)->user_memory_tracker);
                it = entries.erase(it);
            }
        }

        using EntryPtr = std::shared_ptr<Entry>;

        std::list<EntryPtr> entries;
        size_t size_in_bytes = 0;
    };

    using InsertDataPtr = std::unique_ptr<InsertData>;

    struct Container
    {
        InsertQuery key;
        InsertDataPtr data;
    };

    /// Ordered container.
    /// The key is the timestamp of the first insert into the batch.
    /// Used to detect how long the batch has been active, so we can dump it by timer.
    using Queue = std::map<std::chrono::steady_clock::time_point, Container>;
    using QueueIterator = Queue::iterator;
    using QueueIteratorByKey = std::unordered_map<UInt128, QueueIterator>;

    struct QueueShard
    {
        mutable std::mutex mutex;
        mutable std::condition_variable are_tasks_available;

        Queue queue;
        QueueIteratorByKey iterators;
    };

    const size_t pool_size;
    const bool flush_on_shutdown;

    std::vector<QueueShard> queue_shards;

    /// Logic and events behind the queue are as follows:
    ///  - async_insert_busy_timeout_ms:
    ///    if the queue stays active for too long and there are a lot of rapid inserts, we dump the data, so it doesn't
    ///    grow for a long period of time and users are able to select new data in a deterministic manner.
    ///
    /// While processing incoming INSERT queries we also check whether the maximum size of data in the buffer is reached
    /// (the async_insert_max_data_size setting). If so, we dump the data as well.

    std::atomic<bool> shutdown{false};
    std::atomic<bool> flush_stopped{false};

    /// A mutex that prevents concurrent forced flushes of the queue.
    mutable std::mutex flush_mutex;

    /// Dump the data only inside this pool.
    ThreadPool pool;

    /// Uses async_insert_busy_timeout_ms and processBatchDeadlines().
    std::vector<ThreadFromGlobalPool> dump_by_first_update_threads;

    Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue");

    void processBatchDeadlines(size_t shard_num);
    void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);

    static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);

    template <typename E>
    static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);

public:
    auto getQueueLocked(size_t shard_num) const
    {
        auto & shard = queue_shards[shard_num];
        std::unique_lock lock(shard.mutex);
        return std::make_pair(std::ref(shard.queue), std::move(lock));
    }
};

}
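
/* A minimal usage sketch (illustrative only, not part of this header). It shows the
 * intended flow around push(): the caller either waits on the returned future (OK)
 * or falls back to a synchronous insert when the buffered data gets too large
 * (TOO_MUCH_DATA). The names `queue`, `insert_ast`, `query_context` and
 * `executeSynchronously` are assumptions for the sake of the example, not part of the API.
 *
 *     DB::AsynchronousInsertQueue & queue = ...;   // e.g. obtained from the server Context
 *     auto result = queue.push(insert_ast, query_context);
 *
 *     if (result.status == DB::AsynchronousInsertQueue::PushResult::OK)
 *         result.future.get();   // blocks until the batch is flushed; rethrows the flush error, if any
 *     else
 *         executeSynchronously(std::move(result.insert_data_buffer));   // hypothetical fallback path
 */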