renamed files, added new library, changed error codes, added tests for file() function

This commit is contained in:
a.palagashvili 2020-11-10 01:52:22 +03:00
parent 6286775031
commit f999ea2a6f
13 changed files with 105 additions and 66 deletions

4
.gitmodules vendored
View File

@ -17,6 +17,7 @@
[submodule "contrib/zlib-ng"]
path = contrib/zlib-ng
url = https://github.com/ClickHouse-Extras/zlib-ng.git
branch = clickhouse
[submodule "contrib/googletest"]
path = contrib/googletest
url = https://github.com/google/googletest.git
@ -193,3 +194,6 @@
[submodule "contrib/xz"]
path = contrib/xz
url = https://github.com/xz-mirror/xz
[submodule "contrib/fast-lzma2"]
path = contrib/fast-lzma2
url = https://github.com/conor42/fast-lzma2

View File

@ -219,7 +219,6 @@
* Added column transformers `EXCEPT`, `REPLACE`, `APPLY`, which can be applied to the list of selected columns (after `*` or `COLUMNS(...)`). For example, you can write `SELECT * EXCEPT(URL) REPLACE(number + 1 AS number)`. Another example: `select * apply(length) apply(max) from wide_string_table` to find out the maxium length of all string columns. [#14233](https://github.com/ClickHouse/ClickHouse/pull/14233) ([Amos Bird](https://github.com/amosbird)).
* Added an aggregate function `rankCorr` which computes a rank correlation coefficient. [#11769](https://github.com/ClickHouse/ClickHouse/pull/11769) ([antikvist](https://github.com/antikvist)) [#14411](https://github.com/ClickHouse/ClickHouse/pull/14411) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).
* Added table function `view` which turns a subquery into a table object. This helps passing queries around. For instance, it can be used in remote/cluster table functions. [#12567](https://github.com/ClickHouse/ClickHouse/pull/12567) ([Amos Bird](https://github.com/amosbird)).
* Added support for `xz` compression format. This enables using `*.xz` files in `table()` function. [#16578](https://github.com/ClickHouse/ClickHouse/pull/16578) ([Abi Palagashvili](https://github.com/fibersel))
#### Bug Fix

1
contrib/fast-lzma2 vendored Submodule

@ -0,0 +1 @@
Subproject commit ded964d203cabe1a572d2c813c55e8a94b4eda48

View File

@ -336,6 +336,13 @@ if (LZMA_LIBRARY)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${LZMA_INCLUDE_DIR})
endif()
set (FAST_LZMA_LIBRARY fast-lzma2)
set (FAST_LZMA_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/fast-lzma2/)
if (FAST_LZMA_LIBRARY)
target_link_libraries (clickhouse_common_io PUBLIC ${FAST_LZMA_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${FAST_LZMA_INCLUDE_DIR})
endif()
if (USE_ICU)
dbms_target_link_libraries (PRIVATE ${ICU_LIBRARIES})
dbms_target_include_directories (SYSTEM PRIVATE ${ICU_INCLUDE_DIRS})

View File

@ -519,13 +519,13 @@
M(550, CONDITIONAL_TREE_PARENT_NOT_FOUND) \
M(551, ILLEGAL_PROJECTION_MANIPULATOR) \
M(552, UNRECOGNIZED_ARGUMENTS) \
M(553, LZMA_STREAM_ENCODER_FAILED) \
M(554, LZMA_STREAM_DECODER_FAILED) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \
M(1002, UNKNOWN_EXCEPTION) \
M(1003, LZMA_STREAM_ENCODER_FAILED) \
M(1004, LZMA_STREAM_DECODER_FAILED)
M(1002, UNKNOWN_EXCEPTION)
/* See END */

View File

@ -1,13 +1,13 @@
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <IO/BrotliReadBuffer.h>
#include <IO/BrotliWriteBuffer.h>
#include <IO/LzmaReadBuffer.h>
#include <IO/LzmaWriteBuffer.h>
#include <IO/LZMADeflatingWriteBuffer.h>
#include <IO/LZMAInflatingReadBuffer.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <IO/ZlibInflatingReadBuffer.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
@ -16,7 +16,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
@ -27,11 +26,16 @@ std::string toContentEncodingName(CompressionMethod method)
{
switch (method)
{
case CompressionMethod::Gzip: return "gzip";
case CompressionMethod::Zlib: return "deflate";
case CompressionMethod::Brotli: return "br";
case CompressionMethod::Xz: return "xz";
case CompressionMethod::None: return "";
case CompressionMethod::Gzip:
return "gzip";
case CompressionMethod::Zlib:
return "deflate";
case CompressionMethod::Brotli:
return "br";
case CompressionMethod::Xz:
return "xz";
case CompressionMethod::None:
return "";
}
__builtin_unreachable();
}
@ -55,20 +59,19 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
return CompressionMethod::Zlib;
if (*method_str == "brotli" || *method_str == "br")
return CompressionMethod::Brotli;
if (*method_str == "LZMA" || *method_str == "xz")
return CompressionMethod::Xz;
if (hint.empty() || hint == "auto" || hint == "none")
return CompressionMethod::None;
throw Exception("Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'br' are supported as compression methods",
throw Exception(
"Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'br', 'xz' are supported as compression methods",
ErrorCodes::NOT_IMPLEMENTED);
}
std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
std::unique_ptr<ReadBuffer> nested,
CompressionMethod method,
size_t buf_size,
char * existing_memory,
size_t alignment)
std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment)
{
if (method == CompressionMethod::Gzip || method == CompressionMethod::Zlib)
return std::make_unique<ZlibInflatingReadBuffer>(std::move(nested), method, buf_size, existing_memory, alignment);
@ -77,7 +80,7 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
return std::make_unique<BrotliReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
#endif
if (method == CompressionMethod::Xz)
return std::make_unique<LzmaReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
return std::make_unique<LZMAInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
if (method == CompressionMethod::None)
return nested;
@ -87,12 +90,7 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested,
CompressionMethod method,
int level,
size_t buf_size,
char * existing_memory,
size_t alignment)
std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment)
{
if (method == DB::CompressionMethod::Gzip || method == CompressionMethod::Zlib)
return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(nested), method, level, buf_size, existing_memory, alignment);
@ -101,6 +99,8 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
if (method == DB::CompressionMethod::Brotli)
return std::make_unique<BrotliWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
#endif
if (method == CompressionMethod::Xz)
return std::make_unique<LZMADeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
if (method == CompressionMethod::None)
return nested;

View File

@ -1,4 +1,4 @@
#include <IO/LzmaWriteBuffer.h>
#include <IO/LZMADeflatingWriteBuffer.h>
namespace DB
@ -8,11 +8,15 @@ namespace ErrorCodes
extern const int LZMA_STREAM_ENCODER_FAILED;
}
LzmaWriteBuffer::LzmaWriteBuffer(
LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(std::move(out_))
{
lstr = LZMA_STREAM_INIT;
// FL2_createCStreamMt(number of threads, flag of two dictionaries usage)
lstr = FL2_createCStreamMt(2, 0);
/* size_t res = */ FL2_initCStream(lstr, compression_level);
/*lstr = LZMA_STREAM_INIT;
lstr.allocator = nullptr;
lstr.next_in = nullptr;
lstr.avail_in = 0;
@ -37,15 +41,16 @@ LzmaWriteBuffer::LzmaWriteBuffer(
throw Exception(
std::string("lzma stream encoder init failed: ") + std::to_string(ret) + "; lzma version: " + LZMA_VERSION_STRING,
ErrorCodes::LZMA_STREAM_ENCODER_FAILED);
*/
}
LzmaWriteBuffer::~LzmaWriteBuffer()
LZMADeflatingWriteBuffer::~LZMADeflatingWriteBuffer()
{
try
{
finish();
lzma_end(&lstr);
//lzma_end(&lstr);
}
catch (...)
{
@ -53,8 +58,9 @@ LzmaWriteBuffer::~LzmaWriteBuffer()
}
}
void LzmaWriteBuffer::nextImpl()
void LZMADeflatingWriteBuffer::nextImpl()
{
/*
if (!offset())
return;
@ -82,11 +88,13 @@ void LzmaWriteBuffer::nextImpl()
ErrorCodes::LZMA_STREAM_ENCODER_FAILED);
} while (lstr.avail_in > 0 || lstr.avail_out == 0);
*/
}
void LzmaWriteBuffer::finish()
void LZMADeflatingWriteBuffer::finish()
{
/*
if (finished)
return;
@ -114,5 +122,6 @@ void LzmaWriteBuffer::finish()
ErrorCodes::LZMA_STREAM_ENCODER_FAILED);
} while (lstr.avail_out == 0);
*/
}
}

View File

@ -4,14 +4,15 @@
#include <IO/WriteBuffer.h>
#include <lzma.h>
#include <fast-lzma2.h>
namespace DB
{
/// Performs compression using lzma library and writes compressed data to out_ WriteBuffer.
class LzmaWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
class LZMADeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
LzmaWriteBuffer(
LZMADeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
@ -20,13 +21,13 @@ public:
void finish();
~LzmaWriteBuffer() override;
~LZMADeflatingWriteBuffer() override;
private:
void nextImpl() override;
std::unique_ptr<WriteBuffer> out;
lzma_stream lstr;
FL2_CStream * lstr;
bool finished = false;
};
}

View File

@ -1,4 +1,4 @@
#include <IO/LzmaReadBuffer.h>
#include <IO/LZMAInflatingReadBuffer.h>
namespace DB
{
@ -6,9 +6,13 @@ namespace ErrorCodes
{
extern const int LZMA_STREAM_DECODER_FAILED;
}
LzmaReadBuffer::LzmaReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment)
LZMAInflatingReadBuffer::LZMAInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment), in(std::move(in_)), eof(false)
{
// FL2_createDStreamMt(number of threads)
lstr = FL2_createDStreamMt(2);
/* size_t res = */ FL2_initDStream(lstr);
/*
lstr = LZMA_STREAM_INIT;
lstr.allocator = nullptr;
lstr.next_in = nullptr;
@ -26,15 +30,17 @@ LzmaReadBuffer::LzmaReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size,
std::string("lzma_stream_decoder initialization failed: error code: ") + std::to_string(ret)
+ "; lzma version: " + LZMA_VERSION_STRING,
ErrorCodes::LZMA_STREAM_DECODER_FAILED);
*/
}
LzmaReadBuffer::~LzmaReadBuffer()
LZMAInflatingReadBuffer::~LZMAInflatingReadBuffer()
{
lzma_end(&lstr);
//lzma_end(&lstr);
}
bool LzmaReadBuffer::nextImpl()
bool LZMAInflatingReadBuffer::nextImpl()
{
/*
if (eof)
return false;
@ -77,5 +83,7 @@ bool LzmaReadBuffer::nextImpl()
LZMA_VERSION_STRING);
return true;
*/
return true;
}
}

View File

@ -1,10 +1,10 @@
#pragma once
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <lzma.h>
#include <fast-lzma2.h>
namespace DB
{
@ -12,22 +12,23 @@ namespace ErrorCodes
{
}
class LzmaReadBuffer : public BufferWithOwnMemory<ReadBuffer>
class LZMAInflatingReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
public:
LzmaReadBuffer(
LZMAInflatingReadBuffer(
std::unique_ptr<ReadBuffer> in_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
~LzmaReadBuffer() override;
~LZMAInflatingReadBuffer() override;
private:
bool nextImpl() override;
std::unique_ptr<ReadBuffer> in;
lzma_stream lstr;
FL2_DStream * lstr;
bool eof;
};
}

View File

@ -1,13 +1,13 @@
#include <iostream>
#include <iomanip>
#include <iostream>
#include <Common/Stopwatch.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/LZMADeflatingWriteBuffer.h>
#include <IO/LZMAInflatingReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/LzmaReadBuffer.h>
#include <IO/LzmaWriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Common/Stopwatch.h>
int main(int, char **)
try
@ -18,8 +18,9 @@ try
Stopwatch stopwatch;
{
auto buf = std::make_unique<DB::WriteBufferFromFile>("test_lzma_buffers.xz", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
DB::LzmaWriteBuffer lzma_buf(std::move(buf), /*compression level*/ 3);
auto buf
= std::make_unique<DB::WriteBufferFromFile>("test_lzma_buffers.xz", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
DB::LZMADeflatingWriteBuffer lzma_buf(std::move(buf), /*compression level*/ 3);
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
@ -32,13 +33,12 @@ try
stopwatch.stop();
std::cout << "Writing done. Elapsed: " << stopwatch.elapsedSeconds() << " s."
<< ", " << (lzma_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s"
<< std::endl;
<< ", " << (lzma_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" << std::endl;
}
{
auto buf = std::make_unique<DB::ReadBufferFromFile>("test_lzma_buffers.xz");
DB::LzmaReadBuffer lzma_buf(std::move(buf));
DB::LZMAInflatingReadBuffer lzma_buf(std::move(buf));
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
@ -52,9 +52,7 @@ try
}
stopwatch.stop();
std::cout << "Reading done. Elapsed: " << stopwatch.elapsedSeconds() << " s."
<< ", " << (lzma_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s"
<< std::endl;
<< ", " << (lzma_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" << std::endl;
}
return 0;

View File

@ -1,5 +1,7 @@
1000000 999999
1000000 999999
2000000 999999
1000000 999999
3000000 999999
1 255
1 255
1 255

View File

@ -15,8 +15,17 @@ SELECT count(), max(x) FROM file;
DROP TABLE file;
SELECT count(), max(x) FROM file('data{1,2}.tsv.{gz,br}', TSV, 'x UInt64');
CREATE TABLE file (x UInt64) ENGINE = File(TSV, 'data3.tsv.xz');
TRUNCATE TABLE file;
INSERT INTO file SELECT * FROM numbers(1000000);
SELECT count(), max(x) FROM file;
DROP TABLE file;
SELECT count(), max(x) FROM file('data{1,2,3}.tsv.{gz,br,xz}', TSV, 'x UInt64');
-- check that they are compressed
SELECT count() < 1000000, max(x) FROM file('data1.tsv.br', RowBinary, 'x UInt8', 'none');
SELECT count() < 3000000, max(x) FROM file('data2.tsv.gz', RowBinary, 'x UInt8', 'none');
SELECT count() < 1000000, max(x) FROM file('data3.tsv.xz', RowBinary, 'x UInt8', 'none');