Merge branch 'master' into improve_greatest_least_nullable_type

This commit is contained in:
kevinyhzou 2024-06-06 14:08:15 +08:00
commit 3169217fb7
17 changed files with 201 additions and 84 deletions

View File

@ -160,8 +160,8 @@ class IColumn;
M(Bool, enable_multiple_prewhere_read_steps, true, "Move more conditions from WHERE to PREWHERE and do reads from disk and filtering in multiple steps if there are multiple conditions combined with AND", 0) \
M(Bool, move_primary_key_columns_to_end_of_prewhere, true, "Move PREWHERE conditions containing primary key columns to the end of AND chain. It is likely that these conditions are taken into account during primary key analysis and thus will not contribute a lot to PREWHERE filtering.", 0) \
\
M(Bool, allow_statistics_optimize, false, "Allows using statistics to optimize queries", 0) \
M(Bool, allow_experimental_statistics, false, "Allows using statistics", 0) \
M(Bool, allow_statistics_optimize, false, "Allows using statistics to optimize queries", 0) ALIAS(allow_statistic_optimize) \
M(Bool, allow_experimental_statistics, false, "Allows using statistics", 0) ALIAS(allow_experimental_statistic) \
\
M(UInt64, alter_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) ALIAS(replication_alter_partitions_sync) \
M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - do not wait, negative - wait for unlimited time.", 0) \

View File

@ -97,6 +97,10 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
{"enable_blob_storage_log", true, true, "Write information about blob storage operations to system.blob_storage_log table"},
{"allow_statistic_optimize", false, false, "Old setting which popped up here being renamed."},
{"allow_experimental_statistic", false, false, "Old setting which popped up here being renamed."},
{"allow_statistics_optimize", false, false, "The setting was renamed. The previous name is `allow_statistic_optimize`."},
{"allow_experimental_statistics", false, false, "The setting was renamed. The previous name is `allow_experimental_statistic`."}
}},
{"24.5", {{"allow_deprecated_error_prone_window_functions", true, false, "Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)"},
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},

View File

@ -36,30 +36,24 @@ void IObjectStorageIteratorAsync::deactivate()
void IObjectStorageIteratorAsync::nextBatch()
{
std::lock_guard lock(mutex);
if (is_finished)
{
current_batch.clear();
current_batch_iterator = current_batch.begin();
return;
}
else
{
if (!is_initialized)
{
outcome_future = scheduleBatch();
is_initialized = true;
}
if (!is_initialized)
{
outcome_future = scheduleBatch();
is_initialized = true;
}
try
{
chassert(outcome_future.valid());
BatchAndHasNext result;
try
{
result = outcome_future.get();
}
catch (...)
{
is_finished = true;
throw;
}
BatchAndHasNext result = outcome_future.get();
current_batch = std::move(result.batch);
current_batch_iterator = current_batch.begin();
@ -71,6 +65,11 @@ void IObjectStorageIteratorAsync::nextBatch()
else
is_finished = true;
}
catch (...)
{
is_finished = true;
throw;
}
}
void IObjectStorageIteratorAsync::next()
@ -95,35 +94,39 @@ std::future<IObjectStorageIteratorAsync::BatchAndHasNext> IObjectStorageIterator
bool IObjectStorageIteratorAsync::isValid()
{
std::lock_guard lock(mutex);
if (!is_initialized)
nextBatch();
std::lock_guard lock(mutex);
return current_batch_iterator != current_batch.end();
}
RelativePathWithMetadataPtr IObjectStorageIteratorAsync::current()
{
std::lock_guard lock(mutex);
if (!isValid())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator");
std::lock_guard lock(mutex);
return *current_batch_iterator;
}
RelativePathsWithMetadata IObjectStorageIteratorAsync::currentBatch()
{
std::lock_guard lock(mutex);
if (!isValid())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator");
std::lock_guard lock(mutex);
return current_batch;
}
std::optional<RelativePathsWithMetadata> IObjectStorageIteratorAsync::getCurrentBatchAndScheduleNext()
{
std::lock_guard lock(mutex);
if (!is_initialized)
nextBatch();

View File

@ -59,6 +59,16 @@ std::string DataPartStorageOnDiskBase::getRelativePath() const
return fs::path(root_path) / part_dir / "";
}
std::string DataPartStorageOnDiskBase::getParentDirectory() const
{
/// Cut last "/" if it exists (it shouldn't). Otherwise fs::path behave differently.
fs::path part_dir_without_slash = part_dir.ends_with("/") ? part_dir.substr(0, part_dir.size() - 1) : part_dir;
if (part_dir_without_slash.has_parent_path())
return part_dir_without_slash.parent_path();
return "";
}
std::optional<String> DataPartStorageOnDiskBase::getRelativePathForPrefix(LoggerPtr log, const String & prefix, bool detached, bool broken) const
{
assert(!broken || detached);
@ -674,9 +684,9 @@ void DataPartStorageOnDiskBase::remove(
if (!has_delete_prefix)
{
if (part_dir_without_slash.has_parent_path())
auto parent_path = getParentDirectory();
if (!parent_path.empty())
{
auto parent_path = part_dir_without_slash.parent_path();
if (parent_path == MergeTreeData::DETACHED_DIR_NAME)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -684,7 +694,7 @@ void DataPartStorageOnDiskBase::remove(
part_dir,
root_path);
part_dir_without_slash = parent_path / ("delete_tmp_" + std::string{part_dir_without_slash.filename()});
part_dir_without_slash = fs::path(parent_path) / ("delete_tmp_" + std::string{part_dir_without_slash.filename()});
}
else
{

View File

@ -20,6 +20,7 @@ public:
std::string getRelativePath() const override;
std::string getPartDirectory() const override;
std::string getFullRootPath() const override;
std::string getParentDirectory() const override;
Poco::Timestamp getLastModified() const override;
UInt64 calculateTotalSizeOnDisk() const override;

View File

@ -96,11 +96,12 @@ public:
virtual MergeTreeDataPartStorageType getType() const = 0;
/// Methods to get path components of a data part.
virtual std::string getFullPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving/all_1_5_1'
virtual std::string getRelativePath() const = 0; /// 'database/table/moving/all_1_5_1'
virtual std::string getPartDirectory() const = 0; /// 'all_1_5_1'
virtual std::string getFullRootPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving'
/// Can add it if needed /// 'database/table/moving'
virtual std::string getFullPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving/all_1_5_1'
virtual std::string getRelativePath() const = 0; /// 'database/table/moving/all_1_5_1'
virtual std::string getPartDirectory() const = 0; /// 'all_1_5_1'
virtual std::string getFullRootPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving'
virtual std::string getParentDirectory() const = 0; /// '' (or 'detached' for 'detached/all_1_5_1')
/// Can add it if needed /// 'database/table/moving'
/// virtual std::string getRelativeRootPath() const = 0;
/// Get a storage for projection.

View File

@ -737,7 +737,11 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
{
/// Don't scare people with broken part error
if (!isRetryableException(std::current_exception()))
LOG_ERROR(storage.log, "Part {} is broken and need manual correction", getDataPartStorage().getFullPath());
{
auto message = getCurrentExceptionMessage(true);
LOG_ERROR(storage.log, "Part {} is broken and need manual correction. Reason: {}",
getDataPartStorage().getFullPath(), message);
}
// There could be conditions that data part to be loaded is broken, but some of meta infos are already written
// into meta data before exception, need to clean them all.

View File

@ -3894,7 +3894,7 @@ void MergeTreeData::checkPartDynamicColumns(MutableDataPartPtr & part, DataParts
}
}
void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename)
void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename, bool rename_in_transaction)
{
part->is_temp = false;
part->setState(DataPartState::PreActive);
@ -3906,12 +3906,15 @@ void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction
return !may_be_cleaned_up || temporary_parts.contains(dir_name);
}());
if (need_rename)
if (need_rename && !rename_in_transaction)
part->renameTo(part->name, true);
LOG_TEST(log, "preparePartForCommit: inserting {} into data_parts_indexes", part->getNameWithState());
data_parts_indexes.insert(part);
out_transaction.addPart(part);
if (rename_in_transaction)
out_transaction.addPart(part, need_rename);
else
out_transaction.addPart(part, /* need_rename= */ false);
}
bool MergeTreeData::addTempPart(
@ -3960,7 +3963,8 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts)
DataPartsVector * out_covered_parts,
bool rename_in_transaction)
{
LOG_TRACE(log, "Renaming temporary part {} to {} with tid {}.", part->getDataPartStorage().getPartDirectory(), part->name, out_transaction.getTID());
@ -3999,7 +4003,7 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
/// All checks are passed. Now we can rename the part on disk.
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
preparePartForCommit(part, out_transaction, /* need_rename */ true);
preparePartForCommit(part, out_transaction, /* need_rename= */ true, rename_in_transaction);
if (out_covered_parts)
{
@ -4014,29 +4018,31 @@ bool MergeTreeData::renameTempPartAndReplaceUnlocked(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts)
bool rename_in_transaction)
{
return renameTempPartAndReplaceImpl(part, out_transaction, lock, out_covered_parts);
return renameTempPartAndReplaceImpl(part, out_transaction, lock, /*out_covered_parts=*/ nullptr, rename_in_transaction);
}
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part,
Transaction & out_transaction)
Transaction & out_transaction,
bool rename_in_transaction)
{
auto part_lock = lockParts();
DataPartsVector covered_parts;
renameTempPartAndReplaceImpl(part, out_transaction, part_lock, &covered_parts);
renameTempPartAndReplaceImpl(part, out_transaction, part_lock, &covered_parts, rename_in_transaction);
return covered_parts;
}
bool MergeTreeData::renameTempPartAndAdd(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock)
DataPartsLock & lock,
bool rename_in_transaction)
{
DataPartsVector covered_parts;
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts))
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts, rename_in_transaction))
return false;
if (!covered_parts.empty())
@ -4077,9 +4083,9 @@ void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const
resetObjectColumnsFromActiveParts(acquired_lock);
}
void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove)
void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove, DataPartsLock * acquired_lock)
{
auto lock = lockParts();
auto lock = (acquired_lock) ? DataPartsLock() : lockParts();
for (const auto & part : remove)
{
@ -4245,8 +4251,9 @@ MergeTreeData::PartsToRemoveFromZooKeeper MergeTreeData::removePartsInRangeFromW
auto [new_data_part, tmp_dir_holder] = createEmptyPart(empty_info, partition, empty_part_name, NO_TRANSACTION_PTR);
MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
renameTempPartAndAdd(new_data_part, transaction, lock); /// All covered parts must be already removed
renameTempPartAndAdd(new_data_part, transaction, lock, /*rename_in_transaction=*/ true); /// All covered parts must be already removed
transaction.renameParts();
/// It will add the empty part to the set of Outdated parts without making it Active (exactly what we need)
transaction.rollback(&lock);
new_data_part->remove_time.store(0, std::memory_order_relaxed);
@ -6617,25 +6624,54 @@ TransactionID MergeTreeData::Transaction::getTID() const
return Tx::PrehistoricTID;
}
void MergeTreeData::Transaction::addPart(MutableDataPartPtr & part)
void MergeTreeData::Transaction::addPart(MutableDataPartPtr & part, bool need_rename)
{
precommitted_parts.insert(part);
if (need_rename)
precommitted_parts_need_rename.insert(part);
}
void MergeTreeData::Transaction::rollback(DataPartsLock * lock)
{
if (!isEmpty())
{
WriteBufferFromOwnString buf;
buf << "Removing parts:";
for (const auto & part : precommitted_parts)
buf << " " << part->getDataPartStorage().getPartDirectory();
buf << ".";
LOG_DEBUG(data.log, "Undoing transaction {}. {}", getTID(), buf.str());
for (const auto & part : precommitted_parts)
part->version.creation_csn.store(Tx::RolledBackCSN);
auto non_detached_precommitted_parts = precommitted_parts;
/// Remove detached parts from working set.
///
/// It is possible to have detached parts here, only when rename (in
/// commit()) of detached parts had been broken (i.e. during ATTACH),
/// i.e. the part itself is broken.
DataPartsVector detached_precommitted_parts;
for (auto it = non_detached_precommitted_parts.begin(); it != non_detached_precommitted_parts.end();)
{
const auto & part = *it;
if (part->getDataPartStorage().getParentDirectory() == DETACHED_DIR_NAME)
{
detached_precommitted_parts.push_back(part);
it = non_detached_precommitted_parts.erase(it);
}
else
++it;
}
WriteBufferFromOwnString buf;
buf << "Removing parts:";
for (const auto & part : non_detached_precommitted_parts)
buf << " " << part->getDataPartStorage().getPartDirectory();
buf << ".";
if (!detached_precommitted_parts.empty())
{
buf << " Rollbacking parts state to temporary and removing from working set:";
for (const auto & part : detached_precommitted_parts)
buf << " " << part->getDataPartStorage().getPartDirectory();
buf << ".";
}
LOG_DEBUG(data.log, "Undoing transaction {}. {}", getTID(), buf.str());
/// It would be much better with TSA...
auto our_lock = (lock) ? DataPartsLock() : data.lockParts();
@ -6645,7 +6681,7 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock)
if (!data.all_data_dropped)
{
Strings part_names;
for (const auto & part : precommitted_parts)
for (const auto & part : non_detached_precommitted_parts)
part_names.emplace_back(part->name);
throw Exception(ErrorCodes::LOGICAL_ERROR, "There are some PreActive parts ({}) to rollback, "
"but data parts set is empty and table {} was not dropped. It's a bug",
@ -6654,8 +6690,12 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock)
}
else
{
data.removePartsFromWorkingSetImmediatelyAndSetTemporaryState(
detached_precommitted_parts,
&our_lock);
data.removePartsFromWorkingSet(txn,
DataPartsVector(precommitted_parts.begin(), precommitted_parts.end()),
DataPartsVector(non_detached_precommitted_parts.begin(), non_detached_precommitted_parts.end()),
/* clear_without_timeout = */ true, &our_lock);
}
}
@ -6665,7 +6705,16 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock)
/// Drop all in-memory bookkeeping of the transaction's pending parts.
void MergeTreeData::Transaction::clear()
{
    /// Every part scheduled for a deferred rename is also a precommitted
    /// part, so the rename set can never be larger than the precommitted set.
    chassert(precommitted_parts.size() >= precommitted_parts_need_rename.size());
    precommitted_parts.clear();
    precommitted_parts_need_rename.clear();
}
/// Perform the part renames that were deferred by addPart(part, /*need_rename=*/ true).
/// Must be called before commit(): commit() throws LOGICAL_ERROR if any
/// deferred renames are still pending.
void MergeTreeData::Transaction::renameParts()
{
    for (const auto & part : precommitted_parts_need_rename)
        part->renameTo(part->name, true);

    /// All deferred renames are done; forget about them.
    precommitted_parts_need_rename.clear();
}
MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock * acquired_parts_lock)
@ -6674,6 +6723,9 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock
if (!isEmpty())
{
if (!precommitted_parts_need_rename.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Parts not renamed");
auto settings = data.getSettings();
auto parts_lock = acquired_parts_lock ? DataPartsLock() : data.lockParts();
auto * owing_parts_lock = acquired_parts_lock ? acquired_parts_lock : &parts_lock;
@ -6682,6 +6734,8 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock
if (part->getDataPartStorage().hasActiveTransaction())
part->getDataPartStorage().commitTransaction();
renameParts();
if (txn)
{
for (const auto & part : precommitted_parts)

View File

@ -255,7 +255,9 @@ public:
DataPartsVector commit(DataPartsLock * acquired_parts_lock = nullptr);
void addPart(MutableDataPartPtr & part);
void renameParts();
void addPart(MutableDataPartPtr & part, bool need_rename);
void rollback(DataPartsLock * lock = nullptr);
@ -286,9 +288,9 @@ public:
MergeTreeData & data;
MergeTreeTransaction * txn;
MutableDataParts precommitted_parts;
MutableDataParts locked_parts;
MutableDataParts precommitted_parts;
MutableDataParts precommitted_parts_need_rename;
};
using TransactionUniquePtr = std::unique_ptr<Transaction>;
@ -588,25 +590,27 @@ public:
bool renameTempPartAndAdd(
MutableDataPartPtr & part,
Transaction & transaction,
DataPartsLock & lock);
DataPartsLock & lock,
bool rename_in_transaction);
/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
/// Returns all parts covered by the added part (in ascending order).
DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part,
Transaction & out_transaction);
Transaction & out_transaction,
bool rename_in_transaction);
/// Unlocked version of previous one. Useful when added multiple parts with a single lock.
bool renameTempPartAndReplaceUnlocked(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr);
bool rename_in_transaction);
/// Remove parts from working set immediately (without wait for background
/// process). Transfer part state to temporary. Have very limited usage only
/// for new parts which aren't already present in table.
void removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove);
void removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove, DataPartsLock * acquired_lock = nullptr);
/// Removes parts from the working set parts.
/// Parts in add must already be in data_parts with PreActive, Active, or Outdated states.
@ -1602,7 +1606,10 @@ private:
/// Preparing itself to be committed in memory: fill some fields inside part, add it to data_parts_indexes
/// in precommitted state and to transaction
void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename);
///
/// @param need_rename - rename the part
/// @param rename_in_transaction - if set, the rename will be done as part of transaction (without holding DataPartsLock), otherwise inplace (when it does not make sense).
void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename, bool rename_in_transaction = false);
/// Low-level method for preparing parts for commit (in-memory).
/// FIXME Merge MergeTreeTransaction and Transaction
@ -1610,7 +1617,8 @@ private:
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts);
DataPartsVector * out_covered_parts,
bool rename_in_transaction);
/// RAII Wrapper for atomic work with currently moving parts
/// Acquire them in constructor and remove them in destructor

View File

@ -748,7 +748,10 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
"but transactions were enabled for this table");
/// Rename new part, add to the set and remove original parts.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, out_transaction);
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, out_transaction, /*rename_in_transaction=*/ true);
/// Explicitly rename part while still holding the lock for tmp folder to avoid cleanup
out_transaction.renameParts();
/// Let's check that all original parts have been deleted and only them.
if (replaced_parts.size() != parts.size())

View File

@ -186,7 +186,8 @@ void MergeTreeSink::finishDelayedChunk()
}
}
added = storage.renameTempPartAndAdd(part, transaction, lock);
/// FIXME
added = storage.renameTempPartAndAdd(part, transaction, lock, /*rename_in_transaction=*/ false);
transaction.commit(&lock);
}

View File

@ -236,10 +236,11 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
if (data_part_storage.hasActiveTransaction())
data_part_storage.precommitTransaction();
storage.renameTempPartAndReplace(new_part, *transaction_ptr);
storage.renameTempPartAndReplace(new_part, *transaction_ptr, /*rename_in_transaction=*/ true);
try
{
transaction_ptr->renameParts();
storage.checkPartChecksumsAndCommit(*transaction_ptr, new_part, mutate_task->getHardlinkedFiles());
}
catch (const Exception & e)

View File

@ -97,7 +97,8 @@ bool MutatePlainMergeTreeTask::executeStep()
MergeTreeData::Transaction transaction(storage, merge_mutate_entry->txn.get());
/// FIXME Transactions: it's too optimistic, better to lock parts before starting transaction
storage.renameTempPartAndReplace(new_part, transaction);
storage.renameTempPartAndReplace(new_part, transaction, /*rename_in_transaction=*/ true);
transaction.renameParts();
transaction.commit();
storage.updateMutationEntriesErrors(future_part, true, "");

View File

@ -888,7 +888,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
try
{
auto lock = storage.lockParts();
storage.renameTempPartAndAdd(part, transaction, lock);
storage.renameTempPartAndAdd(part, transaction, lock, /*rename_in_transaction=*/ false);
}
catch (const Exception & e)
{
@ -903,6 +903,9 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
throw;
}
/// Rename parts before committing to ZooKeeper without holding DataPartsLock.
transaction.renameParts();
ThreadFuzzer::maybeInjectSleep();
fiu_do_on(FailPoints::replicated_merge_tree_commit_zk_fail_after_op, { zookeeper->forceFailureAfterOperation(); });

View File

@ -1788,7 +1788,7 @@ void StorageMergeTree::renameAndCommitEmptyParts(MutableDataPartsVector & new_pa
for (auto & part: new_parts)
{
DataPartsVector covered_parts_by_one_part = renameTempPartAndReplace(part, transaction);
DataPartsVector covered_parts_by_one_part = renameTempPartAndReplace(part, transaction, /*rename_in_transaction=*/ true);
if (covered_parts_by_one_part.size() > 1)
throw Exception(ErrorCodes::LOGICAL_ERROR,
@ -1798,10 +1798,10 @@ void StorageMergeTree::renameAndCommitEmptyParts(MutableDataPartsVector & new_pa
std::move(covered_parts_by_one_part.begin(), covered_parts_by_one_part.end(), std::back_inserter(covered_parts));
}
LOG_INFO(log, "Remove {} parts by covering them with empty {} parts. With txn {}.",
covered_parts.size(), new_parts.size(), transaction.getTID());
transaction.renameParts();
transaction.commit();
/// Remove covered parts without waiting for old_parts_lifetime seconds.
@ -2064,7 +2064,7 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition(
{
auto lock = lockParts();
fillNewPartNameAndResetLevel(loaded_parts[i], lock);
renameTempPartAndAdd(loaded_parts[i], transaction, lock);
renameTempPartAndAdd(loaded_parts[i], transaction, lock, /*rename_in_transaction=*/ false);
transaction.commit(&lock);
}
@ -2180,8 +2180,9 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
for (auto part : dst_parts)
{
fillNewPartName(part, data_parts_lock);
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock);
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock, /*rename_in_transaction=*/ true);
}
transaction.renameParts();
/// Populate transaction
transaction.commit(&data_parts_lock);
@ -2284,10 +2285,9 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
for (auto & part : dst_parts)
{
dest_table_storage->fillNewPartName(part, dest_data_parts_lock);
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock);
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock, /*rename_in_transaction=*/ false);
}
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), src_parts, true, src_data_parts_lock);
transaction.commit(&src_data_parts_lock);
}
@ -2447,7 +2447,7 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
{
auto lock = lockParts();
fillNewPartName(part, lock);
renameTempPartAndAdd(part, transaction, lock);
renameTempPartAndAdd(part, transaction, lock, /*rename_in_transaction=*/ false);
transaction.commit(&lock);
}
}

View File

@ -2093,7 +2093,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
Transaction transaction(*this, NO_TRANSACTION_RAW);
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
renameTempPartAndReplace(part, transaction);
renameTempPartAndReplace(part, transaction, /*rename_in_transaction=*/ true);
transaction.renameParts();
checkPartChecksumsAndCommit(transaction, part);
writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */,
@ -2882,11 +2883,11 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
Coordination::Requests ops;
for (PartDescriptionPtr & part_desc : final_parts)
{
renameTempPartAndReplace(part_desc->res_part, transaction);
renameTempPartAndReplace(part_desc->res_part, transaction, /*rename_in_transaction=*/ true);
getCommitPartOps(ops, part_desc->res_part);
lockSharedData(*part_desc->res_part, /* replace_existing_lock */ true, part_desc->hardlinked_files);
lockSharedData(*part_desc->res_part, /*replace_existing_lock=*/ true, part_desc->hardlinked_files);
}
transaction.renameParts();
if (!ops.empty())
@ -4958,7 +4959,8 @@ bool StorageReplicatedMergeTree::fetchPart(
if (!to_detached)
{
Transaction transaction(*this, NO_TRANSACTION_RAW);
renameTempPartAndReplace(part, transaction);
renameTempPartAndReplace(part, transaction, /*rename_in_transaction=*/ true);
transaction.renameParts();
chassert(!part_to_clone || !is_zero_copy_part(part));
replaced_parts = checkPartChecksumsAndCommit(transaction, part, /*hardlinked_files*/ {}, /*replace_zero_copy_lock*/ true);
@ -8202,8 +8204,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
{
auto data_parts_lock = lockParts();
for (auto & part : dst_parts)
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock);
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock, /*rename_in_transaction=*/ true);
}
transaction.renameParts();
for (const auto & dst_part : dst_parts)
lockSharedData(*dst_part, false, /*hardlinked_files*/ {});
@ -8478,7 +8481,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
auto dest_data_parts_lock = dest_table_storage->lockParts();
for (auto & part : dst_parts)
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock);
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock, /*rename_in_transaction=*/ false);
for (const auto & dst_part : dst_parts)
dest_table_storage->lockSharedData(*dst_part, false, /*hardlinked_files*/ {});
@ -10111,7 +10114,8 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
try
{
MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
auto replaced_parts = renameTempPartAndReplace(new_data_part, transaction);
auto replaced_parts = renameTempPartAndReplace(new_data_part, transaction, /*rename_in_transaction=*/ true);
transaction.renameParts();
if (!replaced_parts.empty())
{

View File

@ -212,6 +212,20 @@ FROM merge('system', '^asynchronous_metric_log')
WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32} AND metric = 'MaxPartCountForPartition'
GROUP BY t
ORDER BY t WITH FILL STEP {rounding:UInt32}
)EOQ") }
},
{
{ "dashboard", "Overview" },
{ "title", "Concurrent network connections" },
{ "query", trim(R"EOQ(
SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t,
sum(CurrentMetric_TCPConnection) AS TCP_Connections,
sum(CurrentMetric_MySQLConnection) AS MySQL_Connections,
sum(CurrentMetric_HTTPConnection) AS HTTP_Connections
FROM merge('system', '^metric_log')
WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32}
GROUP BY t
ORDER BY t WITH FILL STEP {rounding:UInt32}
)EOQ") }
},
/// Default dashboard for ClickHouse Cloud
@ -349,6 +363,11 @@ ORDER BY t WITH FILL STEP {rounding:UInt32}
{ "dashboard", "Cloud overview" },
{ "title", "Network send bytes/sec" },
{ "query", "SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, avg(value)\nFROM (\n SELECT event_time, sum(value) AS value\n FROM clusterAllReplicas(default, merge('system', '^asynchronous_metric_log'))\n WHERE event_date >= toDate(now() - {seconds:UInt32})\n AND event_time >= now() - {seconds:UInt32}\n AND metric LIKE 'NetworkSendBytes%'\n GROUP BY event_time)\nGROUP BY t\nORDER BY t WITH FILL STEP {rounding:UInt32} SETTINGS skip_unavailable_shards = 1" }
},
{
{ "dashboard", "Cloud overview" },
{ "title", "Concurrent network connections" },
{ "query", "SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, max(TCP_Connections), max(MySQL_Connections), max(HTTP_Connections) FROM (SELECT event_time, sum(CurrentMetric_TCPConnection) AS TCP_Connections, sum(CurrentMetric_MySQLConnection) AS MySQL_Connections, sum(CurrentMetric_HTTPConnection) AS HTTP_Connections FROM clusterAllReplicas(default, merge('system', '^metric_log')) WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32} GROUP BY event_time) GROUP BY t ORDER BY t WITH FILL STEP {rounding:UInt32} SETTINGS skip_unavailable_shards = 1" }
}
};