Merge pull request #1045 from prog8/nocompression

Added compression NONE
This commit is contained in:
alexey-milovidov 2017-08-01 23:07:52 +03:00 committed by GitHub
commit ae8783aee3
8 changed files with 50 additions and 7 deletions

View File

@ -2,6 +2,7 @@
#include <vector>
#include <string.h>
#include <city.h>
#include <lz4.h>
#include <zstd.h>
@ -52,7 +53,10 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed,
size_t & size_compressed = size_compressed_without_checksum;
if (method == static_cast<UInt8>(CompressionMethodByte::LZ4) || method == static_cast<UInt8>(CompressionMethodByte::ZSTD))
if (method == static_cast<UInt8>(CompressionMethodByte::LZ4) ||
method == static_cast<UInt8>(CompressionMethodByte::ZSTD) ||
method == static_cast<UInt8>(CompressionMethodByte::NONE))
{
size_compressed = unalignedLoad<UInt32>(&own_compressed_buffer[1]);
size_decompressed = unalignedLoad<UInt32>(&own_compressed_buffer[5]);
@ -108,6 +112,10 @@ void CompressedReadBufferBase::decompress(char * to, size_t size_decompressed, s
if (ZSTD_isError(res))
throw Exception("Cannot ZSTD_decompress: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_DECOMPRESS);
}
else if (method == static_cast<UInt8>(CompressionMethodByte::NONE))
{
memcpy(to, &compressed_buffer[COMPRESSED_BLOCK_HEADER_SIZE], size_decompressed);
}
else
throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}

View File

@ -18,6 +18,7 @@ enum class CompressionMethod
LZ4 = 1,
LZ4HC = 2, /// The format is the same as for LZ4. The difference is only in compression.
ZSTD = 3, /// Experimental algorithm: https://github.com/Cyan4973/zstd
NONE = 4, /// No compression
};
/** The compressed block format is as follows:
@ -43,8 +44,9 @@ enum class CompressionMethod
enum class CompressionMethodByte : uint8_t
{
LZ4 = 0x82,
ZSTD = 0x90,
NONE = 0x02,
LZ4 = 0x82,
ZSTD = 0x90,
};
}

View File

@ -3,6 +3,7 @@
#include <lz4.h>
#include <lz4hc.h>
#include <zstd.h>
#include <string.h>
#include <common/unaligned.h>
#include <Core/Types.h>
@ -98,6 +99,25 @@ void CompressedWriteBuffer::nextImpl()
compressed_buffer_ptr = &compressed_buffer[0];
break;
}
case CompressionMethod::NONE:
{
static constexpr size_t header_size = 1 + sizeof (UInt32) + sizeof (UInt32);
compressed_size = header_size + uncompressed_size;
UInt32 uncompressed_size_32 = uncompressed_size;
UInt32 compressed_size_32 = compressed_size;
compressed_buffer.resize(compressed_size);
compressed_buffer[0] = static_cast<UInt8>(CompressionMethodByte::NONE);
unalignedStore(&compressed_buffer[1], compressed_size_32);
unalignedStore(&compressed_buffer[5], uncompressed_size_32);
memcpy(&compressed_buffer[9], working_buffer.begin(), uncompressed_size);
compressed_buffer_ptr = &compressed_buffer[0];
break;
}
default:
throw Exception("Unknown compression method", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}

View File

@ -51,6 +51,8 @@ private:
method = CompressionMethod::LZ4;
else if (name == "zstd")
method = CompressionMethod::ZSTD;
else if (name == "none")
method = CompressionMethod::NONE;
else
throw Exception("Unknown compression method " + name, ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}

View File

@ -1028,8 +1028,11 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
*this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
auto compression_method = this->context.chooseCompressionMethod(
part->size_in_bytes,
static_cast<double>(part->size_in_bytes) / this->getTotalActiveSizeInBytes());
ExpressionBlockInputStream in(part_in, expression);
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, CompressionMethod::LZ4, false);
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, compression_method, false);
in.readPrefix();
out.writePrefix();

View File

@ -620,8 +620,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, Limits(), 0 /*limit_hint*/, Names());
auto compression_method = data.context.chooseCompressionMethod(
merge_entry->total_size_bytes_compressed,
static_cast<double>(merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
merge_entry->total_size_bytes_compressed,
static_cast<double> (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
MergedBlockOutputStream to{
data, new_part_tmp_path, merging_columns, compression_method, merged_column_to_size, aio_threshold};

View File

@ -146,8 +146,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
}
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_method = data.context.chooseCompressionMethod(0, 0);
NamesAndTypesList columns = data.getColumnsList().filter(block.getColumnsList().getNames());
MergedBlockOutputStream out(data, new_data_part->getFullPath(), columns, CompressionMethod::LZ4);
MergedBlockOutputStream out(data, new_data_part->getFullPath(), columns, compression_method);
out.writePrefix();
out.writeWithPermutation(block, perm_ptr);

View File

@ -56,6 +56,7 @@ int main(int argc, char ** argv)
("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
;
@ -75,6 +76,7 @@ int main(int argc, char ** argv)
bool use_lz4hc = options.count("hc");
bool use_zstd = options.count("zstd");
bool stat_mode = options.count("stat");
bool use_none = options.count("none");
unsigned block_size = options["block-size"].as<unsigned>();
DB::CompressionMethod method = DB::CompressionMethod::LZ4;
@ -83,6 +85,8 @@ int main(int argc, char ** argv)
method = DB::CompressionMethod::LZ4HC;
else if (use_zstd)
method = DB::CompressionMethod::ZSTD;
else if (use_none)
method = DB::CompressionMethod::NONE;
DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);