Simplify AsynchronousInsertQueue and fix race (#43233)

Anton Popov 2022-11-25 15:02:22 +01:00 committed by GitHub
parent 8d3ccf1c52
commit c811f34a41
12 changed files with 301 additions and 370 deletions

View File

@@ -1475,8 +1475,7 @@ try
if (settings.async_insert_threads)
global_context->setAsynchronousInsertQueue(std::make_shared<AsynchronousInsertQueue>(
global_context,
settings.async_insert_threads,
settings.async_insert_cleanup_timeout_ms));
settings.async_insert_threads));
/// Size of cache for marks (index of MergeTree family of tables).
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);

View File

@@ -604,7 +604,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_cleanup_timeout_ms, 1000, "Time to wait before each iteration of cleaning up buffers for INSERT queries which don't appear anymore. Only has meaning at server startup.", 0) \
\
M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
@@ -705,6 +704,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
MAKE_OBSOLETE(M, DefaultDatabaseEngine, default_database_engine, DefaultDatabaseEngine::Atomic) \
MAKE_OBSOLETE(M, UInt64, max_pipeline_depth, 0) \
MAKE_OBSOLETE(M, Seconds, temporary_live_view_timeout, 1) \
MAKE_OBSOLETE(M, Milliseconds, async_insert_cleanup_timeout_ms, 1000) \
/** The section above is for obsolete settings. Do not add anything there. */

View File

@@ -48,15 +48,22 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int UNKNOWN_EXCEPTION;
extern const int UNKNOWN_FORMAT;
extern const int BAD_ARGUMENTS;
}
AsynchronousInsertQueue::InsertQuery::InsertQuery(const ASTPtr & query_, const Settings & settings_)
: query(query_->clone()), settings(settings_)
: query(query_->clone())
, query_str(queryToString(query))
, settings(settings_)
, hash(calculateHash())
{
}
AsynchronousInsertQueue::InsertQuery::InsertQuery(const InsertQuery & other)
: query(other.query->clone()), settings(other.settings)
: query(other.query->clone())
, query_str(other.query_str)
, settings(other.settings)
, hash(other.hash)
{
}
@@ -66,29 +73,33 @@ AsynchronousInsertQueue::InsertQuery::operator=(const InsertQuery & other)
if (this != &other)
{
query = other.query->clone();
query_str = other.query_str;
settings = other.settings;
hash = other.hash;
}
return *this;
}
UInt64 AsynchronousInsertQueue::InsertQuery::Hash::operator()(const InsertQuery & insert_query) const
UInt128 AsynchronousInsertQueue::InsertQuery::calculateHash() const
{
SipHash hash;
insert_query.query->updateTreeHash(hash);
SipHash siphash;
query->updateTreeHash(siphash);
for (const auto & setting : insert_query.settings.allChanged())
for (const auto & setting : settings.allChanged())
{
hash.update(setting.getName());
applyVisitor(FieldVisitorHash(hash), setting.getValue());
siphash.update(setting.getName());
applyVisitor(FieldVisitorHash(siphash), setting.getValue());
}
return hash.get64();
UInt128 res;
siphash.get128(res);
return res;
}
bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other) const
{
return queryToString(query) == queryToString(other.query) && settings == other.settings;
return query_str == other.query_str && settings == other.settings;
}
AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_)
@@ -100,43 +111,31 @@ AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && qu
void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr exception_)
{
std::lock_guard lock(mutex);
finished = true;
if (finished.exchange(true))
return;
if (exception_)
{
promise.set_exception(exception_);
ProfileEvents::increment(ProfileEvents::FailedAsyncInsertQuery, 1);
exception = exception_;
cv.notify_all();
}
else
{
promise.set_value();
}
}
bool AsynchronousInsertQueue::InsertData::Entry::wait(const Milliseconds & timeout) const
{
std::unique_lock lock(mutex);
return cv.wait_for(lock, timeout, [&] { return finished; });
}
bool AsynchronousInsertQueue::InsertData::Entry::isFinished() const
{
std::lock_guard lock(mutex);
return finished;
}
std::exception_ptr AsynchronousInsertQueue::InsertData::Entry::getException() const
{
std::lock_guard lock(mutex);
return exception;
}
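The reworked Entry above replaces the mutex, condition variable and plain bool with a std::promise that is completed at most once, guarded by an atomic exchange, so concurrent finish() calls cannot race and waiters no longer poll a flag through wait()/isFinished()/getException(). A minimal standalone sketch of this complete-once pattern, using only standard-library types (the class name is invented for illustration, it is not the ClickHouse class):

#include <atomic>
#include <exception>
#include <future>

/// Illustrative stand-in for InsertData::Entry.
class OneShotCompletion
{
public:
    /// std::promise hands out a single future, so call this once per object.
    std::future<void> getFuture() { return promise.get_future(); }

    /// Safe to call from several threads; only the first call takes effect.
    void finish(std::exception_ptr exception = nullptr)
    {
        if (finished.exchange(true))
            return;

        if (exception)
            promise.set_exception(exception);
        else
            promise.set_value();
    }

    bool isFinished() const { return finished; }

private:
    std::promise<void> promise;
    std::atomic_bool finished{false};
};

A waiter then blocks with future.wait_for(timeout) and rethrows any stored exception via future.get(), which is what the reworked WaitForAsyncInsertSource further down in this diff does.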
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, Milliseconds cleanup_timeout_)
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_)
: WithContext(context_)
, cleanup_timeout(cleanup_timeout_)
, pool_size(pool_size_)
, queue_shards(pool_size)
, pool(pool_size)
, dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this)
, cleanup_thread(&AsynchronousInsertQueue::cleanup, this)
{
using namespace std::chrono;
if (!pool_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "pool_size cannot be zero");
assert(pool_size);
for (size_t i = 0; i < pool_size; ++i)
dump_by_first_update_threads.emplace_back([this, i] { processBatchDeadlines(i); });
}
AsynchronousInsertQueue::~AsynchronousInsertQueue()
@@ -144,34 +143,31 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
/// TODO: add a setting for graceful shutdown.
LOG_TRACE(log, "Shutting down the asynchronous insertion queue");
shutdown = true;
{
std::lock_guard lock(deadline_mutex);
are_tasks_available.notify_one();
}
{
std::lock_guard lock(cleanup_mutex);
cleanup_can_run.notify_one();
}
assert(dump_by_first_update_thread.joinable());
dump_by_first_update_thread.join();
for (size_t i = 0; i < pool_size; ++i)
{
auto & shard = queue_shards[i];
assert(cleanup_thread.joinable());
cleanup_thread.join();
shard.are_tasks_available.notify_one();
assert(dump_by_first_update_threads[i].joinable());
dump_by_first_update_threads[i].join();
{
std::lock_guard lock(shard.mutex);
for (auto & [_, elem] : shard.queue)
{
for (const auto & entry : elem.data->entries)
{
entry->finish(std::make_exception_ptr(Exception(
ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout exceeded)")));
}
}
}
}
pool.wait();
std::lock_guard lock(currently_processing_mutex);
for (const auto & [_, entry] : currently_processing_queries)
{
if (!entry->isFinished())
entry->finish(std::make_exception_ptr(Exception(
ErrorCodes::TIMEOUT_EXCEEDED,
"Wait for async insert timeout exceeded)")));
}
LOG_TRACE(log, "Asynchronous insertion queue finished");
}
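Because the removed and added lines are interleaved above, the new shutdown sequence is easier to follow in a condensed form. Roughly, keeping only the added lines (member names as in this commit), it reads:

shutdown = true;

for (size_t i = 0; i < pool_size; ++i)
{
    auto & shard = queue_shards[i];

    /// Wake the shard's deadline thread and wait for it to exit.
    shard.are_tasks_available.notify_one();
    dump_by_first_update_threads[i].join();

    /// Fail every entry still buffered in this shard.
    std::lock_guard lock(shard.mutex);
    for (auto & [_, elem] : shard.queue)
        for (const auto & entry : elem.data->entries)
            entry->finish(std::make_exception_ptr(Exception(
                ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout exceeded)")));
}

pool.wait();

So each shard's deadline thread is woken and joined first, every entry still sitting in that shard's queue is failed with TIMEOUT_EXCEEDED, and only then does the destructor wait for the in-flight flush jobs in the pool.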
@@ -185,7 +181,7 @@ void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key,
});
}
void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
std::future<void> AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
{
query = query->clone();
const auto & settings = query_context->getSettingsRef();
@@ -214,97 +210,77 @@ void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
quota->used(QuotaType::WRITTEN_BYTES, bytes.size());
auto entry = std::make_shared<InsertData::Entry>(std::move(bytes), query_context->getCurrentQueryId());
InsertQuery key{query, settings};
InsertDataPtr data_to_process;
std::future<void> insert_future;
auto shard_num = key.hash % pool_size;
auto & shard = queue_shards[shard_num];
{
/// Firstly try to get entry from queue without exclusive lock.
std::shared_lock read_lock(rwlock);
if (auto it = queue.find(key); it != queue.end())
std::lock_guard lock(shard.mutex);
auto [it, inserted] = shard.iterators.try_emplace(key.hash);
if (inserted)
{
pushImpl(std::move(entry), it);
return;
auto now = std::chrono::steady_clock::now();
auto timeout = now + Milliseconds{key.settings.async_insert_busy_timeout_ms};
it->second = shard.queue.emplace(timeout, Container{key, std::make_unique<InsertData>()}).first;
}
auto queue_it = it->second;
auto & data = queue_it->second.data;
size_t entry_data_size = entry->bytes.size();
assert(data);
data->size_in_bytes += entry_data_size;
data->entries.emplace_back(entry);
insert_future = entry->getFuture();
LOG_TRACE(log, "Have {} pending inserts with total {} bytes of data for query '{}'",
data->entries.size(), data->size_in_bytes, key.query_str);
/// Here we check whether we hit the limit on maximum data size in the buffer.
/// And use setting from query context.
/// It works, because queries with the same set of settings are already grouped together.
if (data->size_in_bytes > key.settings.async_insert_max_data_size)
{
data_to_process = std::move(data);
shard.iterators.erase(it);
shard.queue.erase(queue_it);
}
CurrentMetrics::add(CurrentMetrics::PendingAsyncInsert);
ProfileEvents::increment(ProfileEvents::AsyncInsertQuery);
ProfileEvents::increment(ProfileEvents::AsyncInsertBytes, entry_data_size);
}
std::lock_guard write_lock(rwlock);
auto it = queue.emplace(key, std::make_shared<Container>()).first;
pushImpl(std::move(entry), it);
if (data_to_process)
scheduleDataProcessingJob(key, std::move(data_to_process), getContext());
else
shard.are_tasks_available.notify_one();
return insert_future;
}
void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator it)
void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
{
auto & [data_mutex, data] = *it->second;
std::lock_guard data_lock(data_mutex);
auto & shard = queue_shards[shard_num];
if (!data)
{
auto now = std::chrono::steady_clock::now();
data = std::make_unique<InsertData>(now);
std::lock_guard lock(deadline_mutex);
deadline_queue.insert({now + Milliseconds{it->first.settings.async_insert_busy_timeout_ms}, it});
are_tasks_available.notify_one();
}
size_t entry_data_size = entry->bytes.size();
data->size += entry_data_size;
data->entries.emplace_back(entry);
{
std::lock_guard currently_processing_lock(currently_processing_mutex);
currently_processing_queries.emplace(entry->query_id, entry);
}
LOG_TRACE(log, "Have {} pending inserts with total {} bytes of data for query '{}'",
data->entries.size(), data->size, queryToString(it->first.query));
/// Here we check whether we hit the limit on maximum data size in the buffer.
/// And use setting from query context!
/// It works, because queries with the same set of settings are already grouped together.
if (data->size > it->first.settings.async_insert_max_data_size)
scheduleDataProcessingJob(it->first, std::move(data), getContext());
CurrentMetrics::add(CurrentMetrics::PendingAsyncInsert);
ProfileEvents::increment(ProfileEvents::AsyncInsertQuery);
ProfileEvents::increment(ProfileEvents::AsyncInsertBytes, entry_data_size);
}
void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout)
{
InsertData::EntryPtr entry;
{
std::lock_guard lock(currently_processing_mutex);
auto it = currently_processing_queries.find(query_id);
if (it == currently_processing_queries.end())
return;
entry = it->second;
}
bool finished = entry->wait(timeout);
if (!finished)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout.count());
if (auto exception = entry->getException())
std::rethrow_exception(exception);
}
void AsynchronousInsertQueue::busyCheck()
{
while (!shutdown)
{
std::vector<QueueIterator> entries_to_flush;
std::vector<Container> entries_to_flush;
{
std::unique_lock deadline_lock(deadline_mutex);
are_tasks_available.wait_for(deadline_lock, Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [this]()
std::unique_lock lock(shard.mutex);
shard.are_tasks_available.wait_for(lock,
Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [&shard, this]
{
if (shutdown)
return true;
if (!deadline_queue.empty() && deadline_queue.begin()->first < std::chrono::steady_clock::now())
if (!shard.queue.empty() && shard.queue.begin()->first < std::chrono::steady_clock::now())
return true;
return false;
@@ -317,91 +293,22 @@ void AsynchronousInsertQueue::busyCheck()
while (true)
{
if (deadline_queue.empty() || deadline_queue.begin()->first > now)
if (shard.queue.empty() || shard.queue.begin()->first > now)
break;
entries_to_flush.emplace_back(deadline_queue.begin()->second);
deadline_queue.erase(deadline_queue.begin());
auto it = shard.queue.begin();
shard.iterators.erase(it->second.key.hash);
entries_to_flush.emplace_back(std::move(it->second));
shard.queue.erase(it);
}
}
std::shared_lock read_lock(rwlock);
for (auto & entry : entries_to_flush)
{
auto & [key, elem] = *entry;
std::lock_guard data_lock(elem->mutex);
if (!elem->data)
continue;
scheduleDataProcessingJob(key, std::move(elem->data), getContext());
}
scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext());
}
}
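With the old busyCheck() and the new processBatchDeadlines() interleaved above, the per-shard flush loop added here boils down to roughly the following (busy_timeout stands in for Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms)):

auto & shard = queue_shards[shard_num];

while (!shutdown)
{
    std::vector<Container> entries_to_flush;
    {
        std::unique_lock lock(shard.mutex);

        /// Wake up early on shutdown or when the earliest deadline in this shard has passed.
        shard.are_tasks_available.wait_for(lock, busy_timeout, [&]
        {
            return shutdown
                || (!shard.queue.empty() && shard.queue.begin()->first < std::chrono::steady_clock::now());
        });

        if (shutdown)
            return;

        /// Pop every batch whose deadline has expired, dropping its hash-index entry as well.
        const auto now = std::chrono::steady_clock::now();
        while (!shard.queue.empty() && shard.queue.begin()->first <= now)
        {
            auto it = shard.queue.begin();
            shard.iterators.erase(it->second.key.hash);
            entries_to_flush.emplace_back(std::move(it->second));
            shard.queue.erase(it);
        }
    }

    for (auto & entry : entries_to_flush)
        scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext());
}

The expired batches are moved out under the shard lock but handed to the pool only after the lock scope ends, so scheduling flush jobs does not block concurrent push() calls into the same shard.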
void AsynchronousInsertQueue::cleanup()
{
while (true)
{
{
std::unique_lock cleanup_lock(cleanup_mutex);
cleanup_can_run.wait_for(cleanup_lock, Milliseconds(cleanup_timeout), [this]() -> bool { return shutdown; });
if (shutdown)
return;
}
std::vector<InsertQuery> keys_to_remove;
{
std::shared_lock read_lock(rwlock);
for (auto & [key, elem] : queue)
{
std::lock_guard data_lock(elem->mutex);
if (!elem->data)
keys_to_remove.push_back(key);
}
}
if (!keys_to_remove.empty())
{
std::lock_guard write_lock(rwlock);
size_t total_removed = 0;
for (const auto & key : keys_to_remove)
{
auto it = queue.find(key);
if (it != queue.end() && !it->second->data)
{
queue.erase(it);
++total_removed;
}
}
if (total_removed)
LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", total_removed);
}
{
std::vector<String> ids_to_remove;
std::lock_guard lock(currently_processing_mutex);
for (const auto & [query_id, entry] : currently_processing_queries)
if (entry->isFinished())
ids_to_remove.push_back(query_id);
if (!ids_to_remove.empty())
{
for (const auto & id : ids_to_remove)
currently_processing_queries.erase(id);
LOG_TRACE(log, "Removed {} finished entries from asynchronous insertion queue", ids_to_remove.size());
}
}
}
}
static void appendElementsToLogSafe(
AsynchronousInsertLog & log,
std::vector<AsynchronousInsertLogElement> elements,
@@ -464,7 +371,7 @@ try
{
current_exception = e.displayText();
LOG_ERROR(log, "Failed parsing for query '{}' with query id {}. {}",
queryToString(key.query), current_entry->query_id, current_exception);
key.query_str, current_entry->query_id, current_exception);
for (const auto & column : result_columns)
if (column->size() > total_rows)
@@ -546,7 +453,7 @@ try
completed_executor.execute();
LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'",
total_rows, total_bytes, queryToString(key.query));
total_rows, total_bytes, key.query_str);
}
catch (...)
{

View File

@@ -4,10 +4,7 @@
#include <Common/ThreadPool.h>
#include <Core/Settings.h>
#include <Poco/Logger.h>
#include <atomic>
#include <unordered_map>
#include <future>
namespace DB
{
@@ -19,25 +16,29 @@ class AsynchronousInsertQueue : public WithContext
public:
using Milliseconds = std::chrono::milliseconds;
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, Milliseconds cleanup_timeout);
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_);
~AsynchronousInsertQueue();
void push(ASTPtr query, ContextPtr query_context);
void waitForProcessingQuery(const String & query_id, const Milliseconds & timeout);
std::future<void> push(ASTPtr query, ContextPtr query_context);
size_t getPoolSize() const { return pool_size; }
private:
struct InsertQuery
{
public:
ASTPtr query;
String query_str;
Settings settings;
UInt128 hash;
InsertQuery(const ASTPtr & query_, const Settings & settings_);
InsertQuery(const InsertQuery & other);
InsertQuery & operator=(const InsertQuery & other);
bool operator==(const InsertQuery & other) const;
struct Hash { UInt64 operator()(const InsertQuery & insert_query) const; };
private:
UInt128 calculateHash() const;
};
struct InsertData
@@ -47,109 +48,84 @@ private:
public:
const String bytes;
const String query_id;
std::chrono::time_point<std::chrono::system_clock> create_time;
const std::chrono::time_point<std::chrono::system_clock> create_time;
Entry(String && bytes_, String && query_id_);
void finish(std::exception_ptr exception_ = nullptr);
bool wait(const Milliseconds & timeout) const;
bool isFinished() const;
std::exception_ptr getException() const;
std::future<void> getFuture() { return promise.get_future(); }
bool isFinished() const { return finished; }
private:
mutable std::mutex mutex;
mutable std::condition_variable cv;
bool finished = false;
std::exception_ptr exception;
std::promise<void> promise;
std::atomic_bool finished = false;
};
explicit InsertData(std::chrono::steady_clock::time_point now)
: first_update(now)
{}
using EntryPtr = std::shared_ptr<Entry>;
std::list<EntryPtr> entries;
size_t size = 0;
/// Timestamp of the first insert into queue, or after the last queue dump.
/// Used to detect for how long the queue is active, so we can dump it by timer.
std::chrono::time_point<std::chrono::steady_clock> first_update;
size_t size_in_bytes = 0;
};
using InsertDataPtr = std::unique_ptr<InsertData>;
/// A separate container, that holds a data and a mutex for it.
/// When it's needed to process current chunk of data, it can be moved for processing
/// and new data can be recreated without holding a lock during processing.
struct Container
{
std::mutex mutex;
InsertQuery key;
InsertDataPtr data;
};
using Queue = std::unordered_map<InsertQuery, std::shared_ptr<Container>, InsertQuery::Hash>;
using QueueIterator = Queue::iterator;
/// Ordered container
using DeadlineQueue = std::map<std::chrono::steady_clock::time_point, QueueIterator>;
/// Key is a timestamp of the first insert into batch.
/// Used to detect for how long the batch is active, so we can dump it by timer.
using Queue = std::map<std::chrono::steady_clock::time_point, Container>;
using QueueIterator = Queue::iterator;
using QueueIteratorByKey = std::unordered_map<UInt128, QueueIterator>;
struct QueueShard
{
mutable std::mutex mutex;
mutable std::condition_variable are_tasks_available;
mutable std::shared_mutex rwlock;
Queue queue;
Queue queue;
QueueIteratorByKey iterators;
};
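Where the old design kept an unordered_map of per-query containers plus a separate deadline_queue pointing into it (guarded by rwlock and deadline_mutex), each shard now holds a single map ordered by the batch deadline, so the next batch to flush is always shard.queue.begin(), plus a hash index into that map so push() can find an existing batch without scanning; the shard itself is picked as key.hash % pool_size. A minimal sketch of the lookup-or-create step on this pair of containers, with a simplified element type and a 64-bit key standing in for the 128-bit query hash:

#include <chrono>
#include <cstdint>
#include <map>
#include <unordered_map>

using TimePoint = std::chrono::steady_clock::time_point;

struct Batch { /* query key and accumulated entries would live here */ };

std::map<TimePoint, Batch> queue;   // ordered by flush deadline
std::unordered_map<uint64_t, std::map<TimePoint, Batch>::iterator> iterators;   // query hash -> batch

Batch & findOrCreateBatch(uint64_t query_hash, std::chrono::milliseconds busy_timeout)
{
    auto [it, inserted] = iterators.try_emplace(query_hash);
    if (inserted)
    {
        /// First insert for this query: the batch deadline is "now + busy timeout".
        auto deadline = std::chrono::steady_clock::now() + busy_timeout;
        it->second = queue.emplace(deadline, Batch{}).first;
    }
    return it->second->second;
}

Both containers are touched only under the shard's mutex in the real code, which is why a plain std::map is enough here.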
/// This is needed only for using inside cleanup() function and correct signaling about shutdown
mutable std::mutex cleanup_mutex;
mutable std::condition_variable cleanup_can_run;
mutable std::mutex deadline_mutex;
mutable std::condition_variable are_tasks_available;
DeadlineQueue deadline_queue;
using QueryIdToEntry = std::unordered_map<String, InsertData::EntryPtr>;
mutable std::mutex currently_processing_mutex;
QueryIdToEntry currently_processing_queries;
const size_t pool_size;
std::vector<QueueShard> queue_shards;
/// Logic and events behind queue are as follows:
/// - busy_timeout: if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
/// grow for a long period of time and users will be able to select new data in deterministic manner.
/// - stale_timeout: if queue is stale for too long, then we dump the data too, so that users will be able to select the last
/// piece of inserted data.
/// - async_insert_busy_timeout_ms:
/// if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
/// grow for a long period of time and users will be able to select new data in deterministic manner.
///
/// During processing incoming INSERT queries we can also check whether the maximum size of data in buffer is reached (async_insert_max_data_size setting)
/// If so, then again we dump the data.
const Milliseconds cleanup_timeout;
/// During processing incoming INSERT queries we can also check whether the maximum size of data in buffer is reached
/// (async_insert_max_data_size setting). If so, then again we dump the data.
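To spell the two flush triggers out, a hypothetical helper (not a function from this commit; in the real code the deadline check lives in processBatchDeadlines() and the size check inside push()):

#include <chrono>
#include <cstddef>

bool shouldFlushNow(
    std::chrono::steady_clock::time_point first_insert_time,
    std::chrono::milliseconds busy_timeout,   // async_insert_busy_timeout_ms
    size_t buffered_bytes,
    size_t max_data_size)                     // async_insert_max_data_size
{
    bool deadline_passed = std::chrono::steady_clock::now() >= first_insert_time + busy_timeout;
    bool buffer_full = buffered_bytes > max_data_size;
    return deadline_passed || buffer_full;
}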
std::atomic<bool> shutdown{false};
ThreadPool pool; /// dump the data only inside this pool.
ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck()
ThreadFromGlobalPool cleanup_thread; /// uses busy_timeout and cleanup()
/// Dump the data only inside this pool.
ThreadPool pool;
/// Uses async_insert_busy_timeout_ms and processBatchDeadlines()
std::vector<ThreadFromGlobalPool> dump_by_first_update_threads;
Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue");
void busyCheck();
void cleanup();
/// Should be called with shared or exclusively locked 'rwlock'.
void pushImpl(InsertData::EntryPtr entry, QueueIterator it);
void processBatchDeadlines(size_t shard_num);
void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);
template <typename E>
static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);
/// @param timeout - time to wait
/// @return true if shutdown requested
bool waitForShutdown(const Milliseconds & timeout);
public:
auto getQueueLocked() const
auto getQueueLocked(size_t shard_num) const
{
std::shared_lock lock(rwlock);
return std::make_pair(std::ref(queue), std::move(lock));
auto & shard = queue_shards[shard_num];
std::unique_lock lock(shard.mutex);
return std::make_pair(std::ref(shard.queue), std::move(lock));
}
};

View File

@@ -592,13 +592,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
quota->checkExceeded(QuotaType::ERRORS);
}
queue->push(ast, context);
auto insert_future = queue->push(ast, context);
if (settings.wait_for_async_insert)
{
auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds();
auto query_id = context->getCurrentQueryId();
auto source = std::make_shared<WaitForAsyncInsertSource>(query_id, timeout, *queue);
auto source = std::make_shared<WaitForAsyncInsertSource>(std::move(insert_future), timeout);
res.pipeline = QueryPipeline(Pipe(std::move(source)));
}

View File

@@ -6,18 +6,24 @@
namespace DB
{
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
/// Source, that allow to wait until processing of
/// asynchronous insert for specified query_id will be finished.
class WaitForAsyncInsertSource : public ISource, WithContext
{
public:
WaitForAsyncInsertSource(
const String & query_id_, size_t timeout_ms_, AsynchronousInsertQueue & queue_)
std::future<void> insert_future_, size_t timeout_ms_)
: ISource(Block())
, query_id(query_id_)
, insert_future(std::move(insert_future_))
, timeout_ms(timeout_ms_)
, queue(queue_)
{
assert(insert_future.valid());
}
String getName() const override { return "WaitForAsyncInsert"; }
@@ -25,14 +31,20 @@ public:
protected:
Chunk generate() override
{
queue.waitForProcessingQuery(query_id, std::chrono::milliseconds(timeout_ms));
auto status = insert_future.wait_for(std::chrono::milliseconds(timeout_ms));
if (status == std::future_status::deferred)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: got future in deferred state");
if (status == std::future_status::timeout)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout_ms);
insert_future.get();
return Chunk();
}
private:
String query_id;
std::future<void> insert_future;
size_t timeout_ms;
AsynchronousInsertQueue & queue;
};
}

View File

@@ -27,8 +27,6 @@ NamesAndTypesList StorageSystemAsynchronousInserts::getNamesAndTypes()
{"total_bytes", std::make_shared<DataTypeUInt64>()},
{"entries.query_id", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"entries.bytes", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
{"entries.finished", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt8>())},
{"entries.exception", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
};
}
@@ -40,78 +38,56 @@ void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, Co
if (!insert_queue)
return;
auto [queue, queue_lock] = insert_queue->getQueueLocked();
for (const auto & [key, elem] : queue)
for (size_t shard_num = 0; shard_num < insert_queue->getPoolSize(); ++shard_num)
{
std::lock_guard elem_lock(elem->mutex);
auto [queue, queue_lock] = insert_queue->getQueueLocked(shard_num);
if (!elem->data)
continue;
auto time_in_microseconds = [](const time_point<steady_clock> & timestamp)
for (const auto & [first_update, elem] : queue)
{
auto time_diff = duration_cast<microseconds>(steady_clock::now() - timestamp);
auto time_us = (system_clock::now() - time_diff).time_since_epoch().count();
const auto & [key, data] = elem;
DecimalUtils::DecimalComponents<DateTime64> components{time_us / 1'000'000, time_us % 1'000'000};
return DecimalField(DecimalUtils::decimalFromComponents<DateTime64>(components, TIME_SCALE), TIME_SCALE);
};
const auto & insert_query = key.query->as<const ASTInsertQuery &>();
size_t i = 0;
res_columns[i++]->insert(queryToString(insert_query));
/// If query is "INSERT INTO FUNCTION" then table_id is empty.
if (insert_query.table_id)
{
res_columns[i++]->insert(insert_query.table_id.getDatabaseName());
res_columns[i++]->insert(insert_query.table_id.getTableName());
}
else
{
res_columns[i++]->insertDefault();
res_columns[i++]->insertDefault();
}
res_columns[i++]->insert(insert_query.format);
res_columns[i++]->insert(time_in_microseconds(elem->data->first_update));
res_columns[i++]->insert(elem->data->size);
Array arr_query_id;
Array arr_bytes;
Array arr_finished;
Array arr_exception;
for (const auto & entry : elem->data->entries)
{
arr_query_id.push_back(entry->query_id);
arr_bytes.push_back(entry->bytes.size());
arr_finished.push_back(entry->isFinished());
if (auto exception = entry->getException())
auto time_in_microseconds = [](const time_point<steady_clock> & timestamp)
{
try
{
std::rethrow_exception(exception);
}
catch (const Exception & e)
{
arr_exception.push_back(e.displayText());
}
catch (...)
{
arr_exception.push_back("Unknown exception");
}
auto time_diff = duration_cast<microseconds>(steady_clock::now() - timestamp);
auto time_us = (system_clock::now() - time_diff).time_since_epoch().count();
DecimalUtils::DecimalComponents<DateTime64> components{time_us / 1'000'000, time_us % 1'000'000};
return DecimalField(DecimalUtils::decimalFromComponents<DateTime64>(components, TIME_SCALE), TIME_SCALE);
};
const auto & insert_query = key.query->as<const ASTInsertQuery &>();
size_t i = 0;
res_columns[i++]->insert(queryToString(insert_query));
/// If query is "INSERT INTO FUNCTION" then table_id is empty.
if (insert_query.table_id)
{
res_columns[i++]->insert(insert_query.table_id.getDatabaseName());
res_columns[i++]->insert(insert_query.table_id.getTableName());
}
else
arr_exception.push_back("");
}
{
res_columns[i++]->insertDefault();
res_columns[i++]->insertDefault();
}
res_columns[i++]->insert(arr_query_id);
res_columns[i++]->insert(arr_bytes);
res_columns[i++]->insert(arr_finished);
res_columns[i++]->insert(arr_exception);
res_columns[i++]->insert(insert_query.format);
res_columns[i++]->insert(time_in_microseconds(first_update));
res_columns[i++]->insert(data->size_in_bytes);
Array arr_query_id;
Array arr_bytes;
for (const auto & entry : data->entries)
{
arr_query_id.push_back(entry->query_id);
arr_bytes.push_back(entry->bytes.size());
}
res_columns[i++]->insert(arr_query_id);
res_columns[i++]->insert(arr_bytes);
}
}
}

View File

@@ -14,9 +14,7 @@ CREATE TABLE system.asynchronous_inserts
`first_update` DateTime64(6),
`total_bytes` UInt64,
`entries.query_id` Array(String),
`entries.bytes` Array(UInt64),
`entries.finished` Array(UInt8),
`entries.exception` Array(String)
`entries.bytes` Array(UInt64)
)
ENGINE = SystemAsynchronousInserts
COMMENT 'SYSTEM TABLE is built on the fly.'

View File

@@ -1,4 +1,4 @@
1 a
2 b
INSERT INTO async_inserts_2156 VALUES 1 Insert 1 0
INSERT INTO async_inserts_2156 VALUES 1 Insert 1
INSERT INTO async_inserts_2156 VALUES 1 Insert 1
INSERT INTO async_inserts_2156 VALUES 1 Insert 1

View File

@@ -7,7 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_2156"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts_2156 (id UInt32, s String) ENGINE = Memory"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=0" -d "INSERT INTO async_inserts_2156 VALUES (1, 'a')"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" -d "INSERT INTO async_inserts_2156 VALUES (1, 'a')"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" -d "INSERT INTO async_inserts_2156 VALUES (2, 'b')"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_2156 ORDER BY id"
@@ -15,7 +15,7 @@ ${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_2156 ORDER BY id"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "SELECT query, arrayExists(x -> x LIKE '%async_inserts_2156', tables), \
query_kind, Settings['async_insert'], Settings['wait_for_async_insert'] FROM system.query_log \
query_kind, Settings['async_insert'] FROM system.query_log \
WHERE event_date >= yesterday() AND current_database = '$CLICKHOUSE_DATABASE' \
AND query ILIKE 'INSERT INTO async_inserts_2156 VALUES%' AND type = 'QueryFinish' \
ORDER BY query_start_time_microseconds"

View File

@@ -0,0 +1 @@
OK

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env bash
# Tags: no-random-settings, no-fasttest, long
set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
export MY_CLICKHOUSE_CLIENT="$CLICKHOUSE_CLIENT --async_insert_busy_timeout_ms 10 --async_insert_max_data_size 1 --async_insert 1"
function insert1()
{
while true; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT CSV 1,"a"'
done
}
function insert2()
{
while true; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}'
done
}
function insert3()
{
while true; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 1 -q "INSERT INTO async_inserts_race VALUES (7, 'g') (8, 'h')" &
sleep 0.05
done
}
function select1()
{
while true; do
${MY_CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_race FORMAT Null"
done
}
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_race"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts_race (id UInt32, s String) ENGINE = MergeTree ORDER BY id"
TIMEOUT=10
export -f insert1
export -f insert2
export -f insert3
export -f select1
for _ in {1..3}; do
timeout $TIMEOUT bash -c insert1 &
timeout $TIMEOUT bash -c insert2 &
timeout $TIMEOUT bash -c insert3 &
done
timeout $TIMEOUT bash -c select1 &
wait
echo "OK"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_race";