#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int TOO_LARGE_SIZE_COMPRESSED; extern const int BAD_ARGUMENTS; } } namespace { /// Outputs sizes of uncompressed and compressed blocks for compressed file. void checkAndWriteHeader(DB::ReadBuffer & in, DB::WriteBuffer & out) { while (!in.eof()) { in.ignore(16); /// checksum char header[COMPRESSED_BLOCK_HEADER_SIZE]; in.readStrict(header, COMPRESSED_BLOCK_HEADER_SIZE); UInt32 size_compressed = unalignedLoad(&header[1]); if (size_compressed > DBMS_MAX_COMPRESSED_SIZE) throw DB::Exception("Too large size_compressed. Most likely corrupted data.", DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED); UInt32 size_decompressed = unalignedLoad(&header[5]); DB::writeText(size_decompressed, out); DB::writeChar('\t', out); DB::writeText(size_compressed, out); DB::writeChar('\n', out); in.ignore(size_compressed - COMPRESSED_BLOCK_HEADER_SIZE); } } } int mainEntryClickHouseCompressor(int argc, char ** argv) { using namespace DB; namespace po = boost::program_options; try { po::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth()); desc.add_options() ("help,h", "produce help message") ("input", po::value()->value_name("INPUT"), "input file") ("output", po::value()->value_name("OUTPUT"), "output file") ("decompress,d", "decompress") ("offset-in-compressed-file", po::value()->default_value(0ULL), "offset to the compressed block (i.e. physical file offset)") ("offset-in-decompressed-block", po::value()->default_value(0ULL), "offset to the decompressed block (i.e. virtual offset)") ("block-size,b", po::value()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size") ("hc", "use LZ4HC instead of LZ4") ("zstd", "use ZSTD instead of LZ4") ("deflate_qpl", "use deflate_qpl instead of LZ4") ("codec", po::value>()->multitoken(), "use codecs combination instead of LZ4") ("level", po::value(), "compression level for codecs specified via flags") ("none", "use no compression instead of LZ4") ("stat", "print block statistics of compressed data") ; po::positional_options_description positional_desc; positional_desc.add("input", 1); positional_desc.add("output", 1); po::variables_map options; po::store(po::command_line_parser(argc, argv).options(desc).positional(positional_desc).run(), options); if (options.count("help")) { std::cout << "Usage: " << argv[0] << " [options] < INPUT > OUTPUT" << std::endl; std::cout << "Usage: " << argv[0] << " [options] INPUT OUTPUT" << std::endl; std::cout << desc << std::endl; return 0; } bool decompress = options.count("decompress"); bool use_lz4hc = options.count("hc"); bool use_zstd = options.count("zstd"); bool use_deflate_qpl = options.count("deflate_qpl"); bool stat_mode = options.count("stat"); bool use_none = options.count("none"); unsigned block_size = options["block-size"].as(); std::vector codecs; if (options.count("codec")) codecs = options["codec"].as>(); if ((use_lz4hc || use_zstd || use_deflate_qpl || use_none) && !codecs.empty()) throw Exception("Wrong options, codec flags like --zstd and --codec options are mutually exclusive", ErrorCodes::BAD_ARGUMENTS); if (!codecs.empty() && options.count("level")) throw Exception("Wrong options, --level is not compatible with --codec list", ErrorCodes::BAD_ARGUMENTS); std::string method_family = "LZ4"; if (use_lz4hc) method_family = "LZ4HC"; else if (use_zstd) method_family = "ZSTD"; else if (use_deflate_qpl) method_family = "DEFLATE_QPL"; else if (use_none) method_family = "NONE"; std::optional level = std::nullopt; if (options.count("level")) level = options["level"].as(); CompressionCodecPtr codec; if (!codecs.empty()) { ParserCodec codec_parser; std::string codecs_line = boost::algorithm::join(codecs, ","); auto ast = parseQuery(codec_parser, "(" + codecs_line + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); codec = CompressionCodecFactory::instance().get(ast, nullptr); } else codec = CompressionCodecFactory::instance().get(method_family, level); std::unique_ptr rb; std::unique_ptr wb; if (options.count("input")) rb = std::make_unique(options["input"].as()); else rb = std::make_unique(STDIN_FILENO); if (options.count("output")) wb = std::make_unique(options["output"].as()); else wb = std::make_unique(STDOUT_FILENO); if (stat_mode) { /// Output statistic for compressed file. checkAndWriteHeader(*rb, *wb); } else if (decompress) { /// Decompression size_t offset_in_compressed_file = options["offset-in-compressed-file"].as(); size_t offset_in_decompressed_block = options["offset-in-decompressed-block"].as(); if (offset_in_compressed_file || offset_in_decompressed_block) { CompressedReadBufferFromFile compressed_file(std::move(rb)); compressed_file.seek(offset_in_compressed_file, offset_in_decompressed_block); copyData(compressed_file, *wb); } else { CompressedReadBuffer from(*rb); copyData(from, *wb); } } else { /// Compression CompressedWriteBuffer to(*wb, codec, block_size); copyData(*rb, to); } } catch (...) { std::cerr << getCurrentExceptionMessage(true) << '\n'; return getCurrentExceptionCode(); } return 0; }