ClickHouse/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
Amos Bird 264cff6415
Projections
TODO (suggested by Nikolai)

1. Build the query plan for the current query (inside storage::read) up to WithMergeableState
2. Check that the plan is simple enough: Aggregating - Expression - Filter - ReadFromStorage (or simpler)
3. Check that the filter is the same as the filter in the projection, and that the expression calculates the same aggregation keys as the projection
4. Return WithMergeableState if the projection applies

Step 3 will be easier to do with ActionsDAG, because it sees all functions and the dependencies are direct (but it is also possible with ExpressionActions).

We also need to figure out how PREWHERE and row_filter_policies work for projections.
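
A rough standalone sketch of what the step-3 comparison could boil down to, using a hypothetical helper and plain C++ (the real check would compare ActionsDAG nodes rather than pre-extracted strings and name lists):

#include <set>
#include <string>
#include <vector>

/// Hypothetical helper: a projection is considered applicable if the query filter matches the
/// projection filter verbatim and the query aggregates by exactly the projection's keys.
static bool projectionMatchesQuery(
    const std::string & query_filter,             /// serialized filter expression of the query
    const std::string & projection_filter,        /// serialized filter expression of the projection
    const std::vector<std::string> & query_keys,  /// aggregation keys calculated by the query
    const std::vector<std::string> & projection_keys)
{
    if (query_filter != projection_filter)
        return false;
    return std::set<std::string>(query_keys.begin(), query_keys.end())
        == std::set<std::string>(projection_keys.begin(), projection_keys.end());
}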

wip
2021-05-11 18:12:23 +08:00


#include "MergeTreeDataMergerMutator.h"
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/AllMergeSelector.h>
#include <Storages/MergeTree/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <DataStreams/TTLBlockInputStream.h>
#include <DataStreams/DistinctSortedBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/GraphiteRollupSortedTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/Context.h>
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>
#include <Common/escapeForFileName.h>
#include <Parsers/queryToString.h>
#include <cmath>
#include <ctime>
#include <numeric>
#include <boost/algorithm/string/replace.hpp>
namespace ProfileEvents
{
extern const Event MergedRows;
extern const Event MergedUncompressedBytes;
extern const Event MergesTimeMilliseconds;
extern const Event Merge;
}
namespace CurrentMetrics
{
extern const Metric BackgroundPoolTask;
extern const Metric PartMutation;
}
namespace DB
{
namespace ErrorCodes
{
extern const int DIRECTORY_ALREADY_EXISTS;
extern const int LOGICAL_ERROR;
extern const int ABORTED;
}
/// Do not start to merge parts, if free space is less than sum size of parts times specified coefficient.
/// This value is chosen so that big merges cannot eat all free space, thus allowing small merges to proceed.
static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 2;
/// To do merge, reserve amount of space equals to sum size of parts times specified coefficient.
/// Must be strictly less than DISK_USAGE_COEFFICIENT_TO_SELECT,
/// because between selecting parts to merge and doing merge, amount of free space could have decreased.
static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.1;
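/// Example with the constants above: a merge of parts totalling 10 GiB is selected only if at least
/// 20 GiB of unreserved free space is available (coefficient 2), while the reservation made for the
/// merge itself is about 11 GiB (coefficient 1.1).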
void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
{
if (parts_.empty())
return;
size_t sum_rows = 0;
size_t sum_bytes_uncompressed = 0;
MergeTreeDataPartType future_part_type = MergeTreeDataPartType::UNKNOWN;
for (const auto & part : parts_)
{
sum_rows += part->rows_count;
sum_bytes_uncompressed += part->getTotalColumnsSize().data_uncompressed;
future_part_type = std::min(future_part_type, part->getType());
}
auto chosen_type = parts_.front()->storage.choosePartTypeOnDisk(sum_bytes_uncompressed, sum_rows);
future_part_type = std::min(future_part_type, chosen_type);
assign(std::move(parts_), future_part_type);
}
void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_, MergeTreeDataPartType future_part_type)
{
if (parts_.empty())
return;
for (const MergeTreeData::DataPartPtr & part : parts_)
{
const MergeTreeData::DataPartPtr & first_part = parts_.front();
if (part->partition.value != first_part->partition.value)
throw Exception(
"Attempting to merge parts " + first_part->name + " and " + part->name + " that are in different partitions",
ErrorCodes::LOGICAL_ERROR);
}
parts = std::move(parts_);
UInt32 max_level = 0;
Int64 max_mutation = 0;
for (const auto & part : parts)
{
max_level = std::max(max_level, part->info.level);
max_mutation = std::max(max_mutation, part->info.mutation);
}
type = future_part_type;
part_info.partition_id = parts.front()->info.partition_id;
part_info.min_block = parts.front()->info.min_block;
part_info.max_block = parts.back()->info.max_block;
part_info.level = max_level + 1;
part_info.mutation = max_mutation;
if (parts.front()->storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
DayNum min_date = DayNum(std::numeric_limits<UInt16>::max());
DayNum max_date = DayNum(std::numeric_limits<UInt16>::min());
for (const auto & part : parts)
{
/// NOTE: getting min and max dates from part names (instead of part data) because we want
/// the merged part name to be determined only by the source part names.
/// It is simpler this way when the real min and max dates for the block range can change
/// (e.g. after an ALTER DELETE command).
DayNum part_min_date;
DayNum part_max_date;
MergeTreePartInfo::parseMinMaxDatesFromPartName(part->name, part_min_date, part_max_date);
min_date = std::min(min_date, part_min_date);
max_date = std::max(max_date, part_max_date);
}
name = part_info.getPartNameV0(min_date, max_date);
}
else
name = part_info.getPartName();
}
void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const ReservationPtr & reservation)
{
path = storage.getFullPathOnDisk(reservation->getDisk()) + name + "/";
}
MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, size_t background_pool_size_)
: data(data_), background_pool_size(background_pool_size_), log(&Poco::Logger::get(data.getLogName() + " (MergerMutator)"))
{
}
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() const
{
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool);
}
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used) const
{
if (pool_used > pool_size)
throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
size_t free_entries = pool_size - pool_used;
const auto data_settings = data.getSettings();
/// Always allow the maximum size if one or fewer pool entries are busy.
/// One entry is probably the one where this function is executed.
/// This protects against bad settings.
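/// Otherwise interpolate between the min and max merge sizes: interpolateExponential(min, max, ratio)
/// grows exponentially from min (ratio = 0) to max (ratio = 1), so the fewer free entries remain in
/// the pool, the smaller the merges that are allowed to start.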
UInt64 max_size = 0;
if (pool_used <= 1 || free_entries >= data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge)
max_size = data_settings->max_bytes_to_merge_at_max_space_in_pool;
else
max_size = interpolateExponential(
data_settings->max_bytes_to_merge_at_min_space_in_pool,
data_settings->max_bytes_to_merge_at_max_space_in_pool,
static_cast<double>(free_entries) / data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge);
return std::min(max_size, static_cast<UInt64>(data.getStoragePolicy()->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT));
}
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() const
{
const auto data_settings = data.getSettings();
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
/// A DataPart can be stored on only one disk. Get the maximum reservable free space across all disks.
UInt64 disk_space = data.getStoragePolicy()->getMaxUnreservedFreeSpace();
/// Allow mutations only if there are enough free threads; otherwise leave the free threads for merges.
if (busy_threads_in_pool <= 1
|| background_pool_size - busy_threads_in_pool >= data_settings->number_of_free_entries_in_pool_to_execute_mutation)
return static_cast<UInt64>(disk_space / DISK_USAGE_COEFFICIENT_TO_RESERVE);
return 0;
}
SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
FutureMergedMutatedPart & future_part,
bool aggressive,
size_t max_total_size_to_merge,
const AllowedMergingPredicate & can_merge_callback,
bool merge_with_ttl_allowed,
String * out_disable_reason)
{
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
const auto data_settings = data.getSettings();
auto metadata_snapshot = data.getInMemoryMetadataPtr();
if (data_parts.empty())
{
if (out_disable_reason)
*out_disable_reason = "There are no parts in the table";
return SelectPartsDecision::CANNOT_SELECT;
}
time_t current_time = std::time(nullptr);
IMergeSelector::PartsRanges parts_ranges;
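/// parts_ranges is filled below as a list of runs of parts: a new run is started on every partition
/// boundary and whenever two adjacent parts cannot be merged, so each run contains only parts that
/// belong to one partition and may be merged with their neighbours.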
StoragePolicyPtr storage_policy = data.getStoragePolicy();
/// Volumes with stopped merges are an extremely rare situation.
/// Check it once instead of checking each part (that would be bad for performance).
bool has_volumes_with_disabled_merges = storage_policy->hasAnyVolumeWithDisabledMerges();
const String * prev_partition_id = nullptr;
/// Previous part, only within the boundaries of the current partition frame
const MergeTreeData::DataPartPtr * prev_part = nullptr;
size_t parts_selected_precondition = 0;
for (const MergeTreeData::DataPartPtr & part : data_parts)
{
const String & partition_id = part->info.partition_id;
if (!prev_partition_id || partition_id != *prev_partition_id)
{
if (parts_ranges.empty() || !parts_ranges.back().empty())
parts_ranges.emplace_back();
/// New partition frame.
prev_partition_id = &partition_id;
prev_part = nullptr;
}
/// Check predicate only for first part in each partition.
if (!prev_part)
{
/* Parts can be merged with themselves for TTL needs for example.
* So we have to check if this part is currently being inserted with quorum and so on and so forth.
* Obviously we have to check it manually only for the first part
* of each partition because it will be automatically checked for a pair of parts. */
if (!can_merge_callback(nullptr, part, nullptr))
continue;
}
else
{
/// If we cannot merge with the previous part, we have to start a new parts
/// interval (in the same partition)
if (!can_merge_callback(*prev_part, part, nullptr))
{
/// Starting new interval in the same partition
if (!parts_ranges.back().empty())
parts_ranges.emplace_back();
/// Now we have no previous part, but it affects only logging
prev_part = nullptr;
}
}
IMergeSelector::Part part_info;
part_info.size = part->getBytesOnDisk();
part_info.age = current_time - part->modification_time;
part_info.level = part->info.level;
part_info.data = &part;
part_info.ttl_infos = &part->ttl_infos;
part_info.compression_codec_desc = part->default_codec->getFullCodecDesc();
part_info.shall_participate_in_merges = has_volumes_with_disabled_merges ? part->shallParticipateInMerges(storage_policy) : true;
++parts_selected_precondition;
parts_ranges.back().emplace_back(part_info);
/// Check for consistency of data parts. If this assertion fails, it requires immediate investigation.
if (prev_part && part->info.partition_id == (*prev_part)->info.partition_id
&& part->info.min_block <= (*prev_part)->info.max_block)
{
LOG_ERROR(log, "Part {} intersects previous part {}", part->name, (*prev_part)->name);
}
prev_part = &part;
}
if (parts_selected_precondition == 0)
{
if (out_disable_reason)
*out_disable_reason = "No parts satisfy preconditions for merge";
return SelectPartsDecision::CANNOT_SELECT;
}
IMergeSelector::PartsRange parts_to_merge;
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
{
/// TTL delete is preferred to recompression
TTLDeleteMergeSelector delete_ttl_selector(
next_delete_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_ttl_timeout,
data_settings->ttl_only_drop_parts);
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty())
{
future_part.merge_type = MergeType::TTL_DELETE;
}
else if (metadata_snapshot->hasAnyRecompressionTTL())
{
TTLRecompressMergeSelector recompress_ttl_selector(
next_recompress_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_recompression_ttl_timeout,
metadata_snapshot->getRecompressionTTLs());
parts_to_merge = recompress_ttl_selector.select(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty())
future_part.merge_type = MergeType::TTL_RECOMPRESS;
}
}
if (parts_to_merge.empty())
{
SimpleMergeSelector::Settings merge_settings;
if (aggressive)
merge_settings.base = 1;
parts_to_merge = SimpleMergeSelector(merge_settings)
.select(parts_ranges, max_total_size_to_merge);
/// Do not allow "merging" a part with itself for regular merges; it is only acceptable for TTL merges, where it is OK to remove some values with expired TTL
if (parts_to_merge.size() == 1)
throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR);
if (parts_to_merge.empty())
{
if (out_disable_reason)
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
return SelectPartsDecision::CANNOT_SELECT;
}
}
MergeTreeData::DataPartsVector parts;
parts.reserve(parts_to_merge.size());
for (IMergeSelector::Part & part_info : parts_to_merge)
{
const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
parts.push_back(part);
}
LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
future_part.assign(std::move(parts));
return SelectPartsDecision::SELECTED;
}
SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
FutureMergedMutatedPart & future_part,
UInt64 & available_disk_space,
const AllowedMergingPredicate & can_merge,
const String & partition_id,
bool final,
const StorageMetadataPtr & metadata_snapshot,
String * out_disable_reason,
bool optimize_skip_merged_partitions)
{
MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition_id);
if (parts.empty())
return SelectPartsDecision::CANNOT_SELECT;
if (!final && parts.size() == 1)
{
if (out_disable_reason)
*out_disable_reason = "There is only one part inside partition";
return SelectPartsDecision::CANNOT_SELECT;
}
/// If final is set, optimize_skip_merged_partitions is true and there is only one part in the partition with level > 0,
/// then we don't select it to merge. But if some TTLs have expired, a merge is still needed.
if (final && optimize_skip_merged_partitions && parts.size() == 1 && parts[0]->info.level > 0 &&
(!metadata_snapshot->hasAnyTTL() || parts[0]->checkAllTTLCalculated(metadata_snapshot)))
{
return SelectPartsDecision::NOTHING_TO_MERGE;
}
auto it = parts.begin();
auto prev_it = it;
UInt64 sum_bytes = 0;
while (it != parts.end())
{
/// For the case of one part, we check that it can be merged "with itself".
if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it, out_disable_reason))
{
return SelectPartsDecision::CANNOT_SELECT;
}
sum_bytes += (*it)->getBytesOnDisk();
prev_it = it;
++it;
}
/// Enough disk space to cover the new merge with a margin.
auto required_disk_space = sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT;
if (available_disk_space <= required_disk_space)
{
time_t now = time(nullptr);
if (now - disk_space_warning_time > 3600)
{
disk_space_warning_time = now;
LOG_WARNING(log,
"Won't merge parts from {} to {} because not enough free space: {} free and unreserved"
", {} required now (+{}% on overhead); suppressing similar warnings for the next hour",
parts.front()->name,
(*prev_it)->name,
ReadableSize(available_disk_space),
ReadableSize(sum_bytes),
static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100));
}
if (out_disable_reason)
*out_disable_reason = fmt::format("Insufficient available disk space, required {}", ReadableSize(required_disk_space));
return SelectPartsDecision::CANNOT_SELECT;
}
LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
future_part.assign(std::move(parts));
available_disk_space -= required_disk_space;
return SelectPartsDecision::SELECTED;
}
MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::selectAllPartsFromPartition(const String & partition_id)
{
MergeTreeData::DataPartsVector parts_from_partition;
MergeTreeData::DataParts data_parts = data.getDataParts();
for (const auto & current_part : data_parts)
{
if (current_part->info.partition_id != partition_id)
continue;
parts_from_partition.push_back(current_part);
}
return parts_from_partition;
}
/// PK columns are sorted and merged; ordinary columns are gathered using info from the merge step
static void extractMergingAndGatheringColumns(
const NamesAndTypesList & storage_columns,
const ExpressionActionsPtr & sorting_key_expr,
const IndicesDescription & indexes,
const ProjectionsDescription & projections,
const MergeTreeData::MergingParams & merging_params,
NamesAndTypesList & gathering_columns, Names & gathering_column_names,
NamesAndTypesList & merging_columns, Names & merging_column_names)
{
Names sort_key_columns_vec = sorting_key_expr->getRequiredColumns();
std::set<String> key_columns(sort_key_columns_vec.cbegin(), sort_key_columns_vec.cend());
for (const auto & index : indexes)
{
Names index_columns_vec = index.expression->getRequiredColumns();
std::copy(index_columns_vec.cbegin(), index_columns_vec.cend(),
std::inserter(key_columns, key_columns.end()));
}
for (const auto & projection : projections)
{
Names projection_columns_vec = projection.required_columns;
std::copy(projection_columns_vec.cbegin(), projection_columns_vec.cend(),
std::inserter(key_columns, key_columns.end()));
}
/// Force sign column for Collapsing mode
if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
key_columns.emplace(merging_params.sign_column);
/// Force version column for Replacing mode
if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
key_columns.emplace(merging_params.version_column);
/// Force sign column for VersionedCollapsing mode. Version is already in primary key.
if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
key_columns.emplace(merging_params.sign_column);
/// Force to merge at least one column in case of empty key
if (key_columns.empty())
key_columns.emplace(storage_columns.front().name);
/// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns
for (const auto & column : storage_columns)
{
if (key_columns.count(column.name))
{
merging_columns.emplace_back(column);
merging_column_names.emplace_back(column.name);
}
else
{
gathering_columns.emplace_back(column);
gathering_column_names.emplace_back(column.name);
}
}
}
/* Allows computing more accurate progress statistics */
class ColumnSizeEstimator
{
MergeTreeData::DataPart::ColumnToSize map;
public:
/// Stores approximate sizes of columns in bytes.
/// Exact values are not required since they are only used for estimating relative values (progress).
size_t sum_total = 0;
size_t sum_index_columns = 0;
size_t sum_ordinary_columns = 0;
ColumnSizeEstimator(const MergeTreeData::DataPart::ColumnToSize & map_, const Names & key_columns, const Names & ordinary_columns)
: map(map_)
{
for (const auto & name : key_columns)
if (!map.count(name)) map[name] = 0;
for (const auto & name : ordinary_columns)
if (!map.count(name)) map[name] = 0;
for (const auto & name : key_columns)
sum_index_columns += map.at(name);
for (const auto & name : ordinary_columns)
sum_ordinary_columns += map.at(name);
sum_total = std::max(static_cast<decltype(sum_index_columns)>(1), sum_index_columns + sum_ordinary_columns);
}
Float64 columnWeight(const String & column) const
{
return static_cast<Float64>(map.at(column)) / sum_total;
}
Float64 keyColumnsWeight() const
{
return static_cast<Float64>(sum_index_columns) / sum_total;
}
};
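/// Example: if the key (merged) columns occupy 40 MB and the ordinary (gathered) columns 60 MB,
/// keyColumnsWeight() is 0.4 and each gathered column contributes its share of the remaining 0.6;
/// this is how the horizontal and per-column stages split the overall merge progress.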
/** Progress callback.
* What it should update:
* - approximate progress
* - amount of read rows
* - various metrics
* - time elapsed for current merge.
*/
/// Auxiliary struct that stores the current progress of each merge stage.
/// A stage is either the horizontal stage, a stage for each gathered column (if we are doing a
/// Vertical merge), or the mutation of a single part. During a single stage all rows are read.
struct MergeStageProgress
{
explicit MergeStageProgress(Float64 weight_)
: is_first(true) , weight(weight_)
{
}
MergeStageProgress(Float64 initial_progress_, Float64 weight_)
: initial_progress(initial_progress_), is_first(false), weight(weight_)
{
}
Float64 initial_progress = 0.0;
bool is_first;
Float64 weight;
UInt64 total_rows = 0;
UInt64 rows_read = 0;
};
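/// Example of how the stages compose: in a vertical merge, the horizontal stage runs with
/// weight = keyColumnsWeight() and initial_progress = 0; each subsequent column stage starts from
/// the progress accumulated so far and has weight = columnWeight(column).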
class MergeProgressCallback
{
public:
MergeProgressCallback(
MergeList::Entry & merge_entry_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_)
: merge_entry(merge_entry_)
, watch_prev_elapsed(watch_prev_elapsed_)
, stage(stage_)
{
updateWatch();
}
MergeList::Entry & merge_entry;
UInt64 & watch_prev_elapsed;
MergeStageProgress & stage;
void updateWatch()
{
UInt64 watch_curr_elapsed = merge_entry->watch.elapsed();
ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
watch_prev_elapsed = watch_curr_elapsed;
}
void operator() (const Progress & value)
{
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes);
if (stage.is_first)
{
ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows);
ProfileEvents::increment(ProfileEvents::Merge);
}
updateWatch();
merge_entry->bytes_read_uncompressed += value.read_bytes;
if (stage.is_first)
merge_entry->rows_read += value.read_rows;
stage.total_rows += value.total_rows_to_read;
stage.rows_read += value.read_rows;
if (stage.total_rows > 0)
{
merge_entry->progress.store(
stage.initial_progress + stage.weight * stage.rows_read / stage.total_rows,
std::memory_order_relaxed);
}
}
};
static bool needSyncPart(size_t input_rows, size_t input_bytes, const MergeTreeSettings & settings)
{
return ((settings.min_rows_to_fsync_after_merge && input_rows >= settings.min_rows_to_fsync_after_merge)
|| (settings.min_compressed_bytes_to_fsync_after_merge && input_bytes >= settings.min_compressed_bytes_to_fsync_after_merge));
}
/// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
const FutureMergedMutatedPart & future_part,
const StorageMetadataPtr & metadata_snapshot,
MergeList::Entry & merge_entry,
TableLockHolder & holder,
time_t time_of_merge,
ContextPtr context,
const ReservationPtr & space_reservation,
bool deduplicate,
const Names & deduplicate_by_columns,
const MergeTreeData::MergingParams & merging_params,
const IMergeTreeDataPart * parent_part,
const String & prefix)
{
const String tmp_prefix = parent_part ? prefix : "tmp_merge_";
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
/// We don't want to perform a merge assigned as a TTL merge as if it were a normal merge, so
/// throw an exception
if (isTTLMergeType(future_part.merge_type) && ttl_merges_blocker.isCancelled())
throw Exception("Cancelled merging parts with TTL", ErrorCodes::ABORTED);
const MergeTreeData::DataPartsVector & parts = future_part.parts;
LOG_DEBUG(log, "Merging {} parts: from {} to {} into {}", parts.size(), parts.front()->name, parts.back()->name, future_part.type.toString());
if (deduplicate)
{
if (deduplicate_by_columns.empty())
LOG_DEBUG(log, "DEDUPLICATE BY all columns");
else
LOG_DEBUG(log, "DEDUPLICATE BY ('{}')", fmt::join(deduplicate_by_columns, "', '"));
}
auto disk = space_reservation->getDisk();
String part_path = data.relative_data_path;
String new_part_tmp_path = part_path + tmp_prefix + future_part.name + (parent_part ? ".proj" : "") + "/";
if (disk->exists(new_part_tmp_path))
throw Exception("Directory " + fullPath(disk, new_part_tmp_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
Names all_column_names = metadata_snapshot->getColumns().getNamesOfPhysical();
NamesAndTypesList storage_columns = metadata_snapshot->getColumns().getAllPhysical();
const auto data_settings = data.getSettings();
NamesAndTypesList gathering_columns;
NamesAndTypesList merging_columns;
Names gathering_column_names, merging_column_names;
extractMergingAndGatheringColumns(
storage_columns,
metadata_snapshot->getSortingKey().expression,
metadata_snapshot->getSecondaryIndices(),
metadata_snapshot->getProjections(),
merging_params,
gathering_columns,
gathering_column_names,
merging_columns,
merging_column_names);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + future_part.name, disk, 0);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(
future_part.name,
future_part.type,
future_part.part_info,
single_disk_volume,
tmp_prefix + future_part.name + (parent_part ? ".proj" : ""),
parent_part);
new_data_part->uuid = future_part.uuid;
new_data_part->setColumns(storage_columns);
new_data_part->partition.assign(future_part.getPartition());
new_data_part->is_temp = parent_part == nullptr;
bool need_remove_expired_values = false;
bool force_ttl = false;
for (const auto & part : parts)
{
new_data_part->ttl_infos.update(part->ttl_infos);
if (metadata_snapshot->hasAnyTTL() && !part->checkAllTTLCalculated(metadata_snapshot))
{
LOG_INFO(log, "Some TTL values were not calculated for part {}. Will calculate them forcefully during merge.", part->name);
need_remove_expired_values = true;
force_ttl = true;
}
}
const auto & part_min_ttl = new_data_part->ttl_infos.part_min_ttl;
if (part_min_ttl && part_min_ttl <= time_of_merge)
need_remove_expired_values = true;
if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
{
LOG_INFO(log, "Part {} has values with expired TTL, but merges with TTL are cancelled.", new_data_part->name);
need_remove_expired_values = false;
}
size_t sum_input_rows_upper_bound = merge_entry->total_rows_count;
size_t sum_compressed_bytes_upper_bound = merge_entry->total_size_bytes_compressed;
MergeAlgorithm chosen_merge_algorithm = chooseMergeAlgorithm(
parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values, merging_params);
merge_entry->merge_algorithm.store(chosen_merge_algorithm, std::memory_order_relaxed);
LOG_DEBUG(log, "Selected MergeAlgorithm: {}", toString(chosen_merge_algorithm));
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes() and locked in shared mode when input
/// streams are created) would be taken in the reverse order compared to inserting new data.
/// This annoys TSan even though one of the locks is taken in shared mode and thus
/// a deadlock is impossible.
auto compression_codec = data.getCompressionCodecForPart(merge_entry->total_size_bytes_compressed, new_data_part->ttl_infos, time_of_merge);
auto tmp_disk = context->getTemporaryVolume()->getDisk();
String rows_sources_file_path;
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf;
std::unique_ptr<WriteBuffer> rows_sources_write_buf;
std::optional<ColumnSizeEstimator> column_sizes;
SyncGuardPtr sync_guard;
if (chosen_merge_algorithm == MergeAlgorithm::Vertical)
{
tmp_disk->createDirectories(new_part_tmp_path);
rows_sources_file_path = new_part_tmp_path + "rows_sources";
rows_sources_uncompressed_write_buf = tmp_disk->writeFile(rows_sources_file_path);
rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*rows_sources_uncompressed_write_buf);
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
for (const MergeTreeData::DataPartPtr & part : parts)
part->accumulateColumnSizes(merged_column_to_size);
column_sizes = ColumnSizeEstimator(merged_column_to_size, merging_column_names, gathering_column_names);
if (data.getSettings()->fsync_part_directory)
sync_guard = disk->getDirectorySyncGuard(new_part_tmp_path);
}
else
{
merging_columns = storage_columns;
merging_column_names = all_column_names;
gathering_columns.clear();
gathering_column_names.clear();
}
/** Read from all parts, merge and write into a new one.
* In passing, we calculate expression for sorting.
*/
Pipes pipes;
UInt64 watch_prev_elapsed = 0;
/// We count the total amount of bytes in the parts
/// and use direct_io + aio if it is more than min_merge_bytes_to_use_direct_io
bool read_with_direct_io = false;
if (data_settings->min_merge_bytes_to_use_direct_io != 0)
{
size_t total_size = 0;
for (const auto & part : parts)
{
total_size += part->getBytesOnDisk();
if (total_size >= data_settings->min_merge_bytes_to_use_direct_io)
{
LOG_DEBUG(log, "Will merge parts reading files in O_DIRECT");
read_with_direct_io = true;
break;
}
}
}
MergeStageProgress horizontal_stage_progress(
column_sizes ? column_sizes->keyColumnsWeight() : 1.0);
for (const auto & part : parts)
{
auto input = std::make_unique<MergeTreeSequentialSource>(
data, metadata_snapshot, part, merging_column_names, read_with_direct_io, true);
input->setProgressCallback(
MergeProgressCallback(merge_entry, watch_prev_elapsed, horizontal_stage_progress));
Pipe pipe(std::move(input));
if (metadata_snapshot->hasSortingKey())
{
pipe.addSimpleTransform([&metadata_snapshot](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, metadata_snapshot->getSortingKey().expression);
});
}
pipes.emplace_back(std::move(pipe));
}
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
Block header = pipes.at(0).getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, rows with the same key must be in ascending order of the identifier of the original part,
/// that is, in insertion order.
ProcessorPtr merged_transform;
/// If merge is vertical we cannot calculate it
bool blocks_are_granules_size = (chosen_merge_algorithm == MergeAlgorithm::Vertical);
UInt64 merge_block_size = data_settings->merge_max_block_size;
switch (merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
merged_transform = std::make_unique<MergingSortedTransform>(
header, pipes.size(), sort_description, merge_block_size, 0, rows_sources_write_buf.get(), true, blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Collapsing:
merged_transform = std::make_unique<CollapsingSortedTransform>(
header, pipes.size(), sort_description, merging_params.sign_column, false,
merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Summing:
merged_transform = std::make_unique<SummingSortedTransform>(
header, pipes.size(), sort_description, merging_params.columns_to_sum, partition_key_columns, merge_block_size);
break;
case MergeTreeData::MergingParams::Aggregating:
merged_transform = std::make_unique<AggregatingSortedTransform>(
header, pipes.size(), sort_description, merge_block_size);
break;
case MergeTreeData::MergingParams::Replacing:
merged_transform = std::make_unique<ReplacingSortedTransform>(
header, pipes.size(), sort_description, merging_params.version_column,
merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Graphite:
merged_transform = std::make_unique<GraphiteRollupSortedTransform>(
header, pipes.size(), sort_description, merge_block_size,
merging_params.graphite_params, time_of_merge);
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
merged_transform = std::make_unique<VersionedCollapsingTransform>(
header, pipes.size(), sort_description, merging_params.sign_column,
merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
break;
}
QueryPipeline pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));
pipeline.addTransform(std::move(merged_transform));
pipeline.setMaxThreads(1);
BlockInputStreamPtr merged_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
if (deduplicate)
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, deduplicate_by_columns);
if (need_remove_expired_values)
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, metadata_snapshot, new_data_part, time_of_merge, force_ttl);
if (metadata_snapshot->hasSecondaryIndices())
{
const auto & indices = metadata_snapshot->getSecondaryIndices();
merged_stream = std::make_shared<ExpressionBlockInputStream>(
merged_stream, indices.getSingleExpressionForIndices(metadata_snapshot->getColumns(), data.getContext()));
merged_stream = std::make_shared<MaterializingBlockInputStream>(merged_stream);
}
const auto & index_factory = MergeTreeIndexFactory::instance();
MergedBlockOutputStream to{
new_data_part,
metadata_snapshot,
merging_columns,
index_factory.getMany(metadata_snapshot->getSecondaryIndices()),
compression_codec,
blocks_are_granules_size};
merged_stream->readPrefix();
to.writePrefix();
size_t rows_written = 0;
const size_t initial_reservation = space_reservation ? space_reservation->getSize() : 0;
auto is_cancelled = [&]() { return merges_blocker.isCancelled()
|| (need_remove_expired_values && ttl_merges_blocker.isCancelled()); };
Block block;
while (!is_cancelled() && (block = merged_stream->read()))
{
rows_written += block.rows();
to.write(block);
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
/// Reservation updates are not performed yet; during the merge this may lead to higher free space requirements
if (space_reservation && sum_input_rows_upper_bound)
{
/// The same progress from merge_entry could be used for both algorithms (it should be more accurate)
/// But now we are using inaccurate row-based estimation in Horizontal case for backward compatibility
Float64 progress = (chosen_merge_algorithm == MergeAlgorithm::Horizontal)
? std::min(1., 1. * rows_written / sum_input_rows_upper_bound)
: std::min(1., merge_entry->progress.load(std::memory_order_relaxed));
space_reservation->update(static_cast<size_t>((1. - progress) * initial_reservation));
}
}
merged_stream->readSuffix();
merged_stream.reset();
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
throw Exception("Cancelled merging parts with expired TTL", ErrorCodes::ABORTED);
bool need_sync = needSyncPart(sum_input_rows_upper_bound, sum_compressed_bytes_upper_bound, *data_settings);
MergeTreeData::DataPart::Checksums checksums_gathered_columns;
/// Gather ordinary columns
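/// During the horizontal stage above, the merging transform wrote one byte per source row into
/// rows_sources_write_buf, recording which source part the row came from; the per-column gathering
/// below replays that file to pick values from the corresponding source streams.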
if (chosen_merge_algorithm == MergeAlgorithm::Vertical)
{
size_t sum_input_rows_exact = merge_entry->rows_read;
merge_entry->columns_written = merging_column_names.size();
merge_entry->progress.store(column_sizes->keyColumnsWeight(), std::memory_order_relaxed);
BlockInputStreams column_part_streams(parts.size());
auto it_name_and_type = gathering_columns.cbegin();
rows_sources_write_buf->next();
rows_sources_uncompressed_write_buf->next();
/// Ensure data has been written to disk.
rows_sources_uncompressed_write_buf->finalize();
size_t rows_sources_count = rows_sources_write_buf->count();
/// In the special case when there is only one source part and no rows were skipped, we may have
/// skipped writing the rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
if ((rows_sources_count > 0 || parts.size() > 1) && sum_input_rows_exact != rows_sources_count)
throw Exception("Number of rows in source parts (" + toString(sum_input_rows_exact)
+ ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count)
+ "). It is a bug.", ErrorCodes::LOGICAL_ERROR);
CompressedReadBufferFromFile rows_sources_read_buf(tmp_disk->readFile(rows_sources_file_path));
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns;
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
column_num < gathering_column_names_size;
++column_num, ++it_name_and_type)
{
const String & column_name = it_name_and_type->name;
Names column_names{column_name};
Float64 progress_before = merge_entry->progress.load(std::memory_order_relaxed);
MergeStageProgress column_progress(progress_before, column_sizes->columnWeight(column_name));
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_source = std::make_shared<MergeTreeSequentialSource>(
data, metadata_snapshot, parts[part_num], column_names, read_with_direct_io, true);
column_part_source->setProgressCallback(
MergeProgressCallback(merge_entry, watch_prev_elapsed, column_progress));
QueryPipeline column_part_pipeline;
column_part_pipeline.init(Pipe(std::move(column_part_source)));
column_part_pipeline.setMaxThreads(1);
column_part_streams[part_num] =
std::make_shared<PipelineExecutingBlockInputStream>(std::move(column_part_pipeline));
}
rows_sources_read_buf.seek(0, 0);
ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
MergedColumnOnlyOutputStream column_to(
new_data_part,
metadata_snapshot,
column_gathered_stream.getHeader(),
compression_codec,
/// we don't need to recalc indices here
/// because all of them were already recalculated and written
/// during the key-columns (horizontal) stage of the vertical merge
std::vector<MergeTreeIndexPtr>{},
&written_offset_columns,
to.getIndexGranularity());
size_t column_elems_written = 0;
column_to.writePrefix();
while (!merges_blocker.isCancelled() && (block = column_gathered_stream.read()))
{
column_elems_written += block.rows();
column_to.write(block);
}
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
column_gathered_stream.readSuffix();
auto changed_checksums = column_to.writeSuffixAndGetChecksums(new_data_part, checksums_gathered_columns, need_sync);
checksums_gathered_columns.add(std::move(changed_checksums));
if (rows_written != column_elems_written)
{
throw Exception("Written " + toString(column_elems_written) + " elements of column " + column_name +
", but " + toString(rows_written) + " rows of PK columns", ErrorCodes::LOGICAL_ERROR);
}
/// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges).
merge_entry->columns_written += 1;
merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
merge_entry->progress.store(progress_before + column_sizes->columnWeight(column_name), std::memory_order_relaxed);
}
tmp_disk->removeFile(rows_sources_file_path);
}
for (const auto & part : parts)
new_data_part->minmax_idx.merge(part->minmax_idx);
/// Print overall profiling info. NOTE: it may duplicate previous messages
{
double elapsed_seconds = merge_entry->watch.elapsedSeconds();
LOG_DEBUG(log,
"Merge sorted {} rows, containing {} columns ({} merged, {} gathered) in {} sec., {} rows/sec., {}/sec.",
merge_entry->rows_read,
all_column_names.size(),
merging_column_names.size(),
gathering_column_names.size(),
elapsed_seconds,
merge_entry->rows_read / elapsed_seconds,
ReadableSize(merge_entry->bytes_read_uncompressed / elapsed_seconds));
}
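/// Merge projection parts: for every projection declared in the metadata, the corresponding
/// projection parts of the source parts are merged recursively (with this very function) into
/// a projection part of the new data part.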
for (const auto & projection : metadata_snapshot->getProjections())
{
MergeTreeData::DataPartsVector projection_parts;
for (const auto & part : parts)
{
auto it = part->getProjectionParts().find(projection.name);
if (it != part->getProjectionParts().end())
projection_parts.push_back(it->second);
}
if (projection_parts.size() < parts.size())
{
LOG_DEBUG(log, "Projection {} is not merged because some parts don't have it", projection.name);
continue;
}
LOG_DEBUG(
log,
"Selected {} projection_parts from {} to {}",
projection_parts.size(),
projection_parts.front()->name,
projection_parts.back()->name);
FutureMergedMutatedPart projection_future_part;
projection_future_part.assign(std::move(projection_parts));
projection_future_part.name = projection.name;
projection_future_part.path = future_part.path + "/" + projection.name + ".proj/";
projection_future_part.part_info = {"all", 0, 0, 0};
MergeTreeData::MergingParams projection_merging_params;
projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;
if (projection.type == "aggregate")
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
// TODO Should we use a new merge_entry for projection?
auto merged_projection_part = mergePartsToTemporaryPart(
projection_future_part,
projection.metadata,
merge_entry,
holder,
time_of_merge,
context,
space_reservation,
deduplicate,
deduplicate_by_columns,
projection_merging_params,
new_data_part.get());
new_data_part->addProjectionPart(projection.name, std::move(merged_projection_part));
}
if (chosen_merge_algorithm != MergeAlgorithm::Vertical)
to.writeSuffixAndFinalizePart(new_data_part, need_sync);
else
to.writeSuffixAndFinalizePart(new_data_part, need_sync, &storage_columns, &checksums_gathered_columns);
return new_data_part;
}
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart(
const FutureMergedMutatedPart & future_part,
const StorageMetadataPtr & metadata_snapshot,
const MutationCommands & commands,
MergeListEntry & merge_entry,
time_t time_of_mutation,
ContextPtr context,
const ReservationPtr & space_reservation,
TableLockHolder & holder)
{
checkOperationIsNotCanceled(merge_entry);
if (future_part.parts.size() != 1)
throw Exception("Trying to mutate " + toString(future_part.parts.size()) + " parts, not one. "
"This is a bug.", ErrorCodes::LOGICAL_ERROR);
CurrentMetrics::Increment num_mutations{CurrentMetrics::PartMutation};
const auto & source_part = future_part.parts[0];
auto storage_from_source_part = StorageFromMergeTreeDataPart::create(source_part);
auto context_for_reading = Context::createCopy(context);
context_for_reading->setSetting("max_streams_to_max_threads_ratio", 1);
context_for_reading->setSetting("max_threads", 1);
/// Allow mutations to work when force_index_by_date or force_primary_key is on.
context_for_reading->setSetting("force_index_by_date", Field(0));
context_for_reading->setSetting("force_primary_key", Field(0));
MutationCommands commands_for_part;
for (const auto & command : commands)
{
if (command.partition == nullptr || future_part.parts[0]->info.partition_id == data.getPartitionIDFromQuery(
command.partition, context_for_reading))
commands_for_part.emplace_back(command);
}
if (source_part->isStoredOnDisk() && !isStorageTouchedByMutations(
storage_from_source_part, metadata_snapshot, commands_for_part, Context::createCopy(context_for_reading)))
{
LOG_TRACE(log, "Part {} doesn't change up to mutation version {}", source_part->name, future_part.part_info.mutation);
return data.cloneAndLoadDataPartOnSameDisk(source_part, "tmp_clone_", future_part.part_info, metadata_snapshot);
}
else
{
LOG_TRACE(log, "Mutating part {} to mutation version {}", source_part->name, future_part.part_info.mutation);
}
BlockInputStreamPtr in = nullptr;
Block updated_header;
std::unique_ptr<MutationsInterpreter> interpreter;
const auto data_settings = data.getSettings();
MutationCommands for_interpreter;
MutationCommands for_file_renames;
splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames);
UInt64 watch_prev_elapsed = 0;
MergeStageProgress stage_progress(1.0);
NamesAndTypesList storage_columns = metadata_snapshot->getColumns().getAllPhysical();
NameSet materialized_indices;
NameSet materialized_projections;
MutationsInterpreter::MutationKind::MutationKindEnum mutation_kind
= MutationsInterpreter::MutationKind::MutationKindEnum::MUTATE_UNKNOWN;
if (!for_interpreter.empty())
{
interpreter = std::make_unique<MutationsInterpreter>(
storage_from_source_part, metadata_snapshot, for_interpreter, context_for_reading, true);
materialized_indices = interpreter->grabMaterializedIndices();
materialized_projections = interpreter->grabMaterializedProjections();
mutation_kind = interpreter->getMutationKind();
in = interpreter->execute();
updated_header = interpreter->getUpdatedHeader();
in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + future_part.name, space_reservation->getDisk(), 0);
auto new_data_part = data.createPart(
future_part.name, future_part.type, future_part.part_info, single_disk_volume, "tmp_mut_" + future_part.name);
new_data_part->uuid = future_part.uuid;
new_data_part->is_temp = true;
new_data_part->ttl_infos = source_part->ttl_infos;
/// It shouldn't be changed by mutation.
new_data_part->index_granularity_info = source_part->index_granularity_info;
new_data_part->setColumns(getColumnsForNewDataPart(source_part, updated_header, storage_columns, for_file_renames));
new_data_part->partition.assign(source_part->partition);
auto disk = new_data_part->volume->getDisk();
String new_part_tmp_path = new_data_part->getFullRelativePath();
SyncGuardPtr sync_guard;
if (data.getSettings()->fsync_part_directory)
sync_guard = disk->getDirectorySyncGuard(new_part_tmp_path);
/// Don't change granularity type while mutating subset of columns
auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(new_data_part->getType())
: getNonAdaptiveMrkExtension();
bool need_sync = needSyncPart(source_part->rows_count, source_part->getBytesOnDisk(), *data_settings);
bool need_remove_expired_values = false;
if (in && shouldExecuteTTL(metadata_snapshot, in->getHeader().getNamesAndTypesList().getNames(), commands_for_part))
need_remove_expired_values = true;
/// All columns of the part are changed, and possibly some more that were missing in the part before
if (!isWidePart(source_part)
|| (mutation_kind == MutationsInterpreter::MutationKind::MUTATE_OTHER && interpreter && interpreter->isAffectingAllColumns()))
{
disk->createDirectories(new_part_tmp_path);
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes() and locked in shared mode when input
/// streams are created) would be taken in the reverse order compared to inserting new data.
/// This annoys TSan even though one of the locks is taken in shared mode and thus
/// a deadlock is impossible.
auto compression_codec = data.getCompressionCodecForPart(source_part->getBytesOnDisk(), source_part->ttl_infos, time_of_mutation);
auto part_indices = getIndicesForNewDataPart(metadata_snapshot->getSecondaryIndices(), for_file_renames);
auto part_projections = getProjectionsForNewDataPart(metadata_snapshot->getProjections(), for_file_renames);
mutateAllPartColumns(
new_data_part,
metadata_snapshot,
part_indices,
part_projections,
in,
time_of_mutation,
compression_codec,
merge_entry,
need_remove_expired_values,
need_sync,
space_reservation,
holder,
context);
/// no finalization required, because mutateAllPartColumns uses
/// MergedBlockOutputStream, which finalizes all part fields itself
}
else /// TODO: check that we modify only non-key columns in this case.
{
/// We will modify only some of the columns. Other columns and key values can be copied as-is.
NameSet updated_columns;
if (mutation_kind != MutationsInterpreter::MutationKind::MUTATE_INDEX_PROJECTION)
{
for (const auto & name_type : updated_header.getNamesAndTypesList())
updated_columns.emplace(name_type.name);
}
auto indices_to_recalc = getIndicesToRecalculate(
in, updated_columns, metadata_snapshot, context, materialized_indices, source_part);
auto projections_to_recalc = getProjectionsToRecalculate(
updated_columns, metadata_snapshot, materialized_projections, source_part);
NameSet files_to_skip = collectFilesToSkip(
source_part,
mutation_kind == MutationsInterpreter::MutationKind::MUTATE_INDEX_PROJECTION ? Block{} : updated_header,
indices_to_recalc,
mrk_extension,
projections_to_recalc);
NameToNameVector files_to_rename = collectFilesForRenames(source_part, for_file_renames, mrk_extension);
if (indices_to_recalc.empty() && projections_to_recalc.empty() && mutation_kind != MutationsInterpreter::MutationKind::MUTATE_OTHER
&& files_to_rename.empty())
{
LOG_TRACE(
log, "Part {} doesn't change up to mutation version {} (optimized)", source_part->name, future_part.part_info.mutation);
return data.cloneAndLoadDataPartOnSameDisk(source_part, "tmp_clone_", future_part.part_info, metadata_snapshot);
}
if (need_remove_expired_values)
files_to_skip.insert("ttl.txt");
disk->createDirectories(new_part_tmp_path);
/// Create hardlinks for unchanged files
for (auto it = disk->iterateDirectory(source_part->getFullRelativePath()); it->isValid(); it->next())
{
if (files_to_skip.count(it->name()))
continue;
String destination = new_part_tmp_path;
String file_name = it->name();
auto rename_it = std::find_if(files_to_rename.begin(), files_to_rename.end(), [&file_name](const auto & rename_pair) { return rename_pair.first == file_name; });
if (rename_it != files_to_rename.end())
{
if (rename_it->second.empty())
continue;
destination += rename_it->second;
}
else
{
destination += it->name();
}
if (!disk->isDirectory(it->path()))
disk->createHardLink(it->path(), destination);
else if (!startsWith(it->name(), "tmp_")) // ignore projection tmp merge dir
{
// it's a projection part directory
disk->createDirectories(destination);
for (auto p_it = disk->iterateDirectory(it->path()); p_it->isValid(); p_it->next())
{
String p_destination = destination + "/";
String p_file_name = p_it->name();
p_destination += p_it->name();
disk->createHardLink(p_it->path(), p_destination);
}
}
}
merge_entry->columns_written = storage_columns.size() - updated_header.columns();
new_data_part->checksums = source_part->checksums;
auto compression_codec = source_part->default_codec;
if (in)
{
mutateSomePartColumns(
source_part,
metadata_snapshot,
indices_to_recalc,
projections_to_recalc,
mutation_kind == MutationsInterpreter::MutationKind::MUTATE_INDEX_PROJECTION ? Block{} : updated_header,
new_data_part,
in,
time_of_mutation,
compression_codec,
merge_entry,
need_remove_expired_values,
need_sync,
space_reservation,
holder,
context);
}
for (const auto & [rename_from, rename_to] : files_to_rename)
{
if (rename_to.empty() && new_data_part->checksums.files.count(rename_from))
{
new_data_part->checksums.files.erase(rename_from);
}
else if (new_data_part->checksums.files.count(rename_from))
{
new_data_part->checksums.files[rename_to] = new_data_part->checksums.files[rename_from];
new_data_part->checksums.files.erase(rename_from);
}
}
finalizeMutatedPart(source_part, new_data_part, need_remove_expired_values, compression_codec);
}
return new_data_part;
}
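/// A Horizontal merge reads, sorts and writes all columns in one pass. A Vertical merge first merges
/// only the sorting-key (and other "merging") columns, remembering for every row which part it came
/// from, and then gathers the remaining columns one by one, which needs much less memory for wide tables.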
MergeAlgorithm MergeTreeDataMergerMutator::chooseMergeAlgorithm(
const MergeTreeData::DataPartsVector & parts,
size_t sum_rows_upper_bound,
const NamesAndTypesList & gathering_columns,
bool deduplicate,
bool need_remove_expired_values,
const MergeTreeData::MergingParams & merging_params) const
{
const auto data_settings = data.getSettings();
if (deduplicate)
return MergeAlgorithm::Horizontal;
if (data_settings->enable_vertical_merge_algorithm == 0)
return MergeAlgorithm::Horizontal;
if (need_remove_expired_values)
return MergeAlgorithm::Horizontal;
for (const auto & part : parts)
if (!part->supportsVerticalMerge())
return MergeAlgorithm::Horizontal;
bool is_supported_storage =
merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
merging_params.mode == MergeTreeData::MergingParams::Replacing ||
merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
bool enough_ordinary_cols = gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate;
bool enough_total_rows = sum_rows_upper_bound >= data_settings->vertical_merge_algorithm_min_rows_to_activate;
bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS;
auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ?
MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
return merge_alg;
}
MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart(
MergeTreeData::MutableDataPartPtr & new_data_part,
const MergeTreeData::DataPartsVector & parts,
MergeTreeData::Transaction * out_transaction)
{
/// Rename new part, add to the set and remove original parts.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, nullptr, out_transaction);
/// Let's check that all original parts have been deleted and only them.
if (replaced_parts.size() != parts.size())
{
/** This is normal, although this happens rarely.
*
* The situation when 0 parts were replaced instead of N can occur, for example, in the following case:
* - we had A part, but there was no B and C parts;
* - A, B -> AB was in the queue, but it has not been done, because there is no B part;
* - AB, C -> ABC was in the queue, but it has not been done, because there are no AB and C parts;
* - we have completed the task of downloading a B part;
* - we started to make A, B -> AB merge, since all parts appeared;
* - we decided to download ABC part from another replica, since it was impossible to make merge AB, C -> ABC;
* - ABC part appeared. When it was added, old A, B, C parts were deleted;
* - AB merge finished. AB part was added. But this is an obsolete part. The log will contain the message `Obsolete part added`,
* then we get here.
*
* When could M > N parts be replaced?
* - new block was added in ReplicatedMergeTreeBlockOutputStream;
* - it was added to working dataset in memory and renamed on filesystem;
* - but ZooKeeper transaction that adds it to reference dataset in ZK failed;
* - and it failed due to connection loss, so we don't roll back the working dataset in memory,
* because we don't know if the part was added to ZK or not
* (see ReplicatedMergeTreeBlockOutputStream)
* - then method selectPartsToMerge selects a range and sees, that EphemeralLock for the block in this part is unlocked,
* and so it is possible to merge a range skipping this part.
* (NOTE: Merging with part that is not in ZK is not possible, see checks in 'createLogEntryToMergeParts'.)
* - and after the merge, this part will be removed in addition to the parts that were merged.
*/
LOG_WARNING(log, "Unexpected number of parts removed when adding {}: {} instead of {}", new_data_part->name, replaced_parts.size(), parts.size());
}
else
{
for (size_t i = 0; i < parts.size(); ++i)
if (parts[i]->name != replaced_parts[i]->name)
throw Exception("Unexpected part removed when adding " + new_data_part->name + ": " + replaced_parts[i]->name
+ " instead of " + parts[i]->name, ErrorCodes::LOGICAL_ERROR);
}
LOG_TRACE(log, "Merged {} parts: from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
return new_data_part;
}
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
{
size_t res = 0;
for (const MergeTreeData::DataPartPtr & part : source_parts)
res += part->getBytesOnDisk();
return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}
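/// Split mutation commands into those that must be executed by MutationsInterpreter (they produce new
/// column data and therefore require reading and rewriting columns) and those that only result in
/// renaming or removing files of the part (handled by collectFilesForRenames below).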
void MergeTreeDataMergerMutator::splitMutationCommands(
MergeTreeData::DataPartPtr part,
const MutationCommands & commands,
MutationCommands & for_interpreter,
MutationCommands & for_file_renames)
{
ColumnsDescription part_columns(part->getColumns());
if (!isWidePart(part))
{
NameSet mutated_columns;
for (const auto & command : commands)
{
if (command.type == MutationCommand::Type::MATERIALIZE_INDEX
|| command.type == MutationCommand::Type::MATERIALIZE_PROJECTION
|| command.type == MutationCommand::Type::MATERIALIZE_TTL
|| command.type == MutationCommand::Type::DELETE
|| command.type == MutationCommand::Type::UPDATE)
{
for_interpreter.push_back(command);
for (const auto & [column_name, expr] : command.column_to_update_expression)
mutated_columns.emplace(column_name);
}
else if (command.type == MutationCommand::Type::DROP_INDEX || command.type == MutationCommand::Type::DROP_PROJECTION)
{
for_file_renames.push_back(command);
}
else if (part_columns.has(command.column_name))
{
if (command.type == MutationCommand::Type::DROP_COLUMN)
{
mutated_columns.emplace(command.column_name);
}
else if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
for_interpreter.push_back(
{
.type = MutationCommand::Type::READ_COLUMN,
.column_name = command.rename_to,
});
mutated_columns.emplace(command.column_name);
part_columns.rename(command.column_name, command.rename_to);
}
}
}
/// If it's a compact part, we don't need to actually remove files from disk;
/// we just don't read the dropped columns.
for (const auto & column : part->getColumns())
{
if (!mutated_columns.count(column.name))
for_interpreter.emplace_back(
MutationCommand{.type = MutationCommand::Type::READ_COLUMN, .column_name = column.name, .data_type = column.type});
}
}
else
{
for (const auto & command : commands)
{
if (command.type == MutationCommand::Type::MATERIALIZE_INDEX
|| command.type == MutationCommand::Type::MATERIALIZE_PROJECTION
|| command.type == MutationCommand::Type::MATERIALIZE_TTL
|| command.type == MutationCommand::Type::DELETE
|| command.type == MutationCommand::Type::UPDATE)
{
for_interpreter.push_back(command);
}
else if (command.type == MutationCommand::Type::DROP_INDEX || command.type == MutationCommand::Type::DROP_PROJECTION)
{
for_file_renames.push_back(command);
}
/// If we don't have this column in the source part, then we don't need
/// to materialize it.
else if (part_columns.has(command.column_name))
{
if (command.type == MutationCommand::Type::READ_COLUMN)
{
for_interpreter.push_back(command);
}
else if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
part_columns.rename(command.column_name, command.rename_to);
for_file_renames.push_back(command);
}
else
{
for_file_renames.push_back(command);
}
}
}
}
}
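
/// Collect (from, to) pairs of file renames produced by DROP and RENAME commands;
/// an empty destination name means the file should be removed.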
NameToNameVector MergeTreeDataMergerMutator::collectFilesForRenames(
MergeTreeData::DataPartPtr source_part, const MutationCommands & commands_for_removes, const String & mrk_extension)
{
/// Collect counts for shared streams of different columns. For example, Nested columns have a shared stream with array sizes.
std::map<String, size_t> stream_counts;
for (const NameAndTypePair & column : source_part->getColumns())
{
auto serialization = source_part->getSerializationForColumn(column);
serialization->enumerateStreams(
[&](const ISerialization::SubstreamPath & substream_path)
{
++stream_counts[ISerialization::getFileNameForStream(column, substream_path)];
},
{});
}
NameToNameVector rename_vector;
/// Remove old data
for (const auto & command : commands_for_removes)
{
if (command.type == MutationCommand::Type::DROP_INDEX)
{
if (source_part->checksums.has(INDEX_FILE_PREFIX + command.column_name + ".idx"))
{
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + ".idx", "");
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + mrk_extension, "");
}
}
else if (command.type == MutationCommand::Type::DROP_PROJECTION)
{
if (source_part->checksums.has(command.column_name + ".proj"))
rename_vector.emplace_back(command.column_name + ".proj", "");
}
else if (command.type == MutationCommand::Type::DROP_COLUMN)
{
ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path)
{
String stream_name = ISerialization::getFileNameForStream({command.column_name, command.data_type}, substream_path);
/// Delete files if they are no longer shared with another column.
if (--stream_counts[stream_name] == 0)
{
rename_vector.emplace_back(stream_name + ".bin", "");
rename_vector.emplace_back(stream_name + mrk_extension, "");
}
};
auto column = source_part->getColumns().tryGetByName(command.column_name);
if (column)
{
auto serialization = source_part->getSerializationForColumn(*column);
serialization->enumerateStreams(callback);
}
}
else if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
String escaped_name_from = escapeForFileName(command.column_name);
String escaped_name_to = escapeForFileName(command.rename_to);
ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path)
{
String stream_from = ISerialization::getFileNameForStream({command.column_name, command.data_type}, substream_path);
String stream_to = boost::replace_first_copy(stream_from, escaped_name_from, escaped_name_to);
if (stream_from != stream_to)
{
rename_vector.emplace_back(stream_from + ".bin", stream_to + ".bin");
rename_vector.emplace_back(stream_from + mrk_extension, stream_to + mrk_extension);
}
};
auto column = source_part->getColumns().tryGetByName(command.column_name);
if (column)
{
auto serialization = source_part->getSerializationForColumn(*column);
serialization->enumerateStreams(callback);
}
}
}
return rename_vector;
}
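
/// Files of the source part that the mutation will rewrite (streams of updated columns, recalculated indices
/// and projections), so they should not be reused from the source part.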
NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
const MergeTreeDataPartPtr & source_part,
const Block & updated_header,
const std::set<MergeTreeIndexPtr> & indices_to_recalc,
const String & mrk_extension,
const std::set<MergeTreeProjectionPtr> & projections_to_recalc)
{
NameSet files_to_skip = source_part->getFileNamesWithoutChecksums();
/// Skip updated files
for (const auto & entry : updated_header)
{
ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path)
{
String stream_name = ISerialization::getFileNameForStream({entry.name, entry.type}, substream_path);
files_to_skip.insert(stream_name + ".bin");
files_to_skip.insert(stream_name + mrk_extension);
};
auto serialization = source_part->getSerializationForColumn({entry.name, entry.type});
serialization->enumerateStreams(callback);
}
for (const auto & index : indices_to_recalc)
{
files_to_skip.insert(index->getFileName() + ".idx");
files_to_skip.insert(index->getFileName() + mrk_extension);
}
for (const auto & projection : projections_to_recalc)
{
files_to_skip.insert(projection->getDirectoryName());
}
return files_to_skip;
}
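
/// Compute the list of columns for the mutated part: take the storage columns, apply type changes from the
/// updated header, and drop columns that were removed or renamed away by the mutation.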
NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
MergeTreeData::DataPartPtr source_part,
const Block & updated_header,
NamesAndTypesList storage_columns,
const MutationCommands & commands_for_removes)
{
/// In compact parts we read all columns, because they are all stored in a
/// single file
if (!isWidePart(source_part))
return updated_header.getNamesAndTypesList();
NameSet removed_columns;
NameToNameMap renamed_columns_to_from;
/// All commands are validated in AlterCommand so we don't care about order
for (const auto & command : commands_for_removes)
{
if (command.type == MutationCommand::DROP_COLUMN)
removed_columns.insert(command.column_name);
if (command.type == MutationCommand::RENAME_COLUMN)
renamed_columns_to_from.emplace(command.rename_to, command.column_name);
}
Names source_column_names = source_part->getColumns().getNames();
NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
for (auto it = storage_columns.begin(); it != storage_columns.end();)
{
if (updated_header.has(it->name))
{
auto updated_type = updated_header.getByName(it->name).type;
if (updated_type != it->type)
it->type = updated_type;
++it;
}
else
{
if (!source_columns_name_set.count(it->name))
{
/// The source part doesn't have this column, but some other column
/// was renamed to its name.
auto renamed_it = renamed_columns_to_from.find(it->name);
if (renamed_it != renamed_columns_to_from.end()
&& source_columns_name_set.count(renamed_it->second))
++it;
else
it = storage_columns.erase(it);
}
else
{
bool was_renamed = false;
bool was_removed = removed_columns.count(it->name);
/// Check that this column was renamed to some other name
for (const auto & [rename_to, rename_from] : renamed_columns_to_from)
{
if (rename_from == it->name)
{
was_renamed = true;
break;
}
}
/// If we want to rename this column to some other name, then its
/// previous version should be dropped or removed.
if (renamed_columns_to_from.count(it->name) && !was_renamed && !was_removed)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect mutation commands, trying to rename column {} to {}, but part {} already has column {}", renamed_columns_to_from[it->name], it->name, source_part->name, it->name);
/// The column was renamed and no other column was renamed to its name,
/// or the column is dropped.
if (!renamed_columns_to_from.count(it->name) && (was_renamed || was_removed))
it = storage_columns.erase(it);
else
++it;
}
}
}
return storage_columns;
}
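
/// Return the skip indices to build in the new part: all given indices except those dropped by the mutation commands.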
MergeTreeIndices MergeTreeDataMergerMutator::getIndicesForNewDataPart(
const IndicesDescription & all_indices,
const MutationCommands & commands_for_removes)
{
NameSet removed_indices;
for (const auto & command : commands_for_removes)
if (command.type == MutationCommand::DROP_INDEX)
removed_indices.insert(command.column_name);
MergeTreeIndices new_indices;
for (const auto & index : all_indices)
if (!removed_indices.count(index.name))
new_indices.push_back(MergeTreeIndexFactory::instance().get(index));
return new_indices;
}
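
/// Return the projections to build in the new part: all given projections except those dropped by the mutation commands.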
MergeTreeProjections MergeTreeDataMergerMutator::getProjectionsForNewDataPart(
const ProjectionsDescription & all_projections,
const MutationCommands & commands_for_removes)
{
NameSet removed_projections;
for (const auto & command : commands_for_removes)
if (command.type == MutationCommand::DROP_PROJECTION)
removed_projections.insert(command.column_name);
MergeTreeProjections new_projections;
for (const auto & projection : all_projections)
if (!removed_projections.count(projection.name))
new_projections.push_back(MergeTreeProjectionFactory::instance().get(projection));
return new_projections;
}
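
/// Skip indices that must be rebuilt by the mutation: indices explicitly requested via MATERIALIZE INDEX that
/// don't exist in the source part yet, and indices whose source columns are updated. The input stream is wrapped
/// with the index expressions so that the rebuilt indices can be calculated from it.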
std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalculate(
BlockInputStreamPtr & input_stream,
const NameSet & updated_columns,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const NameSet & materialized_indices,
const MergeTreeData::DataPartPtr & source_part)
{
/// Checks whether columns used in skipping indices were modified.
const auto & index_factory = MergeTreeIndexFactory::instance();
std::set<MergeTreeIndexPtr> indices_to_recalc;
ASTPtr indices_recalc_expr_list = std::make_shared<ASTExpressionList>();
const auto & indices = metadata_snapshot->getSecondaryIndices();
for (size_t i = 0; i < indices.size(); ++i)
{
const auto & index = indices[i];
// If we ask to materialize an index and it doesn't exist yet
if (!source_part->checksums.has(INDEX_FILE_PREFIX + index.name + ".idx") && materialized_indices.count(index.name))
{
if (indices_to_recalc.insert(index_factory.get(index)).second)
{
ASTPtr expr_list = index.expression_list_ast->clone();
for (const auto & expr : expr_list->children)
indices_recalc_expr_list->children.push_back(expr->clone());
}
}
// If some dependent columns get mutated
else
{
bool mutate = false;
const auto & index_cols = index.expression->getRequiredColumns();
for (const auto & col : index_cols)
{
if (updated_columns.count(col))
{
mutate = true;
break;
}
}
if (mutate && indices_to_recalc.insert(index_factory.get(index)).second)
{
ASTPtr expr_list = index.expression_list_ast->clone();
for (const auto & expr : expr_list->children)
indices_recalc_expr_list->children.push_back(expr->clone());
}
}
}
if (!indices_to_recalc.empty() && input_stream)
{
auto indices_recalc_syntax = TreeRewriter(context).analyze(indices_recalc_expr_list, input_stream->getHeader().getNamesAndTypesList());
auto indices_recalc_expr = ExpressionAnalyzer(
indices_recalc_expr_list,
indices_recalc_syntax, context).getActions(false);
/// We can update only one column, but some skip index expression may depend on several
/// columns (c1 + c2 * c3). It works because this stream was created with the help of
/// MutationsInterpreter, which knows about skip indices, so the input stream already has
/// all required columns.
/// TODO move this logic to a single place.
input_stream = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(input_stream, indices_recalc_expr));
}
return indices_to_recalc;
}
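
/// Projections that must be rebuilt by the mutation: projections explicitly requested via MATERIALIZE PROJECTION
/// that don't exist in the source part yet, and projections whose required columns are updated.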
std::set<MergeTreeProjectionPtr> MergeTreeDataMergerMutator::getProjectionsToRecalculate(
const NameSet & updated_columns,
const StorageMetadataPtr & metadata_snapshot,
const NameSet & materialized_projections,
const MergeTreeData::DataPartPtr & source_part)
{
/// Checks whether columns used in projections were modified.
const auto & projection_factory = MergeTreeProjectionFactory::instance();
std::set<MergeTreeProjectionPtr> projections_to_recalc;
for (const auto & projection : metadata_snapshot->getProjections())
{
// If we ask to materialize and it doesn't exist
if (!source_part->checksums.has(projection.name + ".proj") && materialized_projections.count(projection.name))
{
projections_to_recalc.insert(projection_factory.get(projection));
}
else
{
// If some dependent columns get mutated
bool mutate = false;
const auto & projection_cols = projection.required_columns;
for (const auto & col : projection_cols)
{
if (updated_columns.count(col))
{
mutate = true;
break;
}
}
if (mutate)
projections_to_recalc.insert(projection_factory.get(projection));
}
}
return projections_to_recalc;
}
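
/// TTL has to be (re)calculated if the table has any TTL and either TTL materialization was requested explicitly
/// or the mutated columns participate in some TTL expression or TTL target.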
bool MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadataPtr & metadata_snapshot, const Names & columns, const MutationCommands & commands)
{
if (!metadata_snapshot->hasAnyTTL())
return false;
for (const auto & command : commands)
if (command.type == MutationCommand::MATERIALIZE_TTL)
return true;
auto dependencies = metadata_snapshot->getColumnDependencies(NameSet(columns.begin(), columns.end()));
for (const auto & dependency : dependencies)
if (dependency.kind == ColumnDependency::TTL_EXPRESSION || dependency.kind == ColumnDependency::TTL_TARGET)
return true;
return false;
}
// 1. Get the projection pipeline and a sink to write parts.
// 2. Build an executor that can write blocks through the input stream (we can write through it to generate as many parts as needed).
// 3. Finalize the pipeline so that all parts are merged into one part per projection.
void MergeTreeDataMergerMutator::writeWithProjections(
MergeTreeData::MutableDataPartPtr new_data_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeProjections & projections_to_build,
BlockInputStreamPtr mutating_stream,
IMergedBlockOutputStream & out,
time_t time_of_mutation,
MergeListEntry & merge_entry,
const ReservationPtr & space_reservation,
TableLockHolder & holder,
ContextPtr context,
IMergeTreeDataPart::MinMaxIndex * minmax_idx)
{
size_t block_num = 0;
std::map<String, MergeTreeData::MutableDataPartsVector> projection_parts;
Block block;
std::vector<SquashingTransform> projection_squashes;
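/// Accumulate projection output blocks up to ~65536 rows / 16 MiB before writing a temporary projection part
/// (presumably to avoid producing too many tiny projection parts).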
for (size_t i = 0, size = projections_to_build.size(); i < size; ++i)
{
projection_squashes.emplace_back(65536, 65536 * 256);
}
while (checkOperationIsNotCanceled(merge_entry) && (block = mutating_stream->read()))
{
if (minmax_idx)
minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
out.write(block);
for (size_t i = 0, size = projections_to_build.size(); i < size; ++i)
{
const auto & projection = projections_to_build[i]->projection;
auto in = InterpreterSelectQuery(
projection.query_ast,
context,
Pipe(std::make_shared<SourceFromSingleChunk>(block, Chunk(block.getColumns(), block.rows()))),
SelectQueryOptions{
projection.type == "normal" ? QueryProcessingStage::FetchColumns : QueryProcessingStage::WithMergeableState})
.execute()
.getInputStream();
in = std::make_shared<SquashingBlockInputStream>(in, block.rows(), std::numeric_limits<UInt64>::max());
in->readPrefix();
auto & projection_squash = projection_squashes[i];
auto projection_block = projection_squash.add(in->read());
if (in->read())
throw Exception("Projection cannot increase the number of rows in a block", ErrorCodes::LOGICAL_ERROR);
in->readSuffix();
if (projection_block)
{
projection_parts[projection.name].emplace_back(
MergeTreeDataWriter::writeTempProjectionPart(data, log, projection_block, projection, new_data_part.get(), ++block_num));
}
}
merge_entry->rows_written += block.rows();
merge_entry->bytes_written_uncompressed += block.bytes();
}
// Flush the remaining squashed blocks for each projection
for (size_t i = 0, size = projections_to_build.size(); i < size; ++i)
{
const auto & projection = projections_to_build[i]->projection;
auto & projection_squash = projection_squashes[i];
auto projection_block = projection_squash.add({});
if (projection_block)
{
projection_parts[projection.name].emplace_back(
MergeTreeDataWriter::writeTempProjectionPart(data, log, projection_block, projection, new_data_part.get(), ++block_num));
}
}
const auto & projections = metadata_snapshot->projections;
for (auto && [name, parts] : projection_parts)
{
LOG_DEBUG(log, "Selected {} projection_parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
const auto & projection = projections.get(name);
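/// Merge the temporary projection parts level by level: take up to max_parts_to_merge_in_one_level parts at a time,
/// merge them into a part of the next level, and repeat until a single part remains for this projection;
/// that part is then attached to the new data part.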
std::map<size_t, MergeTreeData::MutableDataPartsVector> level_parts;
size_t current_level = 0;
size_t next_level = 1;
level_parts[current_level] = std::move(parts);
size_t max_parts_to_merge_in_one_level = 10;
for (;;)
{
auto & current_level_parts = level_parts[current_level];
auto & next_level_parts = level_parts[next_level];
MergeTreeData::MutableDataPartsVector selected_parts;
while (selected_parts.size() < max_parts_to_merge_in_one_level && !current_level_parts.empty())
{
selected_parts.push_back(std::move(current_level_parts.back()));
current_level_parts.pop_back();
}
if (selected_parts.empty())
{
if (next_level_parts.empty())
{
LOG_WARNING(log, "There is no projection parts merged");
break;
}
current_level = next_level;
++next_level;
}
else if (selected_parts.size() == 1)
{
if (next_level_parts.empty())
{
LOG_DEBUG(log, "Merged a projection part in level {}", current_level);
selected_parts[0]->renameTo(projection.name + ".proj", true);
selected_parts[0]->name = projection.name;
selected_parts[0]->is_temp = false;
new_data_part->addProjectionPart(name, std::move(selected_parts[0]));
break;
}
else
{
LOG_DEBUG(log, "Forwarded part {} in level {} to next level", selected_parts[0]->name, current_level);
next_level_parts.push_back(std::move(selected_parts[0]));
}
}
else if (selected_parts.size() > 1)
{
// Generate a unique part name
++block_num;
FutureMergedMutatedPart projection_future_part;
MergeTreeData::DataPartsVector const_selected_parts(
std::make_move_iterator(selected_parts.begin()), std::make_move_iterator(selected_parts.end()));
projection_future_part.assign(std::move(const_selected_parts));
projection_future_part.name = fmt::format("{}_{}", projection.name, ++block_num);
projection_future_part.part_info = {"all", 0, 0, 0};
MergeTreeData::MergingParams projection_merging_params;
projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;
if (projection.type == "aggregate")
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part.name);
next_level_parts.push_back(mergePartsToTemporaryPart(
projection_future_part,
projection.metadata,
merge_entry,
holder,
time_of_mutation,
context,
space_reservation,
false, // TODO Do we need to deduplicate for projections?
{},
projection_merging_params,
new_data_part.get(),
"tmp_merge_"));
next_level_parts.back()->is_temp = true;
}
}
}
}
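
/// Mutation that rewrites the whole part: calculate the primary key / skip index expressions and remove expired
/// TTL values if needed, then write all columns and projections into the new part and finalize it together with
/// the recomputed minmax index.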
void MergeTreeDataMergerMutator::mutateAllPartColumns(
MergeTreeData::MutableDataPartPtr new_data_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeIndices & skip_indices,
const MergeTreeProjections & projections_to_build,
BlockInputStreamPtr mutating_stream,
time_t time_of_mutation,
const CompressionCodecPtr & compression_codec,
MergeListEntry & merge_entry,
bool need_remove_expired_values,
bool need_sync,
const ReservationPtr & space_reservation,
TableLockHolder & holder,
ContextPtr context)
{
if (mutating_stream == nullptr)
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
if (metadata_snapshot->hasPrimaryKey() || metadata_snapshot->hasSecondaryIndices())
mutating_stream = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(mutating_stream, data.getPrimaryKeyAndSkipIndicesExpression(metadata_snapshot)));
if (need_remove_expired_values)
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);
IMergeTreeDataPart::MinMaxIndex minmax_idx;
MergedBlockOutputStream out{
new_data_part,
metadata_snapshot,
new_data_part->getColumns(),
skip_indices,
compression_codec};
mutating_stream->readPrefix();
out.writePrefix();
writeWithProjections(
new_data_part,
metadata_snapshot,
projections_to_build,
mutating_stream,
out,
time_of_mutation,
merge_entry,
space_reservation,
holder,
context,
&minmax_idx);
new_data_part->minmax_idx = std::move(minmax_idx);
mutating_stream->readSuffix();
out.writeSuffixAndFinalizePart(new_data_part, need_sync);
}
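
/// Mutation that rewrites only a subset of the part: write the mutated columns and the recalculated
/// indices/projections on top of the source part's index granularity, then merge the resulting checksums
/// into the new part's checksums.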
void MergeTreeDataMergerMutator::mutateSomePartColumns(
const MergeTreeDataPartPtr & source_part,
const StorageMetadataPtr & metadata_snapshot,
const std::set<MergeTreeIndexPtr> & indices_to_recalc,
const std::set<MergeTreeProjectionPtr> & projections_to_recalc,
const Block & mutation_header,
MergeTreeData::MutableDataPartPtr new_data_part,
BlockInputStreamPtr mutating_stream,
time_t time_of_mutation,
const CompressionCodecPtr & compression_codec,
MergeListEntry & merge_entry,
bool need_remove_expired_values,
bool need_sync,
const ReservationPtr & space_reservation,
TableLockHolder & holder,
ContextPtr context)
{
if (mutating_stream == nullptr)
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
if (need_remove_expired_values)
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
new_data_part,
metadata_snapshot,
mutation_header,
compression_codec,
std::vector<MergeTreeIndexPtr>(indices_to_recalc.begin(), indices_to_recalc.end()),
nullptr,
source_part->index_granularity,
&source_part->index_granularity_info
);
mutating_stream->readPrefix();
out.writePrefix();
std::vector<MergeTreeProjectionPtr> projections_to_build(projections_to_recalc.begin(), projections_to_recalc.end());
writeWithProjections(
new_data_part,
metadata_snapshot,
projections_to_build,
mutating_stream,
out,
time_of_mutation,
merge_entry,
space_reservation,
holder,
context);
mutating_stream->readSuffix();
auto changed_checksums = out.writeSuffixAndGetChecksums(new_data_part, new_data_part->checksums, need_sync);
new_data_part->checksums.add(std::move(changed_checksums));
}
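
/// Write the service files of the mutated part (uuid, ttl.txt, checksums.txt, default compression codec, columns.txt)
/// and copy the remaining metadata (row count, index granularity, index, minmax index) from the source part.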
void MergeTreeDataMergerMutator::finalizeMutatedPart(
const MergeTreeDataPartPtr & source_part,
MergeTreeData::MutableDataPartPtr new_data_part,
bool need_remove_expired_values,
const CompressionCodecPtr & codec)
{
auto disk = new_data_part->volume->getDisk();
if (new_data_part->uuid != UUIDHelpers::Nil)
{
auto out = disk->writeFile(new_data_part->getFullRelativePath() + IMergeTreeDataPart::UUID_FILE_NAME, 4096);
HashingWriteBuffer out_hashing(*out);
writeUUIDText(new_data_part->uuid, out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
}
if (need_remove_expired_values)
{
/// Write a file with ttl infos in json format.
auto out_ttl = disk->writeFile(new_data_part->getFullRelativePath() + "ttl.txt", 4096);
HashingWriteBuffer out_hashing(*out_ttl);
new_data_part->ttl_infos.write(out_hashing);
new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
new_data_part->checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
}
{
/// Write file with checksums.
auto out_checksums = disk->writeFile(new_data_part->getFullRelativePath() + "checksums.txt", 4096);
new_data_part->checksums.write(*out_checksums);
} /// close fd
{
auto out = disk->writeFile(new_data_part->getFullRelativePath() + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
DB::writeText(queryToString(codec->getFullCodecDesc()), *out);
}
{
/// Write a file with a description of columns.
auto out_columns = disk->writeFile(new_data_part->getFullRelativePath() + "columns.txt", 4096);
new_data_part->getColumns().writeText(*out_columns);
} /// close fd
new_data_part->rows_count = source_part->rows_count;
new_data_part->index_granularity = source_part->index_granularity;
new_data_part->index = source_part->index;
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);
new_data_part->loadProjections(false, false);
new_data_part->setBytesOnDisk(
MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->volume->getDisk(), new_data_part->getFullRelativePath()));
new_data_part->default_codec = codec;
new_data_part->calculateColumnsSizesOnDisk();
new_data_part->storage.lockSharedData(*new_data_part);
}
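
/// Throws ABORTED if merges are blocked or this merge entry was cancelled; otherwise returns true,
/// which makes it convenient to use in loop conditions.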
bool MergeTreeDataMergerMutator::checkOperationIsNotCanceled(const MergeListEntry & merge_entry) const
{
if (merges_blocker.isCancelled() || merge_entry->is_cancelled)
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
return true;
}
}