Merge pull request #28299 from amosbird/fixtmprowsource

Use real tmp file instead of predefined “rows_sources" for vertical merges
This commit is contained in:
alexey-milovidov 2021-08-29 21:49:59 +03:00 committed by GitHub
commit 77f26e8673
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -783,7 +783,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
auto compression_codec = data.getCompressionCodecForPart(merge_entry->total_size_bytes_compressed, new_data_part->ttl_infos, time_of_merge); auto compression_codec = data.getCompressionCodecForPart(merge_entry->total_size_bytes_compressed, new_data_part->ttl_infos, time_of_merge);
auto tmp_disk = context->getTemporaryVolume()->getDisk(); auto tmp_disk = context->getTemporaryVolume()->getDisk();
String rows_sources_file_path; std::unique_ptr<TemporaryFile> rows_sources_file;
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf; std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf;
std::unique_ptr<WriteBuffer> rows_sources_write_buf; std::unique_ptr<WriteBuffer> rows_sources_write_buf;
std::optional<ColumnSizeEstimator> column_sizes; std::optional<ColumnSizeEstimator> column_sizes;
@ -792,9 +792,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (chosen_merge_algorithm == MergeAlgorithm::Vertical) if (chosen_merge_algorithm == MergeAlgorithm::Vertical)
{ {
tmp_disk->createDirectories(new_part_tmp_path); rows_sources_file = createTemporaryFile(tmp_disk->getPath());
rows_sources_file_path = new_part_tmp_path + "rows_sources"; rows_sources_uncompressed_write_buf = tmp_disk->writeFile(fileName(rows_sources_file->path()));
rows_sources_uncompressed_write_buf = tmp_disk->writeFile(rows_sources_file_path);
rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*rows_sources_uncompressed_write_buf); rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*rows_sources_uncompressed_write_buf);
MergeTreeData::DataPart::ColumnToSize merged_column_to_size; MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
@ -1030,7 +1029,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
+ ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count) + ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count)
+ "). It is a bug.", ErrorCodes::LOGICAL_ERROR); + "). It is a bug.", ErrorCodes::LOGICAL_ERROR);
CompressedReadBufferFromFile rows_sources_read_buf(tmp_disk->readFile(rows_sources_file_path)); CompressedReadBufferFromFile rows_sources_read_buf(tmp_disk->readFile(fileName(rows_sources_file->path())));
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns; IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns;
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size(); for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
@ -1101,8 +1100,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes; merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
merge_entry->progress.store(progress_before + column_sizes->columnWeight(column_name), std::memory_order_relaxed); merge_entry->progress.store(progress_before + column_sizes->columnWeight(column_name), std::memory_order_relaxed);
} }
tmp_disk->removeFile(rows_sources_file_path);
} }
for (const auto & part : parts) for (const auto & part : parts)