DB: IO: allowed to use not own memory piece [#CONV-2546].

This commit is contained in:
Alexey Milovidov 2011-06-27 18:22:14 +00:00
parent 054786a2a7
commit ea1a7d9192
9 changed files with 144 additions and 106 deletions

View File

@ -0,0 +1,85 @@
#ifndef DBMS_COMMON_BUFFERBASE_H
#define DBMS_COMMON_BUFFERBASE_H
namespace DB
{
/** Базовый класс для ReadBuffer и WriteBuffer.
* Содержит общие типы, переменные и функции.
*/
class BufferBase
{
public:
/** Курсор в буфере. Позиция записи или чтения. */
typedef char * Position;
/** Ссылка на диапазон памяти. */
struct Buffer
{
Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}
inline Position begin() { return begin_pos; }
inline Position end() { return end_pos; }
inline size_t size() { return end_pos - begin_pos; }
inline void resize(size_t size) { end_pos = begin_pos + size; }
private:
Position begin_pos;
Position end_pos; /// на 1 байт после конца буфера
};
/** Конструктор принимает диапазон памяти, который следует использовать под буфер.
* offset - начальное место курсора. ReadBuffer должен установить его в конец диапазона, а WriteBuffer - в начало.
*/
BufferBase(Position ptr, size_t size, size_t offset)
: internal_buffer(ptr, ptr + size), working_buffer(ptr, ptr + size), pos(ptr + offset), bytes(0) {}
void set(Position ptr, size_t size, size_t offset)
{
internal_buffer = Buffer(ptr, ptr + size);
working_buffer = Buffer(ptr, ptr + size);
pos = ptr + offset;
bytes = 0;
}
/// получить часть буфера, из которого можно читать / в который можно писать данные
inline Buffer & buffer() { return working_buffer; }
/// получить (для чтения и изменения) позицию в буфере
inline Position & position() { return pos; };
/// смещение в байтах курсора от начала буфера
inline size_t offset() { return pos - working_buffer.begin(); }
/** Сколько байт было прочитано/записано, считая те, что ещё в буфере. */
size_t count()
{
return bytes + offset();
}
protected:
/// Ссылка на кусок памяти для буфера.
Buffer internal_buffer;
/** Часть куска памяти, которую можно использовать.
* Например, если internal_buffer - 1MB, а из файла для чтения было загружено в буфер
* только 10 байт, то working_buffer будет иметь размер 10 байт
* (working_buffer.end() будет указывать на позицию сразу после тех 10 байт, которых можно прочитать).
*/
Buffer working_buffer;
/// Позиция чтения/записи.
Position pos;
/** Сколько байт было прочитано/записано, не считая тех, что сейчас в буфере.
* (считая те, что были уже использованы и удалены из буфера)
*/
size_t bytes;
};
}
#endif

View File

@ -0,0 +1,31 @@
#ifndef DBMS_COMMON_BUFFERWITHOWNMEMORY_H
#define DBMS_COMMON_BUFFERWITHOWNMEMORY_H
#include <vector>
#define DBMS_DEFAULT_BUFFER_SIZE 1048576ULL
namespace DB
{
/** Буфер, который сам владеет своим куском памяти для работы.
* Аргумент шаблона - ReadBuffer или WriteBuffer
*/
template <typename Base>
class BufferWithOwnMemory : public Base
{
protected:
std::vector<char> memory;
public:
BufferWithOwnMemory(size_t size = DBMS_DEFAULT_BUFFER_SIZE) : Base(NULL, size), memory(size)
{
Base::set(&memory[0], size);
}
};
}
#endif

View File

@ -9,6 +9,7 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/IO/CompressedStream.h>
@ -18,7 +19,7 @@
namespace DB
{
class CompressedReadBuffer : public ReadBuffer
class CompressedReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
private:
ReadBuffer & in;
@ -43,16 +44,16 @@ private:
size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]);
compressed_buffer.resize(size_compressed);
memory.resize(size_decompressed);
internal_buffer.resize(size_decompressed);
working_buffer.resize(size_decompressed);
in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
if (checksum != CityHash128(&compressed_buffer[0], size_compressed))
throw Exception("Checksum doesnt match: corrupted data.", ErrorCodes::CHECKSUM_DOESNT_MATCH);
qlz_decompress(&compressed_buffer[0], &internal_buffer[0], scratch);
working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + size_decompressed);
qlz_decompress(&compressed_buffer[0], working_buffer.begin(), scratch);
return true;
}

View File

@ -7,13 +7,14 @@
#include <quicklz/quicklz_level1.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/IO/CompressedStream.h>
namespace DB
{
class CompressedWriteBuffer : public WriteBuffer
class CompressedWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
private:
WriteBuffer & out;
@ -25,7 +26,7 @@ private:
void nextImpl()
{
size_t uncompressed_size = pos - working_buffer.begin();
size_t uncompressed_size = offset();
compressed_buffer.resize(uncompressed_size + QUICKLZ_ADDITIONAL_SPACE);
size_t compressed_size = qlz_compress(

View File

@ -1,14 +1,11 @@
#ifndef DBMS_COMMON_READBUFFER_H
#define DBMS_COMMON_READBUFFER_H
#include <vector>
#include <cstring>
#include <algorithm>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#define DEFAULT_READ_BUFFER_SIZE 1048576UL
#include <DB/IO/BufferBase.h>
namespace DB
@ -20,45 +17,21 @@ namespace DB
*
* Наследники должны реализовать метод nextImpl().
*/
class ReadBuffer
class ReadBuffer : public BufferBase
{
public:
typedef char * Position;
struct Buffer
{
Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}
inline Position begin() { return begin_pos; }
inline Position end() { return end_pos; }
private:
Position begin_pos;
Position end_pos; /// на 1 байт после конца буфера
};
ReadBuffer()
: internal_buffer(DEFAULT_READ_BUFFER_SIZE),
working_buffer(&internal_buffer[0], &internal_buffer[0]),
pos(&internal_buffer[0]),
bytes_read(0)
{}
/// получить часть буфера, из которого можно читать данные
inline Buffer & buffer() { return working_buffer; }
/// получить (для чтения и изменения) позицию в буфере
inline Position & position() { return pos; };
ReadBuffer(Position ptr, size_t size) : BufferBase(ptr, size, size) {}
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, size); }
/** прочитать следующие данные и заполнить ими буфер; переместить позицию в начало;
* вернуть false в случае конца, true иначе; кинуть исключение, если что-то не так
*/
inline bool next()
{
bytes_read += pos - working_buffer.begin();
bytes += offset();
bool res = nextImpl();
if (!res)
working_buffer = Buffer(working_buffer.begin(), working_buffer.begin());
working_buffer.resize(0);
pos = working_buffer.begin();
return res;
@ -123,22 +96,7 @@ public:
throw Exception("Cannot read all data", ErrorCodes::CANNOT_READ_ALL_DATA);
}
/** Сколько байт было прочитано из буфера. */
size_t count()
{
return bytes_read + pos - working_buffer.begin();
}
protected:
std::vector<char> internal_buffer;
Buffer working_buffer;
Position pos;
private:
size_t bytes_read;
/** Прочитать следующие данные и заполнить ими буфер.
* Вернуть false в случае конца, true иначе.
* Кинуть исключение, если что-то не так.

View File

@ -7,19 +7,20 @@
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
namespace DB
{
class ReadBufferFromIStream : public ReadBuffer
class ReadBufferFromIStream : public BufferWithOwnMemory<ReadBuffer>
{
private:
std::istream & istr;
bool nextImpl()
{
istr.read(working_buffer.begin(), DEFAULT_READ_BUFFER_SIZE);
istr.read(working_buffer.begin(), working_buffer.size());
size_t gcount = istr.gcount();
if (!gcount)
@ -30,7 +31,7 @@ private:
throw Exception("Cannot read from istream", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
}
else
working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + gcount);
working_buffer.resize(gcount);
return true;
}

View File

@ -1,11 +1,10 @@
#ifndef DBMS_COMMON_WRITEBUFFER_H
#define DBMS_COMMON_WRITEBUFFER_H
#include <vector>
#include <cstring>
#include <algorithm>
#include <cstring>
#define DEFAULT_WRITE_BUFFER_SIZE 1048576UL
#include <DB/IO/BufferBase.h>
namespace DB
@ -17,43 +16,18 @@ namespace DB
*
* Наследники должны реализовать метод nextImpl().
*/
class WriteBuffer
class WriteBuffer : public BufferBase
{
public:
typedef char * Position;
struct Buffer
{
Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}
inline Position begin() { return begin_pos; }
inline Position end() { return end_pos; }
private:
Position begin_pos;
Position end_pos; /// на 1 байт после конца буфера
};
WriteBuffer()
: internal_buffer(DEFAULT_WRITE_BUFFER_SIZE),
working_buffer(&internal_buffer[0], &internal_buffer[0] + DEFAULT_WRITE_BUFFER_SIZE),
pos(&internal_buffer[0]),
bytes_written(0)
{
}
/// получить часть буфера, в который можно писать данные
inline Buffer & buffer() { return working_buffer; }
/// получить (для чтения и изменения) позицию в буфере
inline Position & position() { return pos; };
WriteBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) {}
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); }
/** записать данные, находящиеся в буфере (от начала буфера до текущей позиции);
* переместить позицию в начало; кинуть исключение, если что-то не так
*/
inline void next()
{
bytes_written += pos - working_buffer.begin();
bytes += offset();
nextImpl();
pos = working_buffer.begin();
}
@ -93,22 +67,7 @@ public:
++pos;
}
/** Сколько байт было записано в буфер. */
size_t count()
{
return bytes_written + pos - working_buffer.begin();
}
protected:
std::vector<char> internal_buffer;
Buffer working_buffer;
Position pos;
private:
size_t bytes_written;
/** Записать данные, находящиеся в буфере (от начала буфера до текущей позиции).
* Кинуть исключение, если что-то не так.
*/

View File

@ -7,19 +7,20 @@
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
namespace DB
{
class WriteBufferFromOStream : public WriteBuffer
class WriteBufferFromOStream : public BufferWithOwnMemory<WriteBuffer>
{
private:
std::ostream & ostr;
void nextImpl()
{
ostr.write(working_buffer.begin(), pos - working_buffer.begin());
ostr.write(working_buffer.begin(), offset());
ostr.flush();
if (!ostr.good())

View File

@ -29,6 +29,7 @@ int main(int argc, char ** argv)
{
std::ofstream ostr("test1");
DB::WriteBufferFromOStream buf(ostr);
DB::CompressedWriteBuffer compressed_buf(buf);