diff --git a/src/Processors/Transforms/ColumnGathererTransform.cpp b/src/Processors/Transforms/ColumnGathererTransform.cpp index 52fa42fdb51..f266d5c2e2f 100644 --- a/src/Processors/Transforms/ColumnGathererTransform.cpp +++ b/src/Processors/Transforms/ColumnGathererTransform.cpp @@ -183,13 +183,14 @@ void ColumnGathererStream::consume(Input & input, size_t source_num) ColumnGathererTransform::ColumnGathererTransform( const Block & header, size_t num_inputs, - ReadBuffer & row_sources_buf_, + std::unique_ptr row_sources_buf_, size_t block_preferred_size_rows_, size_t block_preferred_size_bytes_, bool is_result_sparse_) : IMergingTransform( num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false, - num_inputs, row_sources_buf_, block_preferred_size_rows_, block_preferred_size_bytes_, is_result_sparse_) + num_inputs, *row_sources_buf_, block_preferred_size_rows_, block_preferred_size_bytes_, is_result_sparse_) + , row_sources_buf_holder(std::move(row_sources_buf_)) , log(getLogger("ColumnGathererStream")) { if (header.columns() != 1) diff --git a/src/Processors/Transforms/ColumnGathererTransform.h b/src/Processors/Transforms/ColumnGathererTransform.h index fbc9a6bfcc6..ce2671ce0bf 100644 --- a/src/Processors/Transforms/ColumnGathererTransform.h +++ b/src/Processors/Transforms/ColumnGathererTransform.h @@ -115,7 +115,7 @@ public: ColumnGathererTransform( const Block & header, size_t num_inputs, - ReadBuffer & row_sources_buf_, + std::unique_ptr row_sources_buf_, size_t block_preferred_size_rows_, size_t block_preferred_size_bytes_, bool is_result_sparse_); @@ -124,6 +124,8 @@ public: protected: void onFinish() override; + + std::unique_ptr row_sources_buf_holder; /// Keep ownership of row_sources_buf while it's in use by ColumnGathererStream. LoggerPtr log; }; diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 398a9472456..8c1dca7ac10 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -43,6 +43,7 @@ #include #include #include +#include #include #include #include @@ -90,6 +91,62 @@ static ColumnsStatistics getStatisticsForColumns( return all_statistics; } +/// Manages the "rows_sources" temporary file that is used during vertical merge. +class RowsSourcesTemporaryFile : public ITemporaryFileLookup +{ +public: + /// A logical name of the temporary file under which it will be known to the plan steps that use it. + static constexpr auto FILE_ID = "rows_sources"; + + explicit RowsSourcesTemporaryFile(TemporaryDataOnDiskScopePtr temporary_data_on_disk_) + : tmp_disk(std::make_unique(temporary_data_on_disk_)) + , uncompressed_write_buffer(tmp_disk->createRawStream()) + , tmp_file_name_on_disk(uncompressed_write_buffer->getFileName()) + { + } + + WriteBuffer & getTemporaryFileForWriting(const String & name) override + { + if (name != FILE_ID) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected temporary file name requested: {}", name); + + if (write_buffer) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file was already requested for writing, there musto be only one writer"); + + write_buffer = (std::make_unique(*uncompressed_write_buffer)); + return *write_buffer; + } + + std::unique_ptr getTemporaryFileForReading(const String & name) override + { + if (name != FILE_ID) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected temporary file name requested: {}", name); + + if (!finalized) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file is not finalized yet"); + + /// Reopen the file for each read so that multiple reads can be performed in parallel and there is no need to seek to the beginning. + auto raw_file_read_buffer = std::make_unique(tmp_file_name_on_disk); + return std::make_unique(std::move(raw_file_read_buffer)); + } + + /// Returns written data size in bytes + size_t finalizeWriting() + { + write_buffer->finalize(); + uncompressed_write_buffer->finalize(); + finalized = true; + return write_buffer->count(); + } + +private: + std::unique_ptr tmp_disk; + std::unique_ptr uncompressed_write_buffer; + std::unique_ptr write_buffer; + const String tmp_file_name_on_disk; + bool finalized = false; +}; + static void addMissedColumnsToSerializationInfos( size_t num_rows_in_parts, const Names & part_columns, @@ -364,8 +421,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const ctx->compression_codec = global_ctx->data->getCompressionCodecForPart( global_ctx->merge_list_element_ptr->total_size_bytes_compressed, global_ctx->new_data_part->ttl_infos, global_ctx->time_of_merge); - ctx->tmp_disk = std::make_unique(global_ctx->context->getTempDataOnDisk()); - switch (global_ctx->chosen_merge_algorithm) { case MergeAlgorithm::Horizontal: @@ -378,8 +433,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const } case MergeAlgorithm::Vertical: { - ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->createRawStream(); - ctx->rows_sources_write_buf = std::make_unique(*ctx->rows_sources_uncompressed_write_buf); + ctx->rows_sources_temporary_file = std::make_shared(global_ctx->context->getTempDataOnDisk()); std::map local_merged_column_to_size; for (const auto & part : global_ctx->future_part->parts) @@ -499,11 +553,9 @@ MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::g auto new_ctx = std::make_shared(); - new_ctx->rows_sources_write_buf = std::move(ctx->rows_sources_write_buf); - new_ctx->rows_sources_uncompressed_write_buf = std::move(ctx->rows_sources_uncompressed_write_buf); + new_ctx->rows_sources_temporary_file = std::move(ctx->rows_sources_temporary_file); new_ctx->column_sizes = std::move(ctx->column_sizes); new_ctx->compression_codec = std::move(ctx->compression_codec); - new_ctx->tmp_disk = std::move(ctx->tmp_disk); new_ctx->it_name_and_type = std::move(ctx->it_name_and_type); new_ctx->read_with_direct_io = std::move(ctx->read_with_direct_io); new_ctx->need_sync = std::move(ctx->need_sync); @@ -760,11 +812,7 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed); /// Ensure data has written to disk. - ctx->rows_sources_write_buf->finalize(); - ctx->rows_sources_uncompressed_write_buf->finalize(); - ctx->rows_sources_uncompressed_write_buf->finalize(); - - size_t rows_sources_count = ctx->rows_sources_write_buf->count(); + size_t rows_sources_count = ctx->rows_sources_temporary_file->finalizeWriting(); /// In special case, when there is only one source part, and no rows were skipped, we may have /// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total /// number of input rows. @@ -775,29 +823,6 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const "of bytes written to rows_sources file ({}). It is a bug.", sum_input_rows_exact, input_rows_filtered, rows_sources_count); - /// TemporaryDataOnDisk::createRawStream returns WriteBufferFromFile implementing IReadableWriteBuffer - /// and we expect to get ReadBufferFromFile here. - /// So, it's relatively safe to use dynamic_cast here and downcast to ReadBufferFromFile. - auto * wbuf_readable = dynamic_cast(ctx->rows_sources_uncompressed_write_buf.get()); - std::unique_ptr reread_buf = wbuf_readable ? wbuf_readable->tryGetReadBuffer() : nullptr; - if (!reread_buf) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot read temporary file {}", ctx->rows_sources_uncompressed_write_buf->getFileName()); - - auto * reread_buffer_raw = dynamic_cast(reread_buf.get()); - if (!reread_buffer_raw) - { - const auto & reread_buf_ref = *reread_buf; - throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected ReadBufferFromFileBase, but got {}", demangle(typeid(reread_buf_ref).name())); - } - /// Move ownership from std::unique_ptr to std::unique_ptr for CompressedReadBufferFromFile. - /// First, release ownership from unique_ptr to base type. - reread_buf.release(); /// NOLINT(bugprone-unused-return-value,hicpp-ignored-remove-result): we already have the pointer value in `reread_buffer_raw` - - /// Then, move ownership to unique_ptr to concrete type. - std::unique_ptr reread_buffer_from_file(reread_buffer_raw); - - /// CompressedReadBufferFromFile expects std::unique_ptr as argument. - ctx->rows_sources_read_buf = std::make_unique(std::move(reread_buffer_from_file)); ctx->it_name_and_type = global_ctx->gathering_columns.cbegin(); const auto & settings = global_ctx->context->getSettingsRef(); @@ -828,12 +853,12 @@ class ColumnGathererStep : public ITransformingStep public: ColumnGathererStep( const DataStream & input_stream_, - CompressedReadBufferFromFile * rows_sources_read_buf_, + const String & rows_sources_temporary_file_name_, UInt64 merge_block_size_rows_, UInt64 merge_block_size_bytes_, bool is_result_sparse_) : ITransformingStep(input_stream_, input_stream_.header, getTraits()) - , rows_sources_read_buf(rows_sources_read_buf_) + , rows_sources_temporary_file_name(rows_sources_temporary_file_name_) , merge_block_size_rows(merge_block_size_rows_) , merge_block_size_bytes(merge_block_size_bytes_) , is_result_sparse(is_result_sparse_) @@ -841,17 +866,20 @@ public: String getName() const override { return "ColumnGatherer"; } - void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override + void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & pipeline_settings) override { const auto &header = pipeline.getHeader(); const auto input_streams_count = pipeline.getNumStreams(); - rows_sources_read_buf->seek(0, 0); + if (!pipeline_settings.temporary_file_lookup) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file lookup is not set in pipeline settings for vertical merge"); + + auto rows_sources_read_buf = pipeline_settings.temporary_file_lookup->getTemporaryFileForReading(rows_sources_temporary_file_name); auto transform = std::make_unique( header, input_streams_count, - *rows_sources_read_buf, + std::move(rows_sources_read_buf), merge_block_size_rows, merge_block_size_bytes, is_result_sparse); @@ -881,7 +909,7 @@ private: } MergeTreeData::MergingParams merging_params{}; - CompressedReadBufferFromFile * rows_sources_read_buf; + const String rows_sources_temporary_file_name; const UInt64 merge_block_size_rows; const UInt64 merge_block_size_bytes; const bool is_result_sparse; @@ -932,7 +960,7 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic const auto data_settings = global_ctx->data->getSettings(); auto merge_step = std::make_unique( merge_column_query_plan.getCurrentDataStream(), - ctx->rows_sources_read_buf.get(), //global_ctx->rows_sources_temporary_file_name, + RowsSourcesTemporaryFile::FILE_ID, data_settings->merge_max_block_size, data_settings->merge_max_block_size_bytes, is_result_sparse); @@ -960,7 +988,8 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic } auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context); - auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context); + pipeline_settings.temporary_file_lookup = ctx->rows_sources_temporary_file; + auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context); auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings); return {QueryPipelineBuilder::getPipeline(std::move(*builder)), std::move(indexes_to_recalc)}; @@ -1012,10 +1041,6 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const global_ctx->to->getIndexGranularity()); ctx->column_elems_written = 0; - - /// rows_sources_read_buf is reused for each column so we need to rewind it explicitly each time - /// This sharing also prevents from from running multiple merge of individual columns in parallel. - ctx->rows_sources_read_buf->seek(0, 0); } @@ -1329,7 +1354,7 @@ public: const SortDescription & sort_description_, const Names partition_key_columns_, const MergeTreeData::MergingParams & merging_params_, - WriteBuffer * rows_sources_write_buf_, + const String & rows_sources_temporary_file_name_, UInt64 merge_block_size_rows_, UInt64 merge_block_size_bytes_, bool blocks_are_granules_size_, @@ -1339,7 +1364,7 @@ public: , sort_description(sort_description_) , partition_key_columns(partition_key_columns_) , merging_params(merging_params_) - , rows_sources_write_buf(rows_sources_write_buf_) + , rows_sources_temporary_file_name(rows_sources_temporary_file_name_) , merge_block_size_rows(merge_block_size_rows_) , merge_block_size_bytes(merge_block_size_bytes_) , blocks_are_granules_size(blocks_are_granules_size_) @@ -1349,7 +1374,7 @@ public: String getName() const override { return "ApplyMergePolicy"; } - void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override + void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & pipeline_settings) override { /// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number. /// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part, @@ -1359,6 +1384,14 @@ public: const auto &header = pipeline.getHeader(); const auto input_streams_count = pipeline.getNumStreams(); + WriteBuffer * rows_sources_write_buf = nullptr; + if (!rows_sources_temporary_file_name.empty()) + { + if (!pipeline_settings.temporary_file_lookup) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file lookup is not set in pipeline settings for vertical merge"); + rows_sources_write_buf = &pipeline_settings.temporary_file_lookup->getTemporaryFileForWriting(rows_sources_temporary_file_name); + } + switch (merging_params.mode) { case MergeTreeData::MergingParams::Ordinary: @@ -1448,7 +1481,7 @@ private: const SortDescription sort_description; const Names partition_key_columns; const MergeTreeData::MergingParams merging_params{}; - WriteBuffer * rows_sources_write_buf; + const String rows_sources_temporary_file_name; const UInt64 merge_block_size_rows; const UInt64 merge_block_size_bytes; const bool blocks_are_granules_size; @@ -1645,8 +1678,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const for (size_t i = 0; i < sort_columns_size; ++i) sort_description.emplace_back(sort_columns[i], 1, 1); + const bool is_vertical_merge = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical); /// If merge is vertical we cannot calculate it - ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical); + ctx->blocks_are_granules_size = is_vertical_merge; if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup) throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed"); @@ -1656,7 +1690,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const sort_description, partition_key_columns, ctx->merging_params, - ctx->rows_sources_write_buf.get(), + (is_vertical_merge ? RowsSourcesTemporaryFile::FILE_ID : ""), /// rows_sources temporaty file is used only for vertical merge data_settings->merge_max_block_size, data_settings->merge_max_block_size_bytes, ctx->blocks_are_granules_size, @@ -1722,7 +1756,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const { auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context); - auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context); + pipeline_settings.temporary_file_lookup = ctx->rows_sources_temporary_file; + auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context); auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings); global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 84f00d521fd..ef187e1de2f 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -40,6 +40,7 @@ namespace DB class MergeTask; using MergeTaskPtr = std::shared_ptr; +class RowsSourcesTemporaryFile; /** * Overview of the merge algorithm @@ -227,14 +228,12 @@ private: bool need_prefix; MergeTreeData::MergingParams merging_params{}; - TemporaryDataOnDiskPtr tmp_disk{nullptr}; DiskPtr disk{nullptr}; bool need_remove_expired_values{false}; bool force_ttl{false}; CompressionCodecPtr compression_codec{nullptr}; size_t sum_input_rows_upper_bound{0}; - std::unique_ptr rows_sources_uncompressed_write_buf{nullptr}; - std::unique_ptr rows_sources_write_buf{nullptr}; + std::shared_ptr rows_sources_temporary_file; std::optional column_sizes{}; /// For projections to rebuild @@ -314,11 +313,9 @@ private: struct VerticalMergeRuntimeContext : public IStageRuntimeContext { /// Begin dependencies from previous stage - std::unique_ptr rows_sources_uncompressed_write_buf{nullptr}; - std::unique_ptr rows_sources_write_buf{nullptr}; + std::shared_ptr rows_sources_temporary_file; std::optional column_sizes; CompressionCodecPtr compression_codec; - TemporaryDataOnDiskPtr tmp_disk{nullptr}; std::list::const_iterator it_name_and_type; bool read_with_direct_io{false}; bool need_sync{false}; @@ -350,7 +347,6 @@ private: size_t column_elems_written{0}; QueryPipeline column_parts_pipeline; std::unique_ptr executor; - std::unique_ptr rows_sources_read_buf{nullptr}; UInt64 elapsed_execute_ns{0}; };