Allow replacing long file names with hashes

Commit: b30544a6ab (parent: eb0e14b870)
Author: Anton Popov
Date: 2023-06-06 01:39:45 +00:00

10 changed files with 75 additions and 41 deletions
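In short, the change works like this: when a column's stream file name exceeds the `max_file_name_length` setting (and `replace_long_file_name_to_hash` is enabled), the file is stored under a SipHash128 of the full name instead of the name itself; readers resolve either form through the part's checksums. A minimal self-contained sketch of the idea — `hashHex` is a stand-in for ClickHouse's `sipHash128String`, not the real hash:

#include <cstddef>
#include <cstdio>
#include <functional>
#include <string>

/// Stand-in for ClickHouse's sipHash128String, which returns the 128-bit
/// SipHash of a string as hex. std::hash is NOT SipHash; it is used here
/// only to keep the sketch self-contained and runnable.
static std::string hashHex(const std::string & s)
{
    char buf[17];
    std::snprintf(buf, sizeof(buf), "%016zx", std::hash<std::string>{}(s));
    return buf;
}

/// The commit's core idea: if the stream file name for a column is longer
/// than max_file_name_length, store the file under a hash of the name.
static std::string chooseStreamFileName(const std::string & full_stream_name, size_t max_file_name_length = 128)
{
    return full_stream_name.size() <= max_file_name_length
        ? full_stream_name
        : hashHex(full_stream_name);
}

int main()
{
    std::printf("%s\n", chooseStreamFileName("id.bin").c_str());              // short: kept as is
    std::printf("%s\n", chooseStreamFileName(std::string(200, 'x')).c_str()); // long: replaced by hash
}

Readers never have to guess which form was used: they resolve names through the part's checksums via getFileNameOrHash, as the hunks below show.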

src/Storages/MergeTree/IMergeTreeDataPart.cpp

@@ -1,4 +1,5 @@
 #include "IMergeTreeDataPart.h"
+#include "Common/SipHash.h"
 #include "Storages/MergeTree/IDataPartStorage.h"
 #include <optional>
@@ -1015,7 +1016,10 @@ CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
 {
     if (path_to_data_file.empty())
     {
-        String candidate_path = /*fs::path(getRelativePath()) */ (ISerialization::getFileNameForStream(part_column, substream_path) + ".bin");
+        auto candidate_path = ISerialization::getFileNameForStream(part_column, substream_path) + ".bin";
+
+        if (!getDataPartStorage().exists(candidate_path))
+            candidate_path = sipHash128String(candidate_path) + ".bin";
 
         /// We can have existing, but empty .bin files. Example: LowCardinality(Nullable(...)) columns and column_name.dict.null.bin file.
         if (getDataPartStorage().exists(candidate_path) && getDataPartStorage().getFileSize(candidate_path) != 0)

src/Storages/MergeTree/IMergedBlockOutputStream.cpp

@@ -51,7 +51,9 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
         data_part->getSerialization(column.name)->enumerateStreams(
             [&](const ISerialization::SubstreamPath & substream_path)
             {
-                ++stream_counts[ISerialization::getFileNameForStream(column.name, substream_path)];
+                auto full_stream_name = ISerialization::getFileNameForStream(column.name, substream_path);
+                auto stream_name = checksums.getFileNameOrHash(full_stream_name);
+                ++stream_counts[stream_name];
             });
     }
@@ -65,7 +67,9 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
         ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path)
         {
-            String stream_name = ISerialization::getFileNameForStream(column_name, substream_path);
+            auto full_stream_name = ISerialization::getFileNameForStream(column_name, substream_path);
+            auto stream_name = checksums.getFileNameOrHash(full_stream_name);
+
             /// Delete files if they are no longer shared with another column.
             if (--stream_counts[stream_name] == 0)
             {

src/Storages/MergeTree/MergeTreeDataPartChecksums.cpp

@@ -341,7 +341,7 @@ MergeTreeDataPartChecksums::Checksum::uint128 MergeTreeDataPartChecksums::getTot
     return ret;
 }
 
-std::optional<String> MergeTreeDataPartChecksums::getFileNameOrHash(const String & name) const
+String MergeTreeDataPartChecksums::getFileNameOrHash(const String & name) const
 {
     if (files.contains(name + ".bin"))
         return name;
@@ -350,7 +350,7 @@ std::optional<String> MergeTreeDataPartChecksums::getFileNameOrHash(const String
     if (files.contains(hash + ".bin"))
         return hash;
 
-    return std::nullopt;
+    return name;
 }
 
 void MinimalisticDataPartChecksums::serialize(WriteBuffer & to) const
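A hedged model of the lookup contract above: the checksums list every file in the part, so a full stream name resolves to itself when `<name>.bin` is present, to its hash when `<hash>.bin` is present, and falls back to the plain name otherwise. `files` is a simplified stand-in for the checksums' file map, and `hashHex` is the stand-in hash from the first sketch:

#include <set>
#include <string>

std::string hashHex(const std::string & s); // stand-in for sipHash128String, as in the first sketch

/// Simplified model of getFileNameOrHash: 'files' stands in for the
/// checksums' file map keyed by on-disk file name.
std::string getFileNameOrHash(const std::set<std::string> & files, const std::string & name)
{
    if (files.count(name + ".bin"))
        return name;                   // part stores the stream under its plain name
    if (files.count(hashHex(name) + ".bin"))
        return hashHex(name);          // part stores the stream under its hash
    return name;                       // neither file exists, e.g. a column added later
}

Note the changed fallback: returning the plain name instead of std::nullopt lets callers treat "stream is absent" uniformly as "file not in checksums", which is what the MergeTreeReaderWide hunks below rely on.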

src/Storages/MergeTree/MergeTreeDataPartChecksums.h

@@ -89,7 +89,7 @@ struct MergeTreeDataPartChecksums
     UInt64 getTotalSizeOnDisk() const;
 
-    std::optional<String> getFileNameOrHash(const String & name) const;
+    String getFileNameOrHash(const String & name) const;
 };

src/Storages/MergeTree/MergeTreeDataPartWide.cpp

@@ -73,19 +73,20 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
     getSerialization(column.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
     {
-        String file_name = ISerialization::getFileNameForStream(column, substream_path);
+        auto full_stream_name = ISerialization::getFileNameForStream(column, substream_path);
+        auto stream_name = checksums.getFileNameOrHash(full_stream_name);
 
-        if (processed_substreams && !processed_substreams->insert(file_name).second)
+        if (processed_substreams && !processed_substreams->insert(stream_name).second)
             return;
 
-        auto bin_checksum = checksums.files.find(file_name + ".bin");
+        auto bin_checksum = checksums.files.find(stream_name + ".bin");
         if (bin_checksum != checksums.files.end())
         {
             size.data_compressed += bin_checksum->second.file_size;
             size.data_uncompressed += bin_checksum->second.uncompressed_size;
         }
 
-        auto mrk_checksum = checksums.files.find(file_name + getMarksFileExtension());
+        auto mrk_checksum = checksums.files.find(stream_name + getMarksFileExtension());
         if (mrk_checksum != checksums.files.end())
             size.marks += mrk_checksum->second.file_size;
     });
@@ -185,9 +186,11 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
         {
             getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
             {
-                String file_name = ISerialization::getFileNameForStream(name_type, substream_path);
-                String mrk_file_name = file_name + marks_file_extension;
-                String bin_file_name = file_name + DATA_FILE_EXTENSION;
+                String full_stream_name = ISerialization::getFileNameForStream(name_type, substream_path);
+                String stream_name = checksums.getFileNameOrHash(full_stream_name);
+
+                String mrk_file_name = stream_name + marks_file_extension;
+                String bin_file_name = stream_name + DATA_FILE_EXTENSION;
 
                 if (!checksums.files.contains(mrk_file_name))
                     throw Exception(
@@ -213,6 +216,8 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
             getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
             {
                 auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + marks_file_extension;
+                if (!getDataPartStorage().exists(file_path))
+                    file_path = sipHash128String(file_path) + marks_file_extension;
 
                 /// Missing file is Ok for case when new column was added.
                 if (getDataPartStorage().exists(file_path))
@@ -266,7 +271,10 @@ String MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & colum
     getSerialization(column.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
     {
         if (filename.empty())
-            filename = ISerialization::getFileNameForStream(column, substream_path);
+        {
+            auto full_stream_name = ISerialization::getFileNameForStream(column, substream_path);
+            filename = checksums.getFileNameOrHash(full_stream_name);
+        }
     });
 
     return filename;
 }

src/Storages/MergeTree/MergeTreeReaderWide.cpp

@@ -204,7 +204,7 @@ size_t MergeTreeReaderWide::readRows(
     return read_rows;
 }
 
-std::optional<String> getStreamName(
+String getStreamName(
     const NameAndTypePair & column,
     const ISerialization::SubstreamPath & substream_path,
     const MergeTreeDataPartChecksums & checksums)
@@ -226,18 +226,20 @@ void MergeTreeReaderWide::addStreams(
    {
        auto stream_name = getStreamName(name_and_type, substream_path, data_part_info_for_read->getChecksums());
 
-       /** If data file is missing then we will not try to open it.
-         * It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
-         */
-       if (!stream_name)
+       if (streams.contains(stream_name))
        {
-           has_all_streams = false;
+           has_any_stream = true;
            return;
        }
 
-       if (streams.contains(*stream_name))
+       bool data_file_exists = data_part_info_for_read->getChecksums().files.contains(stream_name + DATA_FILE_EXTENSION);
+
+       /** If data file is missing then we will not try to open it.
+         * It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
+         */
+       if (!data_file_exists)
        {
-           has_any_stream = true;
+           has_all_streams = false;
            return;
        }
@@ -247,10 +249,10 @@ void MergeTreeReaderWide::addStreams(
    auto context = data_part_info_for_read->getContext();
    auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
 
-   streams.emplace(*stream_name, std::make_unique<MergeTreeReaderStream>(
-       data_part_info_for_read, *stream_name, DATA_FILE_EXTENSION,
+   streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
+       data_part_info_for_read, stream_name, DATA_FILE_EXTENSION,
        data_part_info_for_read->getMarksCount(), all_mark_ranges, settings, mark_cache,
-       uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(*stream_name + DATA_FILE_EXTENSION),
+       uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
        &data_part_info_for_read->getIndexGranularityInfo(),
        profile_callback, clock_type, is_lc_dict, load_marks_threadpool));
 };
@@ -278,10 +280,7 @@ static ReadBuffer * getStream(
    auto stream_name = getStreamName(name_and_type, substream_path, checksums);
 
-   if (!stream_name)
-       return nullptr;
-
-   auto it = streams.find(*stream_name);
+   auto it = streams.find(stream_name);
    if (it == streams.end())
        return nullptr;
@@ -329,13 +328,13 @@ void MergeTreeReaderWide::prefetchForColumn(
    {
        auto stream_name = getStreamName(name_and_type, substream_path, data_part_info_for_read->getChecksums());
 
-       if (stream_name && !prefetched_streams.contains(*stream_name))
+       if (!prefetched_streams.contains(stream_name))
        {
            bool seek_to_mark = !continue_reading;
 
            if (ReadBuffer * buf = getStream(false, substream_path, data_part_info_for_read->getChecksums(), streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache))
            {
                buf->prefetch(priority);
-               prefetched_streams.insert(*stream_name);
+               prefetched_streams.insert(stream_name);
            }
        }
    });
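The hunks above show only the changed signature of `getStreamName`; its body is not included in this commit view. Based on the call sites, it presumably resolves the full stream name through the part's checksums, roughly like this (an assumption, not shown in the diff):

/// Presumed body of getStreamName after this commit (only the signature
/// change from std::optional<String> to String is visible above): resolve
/// the full stream name through the part's checksums, yielding either the
/// plain name or its hash.
String getStreamName(
    const NameAndTypePair & column,
    const ISerialization::SubstreamPath & substream_path,
    const MergeTreeDataPartChecksums & checksums)
{
    auto full_stream_name = ISerialization::getFileNameForStream(column, substream_path);
    return checksums.getFileNameOrHash(full_stream_name);
}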

src/Storages/MergeTree/MergeTreeSettings.h

@@ -34,8 +34,8 @@ struct Settings;
    M(UInt64, min_bytes_for_wide_part, 10485760, "Minimal uncompressed size in bytes to create part in wide format instead of compact", 0) \
    M(UInt64, min_rows_for_wide_part, 0, "Minimal number of rows to create part in wide format instead of compact", 0) \
    M(Float, ratio_of_defaults_for_sparse_serialization, 1.0, "Minimal ratio of number of default values to number of all values in column to store it in sparse serializations. If >= 1, columns will be always written in full serialization.", 0) \
-   M(Bool, replace_long_file_name_to_hash, false, "", 0) \
-   M(UInt64, max_file_name_length, 128, "", 0) \
+   M(Bool, replace_long_file_name_to_hash, false, "If the file name for column is too long (more than 'max_file_name_length' bytes) replace it to SipHash128", 0) \
+   M(UInt64, max_file_name_length, 128, "The maximal length of the file name to keep it as is without hashing", 0) \
    \
    /** Merge settings. */ \
    M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \
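Read together, the two new settings gate the writer's decision roughly as follows; the function name and call site are illustrative, not part of this diff:

#include <cstddef>
#include <string>

/// Illustrative only: how the two settings above combine on the write path.
bool shouldHashStreamFileName(
    const std::string & full_stream_name,
    bool replace_long_file_name_to_hash, // MergeTree setting, default false
    size_t max_file_name_length)         // MergeTree setting, default 128
{
    return replace_long_file_name_to_hash
        && full_stream_name.size() > max_file_name_length;
}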

src/Storages/MergeTree/MutateTask.cpp

@@ -1,3 +1,4 @@
+#include "Common/SipHash.h"
 #include <Storages/MergeTree/MutateTask.h>
 
 #include <Common/logger_useful.h>
@@ -591,7 +592,8 @@ static std::unordered_map<String, size_t> getStreamCounts(
    {
        auto callback = [&](const ISerialization::SubstreamPath & substream_path)
        {
-           auto stream_name = ISerialization::getFileNameForStream(column_name, substream_path);
+           auto full_stream_name = ISerialization::getFileNameForStream(column_name, substream_path);
+           auto stream_name = data_part->checksums.getFileNameOrHash(full_stream_name);
            ++stream_counts[stream_name];
        };
@@ -705,7 +707,9 @@ static NameToNameVector collectFilesForRenames(
    {
        ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path)
        {
-           String stream_name = ISerialization::getFileNameForStream({command.column_name, command.data_type}, substream_path);
+           auto full_stream_name = ISerialization::getFileNameForStream({command.column_name, command.data_type}, substream_path);
+           auto stream_name = source_part->checksums.getFileNameOrHash(full_stream_name);
+
            /// Delete files if they are no longer shared with another column.
            if (--stream_counts[stream_name] == 0)
            {
@@ -724,8 +728,11 @@ static NameToNameVector collectFilesForRenames(
        ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path)
        {
-           String stream_from = ISerialization::getFileNameForStream(command.column_name, substream_path);
-           String stream_to = boost::replace_first_copy(stream_from, escaped_name_from, escaped_name_to);
+           String full_stream_from = ISerialization::getFileNameForStream(command.column_name, substream_path);
+           String full_stream_to = boost::replace_first_copy(full_stream_from, escaped_name_from, escaped_name_to);
+
+           String stream_from = source_part->checksums.getFileNameOrHash(full_stream_from);
+           String stream_to = stream_from == full_stream_from ? full_stream_to : sipHash128String(full_stream_to);
 
            if (stream_from != stream_to)
            {
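The ternary above keeps the stored representation consistent across a rename: if the source part stores the stream under its plain name, the renamed stream also uses its plain name; if it is stored hashed, the target name is hashed too. A sketch with illustrative names (`hashHex` stands in for `sipHash128String`):

#include <string>

std::string hashHex(const std::string & s); // stand-in for sipHash128String

/// Mirrors the ternary above: the renamed stream keeps the representation
/// (plain vs hashed) that the source part already uses on disk.
std::string mapRenamedStream(
    const std::string & full_stream_from, // e.g. "c1.size0"
    const std::string & full_stream_to,   // e.g. "c2.size0"
    const std::string & stream_from)      // full_stream_from resolved via checksums
{
    return stream_from == full_stream_from
        ? full_stream_to            // stored plain -> renamed plain
        : hashHex(full_stream_to);  // stored hashed -> renamed hashed
}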

src/Storages/MergeTree/checkDataPart.cpp

@@ -11,6 +11,7 @@
 #include <Compression/CompressedReadBuffer.h>
 #include <IO/HashingReadBuffer.h>
 #include <Common/CurrentMetrics.h>
+#include <Common/SipHash.h>
 
 namespace CurrentMetrics
@@ -30,6 +31,7 @@ namespace ErrorCodes
    extern const int CANNOT_MUNMAP;
    extern const int CANNOT_MREMAP;
    extern const int UNEXPECTED_FILE_IN_DATA_PART;
+   extern const int NO_FILE_IN_DATA_PART;
 }
@@ -137,7 +139,16 @@ IMergeTreeDataPart::Checksums checkDataPart(
    {
        get_serialization(column)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
        {
-           String file_name = ISerialization::getFileNameForStream(column, substream_path) + ".bin";
+           auto file_name = ISerialization::getFileNameForStream(column, substream_path) + ".bin";
+
+           if (!data_part_storage.exists(file_name))
+               file_name = sipHash128String(file_name);
+
+           if (!data_part_storage.exists(file_name))
+               throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART,
+                   "There is no file for column '{}' in data part '{}'",
+                   column.name, data_part->name);
+
            checksums_data.files[file_name] = checksum_compressed_file(data_part_storage, file_name);
        });
    }
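Both checkDataPart here and detectDefaultCompressionCodec in the first file use the same probing pattern when checksums are not consulted: try the plain name first, then the hashed name. A small stand-alone sketch; `exists` abstracts the data part storage and `hashHex` stands in for `sipHash128String`:

#include <functional>
#include <string>

std::string hashHex(const std::string & s); // stand-in for sipHash128String

/// The probing pattern used above: try the plain file name first, then the
/// hashed one; an empty result corresponds to NO_FILE_IN_DATA_PART.
std::string resolveExistingFile(
    const std::function<bool(const std::string &)> & exists, // abstracts data_part_storage
    const std::string & plain_name)
{
    if (exists(plain_name))
        return plain_name;
    if (auto hashed = hashHex(plain_name); exists(hashed))
        return hashed;
    return {};
}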

src/Storages/System/StorageSystemPartsColumns.cpp

@@ -261,16 +261,17 @@ void StorageSystemPartsColumns::processNextStorage(
                    ColumnSize size;
                    NameAndTypePair subcolumn(column.name, name, column.type, data.type);
 
-                   String file_name = ISerialization::getFileNameForStream(subcolumn, subpath);
+                   String full_stream_name = ISerialization::getFileNameForStream(subcolumn, subpath);
+                   String stream_name = part->checksums.getFileNameOrHash(full_stream_name);
 
-                   auto bin_checksum = part->checksums.files.find(file_name + ".bin");
+                   auto bin_checksum = part->checksums.files.find(stream_name + ".bin");
                    if (bin_checksum != part->checksums.files.end())
                    {
                        size.data_compressed += bin_checksum->second.file_size;
                        size.data_uncompressed += bin_checksum->second.uncompressed_size;
                    }
 
-                   auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.mark_type.getFileExtension());
+                   auto mrk_checksum = part->checksums.files.find(stream_name + part->index_granularity_info.mark_type.getFileExtension());
                    if (mrk_checksum != part->checksums.files.end())
                        size.marks += mrk_checksum->second.file_size;