revert unnecessary changes

vdimir 2024-11-01 18:17:08 +00:00
parent 52d9b205ea
commit 38a3c67075
19 changed files with 134 additions and 73 deletions

View File

@ -5,7 +5,6 @@
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Common/logger_useful.h>
@ -30,18 +29,17 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
LoggerPtr log_,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(
header_,
num_inputs,
std::move(description_),
temp_data_buffer_.get(),
out_row_sources_buf_,
max_row_refs,
std::make_unique<MergedData>(use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_))
, sign_column_number(header_.getPositionByName(sign_column))
, only_positive_sign(only_positive_sign_)
, temp_data_buffer(temp_data_buffer_)
, log(log_)
{
}

View File

@ -11,8 +11,6 @@ namespace Poco
namespace DB
{
class TemporaryDataBuffer;
/** Merges several sorted inputs to one.
* For each group of consecutive identical values of the primary key (the columns by which the data is sorted),
* keeps no more than one row with the value of the column `sign_column = -1` ("negative row")
@ -37,7 +35,7 @@ public:
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
LoggerPtr log_,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_ = nullptr,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
const char * getName() const override { return "CollapsingSortedAlgorithm"; }
@ -64,8 +62,6 @@ private:
PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key
size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log.
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer = nullptr;
LoggerPtr log;
void reportIncorrectData();

View File

@ -3,7 +3,6 @@
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/TemporaryDataOnDisk.h>
namespace DB
{
@ -16,7 +15,7 @@ MergingSortedAlgorithm::MergingSortedAlgorithm(
size_t max_block_size_bytes_,
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_,
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: header(std::move(header_))
, merged_data(use_average_block_sizes, max_block_size_, max_block_size_bytes_)

View File

@ -9,8 +9,6 @@
namespace DB
{
class TemporaryDataBuffer;
/// Merges several sorted inputs into one sorted output.
class MergingSortedAlgorithm final : public IMergingAlgorithm
{
@ -23,7 +21,7 @@ public:
size_t max_block_size_bytes_,
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_ = 0,
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf_ = nullptr,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
void addInput();
@ -47,7 +45,7 @@ private:
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
/// If it is not nullptr then it should be populated during execution
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf = nullptr;
WriteBuffer * out_row_sources_buf = nullptr;
/// Chunks currently being merged.
Inputs current_inputs;

View File

@ -5,7 +5,6 @@
#include <IO/WriteBuffer.h>
#include <Columns/IColumn.h>
#include <Processors/Merges/Algorithms/RowRef.h>
#include <Interpreters/TemporaryDataOnDisk.h>
namespace DB
{
@ -38,13 +37,12 @@ ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
bool cleanup_,
bool enable_vertical_final_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), temp_data_buffer_.get(), max_row_refs, std::make_unique<MergedData>(use_average_block_sizes, max_block_size_rows, max_block_size_bytes))
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs, std::make_unique<MergedData>(use_average_block_sizes, max_block_size_rows, max_block_size_bytes))
, cleanup(cleanup_), enable_vertical_final(enable_vertical_final_)
, temp_data_buffer(temp_data_buffer_)
{
if (!is_deleted_column.empty())
is_deleted_column_number = header_.getPositionByName(is_deleted_column);

View File

@ -24,8 +24,6 @@ struct ChunkSelectFinalIndices : public ChunkInfoCloneable<ChunkSelectFinalIndic
const ColumnUInt64 * select_final_indices = nullptr;
};
class TemporaryDataBuffer;
/** Merges several sorted inputs into one.
* For each group of consecutive identical values of the primary key (the columns by which the data is sorted),
* keeps row with max `version` value.
@ -40,7 +38,7 @@ public:
const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_ = nullptr,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false,
bool enable_vertical_final_ = false);
@ -61,8 +59,6 @@ private:
RowRef selected_row; /// Last row with maximum version for current primary key, may extend lifetime of chunk in input source
size_t max_pos = 0; /// The position (into current_row_sources) of the row with the highest version.
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer = nullptr;
/// Sources of rows with the current primary key.
PODArray<RowSourcePart> current_row_sources;

View File

@ -1,7 +1,6 @@
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
#include <Columns/ColumnsNumber.h>
#include <IO/WriteBuffer.h>
#include <Interpreters/TemporaryDataOnDisk.h>
namespace DB
{
@ -15,13 +14,12 @@ VersionedCollapsingAlgorithm::VersionedCollapsingAlgorithm(
const String & sign_column_,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), temp_data_buffer_.get(), MAX_ROWS_IN_MULTIVERSION_QUEUE, std::make_unique<MergedData>(use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_))
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, MAX_ROWS_IN_MULTIVERSION_QUEUE, std::make_unique<MergedData>(use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_))
/// -1 for +1 in FixedSizeDequeWithGaps's internal buffer. 3 is a reasonable minimum size to collapse anything.
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_rows_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 1)
, current_keys(max_rows_in_queue)
, temp_data_buffer(temp_data_buffer_)
{
sign_column_number = header_.getPositionByName(sign_column_);
}

View File

@ -8,8 +8,6 @@
namespace DB
{
class TemporaryDataBuffer;
/** Merges several sorted inputs to one.
* For each group of consecutive identical values of the sorting key
* (the columns by which the data is sorted, including specially specified version column),
@ -24,7 +22,7 @@ public:
SortDescription description_, const String & sign_column_,
size_t max_block_size_rows,
size_t max_block_size_bytes,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_ = nullptr,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
const char * getName() const override { return "VersionedCollapsingAlgorithm"; }
@ -39,8 +37,6 @@ private:
FixedSizeDequeWithGaps<RowRef> current_keys;
Int8 sign_in_queue = 0;
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer = nullptr;
std::queue<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key
void insertGap(size_t gap_size);

View File

@ -23,7 +23,7 @@ public:
bool only_positive_sign,
size_t max_block_size_rows,
size_t max_block_size_bytes,
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf_ = nullptr,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,

View File

@ -20,7 +20,7 @@ MergingSortedTransform::MergingSortedTransform(
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_,
bool always_read_till_end_,
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
bool have_all_inputs_)
: IMergingTransform(

View File

@ -20,7 +20,7 @@ public:
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_ = 0,
bool always_read_till_end_ = false,
std::shared_ptr<TemporaryDataBuffer> out_row_sources_buf_ = nullptr,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool have_all_inputs_ = true);

View File

@ -21,7 +21,7 @@ public:
const String & is_deleted_column, const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_ = nullptr,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false,
bool enable_vertical_final = false)
@ -34,7 +34,7 @@ public:
version_column,
max_block_size_rows,
max_block_size_bytes,
temp_data_buffer_,
out_row_sources_buf_,
use_average_block_sizes,
cleanup,
enable_vertical_final)

View File

@ -21,7 +21,7 @@ public:
SortDescription description_, const String & sign_column_,
size_t max_block_size_rows,
size_t max_block_size_bytes,
std::shared_ptr<TemporaryDataBuffer> temp_data_buffer_ = nullptr,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
@ -31,7 +31,7 @@ public:
sign_column_,
max_block_size_rows,
max_block_size_bytes,
temp_data_buffer_,
out_row_sources_buf_,
use_average_block_sizes)
{
}

View File

@ -20,6 +20,7 @@ struct BuildQueryPipelineSettings
ExpressionActionsSettings actions_settings;
QueryStatusPtr process_list_element;
ProgressCallback progress_callback = nullptr;
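/// Optional lookup for shared temporary files (e.g. the "rows_sources" file of a vertical merge);
/// plan steps resolve read/write buffers from it by logical name at pipeline build time.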
TemporaryFileLookupPtr temporary_file_lookup;
const ExpressionActionsSettings & getActionsSettings() const { return actions_settings; }
static BuildQueryPipelineSettings fromContext(ContextPtr from);

View File

@ -197,12 +197,6 @@ public:
void setQueryIdHolder(std::shared_ptr<QueryIdHolder> query_id_holder) { resources.query_id_holders.emplace_back(std::move(query_id_holder)); }
void addContext(ContextPtr context) { resources.interpreter_context.emplace_back(std::move(context)); }
template <typename Resource>
void addResource(Resource resource, std::vector<Resource> QueryPlanResourceHolder::*field)
{
(resources.*field).push_back(std::move(resource));
}
/// Convert query pipeline to pipe.
static Pipe getPipe(QueryPipelineBuilder pipeline, QueryPlanResourceHolder & resources);
static QueryPipeline getPipeline(QueryPipelineBuilder builder);

View File

@ -13,7 +13,6 @@ class QueryPlan;
class Context;
struct QueryIdHolder;
class TemporaryDataBuffer;
struct QueryPlanResourceHolder
{
@ -34,7 +33,6 @@ struct QueryPlanResourceHolder
std::vector<StoragePtr> storage_holders;
std::vector<TableLockHolder> table_locks;
std::vector<std::shared_ptr<QueryIdHolder>> query_id_holders;
std::vector<std::shared_ptr<TemporaryDataBuffer>> rows_sources_temporary_file;
};
}

View File

@ -65,6 +65,11 @@ namespace ProfileEvents
extern const Event MergeProjectionStageExecuteMilliseconds;
}
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForMerge;
}
namespace DB
{
namespace Setting
@ -124,6 +129,66 @@ static ColumnsStatistics getStatisticsForColumns(
return all_statistics;
}
/// Manages the "rows_sources" temporary file that is used during vertical merge.
class RowsSourcesTemporaryFile : public ITemporaryFileLookup
{
public:
/// The logical name under which the temporary file is known to the plan steps that use it.
static constexpr auto FILE_ID = "rows_sources";
explicit RowsSourcesTemporaryFile(TemporaryDataOnDiskScopePtr temporary_data_on_disk_)
: temporary_data_on_disk(temporary_data_on_disk_->childScope(CurrentMetrics::TemporaryFilesForMerge))
{
}
WriteBuffer & getTemporaryFileForWriting(const String & name) override
{
if (name != FILE_ID)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected temporary file name requested: {}", name);
if (tmp_data_buffer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file was already requested for writing, there musto be only one writer");
tmp_data_buffer = std::make_unique<TemporaryDataBuffer>(temporary_data_on_disk.get());
return *tmp_data_buffer;
}
std::unique_ptr<ReadBuffer> getTemporaryFileForReading(const String & name) override
{
if (name != FILE_ID)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected temporary file name requested: {}", name);
if (!finalized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file is not finalized yet");
/// tmp_disk might not create a real file if no data was written to it.
if (final_size == 0)
return std::make_unique<ReadBufferFromEmptyFile>();
/// Reopen the file for each read so that multiple reads can be performed in parallel and there is no need to seek to the beginning.
return tmp_data_buffer->read();
}
/// Returns written data size in bytes
size_t finalizeWriting()
{
if (!tmp_data_buffer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file was not requested for writing");
auto stat = tmp_data_buffer->finishWriting();
finalized = true;
final_size = stat.uncompressed_size;
return final_size;
}
private:
std::unique_ptr<TemporaryDataBuffer> tmp_data_buffer;
TemporaryDataOnDiskScopePtr temporary_data_on_disk;
bool finalized = false;
size_t final_size = 0;
};
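/// Usage sketch: how the lookup above is wired end to end. This is a hypothetical helper for
/// illustration only; in the real flow the write/read calls happen inside the plan steps'
/// transformPipeline(), which reach the file via BuildQueryPipelineSettings::temporary_file_lookup.
[[maybe_unused]] static void rowsSourcesLifecycleSketch(ContextPtr context)
{
    auto rows_sources_file = std::make_shared<RowsSourcesTemporaryFile>(context->getTempDataOnDisk());

    /// The merge task exposes the file to plan steps through the pipeline settings.
    auto pipeline_settings = BuildQueryPipelineSettings::fromContext(context);
    pipeline_settings.temporary_file_lookup = rows_sources_file;

    /// The merging step requests the single writer and emits one RowSourcePart byte per merged row.
    WriteBuffer & out = rows_sources_file->getTemporaryFileForWriting(RowsSourcesTemporaryFile::FILE_ID);
    writeChar(0, out);

    /// Writing is sealed exactly once; the returned size equals the number of rows recorded.
    size_t rows_in_file = rows_sources_file->finalizeWriting();

    /// Each column-gathering pipeline then opens its own reader for the same logical file;
    /// rows_in_file bytes can be read from it.
    std::unique_ptr<ReadBuffer> in = rows_sources_file->getTemporaryFileForReading(RowsSourcesTemporaryFile::FILE_ID);
}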
static void addMissedColumnsToSerializationInfos(
size_t num_rows_in_parts,
const Names & part_columns,
@ -425,7 +490,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
}
case MergeAlgorithm::Vertical:
{
ctx->rows_sources_temporary_file = std::make_unique<TemporaryDataBuffer>(global_ctx->context->getTempDataOnDisk().get());
ctx->rows_sources_temporary_file = std::make_shared<RowsSourcesTemporaryFile>(global_ctx->context->getTempDataOnDisk());
std::map<String, UInt64> local_merged_column_to_size;
for (const auto & part : global_ctx->future_part->parts)
@ -802,11 +867,24 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
return false;
size_t sum_input_rows_exact = global_ctx->merge_list_element_ptr->rows_read;
size_t input_rows_filtered = *global_ctx->input_rows_filtered;
global_ctx->merge_list_element_ptr->columns_written = global_ctx->merging_columns.size();
global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);
/// Ensure data has been written to disk.
ctx->rows_sources_temporary_file->finishWriting();
size_t rows_sources_count = ctx->rows_sources_temporary_file->finalizeWriting();
/// In the special case when there is only one source part and no rows were skipped, we may have
/// skipped writing the rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
/// Note that only one index byte is written for each row, so the number of rows equals the number of bytes written.
if ((rows_sources_count > 0 || global_ctx->future_part->parts.size() > 1) && sum_input_rows_exact != rows_sources_count + input_rows_filtered)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Number of rows in source parts ({}) excluding filtered rows ({}) differs from number "
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, rows_sources_count);
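/// Worked example with hypothetical numbers: merging parts with sum_input_rows_exact = 1'000'000,
/// of which input_rows_filtered = 100 were dropped, must leave exactly 999'900 bytes in the
/// rows_sources file (one RowSourcePart byte per merged row); any other size fails the check above.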
ctx->it_name_and_type = global_ctx->gathering_columns.cbegin();
@ -838,12 +916,12 @@ class ColumnGathererStep : public ITransformingStep
public:
ColumnGathererStep(
const Header & input_header_,
std::unique_ptr<ReadBuffer> rows_sources_read_buf_,
const String & rows_sources_temporary_file_name_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool is_result_sparse_)
: ITransformingStep(input_header_, input_header_, getTraits())
, rows_sources_read_buf(std::move(rows_sources_read_buf_))
, rows_sources_temporary_file_name(rows_sources_temporary_file_name_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, is_result_sparse(is_result_sparse_)
@ -851,13 +929,15 @@ public:
String getName() const override { return "ColumnGatherer"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /* pipeline_settings */) override
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & pipeline_settings) override
{
const auto & header = pipeline.getHeader();
const auto &header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();
if (!rows_sources_read_buf)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary data buffer for rows sources is not set");
if (!pipeline_settings.temporary_file_lookup)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file lookup is not set in pipeline settings for vertical merge");
auto rows_sources_read_buf = pipeline_settings.temporary_file_lookup->getTemporaryFileForReading(rows_sources_temporary_file_name);
auto transform = std::make_unique<ColumnGathererTransform>(
header,
@ -892,7 +972,7 @@ private:
}
MergeTreeData::MergingParams merging_params{};
std::unique_ptr<ReadBuffer> rows_sources_read_buf;
const String rows_sources_temporary_file_name;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
const bool is_result_sparse;
@ -943,7 +1023,7 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic
const auto data_settings = global_ctx->data->getSettings();
auto merge_step = std::make_unique<ColumnGathererStep>(
merge_column_query_plan.getCurrentHeader(),
ctx->rows_sources_temporary_file->read(),
RowsSourcesTemporaryFile::FILE_ID,
(*data_settings)[MergeTreeSetting::merge_max_block_size],
(*data_settings)[MergeTreeSetting::merge_max_block_size_bytes],
is_result_sparse);
@ -972,9 +1052,9 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic
}
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
pipeline_settings.temporary_file_lookup = ctx->rows_sources_temporary_file;
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
builder->addResource(ctx->rows_sources_temporary_file, &QueryPlanResourceHolder::rows_sources_temporary_file);
return {QueryPipelineBuilder::getPipeline(std::move(*builder)), std::move(indexes_to_recalc)};
}
@ -1347,7 +1427,7 @@ public:
const SortDescription & sort_description_,
const Names partition_key_columns_,
const MergeTreeData::MergingParams & merging_params_,
std::shared_ptr<TemporaryDataBuffer> rows_sources_temporary_file_,
const String & rows_sources_temporary_file_name_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool blocks_are_granules_size_,
@ -1357,7 +1437,7 @@ public:
, sort_description(sort_description_)
, partition_key_columns(partition_key_columns_)
, merging_params(merging_params_)
, rows_sources_temporary_file(rows_sources_temporary_file_)
, rows_sources_temporary_file_name(rows_sources_temporary_file_name_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, blocks_are_granules_size(blocks_are_granules_size_)
@ -1367,7 +1447,7 @@ public:
String getName() const override { return "MergeParts"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /* pipeline_settings */) override
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & pipeline_settings) override
{
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
@ -1377,6 +1457,14 @@ public:
const auto & header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();
WriteBuffer * rows_sources_write_buf = nullptr;
if (!rows_sources_temporary_file_name.empty())
{
if (!pipeline_settings.temporary_file_lookup)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file lookup is not set in pipeline settings for vertical merge");
rows_sources_write_buf = &pipeline_settings.temporary_file_lookup->getTemporaryFileForWriting(rows_sources_temporary_file_name);
}
switch (merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
@ -1389,14 +1477,14 @@ public:
SortingQueueStrategy::Default,
/* limit_= */0,
/* always_read_till_end_= */false,
rows_sources_temporary_file,
rows_sources_write_buf,
blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Collapsing:
merged_transform = std::make_shared<CollapsingSortedTransform>(
header, input_streams_count, sort_description, merging_params.sign_column, false,
merge_block_size_rows, merge_block_size_bytes, rows_sources_temporary_file, blocks_are_granules_size);
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Summing:
@ -1411,7 +1499,7 @@ public:
case MergeTreeData::MergingParams::Replacing:
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, input_streams_count, sort_description, merging_params.is_deleted_column, merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, rows_sources_temporary_file, blocks_are_granules_size,
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size,
cleanup);
break;
@ -1424,7 +1512,7 @@ public:
case MergeTreeData::MergingParams::VersionedCollapsing:
merged_transform = std::make_shared<VersionedCollapsingTransform>(
header, input_streams_count, sort_description, merging_params.sign_column,
merge_block_size_rows, merge_block_size_bytes, rows_sources_temporary_file, blocks_are_granules_size);
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size);
break;
}
@ -1466,7 +1554,7 @@ private:
const SortDescription sort_description;
const Names partition_key_columns;
const MergeTreeData::MergingParams merging_params{};
std::shared_ptr<TemporaryDataBuffer> rows_sources_temporary_file;
const String rows_sources_temporary_file_name;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
const bool blocks_are_granules_size;
@ -1635,7 +1723,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
sort_description,
partition_key_columns,
global_ctx->merging_params,
(is_vertical_merge ? ctx->rows_sources_temporary_file : nullptr), /// rows_sources' temporary file is used only for vertical merge
(is_vertical_merge ? RowsSourcesTemporaryFile::FILE_ID : ""), /// rows_sources' temporary file is used only for vertical merge
(*data_settings)[MergeTreeSetting::merge_max_block_size],
(*data_settings)[MergeTreeSetting::merge_max_block_size_bytes],
ctx->blocks_are_granules_size,
@ -1700,6 +1788,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
{
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
pipeline_settings.temporary_file_lookup = ctx->rows_sources_temporary_file;
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);

View File

@ -42,6 +42,7 @@ namespace DB
class MergeTask;
using MergeTaskPtr = std::shared_ptr<MergeTask>;
class RowsSourcesTemporaryFile;
/**
* Overview of the merge algorithm
@ -243,7 +244,7 @@ private:
bool force_ttl{false};
CompressionCodecPtr compression_codec{nullptr};
size_t sum_input_rows_upper_bound{0};
std::shared_ptr<TemporaryDataBuffer> rows_sources_temporary_file;
std::shared_ptr<RowsSourcesTemporaryFile> rows_sources_temporary_file;
std::optional<ColumnSizeEstimator> column_sizes{};
/// For projections to rebuild
@ -322,7 +323,7 @@ private:
struct VerticalMergeRuntimeContext : public IStageRuntimeContext
{
/// Begin dependencies from previous stage
std::shared_ptr<TemporaryDataBuffer> rows_sources_temporary_file;
std::shared_ptr<RowsSourcesTemporaryFile> rows_sources_temporary_file;
std::optional<ColumnSizeEstimator> column_sizes;
CompressionCodecPtr compression_codec;
std::list<DB::NameAndTypePair>::const_iterator it_name_and_type;

View File

@ -111,11 +111,10 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
}
if (!prewhere_actions.steps.empty())
LOG_TRACE(log, "PREWHERE condition was split into {} steps", prewhere_actions.steps.size());
LOG_TRACE(log, "PREWHERE condition was split into {} steps: {}", prewhere_actions.steps.size(), prewhere_actions.dumpConditions());
if (prewhere_info)
LOG_TEST(log, "Original PREWHERE DAG:{}\n{}\nPREWHERE actions:\n{}",
prewhere_actions.dumpConditions(),
LOG_TEST(log, "Original PREWHERE DAG:\n{}\nPREWHERE actions:\n{}",
prewhere_info->prewhere_actions.dumpDAG(),
(!prewhere_actions.steps.empty() ? prewhere_actions.dump() : std::string("<nullptr>")));
}