From 2843aaaf24046bdfe81a9683a6adbc57bf0c9882 Mon Sep 17 00:00:00 2001 From: Thom O'Connor Date: Wed, 29 May 2024 18:21:57 -0600 Subject: [PATCH 01/17] Updated Advanced Dashboard for both open-source and ClickHouse Cloud versions to include a chart for 'Maximum concurrent network connections' --- .../System/StorageSystemDashboards.cpp | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/Storages/System/StorageSystemDashboards.cpp b/src/Storages/System/StorageSystemDashboards.cpp index 9682fbc74a1..0e92769764c 100644 --- a/src/Storages/System/StorageSystemDashboards.cpp +++ b/src/Storages/System/StorageSystemDashboards.cpp @@ -212,6 +212,20 @@ FROM merge('system', '^asynchronous_metric_log') WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32} AND metric = 'MaxPartCountForPartition' GROUP BY t ORDER BY t WITH FILL STEP {rounding:UInt32} +)EOQ") } + }, + { + { "dashboard", "Overview" }, + { "title", "Maximum concurrent network connections" }, + { "query", trim(R"EOQ( +SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, max(TCP_Connections), max(MySQL_Connections), max(HTTP_Connections) +FROM ( +SELECT event_time, sum(CurrentMetric_TCPConnection) AS TCP_Connections, sum(CurrentMetric_MySQLConnection) AS MySQL_Connections, sum(CurrentMetric_HTTPConnection) AS HTTP_Connections +FROM merge('system', '^metric_log') +WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32} +GROUP BY event_time) +GROUP BY t +ORDER BY t WITH FILL STEP {rounding:UInt32} )EOQ") } }, /// Default dashboard for ClickHouse Cloud @@ -349,6 +363,11 @@ ORDER BY t WITH FILL STEP {rounding:UInt32} { "dashboard", "Cloud overview" }, { "title", "Network send bytes/sec" }, { "query", "SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, avg(value)\nFROM (\n SELECT event_time, sum(value) AS value\n FROM clusterAllReplicas(default, 
merge('system', '^asynchronous_metric_log'))\n WHERE event_date >= toDate(now() - {seconds:UInt32})\n AND event_time >= now() - {seconds:UInt32}\n AND metric LIKE 'NetworkSendBytes%'\n GROUP BY event_time)\nGROUP BY t\nORDER BY t WITH FILL STEP {rounding:UInt32} SETTINGS skip_unavailable_shards = 1" } + }, + { + { "dashboard", "Cloud overview" }, + { "title", "Maximum concurrent network connections" }, + { "query", "SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, max(TCP_Connections), max(MySQL_Connections), max(HTTP_Connections) FROM ( SELECT event_time, sum(CurrentMetric_TCPConnection) AS TCP_Connections, sum(CurrentMetric_MySQLConnection) AS MySQL_Connections, sum(CurrentMetric_HTTPConnection) AS HTTP_Connections FROM clusterAllReplicas(default, merge('system', '^metric_log')) WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32} GROUP BY event_time) GROUP BY t ORDER BY t WITH FILL STEP {rounding:UInt32} SETTINGS skip_unavailable_shards = 1" } } }; From 6de079e10cbf8e2510dbe6cd45c8c84d40e70609 Mon Sep 17 00:00:00 2001 From: Thom O'Connor Date: Thu, 30 May 2024 18:00:03 -0600 Subject: [PATCH 02/17] Minor update: modified 'Maximum concurrent network connections' to 'Concurrent network connections' --- src/Storages/System/StorageSystemDashboards.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/System/StorageSystemDashboards.cpp b/src/Storages/System/StorageSystemDashboards.cpp index 0e92769764c..57f84e09857 100644 --- a/src/Storages/System/StorageSystemDashboards.cpp +++ b/src/Storages/System/StorageSystemDashboards.cpp @@ -216,7 +216,7 @@ ORDER BY t WITH FILL STEP {rounding:UInt32} }, { { "dashboard", "Overview" }, - { "title", "Maximum concurrent network connections" }, + { "title", "Concurrent network connections" }, { "query", trim(R"EOQ( SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, max(TCP_Connections), 
max(MySQL_Connections), max(HTTP_Connections) FROM ( @@ -366,7 +366,7 @@ ORDER BY t WITH FILL STEP {rounding:UInt32} }, { { "dashboard", "Cloud overview" }, - { "title", "Maximum concurrent network connections" }, + { "title", "Concurrent network connections" }, { "query", "SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, max(TCP_Connections), max(MySQL_Connections), max(HTTP_Connections) FROM ( SELECT event_time, sum(CurrentMetric_TCPConnection) AS TCP_Connections, sum(CurrentMetric_MySQLConnection) AS MySQL_Connections, sum(CurrentMetric_HTTPConnection) AS HTTP_Connections FROM clusterAllReplicas(default, merge('system', '^metric_log')) WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32} GROUP BY event_time) GROUP BY t ORDER BY t WITH FILL STEP {rounding:UInt32} SETTINGS skip_unavailable_shards = 1" } } }; From 4aa396d115029ef3fb963bedc2c873749dac24db Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Mon, 3 Jun 2024 22:45:48 +0000 Subject: [PATCH 03/17] Fix assert in IObjectStorageIteratorAsync --- .../ObjectStorageIteratorAsync.cpp | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp index 0420de0f8dd..a249789df4b 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp +++ b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp @@ -36,30 +36,24 @@ void IObjectStorageIteratorAsync::deactivate() void IObjectStorageIteratorAsync::nextBatch() { std::lock_guard lock(mutex); + if (is_finished) { current_batch.clear(); current_batch_iterator = current_batch.begin(); + return; } - else - { - if (!is_initialized) - { - outcome_future = scheduleBatch(); - is_initialized = true; - } + if (!is_initialized) + { + outcome_future = scheduleBatch(); + is_initialized = true; + } + + try + { chassert(outcome_future.valid()); - 
BatchAndHasNext result; - try - { - result = outcome_future.get(); - } - catch (...) - { - is_finished = true; - throw; - } + BatchAndHasNext result = outcome_future.get(); current_batch = std::move(result.batch); current_batch_iterator = current_batch.begin(); @@ -71,6 +65,11 @@ void IObjectStorageIteratorAsync::nextBatch() else is_finished = true; } + catch (...) + { + is_finished = true; + throw; + } } void IObjectStorageIteratorAsync::next() @@ -95,35 +94,39 @@ std::future IObjectStorageIterator bool IObjectStorageIteratorAsync::isValid() { + std::lock_guard lock(mutex); + if (!is_initialized) nextBatch(); - std::lock_guard lock(mutex); return current_batch_iterator != current_batch.end(); } RelativePathWithMetadataPtr IObjectStorageIteratorAsync::current() { + std::lock_guard lock(mutex); + if (!isValid()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator"); - std::lock_guard lock(mutex); return *current_batch_iterator; } RelativePathsWithMetadata IObjectStorageIteratorAsync::currentBatch() { + std::lock_guard lock(mutex); + if (!isValid()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator"); - std::lock_guard lock(mutex); return current_batch; } std::optional IObjectStorageIteratorAsync::getCurrentBatchAndScheduleNext() { std::lock_guard lock(mutex); + if (!is_initialized) nextBatch(); From 66a2962ccef3f64f3c51178955d2839739d3d882 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 4 Apr 2024 20:26:33 +0200 Subject: [PATCH 04/17] Add reason into "Part {} is broken and need manual correction" message Signed-off-by: Azat Khuzhin --- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 4c8f1240cf5..143394b1171 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -737,7 +737,11 @@ 
void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks { /// Don't scare people with broken part error if (!isRetryableException(std::current_exception())) - LOG_ERROR(storage.log, "Part {} is broken and need manual correction", getDataPartStorage().getFullPath()); + { + auto message = getCurrentExceptionMessage(true); + LOG_ERROR(storage.log, "Part {} is broken and need manual correction. Reason: {}", + getDataPartStorage().getFullPath(), message); + } // There could be conditions that data part to be loaded is broken, but some of meta infos are already written // into meta data before exception, need to clean them all. From 78088ce59a9562e3c805ab54147c68c888228615 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Wed, 27 Mar 2024 11:51:59 +0100 Subject: [PATCH 05/17] Reduce lock contention for MergeTree tables (by renaming parts without holding lock) Under heavy load, or not so heavy but with fsync_part_directory=1, time that renameTo() holds DataPartsLock will be increased, and this will affect almost every operation with this table. On one of production clusters I saw ~60 seconds with fsync_part_directory=1. Move the renameTo() out from the critical section. 
v2: instead of using DataPartsLock.lock.lock()/unlock() move the renameTo() into MergeTreeData::Transaction::commit() Signed-off-by: Azat Khuzhin --- src/Storages/MergeTree/MergeTreeData.cpp | 14 +++++++++----- src/Storages/MergeTree/MergeTreeData.h | 4 +++- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index cd706dab9ae..1042dca4bd0 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3906,12 +3906,9 @@ void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction return !may_be_cleaned_up || temporary_parts.contains(dir_name); }()); - if (need_rename) - part->renameTo(part->name, true); - LOG_TEST(log, "preparePartForCommit: inserting {} into data_parts_indexes", part->getNameWithState()); data_parts_indexes.insert(part); - out_transaction.addPart(part); + out_transaction.addPart(part, need_rename); } bool MergeTreeData::addTempPart( @@ -6617,9 +6614,11 @@ TransactionID MergeTreeData::Transaction::getTID() const return Tx::PrehistoricTID; } -void MergeTreeData::Transaction::addPart(MutableDataPartPtr & part) +void MergeTreeData::Transaction::addPart(MutableDataPartPtr & part, bool need_rename) { precommitted_parts.insert(part); + if (need_rename) + precommitted_parts_need_rename.insert(part); } void MergeTreeData::Transaction::rollback(DataPartsLock * lock) @@ -6665,7 +6664,9 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock) void MergeTreeData::Transaction::clear() { + chassert(precommitted_parts.size() >= precommitted_parts_need_rename.size()); precommitted_parts.clear(); + precommitted_parts_need_rename.clear(); } MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock * acquired_parts_lock) @@ -6682,6 +6683,9 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock if (part->getDataPartStorage().hasActiveTransaction()) 
part->getDataPartStorage().commitTransaction(); + for (const auto & part_need_rename : precommitted_parts_need_rename) + part_need_rename->renameTo(part_need_rename->name, true); + if (txn) { for (const auto & part : precommitted_parts) diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index c6f736a4afd..6abdebbe98d 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -255,7 +255,7 @@ public: DataPartsVector commit(DataPartsLock * acquired_parts_lock = nullptr); - void addPart(MutableDataPartPtr & part); + void addPart(MutableDataPartPtr & part, bool need_rename); void rollback(DataPartsLock * lock = nullptr); @@ -286,7 +286,9 @@ public: MergeTreeData & data; MergeTreeTransaction * txn; + MutableDataParts precommitted_parts; + MutableDataParts precommitted_parts_need_rename; MutableDataParts locked_parts; }; From 6c3db34aaeb0d45a573daf341900dc9ae1b0cb50 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Wed, 27 Mar 2024 12:38:24 +0100 Subject: [PATCH 06/17] Remove unused locked_parts from MergeTreeData::Transaction Signed-off-by: Azat Khuzhin --- src/Storages/MergeTree/MergeTreeData.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 6abdebbe98d..e4009107093 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -289,8 +289,6 @@ public: MutableDataParts precommitted_parts; MutableDataParts precommitted_parts_need_rename; - MutableDataParts locked_parts; - }; using TransactionUniquePtr = std::unique_ptr; From ee546fa00a72a29f8b91f3cfff77caa37fd598c5 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 28 Mar 2024 09:04:36 +0100 Subject: [PATCH 07/17] Fix replacing parts with empty Signed-off-by: Azat Khuzhin --- src/Storages/MergeTree/MergeTreeData.cpp | 10 ++++++++-- src/Storages/MergeTree/MergeTreeData.h | 2 ++ 2 files changed, 10 insertions(+), 
2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 1042dca4bd0..5ea9f012e8d 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -4244,6 +4244,7 @@ MergeTreeData::PartsToRemoveFromZooKeeper MergeTreeData::removePartsInRangeFromW MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW); renameTempPartAndAdd(new_data_part, transaction, lock); /// All covered parts must be already removed + transaction.renameParts(); /// It will add the empty part to the set of Outdated parts without making it Active (exactly what we need) transaction.rollback(&lock); new_data_part->remove_time.store(0, std::memory_order_relaxed); @@ -6669,6 +6670,12 @@ void MergeTreeData::Transaction::clear() precommitted_parts_need_rename.clear(); } +void MergeTreeData::Transaction::renameParts() +{ + for (const auto & part_need_rename : precommitted_parts_need_rename) + part_need_rename->renameTo(part_need_rename->name, true); +} + MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock * acquired_parts_lock) { DataPartsVector total_covered_parts; @@ -6683,8 +6690,7 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock if (part->getDataPartStorage().hasActiveTransaction()) part->getDataPartStorage().commitTransaction(); - for (const auto & part_need_rename : precommitted_parts_need_rename) - part_need_rename->renameTo(part_need_rename->name, true); + renameParts(); if (txn) { diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index e4009107093..29818a24331 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -255,6 +255,8 @@ public: DataPartsVector commit(DataPartsLock * acquired_parts_lock = nullptr); + void renameParts(); + void addPart(MutableDataPartPtr & part, bool need_rename); void rollback(DataPartsLock * lock = nullptr); From 
b41d08a2b618ec98de0a02862fa1e5463e01f364 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 28 Mar 2024 21:07:06 +0100 Subject: [PATCH 08/17] Use renameParts() explicitly to avoid leaving parts in detached Since there is an assertion that does not allow removing detached parts during cleanup, which sounds good in general, but breaks this new code. Signed-off-by: Azat Khuzhin --- src/Storages/MergeTree/MergeTreeData.cpp | 1 + src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp | 3 +++ 2 files changed, 4 insertions(+) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 5ea9f012e8d..a323266b0a8 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6674,6 +6674,7 @@ void MergeTreeData::Transaction::renameParts() { for (const auto & part_need_rename : precommitted_parts_need_rename) part_need_rename->renameTo(part_need_rename->name, true); + precommitted_parts_need_rename.clear(); } MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock * acquired_parts_lock) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 4b4f4c33e7d..215239ff401 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -903,6 +903,9 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: throw; } + /// Rename parts before committing to ZooKeeper without holding DataPartsLock. 
+ transaction.renameParts(); + ThreadFuzzer::maybeInjectSleep(); fiu_do_on(FailPoints::replicated_merge_tree_commit_zk_fail_after_op, { zookeeper->forceFailureAfterOperation(); }); From ca2c720d0ecf22c57a4ed7e5405d3f146348b884 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 29 Mar 2024 17:48:12 +0100 Subject: [PATCH 09/17] Avoid race between cleanup thread and renameMergedTemporaryPart() The problem was that with this patch set renameMergedTemporaryPart() is called without temporary_directory_lock holded (in MergeTask), since it is reseted just before calling renameMergedTemporaryPart(), and this can be seen in logs: 2024.03.29 19:56:42.126919 [ 1341 ] {ea7a3fd2-cf47-4ec7-91a5-51c69fba1b95::-8_0_138_2_2} test_btnct5cr.alter_table_0 (ea7a3fd2-cf47-4ec7-91a5-51c69fba1b95) (MergerMutator): Merged 50 parts: [-8_0_0_0_2, -8_138_138_0] -> -8_0_138_2_2 2024.03.29 19:56:42.127034 [ 1341 ] {ea7a3fd2-cf47-4ec7-91a5-51c69fba1b95::-8_0_138_2_2} test_btnct5cr.alter_table_0 (ea7a3fd2-cf47-4ec7-91a5-51c69fba1b95): Committing part -8_0_138_2_2 to zookeeper 2024.03.29 19:56:42.128462 [ 884 ] {} test_btnct5cr.alter_table_0 (ea7a3fd2-cf47-4ec7-91a5-51c69fba1b95): Removing temporary directory /var/lib/clickhouse/store/ea7/ea7a3fd2-cf47-4ec7-91a5-51c69fba1b95/tmp_merge_-8_0_138_2_2/ 2024.03.29 19:56:42.128647 [ 1341 ] {ea7a3fd2-cf47-4ec7-91a5-51c69fba1b95::-8_0_138_2_2} test_btnct5cr.alter_table_0 (ea7a3fd2-cf47-4ec7-91a5-51c69fba1b95): Part -8_0_138_2_2 committed to zookeeper ... 2024.03.29 19:56:54.586084 [ 57841 ] {bf240267-0620-4294-afc1-479c58e6be89} executeQuery: std::exception. Code: 1001, type: std::__1::__fs::filesystem::filesystem_error, e.what() = filesystem error: in file_size: No such file or directory ["/var/lib/clickhouse/store/ea7/ea7a3fd2-cf47-4ec7-91a5-51c69fba1b95/-8_0_138_2_2/data.cmrk3"] This should fix failures of 00993_system_parts_race_condition_drop_zookeeper in [1]. 
[1]: https://s3.amazonaws.com/clickhouse-test-reports/61973/f6f826c85dd5b7bb8db16286fd10dcf441a440f7/stateless_tests__coverage__[4_6].html Though now it looks hackish... Signed-off-by: Azat Khuzhin --- src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 2d49e1df19b..b3fd6c3edb1 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -750,6 +750,9 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart /// Rename new part, add to the set and remove original parts. auto replaced_parts = data.renameTempPartAndReplace(new_data_part, out_transaction); + /// Explicitly rename part while still holding the lock for tmp folder to avoid cleanup + out_transaction.renameParts(); + /// Let's check that all original parts have been deleted and only them. 
if (replaced_parts.size() != parts.size()) { From 3675c27fe9e64c7f30cc84c1418afdde5817ff23 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 4 Apr 2024 20:50:30 +0200 Subject: [PATCH 10/17] Require explicit rename of parts in transaction Signed-off-by: Azat Khuzhin --- src/Storages/MergeTree/MergeTreeData.cpp | 34 +++++++++++++------ src/Storages/MergeTree/MergeTreeData.h | 16 ++++++--- .../MergeTree/MergeTreeDataMergerMutator.cpp | 2 +- src/Storages/MergeTree/MergeTreeSink.cpp | 3 +- .../MergeTree/MutateFromLogEntryTask.cpp | 3 +- .../MergeTree/MutatePlainMergeTreeTask.cpp | 3 +- .../MergeTree/ReplicatedMergeTreeSink.cpp | 2 +- src/Storages/StorageMergeTree.cpp | 14 ++++---- src/Storages/StorageReplicatedMergeTree.cpp | 20 ++++++----- 9 files changed, 61 insertions(+), 36 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a323266b0a8..e18d2a57a6d 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3894,7 +3894,7 @@ void MergeTreeData::checkPartDynamicColumns(MutableDataPartPtr & part, DataParts } } -void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename) +void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename, bool rename_in_transaction) { part->is_temp = false; part->setState(DataPartState::PreActive); @@ -3906,9 +3906,15 @@ void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction return !may_be_cleaned_up || temporary_parts.contains(dir_name); }()); + if (need_rename && !rename_in_transaction) + part->renameTo(part->name, true); + LOG_TEST(log, "preparePartForCommit: inserting {} into data_parts_indexes", part->getNameWithState()); data_parts_indexes.insert(part); - out_transaction.addPart(part, need_rename); + if (rename_in_transaction) + out_transaction.addPart(part, need_rename); + else + 
out_transaction.addPart(part, /* need_rename= */ false); } bool MergeTreeData::addTempPart( @@ -3957,7 +3963,8 @@ bool MergeTreeData::renameTempPartAndReplaceImpl( MutableDataPartPtr & part, Transaction & out_transaction, DataPartsLock & lock, - DataPartsVector * out_covered_parts) + DataPartsVector * out_covered_parts, + bool rename_in_transaction) { LOG_TRACE(log, "Renaming temporary part {} to {} with tid {}.", part->getDataPartStorage().getPartDirectory(), part->name, out_transaction.getTID()); @@ -3996,7 +4003,7 @@ bool MergeTreeData::renameTempPartAndReplaceImpl( /// All checks are passed. Now we can rename the part on disk. /// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts - preparePartForCommit(part, out_transaction, /* need_rename */ true); + preparePartForCommit(part, out_transaction, /* need_rename= */ true, rename_in_transaction); if (out_covered_parts) { @@ -4011,29 +4018,31 @@ bool MergeTreeData::renameTempPartAndReplaceUnlocked( MutableDataPartPtr & part, Transaction & out_transaction, DataPartsLock & lock, - DataPartsVector * out_covered_parts) + bool rename_in_transaction) { - return renameTempPartAndReplaceImpl(part, out_transaction, lock, out_covered_parts); + return renameTempPartAndReplaceImpl(part, out_transaction, lock, /*out_covered_parts=*/ nullptr, rename_in_transaction); } MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( MutableDataPartPtr & part, - Transaction & out_transaction) + Transaction & out_transaction, + bool rename_in_transaction) { auto part_lock = lockParts(); DataPartsVector covered_parts; - renameTempPartAndReplaceImpl(part, out_transaction, part_lock, &covered_parts); + renameTempPartAndReplaceImpl(part, out_transaction, part_lock, &covered_parts, rename_in_transaction); return covered_parts; } bool MergeTreeData::renameTempPartAndAdd( MutableDataPartPtr & part, Transaction & out_transaction, - DataPartsLock & lock) + DataPartsLock & lock, + bool 
rename_in_transaction) { DataPartsVector covered_parts; - if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts)) + if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts, rename_in_transaction)) return false; if (!covered_parts.empty()) @@ -4242,7 +4251,7 @@ MergeTreeData::PartsToRemoveFromZooKeeper MergeTreeData::removePartsInRangeFromW auto [new_data_part, tmp_dir_holder] = createEmptyPart(empty_info, partition, empty_part_name, NO_TRANSACTION_PTR); MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW); - renameTempPartAndAdd(new_data_part, transaction, lock); /// All covered parts must be already removed + renameTempPartAndAdd(new_data_part, transaction, lock, /*rename_in_transaction=*/ true); /// All covered parts must be already removed transaction.renameParts(); /// It will add the empty part to the set of Outdated parts without making it Active (exactly what we need) @@ -6683,6 +6692,9 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock if (!isEmpty()) { + if (!precommitted_parts_need_rename.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Parts not renamed"); + auto settings = data.getSettings(); auto parts_lock = acquired_parts_lock ? DataPartsLock() : data.lockParts(); auto * owing_parts_lock = acquired_parts_lock ? acquired_parts_lock : &parts_lock; diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 29818a24331..d9c53863a4f 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -590,20 +590,22 @@ public: bool renameTempPartAndAdd( MutableDataPartPtr & part, Transaction & transaction, - DataPartsLock & lock); + DataPartsLock & lock, + bool rename_in_transaction); /// The same as renameTempPartAndAdd but the block range of the part can contain existing parts. /// Returns all parts covered by the added part (in ascending order). 
DataPartsVector renameTempPartAndReplace( MutableDataPartPtr & part, - Transaction & out_transaction); + Transaction & out_transaction, + bool rename_in_transaction); /// Unlocked version of previous one. Useful when added multiple parts with a single lock. bool renameTempPartAndReplaceUnlocked( MutableDataPartPtr & part, Transaction & out_transaction, DataPartsLock & lock, - DataPartsVector * out_covered_parts = nullptr); + bool rename_in_transaction); /// Remove parts from working set immediately (without wait for background /// process). Transfer part state to temporary. Have very limited usage only @@ -1604,7 +1606,10 @@ private: /// Preparing itself to be committed in memory: fill some fields inside part, add it to data_parts_indexes /// in precommitted state and to transaction - void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename); + /// + /// @param need_rename - rename the part + /// @param rename_in_transaction - if set, the rename will be done as part of transaction (without holding DataPartsLock), otherwise inplace (when it does not make sense). + void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename, bool rename_in_transaction = false); /// Low-level method for preparing parts for commit (in-memory). 
/// FIXME Merge MergeTreeTransaction and Transaction @@ -1612,7 +1617,8 @@ private: MutableDataPartPtr & part, Transaction & out_transaction, DataPartsLock & lock, - DataPartsVector * out_covered_parts); + DataPartsVector * out_covered_parts, + bool rename_in_transaction); /// RAII Wrapper for atomic work with currently moving parts /// Acquire them in constructor and remove them in destructor diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index b3fd6c3edb1..791bcbc3275 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -748,7 +748,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart "but transactions were enabled for this table"); /// Rename new part, add to the set and remove original parts. - auto replaced_parts = data.renameTempPartAndReplace(new_data_part, out_transaction); + auto replaced_parts = data.renameTempPartAndReplace(new_data_part, out_transaction, /*rename_in_transaction=*/ true); /// Explicitly rename part while still holding the lock for tmp folder to avoid cleanup out_transaction.renameParts(); diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index b7dede3cb00..dd28c04fef7 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -186,7 +186,8 @@ void MergeTreeSink::finishDelayedChunk() } } - added = storage.renameTempPartAndAdd(part, transaction, lock); + /// FIXME + added = storage.renameTempPartAndAdd(part, transaction, lock, /*rename_in_transaction=*/ false); transaction.commit(&lock); } diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp index 8d40658bb2c..5c59d5c1b47 100644 --- a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp @@ -236,10 +236,11 
@@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit if (data_part_storage.hasActiveTransaction()) data_part_storage.precommitTransaction(); - storage.renameTempPartAndReplace(new_part, *transaction_ptr); + storage.renameTempPartAndReplace(new_part, *transaction_ptr, /*rename_in_transaction=*/ true); try { + transaction_ptr->renameParts(); storage.checkPartChecksumsAndCommit(*transaction_ptr, new_part, mutate_task->getHardlinkedFiles()); } catch (const Exception & e) diff --git a/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp index 2fd02708421..8a0d5c444bd 100644 --- a/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp +++ b/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp @@ -97,7 +97,8 @@ bool MutatePlainMergeTreeTask::executeStep() MergeTreeData::Transaction transaction(storage, merge_mutate_entry->txn.get()); /// FIXME Transactions: it's too optimistic, better to lock parts before starting transaction - storage.renameTempPartAndReplace(new_part, transaction); + storage.renameTempPartAndReplace(new_part, transaction, /*rename_in_transaction=*/ true); + transaction.renameParts(); transaction.commit(); storage.updateMutationEntriesErrors(future_part, true, ""); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 215239ff401..50142185f79 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -888,7 +888,7 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: try { auto lock = storage.lockParts(); - storage.renameTempPartAndAdd(part, transaction, lock); + storage.renameTempPartAndAdd(part, transaction, lock, /*rename_in_transaction=*/ false); } catch (const Exception & e) { diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 27a76f4f21d..a85bc936031 100644 --- a/src/Storages/StorageMergeTree.cpp +++ 
b/src/Storages/StorageMergeTree.cpp @@ -1788,7 +1788,7 @@ void StorageMergeTree::renameAndCommitEmptyParts(MutableDataPartsVector & new_pa for (auto & part: new_parts) { - DataPartsVector covered_parts_by_one_part = renameTempPartAndReplace(part, transaction); + DataPartsVector covered_parts_by_one_part = renameTempPartAndReplace(part, transaction, /*rename_in_transaction=*/ true); if (covered_parts_by_one_part.size() > 1) throw Exception(ErrorCodes::LOGICAL_ERROR, @@ -1798,10 +1798,10 @@ void StorageMergeTree::renameAndCommitEmptyParts(MutableDataPartsVector & new_pa std::move(covered_parts_by_one_part.begin(), covered_parts_by_one_part.end(), std::back_inserter(covered_parts)); } - LOG_INFO(log, "Remove {} parts by covering them with empty {} parts. With txn {}.", covered_parts.size(), new_parts.size(), transaction.getTID()); + transaction.renameParts(); transaction.commit(); /// Remove covered parts without waiting for old_parts_lifetime seconds. @@ -2064,7 +2064,7 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition( { auto lock = lockParts(); fillNewPartNameAndResetLevel(loaded_parts[i], lock); - renameTempPartAndAdd(loaded_parts[i], transaction, lock); + renameTempPartAndAdd(loaded_parts[i], transaction, lock, /*rename_in_transaction=*/ false); transaction.commit(&lock); } @@ -2180,8 +2180,9 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con for (auto part : dst_parts) { fillNewPartName(part, data_parts_lock); - renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock); + renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock, /*rename_in_transaction=*/ true); } + transaction.renameParts(); /// Populate transaction transaction.commit(&data_parts_lock); @@ -2284,10 +2285,9 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const for (auto & part : dst_parts) { dest_table_storage->fillNewPartName(part, dest_data_parts_lock); - 
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock); + dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock, /*rename_in_transaction=*/ false); } - removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), src_parts, true, src_data_parts_lock); transaction.commit(&src_data_parts_lock); } @@ -2447,7 +2447,7 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts) { auto lock = lockParts(); fillNewPartName(part, lock); - renameTempPartAndAdd(part, transaction, lock); + renameTempPartAndAdd(part, transaction, lock, /*rename_in_transaction=*/ false); transaction.commit(&lock); } } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index e18e66d7af9..9ebca78d87a 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2093,7 +2093,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) Transaction transaction(*this, NO_TRANSACTION_RAW); part->version.setCreationTID(Tx::PrehistoricTID, nullptr); - renameTempPartAndReplace(part, transaction); + renameTempPartAndReplace(part, transaction, /*rename_in_transaction=*/ true); + transaction.renameParts(); checkPartChecksumsAndCommit(transaction, part); writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */, @@ -2882,11 +2883,11 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry) Coordination::Requests ops; for (PartDescriptionPtr & part_desc : final_parts) { - renameTempPartAndReplace(part_desc->res_part, transaction); + renameTempPartAndReplace(part_desc->res_part, transaction, /*rename_in_transaction=*/ true); getCommitPartOps(ops, part_desc->res_part); - - lockSharedData(*part_desc->res_part, /* replace_existing_lock */ true, part_desc->hardlinked_files); + lockSharedData(*part_desc->res_part, 
/*replace_existing_lock=*/ true, part_desc->hardlinked_files); } + transaction.renameParts(); if (!ops.empty()) @@ -4958,7 +4959,8 @@ bool StorageReplicatedMergeTree::fetchPart( if (!to_detached) { Transaction transaction(*this, NO_TRANSACTION_RAW); - renameTempPartAndReplace(part, transaction); + renameTempPartAndReplace(part, transaction, /*rename_in_transaction=*/ true); + transaction.renameParts(); chassert(!part_to_clone || !is_zero_copy_part(part)); replaced_parts = checkPartChecksumsAndCommit(transaction, part, /*hardlinked_files*/ {}, /*replace_zero_copy_lock*/ true); @@ -8202,8 +8204,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom( { auto data_parts_lock = lockParts(); for (auto & part : dst_parts) - renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock); + renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock, /*rename_in_transaction=*/ true); } + transaction.renameParts(); for (const auto & dst_part : dst_parts) lockSharedData(*dst_part, false, /*hardlinked_files*/ {}); @@ -8478,7 +8481,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta auto dest_data_parts_lock = dest_table_storage->lockParts(); for (auto & part : dst_parts) - dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock); + dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock, /*rename_in_transaction=*/ false); for (const auto & dst_part : dst_parts) dest_table_storage->lockSharedData(*dst_part, false, /*hardlinked_files*/ {}); @@ -10111,7 +10114,8 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP try { MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW); - auto replaced_parts = renameTempPartAndReplace(new_data_part, transaction); + auto replaced_parts = renameTempPartAndReplace(new_data_part, transaction, /*rename_in_transaction=*/ true); + transaction.renameParts(); if 
(!replaced_parts.empty()) { From 6f522c1d619c5ce67cc4d6758409addfaee3618d Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 5 Apr 2024 10:24:30 +0200 Subject: [PATCH 11/17] Do not remove detached parts in Transaction::rollback Signed-off-by: Azat Khuzhin --- .../MergeTree/DataPartStorageOnDiskBase.cpp | 16 ++++++-- .../MergeTree/DataPartStorageOnDiskBase.h | 1 + src/Storages/MergeTree/IDataPartStorage.h | 11 +++--- src/Storages/MergeTree/MergeTreeData.cpp | 39 ++++++++++++++++--- src/Storages/MergeTree/MergeTreeData.h | 2 +- 5 files changed, 55 insertions(+), 14 deletions(-) diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp index 378a1944396..120e0a6f426 100644 --- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp +++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp @@ -59,6 +59,16 @@ std::string DataPartStorageOnDiskBase::getRelativePath() const return fs::path(root_path) / part_dir / ""; } +std::string DataPartStorageOnDiskBase::getParentDirectory() const +{ + /// Cut last "/" if it exists (it shouldn't). Otherwise fs::path behave differently. + fs::path part_dir_without_slash = part_dir.ends_with("/") ? 
part_dir.substr(0, part_dir.size() - 1) : part_dir; + + if (part_dir_without_slash.has_parent_path()) + return part_dir_without_slash.parent_path(); + return ""; +} + std::optional DataPartStorageOnDiskBase::getRelativePathForPrefix(LoggerPtr log, const String & prefix, bool detached, bool broken) const { assert(!broken || detached); @@ -674,9 +684,9 @@ void DataPartStorageOnDiskBase::remove( if (!has_delete_prefix) { - if (part_dir_without_slash.has_parent_path()) + auto parent_path = getParentDirectory(); + if (!parent_path.empty()) { - auto parent_path = part_dir_without_slash.parent_path(); if (parent_path == MergeTreeData::DETACHED_DIR_NAME) throw Exception( ErrorCodes::LOGICAL_ERROR, @@ -684,7 +694,7 @@ void DataPartStorageOnDiskBase::remove( part_dir, root_path); - part_dir_without_slash = parent_path / ("delete_tmp_" + std::string{part_dir_without_slash.filename()}); + part_dir_without_slash = fs::path(parent_path) / ("delete_tmp_" + std::string{part_dir_without_slash.filename()}); } else { diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.h b/src/Storages/MergeTree/DataPartStorageOnDiskBase.h index 81353d4e20b..44b2454e256 100644 --- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.h +++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.h @@ -20,6 +20,7 @@ public: std::string getRelativePath() const override; std::string getPartDirectory() const override; std::string getFullRootPath() const override; + std::string getParentDirectory() const override; Poco::Timestamp getLastModified() const override; UInt64 calculateTotalSizeOnDisk() const override; diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h index f6320a7e1e4..9342d6ca0ea 100644 --- a/src/Storages/MergeTree/IDataPartStorage.h +++ b/src/Storages/MergeTree/IDataPartStorage.h @@ -96,11 +96,12 @@ public: virtual MergeTreeDataPartStorageType getType() const = 0; /// Methods to get path components of a data part. 
- virtual std::string getFullPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving/all_1_5_1' - virtual std::string getRelativePath() const = 0; /// 'database/table/moving/all_1_5_1' - virtual std::string getPartDirectory() const = 0; /// 'all_1_5_1' - virtual std::string getFullRootPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving' - /// Can add it if needed /// 'database/table/moving' + virtual std::string getFullPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving/all_1_5_1' + virtual std::string getRelativePath() const = 0; /// 'database/table/moving/all_1_5_1' + virtual std::string getPartDirectory() const = 0; /// 'all_1_5_1' + virtual std::string getFullRootPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving' + virtual std::string getParentDirectory() const = 0; /// '' (or 'detached' for 'detached/all_1_5_1') + /// Can add it if needed /// 'database/table/moving' /// virtual std::string getRelativeRootPath() const = 0; /// Get a storage for projection. diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index e18d2a57a6d..522c9f8dd82 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -4083,9 +4083,9 @@ void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const resetObjectColumnsFromActiveParts(acquired_lock); } -void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove) +void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove, DataPartsLock * acquired_lock) { - auto lock = lockParts(); + auto lock = (acquired_lock) ? 
DataPartsLock() : lockParts(); for (const auto & part : remove) { @@ -6635,16 +6635,41 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock) { if (!isEmpty()) { + for (const auto & part : precommitted_parts) + part->version.creation_csn.store(Tx::RolledBackCSN); + + /// Remove detached parts from working set. + /// + /// It is possible to have detached parts here, only when rename (in + /// commit()) of detached parts had been broken (i.e. during ATTACH), + /// i.e. the part itself is broken. + DataPartsVector detached_precommitted_parts; + for (auto it = precommitted_parts.begin(); it != precommitted_parts.end();) + { + const auto & part = *it; + if (part->getDataPartStorage().getParentDirectory() == DETACHED_DIR_NAME) + { + detached_precommitted_parts.push_back(part); + it = precommitted_parts.erase(it); + } + else + ++it; + } + WriteBufferFromOwnString buf; buf << "Removing parts:"; for (const auto & part : precommitted_parts) buf << " " << part->getDataPartStorage().getPartDirectory(); buf << "."; + if (!detached_precommitted_parts.empty()) + { + buf << " Rollbacking parts state to temporary and removing from working set:"; + for (const auto & part : detached_precommitted_parts) + buf << " " << part->getDataPartStorage().getPartDirectory(); + buf << "."; + } LOG_DEBUG(data.log, "Undoing transaction {}. {}", getTID(), buf.str()); - for (const auto & part : precommitted_parts) - part->version.creation_csn.store(Tx::RolledBackCSN); - /// It would be much better with TSA... auto our_lock = (lock) ? 
DataPartsLock() : data.lockParts(); @@ -6663,6 +6688,10 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock) } else { + data.removePartsFromWorkingSetImmediatelyAndSetTemporaryState( + detached_precommitted_parts, + &our_lock); + data.removePartsFromWorkingSet(txn, DataPartsVector(precommitted_parts.begin(), precommitted_parts.end()), /* clear_without_timeout = */ true, &our_lock); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index d9c53863a4f..7881062b724 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -610,7 +610,7 @@ public: /// Remove parts from working set immediately (without wait for background /// process). Transfer part state to temporary. Have very limited usage only /// for new parts which aren't already present in table. - void removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove); + void removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove, DataPartsLock * acquired_lock = nullptr); /// Removes parts from the working set parts. /// Parts in add must already be in data_parts with PreActive, Active, or Outdated states. From 6cfd5b2165970a65a551117fe58e4b9d22237b8c Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 11 Apr 2024 14:10:13 +0200 Subject: [PATCH 12/17] Fix possible assertion when size of precommitted_parts <= precommitted_parts_need_rename CI found [1]: Logical error: 'precommitted_parts.size() >= precommitted_parts_need_rename.size()' [1]: https://s3.amazonaws.com/clickhouse-test-reports/61973/5c1e6a3e956917bdbb7eaa467934e5b75f17a923/stateless_tests__tsan__s3_storage__[5_5].html The problem is that after precommitted_parts is cleaned from detached parts it may be less than precommitted_parts_need_rename, so to avoid this, let's just copy it to a new container. 
Signed-off-by: Azat Khuzhin --- src/Storages/MergeTree/MergeTreeData.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 522c9f8dd82..ace28e058d4 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6638,19 +6638,21 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock) for (const auto & part : precommitted_parts) part->version.creation_csn.store(Tx::RolledBackCSN); + auto non_detached_precommitted_parts = precommitted_parts; + /// Remove detached parts from working set. /// /// It is possible to have detached parts here, only when rename (in /// commit()) of detached parts had been broken (i.e. during ATTACH), /// i.e. the part itself is broken. DataPartsVector detached_precommitted_parts; - for (auto it = precommitted_parts.begin(); it != precommitted_parts.end();) + for (auto it = non_detached_precommitted_parts.begin(); it != non_detached_precommitted_parts.end();) { const auto & part = *it; if (part->getDataPartStorage().getParentDirectory() == DETACHED_DIR_NAME) { detached_precommitted_parts.push_back(part); - it = precommitted_parts.erase(it); + it = non_detached_precommitted_parts.erase(it); } else ++it; @@ -6658,7 +6660,7 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock) WriteBufferFromOwnString buf; buf << "Removing parts:"; - for (const auto & part : precommitted_parts) + for (const auto & part : non_detached_precommitted_parts) buf << " " << part->getDataPartStorage().getPartDirectory(); buf << "."; if (!detached_precommitted_parts.empty()) @@ -6679,7 +6681,7 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock) if (!data.all_data_dropped) { Strings part_names; - for (const auto & part : precommitted_parts) + for (const auto & part : non_detached_precommitted_parts) part_names.emplace_back(part->name); throw 
Exception(ErrorCodes::LOGICAL_ERROR, "There are some PreActive parts ({}) to rollback, " "but data parts set is empty and table {} was not dropped. It's a bug", @@ -6693,7 +6695,7 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock) &our_lock); data.removePartsFromWorkingSet(txn, - DataPartsVector(precommitted_parts.begin(), precommitted_parts.end()), + DataPartsVector(non_detached_precommitted_parts.begin(), non_detached_precommitted_parts.end()), /* clear_without_timeout = */ true, &our_lock); } } From eb8520758a6ed83ea1ab63e2a1c0b4164e160693 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 5 Jun 2024 20:51:07 +0200 Subject: [PATCH 13/17] Done --- src/Core/SettingsChangesHistory.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h index 40b3f5a7bfa..8c09afef7c6 100644 --- a/src/Core/SettingsChangesHistory.h +++ b/src/Core/SettingsChangesHistory.h @@ -97,6 +97,8 @@ static std::map sett {"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"}, {"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"}, {"enable_blob_storage_log", true, true, "Write information about blob storage operations to system.blob_storage_log table"}, + {"allow_statistics_optimize", false, false, "The setting was renamed. The previous name is `allow_statistic_optimize`."}, + {"allow_experimental_statistics", false, false, "The setting was renamed. 
The previous name is `allow_experimental_statistic`."} }}, {"24.5", {{"allow_deprecated_error_prone_window_functions", true, false, "Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)"}, {"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."}, From 106c1529ed3af167be986d85aa7eba98f40bf23a Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 5 Jun 2024 21:14:26 +0200 Subject: [PATCH 14/17] Introduce an alias --- src/Core/Settings.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index ff72995b2b7..27ce54c03a7 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -160,8 +160,8 @@ class IColumn; M(Bool, enable_multiple_prewhere_read_steps, true, "Move more conditions from WHERE to PREWHERE and do reads from disk and filtering in multiple steps if there are multiple conditions combined with AND", 0) \ M(Bool, move_primary_key_columns_to_end_of_prewhere, true, "Move PREWHERE conditions containing primary key columns to the end of AND chain. It is likely that these conditions are taken into account during primary key analysis and thus will not contribute a lot to PREWHERE filtering.", 0) \ \ - M(Bool, allow_statistics_optimize, false, "Allows using statistics to optimize queries", 0) \ - M(Bool, allow_experimental_statistics, false, "Allows using statistics", 0) \ + M(Bool, allow_statistics_optimize, false, "Allows using statistics to optimize queries", 0) ALIAS(allow_statistic_optimize) \ + M(Bool, allow_experimental_statistics, false, "Allows using statistics", 0) ALIAS(allow_experimental_statistic) \ \ M(UInt64, alter_sync, 1, "Wait for actions to manipulate the partitions. 
0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) ALIAS(replication_alter_partitions_sync) \ M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - do not wait, negative - wait for unlimited time.", 0) \ From f42452d51e5c711875ffc1cc91982be6b6f1578a Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 5 Jun 2024 21:57:23 +0200 Subject: [PATCH 15/17] Add settings to changes history --- src/Core/SettingsChangesHistory.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h index 8c09afef7c6..f7423754fc2 100644 --- a/src/Core/SettingsChangesHistory.h +++ b/src/Core/SettingsChangesHistory.h @@ -97,6 +97,8 @@ static std::map sett {"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"}, {"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"}, {"enable_blob_storage_log", true, true, "Write information about blob storage operations to system.blob_storage_log table"}, + {"allow_statistic_optimize", false, false, "Old setting which popped up here being renamed."}, + {"allow_experimental_statistic", false, false, "Old setting which popped up here being renamed."}, {"allow_statistics_optimize", false, false, "The setting was renamed. The previous name is `allow_statistic_optimize`."}, {"allow_experimental_statistics", false, false, "The setting was renamed. 
The previous name is `allow_experimental_statistic`."} }}, From 0d50dd302b05f51ba476db27fe11f9c65b1f2deb Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 5 Jun 2024 22:25:27 +0200 Subject: [PATCH 16/17] Bump From 8863736459a62decd423175aa04a11eff6576c81 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 5 Jun 2024 22:50:47 +0200 Subject: [PATCH 17/17] Fix style --- src/Storages/System/StorageSystemDashboards.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Storages/System/StorageSystemDashboards.cpp b/src/Storages/System/StorageSystemDashboards.cpp index 57f84e09857..5faa37d951e 100644 --- a/src/Storages/System/StorageSystemDashboards.cpp +++ b/src/Storages/System/StorageSystemDashboards.cpp @@ -218,12 +218,12 @@ ORDER BY t WITH FILL STEP {rounding:UInt32} { "dashboard", "Overview" }, { "title", "Concurrent network connections" }, { "query", trim(R"EOQ( -SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, max(TCP_Connections), max(MySQL_Connections), max(HTTP_Connections) -FROM ( -SELECT event_time, sum(CurrentMetric_TCPConnection) AS TCP_Connections, sum(CurrentMetric_MySQLConnection) AS MySQL_Connections, sum(CurrentMetric_HTTPConnection) AS HTTP_Connections +SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, + sum(CurrentMetric_TCPConnection) AS TCP_Connections, + sum(CurrentMetric_MySQLConnection) AS MySQL_Connections, + sum(CurrentMetric_HTTPConnection) AS HTTP_Connections FROM merge('system', '^metric_log') WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32} -GROUP BY event_time) GROUP BY t ORDER BY t WITH FILL STEP {rounding:UInt32} )EOQ") } @@ -367,7 +367,7 @@ ORDER BY t WITH FILL STEP {rounding:UInt32} { { "dashboard", "Cloud overview" }, { "title", "Concurrent network connections" }, - { "query", "SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, 
max(TCP_Connections), max(MySQL_Connections), max(HTTP_Connections) FROM ( SELECT event_time, sum(CurrentMetric_TCPConnection) AS TCP_Connections, sum(CurrentMetric_MySQLConnection) AS MySQL_Connections, sum(CurrentMetric_HTTPConnection) AS HTTP_Connections FROM clusterAllReplicas(default, merge('system', '^metric_log')) WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32} GROUP BY event_time) GROUP BY t ORDER BY t WITH FILL STEP {rounding:UInt32} SETTINGS skip_unavailable_shards = 1" } + { "query", "SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, max(TCP_Connections), max(MySQL_Connections), max(HTTP_Connections) FROM (SELECT event_time, sum(CurrentMetric_TCPConnection) AS TCP_Connections, sum(CurrentMetric_MySQLConnection) AS MySQL_Connections, sum(CurrentMetric_HTTPConnection) AS HTTP_Connections FROM clusterAllReplicas(default, merge('system', '^metric_log')) WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32} GROUP BY event_time) GROUP BY t ORDER BY t WITH FILL STEP {rounding:UInt32} SETTINGS skip_unavailable_shards = 1" } } };