Merge pull request #11346 from ClickHouse/optimize-final-should-force-merge-2

OPTIMIZE FINAL should force merge even if concurrent merges are performed (experimental, try 2)
This commit is contained in:
alexey-milovidov 2020-06-02 16:40:20 +03:00 committed by GitHub
commit d47d0d7108
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 158 additions and 5 deletions

View File

@ -368,6 +368,8 @@ public:
entry.latest_fail_reason = exception_message;
}
}
storage.currently_processing_in_background_condition.notify_all();
}
};
@ -569,7 +571,7 @@ bool StorageMergeTree::merge(
std::optional<CurrentlyMergingPartsTagger> merging_tagger;
{
std::lock_guard lock(currently_processing_in_background_mutex);
std::unique_lock lock(currently_processing_in_background_mutex);
auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *) -> bool
{
@ -593,8 +595,33 @@ bool StorageMergeTree::merge(
}
else
{
UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace();
selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
while (true)
{
UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace();
selected = merger_mutator.selectAllPartsToMergeWithinPartition(
future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
/// If final - we will wait for currently processing merges to finish and continue.
/// TODO Respect query settings for timeout
if (final
&& !selected
&& !currently_merging_mutating_parts.empty()
&& out_disable_reason
&& out_disable_reason->empty())
{
LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now) to perform OPTIMIZE FINAL",
currently_merging_mutating_parts.size());
if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for(
lock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC)))
{
*out_disable_reason = "Timeout while waiting for already running merges before running OPTIMIZE with FINAL";
break;
}
}
else
break;
}
}
if (!selected)
@ -850,7 +877,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
Int64 StorageMergeTree::getCurrentMutationVersion(
const DataPartPtr & part,
std::lock_guard<std::mutex> & /* currently_processing_in_background_mutex_lock */) const
std::unique_lock<std::mutex> & /* currently_processing_in_background_mutex_lock */) const
{
auto it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (it == current_mutations_by_version.begin())

View File

@ -95,6 +95,7 @@ private:
/// Mutex for parts currently processing in background
/// merging (also with TTL), mutating or moving.
mutable std::mutex currently_processing_in_background_mutex;
mutable std::condition_variable currently_processing_in_background_condition;
/// Parts that currently participate in merge or mutation.
/// This set have to be used with `currently_processing_in_background_mutex`.
@ -133,7 +134,7 @@ private:
Int64 getCurrentMutationVersion(
const DataPartPtr & part,
std::lock_guard<std::mutex> & /* currently_processing_in_background_mutex_lock */) const;
std::unique_lock<std::mutex> & /* currently_processing_in_background_mutex_lock */) const;
void clearOldMutations(bool truncate = false);

View File

@ -0,0 +1,100 @@
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0
55 0

View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
for i in {1..100}; do $CLICKHOUSE_CLIENT --multiquery --query "
DROP TABLE IF EXISTS mt;
CREATE TABLE mt (x UInt8, k UInt8 DEFAULT 0) ENGINE = SummingMergeTree ORDER BY k;
INSERT INTO mt (x) VALUES (1);
INSERT INTO mt (x) VALUES (2);
INSERT INTO mt (x) VALUES (3);
INSERT INTO mt (x) VALUES (4);
INSERT INTO mt (x) VALUES (5);
INSERT INTO mt (x) VALUES (6);
INSERT INTO mt (x) VALUES (7);
INSERT INTO mt (x) VALUES (8);
INSERT INTO mt (x) VALUES (9);
INSERT INTO mt (x) VALUES (10);
OPTIMIZE TABLE mt FINAL;
SELECT * FROM mt;
DROP TABLE mt;
"; done