2021-03-04 11:10:21 +00:00
#pragma once

#include <Parsers/IAST_fwd.h>
#include <Common/ThreadPool.h>
#include <Core/Settings.h>
#include <Poco/Logger.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <utility>
namespace DB
{
2021-09-09 16:10:53 +00:00
/// A queue, that stores data for insert queries and periodically flushes it to tables.
/// The data is grouped by table, format and settings of insert query.
2021-08-31 02:16:02 +00:00
class AsynchronousInsertQueue : public WithContext
2021-03-04 11:10:21 +00:00
{
2021-08-31 02:16:02 +00:00
public :
using Milliseconds = std : : chrono : : milliseconds ;
2022-09-14 20:31:19 +00:00
AsynchronousInsertQueue ( ContextPtr context_ , size_t pool_size , Milliseconds cleanup_timeout ) ;
2021-08-31 02:16:02 +00:00
~ AsynchronousInsertQueue ( ) ;
2021-09-01 23:18:09 +00:00
void push ( ASTPtr query , ContextPtr query_context ) ;
2021-08-31 02:16:02 +00:00
void waitForProcessingQuery ( const String & query_id , const Milliseconds & timeout ) ;
private :
struct InsertQuery
{
ASTPtr query ;
Settings settings ;
2021-09-04 00:57:05 +00:00
InsertQuery ( const ASTPtr & query_ , const Settings & settings_ ) ;
InsertQuery ( const InsertQuery & other ) ;
InsertQuery & operator = ( const InsertQuery & other ) ;
2021-08-31 02:16:02 +00:00
bool operator = = ( const InsertQuery & other ) const ;
struct Hash { UInt64 operator ( ) ( const InsertQuery & insert_query ) const ; } ;
} ;
struct InsertData
{
struct Entry
2021-04-19 14:51:26 +00:00
{
2021-08-31 02:16:02 +00:00
public :
2021-09-10 10:24:09 +00:00
const String bytes ;
const String query_id ;
2022-10-03 18:52:14 +00:00
std : : chrono : : time_point < std : : chrono : : system_clock > create_time ;
2021-09-10 10:24:09 +00:00
Entry ( String & & bytes_ , String & & query_id_ ) ;
2021-08-31 02:16:02 +00:00
void finish ( std : : exception_ptr exception_ = nullptr ) ;
2021-09-01 23:18:09 +00:00
bool wait ( const Milliseconds & timeout ) const ;
bool isFinished ( ) const ;
std : : exception_ptr getException ( ) const ;
2021-08-31 02:16:02 +00:00
private :
2021-09-01 23:18:09 +00:00
mutable std : : mutex mutex ;
mutable std : : condition_variable cv ;
bool finished = false ;
std : : exception_ptr exception ;
2021-04-19 14:51:26 +00:00
} ;
2022-08-26 20:29:26 +00:00
explicit InsertData ( std : : chrono : : steady_clock : : time_point now )
: first_update ( now )
{ }
2021-08-31 02:16:02 +00:00
using EntryPtr = std : : shared_ptr < Entry > ;
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
std : : list < EntryPtr > entries ;
size_t size = 0 ;
2022-11-24 15:31:16 +00:00
size_t query_number = 0 ;
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
/// Timestamp of the first insert into queue, or after the last queue dump.
/// Used to detect for how long the queue is active, so we can dump it by timer.
2022-08-26 20:29:26 +00:00
std : : chrono : : time_point < std : : chrono : : steady_clock > first_update ;
2021-08-31 02:16:02 +00:00
} ;
2021-08-27 21:29:10 +00:00
2021-08-31 02:16:02 +00:00
using InsertDataPtr = std : : unique_ptr < InsertData > ;
2021-03-04 11:10:21 +00:00
2021-09-09 16:10:53 +00:00
/// A separate container, that holds a data and a mutex for it.
/// When it's needed to process current chunk of data, it can be moved for processing
/// and new data can be recreated without holding a lock during processing.
2021-08-31 02:16:02 +00:00
struct Container
{
std : : mutex mutex ;
InsertDataPtr data ;
} ;
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
using Queue = std : : unordered_map < InsertQuery , std : : shared_ptr < Container > , InsertQuery : : Hash > ;
using QueueIterator = Queue : : iterator ;
2022-08-26 20:29:26 +00:00
/// Ordered container
using DeadlineQueue = std : : map < std : : chrono : : steady_clock : : time_point , QueueIterator > ;
2021-08-31 02:16:02 +00:00
2021-09-03 16:46:09 +00:00
mutable std : : shared_mutex rwlock ;
2021-08-31 02:16:02 +00:00
Queue queue ;
2021-03-04 11:10:21 +00:00
2022-09-26 15:17:34 +00:00
/// This is needed only for using inside cleanup() function and correct signaling about shutdown
mutable std : : mutex cleanup_mutex ;
mutable std : : condition_variable cleanup_can_run ;
2022-09-14 20:31:19 +00:00
2022-08-26 20:29:26 +00:00
mutable std : : mutex deadline_mutex ;
mutable std : : condition_variable are_tasks_available ;
DeadlineQueue deadline_queue ;
2021-09-01 23:18:09 +00:00
using QueryIdToEntry = std : : unordered_map < String , InsertData : : EntryPtr > ;
2021-09-03 16:46:09 +00:00
mutable std : : mutex currently_processing_mutex ;
2021-09-01 23:18:09 +00:00
QueryIdToEntry currently_processing_queries ;
2021-04-19 14:51:26 +00:00
2021-08-31 02:16:02 +00:00
/// Logic and events behind queue are as follows:
/// - busy_timeout: if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
/// grow for a long period of time and users will be able to select new data in deterministic manner.
/// - stale_timeout: if queue is stale for too long, then we dump the data too, so that users will be able to select the last
/// piece of inserted data.
2022-08-26 12:21:30 +00:00
///
/// During processing incoming INSERT queries we can also check whether the maximum size of data in buffer is reached (async_insert_max_data_size setting)
/// If so, then again we dump the data.
2021-03-04 11:10:21 +00:00
2022-09-14 20:31:19 +00:00
const Milliseconds cleanup_timeout ;
2021-03-04 11:10:21 +00:00
2022-09-26 15:17:34 +00:00
std : : atomic < bool > shutdown { false } ;
2022-09-08 21:00:06 +00:00
2021-08-31 02:16:02 +00:00
ThreadPool pool ; /// dump the data only inside this pool.
ThreadFromGlobalPool dump_by_first_update_thread ; /// uses busy_timeout and busyCheck()
2021-09-09 16:10:53 +00:00
ThreadFromGlobalPool cleanup_thread ; /// uses busy_timeout and cleanup()
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
Poco : : Logger * log = & Poco : : Logger : : get ( " AsynchronousInsertQueue " ) ;
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
void busyCheck ( ) ;
void cleanup ( ) ;
2021-04-19 14:51:26 +00:00
2021-09-14 23:54:10 +00:00
/// Should be called with shared or exclusively locked 'rwlock'.
void pushImpl ( InsertData : : EntryPtr entry , QueueIterator it ) ;
2021-09-27 16:39:32 +00:00
void scheduleDataProcessingJob ( const InsertQuery & key , InsertDataPtr data , ContextPtr global_context ) ;
2021-08-31 02:16:02 +00:00
static void processData ( InsertQuery key , InsertDataPtr data , ContextPtr global_context ) ;
2021-09-01 23:18:09 +00:00
2021-09-15 14:19:28 +00:00
template < typename E >
2021-09-15 14:47:22 +00:00
static void finishWithException ( const ASTPtr & query , const std : : list < InsertData : : EntryPtr > & entries , const E & exception ) ;
2021-09-15 14:19:28 +00:00
2022-09-08 21:00:06 +00:00
/// @param timeout - time to wait
/// @return true if shutdown requested
bool waitForShutdown ( const Milliseconds & timeout ) ;
2021-09-01 23:18:09 +00:00
public :
Fix race between INSERT async_insert=1 and system.asynchronous_inserts
CI report [1]:
[c190f600f8c6] 2022.03.02 01:07:34.553012 [ 23552 ] {76b6113b-1479-46c9-90ab-e78a3c9f3dbb} executeQuery: Code: 60. DB::Exception: Both table name and UUID are empty. (UNKNOWN_TABLE) (version 22.3.1.1) (from [::1]:42040) (comment: '02015_async_inserts_stress_long.sh') (in query: SELECT * FROM system.asynchronous_inserts FORMAT Null), Stack trace (when copying this message, always include the lines below):
0. ClickHouse/contrib/libcxx/include/exception:133: Poco::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int) @ 0xf50e04c in /fasttest-workspace/build/programs/clickhouse
1. ClickHouse/src/Common/Exception.cpp:58: DB::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int, bool) @ 0x663ebfa in /fasttest-workspace/build/programs/clickhouse
2. DB::StorageID::assertNotEmpty() const @ 0xbc08591 in /fasttest-workspace/build/programs/clickhouse
3. ClickHouse/contrib/libcxx/include/string:1444: DB::StorageID::getDatabaseName() const @ 0xe50d2b6 in /fasttest-workspace/build/programs/clickhouse
4. ClickHouse/contrib/libcxx/include/string:1957: DB::StorageSystemAsynchronousInserts::fillData(std::__1::vector::mutable_ptr, std::__1::allocator::mutable_ptr > >&, std::__1::shared_ptr, DB::SelectQueryInfo const&) const @ 0xdac636c in /fasttest-workspace/build/programs/clickhouse
[1]: https://s3.amazonaws.com/clickhouse-test-reports/34973/e6fc6a22d5c018961c18247242dd3a40b8c54ff2/fast_test__actions_.html
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-03-02 12:14:56 +00:00
auto getQueueLocked ( ) const
2021-09-03 16:46:09 +00:00
{
std : : shared_lock lock ( rwlock ) ;
Fix race between INSERT async_insert=1 and system.asynchronous_inserts
CI report [1]:
[c190f600f8c6] 2022.03.02 01:07:34.553012 [ 23552 ] {76b6113b-1479-46c9-90ab-e78a3c9f3dbb} executeQuery: Code: 60. DB::Exception: Both table name and UUID are empty. (UNKNOWN_TABLE) (version 22.3.1.1) (from [::1]:42040) (comment: '02015_async_inserts_stress_long.sh') (in query: SELECT * FROM system.asynchronous_inserts FORMAT Null), Stack trace (when copying this message, always include the lines below):
0. ClickHouse/contrib/libcxx/include/exception:133: Poco::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int) @ 0xf50e04c in /fasttest-workspace/build/programs/clickhouse
1. ClickHouse/src/Common/Exception.cpp:58: DB::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int, bool) @ 0x663ebfa in /fasttest-workspace/build/programs/clickhouse
2. DB::StorageID::assertNotEmpty() const @ 0xbc08591 in /fasttest-workspace/build/programs/clickhouse
3. ClickHouse/contrib/libcxx/include/string:1444: DB::StorageID::getDatabaseName() const @ 0xe50d2b6 in /fasttest-workspace/build/programs/clickhouse
4. ClickHouse/contrib/libcxx/include/string:1957: DB::StorageSystemAsynchronousInserts::fillData(std::__1::vector::mutable_ptr, std::__1::allocator::mutable_ptr > >&, std::__1::shared_ptr, DB::SelectQueryInfo const&) const @ 0xdac636c in /fasttest-workspace/build/programs/clickhouse
[1]: https://s3.amazonaws.com/clickhouse-test-reports/34973/e6fc6a22d5c018961c18247242dd3a40b8c54ff2/fast_test__actions_.html
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-03-02 12:14:56 +00:00
return std : : make_pair ( std : : ref ( queue ) , std : : move ( lock ) ) ;
2021-09-03 16:46:09 +00:00
}
2021-03-04 11:10:21 +00:00
} ;
}