diff --git a/src/IO/PeekableReadBuffer.cpp b/src/IO/PeekableReadBuffer.cpp index d9de3a5e76a..c47bdce3924 100644 --- a/src/IO/PeekableReadBuffer.cpp +++ b/src/IO/PeekableReadBuffer.cpp @@ -10,12 +10,12 @@ namespace ErrorCodes } PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ /*= 0*/) - : BufferWithOwnMemory(start_size_), sub_buf(sub_buf_) + : BufferWithOwnMemory(start_size_), sub_buf(&sub_buf_) { - padded &= sub_buf.isPadded(); + padded &= sub_buf->isPadded(); /// Read from sub-buffer - Buffer & sub_working = sub_buf.buffer(); - BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset()); + Buffer & sub_working = sub_buf->buffer(); + BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset()); checkStateCorrect(); } @@ -23,17 +23,26 @@ PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ void PeekableReadBuffer::reset() { checkStateCorrect(); +} +void PeekableReadBuffer::setSubBuffer(ReadBuffer & sub_buf_) +{ + sub_buf = &sub_buf_; + resetImpl(); +} + +void PeekableReadBuffer::resetImpl() +{ peeked_size = 0; checkpoint = std::nullopt; checkpoint_in_own_memory = false; use_stack_memory = true; if (!currentlyReadFromOwnMemory()) - sub_buf.position() = pos; + sub_buf->position() = pos; - Buffer & sub_working = sub_buf.buffer(); - BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset()); + Buffer & sub_working = sub_buf->buffer(); + BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset()); checkStateCorrect(); } @@ -43,20 +52,20 @@ bool PeekableReadBuffer::peekNext() checkStateCorrect(); Position copy_from = pos; - size_t bytes_to_copy = sub_buf.available(); + size_t bytes_to_copy = sub_buf->available(); if (useSubbufferOnly()) { /// Don't have to copy all data from sub-buffer if there is no data in own memory (checkpoint and pos are in sub-buffer) if (checkpoint) copy_from = *checkpoint; - bytes_to_copy = sub_buf.buffer().end() - copy_from; + bytes_to_copy = sub_buf->buffer().end() - copy_from; if (!bytes_to_copy) { - sub_buf.position() = copy_from; + sub_buf->position() = copy_from; /// Both checkpoint and pos are at the end of sub-buffer. Just load next part of data. - bool res = sub_buf.next(); - BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset()); + bool res = sub_buf->next(); + BufferBase::set(sub_buf->buffer().begin(), sub_buf->buffer().size(), sub_buf->offset()); if (checkpoint) checkpoint.emplace(pos); @@ -70,13 +79,13 @@ bool PeekableReadBuffer::peekNext() if (useSubbufferOnly()) { - sub_buf.position() = copy_from; + sub_buf->position() = copy_from; } char * memory_data = getMemoryData(); /// Save unread data from sub-buffer to own memory - memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_copy); + memcpy(memory_data + peeked_size, sub_buf->position(), bytes_to_copy); /// If useSubbufferOnly() is false, then checkpoint is in own memory and it was updated in resizeOwnMemoryIfNecessary /// Otherwise, checkpoint now at the beginning of own memory @@ -106,10 +115,10 @@ bool PeekableReadBuffer::peekNext() } peeked_size += bytes_to_copy; - sub_buf.position() += bytes_to_copy; + sub_buf->position() += bytes_to_copy; checkStateCorrect(); - return sub_buf.next(); + return sub_buf->next(); } void PeekableReadBuffer::rollbackToCheckpoint(bool drop) @@ -152,7 +161,7 @@ bool PeekableReadBuffer::nextImpl() if (checkpoint) { if (currentlyReadFromOwnMemory()) - res = sub_buf.hasPendingData() || sub_buf.next(); + res = sub_buf->hasPendingData() || sub_buf->next(); else res = peekNext(); } @@ -161,21 +170,21 @@ bool PeekableReadBuffer::nextImpl() if (useSubbufferOnly()) { /// Load next data to sub_buf - sub_buf.position() = position(); - res = sub_buf.next(); + sub_buf->position() = position(); + res = sub_buf->next(); } else { /// All copied data have been read from own memory, continue reading from sub_buf peeked_size = 0; - res = sub_buf.hasPendingData() || sub_buf.next(); + res = sub_buf->hasPendingData() || sub_buf->next(); } } /// Switch to reading from sub_buf (or just update it if already switched) - Buffer & sub_working = sub_buf.buffer(); - BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset()); - nextimpl_working_buffer_offset = sub_buf.offset(); + Buffer & sub_working = sub_buf->buffer(); + BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset()); + nextimpl_working_buffer_offset = sub_buf->offset(); if (checkpoint_at_end) { @@ -199,8 +208,8 @@ void PeekableReadBuffer::checkStateCorrect() const throw DB::Exception("Checkpoint in empty own buffer", ErrorCodes::LOGICAL_ERROR); if (currentlyReadFromOwnMemory() && pos < *checkpoint) throw DB::Exception("Current position in own buffer before checkpoint in own buffer", ErrorCodes::LOGICAL_ERROR); - if (!currentlyReadFromOwnMemory() && pos < sub_buf.position()) - throw DB::Exception("Current position in subbuffer less than sub_buf.position()", ErrorCodes::LOGICAL_ERROR); + if (!currentlyReadFromOwnMemory() && pos < sub_buf->position()) + throw DB::Exception("Current position in subbuffer less than sub_buf->position()", ErrorCodes::LOGICAL_ERROR); } else { @@ -294,11 +303,11 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos() if (!checkpointInOwnMemory() || currentlyReadFromOwnMemory()) return; /// it's already continuous - size_t bytes_to_append = pos - sub_buf.position(); + size_t bytes_to_append = pos - sub_buf->position(); resizeOwnMemoryIfNecessary(bytes_to_append); char * memory_data = getMemoryData(); - memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_append); - sub_buf.position() = pos; + memcpy(memory_data + peeked_size, sub_buf->position(), bytes_to_append); + sub_buf->position() = pos; peeked_size += bytes_to_append; BufferBase::set(memory_data, peeked_size, peeked_size); } @@ -306,7 +315,7 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos() PeekableReadBuffer::~PeekableReadBuffer() { if (!currentlyReadFromOwnMemory()) - sub_buf.position() = pos; + sub_buf->position() = pos; } bool PeekableReadBuffer::hasUnreadData() const diff --git a/src/IO/PeekableReadBuffer.h b/src/IO/PeekableReadBuffer.h index 15283793755..3ed2ebdac0a 100644 --- a/src/IO/PeekableReadBuffer.h +++ b/src/IO/PeekableReadBuffer.h @@ -24,7 +24,7 @@ public: ~PeekableReadBuffer() override; - void prefetch() override { sub_buf.prefetch(); } + void prefetch() override { sub_buf->prefetch(); } /// Sets checkpoint at current position ALWAYS_INLINE inline void setCheckpoint() @@ -71,13 +71,17 @@ public: // without recreating the buffer. void reset(); + void setSubBuffer(ReadBuffer & sub_buf_); + private: bool nextImpl() override; + void resetImpl(); + bool peekNext(); inline bool useSubbufferOnly() const { return !peeked_size; } - inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf.buffer().begin(); } + inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf->buffer().begin(); } inline bool checkpointInOwnMemory() const { return checkpoint_in_own_memory; } void checkStateCorrect() const; @@ -90,7 +94,7 @@ private: const char * getMemoryData() const { return use_stack_memory ? stack_memory : memory.data(); } - ReadBuffer & sub_buf; + ReadBuffer * sub_buf; size_t peeked_size = 0; std::optional checkpoint = std::nullopt; bool checkpoint_in_own_memory = false; diff --git a/src/Processors/Formats/Impl/CustomSeparatedRowInputFormat.cpp b/src/Processors/Formats/Impl/CustomSeparatedRowInputFormat.cpp index 1c99a5484a2..069db49b147 100644 --- a/src/Processors/Formats/Impl/CustomSeparatedRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/CustomSeparatedRowInputFormat.cpp @@ -84,8 +84,7 @@ void CustomSeparatedRowInputFormat::syncAfterError() void CustomSeparatedRowInputFormat::setReadBuffer(ReadBuffer & in_) { - buf = std::make_unique(in_); - RowInputFormatWithNamesAndTypes::setReadBuffer(*buf); + buf->setSubBuffer(in_); } CustomSeparatedFormatReader::CustomSeparatedFormatReader( diff --git a/src/Processors/Formats/Impl/HiveTextRowInputFormat.cpp b/src/Processors/Formats/Impl/HiveTextRowInputFormat.cpp index 6f405dac1ff..ec5612ae30b 100644 --- a/src/Processors/Formats/Impl/HiveTextRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/HiveTextRowInputFormat.cpp @@ -38,8 +38,7 @@ HiveTextRowInputFormat::HiveTextRowInputFormat( void HiveTextRowInputFormat::setReadBuffer(ReadBuffer & in_) { - buf = std::make_unique(in_); - CSVRowInputFormat::setReadBuffer(*buf); + buf->setSubBuffer(in_); } HiveTextFormatReader::HiveTextFormatReader(PeekableReadBuffer & buf_, const FormatSettings & format_settings_) diff --git a/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp b/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp index d369eedceea..64a3bc9f9b5 100644 --- a/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.cpp @@ -98,8 +98,7 @@ bool JSONAsRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) void JSONAsRowInputFormat::setReadBuffer(ReadBuffer & in_) { - buf = std::make_unique(in_); - IInputFormat::setReadBuffer(*buf); + buf->setSubBuffer(in_); } diff --git a/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp b/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp index 931a7587903..4e920dc033c 100644 --- a/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp @@ -420,8 +420,7 @@ bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & void MsgPackRowInputFormat::setReadBuffer(ReadBuffer & in_) { - buf = std::make_unique(in_); - IInputFormat::setReadBuffer(in_); + buf->setSubBuffer(in_); } MsgPackSchemaReader::MsgPackSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_) diff --git a/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp b/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp index c26b6b39e0d..796dc73f672 100644 --- a/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp @@ -124,8 +124,7 @@ bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & void RegexpRowInputFormat::setReadBuffer(ReadBuffer & in_) { - buf = std::make_unique(in_); - IInputFormat::setReadBuffer(*buf); + buf->setSubBuffer(in_); } RegexpSchemaReader::RegexpSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_) diff --git a/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp b/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp index 09a5c69530d..38d248dfe79 100644 --- a/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp @@ -266,9 +266,7 @@ void TemplateRowInputFormat::resetParser() void TemplateRowInputFormat::setReadBuffer(ReadBuffer & in_) { - buf = std::make_unique(in_); - format_reader->setReadBuffer(*buf); - IInputFormat::setReadBuffer(*buf); + buf->setSubBuffer(in_); } TemplateFormatReader::TemplateFormatReader( diff --git a/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp b/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp index aff4557a4b7..e5f74bde992 100644 --- a/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp @@ -562,8 +562,7 @@ void ValuesBlockInputFormat::resetParser() void ValuesBlockInputFormat::setReadBuffer(ReadBuffer & in_) { - buf = std::make_unique(in_); - IInputFormat::setReadBuffer(*buf); + buf->setSubBuffer(in_); } ValuesSchemaReader::ValuesSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_) diff --git a/src/Storages/Kafka/KafkaConsumer.cpp b/src/Storages/Kafka/KafkaConsumer.cpp index 3c532a83c31..1cfbd145fb1 100644 --- a/src/Storages/Kafka/KafkaConsumer.cpp +++ b/src/Storages/Kafka/KafkaConsumer.cpp @@ -371,7 +371,7 @@ ReadBufferPtr KafkaConsumer::consume() return nullptr; if (hasMorePolledMessages()) - return getNextNonEmptyMessage(); + return getNextMessage(); if (intermediate_commit) commit(); @@ -460,10 +460,10 @@ ReadBufferPtr KafkaConsumer::consume() ProfileEvents::increment(ProfileEvents::KafkaMessagesPolled, messages.size()); stalled_status = NOT_STALLED; - return getNextNonEmptyMessage(); + return getNextMessage(); } -ReadBufferPtr KafkaConsumer::getNextNonEmptyMessage() +ReadBufferPtr KafkaConsumer::getNextMessage() { if (current == messages.end()) return nullptr; @@ -472,10 +472,10 @@ ReadBufferPtr KafkaConsumer::getNextNonEmptyMessage() size_t size = current->get_payload().get_size(); ++current; - if (data && size > 0) + if (data) return std::make_shared(data, size); - return getNextNonEmptyMessage(); + return getNextMessage(); } size_t KafkaConsumer::filterMessageErrors() diff --git a/src/Storages/Kafka/KafkaConsumer.h b/src/Storages/Kafka/KafkaConsumer.h index 3a00cfdf8c6..1fc7c260c11 100644 --- a/src/Storages/Kafka/KafkaConsumer.h +++ b/src/Storages/Kafka/KafkaConsumer.h @@ -110,7 +110,7 @@ private: void resetIfStopped(); /// Return number of messages with an error. size_t filterMessageErrors(); - ReadBufferPtr getNextNonEmptyMessage(); + ReadBufferPtr getNextMessage(); }; } diff --git a/src/Storages/NATS/StorageNATS.h b/src/Storages/NATS/StorageNATS.h index e29755d4e28..787abb01e0e 100644 --- a/src/Storages/NATS/StorageNATS.h +++ b/src/Storages/NATS/StorageNATS.h @@ -56,7 +56,7 @@ public: /// We want to control the number of rows in a chunk inserted into NATS bool prefersLargeBlocks() const override { return false; } - void pushConsumer(NATSConsumerPtr buf); + void pushConsumer(NATSConsumerPtr consumer); NATSConsumerPtr popConsumer(); NATSConsumerPtr popConsumer(std::chrono::milliseconds timeout);