Fix heap-use-after-free in PeekableReadBuffer

This commit is contained in:
avogar 2022-11-01 12:58:20 +00:00
parent 32c2fb378f
commit e39e61fc71
12 changed files with 59 additions and 54 deletions

View File

@ -10,12 +10,12 @@ namespace ErrorCodes
}
PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ /*= 0*/)
: BufferWithOwnMemory(start_size_), sub_buf(sub_buf_)
: BufferWithOwnMemory(start_size_), sub_buf(&sub_buf_)
{
padded &= sub_buf.isPadded();
padded &= sub_buf->isPadded();
/// Read from sub-buffer
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());
checkStateCorrect();
}
@ -23,17 +23,26 @@ PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_
void PeekableReadBuffer::reset()
{
checkStateCorrect();
}
void PeekableReadBuffer::setSubBuffer(ReadBuffer & sub_buf_)
{
sub_buf = &sub_buf_;
resetImpl();
}
void PeekableReadBuffer::resetImpl()
{
peeked_size = 0;
checkpoint = std::nullopt;
checkpoint_in_own_memory = false;
use_stack_memory = true;
if (!currentlyReadFromOwnMemory())
sub_buf.position() = pos;
sub_buf->position() = pos;
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());
checkStateCorrect();
}
@ -43,20 +52,20 @@ bool PeekableReadBuffer::peekNext()
checkStateCorrect();
Position copy_from = pos;
size_t bytes_to_copy = sub_buf.available();
size_t bytes_to_copy = sub_buf->available();
if (useSubbufferOnly())
{
/// Don't have to copy all data from sub-buffer if there is no data in own memory (checkpoint and pos are in sub-buffer)
if (checkpoint)
copy_from = *checkpoint;
bytes_to_copy = sub_buf.buffer().end() - copy_from;
bytes_to_copy = sub_buf->buffer().end() - copy_from;
if (!bytes_to_copy)
{
sub_buf.position() = copy_from;
sub_buf->position() = copy_from;
/// Both checkpoint and pos are at the end of sub-buffer. Just load next part of data.
bool res = sub_buf.next();
BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
bool res = sub_buf->next();
BufferBase::set(sub_buf->buffer().begin(), sub_buf->buffer().size(), sub_buf->offset());
if (checkpoint)
checkpoint.emplace(pos);
@ -70,13 +79,13 @@ bool PeekableReadBuffer::peekNext()
if (useSubbufferOnly())
{
sub_buf.position() = copy_from;
sub_buf->position() = copy_from;
}
char * memory_data = getMemoryData();
/// Save unread data from sub-buffer to own memory
memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_copy);
memcpy(memory_data + peeked_size, sub_buf->position(), bytes_to_copy);
/// If useSubbufferOnly() is false, then checkpoint is in own memory and it was updated in resizeOwnMemoryIfNecessary
/// Otherwise, checkpoint now at the beginning of own memory
@ -106,10 +115,10 @@ bool PeekableReadBuffer::peekNext()
}
peeked_size += bytes_to_copy;
sub_buf.position() += bytes_to_copy;
sub_buf->position() += bytes_to_copy;
checkStateCorrect();
return sub_buf.next();
return sub_buf->next();
}
void PeekableReadBuffer::rollbackToCheckpoint(bool drop)
@ -152,7 +161,7 @@ bool PeekableReadBuffer::nextImpl()
if (checkpoint)
{
if (currentlyReadFromOwnMemory())
res = sub_buf.hasPendingData() || sub_buf.next();
res = sub_buf->hasPendingData() || sub_buf->next();
else
res = peekNext();
}
@ -161,21 +170,21 @@ bool PeekableReadBuffer::nextImpl()
if (useSubbufferOnly())
{
/// Load next data to sub_buf
sub_buf.position() = position();
res = sub_buf.next();
sub_buf->position() = position();
res = sub_buf->next();
}
else
{
/// All copied data have been read from own memory, continue reading from sub_buf
peeked_size = 0;
res = sub_buf.hasPendingData() || sub_buf.next();
res = sub_buf->hasPendingData() || sub_buf->next();
}
}
/// Switch to reading from sub_buf (or just update it if already switched)
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
nextimpl_working_buffer_offset = sub_buf.offset();
Buffer & sub_working = sub_buf->buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf->offset());
nextimpl_working_buffer_offset = sub_buf->offset();
if (checkpoint_at_end)
{
@ -199,8 +208,8 @@ void PeekableReadBuffer::checkStateCorrect() const
throw DB::Exception("Checkpoint in empty own buffer", ErrorCodes::LOGICAL_ERROR);
if (currentlyReadFromOwnMemory() && pos < *checkpoint)
throw DB::Exception("Current position in own buffer before checkpoint in own buffer", ErrorCodes::LOGICAL_ERROR);
if (!currentlyReadFromOwnMemory() && pos < sub_buf.position())
throw DB::Exception("Current position in subbuffer less than sub_buf.position()", ErrorCodes::LOGICAL_ERROR);
if (!currentlyReadFromOwnMemory() && pos < sub_buf->position())
throw DB::Exception("Current position in subbuffer less than sub_buf->position()", ErrorCodes::LOGICAL_ERROR);
}
else
{
@ -294,11 +303,11 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos()
if (!checkpointInOwnMemory() || currentlyReadFromOwnMemory())
return; /// it's already continuous
size_t bytes_to_append = pos - sub_buf.position();
size_t bytes_to_append = pos - sub_buf->position();
resizeOwnMemoryIfNecessary(bytes_to_append);
char * memory_data = getMemoryData();
memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_append);
sub_buf.position() = pos;
memcpy(memory_data + peeked_size, sub_buf->position(), bytes_to_append);
sub_buf->position() = pos;
peeked_size += bytes_to_append;
BufferBase::set(memory_data, peeked_size, peeked_size);
}
@ -306,7 +315,7 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos()
PeekableReadBuffer::~PeekableReadBuffer()
{
if (!currentlyReadFromOwnMemory())
sub_buf.position() = pos;
sub_buf->position() = pos;
}
bool PeekableReadBuffer::hasUnreadData() const

View File

@ -24,7 +24,7 @@ public:
~PeekableReadBuffer() override;
void prefetch() override { sub_buf.prefetch(); }
void prefetch() override { sub_buf->prefetch(); }
/// Sets checkpoint at current position
ALWAYS_INLINE inline void setCheckpoint()
@ -71,13 +71,17 @@ public:
// without recreating the buffer.
void reset();
void setSubBuffer(ReadBuffer & sub_buf_);
private:
bool nextImpl() override;
void resetImpl();
bool peekNext();
inline bool useSubbufferOnly() const { return !peeked_size; }
inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf.buffer().begin(); }
inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf->buffer().begin(); }
inline bool checkpointInOwnMemory() const { return checkpoint_in_own_memory; }
void checkStateCorrect() const;
@ -90,7 +94,7 @@ private:
const char * getMemoryData() const { return use_stack_memory ? stack_memory : memory.data(); }
ReadBuffer & sub_buf;
ReadBuffer * sub_buf;
size_t peeked_size = 0;
std::optional<Position> checkpoint = std::nullopt;
bool checkpoint_in_own_memory = false;

View File

@ -84,8 +84,7 @@ void CustomSeparatedRowInputFormat::syncAfterError()
void CustomSeparatedRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
RowInputFormatWithNamesAndTypes::setReadBuffer(*buf);
buf->setSubBuffer(in_);
}
CustomSeparatedFormatReader::CustomSeparatedFormatReader(

View File

@ -38,8 +38,7 @@ HiveTextRowInputFormat::HiveTextRowInputFormat(
void HiveTextRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
CSVRowInputFormat::setReadBuffer(*buf);
buf->setSubBuffer(in_);
}
HiveTextFormatReader::HiveTextFormatReader(PeekableReadBuffer & buf_, const FormatSettings & format_settings_)

View File

@ -98,8 +98,7 @@ bool JSONAsRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
void JSONAsRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
buf->setSubBuffer(in_);
}

View File

@ -420,8 +420,7 @@ bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
void MsgPackRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(in_);
buf->setSubBuffer(in_);
}
MsgPackSchemaReader::MsgPackSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)

View File

@ -124,8 +124,7 @@ bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
void RegexpRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
buf->setSubBuffer(in_);
}
RegexpSchemaReader::RegexpSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)

View File

@ -266,9 +266,7 @@ void TemplateRowInputFormat::resetParser()
void TemplateRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
format_reader->setReadBuffer(*buf);
IInputFormat::setReadBuffer(*buf);
buf->setSubBuffer(in_);
}
TemplateFormatReader::TemplateFormatReader(

View File

@ -562,8 +562,7 @@ void ValuesBlockInputFormat::resetParser()
void ValuesBlockInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
buf->setSubBuffer(in_);
}
ValuesSchemaReader::ValuesSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)

View File

@ -371,7 +371,7 @@ ReadBufferPtr KafkaConsumer::consume()
return nullptr;
if (hasMorePolledMessages())
return getNextNonEmptyMessage();
return getNextMessage();
if (intermediate_commit)
commit();
@ -460,10 +460,10 @@ ReadBufferPtr KafkaConsumer::consume()
ProfileEvents::increment(ProfileEvents::KafkaMessagesPolled, messages.size());
stalled_status = NOT_STALLED;
return getNextNonEmptyMessage();
return getNextMessage();
}
ReadBufferPtr KafkaConsumer::getNextNonEmptyMessage()
ReadBufferPtr KafkaConsumer::getNextMessage()
{
if (current == messages.end())
return nullptr;
@ -472,10 +472,10 @@ ReadBufferPtr KafkaConsumer::getNextNonEmptyMessage()
size_t size = current->get_payload().get_size();
++current;
if (data && size > 0)
if (data)
return std::make_shared<ReadBufferFromMemory>(data, size);
return getNextNonEmptyMessage();
return getNextMessage();
}
size_t KafkaConsumer::filterMessageErrors()

View File

@ -110,7 +110,7 @@ private:
void resetIfStopped();
/// Return number of messages with an error.
size_t filterMessageErrors();
ReadBufferPtr getNextNonEmptyMessage();
ReadBufferPtr getNextMessage();
};
}

View File

@ -56,7 +56,7 @@ public:
/// We want to control the number of rows in a chunk inserted into NATS
bool prefersLargeBlocks() const override { return false; }
void pushConsumer(NATSConsumerPtr buf);
void pushConsumer(NATSConsumerPtr consumer);
NATSConsumerPtr popConsumer();
NATSConsumerPtr popConsumer(std::chrono::milliseconds timeout);