Merge pull request #69383 from ClickHouse/tmp_files_lookup

Refactor temporary file usage in MergeTask with QueryPlan
This commit is contained in:
Alexander Gololobov 2024-09-12 10:57:15 +00:00 committed by GitHub
commit bbcde19910
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 139 additions and 65 deletions

View File

@ -12,12 +12,15 @@ namespace DB
struct Settings;
class QueryStatus;
using QueryStatusPtr = std::shared_ptr<QueryStatus>;
struct ITemporaryFileLookup;
using TemporaryFileLookupPtr = std::shared_ptr<ITemporaryFileLookup>;
struct BuildQueryPipelineSettings
{
ExpressionActionsSettings actions_settings;
QueryStatusPtr process_list_element;
ProgressCallback progress_callback = nullptr;
TemporaryFileLookupPtr temporary_file_lookup;
const ExpressionActionsSettings & getActionsSettings() const { return actions_settings; }
static BuildQueryPipelineSettings fromContext(ContextPtr from);

View File

@ -0,0 +1,31 @@
#pragma once
#include <base/types.h>
#include <boost/noncopyable.hpp>
#include <vector>
#include <memory>
namespace DB
{
class WriteBuffer;
class ReadBuffer;
/// Interface for accessing temporary files by some logical name (or id).
/// While building query pipeline processors can lookup temporary files by some id and use them for writing and/or reading temporary data
/// without knowing what exactly is behind the name: local file, memory buffer, object in cloud storage, etc.
struct ITemporaryFileLookup : boost::noncopyable
{
virtual ~ITemporaryFileLookup() = default;
/// Give the caller a temporary write buffer, but don't give away the ownership.
virtual WriteBuffer & getTemporaryFileForWriting(const String & file_id) = 0;
/// Give the caller a temporary read buffer, it exclusively belongs to the caller.
/// Other callers can get their own read buffer for the same temporary file.
virtual std::unique_ptr<ReadBuffer> getTemporaryFileForReading(const String & file_id) = 0;
};
using TemporaryFileLookupPtr = std::shared_ptr<ITemporaryFileLookup>;
}

View File

@ -183,13 +183,14 @@ void ColumnGathererStream::consume(Input & input, size_t source_num)
ColumnGathererTransform::ColumnGathererTransform(
const Block & header,
size_t num_inputs,
ReadBuffer & row_sources_buf_,
std::unique_ptr<ReadBuffer> row_sources_buf_,
size_t block_preferred_size_rows_,
size_t block_preferred_size_bytes_,
bool is_result_sparse_)
: IMergingTransform<ColumnGathererStream>(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
num_inputs, row_sources_buf_, block_preferred_size_rows_, block_preferred_size_bytes_, is_result_sparse_)
num_inputs, *row_sources_buf_, block_preferred_size_rows_, block_preferred_size_bytes_, is_result_sparse_)
, row_sources_buf_holder(std::move(row_sources_buf_))
, log(getLogger("ColumnGathererStream"))
{
if (header.columns() != 1)

View File

@ -115,7 +115,7 @@ public:
ColumnGathererTransform(
const Block & header,
size_t num_inputs,
ReadBuffer & row_sources_buf_,
std::unique_ptr<ReadBuffer> row_sources_buf_,
size_t block_preferred_size_rows_,
size_t block_preferred_size_bytes_,
bool is_result_sparse_);
@ -124,6 +124,8 @@ public:
protected:
void onFinish() override;
std::unique_ptr<ReadBuffer> row_sources_buf_holder; /// Keep ownership of row_sources_buf while it's in use by ColumnGathererStream.
LoggerPtr log;
};

View File

@ -14,7 +14,7 @@
#include <Compression/CompressedWriteBuffer.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/Serializations/SerializationInfo.h>
#include <IO/IReadableWriteBuffer.h>
#include <IO/ReadBufferFromEmptyFile.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
@ -43,6 +43,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/TemporaryFiles.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -90,6 +91,68 @@ static ColumnsStatistics getStatisticsForColumns(
return all_statistics;
}
/// Manages the "rows_sources" temporary file that is used during vertical merge.
class RowsSourcesTemporaryFile : public ITemporaryFileLookup
{
public:
/// A logical name of the temporary file under which it will be known to the plan steps that use it.
static constexpr auto FILE_ID = "rows_sources";
explicit RowsSourcesTemporaryFile(TemporaryDataOnDiskScopePtr temporary_data_on_disk_)
: tmp_disk(std::make_unique<TemporaryDataOnDisk>(temporary_data_on_disk_))
, uncompressed_write_buffer(tmp_disk->createRawStream())
, tmp_file_name_on_disk(uncompressed_write_buffer->getFileName())
{
}
WriteBuffer & getTemporaryFileForWriting(const String & name) override
{
if (name != FILE_ID)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected temporary file name requested: {}", name);
if (write_buffer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file was already requested for writing, there musto be only one writer");
write_buffer = (std::make_unique<CompressedWriteBuffer>(*uncompressed_write_buffer));
return *write_buffer;
}
std::unique_ptr<ReadBuffer> getTemporaryFileForReading(const String & name) override
{
if (name != FILE_ID)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected temporary file name requested: {}", name);
if (!finalized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file is not finalized yet");
/// tmp_disk might not create real file if no data was written to it.
if (final_size == 0)
return std::make_unique<ReadBufferFromEmptyFile>();
/// Reopen the file for each read so that multiple reads can be performed in parallel and there is no need to seek to the beginning.
auto raw_file_read_buffer = std::make_unique<ReadBufferFromFile>(tmp_file_name_on_disk);
return std::make_unique<CompressedReadBufferFromFile>(std::move(raw_file_read_buffer));
}
/// Returns written data size in bytes
size_t finalizeWriting()
{
write_buffer->finalize();
uncompressed_write_buffer->finalize();
finalized = true;
final_size = write_buffer->count();
return final_size;
}
private:
std::unique_ptr<TemporaryDataOnDisk> tmp_disk;
std::unique_ptr<WriteBufferFromFileBase> uncompressed_write_buffer;
std::unique_ptr<WriteBuffer> write_buffer;
const String tmp_file_name_on_disk;
bool finalized = false;
size_t final_size = 0;
};
static void addMissedColumnsToSerializationInfos(
size_t num_rows_in_parts,
const Names & part_columns,
@ -364,8 +427,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
ctx->compression_codec = global_ctx->data->getCompressionCodecForPart(
global_ctx->merge_list_element_ptr->total_size_bytes_compressed, global_ctx->new_data_part->ttl_infos, global_ctx->time_of_merge);
ctx->tmp_disk = std::make_unique<TemporaryDataOnDisk>(global_ctx->context->getTempDataOnDisk());
switch (global_ctx->chosen_merge_algorithm)
{
case MergeAlgorithm::Horizontal:
@ -378,8 +439,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
}
case MergeAlgorithm::Vertical:
{
ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->createRawStream();
ctx->rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*ctx->rows_sources_uncompressed_write_buf);
ctx->rows_sources_temporary_file = std::make_shared<RowsSourcesTemporaryFile>(global_ctx->context->getTempDataOnDisk());
std::map<String, UInt64> local_merged_column_to_size;
for (const auto & part : global_ctx->future_part->parts)
@ -499,11 +559,9 @@ MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::g
auto new_ctx = std::make_shared<VerticalMergeRuntimeContext>();
new_ctx->rows_sources_write_buf = std::move(ctx->rows_sources_write_buf);
new_ctx->rows_sources_uncompressed_write_buf = std::move(ctx->rows_sources_uncompressed_write_buf);
new_ctx->rows_sources_temporary_file = std::move(ctx->rows_sources_temporary_file);
new_ctx->column_sizes = std::move(ctx->column_sizes);
new_ctx->compression_codec = std::move(ctx->compression_codec);
new_ctx->tmp_disk = std::move(ctx->tmp_disk);
new_ctx->it_name_and_type = std::move(ctx->it_name_and_type);
new_ctx->read_with_direct_io = std::move(ctx->read_with_direct_io);
new_ctx->need_sync = std::move(ctx->need_sync);
@ -760,11 +818,7 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);
/// Ensure data has written to disk.
ctx->rows_sources_write_buf->finalize();
ctx->rows_sources_uncompressed_write_buf->finalize();
ctx->rows_sources_uncompressed_write_buf->finalize();
size_t rows_sources_count = ctx->rows_sources_write_buf->count();
size_t rows_sources_count = ctx->rows_sources_temporary_file->finalizeWriting();
/// In special case, when there is only one source part, and no rows were skipped, we may have
/// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
@ -775,29 +829,6 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, rows_sources_count);
/// TemporaryDataOnDisk::createRawStream returns WriteBufferFromFile implementing IReadableWriteBuffer
/// and we expect to get ReadBufferFromFile here.
/// So, it's relatively safe to use dynamic_cast here and downcast to ReadBufferFromFile.
auto * wbuf_readable = dynamic_cast<IReadableWriteBuffer *>(ctx->rows_sources_uncompressed_write_buf.get());
std::unique_ptr<ReadBuffer> reread_buf = wbuf_readable ? wbuf_readable->tryGetReadBuffer() : nullptr;
if (!reread_buf)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot read temporary file {}", ctx->rows_sources_uncompressed_write_buf->getFileName());
auto * reread_buffer_raw = dynamic_cast<ReadBufferFromFileBase *>(reread_buf.get());
if (!reread_buffer_raw)
{
const auto & reread_buf_ref = *reread_buf;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected ReadBufferFromFileBase, but got {}", demangle(typeid(reread_buf_ref).name()));
}
/// Move ownership from std::unique_ptr<ReadBuffer> to std::unique_ptr<ReadBufferFromFile> for CompressedReadBufferFromFile.
/// First, release ownership from unique_ptr to base type.
reread_buf.release(); /// NOLINT(bugprone-unused-return-value,hicpp-ignored-remove-result): we already have the pointer value in `reread_buffer_raw`
/// Then, move ownership to unique_ptr to concrete type.
std::unique_ptr<ReadBufferFromFileBase> reread_buffer_from_file(reread_buffer_raw);
/// CompressedReadBufferFromFile expects std::unique_ptr<ReadBufferFromFile> as argument.
ctx->rows_sources_read_buf = std::make_unique<CompressedReadBufferFromFile>(std::move(reread_buffer_from_file));
ctx->it_name_and_type = global_ctx->gathering_columns.cbegin();
const auto & settings = global_ctx->context->getSettingsRef();
@ -828,12 +859,12 @@ class ColumnGathererStep : public ITransformingStep
public:
ColumnGathererStep(
const DataStream & input_stream_,
CompressedReadBufferFromFile * rows_sources_read_buf_,
const String & rows_sources_temporary_file_name_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool is_result_sparse_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, rows_sources_read_buf(rows_sources_read_buf_)
, rows_sources_temporary_file_name(rows_sources_temporary_file_name_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, is_result_sparse(is_result_sparse_)
@ -841,17 +872,20 @@ public:
String getName() const override { return "ColumnGatherer"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & pipeline_settings) override
{
const auto &header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();
rows_sources_read_buf->seek(0, 0);
if (!pipeline_settings.temporary_file_lookup)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file lookup is not set in pipeline settings for vertical merge");
auto rows_sources_read_buf = pipeline_settings.temporary_file_lookup->getTemporaryFileForReading(rows_sources_temporary_file_name);
auto transform = std::make_unique<ColumnGathererTransform>(
header,
input_streams_count,
*rows_sources_read_buf,
std::move(rows_sources_read_buf),
merge_block_size_rows,
merge_block_size_bytes,
is_result_sparse);
@ -881,7 +915,7 @@ private:
}
MergeTreeData::MergingParams merging_params{};
CompressedReadBufferFromFile * rows_sources_read_buf;
const String rows_sources_temporary_file_name;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
const bool is_result_sparse;
@ -932,7 +966,7 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic
const auto data_settings = global_ctx->data->getSettings();
auto merge_step = std::make_unique<ColumnGathererStep>(
merge_column_query_plan.getCurrentDataStream(),
ctx->rows_sources_read_buf.get(), //global_ctx->rows_sources_temporary_file_name,
RowsSourcesTemporaryFile::FILE_ID,
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
is_result_sparse);
@ -961,6 +995,7 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic
}
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
pipeline_settings.temporary_file_lookup = ctx->rows_sources_temporary_file;
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
@ -1013,10 +1048,6 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
global_ctx->to->getIndexGranularity());
ctx->column_elems_written = 0;
/// rows_sources_read_buf is reused for each column so we need to rewind it explicitly each time
/// This sharing also prevents from from running multiple merge of individual columns in parallel.
ctx->rows_sources_read_buf->seek(0, 0);
}
@ -1330,7 +1361,7 @@ public:
const SortDescription & sort_description_,
const Names partition_key_columns_,
const MergeTreeData::MergingParams & merging_params_,
WriteBuffer * rows_sources_write_buf_,
const String & rows_sources_temporary_file_name_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool blocks_are_granules_size_,
@ -1340,7 +1371,7 @@ public:
, sort_description(sort_description_)
, partition_key_columns(partition_key_columns_)
, merging_params(merging_params_)
, rows_sources_write_buf(rows_sources_write_buf_)
, rows_sources_temporary_file_name(rows_sources_temporary_file_name_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, blocks_are_granules_size(blocks_are_granules_size_)
@ -1350,7 +1381,7 @@ public:
String getName() const override { return "MergeParts"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & pipeline_settings) override
{
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
@ -1360,6 +1391,14 @@ public:
const auto &header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();
WriteBuffer * rows_sources_write_buf = nullptr;
if (!rows_sources_temporary_file_name.empty())
{
if (!pipeline_settings.temporary_file_lookup)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file lookup is not set in pipeline settings for vertical merge");
rows_sources_write_buf = &pipeline_settings.temporary_file_lookup->getTemporaryFileForWriting(rows_sources_temporary_file_name);
}
switch (merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
@ -1449,7 +1488,7 @@ private:
const SortDescription sort_description;
const Names partition_key_columns;
const MergeTreeData::MergingParams merging_params{};
WriteBuffer * rows_sources_write_buf;
const String rows_sources_temporary_file_name;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
const bool blocks_are_granules_size;
@ -1606,8 +1645,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
const bool is_vertical_merge = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
/// If merge is vertical we cannot calculate it
ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
ctx->blocks_are_granules_size = is_vertical_merge;
if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
@ -1617,7 +1657,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
sort_description,
partition_key_columns,
ctx->merging_params,
ctx->rows_sources_write_buf.get(),
(is_vertical_merge ? RowsSourcesTemporaryFile::FILE_ID : ""), /// rows_sources temporaty file is used only for vertical merge
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
ctx->blocks_are_granules_size,
@ -1682,6 +1722,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
{
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
pipeline_settings.temporary_file_lookup = ctx->rows_sources_temporary_file;
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);

View File

@ -40,6 +40,7 @@ namespace DB
class MergeTask;
using MergeTaskPtr = std::shared_ptr<MergeTask>;
class RowsSourcesTemporaryFile;
/**
* Overview of the merge algorithm
@ -227,14 +228,12 @@ private:
bool need_prefix;
MergeTreeData::MergingParams merging_params{};
TemporaryDataOnDiskPtr tmp_disk{nullptr};
DiskPtr disk{nullptr};
bool need_remove_expired_values{false};
bool force_ttl{false};
CompressionCodecPtr compression_codec{nullptr};
size_t sum_input_rows_upper_bound{0};
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf{nullptr};
std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
std::shared_ptr<RowsSourcesTemporaryFile> rows_sources_temporary_file;
std::optional<ColumnSizeEstimator> column_sizes{};
/// For projections to rebuild
@ -314,11 +313,9 @@ private:
struct VerticalMergeRuntimeContext : public IStageRuntimeContext
{
/// Begin dependencies from previous stage
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf{nullptr};
std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
std::shared_ptr<RowsSourcesTemporaryFile> rows_sources_temporary_file;
std::optional<ColumnSizeEstimator> column_sizes;
CompressionCodecPtr compression_codec;
TemporaryDataOnDiskPtr tmp_disk{nullptr};
std::list<DB::NameAndTypePair>::const_iterator it_name_and_type;
bool read_with_direct_io{false};
bool need_sync{false};
@ -350,7 +347,6 @@ private:
size_t column_elems_written{0};
QueryPipeline column_parts_pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
std::unique_ptr<CompressedReadBufferFromFile> rows_sources_read_buf{nullptr};
UInt64 elapsed_execute_ns{0};
};