Merge remote-tracking branch 'upstream/master' into refactoring-merge-tree-parts

This commit is contained in:
Anton Popov 2023-01-25 18:01:12 +00:00
commit a455083b87
3 changed files with 38 additions and 25 deletions

View File

@ -482,14 +482,15 @@ void DataPartStorageOnDiskBase::rename(
{
disk.setLastModified(from, Poco::Timestamp::fromEpochTime(time(nullptr)));
disk.moveDirectory(from, to);
/// Only after moveDirectory(), since the directory does not exist before that.
SyncGuardPtr to_sync_guard;
if (fsync_part_dir)
to_sync_guard = volume->getDisk()->getDirectorySyncGuard(to);
});
part_dir = new_part_dir;
root_path = new_root_path;
SyncGuardPtr sync_guard;
if (fsync_part_dir)
sync_guard = volume->getDisk()->getDirectorySyncGuard(getRelativePath());
}
void DataPartStorageOnDiskBase::remove(

View File

@ -726,6 +726,8 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
size_t files;
readBinary(files, in);
std::vector<std::unique_ptr<WriteBufferFromFileBase>> written_files;
for (size_t i = 0; i < files; ++i)
{
String file_name;
@ -743,8 +745,8 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
"This may happen if we are trying to download part from malicious replica or logical error.",
absolute_file_path, data_part_storage->getRelativePath());
auto file_out = output_buffer_getter(*data_part_storage, file_name, file_size);
HashingWriteBuffer hashing_out(*file_out);
written_files.emplace_back(output_buffer_getter(*data_part_storage, file_name, file_size));
HashingWriteBuffer hashing_out(*written_files.back());
copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
if (blocker.isCancelled())
@ -768,9 +770,14 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
file_name != "columns.txt" &&
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
checksums.addFile(file_name, file_size, expected_hash);
}
/// Call fsync for all files at once in an attempt to decrease the latency
for (auto & file : written_files)
{
file->finalize();
if (sync)
hashing_out.sync();
file->sync();
}
}

View File

@ -630,6 +630,8 @@ void finalizeMutatedPart(
ContextPtr context,
bool sync)
{
std::vector<std::unique_ptr<WriteBufferFromFileBase>> written_files;
if (new_data_part->uuid != UUIDHelpers::Nil)
{
auto out = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::UUID_FILE_NAME, 4096, context->getWriteSettings());
@ -637,8 +639,7 @@ void finalizeMutatedPart(
writeUUIDText(new_data_part->uuid, out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
if (sync)
out_hashing.sync();
written_files.push_back(std::move(out));
}
if (execute_ttl_type != ExecuteTTLType::NONE)
@ -649,43 +650,47 @@ void finalizeMutatedPart(
new_data_part->ttl_infos.write(out_hashing);
new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
new_data_part->checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
if (sync)
out_hashing.sync();
written_files.push_back(std::move(out_ttl));
}
if (!new_data_part->getSerializationInfos().empty())
{
auto out = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, context->getWriteSettings());
HashingWriteBuffer out_hashing(*out);
auto out_serialization = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, context->getWriteSettings());
HashingWriteBuffer out_hashing(*out_serialization);
new_data_part->getSerializationInfos().writeJSON(out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash();
if (sync)
out_hashing.sync();
written_files.push_back(std::move(out_serialization));
}
{
/// Write file with checksums.
auto out_checksums = new_data_part->getDataPartStorage().writeFile("checksums.txt", 4096, context->getWriteSettings());
new_data_part->checksums.write(*out_checksums);
if (sync)
out_checksums->sync();
} /// close fd
written_files.push_back(std::move(out_checksums));
}
{
auto out = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, context->getWriteSettings());
DB::writeText(queryToString(codec->getFullCodecDesc()), *out);
if (sync)
out->sync();
} /// close fd
auto out_comp = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, context->getWriteSettings());
DB::writeText(queryToString(codec->getFullCodecDesc()), *out_comp);
written_files.push_back(std::move(out_comp));
}
{
/// Write a file with a description of columns.
auto out_columns = new_data_part->getDataPartStorage().writeFile("columns.txt", 4096, context->getWriteSettings());
new_data_part->getColumns().writeText(*out_columns);
written_files.push_back(std::move(out_columns));
}
for (auto & file : written_files)
{
file->finalize();
if (sync)
out_columns->sync();
} /// close fd
file->sync();
}
/// Close files
written_files.clear();
new_data_part->rows_count = source_part->rows_count;
new_data_part->index_granularity = source_part->index_granularity;