From b726845be295488cd241627c879d73fac61ccf0e Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 4 Mar 2021 14:10:21 +0300 Subject: [PATCH] [WIP] --- src/Common/HashTable/Hash.h | 6 +- src/Common/RWLock.h | 2 - src/Core/Settings.h | 3 +- src/DataStreams/BlockIO.h | 2 +- .../InputStreamFromASTInsertQuery.cpp | 56 ++++---- .../InputStreamFromASTInsertQuery.h | 19 ++- src/IO/AsynchronousInsertionQueue.cpp | 132 ++++++++++++++++++ src/IO/AsynchronousInsertionQueue.h | 54 +++++++ src/IO/ConcatReadBuffer.h | 37 +++-- src/IO/ReadBuffer.h | 32 +++++ .../gtest_cascade_and_memory_write_buffer.cpp | 7 +- .../gtest_mysql_binlog_event_read_buffer.cpp | 15 +- src/IO/tests/gtest_peekable_read_buffer.cpp | 10 +- src/Interpreters/Context.h | 5 +- src/Interpreters/InterpreterInsertQuery.cpp | 2 +- src/Interpreters/executeQuery.cpp | 36 ++++- src/Parsers/ASTInsertQuery.h | 6 +- src/Server/HTTPHandler.cpp | 10 +- src/Storages/MergeTree/MergeTreeData.cpp | 8 +- 19 files changed, 347 insertions(+), 95 deletions(-) create mode 100644 src/IO/AsynchronousInsertionQueue.cpp create mode 100644 src/IO/AsynchronousInsertionQueue.h diff --git a/src/Common/HashTable/Hash.h b/src/Common/HashTable/Hash.h index ef20b70917d..5ef476c6669 100644 --- a/src/Common/HashTable/Hash.h +++ b/src/Common/HashTable/Hash.h @@ -75,7 +75,7 @@ template inline typename std::enable_if<(sizeof(T) > sizeof(DB::UInt64)), DB::UInt64>::type intHashCRC32(const T & x, DB::UInt64 updated_value) { - auto * begin = reinterpret_cast(&x); + const auto * begin = reinterpret_cast(&x); for (size_t i = 0; i < sizeof(T); i += sizeof(UInt64)) { updated_value = intHashCRC32(unalignedLoad(begin), updated_value); @@ -93,8 +93,8 @@ inline UInt32 updateWeakHash32(const DB::UInt8 * pos, size_t size, DB::UInt32 up DB::UInt64 value = 0; auto * value_ptr = reinterpret_cast(&value); - typedef __attribute__((__aligned__(1))) uint16_t uint16_unaligned_t; - typedef __attribute__((__aligned__(1))) uint32_t uint32_unaligned_t; + using uint16_unaligned_t = __attribute__((__aligned__(1))) uint16_t; + using uint32_unaligned_t = __attribute__((__aligned__(1))) uint32_t; /// Adopted code from FastMemcpy.h (memcpy_tiny) switch (size) diff --git a/src/Common/RWLock.h b/src/Common/RWLock.h index 952c8049a0f..1e4f9b9bd5e 100644 --- a/src/Common/RWLock.h +++ b/src/Common/RWLock.h @@ -74,7 +74,6 @@ private: using GroupsContainer = std::list; using OwnerQueryIds = std::unordered_map; -private: mutable std::mutex internal_state_mtx; GroupsContainer readers_queue; @@ -85,7 +84,6 @@ private: /// or writers_queue.end() otherwise OwnerQueryIds owner_queries; -private: RWLockImpl() = default; void unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept; void dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it) noexcept; diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 9bb9ad30f15..05e8db3ade0 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -503,6 +503,7 @@ class IColumn; M(Bool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \ M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \ M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \ + M(Bool, asynchronous_insert_mode, false, "Insert query is processed almost instantly, but an actual data queued for later asynchronous insertion", 0) \ // End of FORMAT_FACTORY_SETTINGS @@ -522,7 +523,7 @@ struct Settings : public BaseSettings { /// For initialization from empty initializer-list to be "value initialization", not "aggregate initialization" in C++14. /// http://en.cppreference.com/w/cpp/language/aggregate_initialization - Settings() {} + Settings() = default; /** Set multiple settings from "profile" (in server configuration file (users.xml), profiles contain groups of multiple settings). * The profile can also be set using the `set` functions, like the profile setting. diff --git a/src/DataStreams/BlockIO.h b/src/DataStreams/BlockIO.h index 91d7efac8d1..31a0e1020d2 100644 --- a/src/DataStreams/BlockIO.h +++ b/src/DataStreams/BlockIO.h @@ -50,7 +50,7 @@ struct BlockIO } } - void onException() + void onException() const { if (exception_callback) exception_callback(); diff --git a/src/DataStreams/InputStreamFromASTInsertQuery.cpp b/src/DataStreams/InputStreamFromASTInsertQuery.cpp index 70d69227ac0..be9aac6a8ca 100644 --- a/src/DataStreams/InputStreamFromASTInsertQuery.cpp +++ b/src/DataStreams/InputStreamFromASTInsertQuery.cpp @@ -1,11 +1,12 @@ -#include -#include -#include +#include + +#include +#include #include #include -#include -#include -#include +#include +#include +#include #include #include @@ -22,7 +23,6 @@ namespace ErrorCodes InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( const ASTPtr & ast, - ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context, const ASTPtr & input_function) @@ -40,25 +40,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( format = "Values"; } - /// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query. - - input_buffer_ast_part = std::make_unique( - ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0); - - ConcatReadBuffer::ReadBuffers buffers; - if (ast_insert_query->data) - buffers.push_back(input_buffer_ast_part.get()); - - if (input_buffer_tail_part) - buffers.push_back(input_buffer_tail_part); - - /** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'. - * - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'. - */ - - input_buffer_contacenated = std::make_unique(buffers); - - res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size); + res_stream = context.getInputFormat(format, input_buffer, header, context.getSettings().max_insert_block_size); if (context.getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function) { @@ -70,4 +52,26 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( } } +InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( + const ASTPtr & ast, + ReadBuffer & tail, + const Block & header, + const Context & context, + const ASTPtr & input_function) : InputStreamFromASTInsertQuery(ast, header, context, input_function) +{ + /// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query. + const auto * ast_insert_query = ast->as(); + + auto ast_buffer = std::make_unique( + ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0); + + if (ast_insert_query->data) + input_buffer.appendBuffer(std::move(ast_buffer)); + + input_buffer.appendBuffer(wrapReadBufferReference(tail)); + + /// NOTE: Must not read from 'tail' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'. + /// because 'query.data' could refer to memory piece, used as buffer for 'tail'. +} + } diff --git a/src/DataStreams/InputStreamFromASTInsertQuery.h b/src/DataStreams/InputStreamFromASTInsertQuery.h index d4c6443c77d..9175d1be749 100644 --- a/src/DataStreams/InputStreamFromASTInsertQuery.h +++ b/src/DataStreams/InputStreamFromASTInsertQuery.h @@ -1,7 +1,9 @@ #pragma once -#include #include +#include +#include + #include #include @@ -9,10 +11,7 @@ namespace DB { -struct BlockIO; class Context; -struct StorageInMemoryMetadata; -using StorageMetadataPtr = std::shared_ptr; /** Prepares an input stream which produce data containing in INSERT query * Head of inserting data could be stored in INSERT ast directly @@ -21,12 +20,9 @@ using StorageMetadataPtr = std::shared_ptr; class InputStreamFromASTInsertQuery : public IBlockInputStream { public: + InputStreamFromASTInsertQuery(const ASTPtr & ast, const Block & header, const Context & context, const ASTPtr & input_function); InputStreamFromASTInsertQuery( - const ASTPtr & ast, - ReadBuffer * input_buffer_tail_part, - const Block & header, - const Context & context, - const ASTPtr & input_function); + const ASTPtr & ast, ReadBuffer & tail, const Block & header, const Context & context, const ASTPtr & input_function); Block readImpl() override { return res_stream->read(); } void readPrefixImpl() override { return res_stream->readPrefix(); } @@ -36,9 +32,10 @@ public: Block getHeader() const override { return res_stream->getHeader(); } + void appendBuffer(std::unique_ptr buffer) { input_buffer.appendBuffer(std::move(buffer)); } + private: - std::unique_ptr input_buffer_ast_part; - std::unique_ptr input_buffer_contacenated; + ConcatReadBuffer input_buffer; BlockInputStreamPtr res_stream; }; diff --git a/src/IO/AsynchronousInsertionQueue.cpp b/src/IO/AsynchronousInsertionQueue.cpp new file mode 100644 index 00000000000..16478319f26 --- /dev/null +++ b/src/IO/AsynchronousInsertionQueue.cpp @@ -0,0 +1,132 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +struct AsynchronousInsertQueue::InsertQuery +{ + ASTPtr query; + Settings settings; +}; + +struct AsynchronousInsertQueue::InsertData +{ + std::mutex mutex; + std::list data; + size_t size = 0; + std::chrono::time_point first_update, last_update; + BlockIO io; +}; + +std::size_t AsynchronousInsertQueue::InsertQueryHash::operator() (const InsertQuery & query) const +{ + const auto * insert_query = query.query->as(); + std::size_t hash = 0; + + hash ^= std::hash()(insert_query->table_id.getFullTableName()); + hash ^= std::hash()(insert_query->format); + // TODO: insert_query->columns + // TODO: insert_query->table_function + // TODO: insert_query->settings_ast + + // TODO: some of query.settings + + return hash; +} + +bool AsynchronousInsertQueue::InsertQueryEquality::operator() (const InsertQuery & query1, const InsertQuery & query2) const +{ + const auto * insert_query1 = query1.query->as(); + const auto * insert_query2 = query2.query->as(); + + if (insert_query1->table_id != insert_query2->table_id) + return false; + if (insert_query1->format != insert_query2->format) + return false; + // TODO: same fields as in InsertQueryHash. + + return true; +} + +AsynchronousInsertQueue::AsynchronousInsertQueue(size_t pool_size, size_t max_data_size_) : max_data_size(max_data_size_), pool(pool_size) +{ +} + +bool AsynchronousInsertQueue::push(ASTInsertQuery * query, const Settings & settings) +{ + auto read_lock = lock->getLock(RWLockImpl::Read, String()); + + auto it = queue.find(InsertQuery{query->shared_from_this(), settings}); + if (it != queue.end()) + { + pushImpl(query, it); + return true; + } + + return false; +} + +void AsynchronousInsertQueue::push(ASTInsertQuery * query, BlockIO && io, const Settings & settings) +{ + auto write_lock = lock->getLock(RWLockImpl::Write, String()); + + auto it = queue.find(InsertQuery{query->shared_from_this(), settings}); + if (it == queue.end()) + { + InsertQuery key{query->shared_from_this(), settings}; + it = queue.insert({key, std::make_shared()}).first; + it->second->io = std::move(io); + it->second->first_update = std::chrono::steady_clock::now(); + } + + pushImpl(query, it); +} + +void AsynchronousInsertQueue::pushImpl(ASTInsertQuery * query, QueueIterator & it) +{ + ConcatReadBuffer::Buffers buffers; + + auto ast_buf = std::make_unique(query->data, query->data ? query->end - query->data : 0); + if (query->data) + buffers.push_back(std::move(ast_buf)); + + if (query->tail) + buffers.push_back(wrapReadBufferReference(*query->tail)); + + /// NOTE: must not read from |query->tail| before read all between |query->data| and |query->end|. + + ConcatReadBuffer concat_buf(std::move(buffers)); + + std::unique_lock data_lock(it->second->mutex); + + /// It's important to read the whole data per query as a single chunk, so we can safely drop it in case of parsing failure. + auto & new_data = it->second->data.emplace_back(); + new_data.reserve(concat_buf.totalSize()); + WriteBufferFromString write_buf(new_data); + + copyData(concat_buf, write_buf); + it->second->size += concat_buf.count(); + it->second->last_update = std::chrono::steady_clock::now(); + + if (it->second->size > max_data_size) + /// Since we're under lock here it's safe to pass-by-copy the shared_ptr + /// without a race with the cleanup thread, which may reset last shared_ptr instance. + pool.scheduleOrThrowOnError([this, data = it->second] { processData(data); }); +} + +void AsynchronousInsertQueue::processData(std::shared_ptr data) +{ + data->first_update = std::chrono::steady_clock::now(); +} + +} diff --git a/src/IO/AsynchronousInsertionQueue.h b/src/IO/AsynchronousInsertionQueue.h new file mode 100644 index 00000000000..35f89eee85c --- /dev/null +++ b/src/IO/AsynchronousInsertionQueue.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include +#include + +#include + + +namespace DB +{ + +class ASTInsertQuery; +struct BlockIO; +struct Settings; + +class AsynchronousInsertQueue +{ + public: + AsynchronousInsertQueue(size_t pool_size, size_t max_data_size); + + bool push(ASTInsertQuery * query, const Settings & settings); + void push(ASTInsertQuery * query, BlockIO && io, const Settings & settings); + + private: + struct InsertQuery; + struct InsertData; + + struct InsertQueryHash + { + std::size_t operator () (const InsertQuery &) const; + }; + + struct InsertQueryEquality + { + bool operator () (const InsertQuery &, const InsertQuery &) const; + }; + + using Queue = std::unordered_map, InsertQueryHash, InsertQueryEquality>; + using QueueIterator = Queue::iterator; + + const size_t max_data_size; + + RWLock lock; + Queue queue; + + ThreadPool pool; + /// TODO: ThreadFromGlobalPool remove_empty_thread, check_access_thread; + + void pushImpl(ASTInsertQuery * query, QueueIterator & it); /// use only under lock + void processData(std::shared_ptr data); +}; + +} diff --git a/src/IO/ConcatReadBuffer.h b/src/IO/ConcatReadBuffer.h index c416b0fd892..88739515be2 100644 --- a/src/IO/ConcatReadBuffer.h +++ b/src/IO/ConcatReadBuffer.h @@ -8,16 +8,35 @@ namespace DB { -/** Reads from the concatenation of multiple ReadBuffers - */ +/// Reads from the concatenation of multiple ReadBuffer's class ConcatReadBuffer : public ReadBuffer { public: - using ReadBuffers = std::vector; + using Buffers = std::vector>; + + ConcatReadBuffer() : ReadBuffer(nullptr, 0) + { + } + + explicit ConcatReadBuffer(Buffers && buffers_) : ReadBuffer(nullptr, 0), buffers(std::move(buffers_)), current(buffers.begin()) + { + assert(!buffers.empty()); + } + + ConcatReadBuffer(ReadBuffer & buf1, ReadBuffer & buf2) : ConcatReadBuffer() + { + appendBuffer(wrapReadBufferReference(buf1)); + appendBuffer(wrapReadBufferReference(buf2)); + } + + void appendBuffer(std::unique_ptr buffer) + { + buffers.push_back(std::move(buffer)); + } protected: - ReadBuffers buffers; - ReadBuffers::iterator current; + Buffers buffers; + Buffers::iterator current; bool nextImpl() override { @@ -54,14 +73,6 @@ protected: working_buffer = Buffer((*current)->position(), (*current)->buffer().end()); return true; } - -public: - explicit ConcatReadBuffer(const ReadBuffers & buffers_) : ReadBuffer(nullptr, 0), buffers(buffers_), current(buffers.begin()) - { - assert(!buffers.empty()); - } - - ConcatReadBuffer(ReadBuffer & buf1, ReadBuffer & buf2) : ConcatReadBuffer({&buf1, &buf2}) {} }; } diff --git a/src/IO/ReadBuffer.h b/src/IO/ReadBuffer.h index e3166ba8180..7b1913bf01c 100644 --- a/src/IO/ReadBuffer.h +++ b/src/IO/ReadBuffer.h @@ -197,6 +197,10 @@ public: return read(to, n); } + /// Returns total size of underlying object read by this buffer. May be helpful for a full allocations + /// before reading. Doesn't change after reading. Returns 0 if total size is unknown. + virtual size_t totalSize() const { return 0; } + protected: /// The number of bytes to ignore from the initial position of `working_buffer` /// buffer. Apparently this is an additional out-parameter for nextImpl(), @@ -253,5 +257,33 @@ inline std::unique_ptr wrapReadBufferReference(ReadBuffer & buf) return std::make_unique(buf); } +inline std::unique_ptr wrapReadBufferPointer(ReadBufferPtr ptr) +{ + class ReadBufferWrapper : public ReadBuffer + { + public: + explicit ReadBufferWrapper(ReadBufferPtr ptr_) : ReadBuffer(ptr_->position(), 0), ptr(ptr_) + { + working_buffer = Buffer(ptr->position(), ptr->buffer().end()); + } + + private: + ReadBufferPtr ptr; + + bool nextImpl() override + { + ptr->position() = position(); + + if (!ptr->next()) + return false; + + working_buffer = ptr->buffer(); + + return true; + } + }; + + return std::make_unique(ptr); +} } diff --git a/src/IO/tests/gtest_cascade_and_memory_write_buffer.cpp b/src/IO/tests/gtest_cascade_and_memory_write_buffer.cpp index 4936307a5e3..1d71c358ca3 100644 --- a/src/IO/tests/gtest_cascade_and_memory_write_buffer.cpp +++ b/src/IO/tests/gtest_cascade_and_memory_write_buffer.cpp @@ -32,8 +32,7 @@ static void testCascadeBufferRedability( EXPECT_EQ(cascade.count(), data.size()); std::vector write_buffers; - std::vector read_buffers; - std::vector read_buffers_raw; + ConcatReadBuffer concat; cascade.getResultBuffers(write_buffers); for (WriteBufferPtr & wbuf : write_buffers) @@ -47,11 +46,9 @@ static void testCascadeBufferRedability( auto rbuf = wbuf_readable->tryGetReadBuffer(); ASSERT_FALSE(!rbuf); - read_buffers.emplace_back(rbuf); - read_buffers_raw.emplace_back(rbuf.get()); + concat.appendBuffer(wrapReadBufferPointer(rbuf)); } - ConcatReadBuffer concat(read_buffers_raw); std::string decoded_data; { WriteBufferFromString decoded_data_writer(decoded_data); diff --git a/src/IO/tests/gtest_mysql_binlog_event_read_buffer.cpp b/src/IO/tests/gtest_mysql_binlog_event_read_buffer.cpp index 536e5a89ca9..6c4d7436e52 100644 --- a/src/IO/tests/gtest_mysql_binlog_event_read_buffer.cpp +++ b/src/IO/tests/gtest_mysql_binlog_event_read_buffer.cpp @@ -33,19 +33,16 @@ TEST(MySQLBinlogEventReadBuffer, NiceBufferSize) TEST(MySQLBinlogEventReadBuffer, BadBufferSizes) { char res[4]; - std::vector buffers; - std::vector nested_buffers; + ConcatReadBuffer concat_buffer; std::vector>> memory_buffers_data; std::vector bad_buffers_size = {2, 1, 2, 3}; for (const auto & bad_buffer_size : bad_buffers_size) { memory_buffers_data.emplace_back(std::make_shared>(bad_buffer_size, 0x01)); - buffers.emplace_back(std::make_shared(memory_buffers_data.back()->data(), bad_buffer_size)); - nested_buffers.emplace_back(buffers.back().get()); + concat_buffer.appendBuffer(std::make_unique(memory_buffers_data.back()->data(), bad_buffer_size)); } - ConcatReadBuffer concat_buffer(nested_buffers); MySQLBinlogEventReadBuffer binlog_in(concat_buffer, 4); binlog_in.readStrict(res, 4); @@ -58,19 +55,17 @@ TEST(MySQLBinlogEventReadBuffer, BadBufferSizes) TEST(MySQLBinlogEventReadBuffer, NiceAndBadBufferSizes) { char res[12]; - std::vector buffers; - std::vector nested_buffers; + ConcatReadBuffer::Buffers nested_buffers; std::vector>> memory_buffers_data; std::vector buffers_size = {6, 1, 3, 6}; for (const auto & bad_buffer_size : buffers_size) { memory_buffers_data.emplace_back(std::make_shared>(bad_buffer_size, 0x01)); - buffers.emplace_back(std::make_shared(memory_buffers_data.back()->data(), bad_buffer_size)); - nested_buffers.emplace_back(buffers.back().get()); + nested_buffers.emplace_back(std::make_unique(memory_buffers_data.back()->data(), bad_buffer_size)); } - ConcatReadBuffer concat_buffer(nested_buffers); + ConcatReadBuffer concat_buffer(std::move(nested_buffers)); MySQLBinlogEventReadBuffer binlog_in(concat_buffer, 4); binlog_in.readStrict(res, 12); diff --git a/src/IO/tests/gtest_peekable_read_buffer.cpp b/src/IO/tests/gtest_peekable_read_buffer.cpp index 8c491338bd3..b635c0152e4 100644 --- a/src/IO/tests/gtest_peekable_read_buffer.cpp +++ b/src/IO/tests/gtest_peekable_read_buffer.cpp @@ -33,12 +33,12 @@ try std::string s2 = "qwertyuiop"; std::string s3 = "asdfghjkl;"; std::string s4 = "zxcvbnm,./"; - DB::ReadBufferFromString b1(s1); - DB::ReadBufferFromString b2(s2); - DB::ReadBufferFromString b3(s3); - DB::ReadBufferFromString b4(s4); - DB::ConcatReadBuffer concat({&b1, &b2, &b3, &b4}); + DB::ConcatReadBuffer concat; + concat.appendBuffer(std::make_unique(s1)); + concat.appendBuffer(std::make_unique(s2)); + concat.appendBuffer(std::make_unique(s3)); + concat.appendBuffer(std::make_unique(s4)); DB::PeekableReadBuffer peekable(concat, 0); ASSERT_TRUE(!peekable.eof()); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index be53a4364e0..5417a1544fe 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -117,11 +117,12 @@ using VolumePtr = std::shared_ptr; struct NamedSession; struct BackgroundTaskSchedulingSettings; - #if USE_EMBEDDED_COMPILER class CompiledExpressionCache; #endif +class AsynchronousInsertQueue; + /// Callback for external tables initializer using ExternalTablesInitializer = std::function; @@ -748,6 +749,8 @@ public: PartUUIDsPtr getPartUUIDs(); PartUUIDsPtr getIgnoredPartUUIDs(); + + AsynchronousInsertQueue & getAsynchronousInsertQueue(); private: std::unique_lock getLock() const; diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 6a5f00d230c..7239620519d 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -403,7 +403,7 @@ BlockIO InterpreterInsertQuery::execute() } } else - res.in = std::make_shared(query_ptr, query.tail, query_sample_block, context, nullptr); + res.in = std::make_shared(query_ptr, query_sample_block, context, nullptr); if (!out_streams.empty()) { diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 42bcafd1fb8..2554e3261f1 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -325,7 +326,7 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c static void setQuerySpecificSettings(ASTPtr & ast, Context & context) { - if (auto * ast_insert_into = dynamic_cast(ast.get())) + if (auto * ast_insert_into = ast->as()) { if (ast_insert_into->watch) context.setSetting("output_format_enable_streaming", 1); @@ -464,8 +465,8 @@ static std::tuple executeQueryImpl( query = serializeAST(*ast); } - /// MUST goes before any modification (except for prepared statements, - /// since it substitute parameters and w/o them query does not contains + /// MUST go before any modification (except for prepared statements, + /// since it substitute parameters and w/o them query does not contain /// parameters), to keep query as-is in query_log and server log. query_for_logging = prepareQueryForLogging(query, context); logQuery(query_for_logging, context, internal); @@ -506,7 +507,7 @@ static std::tuple executeQueryImpl( auto & input_storage = dynamic_cast(*storage); auto input_metadata_snapshot = input_storage.getInMemoryMetadataPtr(); BlockInputStreamPtr input_stream = std::make_shared( - ast, istr, input_metadata_snapshot->getSampleBlock(), context, input_function); + ast, *istr, input_metadata_snapshot->getSampleBlock(), context, input_function); input_storage.setInputStream(input_stream); } } @@ -543,6 +544,16 @@ static std::tuple executeQueryImpl( limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); } + const bool async_insert = insert_query && !insert_query->select && settings.asynchronous_insert_mode; + auto & queue = context.getAsynchronousInsertQueue(); + + if (async_insert && queue.push(insert_query, settings)) + { + /// Shortcut for already processed similar insert-queries. + /// Similarity is defined by hashing query text and some settings. + return std::make_tuple(ast, BlockIO()); + } + { OpenTelemetrySpanHolder span("IInterpreter::execute()"); res = interpreter->execute(); @@ -876,6 +887,23 @@ static std::tuple executeQueryImpl( LOG_DEBUG(&Poco::Logger::get("executeQuery"), "Query pipeline:\n{}", msg_buf.str()); } } + + if (async_insert) + { + queue.push(insert_query, std::move(res), settings); + return std::make_tuple(ast, BlockIO()); + } + else if (insert_query) + { + auto in = std::static_pointer_cast(res.in); + auto ast_buffer = std::make_unique( + insert_query->data, insert_query->data ? insert_query->end - insert_query->data : 0); + + if (insert_query->data) + in->appendBuffer(std::move(ast_buffer)); + if (insert_query->tail) + in->appendBuffer(wrapReadBufferReference(*insert_query->tail)); + } } catch (...) { diff --git a/src/Parsers/ASTInsertQuery.h b/src/Parsers/ASTInsertQuery.h index 810be8eea08..9a6a6053c69 100644 --- a/src/Parsers/ASTInsertQuery.h +++ b/src/Parsers/ASTInsertQuery.h @@ -13,14 +13,16 @@ class ReadBuffer; class ASTInsertQuery : public IAST { public: + /// Part of hash StorageID table_id = StorageID::createEmpty(); ASTPtr columns; String format; - ASTPtr select; - ASTPtr watch; ASTPtr table_function; ASTPtr settings_ast; + ASTPtr select; + ASTPtr watch; + /// Data to insert const char * data = nullptr; const char * end = nullptr; diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index d200ee7421f..04cbfca3ae6 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -191,8 +191,7 @@ static std::chrono::steady_clock::duration parseSessionTimeout( void HTTPHandler::pushDelayedResults(Output & used_output) { std::vector write_buffers; - std::vector read_buffers; - std::vector read_buffers_raw_ptr; + ConcatReadBuffer::Buffers read_buffers; auto * cascade_buffer = typeid_cast(used_output.out_maybe_delayed_and_compressed.get()); if (!cascade_buffer) @@ -212,14 +211,13 @@ void HTTPHandler::pushDelayedResults(Output & used_output) && (write_buf_concrete = dynamic_cast(write_buf.get())) && (reread_buf = write_buf_concrete->tryGetReadBuffer())) { - read_buffers.emplace_back(reread_buf); - read_buffers_raw_ptr.emplace_back(reread_buf.get()); + read_buffers.emplace_back(wrapReadBufferPointer(reread_buf)); } } - if (!read_buffers_raw_ptr.empty()) + if (!read_buffers.empty()) { - ConcatReadBuffer concat_read_buffer(read_buffers_raw_ptr); + ConcatReadBuffer concat_read_buffer(std::move(read_buffers)); copyData(concat_read_buffer, *used_output.out_maybe_compressed); } } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a0d23b8ab22..52e5c4d91cd 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -2958,10 +2958,10 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context if (fields_count) { - ReadBufferFromMemory left_paren_buf("(", 1); - ReadBufferFromMemory fields_buf(partition_ast.fields_str.data(), partition_ast.fields_str.size()); - ReadBufferFromMemory right_paren_buf(")", 1); - ConcatReadBuffer buf({&left_paren_buf, &fields_buf, &right_paren_buf}); + ConcatReadBuffer buf; + buf.appendBuffer(std::make_unique("(", 1)); + buf.appendBuffer(std::make_unique(partition_ast.fields_str.data(), partition_ast.fields_str.size())); + buf.appendBuffer(std::make_unique(")", 1)); auto input_format = FormatFactory::instance().getInput("Values", buf, metadata_snapshot->getPartitionKey().sample_block, context, context.getSettingsRef().max_block_size); auto input_stream = std::make_shared(input_format);