Add 'clickhouse compressor' command. [#CLICKHOUSE-2]

This commit is contained in:
Vitaliy Lyudvichenko 2017-09-20 17:12:12 +03:00 committed by alexey-milovidov
parent 7ee2c52328
commit 48c5382c92
5 changed files with 134 additions and 117 deletions

View File

@ -37,6 +37,9 @@ add_library (clickhouse-performance-test PerformanceTest.cpp)
target_link_libraries (clickhouse-performance-test dbms ${Boost_PROGRAM_OPTIONS_LIBRARY}) target_link_libraries (clickhouse-performance-test dbms ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_include_directories (clickhouse-performance-test PRIVATE ${PCG_RANDOM_INCLUDE_DIR}) target_include_directories (clickhouse-performance-test PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
add_library (clickhouse-compressor-lib Compressor.cpp)
target_link_libraries (clickhouse-compressor-lib dbms ${Boost_PROGRAM_OPTIONS_LIBRARY})
add_executable(clickhouse main.cpp) add_executable(clickhouse main.cpp)
target_include_directories(clickhouse PRIVATE ${COMMON_INCLUDE_DIR}) target_include_directories(clickhouse PRIVATE ${COMMON_INCLUDE_DIR})
target_link_libraries(clickhouse target_link_libraries(clickhouse
@ -45,7 +48,8 @@ target_link_libraries(clickhouse
clickhouse-local clickhouse-local
clickhouse-benchmark clickhouse-benchmark
clickhouse-performance-test clickhouse-performance-test
clickhouse-extract-from-config) clickhouse-extract-from-config
clickhouse-compressor-lib)
INSTALL(TARGETS clickhouse RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) INSTALL(TARGETS clickhouse RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
# make symbolic links to concrete clickhouse applications # make symbolic links to concrete clickhouse applications
macro(install_symlink_to_clickhouse app) macro(install_symlink_to_clickhouse app)

View File

@ -0,0 +1,124 @@
#include <iostream>
#include <boost/program_options.hpp>
#include <Common/Exception.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_LARGE_SIZE_COMPRESSED;
}
}
namespace
{
/// Outputs sizes of uncompressed and compressed blocks for compressed file.
void checkAndWriteHeader(DB::ReadBuffer & in, DB::WriteBuffer & out)
{
while (!in.eof())
{
in.ignore(16); /// checksum
char header[COMPRESSED_BLOCK_HEADER_SIZE];
in.readStrict(header, COMPRESSED_BLOCK_HEADER_SIZE);
UInt32 size_compressed = unalignedLoad<UInt32>(&header[1]);
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw DB::Exception("Too large size_compressed. Most likely corrupted data.", DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
UInt32 size_decompressed = unalignedLoad<UInt32>(&header[5]);
DB::writeText(size_decompressed, out);
DB::writeChar('\t', out);
DB::writeText(size_compressed, out);
DB::writeChar('\n', out);
in.ignore(size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
}
}
}
int mainEntryClickHouseCompressor(int argc, char ** argv)
{
boost::program_options::options_description desc("Allowed options");
desc.add_options()
("help,h", "produce help message")
("decompress,d", "decompress")
("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
;
boost::program_options::variables_map options;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
if (options.count("help"))
{
std::cout << "Usage: " << argv[0] << " [options] < in > out" << std::endl;
std::cout << desc << std::endl;
return 1;
}
try
{
bool decompress = options.count("decompress");
bool use_lz4hc = options.count("hc");
bool use_zstd = options.count("zstd");
bool stat_mode = options.count("stat");
bool use_none = options.count("none");
unsigned block_size = options["block-size"].as<unsigned>();
DB::CompressionMethod method = DB::CompressionMethod::LZ4;
if (use_lz4hc)
method = DB::CompressionMethod::LZ4HC;
else if (use_zstd)
method = DB::CompressionMethod::ZSTD;
else if (use_none)
method = DB::CompressionMethod::NONE;
DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);
if (stat_mode)
{
/// Output statistic for compressed file.
checkAndWriteHeader(rb, wb);
}
else if (decompress)
{
/// Decompression
DB::CompressedReadBuffer from(rb);
DB::copyData(from, wb);
}
else
{
/// Compression
DB::CompressedWriteBuffer to(wb, method, block_size);
DB::copyData(rb, to);
}
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true);
return DB::getCurrentExceptionCode();
}
return 0;
}

View File

@ -13,6 +13,7 @@ int mainEntryClickHouseLocal(int argc, char ** argv);
int mainEntryClickHouseBenchmark(int argc, char ** argv); int mainEntryClickHouseBenchmark(int argc, char ** argv);
int mainEntryClickHousePerformanceTest(int argc, char ** argv); int mainEntryClickHousePerformanceTest(int argc, char ** argv);
int mainEntryClickHouseExtractFromConfig(int argc, char ** argv); int mainEntryClickHouseExtractFromConfig(int argc, char ** argv);
int mainEntryClickHouseCompressor(int argc, char ** argv);
namespace namespace
{ {
@ -29,6 +30,7 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
{"server", mainEntryClickHouseServer}, {"server", mainEntryClickHouseServer},
{"performance-test", mainEntryClickHousePerformanceTest}, {"performance-test", mainEntryClickHousePerformanceTest},
{"extract-from-config", mainEntryClickHouseExtractFromConfig}, {"extract-from-config", mainEntryClickHouseExtractFromConfig},
{"compressor", mainEntryClickHouseCompressor}
}; };

View File

@ -1,6 +1,6 @@
add_executable (clickhouse-compressor main.cpp) add_executable (clickhouse-compressor main.cpp)
target_link_libraries (clickhouse-compressor dbms ${Boost_PROGRAM_OPTIONS_LIBRARY}) target_link_libraries (clickhouse-compressor clickhouse-compressor-lib)
install (TARGETS clickhouse-compressor RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse-compressor) install (TARGETS clickhouse-compressor RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse-compressor)

View File

@ -1,119 +1,6 @@
#include <iostream> int mainEntryClickHouseCompressor(int argc, char ** argv);
#include <boost/program_options.hpp>
#include <Common/Exception.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_LARGE_SIZE_COMPRESSED;
}
}
/// Outputs sizes of uncompressed and compressed blocks for compressed file.
void stat(DB::ReadBuffer & in, DB::WriteBuffer & out)
{
while (!in.eof())
{
in.ignore(16); /// checksum
char header[COMPRESSED_BLOCK_HEADER_SIZE];
in.readStrict(header, COMPRESSED_BLOCK_HEADER_SIZE);
UInt32 size_compressed = unalignedLoad<UInt32>(&header[1]);
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw DB::Exception("Too large size_compressed. Most likely corrupted data.", DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
UInt32 size_decompressed = unalignedLoad<UInt32>(&header[5]);
DB::writeText(size_decompressed, out);
DB::writeChar('\t', out);
DB::writeText(size_compressed, out);
DB::writeChar('\n', out);
in.ignore(size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
}
}
int main(int argc, char ** argv) int main(int argc, char ** argv)
{ {
boost::program_options::options_description desc("Allowed options"); return mainEntryClickHouseCompressor(argc, argv);
desc.add_options()
("help,h", "produce help message")
("decompress,d", "decompress")
("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
;
boost::program_options::variables_map options;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
if (options.count("help"))
{
std::cout << "Usage: " << argv[0] << " [options] < in > out" << std::endl;
std::cout << desc << std::endl;
return 1;
}
try
{
bool decompress = options.count("decompress");
bool use_lz4hc = options.count("hc");
bool use_zstd = options.count("zstd");
bool stat_mode = options.count("stat");
bool use_none = options.count("none");
unsigned block_size = options["block-size"].as<unsigned>();
DB::CompressionMethod method = DB::CompressionMethod::LZ4;
if (use_lz4hc)
method = DB::CompressionMethod::LZ4HC;
else if (use_zstd)
method = DB::CompressionMethod::ZSTD;
else if (use_none)
method = DB::CompressionMethod::NONE;
DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);
if (stat_mode)
{
/// Output statistic for compressed file.
stat(rb, wb);
}
else if (decompress)
{
/// Decompression
DB::CompressedReadBuffer from(rb);
DB::copyData(from, wb);
}
else
{
/// Compression
DB::CompressedWriteBuffer to(wb, method, block_size);
DB::copyData(rb, to);
}
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true);
return DB::getCurrentExceptionCode();
}
return 0;
} }