Add missing fsync for small files (ttl.txt, columns.txt, ...) in mutations

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-12-30 22:20:47 +01:00
parent 600027c09c
commit 9fb6004c92

View File

@ -625,7 +625,8 @@ void finalizeMutatedPart(
MergeTreeData::MutableDataPartPtr new_data_part,
ExecuteTTLType execute_ttl_type,
const CompressionCodecPtr & codec,
ContextPtr context)
ContextPtr context,
bool sync)
{
if (new_data_part->uuid != UUIDHelpers::Nil)
{
@ -634,6 +635,8 @@ void finalizeMutatedPart(
writeUUIDText(new_data_part->uuid, out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
if (sync)
out_hashing.sync();
}
if (execute_ttl_type != ExecuteTTLType::NONE)
@ -644,6 +647,8 @@ void finalizeMutatedPart(
new_data_part->ttl_infos.write(out_hashing);
new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
new_data_part->checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
if (sync)
out_hashing.sync();
}
if (!new_data_part->getSerializationInfos().empty())
@ -653,23 +658,31 @@ void finalizeMutatedPart(
new_data_part->getSerializationInfos().writeJSON(out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash();
if (sync)
out_hashing.sync();
}
{
/// Write file with checksums.
auto out_checksums = new_data_part->getDataPartStorage().writeFile("checksums.txt", 4096, context->getWriteSettings());
new_data_part->checksums.write(*out_checksums);
if (sync)
out_checksums->sync();
} /// close fd
{
auto out = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, context->getWriteSettings());
DB::writeText(queryToString(codec->getFullCodecDesc()), *out);
if (sync)
out->sync();
} /// close fd
{
/// Write a file with a description of columns.
auto out_columns = new_data_part->getDataPartStorage().writeFile("columns.txt", 4096, context->getWriteSettings());
new_data_part->getColumns().writeText(*out_columns);
if (sync)
out_columns->sync();
} /// close fd
new_data_part->rows_count = source_part->rows_count;
@ -1414,7 +1427,7 @@ private:
}
}
MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec, ctx->context);
MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec, ctx->context, ctx->need_sync);
}