diff --git a/dbms/src/Server/CMakeLists.txt b/dbms/src/Server/CMakeLists.txt
index 864a37f4ff8..554c3a272cb 100644
--- a/dbms/src/Server/CMakeLists.txt
+++ b/dbms/src/Server/CMakeLists.txt
@@ -37,6 +37,9 @@ add_library (clickhouse-performance-test PerformanceTest.cpp)
 target_link_libraries (clickhouse-performance-test dbms ${Boost_PROGRAM_OPTIONS_LIBRARY})
 target_include_directories (clickhouse-performance-test PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
 
+add_library (clickhouse-compressor-lib Compressor.cpp)
+target_link_libraries (clickhouse-compressor-lib dbms ${Boost_PROGRAM_OPTIONS_LIBRARY})
+
 add_executable(clickhouse main.cpp)
 target_include_directories(clickhouse PRIVATE ${COMMON_INCLUDE_DIR})
 target_link_libraries(clickhouse
@@ -45,7 +48,8 @@ target_link_libraries(clickhouse
     clickhouse-local
     clickhouse-benchmark
     clickhouse-performance-test
-    clickhouse-extract-from-config)
+    clickhouse-extract-from-config
+    clickhouse-compressor-lib)
 INSTALL(TARGETS clickhouse RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
 # make symbolic links to concrete clickhouse applications
 macro(install_symlink_to_clickhouse app)
diff --git a/dbms/src/Server/Compressor.cpp b/dbms/src/Server/Compressor.cpp
new file mode 100644
index 00000000000..67dca9113f0
--- /dev/null
+++ b/dbms/src/Server/Compressor.cpp
@@ -0,0 +1,145 @@
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+
+namespace DB
+{
+    namespace ErrorCodes
+    {
+        extern const int TOO_LARGE_SIZE_COMPRESSED;
+    }
+}
+
+
+namespace
+{
+
+/** Outputs sizes of uncompressed and compressed blocks for compressed file.
+  * For every block read from `in`: skips the 16-byte checksum, reads the block header
+  * and writes one line "<size_decompressed>\t<size_compressed>\n" to `out`.
+  * Throws DB::Exception if the compressed size stored in the header is out of bounds.
+  */
+void checkAndWriteHeader(DB::ReadBuffer & in, DB::WriteBuffer & out)
+{
+    while (!in.eof())
+    {
+        in.ignore(16);    /// checksum
+
+        char header[COMPRESSED_BLOCK_HEADER_SIZE];
+        in.readStrict(header, COMPRESSED_BLOCK_HEADER_SIZE);
+
+        /// The compressed size is read from offset 1 of the header.
+        UInt32 size_compressed = unalignedLoad<UInt32>(&header[1]);
+
+        if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
+            throw DB::Exception("Too large size_compressed. Most likely corrupted data.", DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
+
+        /// The skip at the end of the loop subtracts the header size from size_compressed;
+        /// a smaller value would wrap around in unsigned arithmetic and request a huge skip.
+        /// No other error code is declared in this file, so the same one is reused.
+        if (size_compressed < COMPRESSED_BLOCK_HEADER_SIZE)
+            throw DB::Exception("Too small size_compressed. Most likely corrupted data.", DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
+
+        /// The decompressed size is read from offset 5 of the header.
+        UInt32 size_decompressed = unalignedLoad<UInt32>(&header[5]);
+
+        DB::writeText(size_decompressed, out);
+        DB::writeChar('\t', out);
+        DB::writeText(size_compressed, out);
+        DB::writeChar('\n', out);
+
+        /// The header has already been consumed above; skip the remainder of the block.
+        in.ignore(size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
+    }
+}
+
+}
+
+
+/** Entry point of the compressor tool.
+  * Reads stdin and writes stdout. By default compresses; with --decompress decompresses;
+  * with --stat prints block statistics of compressed input (--stat takes precedence).
+  * Returns 0 on success, 1 after printing help, and the current exception code on error.
+  */
+int mainEntryClickHouseCompressor(int argc, char ** argv)
+{
+    boost::program_options::options_description desc("Allowed options");
+    desc.add_options()
+        ("help,h", "produce help message")
+        ("decompress,d", "decompress")
+        ("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
+        ("hc", "use LZ4HC instead of LZ4")
+        ("zstd", "use ZSTD instead of LZ4")
+        ("none", "use no compression instead of LZ4")
+        ("stat", "print block statistics of compressed data")
+    ;
+
+    try
+    {
+        /// Parsing is inside the try block: invalid command line arguments make boost throw,
+        /// and that must be reported like any other error instead of terminating the process.
+        boost::program_options::variables_map options;
+        boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
+
+        if (options.count("help"))
+        {
+            std::cout << "Usage: " << argv[0] << " [options] < in > out" << std::endl;
+            std::cout << desc << std::endl;
+            return 1;
+        }
+
+        bool decompress = options.count("decompress");
+        bool use_lz4hc = options.count("hc");
+        bool use_zstd = options.count("zstd");
+        bool stat_mode = options.count("stat");
+        bool use_none = options.count("none");
+        unsigned block_size = options["block-size"].as<unsigned>();
+
+        /// LZ4 is the default; the first matching flag among --hc, --zstd, --none overrides it.
+        DB::CompressionMethod method = DB::CompressionMethod::LZ4;
+
+        if (use_lz4hc)
+            method = DB::CompressionMethod::LZ4HC;
+        else if (use_zstd)
+            method = DB::CompressionMethod::ZSTD;
+        else if (use_none)
+            method = DB::CompressionMethod::NONE;
+
+        DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
+        DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);
+
+        if (stat_mode)
+        {
+            /// Output statistic for compressed file.
+            checkAndWriteHeader(rb, wb);
+        }
+        else if (decompress)
+        {
+            /// Decompression
+            DB::CompressedReadBuffer from(rb);
+            DB::copyData(from, wb);
+        }
+        else
+        {
+            /// Compression
+            DB::CompressedWriteBuffer to(wb, method, block_size);
+            DB::copyData(rb, to);
+        }
+    }
+    catch (...)
+    {
+        std::cerr << DB::getCurrentExceptionMessage(true);
+        return DB::getCurrentExceptionCode();
+    }
+
+    return 0;
+}
diff --git a/dbms/src/Server/main.cpp b/dbms/src/Server/main.cpp
index 106c2c04486..13cc1049f84 100644
--- a/dbms/src/Server/main.cpp
+++ b/dbms/src/Server/main.cpp
@@ -13,6 +13,7 @@ int mainEntryClickHouseLocal(int argc, char ** argv);
 int mainEntryClickHouseBenchmark(int argc, char ** argv);
 int mainEntryClickHousePerformanceTest(int argc, char ** argv);
 int mainEntryClickHouseExtractFromConfig(int argc, char ** argv);
+int mainEntryClickHouseCompressor(int argc, char ** argv);
 
 namespace
 {
@@ -29,6 +30,7 @@ std::pair clickhouse_applications[] =
     {"server", mainEntryClickHouseServer},
     {"performance-test", mainEntryClickHousePerformanceTest},
     {"extract-from-config", mainEntryClickHouseExtractFromConfig},
+    {"compressor", mainEntryClickHouseCompressor},
 };
 
 
diff --git a/utils/compressor/CMakeLists.txt b/utils/compressor/CMakeLists.txt
index f5e1a104397..06b6a16ea1e 100644
--- a/utils/compressor/CMakeLists.txt
+++ b/utils/compressor/CMakeLists.txt
@@ -1,6 +1,6 @@
 
 add_executable (clickhouse-compressor main.cpp)
-target_link_libraries (clickhouse-compressor dbms ${Boost_PROGRAM_OPTIONS_LIBRARY})
+target_link_libraries (clickhouse-compressor clickhouse-compressor-lib)
 
 install (TARGETS clickhouse-compressor RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse-compressor)
 
diff --git 
a/utils/compressor/main.cpp b/utils/compressor/main.cpp
index 4e649ed1363..087bfa116de 100644
--- a/utils/compressor/main.cpp
+++ b/utils/compressor/main.cpp
@@ -1,119 +1,6 @@
-#include
-
-#include
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-
-namespace DB
-{
-    namespace ErrorCodes
-    {
-        extern const int TOO_LARGE_SIZE_COMPRESSED;
-    }
-}
-
-
-/// Outputs sizes of uncompressed and compressed blocks for compressed file.
-void stat(DB::ReadBuffer & in, DB::WriteBuffer & out)
-{
-    while (!in.eof())
-    {
-        in.ignore(16);    /// checksum
-
-        char header[COMPRESSED_BLOCK_HEADER_SIZE];
-        in.readStrict(header, COMPRESSED_BLOCK_HEADER_SIZE);
-
-        UInt32 size_compressed = unalignedLoad(&header[1]);
-
-        if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
-            throw DB::Exception("Too large size_compressed. Most likely corrupted data.", DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
-
-        UInt32 size_decompressed = unalignedLoad(&header[5]);
-
-        DB::writeText(size_decompressed, out);
-        DB::writeChar('\t', out);
-        DB::writeText(size_compressed, out);
-        DB::writeChar('\n', out);
-
-        in.ignore(size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
-    }
-}
-
+int mainEntryClickHouseCompressor(int argc, char ** argv);    /// Defined in dbms/src/Server/Compressor.cpp, linked in through clickhouse-compressor-lib.
 
 int main(int argc, char ** argv)
 {
-    boost::program_options::options_description desc("Allowed options");
-    desc.add_options()
-        ("help,h", "produce help message")
-        ("decompress,d", "decompress")
-        ("block-size,b", boost::program_options::value()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
-        ("hc", "use LZ4HC instead of LZ4")
-        ("zstd", "use ZSTD instead of LZ4")
-        ("none", "use no compression instead of LZ4")
-        ("stat", "print block statistics of compressed data")
-    ;
-
-    boost::program_options::variables_map options;
-    boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
-
-    if (options.count("help"))
-    {
-        std::cout << "Usage: " << argv[0] << " [options] < in > out" << std::endl;
-        std::cout << desc << std::endl;
-        return 1;
-    }
-
-    try
-    {
-        bool decompress = options.count("decompress");
-        bool use_lz4hc = options.count("hc");
-        bool use_zstd = options.count("zstd");
-        bool stat_mode = options.count("stat");
-        bool use_none = options.count("none");
-        unsigned block_size = options["block-size"].as();
-
-        DB::CompressionMethod method = DB::CompressionMethod::LZ4;
-
-        if (use_lz4hc)
-            method = DB::CompressionMethod::LZ4HC;
-        else if (use_zstd)
-            method = DB::CompressionMethod::ZSTD;
-        else if (use_none)
-            method = DB::CompressionMethod::NONE;
-
-        DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
-        DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);
-
-        if (stat_mode)
-        {
-            /// Output statistic for compressed file.
-            stat(rb, wb);
-        }
-        else if (decompress)
-        {
-            /// Decompression
-            DB::CompressedReadBuffer from(rb);
-            DB::copyData(from, wb);
-        }
-        else
-        {
-            /// Compression
-            DB::CompressedWriteBuffer to(wb, method, block_size);
-            DB::copyData(rb, to);
-        }
-    }
-    catch (...)
-    {
-        std::cerr << DB::getCurrentExceptionMessage(true);
-        return DB::getCurrentExceptionCode();
-    }
-
-    return 0;
+    return mainEntryClickHouseCompressor(argc, argv);    /// The standalone binary only forwards to the shared implementation.
 }