Merge pull request #20037 from abyss7/better-read-buffers-2

ReadBuffer: check for unread data on next()
This commit is contained in:
alexey-milovidov 2021-02-07 01:33:06 +03:00 committed by GitHub
commit f2ab4f2ce9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 40 additions and 31 deletions

View File

@ -249,7 +249,7 @@ public:
if (whence == SEEK_CUR) if (whence == SEEK_CUR)
{ {
/// If position within current working buffer - shift pos. /// If position within current working buffer - shift pos.
if (working_buffer.size() && size_t(getPosition() + offset_) < absolute_position) if (!working_buffer.empty() && size_t(getPosition() + offset_) < absolute_position)
{ {
pos += offset_; pos += offset_;
return getPosition(); return getPosition();
@ -262,7 +262,7 @@ public:
else if (whence == SEEK_SET) else if (whence == SEEK_SET)
{ {
/// If position within current working buffer - shift pos. /// If position within current working buffer - shift pos.
if (working_buffer.size() && size_t(offset_) >= absolute_position - working_buffer.size() if (!working_buffer.empty() && size_t(offset_) >= absolute_position - working_buffer.size()
&& size_t(offset_) < absolute_position) && size_t(offset_) < absolute_position)
{ {
pos = working_buffer.end() - (absolute_position - offset_); pos = working_buffer.end() - (absolute_position - offset_);

View File

@ -77,7 +77,7 @@ bool BrotliReadBuffer::nextImpl()
if (in->eof()) if (in->eof())
{ {
eof = true; eof = true;
return working_buffer.size() != 0; return !working_buffer.empty();
} }
else else
{ {

View File

@ -40,6 +40,7 @@ public:
inline Position end() const { return end_pos; } inline Position end() const { return end_pos; }
inline size_t size() const { return size_t(end_pos - begin_pos); } inline size_t size() const { return size_t(end_pos - begin_pos); }
inline void resize(size_t size) { end_pos = begin_pos + size; } inline void resize(size_t size) { end_pos = begin_pos + size; }
inline bool empty() const { return size() == 0; }
inline void swap(Buffer & other) inline void swap(Buffer & other)
{ {

View File

@ -25,11 +25,16 @@ protected:
return false; return false;
/// First reading /// First reading
if (working_buffer.size() == 0 && (*current)->hasPendingData()) if (working_buffer.empty())
{ {
working_buffer = Buffer((*current)->position(), (*current)->buffer().end()); if ((*current)->hasPendingData())
return true; {
working_buffer = Buffer((*current)->position(), (*current)->buffer().end());
return true;
}
} }
else
(*current)->position() = position();
if (!(*current)->next()) if (!(*current)->next())
{ {
@ -51,14 +56,12 @@ protected:
} }
public: public:
ConcatReadBuffer(const ReadBuffers & buffers_) : ReadBuffer(nullptr, 0), buffers(buffers_), current(buffers.begin()) {} explicit ConcatReadBuffer(const ReadBuffers & buffers_) : ReadBuffer(nullptr, 0), buffers(buffers_), current(buffers.begin())
ConcatReadBuffer(ReadBuffer & buf1, ReadBuffer & buf2) : ReadBuffer(nullptr, 0)
{ {
buffers.push_back(&buf1); assert(!buffers.empty());
buffers.push_back(&buf2);
current = buffers.begin();
} }
ConcatReadBuffer(ReadBuffer & buf1, ReadBuffer & buf2) : ConcatReadBuffer({&buf1, &buf2}) {}
}; };
} }

View File

@ -1,10 +1,11 @@
#pragma once #pragma once
#include <IO/ReadBuffer.h>
#include <IO/HashingWriteBuffer.h> #include <IO/HashingWriteBuffer.h>
#include <IO/ReadBuffer.h>
namespace DB namespace DB
{ {
/* /*
* Calculates the hash from the read data. When reading, the data is read from the nested ReadBuffer. * Calculates the hash from the read data. When reading, the data is read from the nested ReadBuffer.
* Small pieces are copied into its own memory. * Small pieces are copied into its own memory.
@ -12,14 +13,14 @@ namespace DB
class HashingReadBuffer : public IHashingBuffer<ReadBuffer> class HashingReadBuffer : public IHashingBuffer<ReadBuffer>
{ {
public: public:
HashingReadBuffer(ReadBuffer & in_, size_t block_size_ = DBMS_DEFAULT_HASHING_BLOCK_SIZE) : explicit HashingReadBuffer(ReadBuffer & in_, size_t block_size_ = DBMS_DEFAULT_HASHING_BLOCK_SIZE)
IHashingBuffer<ReadBuffer>(block_size_), in(in_) : IHashingBuffer<ReadBuffer>(block_size_), in(in_)
{ {
working_buffer = in.buffer(); working_buffer = in.buffer();
pos = in.position(); pos = in.position();
/// calculate hash from the data already read /// calculate hash from the data already read
if (working_buffer.size()) if (!working_buffer.empty())
{ {
calculateHash(pos, working_buffer.end() - pos); calculateHash(pos, working_buffer.end() - pos);
} }
@ -39,7 +40,7 @@ private:
return res; return res;
} }
private:
ReadBuffer & in; ReadBuffer & in;
}; };
} }

View File

@ -66,7 +66,7 @@ bool LZMAInflatingReadBuffer::nextImpl()
if (in->eof()) if (in->eof())
{ {
eof = true; eof = true;
return working_buffer.size() != 0; return !working_buffer.empty();
} }
else else
{ {

View File

@ -50,7 +50,7 @@ LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exc
LimitReadBuffer::~LimitReadBuffer() LimitReadBuffer::~LimitReadBuffer()
{ {
/// Update underlying buffer's position in case when limit wasn't reached. /// Update underlying buffer's position in case when limit wasn't reached.
if (working_buffer.size() != 0) if (!working_buffer.empty())
in.position() = position(); in.position() = position();
} }

View File

@ -61,7 +61,7 @@ private:
position() = nullptr; position() = nullptr;
} }
return buffer().size() != 0; return !buffer().empty();
} }
using Container = std::forward_list<BufferBase::Buffer>; using Container = std::forward_list<BufferBase::Buffer>;

View File

@ -55,6 +55,7 @@ public:
*/ */
bool next() bool next()
{ {
assert(!hasPendingData());
assert(position() <= working_buffer.end()); assert(position() <= working_buffer.end());
bytes += offset(); bytes += offset();
@ -77,7 +78,7 @@ public:
next(); next();
} }
virtual ~ReadBuffer() {} virtual ~ReadBuffer() = default;
/** Unlike std::istream, it returns true if all data was read /** Unlike std::istream, it returns true if all data was read
@ -197,7 +198,7 @@ private:
*/ */
virtual bool nextImpl() { return false; } virtual bool nextImpl() { return false; }
[[noreturn]] void throwReadAfterEOF() [[noreturn]] static void throwReadAfterEOF()
{ {
throw Exception("Attempt to read after eof", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF); throw Exception("Attempt to read after eof", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
} }

View File

@ -76,9 +76,7 @@ public:
} }
} }
virtual ~UpdatableSessionBase() virtual ~UpdatableSessionBase() = default;
{
}
}; };
@ -205,6 +203,8 @@ namespace detail
{ {
if (next_callback) if (next_callback)
next_callback(count()); next_callback(count());
if (!working_buffer.empty())
impl->position() = position();
if (!impl->next()) if (!impl->next())
return false; return false;
internal_buffer = impl->buffer(); internal_buffer = impl->buffer();

View File

@ -61,7 +61,7 @@ public:
/** it is desirable in the derived classes to place the next() call in the destructor, /** it is desirable in the derived classes to place the next() call in the destructor,
* so that the last data is written * so that the last data is written
*/ */
virtual ~WriteBuffer() {} virtual ~WriteBuffer() = default;
inline void nextIfAtEnd() inline void nextIfAtEnd()
{ {
@ -75,7 +75,7 @@ public:
size_t bytes_copied = 0; size_t bytes_copied = 0;
/// Produces endless loop /// Produces endless loop
assert(working_buffer.size() > 0); assert(!working_buffer.empty());
while (bytes_copied < n) while (bytes_copied < n)
{ {

View File

@ -70,7 +70,7 @@ bool ZlibInflatingReadBuffer::nextImpl()
if (in->eof()) if (in->eof())
{ {
eof = true; eof = true;
return working_buffer.size() != 0; return !working_buffer.empty();
} }
else else
{ {

View File

@ -54,7 +54,7 @@ bool ZstdInflatingReadBuffer::nextImpl()
if (in->eof()) if (in->eof())
{ {
eof = true; eof = true;
return working_buffer.size() != 0; return !working_buffer.empty();
} }
return true; return true;

View File

@ -219,8 +219,11 @@ void HTTPHandler::pushDelayedResults(Output & used_output)
} }
} }
ConcatReadBuffer concat_read_buffer(read_buffers_raw_ptr); if (!read_buffers_raw_ptr.empty())
copyData(concat_read_buffer, *used_output.out_maybe_compressed); {
ConcatReadBuffer concat_read_buffer(read_buffers_raw_ptr);
copyData(concat_read_buffer, *used_output.out_maybe_compressed);
}
} }