// ClickHouse/src/Interpreters/AsynchronousInsertQueue.h
#pragma once

#include <Parsers/IAST_fwd.h>
#include <Common/ThreadPool.h>
#include <Core/Settings.h>
#include <Poco/Logger.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <utility>

namespace DB
{
2021-09-09 16:10:53 +00:00
/// A queue, that stores data for insert queries and periodically flushes it to tables.
/// The data is grouped by table, format and settings of insert query.
2021-08-31 02:16:02 +00:00
class AsynchronousInsertQueue : public WithContext
2021-03-04 11:10:21 +00:00
{
2021-08-31 02:16:02 +00:00
public:
using Milliseconds = std::chrono::milliseconds;
/// Using structure to allow and benefit from designated initialization and not mess with a positional arguments in ctor.
struct Timeout
{
Milliseconds busy;
Milliseconds stale;
2021-08-31 02:16:02 +00:00
};
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts);
~AsynchronousInsertQueue();
2021-09-01 23:18:09 +00:00
void push(ASTPtr query, ContextPtr query_context);
2021-08-31 02:16:02 +00:00
void waitForProcessingQuery(const String & query_id, const Milliseconds & timeout);
private:
struct InsertQuery
{
ASTPtr query;
Settings settings;
2021-09-04 00:57:05 +00:00
InsertQuery(const ASTPtr & query_, const Settings & settings_);
InsertQuery(const InsertQuery & other);
InsertQuery & operator=(const InsertQuery & other);
2021-08-31 02:16:02 +00:00
bool operator==(const InsertQuery & other) const;
struct Hash { UInt64 operator()(const InsertQuery & insert_query) const; };
};
struct InsertData
{
struct Entry
2021-04-19 14:51:26 +00:00
{
2021-08-31 02:16:02 +00:00
public:
2021-09-10 10:24:09 +00:00
const String bytes;
const String query_id;
Entry(String && bytes_, String && query_id_);
2021-08-31 02:16:02 +00:00
void finish(std::exception_ptr exception_ = nullptr);
2021-09-01 23:18:09 +00:00
bool wait(const Milliseconds & timeout) const;
bool isFinished() const;
std::exception_ptr getException() const;
2021-08-31 02:16:02 +00:00
private:
2021-09-01 23:18:09 +00:00
mutable std::mutex mutex;
mutable std::condition_variable cv;
bool finished = false;
std::exception_ptr exception;
2021-04-19 14:51:26 +00:00
};
2021-08-31 02:16:02 +00:00
using EntryPtr = std::shared_ptr<Entry>;
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
std::list<EntryPtr> entries;
size_t size = 0;
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
/// Timestamp of the first insert into queue, or after the last queue dump.
/// Used to detect for how long the queue is active, so we can dump it by timer.
std::chrono::time_point<std::chrono::steady_clock> first_update = std::chrono::steady_clock::now();
2021-08-31 02:16:02 +00:00
/// Timestamp of the last insert into queue.
/// Used to detect for how long the queue is stale, so we can dump it by another timer.
std::chrono::time_point<std::chrono::steady_clock> last_update;
};
2021-08-27 21:29:10 +00:00
2021-08-31 02:16:02 +00:00
using InsertDataPtr = std::unique_ptr<InsertData>;
2021-03-04 11:10:21 +00:00
2021-09-09 16:10:53 +00:00
/// A separate container, that holds a data and a mutex for it.
/// When it's needed to process current chunk of data, it can be moved for processing
/// and new data can be recreated without holding a lock during processing.
2021-08-31 02:16:02 +00:00
struct Container
{
std::mutex mutex;
InsertDataPtr data;
};
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
using Queue = std::unordered_map<InsertQuery, std::shared_ptr<Container>, InsertQuery::Hash>;
using QueueIterator = Queue::iterator;
2021-09-03 16:46:09 +00:00
mutable std::shared_mutex rwlock;
2021-08-31 02:16:02 +00:00
Queue queue;
2021-03-04 11:10:21 +00:00
2021-09-01 23:18:09 +00:00
using QueryIdToEntry = std::unordered_map<String, InsertData::EntryPtr>;
2021-09-03 16:46:09 +00:00
mutable std::mutex currently_processing_mutex;
2021-09-01 23:18:09 +00:00
QueryIdToEntry currently_processing_queries;
2021-04-19 14:51:26 +00:00
2021-08-31 02:16:02 +00:00
/// Logic and events behind queue are as follows:
/// - busy_timeout: if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
/// grow for a long period of time and users will be able to select new data in deterministic manner.
/// - stale_timeout: if queue is stale for too long, then we dump the data too, so that users will be able to select the last
/// piece of inserted data.
/// - max_data_size: if the maximum size of data is reached, then again we dump the data.
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
const size_t max_data_size; /// in bytes
const Milliseconds busy_timeout;
const Milliseconds stale_timeout;
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
std::atomic<bool> shutdown{false};
ThreadPool pool; /// dump the data only inside this pool.
ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck()
ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck()
2021-09-09 16:10:53 +00:00
ThreadFromGlobalPool cleanup_thread; /// uses busy_timeout and cleanup()
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue");
2021-03-04 11:10:21 +00:00
2021-08-31 02:16:02 +00:00
void busyCheck();
void staleCheck();
void cleanup();
2021-04-19 14:51:26 +00:00
/// Should be called with shared or exclusively locked 'rwlock'.
void pushImpl(InsertData::EntryPtr entry, QueueIterator it);
void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
2021-08-31 02:16:02 +00:00
static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);
2021-09-01 23:18:09 +00:00
template <typename E>
static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);
2021-09-01 23:18:09 +00:00
public:
Fix race between INSERT async_insert=1 and system.asynchronous_inserts CI report [1]: [c190f600f8c6] 2022.03.02 01:07:34.553012 [ 23552 ] {76b6113b-1479-46c9-90ab-e78a3c9f3dbb} executeQuery: Code: 60. DB::Exception: Both table name and UUID are empty. (UNKNOWN_TABLE) (version 22.3.1.1) (from [::1]:42040) (comment: '02015_async_inserts_stress_long.sh') (in query: SELECT * FROM system.asynchronous_inserts FORMAT Null), Stack trace (when copying this message, always include the lines below): 0. ClickHouse/contrib/libcxx/include/exception:133: Poco::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int) @ 0xf50e04c in /fasttest-workspace/build/programs/clickhouse 1. ClickHouse/src/Common/Exception.cpp:58: DB::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int, bool) @ 0x663ebfa in /fasttest-workspace/build/programs/clickhouse 2. DB::StorageID::assertNotEmpty() const @ 0xbc08591 in /fasttest-workspace/build/programs/clickhouse 3. ClickHouse/contrib/libcxx/include/string:1444: DB::StorageID::getDatabaseName() const @ 0xe50d2b6 in /fasttest-workspace/build/programs/clickhouse 4. ClickHouse/contrib/libcxx/include/string:1957: DB::StorageSystemAsynchronousInserts::fillData(std::__1::vector::mutable_ptr, std::__1::allocator::mutable_ptr > >&, std::__1::shared_ptr, DB::SelectQueryInfo const&) const @ 0xdac636c in /fasttest-workspace/build/programs/clickhouse [1]: https://s3.amazonaws.com/clickhouse-test-reports/34973/e6fc6a22d5c018961c18247242dd3a40b8c54ff2/fast_test__actions_.html Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-03-02 12:14:56 +00:00
auto getQueueLocked() const
2021-09-03 16:46:09 +00:00
{
std::shared_lock lock(rwlock);
Fix race between INSERT async_insert=1 and system.asynchronous_inserts CI report [1]: [c190f600f8c6] 2022.03.02 01:07:34.553012 [ 23552 ] {76b6113b-1479-46c9-90ab-e78a3c9f3dbb} executeQuery: Code: 60. DB::Exception: Both table name and UUID are empty. (UNKNOWN_TABLE) (version 22.3.1.1) (from [::1]:42040) (comment: '02015_async_inserts_stress_long.sh') (in query: SELECT * FROM system.asynchronous_inserts FORMAT Null), Stack trace (when copying this message, always include the lines below): 0. ClickHouse/contrib/libcxx/include/exception:133: Poco::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int) @ 0xf50e04c in /fasttest-workspace/build/programs/clickhouse 1. ClickHouse/src/Common/Exception.cpp:58: DB::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int, bool) @ 0x663ebfa in /fasttest-workspace/build/programs/clickhouse 2. DB::StorageID::assertNotEmpty() const @ 0xbc08591 in /fasttest-workspace/build/programs/clickhouse 3. ClickHouse/contrib/libcxx/include/string:1444: DB::StorageID::getDatabaseName() const @ 0xe50d2b6 in /fasttest-workspace/build/programs/clickhouse 4. ClickHouse/contrib/libcxx/include/string:1957: DB::StorageSystemAsynchronousInserts::fillData(std::__1::vector::mutable_ptr, std::__1::allocator::mutable_ptr > >&, std::__1::shared_ptr, DB::SelectQueryInfo const&) const @ 0xdac636c in /fasttest-workspace/build/programs/clickhouse [1]: https://s3.amazonaws.com/clickhouse-test-reports/34973/e6fc6a22d5c018961c18247242dd3a40b8c54ff2/fast_test__actions_.html Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-03-02 12:14:56 +00:00
return std::make_pair(std::ref(queue), std::move(lock));
2021-09-03 16:46:09 +00:00
}
2021-03-04 11:10:21 +00:00
};
}