Fix strange code

This commit is contained in:
Alexey Milovidov 2022-09-05 06:31:19 +02:00
parent 6bc814a860
commit b4eec0e6f4
13 changed files with 219 additions and 125 deletions

View File

@ -65,8 +65,10 @@ namespace ErrorCodes
extern const int BAD_TTL_FILE;
extern const int NOT_IMPLEMENTED;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int INCORRECT_FILE_NAME;
}
void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const PartMetadataManagerPtr & manager)
{
auto metadata_snapshot = data.getInMemoryMetadataPtr();
@ -1807,12 +1809,74 @@ bool isCompressedFromIndexExtension(const String & index_extension)
return index_extension == getIndexExtension(true);
}
bool isCompressedFromMrkExtension(const String & mrk_extension)
MarkType::MarkType(std::string_view extension)
{
return mrk_extension == getNonAdaptiveMrkExtension(true)
|| mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
|| mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true)
|| mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::InMemory, true);
if (extension.starts_with('c'))
{
compressed = true;
extension = extension.substr(1);
}
if (!extension.starts_with("mrk"))
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk");
extension = extension.substr(strlen("mrk"));
if (extension.empty())
{
adaptive = false;
part_type = MergeTreeDataPartType::Wide;
}
else if (extension == "2")
{
adaptive = true;
part_type = MergeTreeDataPartType::Wide;
}
else if (extension == "3")
{
adaptive = true;
part_type = MergeTreeDataPartType::Compact;
}
else
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Unknown mark file extension: '{}'", extension);
}
MarkType::MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_)
: adaptive(adaptive_), compressed(compressed_), part_type(part_type_)
{
if (!adaptive && part_type != MergeTreeDataPartType::Wide)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity");
if (part_type == MergeTreeDataPartType::Unknown)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type");
}
bool MarkType::isMarkFileExtension(std::string_view extension)
{
return extension.find("mrk") != std::string_view::npos;
}
std::string MarkType::getFileExtension()
{
std::string res = compressed ? "cmrk" : "mrk";
if (!adaptive)
{
if (part_type != MergeTreeDataPartType::Wide)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity");
return res;
}
switch (part_type)
{
case MergeTreeDataPartType::Wide:
return res + "2";
case MergeTreeDataPartType::Compact:
return res + "3";
case MergeTreeDataPartType::InMemory:
return "";
case MergeTreeDataPartType::Unknown:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type");
}
}
}

View File

@ -15,6 +15,7 @@
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Storages/MergeTree/MergeTreeDataPartType.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/ColumnsDescription.h>
#include <Interpreters/TransactionVersionMetadata.h>
@ -23,6 +24,7 @@
#include <shared_mutex>
namespace zkutil
{
class ZooKeeper;
@ -585,6 +587,23 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part);
inline String getIndexExtension(bool is_compressed_primary_key) { return is_compressed_primary_key ? ".cidx" : ".idx"; }
std::optional<String> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
bool isCompressedFromIndexExtension(const String & index_extension);
bool isCompressedFromMrkExtension(const String & mrk_extension);
/** Various types of mark files are stored in files with various extensions:
* .mrk, .mrk2, .mrk3, .cmrk, .cmrk2, .cmrk3.
* This helper allows to obtain mark type from file extension and vise versa.
*/
struct MarkType
{
MarkType(std::string_view extension);
MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_);
static bool isMarkFileExtension(std::string_view extension);
std::string getFileExtension();
bool adaptive = false;
bool compressed = false;
MergeTreeDataPartType::Value part_type = MergeTreeDataPartType::Unknown;
};
}

View File

@ -13,8 +13,8 @@ IPartMetadataManager::IPartMetadataManager(const IMergeTreeDataPart * part_) : p
bool IPartMetadataManager::isCompressedFromFileName(const String & file_name)
{
const auto & extension = fs::path(file_name).extension();
return isCompressedFromMrkExtension(extension) || isCompressedFromIndexExtension(extension);
std::string extension = fs::path(file_name).extension();
return MarkType(extension).compressed || isCompressedFromIndexExtension(extension);
}
}

View File

@ -154,6 +154,7 @@ namespace ErrorCodes
extern const int CANNOT_RESTORE_TABLE;
}
static void checkSampleExpression(const StorageInMemoryMetadata & metadata, bool allow_sampling_expression_not_in_primary_key, bool check_sample_column_is_correct)
{
if (metadata.sampling_key.column_names.empty())
@ -2642,21 +2643,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
throw Exception("Unknown type of part " + data_part_storage->getRelativePath(), ErrorCodes::UNKNOWN_PART_TYPE);
}
static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext)
{
if (mrk_ext == getNonAdaptiveMrkExtension(true)
|| mrk_ext == getNonAdaptiveMrkExtension(false))
return MergeTreeDataPartType::Wide;
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
|| mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, false))
return MergeTreeDataPartType::Wide;
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true)
|| mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, false))
return MergeTreeDataPartType::Compact;
throw Exception("Can't determine part type, because of unknown mark extension " + mrk_ext, ErrorCodes::UNKNOWN_PART_TYPE);
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const String & name, const DataPartStoragePtr & data_part_storage, const IMergeTreeDataPart * parent_part) const
{
@ -2671,7 +2657,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(data_part_storage);
if (mrk_ext)
type = getPartTypeFromMarkExtension(*mrk_ext);
{
type = MarkType(*mrk_ext).part_type;
}
else
{
/// Didn't find any mark file, suppose that part is empty.

View File

@ -27,15 +27,24 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
settings.max_compress_block_size,
settings_.query_write_settings))
, plain_hashing(*plain_file)
, marks_file(data_part_storage_builder->writeFile(
{
marks_file = data_part_storage_builder->writeFile(
MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension_,
4096,
settings_.query_write_settings))
, marks_hashing(*marks_file)
, marks_compressed_buf(marks_hashing, settings_.getMarksCompressionCodec(), settings_.marks_compress_block_size)
, marks_compressed(marks_compressed_buf)
, compress_marks(isCompressedFromMrkExtension(marks_file_extension))
{
settings_.query_write_settings);
marks_file_hashing = std::make_unique<HashingWriteBuffer>(*marks_file);
if (MarkType(marks_file_extension).compressed)
{
marks_compressor = std::make_unique<CompressedWriteBuffer>(
*marks_file_hashing,
settings_.getMarksCompressionCodec(),
settings_.marks_compress_block_size);
marks_source_hashing = std::make_unique<HashingWriteBuffer>(*marks_compressor);
}
const auto & storage_columns = metadata_snapshot->getColumns();
for (const auto & column : columns_list)
addStreams(column, storage_columns.getCodecDescOrDefault(column.name, default_codec));
@ -176,6 +185,8 @@ void MergeTreeDataPartWriterCompact::writeDataBlockPrimaryIndexAndSkipIndices(co
void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const Granules & granules)
{
WriteBuffer & marks_out = marks_source_hashing ? *marks_source_hashing : *marks_file_hashing;
for (const auto & granule : granules)
{
data_written = true;
@ -207,8 +218,8 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
};
writeIntBinary(plain_hashing.count(), compress_marks ? marks_compressed : marks_hashing);
writeIntBinary(static_cast<UInt64>(0), compress_marks ? marks_compressed : marks_hashing);
writeIntBinary(plain_hashing.count(), marks_out);
writeIntBinary(static_cast<UInt64>(0), marks_out);
writeColumnSingleGranule(
block.getByName(name_and_type->name), data_part->getSerialization(name_and_type->name),
@ -218,7 +229,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
prev_stream->hashing_buf.next(); //-V522
}
writeIntBinary(granule.rows_to_write, compress_marks ? marks_compressed : marks_hashing);
writeIntBinary(granule.rows_to_write, marks_out);
}
}
@ -243,21 +254,26 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check
assert(stream->hashing_buf.offset() == 0);
#endif
WriteBuffer & marks_out = marks_source_hashing ? *marks_source_hashing : *marks_file_hashing;
if (with_final_mark && data_written)
{
for (size_t i = 0; i < columns_list.size(); ++i)
{
writeIntBinary(plain_hashing.count(), compress_marks ? marks_compressed : marks_hashing);
writeIntBinary(static_cast<UInt64>(0), compress_marks ? marks_compressed : marks_hashing);
writeIntBinary(plain_hashing.count(), marks_out);
writeIntBinary(static_cast<UInt64>(0), marks_out);
}
writeIntBinary(static_cast<UInt64>(0), compress_marks ? marks_compressed : marks_hashing);
writeIntBinary(static_cast<UInt64>(0), marks_out);
}
plain_file->next();
if (compress_marks)
marks_compressed.next();
marks_hashing.next();
if (marks_source_hashing)
marks_source_hashing->next();
if (marks_compressor)
marks_compressor->next();
marks_file_hashing->next();
addToChecksums(checksums);
plain_file->preFinalize();
@ -268,6 +284,7 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(bool sync)
{
plain_file->finalize();
marks_file->finalize();
if (sync)
{
plain_file->sync();
@ -339,14 +356,15 @@ void MergeTreeDataPartWriterCompact::addToChecksums(MergeTreeDataPartChecksums &
checksums.files[data_file_name].file_size = plain_hashing.count();
checksums.files[data_file_name].file_hash = plain_hashing.getHash();
if (compress_marks)
if (marks_compressor)
{
checksums.files[marks_file_name].is_compressed = true;
checksums.files[marks_file_name].uncompressed_size = marks_compressed.count();
checksums.files[marks_file_name].uncompressed_hash = marks_compressed.getHash();
checksums.files[marks_file_name].uncompressed_size = marks_source_hashing->count();
checksums.files[marks_file_name].uncompressed_hash = marks_source_hashing->getHash();
}
checksums.files[marks_file_name].file_size = marks_hashing.count();
checksums.files[marks_file_name].file_hash = marks_hashing.getHash();
checksums.files[marks_file_name].file_size = marks_file_hashing->count();
checksums.files[marks_file_name].file_hash = marks_file_hashing->getHash();
}
void MergeTreeDataPartWriterCompact::ColumnsBuffer::add(MutableColumns && columns)

View File

@ -1,6 +1,8 @@
#pragma once
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
namespace DB
{
@ -84,12 +86,16 @@ private:
/// Stream for each column's substreams path (look at addStreams).
std::unordered_map<String, CompressedStreamPtr> compressed_streams;
/// marks -> marks_file -> marks_compressed_buf -> marks_compressed
/// If marks are uncompressed, the data is written to 'marks_file_hashing' for hash calculation and then to the 'marks_file'.
std::unique_ptr<WriteBufferFromFileBase> marks_file;
HashingWriteBuffer marks_hashing;
CompressedWriteBuffer marks_compressed_buf;
HashingWriteBuffer marks_compressed;
bool compress_marks;
std::unique_ptr<HashingWriteBuffer> marks_file_hashing;
/// If marks are compressed, the data is written to 'marks_source_hashing' for hash calculation,
/// then to 'marks_compressor' for compression,
/// then to 'marks_file_hashing' for calculation of hash of compressed data,
/// then finally to 'marks_file'.
std::unique_ptr<CompressedWriteBuffer> marks_compressor;
std::unique_ptr<HashingWriteBuffer> marks_source_hashing;
};
}

View File

@ -13,12 +13,15 @@ namespace ErrorCodes
void MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
{
compressed.next();
/// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually.
compressed_hashing.next();
compressor.next();
plain_hashing.next();
if (compress_marks)
marks_compressed.next();
{
marks_compressed_hashing.next();
marks_compressor.next();
}
marks_hashing.next();
@ -60,13 +63,13 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
marks_file_extension{marks_file_extension_},
plain_file(data_part_storage_builder->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
plain_hashing(*plain_file),
compressed_buf(plain_hashing, compression_codec_, max_compress_block_size_),
compressed(compressed_buf),
compressor(plain_hashing, compression_codec_, max_compress_block_size_),
compressed_hashing(compressor),
marks_file(data_part_storage_builder->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)),
marks_hashing(*marks_file),
marks_compressed_buf(marks_hashing, marks_compression_codec_, marks_compress_block_size_),
marks_compressed(marks_compressed_buf),
compress_marks(isCompressedFromMrkExtension(marks_file_extension))
marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_),
marks_compressed_hashing(marks_compressor),
compress_marks(MarkType(marks_file_extension).compressed)
{
}
@ -75,17 +78,18 @@ void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPa
String name = escaped_column_name;
checksums.files[name + data_file_extension].is_compressed = true;
checksums.files[name + data_file_extension].uncompressed_size = compressed.count();
checksums.files[name + data_file_extension].uncompressed_hash = compressed.getHash();
checksums.files[name + data_file_extension].uncompressed_size = compressed_hashing.count();
checksums.files[name + data_file_extension].uncompressed_hash = compressed_hashing.getHash();
checksums.files[name + data_file_extension].file_size = plain_hashing.count();
checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();
if (compress_marks)
{
checksums.files[name + marks_file_extension].is_compressed = true;
checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed.count();
checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed.getHash();
checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed_hashing.count();
checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed_hashing.getHash();
}
checksums.files[name + marks_file_extension].file_size = marks_hashing.count();
checksums.files[name + marks_file_extension].file_hash = marks_hashing.getHash();
}
@ -175,15 +179,15 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
{
String index_name = "primary" + getIndexExtension(compress_primary_key);
index_file_stream = data_part_storage_builder->writeFile(index_name, DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
index_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
index_file_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
if (compress_primary_key)
{
ParserCodec codec_parser;
auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.primary_key_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
CompressionCodecPtr primary_key_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
index_compressed_buf = std::make_unique<CompressedWriteBuffer>(*index_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size);
index_compressed_stream = std::make_unique<HashingWriteBuffer>(*index_compressed_buf);
index_compressor_stream = std::make_unique<CompressedWriteBuffer>(*index_file_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size);
index_source_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_compressor_stream);
}
}
}
@ -242,7 +246,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
const auto & primary_column = primary_index_block.getByPosition(j);
index_columns[j]->insertFrom(*primary_column.column, granule.start_row);
primary_column.type->getDefaultSerialization()->serializeBinary(
*primary_column.column, granule.start_row, compress_primary_key ? *index_compressed_stream : *index_hashing_stream);
*primary_column.column, granule.start_row, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream);
}
}
}
@ -260,11 +264,13 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
{
const auto index_helper = skip_indices[i];
auto & stream = *skip_indices_streams[i];
WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing;
for (const auto & granule : granules_to_write)
{
if (skip_index_accumulated_marks[i] == index_helper->index.granularity)
{
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing);
skip_index_accumulated_marks[i] = 0;
}
@ -272,15 +278,16 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
{
skip_indices_aggregators[i] = index_helper->createIndexAggregator();
if (stream.compressed.offset() >= settings.min_compress_block_size)
stream.compressed.next();
if (stream.compressed_hashing.offset() >= settings.min_compress_block_size)
stream.compressed_hashing.next();
writeIntBinary(stream.plain_hashing.count(), marks_out);
writeIntBinary(stream.compressed_hashing.offset(), marks_out);
writeIntBinary(stream.plain_hashing.count(), stream.compress_marks ? stream.marks_compressed : stream.marks_hashing);
writeIntBinary(stream.compressed.offset(), stream.compress_marks ? stream.marks_compressed : stream.marks_hashing);
/// Actually this numbers is redundant, but we have to store them
/// to be compatible with normal .mrk2 file format
/// to be compatible with the normal .mrk2 file format
if (settings.can_use_adaptive_granularity)
writeIntBinary(1UL, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing);
writeIntBinary(1UL, marks_out);
}
size_t pos = granule.start_row;
@ -297,7 +304,7 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
if (write_final_mark && compute_granularity)
index_granularity.appendMark(0);
if (index_hashing_stream)
if (index_file_hashing_stream)
{
if (write_final_mark)
{
@ -307,32 +314,32 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
size_t last_row_number = column.size() - 1;
index_columns[j]->insertFrom(column, last_row_number);
index_types[j]->getDefaultSerialization()->serializeBinary(
column, last_row_number, compress_primary_key ? *index_compressed_stream : *index_hashing_stream);
column, last_row_number, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream);
}
last_block_index_columns.clear();
}
if (compress_primary_key)
index_compressed_stream->next();
index_source_hashing_stream->next();
index_hashing_stream->next();
index_file_hashing_stream->next();
String index_name = "primary" + getIndexExtension(compress_primary_key);
if (compress_primary_key)
{
checksums.files[index_name].is_compressed = true;
checksums.files[index_name].uncompressed_size = index_compressed_stream->count();
checksums.files[index_name].uncompressed_hash = index_compressed_stream->getHash();
checksums.files[index_name].uncompressed_size = index_source_hashing_stream->count();
checksums.files[index_name].uncompressed_hash = index_source_hashing_stream->getHash();
}
checksums.files[index_name].file_size = index_hashing_stream->count();
checksums.files[index_name].file_hash = index_hashing_stream->getHash();
checksums.files[index_name].file_size = index_file_hashing_stream->count();
checksums.files[index_name].file_hash = index_file_hashing_stream->getHash();
index_file_stream->preFinalize();
}
}
void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(bool sync)
{
if (index_hashing_stream)
if (index_file_hashing_stream)
{
index_file_stream->finalize();
if (sync)
@ -340,10 +347,10 @@ void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(bool sync)
if (compress_primary_key)
{
index_compressed_stream = nullptr;
index_compressed_buf = nullptr;
index_source_hashing_stream = nullptr;
index_compressor_stream = nullptr;
}
index_hashing_stream = nullptr;
index_file_hashing_stream = nullptr;
}
}
@ -353,7 +360,7 @@ void MergeTreeDataPartWriterOnDisk::fillSkipIndicesChecksums(MergeTreeData::Data
{
auto & stream = *skip_indices_streams[i];
if (!skip_indices_aggregators[i]->empty())
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing);
}
for (auto & stream : skip_indices_streams)

View File

@ -65,17 +65,17 @@ public:
std::string data_file_extension;
std::string marks_file_extension;
/// compressed -> compressed_buf -> plain_hashing -> plain_file
/// compressed_hashing -> compressor -> plain_hashing -> plain_file
std::unique_ptr<WriteBufferFromFileBase> plain_file;
HashingWriteBuffer plain_hashing;
CompressedWriteBuffer compressed_buf;
HashingWriteBuffer compressed;
CompressedWriteBuffer compressor;
HashingWriteBuffer compressed_hashing;
/// marks -> marks_file -> marks_compressed_buf -> marks_compressed
/// marks_compressed_hashing -> marks_compressor -> marks_hashing -> marks_file
std::unique_ptr<WriteBufferFromFileBase> marks_file;
HashingWriteBuffer marks_hashing;
CompressedWriteBuffer marks_compressed_buf;
HashingWriteBuffer marks_compressed;
CompressedWriteBuffer marks_compressor;
HashingWriteBuffer marks_compressed_hashing;
bool compress_marks;
bool is_prefinalized = false;
@ -145,9 +145,9 @@ protected:
std::vector<size_t> skip_index_accumulated_marks;
std::unique_ptr<WriteBufferFromFileBase> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_hashing_stream;
std::unique_ptr<CompressedWriteBuffer> index_compressed_buf;
std::unique_ptr<HashingWriteBuffer> index_compressed_stream;
std::unique_ptr<HashingWriteBuffer> index_file_hashing_stream;
std::unique_ptr<CompressedWriteBuffer> index_compressor_stream;
std::unique_ptr<HashingWriteBuffer> index_source_hashing_stream;
bool compress_primary_key;
DataTypes index_types;

View File

@ -30,6 +30,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
Granules result;
size_t current_row = 0;
/// When our last mark is not finished yet and we have to write rows into it
if (rows_written_in_last_mark > 0)
{
@ -43,7 +44,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
.is_complete = (rows_left_in_block >= rows_left_in_last_mark),
});
current_row += result.back().rows_to_write;
current_mark++;
++current_mark;
}
/// Calculating normal granules for block
@ -61,7 +62,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
.is_complete = (rows_left_in_block >= expected_rows_in_mark),
});
current_row += result.back().rows_to_write;
current_mark++;
++current_mark;
}
return result;
@ -144,7 +145,7 @@ ISerialization::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGett
if (is_offsets && offset_columns.contains(stream_name))
return nullptr;
return &column_streams.at(stream_name)->compressed;
return &column_streams.at(stream_name)->compressed_hashing;
};
}
@ -272,10 +273,12 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark)
{
Stream & stream = *column_streams[stream_with_mark.stream_name];
writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing);
writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing);
WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing;
writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, marks_out);
writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, marks_out);
if (settings.can_use_adaptive_granularity)
writeIntBinary(rows_in_mark, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing);
writeIntBinary(rows_in_mark, marks_out);
}
StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
@ -297,13 +300,13 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
Stream & stream = *column_streams[stream_name];
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= settings.min_compress_block_size)
stream.compressed.next();
if (stream.compressed_hashing.offset() >= settings.min_compress_block_size)
stream.compressed_hashing.next();
StreamNameAndMark stream_with_mark;
stream_with_mark.stream_name = stream_name;
stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing.count();
stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset();
stream_with_mark.mark.offset_in_decompressed_block = stream.compressed_hashing.offset();
result.push_back(stream_with_mark);
}, path);
@ -333,7 +336,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
if (is_offsets && offset_columns.contains(stream_name))
return;
column_streams[stream_name]->compressed.nextIfAtEnd();
column_streams[stream_name]->compressed_hashing.nextIfAtEnd();
}, serialize_settings.path);
}

View File

@ -16,19 +16,10 @@ namespace ErrorCodes
std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage)
{
if (data_part_storage->exists())
{
for (auto it = data_part_storage->iterate(); it->isValid(); it->next())
{
const auto & ext = fs::path(it->name()).extension();
if (ext == getNonAdaptiveMrkExtension(false)
|| ext == getNonAdaptiveMrkExtension(true)
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, false)
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, false)
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true))
return ext;
}
}
if (it->isFile())
if (std::string ext = fs::path(it->name()).extension(); MarkType::isMarkFileExtension(ext))
return ext;
return {};
}

View File

@ -42,10 +42,7 @@ public:
String getMarksFilePath(const DataPartStoragePtr & data_part_storage, const String & path_prefix) const
{
auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage);
if (mrk_ext)
return path_prefix + *mrk_ext;
return path_prefix + marks_file_extension;
return path_prefix + mrk_ext.value_or(marks_file_extension);
}
size_t getMarkSizeInBytes(size_t columns_num = 1) const;

View File

@ -29,7 +29,6 @@ MergeTreeMarksLoader::MergeTreeMarksLoader(
: data_part_storage(std::move(data_part_storage_))
, mark_cache(mark_cache_)
, mrk_path(mrk_path_)
, compress_marks(isCompressedFromMrkExtension(fs::path(mrk_path_).extension()))
, marks_count(marks_count_)
, index_granularity_info(index_granularity_info_)
, save_marks_in_cache(save_marks_in_cache_)
@ -62,9 +61,12 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
size_t expected_uncompressed_size = mark_size * marks_count;
std::string file_extension = fs::path(mrk_path).extension();
bool compressed_marks = MarkType(file_extension).compressed;
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
if (!compress_marks && expected_uncompressed_size != file_size)
if (!compressed_marks && expected_uncompressed_size != file_size)
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Bad size of marks file '{}': {}, must be: {}",
@ -73,7 +75,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
auto buffer = data_part_storage->readFile(mrk_path, read_settings.adjustBufferSize(file_size), file_size, std::nullopt);
std::unique_ptr<ReadBuffer> reader;
if (!compress_marks)
if (!compressed_marks)
reader = std::move(buffer);
else
reader = std::make_unique<CompressedReadBufferFromFile>(std::move(buffer));

View File

@ -31,7 +31,6 @@ private:
DataPartStoragePtr data_part_storage;
MarkCache * mark_cache = nullptr;
String mrk_path;
bool compress_marks;
size_t marks_count;
const MergeTreeIndexGranularityInfo & index_granularity_info;
bool save_marks_in_cache = false;