Merge pull request #35284 from CurtizJ/fix-mutations-sparse-columns

Fix mutations in tables with enabled sparse columns
This commit is contained in:
Alexey Milovidov 2022-03-20 21:55:59 +03:00 committed by GitHub
commit 9b05801e0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 28 additions and 13 deletions

View File

@ -693,11 +693,10 @@ MergeTreeDataMergerMutator::getColumnsForNewDataPart(
} }
} }
bool is_wide_part = isWidePart(source_part);
SerializationInfoByName new_serialization_infos; SerializationInfoByName new_serialization_infos;
for (const auto & [name, info] : serialization_infos) for (const auto & [name, info] : serialization_infos)
{ {
if (is_wide_part && removed_columns.count(name)) if (removed_columns.count(name))
continue; continue;
auto it = renamed_columns_from_to.find(name); auto it = renamed_columns_from_to.find(name);
@ -709,7 +708,7 @@ MergeTreeDataMergerMutator::getColumnsForNewDataPart(
/// In compact parts we read all columns, because they all stored in a /// In compact parts we read all columns, because they all stored in a
/// single file /// single file
if (!is_wide_part) if (!isWidePart(source_part))
return {updated_header.getNamesAndTypesList(), new_serialization_infos}; return {updated_header.getNamesAndTypesList(), new_serialization_infos};
Names source_column_names = source_part->getColumns().getNames(); Names source_column_names = source_part->getColumns().getNames();

View File

@ -422,10 +422,11 @@ void finalizeMutatedPart(
const CompressionCodecPtr & codec) const CompressionCodecPtr & codec)
{ {
auto disk = new_data_part->volume->getDisk(); auto disk = new_data_part->volume->getDisk();
auto part_path = fs::path(new_data_part->getFullRelativePath());
if (new_data_part->uuid != UUIDHelpers::Nil) if (new_data_part->uuid != UUIDHelpers::Nil)
{ {
auto out = disk->writeFile(new_data_part->getFullRelativePath() + IMergeTreeDataPart::UUID_FILE_NAME, 4096); auto out = disk->writeFile(part_path / IMergeTreeDataPart::UUID_FILE_NAME, 4096);
HashingWriteBuffer out_hashing(*out); HashingWriteBuffer out_hashing(*out);
writeUUIDText(new_data_part->uuid, out_hashing); writeUUIDText(new_data_part->uuid, out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count(); new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
@ -435,27 +436,36 @@ void finalizeMutatedPart(
if (execute_ttl_type != ExecuteTTLType::NONE) if (execute_ttl_type != ExecuteTTLType::NONE)
{ {
/// Write a file with ttl infos in json format. /// Write a file with ttl infos in json format.
auto out_ttl = disk->writeFile(fs::path(new_data_part->getFullRelativePath()) / "ttl.txt", 4096); auto out_ttl = disk->writeFile(part_path / "ttl.txt", 4096);
HashingWriteBuffer out_hashing(*out_ttl); HashingWriteBuffer out_hashing(*out_ttl);
new_data_part->ttl_infos.write(out_hashing); new_data_part->ttl_infos.write(out_hashing);
new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count(); new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
new_data_part->checksums.files["ttl.txt"].file_hash = out_hashing.getHash(); new_data_part->checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
} }
if (!new_data_part->getSerializationInfos().empty())
{
auto out = disk->writeFile(part_path / IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096);
HashingWriteBuffer out_hashing(*out);
new_data_part->getSerializationInfos().writeJSON(out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash();
}
{ {
/// Write file with checksums. /// Write file with checksums.
auto out_checksums = disk->writeFile(fs::path(new_data_part->getFullRelativePath()) / "checksums.txt", 4096); auto out_checksums = disk->writeFile(part_path / "checksums.txt", 4096);
new_data_part->checksums.write(*out_checksums); new_data_part->checksums.write(*out_checksums);
} /// close fd } /// close fd
{ {
auto out = disk->writeFile(new_data_part->getFullRelativePath() + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096); auto out = disk->writeFile(part_path / IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
DB::writeText(queryToString(codec->getFullCodecDesc()), *out); DB::writeText(queryToString(codec->getFullCodecDesc()), *out);
} }
{ {
/// Write a file with a description of columns. /// Write a file with a description of columns.
auto out_columns = disk->writeFile(fs::path(new_data_part->getFullRelativePath()) / "columns.txt", 4096); auto out_columns = disk->writeFile(part_path / "columns.txt", 4096);
new_data_part->getColumns().writeText(*out_columns); new_data_part->getColumns().writeText(*out_columns);
} /// close fd } /// close fd
@ -466,7 +476,7 @@ void finalizeMutatedPart(
new_data_part->modification_time = time(nullptr); new_data_part->modification_time = time(nullptr);
new_data_part->loadProjections(false, false); new_data_part->loadProjections(false, false);
new_data_part->setBytesOnDisk( new_data_part->setBytesOnDisk(
MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->volume->getDisk(), new_data_part->getFullRelativePath())); MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->volume->getDisk(), part_path));
new_data_part->default_codec = codec; new_data_part->default_codec = codec;
new_data_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(); new_data_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();
new_data_part->storage.lockSharedData(*new_data_part); new_data_part->storage.lockSharedData(*new_data_part);
@ -1308,14 +1318,13 @@ bool MutateTask::prepare()
ctx->mrk_extension = ctx->source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(ctx->new_data_part->getType()) ctx->mrk_extension = ctx->source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(ctx->new_data_part->getType())
: getNonAdaptiveMrkExtension(); : getNonAdaptiveMrkExtension();
const auto data_settings = ctx->data-> getSettings(); const auto data_settings = ctx->data->getSettings();
ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings); ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings);
ctx->execute_ttl_type = ExecuteTTLType::NONE; ctx->execute_ttl_type = ExecuteTTLType::NONE;
if (ctx->mutating_pipeline.initialized()) if (ctx->mutating_pipeline.initialized())
ctx->execute_ttl_type = MergeTreeDataMergerMutator::shouldExecuteTTL(ctx->metadata_snapshot, ctx->interpreter->getColumnDependencies()); ctx->execute_ttl_type = MergeTreeDataMergerMutator::shouldExecuteTTL(ctx->metadata_snapshot, ctx->interpreter->getColumnDependencies());
/// All columns from part are changed and may be some more that were missing before in part /// All columns from part are changed and may be some more that were missing before in part
/// TODO We can materialize compact part without copying data /// TODO We can materialize compact part without copying data
if (!isWidePart(ctx->source_part) if (!isWidePart(ctx->source_part)
@ -1325,7 +1334,6 @@ bool MutateTask::prepare()
} }
else /// TODO: check that we modify only non-key columns in this case. else /// TODO: check that we modify only non-key columns in this case.
{ {
/// We will modify only some of the columns. Other columns and key values can be copied as-is. /// We will modify only some of the columns. Other columns and key values can be copied as-is.
for (const auto & name_type : ctx->updated_header.getNamesAndTypesList()) for (const auto & name_type : ctx->updated_header.getNamesAndTypesList())
ctx->updated_columns.emplace(name_type.name); ctx->updated_columns.emplace(name_type.name);

View File

@ -5,3 +5,6 @@ u Sparse
id Default id Default
t Sparse t Sparse
182 182
id Default
t Sparse
182

View File

@ -5,7 +5,6 @@ DROP TABLE IF EXISTS t_sparse_alter;
CREATE TABLE t_sparse_alter (id UInt64, u UInt64, s String) CREATE TABLE t_sparse_alter (id UInt64, u UInt64, s String)
ENGINE = MergeTree ORDER BY id ENGINE = MergeTree ORDER BY id
SETTINGS ratio_of_defaults_for_sparse_serialization = 0.5; SETTINGS ratio_of_defaults_for_sparse_serialization = 0.5;
INSERT INTO t_sparse_alter SELECT INSERT INTO t_sparse_alter SELECT
number, number,
if (number % 11 = 0, number, 0), if (number % 11 = 0, number, 0),
@ -23,4 +22,10 @@ SELECT column, serialization_kind FROM system.parts_columns WHERE database = cur
SELECT uniqExact(t) FROM t_sparse_alter; SELECT uniqExact(t) FROM t_sparse_alter;
DETACH TABLE t_sparse_alter;
ATTACH TABLE t_sparse_alter;
SELECT column, serialization_kind FROM system.parts_columns WHERE database = currentDatabase() AND table = 't_sparse_alter' AND active ORDER BY column;
SELECT uniqExact(t) FROM t_sparse_alter;
DROP TABLE t_sparse_alter; DROP TABLE t_sparse_alter;