add waiting for async inserts

This commit is contained in:
Anton Popov 2021-08-28 00:29:10 +03:00
parent e8ac8e3454
commit 9e67943878
9 changed files with 154 additions and 27 deletions

View File

@ -594,6 +594,8 @@ class IColumn;
\ \
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_mode, false, "Insert query is processed almost instantly, but an actual data queued for later asynchronous insertion", 0) \ M(Bool, async_insert_mode, false, "Insert query is processed almost instantly, but an actual data queued for later asynchronous insertion", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \ M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(Seconds, async_insert_busy_timeout, 1, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \ M(Seconds, async_insert_busy_timeout, 1, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Seconds, async_insert_stale_timeout, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \ M(Seconds, async_insert_stale_timeout, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \

View File

@ -34,6 +34,7 @@ struct AsynchronousInsertQueue::InsertData
{ {
String bytes; String bytes;
String query_id; String query_id;
Context::AsyncInsertInfoPtr info;
}; };
std::mutex mutex; std::mutex mutex;
@ -88,8 +89,8 @@ bool AsynchronousInsertQueue::InsertQueryEquality::operator() (const InsertQuery
return true; return true;
} }
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts) AsynchronousInsertQueue::AsynchronousInsertQueue(ContextMutablePtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts)
: WithContext(context_) : WithMutableContext(context_)
, max_data_size(max_data_size_) , max_data_size(max_data_size_)
, busy_timeout(timeouts.busy) , busy_timeout(timeouts.busy)
, stale_timeout(timeouts.stale) , stale_timeout(timeouts.stale)
@ -139,6 +140,8 @@ void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settin
new_data.query_id = query_id; new_data.query_id = query_id;
new_data.bytes.reserve(read_buf->totalSize()); new_data.bytes.reserve(read_buf->totalSize());
new_data.info = getContext()->addAsyncInsertQueryId(query_id);
WriteBufferFromString write_buf(new_data.bytes); WriteBufferFromString write_buf(new_data.bytes);
copyData(*read_buf, write_buf); copyData(*read_buf, write_buf);
@ -226,6 +229,7 @@ try
size_t total_rows = 0; size_t total_rows = 0;
std::string_view current_query_id; std::string_view current_query_id;
Context::AsyncInsertInfoPtr current_info;
auto on_error = [&](const MutableColumns & result_columns, Exception & e) auto on_error = [&](const MutableColumns & result_columns, Exception & e)
{ {
@ -237,21 +241,30 @@ try
if (column->size() > total_rows) if (column->size() > total_rows)
column->popBack(column->size() - total_rows); column->popBack(column->size() - total_rows);
std::lock_guard info_lock(current_info->mutex);
current_info->finished = true;
current_info->exception = std::current_exception();
current_info->cv.notify_all();
return 0; return 0;
}; };
StreamingFormatExecutor executor(header, format, std::move(on_error)); StreamingFormatExecutor executor(header, format, std::move(on_error));
std::vector<std::pair<std::string_view, std::unique_ptr<ReadBuffer>>> prepared_data; std::vector<std::tuple<std::unique_ptr<ReadBuffer>,
std::string_view, Context::AsyncInsertInfoPtr>> prepared_data;
prepared_data.reserve(data->data.size()); prepared_data.reserve(data->data.size());
for (const auto & datum : data->data) for (const auto & datum : data->data)
prepared_data.emplace_back(datum.query_id, std::make_unique<ReadBufferFromString>(datum.bytes)); prepared_data.emplace_back(std::make_unique<ReadBufferFromString>(datum.bytes), datum.query_id, datum.info);
for (const auto & [query_id, buffer] : prepared_data) for (const auto & [buffer, query_id, info] : prepared_data)
{ {
format->resetParser(); format->resetParser();
format->setReadBuffer(*buffer); format->setReadBuffer(*buffer);
current_query_id = query_id; current_query_id = query_id;
current_info = info;
total_rows += executor.execute(); total_rows += executor.execute();
} }
@ -272,6 +285,13 @@ try
LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'",
total_rows, total_bytes, queryToString(data->query)); total_rows, total_bytes, queryToString(data->query));
for (const auto & datum : data->data)
{
std::lock_guard info_lock(datum.info->mutex);
datum.info->finished = true;
datum.info->cv.notify_all();
}
data->reset(); data->reset();
} }
catch (...) catch (...)

View File

@ -14,16 +14,17 @@ namespace DB
class ASTInsertQuery; class ASTInsertQuery;
struct BlockIO; struct BlockIO;
class AsynchronousInsertQueue : public WithContext class AsynchronousInsertQueue : public WithMutableContext
{ {
public: public:
/// Using structure to allow and benefit from designated initialization and not mess with a positional arguments in ctor. /// Using structure to allow and benefit from designated initialization and not mess with a positional arguments in ctor.
struct Timeout struct Timeout
{ {
std::chrono::seconds busy, stale; std::chrono::seconds busy;
std::chrono::seconds stale;
}; };
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts); AsynchronousInsertQueue(ContextMutablePtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts);
~AsynchronousInsertQueue(); ~AsynchronousInsertQueue();
void push(const ASTPtr & query, const Settings & settings, const String & query_id); void push(const ASTPtr & query, const Settings & settings, const String & query_id);
@ -35,6 +36,7 @@ class AsynchronousInsertQueue : public WithContext
ASTPtr query; ASTPtr query;
Settings settings; Settings settings;
}; };
struct InsertData; struct InsertData;
struct InsertQueryHash struct InsertQueryHash

View File

@ -120,6 +120,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int INVALID_SETTING_VALUE; extern const int INVALID_SETTING_VALUE;
extern const int TIMEOUT_EXCEEDED;
} }
@ -926,6 +927,35 @@ void Context::addQueryAccessInfo(
query_access_info.views.emplace(view_name); query_access_info.views.emplace(view_name);
} }
Context::AsyncInsertInfoPtr Context::addAsyncInsertQueryId(const String & query_id)
{
auto lock = getLock();
auto it = processing_async_inserts.emplace(query_id, std::make_shared<AsyncInsertInfo>()).first;
return it->second;
}
void Context::waitForProcessingAsyncInsert(const String & query_id, const std::chrono::milliseconds & timeout) const
{
AsyncInsertInfoPtr wait_data;
{
auto lock = getLock();
auto it = processing_async_inserts.find(query_id);
if (it == processing_async_inserts.end())
return;
wait_data = it->second;
}
std::unique_lock lock(wait_data->mutex);
auto finished = wait_data->cv.wait_for(lock, timeout, [&] { return wait_data->finished; });
if (!finished)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout.count());
if (wait_data->exception)
std::rethrow_exception(wait_data->exception);
}
void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const
{ {

View File

@ -24,6 +24,7 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <optional> #include <optional>
#include <exception>
namespace Poco::Net { class IPAddress; } namespace Poco::Net { class IPAddress; }
@ -285,6 +286,19 @@ public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for a query context. // Top-level OpenTelemetry trace context for the query. Makes sense only for a query context.
OpenTelemetryTraceContext query_trace_context; OpenTelemetryTraceContext query_trace_context;
struct AsyncInsertInfo
{
std::mutex mutex;
std::condition_variable cv;
bool finished = false;
std::exception_ptr exception;
};
using AsyncInsertInfoPtr = std::shared_ptr<AsyncInsertInfo>;
AsyncInsertInfoPtr addAsyncInsertQueryId(const String & query_id);
void waitForProcessingAsyncInsert(const String & query_id, const std::chrono::milliseconds & timeout) const;
private: private:
using SampleBlockCache = std::unordered_map<std::string, Block>; using SampleBlockCache = std::unordered_map<std::string, Block>;
mutable SampleBlockCache sample_block_cache; mutable SampleBlockCache sample_block_cache;
@ -307,6 +321,8 @@ private:
/// thousands of signatures. /// thousands of signatures.
/// And I hope it will be replaced with more common Transaction sometime. /// And I hope it will be replaced with more common Transaction sometime.
std::unordered_map<String, AsyncInsertInfoPtr> processing_async_inserts;
Context(); Context();
Context(const Context &); Context(const Context &);
Context & operator=(const Context &); Context & operator=(const Context &);
@ -468,6 +484,7 @@ public:
const String & projection_name = {}, const String & projection_name = {},
const String & view_name = {}); const String & view_name = {});
/// Supported factories for records in query_log /// Supported factories for records in query_log
enum class QueryLogFactories enum class QueryLogFactories
{ {

View File

@ -147,7 +147,7 @@ static bool isTrivialSelect(const ASTPtr & select)
}; };
std::pair<BlockIO, Processors> InterpreterInsertQuery::executeImpl( std::pair<BlockIO, BlockOutputStreams> InterpreterInsertQuery::executeImpl(
const StoragePtr & table, Block & sample_block) const StoragePtr & table, Block & sample_block)
{ {
const auto & settings = getContext()->getSettingsRef(); const auto & settings = getContext()->getSettingsRef();
@ -158,7 +158,7 @@ std::pair<BlockIO, Processors> InterpreterInsertQuery::executeImpl(
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PARTITION BY clause is not supported by storage"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PARTITION BY clause is not supported by storage");
BlockIO res; BlockIO res;
Processors sinks; BlockOutputStreams out_streams;
bool is_distributed_insert_select = false; bool is_distributed_insert_select = false;
if (query.select && table->isRemote() && settings.parallel_distributed_insert_select) if (query.select && table->isRemote() && settings.parallel_distributed_insert_select)
@ -303,11 +303,11 @@ std::pair<BlockIO, Processors> InterpreterInsertQuery::executeImpl(
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out); auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(getContext()->getProcessListElement()); out_wrapper->setProcessListElement(getContext()->getProcessListElement());
sinks.emplace_back(std::make_shared<SinkToOutputStream>(std::move(out_wrapper))); out_streams.emplace_back(std::move(out_wrapper));
} }
} }
return {std::move(res), std::move(sinks)}; return {std::move(res), std::move(out_streams)};
} }
BlockIO InterpreterInsertQuery::execute() BlockIO InterpreterInsertQuery::execute()
@ -324,11 +324,11 @@ BlockIO InterpreterInsertQuery::execute()
getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames()); getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames());
BlockIO res; BlockIO res;
Processors sinks; BlockOutputStreams out_streams;
std::tie(res, sinks) = executeImpl(table, sample_block); std::tie(res, out_streams) = executeImpl(table, sample_block);
/// What type of query: INSERT or INSERT SELECT or INSERT WATCH? /// What type of query: INSERT or INSERT SELECT or INSERT WATCH?
if (sinks.empty()) if (out_streams.empty())
{ {
/// Pipeline was already built. /// Pipeline was already built.
} }
@ -336,7 +336,7 @@ BlockIO InterpreterInsertQuery::execute()
{ {
/// XXX: is this branch also triggered for select+input() case? /// XXX: is this branch also triggered for select+input() case?
const auto & header = sinks.at(0)->getInputs().front().getHeader(); const auto & header = out_streams.at(0)->getHeader();
auto actions_dag = ActionsDAG::makeConvertingActions( auto actions_dag = ActionsDAG::makeConvertingActions(
res.pipeline.getHeader().getColumnsWithTypeAndName(), res.pipeline.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(), header.getColumnsWithTypeAndName(),
@ -348,13 +348,15 @@ BlockIO InterpreterInsertQuery::execute()
return std::make_shared<ExpressionTransform>(in_header, actions); return std::make_shared<ExpressionTransform>(in_header, actions);
}); });
auto it = sinks.rbegin(); res.pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr
res.pipeline.setSinks([&it](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr
{ {
if (type != QueryPipeline::StreamType::Main) if (type != QueryPipeline::StreamType::Main)
return nullptr; return nullptr;
return *it++; auto stream = std::move(out_streams.back());
out_streams.pop_back();
return std::make_shared<SinkToOutputStream>(std::move(stream));
}); });
if (!allow_materialized) if (!allow_materialized)
@ -364,16 +366,18 @@ BlockIO InterpreterInsertQuery::execute()
throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN); throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
} }
} }
else else if (!query.expectNativeData())
{ {
auto pipe = getSourceFromASTInsertQuery(query_ptr, true, sample_block, getContext(), nullptr); auto pipe = getSourceFromASTInsertQuery(query_ptr, true, sample_block, getContext(), nullptr);
res.pipeline.init(std::move(pipe)); res.pipeline.init(std::move(pipe));
res.pipeline.resize(1); res.pipeline.resize(1);
res.pipeline.setSinks([&](const Block &, Pipe::StreamType) res.pipeline.setSinks([&](const Block &, Pipe::StreamType)
{ {
return sinks.at(0); return std::make_shared<SinkToOutputStream>(out_streams.at(0));
}); });
} }
else
res.out = std::move(out_streams.at(0));
res.pipeline.addStorageHolder(table); res.pipeline.addStorageHolder(table);
if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get())) if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get()))
@ -398,7 +402,14 @@ Processors InterpreterInsertQuery::getSinks()
if (!query.table_function) if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames()); getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames());
return executeImpl(table, sample_block).second; auto out_streams = executeImpl(table, sample_block).second;
Processors sinks;
sinks.reserve(out_streams.size());
for (const auto & out : out_streams)
sinks.emplace_back(std::make_shared<SinkToOutputStream>(out));
return sinks;
} }
StorageID InterpreterInsertQuery::getDatabaseTable() const StorageID InterpreterInsertQuery::getDatabaseTable() const

View File

@ -37,7 +37,7 @@ public:
private: private:
StoragePtr getTable(ASTInsertQuery & query); StoragePtr getTable(ASTInsertQuery & query);
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const; Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
std::pair<BlockIO, Processors> executeImpl(const StoragePtr & table, Block & sample_block); std::pair<BlockIO, BlockOutputStreams> executeImpl(const StoragePtr & table, Block & sample_block);
ASTPtr query_ptr; ASTPtr query_ptr;
const bool allow_materialized; const bool allow_materialized;

View File

@ -56,6 +56,7 @@
#include <Processors/Transforms/MaterializingTransform.h> #include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Formats/IOutputFormat.h> #include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SinkToOutputStream.h> #include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Sources/WaitForAsyncInsertSource.h>
namespace ProfileEvents namespace ProfileEvents
@ -555,11 +556,18 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (async_insert) if (async_insert)
{ {
queue->push(ast, settings, context->getCurrentQueryId()); auto query_id = context->getCurrentQueryId();
queue->push(ast, settings, query_id);
/// Shortcut for already processed similar insert-queries. BlockIO io;
/// Similarity is defined by hashing query text and some settings. if (settings.wait_for_async_insert)
return std::make_tuple(ast, BlockIO()); {
auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds();
auto source = std::make_shared<WaitForAsyncInsertSource>(Block(), query_id, timeout, context->getGlobalContext());
io.pipeline.init(Pipe(source));
}
return std::make_tuple(ast, std::move(io));
} }
auto interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal)); auto interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal));

View File

@ -0,0 +1,37 @@
#pragma once
#include <Processors/ISource.h>
#include <Interpreters/Context.h>
namespace DB
{
class WaitForAsyncInsertSource : public ISource, WithContext
{
public:
WaitForAsyncInsertSource(
const Block & header, const String & query_id_,
size_t timeout_ms_, ContextPtr context_)
: ISource(std::move(header))
, WithContext(context_)
, query_id(query_id_)
, timeout_ms(timeout_ms_)
{
}
String getName() const override { return "WaitForAsyncInsert"; }
protected:
Chunk generate() override
{
auto context = getContext();
context->waitForProcessingAsyncInsert(query_id, std::chrono::milliseconds(timeout_ms));
return Chunk();
}
private:
String query_id;
size_t timeout_ms;
};
}