Merge pull request #15639 from GrigoryPervakov/tmp-s3-merge

Use tmp disk for vertical merge files
This commit is contained in:
alexey-milovidov 2020-10-08 20:58:07 +03:00 committed by GitHub
commit c693c653a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 10 additions and 8 deletions

View File

@ -624,6 +624,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergeList::Entry & merge_entry,
TableLockHolder &,
time_t time_of_merge,
const Context & context,
const ReservationPtr & space_reservation,
bool deduplicate)
{
@ -706,7 +707,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values);
merge_entry->merge_algorithm = merge_alg;
LOG_DEBUG(log, "Selected MergeAlgorithm: {}", ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
LOG_DEBUG(log, "Selected MergeAlgorithm: {}", toString(merge_alg));
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes())
@ -715,7 +716,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
/// deadlock is impossible.
auto compression_codec = data.getCompressionCodecForPart(merge_entry->total_size_bytes_compressed, new_data_part->ttl_infos, time_of_merge);
/// TODO: Should it go through IDisk interface?
auto tmp_disk = context.getTemporaryVolume()->getDisk();
String rows_sources_file_path;
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf;
std::unique_ptr<WriteBuffer> rows_sources_write_buf;
@ -723,9 +724,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (merge_alg == MergeAlgorithm::Vertical)
{
disk->createDirectories(new_part_tmp_path);
tmp_disk->createDirectories(new_part_tmp_path);
rows_sources_file_path = new_part_tmp_path + "rows_sources";
rows_sources_uncompressed_write_buf = disk->writeFile(rows_sources_file_path);
rows_sources_uncompressed_write_buf = tmp_disk->writeFile(rows_sources_file_path);
rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*rows_sources_uncompressed_write_buf);
for (const MergeTreeData::DataPartPtr & part : parts)
@ -955,7 +956,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
+ ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count)
+ "). It is a bug.", ErrorCodes::LOGICAL_ERROR);
CompressedReadBufferFromFile rows_sources_read_buf(disk->readFile(rows_sources_file_path));
CompressedReadBufferFromFile rows_sources_read_buf(tmp_disk->readFile(rows_sources_file_path));
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns;
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
@ -1027,7 +1028,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merge_entry->progress.store(progress_before + column_sizes->columnWeight(column_name), std::memory_order_relaxed);
}
disk->remove(rows_sources_file_path);
tmp_disk->remove(rows_sources_file_path);
}
for (const auto & part : parts)

View File

@ -114,6 +114,7 @@ public:
MergeListEntry & merge_entry,
TableLockHolder & table_lock_holder,
time_t time_of_merge,
const Context & context,
const ReservationPtr & space_reservation,
bool deduplicate);

View File

@ -744,7 +744,7 @@ bool StorageMergeTree::merge(
try
{
new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, metadata_snapshot, *merge_entry, table_lock_holder, time(nullptr),
future_part, metadata_snapshot, *merge_entry, table_lock_holder, time(nullptr), global_context,
merging_tagger->reserved_space, deduplicate);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);

View File

@ -1450,7 +1450,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
{
part = merger_mutator.mergePartsToTemporaryPart(
future_merged_part, metadata_snapshot, *merge_entry,
table_lock, entry.create_time, reserved_space, entry.deduplicate);
table_lock, entry.create_time, global_context, reserved_space, entry.deduplicate);
merger_mutator.renameMergedTemporaryPart(part, parts, &transaction);