Reworked ReadIndirectBuffer for DiskMemory.

This commit is contained in:
Pavel Kovalenko 2020-02-17 23:08:35 +03:00 committed by Pavel Kovalenko
parent cf474e1f09
commit e849654628
2 changed files with 40 additions and 33 deletions

View File

@ -16,6 +16,7 @@ namespace ErrorCodes
extern const int DIRECTORY_DOESNT_EXIST;
extern const int CANNOT_DELETE_DIRECTORY;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
@ -38,32 +39,39 @@ private:
std::vector<String>::iterator iter;
};
bool ReadIndirectBuffer::nextImpl()
off_t ReadIndirectBuffer::seek(off_t offset, int whence)
{
if (!initialized)
if (whence == SEEK_SET)
{
initialized = true;
internal_buffer = buf.buffer();
working_buffer = internal_buffer;
return true;
if (offset >= 0 && working_buffer.begin() + offset < working_buffer.end())
{
pos = working_buffer.begin() + offset;
return size_t(pos - working_buffer.begin());
}
else
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
return false;
}
off_t ReadIndirectBuffer::seek(off_t off, int whence)
{
/// Synchronize position in working buffer and string buffer.
buf.seek(offset(), SEEK_SET);
/// Seek string buffer position.
off_t result = buf.seek(off, whence);
pos = working_buffer.begin() + result;
return result;
else if (whence == SEEK_CUR)
{
Position new_pos = pos + offset;
if (new_pos >= working_buffer.begin() && new_pos < working_buffer.end())
{
pos = new_pos;
return size_t(pos - working_buffer.begin());
}
else
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
else
throw Exception("Only SEEK_SET and SEEK_CUR seek modes allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
}
off_t ReadIndirectBuffer::getPosition()
@ -71,6 +79,14 @@ off_t ReadIndirectBuffer::getPosition()
return pos - working_buffer.begin();
}
ReadIndirectBuffer::ReadIndirectBuffer(String path_, const String & data_)
: ReadBufferFromFileBase(), buf(ReadBufferFromString(data_)), path(std::move(path_))
{
internal_buffer = buf.buffer();
working_buffer = internal_buffer;
pos = working_buffer.begin();
}
void WriteIndirectBuffer::finalize()
{
if (isFinished())

View File

@ -19,22 +19,13 @@ class WriteBuffer;
class ReadIndirectBuffer : public ReadBufferFromFileBase
{
public:
ReadIndirectBuffer(String path_, const String & data_)
: ReadBufferFromFileBase(), buf(ReadBufferFromString(data_)), path(std::move(path_))
{
}
ReadIndirectBuffer(String path_, const String & data_);
std::string getFileName() const override { return path; }
private:
bool nextImpl() override;
public:
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
private:
bool initialized = false;
ReadBufferFromString buf;
String path;
};