Merge pull request #50612 from CurtizJ/long-column-names

Allow to replace long file names to hashes
Anton Popov 2023-09-26 13:05:33 +02:00 committed by GitHub
commit 6cd32eb7b5
41 changed files with 549 additions and 187 deletions

View File

@ -555,7 +555,7 @@ Merge reads rows from parts in blocks of `merge_max_block_size` rows, then merge
## number_of_free_entries_in_pool_to_lower_max_size_of_merge {#number-of-free-entries-in-pool-to-lower-max-size-of-merge}
When there are fewer than the specified number of free entries in the pool (or replicated queue), start to lower the maximum size of a merge to process (or to put in the queue).
This is to allow small merges to proceed rather than filling the pool with long-running merges.
Possible values:
@ -566,7 +566,7 @@ Default value: 8
## number_of_free_entries_in_pool_to_execute_mutation {#number-of-free-entries-in-pool-to-execute-mutation}
When there are fewer than the specified number of free entries in the pool, do not execute part mutations.
This is to leave free threads for regular merges and to avoid the "Too many parts" error.
Possible values:
@ -845,6 +845,13 @@ You can see which parts of `s` were stored using the sparse serialization:
└────────┴────────────────────┘
```
## replace_long_file_name_to_hash {#replace_long_file_name_to_hash}
If the file name for a column is too long (more than `max_file_name_length` bytes), replace it with its SipHash128 hash. Default value: `false`.
## max_file_name_length {#max_file_name_length}
The maximum length of a file name to keep as-is, without hashing. Takes effect only if the setting `replace_long_file_name_to_hash` is enabled. The value of this setting does not include the length of the file extension, so it is recommended to set it below the maximum filename length (usually 255 bytes) with some margin to avoid filesystem errors. Default value: 127.
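A minimal sketch of how the two settings are used together (table and column names are illustrative):
```sql
CREATE TABLE t_long_file_names
(
    `column_with_a_very_long_name_that_exceeds_the_configured_limit` UInt64
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS
    min_bytes_for_wide_part = 0,        -- force Wide parts so each column gets its own files
    replace_long_file_name_to_hash = 1,
    max_file_name_length = 42;
```
With these settings, any stream whose file name (without extension) is longer than 42 bytes is stored under the lowercase hex of its SipHash128 instead, e.g. `<hash>.bin` and the corresponding marks file.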
## clean_deleted_rows
Enables or disables automatic deletion of rows flagged as `is_deleted` when performing `OPTIMIZE ... FINAL` on a table using the ReplacingMergeTree engine. When disabled, the `CLEANUP` keyword has to be added to `OPTIMIZE ... FINAL` to get the same behaviour.
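As an illustration (a sketch with hypothetical table and column names), the manual path when the setting is disabled looks like this:
```sql
CREATE TABLE t_replacing
(
    key UInt64,
    value String,
    ver UInt64,
    is_deleted UInt8
)
ENGINE = ReplacingMergeTree(ver, is_deleted)
ORDER BY key;

-- Flag a row as deleted by inserting a newer version with is_deleted = 1.
INSERT INTO t_replacing VALUES (1, 'a', 1, 0), (1, 'a', 2, 1);

-- A plain OPTIMIZE ... FINAL keeps the flagged row; CLEANUP is required to drop it.
OPTIMIZE TABLE t_replacing FINAL CLEANUP;
```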

View File

@ -22,6 +22,7 @@
#include <base/extended_types.h>
#include <base/types.h>
#include <base/unaligned.h>
#include <base/hex.h>
#include <Common/Exception.h>
#include <city.h>
@ -257,6 +258,16 @@ inline UInt128 sipHash128(const char * data, const size_t size)
return sipHash128Keyed(0, 0, data, size);
}
inline String sipHash128String(const char * data, const size_t size)
{
return getHexUIntLowercase(sipHash128(data, size));
}
inline String sipHash128String(const String & str)
{
return sipHash128String(str.data(), str.size());
}
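/// For reference, the same lowercase hex digest can be computed in SQL, as the test
/// added in this commit does (reverse() adjusts the byte order before hex-encoding):
///     SELECT lower(hex(reverse(CAST(sipHash128('data'), 'FixedString(16)'))))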
inline UInt128 sipHash128ReferenceKeyed(UInt64 key0, UInt64 key1, const char * data, const size_t size)
{
SipHash hash(key0, key1, true);

View File

@ -164,7 +164,7 @@ NameToDataType getSubcolumnsOfNested(const NamesAndTypesList & names_and_types)
std::unordered_map<String, NamesAndTypesList> nested;
for (const auto & name_type : names_and_types)
{
const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(name_type.type.get());
const auto * type_arr = typeid_cast<const DataTypeArray *>(name_type.type.get());
/// Ignore true Nested type, but try to unite flatten arrays to Nested type.
if (!isNested(name_type.type) && type_arr)
@ -191,8 +191,11 @@ NamesAndTypesList collect(const NamesAndTypesList & names_and_types)
auto nested_types = getSubcolumnsOfNested(names_and_types);
for (const auto & name_type : names_and_types)
if (!isArray(name_type.type) || !nested_types.contains(splitName(name_type.name).first))
{
auto split = splitName(name_type.name);
if (!isArray(name_type.type) || split.second.empty() || !nested_types.contains(split.first))
res.push_back(name_type);
}
for (const auto & name_type : nested_types)
res.emplace_back(name_type.first, name_type.second);

View File

@ -370,6 +370,7 @@ public:
static String getFileNameForStream(const NameAndTypePair & column, const SubstreamPath & path);
static String getFileNameForStream(const String & name_in_storage, const SubstreamPath & path);
static String getSubcolumnNameForStream(const SubstreamPath & path);
static String getSubcolumnNameForStream(const SubstreamPath & path, size_t prefix_len);

View File

@ -1032,11 +1032,14 @@ CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
{
if (path_to_data_file.empty())
{
String candidate_path = /*fs::path(getRelativePath()) */ (ISerialization::getFileNameForStream(part_column, substream_path) + ".bin");
auto stream_name = getStreamNameForColumn(part_column, substream_path, ".bin", getDataPartStorage());
if (!stream_name)
return;
auto file_name = *stream_name + ".bin";
/// We can have existing, but empty .bin files. Example: LowCardinality(Nullable(...)) columns and column_name.dict.null.bin file.
if (getDataPartStorage().exists(candidate_path) && getDataPartStorage().getFileSize(candidate_path) != 0)
path_to_data_file = candidate_path;
if (getDataPartStorage().getFileSize(file_name) != 0)
path_to_data_file = file_name;
}
});
@ -1321,8 +1324,8 @@ void IMergeTreeDataPart::loadColumns(bool require)
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
if (parent_part)
metadata_snapshot = metadata_snapshot->projections.get(name).metadata;
NamesAndTypesList loaded_columns;
bool is_readonly_storage = getDataPartStorage().isReadonly();
if (!metadata_manager->exists("columns.txt"))
@ -1334,7 +1337,7 @@ void IMergeTreeDataPart::loadColumns(bool require)
/// If there is no file with a list of columns, write it down.
for (const NameAndTypePair & column : metadata_snapshot->getColumns().getAllPhysical())
if (getDataPartStorage().exists(getFileNameForColumn(column) + ".bin"))
if (getFileNameForColumn(column))
loaded_columns.push_back(column);
if (columns.empty())
@ -2064,6 +2067,73 @@ String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
return info.partition_id + "_" + toString(hash_value.items[0]) + "_" + toString(hash_value.items[1]);
}
std::optional<String> IMergeTreeDataPart::getStreamNameOrHash(
const String & stream_name,
const Checksums & checksums_)
{
if (checksums_.files.contains(stream_name + ".bin"))
return stream_name;
auto hash = sipHash128String(stream_name);
if (checksums_.files.contains(hash + ".bin"))
return hash;
return {};
}
std::optional<String> IMergeTreeDataPart::getStreamNameOrHash(
const String & stream_name,
const String & extension,
const IDataPartStorage & storage_)
{
if (storage_.exists(stream_name + extension))
return stream_name;
auto hash = sipHash128String(stream_name);
if (storage_.exists(hash + extension))
return hash;
return {};
}
std::optional<String> IMergeTreeDataPart::getStreamNameForColumn(
const String & column_name,
const ISerialization::SubstreamPath & substream_path,
const Checksums & checksums_)
{
auto stream_name = ISerialization::getFileNameForStream(column_name, substream_path);
return getStreamNameOrHash(stream_name, checksums_);
}
std::optional<String> IMergeTreeDataPart::getStreamNameForColumn(
const NameAndTypePair & column,
const ISerialization::SubstreamPath & substream_path,
const Checksums & checksums_)
{
auto stream_name = ISerialization::getFileNameForStream(column, substream_path);
return getStreamNameOrHash(stream_name, checksums_);
}
std::optional<String> IMergeTreeDataPart::getStreamNameForColumn(
const String & column_name,
const ISerialization::SubstreamPath & substream_path,
const String & extension,
const IDataPartStorage & storage_)
{
auto stream_name = ISerialization::getFileNameForStream(column_name, substream_path);
return getStreamNameOrHash(stream_name, extension, storage_);
}
std::optional<String> IMergeTreeDataPart::getStreamNameForColumn(
const NameAndTypePair & column,
const ISerialization::SubstreamPath & substream_path,
const String & extension,
const IDataPartStorage & storage_)
{
auto stream_name = ISerialization::getFileNameForStream(column, substream_path);
return getStreamNameOrHash(stream_name, extension, storage_);
}
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::Compact);

View File

@ -131,7 +131,7 @@ public:
/// Return information about secondary indexes size on disk for all indexes in part
IndexSize getTotalSeconaryIndicesSize() const { return total_secondary_indices_size; }
virtual String getFileNameForColumn(const NameAndTypePair & column) const = 0;
virtual std::optional<String> getFileNameForColumn(const NameAndTypePair & column) const = 0;
virtual ~IMergeTreeDataPart();
@ -501,6 +501,37 @@ public:
/// This one is about removing file with version of part's metadata (columns, pk and so on)
void removeMetadataVersion();
static std::optional<String> getStreamNameOrHash(
const String & name,
const IMergeTreeDataPart::Checksums & checksums);
static std::optional<String> getStreamNameOrHash(
const String & name,
const String & extension,
const IDataPartStorage & storage_);
static std::optional<String> getStreamNameForColumn(
const String & column_name,
const ISerialization::SubstreamPath & substream_path,
const Checksums & checksums_);
static std::optional<String> getStreamNameForColumn(
const NameAndTypePair & column,
const ISerialization::SubstreamPath & substream_path,
const Checksums & checksums_);
static std::optional<String> getStreamNameForColumn(
const String & column_name,
const ISerialization::SubstreamPath & substream_path,
const String & extension,
const IDataPartStorage & storage_);
static std::optional<String> getStreamNameForColumn(
const NameAndTypePair & column,
const ISerialization::SubstreamPath & substream_path,
const String & extension,
const IDataPartStorage & storage_);
mutable std::atomic<DataPartRemovalState> removal_state = DataPartRemovalState::NOT_ATTEMPTED;
mutable std::atomic<time_t> last_removal_attempt_time = 0;

View File

@ -32,7 +32,7 @@ public:
virtual void write(const Block & block, const IColumn::Permutation * permutation) = 0;
virtual void fillChecksums(IMergeTreeDataPart::Checksums & checksums) = 0;
virtual void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) = 0;
virtual void finish(bool sync) = 0;

View File

@ -51,7 +51,9 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
data_part->getSerialization(column.name)->enumerateStreams(
[&](const ISerialization::SubstreamPath & substream_path)
{
++stream_counts[ISerialization::getFileNameForStream(column.name, substream_path)];
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(column, substream_path, checksums);
if (stream_name)
++stream_counts[*stream_name];
});
}
@ -65,12 +67,13 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path)
{
String stream_name = ISerialization::getFileNameForStream(column_name, substream_path);
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(column_name, substream_path, checksums);
/// Delete files if they are no longer shared with another column.
if (--stream_counts[stream_name] == 0)
if (stream_name && --stream_counts[*stream_name] == 0)
{
remove_files.emplace(stream_name + ".bin");
remove_files.emplace(stream_name + mrk_extension);
remove_files.emplace(*stream_name + ".bin");
remove_files.emplace(*stream_name + mrk_extension);
}
};

View File

@ -399,6 +399,7 @@ MergeTreeData::MergeTreeData(
settings->check_sample_column_is_correct && !attach);
}
checkColumnFilenamesForCollision(metadata_.getColumns(), *settings, !attach);
checkTTLExpressions(metadata_, metadata_);
String reason;
@ -3351,6 +3352,7 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
}
}
checkColumnFilenamesForCollision(new_metadata, /*throw_on_error=*/ true);
checkProperties(new_metadata, old_metadata, false, false, allow_nullable_key, local_context);
checkTTLExpressions(new_metadata, old_metadata);
@ -7445,6 +7447,73 @@ bool MergeTreeData::canUseParallelReplicasBasedOnPKAnalysis(
return decision;
}
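/// Checks that no two columns (or their substreams) map to the same on-disk file name.
/// With 'replace_long_file_name_to_hash' enabled, the hash of one column's long name
/// may also collide with the literal name of another column.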
void MergeTreeData::checkColumnFilenamesForCollision(const StorageInMemoryMetadata & metadata, bool throw_on_error) const
{
auto settings = getDefaultSettings();
if (metadata.settings_changes)
{
const auto & changes = metadata.settings_changes->as<const ASTSetQuery &>().changes;
settings->applyChanges(changes);
}
checkColumnFilenamesForCollision(metadata.getColumns(), *settings, throw_on_error);
}
void MergeTreeData::checkColumnFilenamesForCollision(const ColumnsDescription & columns, const MergeTreeSettings & settings, bool throw_on_error) const
{
std::unordered_map<String, std::pair<String, String>> stream_name_to_full_name;
auto columns_list = Nested::collect(columns.getAllPhysical());
for (const auto & column : columns_list)
{
std::unordered_map<String, String> column_streams;
auto callback = [&](const auto & substream_path)
{
String stream_name;
auto full_stream_name = ISerialization::getFileNameForStream(column, substream_path);
if (settings.replace_long_file_name_to_hash && full_stream_name.size() > settings.max_file_name_length)
stream_name = sipHash128String(full_stream_name);
else
stream_name = full_stream_name;
column_streams.emplace(stream_name, full_stream_name);
};
auto serialization = column.type->getDefaultSerialization();
serialization->enumerateStreams(callback);
if (column.type->supportsSparseSerialization() && settings.ratio_of_defaults_for_sparse_serialization < 1.0)
{
auto sparse_serialization = column.type->getSparseSerialization();
sparse_serialization->enumerateStreams(callback);
}
for (const auto & [stream_name, full_stream_name] : column_streams)
{
auto [it, inserted] = stream_name_to_full_name.emplace(stream_name, std::pair{full_stream_name, column.name});
if (!inserted)
{
const auto & [other_full_name, other_column_name] = it->second;
auto other_type = columns.getPhysical(other_column_name).type;
auto message = fmt::format(
"Columns '{} {}' and '{} {}' have streams ({} and {}) with collision in file name {}",
column.name, column.type->getName(), other_column_name, other_type->getName(), full_stream_name, other_full_name, stream_name);
if (settings.replace_long_file_name_to_hash)
message += ". It may be a collision between a filename for one column and a hash of filename for another column (see setting 'replace_long_file_name_to_hash')";
if (throw_on_error)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}", message);
LOG_ERROR(log, "Table definition is incorrect. {}. It may lead to corruption of data or crashes. You need to resolve it manually", message);
return;
}
}
}
}
MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(IStorage & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const
{

View File

@ -1585,6 +1585,9 @@ private:
ContextPtr query_context,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info) const;
void checkColumnFilenamesForCollision(const StorageInMemoryMetadata & metadata, bool throw_on_error) const;
void checkColumnFilenamesForCollision(const ColumnsDescription & columns, const MergeTreeSettings & settings, bool throw_on_error) const;
};
/// RAII struct to record big parts that are submerging or emerging.

View File

@ -8,6 +8,7 @@
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Storages/MergeTree/IDataPartStorage.h>
#include <optional>
namespace DB
@ -67,44 +68,35 @@ void MergeTreeDataPartChecksum::checkSize(const IDataPartStorage & storage, cons
void MergeTreeDataPartChecksums::checkEqual(const MergeTreeDataPartChecksums & rhs, bool have_uncompressed) const
{
for (const auto & it : rhs.files)
{
const String & name = it.first;
for (const auto & [name, _] : rhs.files)
if (!files.contains(name))
throw Exception(ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART, "Unexpected file {} in data part", name);
}
for (const auto & it : files)
for (const auto & [name, checksum] : files)
{
const String & name = it.first;
/// Exclude files written by inverted index from check. No correct checksums are available for them currently.
if (name.ends_with(".gin_dict") || name.ends_with(".gin_post") || name.ends_with(".gin_seg") || name.ends_with(".gin_sid"))
continue;
auto jt = rhs.files.find(name);
if (jt == rhs.files.end())
auto it = rhs.files.find(name);
if (it == rhs.files.end())
throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "No file {} in data part", name);
it.second.checkEqual(jt->second, have_uncompressed, name);
checksum.checkEqual(it->second, have_uncompressed, name);
}
}
void MergeTreeDataPartChecksums::checkSizes(const IDataPartStorage & storage) const
{
for (const auto & it : files)
{
const String & name = it.first;
it.second.checkSize(storage, name);
}
for (const auto & [name, checksum] : files)
checksum.checkSize(storage, name);
}
UInt64 MergeTreeDataPartChecksums::getTotalSizeOnDisk() const
{
UInt64 res = 0;
for (const auto & it : files)
res += it.second.file_size;
for (const auto & [_, checksum] : files)
res += checksum.file_size;
return res;
}
@ -218,11 +210,8 @@ void MergeTreeDataPartChecksums::write(WriteBuffer & to) const
writeVarUInt(files.size(), out);
for (const auto & it : files)
for (const auto & [name, sum] : files)
{
const String & name = it.first;
const Checksum & sum = it.second;
writeStringBinary(name, out);
writeVarUInt(sum.file_size, out);
writeBinaryLittleEndian(sum.file_hash, out);
@ -255,11 +244,8 @@ void MergeTreeDataPartChecksums::add(MergeTreeDataPartChecksums && rhs_checksums
void MergeTreeDataPartChecksums::computeTotalChecksumDataOnly(SipHash & hash) const
{
/// We use fact that iteration is in deterministic (lexicographical) order.
for (const auto & it : files)
for (const auto & [name, sum] : files)
{
const String & name = it.first;
const Checksum & sum = it.second;
if (!endsWith(name, ".bin"))
continue;

View File

@ -90,7 +90,6 @@ struct MergeTreeDataPartChecksums
UInt64 getTotalSizeOnDisk() const;
};
/// A kind of MergeTreeDataPartChecksums intended to be stored in ZooKeeper (to save its RAM)
/// MinimalisticDataPartChecksums and MergeTreeDataPartChecksums have the same serialization format
/// for versions less than MINIMAL_VERSION_WITH_MINIMALISTIC_CHECKSUMS.

View File

@ -57,7 +57,7 @@ public:
std::optional<time_t> getColumnModificationTime(const String & column_name) const override;
String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return DATA_FILE_NAME; }
std::optional<String> getFileNameForColumn(const NameAndTypePair & /* column */) const override { return DATA_FILE_NAME; }
~MergeTreeDataPartCompact() override;

View File

@ -40,7 +40,7 @@ public:
bool isStoredOnRemoteDisk() const override { return false; }
bool isStoredOnRemoteDiskWithZeroCopySupport() const override { return false; }
bool hasColumnFiles(const NameAndTypePair & column) const override { return !!getColumnPosition(column.getNameInStorage()); }
String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return ""; }
std::optional<String> getFileNameForColumn(const NameAndTypePair & /* column */) const override { return ""; }
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) override;
DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot,
const DiskTransactionPtr & disk_transaction) const override;

View File

@ -73,19 +73,22 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
getSerialization(column.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
String file_name = ISerialization::getFileNameForStream(column, substream_path);
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(column, substream_path, checksums);
if (processed_substreams && !processed_substreams->insert(file_name).second)
if (!stream_name)
return;
auto bin_checksum = checksums.files.find(file_name + ".bin");
if (processed_substreams && !processed_substreams->insert(*stream_name).second)
return;
auto bin_checksum = checksums.files.find(*stream_name + ".bin");
if (bin_checksum != checksums.files.end())
{
size.data_compressed += bin_checksum->second.file_size;
size.data_uncompressed += bin_checksum->second.uncompressed_size;
}
auto mrk_checksum = checksums.files.find(file_name + getMarksFileExtension());
auto mrk_checksum = checksums.files.find(*stream_name + getMarksFileExtension());
if (mrk_checksum != checksums.files.end())
size.marks += mrk_checksum->second.file_size;
});
@ -153,7 +156,13 @@ void MergeTreeDataPartWide::loadIndexGranularity()
if (columns.empty())
throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "No columns in part {}", name);
loadIndexGranularityImpl(index_granularity, index_granularity_info, getDataPartStorage(), getFileNameForColumn(columns.front()));
auto any_column_filename = getFileNameForColumn(columns.front());
if (!any_column_filename)
throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART,
"There are no files for column {} in part {}",
columns.front().name, getDataPartStorage().getFullPath());
loadIndexGranularityImpl(index_granularity, index_granularity_info, getDataPartStorage(), *any_column_filename);
}
@ -185,21 +194,19 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
{
getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
String file_name = ISerialization::getFileNameForStream(name_type, substream_path);
String mrk_file_name = file_name + marks_file_extension;
String bin_file_name = file_name + DATA_FILE_EXTENSION;
auto stream_name = getStreamNameForColumn(name_type, substream_path, checksums);
if (!stream_name)
/// The stream name could not be resolved, so report the full (unhashed) stream name.
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART,
"No {}{} file checksum for column {} in part {}",
ISerialization::getFileNameForStream(name_type, substream_path),
DATA_FILE_EXTENSION, name_type.name, getDataPartStorage().getFullPath());
auto mrk_file_name = *stream_name + marks_file_extension;
if (!checksums.files.contains(mrk_file_name))
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART,
"No {} file checksum for column {} in part {} ",
mrk_file_name, name_type.name, getDataPartStorage().getFullPath());
if (!checksums.files.contains(bin_file_name))
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART,
"No {} file checksum for column {} in part {}",
bin_file_name, name_type.name, getDataPartStorage().getFullPath());
});
}
}
@ -212,27 +219,28 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
{
getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + marks_file_extension;
auto stream_name = getStreamNameForColumn(name_type, substream_path, marks_file_extension, getDataPartStorage());
/// Missing file is Ok for case when new column was added.
if (getDataPartStorage().exists(file_path))
{
UInt64 file_size = getDataPartStorage().getFileSize(file_path);
if (!stream_name)
return;
if (!file_size)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Part {} is broken: {} is empty.",
getDataPartStorage().getFullPath(),
std::string(fs::path(getDataPartStorage().getFullPath()) / file_path));
auto file_path = *stream_name + marks_file_extension;
UInt64 file_size = getDataPartStorage().getFileSize(file_path);
if (!marks_size)
marks_size = file_size;
else if (file_size != *marks_size)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Part {} is broken: marks have different sizes.", getDataPartStorage().getFullPath());
}
if (!file_size)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Part {} is broken: {} is empty.",
getDataPartStorage().getFullPath(),
std::string(fs::path(getDataPartStorage().getFullPath()) / file_path));
if (!marks_size)
marks_size = file_size;
else if (file_size != *marks_size)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Part {} is broken: marks have different sizes.", getDataPartStorage().getFullPath());
});
}
}
@ -240,20 +248,13 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const
{
std::string marks_file_extension = index_granularity_info.mark_type.getFileExtension();
auto check_stream_exists = [this, &marks_file_extension](const String & stream_name)
{
auto bin_checksum = checksums.files.find(stream_name + DATA_FILE_EXTENSION);
auto mrk_checksum = checksums.files.find(stream_name + marks_file_extension);
return bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end();
};
auto marks_file_extension = index_granularity_info.mark_type.getFileExtension();
bool res = true;
getSerialization(column.name)->enumerateStreams([&](const auto & substream_path)
{
String file_name = ISerialization::getFileNameForStream(column, substream_path);
if (!check_stream_exists(file_name))
auto stream_name = getStreamNameForColumn(column, substream_path, checksums);
if (!stream_name || !checksums.files.contains(*stream_name + marks_file_extension))
res = false;
});
@ -264,7 +265,11 @@ std::optional<time_t> MergeTreeDataPartWide::getColumnModificationTime(const Str
{
try
{
return getDataPartStorage().getFileLastModified(column_name + DATA_FILE_EXTENSION).epochTime();
auto stream_name = getStreamNameOrHash(column_name, checksums);
if (!stream_name)
return {};
return getDataPartStorage().getFileLastModified(*stream_name + DATA_FILE_EXTENSION).epochTime();
}
catch (const fs::filesystem_error &)
{
@ -272,13 +277,19 @@ std::optional<time_t> MergeTreeDataPartWide::getColumnModificationTime(const Str
}
}
String MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & column) const
std::optional<String> MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & column) const
{
String filename;
std::optional<String> filename;
getSerialization(column.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
if (filename.empty())
filename = ISerialization::getFileNameForStream(column, substream_path);
if (!filename.has_value())
{
/// This method may be called when checksums are not initialized yet.
if (!checksums.empty())
filename = getStreamNameForColumn(column, substream_path, checksums);
else
filename = getStreamNameForColumn(column, substream_path, DATA_FILE_EXTENSION, getDataPartStorage());
}
});
return filename;
}

View File

@ -48,7 +48,7 @@ public:
bool isStoredOnRemoteDiskWithZeroCopySupport() const override;
String getFileNameForColumn(const NameAndTypePair & column) const override;
std::optional<String> getFileNameForColumn(const NameAndTypePair & column) const override;
~MergeTreeDataPartWide() override;

View File

@ -423,7 +423,7 @@ size_t MergeTreeDataPartWriterCompact::ColumnsBuffer::size() const
return accumulated_columns.at(0)->size();
}
void MergeTreeDataPartWriterCompact::fillChecksums(IMergeTreeDataPart::Checksums & checksums)
void MergeTreeDataPartWriterCompact::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & /*checksums_to_remove*/)
{
// If we don't have anything to write, skip finalization.
if (!columns_list.empty())

View File

@ -22,7 +22,7 @@ public:
void write(const Block & block, const IColumn::Permutation * permutation) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) override;
void finish(bool sync) override;
private:

View File

@ -76,7 +76,7 @@ void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Bl
}
}
void MergeTreeDataPartWriterInMemory::fillChecksums(IMergeTreeDataPart::Checksums & checksums)
void MergeTreeDataPartWriterInMemory::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & /*checksums_to_remove*/)
{
/// If part is empty we still need to initialize block by empty columns.
if (!part_in_memory->block)

View File

@ -18,7 +18,7 @@ public:
/// You can write only one block. In-memory part can be written only at INSERT.
void write(const Block & block, const IColumn::Permutation * permutation) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) override;
void finish(bool /*sync*/) override {}
private:

View File

@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_FILE_NAME;
}
namespace
@ -107,7 +108,22 @@ void MergeTreeDataPartWriterWide::addStreams(
ISerialization::StreamCallback callback = [&](const auto & substream_path)
{
assert(!substream_path.empty());
String stream_name = ISerialization::getFileNameForStream(column, substream_path);
auto storage_settings = storage.getSettings();
auto full_stream_name = ISerialization::getFileNameForStream(column, substream_path);
String stream_name;
if (storage_settings->replace_long_file_name_to_hash && full_stream_name.size() > storage_settings->max_file_name_length)
stream_name = sipHash128String(full_stream_name);
else
stream_name = full_stream_name;
auto it = stream_name_to_full_name.find(stream_name);
if (it != stream_name_to_full_name.end() && it->second != full_stream_name)
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"Stream with name {} already created (full stream name: {}). Current full stream name: {}."
" It is a collision between a filename for one column and a hash of filename for another column or a bug",
stream_name, it->second, full_stream_name);
/// Shared offsets for Nested type.
if (column_streams.contains(stream_name))
@ -136,12 +152,22 @@ void MergeTreeDataPartWriterWide::addStreams(
marks_compression_codec,
settings.marks_compress_block_size,
settings.query_write_settings);
full_name_to_stream_name.emplace(full_stream_name, stream_name);
stream_name_to_full_name.emplace(stream_name, full_stream_name);
};
ISerialization::SubstreamPath path;
data_part->getSerialization(column.name)->enumerateStreams(callback, column.type);
}
const String & MergeTreeDataPartWriterWide::getStreamName(
const NameAndTypePair & column,
const ISerialization::SubstreamPath & substream_path) const
{
auto full_stream_name = ISerialization::getFileNameForStream(column, substream_path);
return full_name_to_stream_name.at(full_stream_name);
}
ISerialization::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
const NameAndTypePair & column, WrittenOffsetColumns & offset_columns) const
@ -149,8 +175,7 @@ ISerialization::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGett
return [&, this] (const ISerialization::SubstreamPath & substream_path) -> WriteBuffer *
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
String stream_name = ISerialization::getFileNameForStream(column, substream_path);
auto stream_name = getStreamName(column, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.contains(stream_name))
@ -299,8 +324,7 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
String stream_name = ISerialization::getFileNameForStream(column, substream_path);
auto stream_name = getStreamName(column, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.contains(stream_name))
@ -338,14 +362,13 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
serialization->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
auto stream_name = getStreamName(name_and_type, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.contains(stream_name))
return;
column_streams[stream_name]->compressed_hashing.nextIfAtEnd();
column_streams.at(stream_name)->compressed_hashing.nextIfAtEnd();
});
}
@ -416,10 +439,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (is_offsets)
{
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
offset_columns.insert(stream_name);
}
offset_columns.insert(getStreamName(name_and_type, substream_path));
});
}
@ -561,7 +581,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
}
void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksums & checksums)
void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove)
{
const auto & global_settings = storage.getContext()->getSettingsRef();
ISerialization::SerializeBinaryBulkSettings serialize_settings;
@ -597,10 +617,19 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksum
}
}
for (auto & stream : column_streams)
for (auto & [stream_name, stream] : column_streams)
{
stream.second->preFinalize();
stream.second->addToChecksums(checksums);
/// Remove checksums for old stream name if file was
/// renamed due to replacing the name to the hash of name.
const auto & full_stream_name = stream_name_to_full_name.at(stream_name);
if (stream_name != full_stream_name)
{
checksums_to_remove.insert(full_stream_name + stream->data_file_extension);
checksums_to_remove.insert(full_stream_name + stream->marks_file_extension);
}
stream->preFinalize();
stream->addToChecksums(checksums);
}
}
@ -632,11 +661,11 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(bool sync)
}
void MergeTreeDataPartWriterWide::fillChecksums(IMergeTreeDataPart::Checksums & checksums)
void MergeTreeDataPartWriterWide::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove)
{
// If we don't have anything to write, skip finalization.
if (!columns_list.empty())
fillDataChecksums(checksums);
fillDataChecksums(checksums, checksums_to_remove);
if (settings.rewrite_primary_key)
fillPrimaryIndexChecksums(checksums);
@ -666,10 +695,7 @@ void MergeTreeDataPartWriterWide::writeFinalMark(
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (is_offsets)
{
String stream_name = ISerialization::getFileNameForStream(column, substream_path);
offset_columns.insert(stream_name);
}
offset_columns.insert(getStreamName(column, substream_path));
});
}

View File

@ -29,14 +29,14 @@ public:
void write(const Block & block, const IColumn::Permutation * permutation) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums) final;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) final;
void finish(bool sync) final;
private:
/// Finish serialization of data: write final mark if required and compute checksums
/// Also validate written data in debug mode
void fillDataChecksums(IMergeTreeDataPart::Checksums & checksums);
void fillDataChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove);
void finishDataSerialization(bool sync);
/// Write data of one column.
@ -101,6 +101,7 @@ private:
void adjustLastMarkIfNeedAndFlushToDisk(size_t new_rows_in_last_mark);
ISerialization::OutputStreamGetter createStreamGetter(const NameAndTypePair & column, WrittenOffsetColumns & offset_columns) const;
const String & getStreamName(const NameAndTypePair & column, const ISerialization::SubstreamPath & substream_path) const;
using SerializationState = ISerialization::SerializeBinaryBulkStatePtr;
using SerializationStates = std::unordered_map<String, SerializationState>;
@ -110,6 +111,12 @@ private:
using ColumnStreams = std::map<String, StreamPtr>;
ColumnStreams column_streams;
/// Some long column names may be replaced to hashes.
/// Below are mapping from original stream name to actual
/// stream name (probably hash of the stream) and vice versa.
std::unordered_map<String, String> full_name_to_stream_name;
std::unordered_map<String, String> stream_name_to_full_name;
/// Non written marks to disk (for each column). Waiting until all rows for
/// this marks will be written to disk.
using MarksForColumns = std::unordered_map<String, StreamsWithMarks>;

View File

@ -206,35 +206,33 @@ void MergeTreeReaderWide::addStreams(
ISerialization::StreamCallback callback = [&] (const ISerialization::SubstreamPath & substream_path)
{
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
if (streams.contains(stream_name))
{
has_any_stream = true;
return;
}
bool data_file_exists = data_part_info_for_read->getChecksums().files.contains(stream_name + DATA_FILE_EXTENSION);
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(name_and_type, substream_path, data_part_info_for_read->getChecksums());
/** If data file is missing then we will not try to open it.
* It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
*/
if (!data_file_exists)
if (!stream_name)
{
has_all_streams = false;
return;
}
if (streams.contains(*stream_name))
{
has_any_stream = true;
return;
}
has_any_stream = true;
bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
auto context = data_part_info_for_read->getContext();
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
data_part_info_for_read, stream_name, DATA_FILE_EXTENSION,
streams.emplace(*stream_name, std::make_unique<MergeTreeReaderStream>(
data_part_info_for_read, *stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(*stream_name + DATA_FILE_EXTENSION),
&data_part_info_for_read->getIndexGranularityInfo(),
profile_callback, clock_type, is_lc_dict, load_marks_threadpool));
};
@ -245,13 +243,14 @@ void MergeTreeReaderWide::addStreams(
partially_read_columns.insert(name_and_type.name);
}
static ReadBuffer * getStream(
bool seek_to_start,
const ISerialization::SubstreamPath & substream_path,
const MergeTreeDataPartChecksums & checksums,
MergeTreeReaderWide::FileStreams & streams,
const NameAndTypePair & name_and_type,
size_t from_mark, bool seek_to_mark,
size_t from_mark,
bool seek_to_mark,
size_t current_task_last_mark,
ISerialization::SubstreamsCache & cache)
{
@ -259,9 +258,11 @@ static ReadBuffer * getStream(
if (cache.contains(ISerialization::getSubcolumnNameForStream(substream_path)))
return nullptr;
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(name_and_type, substream_path, checksums);
if (!stream_name)
return nullptr;
auto it = streams.find(stream_name);
auto it = streams.find(*stream_name);
if (it == streams.end())
return nullptr;
@ -288,7 +289,7 @@ void MergeTreeReaderWide::deserializePrefix(
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
{
return getStream(/* seek_to_start = */true, substream_path, streams, name_and_type, 0, /* seek_to_mark = */false, current_task_last_mark, cache);
return getStream(/* seek_to_start = */true, substream_path, data_part_info_for_read->getChecksums(), streams, name_and_type, 0, /* seek_to_mark = */false, current_task_last_mark, cache);
};
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]);
}
@ -307,15 +308,15 @@ void MergeTreeReaderWide::prefetchForColumn(
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(name_and_type, substream_path, data_part_info_for_read->getChecksums());
if (!prefetched_streams.contains(stream_name))
if (stream_name && !prefetched_streams.contains(*stream_name))
{
bool seek_to_mark = !continue_reading;
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache))
if (ReadBuffer * buf = getStream(false, substream_path, data_part_info_for_read->getChecksums(), streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache))
{
buf->prefetch(priority);
prefetched_streams.insert(stream_name);
prefetched_streams.insert(*stream_name);
}
}
});
@ -338,8 +339,9 @@ void MergeTreeReaderWide::readData(
bool seek_to_mark = !was_prefetched && !continue_reading;
return getStream(
/* seek_to_start = */false, substream_path, streams, name_and_type, from_mark,
seek_to_mark, current_task_last_mark, cache);
/* seek_to_start = */false, substream_path,
data_part_info_for_read->getChecksums(), streams,
name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache);
};
deserialize_settings.continuous_reading = continue_reading;

View File

@ -34,7 +34,8 @@ struct Settings;
M(UInt64, min_bytes_for_wide_part, 10485760, "Minimal uncompressed size in bytes to create part in wide format instead of compact", 0) \
M(UInt64, min_rows_for_wide_part, 0, "Minimal number of rows to create part in wide format instead of compact", 0) \
M(Float, ratio_of_defaults_for_sparse_serialization, 0.9375f, "Minimal ratio of number of default values to number of all values in column to store it in sparse serializations. If >= 1, columns will be always written in full serialization.", 0) \
\
M(Bool, replace_long_file_name_to_hash, false, "If the file name for column is too long (more than 'max_file_name_length' bytes) replace it to SipHash128", 0) \
M(UInt64, max_file_name_length, 127, "The maximal length of the file name to keep it as is without hashing", 0) \
/** Merge settings. */ \
M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \
M(UInt64, merge_max_block_size_bytes, 10 * 1024 * 1024, "How many bytes in blocks should be formed for merge operations. By default has the same value as `index_granularity_bytes`.", 0) \

View File

@ -142,12 +142,16 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
{
/// Finish write and get checksums.
MergeTreeData::DataPart::Checksums checksums;
NameSet checksums_to_remove;
if (additional_column_checksums)
checksums = std::move(*additional_column_checksums);
/// Finish columns serialization.
writer->fillChecksums(checksums);
writer->fillChecksums(checksums, checksums_to_remove);
for (const auto & name : checksums_to_remove)
checksums.files.erase(name);
LOG_TRACE(&Poco::Logger::get("MergedBlockOutputStream"), "filled checksums {}", new_part->getNameWithState());

View File

@ -63,7 +63,11 @@ MergedColumnOnlyOutputStream::fillChecksums(
{
/// Finish columns serialization.
MergeTreeData::DataPart::Checksums checksums;
writer->fillChecksums(checksums);
NameSet checksums_to_remove;
writer->fillChecksums(checksums, checksums_to_remove);
for (const auto & filename : checksums_to_remove)
all_checksums.files.erase(filename);
for (const auto & [projection_name, projection_part] : new_part->getProjectionParts())
checksums.addFile(
@ -80,9 +84,7 @@ MergedColumnOnlyOutputStream::fillChecksums(
for (const String & removed_file : removed_files)
{
new_part->getDataPartStorage().removeFileIfExists(removed_file);
if (all_checksums.files.contains(removed_file))
all_checksums.files.erase(removed_file);
all_checksums.files.erase(removed_file);
}
new_part->setColumns(columns, serialization_infos, metadata_snapshot->getMetadataVersion());

View File

@ -1,6 +1,5 @@
#include <Storages/MergeTree/MutateTask.h>
#include "Common/Priority.h"
#include <Common/logger_useful.h>
#include <Common/escapeForFileName.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
@ -523,7 +522,9 @@ static std::set<ProjectionDescriptionRawPtr> getProjectionsToRecalculate(
}
static std::unordered_map<String, size_t> getStreamCounts(
const MergeTreeDataPartPtr & data_part, const Names & column_names)
const MergeTreeDataPartPtr & data_part,
const MergeTreeDataPartChecksums & source_part_checksums,
const Names & column_names)
{
std::unordered_map<String, size_t> stream_counts;
@ -533,8 +534,9 @@ static std::unordered_map<String, size_t> getStreamCounts(
{
auto callback = [&](const ISerialization::SubstreamPath & substream_path)
{
auto stream_name = ISerialization::getFileNameForStream(column_name, substream_path);
++stream_counts[stream_name];
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(column_name, substream_path, source_part_checksums);
if (stream_name)
++stream_counts[*stream_name];
};
serialization->enumerateStreams(callback);
@ -544,7 +546,6 @@ static std::unordered_map<String, size_t> getStreamCounts(
return stream_counts;
}
/// Files, that we don't need to remove and don't need to hardlink, for example columns.txt and checksums.txt.
/// Because we will generate new versions of them after we perform mutation.
static NameSet collectFilesToSkip(
@ -572,9 +573,10 @@ static NameSet collectFilesToSkip(
if (isWidePart(source_part))
{
auto new_stream_counts = getStreamCounts(new_part, new_part->getColumns().getNames());
auto source_updated_stream_counts = getStreamCounts(source_part, updated_header.getNames());
auto new_updated_stream_counts = getStreamCounts(new_part, updated_header.getNames());
auto new_stream_counts = getStreamCounts(new_part, source_part->checksums, new_part->getColumns().getNames());
auto source_updated_stream_counts = getStreamCounts(source_part, source_part->checksums, updated_header.getNames());
auto new_updated_stream_counts = getStreamCounts(new_part, source_part->checksums, updated_header.getNames());
/// Skip all modified files in new part.
for (const auto & [stream_name, _] : new_updated_stream_counts)
@ -615,7 +617,7 @@ static NameToNameVector collectFilesForRenames(
const String & mrk_extension)
{
/// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes.
auto stream_counts = getStreamCounts(source_part, source_part->getColumns().getNames());
auto stream_counts = getStreamCounts(source_part, source_part->checksums, source_part->getColumns().getNames());
NameToNameVector rename_vector;
NameSet collected_names;
@ -652,12 +654,13 @@ static NameToNameVector collectFilesForRenames(
{
ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path)
{
String stream_name = ISerialization::getFileNameForStream({command.column_name, command.data_type}, substream_path);
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(command.column_name, substream_path, source_part->checksums);
/// Delete files if they are no longer shared with another column.
if (--stream_counts[stream_name] == 0)
if (stream_name && --stream_counts[*stream_name] == 0)
{
add_rename(stream_name + ".bin", "");
add_rename(stream_name + mrk_extension, "");
add_rename(*stream_name + ".bin", "");
add_rename(*stream_name + mrk_extension, "");
}
};
@ -671,13 +674,25 @@ static NameToNameVector collectFilesForRenames(
ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path)
{
String stream_from = ISerialization::getFileNameForStream(command.column_name, substream_path);
String stream_to = boost::replace_first_copy(stream_from, escaped_name_from, escaped_name_to);
String full_stream_from = ISerialization::getFileNameForStream(command.column_name, substream_path);
String full_stream_to = boost::replace_first_copy(full_stream_from, escaped_name_from, escaped_name_to);
auto stream_from = IMergeTreeDataPart::getStreamNameOrHash(full_stream_from, source_part->checksums);
if (!stream_from)
return;
String stream_to;
auto storage_settings = source_part->storage.getSettings();
if (storage_settings->replace_long_file_name_to_hash && full_stream_to.size() > storage_settings->max_file_name_length)
stream_to = sipHash128String(full_stream_to);
else
stream_to = full_stream_to;
if (stream_from != stream_to)
{
add_rename(stream_from + ".bin", stream_to + ".bin");
add_rename(stream_from + mrk_extension, stream_to + mrk_extension);
add_rename(*stream_from + ".bin", stream_to + ".bin");
add_rename(*stream_from + mrk_extension, stream_to + mrk_extension);
}
};
@ -690,8 +705,8 @@ static NameToNameVector collectFilesForRenames(
/// but were removed in new_part by MODIFY COLUMN from
/// type with higher number of streams (e.g. LowCardinality -> String).
auto old_streams = getStreamCounts(source_part, source_part->getColumns().getNames());
auto new_streams = getStreamCounts(new_part, source_part->getColumns().getNames());
auto old_streams = getStreamCounts(source_part, source_part->checksums, source_part->getColumns().getNames());
auto new_streams = getStreamCounts(new_part, source_part->checksums, source_part->getColumns().getNames());
for (const auto & [old_stream, _] : old_streams)
{

View File

@ -15,6 +15,7 @@
#include <IO/HashingReadBuffer.h>
#include <IO/S3Common.h>
#include <Common/CurrentMetrics.h>
#include <Common/SipHash.h>
#include <Poco/Net/NetException.h>
#if USE_AZURE_BLOB_STORAGE
@ -38,6 +39,7 @@ namespace ErrorCodes
extern const int CANNOT_MUNMAP;
extern const int CANNOT_MREMAP;
extern const int UNEXPECTED_FILE_IN_DATA_PART;
extern const int NO_FILE_IN_DATA_PART;
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
}
@ -200,7 +202,14 @@ static IMergeTreeDataPart::Checksums checkDataPart(
{
get_serialization(column)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
String file_name = ISerialization::getFileNameForStream(column, substream_path) + ".bin";
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(column, substream_path, ".bin", data_part_storage);
if (!stream_name)
throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART,
"There is no file for column '{}' in data part '{}'",
column.name, data_part->name);
auto file_name = *stream_name + ".bin";
checksums_data.files[file_name] = checksum_compressed_file(data_part_storage, file_name);
});
}

View File

@ -271,18 +271,21 @@ void StorageSystemPartsColumns::processNextStorage(
ColumnSize size;
NameAndTypePair subcolumn(column.name, name, column.type, data.type);
String file_name = ISerialization::getFileNameForStream(subcolumn, subpath);
auto bin_checksum = part->checksums.files.find(file_name + ".bin");
if (bin_checksum != part->checksums.files.end())
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(subcolumn, subpath, part->checksums);
if (stream_name)
{
size.data_compressed += bin_checksum->second.file_size;
size.data_uncompressed += bin_checksum->second.uncompressed_size;
}
auto bin_checksum = part->checksums.files.find(*stream_name + ".bin");
if (bin_checksum != part->checksums.files.end())
{
size.data_compressed += bin_checksum->second.file_size;
size.data_uncompressed += bin_checksum->second.uncompressed_size;
}
auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.mark_type.getFileExtension());
if (mrk_checksum != part->checksums.files.end())
size.marks += mrk_checksum->second.file_size;
auto mrk_checksum = part->checksums.files.find(*stream_name + part->index_granularity_info.mark_type.getFileExtension());
if (mrk_checksum != part->checksums.files.end())
size.marks += mrk_checksum->second.file_size;
}
subcolumn_bytes_on_disk.push_back(size.data_compressed + size.marks);
subcolumn_data_compressed_bytes.push_back(size.data_compressed);

View File

@ -671,6 +671,8 @@ class MergeTreeSettingsRandomizer:
"compress_primary_key": lambda: random.randint(0, 1),
"marks_compress_block_size": lambda: random.randint(8000, 100000),
"primary_key_compress_block_size": lambda: random.randint(8000, 100000),
"replace_long_file_name_to_hash": lambda: random.randint(0, 1),
"max_file_name_length": threshold_generator(0.3, 0.3, 0, 128),
}
@staticmethod

View File

@ -1,5 +1,6 @@
<clickhouse>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<replace_long_file_name_to_hash>0</replace_long_file_name_to_hash>
</merge_tree>
</clickhouse>

View File

@ -48,7 +48,7 @@ def test_nested_compression_codec(start_cluster):
column_array Array(Array(UInt64)) CODEC(T64, LZ4),
column_bad LowCardinality(Int64) CODEC(Delta)
) ENGINE = ReplicatedMergeTree('/t', '{}') ORDER BY tuple() PARTITION BY key
SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0;
SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, replace_long_file_name_to_hash = 0;
""".format(
i
),

View File

@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<replace_long_file_name_to_hash>0</replace_long_file_name_to_hash>
</merge_tree>
</clickhouse>

View File

@ -9,12 +9,20 @@ cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/default_compression.xml", "configs/wide_parts_only.xml"],
main_configs=[
"configs/default_compression.xml",
"configs/wide_parts_only.xml",
"configs/long_names.xml",
],
with_zookeeper=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/default_compression.xml", "configs/wide_parts_only.xml"],
main_configs=[
"configs/default_compression.xml",
"configs/wide_parts_only.xml",
"configs/long_names.xml",
],
with_zookeeper=True,
)
node3 = cluster.add_instance(

View File

@ -23,7 +23,7 @@ def test_file_path_escaping(started_cluster):
node.query(
"""
CREATE TABLE test.`T.a_b,l-e!` (`~Id` UInt32)
ENGINE = MergeTree() PARTITION BY `~Id` ORDER BY `~Id` SETTINGS min_bytes_for_wide_part = 0;
ENGINE = MergeTree() PARTITION BY `~Id` ORDER BY `~Id` SETTINGS min_bytes_for_wide_part = 0, replace_long_file_name_to_hash = 0;
"""
)
node.query("""INSERT INTO test.`T.a_b,l-e!` VALUES (1);""")
@ -48,7 +48,7 @@ def test_file_path_escaping(started_cluster):
node.query(
"""
CREATE TABLE `test 2`.`T.a_b,l-e!` UUID '12345678-1000-4000-8000-000000000001' (`~Id` UInt32)
ENGINE = MergeTree() PARTITION BY `~Id` ORDER BY `~Id` SETTINGS min_bytes_for_wide_part = 0;
ENGINE = MergeTree() PARTITION BY `~Id` ORDER BY `~Id` SETTINGS min_bytes_for_wide_part = 0, replace_long_file_name_to_hash = 0;
"""
)
node.query("""INSERT INTO `test 2`.`T.a_b,l-e!` VALUES (1);""")

View File

@ -2,5 +2,6 @@
<merge_tree>
<min_rows_for_wide_part>0</min_rows_for_wide_part>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<replace_long_file_name_to_hash>0</replace_long_file_name_to_hash>
</merge_tree>
</clickhouse>

View File

@ -150,7 +150,7 @@ def partition_table_complex(started_cluster):
q("DROP TABLE IF EXISTS test.partition_complex")
q(
"CREATE TABLE test.partition_complex (p Date, k Int8, v1 Int8 MATERIALIZED k + 1) "
"ENGINE = MergeTree PARTITION BY p ORDER BY k SETTINGS index_granularity=1, index_granularity_bytes=0, compress_marks=false, compress_primary_key=false, ratio_of_defaults_for_sparse_serialization=1"
"ENGINE = MergeTree PARTITION BY p ORDER BY k SETTINGS index_granularity=1, index_granularity_bytes=0, compress_marks=false, compress_primary_key=false, ratio_of_defaults_for_sparse_serialization=1, replace_long_file_name_to_hash=false"
)
q("INSERT INTO test.partition_complex (p, k) VALUES(toDate(31), 1)")
q("INSERT INTO test.partition_complex (p, k) VALUES(toDate(1), 2)")

View File

@ -4,7 +4,13 @@ DROP TABLE IF EXISTS test_00961;
CREATE TABLE test_00961 (d Date, a String, b UInt8, x String, y Int8, z UInt32)
ENGINE = MergeTree PARTITION BY d ORDER BY (a, b)
SETTINGS index_granularity = 111, min_bytes_for_wide_part = 0, compress_marks = 0, compress_primary_key = 0, index_granularity_bytes = '10Mi', ratio_of_defaults_for_sparse_serialization = 1;
SETTINGS index_granularity = 111,
min_bytes_for_wide_part = 0,
compress_marks = 0,
compress_primary_key = 0,
index_granularity_bytes = '10Mi',
ratio_of_defaults_for_sparse_serialization = 1,
replace_long_file_name_to_hash = 0;
INSERT INTO test_00961 VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);

View File

@ -10,7 +10,7 @@ $CLICKHOUSE_CLIENT -q "drop table if exists rmt sync;"
$CLICKHOUSE_CLIENT -q "CREATE TABLE rmt (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/test/02253/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rmt', '1') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, min_bytes_for_wide_part=0, remove_empty_parts=0"
cleanup_thread_preferred_points_per_iteration=0, min_bytes_for_wide_part=0, remove_empty_parts=0, replace_long_file_name_to_hash=0"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "INSERT INTO rmt SELECT rand(1), 0, 1 / rand(3), toString(rand(4)), [rand(5), rand(6)], rand(7) % 2 ? NULL : generateUUIDv4(), (rand(8), rand(9)) FROM numbers(1000);"

View File

@ -0,0 +1,3 @@
e798545eefc8b7a1c2c81ff00c064ad8
1 1
2 2

View File

@ -0,0 +1,73 @@
DROP TABLE IF EXISTS t_collisions;
SELECT lower(hex(reverse(CAST(sipHash128('very_very_long_column_name_that_will_be_replaced_with_hash'), 'FixedString(16)'))));
CREATE TABLE t_collisions
(
`very_very_long_column_name_that_will_be_replaced_with_hash` Int32,
`e798545eefc8b7a1c2c81ff00c064ad8` Int32
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS replace_long_file_name_to_hash = 1, max_file_name_length = 42; -- { serverError BAD_ARGUMENTS }
DROP TABLE IF EXISTS t_collisions;
CREATE TABLE t_collisions
(
`col1` Int32,
`e798545eefc8b7a1c2c81ff00c064ad8` Int32
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS replace_long_file_name_to_hash = 1, max_file_name_length = 42;
ALTER TABLE t_collisions ADD COLUMN very_very_long_column_name_that_will_be_replaced_with_hash Int32; -- { serverError BAD_ARGUMENTS }
ALTER TABLE t_collisions RENAME COLUMN col1 TO very_very_long_column_name_that_will_be_replaced_with_hash; -- { serverError BAD_ARGUMENTS }
DROP TABLE IF EXISTS t_collisions;
CREATE TABLE t_collisions
(
`very_very_long_column_name_that_will_be_replaced_with_hash` Int32,
`e798545eefc8b7a1c2c81ff00c064ad8` Int32
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS replace_long_file_name_to_hash = 0;
INSERT INTO t_collisions VALUES (1, 1);
ALTER TABLE t_collisions MODIFY SETTING replace_long_file_name_to_hash = 1, max_file_name_length = 42; -- { serverError BAD_ARGUMENTS }
INSERT INTO t_collisions VALUES (2, 2);
SELECT * FROM t_collisions ORDER BY e798545eefc8b7a1c2c81ff00c064ad8;
DROP TABLE IF EXISTS t_collisions;
CREATE TABLE t_collisions
(
`id` Int,
`col` Array(String),
`col.s` Array(LowCardinality(String)),
`col.u` Array(LowCardinality(String))
)
ENGINE = MergeTree
ORDER BY id; -- { serverError BAD_ARGUMENTS }
DROP TABLE IF EXISTS t_collisions;
CREATE TABLE t_collisions
(
`id` Int,
`col` String,
`col.s` Array(LowCardinality(String)),
`col.u` Array(LowCardinality(String))
)
ENGINE = MergeTree
ORDER BY id;
ALTER TABLE t_collisions MODIFY COLUMN col Array(String); -- { serverError BAD_ARGUMENTS }
DROP TABLE IF EXISTS t_collisions;