Add bzip2 file compression

This commit is contained in:
Nikolay Degterinsky 2021-08-06 23:55:04 +00:00
parent e1927118cd
commit 9a45458faf
18 changed files with 397 additions and 4 deletions

3
.gitmodules vendored
View File

@ -243,3 +243,6 @@
[submodule "contrib/s2geometry"]
path = contrib/s2geometry
url = https://github.com/ClickHouse-Extras/s2geometry.git
# bzip2 sources, checked out as a git submodule at contrib/bzip2.
[submodule "contrib/bzip2"]
path = contrib/bzip2
url = https://gitlab.com/federicomenaquintero/bzip2.git

View File

@ -543,6 +543,7 @@ include (cmake/find/nuraft.cmake)
include (cmake/find/yaml-cpp.cmake)
include (cmake/find/s2geometry.cmake)
include (cmake/find/nlp.cmake)
include (cmake/find/bzip2.cmake)
if(NOT USE_INTERNAL_PARQUET_LIBRARY)
set (ENABLE_ORC OFF CACHE INTERNAL "")

19
cmake/find/bzip2.cmake Normal file
View File

@ -0,0 +1,19 @@
# Detects the bundled bzip2 sources (contrib/bzip2) and publishes:
#   USE_BZIP2          - 1 when bzip2 support will be built, 0 when the sources are missing
#   BZIP2_INCLUDE_DIR  - directory that contains bzlib.h
#   BZIP2_LIBRARY      - name of the library target to link against
option(ENABLE_BZIP2 "Enable bzip2 compression support" ${ENABLE_LIBRARIES})

if (NOT ENABLE_BZIP2)
    message (STATUS "bzip2 compression disabled")
    return()
endif()

# bzlib.h is used as the marker that the submodule has actually been checked out.
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/bzip2/bzlib.h")
    message (WARNING "submodule contrib/bzip2 is missing. to fix try run: \n git submodule update --init --recursive")
    message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find internal bzip2 library")
    # Switch off bzip2 itself; no other feature flag must be touched here.
    set (USE_BZIP2 0)
    return()
endif ()

set (USE_BZIP2 1)
set (BZIP2_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/bzip2")
set (BZIP2_LIBRARY bzip2)

message (STATUS "Using bzip2=${USE_BZIP2}: ${BZIP2_INCLUDE_DIR} : ${BZIP2_LIBRARY}")

View File

@ -334,6 +334,10 @@ if (USE_NLP)
add_subdirectory(lemmagen-c-cmake)
endif()
# Add the bzip2-cmake subdirectory only when USE_BZIP2 is set.
if (USE_BZIP2)
add_subdirectory(bzip2-cmake)
endif()
if (USE_SQLITE)
add_subdirectory(sqlite-cmake)
endif()

View File

@ -0,0 +1,23 @@
# Builds the `bzip2` library target from the sources in contrib/bzip2.
set(BZIP2_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/bzip2")
set(BZIP2_BINARY_DIR "${ClickHouse_BINARY_DIR}/contrib/bzip2")

# From bzip2/CMakeLists.txt
# bz_version.h is generated into the binary dir from the template shipped with the sources.
set(BZ_VERSION "1.0.7")
configure_file ("${BZIP2_SOURCE_DIR}/bz_version.h.in" "${BZIP2_BINARY_DIR}/bz_version.h")

# Collect the library translation units, in the same order as before.
set(SRCS)
foreach (bzip2_unit blocksort huffman crctable randtable compress decompress bzlib)
    list(APPEND SRCS "${BZIP2_SOURCE_DIR}/${bzip2_unit}.c")
endforeach ()

add_library(bzip2 ${SRCS})

# Both directories are exported: the source dir for bzlib.h, the binary dir for bz_version.h.
target_include_directories(bzip2 PUBLIC "${BZIP2_SOURCE_DIR}" "${BZIP2_BINARY_DIR}")

View File

@ -479,6 +479,11 @@ if (USE_NLP)
dbms_target_link_libraries (PUBLIC lemmagen)
endif()
# Link bzip2 privately into clickhouse_common_io and add its include directory
# to that target as a private SYSTEM include, placed BEFORE the existing ones.
if (USE_BZIP2)
target_link_libraries (clickhouse_common_io PRIVATE ${BZIP2_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${BZIP2_INCLUDE_DIR})
endif()
include ("${ClickHouse_SOURCE_DIR}/cmake/add_check.cmake")
if (ENABLE_TESTS AND USE_GTEST)

View File

@ -567,6 +567,8 @@
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \
M(1002, UNKNOWN_EXCEPTION) \
M(1003, BZIP2_STREAM_DECODER_FAILED) \
M(1004, BZIP2_STREAM_ENCODER_FAILED) \
/* See END */

View File

@ -19,3 +19,4 @@
#cmakedefine01 USE_DATASKETCHES
#cmakedefine01 USE_YAML_CPP
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY
#cmakedefine01 USE_BZIP2

View File

@ -0,0 +1,97 @@
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
#if USE_BZIP2
# include <IO/Bzip2ReadBuffer.h>
# include <bzlib.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BZIP2_STREAM_DECODER_FAILED;
}
/// Owns the libbz2 decompression state: initializes it on construction and
/// releases it on destruction.
class Bzip2ReadBuffer::Bzip2StateWrapper
{
public:
    /// Zero-fills the stream and initializes it for decompression
    /// (verbosity = 0, small = 0).
    /// Throws BZIP2_STREAM_DECODER_FAILED if libbz2 reports anything but BZ_OK.
    Bzip2StateWrapper()
    {
        memset(&stream, 0, sizeof(stream));

        int ret = BZ2_bzDecompressInit(&stream, 0, 0);

        if (ret != BZ_OK)
            throw Exception(
                ErrorCodes::BZIP2_STREAM_DECODER_FAILED,
                "bzip2 stream decoder init failed: error code: {}",
                ret);
    }

    /// The stream references state allocated by libbz2; a copy would release it twice.
    Bzip2StateWrapper(const Bzip2StateWrapper &) = delete;
    Bzip2StateWrapper & operator=(const Bzip2StateWrapper &) = delete;

    ~Bzip2StateWrapper()
    {
        BZ2_bzDecompressEnd(&stream);
    }

    bz_stream stream;
};
Bzip2ReadBuffer::Bzip2ReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char *existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
, in(std::move(in_))
, bz(std::make_unique<Bzip2StateWrapper>())
, eof(false)
{
}
/// Defined in this translation unit, where Bzip2StateWrapper is a complete type,
/// so that the std::unique_ptr<Bzip2StateWrapper> member can be destroyed.
Bzip2ReadBuffer::~Bzip2ReadBuffer() = default;
/// Decompresses the next portion of data into the internal buffer.
///
/// Returns true when working_buffer holds newly decompressed bytes, false once the
/// end of the bzip2 stream has been reached and nothing is left to hand out.
///
/// Throws BZIP2_STREAM_DECODER_FAILED when
///  - libbz2 reports an error,
///  - the bzip2 stream ends while the nested buffer still has data,
///  - the nested buffer ends before the bzip2 stream does (truncated input).
bool Bzip2ReadBuffer::nextImpl()
{
    if (eof)
        return false;

    int ret = BZ_OK;
    do
    {
        /// Refill the decoder input only after it has consumed everything given so far.
        if (!bz->stream.avail_in)
        {
            in->nextIfAtEnd();
            bz->stream.avail_in = in->buffer().end() - in->position();
            bz->stream.next_in = in->position();
        }

        bz->stream.avail_out = internal_buffer.size();
        bz->stream.next_out = internal_buffer.begin();

        ret = BZ2_bzDecompress(&bz->stream);

        /// Tell the nested buffer how much of its data the decoder has consumed.
        in->position() = in->buffer().end() - bz->stream.avail_in;
    }
    /// The decoder may swallow a whole input chunk without emitting a byte.
    /// Keep feeding it instead of returning an empty buffer to the caller.
    while (ret == BZ_OK && bz->stream.avail_out == internal_buffer.size() && !in->eof());

    working_buffer.resize(internal_buffer.size() - bz->stream.avail_out);

    if (ret == BZ_STREAM_END)
    {
        if (in->eof())
        {
            eof = true;
            return !working_buffer.empty();
        }
        else
        {
            throw Exception(
                ErrorCodes::BZIP2_STREAM_DECODER_FAILED,
                "bzip2 decoder finished, but input stream is not exhausted: error code: {}", ret);
        }
    }

    if (ret != BZ_OK)
        throw Exception(
            ErrorCodes::BZIP2_STREAM_DECODER_FAILED,
            "bzip2 stream decoder failed: error code: {}",
            ret);

    /// BZ_OK with no output is only possible here when the input ran out before the
    /// end-of-stream marker. Without this check the caller would spin forever.
    if (working_buffer.empty())
    {
        eof = true;
        throw Exception(
            ErrorCodes::BZIP2_STREAM_DECODER_FAILED,
            "bzip2 stream decoder failed: unexpected end of input: error code: {}",
            ret);
    }

    return true;
}
}
#endif

33
src/IO/Bzip2ReadBuffer.h Normal file
View File

@ -0,0 +1,33 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
namespace DB
{
/// Read buffer that owns a nested ReadBuffer and exposes the bytes produced by
/// running the nested buffer's data through the bzip2 decompressor.
class Bzip2ReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
public:
/// in_ is the source of compressed data; ownership is taken.
/// buf_size, existing_memory and alignment are forwarded to BufferWithOwnMemory.
Bzip2ReadBuffer(
std::unique_ptr<ReadBuffer> in_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
~Bzip2ReadBuffer() override;
private:
/// Fills the working buffer with the next chunk of decompressed data.
bool nextImpl() override;
/// Source of compressed bytes.
std::unique_ptr<ReadBuffer> in;
/// Wrapper around the libbz2 stream state; defined in the .cpp so that
/// <bzlib.h> does not have to be included from this header.
class Bzip2StateWrapper;
std::unique_ptr<Bzip2StateWrapper> bz;
/// Set once the end of the compressed stream has been reached.
bool eof;
};
}

138
src/IO/Bzip2WriteBuffer.cpp Normal file
View File

@ -0,0 +1,138 @@
#if !defined(ARCADIA_BUILD)
#    include <Common/config.h>
#endif

/// Everything below is compiled only when bzip2 support is enabled; the matching
/// #endif is the last line of the file. The guard must be the bzip2 switch, because
/// <bzlib.h> is available exactly when USE_BZIP2 is set.
#if USE_BZIP2
#    include <IO/Bzip2WriteBuffer.h>
#    include <bzlib.h>

#include <Common/MemoryTracker.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BZIP2_STREAM_ENCODER_FAILED;
}
/// Owns the libbz2 compression state: initializes it on construction and
/// releases it on destruction.
class Bzip2WriteBuffer::Bzip2StateWrapper
{
public:
    /// Zero-fills the stream and initializes it for compression.
    /// `compression_level` is passed as the block-size argument of BZ2_bzCompressInit
    /// (verbosity = 0, workFactor = 0 selects the library default).
    /// Throws BZIP2_STREAM_ENCODER_FAILED if libbz2 reports anything but BZ_OK.
    explicit Bzip2StateWrapper(int compression_level)
    {
        memset(&stream, 0, sizeof(stream));

        int ret = BZ2_bzCompressInit(&stream, compression_level, 0, 0);

        if (ret != BZ_OK)
            throw Exception(
                ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
                "bzip2 stream encoder init failed: error code: {}",
                ret);
    }

    /// The stream references state allocated by libbz2; a copy would release it twice.
    Bzip2StateWrapper(const Bzip2StateWrapper &) = delete;
    Bzip2StateWrapper & operator=(const Bzip2StateWrapper &) = delete;

    ~Bzip2StateWrapper()
    {
        BZ2_bzCompressEnd(&stream);
    }

    bz_stream stream;
};
Bzip2WriteBuffer::Bzip2WriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
, bz(std::make_unique<Bzip2StateWrapper>(compression_level))
, out(std::move(out_))
{
}
/// Performs the final flush if finalize() has not been called yet.
/// NOTE(review): finish() rethrows on failure, and an exception leaving a destructor
/// terminates the program; callers that need to handle errors should call finalize()
/// explicitly before destruction.
Bzip2WriteBuffer::~Bzip2WriteBuffer()
{
/// FIXME move final flush into the caller
/// The lock object is created before finish() and stays alive for the whole call.
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
}
/// Compresses everything accumulated in the working buffer and writes the result
/// into the nested buffer. Does nothing when the working buffer is empty.
/// Throws BZIP2_STREAM_ENCODER_FAILED if libbz2 does not answer BZ_RUN_OK.
void Bzip2WriteBuffer::nextImpl()
{
    const auto pending_bytes = offset();
    if (pending_bytes == 0)
        return;

    auto & encoder = bz->stream;
    encoder.next_in = working_buffer.begin();
    encoder.avail_in = pending_bytes;

    try
    {
        for (;;)
        {
            /// Make sure there is room in the nested buffer before each encoder call.
            out->nextIfAtEnd();
            encoder.next_out = out->position();
            encoder.avail_out = out->buffer().end() - out->position();

            const int ret = BZ2_bzCompress(&encoder, BZ_RUN);

            /// Advance the nested buffer by the number of bytes the encoder produced.
            out->position() = out->buffer().end() - encoder.avail_out;

            if (ret != BZ_RUN_OK)
                throw Exception(
                    ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
                    "bzip2 stream encoder failed: error code: {}",
                    ret);

            /// Stop once the encoder has consumed all of the input.
            if (encoder.avail_in == 0)
                break;
        }
    }
    catch (...)
    {
        /// Do not try to write next time after exception.
        out->position() = out->buffer().begin();
        throw;
    }
}
/// Finishes the compressed stream and finalizes the nested buffer.
/// Runs at most once: later calls are no-ops, including after a failed attempt
/// (the exception of the failed attempt is rethrown to the caller).
void Bzip2WriteBuffer::finish()
{
    if (!finished)
    {
        try
        {
            finishImpl();
            out->finalize();
        }
        catch (...)
        {
            /// Do not try to flush next time after exception.
            out->position() = out->buffer().begin();
            finished = true;
            throw;
        }

        finished = true;
    }
}
/// Flushes the remaining uncompressed data and completes the bzip2 stream.
///
/// BZ2_bzCompress(BZ_FINISH) answers BZ_FINISH_OK while it still has output that did
/// not fit into the space provided, and BZ_STREAM_END once everything is written.
/// It is therefore called repeatedly, with fresh output space each time, until
/// BZ_STREAM_END is returned; a single call could leave the stream truncated.
///
/// Throws BZIP2_STREAM_ENCODER_FAILED on any other return code.
void Bzip2WriteBuffer::finishImpl()
{
    next();

    int ret = BZ_FINISH_OK;
    do
    {
        /// Hand the encoder whatever room is left in the nested buffer
        /// (the nested buffer is flushed first when it is full).
        out->nextIfAtEnd();
        bz->stream.next_out = out->position();
        bz->stream.avail_out = out->buffer().end() - out->position();

        ret = BZ2_bzCompress(&bz->stream, BZ_FINISH);

        out->position() = out->buffer().end() - bz->stream.avail_out;

        if (ret != BZ_STREAM_END && ret != BZ_FINISH_OK)
            throw Exception(
                ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
                "bzip2 stream encoder failed: error code: {}",
                ret);
    }
    while (ret != BZ_STREAM_END);
}
}
#endif

37
src/IO/Bzip2WriteBuffer.h Normal file
View File

@ -0,0 +1,37 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
namespace DB
{
/// Write buffer that owns a nested WriteBuffer and writes into it the result of
/// running the data written to this buffer through the bzip2 compressor.
class Bzip2WriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
/// out_ is the destination of compressed data; ownership is taken.
/// compression_level is handed to the bzip2 encoder on initialization
/// (presumably 1..9 as accepted by libbz2 - TODO confirm against callers).
/// buf_size, existing_memory and alignment are forwarded to BufferWithOwnMemory.
Bzip2WriteBuffer(
std::unique_ptr<WriteBuffer> out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
/// Calls finish(), so a stream that was not finalized explicitly is still completed.
~Bzip2WriteBuffer() override;
/// Completes the compressed stream; repeated calls are no-ops.
void finalize() override { finish(); }
private:
/// Compresses the contents of the working buffer into `out`.
void nextImpl() override;
/// Runs finishImpl() and finalizes `out` at most once, guarded by `finished`.
void finish();
/// Flushes pending data and asks the encoder to finish the stream.
void finishImpl();
/// Wrapper around the libbz2 stream state; defined in the .cpp so that
/// <bzlib.h> does not have to be included from this header.
class Bzip2StateWrapper;
std::unique_ptr<Bzip2StateWrapper> bz;
/// Destination of compressed bytes.
std::unique_ptr<WriteBuffer> out;
/// True after finish() has run, whether it succeeded or threw.
bool finished = false;
};
}

View File

@ -10,6 +10,8 @@
#include <IO/ZlibInflatingReadBuffer.h>
#include <IO/ZstdDeflatingWriteBuffer.h>
#include <IO/ZstdInflatingReadBuffer.h>
#include <IO/Bzip2ReadBuffer.h>
#include <IO/Bzip2WriteBuffer.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
@ -40,6 +42,8 @@ std::string toContentEncodingName(CompressionMethod method)
return "xz";
case CompressionMethod::Zstd:
return "zstd";
case CompressionMethod::Bzip2:
return "bz2";
case CompressionMethod::None:
return "";
}
@ -69,11 +73,13 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
return CompressionMethod::Xz;
if (method_str == "zstd" || method_str == "zst")
return CompressionMethod::Zstd;
if (method_str == "bz2")
return CompressionMethod::Bzip2;
if (hint.empty() || hint == "auto" || hint == "none")
return CompressionMethod::None;
throw Exception(
"Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'deflate', 'br', 'xz', 'zstd' are supported as compression methods",
"Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'deflate', 'br', 'xz', 'zstd', 'bz2' are supported as compression methods",
ErrorCodes::NOT_IMPLEMENTED);
}
@ -91,7 +97,10 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
return std::make_unique<LZMAInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
#if USE_BZIP2
if (method == CompressionMethod::Bzip2)
return std::make_unique<Bzip2ReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
#endif
if (method == CompressionMethod::None)
return nested;
@ -114,7 +123,10 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdDeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
#if USE_BZIP2
if (method == CompressionMethod::Bzip2)
return std::make_unique<Bzip2WriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
#endif
if (method == CompressionMethod::None)
return nested;

View File

@ -31,7 +31,8 @@ enum class CompressionMethod
/// Zstd compressor
/// This option corresponds to HTTP Content-Encoding: zstd
Zstd,
Brotli
Brotli,
Bzip2
};
/// How the compression method is named in HTTP.

View File

@ -23,6 +23,8 @@ SRCS(
AIOContextPool.cpp
BrotliReadBuffer.cpp
BrotliWriteBuffer.cpp
Bzip2ReadBuffer.cpp
Bzip2WriteBuffer.cpp
CascadeWriteBuffer.cpp
CompressionMethod.cpp
DoubleConverter.cpp

View File

@ -50,6 +50,7 @@ const char * auto_config_build[]
"USE_LDAP", "@USE_LDAP@",
"TZDATA_VERSION", "@TZDATA_VERSION@",
"USE_KRB5", "@USE_KRB5@",
"USE_BZIP2", "@USE_BZIP2@",
nullptr, nullptr
};

View File

@ -0,0 +1 @@
Hello, World!

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
# Round-trip test for bzip2 file compression:
#   1. write a one-row result INTO OUTFILE with a .bz2 extension,
#   2. check the archive with the system `bzip2 -t`,
#   3. read it back through the file() table function,
#   4. remove the file.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# The server reports the absolute path of a missing file in its exception text;
# stripping the file name from that path yields the user_files directory.
USER_FILES_PATH=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')

BZ2_FILE="${USER_FILES_PATH}/bz2_compression.bz2"

${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World!' as String) INTO OUTFILE '${BZ2_FILE}'"

# Quoted so that a path containing spaces or glob characters stays a single argument.
bzip2 -t "${BZ2_FILE}"

${CLICKHOUSE_CLIENT} --query "SELECT * FROM file('${BZ2_FILE}', 'TabSeparated', 'col String')"

rm -f "${BZ2_FILE}"