From f999ea2a6f123a962e2fa2e7edd38bb6b0f6e1c9 Mon Sep 17 00:00:00 2001 From: "a.palagashvili" Date: Tue, 10 Nov 2020 01:52:22 +0300 Subject: [PATCH] renamed files, added new library, changed error codes, added tests for file() function --- .gitmodules | 4 ++ CHANGELOG.md | 1 - contrib/fast-lzma2 | 1 + src/CMakeLists.txt | 7 +++ src/Common/ErrorCodes.cpp | 6 +-- src/IO/CompressionMethod.cpp | 50 +++++++++---------- ...uffer.cpp => LZMADeflatingWriteBuffer.cpp} | 23 ++++++--- ...iteBuffer.h => LZMADeflatingWriteBuffer.h} | 9 ++-- ...Buffer.cpp => LZMAInflatingReadBuffer.cpp} | 18 +++++-- ...ReadBuffer.h => LZMAInflatingReadBuffer.h} | 11 ++-- src/IO/tests/lzma_buffers.cpp | 26 +++++----- .../01059_storage_file_brotli.reference | 4 +- .../0_stateless/01059_storage_file_brotli.sql | 11 +++- 13 files changed, 105 insertions(+), 66 deletions(-) create mode 160000 contrib/fast-lzma2 rename src/IO/{LzmaWriteBuffer.cpp => LZMADeflatingWriteBuffer.cpp} (85%) rename src/IO/{LzmaWriteBuffer.h => LZMADeflatingWriteBuffer.h} (73%) rename src/IO/{LzmaReadBuffer.cpp => LZMAInflatingReadBuffer.cpp} (81%) rename src/IO/{LzmaReadBuffer.h => LZMAInflatingReadBuffer.h} (68%) diff --git a/.gitmodules b/.gitmodules index ecaf806067c..e489444d46c 100644 --- a/.gitmodules +++ b/.gitmodules @@ -17,6 +17,7 @@ [submodule "contrib/zlib-ng"] path = contrib/zlib-ng url = https://github.com/ClickHouse-Extras/zlib-ng.git + branch = clickhouse [submodule "contrib/googletest"] path = contrib/googletest url = https://github.com/google/googletest.git @@ -193,3 +194,6 @@ [submodule "contrib/xz"] path = contrib/xz url = https://github.com/xz-mirror/xz +[submodule "contrib/fast-lzma2"] + path = contrib/fast-lzma2 + url = https://github.com/conor42/fast-lzma2 diff --git a/CHANGELOG.md b/CHANGELOG.md index 1242bd2e7b4..457346aff9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -219,7 +219,6 @@ * Added column transformers `EXCEPT`, `REPLACE`, `APPLY`, which can be applied to the list of selected columns (after `*` or `COLUMNS(...)`). For example, you can write `SELECT * EXCEPT(URL) REPLACE(number + 1 AS number)`. Another example: `select * apply(length) apply(max) from wide_string_table` to find out the maxium length of all string columns. [#14233](https://github.com/ClickHouse/ClickHouse/pull/14233) ([Amos Bird](https://github.com/amosbird)). * Added an aggregate function `rankCorr` which computes a rank correlation coefficient. [#11769](https://github.com/ClickHouse/ClickHouse/pull/11769) ([antikvist](https://github.com/antikvist)) [#14411](https://github.com/ClickHouse/ClickHouse/pull/14411) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)). * Added table function `view` which turns a subquery into a table object. This helps passing queries around. For instance, it can be used in remote/cluster table functions. [#12567](https://github.com/ClickHouse/ClickHouse/pull/12567) ([Amos Bird](https://github.com/amosbird)). -* Added support for `xz` compression format. This enables using `*.xz` files in `table()` function. [#16578](https://github.com/ClickHouse/ClickHouse/pull/16578) ([Abi Palagashvili](https://github.com/fibersel)) #### Bug Fix diff --git a/contrib/fast-lzma2 b/contrib/fast-lzma2 new file mode 160000 index 00000000000..ded964d203c --- /dev/null +++ b/contrib/fast-lzma2 @@ -0,0 +1 @@ +Subproject commit ded964d203cabe1a572d2c813c55e8a94b4eda48 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7a8fe586e11..6839b06677d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -336,6 +336,13 @@ if (LZMA_LIBRARY) target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${LZMA_INCLUDE_DIR}) endif() +set (FAST_LZMA_LIBRARY fast-lzma2) +set (FAST_LZMA_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/fast-lzma2/) +if (FAST_LZMA_LIBRARY) + target_link_libraries (clickhouse_common_io PUBLIC ${FAST_LZMA_LIBRARY}) + target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${FAST_LZMA_INCLUDE_DIR}) +endif() + if (USE_ICU) dbms_target_link_libraries (PRIVATE ${ICU_LIBRARIES}) dbms_target_include_directories (SYSTEM PRIVATE ${ICU_INCLUDE_DIRS}) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index b12623c73ff..82084976a82 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -519,13 +519,13 @@ M(550, CONDITIONAL_TREE_PARENT_NOT_FOUND) \ M(551, ILLEGAL_PROJECTION_MANIPULATOR) \ M(552, UNRECOGNIZED_ARGUMENTS) \ + M(553, LZMA_STREAM_ENCODER_FAILED) \ + M(554, LZMA_STREAM_DECODER_FAILED) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ M(1001, STD_EXCEPTION) \ - M(1002, UNKNOWN_EXCEPTION) \ - M(1003, LZMA_STREAM_ENCODER_FAILED) \ - M(1004, LZMA_STREAM_DECODER_FAILED) + M(1002, UNKNOWN_EXCEPTION) /* See END */ diff --git a/src/IO/CompressionMethod.cpp b/src/IO/CompressionMethod.cpp index 0bf390d92ca..ec278b5d71f 100644 --- a/src/IO/CompressionMethod.cpp +++ b/src/IO/CompressionMethod.cpp @@ -1,13 +1,13 @@ #include -#include -#include -#include -#include #include #include -#include -#include +#include +#include +#include +#include +#include +#include #if !defined(ARCADIA_BUILD) # include @@ -16,7 +16,6 @@ namespace DB { - namespace ErrorCodes { extern const int NOT_IMPLEMENTED; @@ -27,11 +26,16 @@ std::string toContentEncodingName(CompressionMethod method) { switch (method) { - case CompressionMethod::Gzip: return "gzip"; - case CompressionMethod::Zlib: return "deflate"; - case CompressionMethod::Brotli: return "br"; - case CompressionMethod::Xz: return "xz"; - case CompressionMethod::None: return ""; + case CompressionMethod::Gzip: + return "gzip"; + case CompressionMethod::Zlib: + return "deflate"; + case CompressionMethod::Brotli: + return "br"; + case CompressionMethod::Xz: + return "xz"; + case CompressionMethod::None: + return ""; } __builtin_unreachable(); } @@ -55,20 +59,19 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s return CompressionMethod::Zlib; if (*method_str == "brotli" || *method_str == "br") return CompressionMethod::Brotli; + if (*method_str == "LZMA" || *method_str == "xz") + return CompressionMethod::Xz; if (hint.empty() || hint == "auto" || hint == "none") return CompressionMethod::None; - throw Exception("Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'br' are supported as compression methods", + throw Exception( + "Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'br', 'xz' are supported as compression methods", ErrorCodes::NOT_IMPLEMENTED); } std::unique_ptr wrapReadBufferWithCompressionMethod( - std::unique_ptr nested, - CompressionMethod method, - size_t buf_size, - char * existing_memory, - size_t alignment) + std::unique_ptr nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment) { if (method == CompressionMethod::Gzip || method == CompressionMethod::Zlib) return std::make_unique(std::move(nested), method, buf_size, existing_memory, alignment); @@ -77,7 +80,7 @@ std::unique_ptr wrapReadBufferWithCompressionMethod( return std::make_unique(std::move(nested), buf_size, existing_memory, alignment); #endif if (method == CompressionMethod::Xz) - return std::make_unique(std::move(nested), buf_size, existing_memory, alignment); + return std::make_unique(std::move(nested), buf_size, existing_memory, alignment); if (method == CompressionMethod::None) return nested; @@ -87,12 +90,7 @@ std::unique_ptr wrapReadBufferWithCompressionMethod( std::unique_ptr wrapWriteBufferWithCompressionMethod( - std::unique_ptr nested, - CompressionMethod method, - int level, - size_t buf_size, - char * existing_memory, - size_t alignment) + std::unique_ptr nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment) { if (method == DB::CompressionMethod::Gzip || method == CompressionMethod::Zlib) return std::make_unique(std::move(nested), method, level, buf_size, existing_memory, alignment); @@ -101,6 +99,8 @@ std::unique_ptr wrapWriteBufferWithCompressionMethod( if (method == DB::CompressionMethod::Brotli) return std::make_unique(std::move(nested), level, buf_size, existing_memory, alignment); #endif + if (method == CompressionMethod::Xz) + return std::make_unique(std::move(nested), level, buf_size, existing_memory, alignment); if (method == CompressionMethod::None) return nested; diff --git a/src/IO/LzmaWriteBuffer.cpp b/src/IO/LZMADeflatingWriteBuffer.cpp similarity index 85% rename from src/IO/LzmaWriteBuffer.cpp rename to src/IO/LZMADeflatingWriteBuffer.cpp index 8439fc624d4..66cd11c13d6 100644 --- a/src/IO/LzmaWriteBuffer.cpp +++ b/src/IO/LZMADeflatingWriteBuffer.cpp @@ -1,4 +1,4 @@ -#include +#include namespace DB @@ -8,11 +8,15 @@ namespace ErrorCodes extern const int LZMA_STREAM_ENCODER_FAILED; } -LzmaWriteBuffer::LzmaWriteBuffer( +LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer( std::unique_ptr out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment) : BufferWithOwnMemory(buf_size, existing_memory, alignment), out(std::move(out_)) { - lstr = LZMA_STREAM_INIT; + // FL2_createCStreamMt(number of threads, flag of two dictionaries usage) + lstr = FL2_createCStreamMt(2, 0); + /* size_t res = */ FL2_initCStream(lstr, compression_level); + + /*lstr = LZMA_STREAM_INIT; lstr.allocator = nullptr; lstr.next_in = nullptr; lstr.avail_in = 0; @@ -37,15 +41,16 @@ LzmaWriteBuffer::LzmaWriteBuffer( throw Exception( std::string("lzma stream encoder init failed: ") + std::to_string(ret) + "; lzma version: " + LZMA_VERSION_STRING, ErrorCodes::LZMA_STREAM_ENCODER_FAILED); + */ } -LzmaWriteBuffer::~LzmaWriteBuffer() +LZMADeflatingWriteBuffer::~LZMADeflatingWriteBuffer() { try { finish(); - lzma_end(&lstr); + //lzma_end(&lstr); } catch (...) { @@ -53,8 +58,9 @@ LzmaWriteBuffer::~LzmaWriteBuffer() } } -void LzmaWriteBuffer::nextImpl() +void LZMADeflatingWriteBuffer::nextImpl() { + /* if (!offset()) return; @@ -82,11 +88,13 @@ void LzmaWriteBuffer::nextImpl() ErrorCodes::LZMA_STREAM_ENCODER_FAILED); } while (lstr.avail_in > 0 || lstr.avail_out == 0); + */ } -void LzmaWriteBuffer::finish() +void LZMADeflatingWriteBuffer::finish() { + /* if (finished) return; @@ -114,5 +122,6 @@ void LzmaWriteBuffer::finish() ErrorCodes::LZMA_STREAM_ENCODER_FAILED); } while (lstr.avail_out == 0); + */ } } diff --git a/src/IO/LzmaWriteBuffer.h b/src/IO/LZMADeflatingWriteBuffer.h similarity index 73% rename from src/IO/LzmaWriteBuffer.h rename to src/IO/LZMADeflatingWriteBuffer.h index d59595dab23..aadf15ec6dd 100644 --- a/src/IO/LzmaWriteBuffer.h +++ b/src/IO/LZMADeflatingWriteBuffer.h @@ -4,14 +4,15 @@ #include #include +#include namespace DB { /// Performs compression using lzma library and writes compressed data to out_ WriteBuffer. -class LzmaWriteBuffer : public BufferWithOwnMemory +class LZMADeflatingWriteBuffer : public BufferWithOwnMemory { public: - LzmaWriteBuffer( + LZMADeflatingWriteBuffer( std::unique_ptr out_, int compression_level, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, @@ -20,13 +21,13 @@ public: void finish(); - ~LzmaWriteBuffer() override; + ~LZMADeflatingWriteBuffer() override; private: void nextImpl() override; std::unique_ptr out; - lzma_stream lstr; + FL2_CStream * lstr; bool finished = false; }; } diff --git a/src/IO/LzmaReadBuffer.cpp b/src/IO/LZMAInflatingReadBuffer.cpp similarity index 81% rename from src/IO/LzmaReadBuffer.cpp rename to src/IO/LZMAInflatingReadBuffer.cpp index 22fda48b3c6..09aa31b7f43 100644 --- a/src/IO/LzmaReadBuffer.cpp +++ b/src/IO/LZMAInflatingReadBuffer.cpp @@ -1,4 +1,4 @@ -#include +#include namespace DB { @@ -6,9 +6,13 @@ namespace ErrorCodes { extern const int LZMA_STREAM_DECODER_FAILED; } -LzmaReadBuffer::LzmaReadBuffer(std::unique_ptr in_, size_t buf_size, char * existing_memory, size_t alignment) +LZMAInflatingReadBuffer::LZMAInflatingReadBuffer(std::unique_ptr in_, size_t buf_size, char * existing_memory, size_t alignment) : BufferWithOwnMemory(buf_size, existing_memory, alignment), in(std::move(in_)), eof(false) { + // FL2_createDStreamMt(number of threads) + lstr = FL2_createDStreamMt(2); + /* size_t res = */ FL2_initDStream(lstr); + /* lstr = LZMA_STREAM_INIT; lstr.allocator = nullptr; lstr.next_in = nullptr; @@ -26,15 +30,17 @@ LzmaReadBuffer::LzmaReadBuffer(std::unique_ptr in_, size_t buf_size, std::string("lzma_stream_decoder initialization failed: error code: ") + std::to_string(ret) + "; lzma version: " + LZMA_VERSION_STRING, ErrorCodes::LZMA_STREAM_DECODER_FAILED); + */ } -LzmaReadBuffer::~LzmaReadBuffer() +LZMAInflatingReadBuffer::~LZMAInflatingReadBuffer() { - lzma_end(&lstr); + //lzma_end(&lstr); } -bool LzmaReadBuffer::nextImpl() +bool LZMAInflatingReadBuffer::nextImpl() { + /* if (eof) return false; @@ -77,5 +83,7 @@ bool LzmaReadBuffer::nextImpl() LZMA_VERSION_STRING); return true; + */ + return true; } } diff --git a/src/IO/LzmaReadBuffer.h b/src/IO/LZMAInflatingReadBuffer.h similarity index 68% rename from src/IO/LzmaReadBuffer.h rename to src/IO/LZMAInflatingReadBuffer.h index 5f936475ee1..4a9893e5b4c 100644 --- a/src/IO/LzmaReadBuffer.h +++ b/src/IO/LZMAInflatingReadBuffer.h @@ -1,10 +1,10 @@ #pragma once #include -#include #include #include +#include namespace DB { @@ -12,22 +12,23 @@ namespace ErrorCodes { } -class LzmaReadBuffer : public BufferWithOwnMemory +class LZMAInflatingReadBuffer : public BufferWithOwnMemory { public: - LzmaReadBuffer( + LZMAInflatingReadBuffer( std::unique_ptr in_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0); - ~LzmaReadBuffer() override; + ~LZMAInflatingReadBuffer() override; private: bool nextImpl() override; std::unique_ptr in; - lzma_stream lstr; + FL2_DStream * lstr; + bool eof; }; } diff --git a/src/IO/tests/lzma_buffers.cpp b/src/IO/tests/lzma_buffers.cpp index 1f691fa09f7..7eb6bf8b81c 100644 --- a/src/IO/tests/lzma_buffers.cpp +++ b/src/IO/tests/lzma_buffers.cpp @@ -1,13 +1,13 @@ -#include #include +#include -#include -#include +#include +#include #include -#include -#include -#include #include +#include +#include +#include int main(int, char **) try @@ -18,8 +18,9 @@ try Stopwatch stopwatch; { - auto buf = std::make_unique("test_lzma_buffers.xz", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC); - DB::LzmaWriteBuffer lzma_buf(std::move(buf), /*compression level*/ 3); + auto buf + = std::make_unique("test_lzma_buffers.xz", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC); + DB::LZMADeflatingWriteBuffer lzma_buf(std::move(buf), /*compression level*/ 3); stopwatch.restart(); for (size_t i = 0; i < n; ++i) @@ -32,13 +33,12 @@ try stopwatch.stop(); std::cout << "Writing done. Elapsed: " << stopwatch.elapsedSeconds() << " s." - << ", " << (lzma_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" - << std::endl; + << ", " << (lzma_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" << std::endl; } { auto buf = std::make_unique("test_lzma_buffers.xz"); - DB::LzmaReadBuffer lzma_buf(std::move(buf)); + DB::LZMAInflatingReadBuffer lzma_buf(std::move(buf)); stopwatch.restart(); for (size_t i = 0; i < n; ++i) @@ -52,9 +52,7 @@ try } stopwatch.stop(); std::cout << "Reading done. Elapsed: " << stopwatch.elapsedSeconds() << " s." - << ", " << (lzma_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" - << std::endl; - + << ", " << (lzma_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" << std::endl; } return 0; diff --git a/tests/queries/0_stateless/01059_storage_file_brotli.reference b/tests/queries/0_stateless/01059_storage_file_brotli.reference index 6c545e9faec..aae55b2873c 100644 --- a/tests/queries/0_stateless/01059_storage_file_brotli.reference +++ b/tests/queries/0_stateless/01059_storage_file_brotli.reference @@ -1,5 +1,7 @@ 1000000 999999 1000000 999999 -2000000 999999 +1000000 999999 +3000000 999999 1 255 1 255 +1 255 \ No newline at end of file diff --git a/tests/queries/0_stateless/01059_storage_file_brotli.sql b/tests/queries/0_stateless/01059_storage_file_brotli.sql index e7d5a87b2af..eba61e4450f 100644 --- a/tests/queries/0_stateless/01059_storage_file_brotli.sql +++ b/tests/queries/0_stateless/01059_storage_file_brotli.sql @@ -15,8 +15,17 @@ SELECT count(), max(x) FROM file; DROP TABLE file; -SELECT count(), max(x) FROM file('data{1,2}.tsv.{gz,br}', TSV, 'x UInt64'); +CREATE TABLE file (x UInt64) ENGINE = File(TSV, 'data3.tsv.xz'); +TRUNCATE TABLE file; + +INSERT INTO file SELECT * FROM numbers(1000000); +SELECT count(), max(x) FROM file; + +DROP TABLE file; + +SELECT count(), max(x) FROM file('data{1,2,3}.tsv.{gz,br,xz}', TSV, 'x UInt64'); -- check that they are compressed SELECT count() < 1000000, max(x) FROM file('data1.tsv.br', RowBinary, 'x UInt8', 'none'); SELECT count() < 3000000, max(x) FROM file('data2.tsv.gz', RowBinary, 'x UInt8', 'none'); +SELECT count() < 1000000, max(x) FROM file('data3.tsv.xz', RowBinary, 'x UInt8', 'none');