#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int TOO_LARGE_SIZE_COMPRESSED; extern const int BAD_ARGUMENTS; } } namespace CurrentMetrics { extern const Metric LocalThread; extern const Metric LocalThreadActive; extern const Metric LocalThreadScheduled; } namespace { /// Outputs method, sizes of uncompressed and compressed blocks for compressed file. void checkAndWriteHeader(DB::ReadBuffer & in, DB::WriteBuffer & out) { while (!in.eof()) { UInt32 size_compressed; UInt32 size_decompressed; auto codec = DB::getCompressionCodecForFile(in, size_compressed, size_decompressed, true /* skip_to_next_block */); if (size_compressed > DBMS_MAX_COMPRESSED_SIZE) throw DB::Exception(DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED, "Too large size_compressed. Most likely corrupted data."); DB::writeText(queryToString(codec->getFullCodecDesc()), out); DB::writeChar('\t', out); DB::writeText(size_decompressed, out); DB::writeChar('\t', out); DB::writeText(size_compressed, out); DB::writeChar('\n', out); } } } int mainEntryClickHouseCompressor(int argc, char ** argv) { using namespace DB; namespace po = boost::program_options; bool print_stacktrace = false; try { po::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth()); desc.add_options() ("help,h", "produce help message") ("input", po::value()->value_name("INPUT"), "input file") ("output", po::value()->value_name("OUTPUT"), "output file") ("decompress,d", "decompress") ("offset-in-compressed-file", po::value()->default_value(0ULL), "offset to the compressed block (i.e. physical file offset)") ("offset-in-decompressed-block", po::value()->default_value(0ULL), "offset to the decompressed block (i.e. virtual offset)") ("block-size,b", po::value()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size") ("hc", "use LZ4HC instead of LZ4") ("zstd", "use ZSTD instead of LZ4") ("codec", po::value>()->multitoken(), "use codecs combination instead of LZ4") ("level", po::value(), "compression level for codecs specified via flags") ("threads", po::value()->default_value(1), "number of threads for parallel compression") ("none", "use no compression instead of LZ4") ("stat", "print block statistics of compressed data") ("stacktrace", "print stacktrace of exception") ; po::positional_options_description positional_desc; positional_desc.add("input", 1); positional_desc.add("output", 1); po::variables_map options; po::store(po::command_line_parser(argc, argv).options(desc).positional(positional_desc).run(), options); if (options.count("help")) { std::cout << "Usage: " << argv[0] << " [options] < INPUT > OUTPUT" << std::endl; std::cout << "Usage: " << argv[0] << " [options] INPUT OUTPUT" << std::endl; std::cout << desc << std::endl; std::cout << "\nSee also: https://clickhouse.com/docs/en/operations/utilities/clickhouse-compressor/\n"; return 0; } bool decompress = options.count("decompress"); bool use_lz4hc = options.count("hc"); bool use_zstd = options.count("zstd"); bool stat_mode = options.count("stat"); bool use_none = options.count("none"); print_stacktrace = options.count("stacktrace"); size_t block_size = options["block-size"].as(); size_t num_threads = options["threads"].as(); std::vector codecs; if (options.count("codec")) codecs = options["codec"].as>(); if ((use_lz4hc || use_zstd || use_none) && !codecs.empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong options, codec flags like --zstd and --codec options are mutually exclusive"); if (num_threads < 1) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid value of `threads` parameter"); if (num_threads > 1 && decompress) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel mode is only implemented for compression (not for decompression)"); if (!codecs.empty() && options.count("level")) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong options, --level is not compatible with --codec list"); std::string method_family = "LZ4"; if (use_lz4hc) method_family = "LZ4HC"; else if (use_zstd) method_family = "ZSTD"; else if (use_none) method_family = "NONE"; std::optional level = std::nullopt; if (options.count("level")) level = options["level"].as(); CompressionCodecPtr codec; if (!codecs.empty()) { ParserCodec codec_parser; std::string codecs_line = boost::algorithm::join(codecs, ","); auto ast = parseQuery(codec_parser, "(" + codecs_line + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS); codec = CompressionCodecFactory::instance().get(ast, nullptr); } else codec = CompressionCodecFactory::instance().get(method_family, level); std::unique_ptr rb; std::unique_ptr wb; if (options.count("input")) rb = std::make_unique(options["input"].as()); else rb = std::make_unique(STDIN_FILENO); if (options.count("output")) wb = std::make_unique(options["output"].as()); else wb = std::make_unique(STDOUT_FILENO); if (stat_mode) { /// Output statistic for compressed file. checkAndWriteHeader(*rb, *wb); } else if (decompress) { /// Decompression size_t offset_in_compressed_file = options["offset-in-compressed-file"].as(); size_t offset_in_decompressed_block = options["offset-in-decompressed-block"].as(); if (offset_in_compressed_file || offset_in_decompressed_block) { CompressedReadBufferFromFile compressed_file(std::move(rb)); compressed_file.seek(offset_in_compressed_file, offset_in_decompressed_block); copyData(compressed_file, *wb); } else { CompressedReadBuffer from(*rb); copyData(from, *wb); } } else { /// Compression if (num_threads == 1) { CompressedWriteBuffer to(*wb, codec, block_size); copyData(*rb, to); to.finalize(); } else { ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, num_threads); ParallelCompressedWriteBuffer to(*wb, codec, block_size, num_threads, pool); copyData(*rb, to); to.finalize(); } } wb->finalize(); } catch (...) { std::cerr << getCurrentExceptionMessage(print_stacktrace) << '\n'; return getCurrentExceptionCode(); } return 0; }