Templates are shit

Alexey Milovidov 2024-10-20 23:28:23 +02:00
parent 8f038e2e1c
commit 1236422559
3 changed files with 44 additions and 55 deletions
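
In short: `Stream` was a class template parameterized on `bool only_plain_file`. Its marks-related members were declared via `std::conditional_t` (collapsing to unused `void *` placeholders in the plain-file case), every use of them was guarded by `if constexpr`, and both instantiations had to be explicitly defined and given separate pointer aliases. The commit replaces all of that with a single non-template class whose marks buffers are `std::optional` members, engaged only when a marks file is actually written; the compile-time guards become ordinary runtime checks. A minimal sketch of the before/after idiom, using placeholder types rather than the real ClickHouse buffer classes:

#include <optional>
#include <type_traits>

struct HashingBuffer { void finalize() {} };

// Before: a compile-time variant. In the only_plain_file == true instantiation
// the marks member degrades to an unused void *, and every access is guarded
// by if constexpr so the dead branch is never instantiated.
template <bool only_plain_file>
struct StreamBefore
{
    std::conditional_t<!only_plain_file, HashingBuffer, void *> marks_hashing{};

    void finalize()
    {
        if constexpr (!only_plain_file)
            marks_hashing.finalize();
    }
};
template struct StreamBefore<false>; /// Both instantiations had to be spelled out.
template struct StreamBefore<true>;

// After: one ordinary class. The member is optional and the guard is a
// runtime check on whether it is engaged.
struct StreamAfter
{
    std::optional<HashingBuffer> marks_hashing;

    void finalize()
    {
        if (marks_hashing)
            marks_hashing->finalize();
    }
};

The runtime check costs a branch, but these methods wrap finalize/sync calls on file streams, so it is negligible next to the I/O, and the bool parameter no longer leaks into every declaration that touches a stream.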

src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp

@@ -25,8 +25,7 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }
 
-template<bool only_plain_file>
-void MergeTreeDataPartWriterOnDisk::Stream<only_plain_file>::preFinalize()
+void MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
 {
     /// Here the main goal is to do preFinalize calls for plain_file and marks_file
     /// Before that all hashing and compression buffers have to be finalized
@@ -37,45 +36,42 @@ void MergeTreeDataPartWriterOnDisk::Stream<only_plain_file>::preFinalize()
     compressor->finalize();
     plain_hashing.finalize();
 
-    if constexpr (!only_plain_file)
+    if (marks_hashing)
     {
         if (compress_marks)
         {
-            marks_compressed_hashing.finalize();
-            marks_compressor.finalize();
+            marks_compressed_hashing->finalize();
+            marks_compressor->finalize();
         }
-        marks_hashing.finalize();
+        marks_hashing->finalize();
     }
 
     plain_file->preFinalize();
-    if constexpr (!only_plain_file)
+    if (marks_file)
         marks_file->preFinalize();
 
     is_prefinalized = true;
 }
 
-template<bool only_plain_file>
-void MergeTreeDataPartWriterOnDisk::Stream<only_plain_file>::finalize()
+void MergeTreeDataPartWriterOnDisk::Stream::finalize()
 {
     if (!is_prefinalized)
         preFinalize();
 
     plain_file->finalize();
-    if constexpr (!only_plain_file)
+    if (marks_file)
         marks_file->finalize();
 }
 
-template<bool only_plain_file>
-void MergeTreeDataPartWriterOnDisk::Stream<only_plain_file>::sync() const
+void MergeTreeDataPartWriterOnDisk::Stream::sync() const
 {
     plain_file->sync();
-    if constexpr (!only_plain_file)
+    if (marks_file)
         marks_file->sync();
 }
 
-template<>
-MergeTreeDataPartWriterOnDisk::Stream<false>::Stream(
+MergeTreeDataPartWriterOnDisk::Stream::Stream(
     const String & escaped_column_name_,
     const MutableDataPartStoragePtr & data_part_storage,
     const String & data_path_,
@@ -94,16 +90,15 @@ MergeTreeDataPartWriterOnDisk::Stream<false>::Stream(
     plain_hashing(*plain_file),
     compressor(std::make_unique<CompressedWriteBuffer>(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size)),
     compressed_hashing(*compressor),
-    marks_file(data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)),
-    marks_hashing(*marks_file),
-    marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size),
-    marks_compressed_hashing(marks_compressor),
     compress_marks(MarkType(marks_file_extension).compressed)
 {
+    marks_file = data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings);
+    marks_hashing.emplace(*marks_file);
+    marks_compressor.emplace(*marks_hashing, marks_compression_codec_, marks_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size);
+    marks_compressed_hashing.emplace(*marks_compressor);
 }
 
-template<>
-MergeTreeDataPartWriterOnDisk::Stream<true>::Stream(
+MergeTreeDataPartWriterOnDisk::Stream::Stream(
     const String & escaped_column_name_,
     const MutableDataPartStoragePtr & data_part_storage,
     const String & data_path_,
@@ -121,8 +116,7 @@ MergeTreeDataPartWriterOnDisk::Stream<true>::Stream(
 {
 }
 
-template<bool only_plain_file>
-void MergeTreeDataPartWriterOnDisk::Stream<only_plain_file>::addToChecksums(MergeTreeData::DataPart::Checksums & checksums)
+void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums)
 {
     String name = escaped_column_name;
@@ -132,17 +126,17 @@ void MergeTreeDataPartWriterOnDisk::Stream<only_plain_file>::addToChecksums(MergeTreeData::DataPart::Checksums & checksums)
     checksums.files[name + data_file_extension].file_size = plain_hashing.count();
     checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();
 
-    if constexpr (!only_plain_file)
+    if (marks_hashing)
     {
         if (compress_marks)
         {
             checksums.files[name + marks_file_extension].is_compressed = true;
-            checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed_hashing.count();
-            checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed_hashing.getHash();
+            checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed_hashing->count();
+            checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed_hashing->getHash();
         }
-        checksums.files[name + marks_file_extension].file_size = marks_hashing.count();
-        checksums.files[name + marks_file_extension].file_hash = marks_hashing.getHash();
+        checksums.files[name + marks_file_extension].file_size = marks_hashing->count();
+        checksums.files[name + marks_file_extension].file_hash = marks_hashing->getHash();
     }
 }
@@ -276,12 +270,12 @@ void MergeTreeDataPartWriterOnDisk::initStatistics()
     for (const auto & stat_ptr : stats)
     {
         String stats_name = stat_ptr->getFileName();
-        stats_streams.emplace_back(std::make_unique<MergeTreeDataPartWriterOnDisk::Stream<true>>(
+        stats_streams.emplace_back(std::make_unique<MergeTreeDataPartWriterOnDisk::Stream>(
             stats_name,
             data_part_storage,
             stats_name, STATS_FILE_SUFFIX,
             default_codec, settings.max_compress_block_size,
             settings.query_write_settings));
     }
 }
@@ -298,14 +292,14 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
     {
         String stream_name = skip_index->getFileName();
         skip_indices_streams.emplace_back(
-            std::make_unique<MergeTreeDataPartWriterOnDisk::Stream<false>>(
+            std::make_unique<MergeTreeDataPartWriterOnDisk::Stream>(
                 stream_name,
                 data_part_storage,
                 stream_name, skip_index->getSerializedFileExtension(),
                 stream_name, marks_file_extension,
                 default_codec, settings.max_compress_block_size,
                 marks_compression_codec, settings.marks_compress_block_size,
                 settings.query_write_settings));
 
         GinIndexStorePtr store = nullptr;
         if (typeid_cast<const MergeTreeIndexFullText *>(&*skip_index) != nullptr)
@@ -381,7 +375,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
     {
         const auto index_helper = skip_indices[i];
         auto & stream = *skip_indices_streams[i];
-        WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing;
+        WriteBuffer & marks_out = stream.compress_marks ? *stream.marks_compressed_hashing : *stream.marks_hashing;
 
         GinIndexStorePtr store;
         if (typeid_cast<const MergeTreeIndexFullText *>(&*index_helper) != nullptr)
@@ -564,7 +558,4 @@ Names MergeTreeDataPartWriterOnDisk::getSkipIndicesColumns() const
     return Names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());
 }
 
-template struct MergeTreeDataPartWriterOnDisk::Stream<false>;
-template struct MergeTreeDataPartWriterOnDisk::Stream<true>;
-
 }
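
The one subtlety is in the constructor above: `std::optional` members start disengaged, so the marks-writing constructor can no longer build the buffer chain in its initializer list; it now `emplace`s each link in the body, in dependency order, while the marks-less constructor simply leaves the optionals empty. Roughly this idiom (placeholder types, not the real signatures):

#include <memory>
#include <optional>
#include <utility>

struct FileBuffer {};
struct HashingBuffer { explicit HashingBuffer(FileBuffer &) {} };
struct CompressingBuffer { explicit CompressingBuffer(HashingBuffer &) {} };

struct Stream
{
    std::unique_ptr<FileBuffer> marks_file;
    std::optional<HashingBuffer> marks_hashing;
    std::optional<CompressingBuffer> marks_compressor;

    // Plain-file variant: the marks members stay disengaged.
    Stream() = default;

    // Marks-writing variant: engage the chain link by link in the body,
    // each buffer referring to the member constructed just before it.
    explicit Stream(std::unique_ptr<FileBuffer> file)
    {
        marks_file = std::move(file);
        marks_hashing.emplace(*marks_file);
        marks_compressor.emplace(*marks_hashing);
    }
};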

src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h

@@ -44,7 +44,6 @@ public:
     /// Helper class, which holds chain of buffers to write data file with marks.
     /// It is used to write: one column, skip index or all columns (in compact format).
 
-    template <bool only_plain_file>
     struct Stream
     {
         Stream(
@@ -82,9 +81,9 @@ public:
         /// marks_compressed_hashing -> marks_compressor -> marks_hashing -> marks_file
         std::unique_ptr<WriteBufferFromFileBase> marks_file;
-        std::conditional_t<!only_plain_file, HashingWriteBuffer, void*> marks_hashing;
-        std::conditional_t<!only_plain_file, CompressedWriteBuffer, void*> marks_compressor;
-        std::conditional_t<!only_plain_file, HashingWriteBuffer, void*> marks_compressed_hashing;
+        std::optional<HashingWriteBuffer> marks_hashing;
+        std::optional<CompressedWriteBuffer> marks_compressor;
+        std::optional<HashingWriteBuffer> marks_compressed_hashing;
         bool compress_marks;
 
         bool is_prefinalized = false;
@@ -98,8 +97,7 @@ public:
         void addToChecksums(MergeTreeDataPartChecksums & checksums);
     };
 
-    using StreamPtr = std::unique_ptr<Stream<false>>;
-    using StatisticStreamPtr = std::unique_ptr<Stream<true>>;
+    using StreamPtr = std::unique_ptr<Stream>;
 
     MergeTreeDataPartWriterOnDisk(
         const String & data_part_name_,
@@ -157,7 +155,7 @@ protected:
     const MergeTreeIndices skip_indices;
 
     const ColumnsStatistics stats;
-    std::vector<StatisticStreamPtr> stats_streams;
+    std::vector<StreamPtr> stats_streams;
 
     const String marks_file_extension;
     const CompressionCodecPtr default_codec;

src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp

@@ -187,7 +187,7 @@ void MergeTreeDataPartWriterWide::addStreams(
     query_write_settings.use_adaptive_write_buffer = settings.use_adaptive_write_buffer_for_dynamic_subcolumns && ISerialization::isDynamicSubcolumn(substream_path, substream_path.size());
     query_write_settings.adaptive_write_buffer_initial_size = settings.adaptive_write_buffer_initial_size;
 
-    column_streams[stream_name] = std::make_unique<Stream<false>>(
+    column_streams[stream_name] = std::make_unique<Stream>(
         stream_name,
         data_part_storage,
         stream_name, DATA_FILE_EXTENSION,
@@ -362,7 +362,7 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
 void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark)
 {
     auto & stream = *column_streams[stream_with_mark.stream_name];
-    WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing;
+    WriteBuffer & marks_out = stream.compress_marks ? *stream.marks_compressed_hashing : *stream.marks_hashing;
 
     writeBinaryLittleEndian(stream_with_mark.mark.offset_in_compressed_file, marks_out);
     writeBinaryLittleEndian(stream_with_mark.mark.offset_in_decompressed_block, marks_out);
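
Both writer files now select the marks output the same way: dereference whichever `std::optional` applies and bind the result to the common `WriteBuffer` base class. A self-contained sketch of that selection with stand-in types (the real classes derive from ClickHouse's WriteBuffer):

#include <cassert>
#include <optional>

struct WriteBuffer {};
struct HashingWriteBuffer : WriteBuffer {};

int main()
{
    std::optional<HashingWriteBuffer> marks_hashing;
    std::optional<HashingWriteBuffer> marks_compressed_hashing;
    bool compress_marks = true;

    marks_hashing.emplace();
    if (compress_marks)
        marks_compressed_hashing.emplace();

    // As in the patch, the chosen optional is assumed engaged; dereferencing
    // an empty optional would be undefined behavior.
    assert(compress_marks ? marks_compressed_hashing.has_value() : marks_hashing.has_value());
    WriteBuffer & marks_out = compress_marks ? *marks_compressed_hashing : *marks_hashing;
    (void)marks_out;
}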