This commit is contained in:
Ivan Lezhankin 2021-03-17 17:11:47 +03:00
parent 74214865eb
commit 346818266a
7 changed files with 99 additions and 25 deletions

View File

@ -2,30 +2,39 @@
#include <Core/Settings.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/copyData.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromString.h>
#include <IO/copyData.h>
#include <Parsers/ASTInsertQuery.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include "IO/WriteBufferFromOStream.h"
#include "Parsers/formatAST.h"
namespace DB
{
/// Key of the insert queue: the INSERT query AST together with the settings it was pushed with.
struct AsynchronousInsertQueue::InsertQuery
{
/// AST of the insert query (push() stores `query->shared_from_this()` here).
ASTPtr query;
/// Settings that were passed to push() alongside the query.
Settings settings;
};
/// Per-query state kept in the queue: the pending data chunks and the BlockIO they are dumped through.
struct AsynchronousInsertQueue::InsertData
{
/// Guards this entry; held by processData() for the whole dump and by push() while it replaces `io`.
std::mutex mutex;
/// Pending chunks. processData() appends each of them to the input stream and then clears the list.
std::list<std::string> data;
/// Accumulated size of pending data; pushImpl() schedules a dump once it exceeds max_data_size.
size_t size = 0;
std::chrono::time_point<std::chrono::steady_clock> first_update, last_update;
/// Streams used for the dump: processData() copies from `io.in` to `io.out`.
BlockIO io;
/// Timestamp of the first insert into queue, or after the last queue dump.
/// Used to detect for how long the queue is active, so we can dump it by timer.
std::chrono::time_point<std::chrono::steady_clock> first_update;
/// Timestamp of the last insert into queue.
/// Used to detect for how long the queue is stale, so we can dump it by another timer.
std::chrono::time_point<std::chrono::steady_clock> last_update;
/// Indicates that the BlockIO should be updated, because we can't read/write prefix and suffix more than once.
bool reset = false;
};
std::size_t AsynchronousInsertQueue::InsertQueryHash::operator() (const InsertQuery & query) const
@ -58,7 +67,8 @@ bool AsynchronousInsertQueue::InsertQueryEquality::operator() (const InsertQuery
return true;
}
/// Creates an empty queue.
/// `pool_size` is forwarded to the thread pool on which dumps are scheduled;
/// `max_data_size_` is stored as the size above which pushImpl() schedules a dump.
AsynchronousInsertQueue::AsynchronousInsertQueue(size_t pool_size, size_t max_data_size_) : max_data_size(max_data_size_), pool(pool_size)
AsynchronousInsertQueue::AsynchronousInsertQueue(size_t pool_size, size_t max_data_size_)
: max_data_size(max_data_size_), lock(RWLockImpl::create()), queue(new Queue), pool(pool_size)
{
}
@ -66,8 +76,8 @@ bool AsynchronousInsertQueue::push(ASTInsertQuery * query, const Settings & sett
{
auto read_lock = lock->getLock(RWLockImpl::Read, String());
auto it = queue.find(InsertQuery{query->shared_from_this(), settings});
if (it != queue.end())
auto it = queue->find(InsertQuery{query->shared_from_this(), settings});
if (it != queue->end() && !it->second->reset)
{
pushImpl(query, it);
return true;
@ -80,14 +90,23 @@ void AsynchronousInsertQueue::push(ASTInsertQuery * query, BlockIO && io, const
{
auto write_lock = lock->getLock(RWLockImpl::Write, String());
auto it = queue.find(InsertQuery{query->shared_from_this(), settings});
if (it == queue.end())
auto it = queue->find(InsertQuery{query->shared_from_this(), settings});
if (it == queue->end())
{
InsertQuery key{query->shared_from_this(), settings};
it = queue.insert({key, std::make_shared<InsertData>()}).first;
it = queue->insert({key, std::make_shared<InsertData>()}).first;
it->second->io = std::move(io);
it->second->first_update = std::chrono::steady_clock::now();
}
else
{
std::unique_lock<std::mutex> data_lock(it->second->mutex);
it->second->reset = false;
it->second->io = std::move(io);
/// All other fields should have been already reset.
}
pushImpl(query, it);
}
@ -97,6 +116,7 @@ void AsynchronousInsertQueue::pushImpl(ASTInsertQuery * query, QueueIterator & i
ConcatReadBuffer::Buffers buffers;
auto ast_buf = std::make_unique<ReadBufferFromMemory>(query->data, query->data ? query->end - query->data : 0);
if (query->data)
buffers.push_back(std::move(ast_buf));
@ -118,15 +138,36 @@ void AsynchronousInsertQueue::pushImpl(ASTInsertQuery * query, QueueIterator & i
it->second->size += concat_buf.count();
it->second->last_update = std::chrono::steady_clock::now();
String query_string;
{
WriteBufferFromString buf(query_string);
formatAST(*query, buf);
}
LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"), "Queue size {} for query '{}'", it->second->size, query_string);
if (it->second->size > max_data_size)
/// Since we're under lock here it's safe to pass-by-copy the shared_ptr
/// Since we're under lock here, it's safe to pass-by-copy the shared_ptr
/// without a race with the cleanup thread, which may reset last shared_ptr instance.
pool.scheduleOrThrowOnError([this, data = it->second] { processData(data); });
pool.scheduleOrThrowOnError([data = it->second] { processData(data); });
}
// static
void AsynchronousInsertQueue::processData(std::shared_ptr<InsertData> data)
{
std::unique_lock<std::mutex> data_lock(data->mutex);
auto in = std::dynamic_pointer_cast<InputStreamFromASTInsertQuery>(data->io.in);
assert(in);
for (const auto & datum : data->data)
in->appendBuffer(std::make_unique<ReadBufferFromString>(datum));
copyData(*in, *data->io.out);
data->data.clear();
data->size = 0;
data->first_update = std::chrono::steady_clock::now();
data->last_update = data->first_update;
data->reset = true;
}
}

View File

@ -3,6 +3,7 @@
#include <Parsers/IAST_fwd.h>
#include <Common/RWLock.h>
#include <Common/ThreadPool.h>
#include <Core/Settings.h>
#include <unordered_map>
@ -12,7 +13,6 @@ namespace DB
class ASTInsertQuery;
struct BlockIO;
struct Settings;
class AsynchronousInsertQueue
{
@ -23,7 +23,11 @@ class AsynchronousInsertQueue
void push(ASTInsertQuery * query, BlockIO && io, const Settings & settings);
private:
struct InsertQuery;
/// Key type of the queue map: an insert query AST paired with its settings.
struct InsertQuery
{
/// AST of the insert query.
ASTPtr query;
/// Settings associated with that query.
Settings settings;
};
struct InsertData;
struct InsertQueryHash
@ -36,19 +40,29 @@ class AsynchronousInsertQueue
bool operator () (const InsertQuery &, const InsertQuery &) const;
};
/// Logic and events behind queue are as follows:
/// - reset_timeout: if queue is empty for some time, then we delete the queue and free all associated resources, e.g. tables.
/// - dump_timeout: if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
/// grow for a long period of time and users will be able to select new data in a deterministic manner.
/// - stale_timeout: if queue is stale for too long, then we dump the data too, so that users will be able to select the last
/// piece of inserted data.
/// - access_timeout: also we have to check if user still has access to the tables periodically, and if the access is lost, then
/// we dump pending data and delete queue immediately.
/// - max_data_size: if the maximum size of data is reached, then again we dump the data.
using Queue = std::unordered_map<InsertQuery, std::shared_ptr<InsertData>, InsertQueryHash, InsertQueryEquality>;
using QueueIterator = Queue::iterator;
const size_t max_data_size;
const size_t max_data_size; /// in bytes
RWLock lock;
Queue queue;
std::unique_ptr<Queue> queue;
ThreadPool pool;
ThreadPool pool; /// dump the data only inside this pool.
/// TODO: ThreadFromGlobalPool remove_empty_thread, check_access_thread;
void pushImpl(ASTInsertQuery * query, QueueIterator & it); /// use only under lock
void processData(std::shared_ptr<InsertData> data);
static void processData(std::shared_ptr<InsertData> data);
};
}

View File

@ -14,7 +14,7 @@ class ConcatReadBuffer : public ReadBuffer
public:
using Buffers = std::vector<std::unique_ptr<ReadBuffer>>;
/// Constructs a ConcatReadBuffer without any underlying buffers;
/// the ReadBuffer base is initialised with a null pointer and zero size.
ConcatReadBuffer() : ReadBuffer(nullptr, 0)
ConcatReadBuffer() : ReadBuffer(nullptr, 0), current(buffers.end())
{
}
@ -31,7 +31,9 @@ public:
/// Takes ownership of `buffer` and adds it to the end of the concatenation.
/// Asserts that count() is zero — presumably meaning nothing has been read yet; confirm against ReadBuffer.
void appendBuffer(std::unique_ptr<ReadBuffer> buffer)
{
assert(!count());
buffers.push_back(std::move(buffer));
/// Re-point `current` at the first buffer after the container has changed.
/// NOTE(review): if `buffers` is the `Buffers` vector, push_back may reallocate and invalidate the old iterator — confirm.
current = buffers.begin();
}
protected:

View File

@ -65,6 +65,7 @@
#include <Interpreters/DatabaseCatalog.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <IO/AsynchronousInsertionQueue.h>
namespace ProfileEvents
@ -380,6 +381,8 @@ struct ContextShared
std::shared_ptr<CompiledExpressionCache> compiled_expression_cache;
#endif
std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
bool shutdown_called = false;
Stopwatch uptime_watch;
@ -496,6 +499,7 @@ Context Context::createGlobal(ContextShared * shared)
void Context::initGlobal()
{
DatabaseCatalog::init(*this);
shared->async_insert_queue = std::make_shared<AsynchronousInsertQueue>(16, 1024);
}
SharedContextHolder Context::createShared()
@ -2589,4 +2593,9 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs()
return ignored_part_uuids;
}
/// Returns the asynchronous insert queue owned by the shared context.
/// The stored pointer is dereferenced without a null check.
AsynchronousInsertQueue & Context::getAsynchronousInsertQueue() const
{
    const auto & queue_holder = shared->async_insert_queue;
    return *queue_holder;
}
}

View File

@ -152,7 +152,7 @@ struct SharedContextHolder
{
~SharedContextHolder();
SharedContextHolder();
SharedContextHolder(std::unique_ptr<ContextShared> shared_context);
explicit SharedContextHolder(std::unique_ptr<ContextShared> shared_context);
SharedContextHolder(SharedContextHolder &&) noexcept;
SharedContextHolder & operator=(SharedContextHolder &&);
@ -765,7 +765,8 @@ public:
PartUUIDsPtr getPartUUIDs();
PartUUIDsPtr getIgnoredPartUUIDs();
AsynchronousInsertQueue & getAsynchronousInsertQueue();
AsynchronousInsertQueue & getAsynchronousInsertQueue() const;
private:
std::unique_lock<std::recursive_mutex> getLock() const;

View File

@ -371,6 +371,7 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
query_id.clear();
query_context = nullptr;
thread_group->query_context = nullptr;
thread_trace_context.trace_id = 0;
thread_trace_context.span_id = 0;
thread_group.reset();

View File

@ -498,6 +498,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context.initializeExternalTablesIfSet();
auto * insert_query = ast->as<ASTInsertQuery>();
if (insert_query)
/// Resolve database before trying to use async insert feature - to properly hash the query.
insert_query->table_id = context.resolveStorageID(insert_query->table_id);
if (insert_query && insert_query->select)
{
/// Prepare Input storage before executing interpreter if we already got a buffer with data.
@ -548,7 +553,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
const bool async_insert = insert_query && !insert_query->select && settings.asynchronous_insert_mode;
const bool async_insert
= insert_query && !insert_query->select && (insert_query->data || insert_query->tail) && settings.asynchronous_insert_mode;
auto & queue = context.getAsynchronousInsertQueue();
if (async_insert && queue.push(insert_query, settings))