dbms: IO: fixed error [#CONV-2546].

This commit is contained in:
Alexey Milovidov 2011-06-26 21:30:59 +00:00
parent 36e74d86b3
commit 123e3f8e76
5 changed files with 29 additions and 25 deletions

View File

@ -26,13 +26,6 @@ private:
std::vector<char> compressed_buffer; std::vector<char> compressed_buffer;
char scratch[QLZ_SCRATCH_DECOMPRESS]; char scratch[QLZ_SCRATCH_DECOMPRESS];
public:
CompressedReadBuffer(ReadBuffer & in_)
: in(in_),
compressed_buffer(QUICKLZ_HEADER_SIZE)
{
}
bool nextImpl() bool nextImpl()
{ {
if (in.eof()) if (in.eof())
@ -40,13 +33,13 @@ public:
uint128 checksum; uint128 checksum;
in.readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum)); in.readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum));
in.readStrict(&compressed_buffer[0], QUICKLZ_HEADER_SIZE); in.readStrict(&compressed_buffer[0], QUICKLZ_HEADER_SIZE);
size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]); size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]);
if (size_compressed > DBMS_COMPRESSED_READ_BUFFER_MAX_COMPRESSED_SIZE) if (size_compressed > DBMS_COMPRESSED_READ_BUFFER_MAX_COMPRESSED_SIZE)
throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED); throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]); size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]);
compressed_buffer.resize(size_compressed); compressed_buffer.resize(size_compressed);
@ -56,13 +49,20 @@ public:
if (checksum != CityHash128(&compressed_buffer[0], size_compressed)) if (checksum != CityHash128(&compressed_buffer[0], size_compressed))
throw Exception("Checksum doesnt match: corrupted data.", ErrorCodes::CHECKSUM_DOESNT_MATCH); throw Exception("Checksum doesnt match: corrupted data.", ErrorCodes::CHECKSUM_DOESNT_MATCH);
qlz_decompress(&compressed_buffer[0], &internal_buffer[0], scratch); qlz_decompress(&compressed_buffer[0], &internal_buffer[0], scratch);
working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + size_decompressed); working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + size_decompressed);
return true; return true;
} }
public:
CompressedReadBuffer(ReadBuffer & in_)
: in(in_),
compressed_buffer(QUICKLZ_HEADER_SIZE)
{
}
}; };
} }

View File

@ -23,14 +23,11 @@ private:
size_t compressed_bytes; size_t compressed_bytes;
public:
CompressedWriteBuffer(WriteBuffer & out_) : out(out_), compressed_bytes(0) {}
void nextImpl() void nextImpl()
{ {
size_t uncompressed_size = pos - working_buffer.begin(); size_t uncompressed_size = pos - working_buffer.begin();
compressed_buffer.resize(uncompressed_size + QUICKLZ_ADDITIONAL_SPACE); compressed_buffer.resize(uncompressed_size + QUICKLZ_ADDITIONAL_SPACE);
size_t compressed_size = qlz_compress( size_t compressed_size = qlz_compress(
working_buffer.begin(), working_buffer.begin(),
&compressed_buffer[0], &compressed_buffer[0],
@ -44,6 +41,9 @@ public:
compressed_bytes += compressed_size; compressed_bytes += compressed_size;
} }
public:
CompressedWriteBuffer(WriteBuffer & out_) : out(out_), compressed_bytes(0) {}
/// Объём сжатых данных /// Объём сжатых данных
size_t getCompressedBytes() size_t getCompressedBytes()
{ {

View File

@ -57,6 +57,9 @@ public:
{ {
bytes_read += pos - working_buffer.begin(); bytes_read += pos - working_buffer.begin();
bool res = nextImpl(); bool res = nextImpl();
if (!res)
working_buffer = Buffer(working_buffer.begin(), working_buffer.begin());
pos = working_buffer.begin(); pos = working_buffer.begin();
return res; return res;
} }

View File

@ -17,25 +17,26 @@ class ReadBufferFromIStream : public ReadBuffer
private: private:
std::istream & istr; std::istream & istr;
public:
ReadBufferFromIStream(std::istream & istr_) : istr(istr_) {}
bool nextImpl() bool nextImpl()
{ {
istr.read(working_buffer.begin(), DEFAULT_READ_BUFFER_SIZE); istr.read(working_buffer.begin(), DEFAULT_READ_BUFFER_SIZE);
size_t gcount = istr.gcount();
working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + istr.gcount()); if (!gcount)
if (working_buffer.end() == working_buffer.begin())
{ {
if (istr.eof()) if (istr.eof())
return false; return false;
if (!istr.good()) else
throw Exception("Cannot read from istream", ErrorCodes::CANNOT_READ_FROM_ISTREAM); throw Exception("Cannot read from istream", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
} }
else
working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + gcount);
return true; return true;
} }
public:
ReadBufferFromIStream(std::istream & istr_) : istr(istr_) {}
}; };
} }

View File

@ -17,9 +17,6 @@ class WriteBufferFromOStream : public WriteBuffer
private: private:
std::ostream & ostr; std::ostream & ostr;
public:
WriteBufferFromOStream(std::ostream & ostr_) : ostr(ostr_) {}
void nextImpl() void nextImpl()
{ {
ostr.write(working_buffer.begin(), pos - working_buffer.begin()); ostr.write(working_buffer.begin(), pos - working_buffer.begin());
@ -29,6 +26,9 @@ public:
throw Exception("Cannot write to ostream", ErrorCodes::CANNOT_WRITE_TO_OSTREAM); throw Exception("Cannot write to ostream", ErrorCodes::CANNOT_WRITE_TO_OSTREAM);
} }
public:
WriteBufferFromOStream(std::ostream & ostr_) : ostr(ostr_) {}
~WriteBufferFromOStream() ~WriteBufferFromOStream()
{ {
next(); next();