fix replacing column names after mutation

This commit is contained in:
Anton Popov 2023-06-22 15:17:13 +00:00
parent c8316837e3
commit 8864e30d2e
16 changed files with 54 additions and 48 deletions

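The commit changes the writer interface: fillChecksums now takes an extra NameSet out-parameter through which the writer reports checksum entries that became stale because a stream file was renamed (its long name replaced by a hash). Below is a minimal sketch of that contract, using simplified stand-in types rather than the real ClickHouse classes; only the shape of the call matters here.

#include <cstdint>
#include <map>
#include <set>
#include <string>

using NameSet = std::set<std::string>;

struct Checksums
{
    /// File name -> size; stands in for the real per-file checksum record.
    std::map<std::string, std::uint64_t> files;
};

struct Writer
{
    /// New signature: besides filling `checksums` for the files it wrote,
    /// the writer reports entries recorded under old (pre-rename) names.
    void fillChecksums(Checksums & checksums, NameSet & checksums_to_remove)
    {
        checksums.files.emplace("col.bin", 0);           /// freshly written file
        checksums_to_remove.insert("very_long_col.bin"); /// stale pre-rename entry
    }
};

int main()
{
    Checksums checksums;
    NameSet checksums_to_remove;

    Writer writer;
    writer.fillChecksums(checksums, checksums_to_remove);

    /// The caller erases the stale entries, mirroring
    /// MergedBlockOutputStream::finalizePartAsync below.
    for (const auto & name : checksums_to_remove)
        checksums.files.erase(name);
}

The design choice visible in the diff: the writer only reports the stale names, while the output streams that own the merged checksum map perform the actual erasure.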
View File

@@ -1,5 +1,4 @@
#include "IMergeTreeDataPart.h"
#include "Common/SipHash.h"
#include "Storages/MergeTree/IDataPartStorage.h"
#include <optional>

View File

@@ -32,7 +32,7 @@ public:
virtual void write(const Block & block, const IColumn::Permutation * permutation) = 0;
virtual void fillChecksums(IMergeTreeDataPart::Checksums & checksums) = 0;
virtual void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) = 0;
virtual void finish(bool sync) = 0;

View File

@@ -68,44 +68,35 @@ void MergeTreeDataPartChecksum::checkSize(const IDataPartStorage & storage, cons
void MergeTreeDataPartChecksums::checkEqual(const MergeTreeDataPartChecksums & rhs, bool have_uncompressed) const
{
for (const auto & it : rhs.files)
{
const String & name = it.first;
for (const auto & [name, _] : rhs.files)
if (!files.contains(name))
throw Exception(ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART, "Unexpected file {} in data part", name);
}
for (const auto & it : files)
for (const auto & [name, checksum] : files)
{
const String & name = it.first;
/// Exclude files written by the inverted index from the check. No correct checksums are currently available for them.
if (name.ends_with(".gin_dict") || name.ends_with(".gin_post") || name.ends_with(".gin_seg") || name.ends_with(".gin_sid"))
continue;
auto jt = rhs.files.find(name);
if (jt == rhs.files.end())
auto it = rhs.files.find(name);
if (it == rhs.files.end())
throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "No file {} in data part", name);
it.second.checkEqual(jt->second, have_uncompressed, name);
checksum.checkEqual(it->second, have_uncompressed, name);
}
}
void MergeTreeDataPartChecksums::checkSizes(const IDataPartStorage & storage) const
{
for (const auto & it : files)
{
const String & name = it.first;
it.second.checkSize(storage, name);
}
for (const auto & [name, checksum] : files)
checksum.checkSize(storage, name);
}
UInt64 MergeTreeDataPartChecksums::getTotalSizeOnDisk() const
{
UInt64 res = 0;
for (const auto & it : files)
res += it.second.file_size;
for (const auto & [_, checksum] : files)
res += checksum.file_size;
return res;
}
@@ -219,11 +210,8 @@ void MergeTreeDataPartChecksums::write(WriteBuffer & to) const
writeVarUInt(files.size(), out);
for (const auto & it : files)
for (const auto & [name, sum] : files)
{
const String & name = it.first;
const Checksum & sum = it.second;
writeBinary(name, out);
writeVarUInt(sum.file_size, out);
writePODBinary(sum.file_hash, out);
@@ -256,11 +244,8 @@ void MergeTreeDataPartChecksums::add(MergeTreeDataPartChecksums && rhs_checksums
void MergeTreeDataPartChecksums::computeTotalChecksumDataOnly(SipHash & hash) const
{
/// We rely on the fact that iteration is in a deterministic (lexicographical) order.
for (const auto & it : files)
for (const auto & [name, sum] : files)
{
const String & name = it.first;
const Checksum & sum = it.second;
if (!endsWith(name, ".bin"))
continue;

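The refactor above replaces iterator pairs with structured bindings but keeps the invariant noted in computeTotalChecksumDataOnly: files is a std::map, so iteration over it is deterministic (lexicographical), which makes the combined checksum stable. A small illustration; the naive combiner below merely stands in for the SipHash used in the real code.

#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <string>

int main()
{
    /// Insertion order does not matter: std::map iterates keys in
    /// lexicographical order ("a.bin", "a.mrk2", "b.bin").
    std::map<std::string, std::uint64_t> files = {
        {"b.bin", 2}, {"a.bin", 1}, {"a.mrk2", 3}};

    std::uint64_t combined = 0;
    for (const auto & [name, size] : files)
    {
        /// Only .bin files contribute, as in computeTotalChecksumDataOnly.
        if (!name.ends_with(".bin"))
            continue;
        combined = combined * 31 + std::hash<std::string>{}(name) + size;
    }
    std::cout << combined << '\n';
}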
View File

@@ -403,7 +403,7 @@ size_t MergeTreeDataPartWriterCompact::ColumnsBuffer::size() const
return accumulated_columns.at(0)->size();
}
void MergeTreeDataPartWriterCompact::fillChecksums(IMergeTreeDataPart::Checksums & checksums)
void MergeTreeDataPartWriterCompact::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & /*checksums_to_remove*/)
{
// If we don't have anything to write, skip finalization.
if (!columns_list.empty())

View File

@@ -22,7 +22,7 @@ public:
void write(const Block & block, const IColumn::Permutation * permutation) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) override;
void finish(bool sync) override;
private:

View File

@@ -76,7 +76,7 @@ void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Bl
}
}
void MergeTreeDataPartWriterInMemory::fillChecksums(IMergeTreeDataPart::Checksums & checksums)
void MergeTreeDataPartWriterInMemory::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & /*checksums_to_remove*/)
{
/// If the part is empty, we still need to initialize the block with empty columns.
if (!part_in_memory->block)

View File

@@ -18,7 +18,7 @@ public:
/// You can write only one block. An in-memory part can be written only at INSERT.
void write(const Block & block, const IColumn::Permutation * permutation) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) override;
void finish(bool /*sync*/) override {}
private:

View File

@@ -136,6 +136,7 @@ void MergeTreeDataPartWriterWide::addStreams(
settings.query_write_settings);
full_name_to_stream_name.emplace(full_stream_name, stream_name);
stream_name_to_full_name.emplace(stream_name, full_stream_name);
};
ISerialization::SubstreamPath path;
@@ -562,7 +563,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
}
void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksums & checksums)
void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove)
{
const auto & global_settings = storage.getContext()->getSettingsRef();
ISerialization::SerializeBinaryBulkSettings serialize_settings;
@@ -598,10 +599,19 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksum
}
}
for (auto & stream : column_streams)
for (auto & [stream_name, stream] : column_streams)
{
stream.second->preFinalize();
stream.second->addToChecksums(checksums);
/// Remove the checksums for the old stream name if the file was
/// renamed because the name was replaced with its hash.
const auto & full_stream_name = stream_name_to_full_name.at(stream_name);
if (stream_name != full_stream_name)
{
checksums_to_remove.insert(full_stream_name + stream->data_file_extension);
checksums_to_remove.insert(full_stream_name + stream->marks_file_extension);
}
stream->preFinalize();
stream->addToChecksums(checksums);
}
}
@@ -633,11 +643,11 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(bool sync)
}
void MergeTreeDataPartWriterWide::fillChecksums(IMergeTreeDataPart::Checksums & checksums)
void MergeTreeDataPartWriterWide::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove)
{
// If we don't have anything to write, skip finalization.
if (!columns_list.empty())
fillDataChecksums(checksums);
fillDataChecksums(checksums, checksums_to_remove);
if (settings.rewrite_primary_key)
fillPrimaryIndexChecksums(checksums);

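This file contains the heart of the fix: addStreams records both directions of the rename mapping, and fillDataChecksums looks up stream_name_to_full_name to detect that a stream was written under a hashed name, scheduling the checksums recorded under the original full name for removal. A compilable sketch of that logic follows; hashName here is a hypothetical stand-in for the real file-name hashing, which is outside this diff.

#include <functional>
#include <iostream>
#include <map>
#include <set>
#include <string>

using NameSet = std::set<std::string>;

/// Hypothetical stand-in for the hashing applied to over-long file names.
std::string hashName(const std::string & full_name)
{
    return "h" + std::to_string(std::hash<std::string>{}(full_name));
}

int main()
{
    const std::string data_ext = ".bin";
    const std::string marks_ext = ".mrk2";

    /// Mirrors addStreams: the stream (file) name either equals the full
    /// stream name or, when the name is too long, is its hash.
    std::map<std::string, std::string> stream_name_to_full_name;
    auto add_stream = [&](const std::string & full_name, bool too_long)
    {
        const std::string stream_name = too_long ? hashName(full_name) : full_name;
        stream_name_to_full_name.emplace(stream_name, full_name);
    };
    add_stream("id", false);
    add_stream("some_extremely_long_column_name", true);

    /// Mirrors fillDataChecksums: if the stream was renamed, the checksums
    /// recorded earlier under the full name must be removed.
    NameSet checksums_to_remove;
    for (const auto & [stream_name, full_name] : stream_name_to_full_name)
    {
        if (stream_name != full_name)
        {
            checksums_to_remove.insert(full_name + data_ext);
            checksums_to_remove.insert(full_name + marks_ext);
        }
    }

    for (const auto & name : checksums_to_remove)
        std::cout << "to remove: " << name << '\n';
}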
View File

@@ -29,14 +29,14 @@ public:
void write(const Block & block, const IColumn::Permutation * permutation) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums) final;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) final;
void finish(bool sync) final;
private:
/// Finish serialization of data: write the final mark if required and compute checksums.
/// Also validate the written data in debug mode.
void fillDataChecksums(IMergeTreeDataPart::Checksums & checksums);
void fillDataChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove);
void finishDataSerialization(bool sync);
/// Write data of one column.
@@ -111,8 +111,11 @@ private:
using ColumnStreams = std::map<String, StreamPtr>;
ColumnStreams column_streams;
/// TODO:
/// Some long column names may be replaced with hashes.
/// Below are the mappings from the original stream name to the actual
/// stream name (possibly the hash of the name) and vice versa.
std::unordered_map<String, String> full_name_to_stream_name;
std::unordered_map<String, String> stream_name_to_full_name;
/// Marks that are not yet written to disk (for each column). A mark is kept
/// here until all rows it covers have been written to disk.

View File

@@ -142,12 +142,16 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
{
/// Finish write and get checksums.
MergeTreeData::DataPart::Checksums checksums;
NameSet checksums_to_remove;
if (additional_column_checksums)
checksums = std::move(*additional_column_checksums);
/// Finish columns serialization.
writer->fillChecksums(checksums);
writer->fillChecksums(checksums, checksums_to_remove);
for (const auto & name : checksums_to_remove)
checksums.files.erase(name);
LOG_TRACE(&Poco::Logger::get("MergedBlockOutputStream"), "filled checksums {}", new_part->getNameWithState());

View File

@@ -63,7 +63,11 @@ MergedColumnOnlyOutputStream::fillChecksums(
{
/// Finish columns serialization.
MergeTreeData::DataPart::Checksums checksums;
writer->fillChecksums(checksums);
NameSet checksums_to_remove;
writer->fillChecksums(checksums, checksums_to_remove);
for (const auto & filename : checksums_to_remove)
all_checksums.files.erase(filename);
for (const auto & [projection_name, projection_part] : new_part->getProjectionParts())
checksums.addFile(
@@ -80,9 +84,7 @@
for (const String & removed_file : removed_files)
{
new_part->getDataPartStorage().removeFileIfExists(removed_file);
if (all_checksums.files.contains(removed_file))
all_checksums.files.erase(removed_file);
all_checksums.files.erase(removed_file);
}
new_part->setColumns(columns, serialization_infos, metadata_snapshot->getMetadataVersion());

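A small cleanup in the hunk above: the contains() guard before erasing removed_file was dropped, since std::map::erase(key) is already a no-op for absent keys and returns the number of erased elements. A quick demonstration:

#include <cassert>
#include <map>
#include <string>

int main()
{
    std::map<std::string, int> files{{"a.bin", 1}};

    /// Erasing a missing key is safe and simply returns 0.
    assert(files.erase("missing.bin") == 0);
    assert(files.erase("a.bin") == 1);
    assert(files.empty());
}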
View File

@@ -1,5 +1,6 @@
<clickhouse>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<replace_long_file_name_to_hash>0</replace_long_file_name_to_hash>
</merge_tree>
</clickhouse>

View File

@@ -2,5 +2,6 @@
<merge_tree>
<min_rows_for_wide_part>0</min_rows_for_wide_part>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<replace_long_file_name_to_hash>0</replace_long_file_name_to_hash>
</merge_tree>
</clickhouse>

View File

@@ -23,7 +23,7 @@ def test_file_path_escaping(started_cluster):
node.query(
"""
CREATE TABLE test.`T.a_b,l-e!` (`~Id` UInt32)
ENGINE = MergeTree() PARTITION BY `~Id` ORDER BY `~Id` SETTINGS min_bytes_for_wide_part = 0;
ENGINE = MergeTree() PARTITION BY `~Id` ORDER BY `~Id` SETTINGS min_bytes_for_wide_part = 0, replace_long_file_name_to_hash = 0;
"""
)
node.query("""INSERT INTO test.`T.a_b,l-e!` VALUES (1);""")

View File

@@ -2,5 +2,6 @@
<merge_tree>
<min_rows_for_wide_part>0</min_rows_for_wide_part>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<replace_long_file_name_to_hash>0</replace_long_file_name_to_hash>
</merge_tree>
</clickhouse>

View File

@@ -150,7 +150,7 @@ def partition_table_complex(started_cluster):
q("DROP TABLE IF EXISTS test.partition_complex")
q(
"CREATE TABLE test.partition_complex (p Date, k Int8, v1 Int8 MATERIALIZED k + 1) "
"ENGINE = MergeTree PARTITION BY p ORDER BY k SETTINGS index_granularity=1, index_granularity_bytes=0, compress_marks=false, compress_primary_key=false"
"ENGINE = MergeTree PARTITION BY p ORDER BY k SETTINGS index_granularity=1, index_granularity_bytes=0, compress_marks=false, compress_primary_key=false, replace_long_file_name_to_hash = false"
)
q("INSERT INTO test.partition_complex (p, k) VALUES(toDate(31), 1)")
q("INSERT INTO test.partition_complex (p, k) VALUES(toDate(1), 2)")