Merge pull request #50107 from ClickHouse/rmt_better_background_tasks_scheduling

Improve scheduling of background tasks in ReplicatedMergeTree
commit bca0d76cbf by Alexander Tokmakov, 2023-06-20 16:25:19 +03:00 (committed by GitHub)
58 changed files with 544 additions and 208 deletions

View File

@ -11,3 +11,8 @@ constexpr double interpolateExponential(double min, double max, double ratio)
assert(min > 0 && ratio >= 0 && ratio <= 1);
return min * std::pow(max / min, ratio);
}
constexpr double interpolateLinear(double min, double max, double ratio)
{
return std::lerp(min, max, ratio);
}
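
For context, a minimal self-contained sketch of how this new helper is used further down in the diff (delayMutationOrThrowIfNeeded and the merge-selecting sleep both interpolate between a configured minimum and maximum); the delay bounds here are illustrative, not the real defaults:

#include <cassert>
#include <cmath>

constexpr double interpolateLinear(double min, double max, double ratio)
{
    return std::lerp(min, max, ratio);
}

int main()
{
    // ratio == 0 yields the lower bound, ratio == 1 yields the upper bound.
    assert(interpolateLinear(1000, 10000, 0.0) == 1000);
    assert(interpolateLinear(1000, 10000, 1.0) == 10000);
    // Halfway between a hypothetical min/max delay pair: 1000 + 0.5 * (10000 - 1000) = 5500.
    assert(interpolateLinear(1000, 10000, 0.5) == 5500);
}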

View File

@ -59,6 +59,8 @@ install_packages previous_release_package_folder
# available for dump via clickhouse-local
configure
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
start
@ -85,6 +87,8 @@ export USE_S3_STORAGE_FOR_MERGE_TREE=1
export ZOOKEEPER_FAULT_INJECTION=0
configure
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
start

View File

@ -211,9 +211,9 @@ void IMergeTreeDataPart::MinMaxIndex::appendFiles(const MergeTreeData & data, St
}
static void incrementStateMetric(MergeTreeDataPartState state)
void IMergeTreeDataPart::incrementStateMetric(MergeTreeDataPartState state_) const
{
switch (state)
switch (state_)
{
case MergeTreeDataPartState::Temporary:
CurrentMetrics::add(CurrentMetrics::PartsTemporary);
@ -227,6 +227,7 @@ static void incrementStateMetric(MergeTreeDataPartState state)
CurrentMetrics::add(CurrentMetrics::PartsCommitted);
return;
case MergeTreeDataPartState::Outdated:
storage.total_outdated_parts_count.fetch_add(1, std::memory_order_relaxed);
CurrentMetrics::add(CurrentMetrics::PartsOutdated);
return;
case MergeTreeDataPartState::Deleting:
@ -238,9 +239,9 @@ static void incrementStateMetric(MergeTreeDataPartState state)
}
}
static void decrementStateMetric(MergeTreeDataPartState state)
void IMergeTreeDataPart::decrementStateMetric(MergeTreeDataPartState state_) const
{
switch (state)
switch (state_)
{
case MergeTreeDataPartState::Temporary:
CurrentMetrics::sub(CurrentMetrics::PartsTemporary);
@ -254,6 +255,7 @@ static void decrementStateMetric(MergeTreeDataPartState state)
CurrentMetrics::sub(CurrentMetrics::PartsCommitted);
return;
case MergeTreeDataPartState::Outdated:
storage.total_outdated_parts_count.fetch_sub(1, std::memory_order_relaxed);
CurrentMetrics::sub(CurrentMetrics::PartsOutdated);
return;
case MergeTreeDataPartState::Deleting:

View File

@ -625,6 +625,9 @@ private:
/// for this column with default parameters.
CompressionCodecPtr detectDefaultCompressionCodec() const;
void incrementStateMetric(MergeTreeDataPartState state) const;
void decrementStateMetric(MergeTreeDataPartState state) const;
mutable MergeTreeDataPartState state{MergeTreeDataPartState::Temporary};
/// This ugly flag is needed for debug assertions only

View File

@ -83,6 +83,7 @@
#include <boost/algorithm/string/join.hpp>
#include <base/insertAtEnd.h>
#include <base/interpolate.h>
#include <algorithm>
#include <atomic>
@ -4280,6 +4281,29 @@ size_t MergeTreeData::getActivePartsCount() const
}
size_t MergeTreeData::getOutdatedPartsCount() const
{
return total_outdated_parts_count.load(std::memory_order_relaxed);
}
size_t MergeTreeData::getNumberOfOutdatedPartsWithExpiredRemovalTime() const
{
size_t res = 0;
auto time_now = time(nullptr);
auto parts_lock = lockParts();
auto outdated_parts_range = getDataPartsStateRange(DataPartState::Outdated);
for (const auto & part : outdated_parts_range)
{
auto part_remove_time = part->remove_time.load(std::memory_order_relaxed);
if (part_remove_time <= time_now && time_now - part_remove_time >= getSettings()->old_parts_lifetime.totalSeconds() && part.unique())
++res;
}
return res;
}
std::pair<size_t, size_t> MergeTreeData::getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const
{
auto lock = lockParts();
@ -4487,7 +4511,7 @@ void MergeTreeData::delayMutationOrThrowIfNeeded(Poco::Event * until, const Cont
size_t allowed_mutations_over_threshold = num_mutations_to_throw - num_mutations_to_delay;
double delay_factor = std::min(static_cast<double>(mutations_over_threshold) / allowed_mutations_over_threshold, 1.0);
size_t delay_milliseconds = static_cast<size_t>(std::lerp(settings->min_delay_to_mutate_ms, settings->max_delay_to_mutate_ms, delay_factor));
size_t delay_milliseconds = static_cast<size_t>(interpolateLinear(settings->min_delay_to_mutate_ms, settings->max_delay_to_mutate_ms, delay_factor));
ProfileEvents::increment(ProfileEvents::DelayedMutations);
ProfileEvents::increment(ProfileEvents::DelayedMutationsMilliseconds, delay_milliseconds);

View File

@ -533,6 +533,10 @@ public:
size_t getActivePartsCount() const;
size_t getOutdatedPartsCount() const;
size_t getNumberOfOutdatedPartsWithExpiredRemovalTime() const;
/// Returns a pair with: max number of parts in partition across partitions; sum size of parts inside that partition.
/// (if there are multiple partitions with the max number of parts, the sum size of parts is returned for an arbitrary one of them)
std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const;
@ -1509,6 +1513,8 @@ private:
std::atomic<size_t> total_active_size_rows = 0;
std::atomic<size_t> total_active_size_parts = 0;
mutable std::atomic<size_t> total_outdated_parts_count = 0;
// Record all query ids which access the table. It's guarded by `query_id_set_mutex` and is always mutable.
mutable std::set<String> query_id_set TSA_GUARDED_BY(query_id_set_mutex);
mutable std::mutex query_id_set_mutex;

View File

@ -175,5 +175,29 @@ void MergeTreeSettings::sanityCheck(size_t background_pool_tasks) const
min_bytes_to_rebalance_partition_over_jbod,
max_bytes_to_merge_at_max_space_in_pool / 1024);
}
if (max_cleanup_delay_period < cleanup_delay_period)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"The value of max_cleanup_delay_period setting ({}) must be greater than the value of cleanup_delay_period setting ({})",
max_cleanup_delay_period, cleanup_delay_period);
}
if (max_merge_selecting_sleep_ms < merge_selecting_sleep_ms)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"The value of max_merge_selecting_sleep_ms setting ({}) must be greater than the value of merge_selecting_sleep_ms setting ({})",
max_merge_selecting_sleep_ms, merge_selecting_sleep_ms);
}
if (merge_selecting_sleep_slowdown_factor < 1.f)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"The value of merge_selecting_sleep_slowdown_factor setting ({}) cannot be less than 1.0",
merge_selecting_sleep_slowdown_factor);
}
}
}

View File

@ -57,7 +57,9 @@ struct Settings;
M(Bool, fsync_part_directory, false, "Do fsync for part directory after all part operations (writes, renames, etc.).", 0) \
M(UInt64, non_replicated_deduplication_window, 0, "How many last blocks of hashes should be kept on disk (0 - disabled).", 0) \
M(UInt64, max_parts_to_merge_at_once, 100, "Max amount of parts which can be merged at once (0 - disabled). Doesn't affect OPTIMIZE FINAL query.", 0) \
M(UInt64, merge_selecting_sleep_ms, 5000, "Sleep time for merge selecting when no part selected, a lower setting will trigger selecting tasks in background_schedule_pool frequently which result in large amount of requests to zookeeper in large-scale clusters", 0) \
M(UInt64, merge_selecting_sleep_ms, 5000, "Minimum sleep time for merge selecting, a lower setting will trigger selecting tasks in background_schedule_pool frequently, which results in a large amount of requests to zookeeper in large-scale clusters", 0) \
M(UInt64, max_merge_selecting_sleep_ms, 60000, "Maximum sleep time for merge selecting, a lower setting will trigger selecting tasks in background_schedule_pool frequently, which results in a large amount of requests to zookeeper in large-scale clusters", 0) \
M(Float, merge_selecting_sleep_slowdown_factor, 1.2f, "The sleep time for the merge selecting task is multiplied by this factor when there's nothing to merge and divided when a merge was assigned", 0) \
M(UInt64, merge_tree_clear_old_temporary_directories_interval_seconds, 60, "The period of executing the clear old temporary directories operation in background.", 0) \
M(UInt64, merge_tree_clear_old_parts_interval_seconds, 1, "The period of executing the clear old parts operation in background.", 0) \
M(UInt64, merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds, 1ULL * 3600 * 24 * 30, "Remove old broken detached parts in the background if they remained untouched for a period of time specified by this setting.", 0) \
@ -120,8 +122,10 @@ struct Settings;
\
/** Check delay of replicas settings. */ \
M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less than this value.", 0) \
M(UInt64, cleanup_delay_period, 30, "Period to clean old queue logs, blocks hashes and parts.", 0) \
M(UInt64, cleanup_delay_period, 30, "Minimum period to clean old queue logs, blocks hashes and parts.", 0) \
M(UInt64, max_cleanup_delay_period, 300, "Maximum period to clean old queue logs, blocks hashes and parts.", 0) \
M(UInt64, cleanup_delay_period_random_add, 10, "Add uniformly distributed value from 0 to x seconds to cleanup_delay_period to avoid thundering herd effect and subsequent DoS of ZooKeeper in case of very large number of tables.", 0) \
M(UInt64, cleanup_thread_preferred_points_per_iteration, 150, "Preferred batch size for background cleanup (points are abstract but 1 point is approximately equivalent to 1 inserted block).", 0) \
M(UInt64, min_relative_delay_to_close, 300, "Minimal delay from other replicas to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, min_absolute_delay_to_close, 0, "Minimal absolute delay to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, enable_vertical_merge_algorithm, 1, "Enable usage of Vertical merge algorithm.", 0) \

View File

@ -25,19 +25,22 @@ ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplic
: storage(storage_)
, log_name(storage.getStorageID().getFullTableName() + " (ReplicatedMergeTreeCleanupThread)")
, log(&Poco::Logger::get(log_name))
, sleep_ms(storage.getSettings()->cleanup_delay_period * 1000)
{
task = storage.getContext()->getSchedulePool().createTask(log_name, [this]{ run(); });
}
void ReplicatedMergeTreeCleanupThread::run()
{
auto storage_settings = storage.getSettings();
const auto sleep_ms = storage_settings->cleanup_delay_period * 1000
+ std::uniform_int_distribution<UInt64>(0, storage_settings->cleanup_delay_period_random_add * 1000)(rng);
SCOPE_EXIT({ is_running.store(false, std::memory_order_relaxed); });
is_running.store(true, std::memory_order_relaxed);
auto storage_settings = storage.getSettings();
Float32 cleanup_points = 0;
try
{
iterate();
cleanup_points = iterate();
}
catch (const Coordination::Exception & e)
{
@ -51,39 +54,144 @@ void ReplicatedMergeTreeCleanupThread::run()
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
UInt64 prev_timestamp = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed);
UInt64 now_ms = clock_gettime_ns_adjusted(prev_timestamp * 1'000'000) / 1'000'000;
/// Do not adjust sleep_ms on the first run after starting the server
if (prev_timestamp && storage_settings->cleanup_thread_preferred_points_per_iteration)
{
/// We don't want to run the task too often when the table was barely changed and there's almost nothing to cleanup.
/// But we cannot simply sleep max_cleanup_delay_period (300s) when nothing was cleaned up and cleanup_delay_period (30s)
/// when we removed something, because inserting one part per 30s will lead to running cleanup every 30s just to remove one part.
/// So we need some interpolation based on preferred batch size.
auto expected_cleanup_points = storage_settings->cleanup_thread_preferred_points_per_iteration;
/// How long should we sleep to remove cleanup_thread_preferred_points_per_iteration on the next iteration?
Float32 ratio = cleanup_points / expected_cleanup_points;
if (ratio == 0)
sleep_ms = storage_settings->max_cleanup_delay_period * 1000;
else
sleep_ms = static_cast<UInt64>(sleep_ms / ratio);
if (sleep_ms < storage_settings->cleanup_delay_period * 1000)
sleep_ms = storage_settings->cleanup_delay_period * 1000;
if (storage_settings->max_cleanup_delay_period * 1000 < sleep_ms)
sleep_ms = storage_settings->max_cleanup_delay_period * 1000;
UInt64 interval_ms = now_ms - prev_timestamp;
LOG_TRACE(log, "Scheduling next cleanup after {}ms (points: {}, interval: {}ms, ratio: {}, points per minute: {})",
sleep_ms, cleanup_points, interval_ms, ratio, cleanup_points / interval_ms * 60'000);
}
prev_cleanup_timestamp_ms.store(now_ms, std::memory_order_relaxed);
sleep_ms += std::uniform_int_distribution<UInt64>(0, storage_settings->cleanup_delay_period_random_add * 1000)(rng);
task->scheduleAfter(sleep_ms);
}
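
A worked example of the adaptive sleep computed above, as a sketch using the defaults that appear in this diff (cleanup_delay_period=30, max_cleanup_delay_period=300, cleanup_thread_preferred_points_per_iteration=150); the observed point count and the previous sleep are hypothetical:

#include <algorithm>
#include <cstdint>

// If the previous sleep was 30s and the last iteration cleaned up only 50 "points":
//   ratio = 50 / 150, next sleep = 30000 / ratio = 90000 ms, already within [30000, 300000].
// If nothing was cleaned up (ratio == 0), the sleep jumps straight to max_cleanup_delay_period.
// The real code then adds a uniform random value of up to cleanup_delay_period_random_add seconds.
uint64_t nextCleanupSleepMs(uint64_t prev_sleep_ms, float cleanup_points, float preferred_points,
                            uint64_t min_sleep_ms, uint64_t max_sleep_ms)
{
    float ratio = cleanup_points / preferred_points;
    uint64_t sleep_ms = (ratio == 0) ? max_sleep_ms : static_cast<uint64_t>(prev_sleep_ms / ratio);
    return std::clamp(sleep_ms, min_sleep_ms, max_sleep_ms);
}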
void ReplicatedMergeTreeCleanupThread::iterate()
void ReplicatedMergeTreeCleanupThread::wakeupEarlierIfNeeded()
{
storage.clearOldPartsAndRemoveFromZK();
/// It may happen that the table was idle for a long time, but then a user started to aggressively insert (or mutate) data.
/// In this case, sleep_ms was set to the highest possible value, the task is not going to wake up soon,
/// but the number of objects to clean up is growing. We need to wake up the task earlier.
auto storage_settings = storage.getSettings();
if (!storage_settings->cleanup_thread_preferred_points_per_iteration)
return;
/// The number of other objects (logs, blocks, etc) is usually correlated with the number of Outdated parts.
/// Do not wake up unless we have too many.
size_t number_of_outdated_objects = storage.getOutdatedPartsCount();
if (number_of_outdated_objects < storage_settings->cleanup_thread_preferred_points_per_iteration * 2)
return;
/// A race condition is possible here, but it's okay
if (is_running.load(std::memory_order_relaxed))
return;
/// Do not re-check all parts too often (avoid constantly calling getNumberOfOutdatedPartsWithExpiredRemovalTime())
if (!wakeup_check_timer.compareAndRestart(storage_settings->cleanup_delay_period / 4.0))
return;
UInt64 prev_run_timestamp_ms = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed);
UInt64 now_ms = clock_gettime_ns_adjusted(prev_run_timestamp_ms * 1'000'000) / 1'000'000;
if (!prev_run_timestamp_ms || now_ms <= prev_run_timestamp_ms)
return;
/// Don't run it more often than cleanup_delay_period
UInt64 seconds_passed = (now_ms - prev_run_timestamp_ms) / 1000;
if (seconds_passed < storage_settings->cleanup_delay_period)
return;
/// Do not count parts that cannot be removed anyway. Do not wake up unless we have too many.
number_of_outdated_objects = storage.getNumberOfOutdatedPartsWithExpiredRemovalTime();
if (number_of_outdated_objects < storage_settings->cleanup_thread_preferred_points_per_iteration * 2)
return;
LOG_TRACE(log, "Waking up cleanup thread because there are {} outdated objects and previous cleanup finished {}s ago",
number_of_outdated_objects, seconds_passed);
wakeup();
}
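
Condensed, the early-wakeup conditions above amount to the following checks, shown as a sketch with the defaults from this diff (cleanup_thread_preferred_points_per_iteration=150, cleanup_delay_period=30); the wakeup_check_timer rate limiting is omitted for brevity:

#include <cstddef>
#include <cstdint>

bool shouldWakeupCleanupEarlier(size_t outdated_parts, size_t outdated_parts_with_expired_removal_time,
                                bool cleanup_is_running, uint64_t seconds_since_last_cleanup)
{
    constexpr size_t preferred_points = 150;  // cleanup_thread_preferred_points_per_iteration
    constexpr uint64_t delay_period_s = 30;   // cleanup_delay_period
    return outdated_parts >= 2 * preferred_points                            // enough Outdated parts piled up
        && !cleanup_is_running                                               // the cleanup task is currently idle
        && seconds_since_last_cleanup >= delay_period_s                      // not more often than cleanup_delay_period
        && outdated_parts_with_expired_removal_time >= 2 * preferred_points; // and most of them are actually removable
}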
Float32 ReplicatedMergeTreeCleanupThread::iterate()
{
size_t cleaned_logs = 0;
Float32 cleaned_blocks = 0;
size_t cleaned_other = 0;
size_t cleaned_part_like = 0;
size_t cleaned_parts = storage.clearOldPartsAndRemoveFromZK();
auto storage_settings = storage.getSettings();
{
auto lock = storage.lockForShare(RWLockImpl::NO_QUERY, storage.getSettings()->lock_acquire_timeout_for_background_operations);
/// Both use relative_data_path which changes during rename, so we
/// do it under share lock
storage.clearOldWriteAheadLogs();
storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
cleaned_other += storage.clearOldWriteAheadLogs();
cleaned_part_like += storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
storage.clearOldBrokenPartsFromDetachedDirectory();
cleaned_part_like += storage.clearOldBrokenPartsFromDetachedDirectory();
}
/// This is a loose condition: no problem if we actually had lost leadership at this moment
/// and two replicas will try to do cleanup simultaneously.
if (storage.is_leader)
{
clearOldLogs();
auto storage_settings = storage.getSettings();
clearOldBlocks("blocks", storage_settings->replicated_deduplication_window_seconds, storage_settings->replicated_deduplication_window, cached_block_stats_for_sync_inserts);
clearOldBlocks("async_blocks", storage_settings->replicated_deduplication_window_seconds_for_async_inserts, storage_settings->replicated_deduplication_window_for_async_inserts, cached_block_stats_for_async_inserts);
clearOldMutations();
storage.clearEmptyParts();
cleaned_logs = clearOldLogs();
size_t normal_blocks = clearOldBlocks("blocks", storage_settings->replicated_deduplication_window_seconds,
storage_settings->replicated_deduplication_window, cached_block_stats_for_sync_inserts);
size_t async_blocks = clearOldBlocks("async_blocks",
storage_settings->replicated_deduplication_window_seconds_for_async_inserts,
storage_settings->replicated_deduplication_window_for_async_inserts,
cached_block_stats_for_async_inserts);
/// Many async blocks are transformed into one ordinary block
Float32 async_blocks_per_block = static_cast<Float32>(storage_settings->replicated_deduplication_window) /
(storage_settings->replicated_deduplication_window_for_async_inserts + 1);
cleaned_blocks = (normal_blocks + async_blocks * async_blocks_per_block) / 2;
cleaned_other += clearOldMutations();
cleaned_part_like += storage.clearEmptyParts();
}
/// We need to measure the number of removed objects somehow (for better scheduling),
/// but just summing the number of removed async blocks, logs, and empty parts does not make any sense.
/// So we are trying to (approximately) measure the number of inserted blocks/parts, so we will be able to compare apples to apples.
/// Each inserted block produces 3 objects that have to be cleaned up: one block, one log entry and one part.
/// A few new parts get merged together producing one log entry and one part.
/// Other objects (like mutations and WALs) are much rarer than Outdated parts (because mutations usually produce
/// many Outdated parts, and WALs usually contain many parts too). We count them as one part each for simplicity.
constexpr Float32 parts_number_amplification = 1.3f; /// Assuming we merge 4-5 parts each time
Float32 cleaned_inserted_parts = (cleaned_blocks + (cleaned_logs + cleaned_parts) / parts_number_amplification) / 3;
return cleaned_inserted_parts + cleaned_part_like + cleaned_other;
}
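
A worked example of this point accounting, with hypothetical per-iteration counts and illustrative deduplication window sizes (the window defaults are not part of this diff):

// Suppose one iteration removed 100 parts from ZooKeeper, 120 log entries, 90 sync blocks,
// 900 async blocks, 5 part-like objects (empty parts, temp dirs) and 2 other objects (mutations, WALs),
// with replicated_deduplication_window = 100 and replicated_deduplication_window_for_async_inserts = 1000:
//   async_blocks_per_block = 100 / (1000 + 1) ~= 0.1
//   cleaned_blocks         = (90 + 900 * 0.1) / 2 = 90
//   cleaned_inserted_parts = (90 + (120 + 100) / 1.3) / 3 ~= 86.4
//   returned points        = 86.4 + 5 + 2 ~= 93.4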
void ReplicatedMergeTreeCleanupThread::clearOldLogs()
size_t ReplicatedMergeTreeCleanupThread::clearOldLogs()
{
auto zookeeper = storage.getZooKeeper();
auto storage_settings = storage.getSettings();
@ -102,7 +210,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
size_t min_replicated_logs_to_keep = static_cast<size_t>(storage_settings->min_replicated_logs_to_keep * ratio);
if (static_cast<double>(children_count) < min_replicated_logs_to_keep)
return;
return 0;
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
@ -114,7 +222,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log");
if (entries.empty())
return;
return 0;
::sort(entries.begin(), entries.end());
@ -227,7 +335,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end());
if (entries.empty())
return;
return 0;
markLostReplicas(
host_versions_lost_replicas,
@ -268,6 +376,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
if (i != 0)
LOG_DEBUG(log, "Removed {} old log entries: {} - {}", i, entries[0], entries[i - 1]);
return i;
}
@ -323,7 +433,7 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat
}
};
void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats)
size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats)
{
auto zookeeper = storage.getZooKeeper();
@ -331,7 +441,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_
getBlocksSortedByTime(blocks_dir_name, *zookeeper, timed_blocks, cached_block_stats);
if (timed_blocks.empty())
return;
return 0;
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
Int64 current_time = timed_blocks.front().ctime;
@ -350,7 +460,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
if (!num_nodes_to_delete)
return;
return 0;
auto last_outdated_block = timed_blocks.end() - 1;
LOG_TRACE(log, "Will clear {} old blocks from {} (ctime {}) to {} (ctime {})", num_nodes_to_delete,
@ -388,6 +498,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_
}
LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
return num_nodes_to_delete;
}
@ -456,17 +567,17 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & bloc
}
void ReplicatedMergeTreeCleanupThread::clearOldMutations()
size_t ReplicatedMergeTreeCleanupThread::clearOldMutations()
{
auto storage_settings = storage.getSettings();
if (!storage_settings->finished_mutations_to_keep)
return;
return 0;
if (storage.queue.countFinishedMutations() <= storage_settings->finished_mutations_to_keep)
{
/// Not strictly necessary, but helps to avoid unnecessary ZooKeeper requests.
/// If even this replica hasn't finished enough mutations yet, then we don't need to clean anything.
return;
return 0;
}
auto zookeeper = storage.getZooKeeper();
@ -481,7 +592,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
// No Need to check return value to delete mutations.
zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/mutation_pointer", pointer);
if (pointer.empty())
return; /// One replica hasn't done anything yet so we can't delete any mutations.
return 0; /// One replica hasn't done anything yet so we can't delete any mutations.
min_pointer = std::min(parse<UInt64>(pointer), min_pointer);
}
@ -492,11 +603,11 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
entries.erase(std::upper_bound(entries.begin(), entries.end(), padIndex(min_pointer)), entries.end());
/// Do not remove last `storage_settings->finished_mutations_to_keep` entries.
if (entries.size() <= storage_settings->finished_mutations_to_keep)
return;
return 0;
entries.erase(entries.end() - storage_settings->finished_mutations_to_keep, entries.end());
if (entries.empty())
return;
return 0;
Coordination::Requests ops;
size_t batch_start_i = 0;
@ -526,6 +637,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
ops.clear();
}
}
return entries.size();
}
}

View File

@ -4,6 +4,7 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/randomSeed.h>
#include <Common/Stopwatch.h>
#include <Core/BackgroundSchedulePool.h>
#include <thread>
@ -31,6 +32,8 @@ public:
void stop() { task->deactivate(); }
void wakeupEarlierIfNeeded();
private:
StorageReplicatedMergeTree & storage;
String log_name;
@ -38,11 +41,20 @@ private:
BackgroundSchedulePool::TaskHolder task;
pcg64 rng{randomSeed()};
void run();
void iterate();
UInt64 sleep_ms;
/// Remove old records from ZooKeeper.
void clearOldLogs();
std::atomic<UInt64> prev_cleanup_timestamp_ms = 0;
std::atomic<bool> is_running = false;
AtomicStopwatch wakeup_check_timer;
void run();
/// Returns a number that is directly proportional to the number of cleaned up blocks
Float32 iterate();
/// Remove old records from ZooKeeper. Returns the number of removed logs
size_t clearOldLogs();
/// The replica is marked as "lost" if it is inactive and its log pointer
/// is far behind and we are not going to keep logs for it.
@ -52,11 +64,11 @@ private:
size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper);
using NodeCTimeAndVersionCache = std::map<String, std::pair<Int64, Int32>>;
/// Remove old block hashes from ZooKeeper. This is done by the leader replica.
void clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats);
/// Remove old block hashes from ZooKeeper. This is done by the leader replica. Returns the number of removed blocks
size_t clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats);
/// Remove old mutations that are done from ZooKeeper. This is done by the leader replica.
void clearOldMutations();
/// Remove old mutations that are done from ZooKeeper. This is done by the leader replica. Returns the number of removed mutations
size_t clearOldMutations();
NodeCTimeAndVersionCache cached_block_stats_for_sync_inserts;
NodeCTimeAndVersionCache cached_block_stats_for_async_inserts;

View File

@ -28,7 +28,7 @@ struct Estimator
{
double difference = std::abs(log2(static_cast<double>(sum_size) / size_prev_at_left));
if (difference < settings.heuristic_to_align_parts_max_absolute_difference_in_powers_of_two)
current_score *= std::lerp(settings.heuristic_to_align_parts_max_score_adjustment, 1,
current_score *= interpolateLinear(settings.heuristic_to_align_parts_max_score_adjustment, 1,
difference / settings.heuristic_to_align_parts_max_absolute_difference_in_powers_of_two);
}
@ -115,8 +115,8 @@ bool allow(
// std::cerr << "size_normalized: " << size_normalized << "\n";
/// Calculate boundaries for age
double min_age_to_lower_base = std::lerp(settings.min_age_to_lower_base_at_min_size, settings.min_age_to_lower_base_at_max_size, size_normalized);
double max_age_to_lower_base = std::lerp(settings.max_age_to_lower_base_at_min_size, settings.max_age_to_lower_base_at_max_size, size_normalized);
double min_age_to_lower_base = interpolateLinear(settings.min_age_to_lower_base_at_min_size, settings.min_age_to_lower_base_at_max_size, size_normalized);
double max_age_to_lower_base = interpolateLinear(settings.max_age_to_lower_base_at_min_size, settings.max_age_to_lower_base_at_max_size, size_normalized);
// std::cerr << "min_age_to_lower_base: " << min_age_to_lower_base << "\n";
// std::cerr << "max_age_to_lower_base: " << max_age_to_lower_base << "\n";
@ -137,7 +137,7 @@ bool allow(
// std::cerr << "combined_ratio: " << combined_ratio << "\n";
double lowered_base = std::lerp(settings.base, 2.0, combined_ratio);
double lowered_base = interpolateLinear(settings.base, 2.0, combined_ratio);
// std::cerr << "------- lowered_base: " << lowered_base << "\n";

View File

@ -1303,8 +1303,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
/// which is equal or more fresh than the commands themselves. In an extremely rare case it can happen that an alter arrives
/// in between taking the snapshot above and selecting commands. That is why we take a new snapshot here.
auto task = std::make_shared<MutatePlainMergeTreeTask>(*this, getInMemoryMetadataPtr(), mutate_entry, shared_lock, common_assignee_trigger);
assignee.scheduleMergeMutateTask(task);
return true;
return assignee.scheduleMergeMutateTask(task);
}
if (has_mutations)
{

View File

@ -4,6 +4,7 @@
#include <ranges>
#include <base/hex.h>
#include <base/interpolate.h>
#include <Common/Macros.h>
#include <Common/MemoryTracker.h>
#include <Common/ProfileEventsScope.h>
@ -334,6 +335,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
/// Will be activated if we will achieve leader state.
merge_selecting_task->deactivate();
merge_selecting_sleep_ms = getSettings()->merge_selecting_sleep_ms;
mutations_finalizing_task = getContext()->getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mutationsFinalizingTask)", [this] { mutationsFinalizingTask(); });
@ -424,6 +426,19 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
loadDataParts(skip_sanity_checks);
if (attach)
{
/// Provide better initial value of merge_selecting_sleep_ms on server startup
auto settings = getSettings();
size_t max_parts_in_partition = getMaxPartsCountAndSizeForPartition().first;
if (settings->parts_to_delay_insert && max_parts_in_partition < settings->parts_to_delay_insert)
{
Float64 ratio = 1.0 - static_cast<Float64>(max_parts_in_partition) / settings->parts_to_delay_insert;
merge_selecting_sleep_ms = static_cast<UInt64>(interpolateLinear(settings->merge_selecting_sleep_ms,
settings->max_merge_selecting_sleep_ms, ratio));
}
}
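
For example (a sketch; parts_to_delay_insert is not part of this diff, so its value below is hypothetical), with merge_selecting_sleep_ms=5000, max_merge_selecting_sleep_ms=60000, parts_to_delay_insert=1000 and 100 parts in the busiest partition at startup:

//   ratio                    = 1.0 - 100 / 1000 = 0.9
//   merge_selecting_sleep_ms = interpolateLinear(5000, 60000, 0.9) = 54500 ms
// A nearly empty table therefore starts out selecting merges rarely, while a table already close
// to parts_to_delay_insert starts out near the minimum of 5000 ms.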
if (!current_zookeeper)
{
if (!attach)
@ -3236,6 +3251,8 @@ bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::Sel
bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee)
{
cleanup_thread.wakeupEarlierIfNeeded();
/// If replication queue is stopped exit immediately as we successfully executed the task
if (queue.actions_blocker.isCancelled())
return false;
@ -3324,7 +3341,15 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
const bool cleanup = (storage_settings_ptr->clean_deleted_rows != CleanDeletedRows::Never);
CreateMergeEntryResult create_result = CreateMergeEntryResult::Other;
try
enum class AttemptStatus
{
EntryCreated,
NeedRetry,
Limited,
CannotSelect,
};
auto try_assign_merge = [&]() -> AttemptStatus
{
/// We must select parts for merge under merge_selecting_mutex because other threads
/// (OPTIMIZE queries) can assign new merges.
@ -3346,108 +3371,136 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
"Current background tasks memory usage: {}.",
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()));
return AttemptStatus::Limited;
}
else if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})"
" is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.",
merges_and_mutations_queued.merges,
merges_and_mutations_queued.mutations,
storage_settings_ptr->max_replicated_merges_in_queue);
return AttemptStatus::Limited;
}
else
UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum);
UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
bool merge_with_ttl_allowed = merges_and_mutations_queued.merges_with_ttl < storage_settings_ptr->max_replicated_merges_with_ttl_in_queue &&
getTotalMergesWithTTLInMergeList() < storage_settings_ptr->max_number_of_merges_with_ttl_in_pool;
auto future_merged_part = std::make_shared<FutureMergedMutatedPart>();
if (storage_settings.get()->assign_part_uuids)
future_merged_part->uuid = UUIDHelpers::generateV4();
bool can_assign_merge = max_source_parts_size_for_merge > 0;
PartitionIdsHint partitions_to_merge_in;
if (can_assign_merge)
{
UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum);
auto lightweight_merge_pred = LocalMergePredicate(queue);
partitions_to_merge_in = merger_mutator.getPartitionsThatMayBeMerged(
max_source_parts_size_for_merge, lightweight_merge_pred, merge_with_ttl_allowed, NO_TRANSACTION_PTR);
if (partitions_to_merge_in.empty())
can_assign_merge = false;
else
merge_pred.emplace(queue.getMergePredicate(zookeeper, partitions_to_merge_in));
}
UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
if (can_assign_merge &&
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, *merge_pred,
merge_with_ttl_allowed, NO_TRANSACTION_PTR, nullptr, &partitions_to_merge_in) == SelectPartsDecision::SELECTED)
{
create_result = createLogEntryToMergeParts(
zookeeper,
future_merged_part->parts,
future_merged_part->name,
future_merged_part->uuid,
future_merged_part->part_format,
deduplicate,
deduplicate_by_columns,
cleanup,
nullptr,
merge_pred->getVersion(),
future_merged_part->merge_type);
bool merge_with_ttl_allowed = merges_and_mutations_queued.merges_with_ttl < storage_settings_ptr->max_replicated_merges_with_ttl_in_queue &&
getTotalMergesWithTTLInMergeList() < storage_settings_ptr->max_number_of_merges_with_ttl_in_pool;
auto future_merged_part = std::make_shared<FutureMergedMutatedPart>();
if (storage_settings.get()->assign_part_uuids)
future_merged_part->uuid = UUIDHelpers::generateV4();
if (create_result == CreateMergeEntryResult::Ok)
return AttemptStatus::EntryCreated;
if (create_result == CreateMergeEntryResult::LogUpdated)
return AttemptStatus::NeedRetry;
}
bool can_assign_merge = max_source_parts_size_for_merge > 0;
PartitionIdsHint partitions_to_merge_in;
if (can_assign_merge)
/// If there are many mutations in the queue, it may happen that we cannot enqueue enough merges to merge all new parts
if (max_source_part_size_for_mutation == 0 || merges_and_mutations_queued.mutations >= storage_settings_ptr->max_replicated_mutations_in_queue)
return AttemptStatus::Limited;
if (queue.countMutations() > 0)
{
/// We don't need the list of committing blocks to choose a part to mutate
if (!merge_pred)
merge_pred.emplace(queue.getMergePredicate(zookeeper, PartitionIdsHint{}));
/// Choose a part to mutate.
DataPartsVector data_parts = getDataPartsVectorForInternalUsage();
for (const auto & part : data_parts)
{
auto lightweight_merge_pred = LocalMergePredicate(queue);
partitions_to_merge_in = merger_mutator.getPartitionsThatMayBeMerged(
max_source_parts_size_for_merge, lightweight_merge_pred, merge_with_ttl_allowed, NO_TRANSACTION_PTR);
if (partitions_to_merge_in.empty())
can_assign_merge = false;
else
merge_pred.emplace(queue.getMergePredicate(zookeeper, partitions_to_merge_in));
}
if (part->getBytesOnDisk() > max_source_part_size_for_mutation)
continue;
if (can_assign_merge &&
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, *merge_pred,
merge_with_ttl_allowed, NO_TRANSACTION_PTR, nullptr, &partitions_to_merge_in) == SelectPartsDecision::SELECTED)
{
create_result = createLogEntryToMergeParts(
zookeeper,
future_merged_part->parts,
future_merged_part->name,
std::optional<std::pair<Int64, int>> desired_mutation_version = merge_pred->getDesiredMutationVersion(part);
if (!desired_mutation_version)
continue;
create_result = createLogEntryToMutatePart(
*part,
future_merged_part->uuid,
future_merged_part->part_format,
deduplicate,
deduplicate_by_columns,
cleanup,
nullptr,
merge_pred->getVersion(),
future_merged_part->merge_type);
}
/// If there are many mutations in queue, it may happen, that we cannot enqueue enough merges to merge all new parts
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0
&& merges_and_mutations_queued.mutations < storage_settings_ptr->max_replicated_mutations_in_queue)
{
/// We don't need the list of committing blocks to choose a part to mutate
if (!merge_pred)
merge_pred.emplace(queue.getMergePredicate(zookeeper, PartitionIdsHint{}));
desired_mutation_version->first,
desired_mutation_version->second,
merge_pred->getVersion());
/// Choose a part to mutate.
DataPartsVector data_parts = getDataPartsVectorForInternalUsage();
for (const auto & part : data_parts)
{
if (part->getBytesOnDisk() > max_source_part_size_for_mutation)
continue;
std::optional<std::pair<Int64, int>> desired_mutation_version = merge_pred->getDesiredMutationVersion(part);
if (!desired_mutation_version)
continue;
create_result = createLogEntryToMutatePart(
*part,
future_merged_part->uuid,
desired_mutation_version->first,
desired_mutation_version->second,
merge_pred->getVersion());
if (create_result == CreateMergeEntryResult::Ok ||
create_result == CreateMergeEntryResult::LogUpdated)
break;
}
if (create_result == CreateMergeEntryResult::Ok)
return AttemptStatus::EntryCreated;
if (create_result == CreateMergeEntryResult::LogUpdated)
return AttemptStatus::NeedRetry;
}
}
return AttemptStatus::CannotSelect;
};
AttemptStatus result = AttemptStatus::CannotSelect;
try
{
result = try_assign_merge();
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
if (!is_leader)
return;
if (create_result != CreateMergeEntryResult::Ok
&& create_result != CreateMergeEntryResult::LogUpdated)
{
merge_selecting_task->scheduleAfter(storage_settings_ptr->merge_selecting_sleep_ms);
}
Float32 new_sleep_ms = merge_selecting_sleep_ms;
if (result == AttemptStatus::EntryCreated || result == AttemptStatus::NeedRetry)
new_sleep_ms /= storage_settings_ptr->merge_selecting_sleep_slowdown_factor;
else if (result == AttemptStatus::CannotSelect)
new_sleep_ms *= storage_settings_ptr->merge_selecting_sleep_slowdown_factor;
new_sleep_ms *= std::uniform_real_distribution<Float32>(1.f, 1.1f)(thread_local_rng);
merge_selecting_sleep_ms = static_cast<UInt64>(new_sleep_ms);
if (merge_selecting_sleep_ms < storage_settings_ptr->merge_selecting_sleep_ms)
merge_selecting_sleep_ms = storage_settings_ptr->merge_selecting_sleep_ms;
if (merge_selecting_sleep_ms > storage_settings_ptr->max_merge_selecting_sleep_ms)
merge_selecting_sleep_ms = storage_settings_ptr->max_merge_selecting_sleep_ms;
if (result == AttemptStatus::EntryCreated)
merge_selecting_task->schedule();
else
{
merge_selecting_task->schedule();
LOG_TRACE(log, "Scheduling next merge selecting task after {}ms", merge_selecting_sleep_ms);
merge_selecting_task->scheduleAfter(merge_selecting_sleep_ms);
}
}
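
To illustrate the feedback loop above, a minimal sketch using the defaults from this diff (merge_selecting_sleep_ms=5000, max_merge_selecting_sleep_ms=60000, merge_selecting_sleep_slowdown_factor=1.2); the random 1.0-1.1 jitter and the Limited case (which leaves the sleep unchanged apart from jitter) are omitted:

#include <algorithm>
#include <cstdint>

uint64_t nextMergeSelectSleepMs(uint64_t current_ms, bool entry_created_or_retry_needed)
{
    constexpr double slowdown_factor = 1.2;  // merge_selecting_sleep_slowdown_factor
    constexpr double min_ms = 5000;          // merge_selecting_sleep_ms
    constexpr double max_ms = 60000;         // max_merge_selecting_sleep_ms
    double next = entry_created_or_retry_needed ? current_ms / slowdown_factor
                                                : current_ms * slowdown_factor;
    return static_cast<uint64_t>(std::clamp(next, min_ms, max_ms));
}
// Starting at 5000 ms with nothing to select: 6000, 7200, 8640, ... capped at 60000 ms;
// once an entry is created, the sleep shrinks again by the same factor.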
@ -5500,6 +5553,7 @@ void StorageReplicatedMergeTree::alter(
if (mutation_znode)
{
LOG_DEBUG(log, "Metadata changes applied. Will wait for data changes.");
merge_selecting_task->schedule();
waitMutation(*mutation_znode, query_context->getSettingsRef().alter_sync);
LOG_DEBUG(log, "Data changes applied.");
}
@ -6651,6 +6705,8 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
throw Coordination::Exception("Unable to create a mutation znode", rc);
}
merge_selecting_task->schedule();
waitMutation(mutation_entry.znode_name, query_context->getSettingsRef().mutations_sync);
}
@ -6716,7 +6772,7 @@ bool StorageReplicatedMergeTree::hasLightweightDeletedMask() const
return has_lightweight_delete_parts.load(std::memory_order_relaxed);
}
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
size_t StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
auto table_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto zookeeper = getZooKeeper();
@ -6724,15 +6780,16 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
/// Now these parts are in Deleting state. If we fail to remove some of them we must roll them back to Outdated state.
/// Otherwise they will not be deleted.
DataPartsVector parts = grabOldParts();
size_t total_parts_to_remove = parts.size();
if (parts.empty())
return;
return total_parts_to_remove;
NOEXCEPT_SCOPE({ clearOldPartsAndRemoveFromZKImpl(zookeeper, std::move(parts)); });
return total_parts_to_remove;
}
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKeeperPtr zookeeper, DataPartsVector && parts)
{
DataPartsVector parts_to_delete_only_from_filesystem; // Only duplicates
DataPartsVector parts_to_delete_completely; // All parts except duplicates
DataPartsVector parts_to_retry_deletion; // Parts that should be retried due to network problems

View File

@ -342,8 +342,8 @@ public:
private:
std::atomic_bool are_restoring_replica {false};
/// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK();
/// Delete old parts from disk and from ZooKeeper. Returns the number of removed parts
size_t clearOldPartsAndRemoveFromZK();
void clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKeeperPtr zookeeper, DataPartsVector && parts);
template<bool async_insert>
@ -458,6 +458,8 @@ private:
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
std::mutex merge_selecting_mutex;
UInt64 merge_selecting_sleep_ms;
/// A task that marks finished mutations as done.
BackgroundSchedulePool::TaskHolder mutations_finalizing_task;

View File

@ -2,6 +2,8 @@
<merge_tree>
<enable_the_endpoint_id_with_zookeeper_name_prefix>1</enable_the_endpoint_id_with_zookeeper_name_prefix>
<number_of_free_entries_in_pool_to_execute_mutation>8</number_of_free_entries_in_pool_to_execute_mutation>
<max_cleanup_delay_period>60</max_cleanup_delay_period>
<cleanup_thread_preferred_points_per_iteration>10</cleanup_thread_preferred_points_per_iteration>
</merge_tree>
<replicated_merge_tree_paranoid_check_on_drop_range>1</replicated_merge_tree_paranoid_check_on_drop_range>

View File

@ -141,7 +141,8 @@ def test_remove_broken_detached_part_replicated_merge_tree(started_cluster):
merge_tree_enable_clear_old_broken_detached=1,
merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds=5,
cleanup_delay_period=1,
cleanup_delay_period_random_add=0;
cleanup_delay_period_random_add=0,
cleanup_thread_preferred_points_per_iteration=0;
"""
)

View File

@ -25,7 +25,7 @@ def test_merge_and_part_corruption(started_cluster):
"""
CREATE TABLE replicated_mt(date Date, id UInt32, value Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{replica}') ORDER BY id
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1;
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0;
""".format(
replica=node1.name
)

View File

@ -14,11 +14,13 @@ def initialize_database(nodes, shard):
CREATE TABLE `{database}`.src (p UInt64, d UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/tables/test_consistent_shard1{shard}/replicated', '{replica}')
ORDER BY d PARTITION BY p
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5,
cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
CREATE TABLE `{database}`.dest (p UInt64, d UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/tables/test_consistent_shard2{shard}/replicated', '{replica}')
ORDER BY d PARTITION BY p
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, temporary_directories_lifetime=1;
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5,
cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0, temporary_directories_lifetime=1;
""".format(
shard=shard, replica=node.name, database=CLICKHOUSE_DATABASE
)

View File

@ -13,7 +13,8 @@ def fill_nodes(nodes, shard):
CREATE TABLE test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}')
ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0,
cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)

View File

@ -11,7 +11,8 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test;
CREATE TABLE test.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)
@ -22,7 +23,8 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test1;
CREATE TABLE test1.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test1/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test1/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)
@ -33,7 +35,8 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test2;
CREATE TABLE test2.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test2/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test2/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)
@ -44,7 +47,8 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test3;
CREATE TABLE test3.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test3/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test3/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)
@ -55,7 +59,8 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test4;
CREATE TABLE test4.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test4/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test4/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)

View File

@ -134,6 +134,7 @@ def test_replicated_balanced_merge_fetch(start_cluster):
old_parts_lifetime = 1,
cleanup_delay_period = 1,
cleanup_delay_period_random_add = 2,
cleanup_thread_preferred_points_per_iteration=0,
min_bytes_to_rebalance_partition_over_jbod = 1024,
max_bytes_to_merge_at_max_space_in_pool = 4096
""".format(

View File

@ -58,6 +58,7 @@ def test_jbod_ha(start_cluster):
old_parts_lifetime = 1,
cleanup_delay_period = 1,
cleanup_delay_period_random_add = 2,
cleanup_thread_preferred_points_per_iteration=0,
max_bytes_to_merge_at_max_space_in_pool = 4096
""".format(
i

View File

@ -42,7 +42,8 @@ def test_lost_part_same_replica(start_cluster):
for node in [node1, node2]:
node.query(
f"CREATE TABLE mt0 (id UInt64, date Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{node.name}') ORDER BY tuple() PARTITION BY date "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,"
"merge_selecting_sleep_ms=100, max_merge_selecting_sleep_ms=1000"
)
node1.query("SYSTEM STOP MERGES mt0")
@ -109,7 +110,8 @@ def test_lost_part_other_replica(start_cluster):
for node in [node1, node2]:
node.query(
f"CREATE TABLE mt1 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', '{node.name}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,"
"merge_selecting_sleep_ms=100, max_merge_selecting_sleep_ms=1000"
)
node1.query("SYSTEM STOP MERGES mt1")
@ -178,7 +180,8 @@ def test_lost_part_mutation(start_cluster):
for node in [node1, node2]:
node.query(
f"CREATE TABLE mt2 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t2', '{node.name}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,"
"merge_selecting_sleep_ms=100, max_merge_selecting_sleep_ms=1000"
)
node1.query("SYSTEM STOP MERGES mt2")
@ -241,7 +244,8 @@ def test_lost_last_part(start_cluster):
for node in [node1, node2]:
node.query(
f"CREATE TABLE mt3 (id UInt64, p String) ENGINE ReplicatedMergeTree('/clickhouse/tables/t3', '{node.name}') "
"ORDER BY tuple() PARTITION BY p SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
"ORDER BY tuple() PARTITION BY p SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,"
"merge_selecting_sleep_ms=100, max_merge_selecting_sleep_ms=1000"
)
node1.query("SYSTEM STOP MERGES mt3")

View File

@ -27,7 +27,7 @@ def test_empty_parts_alter_delete(started_cluster):
"CREATE TABLE empty_parts_delete (d Date, key UInt64, value String) "
"ENGINE = ReplicatedMergeTree('/clickhouse/tables/empty_parts_delete', 'r1') "
"PARTITION BY toYYYYMM(d) ORDER BY key "
"SETTINGS old_parts_lifetime = 1"
"SETTINGS old_parts_lifetime = 1, cleanup_delay_period=0, cleanup_thread_preferred_points_per_iteration=0"
)
node1.query("INSERT INTO empty_parts_delete VALUES (toDate('2020-10-10'), 1, 'a')")
@ -48,7 +48,7 @@ def test_empty_parts_summing(started_cluster):
"CREATE TABLE empty_parts_summing (d Date, key UInt64, value Int64) "
"ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/empty_parts_summing', 'r1') "
"PARTITION BY toYYYYMM(d) ORDER BY key "
"SETTINGS old_parts_lifetime = 1"
"SETTINGS old_parts_lifetime = 1, cleanup_delay_period=0, cleanup_thread_preferred_points_per_iteration=0"
)
node1.query("INSERT INTO empty_parts_summing VALUES (toDate('2020-10-10'), 1, 1)")

View File

@ -1528,7 +1528,8 @@ def test_simple_replication_and_moves(start_cluster):
s1 String
) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_moves', '{}')
ORDER BY tuple()
SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=2
SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1,
cleanup_delay_period=1, cleanup_delay_period_random_add=2, cleanup_thread_preferred_points_per_iteration=0
""".format(
i + 1
)
@ -1609,7 +1610,8 @@ def test_download_appropriate_disk(start_cluster):
s1 String
) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_download', '{}')
ORDER BY tuple()
SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=2
SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1,
cleanup_delay_period=1, cleanup_delay_period_random_add=2, cleanup_thread_preferred_points_per_iteration=0
""".format(
i + 1
)

View File

@ -27,7 +27,8 @@ def started_cluster():
def test_part_finally_removed(started_cluster):
node1.query(
"CREATE TABLE drop_outdated_part (Key UInt64) ENGINE = ReplicatedMergeTree('/table/d', '1') ORDER BY tuple() SETTINGS old_parts_lifetime=10, cleanup_delay_period=10, cleanup_delay_period_random_add=1"
"CREATE TABLE drop_outdated_part (Key UInt64) ENGINE = ReplicatedMergeTree('/table/d', '1') ORDER BY tuple() "
"SETTINGS old_parts_lifetime=10, cleanup_delay_period=10, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0"
)
node1.query("INSERT INTO drop_outdated_part VALUES (1)")
@ -44,7 +45,7 @@ def test_part_finally_removed(started_cluster):
)
node1.query(
"ALTER TABLE drop_outdated_part MODIFY SETTING old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1"
"ALTER TABLE drop_outdated_part MODIFY SETTING old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0"
)
for i in range(60):

View File

@ -21,7 +21,7 @@ def start_cluster():
CREATE DATABASE test;
CREATE TABLE test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/replicated', 'node1')
ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS old_parts_lifetime=4, cleanup_delay_period=1;
ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS old_parts_lifetime=4, cleanup_delay_period=1, cleanup_thread_preferred_points_per_iteration=0;
"""
)

View File

@ -4,7 +4,7 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
SETTINGS = "SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0"
SETTINGS = "SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0"
def fill_nodes(nodes):

View File

@ -931,7 +931,8 @@ def test_nats_overloaded_insert(nats_cluster):
CREATE TABLE test.view_overload (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3,
cleanup_thread_preferred_points_per_iteration=0;
CREATE MATERIALIZED VIEW test.consumer_overload TO test.view_overload AS
SELECT * FROM test.nats_consume;
"""

View File

@ -642,7 +642,8 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
CREATE TABLE test.view (key UInt64, value UInt64, channel_id String)
ENGINE = MergeTree
ORDER BY key
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3,
cleanup_thread_preferred_points_per_iteration=0;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT *, _channel_id AS channel_id FROM test.rabbitmq;
"""
@ -1116,7 +1117,8 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster):
CREATE TABLE test.destination(key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3,
cleanup_thread_preferred_points_per_iteration=0;
"""
)

View File

@ -13,7 +13,9 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test;
CREATE TABLE test.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5,
cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)

View File

@ -6,6 +6,7 @@ from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV, exec_query_with_retry
from helpers.wait_for_helpers import wait_for_delete_inactive_parts
from helpers.wait_for_helpers import wait_for_delete_empty_parts
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True)
@ -75,7 +76,8 @@ def test_ttl_columns(started_cluster):
"""
CREATE TABLE test_ttl(date DateTime, id UInt32, a Int32 TTL date + INTERVAL 1 DAY, b Int32 TTL date + INTERVAL 1 MONTH)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_columns', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date) SETTINGS merge_with_ttl_timeout=0, min_bytes_for_wide_part=0;
ORDER BY id PARTITION BY toDayOfMonth(date)
SETTINGS merge_with_ttl_timeout=0, min_bytes_for_wide_part=0, max_merge_selecting_sleep_ms=6000;
""".format(
replica=node.name
)
@ -108,7 +110,7 @@ def test_merge_with_ttl_timeout(started_cluster):
CREATE TABLE {table}(date DateTime, id UInt32, a Int32 TTL date + INTERVAL 1 DAY, b Int32 TTL date + INTERVAL 1 MONTH)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{table}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
SETTINGS min_bytes_for_wide_part=0;
SETTINGS min_bytes_for_wide_part=0, max_merge_selecting_sleep_ms=6000;
""".format(
replica=node.name, table=table
)
@ -143,13 +145,11 @@ def test_merge_with_ttl_timeout(started_cluster):
)
)
time.sleep(15) # TTL merges shall not happen.
assert (
node1.query("SELECT countIf(a = 0) FROM {table}".format(table=table)) == "3\n"
assert_eq_with_retry(
node1, "SELECT countIf(a = 0) FROM {table}".format(table=table), "3\n"
)
assert (
node2.query("SELECT countIf(a = 0) FROM {table}".format(table=table)) == "3\n"
assert_eq_with_retry(
node2, "SELECT countIf(a = 0) FROM {table}".format(table=table), "3\n"
)
@ -164,7 +164,7 @@ def test_ttl_many_columns(started_cluster):
_offset Int32 TTL date,
_partition Int32 TTL date)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_2', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date) SETTINGS merge_with_ttl_timeout=0;
ORDER BY id PARTITION BY toDayOfMonth(date) SETTINGS merge_with_ttl_timeout=0, max_merge_selecting_sleep_ms=6000;
""".format(
replica=node.name
)
@ -222,7 +222,7 @@ def test_ttl_table(started_cluster, delete_suffix):
CREATE TABLE test_ttl(date DateTime, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 1 DAY {delete_suffix} SETTINGS merge_with_ttl_timeout=0;
TTL date + INTERVAL 1 DAY {delete_suffix} SETTINGS merge_with_ttl_timeout=0, max_merge_selecting_sleep_ms=6000;
""".format(
replica=node.name, delete_suffix=delete_suffix
)
@ -313,7 +313,7 @@ def test_ttl_double_delete_rule_returns_error(started_cluster):
CREATE TABLE test_ttl(date DateTime, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_double_delete', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 1 DAY, date + INTERVAL 2 DAY SETTINGS merge_with_ttl_timeout=0
TTL date + INTERVAL 1 DAY, date + INTERVAL 2 DAY SETTINGS merge_with_ttl_timeout=0, max_merge_selecting_sleep_ms=6000
""".format(
replica=node1.name
)
@ -431,7 +431,8 @@ def test_ttl_empty_parts(started_cluster):
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_empty_parts', '{replica}')
ORDER BY id
SETTINGS max_bytes_to_merge_at_min_space_in_pool = 1, max_bytes_to_merge_at_max_space_in_pool = 1,
cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, old_parts_lifetime = 1
cleanup_delay_period = 1, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime = 1
""".format(
replica=node.name

View File

@ -36,8 +36,12 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r1;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r2;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst_r1 (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst_1', '1') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst_r2 (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst_1', '2') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst_r1 (p UInt64, k String, d UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst_1', '1') PARTITION BY p ORDER BY k
SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst_r2 (p UInt64, k String, d UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst_1', '2') PARTITION BY p ORDER BY k
SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (0, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);"

View File

@ -56,11 +56,13 @@ ${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_cleaner_r2 SYNC"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE mutations_cleaner_r1(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/mutations_cleaner', 'r1') ORDER BY x SETTINGS \
finished_mutations_to_keep = 2,
cleanup_delay_period = 1,
cleanup_delay_period_random_add = 0"
cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE mutations_cleaner_r2(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/mutations_cleaner', 'r2') ORDER BY x SETTINGS \
finished_mutations_to_keep = 2,
cleanup_delay_period = 1,
cleanup_delay_period_random_add = 0"
cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0"
# Insert some data
${CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --query="INSERT INTO mutations_cleaner_r1(x) VALUES (1), (2), (3), (4), (5)"

View File

@ -20,13 +20,15 @@ CREATE TABLE part_header_r1(x UInt32, y UInt32)
SETTINGS use_minimalistic_part_header_in_zookeeper = 0,
old_parts_lifetime = 1,
cleanup_delay_period = 0,
cleanup_delay_period_random_add = 0;
cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0;
CREATE TABLE part_header_r2(x UInt32, y UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/{shard}', '2{replica}') ORDER BY x
SETTINGS use_minimalistic_part_header_in_zookeeper = 1,
old_parts_lifetime = 1,
cleanup_delay_period = 0,
cleanup_delay_period_random_add = 0;
cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0;
SELECT '*** Test fetches ***';
INSERT INTO part_header_r1 VALUES (1, 1);

View File

@ -22,7 +22,7 @@ CREATE TABLE elog (
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/{shard}', '{replica}')
PARTITION BY date
ORDER BY (engine_id)
SETTINGS replicated_deduplication_window = 2, cleanup_delay_period=4, cleanup_delay_period_random_add=0;"
SETTINGS replicated_deduplication_window = 2, cleanup_delay_period=4, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 1, 'hello')"
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'hello')"

View File

@ -1,6 +1,8 @@
DROP TABLE IF EXISTS mt;
CREATE TABLE mt (x UInt64) ENGINE = MergeTree ORDER BY x SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, old_parts_lifetime = 1, parts_to_delay_insert = 100000, parts_to_throw_insert = 100000;
CREATE TABLE mt (x UInt64) ENGINE = MergeTree ORDER BY x
SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime = 1, parts_to_delay_insert = 100000, parts_to_throw_insert = 100000;
SYSTEM STOP MERGES mt;

View File

@ -13,8 +13,14 @@ $CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS alter_table0;
DROP TABLE IF EXISTS alter_table1;
CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 100));
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 200));
CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 100));
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 200));
"
function thread1()

View File

@ -58,7 +58,8 @@ function thread6()
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS alter_table_$REPLICA;
CREATE TABLE alter_table_$REPLICA (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$REPLICA') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50));";
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50));";
sleep 0.$RANDOM;
done
}

View File

@ -28,7 +28,8 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src1', '1') PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst1', '1') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst1', '1') PARTITION BY p ORDER BY k
SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (0, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);"
@ -58,7 +59,8 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE dst;"
$CLICKHOUSE_CLIENT --query="SELECT 'MOVE incompatible schema missing column';"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src2', '1') PARTITION BY p ORDER BY (d, p);"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst2', '1') PARTITION BY p ORDER BY (d, p) SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst2', '1') PARTITION BY p ORDER BY (d, p)
SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (0, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);"

View File

@ -11,7 +11,8 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src', '1') PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, k String) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst', '1') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, k String) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst', '1') PARTITION BY p ORDER BY k
SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
function thread1()
{

View File

@ -31,7 +31,8 @@ for i in $(seq $REPLICAS); do
max_replicated_merges_in_queue = 1000,
temporary_directories_lifetime = 10,
cleanup_delay_period = 3,
cleanup_delay_period_random_add = 0"
cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0"
done
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_mutate_mt_1 SELECT number, number + 10, toString(number) from numbers(10)"

View File

@ -12,7 +12,10 @@ for i in $(seq $REPLICAS); do
done
for i in $(seq $REPLICAS); do
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_detach_$i (key UInt64, value1 UInt8, value2 UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_alter_detach', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000,temporary_directories_lifetime=10,cleanup_delay_period=3,cleanup_delay_period_random_add=0"
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_detach_$i (key UInt64, value1 UInt8, value2 UInt8)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_alter_detach', '$i') ORDER BY key
SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000,
temporary_directories_lifetime=10,cleanup_delay_period=3,cleanup_delay_period_random_add=0,cleanup_thread_preferred_points_per_iteration=0"
done
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_detach_1 SELECT number, number + 10, number from numbers(10)"

View File

@ -27,7 +27,9 @@ function thread3()
{
while true; do
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS concurrent_optimize_table;
CREATE TABLE concurrent_optimize_table (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_optimize_table', '1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0;";
CREATE TABLE concurrent_optimize_table (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_optimize_table', '1') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0;";
sleep 0.$RANDOM;
sleep 0.$RANDOM;
sleep 0.$RANDOM;

View File

@ -6,7 +6,7 @@ SET insert_keeper_fault_injection_probability=0; -- disable fault injection; par
drop table if exists rmt sync;
-- cleanup code will perform extra Exists
-- (so the .reference will not match)
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n settings cleanup_delay_period=86400, replicated_can_become_leader=0;
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n settings cleanup_delay_period=86400, max_cleanup_delay_period=86400, replicated_can_become_leader=0;
system sync replica rmt;
insert into rmt values (1);
insert into rmt values (1);

View File

@ -13,8 +13,10 @@ SCALE=5000
$CLICKHOUSE_CLIENT -n --query "
DROP TABLE IF EXISTS r1;
DROP TABLE IF EXISTS r2;
CREATE TABLE r1 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}', '1{replica}') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
CREATE TABLE r2 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}', '2{replica}') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
CREATE TABLE r1 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}', '1{replica}') ORDER BY x
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
CREATE TABLE r2 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}', '2{replica}') ORDER BY x
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
DETACH TABLE r2;
"

View File

@ -8,7 +8,9 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS table_for_renames0"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS table_for_renames50"
$CLICKHOUSE_CLIENT --query "CREATE TABLE table_for_renames0 (value UInt64, data String) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_rename', '1') ORDER BY tuple() SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 0"
$CLICKHOUSE_CLIENT --query "CREATE TABLE table_for_renames0 (value UInt64, data String)
ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_rename', '1') ORDER BY tuple()
SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0"
$CLICKHOUSE_CLIENT --query "INSERT INTO table_for_renames0 SELECT number, toString(number) FROM numbers(1000)"

View File

@ -13,7 +13,8 @@ $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q1 SYNC"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q2 SYNC"
$CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q1 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/parallel_q', 'r1') ORDER BY tuple() SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0"
$CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q1 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/parallel_q', 'r1') ORDER BY tuple()
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0"
$CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q2 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/parallel_q', 'r2') ORDER BY tuple() SETTINGS always_fetch_merged_part = 1"

View File

@ -24,7 +24,8 @@ for i in $(seq 1 $NUM_REPLICAS); do
ENGINE ReplicatedMergeTree('/test/01921_concurrent_ttl_and_normal_merges/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/ttl_table', '$i')
ORDER BY tuple()
TTL key + INTERVAL 1 SECOND
SETTINGS merge_with_ttl_timeout=1, max_replicated_merges_with_ttl_in_queue=100, max_number_of_merges_with_ttl_in_pool=100, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
SETTINGS merge_with_ttl_timeout=1, max_replicated_merges_with_ttl_in_queue=100, max_number_of_merges_with_ttl_in_pool=100,
cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
done
function optimize_thread

View File

@ -4,11 +4,17 @@ DROP TABLE IF EXISTS partslost_0;
DROP TABLE IF EXISTS partslost_1;
DROP TABLE IF EXISTS partslost_2;
CREATE TABLE partslost_0 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '0') ORDER BY tuple() SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 1;
CREATE TABLE partslost_0 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '0') ORDER BY tuple()
SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1,
cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0;
CREATE TABLE partslost_1 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '1') ORDER BY tuple() SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 1;
CREATE TABLE partslost_1 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '1') ORDER BY tuple()
SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1,
cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0;
CREATE TABLE partslost_2 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '2') ORDER BY tuple() SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 1;
CREATE TABLE partslost_2 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '2') ORDER BY tuple()
SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1,
cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0;
INSERT INTO partslost_0 SELECT toString(number) AS x from system.numbers LIMIT 10000;

View File

@ -9,7 +9,7 @@ $CLICKHOUSE_CLIENT -q "drop table if exists rmt1 sync;"
$CLICKHOUSE_CLIENT -q "drop table if exists rmt2 sync;"
$CLICKHOUSE_CLIENT -q "create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '1') order by n
settings cleanup_delay_period=0, cleanup_delay_period_random_add=0, old_parts_lifetime=0"
settings cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime=0"
$CLICKHOUSE_CLIENT -q "create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '2') order by n"
$CLICKHOUSE_CLIENT -q "system stop replicated sends rmt2"

View File

@ -15,8 +15,12 @@ $CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS alter_table0;
DROP TABLE IF EXISTS alter_table1;
CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0;
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0
CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0;
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0
"
function thread1()

View File

@ -58,7 +58,9 @@ function thread6()
while true; do
REPLICA=$(($RANDOM % 10))
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS alter_table_$REPLICA;
CREATE TABLE alter_table_$REPLICA (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$REPLICA') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0;";
CREATE TABLE alter_table_$REPLICA (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$REPLICA') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0;";
sleep 0.$RANDOM;
done
}

View File

@ -9,7 +9,7 @@ CREATE TABLE mutate_and_zero_copy_replication1
)
ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02427_mutate_and_zero_copy_replication/alter', '1')
ORDER BY tuple()
SETTINGS old_parts_lifetime=0, cleanup_delay_period=300, cleanup_delay_period_random_add=300, min_bytes_for_wide_part = 0;
SETTINGS old_parts_lifetime=0, cleanup_delay_period=300, max_cleanup_delay_period=300, cleanup_delay_period_random_add=300, min_bytes_for_wide_part = 0;
CREATE TABLE mutate_and_zero_copy_replication2
(
@ -19,7 +19,7 @@ CREATE TABLE mutate_and_zero_copy_replication2
)
ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02427_mutate_and_zero_copy_replication/alter', '2')
ORDER BY tuple()
SETTINGS old_parts_lifetime=0, cleanup_delay_period=300, cleanup_delay_period_random_add=300;
SETTINGS old_parts_lifetime=0, cleanup_delay_period=300, max_cleanup_delay_period=300, cleanup_delay_period_random_add=300;
INSERT INTO mutate_and_zero_copy_replication1 VALUES (1, '1', 1.0);

View File

@ -8,7 +8,7 @@ drop table if exists rmt2;
-- Disable compact parts, because we need hardlinks in mutations.
create table rmt (n int, m int, k int) engine=ReplicatedMergeTree('/test/02432/{database}', '1') order by tuple()
settings storage_policy = 's3_cache', allow_remote_fs_zero_copy_replication=1,
concurrent_part_removal_threshold=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1,
concurrent_part_removal_threshold=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,
max_replicated_merges_in_queue=0, max_replicated_mutations_in_queue=0, min_bytes_for_wide_part=0, min_rows_for_wide_part=0;
insert into rmt(n, m) values (1, 42);
@ -38,7 +38,7 @@ select count(), sum(n), sum(m) from rmt;
-- New table can assign merges/mutations and can remove old parts
create table rmt2 (n int, m int, k String) engine=ReplicatedMergeTree('/test/02432/{database}', '2') order by tuple()
settings storage_policy = 's3_cache', allow_remote_fs_zero_copy_replication=1,
concurrent_part_removal_threshold=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1,
concurrent_part_removal_threshold=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,
min_bytes_for_wide_part=0, min_rows_for_wide_part=0, max_replicated_merges_in_queue=1,
old_parts_lifetime=0;

View File

@ -5,9 +5,15 @@ SET insert_keeper_fault_injection_probability=0; -- disable fault injection; par
drop table if exists rmt1;
drop table if exists rmt2;
create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02448/{database}/rmt', '1') order by tuple()
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1, old_parts_lifetime=0, max_parts_to_merge_at_once=4;
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2,
max_cleanup_delay_period=1, cleanup_delay_period=0, cleanup_delay_period_random_add=1,
cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime=0, max_parts_to_merge_at_once=4,
merge_selecting_sleep_ms=1000, max_merge_selecting_sleep_ms=2000;
create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02448/{database}/rmt', '2') order by tuple()
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1, old_parts_lifetime=0, max_parts_to_merge_at_once=4;
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2,
max_cleanup_delay_period=1, cleanup_delay_period=0, cleanup_delay_period_random_add=1,
cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime=0, max_parts_to_merge_at_once=4,
merge_selecting_sleep_ms=1000, max_merge_selecting_sleep_ms=2000;
-- insert part only on one replica
system stop replicated sends rmt1;
@ -137,7 +143,10 @@ system sync replica rmt2;
-- merge through gap
optimize table rmt2;
-- give it a chance to cleanup log
select sleep(2) format Null; -- increases probability of reproducing the issue
select sleepEachRow(2) from url('http://localhost:8123/?param_tries={1..10}&query=' || encodeURLComponent(
'select value from system.zookeeper where path=''/test/02448/' || currentDatabase() || '/rmt/replicas/1'' and name=''is_lost'' and value=''0'''
), 'LineAsString', 's String') settings max_threads=1 format Null;
-- rmt1 will mimic rmt2, but will not be able to fetch parts for a while
system stop replicated sends rmt2;
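
The new wait loop above deserves a short note: the {1..10} glob makes the url() table function expand into ten HTTP sources, and with max_threads = 1 they are read one after another, each re-running the embedded system.zookeeper check against the local server. While the check still returns a row (is_lost is still '0'), sleepEachRow(2) waits two seconds before the next attempt, so the statement blocks until the replica is marked lost or roughly twenty seconds pass. A standalone sketch of the same idea (illustrative only; the param_tries parameter is never read by the inner query, it only makes each expanded URL distinct):

SELECT sleepEachRow(2)
FROM url('http://localhost:8123/?param_tries={1..10}&query=' || encodeURLComponent(
         'select value from system.zookeeper where path=''/test/02448/' || currentDatabase()
         || '/rmt/replicas/1'' and name=''is_lost'' and value=''0'''),
     'LineAsString', 's String')
SETTINGS max_threads = 1
FORMAT Null;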

View File

@ -24,7 +24,8 @@ CREATE TABLE wikistat1
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/02494_zero_copy_and_projection', '1')
ORDER BY (path, time)
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, allow_remote_fs_zero_copy_replication=1, min_bytes_for_wide_part=0;
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, allow_remote_fs_zero_copy_replication=1, min_bytes_for_wide_part=0;
CREATE TABLE wikistat2
(
@ -49,7 +50,8 @@ CREATE TABLE wikistat2
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/02494_zero_copy_and_projection', '2')
ORDER BY (path, time)
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, allow_remote_fs_zero_copy_replication=1, min_bytes_for_wide_part=0;
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, allow_remote_fs_zero_copy_replication=1, min_bytes_for_wide_part=0;
INSERT INTO wikistat1 SELECT toDateTime('2020-10-01 00:00:00'), 'hello', 'world', '/data/path', 10 from numbers(100);

View File

@ -13,7 +13,7 @@ $CLICKHOUSE_CLIENT -n --query "
CREATE TABLE t_async_insert_cleanup (
KeyID UInt32
) Engine = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/t_async_insert_cleanup', '{replica}')
ORDER BY (KeyID) SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, replicated_deduplication_window_for_async_inserts=10
ORDER BY (KeyID) SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0, replicated_deduplication_window_for_async_inserts=10
"
for i in {1..100}; do