fixed compressor testing, added base logic for compressor and decompressor, tests passing

This commit is contained in:
a.palagashvili 2020-11-02 02:52:34 +03:00
parent be2b002ff4
commit 495cd47fd8
3 changed files with 30 additions and 22 deletions

View File

@ -21,13 +21,13 @@ LzmaReadBuffer::LzmaReadBuffer(
lstr.avail_out = 0; lstr.avail_out = 0;
// 500 mb // 500 mb
uint64_t memlimit = 500 << 30; uint64_t memlimit = 500 << 20;
lzma_ret ret = lzma_stream_decoder(&lstr, memlimit, LZMA_CONCATENATED | LZMA_IGNORE_CHECK); lzma_ret ret = lzma_stream_decoder(&lstr, memlimit, LZMA_CONCATENATED);
// lzma does not provide api for converting error code to string unlike zlib // lzma does not provide api for converting error code to string unlike zlib
if (ret != LZMA_OK) if (ret != LZMA_OK)
throw Exception( throw Exception(
std::string("lzma_stream_decoder failed: error code: ") + std::to_string(ret) + "; lzma version: " + LZMA_VERSION_STRING, std::string("lzma_stream_decoder initialization failed: error code: ") + std::to_string(ret) + "; lzma version: " + LZMA_VERSION_STRING,
ErrorCodes::LZMA_STREAM_DECODER_FAILED); ErrorCodes::LZMA_STREAM_DECODER_FAILED);
} }
@ -52,9 +52,10 @@ bool LzmaReadBuffer::nextImpl()
lstr.next_out = reinterpret_cast<unsigned char *>(internal_buffer.begin()); lstr.next_out = reinterpret_cast<unsigned char *>(internal_buffer.begin());
lstr.avail_out = internal_buffer.size(); lstr.avail_out = internal_buffer.size();
lzma_ret ret = lzma_code(&lstr, LZMA_FINISH); lzma_ret ret = lzma_code(&lstr, LZMA_RUN);
in->position() = in->buffer().end() - lstr.avail_in; in->position() = in->buffer().end() - lstr.avail_in;
working_buffer.resize(internal_buffer.size() - lstr.avail_out);
if (ret == LZMA_STREAM_END) if (ret == LZMA_STREAM_END)
{ {
@ -62,6 +63,10 @@ bool LzmaReadBuffer::nextImpl()
{ {
eof = true; eof = true;
return working_buffer.size() != 0; return working_buffer.size() != 0;
} else {
throw Exception(
std::string("lzma decoder finished, but stream is still alive: error code: ") + std::to_string(ret) + "; lzma version: " + LZMA_VERSION_STRING,
ErrorCodes::LZMA_STREAM_DECODER_FAILED);
} }
} }

View File

@ -60,24 +60,31 @@ void LzmaWriteBuffer::nextImpl()
lstr.next_in = reinterpret_cast<unsigned char *>(working_buffer.begin()); lstr.next_in = reinterpret_cast<unsigned char *>(working_buffer.begin());
lstr.avail_in = offset(); lstr.avail_in = offset();
std::cout << lstr.avail_in << std::endl; //std::cout << lstr.avail_in << std::endl;
lzma_action action = LZMA_RUN; lzma_action action = LZMA_RUN;
do { do {
out->nextIfAtEnd(); out->nextIfAtEnd();
lstr.next_out = reinterpret_cast<unsigned char *>(out->position()); lstr.next_out = reinterpret_cast<unsigned char *>(out->position());
lstr.avail_out = out->buffer().end() - out->position(); lstr.avail_out = out->buffer().end() - out->position();
//std::cout << lstr.avail_out << " BEFOR" << std::endl;
lzma_ret ret = lzma_code(&lstr, action); lzma_ret ret = lzma_code(&lstr, action);
out->position() = out->buffer().end() - lstr.avail_out; out->position() = out->buffer().end() - lstr.avail_out;
if (ret == LZMA_STREAM_END) //std::cout << lstr.avail_out << " AFTER" << std::endl;
//std::cout << ret << " RET IMPL" << std::endl;
if (ret == LZMA_STREAM_END) {
return; return;
}
if (ret != LZMA_OK) if (ret != LZMA_OK)
throw Exception(std::string("lzma stream encoding failed: ") + "; lzma version: " + LZMA_VERSION_STRING, ErrorCodes::LZMA_STREAM_ENCODER_FAILED); throw Exception(std::string("lzma stream encoding failed: ") + "; lzma version: " + LZMA_VERSION_STRING, ErrorCodes::LZMA_STREAM_ENCODER_FAILED);
std::cout << lstr.avail_in << std::endl; //std::cout << lstr.avail_in << " " << lstr.avail_out << std::endl;
} while (lstr.avail_in > 0 || lstr.avail_out == 0); } while (lstr.avail_in > 0 || lstr.avail_out == 0);
} }
@ -97,22 +104,18 @@ void LzmaWriteBuffer::finish()
lzma_ret ret = lzma_code(&lstr, LZMA_FINISH); lzma_ret ret = lzma_code(&lstr, LZMA_FINISH);
out->position() = out->buffer().end() - lstr.avail_out; out->position() = out->buffer().end() - lstr.avail_out;
if (ret == LZMA_STREAM_END) //std::cout << ret << " RET FIN" << std::endl;
if (ret == LZMA_STREAM_END) {
finished = true;
return; return;
}
if (ret != LZMA_OK) if (ret != LZMA_OK)
throw Exception(std::string("lzma stream encoding failed: ") + "; lzma version: " + LZMA_VERSION_STRING, ErrorCodes::LZMA_STREAM_ENCODER_FAILED); throw Exception(std::string("lzma stream encoding failed: ") + "; lzma version: " + LZMA_VERSION_STRING, ErrorCodes::LZMA_STREAM_ENCODER_FAILED);
std::cout << lstr.avail_in << std::endl; //std::cout << lstr.avail_in << std::endl;
} while (lstr.avail_out == 0); } while (lstr.avail_out == 0);
while (true) {
out->nextIfAtEnd();
lstr.next_out = reinterpret_cast<unsigned char *>(out->position());
lstr.avail_out = out->buffer().end() - out->position();
}
} }
} }

View File

@ -14,7 +14,7 @@ try
{ {
std::cout << std::fixed << std::setprecision(2); std::cout << std::fixed << std::setprecision(2);
size_t n = 100000; size_t n = 10000000;
Stopwatch stopwatch; Stopwatch stopwatch;
{ {
@ -35,7 +35,7 @@ try
<< ", " << (lzma_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" << ", " << (lzma_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s"
<< std::endl; << std::endl;
} }
/*
{ {
auto buf = std::make_unique<DB::ReadBufferFromFile>("test_lzma_buffers.xz"); auto buf = std::make_unique<DB::ReadBufferFromFile>("test_lzma_buffers.xz");
DB::LzmaReadBuffer lzma_buf(std::move(buf)); DB::LzmaReadBuffer lzma_buf(std::move(buf));
@ -56,7 +56,7 @@ try
<< std::endl; << std::endl;
} }
*/
return 0; return 0;
} }
catch (const DB::Exception & e) catch (const DB::Exception & e)