Merge branch 'master' into sql-fetch-allow-without-offset

This commit is contained in:
Alexey Milovidov 2023-07-24 05:56:46 +02:00
commit 0c765ddc5c
17 changed files with 149 additions and 140 deletions

View File

@ -1,5 +1,6 @@
#include <limits>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <Common/PODArray.h>
#include <Common/checkStackSize.h>
#include <Common/OptimizedRegularExpression.h>
@ -423,6 +424,7 @@ void OptimizedRegularExpressionImpl<thread_safe>::analyze(
bool & is_trivial,
bool & required_substring_is_prefix,
std::vector<std::string> & alternatives)
try
{
Literals alternative_literals;
Literal required_literal;
@ -432,12 +434,20 @@ void OptimizedRegularExpressionImpl<thread_safe>::analyze(
for (auto & lit : alternative_literals)
alternatives.push_back(std::move(lit.literal));
}
catch (...)
{
required_substring = "";
is_trivial = false;
required_substring_is_prefix = false;
alternatives.clear();
LOG_ERROR(&Poco::Logger::get("OptimizeRegularExpression"), "Analyze RegularExpression failed, got error: {}", DB::getCurrentExceptionMessage(false));
}
template <bool thread_safe>
OptimizedRegularExpressionImpl<thread_safe>::OptimizedRegularExpressionImpl(const std::string & regexp_, int options)
{
std::vector<std::string> alternativesDummy; /// this vector extracts patterns a,b,c from pattern (a|b|c). for now it's not used.
analyze(regexp_, required_substring, is_trivial, required_substring_is_prefix, alternativesDummy);
std::vector<std::string> alternatives_dummy; /// this vector extracts patterns a,b,c from pattern (a|b|c). for now it's not used.
analyze(regexp_, required_substring, is_trivial, required_substring_is_prefix, alternatives_dummy);
/// Just three following options are supported

View File

@ -9,14 +9,14 @@ namespace DB
using ReadBufferIterator = std::function<std::unique_ptr<ReadBuffer>(ColumnsDescription &)>;
/// Try to determine the schema of the data in specifying format.
/// Try to determine the schema of the data in the specified format.
/// For formats that have an external schema reader, it will
/// use it and won't create a read buffer.
/// For formats that have a schema reader from the data,
/// read buffer will be created by the provided iterator and
/// the schema will be extracted from the data. If schema reader
/// couldn't determine the schema we will try the next read buffer
/// from provided iterator if it makes sense. If format doesn't
/// from the provided iterator if it makes sense. If the format doesn't
/// have any schema reader or we couldn't determine the schema,
/// an exception will be thrown.
ColumnsDescription readSchemaFromFormat(

View File

@ -382,7 +382,7 @@ MergeTreeData::MergeTreeData(
checkTTLExpressions(metadata_, metadata_);
String reason;
if (!canUsePolymorphicParts(*settings, &reason) && !reason.empty())
if (!canUsePolymorphicParts(*settings, reason) && !reason.empty())
LOG_WARNING(log, "{} Settings 'min_rows_for_wide_part'and 'min_bytes_for_wide_part' will be ignored.", reason);
#if !USE_ROCKSDB
@ -3323,7 +3323,7 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
MergeTreeSettings copy = *getSettings();
copy.applyChange(changed_setting);
String reason;
if (!canUsePolymorphicParts(copy, &reason) && !reason.empty())
if (!canUsePolymorphicParts(copy, reason) && !reason.empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Can't change settings. Reason: {}", reason);
}
@ -3348,7 +3348,7 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
auto copy = getDefaultSettings();
copy->applyChanges(new_changes);
String reason;
if (!canUsePolymorphicParts(*copy, &reason) && !reason.empty())
if (!canUsePolymorphicParts(*copy, reason) && !reason.empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Can't change settings. Reason: {}", reason);
}
@ -3390,8 +3390,9 @@ MergeTreeDataPartFormat MergeTreeData::choosePartFormat(size_t bytes_uncompresse
using PartType = MergeTreeDataPartType;
using PartStorageType = MergeTreeDataPartStorageType;
const auto settings = getSettings();
if (!canUsePolymorphicParts(*settings))
String out_reason;
const auto settings = getSettings();
if (!canUsePolymorphicParts(*settings, out_reason))
return {PartType::Wide, PartStorageType::Full};
auto satisfies = [&](const auto & min_bytes_for, const auto & min_rows_for)
@ -8010,22 +8011,23 @@ bool MergeTreeData::partsContainSameProjections(const DataPartPtr & left, const
bool MergeTreeData::canUsePolymorphicParts() const
{
return canUsePolymorphicParts(*getSettings(), nullptr);
String unused;
return canUsePolymorphicParts(*getSettings(), unused);
}
bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, String * out_reason) const
bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, String & out_reason) const
{
if (!canUseAdaptiveGranularity())
{
if (out_reason && (settings.min_rows_for_wide_part != 0 || settings.min_bytes_for_wide_part != 0
if ((settings.min_rows_for_wide_part != 0 || settings.min_bytes_for_wide_part != 0
|| settings.min_rows_for_compact_part != 0 || settings.min_bytes_for_compact_part != 0))
{
*out_reason = fmt::format(
"Table can't create parts with adaptive granularity, but settings"
" min_rows_for_wide_part = {}"
", min_bytes_for_wide_part = {}"
". Parts with non-adaptive granularity can be stored only in Wide (default) format.",
settings.min_rows_for_wide_part, settings.min_bytes_for_wide_part);
out_reason = fmt::format(
"Table can't create parts with adaptive granularity, but settings"
" min_rows_for_wide_part = {}"
", min_bytes_for_wide_part = {}"
". Parts with non-adaptive granularity can be stored only in Wide (default) format.",
settings.min_rows_for_wide_part, settings.min_bytes_for_wide_part);
}
return false;

View File

@ -1484,7 +1484,7 @@ private:
/// Check selected parts for movements. Used by ALTER ... MOVE queries.
CurrentlyMovingPartsTaggerPtr checkPartsForMove(const DataPartsVector & parts, SpacePtr space);
bool canUsePolymorphicParts(const MergeTreeSettings & settings, String * out_reason = nullptr) const;
bool canUsePolymorphicParts(const MergeTreeSettings & settings, String & out_reason) const;
std::mutex write_ahead_log_mutex;
WriteAheadLogPtr write_ahead_log;

View File

@ -136,7 +136,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
const AllowedMergingPredicate & can_merge_callback,
bool merge_with_ttl_allowed,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason,
String & out_disable_reason,
const PartitionIdsHint * partitions_hint)
{
MergeTreeData::DataPartsVector data_parts = getDataPartsToSelectMergeFrom(txn, partitions_hint);
@ -145,8 +145,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (data_parts.empty())
{
if (out_disable_reason)
*out_disable_reason = "There are no parts in the table";
out_disable_reason = "There are no parts in the table";
return SelectPartsDecision::CANNOT_SELECT;
}
@ -154,8 +153,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (info.parts_selected_precondition == 0)
{
if (out_disable_reason)
*out_disable_reason = "No parts satisfy preconditions for merge";
out_disable_reason = "No parts satisfy preconditions for merge";
return SelectPartsDecision::CANNOT_SELECT;
}
@ -179,8 +177,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
/*optimize_skip_merged_partitions=*/true);
}
if (out_disable_reason)
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
return SelectPartsDecision::CANNOT_SELECT;
}
@ -197,7 +194,8 @@ MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPart
auto metadata_snapshot = data.getInMemoryMetadataPtr();
MergeSelectingInfo info = getPossibleMergeRanges(data_parts, can_merge_callback, txn);
String out_reason;
MergeSelectingInfo info = getPossibleMergeRanges(data_parts, can_merge_callback, txn, out_reason);
if (info.parts_selected_precondition == 0)
return res;
@ -227,7 +225,7 @@ MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPart
/// This method should have been const, but something went wrong... it's const with dry_run = true
auto status = const_cast<MergeTreeDataMergerMutator *>(this)->selectPartsToMergeFromRanges(
future_part, /*aggressive*/ false, max_total_size_to_merge, merge_with_ttl_allowed,
metadata_snapshot, ranges_per_partition[i], info.current_time, &out_disable_reason,
metadata_snapshot, ranges_per_partition[i], info.current_time, out_disable_reason,
/* dry_run */ true);
if (status == SelectPartsDecision::SELECTED)
res.insert(all_partition_ids[i]);
@ -331,7 +329,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
const MergeTreeData::DataPartsVector & data_parts,
const AllowedMergingPredicate & can_merge_callback,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason) const
String & out_disable_reason) const
{
MergeSelectingInfo res;
@ -444,7 +442,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
const StorageMetadataPtr & metadata_snapshot,
const IMergeSelector::PartsRanges & parts_ranges,
const time_t & current_time,
String * out_disable_reason,
String & out_disable_reason,
bool dry_run)
{
const auto data_settings = data.getSettings();
@ -515,8 +513,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
if (parts_to_merge.empty())
{
if (out_disable_reason)
*out_disable_reason = "Did not find any parts to merge (with usual merge selectors)";
out_disable_reason = "Did not find any parts to merge (with usual merge selectors)";
return SelectPartsDecision::CANNOT_SELECT;
}
}
@ -563,22 +560,20 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
bool final,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason,
String & out_disable_reason,
bool optimize_skip_merged_partitions)
{
MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition_id);
if (parts.empty())
{
if (out_disable_reason)
*out_disable_reason = "There are no parts inside partition";
out_disable_reason = "There are no parts inside partition";
return SelectPartsDecision::CANNOT_SELECT;
}
if (!final && parts.size() == 1)
{
if (out_disable_reason)
*out_disable_reason = "There is only one part inside partition";
out_disable_reason = "There is only one part inside partition";
return SelectPartsDecision::CANNOT_SELECT;
}
@ -587,8 +582,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
if (final && optimize_skip_merged_partitions && parts.size() == 1 && parts[0]->info.level > 0 &&
(!metadata_snapshot->hasAnyTTL() || parts[0]->checkAllTTLCalculated(metadata_snapshot)))
{
if (out_disable_reason)
*out_disable_reason = "Partition skipped due to optimize_skip_merged_partitions";
out_disable_reason = "Partition skipped due to optimize_skip_merged_partitions";
return SelectPartsDecision::NOTHING_TO_MERGE;
}
@ -629,9 +623,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100));
}
if (out_disable_reason)
*out_disable_reason = fmt::format("Insufficient available disk space, required {}", ReadableSize(required_disk_space));
out_disable_reason = fmt::format("Insufficient available disk space, required {}", ReadableSize(required_disk_space));
return SelectPartsDecision::CANNOT_SELECT;
}

View File

@ -43,7 +43,7 @@ public:
using AllowedMergingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &,
const MergeTreeData::DataPartPtr &,
const MergeTreeTransaction *,
String *)>;
String &)>;
explicit MergeTreeDataMergerMutator(MergeTreeData & data_);
@ -92,7 +92,7 @@ public:
const MergeTreeData::DataPartsVector & data_parts,
const AllowedMergingPredicate & can_merge_callback,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason = nullptr) const;
String & out_disable_reason) const;
/// The third step of selecting parts to merge: takes ranges that we can merge, and selects parts that we want to merge
SelectPartsDecision selectPartsToMergeFromRanges(
@ -103,7 +103,7 @@ public:
const StorageMetadataPtr & metadata_snapshot,
const IMergeSelector::PartsRanges & parts_ranges,
const time_t & current_time,
String * out_disable_reason = nullptr,
String & out_disable_reason,
bool dry_run = false);
String getBestPartitionToOptimizeEntire(const PartitionsInfo & partitions_info) const;
@ -129,7 +129,7 @@ public:
const AllowedMergingPredicate & can_merge,
bool merge_with_ttl_allowed,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason = nullptr,
String & out_disable_reason,
const PartitionIdsHint * partitions_hint = nullptr);
/** Select all the parts in the specified partition for merge, if possible.
@ -144,7 +144,7 @@ public:
bool final,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason = nullptr,
String & out_disable_reason,
bool optimize_skip_merged_partitions = false);
/** Creates a task to merge parts.

View File

@ -160,7 +160,7 @@ struct Settings;
M(UInt64, min_marks_to_honor_max_concurrent_queries, 0, "Minimal number of marks to honor the MergeTree-level's max_concurrent_queries (0 - disabled). Queries will still be limited by other max_concurrent_queries settings.", 0) \
M(UInt64, min_bytes_to_rebalance_partition_over_jbod, 0, "Minimal amount of bytes to enable part rebalance over JBOD array (0 - disabled).", 0) \
M(Bool, check_sample_column_is_correct, true, "Check columns or columns by hash for sampling are unsigned integer.", 0) \
M(Bool, allow_vertical_merges_from_compact_to_wide_parts, false, "Allows vertical merges from compact to wide parts. This settings must have the same value on all replicas", 0) \
M(Bool, allow_vertical_merges_from_compact_to_wide_parts, true, "Allows vertical merges from compact to wide parts. This settings must have the same value on all replicas", 0) \
M(Bool, enable_the_endpoint_id_with_zookeeper_name_prefix, false, "Enable the endpoint id with zookeeper name prefix for the replicated merge tree table", 0) \
M(UInt64, zero_copy_merge_mutation_min_parts_size_sleep_before_lock, 1ULL * 1024 * 1024 * 1024, "If zero copy replication is enabled sleep random amount of time before trying to lock depending on parts size for merge or mutation", 0) \
\
@ -169,8 +169,9 @@ struct Settings;
M(UInt64, part_moves_between_shards_delay_seconds, 30, "Time to wait before/after moving parts between shards.", 0) \
M(Bool, use_metadata_cache, false, "Experimental feature to speed up parts loading process by using MergeTree metadata cache", 0) \
M(Bool, allow_remote_fs_zero_copy_replication, false, "Don't use this setting in production, because it is not ready.", 0) \
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for Zero-copy table-independet info.", 0) \
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for zero-copy table-independent info.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
\
/** Compress marks and primary key. */ \
M(Bool, compress_marks, true, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \
M(Bool, compress_primary_key, true, "Primary key support compression, reduce primary key file size and speed up network transmission.", 0) \

View File

@ -2245,7 +2245,7 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::operator()(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
const MergeTreeTransaction *,
String * out_reason) const
String & out_reason) const
{
if (left)
return canMergeTwoParts(left, right, out_reason);
@ -2257,7 +2257,7 @@ template<typename VirtualPartsT, typename MutationsStateT>
bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
String * out_reason) const
String & out_reason) const
{
/// A sketch of a proof of why this method actually works:
///
@ -2301,22 +2301,19 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
{
if (pinned_part_uuids_ && pinned_part_uuids_->part_uuids.contains(part->uuid))
{
if (out_reason)
*out_reason = "Part " + part->name + " has uuid " + toString(part->uuid) + " which is currently pinned";
out_reason = "Part " + part->name + " has uuid " + toString(part->uuid) + " which is currently pinned";
return false;
}
if (inprogress_quorum_part_ && part->name == *inprogress_quorum_part_)
{
if (out_reason)
*out_reason = "Quorum insert for part " + part->name + " is currently in progress";
out_reason = "Quorum insert for part " + part->name + " is currently in progress";
return false;
}
if (prev_virtual_parts_ && prev_virtual_parts_->getContainingPart(part->info).empty())
{
if (out_reason)
*out_reason = "Entry for part " + part->name + " hasn't been read from the replication log yet";
out_reason = "Entry for part " + part->name + " hasn't been read from the replication log yet";
return false;
}
}
@ -2330,8 +2327,7 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
{
if (partition_ids_hint && !partition_ids_hint->contains(left->info.partition_id))
{
if (out_reason)
*out_reason = fmt::format("Uncommitted block were not loaded for unexpected partition {}", left->info.partition_id);
out_reason = fmt::format("Uncommitted block were not loaded for unexpected partition {}", left->info.partition_id);
return false;
}
@ -2343,10 +2339,8 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
auto block_it = block_numbers.upper_bound(left_max_block);
if (block_it != block_numbers.end() && *block_it < right_min_block)
{
if (out_reason)
*out_reason = "Block number " + toString(*block_it) + " is still being inserted between parts "
+ left->name + " and " + right->name;
out_reason = "Block number " + toString(*block_it) + " is still being inserted between parts "
+ left->name + " and " + right->name;
return false;
}
}
@ -2365,8 +2359,7 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
String containing_part = virtual_parts_->getContainingPart(part->info);
if (containing_part != part->name)
{
if (out_reason)
*out_reason = "Part " + part->name + " has already been assigned a merge into " + containing_part;
out_reason = "Part " + part->name + " has already been assigned a merge into " + containing_part;
return false;
}
}
@ -2383,10 +2376,9 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
Strings covered = virtual_parts_->getPartsCoveredBy(gap_part_info);
if (!covered.empty())
{
if (out_reason)
*out_reason = "There are " + toString(covered.size()) + " parts (from " + covered.front()
+ " to " + covered.back() + ") that are still not present or being processed by "
+ " other background process on this replica between " + left->name + " and " + right->name;
out_reason = "There are " + toString(covered.size()) + " parts (from " + covered.front()
+ " to " + covered.back() + ") that are still not present or being processed by "
+ " other background process on this replica between " + left->name + " and " + right->name;
return false;
}
}
@ -2402,9 +2394,8 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
if (left_mutation_ver != right_mutation_ver)
{
if (out_reason)
*out_reason = "Current mutation versions of parts " + left->name + " and " + right->name + " differ: "
+ toString(left_mutation_ver) + " and " + toString(right_mutation_ver) + " respectively";
out_reason = "Current mutation versions of parts " + left->name + " and " + right->name + " differ: "
+ toString(left_mutation_ver) + " and " + toString(right_mutation_ver) + " respectively";
return false;
}
}
@ -2415,26 +2406,23 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeTwoParts(
template<typename VirtualPartsT, typename MutationsStateT>
bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeSinglePart(
const MergeTreeData::DataPartPtr & part,
String * out_reason) const
String & out_reason) const
{
if (pinned_part_uuids_ && pinned_part_uuids_->part_uuids.contains(part->uuid))
{
if (out_reason)
*out_reason = fmt::format("Part {} has uuid {} which is currently pinned", part->name, part->uuid);
out_reason = fmt::format("Part {} has uuid {} which is currently pinned", part->name, part->uuid);
return false;
}
if (inprogress_quorum_part_ && part->name == *inprogress_quorum_part_)
{
if (out_reason)
*out_reason = fmt::format("Quorum insert for part {} is currently in progress", part->name);
out_reason = fmt::format("Quorum insert for part {} is currently in progress", part->name);
return false;
}
if (prev_virtual_parts_ && prev_virtual_parts_->getContainingPart(part->info).empty())
{
if (out_reason)
*out_reason = fmt::format("Entry for part {} hasn't been read from the replication log yet", part->name);
out_reason = fmt::format("Entry for part {} hasn't been read from the replication log yet", part->name);
return false;
}
@ -2449,8 +2437,7 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeSinglePart(
String containing_part = virtual_parts_->getContainingPart(part->info);
if (containing_part != part->name)
{
if (out_reason)
*out_reason = fmt::format("Part {} has already been assigned a merge into {}", part->name, containing_part);
out_reason = fmt::format("Part {} has already been assigned a merge into {}", part->name, containing_part);
return false;
}
}
@ -2459,7 +2446,7 @@ bool BaseMergePredicate<VirtualPartsT, MutationsStateT>::canMergeSinglePart(
}
bool ReplicatedMergeTreeMergePredicate::partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String * out_reason) const
bool ReplicatedMergeTreeMergePredicate::partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String & out_reason) const
{
std::lock_guard lock(queue.state_mutex);
for (const auto & entry : queue.queue)
@ -2472,9 +2459,7 @@ bool ReplicatedMergeTreeMergePredicate::partParticipatesInReplaceRange(const Mer
if (part->info.isDisjoint(MergeTreePartInfo::fromPartName(part_name, queue.format_version)))
continue;
if (out_reason)
*out_reason = fmt::format("Part {} participates in REPLACE_RANGE {} ({})", part_name, entry->new_part_name, entry->znode_name);
out_reason = fmt::format("Part {} participates in REPLACE_RANGE {} ({})", part_name, entry->new_part_name, entry->znode_name);
return true;
}
}

View File

@ -505,19 +505,19 @@ public:
bool operator()(const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
const MergeTreeTransaction * txn,
String * out_reason = nullptr) const;
String & out_reason) const;
/// Can we assign a merge with these two parts?
/// (assuming that no merge was assigned after the predicate was constructed)
/// If we can't, set out_reason to the reason why we can't merge.
bool canMergeTwoParts(const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
String * out_reason = nullptr) const;
String & out_reason) const;
/// Can we assign a merge with this part and some other part?
/// For example a merge of a part and itself is needed for TTL.
/// This predicate is checked for the first part of each range.
bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const;
bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String & out_reason) const;
CommittingBlocks getCommittingBlocks(zkutil::ZooKeeperPtr & zookeeper, const std::string & zookeeper_path, Poco::Logger * log_);
@ -561,7 +561,7 @@ public:
/// Returns true if part is needed for some REPLACE_RANGE entry.
/// We should not drop the part in this case, because the replication queue may get stuck without that part.
bool partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String * out_reason) const;
bool partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String & out_reason) const;
/// Return nonempty optional of desired mutation version and alter version.
/// If we have no alter (modify/drop) mutations in the mutations queue, then we return the biggest possible

View File

@ -45,6 +45,7 @@
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <fmt/core.h>
namespace DB
{
@ -857,7 +858,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
bool aggressive,
const String & partition_id,
bool final,
String * out_disable_reason,
String & out_disable_reason,
TableLockHolder & /* table_lock_holder */,
std::unique_lock<std::mutex> & lock,
const MergeTreeTransactionPtr & txn,
@ -875,7 +876,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
CurrentlyMergingPartsTaggerPtr merging_tagger;
MergeList::EntryPtr merge_entry;
auto can_merge = [this, &lock](const DataPartPtr & left, const DataPartPtr & right, const MergeTreeTransaction * tx, String * disable_reason) -> bool
auto can_merge = [this, &lock](const DataPartPtr & left, const DataPartPtr & right, const MergeTreeTransaction * tx, String & disable_reason) -> bool
{
if (tx)
{
@ -884,8 +885,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if ((left && !left->version.isVisible(tx->getSnapshot(), Tx::EmptyTID))
|| (right && !right->version.isVisible(tx->getSnapshot(), Tx::EmptyTID)))
{
if (disable_reason)
*disable_reason = "Some part is not visible in transaction";
disable_reason = "Some part is not visible in transaction";
return false;
}
@ -893,8 +893,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if ((left && left->version.isRemovalTIDLocked())
|| (right && right->version.isRemovalTIDLocked()))
{
if (disable_reason)
*disable_reason = "Some part is locked for removal in another cuncurrent transaction";
disable_reason = "Some part is locked for removal in another cuncurrent transaction";
return false;
}
}
@ -905,8 +904,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
{
if (currently_merging_mutating_parts.contains(right))
{
if (disable_reason)
*disable_reason = "Some part currently in a merging or mutating process";
disable_reason = "Some part currently in a merging or mutating process";
return false;
}
else
@ -915,30 +913,26 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if (currently_merging_mutating_parts.contains(left) || currently_merging_mutating_parts.contains(right))
{
if (disable_reason)
*disable_reason = "Some part currently in a merging or mutating process";
disable_reason = "Some part currently in a merging or mutating process";
return false;
}
if (getCurrentMutationVersion(left, lock) != getCurrentMutationVersion(right, lock))
{
if (disable_reason)
*disable_reason = "Some parts have differ mmutatuon version";
disable_reason = "Some parts have differ mmutatuon version";
return false;
}
if (!partsContainSameProjections(left, right))
{
if (disable_reason)
*disable_reason = "Some parts contains differ projections";
disable_reason = "Some parts contains differ projections";
return false;
}
auto max_possible_level = getMaxLevelInBetween(left, right);
if (max_possible_level > std::max(left->info.level, right->info.level))
{
if (disable_reason)
*disable_reason = fmt::format("There is an outdated part in a gap between two active parts ({}, {}) with merge level {} higher than these active parts have", left->name, right->name, max_possible_level);
disable_reason = fmt::format("There is an outdated part in a gap between two active parts ({}, {}) with merge level {} higher than these active parts have", left->name, right->name, max_possible_level);
return false;
}
@ -947,14 +941,13 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;
auto is_background_memory_usage_ok = [](String * disable_reason) -> bool
auto is_background_memory_usage_ok = [](String & disable_reason) -> bool
{
if (canEnqueueBackgroundTask())
return true;
if (disable_reason)
*disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})",
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()));
disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})",
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()));
return false;
};
@ -979,8 +972,8 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
txn,
out_disable_reason);
}
else if (out_disable_reason)
*out_disable_reason = "Current value of max_source_parts_size is zero";
else
out_disable_reason = "Current value of max_source_parts_size is zero";
}
}
else
@ -1014,15 +1007,14 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
/// If final - we will wait for currently processing merges to finish and continue.
if (final
&& select_decision != SelectPartsDecision::SELECTED
&& !currently_merging_mutating_parts.empty()
&& out_disable_reason)
&& !currently_merging_mutating_parts.empty())
{
LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now) to perform OPTIMIZE FINAL",
currently_merging_mutating_parts.size());
if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for(lock, timeout))
{
*out_disable_reason = fmt::format("Timeout ({} ms) while waiting for already running merges before running OPTIMIZE with FINAL", timeout_ms);
out_disable_reason = fmt::format("Timeout ({} ms) while waiting for already running merges before running OPTIMIZE with FINAL", timeout_ms);
break;
}
}
@ -1038,14 +1030,9 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if (select_decision != SelectPartsDecision::SELECTED)
{
if (out_disable_reason)
{
if (!out_disable_reason->empty())
{
*out_disable_reason += ". ";
}
*out_disable_reason += "Cannot select parts for optimization";
}
if (!out_disable_reason.empty())
out_disable_reason += ". ";
out_disable_reason += "Cannot select parts for optimization";
return {};
}
@ -1066,7 +1053,7 @@ bool StorageMergeTree::merge(
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason,
String & out_disable_reason,
bool optimize_skip_merged_partitions)
{
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
@ -1121,7 +1108,7 @@ bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & p
}
MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
const StorageMetadataPtr & metadata_snapshot, String * /* disable_reason */, TableLockHolder & /* table_lock_holder */,
const StorageMetadataPtr & metadata_snapshot, String & /* disable_reason */, TableLockHolder & /* table_lock_holder */,
std::unique_lock<std::mutex> & /*currently_processing_in_background_mutex_lock*/)
{
if (current_mutations_by_version.empty())
@ -1322,10 +1309,11 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
if (merger_mutator.merges_blocker.isCancelled())
return false;
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, shared_lock, lock, txn);
String out_reason;
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, out_reason, shared_lock, lock, txn);
if (!merge_entry && !current_mutations_by_version.empty())
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, shared_lock, lock);
mutate_entry = selectPartsToMutate(metadata_snapshot, out_reason, shared_lock, lock);
has_mutations = !current_mutations_by_version.empty();
}
@ -1528,7 +1516,7 @@ bool StorageMergeTree::optimize(
deduplicate_by_columns,
cleanup,
txn,
&disable_reason,
disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
constexpr auto message = "Cannot OPTIMIZE table: {}";
@ -1556,7 +1544,7 @@ bool StorageMergeTree::optimize(
deduplicate_by_columns,
cleanup,
txn,
&disable_reason,
disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
constexpr auto message = "Cannot OPTIMIZE table: {}";

View File

@ -176,7 +176,7 @@ private:
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason = nullptr,
String & out_disable_reason,
bool optimize_skip_merged_partitions = false);
void renameAndCommitEmptyParts(MutableDataPartsVector & new_parts, Transaction & transaction);
@ -203,7 +203,7 @@ private:
bool aggressive,
const String & partition_id,
bool final,
String * disable_reason,
String & disable_reason,
TableLockHolder & table_lock_holder,
std::unique_lock<std::mutex> & lock,
const MergeTreeTransactionPtr & txn,
@ -212,7 +212,7 @@ private:
MergeMutateSelectedEntryPtr selectPartsToMutate(
const StorageMetadataPtr & metadata_snapshot, String * disable_reason,
const StorageMetadataPtr & metadata_snapshot, String & disable_reason,
TableLockHolder & table_lock_holder, std::unique_lock<std::mutex> & currently_processing_in_background_mutex_lock);
/// For current mutations queue, returns maximum version of mutation for a part,

View File

@ -3467,9 +3467,10 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
merge_pred.emplace(queue.getMergePredicate(zookeeper, partitions_to_merge_in));
}
String out_reason;
if (can_assign_merge &&
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, *merge_pred,
merge_with_ttl_allowed, NO_TRANSACTION_PTR, nullptr, &partitions_to_merge_in) == SelectPartsDecision::SELECTED)
merge_with_ttl_allowed, NO_TRANSACTION_PTR, out_reason, &partitions_to_merge_in) == SelectPartsDecision::SELECTED)
{
create_result = createLogEntryToMergeParts(
zookeeper,
@ -5232,13 +5233,13 @@ bool StorageReplicatedMergeTree::optimize(
{
select_decision = merger_mutator.selectPartsToMerge(
future_merged_part, /* aggressive */ true, storage_settings_ptr->max_bytes_to_merge_at_max_space_in_pool,
can_merge, /* merge_with_ttl_allowed */ false, NO_TRANSACTION_PTR, &disable_reason);
can_merge, /* merge_with_ttl_allowed */ false, NO_TRANSACTION_PTR, disable_reason);
}
else
{
select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
future_merged_part, can_merge, partition_id, final, metadata_snapshot, NO_TRANSACTION_PTR,
&disable_reason, query_context->getSettingsRef().optimize_skip_merged_partitions);
disable_reason, query_context->getSettingsRef().optimize_skip_merged_partitions);
}
/// If there is nothing to merge then we treat this merge as successful (needed for optimize final optimization)
@ -7777,7 +7778,7 @@ void StorageReplicatedMergeTree::movePartitionToShard(
/// canMergeSinglePart is overlapping with dropPart, let's try to use the same code.
String out_reason;
if (!merge_pred.canMergeSinglePart(part, &out_reason))
if (!merge_pred.canMergeSinglePart(part, out_reason))
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Part is busy, reason: {}", out_reason);
}
@ -8025,14 +8026,14 @@ bool StorageReplicatedMergeTree::dropPartImpl(
/// There isn't a lot we can do otherwise. Can't cancel merges because it is possible that a replica already
/// finished the merge.
String out_reason;
if (!merge_pred.canMergeSinglePart(part, &out_reason))
if (!merge_pred.canMergeSinglePart(part, out_reason))
{
if (throw_if_noop)
throw Exception::createDeprecated(out_reason, ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
return false;
}
if (merge_pred.partParticipatesInReplaceRange(part, &out_reason))
if (merge_pred.partParticipatesInReplaceRange(part, out_reason))
{
if (throw_if_noop)
throw Exception::createDeprecated(out_reason, ErrorCodes::PART_IS_TEMPORARILY_LOCKED);

View File

@ -0,0 +1,5 @@
<!-- Config override: sets the merge_tree setting
     allow_vertical_merges_from_compact_to_wide_parts to 0 (disabled). -->
<clickhouse>
<merge_tree>
<allow_vertical_merges_from_compact_to_wide_parts>0</allow_vertical_merges_from_compact_to_wide_parts>
</merge_tree>
</clickhouse>

View File

@ -15,7 +15,10 @@ node_old = cluster.add_instance(
)
node_new = cluster.add_instance(
"node2",
main_configs=["configs/no_compress_marks.xml"],
main_configs=[
"configs/no_compress_marks.xml",
"configs/no_allow_vertical_merges_from_compact_to_wide_parts.xml",
],
with_zookeeper=True,
stay_alive=True,
allow_analyzer=False,

View File

@ -0,0 +1,22 @@
-- FULL OUTER JOIN between js1 (numbers(100), key `k`) and js2 (1024 rows from
-- numbers(1024), all sharing one constant LowCardinality key, 2147483647 + 256).
-- The select list reports the type name of js2.s after materialize(); the result
-- is sorted by the constant `inf` first, then by both join keys.
-- FORMAT Null discards the output, so only successful execution is observable.
-- NOTE(review): presumably a regression test for a crash or error in this
-- combination (LowCardinality key + FULL JOIN + ORDER BY); confirm against the
-- originating issue.
SELECT
'0',
toTypeName(materialize(js2.s))
FROM
(
SELECT number AS k
FROM numbers(100)
) AS js1
FULL OUTER JOIN
(
SELECT
toLowCardinality(2147483647 + 256) AS k,
'-0.0000000001',
1024,
toString(number + 10) AS s
FROM numbers(1024)
) AS js2 ON js1.k = js2.k
ORDER BY
inf DESC NULLS FIRST,
js1.k ASC NULLS LAST,
js2.k ASC
FORMAT `Null`

View File

@ -1 +1 @@
SELECT match('', repeat('(', 100000)); -- { serverError 306 }
SELECT match('', repeat('(', 100000)); -- { serverError 427 }