From 58110f5cde5f3ececf2b439ab8cc36748559012d Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Jan 2012 18:35:22 +0000 Subject: [PATCH] dbms: added LZ4 [#CONV-2944]. --- dbms/include/DB/Core/ErrorCodes.h | 1 + dbms/include/DB/IO/CompressedReadBuffer.h | 7 +- dbms/include/DB/IO/CompressedStream.h | 20 + dbms/include/DB/IO/CompressedWriteBuffer.h | 68 ++- dbms/include/DB/Storages/StorageLog.h | 2 +- libs/liblz4/include/lz4/lz4.h | 95 ++++ libs/liblz4/src/lz4.c | 588 +++++++++++++++++++++ utils/compressor/main.cpp | 65 ++- 8 files changed, 814 insertions(+), 32 deletions(-) create mode 100644 libs/liblz4/include/lz4/lz4.h create mode 100644 libs/liblz4/src/lz4.c diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 0fc5f391d13..c0ba834f989 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -94,6 +94,7 @@ namespace ErrorCodes RECEIVED_ERROR_FROM_REMOTE_IO_SERVER, CANNOT_SEEK_THROUGH_FILE, CANNOT_TRUNCATE_FILE, + UNKNOWN_COMPRESSION_METHOD, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/IO/CompressedReadBuffer.h b/dbms/include/DB/IO/CompressedReadBuffer.h index c065f72f44e..6c55e1b9e67 100644 --- a/dbms/include/DB/IO/CompressedReadBuffer.h +++ b/dbms/include/DB/IO/CompressedReadBuffer.h @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -49,7 +50,11 @@ private: if (checksum != CityHash128(&compressed_buffer[0], size_compressed)) throw Exception("Checksum doesnt match: corrupted data.", ErrorCodes::CHECKSUM_DOESNT_MATCH); - qlz_decompress(&compressed_buffer[0], working_buffer.begin(), scratch); + /// Старший бит первого байта определяет использованный метод сжатия. + if ((compressed_buffer[0] & 0x80) == 0) + qlz_decompress(&compressed_buffer[0], working_buffer.begin(), scratch); + else + LZ4_uncompress(&compressed_buffer[QUICKLZ_HEADER_SIZE], working_buffer.begin(), size_decompressed); return true; } diff --git a/dbms/include/DB/IO/CompressedStream.h b/dbms/include/DB/IO/CompressedStream.h index 7a6691f63bc..69ed1c5fd9a 100644 --- a/dbms/include/DB/IO/CompressedStream.h +++ b/dbms/include/DB/IO/CompressedStream.h @@ -3,5 +3,25 @@ /** Общие дефайны */ #define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL /// 1GB + #define QUICKLZ_ADDITIONAL_SPACE 400 #define QUICKLZ_HEADER_SIZE 9 + +#define LZ4_ADDITIONAL_SPACE_MIN 8.0 +#define LZ4_ADDITIONAL_SPACE_K 0.004 + + +namespace DB +{ + +namespace CompressionMethod +{ + /** Метод сжатия */ + enum Enum + { + QuickLZ = 0, + LZ4 = 1, + }; +} + +} diff --git a/dbms/include/DB/IO/CompressedWriteBuffer.h b/dbms/include/DB/IO/CompressedWriteBuffer.h index a76635fe440..f8d66dcbba8 100644 --- a/dbms/include/DB/IO/CompressedWriteBuffer.h +++ b/dbms/include/DB/IO/CompressedWriteBuffer.h @@ -1,9 +1,14 @@ #pragma once +#include + #include #include #include +#include + +#include #include #include @@ -17,6 +22,7 @@ class CompressedWriteBuffer : public BufferWithOwnMemory { private: WriteBuffer & out; + CompressionMethod::Enum method; std::vector compressed_buffer; char scratch[QLZ_SCRATCH_COMPRESS]; @@ -27,22 +33,66 @@ private: return; size_t uncompressed_size = offset(); - compressed_buffer.resize(uncompressed_size + QUICKLZ_ADDITIONAL_SPACE); + size_t compressed_size = 0; + char * compressed_buffer_ptr = NULL; - size_t compressed_size = qlz_compress( - working_buffer.begin(), - &compressed_buffer[0], - uncompressed_size, - scratch); + /** Для того, чтобы различить между QuickLZ и LZ4 и сохранить обратную совместимость (со случаем, когда использовался только QuickLZ), + * используем старший бит первого байта в сжатых данных (который сейчас не используется в QuickLZ). + * PS. Если потребуется использовать другие библиотеки, то можно использовать ещё один бит первого байта, или старший бит размера. + */ - uint128 checksum = CityHash128(&compressed_buffer[0], compressed_size); + switch (method) + { + case CompressionMethod::QuickLZ: + { + compressed_buffer.resize(uncompressed_size + QUICKLZ_ADDITIONAL_SPACE); + + compressed_size = qlz_compress( + working_buffer.begin(), + &compressed_buffer[0], + uncompressed_size, + scratch); + + compressed_buffer_ptr = &compressed_buffer[0]; + break; + } + case CompressionMethod::LZ4: + { + /** В случае LZ4, в начале запишем заголовок такого же размера и структуры, как в QuickLZ + * 1 байт, чтобы отличить LZ4 от QuickLZ. + * 4 байта - размер сжатых данных + * 4 байта - размер несжатых данных. + */ + compressed_buffer.resize(QUICKLZ_HEADER_SIZE + uncompressed_size + std::max(LZ4_ADDITIONAL_SPACE_MIN, ceil(uncompressed_size * LZ4_ADDITIONAL_SPACE_K))); + + compressed_buffer[0] = 0x82; /// Второй бит - для совместимости с QuickLZ - обозначает, что размеры записываются 4 байтами. + + compressed_size = QUICKLZ_HEADER_SIZE + LZ4_compress( + working_buffer.begin(), + &compressed_buffer[QUICKLZ_HEADER_SIZE], + uncompressed_size); + + UInt32 compressed_size_32 = compressed_size; + UInt32 uncompressed_size_32 = uncompressed_size; + + memcpy(&compressed_buffer[1], reinterpret_cast(&compressed_size_32), sizeof(compressed_size_32)); + memcpy(&compressed_buffer[5], reinterpret_cast(&uncompressed_size_32), sizeof(uncompressed_size_32)); + + compressed_buffer_ptr = &compressed_buffer[0]; + break; + } + default: + throw Exception("Unknown compression method", ErrorCodes::UNKNOWN_COMPRESSION_METHOD); + } + + uint128 checksum = CityHash128(compressed_buffer_ptr, compressed_size); out.write(reinterpret_cast(&checksum), sizeof(checksum)); - out.write(&compressed_buffer[0], compressed_size); + out.write(compressed_buffer_ptr, compressed_size); } public: - CompressedWriteBuffer(WriteBuffer & out_) : out(out_) {} + CompressedWriteBuffer(WriteBuffer & out_, CompressionMethod::Enum method_ = CompressionMethod::QuickLZ) : out(out_), method(method_) {} /// Объём сжатых данных size_t getCompressedBytes() diff --git a/dbms/include/DB/Storages/StorageLog.h b/dbms/include/DB/Storages/StorageLog.h index 4568e866039..6653161ba33 100644 --- a/dbms/include/DB/Storages/StorageLog.h +++ b/dbms/include/DB/Storages/StorageLog.h @@ -54,7 +54,7 @@ private: struct Stream { Stream(const std::string & path) - : plain(path), compressed(plain) {} + : plain(path), compressed(plain, CompressionMethod::LZ4) {} WriteBufferFromFile plain; CompressedWriteBuffer compressed; diff --git a/libs/liblz4/include/lz4/lz4.h b/libs/liblz4/include/lz4/lz4.h new file mode 100644 index 00000000000..5efd36401b7 --- /dev/null +++ b/libs/liblz4/include/lz4/lz4.h @@ -0,0 +1,95 @@ +/* + LZ4 - Fast LZ compression algorithm + Header File + Copyright (C) 2011, Yann Collet. + BSD License + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the + distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#if defined (__cplusplus) +extern "C" { +#endif + + +//**************************** +// Simple Functions +//**************************** + +int LZ4_compress (char* source, char* dest, int isize); +int LZ4_uncompress (char* source, char* dest, int osize); + +/* +LZ4_compress : + return : the number of bytes in compressed buffer dest + note : destination buffer must be already allocated. + To avoid any problem, size it to handle worst cases situations (input data not compressible) + Worst case size is : "inputsize + 0.4%", with "0.4%" being at least 8 bytes. + +LZ4_uncompress : + osize : is the output size, therefore the original size + return : the number of bytes read in the source buffer + If the source stream is malformed, the function will stop decoding and return a negative result, indicating the byte position of the faulty instruction + This version never writes beyond dest + osize, and is therefore protected against malicious data packets + note 2 : destination buffer must be already allocated +*/ + + +//**************************** +// Advanced Functions +//**************************** + +int LZ4_uncompress_unknownOutputSize (char* source, char* dest, int isize, int maxOutputSize); + +/* +LZ4_uncompress_unknownOutputSize : + isize : is the input size, therefore the compressed size + maxOutputSize : is the size of the destination buffer (which must be already allocated) + return : the number of bytes decoded in the destination buffer (necessarily <= maxOutputSize) + If the source stream is malformed, the function will stop decoding and return a negative result, indicating the byte position of the faulty instruction + This version never writes beyond dest + maxOutputSize, and is therefore protected against malicious data packets + note : This version is slower than LZ4_uncompress, and is therefore not recommended for general use +*/ + + +int LZ4_compressCtx(void** ctx, char* source, char* dest, int isize); + +/* +LZ4_compressCtx : + This function explicitly handles the CTX memory structure. + It avoids allocating/deallocating memory between each call, improving performance when malloc is time-consuming. + Note : when memory is allocated into the stack (default mode), there is no "malloc" penalty. + Therefore, this function is mostly useful when memory is allocated into the heap (it requires increasing HASH_LOG value beyond STACK_LIMIT) + + On first call : provide a *ctx=NULL; It will be automatically allocated. + On next calls : reuse the same ctx pointer. + Use different pointers for different threads when doing multi-threading. + + note : performance difference is small, mostly noticeable when repetitively calling the compression algorithm on many small segments. +*/ + + +#if defined (__cplusplus) +} +#endif diff --git a/libs/liblz4/src/lz4.c b/libs/liblz4/src/lz4.c new file mode 100644 index 00000000000..0c011bbd554 --- /dev/null +++ b/libs/liblz4/src/lz4.c @@ -0,0 +1,588 @@ +/* + LZ4 - Fast LZ compression algorithm + Copyright (C) 2011, Yann Collet. + BSD License + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the + distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +//************************************** +// Includes +//************************************** +#include // for malloc +#include // for memset +#include + + +//************************************** +// Performance parameter +//************************************** +// Increasing this value improves compression ratio +// Lowering this value reduces memory usage +// Lowering may also improve speed, typically on reaching cache size limits (L1 32KB for Intel, 64KB for AMD) +// Memory usage formula for 32 bits systems : N->2^(N+2) Bytes (examples : 17 -> 512KB ; 12 -> 16KB) +#define HASH_LOG 12 + + +//************************************** +// Basic Types +//************************************** +#if defined(_MSC_VER) +#define BYTE unsigned __int8 +#define U16 unsigned __int16 +#define U32 unsigned __int32 +#define S32 __int32 +#else +#include +#define BYTE uint8_t +#define U16 uint16_t +#define U32 uint32_t +#define S32 int32_t +#endif + + +//************************************** +// Constants +//************************************** +#define MINMATCH 4 +#define SKIPSTRENGTH 6 +#define STACKLIMIT 13 +#define HEAPMODE (HASH_LOG>STACKLIMIT) // Defines if memory is allocated into the stack (local variable), or into the heap (malloc()). +#define COPYTOKEN 4 +#define COPYLENGTH 8 +#define LASTLITERALS 5 +#define MFLIMIT (COPYLENGTH+MINMATCH) +#define MINLENGTH (MFLIMIT+1) + +#define MAXD_LOG 16 +#define MAX_DISTANCE ((1 << MAXD_LOG) - 1) + +#define HASHTABLESIZE (1 << HASH_LOG) +#define HASH_MASK (HASHTABLESIZE - 1) + +#define ML_BITS 4 +#define ML_MASK ((1U<v) +#define A16(x) (((U16_S *)(x))->v) + + +//************************************** +// Macros +//************************************** +#define LZ4_HASH_FUNCTION(i) (((i) * 2654435761U) >> ((MINMATCH*8)-HASH_LOG)) +#define LZ4_HASH_VALUE(p) LZ4_HASH_FUNCTION(A32(p)) +#define LZ4_COPYPACKET(s,d) A32(d) = A32(s); d+=4; s+=4; A32(d) = A32(s); d+=4; s+=4; +#define LZ4_WILDCOPY(s,d,e) do { LZ4_COPYPACKET(s,d) } while (dhashTable; + memset((void*)HashTable, 0, sizeof(srt->hashTable)); +#else + (void) ctx; +#endif + + + // First Byte + HashTable[LZ4_HASH_VALUE(ip)] = ip; + ip++; forwardH = LZ4_HASH_VALUE(ip); + + // Main Loop + for ( ; ; ) + { + int findMatchAttempts = (1U << skipStrength) + 3; + const BYTE* forwardIp = ip; + const BYTE* ref; + BYTE* token; + + // Find a match + do { + U32 h = forwardH; + int step = findMatchAttempts++ >> skipStrength; + ip = forwardIp; + forwardIp = ip + step; + + if (forwardIp > mflimit) { goto _last_literals; } + + forwardH = LZ4_HASH_VALUE(forwardIp); + ref = HashTable[h]; + HashTable[h] = ip; + + } while ((ref < ip - MAX_DISTANCE) || (A32(ref) != A32(ip))); + + // Catch up + while ((ip>anchor) && (ref>(BYTE*)source) && (ip[-1]==ref[-1])) { ip--; ref--; } + + // Encode Literal length + length = ip - anchor; + token = op++; + if (length>=(int)RUN_MASK) { *token=(RUN_MASK< 254 ; len-=255) *op++ = 255; *op++ = (BYTE)len; } + else *token = (length<matchlimit-4) { ref -= ip - (matchlimit-3); ip = matchlimit-3; break; } + } + if (A16(ref) == A16(ip)) { ip+=2; ref+=2; } + if (*ref == *ip) ip++; + len = (ip - anchor); + + // Encode MatchLength + if (len>=(int)ML_MASK) { *token+=ML_MASK; len-=ML_MASK; for(; len > 509 ; len-=510) { *op++ = 255; *op++ = 255; } if (len > 254) { len-=255; *op++ = 255; } *op++ = (BYTE)len; } + else *token += len; + + // Test end of chunk + if (ip > mflimit) { anchor = ip; break; } + + // Fill table + HashTable[LZ4_HASH_VALUE(ip-2)] = ip-2; + + // Test next position + ref = HashTable[LZ4_HASH_VALUE(ip)]; + HashTable[LZ4_HASH_VALUE(ip)] = ip; + if ((ref > ip - (MAX_DISTANCE + 1)) && (A32(ref) == A32(ip))) { token = op++; *token=0; goto _next_match; } + + // Prepare next loop + anchor = ip++; + forwardH = LZ4_HASH_VALUE(ip); + } + +_last_literals: + // Encode Last Literals + { + int lastRun = iend - anchor; + if (lastRun>=(int)RUN_MASK) { *op++=(RUN_MASK< 254 ; lastRun-=255) *op++ = 255; *op++ = (BYTE) lastRun; } + else *op++ = (lastRun<> ((MINMATCH*8)-HASHLOG64K)) +#define LZ4_HASH64K_VALUE(p) LZ4_HASH64K_FUNCTION(A32(p)) +int LZ4_compress64kCtx(void** ctx, + char* source, + char* dest, + int isize) +{ +#if HEAPMODE + struct refTables *srt = (struct refTables *) (*ctx); + U16* HashTable; +#else + U16 HashTable[HASHTABLESIZE<<1] = {0}; +#endif + + const BYTE* ip = (BYTE*) source; + const BYTE* anchor = ip; + const BYTE* const base = ip; + const BYTE* const iend = ip + isize; + const BYTE* const mflimit = iend - MFLIMIT; +#define matchlimit (iend - LASTLITERALS) + + BYTE* op = (BYTE*) dest; + + int len, length; + const int skipStrength = SKIPSTRENGTH; + U32 forwardH; + + + // Init + if (isizehashTable); + memset((void*)HashTable, 0, sizeof(srt->hashTable)); +#else + (void) ctx; +#endif + + + // First Byte + ip++; forwardH = LZ4_HASH64K_VALUE(ip); + + // Main Loop + for ( ; ; ) + { + int findMatchAttempts = (1U << skipStrength) + 3; + const BYTE* forwardIp = ip; + const BYTE* ref; + BYTE* token; + + // Find a match + do { + U32 h = forwardH; + int step = findMatchAttempts++ >> skipStrength; + ip = forwardIp; + forwardIp = ip + step; + + if (forwardIp > mflimit) { goto _last_literals; } + + forwardH = LZ4_HASH64K_VALUE(forwardIp); + ref = base + HashTable[h]; + HashTable[h] = ip - base; + + } while (A32(ref) != A32(ip)); + + // Catch up + while ((ip>anchor) && (ref>(BYTE*)source) && (ip[-1]==ref[-1])) { ip--; ref--; } + + // Encode Literal length + length = ip - anchor; + token = op++; + if (length>=(int)RUN_MASK) { *token=(RUN_MASK< 254 ; len-=255) *op++ = 255; *op++ = (BYTE)len; } + else *token = (length<=(int)ML_MASK) { *token+=ML_MASK; len-=ML_MASK; for(; len > 509 ; len-=510) { *op++ = 255; *op++ = 255; } if (len > 254) { len-=255; *op++ = 255; } *op++ = (BYTE)len; } + else *token += len; + + // Test end of chunk + if (ip > mflimit) { anchor = ip; break; } + + // Test next position + ref = base + HashTable[LZ4_HASH64K_VALUE(ip)]; + HashTable[LZ4_HASH64K_VALUE(ip)] = ip - base; + if ((ref > ip - (MAX_DISTANCE + 1)) && (A32(ref) == A32(ip))) { token = op++; *token=0; goto _next_match; } + + // Prepare next loop + anchor = ip++; + forwardH = LZ4_HASH64K_VALUE(ip); + } + +_last_literals: + // Encode Last Literals + { + int lastRun = iend - anchor; + if (lastRun>=(int)RUN_MASK) { *op++=(RUN_MASK< 254 ; lastRun-=255) *op++ = 255; *op++ = (BYTE) lastRun; } + else *op++ = (lastRun<>ML_BITS)) == RUN_MASK) { for (;(len=*ip++)==255;length+=255){} length += len; } + + // copy literals + ref = op+length; + if (ref>oend-COPYLENGTH) + { + if (ref > oend) goto _output_error; + memcpy(op, ip, length); + ip += length; + break; // Necessarily EOF + } + LZ4_WILDCOPY(ip, op, ref); ip -= (op-ref); op = ref; + + + // get offset + ref -= A16(ip); ip+=2; + + // get matchlength + if ((length=(token&ML_MASK)) == ML_MASK) { for (;*ip==255;length+=255) {ip++;} length += *ip++; } + + // copy repeated sequence + if (op-ref oend-COPYLENGTH) + { + if (cpy > oend) goto _output_error; + LZ4_WILDCOPY(ref, op, (oend-COPYLENGTH)); + while(op>ML_BITS)) == RUN_MASK) { for (;(len=*ip++)==255;length+=255){} length += len; } + + // copy literals + ref = op+length; + if (ref>oend-COPYLENGTH) + { + if (ref > oend) goto _output_error; + memcpy(op, ip, length); + ip += length; + break; // Necessarily EOF + } + LZ4_WILDCOPY(ip, op, ref); ip -= (op-ref); op = ref; + if (ip>=iend) break; // check EOF + + + // get offset + ref -= A16(ip); ip+=2; + + // get matchlength + if ((length=(token&ML_MASK)) == ML_MASK) { for (;(len=*ip++)==255;length+=255){} length += len; } + + // copy repeated sequence + if (op-refoend-COPYLENGTH) + { + if (cpy > oend) goto _output_error; + LZ4_WILDCOPY(ref, op, (oend-COPYLENGTH)); + while(op 2 || (argc == 2 && strcmp(argv[1], "-d"))) + try { - std::cerr << "Usage: " << argv[0] << " [-d] < in > out" << std::endl; - return 1; + bool decompress = false; + bool use_lz4 = false; + + if (argc == 2) + { + decompress = 0 == strcmp(argv[1], "-d"); + use_lz4 = 0 == strcmp(argv[1], "--lz4"); + } + + if (argc > 2 || (argc == 2 && !decompress && !use_lz4)) + { + std::cerr << "Usage: " << argv[0] << " [-d|--lz4] < in > out" << std::endl; + return 1; + } + + DB::CompressionMethod::Enum method = use_lz4 ? DB::CompressionMethod::LZ4 : DB::CompressionMethod::QuickLZ; + + Poco::SharedPtr rb = new DB::ReadBufferFromFileDescriptor(STDIN_FILENO); + Poco::SharedPtr wb = new DB::WriteBufferFromFileDescriptor(STDOUT_FILENO); + Poco::SharedPtr from; + Poco::SharedPtr to; + + if (!decompress) + { + /// Сжатие + from = rb; + to = new DB::CompressedWriteBuffer(*wb, method); + } + else + { + /// Разжатие + from = new DB::CompressedReadBuffer(*rb); + to = wb; + } + + DB::copyData(*from, *to); } - - Poco::SharedPtr rb = new DB::ReadBufferFromFileDescriptor(STDIN_FILENO); - Poco::SharedPtr wb = new DB::WriteBufferFromFileDescriptor(STDOUT_FILENO); - Poco::SharedPtr from; - Poco::SharedPtr to; - - if (argc == 1) + catch (const DB::Exception & e) { - /// Сжатие - from = rb; - to = new DB::CompressedWriteBuffer(*wb); + std::cerr << e.what() << ", " << e.message() << std::endl + << std::endl + << "Stack trace:" << std::endl + << e.getStackTrace().toString() + << std::endl; + throw; } - else - { - /// Разжатие - from = new DB::CompressedReadBuffer(*rb); - to = wb; - } - - DB::copyData(*from, *to); return 0; }