mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 19:02:04 +00:00
support compress marks and primary key
This commit is contained in:
parent
aff8149f5c
commit
71935b96ef
@ -740,7 +740,7 @@ void IMergeTreeDataPart::loadIndex()
|
||||
loaded_index[i]->reserve(index_granularity.getMarksCount());
|
||||
}
|
||||
|
||||
String index_name = "primary.idx";
|
||||
String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
|
||||
String index_path = fs::path(data_part_storage->getRelativePath()) / index_name;
|
||||
auto index_file = metadata_manager->read(index_name);
|
||||
size_t marks_count = index_granularity.getMarksCount();
|
||||
@ -779,7 +779,10 @@ void IMergeTreeDataPart::appendFilesOfIndex(Strings & files) const
|
||||
return;
|
||||
|
||||
if (metadata_snapshot->hasPrimaryKey())
|
||||
files.push_back("primary.idx");
|
||||
{
|
||||
String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
|
||||
files.push_back(index_name);
|
||||
}
|
||||
}
|
||||
|
||||
NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
|
||||
@ -1527,8 +1530,11 @@ void IMergeTreeDataPart::checkConsistencyBase() const
|
||||
const auto & partition_key = metadata_snapshot->getPartitionKey();
|
||||
if (!checksums.empty())
|
||||
{
|
||||
if (!pk.column_names.empty() && !checksums.files.contains("primary.idx"))
|
||||
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
|
||||
if (!pk.column_names.empty()
|
||||
&& (!checksums.files.contains("primary" + getIndexExtension(false))
|
||||
&& !checksums.files.contains("primary" + getIndexExtension(true))))
|
||||
throw Exception("No checksum for " + toString("primary" + getIndexExtension(false)) + " or " + toString("primary" + getIndexExtension(true)),
|
||||
ErrorCodes::NO_FILE_IN_DATA_PART);
|
||||
|
||||
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
{
|
||||
@ -1566,7 +1572,10 @@ void IMergeTreeDataPart::checkConsistencyBase() const
|
||||
|
||||
/// Check that the primary key index is not empty.
|
||||
if (!pk.column_names.empty())
|
||||
check_file_not_empty("primary.idx");
|
||||
{
|
||||
String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
|
||||
check_file_not_empty(index_name);
|
||||
}
|
||||
|
||||
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
{
|
||||
@ -1779,4 +1788,32 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part)
|
||||
return (data_part && data_part->getType() == MergeTreeDataPartType::InMemory);
|
||||
}
|
||||
|
||||
std::optional<std::string> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage)
|
||||
{
|
||||
if (data_part_storage->exists())
|
||||
{
|
||||
for (auto it = data_part_storage->iterate(); it->isValid(); it->next())
|
||||
{
|
||||
const auto & extension = fs::path(it->name()).extension();
|
||||
if (extension == getIndexExtension(false)
|
||||
|| extension == getIndexExtension(true))
|
||||
return extension;
|
||||
}
|
||||
}
|
||||
return {".idx"};
|
||||
}
|
||||
|
||||
bool isCompressFromIndexExtension(const String & index_extension)
|
||||
{
|
||||
return index_extension == getIndexExtension(true);
|
||||
}
|
||||
|
||||
bool isCompressFromMrkExtension(const String & mrk_extension)
|
||||
{
|
||||
return mrk_extension == getNonAdaptiveMrkExtension(true)
|
||||
|| mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
|
||||
|| mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true)
|
||||
|| mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::InMemory, true);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -605,5 +605,9 @@ using MergeTreeMutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
|
||||
bool isCompactPart(const MergeTreeDataPartPtr & data_part);
|
||||
bool isWidePart(const MergeTreeDataPartPtr & data_part);
|
||||
bool isInMemoryPart(const MergeTreeDataPartPtr & data_part);
|
||||
/// Extension of the primary index file: ".cidx" when the index is compressed, ".idx" otherwise.
inline String getIndexExtension(bool is_compress_primary_key)
{
    if (is_compress_primary_key)
        return ".cidx";
    return ".idx";
}
|
||||
std::optional<String> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
|
||||
bool isCompressFromIndexExtension(const String & index_extension);
|
||||
bool isCompressFromMrkExtension(const String & mrk_extension);
|
||||
|
||||
}
|
||||
|
@ -8,4 +8,14 @@ namespace DB
|
||||
IPartMetadataManager::IPartMetadataManager(const IMergeTreeDataPart * part_) : part(part_)
|
||||
{
|
||||
}
|
||||
|
||||
bool IPartMetadataManager::isCompressFromFileName(const String & file_name) const
|
||||
{
|
||||
const auto & extension = fs::path(file_name).extension();
|
||||
if (isCompressFromMrkExtension(extension)
|
||||
|| isCompressFromIndexExtension(extension))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ namespace DB
|
||||
|
||||
class IMergeTreeDataPart;
|
||||
|
||||
class ReadBuffer;
|
||||
class SeekableReadBuffer;
|
||||
|
||||
class IDisk;
|
||||
@ -29,8 +30,8 @@ public:
|
||||
|
||||
virtual ~IPartMetadataManager() = default;
|
||||
|
||||
/// Read metadata content and return SeekableReadBuffer object.
|
||||
virtual std::unique_ptr<SeekableReadBuffer> read(const String & file_name) const = 0;
|
||||
/// Read metadata content and return ReadBuffer object.
|
||||
virtual std::unique_ptr<ReadBuffer> read(const String & file_name) const = 0;
|
||||
|
||||
/// Return true if metadata exists in part.
|
||||
virtual bool exists(const String & file_name) const = 0;
|
||||
@ -50,6 +51,9 @@ public:
|
||||
/// Check all metadatas in part.
|
||||
virtual std::unordered_map<String, uint128> check() const = 0;
|
||||
|
||||
/// Determine from the file extension whether the file is compressed.
|
||||
bool isCompressFromFileName(const String & file_name) const;
|
||||
|
||||
protected:
|
||||
const IMergeTreeDataPart * part;
|
||||
};
|
||||
|
@ -2601,11 +2601,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
|
||||
|
||||
static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext)
|
||||
{
|
||||
if (mrk_ext == getNonAdaptiveMrkExtension())
|
||||
if (mrk_ext == getNonAdaptiveMrkExtension(true)
|
||||
|| mrk_ext == getNonAdaptiveMrkExtension(false))
|
||||
return MergeTreeDataPartType::Wide;
|
||||
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide))
|
||||
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
|
||||
|| mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, false))
|
||||
return MergeTreeDataPartType::Wide;
|
||||
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact))
|
||||
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true)
|
||||
|| mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, false))
|
||||
return MergeTreeDataPartType::Compact;
|
||||
|
||||
throw Exception("Can't determine part type, because of unknown mark extension " + mrk_ext, ErrorCodes::UNKNOWN_PART_TYPE);
|
||||
|
@ -119,7 +119,8 @@ void MergeTreeDataPartWide::loadIndexGranularity()
|
||||
std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
|
||||
|
||||
size_t marks_file_size = data_part_storage->getFileSize(marks_file_path);
|
||||
|
||||
if (!index_granularity_info.is_compress_marks)
|
||||
{
|
||||
if (!index_granularity_info.is_adaptive)
|
||||
{
|
||||
size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes();
|
||||
@ -137,9 +138,31 @@ void MergeTreeDataPartWide::loadIndexGranularity()
|
||||
}
|
||||
|
||||
if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes() != marks_file_size)
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}",
|
||||
std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
|
||||
throw Exception("Cannot read all marks from file " + data_part_storage->getFullPath() + "/" + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
CompressedReadBufferFromFile buffer(
|
||||
data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt));
|
||||
|
||||
MarksInCompressedFile mark(1);
|
||||
size_t marks_count = 0;
|
||||
while (!buffer.eof())
|
||||
{
|
||||
buffer.readStrict(reinterpret_cast<char *>(mark.data()), sizeof(size_t) * 2); /// skip offset_in_compressed file and offset_in_decompressed_block
|
||||
++marks_count;
|
||||
|
||||
if (index_granularity_info.is_adaptive)
|
||||
{
|
||||
size_t granularity;
|
||||
readIntBinary(granularity, buffer);
|
||||
index_granularity.appendMark(granularity);
|
||||
}
|
||||
}
|
||||
|
||||
if (!index_granularity_info.is_adaptive)
|
||||
index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity);
|
||||
}
|
||||
|
||||
index_granularity.setInitialized();
|
||||
|
@ -16,7 +16,11 @@ void MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
|
||||
compressed.next();
|
||||
/// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually.
|
||||
plain_hashing.next();
|
||||
marks.next();
|
||||
|
||||
if (is_compress_marks)
|
||||
marks_compressed.next();
|
||||
|
||||
marks_hashing.next();
|
||||
|
||||
plain_file->preFinalize();
|
||||
marks_file->preFinalize();
|
||||
@ -48,6 +52,8 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
|
||||
const std::string & marks_file_extension_,
|
||||
const CompressionCodecPtr & compression_codec_,
|
||||
size_t max_compress_block_size_,
|
||||
const CompressionCodecPtr & marks_compression_codec_,
|
||||
size_t marks_compress_block_size_,
|
||||
const WriteSettings & query_write_settings) :
|
||||
escaped_column_name(escaped_column_name_),
|
||||
data_file_extension{data_file_extension_},
|
||||
@ -56,7 +62,11 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
|
||||
plain_hashing(*plain_file),
|
||||
compressed_buf(plain_hashing, compression_codec_, max_compress_block_size_),
|
||||
compressed(compressed_buf),
|
||||
marks_file(data_part_storage_builder->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)), marks(*marks_file)
|
||||
marks_file(data_part_storage_builder->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)),
|
||||
marks_hashing(*marks_file),
|
||||
marks_compressed_buf(marks_hashing, marks_compression_codec_, marks_compress_block_size_),
|
||||
marks_compressed(marks_compressed_buf),
|
||||
is_compress_marks(isCompressFromMrkExtension(marks_file_extension))
|
||||
{
|
||||
}
|
||||
|
||||
@ -70,8 +80,14 @@ void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPa
|
||||
checksums.files[name + data_file_extension].file_size = plain_hashing.count();
|
||||
checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();
|
||||
|
||||
checksums.files[name + marks_file_extension].file_size = marks.count();
|
||||
checksums.files[name + marks_file_extension].file_hash = marks.getHash();
|
||||
if (is_compress_marks)
|
||||
{
|
||||
checksums.files[name + marks_file_extension].is_compressed = true;
|
||||
checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed.count();
|
||||
checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed.getHash();
|
||||
}
|
||||
checksums.files[name + marks_file_extension].file_size = marks_hashing.count();
|
||||
checksums.files[name + marks_file_extension].file_hash = marks_hashing.getHash();
|
||||
}
|
||||
|
||||
|
||||
@ -91,6 +107,7 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
|
||||
, marks_file_extension(marks_file_extension_)
|
||||
, default_codec(default_codec_)
|
||||
, compute_granularity(index_granularity.empty())
|
||||
, is_compress_primary_key(settings.is_compress_primary_key)
|
||||
{
|
||||
if (settings.blocks_are_granules_size && !index_granularity.empty())
|
||||
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
|
||||
@ -156,13 +173,27 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
|
||||
{
|
||||
if (metadata_snapshot->hasPrimaryKey())
|
||||
{
|
||||
index_file_stream = data_part_storage_builder->writeFile("primary.idx", DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
|
||||
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
|
||||
String index_name = "primary" + getIndexExtension(is_compress_primary_key);
|
||||
index_file_stream = data_part_storage_builder->writeFile(index_name, DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
|
||||
index_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
|
||||
|
||||
if (is_compress_primary_key)
|
||||
{
|
||||
ParserCodec codec_parser;
|
||||
auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.primary_key_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
CompressionCodecPtr primary_key_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
|
||||
index_compressed_buf = std::make_unique<CompressedWriteBuffer>(*index_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size);
|
||||
index_compressed_stream = std::make_unique<HashingWriteBuffer>(*index_compressed_buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterOnDisk::initSkipIndices()
|
||||
{
|
||||
ParserCodec codec_parser;
|
||||
auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.marks_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
CompressionCodecPtr marks_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
|
||||
|
||||
for (const auto & index_helper : skip_indices)
|
||||
{
|
||||
String stream_name = index_helper->getFileName();
|
||||
@ -172,7 +203,9 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
|
||||
data_part_storage_builder,
|
||||
stream_name, index_helper->getSerializedFileExtension(),
|
||||
stream_name, marks_file_extension,
|
||||
default_codec, settings.max_compress_block_size, settings.query_write_settings));
|
||||
default_codec, settings.max_compress_block_size,
|
||||
marks_compression_codec, settings.marks_compress_block_size,
|
||||
settings.query_write_settings));
|
||||
skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
|
||||
skip_index_accumulated_marks.push_back(0);
|
||||
}
|
||||
@ -208,7 +241,8 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
|
||||
{
|
||||
const auto & primary_column = primary_index_block.getByPosition(j);
|
||||
index_columns[j]->insertFrom(*primary_column.column, granule.start_row);
|
||||
primary_column.type->getDefaultSerialization()->serializeBinary(*primary_column.column, granule.start_row, *index_stream);
|
||||
primary_column.type->getDefaultSerialization()->serializeBinary(
|
||||
*primary_column.column, granule.start_row, is_compress_primary_key ? *index_compressed_stream : *index_hashing_stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -241,12 +275,12 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
|
||||
if (stream.compressed.offset() >= settings.min_compress_block_size)
|
||||
stream.compressed.next();
|
||||
|
||||
writeIntBinary(stream.plain_hashing.count(), stream.marks);
|
||||
writeIntBinary(stream.compressed.offset(), stream.marks);
|
||||
writeIntBinary(stream.plain_hashing.count(), stream.is_compress_marks? stream.marks_compressed : stream.marks_hashing);
|
||||
writeIntBinary(stream.compressed.offset(), stream.is_compress_marks? stream.marks_compressed : stream.marks_hashing);
|
||||
/// Actually these numbers are redundant, but we have to store them
|
||||
/// to be compatible with normal .mrk2 file format
|
||||
if (settings.can_use_adaptive_granularity)
|
||||
writeIntBinary(1UL, stream.marks);
|
||||
writeIntBinary(1UL, stream.is_compress_marks? stream.marks_compressed : stream.marks_hashing);
|
||||
}
|
||||
|
||||
size_t pos = granule.start_row;
|
||||
@ -263,7 +297,7 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
|
||||
if (write_final_mark && compute_granularity)
|
||||
index_granularity.appendMark(0);
|
||||
|
||||
if (index_stream)
|
||||
if (index_hashing_stream)
|
||||
{
|
||||
if (write_final_mark)
|
||||
{
|
||||
@ -272,26 +306,44 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
|
||||
const auto & column = *last_block_index_columns[j];
|
||||
size_t last_row_number = column.size() - 1;
|
||||
index_columns[j]->insertFrom(column, last_row_number);
|
||||
index_types[j]->getDefaultSerialization()->serializeBinary(column, last_row_number, *index_stream);
|
||||
index_types[j]->getDefaultSerialization()->serializeBinary(
|
||||
column, last_row_number, is_compress_primary_key ? *index_compressed_stream : *index_hashing_stream);
|
||||
}
|
||||
last_block_index_columns.clear();
|
||||
}
|
||||
|
||||
index_stream->next();
|
||||
checksums.files["primary.idx"].file_size = index_stream->count();
|
||||
checksums.files["primary.idx"].file_hash = index_stream->getHash();
|
||||
if (is_compress_primary_key)
|
||||
index_compressed_stream->next();
|
||||
|
||||
index_hashing_stream->next();
|
||||
|
||||
String index_name = "primary" + getIndexExtension(is_compress_primary_key);
|
||||
if (is_compress_primary_key)
|
||||
{
|
||||
checksums.files[index_name].is_compressed = true;
|
||||
checksums.files[index_name].uncompressed_size = index_compressed_stream->count();
|
||||
checksums.files[index_name].uncompressed_hash = index_compressed_stream->getHash();
|
||||
}
|
||||
checksums.files[index_name].file_size = index_hashing_stream->count();
|
||||
checksums.files[index_name].file_hash = index_hashing_stream->getHash();
|
||||
index_file_stream->preFinalize();
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(bool sync)
|
||||
{
|
||||
if (index_stream)
|
||||
if (index_hashing_stream)
|
||||
{
|
||||
index_file_stream->finalize();
|
||||
if (sync)
|
||||
index_file_stream->sync();
|
||||
index_stream = nullptr;
|
||||
|
||||
if (is_compress_primary_key)
|
||||
{
|
||||
index_compressed_stream = nullptr;
|
||||
index_compressed_buf = nullptr;
|
||||
}
|
||||
index_hashing_stream = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,8 @@
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
||||
#include <Disks/IDisk.h>
|
||||
|
||||
#include <Parsers/ExpressionElementParsers.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -56,6 +57,8 @@ public:
|
||||
const std::string & marks_file_extension_,
|
||||
const CompressionCodecPtr & compression_codec_,
|
||||
size_t max_compress_block_size_,
|
||||
const CompressionCodecPtr & marks_compression_codec_,
|
||||
size_t marks_compress_block_size_,
|
||||
const WriteSettings & query_write_settings);
|
||||
|
||||
String escaped_column_name;
|
||||
@ -68,9 +71,12 @@ public:
|
||||
CompressedWriteBuffer compressed_buf;
|
||||
HashingWriteBuffer compressed;
|
||||
|
||||
/// marks -> marks_file
|
||||
/// marks_compressed -> marks_compressed_buf -> marks_hashing -> marks_file
|
||||
std::unique_ptr<WriteBufferFromFileBase> marks_file;
|
||||
HashingWriteBuffer marks;
|
||||
HashingWriteBuffer marks_hashing;
|
||||
CompressedWriteBuffer marks_compressed_buf;
|
||||
HashingWriteBuffer marks_compressed;
|
||||
bool is_compress_marks;
|
||||
|
||||
bool is_prefinalized = false;
|
||||
|
||||
@ -139,7 +145,11 @@ protected:
|
||||
std::vector<size_t> skip_index_accumulated_marks;
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> index_file_stream;
|
||||
std::unique_ptr<HashingWriteBuffer> index_stream;
|
||||
std::unique_ptr<HashingWriteBuffer> index_hashing_stream;
|
||||
std::unique_ptr<CompressedWriteBuffer> index_compressed_buf;
|
||||
std::unique_ptr<HashingWriteBuffer> index_compressed_stream;
|
||||
bool is_compress_primary_key;
|
||||
|
||||
DataTypes index_types;
|
||||
/// Index columns from the last block
|
||||
/// It's written to index file in the `writeSuffixAndFinalizePart` method
|
||||
|
@ -110,6 +110,10 @@ void MergeTreeDataPartWriterWide::addStreams(
|
||||
else /// otherwise return only generic codecs and don't use info about the` data_type
|
||||
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);
|
||||
|
||||
ParserCodec codec_parser;
|
||||
auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.marks_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
CompressionCodecPtr marks_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
|
||||
|
||||
column_streams[stream_name] = std::make_unique<Stream>(
|
||||
stream_name,
|
||||
data_part_storage_builder,
|
||||
@ -117,6 +121,8 @@ void MergeTreeDataPartWriterWide::addStreams(
|
||||
stream_name, marks_file_extension,
|
||||
compression_codec,
|
||||
settings.max_compress_block_size,
|
||||
marks_compression_codec,
|
||||
settings.marks_compress_block_size,
|
||||
settings.query_write_settings);
|
||||
};
|
||||
|
||||
@ -266,10 +272,10 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
|
||||
void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark)
|
||||
{
|
||||
Stream & stream = *column_streams[stream_with_mark.stream_name];
|
||||
writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.marks);
|
||||
writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.marks);
|
||||
writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.is_compress_marks? stream.marks_compressed : stream.marks_hashing);
|
||||
writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.is_compress_marks? stream.marks_compressed : stream.marks_hashing);
|
||||
if (settings.can_use_adaptive_granularity)
|
||||
writeIntBinary(rows_in_mark, stream.marks);
|
||||
writeIntBinary(rows_in_mark, stream.is_compress_marks? stream.marks_compressed : stream.marks_hashing);
|
||||
}
|
||||
|
||||
StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
|
||||
@ -420,7 +426,10 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
|
||||
if (!data_part_storage->exists(mrk_path))
|
||||
return;
|
||||
|
||||
auto mrk_in = data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt);
|
||||
auto mrk_file_in = data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt);
|
||||
DB::CompressedReadBufferFromFile mrk_compressed_in(data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt));
|
||||
ReadBuffer & mrk_in = data_part->index_granularity_info.is_compress_marks? static_cast<ReadBuffer &>(mrk_compressed_in)
|
||||
: *mrk_file_in;
|
||||
DB::CompressedReadBufferFromFile bin_in(data_part_storage->readFile(bin_path, {}, std::nullopt, std::nullopt));
|
||||
bool must_be_last = false;
|
||||
UInt64 offset_in_compressed_file = 0;
|
||||
@ -429,15 +438,15 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
|
||||
|
||||
size_t mark_num;
|
||||
|
||||
for (mark_num = 0; !mrk_in->eof(); ++mark_num)
|
||||
for (mark_num = 0; !mrk_in.eof(); ++mark_num)
|
||||
{
|
||||
if (mark_num > index_granularity.getMarksCount())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect number of marks in memory {}, on disk (at least) {}", index_granularity.getMarksCount(), mark_num + 1);
|
||||
|
||||
DB::readBinary(offset_in_compressed_file, *mrk_in);
|
||||
DB::readBinary(offset_in_decompressed_block, *mrk_in);
|
||||
DB::readBinary(offset_in_compressed_file, mrk_in);
|
||||
DB::readBinary(offset_in_decompressed_block, mrk_in);
|
||||
if (settings.can_use_adaptive_granularity)
|
||||
DB::readBinary(index_granularity_rows, *mrk_in);
|
||||
DB::readBinary(index_granularity_rows, mrk_in);
|
||||
else
|
||||
index_granularity_rows = data_part->index_granularity_info.fixed_index_granularity;
|
||||
|
||||
@ -446,7 +455,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
|
||||
if (index_granularity_rows != 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "We ran out of binary data but still have non empty mark #{} with rows number {}", mark_num, index_granularity_rows);
|
||||
|
||||
if (!mrk_in->eof())
|
||||
if (!mrk_in.eof())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mark #{} must be last, but we still have some to read", mark_num);
|
||||
|
||||
break;
|
||||
@ -508,7 +517,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
|
||||
}
|
||||
}
|
||||
|
||||
if (!mrk_in->eof())
|
||||
if (!mrk_in.eof())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Still have something in marks stream, last mark #{} index granularity size {}, last rows {}", mark_num, index_granularity.getMarksCount(), index_granularity_rows);
|
||||
if (!bin_in.eof())
|
||||
|
@ -43,6 +43,11 @@ struct MergeTreeWriterSettings
|
||||
, max_compress_block_size(
|
||||
storage_settings->max_compress_block_size ? storage_settings->max_compress_block_size
|
||||
: global_settings.max_compress_block_size)
|
||||
, marks_compression_codec(storage_settings->marks_compression_codec)
|
||||
, marks_compress_block_size(storage_settings->marks_compress_block_size)
|
||||
, is_compress_primary_key(storage_settings->compress_primary_key)
|
||||
, primary_key_compression_codec(storage_settings->primary_key_compression_codec)
|
||||
, primary_key_compress_block_size(storage_settings->primary_key_compress_block_size)
|
||||
, can_use_adaptive_granularity(can_use_adaptive_granularity_)
|
||||
, rewrite_primary_key(rewrite_primary_key_)
|
||||
, blocks_are_granules_size(blocks_are_granules_size_)
|
||||
@ -52,6 +57,14 @@ struct MergeTreeWriterSettings
|
||||
|
||||
size_t min_compress_block_size;
|
||||
size_t max_compress_block_size;
|
||||
|
||||
String marks_compression_codec;
|
||||
size_t marks_compress_block_size;
|
||||
|
||||
bool is_compress_primary_key;
|
||||
String primary_key_compression_codec;
|
||||
size_t primary_key_compress_block_size;
|
||||
|
||||
bool can_use_adaptive_granularity;
|
||||
bool rewrite_primary_key;
|
||||
bool blocks_are_granules_size;
|
||||
|
@ -20,9 +20,12 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromF
|
||||
for (auto it = data_part_storage->iterate(); it->isValid(); it->next())
|
||||
{
|
||||
const auto & ext = fs::path(it->name()).extension();
|
||||
if (ext == getNonAdaptiveMrkExtension()
|
||||
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide)
|
||||
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact))
|
||||
if (ext == getNonAdaptiveMrkExtension(false)
|
||||
|| ext == getNonAdaptiveMrkExtension(true)
|
||||
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, false)
|
||||
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
|
||||
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, false)
|
||||
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true))
|
||||
return ext;
|
||||
}
|
||||
}
|
||||
@ -34,6 +37,7 @@ MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData
|
||||
{
|
||||
const auto storage_settings = storage.getSettings();
|
||||
fixed_index_granularity = storage_settings->index_granularity;
|
||||
is_compress_marks = storage_settings->compress_marks;
|
||||
|
||||
/// Granularity is fixed
|
||||
if (!storage.canUseAdaptiveGranularity())
|
||||
@ -49,21 +53,21 @@ MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData
|
||||
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage)
|
||||
{
|
||||
auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage);
|
||||
if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension())
|
||||
if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension(is_compress_marks))
|
||||
setNonAdaptive();
|
||||
}
|
||||
|
||||
void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_)
|
||||
{
|
||||
is_adaptive = true;
|
||||
marks_file_extension = getAdaptiveMrkExtension(type);
|
||||
marks_file_extension = getAdaptiveMrkExtension(type, is_compress_marks);
|
||||
index_granularity_bytes = index_granularity_bytes_;
|
||||
}
|
||||
|
||||
void MergeTreeIndexGranularityInfo::setNonAdaptive()
|
||||
{
|
||||
is_adaptive = false;
|
||||
marks_file_extension = getNonAdaptiveMrkExtension();
|
||||
marks_file_extension = getNonAdaptiveMrkExtension(is_compress_marks);
|
||||
index_granularity_bytes = 0;
|
||||
}
|
||||
|
||||
@ -85,12 +89,12 @@ size_t getAdaptiveMrkSizeCompact(size_t columns_num)
|
||||
return sizeof(UInt64) * (columns_num * 2 + 1);
|
||||
}
|
||||
|
||||
std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type)
|
||||
std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool is_compress_marks)
|
||||
{
|
||||
if (part_type == MergeTreeDataPartType::Wide)
|
||||
return ".mrk2";
|
||||
return is_compress_marks ? ".cmrk2" : ".mrk2";
|
||||
else if (part_type == MergeTreeDataPartType::Compact)
|
||||
return ".mrk3";
|
||||
return is_compress_marks ? ".cmrk3" : ".mrk3";
|
||||
else if (part_type == MergeTreeDataPartType::InMemory)
|
||||
return "";
|
||||
else
|
||||
|
@ -27,6 +27,9 @@ public:
|
||||
/// Approximate bytes size of one granule
|
||||
size_t index_granularity_bytes = 0;
|
||||
|
||||
/// Whether to compress marks
|
||||
bool is_compress_marks;
|
||||
|
||||
MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_);
|
||||
|
||||
void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage);
|
||||
@ -46,10 +49,10 @@ private:
|
||||
void setNonAdaptive();
|
||||
};
|
||||
|
||||
constexpr inline auto getNonAdaptiveMrkExtension() { return ".mrk"; }
|
||||
/// Extension of the marks file for non-adaptive granularity:
/// ".cmrk" when marks are compressed, ".mrk" otherwise.
constexpr inline auto getNonAdaptiveMrkExtension(bool is_compress_marks)
{
    if (is_compress_marks)
        return ".cmrk";
    return ".mrk";
}
|
||||
constexpr inline auto getNonAdaptiveMrkSizeWide() { return sizeof(UInt64) * 2; }
|
||||
constexpr inline auto getAdaptiveMrkSizeWide() { return sizeof(UInt64) * 3; }
|
||||
inline size_t getAdaptiveMrkSizeCompact(size_t columns_num);
|
||||
std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type);
|
||||
std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool is_compress_marks);
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Common/MemoryTrackerBlockerInThread.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <Compression/CompressedReadBufferFromFile.h>
|
||||
|
||||
#include <utility>
|
||||
|
||||
@ -54,6 +55,10 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
|
||||
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
|
||||
size_t expected_file_size = mark_size * marks_count;
|
||||
|
||||
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
|
||||
|
||||
if (!index_granularity_info.is_compress_marks)
|
||||
{
|
||||
if (expected_file_size != file_size)
|
||||
throw Exception(
|
||||
ErrorCodes::CORRUPTED_DATA,
|
||||
@ -61,8 +66,6 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
|
||||
std::string(fs::path(data_part_storage->getFullPath()) / mrk_path),
|
||||
std::to_string(file_size), std::to_string(expected_file_size));
|
||||
|
||||
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
|
||||
|
||||
if (!index_granularity_info.is_adaptive)
|
||||
{
|
||||
/// Read directly to marks.
|
||||
@ -87,6 +90,34 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
|
||||
if (i * mark_size != file_size)
|
||||
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!index_granularity_info.is_adaptive)
|
||||
{
|
||||
CompressedReadBufferFromFile buffer(data_part_storage->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt));
|
||||
buffer.readStrict(reinterpret_cast<char *>(res->data()), expected_file_size);
|
||||
|
||||
if (!buffer.eof())
|
||||
throw Exception("Cannot read all marks from file " + mrk_path + ", eof: " + std::to_string(buffer.eof())
|
||||
+ ", buffer size: " + std::to_string(buffer.buffer().size()) + ", file size: " + std::to_string(file_size), ErrorCodes::CANNOT_READ_ALL_DATA);
|
||||
}
|
||||
else
|
||||
{
|
||||
CompressedReadBufferFromFile buffer(data_part_storage->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt));
|
||||
size_t i = 0;
|
||||
size_t granularity;
|
||||
while (!buffer.eof())
|
||||
{
|
||||
res->read(buffer, i * columns_in_mark, columns_in_mark);
|
||||
readIntBinary(granularity, buffer);
|
||||
++i;
|
||||
}
|
||||
|
||||
if (i != marks_count)
|
||||
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
|
||||
}
|
||||
}
|
||||
res->protect();
|
||||
return res;
|
||||
}
|
||||
|
@ -152,7 +152,15 @@ struct Settings;
|
||||
M(UInt64, replicated_max_parallel_sends_for_table, 0, "Obsolete setting, does nothing.", 0) \
|
||||
M(UInt64, replicated_max_parallel_fetches, 0, "Obsolete setting, does nothing.", 0) \
|
||||
M(UInt64, replicated_max_parallel_fetches_for_table, 0, "Obsolete setting, does nothing.", 0) \
|
||||
M(Bool, write_final_mark, true, "Obsolete setting, does nothing.", 0)
|
||||
M(Bool, write_final_mark, true, "Obsolete setting, does nothing.", 0) \
|
||||
\
|
||||
/** Compression of marks and primary key files. */ \
|
||||
M(Bool, compress_marks, false, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \
|
||||
M(Bool, compress_primary_key, false, "Primary key support compression, reduce primary key file size and speed up network transmission.", 0) \
|
||||
M(String, marks_compression_codec, "ZSTD(3)", "Compression encoding used by marks, marks are small enough and cached, so the default compression is ZSTD(3).", 0) \
|
||||
M(String, primary_key_compression_codec, "ZSTD(3)", "Compression encoding used by primary, primary key are small enough and cached, so the default compression is ZSTD(3).", 0) \
|
||||
M(UInt64, marks_compress_block_size, 65536, "Mark compress block size, the actual size of the block to compress.", 0) \
|
||||
M(UInt64, primary_key_compress_block_size, 65536, "Primary compress block size, the actual size of the block to compress.", 0) \
|
||||
/// Settings that should not change after the creation of a table.
|
||||
/// NOLINTNEXTLINE
|
||||
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \
|
||||
|
@ -1559,8 +1559,7 @@ bool MutateTask::prepare()
|
||||
ctx->new_data_part->partition.assign(ctx->source_part->partition);
|
||||
|
||||
/// Don't change granularity type while mutating subset of columns
|
||||
ctx->mrk_extension = ctx->source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(ctx->new_data_part->getType())
|
||||
: getNonAdaptiveMrkExtension();
|
||||
ctx->mrk_extension = ctx->source_part->index_granularity_info.marks_file_extension;
|
||||
|
||||
const auto data_settings = ctx->data->getSettings();
|
||||
ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings);
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include "PartMetadataManagerOrdinary.h"
|
||||
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <Compression/CompressedReadBufferFromFile.h>
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
||||
|
||||
@ -18,9 +19,13 @@ PartMetadataManagerOrdinary::PartMetadataManagerOrdinary(const IMergeTreeDataPar
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<SeekableReadBuffer> PartMetadataManagerOrdinary::read(const String & file_name) const
|
||||
std::unique_ptr<ReadBuffer> PartMetadataManagerOrdinary::read(const String & file_name) const
|
||||
{
|
||||
if (!isCompressFromFileName(file_name))
|
||||
return openForReading(part->data_part_storage, file_name);
|
||||
|
||||
String file_path = fs::path(part->data_part_storage->getRelativePath()) / file_name;
|
||||
return std::make_unique<CompressedReadBufferFromFile>(openForReading(part->data_part_storage, file_path));
|
||||
}
|
||||
|
||||
bool PartMetadataManagerOrdinary::exists(const String & file_name) const
|
||||
|
@ -12,7 +12,7 @@ public:
|
||||
|
||||
~PartMetadataManagerOrdinary() override = default;
|
||||
|
||||
std::unique_ptr<SeekableReadBuffer> read(const String & file_name) const override;
|
||||
std::unique_ptr<ReadBuffer> read(const String & file_name) const override;
|
||||
|
||||
bool exists(const String & file_name) const override;
|
||||
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Common/ErrorCodes.h>
|
||||
#include <IO/HashingReadBuffer.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <Compression/CompressedReadBuffer.h>
|
||||
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
@ -38,7 +39,7 @@ String PartMetadataManagerWithCache::getFilePathFromKey(const String & key) cons
|
||||
return key.substr(part->data_part_storage->getDiskName().size() + 1);
|
||||
}
|
||||
|
||||
std::unique_ptr<SeekableReadBuffer> PartMetadataManagerWithCache::read(const String & file_name) const
|
||||
std::unique_ptr<ReadBuffer> PartMetadataManagerWithCache::read(const String & file_name) const
|
||||
{
|
||||
String file_path = fs::path(part->data_part_storage->getRelativePath()) / file_name;
|
||||
String key = getKeyFromFilePath(file_path);
|
||||
@ -47,8 +48,16 @@ std::unique_ptr<SeekableReadBuffer> PartMetadataManagerWithCache::read(const Str
|
||||
if (!status.ok())
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheMiss);
|
||||
if (!isCompressFromFileName(file_name))
|
||||
{
|
||||
auto in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
|
||||
readStringUntilEOF(value, *in);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto in = CompressedReadBuffer(*part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt));
|
||||
readStringUntilEOF(value, in);
|
||||
}
|
||||
cache->put(key, value);
|
||||
}
|
||||
else
|
||||
|
@ -19,7 +19,7 @@ public:
|
||||
~PartMetadataManagerWithCache() override = default;
|
||||
|
||||
/// First read the metadata from RocksDB cache, then from disk.
|
||||
std::unique_ptr<SeekableReadBuffer> read(const String & file_name) const override;
|
||||
std::unique_ptr<ReadBuffer> read(const String & file_name) const override;
|
||||
|
||||
/// First check whether the metadata exists in the RocksDB cache, then on disk.
|
||||
bool exists(const String & file_name) const override;
|
||||
@ -48,7 +48,6 @@ private:
|
||||
/// Get cache keys and checksums of corresponding metadata in a part(including projection parts)
|
||||
void getKeysAndCheckSums(Strings & keys, std::vector<uint128> & checksums) const;
|
||||
|
||||
|
||||
MergeTreeMetadataCachePtr cache;
|
||||
};
|
||||
|
||||
|
@ -0,0 +1,4 @@
|
||||
test_02381 11904
|
||||
test_02381_compress 1658
|
||||
1000000
|
||||
1000 10000
|
@ -0,0 +1,17 @@
|
||||
drop table if exists test_02381;
|
||||
create table test_02381(a UInt64, b UInt64) ENGINE = MergeTree order by (a, b);
|
||||
insert into test_02381 select number, number from system.numbers limit 1000000;
|
||||
|
||||
drop table if exists test_02381_compress;
|
||||
create table test_02381_compress(a UInt64, b UInt64) ENGINE = MergeTree order by (a, b)
|
||||
SETTINGS compress_marks=1, compress_primary_key=1, marks_compression_codec='ZSTD(3)', primary_key_compression_codec='ZSTD(3)', marks_compress_block_size=65536, primary_key_compress_block_size=65536;
|
||||
insert into test_02381_compress select number, number * 10 from system.numbers limit 1000000;
|
||||
|
||||
-- Compare the size of marks on disk
|
||||
select table,sum(marks_bytes) from system.parts_columns where database = currentDatabase() and table like 'test_02381%' group by table order by table;
|
||||
|
||||
select count(*) from test_02381_compress;
|
||||
select * from test_02381_compress where a = 1000;
|
||||
|
||||
drop table if exists test_02381;
|
||||
drop table if exists test_02381_compress;
|
Loading…
Reference in New Issue
Block a user