#pragma once #include #include #include #include #include namespace DB { class ASTInsertQuery; struct BlockIO; class AsynchronousInsertQueue { public: AsynchronousInsertQueue(size_t pool_size, size_t max_data_size); bool push(ASTInsertQuery * query, const Settings & settings); void push(ASTInsertQuery * query, BlockIO && io, const Settings & settings); private: struct InsertQuery { ASTPtr query; Settings settings; }; struct InsertData; struct InsertQueryHash { std::size_t operator () (const InsertQuery &) const; }; struct InsertQueryEquality { bool operator () (const InsertQuery &, const InsertQuery &) const; }; /// Logic and events behind queue are as follows: /// - reset_timeout: if queue is empty for some time, then we delete the queue and free all associated resources, e.g. tables. /// - dump_timeout: if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't /// grow for a long period of time and users will be able to select new data in determenistic manner. /// - stale_timeout: if queue is stale for too long, then we dump the data too, so that users will be able to select the last /// piece of inserted data. /// - access_timeout: also we have to check if user still has access to the tables periodically, and if the access is lost, then /// we dump pending data and delete queue immediately. /// - max_data_size: if the maximum size of data is reached, then again we dump the data. using Queue = std::unordered_map, InsertQueryHash, InsertQueryEquality>; using QueueIterator = Queue::iterator; const size_t max_data_size; /// in bytes RWLock lock; std::unique_ptr queue; ThreadPool pool; /// dump the data only inside this pool. /// TODO: ThreadFromGlobalPool remove_empty_thread, check_access_thread; void pushImpl(ASTInsertQuery * query, QueueIterator & it); /// use only under lock static void processData(std::shared_ptr data); }; }