more flexible cleanup thread scheduling

Alexander Tokmakov 2023-05-22 19:07:18 +02:00
parent 0f90bfdd11
commit b8305503d8
52 changed files with 353 additions and 112 deletions

View File

@ -11,3 +11,8 @@ constexpr double interpolateExponential(double min, double max, double ratio)
assert(min > 0 && ratio >= 0 && ratio <= 1);
return min * std::pow(max / min, ratio);
}
constexpr double interpolateLinear(double min, double max, double ratio)
{
return std::lerp(min, max, ratio);
}
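
For context, a tiny usage sketch contrasting the two helpers (the boundary values are made up): both map a ratio in [0, 1] onto [min, max], linearly or on a logarithmic scale respectively.

#include <base/interpolate.h>
#include <cstdio>

int main()
{
    /// Hypothetical delay bounds of 30 and 300 (think seconds).
    printf("%f\n", interpolateLinear(30.0, 300.0, 0.5));      /// 165.0: halfway on a linear scale
    printf("%f\n", interpolateExponential(30.0, 300.0, 0.5)); /// ~94.87: 30 * sqrt(300 / 30), halfway on a log scale
}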

View File

@ -211,9 +211,9 @@ void IMergeTreeDataPart::MinMaxIndex::appendFiles(const MergeTreeData & data, St
}
static void incrementStateMetric(MergeTreeDataPartState state)
void IMergeTreeDataPart::incrementStateMetric(MergeTreeDataPartState state_) const
{
switch (state)
switch (state_)
{
case MergeTreeDataPartState::Temporary:
CurrentMetrics::add(CurrentMetrics::PartsTemporary);
@ -227,6 +227,7 @@ static void incrementStateMetric(MergeTreeDataPartState state)
CurrentMetrics::add(CurrentMetrics::PartsCommitted);
return;
case MergeTreeDataPartState::Outdated:
storage.total_outdated_parts_count.fetch_add(1, std::memory_order_relaxed);
CurrentMetrics::add(CurrentMetrics::PartsOutdated);
return;
case MergeTreeDataPartState::Deleting:
@ -238,9 +239,9 @@ static void incrementStateMetric(MergeTreeDataPartState state)
}
}
static void decrementStateMetric(MergeTreeDataPartState state)
void IMergeTreeDataPart::decrementStateMetric(MergeTreeDataPartState state_) const
{
switch (state)
switch (state_)
{
case MergeTreeDataPartState::Temporary:
CurrentMetrics::sub(CurrentMetrics::PartsTemporary);
@ -254,6 +255,7 @@ static void decrementStateMetric(MergeTreeDataPartState state)
CurrentMetrics::sub(CurrentMetrics::PartsCommitted);
return;
case MergeTreeDataPartState::Outdated:
storage.total_outdated_parts_count.fetch_sub(1, std::memory_order_relaxed);
CurrentMetrics::sub(CurrentMetrics::PartsOutdated);
return;
case MergeTreeDataPartState::Deleting:

View File

@ -623,6 +623,9 @@ private:
/// for this column with default parameters.
CompressionCodecPtr detectDefaultCompressionCodec() const;
void incrementStateMetric(MergeTreeDataPartState state) const;
void decrementStateMetric(MergeTreeDataPartState state) const;
mutable MergeTreeDataPartState state{MergeTreeDataPartState::Temporary};
/// This ugly flag is needed for debug assertions only

View File

@ -81,6 +81,7 @@
#include <boost/algorithm/string/join.hpp>
#include <base/insertAtEnd.h>
#include <base/interpolate.h>
#include <algorithm>
#include <atomic>
@ -4311,6 +4312,29 @@ size_t MergeTreeData::getActivePartsCount() const
}
size_t MergeTreeData::getOutdatedPartsCount() const
{
return total_outdated_parts_count.load(std::memory_order_relaxed);
}
size_t MergeTreeData::getNumberOfOutdatedPartsWithExpiredRemovalTime() const
{
size_t res = 0;
auto time_now = time(nullptr);
auto parts_lock = lockParts();
auto outdated_parts_range = getDataPartsStateRange(DataPartState::Outdated);
for (const auto & part : outdated_parts_range)
{
auto part_remove_time = part->remove_time.load(std::memory_order_relaxed);
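/// Count only parts whose removal time has already expired (old_parts_lifetime has passed)
/// and that are not referenced by anything else (the parts set holds the only reference).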
if (part_remove_time <= time_now && time_now - part_remove_time >= getSettings()->old_parts_lifetime.totalSeconds() && part.unique())
++res;
}
return res;
}
std::pair<size_t, size_t> MergeTreeData::getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const
{
auto lock = lockParts();
@ -4519,7 +4543,7 @@ void MergeTreeData::delayMutationOrThrowIfNeeded(Poco::Event * until, const Cont
size_t allowed_mutations_over_threshold = num_mutations_to_throw - num_mutations_to_delay;
double delay_factor = std::min(static_cast<double>(mutations_over_threshold) / allowed_mutations_over_threshold, 1.0);
size_t delay_milliseconds = static_cast<size_t>(std::lerp(settings->min_delay_to_mutate_ms, settings->max_delay_to_mutate_ms, delay_factor));
size_t delay_milliseconds = static_cast<size_t>(interpolateLinear(settings->min_delay_to_mutate_ms, settings->max_delay_to_mutate_ms, delay_factor));
ProfileEvents::increment(ProfileEvents::DelayedMutations);
ProfileEvents::increment(ProfileEvents::DelayedMutationsMilliseconds, delay_milliseconds);

View File

@ -532,6 +532,10 @@ public:
size_t getActivePartsCount() const;
size_t getOutdatedPartsCount() const;
size_t getNumberOfOutdatedPartsWithExpiredRemovalTime() const;
/// Returns a pair with: the max number of parts in a partition across all partitions; the total size of parts in that partition.
/// (if multiple partitions have the max number of parts, the size is returned for an arbitrary one of them)
std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const;
@ -1491,6 +1495,8 @@ private:
std::atomic<size_t> total_active_size_rows = 0;
std::atomic<size_t> total_active_size_parts = 0;
mutable std::atomic<size_t> total_outdated_parts_count = 0;
// Record all query ids which access the table. It's guarded by `query_id_set_mutex` and is always mutable.
mutable std::set<String> query_id_set TSA_GUARDED_BY(query_id_set_mutex);
mutable std::mutex query_id_set_mutex;

View File

@ -120,8 +120,10 @@ struct Settings;
\
/** Check delay of replicas settings. */ \
M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less than this value.", 0) \
M(UInt64, cleanup_delay_period, 30, "Period to clean old queue logs, blocks hashes and parts.", 0) \
M(UInt64, cleanup_delay_period, 30, "Minimum period to clean old queue logs, blocks hashes and parts.", 0) \
M(UInt64, max_cleanup_delay_period, 300, "Maximum period to clean old queue logs, blocks hashes and parts.", 0) \
M(UInt64, cleanup_delay_period_random_add, 10, "Add uniformly distributed value from 0 to x seconds to cleanup_delay_period to avoid thundering herd effect and subsequent DoS of ZooKeeper in case of very large number of tables.", 0) \
M(UInt64, cleanup_thread_preferred_points_per_iteration, 150, "Preferred batch size for background cleanup (points are abstract but 1 point is approximately equivalent to 1 inserted block).", 0) \
M(UInt64, min_relative_delay_to_close, 300, "Minimal delay from other replicas to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, min_absolute_delay_to_close, 0, "Minimal absolute delay to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, enable_vertical_merge_algorithm, 1, "Enable usage of Vertical merge algorithm.", 0) \

View File

@ -25,19 +25,22 @@ ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplic
: storage(storage_)
, log_name(storage.getStorageID().getFullTableName() + " (ReplicatedMergeTreeCleanupThread)")
, log(&Poco::Logger::get(log_name))
, sleep_ms(storage.getSettings()->cleanup_delay_period * 1000)
{
task = storage.getContext()->getSchedulePool().createTask(log_name, [this]{ run(); });
}
void ReplicatedMergeTreeCleanupThread::run()
{
auto storage_settings = storage.getSettings();
const auto sleep_ms = storage_settings->cleanup_delay_period * 1000
+ std::uniform_int_distribution<UInt64>(0, storage_settings->cleanup_delay_period_random_add * 1000)(rng);
SCOPE_EXIT({ is_running.store(false, std::memory_order_relaxed); });
is_running.store(true, std::memory_order_relaxed);
auto storage_settings = storage.getSettings();
Float32 cleanup_points = 0;
try
{
iterate();
cleanup_points = iterate();
}
catch (const Coordination::Exception & e)
{
@ -51,39 +54,144 @@ void ReplicatedMergeTreeCleanupThread::run()
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
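/// clock_gettime_ns_adjusted(prev) returns the current time, adjusted to be not less than prev,
/// so the interval computed below cannot underflow if the clock steps back.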
UInt64 prev_timestamp = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed);
UInt64 now_ms = clock_gettime_ns_adjusted(prev_timestamp * 1'000'000) / 1'000'000;
/// Do not adjust sleep_ms on the first run after starting the server
if (prev_timestamp && storage_settings->cleanup_thread_preferred_points_per_iteration)
{
/// We don't want to run the task too often when the table was barely changed and there's almost nothing to clean up.
/// But we cannot simply sleep max_cleanup_delay_period (300s) when nothing was cleaned up and cleanup_delay_period (30s)
/// when we removed something, because inserting one part every 30s would lead to running cleanup every 30s just to remove one part.
/// So we need some interpolation based on preferred batch size.
auto expected_cleanup_points = storage_settings->cleanup_thread_preferred_points_per_iteration;
/// How long should we sleep so that the next iteration removes about cleanup_thread_preferred_points_per_iteration points?
Float32 ratio = cleanup_points / expected_cleanup_points;
if (ratio == 0)
sleep_ms = storage_settings->max_cleanup_delay_period * 1000;
else
sleep_ms = static_cast<UInt64>(sleep_ms / ratio);
if (sleep_ms < storage_settings->cleanup_delay_period * 1000)
sleep_ms = storage_settings->cleanup_delay_period * 1000;
if (storage_settings->max_cleanup_delay_period * 1000 < sleep_ms)
sleep_ms = storage_settings->max_cleanup_delay_period * 1000;
UInt64 interval_ms = now_ms - prev_timestamp;
LOG_TRACE(log, "Scheduling next cleanup after {}ms (points: {}, interval: {}ms, ratio: {}, points per minute: {})",
sleep_ms, cleanup_points, interval_ms, ratio, cleanup_points / interval_ms * 60'000);
}
prev_cleanup_timestamp_ms.store(now_ms, std::memory_order_relaxed);
sleep_ms += std::uniform_int_distribution<UInt64>(0, storage_settings->cleanup_delay_period_random_add * 1000)(rng);
task->scheduleAfter(sleep_ms);
}
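
In short, the new scheduling picks the next sleep so that the next iteration is expected to clean roughly cleanup_thread_preferred_points_per_iteration points. A simplified standalone restatement (not the actual implementation; the values in the comments are the defaults introduced above):

#include <algorithm>
#include <cstdint>

uint64_t nextSleepMs(uint64_t prev_sleep_ms, float cleanup_points,
                     uint64_t cleanup_delay_period,      /// 30 (seconds)
                     uint64_t max_cleanup_delay_period,  /// 300 (seconds)
                     uint64_t preferred_points)          /// 150
{
    /// How much faster or slower should we go to hit the preferred batch size next time?
    float ratio = cleanup_points / preferred_points;
    uint64_t sleep_ms = (ratio == 0)
        ? max_cleanup_delay_period * 1000
        : static_cast<uint64_t>(prev_sleep_ms / ratio);
    /// Clamp between the minimum and maximum cleanup periods.
    return std::clamp(sleep_ms, cleanup_delay_period * 1000, max_cleanup_delay_period * 1000);
}

For example, if the previous sleep was 30'000 ms and the iteration cleaned only 75 points (half the preferred batch), the next sleep doubles to 60'000 ms; cleaning twice the preferred batch halves it again, down to the cleanup_delay_period floor. The random cleanup_delay_period_random_add is then added on top, as in run() above.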
void ReplicatedMergeTreeCleanupThread::iterate()
void ReplicatedMergeTreeCleanupThread::wakeupEarlierIfNeeded()
{
storage.clearOldPartsAndRemoveFromZK();
/// It may happen that the table was idle for a long time, but then a user started to aggressively insert (or mutate) data.
/// In this case, sleep_ms was set to the highest possible value, the task is not going to wake up soon,
/// but the number of objects to clean up is growing. We need to wake up the task earlier.
auto storage_settings = storage.getSettings();
if (!storage_settings->cleanup_thread_preferred_points_per_iteration)
return;
/// The number of other objects (logs, blocks, etc.) is usually correlated with the number of Outdated parts.
/// Do not wake up unless we have too many.
size_t number_of_outdated_objects = storage.getOutdatedPartsCount();
if (number_of_outdated_objects < storage_settings->cleanup_thread_preferred_points_per_iteration * 2)
return;
/// A race condition is possible here, but it's okay
if (is_running.load(std::memory_order_relaxed))
return;
/// Do not re-check all parts too often (avoid constantly calling getNumberOfOutdatedPartsWithExpiredRemovalTime())
if (!wakeup_check_timer.compareAndRestart(storage_settings->cleanup_delay_period / 4))
return;
UInt64 prev_run_timestamp_ms = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed);
UInt64 now_ms = clock_gettime_ns_adjusted(prev_run_timestamp_ms * 1'000'000) / 1'000'000;
if (!prev_run_timestamp_ms || now_ms <= prev_run_timestamp_ms)
return;
/// Don't run it more often than cleanup_delay_period
UInt64 seconds_passed = (now_ms - prev_run_timestamp_ms) / 1000;
if (seconds_passed < storage_settings->cleanup_delay_period)
return;
/// Do not count parts that cannot be removed anyway. Do not wake up unless we have too many.
number_of_outdated_objects = storage.getNumberOfOutdatedPartsWithExpiredRemovalTime();
if (number_of_outdated_objects < storage_settings->cleanup_thread_preferred_points_per_iteration * 2)
return;
LOG_TRACE(log, "Waking up cleanup thread because there are {} outdated objects and previous cleanup finished {}s ago",
number_of_outdated_objects, seconds_passed);
wakeup();
}
Float32 ReplicatedMergeTreeCleanupThread::iterate()
{
size_t cleaned_logs = 0;
Float32 cleaned_blocks = 0;
size_t cleaned_other = 0;
size_t cleaned_part_like = 0;
size_t cleaned_parts = storage.clearOldPartsAndRemoveFromZK();
auto storage_settings = storage.getSettings();
{
auto lock = storage.lockForShare(RWLockImpl::NO_QUERY, storage.getSettings()->lock_acquire_timeout_for_background_operations);
/// Both use relative_data_path, which changes during rename, so we
/// do it under the shared lock
storage.clearOldWriteAheadLogs();
storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
cleaned_other += storage.clearOldWriteAheadLogs();
cleaned_part_like += storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
storage.clearOldBrokenPartsFromDetachedDirectory();
cleaned_part_like += storage.clearOldBrokenPartsFromDetachedDirectory();
}
/// This is a loose condition: no problem if we actually lost leadership at this moment
/// and two replicas try to do cleanup simultaneously.
if (storage.is_leader)
{
clearOldLogs();
auto storage_settings = storage.getSettings();
clearOldBlocks("blocks", storage_settings->replicated_deduplication_window_seconds, storage_settings->replicated_deduplication_window, cached_block_stats_for_sync_inserts);
clearOldBlocks("async_blocks", storage_settings->replicated_deduplication_window_seconds_for_async_inserts, storage_settings->replicated_deduplication_window_for_async_inserts, cached_block_stats_for_async_inserts);
clearOldMutations();
storage.clearEmptyParts();
cleaned_logs = clearOldLogs();
size_t normal_blocks = clearOldBlocks("blocks", storage_settings->replicated_deduplication_window_seconds,
storage_settings->replicated_deduplication_window, cached_block_stats_for_sync_inserts);
size_t async_blocks = clearOldBlocks("async_blocks",
storage_settings->replicated_deduplication_window_seconds_for_async_inserts,
storage_settings->replicated_deduplication_window_for_async_inserts,
cached_block_stats_for_async_inserts);
/// Many async blocks are transformed into one ordinary block
Float32 async_blocks_per_block = static_cast<Float32>(storage_settings->replicated_deduplication_window) /
(storage_settings->replicated_deduplication_window_for_async_inserts + 1);
cleaned_blocks = (normal_blocks + async_blocks * async_blocks_per_block) / 2;
cleaned_other += clearOldMutations();
cleaned_part_like += storage.clearEmptyParts();
}
/// We need to measure the number of removed objects somehow (for better scheduling),
/// but just summing the number of removed async blocks, logs, and empty parts does not make any sense.
/// So we are trying to (approximately) measure the number of inserted blocks/parts, so we will be able to compare apples to apples.
/// Each inserted block produces 3 objects that have to be cleaned up: one block, one log entry and one part.
/// A few new parts get merged together, producing one log entry and one part.
/// Other objects (like mutations and WALs) are much rarer than Outdated parts (because mutations usually produce
/// many Outdated parts, and WALs usually contain many parts too). We count them as one part for simplicity.
constexpr Float32 parts_number_amplification = 1.3f; /// Assuming we merge 4-5 parts each time
Float32 cleaned_inserted_parts = (cleaned_blocks + (cleaned_logs + cleaned_parts) / parts_number_amplification) / 3;
return cleaned_inserted_parts + cleaned_part_like + cleaned_other;
}
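
A made-up worked example of the formula above (illustrative numbers only, not from a real run):

/// Suppose one iteration removed: 90 blocks' worth of hashes, 26 log entries, 100 parts,
/// 5 part-like objects (temporary/detached dirs, empty parts) and 2 other objects (mutations, WALs).
constexpr float parts_number_amplification = 1.3f;
float cleaned_blocks = 90, cleaned_logs = 26, cleaned_parts = 100;
float cleaned_part_like = 5, cleaned_other = 2;
float cleaned_inserted_parts = (cleaned_blocks + (cleaned_logs + cleaned_parts) / parts_number_amplification) / 3; /// (90 + 126 / 1.3) / 3 ~ 62.3
float points = cleaned_inserted_parts + cleaned_part_like + cleaned_other; /// ~ 69.3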
void ReplicatedMergeTreeCleanupThread::clearOldLogs()
size_t ReplicatedMergeTreeCleanupThread::clearOldLogs()
{
auto zookeeper = storage.getZooKeeper();
auto storage_settings = storage.getSettings();
@ -102,7 +210,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
size_t min_replicated_logs_to_keep = static_cast<size_t>(storage_settings->min_replicated_logs_to_keep * ratio);
if (static_cast<double>(children_count) < min_replicated_logs_to_keep)
return;
return 0;
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
@ -114,7 +222,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log");
if (entries.empty())
return;
return 0;
::sort(entries.begin(), entries.end());
@ -227,7 +335,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end());
if (entries.empty())
return;
return 0;
markLostReplicas(
host_versions_lost_replicas,
@ -268,6 +376,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
if (i != 0)
LOG_DEBUG(log, "Removed {} old log entries: {} - {}", i, entries[0], entries[i - 1]);
return i;
}
@ -323,7 +433,7 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat
}
};
void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats)
size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats)
{
auto zookeeper = storage.getZooKeeper();
@ -331,7 +441,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_
getBlocksSortedByTime(blocks_dir_name, *zookeeper, timed_blocks, cached_block_stats);
if (timed_blocks.empty())
return;
return 0;
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
Int64 current_time = timed_blocks.front().ctime;
@ -350,7 +460,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
if (!num_nodes_to_delete)
return;
return 0;
auto last_outdated_block = timed_blocks.end() - 1;
LOG_TRACE(log, "Will clear {} old blocks from {} (ctime {}) to {} (ctime {})", num_nodes_to_delete,
@ -388,6 +498,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_
}
LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
return num_nodes_to_delete;
}
@ -456,17 +567,17 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & bloc
}
void ReplicatedMergeTreeCleanupThread::clearOldMutations()
size_t ReplicatedMergeTreeCleanupThread::clearOldMutations()
{
auto storage_settings = storage.getSettings();
if (!storage_settings->finished_mutations_to_keep)
return;
return 0;
if (storage.queue.countFinishedMutations() <= storage_settings->finished_mutations_to_keep)
{
/// Not strictly necessary, but helps to avoid unnecessary ZooKeeper requests.
/// If even this replica hasn't finished enough mutations yet, then we don't need to clean anything.
return;
return 0;
}
auto zookeeper = storage.getZooKeeper();
@ -481,7 +592,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
// No need to check the return value when deleting mutations.
zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/mutation_pointer", pointer);
if (pointer.empty())
return; /// One replica hasn't done anything yet so we can't delete any mutations.
return 0; /// One replica hasn't done anything yet so we can't delete any mutations.
min_pointer = std::min(parse<UInt64>(pointer), min_pointer);
}
@ -492,11 +603,11 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
entries.erase(std::upper_bound(entries.begin(), entries.end(), padIndex(min_pointer)), entries.end());
/// Do not remove the last `storage_settings->finished_mutations_to_keep` entries.
if (entries.size() <= storage_settings->finished_mutations_to_keep)
return;
return 0;
entries.erase(entries.end() - storage_settings->finished_mutations_to_keep, entries.end());
if (entries.empty())
return;
return 0;
Coordination::Requests ops;
size_t batch_start_i = 0;
@ -526,6 +637,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
ops.clear();
}
}
return entries.size();
}
}

View File

@ -4,6 +4,7 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/randomSeed.h>
#include <Common/Stopwatch.h>
#include <Core/BackgroundSchedulePool.h>
#include <thread>
@ -31,6 +32,8 @@ public:
void stop() { task->deactivate(); }
void wakeupEarlierIfNeeded();
private:
StorageReplicatedMergeTree & storage;
String log_name;
@ -38,11 +41,20 @@ private:
BackgroundSchedulePool::TaskHolder task;
pcg64 rng{randomSeed()};
void run();
void iterate();
UInt64 sleep_ms;
/// Remove old records from ZooKeeper.
void clearOldLogs();
std::atomic<UInt64> prev_cleanup_timestamp_ms = 0;
std::atomic<bool> is_running = false;
AtomicStopwatch wakeup_check_timer;
void run();
/// Returns a number that is directly proportional to the number of cleaned-up objects
Float32 iterate();
/// Remove old records from ZooKeeper. Returns the number of removed log entries.
size_t clearOldLogs();
/// The replica is marked as "lost" if it is inactive and its log pointer
/// is far behind and we are not going to keep logs for it.
@ -52,11 +64,11 @@ private:
size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper);
using NodeCTimeAndVersionCache = std::map<String, std::pair<Int64, Int32>>;
/// Remove old block hashes from ZooKeeper. This is done by the leader replica.
void clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats);
/// Remove old block hashes from ZooKeeper. This is done by the leader replica. Returns the number of removed blocks.
size_t clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats);
/// Remove old mutations that are done from ZooKeeper. This is done by the leader replica.
void clearOldMutations();
/// Remove old finished mutations from ZooKeeper. This is done by the leader replica. Returns the number of removed mutations.
size_t clearOldMutations();
NodeCTimeAndVersionCache cached_block_stats_for_sync_inserts;
NodeCTimeAndVersionCache cached_block_stats_for_async_inserts;

View File

@ -28,7 +28,7 @@ struct Estimator
{
double difference = std::abs(log2(static_cast<double>(sum_size) / size_prev_at_left));
if (difference < settings.heuristic_to_align_parts_max_absolute_difference_in_powers_of_two)
current_score *= std::lerp(settings.heuristic_to_align_parts_max_score_adjustment, 1,
current_score *= interpolateLinear(settings.heuristic_to_align_parts_max_score_adjustment, 1,
difference / settings.heuristic_to_align_parts_max_absolute_difference_in_powers_of_two);
}
@ -115,8 +115,8 @@ bool allow(
// std::cerr << "size_normalized: " << size_normalized << "\n";
/// Calculate boundaries for age
double min_age_to_lower_base = std::lerp(settings.min_age_to_lower_base_at_min_size, settings.min_age_to_lower_base_at_max_size, size_normalized);
double max_age_to_lower_base = std::lerp(settings.max_age_to_lower_base_at_min_size, settings.max_age_to_lower_base_at_max_size, size_normalized);
double min_age_to_lower_base = interpolateLinear(settings.min_age_to_lower_base_at_min_size, settings.min_age_to_lower_base_at_max_size, size_normalized);
double max_age_to_lower_base = interpolateLinear(settings.max_age_to_lower_base_at_min_size, settings.max_age_to_lower_base_at_max_size, size_normalized);
// std::cerr << "min_age_to_lower_base: " << min_age_to_lower_base << "\n";
// std::cerr << "max_age_to_lower_base: " << max_age_to_lower_base << "\n";
@ -137,7 +137,7 @@ bool allow(
// std::cerr << "combined_ratio: " << combined_ratio << "\n";
double lowered_base = std::lerp(settings.base, 2.0, combined_ratio);
double lowered_base = interpolateLinear(settings.base, 2.0, combined_ratio);
// std::cerr << "------- lowered_base: " << lowered_base << "\n";

View File

@ -3147,6 +3147,8 @@ bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::Sel
bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee)
{
cleanup_thread.wakeupEarlierIfNeeded();
/// If replication queue is stopped exit immediately as we successfully executed the task
if (queue.actions_blocker.isCancelled())
return false;
@ -6589,7 +6591,7 @@ bool StorageReplicatedMergeTree::hasLightweightDeletedMask() const
return has_lightweight_delete_parts.load(std::memory_order_relaxed);
}
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
size_t StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
auto table_lock = lockForShare(
RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
@ -6598,8 +6600,9 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
/// Now these parts are in Deleting state. If we fail to remove some of them, we must roll them back to Outdated state.
/// Otherwise they will not be deleted.
DataPartsVector parts = grabOldParts();
size_t total_parts_to_remove = parts.size();
if (parts.empty())
return;
return total_parts_to_remove;
DataPartsVector parts_to_delete_only_from_filesystem; // Only duplicates
DataPartsVector parts_to_delete_completely; // All parts except duplicates
@ -6707,6 +6710,8 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
/// Otherwise nobody will try to remove them again (see grabOldParts).
delete_parts_from_fs_and_rollback_in_case_of_error(parts_to_remove_from_filesystem, "old");
}
return total_parts_to_remove;
}

View File

@ -342,8 +342,8 @@ public:
private:
std::atomic_bool are_restoring_replica {false};
/// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK();
/// Delete old parts from disk and from ZooKeeper. Returns the number of removed parts
size_t clearOldPartsAndRemoveFromZK();
template<bool async_insert>
friend class ReplicatedMergeTreeSinkImpl;

View File

@ -1,5 +1,7 @@
<clickhouse>
<merge_tree>
<number_of_free_entries_in_pool_to_execute_mutation>8</number_of_free_entries_in_pool_to_execute_mutation>
<max_cleanup_delay_period>60</max_cleanup_delay_period>
<cleanup_thread_preferred_points_per_iteration>10</cleanup_thread_preferred_points_per_iteration>
</merge_tree>
</clickhouse>

View File

@ -141,7 +141,8 @@ def test_remove_broken_detached_part_replicated_merge_tree(started_cluster):
merge_tree_enable_clear_old_broken_detached=1,
merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds=5,
cleanup_delay_period=1,
cleanup_delay_period_random_add=0;
cleanup_delay_period_random_add=0,
cleanup_thread_preferred_points_per_iteration=0;
"""
)

View File

@ -25,7 +25,7 @@ def test_merge_and_part_corruption(started_cluster):
"""
CREATE TABLE replicated_mt(date Date, id UInt32, value Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{replica}') ORDER BY id
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1;
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0;
""".format(
replica=node1.name
)

View File

@ -14,11 +14,13 @@ def initialize_database(nodes, shard):
CREATE TABLE `{database}`.src (p UInt64, d UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/tables/test_consistent_shard1{shard}/replicated', '{replica}')
ORDER BY d PARTITION BY p
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5,
cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
CREATE TABLE `{database}`.dest (p UInt64, d UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/tables/test_consistent_shard2{shard}/replicated', '{replica}')
ORDER BY d PARTITION BY p
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5,
cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name, database=CLICKHOUSE_DATABASE
)

View File

@ -11,7 +11,8 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test;
CREATE TABLE test.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)
@ -22,7 +23,8 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test1;
CREATE TABLE test1.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test1/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test1/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)
@ -33,7 +35,8 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test2;
CREATE TABLE test2.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test2/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test2/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)
@ -44,7 +47,8 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test3;
CREATE TABLE test3.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test3/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test3/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)
@ -55,7 +59,8 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test4;
CREATE TABLE test4.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test4/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test4/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)

View File

@ -134,6 +134,7 @@ def test_replicated_balanced_merge_fetch(start_cluster):
old_parts_lifetime = 1,
cleanup_delay_period = 1,
cleanup_delay_period_random_add = 2,
cleanup_thread_preferred_points_per_iteration=0,
min_bytes_to_rebalance_partition_over_jbod = 1024,
max_bytes_to_merge_at_max_space_in_pool = 4096
""".format(

View File

@ -58,6 +58,7 @@ def test_jbod_ha(start_cluster):
old_parts_lifetime = 1,
cleanup_delay_period = 1,
cleanup_delay_period_random_add = 2,
cleanup_thread_preferred_points_per_iteration=0,
max_bytes_to_merge_at_max_space_in_pool = 4096
""".format(
i

View File

@ -42,7 +42,7 @@ def test_lost_part_same_replica(start_cluster):
for node in [node1, node2]:
node.query(
f"CREATE TABLE mt0 (id UInt64, date Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{node.name}') ORDER BY tuple() PARTITION BY date "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0"
)
node1.query("SYSTEM STOP MERGES mt0")
@ -109,7 +109,7 @@ def test_lost_part_other_replica(start_cluster):
for node in [node1, node2]:
node.query(
f"CREATE TABLE mt1 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', '{node.name}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0"
)
node1.query("SYSTEM STOP MERGES mt1")
@ -178,7 +178,7 @@ def test_lost_part_mutation(start_cluster):
for node in [node1, node2]:
node.query(
f"CREATE TABLE mt2 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t2', '{node.name}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0"
)
node1.query("SYSTEM STOP MERGES mt2")
@ -241,7 +241,7 @@ def test_lost_last_part(start_cluster):
for node in [node1, node2]:
node.query(
f"CREATE TABLE mt3 (id UInt64, p String) ENGINE ReplicatedMergeTree('/clickhouse/tables/t3', '{node.name}') "
"ORDER BY tuple() PARTITION BY p SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1"
"ORDER BY tuple() PARTITION BY p SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0"
)
node1.query("SYSTEM STOP MERGES mt3")

View File

@ -1528,7 +1528,8 @@ def test_simple_replication_and_moves(start_cluster):
s1 String
) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_moves', '{}')
ORDER BY tuple()
SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=2
SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1,
cleanup_delay_period=1, cleanup_delay_period_random_add=2, cleanup_thread_preferred_points_per_iteration=0
""".format(
i + 1
)
@ -1609,7 +1610,8 @@ def test_download_appropriate_disk(start_cluster):
s1 String
) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_download', '{}')
ORDER BY tuple()
SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=2
SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1,
cleanup_delay_period=1, cleanup_delay_period_random_add=2, cleanup_thread_preferred_points_per_iteration=0
""".format(
i + 1
)

View File

@ -27,7 +27,8 @@ def started_cluster():
def test_part_finally_removed(started_cluster):
node1.query(
"CREATE TABLE drop_outdated_part (Key UInt64) ENGINE = ReplicatedMergeTree('/table/d', '1') ORDER BY tuple() SETTINGS old_parts_lifetime=10, cleanup_delay_period=10, cleanup_delay_period_random_add=1"
"CREATE TABLE drop_outdated_part (Key UInt64) ENGINE = ReplicatedMergeTree('/table/d', '1') ORDER BY tuple() "
"SETTINGS old_parts_lifetime=10, cleanup_delay_period=10, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0"
)
node1.query("INSERT INTO drop_outdated_part VALUES (1)")
@ -44,7 +45,7 @@ def test_part_finally_removed(started_cluster):
)
node1.query(
"ALTER TABLE drop_outdated_part MODIFY SETTING old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1"
"ALTER TABLE drop_outdated_part MODIFY SETTING old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0"
)
for i in range(60):

View File

@ -21,7 +21,7 @@ def start_cluster():
CREATE DATABASE test;
CREATE TABLE test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/replicated', 'node1')
ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS old_parts_lifetime=4, cleanup_delay_period=1;
ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS old_parts_lifetime=4, cleanup_delay_period=1, cleanup_thread_preferred_points_per_iteration=0;
"""
)

View File

@ -4,7 +4,7 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
SETTINGS = "SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0"
SETTINGS = "SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0"
def fill_nodes(nodes):

View File

@ -931,7 +931,8 @@ def test_nats_overloaded_insert(nats_cluster):
CREATE TABLE test.view_overload (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3,
cleanup_thread_preferred_points_per_iteration=0;
CREATE MATERIALIZED VIEW test.consumer_overload TO test.view_overload AS
SELECT * FROM test.nats_consume;
"""

View File

@ -642,7 +642,8 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
CREATE TABLE test.view (key UInt64, value UInt64, channel_id String)
ENGINE = MergeTree
ORDER BY key
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3,
cleanup_thread_preferred_points_per_iteration=0;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT *, _channel_id AS channel_id FROM test.rabbitmq;
"""
@ -1116,7 +1117,8 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster):
CREATE TABLE test.destination(key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3,
cleanup_thread_preferred_points_per_iteration=0;
"""
)

View File

@ -13,7 +13,9 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test;
CREATE TABLE test.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5,
cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;
""".format(
shard=shard, replica=node.name
)

View File

@ -422,7 +422,8 @@ def test_ttl_empty_parts(started_cluster):
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_empty_parts', '{replica}')
ORDER BY id
SETTINGS max_bytes_to_merge_at_min_space_in_pool = 1, max_bytes_to_merge_at_max_space_in_pool = 1,
cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, old_parts_lifetime = 1
cleanup_delay_period = 1, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime = 1
""".format(
replica=node.name

View File

@ -36,8 +36,12 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r1;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r2;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst_r1 (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst_1', '1') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst_r2 (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst_1', '2') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst_r1 (p UInt64, k String, d UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst_1', '1') PARTITION BY p ORDER BY k
SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst_r2 (p UInt64, k String, d UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst_1', '2') PARTITION BY p ORDER BY k
SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (0, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);"

View File

@ -56,11 +56,13 @@ ${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_cleaner_r2 SYNC"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE mutations_cleaner_r1(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/mutations_cleaner', 'r1') ORDER BY x SETTINGS \
finished_mutations_to_keep = 2,
cleanup_delay_period = 1,
cleanup_delay_period_random_add = 0"
cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE mutations_cleaner_r2(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/mutations_cleaner', 'r2') ORDER BY x SETTINGS \
finished_mutations_to_keep = 2,
cleanup_delay_period = 1,
cleanup_delay_period_random_add = 0"
cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0"
# Insert some data
${CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --query="INSERT INTO mutations_cleaner_r1(x) VALUES (1), (2), (3), (4), (5)"

View File

@ -20,13 +20,15 @@ CREATE TABLE part_header_r1(x UInt32, y UInt32)
SETTINGS use_minimalistic_part_header_in_zookeeper = 0,
old_parts_lifetime = 1,
cleanup_delay_period = 0,
cleanup_delay_period_random_add = 0;
cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0;
CREATE TABLE part_header_r2(x UInt32, y UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/{shard}', '2{replica}') ORDER BY x
SETTINGS use_minimalistic_part_header_in_zookeeper = 1,
old_parts_lifetime = 1,
cleanup_delay_period = 0,
cleanup_delay_period_random_add = 0;
cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0;
SELECT '*** Test fetches ***';
INSERT INTO part_header_r1 VALUES (1, 1);

View File

@ -22,7 +22,7 @@ CREATE TABLE elog (
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/{shard}', '{replica}')
PARTITION BY date
ORDER BY (engine_id)
SETTINGS replicated_deduplication_window = 2, cleanup_delay_period=4, cleanup_delay_period_random_add=0;"
SETTINGS replicated_deduplication_window = 2, cleanup_delay_period=4, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 1, 'hello')"
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'hello')"

View File

@ -1,6 +1,8 @@
DROP TABLE IF EXISTS mt;
CREATE TABLE mt (x UInt64) ENGINE = MergeTree ORDER BY x SETTINGS max_part_removal_threads = 16, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, old_parts_lifetime = 1, parts_to_delay_insert = 100000, parts_to_throw_insert = 100000;
CREATE TABLE mt (x UInt64) ENGINE = MergeTree ORDER BY x
SETTINGS max_part_removal_threads = 16, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime = 1, parts_to_delay_insert = 100000, parts_to_throw_insert = 100000;
SYSTEM STOP MERGES mt;

View File

@ -13,8 +13,14 @@ $CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS alter_table0;
DROP TABLE IF EXISTS alter_table1;
CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 100));
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 200));
CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 100));
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 200));
"
function thread1()

View File

@ -58,7 +58,8 @@ function thread6()
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS alter_table_$REPLICA;
CREATE TABLE alter_table_$REPLICA (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$REPLICA') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50));";
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50));";
sleep 0.$RANDOM;
done
}

View File

@ -28,7 +28,8 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src1', '1') PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst1', '1') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst1', '1') PARTITION BY p ORDER BY k
SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (0, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);"
@ -58,7 +59,8 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE dst;"
$CLICKHOUSE_CLIENT --query="SELECT 'MOVE incompatible schema missing column';"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src2', '1') PARTITION BY p ORDER BY (d, p);"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst2', '1') PARTITION BY p ORDER BY (d, p) SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst2', '1') PARTITION BY p ORDER BY (d, p)
SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (0, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);"

View File

@ -11,7 +11,8 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src', '1') PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, k String) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst', '1') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, k String) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst', '1') PARTITION BY p ORDER BY k
SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
function thread1()
{

View File

@ -31,7 +31,8 @@ for i in $(seq $REPLICAS); do
max_replicated_merges_in_queue = 1000,
temporary_directories_lifetime = 10,
cleanup_delay_period = 3,
cleanup_delay_period_random_add = 0"
cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0"
done
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_mutate_mt_1 SELECT number, number + 10, toString(number) from numbers(10)"

View File

@ -12,7 +12,10 @@ for i in $(seq $REPLICAS); do
done
for i in $(seq $REPLICAS); do
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_detach_$i (key UInt64, value1 UInt8, value2 UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_alter_detach', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000,temporary_directories_lifetime=10,cleanup_delay_period=3,cleanup_delay_period_random_add=0"
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_detach_$i (key UInt64, value1 UInt8, value2 UInt8)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_alter_detach', '$i') ORDER BY key
SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000,
temporary_directories_lifetime=10,cleanup_delay_period=3,cleanup_delay_period_random_add=0,cleanup_thread_preferred_points_per_iteration=0"
done
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_detach_1 SELECT number, number + 10, number from numbers(10)"

View File

@ -27,7 +27,9 @@ function thread3()
{
while true; do
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS concurrent_optimize_table;
CREATE TABLE concurrent_optimize_table (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_optimize_table', '1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0;";
CREATE TABLE concurrent_optimize_table (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_optimize_table', '1') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0;";
sleep 0.$RANDOM;
sleep 0.$RANDOM;
sleep 0.$RANDOM;

View File

@ -6,7 +6,7 @@ SET insert_keeper_fault_injection_probability=0; -- disable fault injection; par
drop table if exists rmt sync;
-- cleanup code will perform extra Exists
-- (so the .reference will not match)
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n settings cleanup_delay_period=86400, replicated_can_become_leader=0;
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n settings cleanup_delay_period=86400, max_cleanup_delay_period=86400, replicated_can_become_leader=0;
system sync replica rmt;
insert into rmt values (1);
insert into rmt values (1);

View File

@ -13,8 +13,10 @@ SCALE=5000
$CLICKHOUSE_CLIENT -n --query "
DROP TABLE IF EXISTS r1;
DROP TABLE IF EXISTS r2;
CREATE TABLE r1 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}', '1{replica}') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
CREATE TABLE r2 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}', '2{replica}') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
CREATE TABLE r1 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}', '1{replica}') ORDER BY x
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
CREATE TABLE r2 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}', '2{replica}') ORDER BY x
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
DETACH TABLE r2;
"

View File

@ -8,7 +8,9 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS table_for_renames0"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS table_for_renames50"
$CLICKHOUSE_CLIENT --query "CREATE TABLE table_for_renames0 (value UInt64, data String) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_rename', '1') ORDER BY tuple() SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 0"
$CLICKHOUSE_CLIENT --query "CREATE TABLE table_for_renames0 (value UInt64, data String)
ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_rename', '1') ORDER BY tuple()
SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0"
$CLICKHOUSE_CLIENT --query "INSERT INTO table_for_renames0 SELECT number, toString(number) FROM numbers(1000)"

View File

@ -13,7 +13,8 @@ $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q1 SYNC"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q2 SYNC"
$CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q1 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/parallel_q', 'r1') ORDER BY tuple() SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0"
$CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q1 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/parallel_q', 'r1') ORDER BY tuple()
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0"
$CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q2 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/parallel_q', 'r2') ORDER BY tuple() SETTINGS always_fetch_merged_part = 1"

View File

@ -24,7 +24,8 @@ for i in $(seq 1 $NUM_REPLICAS); do
ENGINE ReplicatedMergeTree('/test/01921_concurrent_ttl_and_normal_merges/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/ttl_table', '$i')
ORDER BY tuple()
TTL key + INTERVAL 1 SECOND
SETTINGS merge_with_ttl_timeout=1, max_replicated_merges_with_ttl_in_queue=100, max_number_of_merges_with_ttl_in_pool=100, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
SETTINGS merge_with_ttl_timeout=1, max_replicated_merges_with_ttl_in_queue=100, max_number_of_merges_with_ttl_in_pool=100,
cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;"
done
function optimize_thread

View File

@ -4,11 +4,17 @@ DROP TABLE IF EXISTS partslost_0;
DROP TABLE IF EXISTS partslost_1;
DROP TABLE IF EXISTS partslost_2;
CREATE TABLE partslost_0 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '0') ORDER BY tuple() SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 1;
CREATE TABLE partslost_0 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '0') ORDER BY tuple()
SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1,
cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0;
CREATE TABLE partslost_1 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '1') ORDER BY tuple() SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 1;
CREATE TABLE partslost_1 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '1') ORDER BY tuple()
SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1,
cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0;
CREATE TABLE partslost_2 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '2') ORDER BY tuple() SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 1;
CREATE TABLE partslost_2 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/{database}_02067_lost/partslost', '2') ORDER BY tuple()
SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1,
cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0;
INSERT INTO partslost_0 SELECT toString(number) AS x from system.numbers LIMIT 10000;

View File

@ -9,7 +9,7 @@ $CLICKHOUSE_CLIENT -q "drop table if exists rmt1 sync;"
$CLICKHOUSE_CLIENT -q "drop table if exists rmt2 sync;"
$CLICKHOUSE_CLIENT -q "create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '1') order by n
settings cleanup_delay_period=0, cleanup_delay_period_random_add=0, old_parts_lifetime=0"
settings cleanup_delay_period=0, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime=0"
$CLICKHOUSE_CLIENT -q "create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '2') order by n"
$CLICKHOUSE_CLIENT -q "system stop replicated sends rmt2"

View File

@ -15,8 +15,12 @@ $CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS alter_table0;
DROP TABLE IF EXISTS alter_table1;
CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0;
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0
CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0;
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0
"
function thread1()

View File

@ -58,7 +58,9 @@ function thread6()
while true; do
REPLICA=$(($RANDOM % 10))
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS alter_table_$REPLICA;
CREATE TABLE alter_table_$REPLICA (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$REPLICA') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0;";
CREATE TABLE alter_table_$REPLICA (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$REPLICA') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration=0;";
sleep 0.$RANDOM;
done
}

View File

@ -8,7 +8,7 @@ drop table if exists rmt2;
-- Disable compact parts, because we need hardlinks in mutations.
create table rmt (n int, m int, k int) engine=ReplicatedMergeTree('/test/02432/{database}', '1') order by tuple()
settings storage_policy = 's3_cache', allow_remote_fs_zero_copy_replication=1,
max_part_removal_threads=10, concurrent_part_removal_threshold=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1,
max_part_removal_threads=10, concurrent_part_removal_threshold=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,
max_replicated_merges_in_queue=0, max_replicated_mutations_in_queue=0, min_bytes_for_wide_part=0, min_rows_for_wide_part=0;
insert into rmt(n, m) values (1, 42);
@ -38,7 +38,7 @@ select count(), sum(n), sum(m) from rmt;
-- New table can assign merges/mutations and can remove old parts
create table rmt2 (n int, m int, k String) engine=ReplicatedMergeTree('/test/02432/{database}', '2') order by tuple()
settings storage_policy = 's3_cache', allow_remote_fs_zero_copy_replication=1,
max_part_removal_threads=10, concurrent_part_removal_threshold=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1,
max_part_removal_threads=10, concurrent_part_removal_threshold=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,
min_bytes_for_wide_part=0, min_rows_for_wide_part=0, max_replicated_merges_in_queue=1,
old_parts_lifetime=0;

View File

@ -5,9 +5,11 @@ SET insert_keeper_fault_injection_probability=0; -- disable fault injection; par
drop table if exists rmt1;
drop table if exists rmt2;
create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02448/{database}/rmt', '1') order by tuple()
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1, old_parts_lifetime=0, max_parts_to_merge_at_once=4;
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1,
cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime=0, max_parts_to_merge_at_once=4;
create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02448/{database}/rmt', '2') order by tuple()
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1, old_parts_lifetime=0, max_parts_to_merge_at_once=4;
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1,
cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime=0, max_parts_to_merge_at_once=4;
-- insert part only on one replica
system stop replicated sends rmt1;
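Tests like this one race part removal against replication, so a quick way to see what the cleanup thread has (or has not yet) removed is to count inactive parts. A sketch, with currentDatabase() standing in for the test database:

    SELECT table, count() AS inactive_parts
    FROM system.parts
    WHERE database = currentDatabase() AND active = 0
    GROUP BY table;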

View File

@ -24,7 +24,8 @@ CREATE TABLE wikistat1
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/02494_zero_copy_and_projection', '1')
ORDER BY (path, time)
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, allow_remote_fs_zero_copy_replication=1, min_bytes_for_wide_part=0;
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, allow_remote_fs_zero_copy_replication=1, min_bytes_for_wide_part=0;
CREATE TABLE wikistat2
(
@ -49,7 +50,8 @@ CREATE TABLE wikistat2
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/02494_zero_copy_and_projection', '2')
ORDER BY (path, time)
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0, allow_remote_fs_zero_copy_replication=1, min_bytes_for_wide_part=0;
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, allow_remote_fs_zero_copy_replication=1, min_bytes_for_wide_part=0;
INSERT INTO wikistat1 SELECT toDateTime('2020-10-01 00:00:00'), 'hello', 'world', '/data/path', 10 from numbers(100);

View File

@ -13,7 +13,7 @@ $CLICKHOUSE_CLIENT -n --query "
CREATE TABLE t_async_insert_cleanup (
KeyID UInt32
) Engine = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/t_async_insert_cleanup', '{replica}')
ORDER BY (KeyID) SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, replicated_deduplication_window_for_async_inserts=10
ORDER BY (KeyID) SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, cleanup_thread_preferred_points_per_iteration=0, replicated_deduplication_window_for_async_inserts=10
"
for i in {1..100}; do