mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 05:22:17 +00:00
implement async inserts with processors
This commit is contained in:
parent
3a0d4807a5
commit
78dbcaea54
@ -2,8 +2,9 @@
|
||||
|
||||
#include <Core/Settings.h>
|
||||
#include <DataStreams/BlockIO.h>
|
||||
#include <DataStreams/IBlockStream_fwd.h>
|
||||
#include <DataStreams/copyData.h>
|
||||
#include <Interpreters/InterpreterInsertQuery.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
|
||||
#include <IO/ConcatReadBuffer.h>
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
@ -18,10 +19,17 @@ namespace DB
|
||||
|
||||
struct AsynchronousInsertQueue::InsertData
|
||||
{
|
||||
InsertData(ASTPtr query_, const Settings & settings_)
|
||||
: query(std::move(query_)), settings(settings_)
|
||||
{
|
||||
}
|
||||
|
||||
ASTPtr query;
|
||||
Settings settings;
|
||||
|
||||
std::mutex mutex;
|
||||
std::list<std::string> data;
|
||||
size_t size = 0;
|
||||
BlockIO io;
|
||||
|
||||
/// Timestamp of the first insert into queue, or after the last queue dump.
|
||||
/// Used to detect for how long the queue is active, so we can dump it by timer.
|
||||
@ -32,7 +40,13 @@ struct AsynchronousInsertQueue::InsertData
|
||||
std::chrono::time_point<std::chrono::steady_clock> last_update;
|
||||
|
||||
/// Indicates that the BlockIO should be updated, because we can't read/write prefix and suffix more than once.
|
||||
bool reset = false;
|
||||
bool is_reset = false;
|
||||
|
||||
void reset()
|
||||
{
|
||||
data.clear();
|
||||
is_reset = true;
|
||||
}
|
||||
};
|
||||
|
||||
std::size_t AsynchronousInsertQueue::InsertQueryHash::operator() (const InsertQuery & query) const
|
||||
@ -65,8 +79,9 @@ bool AsynchronousInsertQueue::InsertQueryEquality::operator() (const InsertQuery
|
||||
return true;
|
||||
}
|
||||
|
||||
AsynchronousInsertQueue::AsynchronousInsertQueue(size_t pool_size, size_t max_data_size_, const Timeout & timeouts)
|
||||
: max_data_size(max_data_size_)
|
||||
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts)
|
||||
: WithContext(context_)
|
||||
, max_data_size(max_data_size_)
|
||||
, busy_timeout(timeouts.busy)
|
||||
, stale_timeout(timeouts.stale)
|
||||
, lock(RWLockImpl::create())
|
||||
@ -97,45 +112,41 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
|
||||
pool.wait();
|
||||
}
|
||||
|
||||
bool AsynchronousInsertQueue::push(ASTInsertQuery * query, const Settings & settings)
|
||||
{
|
||||
auto read_lock = lock->getLock(RWLockImpl::Read, String());
|
||||
|
||||
auto it = queue->find(InsertQuery{query->shared_from_this(), settings});
|
||||
|
||||
if (it != queue->end())
|
||||
{
|
||||
std::unique_lock<std::mutex> data_lock(it->second->mutex);
|
||||
|
||||
if (it->second->reset)
|
||||
return false;
|
||||
|
||||
pushImpl(query, it);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void AsynchronousInsertQueue::push(ASTInsertQuery * query, BlockIO && io, const Settings & settings)
|
||||
void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settings)
|
||||
{
|
||||
auto write_lock = lock->getLock(RWLockImpl::Write, String());
|
||||
|
||||
InsertQuery key{query->shared_from_this(), settings};
|
||||
InsertQuery key{query, settings};
|
||||
|
||||
auto it = queue->find(key);
|
||||
if (it == queue->end())
|
||||
{
|
||||
it = queue->insert({key, std::make_shared<InsertData>()}).first;
|
||||
it->second->io = std::move(io);
|
||||
}
|
||||
else if (it->second->reset)
|
||||
{
|
||||
it->second = std::make_shared<InsertData>();
|
||||
it->second->io = std::move(io);
|
||||
}
|
||||
it = queue->insert({key, std::make_shared<InsertData>(query, settings)}).first;
|
||||
else if (it->second->is_reset)
|
||||
it->second = std::make_shared<InsertData>(query, settings);
|
||||
|
||||
std::unique_lock<std::mutex> data_lock(it->second->mutex);
|
||||
pushImpl(query, it);
|
||||
|
||||
auto read_buffers = getReadBuffersFromASTInsertQuery(query);
|
||||
ConcatReadBuffer concat_buf(std::move(read_buffers));
|
||||
|
||||
/// NOTE: must not read from |query->tail| before read all between |query->data| and |query->end|.
|
||||
|
||||
/// It's important to read the whole data per query as a single chunk, so we can safely drop it in case of parsing failure.
|
||||
auto & new_data = it->second->data.emplace_back();
|
||||
new_data.reserve(concat_buf.totalSize());
|
||||
WriteBufferFromString write_buf(new_data);
|
||||
|
||||
copyData(concat_buf, write_buf);
|
||||
it->second->size += concat_buf.count();
|
||||
it->second->last_update = std::chrono::steady_clock::now();
|
||||
|
||||
LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"),
|
||||
"Queue size {} for query '{}'", it->second->size, queryToString(*query));
|
||||
|
||||
if (it->second->size > max_data_size)
|
||||
/// Since we're under lock here, it's safe to pass-by-copy the shared_ptr
|
||||
/// without a race with the cleanup thread, which may reset last shared_ptr instance.
|
||||
pool.scheduleOrThrowOnError([data = it->second, global_context = getContext()] { processData(data, global_context); });
|
||||
}
|
||||
|
||||
void AsynchronousInsertQueue::busyCheck()
|
||||
@ -157,7 +168,7 @@ void AsynchronousInsertQueue::busyCheck()
|
||||
auto lag = std::chrono::steady_clock::now() - data->first_update;
|
||||
|
||||
if (lag >= busy_timeout)
|
||||
pool.scheduleOrThrowOnError([data = data] { processData(data); });
|
||||
pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); });
|
||||
else
|
||||
timeout = std::min(timeout, std::chrono::ceil<std::chrono::seconds>(busy_timeout - lag));
|
||||
}
|
||||
@ -179,64 +190,47 @@ void AsynchronousInsertQueue::staleCheck()
|
||||
auto lag = std::chrono::steady_clock::now() - data->last_update;
|
||||
|
||||
if (lag >= stale_timeout)
|
||||
pool.scheduleOrThrowOnError([data = data] { processData(data); });
|
||||
pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void AsynchronousInsertQueue::pushImpl(ASTInsertQuery * query, QueueIterator & it)
|
||||
{
|
||||
ConcatReadBuffer concat_buf;
|
||||
|
||||
auto ast_buf = std::make_unique<ReadBufferFromMemory>(query->data, query->data ? query->end - query->data : 0);
|
||||
|
||||
if (query->data)
|
||||
concat_buf.appendBuffer(std::move(ast_buf));
|
||||
|
||||
if (query->tail)
|
||||
concat_buf.appendBuffer(wrapReadBufferReference(*query->tail));
|
||||
|
||||
/// NOTE: must not read from |query->tail| before read all between |query->data| and |query->end|.
|
||||
|
||||
/// It's important to read the whole data per query as a single chunk, so we can safely drop it in case of parsing failure.
|
||||
auto & new_data = it->second->data.emplace_back();
|
||||
new_data.reserve(concat_buf.totalSize());
|
||||
WriteBufferFromString write_buf(new_data);
|
||||
|
||||
copyData(concat_buf, write_buf);
|
||||
it->second->size += concat_buf.count();
|
||||
it->second->last_update = std::chrono::steady_clock::now();
|
||||
|
||||
LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"), "Queue size {} for query '{}'", it->second->size, queryToString(*query));
|
||||
|
||||
if (it->second->size > max_data_size)
|
||||
/// Since we're under lock here, it's safe to pass-by-copy the shared_ptr
|
||||
/// without a race with the cleanup thread, which may reset last shared_ptr instance.
|
||||
pool.scheduleOrThrowOnError([data = it->second] { processData(data); });
|
||||
}
|
||||
|
||||
// static
|
||||
void AsynchronousInsertQueue::processData(std::shared_ptr<InsertData> data)
|
||||
void AsynchronousInsertQueue::processData(std::shared_ptr<InsertData> data, ContextPtr global_context)
|
||||
try
|
||||
{
|
||||
std::unique_lock<std::mutex> data_lock(data->mutex);
|
||||
|
||||
if (data->reset)
|
||||
if (data->is_reset)
|
||||
return;
|
||||
|
||||
// auto in = std::dynamic_pointer_cast<InputStreamFromASTInsertQuery>(data->io.in);
|
||||
// assert(in);
|
||||
ReadBuffers read_buffers;
|
||||
for (const auto & datum : data->data)
|
||||
read_buffers.emplace_back(std::make_unique<ReadBufferFromString>(datum));
|
||||
|
||||
// auto log_progress = [](const Block & block)
|
||||
// {
|
||||
// LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"), "Flushed {} rows", block.rows());
|
||||
// };
|
||||
auto insert_context = Context::createCopy(global_context);
|
||||
insert_context->makeQueryContext();
|
||||
insert_context->setSettings(data->settings);
|
||||
|
||||
// for (const auto & datum : data->data)
|
||||
// in->appendBuffer(std::make_unique<ReadBufferFromString>(datum));
|
||||
// copyData(*in, *data->io.out, [] {return false;}, log_progress);
|
||||
InterpreterInsertQuery interpreter(data->query, std::move(read_buffers), insert_context);
|
||||
auto io = interpreter.execute();
|
||||
assert(io.pipeline.initialized());
|
||||
|
||||
data->io = BlockIO(); /// Release all potential table locks
|
||||
data->reset = true;
|
||||
auto log_progress = [&](const Progress & progress)
|
||||
{
|
||||
LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"),
|
||||
"Flushed {} rows, {} bytes", progress.written_rows, progress.written_bytes);
|
||||
};
|
||||
|
||||
io.pipeline.setProgressCallback(log_progress);
|
||||
auto executor = io.pipeline.execute();
|
||||
executor->execute(io.pipeline.getNumThreads());
|
||||
|
||||
data->reset();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException("AsynchronousInsertQueue", __PRETTY_FUNCTION__);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ namespace DB
|
||||
class ASTInsertQuery;
|
||||
struct BlockIO;
|
||||
|
||||
class AsynchronousInsertQueue
|
||||
class AsynchronousInsertQueue : public WithContext
|
||||
{
|
||||
public:
|
||||
/// Using structure to allow and benefit from designated initialization and not mess with a positional arguments in ctor.
|
||||
@ -23,11 +23,10 @@ class AsynchronousInsertQueue
|
||||
std::chrono::seconds busy, stale;
|
||||
};
|
||||
|
||||
AsynchronousInsertQueue(size_t pool_size, size_t max_data_size, const Timeout & timeouts);
|
||||
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts);
|
||||
~AsynchronousInsertQueue();
|
||||
|
||||
bool push(ASTInsertQuery * query, const Settings & settings);
|
||||
void push(ASTInsertQuery * query, BlockIO && io, const Settings & settings);
|
||||
void push(const ASTPtr & query, const Settings & settings);
|
||||
|
||||
private:
|
||||
struct InsertQuery
|
||||
@ -75,9 +74,7 @@ class AsynchronousInsertQueue
|
||||
void busyCheck();
|
||||
void staleCheck();
|
||||
|
||||
void pushImpl(ASTInsertQuery * query, QueueIterator & it); /// use only under lock
|
||||
|
||||
static void processData(std::shared_ptr<InsertData> data);
|
||||
static void processData(std::shared_ptr<InsertData> data, ContextPtr global_context);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -4,7 +4,7 @@
|
||||
#include <DataStreams/AddingDefaultBlockOutputStream.h>
|
||||
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
|
||||
#include <DataStreams/CountingBlockOutputStream.h>
|
||||
#include <Processors/Transforms/getSourceFromFromASTInsertQuery.h>
|
||||
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
|
||||
#include <DataStreams/PushingToViewsBlockOutputStream.h>
|
||||
#include <DataStreams/SquashingBlockOutputStream.h>
|
||||
#include <DataStreams/copyData.h>
|
||||
@ -44,8 +44,18 @@ namespace ErrorCodes
|
||||
|
||||
InterpreterInsertQuery::InterpreterInsertQuery(
|
||||
const ASTPtr & query_ptr_, ContextPtr context_, bool allow_materialized_, bool no_squash_, bool no_destination_)
|
||||
: InterpreterInsertQuery(query_ptr_, getReadBuffersFromASTInsertQuery(query_ptr_),
|
||||
context_, allow_materialized_, no_squash_, no_destination_)
|
||||
{
|
||||
}
|
||||
|
||||
InterpreterInsertQuery::InterpreterInsertQuery(
|
||||
const ASTPtr & query_ptr_, ReadBuffers read_buffers_,
|
||||
ContextPtr context_, bool allow_materialized_,
|
||||
bool no_squash_, bool no_destination_)
|
||||
: WithContext(context_)
|
||||
, query_ptr(query_ptr_)
|
||||
, read_buffers(std::move(read_buffers_))
|
||||
, allow_materialized(allow_materialized_)
|
||||
, no_squash(no_squash_)
|
||||
, no_destination(no_destination_)
|
||||
@ -353,7 +363,7 @@ BlockIO InterpreterInsertQuery::execute()
|
||||
}
|
||||
else if (!query.expectNativeData())
|
||||
{
|
||||
auto pipe = getSourceFromFromASTInsertQuery(query_ptr, nullptr, query_sample_block, getContext(), nullptr);
|
||||
auto pipe = getSourceFromASTInsertQuery(query_ptr, query_sample_block, std::move(read_buffers), getContext());
|
||||
res.pipeline.init(std::move(pipe));
|
||||
res.pipeline.resize(1);
|
||||
res.pipeline.setSinks([&](const Block &, Pipe::StreamType)
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Interpreters/IInterpreter.h>
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Storages/StorageInMemoryMetadata.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -14,6 +15,8 @@ namespace DB
|
||||
class InterpreterInsertQuery : public IInterpreter, WithContext
|
||||
{
|
||||
public:
|
||||
using ReadBuffers = std::vector<std::unique_ptr<ReadBuffer>>;
|
||||
|
||||
InterpreterInsertQuery(
|
||||
const ASTPtr & query_ptr_,
|
||||
ContextPtr context_,
|
||||
@ -21,6 +24,14 @@ public:
|
||||
bool no_squash_ = false,
|
||||
bool no_destination_ = false);
|
||||
|
||||
InterpreterInsertQuery(
|
||||
const ASTPtr & query_ptr_,
|
||||
ReadBuffers read_buffers_,
|
||||
ContextPtr context_,
|
||||
bool allow_materialized_ = false,
|
||||
bool no_squash_ = false,
|
||||
bool no_destination_ = false);
|
||||
|
||||
/** Prepare a request for execution. Return block streams
|
||||
* - the stream into which you can write data to execute the query, if INSERT;
|
||||
* - the stream from which you can read the result of the query, if SELECT and similar;
|
||||
@ -37,6 +48,7 @@ private:
|
||||
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
|
||||
|
||||
ASTPtr query_ptr;
|
||||
ReadBuffers read_buffers;
|
||||
const bool allow_materialized;
|
||||
const bool no_squash;
|
||||
const bool no_destination;
|
||||
|
@ -12,7 +12,7 @@
|
||||
#include <DataStreams/BlockIO.h>
|
||||
#include <DataStreams/copyData.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Processors/Transforms/getSourceFromFromASTInsertQuery.h>
|
||||
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
|
||||
#include <DataStreams/CountingBlockOutputStream.h>
|
||||
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
@ -431,6 +431,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
query_end = insert_query->data;
|
||||
else
|
||||
query_end = end;
|
||||
|
||||
insert_query->tail = istr;
|
||||
}
|
||||
else
|
||||
@ -521,8 +522,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
StoragePtr storage = context->executeTableFunction(input_function);
|
||||
auto & input_storage = dynamic_cast<StorageInput &>(*storage);
|
||||
auto input_metadata_snapshot = input_storage.getInMemoryMetadataPtr();
|
||||
auto pipe = getSourceFromFromASTInsertQuery(
|
||||
ast, istr, input_metadata_snapshot->getSampleBlock(), context, input_function);
|
||||
auto read_buffers = getReadBuffersFromASTInsertQuery(ast);
|
||||
auto pipe = getSourceFromASTInsertQuery(
|
||||
ast, input_metadata_snapshot->getSampleBlock(), std::move(read_buffers), context, input_function);
|
||||
input_storage.setPipe(std::move(pipe));
|
||||
}
|
||||
}
|
||||
@ -531,6 +533,18 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
/// reset Input callbacks if query is not INSERT SELECT
|
||||
context->resetInputCallbacks();
|
||||
|
||||
auto * queue = context->getAsynchronousInsertQueue();
|
||||
const bool async_insert
|
||||
= queue && insert_query && !insert_query->select && !insert_query->expectNativeData() && settings.async_insert_mode;
|
||||
|
||||
if (async_insert)
|
||||
{
|
||||
/// Shortcut for already processed similar insert-queries.
|
||||
/// Similarity is defined by hashing query text and some settings.
|
||||
queue->push(ast, settings);
|
||||
return std::make_tuple(ast, BlockIO());
|
||||
}
|
||||
|
||||
auto interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal));
|
||||
|
||||
std::shared_ptr<const EnabledQuota> quota;
|
||||
@ -559,17 +573,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
|
||||
}
|
||||
|
||||
auto * queue = context->getAsynchronousInsertQueue();
|
||||
const bool async_insert
|
||||
= queue && insert_query && !insert_query->select && !insert_query->expectNativeData() && settings.async_insert_mode;
|
||||
|
||||
if (async_insert && queue->push(insert_query, settings))
|
||||
{
|
||||
/// Shortcut for already processed similar insert-queries.
|
||||
/// Similarity is defined by hashing query text and some settings.
|
||||
return std::make_tuple(ast, BlockIO());
|
||||
}
|
||||
|
||||
{
|
||||
OpenTelemetrySpanHolder span("IInterpreter::execute()");
|
||||
res = interpreter->execute();
|
||||
@ -897,12 +900,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
res.finish_callback = std::move(finish_callback);
|
||||
res.exception_callback = std::move(exception_callback);
|
||||
}
|
||||
|
||||
if (async_insert)
|
||||
{
|
||||
queue->push(insert_query, std::move(res), settings);
|
||||
return std::make_tuple(ast, BlockIO());
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -1008,7 +1005,8 @@ void executeQuery(
|
||||
{
|
||||
if (streams.out)
|
||||
{
|
||||
auto pipe = getSourceFromFromASTInsertQuery(ast, &istr, streams.out->getHeader(), context, nullptr);
|
||||
auto read_buffers = getReadBuffersFromASTInsertQuery(ast);
|
||||
auto pipe = getSourceFromASTInsertQuery(ast, streams.out->getHeader(), std::move(read_buffers), context);
|
||||
|
||||
pipeline.init(std::move(pipe));
|
||||
pipeline.resize(1);
|
||||
|
@ -29,10 +29,7 @@ public:
|
||||
/// Data from buffer to insert after inlined one - may be nullptr.
|
||||
ReadBuffer * tail = nullptr;
|
||||
|
||||
bool expectNativeData() const
|
||||
{
|
||||
return !data && !tail;
|
||||
}
|
||||
bool expectNativeData() const { return !data && !tail; }
|
||||
|
||||
/// Try to find table function input() in SELECT part
|
||||
void tryFindInputFunction(ASTPtr & input_function) const;
|
||||
|
@ -5,7 +5,7 @@
|
||||
#include <IO/ConcatReadBuffer.h>
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
#include <DataStreams/BlockIO.h>
|
||||
#include <Processors/Transforms/getSourceFromFromASTInsertQuery.h>
|
||||
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
|
||||
#include <Processors/Transforms/AddingDefaultsTransform.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Storages/IStorage.h>
|
||||
@ -22,18 +22,19 @@ namespace ErrorCodes
|
||||
extern const int INVALID_USAGE_OF_INPUT;
|
||||
}
|
||||
|
||||
|
||||
Pipe getSourceFromFromASTInsertQuery(
|
||||
Pipe getSourceFromASTInsertQuery(
|
||||
const ASTPtr & ast,
|
||||
ReadBuffer * input_buffer_tail_part,
|
||||
const Block & header,
|
||||
ReadBuffers read_buffers,
|
||||
ContextPtr context,
|
||||
const ASTPtr & input_function)
|
||||
{
|
||||
const auto * ast_insert_query = ast->as<ASTInsertQuery>();
|
||||
|
||||
if (!ast_insert_query)
|
||||
throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: query requires data to insert, but it is not INSERT query");
|
||||
|
||||
if (read_buffers.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Required at least one read buffer to create source from ASTInsertQuery");
|
||||
|
||||
String format = ast_insert_query->format;
|
||||
if (format.empty())
|
||||
@ -43,25 +44,14 @@ Pipe getSourceFromFromASTInsertQuery(
|
||||
format = "Values";
|
||||
}
|
||||
|
||||
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.
|
||||
auto input_buffer = std::make_unique<ConcatReadBuffer>(std::move(read_buffers));
|
||||
auto source = FormatFactory::instance().getInput(
|
||||
format, *input_buffer, header,
|
||||
context, context->getSettings().max_insert_block_size);
|
||||
|
||||
auto input_buffer_ast_part = std::make_unique<ReadBufferFromMemory>(
|
||||
ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0);
|
||||
source->addBuffer(std::move(input_buffer));
|
||||
|
||||
auto input_buffer_contacenated = std::make_unique<ConcatReadBuffer>();
|
||||
if (ast_insert_query->data)
|
||||
input_buffer_contacenated->appendBuffer(std::move(input_buffer_ast_part));
|
||||
|
||||
if (input_buffer_tail_part)
|
||||
input_buffer_contacenated->appendBuffer(wrapReadBufferReference(*input_buffer_tail_part));
|
||||
|
||||
/** NOTE: Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
|
||||
* - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'.
|
||||
*/
|
||||
|
||||
auto source = FormatFactory::instance().getInput(format, *input_buffer_contacenated, header, context, context->getSettings().max_insert_block_size);
|
||||
Pipe pipe(source);
|
||||
|
||||
if (context->getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function)
|
||||
{
|
||||
StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id, context);
|
||||
@ -76,10 +66,29 @@ Pipe getSourceFromFromASTInsertQuery(
|
||||
}
|
||||
}
|
||||
|
||||
source->addBuffer(std::move(input_buffer_ast_part));
|
||||
source->addBuffer(std::move(input_buffer_contacenated));
|
||||
|
||||
return pipe;
|
||||
}
|
||||
|
||||
ReadBuffers getReadBuffersFromASTInsertQuery(const ASTPtr & ast)
|
||||
{
|
||||
const auto * insert_query = ast->as<ASTInsertQuery>();
|
||||
if (!insert_query)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: query requires data to insert, but it is not INSERT query");
|
||||
|
||||
ReadBuffers buffers;
|
||||
if (insert_query->data)
|
||||
{
|
||||
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.
|
||||
auto ast_buffer = std::make_unique<ReadBufferFromMemory>(
|
||||
insert_query->data, insert_query->end - insert_query->data);
|
||||
|
||||
buffers.emplace_back(std::move(ast_buffer));
|
||||
}
|
||||
|
||||
if (insert_query->tail)
|
||||
buffers.emplace_back(wrapReadBufferReference(*insert_query->tail));
|
||||
|
||||
return buffers;
|
||||
}
|
||||
|
||||
}
|
@ -5,22 +5,24 @@
|
||||
#include <cstddef>
|
||||
#include <memory>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ReadBuffer;
|
||||
class ASTInsertQuery;
|
||||
using ReadBuffers = std::vector<std::unique_ptr<ReadBuffer>>;
|
||||
|
||||
/** Prepares a pipe which produce data containing in INSERT query
|
||||
* Head of inserting data could be stored in INSERT ast directly
|
||||
* Remaining (tail) data could be stored in input_buffer_tail_part
|
||||
*/
|
||||
|
||||
class Pipe;
|
||||
|
||||
Pipe getSourceFromFromASTInsertQuery(
|
||||
Pipe getSourceFromASTInsertQuery(
|
||||
const ASTPtr & ast,
|
||||
ReadBuffer * input_buffer_tail_part,
|
||||
const Block & header,
|
||||
ReadBuffers read_buffers,
|
||||
ContextPtr context,
|
||||
const ASTPtr & input_function);
|
||||
const ASTPtr & input_function = nullptr);
|
||||
|
||||
ReadBuffers getReadBuffersFromASTInsertQuery(const ASTPtr & ast);
|
||||
|
||||
}
|
@ -178,7 +178,7 @@ SRCS(
|
||||
Transforms/SortingTransform.cpp
|
||||
Transforms/TotalsHavingTransform.cpp
|
||||
Transforms/WindowTransform.cpp
|
||||
Transforms/getSourceFromFromASTInsertQuery.cpp
|
||||
Transforms/getSourceFromASTInsertQuery.cpp
|
||||
printPipeline.cpp
|
||||
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user