diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index fa33bef1582..289c41e5d10 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1,5 +1,4 @@ #include "IMergeTreeDataPart.h" -#include "Common/SipHash.h" #include "Storages/MergeTree/IDataPartStorage.h" #include diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h index fa3c675f7da..3f359904ddd 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h @@ -32,7 +32,7 @@ public: virtual void write(const Block & block, const IColumn::Permutation * permutation) = 0; - virtual void fillChecksums(IMergeTreeDataPart::Checksums & checksums) = 0; + virtual void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) = 0; virtual void finish(bool sync) = 0; diff --git a/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp b/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp index 7d39ea0707f..5dc71147246 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp @@ -68,44 +68,35 @@ void MergeTreeDataPartChecksum::checkSize(const IDataPartStorage & storage, cons void MergeTreeDataPartChecksums::checkEqual(const MergeTreeDataPartChecksums & rhs, bool have_uncompressed) const { - for (const auto & it : rhs.files) - { - const String & name = it.first; - + for (const auto & [name, _] : rhs.files) if (!files.contains(name)) throw Exception(ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART, "Unexpected file {} in data part", name); - } - for (const auto & it : files) + for (const auto & [name, checksum] : files) { - const String & name = it.first; - /// Exclude files written by inverted index from check. No correct checksums are available for them currently. if (name.ends_with(".gin_dict") || name.ends_with(".gin_post") || name.ends_with(".gin_seg") || name.ends_with(".gin_sid")) continue; - auto jt = rhs.files.find(name); - if (jt == rhs.files.end()) + auto it = rhs.files.find(name); + if (it == rhs.files.end()) throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "No file {} in data part", name); - it.second.checkEqual(jt->second, have_uncompressed, name); + checksum.checkEqual(it->second, have_uncompressed, name); } } void MergeTreeDataPartChecksums::checkSizes(const IDataPartStorage & storage) const { - for (const auto & it : files) - { - const String & name = it.first; - it.second.checkSize(storage, name); - } + for (const auto & [name, checksum] : files) + checksum.checkSize(storage, name); } UInt64 MergeTreeDataPartChecksums::getTotalSizeOnDisk() const { UInt64 res = 0; - for (const auto & it : files) - res += it.second.file_size; + for (const auto & [_, checksum] : files) + res += checksum.file_size; return res; } @@ -219,11 +210,8 @@ void MergeTreeDataPartChecksums::write(WriteBuffer & to) const writeVarUInt(files.size(), out); - for (const auto & it : files) + for (const auto & [name, sum] : files) { - const String & name = it.first; - const Checksum & sum = it.second; - writeBinary(name, out); writeVarUInt(sum.file_size, out); writePODBinary(sum.file_hash, out); @@ -256,11 +244,8 @@ void MergeTreeDataPartChecksums::add(MergeTreeDataPartChecksums && rhs_checksums void MergeTreeDataPartChecksums::computeTotalChecksumDataOnly(SipHash & hash) const { /// We use fact that iteration is in deterministic (lexicographical) order. - for (const auto & it : files) + for (const auto & [name, sum] : files) { - const String & name = it.first; - const Checksum & sum = it.second; - if (!endsWith(name, ".bin")) continue; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index 0b650eb9f16..9b8f1155912 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -403,7 +403,7 @@ size_t MergeTreeDataPartWriterCompact::ColumnsBuffer::size() const return accumulated_columns.at(0)->size(); } -void MergeTreeDataPartWriterCompact::fillChecksums(IMergeTreeDataPart::Checksums & checksums) +void MergeTreeDataPartWriterCompact::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & /*checksums_to_remove*/) { // If we don't have anything to write, skip finalization. if (!columns_list.empty()) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h index 06f8122393f..b1cfefd2d8f 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h @@ -22,7 +22,7 @@ public: void write(const Block & block, const IColumn::Permutation * permutation) override; - void fillChecksums(IMergeTreeDataPart::Checksums & checksums) override; + void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) override; void finish(bool sync) override; private: diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp index 9afa7a1e80d..048339b58c9 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp @@ -76,7 +76,7 @@ void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Bl } } -void MergeTreeDataPartWriterInMemory::fillChecksums(IMergeTreeDataPart::Checksums & checksums) +void MergeTreeDataPartWriterInMemory::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & /*checksums_to_remove*/) { /// If part is empty we still need to initialize block by empty columns. if (!part_in_memory->block) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.h b/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.h index 9e1e868beac..2d333822652 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.h @@ -18,7 +18,7 @@ public: /// You can write only one block. In-memory part can be written only at INSERT. void write(const Block & block, const IColumn::Permutation * permutation) override; - void fillChecksums(IMergeTreeDataPart::Checksums & checksums) override; + void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) override; void finish(bool /*sync*/) override {} private: diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 60bb1119770..c9dae9a1f2c 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -136,6 +136,7 @@ void MergeTreeDataPartWriterWide::addStreams( settings.query_write_settings); full_name_to_stream_name.emplace(full_stream_name, stream_name); + stream_name_to_full_name.emplace(stream_name, full_stream_name); }; ISerialization::SubstreamPath path; @@ -562,7 +563,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai } -void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksums & checksums) +void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) { const auto & global_settings = storage.getContext()->getSettingsRef(); ISerialization::SerializeBinaryBulkSettings serialize_settings; @@ -598,10 +599,19 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksum } } - for (auto & stream : column_streams) + for (auto & [stream_name, stream] : column_streams) { - stream.second->preFinalize(); - stream.second->addToChecksums(checksums); + /// Remove checksums for old stream name if file was + /// renamed due to replacing the name to the hash of name. + const auto & full_stream_name = stream_name_to_full_name.at(stream_name); + if (stream_name != full_stream_name) + { + checksums_to_remove.insert(full_stream_name + stream->data_file_extension); + checksums_to_remove.insert(full_stream_name + stream->marks_file_extension); + } + + stream->preFinalize(); + stream->addToChecksums(checksums); } } @@ -633,11 +643,11 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(bool sync) } -void MergeTreeDataPartWriterWide::fillChecksums(IMergeTreeDataPart::Checksums & checksums) +void MergeTreeDataPartWriterWide::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) { // If we don't have anything to write, skip finalization. if (!columns_list.empty()) - fillDataChecksums(checksums); + fillDataChecksums(checksums, checksums_to_remove); if (settings.rewrite_primary_key) fillPrimaryIndexChecksums(checksums); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index de7419fedb2..c274fc9807c 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -29,14 +29,14 @@ public: void write(const Block & block, const IColumn::Permutation * permutation) override; - void fillChecksums(IMergeTreeDataPart::Checksums & checksums) final; + void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) final; void finish(bool sync) final; private: /// Finish serialization of data: write final mark if required and compute checksums /// Also validate written data in debug mode - void fillDataChecksums(IMergeTreeDataPart::Checksums & checksums); + void fillDataChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove); void finishDataSerialization(bool sync); /// Write data of one column. @@ -111,8 +111,11 @@ private: using ColumnStreams = std::map; ColumnStreams column_streams; - /// TODO: + /// Some long column names may be replaced to hashes. + /// Below are mapping from original stream name to actual + /// stream name (probably hash of the stream) and vice versa. std::unordered_map full_name_to_stream_name; + std::unordered_map stream_name_to_full_name; /// Non written marks to disk (for each column). Waiting until all rows for /// this marks will be written to disk. diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index c93ad135835..1ebb1d87aae 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -142,12 +142,16 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync( { /// Finish write and get checksums. MergeTreeData::DataPart::Checksums checksums; + NameSet checksums_to_remove; if (additional_column_checksums) checksums = std::move(*additional_column_checksums); /// Finish columns serialization. - writer->fillChecksums(checksums); + writer->fillChecksums(checksums, checksums_to_remove); + + for (const auto & name : checksums_to_remove) + checksums.files.erase(name); LOG_TRACE(&Poco::Logger::get("MergedBlockOutputStream"), "filled checksums {}", new_part->getNameWithState()); diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp index 3b2eb96f2d4..108f364fc2d 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp @@ -63,7 +63,11 @@ MergedColumnOnlyOutputStream::fillChecksums( { /// Finish columns serialization. MergeTreeData::DataPart::Checksums checksums; - writer->fillChecksums(checksums); + NameSet checksums_to_remove; + writer->fillChecksums(checksums, checksums_to_remove); + + for (const auto & filename : checksums_to_remove) + all_checksums.files.erase(filename); for (const auto & [projection_name, projection_part] : new_part->getProjectionParts()) checksums.addFile( @@ -80,9 +84,7 @@ MergedColumnOnlyOutputStream::fillChecksums( for (const String & removed_file : removed_files) { new_part->getDataPartStorage().removeFileIfExists(removed_file); - - if (all_checksums.files.contains(removed_file)) - all_checksums.files.erase(removed_file); + all_checksums.files.erase(removed_file); } new_part->setColumns(columns, serialization_infos, metadata_snapshot->getMetadataVersion()); diff --git a/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml b/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml index e9cf053f1c5..674ffff6c93 100644 --- a/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml +++ b/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml @@ -1,5 +1,6 @@ 0 + 0 diff --git a/tests/integration/test_default_compression_codec/configs/wide_parts_only.xml b/tests/integration/test_default_compression_codec/configs/wide_parts_only.xml index 10b9edef36d..4d1a3357799 100644 --- a/tests/integration/test_default_compression_codec/configs/wide_parts_only.xml +++ b/tests/integration/test_default_compression_codec/configs/wide_parts_only.xml @@ -2,5 +2,6 @@ 0 0 + 0 diff --git a/tests/integration/test_filesystem_layout/test.py b/tests/integration/test_filesystem_layout/test.py index 2be478f95d0..81f3b67cb75 100644 --- a/tests/integration/test_filesystem_layout/test.py +++ b/tests/integration/test_filesystem_layout/test.py @@ -23,7 +23,7 @@ def test_file_path_escaping(started_cluster): node.query( """ CREATE TABLE test.`T.a_b,l-e!` (`~Id` UInt32) - ENGINE = MergeTree() PARTITION BY `~Id` ORDER BY `~Id` SETTINGS min_bytes_for_wide_part = 0; + ENGINE = MergeTree() PARTITION BY `~Id` ORDER BY `~Id` SETTINGS min_bytes_for_wide_part = 0, replace_long_file_name_to_hash = 0; """ ) node.query("""INSERT INTO test.`T.a_b,l-e!` VALUES (1);""") diff --git a/tests/integration/test_mutations_hardlinks/configs/wide_parts_only.xml b/tests/integration/test_mutations_hardlinks/configs/wide_parts_only.xml index 10b9edef36d..4d1a3357799 100644 --- a/tests/integration/test_mutations_hardlinks/configs/wide_parts_only.xml +++ b/tests/integration/test_mutations_hardlinks/configs/wide_parts_only.xml @@ -2,5 +2,6 @@ 0 0 + 0 diff --git a/tests/integration/test_partition/test.py b/tests/integration/test_partition/test.py index 93f03f4420e..7634c81f807 100644 --- a/tests/integration/test_partition/test.py +++ b/tests/integration/test_partition/test.py @@ -150,7 +150,7 @@ def partition_table_complex(started_cluster): q("DROP TABLE IF EXISTS test.partition_complex") q( "CREATE TABLE test.partition_complex (p Date, k Int8, v1 Int8 MATERIALIZED k + 1) " - "ENGINE = MergeTree PARTITION BY p ORDER BY k SETTINGS index_granularity=1, index_granularity_bytes=0, compress_marks=false, compress_primary_key=false" + "ENGINE = MergeTree PARTITION BY p ORDER BY k SETTINGS index_granularity=1, index_granularity_bytes=0, compress_marks=false, compress_primary_key=false, replace_long_file_name_to_hash = false" ) q("INSERT INTO test.partition_complex (p, k) VALUES(toDate(31), 1)") q("INSERT INTO test.partition_complex (p, k) VALUES(toDate(1), 2)")