diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index d27b03fff44..dfc1fe0c262 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1,4 +1,5 @@ #include "IMergeTreeDataPart.h" +#include "Common/SipHash.h" #include "Storages/MergeTree/IDataPartStorage.h" #include @@ -1015,7 +1016,10 @@ CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const { if (path_to_data_file.empty()) { - String candidate_path = /*fs::path(getRelativePath()) */ (ISerialization::getFileNameForStream(part_column, substream_path) + ".bin"); + auto stream_name = ISerialization::getFileNameForStream(part_column, substream_path); /// Hash the stream name without the extension: MergeTreeDataPartChecksums::getFileNameOrHash looks up hash + ".bin". + auto candidate_path = stream_name + ".bin"; + if (!getDataPartStorage().exists(candidate_path)) + candidate_path = sipHash128String(stream_name) + ".bin"; /// We can have existing, but empty .bin files. Example: LowCardinality(Nullable(...)) columns and column_name.dict.null.bin file. 
if (getDataPartStorage().exists(candidate_path) && getDataPartStorage().getFileSize(candidate_path) != 0) diff --git a/src/Storages/MergeTree/IMergedBlockOutputStream.cpp b/src/Storages/MergeTree/IMergedBlockOutputStream.cpp index 21bead2864a..2df3b6d15a6 100644 --- a/src/Storages/MergeTree/IMergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/IMergedBlockOutputStream.cpp @@ -51,7 +51,9 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart( data_part->getSerialization(column.name)->enumerateStreams( [&](const ISerialization::SubstreamPath & substream_path) { - ++stream_counts[ISerialization::getFileNameForStream(column.name, substream_path)]; + auto full_stream_name = ISerialization::getFileNameForStream(column.name, substream_path); + auto stream_name = checksums.getFileNameOrHash(full_stream_name); + ++stream_counts[stream_name]; }); } @@ -65,7 +67,9 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart( ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path) { - String stream_name = ISerialization::getFileNameForStream(column_name, substream_path); + auto full_stream_name = ISerialization::getFileNameForStream(column_name, substream_path); + auto stream_name = checksums.getFileNameOrHash(full_stream_name); + /// Delete files if they are no longer shared with another column. 
if (--stream_counts[stream_name] == 0) { diff --git a/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp b/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp index 2f97edd1a9c..7d39ea0707f 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp @@ -341,7 +341,7 @@ MergeTreeDataPartChecksums::Checksum::uint128 MergeTreeDataPartChecksums::getTot return ret; } -std::optional MergeTreeDataPartChecksums::getFileNameOrHash(const String & name) const +String MergeTreeDataPartChecksums::getFileNameOrHash(const String & name) const { if (files.contains(name + ".bin")) return name; @@ -350,7 +350,7 @@ std::optional MergeTreeDataPartChecksums::getFileNameOrHash(const String if (files.contains(hash + ".bin")) return hash; - return std::nullopt; + return name; } void MinimalisticDataPartChecksums::serialize(WriteBuffer & to) const diff --git a/src/Storages/MergeTree/MergeTreeDataPartChecksum.h b/src/Storages/MergeTree/MergeTreeDataPartChecksum.h index 626b0a90839..2a38b52c72a 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartChecksum.h +++ b/src/Storages/MergeTree/MergeTreeDataPartChecksum.h @@ -89,7 +89,7 @@ struct MergeTreeDataPartChecksums UInt64 getTotalSizeOnDisk() const; - std::optional getFileNameOrHash(const String & name) const; + String getFileNameOrHash(const String & name) const; }; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index f44cbdd8628..645e16eed38 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -73,19 +73,20 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( getSerialization(column.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { - String file_name = ISerialization::getFileNameForStream(column, substream_path); + auto full_stream_name = ISerialization::getFileNameForStream(column, substream_path); + auto 
stream_name = checksums.getFileNameOrHash(full_stream_name); - if (processed_substreams && !processed_substreams->insert(file_name).second) + if (processed_substreams && !processed_substreams->insert(stream_name).second) return; - auto bin_checksum = checksums.files.find(file_name + ".bin"); + auto bin_checksum = checksums.files.find(stream_name + ".bin"); if (bin_checksum != checksums.files.end()) { size.data_compressed += bin_checksum->second.file_size; size.data_uncompressed += bin_checksum->second.uncompressed_size; } - auto mrk_checksum = checksums.files.find(file_name + getMarksFileExtension()); + auto mrk_checksum = checksums.files.find(stream_name + getMarksFileExtension()); if (mrk_checksum != checksums.files.end()) size.marks += mrk_checksum->second.file_size; }); @@ -185,9 +186,11 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const { getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { - String file_name = ISerialization::getFileNameForStream(name_type, substream_path); - String mrk_file_name = file_name + marks_file_extension; - String bin_file_name = file_name + DATA_FILE_EXTENSION; + String full_stream_name = ISerialization::getFileNameForStream(name_type, substream_path); + String stream_name = checksums.getFileNameOrHash(full_stream_name); + + String mrk_file_name = stream_name + marks_file_extension; + String bin_file_name = stream_name + DATA_FILE_EXTENSION; if (!checksums.files.contains(mrk_file_name)) throw Exception( @@ -213,6 +216,8 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + marks_file_extension; + if (!getDataPartStorage().exists(file_path)) /// Fall back to the hashed name: the hash covers the stream name only, the extension is appended afterwards. + file_path = sipHash128String(ISerialization::getFileNameForStream(name_type, substream_path)) + marks_file_extension; /// Missing file is 
Ok for case when new column was added. if (getDataPartStorage().exists(file_path)) @@ -266,7 +271,10 @@ String MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & colum getSerialization(column.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { if (filename.empty()) - filename = ISerialization::getFileNameForStream(column, substream_path); + { + auto full_stream_name = ISerialization::getFileNameForStream(column, substream_path); + filename = checksums.getFileNameOrHash(full_stream_name); + } }); return filename; } diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp index cd641a5cd2a..0ce20dc02f0 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp @@ -204,7 +204,7 @@ size_t MergeTreeReaderWide::readRows( return read_rows; } -std::optional getStreamName( +String getStreamName( const NameAndTypePair & column, const ISerialization::SubstreamPath & substream_path, const MergeTreeDataPartChecksums & checksums) @@ -226,18 +226,20 @@ void MergeTreeReaderWide::addStreams( { auto stream_name = getStreamName(name_and_type, substream_path, data_part_info_for_read->getChecksums()); - /** If data file is missing then we will not try to open it. - * It is necessary since it allows to add new column to structure of the table without creating new files for old parts. - */ - if (!stream_name) + if (streams.contains(stream_name)) { - has_all_streams = false; + has_any_stream = true; return; } - if (streams.contains(*stream_name)) + bool data_file_exists = data_part_info_for_read->getChecksums().files.contains(stream_name + DATA_FILE_EXTENSION); + + /** If data file is missing then we will not try to open it. + * It is necessary since it allows to add new column to structure of the table without creating new files for old parts. 
+ */ + if (!data_file_exists) { - has_any_stream = true; + has_all_streams = false; return; } @@ -247,10 +249,10 @@ void MergeTreeReaderWide::addStreams( auto context = data_part_info_for_read->getContext(); auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr; - streams.emplace(*stream_name, std::make_unique( - data_part_info_for_read, *stream_name, DATA_FILE_EXTENSION, + streams.emplace(stream_name, std::make_unique( + data_part_info_for_read, stream_name, DATA_FILE_EXTENSION, data_part_info_for_read->getMarksCount(), all_mark_ranges, settings, mark_cache, - uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(*stream_name + DATA_FILE_EXTENSION), + uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION), &data_part_info_for_read->getIndexGranularityInfo(), profile_callback, clock_type, is_lc_dict, load_marks_threadpool)); }; @@ -278,10 +280,7 @@ static ReadBuffer * getStream( auto stream_name = getStreamName(name_and_type, substream_path, checksums); - if (!stream_name) - return nullptr; - - auto it = streams.find(*stream_name); + auto it = streams.find(stream_name); if (it == streams.end()) return nullptr; @@ -329,13 +328,13 @@ void MergeTreeReaderWide::prefetchForColumn( { auto stream_name = getStreamName(name_and_type, substream_path, data_part_info_for_read->getChecksums()); - if (stream_name && !prefetched_streams.contains(*stream_name)) + if (!prefetched_streams.contains(stream_name)) { bool seek_to_mark = !continue_reading; if (ReadBuffer * buf = getStream(false, substream_path, data_part_info_for_read->getChecksums(), streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache)) { buf->prefetch(priority); - prefetched_streams.insert(*stream_name); + prefetched_streams.insert(stream_name); } } }); diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 
ae4d585e5fe..0d32567d2fa 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -34,8 +34,8 @@ struct Settings; M(UInt64, min_bytes_for_wide_part, 10485760, "Minimal uncompressed size in bytes to create part in wide format instead of compact", 0) \ M(UInt64, min_rows_for_wide_part, 0, "Minimal number of rows to create part in wide format instead of compact", 0) \ M(Float, ratio_of_defaults_for_sparse_serialization, 1.0, "Minimal ratio of number of default values to number of all values in column to store it in sparse serializations. If >= 1, columns will be always written in full serialization.", 0) \ - M(Bool, replace_long_file_name_to_hash, false, "", 0) \ - M(UInt64, max_file_name_length, 128, "", 0) \ + M(Bool, replace_long_file_name_to_hash, false, "If the file name for column is too long (more than 'max_file_name_length' bytes) replace it to SipHash128", 0) \ + M(UInt64, max_file_name_length, 128, "The maximal length of the file name to keep it as is without hashing", 0) \ \ /** Merge settings. */ \ M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. 
By default has the same value as `index_granularity`.", 0) \ diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 76096d00641..4bcaea53337 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1,3 +1,4 @@ +#include "Common/SipHash.h" #include #include @@ -591,7 +592,8 @@ static std::unordered_map getStreamCounts( { auto callback = [&](const ISerialization::SubstreamPath & substream_path) { - auto stream_name = ISerialization::getFileNameForStream(column_name, substream_path); + auto full_stream_name = ISerialization::getFileNameForStream(column_name, substream_path); + auto stream_name = data_part->checksums.getFileNameOrHash(full_stream_name); ++stream_counts[stream_name]; }; @@ -705,7 +707,9 @@ static NameToNameVector collectFilesForRenames( { ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path) { - String stream_name = ISerialization::getFileNameForStream({command.column_name, command.data_type}, substream_path); + auto full_stream_name = ISerialization::getFileNameForStream({command.column_name, command.data_type}, substream_path); + auto stream_name = source_part->checksums.getFileNameOrHash(full_stream_name); + /// Delete files if they are no longer shared with another column. 
if (--stream_counts[stream_name] == 0) { @@ -724,8 +728,11 @@ static NameToNameVector collectFilesForRenames( ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path) { - String stream_from = ISerialization::getFileNameForStream(command.column_name, substream_path); - String stream_to = boost::replace_first_copy(stream_from, escaped_name_from, escaped_name_to); + String full_stream_from = ISerialization::getFileNameForStream(command.column_name, substream_path); + String full_stream_to = boost::replace_first_copy(full_stream_from, escaped_name_from, escaped_name_to); + + String stream_from = source_part->checksums.getFileNameOrHash(full_stream_from); + String stream_to = stream_from == full_stream_from ? full_stream_to : sipHash128String(full_stream_to); if (stream_from != stream_to) { diff --git a/src/Storages/MergeTree/checkDataPart.cpp b/src/Storages/MergeTree/checkDataPart.cpp index 00710ed3ed6..561f76d8b5f 100644 --- a/src/Storages/MergeTree/checkDataPart.cpp +++ b/src/Storages/MergeTree/checkDataPart.cpp @@ -11,6 +11,7 @@ #include #include #include +#include namespace CurrentMetrics @@ -30,6 +31,7 @@ namespace ErrorCodes extern const int CANNOT_MUNMAP; extern const int CANNOT_MREMAP; extern const int UNEXPECTED_FILE_IN_DATA_PART; + extern const int NO_FILE_IN_DATA_PART; } @@ -137,7 +139,16 @@ IMergeTreeDataPart::Checksums checkDataPart( { get_serialization(column)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { - String file_name = ISerialization::getFileNameForStream(column, substream_path) + ".bin"; + auto stream_name = ISerialization::getFileNameForStream(column, substream_path); /// Hash the stream name without the extension and keep ".bin" on the hashed file name. + auto file_name = stream_name + ".bin"; + if (!data_part_storage.exists(file_name)) + file_name = sipHash128String(stream_name) + ".bin"; + + if (!data_part_storage.exists(file_name)) + throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, + "There is no file for column '{}' in data part '{}'", + column.name, data_part->name); + 
checksums_data.files[file_name] = checksum_compressed_file(data_part_storage, file_name); }); } diff --git a/src/Storages/System/StorageSystemPartsColumns.cpp b/src/Storages/System/StorageSystemPartsColumns.cpp index 00b958b015f..de874b22e7e 100644 --- a/src/Storages/System/StorageSystemPartsColumns.cpp +++ b/src/Storages/System/StorageSystemPartsColumns.cpp @@ -261,16 +261,17 @@ void StorageSystemPartsColumns::processNextStorage( ColumnSize size; NameAndTypePair subcolumn(column.name, name, column.type, data.type); - String file_name = ISerialization::getFileNameForStream(subcolumn, subpath); + String full_stream_name = ISerialization::getFileNameForStream(subcolumn, subpath); + String stream_name = part->checksums.getFileNameOrHash(full_stream_name); - auto bin_checksum = part->checksums.files.find(file_name + ".bin"); + auto bin_checksum = part->checksums.files.find(stream_name + ".bin"); if (bin_checksum != part->checksums.files.end()) { size.data_compressed += bin_checksum->second.file_size; size.data_uncompressed += bin_checksum->second.uncompressed_size; } - auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.mark_type.getFileExtension()); + auto mrk_checksum = part->checksums.files.find(stream_name + part->index_granularity_info.mark_type.getFileExtension()); if (mrk_checksum != part->checksums.files.end()) size.marks += mrk_checksum->second.file_size;