refactor AsynchronousInsertQueue

This commit is contained in:
Anton Popov 2021-08-31 05:16:02 +03:00
parent 8f60c4b8d2
commit 36ac20681e
7 changed files with 327 additions and 258 deletions

View File

@ -15,88 +15,79 @@
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/IStorage.h>
#include <Common/SipHash.h>
#include <Common/FieldVisitorHash.h>
namespace DB
{
struct AsynchronousInsertQueue::InsertData
namespace ErrorCodes
{
InsertData(ASTPtr query_, const Settings & settings_)
: query(std::move(query_)), settings(settings_)
extern const int TIMEOUT_EXCEEDED;
}
/// Builds a queue key from an INSERT query AST and the settings it was issued with.
/// The key stores the result of query_->clone() rather than the caller's pointer,
/// and a copy of the settings.
AsynchronousInsertQueue::InsertQuery::InsertQuery(const ASTPtr & query_, const Settings & settings_)
: query(query_->clone()), settings(settings_)
{
}
ASTPtr query;
Settings settings;
struct Data
AsynchronousInsertQueue::InsertQuery::InsertQuery(const InsertQuery & other)
: query(other.query->clone()), settings(other.settings)
{
String bytes;
String query_id;
Context::AsyncInsertInfoPtr info;
};
std::mutex mutex;
std::list<Data> data;
size_t size = 0;
/// Timestamp of the first insert into queue, or after the last queue dump.
/// Used to detect for how long the queue is active, so we can dump it by timer.
std::chrono::time_point<std::chrono::steady_clock> first_update = std::chrono::steady_clock::now();
/// Timestamp of the last insert into queue.
/// Used to detect for how long the queue is stale, so we can dump it by another timer.
std::chrono::time_point<std::chrono::steady_clock> last_update;
/// Indicates that the BlockIO should be updated, because we can't read/write prefix and suffix more than once.
bool is_reset = false;
void reset()
{
data.clear();
is_reset = true;
}
};
std::size_t AsynchronousInsertQueue::InsertQueryHash::operator() (const InsertQuery & query) const
{
const auto * insert_query = query.query->as<ASTInsertQuery>();
std::size_t hash = 0;
hash ^= std::hash<String>()(insert_query->table_id.getFullTableName());
hash ^= std::hash<String>()(insert_query->format);
// TODO: insert_query->columns
// TODO: insert_query->table_function
// TODO: insert_query->settings_ast
// TODO: some of query.settings
return hash;
}
bool AsynchronousInsertQueue::InsertQueryEquality::operator() (const InsertQuery & query1, const InsertQuery & query2) const
AsynchronousInsertQueue::InsertQuery &
AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other)
{
const auto * insert_query1 = query1.query->as<ASTInsertQuery>();
const auto * insert_query2 = query2.query->as<ASTInsertQuery>();
if (insert_query1->table_id != insert_query2->table_id)
return false;
if (insert_query1->format != insert_query2->format)
return false;
// TODO: same fields as in InsertQueryHash.
return true;
query = other.query->clone();
settings = other.settings;
return *this;
}
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextMutablePtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts)
: WithMutableContext(context_)
/// Hash of a queue key: the tree hash of the query AST, followed by the name and
/// value of every setting returned by Settings::allChanged().
UInt64 AsynchronousInsertQueue::InsertQuery::Hash::operator()(const InsertQuery & insert_query) const
{
    SipHash siphash;

    /// The query contributes through the AST's own tree hash.
    insert_query.query->updateTreeHash(siphash);

    /// Every changed setting contributes its name first, then its value.
    for (const auto & changed_setting : insert_query.settings.allChanged())
    {
        siphash.update(changed_setting.getName());
        applyVisitor(FieldVisitorHash(siphash), changed_setting.getValue());
    }

    return siphash.get64();
}
/// Two keys are equal when their queries format to the same text and their settings compare equal.
bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other) const
{
    /// Settings are only compared once the formatted queries are known to match.
    if (queryToString(query) != queryToString(other.query))
        return false;

    return settings == other.settings;
}
/// Marks the entry as finished, records the (possibly null) exception and wakes every
/// thread blocked in wait(). All of it happens while holding the entry's mutex.
void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr exception_)
{
    std::lock_guard<std::mutex> guard(mutex);

    finished = true;
    exception = exception_;

    cv.notify_all();
}
bool AsynchronousInsertQueue::InsertData::Entry::wait(const Milliseconds & timeout)
{
std::unique_lock lock(mutex);
return cv.wait_for(lock, timeout, [&] { return finished; });
}
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts)
: WithContext(context_)
, max_data_size(max_data_size_)
, busy_timeout(timeouts.busy)
, stale_timeout(timeouts.stale)
, queue(new Queue)
, pool(pool_size)
, dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this)
, cleanup_thread(&AsynchronousInsertQueue::cleanup, this)
{
using namespace std::chrono;
@ -115,45 +106,101 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
assert(dump_by_first_update_thread.joinable());
dump_by_first_update_thread.join();
assert(cleanup_thread.joinable());
cleanup_thread.join();
if (dump_by_last_update_thread.joinable())
dump_by_last_update_thread.join();
pool.wait();
std::lock_guard lock(currently_processing_mutex);
for (const auto & [_, entry] : currently_processing_queries)
{
if (!entry->finished)
entry->finish(std::make_exception_ptr(Exception(
ErrorCodes::TIMEOUT_EXCEEDED,
"Wait for async insert timeout exceeded)")));
}
}
void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settings, const String & query_id)
{
std::unique_lock lock(rwlock);
InsertQuery key{query, settings};
auto it = queue->find(key);
if (it == queue->end())
it = queue->emplace(key, std::make_shared<InsertData>(query, settings)).first;
else if (it->second->is_reset)
it->second = std::make_shared<InsertData>(query, settings);
auto read_buf = getReadBufferFromASTInsertQuery(query);
/// It's important to read the whole data per query as a single chunk, so we can safely drop it in case of parsing failure.
auto & new_data = it->second->data.emplace_back();
new_data.query_id = query_id;
new_data.bytes.reserve(read_buf->totalSize());
new_data.info = getContext()->addAsyncInsertQueryId(query_id);
WriteBufferFromString write_buf(new_data.bytes);
auto entry = std::make_shared<InsertData::Entry>();
entry->query_id = query_id;
entry->bytes.reserve(read_buf->totalSize());
WriteBufferFromString write_buf(entry->bytes);
copyData(*read_buf, write_buf);
it->second->size += read_buf->count();
it->second->last_update = std::chrono::steady_clock::now();
LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"),
"Queue size {} for query '{}'", it->second->size, queryToString(*query));
InsertQuery key{query, settings};
Queue::iterator it;
bool found = false;
if (it->second->size > max_data_size)
/// Since we're under lock here, it's safe to pass-by-copy the shared_ptr
/// without a race with the cleanup thread, which may reset last shared_ptr instance.
pool.scheduleOrThrowOnError([data = it->second, global_context = getContext()] { processData(data, global_context); });
{
std::shared_lock read_lock(rwlock);
it = queue.find(key);
if (it != queue.end())
found = true;
}
if (!found)
{
std::unique_lock write_lock(rwlock);
it = queue.emplace(key, std::make_shared<Container>()).first;
}
auto & [data_mutex, data] = *it->second;
std::lock_guard data_lock(data_mutex);
if (!data)
data = std::make_unique<InsertData>();
data->size += read_buf->count();
data->last_update = std::chrono::steady_clock::now();
data->entries.emplace_back(entry);
{
std::lock_guard currently_processing_lock(currently_processing_mutex);
currently_processing_queries.emplace(query_id, entry);
}
LOG_INFO(log, "Queue size {} for query '{}'", data->size, queryToString(*query));
if (data->size > max_data_size)
{
pool.scheduleOrThrowOnError([key,
data = std::make_shared<InsertDataPtr>(std::move(data)),
global_context = getContext()]
{
processData(std::move(key), std::move(*data), global_context);
});
}
}
void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout)
{
InsertData::EntryPtr entry;
{
std::lock_guard lock(currently_processing_mutex);
auto it = currently_processing_queries.find(query_id);
if (it == currently_processing_queries.end())
return;
entry = it->second;
}
bool finished = entry->wait(timeout);
if (!finished)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout.count());
if (entry->exception)
std::rethrow_exception(entry->exception);
}
void AsynchronousInsertQueue::busyCheck()
@ -166,38 +213,28 @@ void AsynchronousInsertQueue::busyCheck()
/// TODO: use priority queue instead of raw unsorted queue.
timeout = busy_timeout;
std::vector<InsertQuery> keys_to_remove;
{
std::shared_lock read_lock(rwlock);
for (auto & [key, data] : *queue)
for (auto & [key, elem] : queue)
{
std::unique_lock<std::mutex> data_lock(data->mutex);
std::lock_guard data_lock(elem->mutex);
if (!elem->data)
continue;
auto lag = std::chrono::steady_clock::now() - data->first_update;
if (data->is_reset)
keys_to_remove.push_back(key);
else if (lag >= busy_timeout)
pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); });
auto lag = std::chrono::steady_clock::now() - elem->data->first_update;
if (lag >= busy_timeout)
{
pool.scheduleOrThrowOnError([key = key,
data = std::make_shared<InsertDataPtr>(std::move(elem->data)),
global_context = getContext()]
{
processData(std::move(key), std::move(*data), global_context);
});
}
else
timeout = std::min(timeout, std::chrono::ceil<std::chrono::seconds>(busy_timeout - lag));
}
}
if (!keys_to_remove.empty())
{
std::unique_lock write_lock(rwlock);
for (const auto & key : keys_to_remove)
{
auto it = queue->find(key);
if (it != queue->end() && it->second->is_reset)
queue->erase(it);
}
}
}
}
void AsynchronousInsertQueue::staleCheck()
@ -205,78 +242,112 @@ void AsynchronousInsertQueue::staleCheck()
while (!shutdown)
{
std::this_thread::sleep_for(stale_timeout);
std::shared_lock read_lock(rwlock);
for (auto & [_, data] : *queue)
for (auto & [key, elem] : queue)
{
std::unique_lock<std::mutex> data_lock(data->mutex);
auto lag = std::chrono::steady_clock::now() - data->last_update;
std::lock_guard data_lock(elem->mutex);
if (!elem->data)
continue;
auto lag = std::chrono::steady_clock::now() - elem->data->last_update;
if (lag >= stale_timeout)
pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); });
{
pool.scheduleOrThrowOnError([key = key,
data = std::make_shared<InsertDataPtr>(std::move(elem->data)),
global_context = getContext()]
{
processData(std::move(key), std::move(*data), global_context);
});
}
}
}
}
/// Background loop that drops queue slots which currently hold no pending data.
///
/// Each iteration sleeps for three busy timeouts, then works in two phases:
///  1. under the shared lock, collect the keys whose container has no data;
///  2. under the exclusive lock, re-check each candidate and erase it if it is still empty.
///
/// The re-check must look at the container's `data`, not at the shared_ptr to the container:
/// the shared_ptr stored in the map is never null, so testing it would never erase anything.
///
/// NOTE(review): push() dereferences its queue iterator after releasing `rwlock`.
/// Now that slots are really erased here, push() should keep the lock held while it uses the iterator.
void AsynchronousInsertQueue::cleanup()
{
    auto timeout = busy_timeout * 3;

    while (!shutdown)
    {
        std::this_thread::sleep_for(timeout);

        std::vector<InsertQuery> keys_to_remove;

        {
            std::shared_lock read_lock(rwlock);

            for (auto & [key, elem] : queue)
            {
                std::lock_guard data_lock(elem->mutex);
                if (!elem->data)
                    keys_to_remove.push_back(key);
            }
        }

        if (keys_to_remove.empty())
            continue;

        size_t total_removed = 0;

        {
            std::unique_lock write_lock(rwlock);

            for (const auto & key : keys_to_remove)
            {
                auto it = queue.find(key);
                if (it == queue.end())
                    continue;

                /// Hold our own reference so the container (and the mutex locked below)
                /// outlives the erase; `data_lock` is released before `container` is destroyed.
                auto container = it->second;
                std::lock_guard data_lock(container->mutex);

                /// New data may have arrived since the first phase; keep such slots.
                if (!container->data)
                {
                    queue.erase(it);
                    ++total_removed;
                }
            }
        }

        /// Report what was actually erased, not the number of candidates.
        if (total_removed)
            LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", total_removed);
    }
}
// static
void AsynchronousInsertQueue::processData(std::shared_ptr<InsertData> data, ContextPtr global_context)
void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context)
try
{
std::unique_lock<std::mutex> data_lock(data->mutex);
if (data->is_reset)
if (!data)
return;
const auto * log = &Poco::Logger::get("AsynchronousInsertQueue");
auto insert_context = Context::createCopy(global_context);
/// 'resetParser' doesn't work for parallel parsing.
data->settings.set("input_format_parallel_parsing", false);
key.settings.set("input_format_parallel_parsing", false);
insert_context->makeQueryContext();
insert_context->setSettings(data->settings);
insert_context->setSettings(key.settings);
InterpreterInsertQuery interpreter(data->query, insert_context, data->settings.insert_allow_materialized_columns);
InterpreterInsertQuery interpreter(key.query, insert_context, key.settings.insert_allow_materialized_columns);
auto sinks = interpreter.getSinks();
assert(sinks.size() == 1);
auto header = sinks.at(0)->getInputs().front().getHeader();
auto format = getInputFormatFromASTInsertQuery(data->query, false, header, insert_context, nullptr);
auto format = getInputFormatFromASTInsertQuery(key.query, false, header, insert_context, nullptr);
size_t total_rows = 0;
std::string_view current_query_id;
Context::AsyncInsertInfoPtr current_info;
InsertData::EntryPtr current_entry;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
{
LOG_ERROR(&Poco::Logger::get("AsynchronousInsertQueue"),
"Failed parsing for query '{}' with query id {}. {}",
queryToString(data->query), current_query_id, e.displayText());
LOG_ERROR(log, "Failed parsing for query '{}' with query id {}. {}",
queryToString(key.query), current_entry->query_id, e.displayText());
for (const auto & column : result_columns)
if (column->size() > total_rows)
column->popBack(column->size() - total_rows);
current_info->complete(std::current_exception());
current_entry->finish(std::current_exception());
return 0;
};
StreamingFormatExecutor executor(header, format, std::move(on_error));
std::vector<std::tuple<std::unique_ptr<ReadBuffer>,
std::string_view, Context::AsyncInsertInfoPtr>> prepared_data;
prepared_data.reserve(data->data.size());
for (const auto & datum : data->data)
prepared_data.emplace_back(std::make_unique<ReadBufferFromString>(datum.bytes), datum.query_id, datum.info);
for (const auto & [buffer, query_id, info] : prepared_data)
std::unique_ptr<ReadBuffer> buffer;
for (const auto & entry : data->entries)
{
buffer = std::make_unique<ReadBufferFromString>(entry->bytes);
format->resetParser();
format->setReadBuffer(*buffer);
current_query_id = query_id;
current_info = info;
current_entry = entry;
total_rows += executor.execute();
}
@ -295,21 +366,18 @@ try
out_executor->execute(out_pipeline.getNumThreads());
LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'",
total_rows, total_bytes, queryToString(data->query));
total_rows, total_bytes, queryToString(key.query));
for (const auto & datum : data->data)
datum.info->complete();
data->reset();
for (const auto & entry : data->entries)
if (!entry->finished)
entry->finish();
}
catch (...)
{
tryLogCurrentException("AsynchronousInsertQueue", __PRETTY_FUNCTION__);
for (const auto & datum : data->data)
datum.info->complete(std::current_exception());
data->reset();
for (const auto & entry : data->entries)
entry->finish(std::current_exception());
}
}

View File

@ -4,6 +4,7 @@
#include <Common/RWLock.h>
#include <Common/ThreadPool.h>
#include <Core/Settings.h>
#include <Poco/Logger.h>
#include <unordered_map>
@ -14,20 +15,24 @@ namespace DB
class ASTInsertQuery;
struct BlockIO;
class AsynchronousInsertQueue : public WithMutableContext
class AsynchronousInsertQueue : public WithContext
{
public:
using Milliseconds = std::chrono::milliseconds;
using Seconds = std::chrono::seconds;
/// A structure is used so that callers can benefit from designated initializers instead of juggling positional constructor arguments.
struct Timeout
{
std::chrono::seconds busy;
std::chrono::seconds stale;
Seconds busy;
Seconds stale;
};
AsynchronousInsertQueue(ContextMutablePtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts);
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts);
~AsynchronousInsertQueue();
void push(const ASTPtr & query, const Settings & settings, const String & query_id);
void waitForProcessingQuery(const String & query_id, const Milliseconds & timeout);
private:
@ -35,20 +40,64 @@ class AsynchronousInsertQueue : public WithMutableContext
{
ASTPtr query;
Settings settings;
InsertQuery(const ASTPtr & query_, const Settings & settings_);
InsertQuery(const InsertQuery & other);
InsertQuery & operator==(const InsertQuery & other);
bool operator==(const InsertQuery & other) const;
struct Hash { UInt64 operator()(const InsertQuery & insert_query) const; };
};
struct InsertData;
struct InsertQueryHash
struct InsertData
{
std::size_t operator () (const InsertQuery &) const;
struct Entry
{
public:
String bytes;
String query_id;
bool finished = false;
std::exception_ptr exception;
void finish(std::exception_ptr exception_ = nullptr);
bool wait(const Milliseconds & timeout);
private:
std::mutex mutex;
std::condition_variable cv;
};
struct InsertQueryEquality
{
bool operator () (const InsertQuery &, const InsertQuery &) const;
using EntryPtr = std::shared_ptr<Entry>;
std::list<EntryPtr> entries;
size_t size = 0;
/// Timestamp of the first insert into queue, or after the last queue dump.
/// Used to detect for how long the queue is active, so we can dump it by timer.
std::chrono::time_point<std::chrono::steady_clock> first_update = std::chrono::steady_clock::now();
/// Timestamp of the last insert into queue.
/// Used to detect for how long the queue is stale, so we can dump it by another timer.
std::chrono::time_point<std::chrono::steady_clock> last_update;
};
using InsertDataPtr = std::unique_ptr<InsertData>;
/// One slot of the queue, holding the pending data for a single InsertQuery key.
/// `data` is null when nothing is pending: push() creates it on demand, and the
/// flushing code moves it out when scheduling processData().
/// `mutex` is locked by push(), busyCheck(), staleCheck() and cleanup() before they touch `data`.
/// Member order matters: push() unpacks this struct with a structured binding (mutex first, then data).
struct Container
{
std::mutex mutex;
InsertDataPtr data;
};
using Queue = std::unordered_map<InsertQuery, std::shared_ptr<Container>, InsertQuery::Hash>;
using QueueIterator = Queue::iterator;
std::shared_mutex rwlock;
Queue queue;
std::mutex currently_processing_mutex;
std::unordered_map<String, InsertData::EntryPtr> currently_processing_queries;
/// Logic and events behind queue are as follows:
/// - reset_timeout: if queue is empty for some time, then we delete the queue and free all associated resources, e.g. tables.
/// - busy_timeout: if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
@ -59,26 +108,23 @@ class AsynchronousInsertQueue : public WithMutableContext
/// we dump pending data and delete queue immediately.
/// - max_data_size: if the maximum size of data is reached, then again we dump the data.
using Queue = std::unordered_map<InsertQuery, std::shared_ptr<InsertData>, InsertQueryHash, InsertQueryEquality>;
using QueueIterator = Queue::iterator;
const size_t max_data_size; /// in bytes
const std::chrono::seconds busy_timeout, stale_timeout;
std::shared_mutex rwlock;
std::unique_ptr<Queue> queue;
const Seconds busy_timeout;
const Seconds stale_timeout;
std::atomic<bool> shutdown{false};
ThreadPool pool; /// dump the data only inside this pool.
ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck()
ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck()
/// TODO: ThreadFromGlobalPool check_access_thread;
ThreadFromGlobalPool cleanup_thread;
Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue");
void busyCheck();
void staleCheck();
void cleanup();
void pushImpl(const ASTPtr & query, const String & query_id, QueueIterator it);
static void processData(std::shared_ptr<InsertData> data, ContextPtr global_context);
static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);
};
}

View File

@ -927,44 +927,6 @@ void Context::addQueryAccessInfo(
query_access_info.views.emplace(view_name);
}
void Context::AsyncInsertInfo::complete(std::exception_ptr exception_)
{
std::lock_guard lock(mutex);
finished = true;
exception = exception_;
cv.notify_all();
}
Context::AsyncInsertInfoPtr Context::addAsyncInsertQueryId(const String & query_id)
{
auto lock = getLock();
auto it = processing_async_inserts.emplace(query_id, std::make_shared<AsyncInsertInfo>()).first;
return it->second;
}
void Context::waitForProcessingAsyncInsert(const String & query_id, const std::chrono::milliseconds & timeout) const
{
AsyncInsertInfoPtr async_info;
{
auto lock = getLock();
auto it = processing_async_inserts.find(query_id);
if (it == processing_async_inserts.end())
return;
async_info = it->second;
}
std::unique_lock lock(async_info->mutex);
auto finished = async_info->cv.wait_for(lock, timeout, [&] { return async_info->finished; });
if (!finished)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout.count());
if (async_info->exception)
std::rethrow_exception(async_info->exception);
}
void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const
{
assert(!isGlobalContext() || getApplicationType() == ApplicationType::LOCAL);

View File

@ -286,21 +286,6 @@ public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for a query context.
OpenTelemetryTraceContext query_trace_context;
struct AsyncInsertInfo
{
std::mutex mutex;
std::condition_variable cv;
bool finished = false;
std::exception_ptr exception;
void complete(std::exception_ptr exception_ = nullptr);
};
using AsyncInsertInfoPtr = std::shared_ptr<AsyncInsertInfo>;
AsyncInsertInfoPtr addAsyncInsertQueryId(const String & query_id);
void waitForProcessingAsyncInsert(const String & query_id, const std::chrono::milliseconds & timeout) const;
private:
using SampleBlockCache = std::unordered_map<std::string, Block>;
mutable SampleBlockCache sample_block_cache;
@ -323,8 +308,6 @@ private:
/// thousands of signatures.
/// And I hope it will be replaced with more common Transaction sometime.
std::unordered_map<String, AsyncInsertInfoPtr> processing_async_inserts;
Context();
Context(const Context &);
Context & operator=(const Context &);

View File

@ -75,6 +75,13 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
}
}
/// Adds the INSERT-specific fields to the tree hash: the full table name and the
/// format name, in that order, and then delegates to IAST::updateTreeHashImpl.
void ASTInsertQuery::updateTreeHashImpl(SipHash & hash_state) const
{
hash_state.update(table_id.getFullTableName());
hash_state.update(format);
IAST::updateTreeHashImpl(hash_state);
}
static void tryFindInputFunctionImpl(const ASTPtr & ast, ASTPtr & input_function)
{

View File

@ -58,6 +58,7 @@ public:
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
void updateTreeHashImpl(SipHash & hash_state) const override;
};
}

View File

@ -25,7 +25,9 @@ protected:
Chunk generate() override
{
auto context = getContext();
context->waitForProcessingAsyncInsert(query_id, std::chrono::milliseconds(timeout_ms));
auto * queue = context->getAsynchronousInsertQueue();
assert(queue);
queue->waitForProcessingQuery(query_id, std::chrono::milliseconds(timeout_ms));
return Chunk();
}