diff --git a/src/IO/LzmaReadBuffer.cpp b/src/IO/LzmaReadBuffer.cpp index 18cca25b4d5..3f458de3880 100644 --- a/src/IO/LzmaReadBuffer.cpp +++ b/src/IO/LzmaReadBuffer.cpp @@ -21,13 +21,13 @@ LzmaReadBuffer::LzmaReadBuffer( lstr.avail_out = 0; // 500 mb - uint64_t memlimit = 500 << 30; + uint64_t memlimit = 500 << 20; - lzma_ret ret = lzma_stream_decoder(&lstr, memlimit, LZMA_CONCATENATED | LZMA_IGNORE_CHECK); + lzma_ret ret = lzma_stream_decoder(&lstr, memlimit, LZMA_CONCATENATED); // lzma does not provide api for converting error code to string unlike zlib if (ret != LZMA_OK) throw Exception( - std::string("lzma_stream_decoder failed: error code: ") + std::to_string(ret) + "; lzma version: " + LZMA_VERSION_STRING, + std::string("lzma_stream_decoder initialization failed: error code: ") + std::to_string(ret) + "; lzma version: " + LZMA_VERSION_STRING, ErrorCodes::LZMA_STREAM_DECODER_FAILED); } @@ -52,9 +52,10 @@ bool LzmaReadBuffer::nextImpl() lstr.next_out = reinterpret_cast(internal_buffer.begin()); lstr.avail_out = internal_buffer.size(); - lzma_ret ret = lzma_code(&lstr, LZMA_FINISH); + lzma_ret ret = lzma_code(&lstr, LZMA_RUN); in->position() = in->buffer().end() - lstr.avail_in; + working_buffer.resize(internal_buffer.size() - lstr.avail_out); if (ret == LZMA_STREAM_END) { @@ -62,6 +63,10 @@ bool LzmaReadBuffer::nextImpl() { eof = true; return working_buffer.size() != 0; + } else { + throw Exception( + std::string("lzma decoder finished, but stream is still alive: error code: ") + std::to_string(ret) + "; lzma version: " + LZMA_VERSION_STRING, + ErrorCodes::LZMA_STREAM_DECODER_FAILED); } } diff --git a/src/IO/LzmaWriteBuffer.cpp b/src/IO/LzmaWriteBuffer.cpp index 6ca867d2af7..332961a292f 100644 --- a/src/IO/LzmaWriteBuffer.cpp +++ b/src/IO/LzmaWriteBuffer.cpp @@ -60,24 +60,31 @@ void LzmaWriteBuffer::nextImpl() lstr.next_in = reinterpret_cast(working_buffer.begin()); lstr.avail_in = offset(); - std::cout << lstr.avail_in << std::endl; + //std::cout << lstr.avail_in << std::endl; lzma_action action = LZMA_RUN; do { out->nextIfAtEnd(); lstr.next_out = reinterpret_cast(out->position()); lstr.avail_out = out->buffer().end() - out->position(); + //std::cout << lstr.avail_out << " BEFOR" << std::endl; + lzma_ret ret = lzma_code(&lstr, action); out->position() = out->buffer().end() - lstr.avail_out; - if (ret == LZMA_STREAM_END) - return; + //std::cout << lstr.avail_out << " AFTER" << std::endl; + + //std::cout << ret << " RET IMPL" << std::endl; + + if (ret == LZMA_STREAM_END) { + return; + } if (ret != LZMA_OK) throw Exception(std::string("lzma stream encoding failed: ") + "; lzma version: " + LZMA_VERSION_STRING, ErrorCodes::LZMA_STREAM_ENCODER_FAILED); - std::cout << lstr.avail_in << std::endl; + //std::cout << lstr.avail_in << " " << lstr.avail_out << std::endl; } while (lstr.avail_in > 0 || lstr.avail_out == 0); } @@ -97,22 +104,18 @@ void LzmaWriteBuffer::finish() lzma_ret ret = lzma_code(&lstr, LZMA_FINISH); out->position() = out->buffer().end() - lstr.avail_out; - if (ret == LZMA_STREAM_END) - return; + //std::cout << ret << " RET FIN" << std::endl; + + if (ret == LZMA_STREAM_END) { + finished = true; + return; + } if (ret != LZMA_OK) throw Exception(std::string("lzma stream encoding failed: ") + "; lzma version: " + LZMA_VERSION_STRING, ErrorCodes::LZMA_STREAM_ENCODER_FAILED); - std::cout << lstr.avail_in << std::endl; + //std::cout << lstr.avail_in << std::endl; } while (lstr.avail_out == 0); - - while (true) { - out->nextIfAtEnd(); - lstr.next_out = reinterpret_cast(out->position()); - lstr.avail_out = out->buffer().end() - out->position(); - - - } } } \ No newline at end of file diff --git a/src/IO/tests/lzma_buffers.cpp b/src/IO/tests/lzma_buffers.cpp index 6d6b1a7f2df..1f691fa09f7 100644 --- a/src/IO/tests/lzma_buffers.cpp +++ b/src/IO/tests/lzma_buffers.cpp @@ -14,7 +14,7 @@ try { std::cout << std::fixed << std::setprecision(2); - size_t n = 100000; + size_t n = 10000000; Stopwatch stopwatch; { @@ -35,7 +35,7 @@ try << ", " << (lzma_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" << std::endl; } -/* + { auto buf = std::make_unique("test_lzma_buffers.xz"); DB::LzmaReadBuffer lzma_buf(std::move(buf)); @@ -56,7 +56,7 @@ try << std::endl; } -*/ + return 0; } catch (const DB::Exception & e)