Merge pull request #44781 from azat/fsync-fixes

Fix fsync for fetches/small files in mutations (ttl.txt, columns.txt)
commit 1c288e47de
Alexey Milovidov, 2022-12-31 18:05:33 +03:00 (committed via GitHub)
4 changed files with 25 additions and 2 deletions

src/IO/HashingWriteBuffer.h

@@ -77,6 +77,11 @@ public:
         state = uint128(0, 0);
     }

+    void sync() override
+    {
+        out.sync();
+    }
+
     uint128 getHash()
     {
         next();

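Why the new override matters: HashingWriteBuffer is a pass-through wrapper that checksums bytes on their way into a nested WriteBuffer, and the WriteBuffer base class implements sync() as a no-op. A sync() call on the wrapper therefore used to do nothing, so small files written through it never reached fsync even when the caller asked for durability. Below is a minimal standalone sketch of the trap and the fix; the classes are hypothetical simplifications, not ClickHouse's real ones.

#include <cstddef>
#include <cstdio>

// Hypothetical, simplified stand-ins for ClickHouse's buffer hierarchy
// (illustration only, not the real classes).
struct WriteBuffer
{
    virtual ~WriteBuffer() = default;
    virtual void write(const char * data, size_t size) = 0;
    virtual void sync() {}  // default is a no-op: the trap this commit fixes
};

struct FileWriteBuffer : WriteBuffer
{
    explicit FileWriteBuffer(FILE * f_) : f(f_) {}
    void write(const char * data, size_t size) override { fwrite(data, 1, size, f); }
    void sync() override { fflush(f); /* plus fsync(fileno(f)) on POSIX */ }
    FILE * f;
};

struct HashingWriteBuffer : WriteBuffer
{
    explicit HashingWriteBuffer(WriteBuffer & out_) : out(out_) {}
    void write(const char * data, size_t size) override
    {
        // ... update the running checksum over data here ...
        out.write(data, size);
    }
    // Before the fix there was no such override: sync() on the wrapper hit
    // the no-op default, and the wrapped file never reached fsync.
    void sync() override { out.sync(); }
    WriteBuffer & out;
};

int main()
{
    FileWriteBuffer file(stdout);
    HashingWriteBuffer hashing(file);
    hashing.write("ttl.txt contents\n", 17);
    hashing.sync();  // now reaches FileWriteBuffer::sync()
}

The same reasoning applies to any pass-through buffer: because the base default is a silent no-op, a missing sync() override fails without any error.
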
src/Storages/MergeTree/MergedBlockOutputStream.cpp

@@ -200,7 +200,9 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDisk(
     const MergeTreeMutableDataPartPtr & new_part,
     MergeTreeData::DataPart::Checksums & checksums)
 {
+    /// NOTE: You do not need to call fsync here, since it will be called later for all written_files.
+
     WrittenFiles written_files;
     if (new_part->isProjectionPart())
     {
         if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))

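The added NOTE records a deliberate design choice: finalizePartOnDisk does not fsync files one by one while writing them; it returns every finished buffer in written_files so that a single caller can run one sync pass at the end, and only when durability was requested. A hedged sketch of that shape, using hypothetical simplified types (the real WrittenFiles is a member type of MergedBlockOutputStream):

#include <memory>
#include <vector>

// Hedged sketch of the "collect now, fsync later" pattern the NOTE refers to.
struct WriteBuffer
{
    virtual ~WriteBuffer() = default;
    virtual void sync() {}
};

using WrittenFiles = std::vector<std::unique_ptr<WriteBuffer>>;

// Each writer appends its finished, still-open buffer to written_files
// instead of syncing inline; that is why no fsync is needed at write time.
WrittenFiles writeMetadataFiles()
{
    WrittenFiles written_files;
    written_files.push_back(std::make_unique<WriteBuffer>());  // e.g. count.txt
    written_files.push_back(std::make_unique<WriteBuffer>());  // e.g. ttl.txt
    return written_files;
}

// A single caller then decides once whether durability was requested:
void finishPart(bool sync_requested)
{
    auto written_files = writeMetadataFiles();
    if (sync_requested)
        for (auto & file : written_files)
            file->sync();  // one pass over everything that was written
}

int main()
{
    finishPart(/*sync_requested=*/true);
}
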
src/Storages/MergeTree/MutateTask.cpp

@@ -625,7 +625,8 @@ void finalizeMutatedPart(
     MergeTreeData::MutableDataPartPtr new_data_part,
     ExecuteTTLType execute_ttl_type,
     const CompressionCodecPtr & codec,
-    ContextPtr context)
+    ContextPtr context,
+    bool sync)
 {
     if (new_data_part->uuid != UUIDHelpers::Nil)
     {
@@ -634,6 +635,8 @@
         writeUUIDText(new_data_part->uuid, out_hashing);
         new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
         new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
+        if (sync)
+            out_hashing.sync();
     }

     if (execute_ttl_type != ExecuteTTLType::NONE)
@@ -644,6 +647,8 @@
         new_data_part->ttl_infos.write(out_hashing);
         new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
         new_data_part->checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
+        if (sync)
+            out_hashing.sync();
     }

     if (!new_data_part->getSerializationInfos().empty())
@@ -653,23 +658,31 @@
         new_data_part->getSerializationInfos().writeJSON(out_hashing);
         new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
         new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash();
+        if (sync)
+            out_hashing.sync();
     }

     {
         /// Write file with checksums.
         auto out_checksums = new_data_part->getDataPartStorage().writeFile("checksums.txt", 4096, context->getWriteSettings());
         new_data_part->checksums.write(*out_checksums);
+        if (sync)
+            out_checksums->sync();
     } /// close fd

     {
         auto out = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, context->getWriteSettings());
         DB::writeText(queryToString(codec->getFullCodecDesc()), *out);
+        if (sync)
+            out->sync();
     } /// close fd

     {
         /// Write a file with a description of columns.
         auto out_columns = new_data_part->getDataPartStorage().writeFile("columns.txt", 4096, context->getWriteSettings());
         new_data_part->getColumns().writeText(*out_columns);
+        if (sync)
+            out_columns->sync();
     } /// close fd

     new_data_part->rows_count = source_part->rows_count;
@@ -1414,7 +1427,7 @@ private:
         }
     }

-    MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec, ctx->context);
+    MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec, ctx->context, ctx->need_sync);
 }

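finalizeMutatedPart writes a handful of tiny metadata files (uuid.txt, ttl.txt, serialization.json, checksums.txt, default_compression_codec.txt, columns.txt), and before this commit none of them was fsynced; that is the mutation-path hole named in the commit message. Each new if (sync) guard ultimately boils down to the classic POSIX durability sequence: fsync the file, then fsync its directory so the new name itself is durable. The sketch below uses raw syscalls as an illustration only (ClickHouse goes through its WriteBuffer and data part storage abstractions), and writeSmallFileDurably is a hypothetical helper; the two fsync calls are what the test's FileSync and DirectorySync counters observe.

#include <fcntl.h>
#include <unistd.h>
#include <cstring>

// Hedged sketch: what one "if (sync)" guard amounts to at the POSIX level.
bool writeSmallFileDurably(const char * dir_path, const char * file_path,
                           const char * data, bool sync)
{
    int fd = ::open(file_path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0)
        return false;
    bool ok = ::write(fd, data, strlen(data)) == static_cast<ssize_t>(strlen(data));
    if (ok && sync)
        ok = ::fsync(fd) == 0;  // counted by the FileSync profile event
    ::close(fd);

    if (ok && sync)
    {
        // A freshly created file is only durably *named* once its directory
        // entry is synced as well; that is the DirectorySync counter.
        int dirfd = ::open(dir_path, O_RDONLY | O_DIRECTORY);
        if (dirfd < 0)
            return false;
        ok = ::fsync(dirfd) == 0;
        ::close(dirfd);
    }
    return ok;
}

int main()
{
    return writeSmallFileDurably(".", "./columns.txt.tmp", "columns format version: 1\n", true) ? 0 : 1;
}
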
tests/queries/0_stateless/02381_fsync_profile_events.sh

@@ -45,13 +45,16 @@ for i in {1..100}; do
     # Non retriable errors
-    if [[ $FileSync -ne 7 ]]; then
-        echo "FileSync: $FileSync != 7" >&2
+    if [[ $FileSync -ne 11 ]]; then
+        echo "FileSync: $FileSync != 11" >&2
         exit 2
     fi
     # Check that all files were synced
     if [[ $FileSync -ne $FileOpen ]]; then
         echo "$FileSync (FileSync) != $FileOpen (FileOpen)" >&2
         exit 3
     fi
     if [[ $DirectorySync -ne 2 ]]; then
         echo "DirectorySync: $DirectorySync != 2" >&2
         exit 4
     fi