diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp
index 5a9e135a4fa..7fa0f663295 100644
--- a/programs/copier/ClusterCopier.cpp
+++ b/programs/copier/ClusterCopier.cpp
@@ -1323,7 +1323,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
     /// Try start processing, create node about it
     {
         String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
-        CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
+        CleanStateClock new_clean_state_clock(zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
         if (clean_state_clock != new_clean_state_clock)
         {
             LOG_INFO(log, "Partition {} piece {} clean state changed, cowardly bailing", task_partition.name, toString(current_piece_number));
@@ -1360,8 +1360,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
 
         LOG_DEBUG(log, "Create destination tables. Query: {}", query);
         UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
-        LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}",
-            getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
+        LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
     }
 
     /// Do the copying
@@ -1393,17 +1392,17 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
         try
         {
             /// Custom INSERT SELECT implementation
+            Context context_select = context;
+            context_select.setSettings(task_cluster->settings_pull);
+
+            Context context_insert = context;
+            context_insert.setSettings(task_cluster->settings_push);
+
             BlockInputStreamPtr input;
             BlockOutputStreamPtr output;
             {
-                std::unique_ptr<Context> context_select = std::make_unique<Context>(context);
-                context_select->setSettings(task_cluster->settings_pull);
-
-                std::unique_ptr<Context> context_insert = std::make_unique<Context>(context);
-                context_insert->setSettings(task_cluster->settings_push);
-
-                BlockIO io_select = InterpreterFactory::get(query_select_ast, *context_select)->execute();
-                BlockIO io_insert = InterpreterFactory::get(query_insert_ast, *context_insert)->execute();
+                BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
+                BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
 
                 input = io_select.getInputStream();
                 output = io_insert.out;
diff --git a/tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes.reference b/tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes.reference
new file mode 100644
index 00000000000..e80c22b893b
--- /dev/null
+++ b/tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes.reference
@@ -0,0 +1,10 @@
+1
+
+---
+
+0
+1
+
+---
+
+0
diff --git a/tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes.sh b/tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes.sh
new file mode 100755
index 00000000000..27fa4261504
--- /dev/null
+++ b/tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes.sh
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. $CURDIR/../shell_config.sh
+
+# Check that if we have one inactive replica and a huge number of INSERTs to active replicas,
+# the number of nodes in ZooKeeper does not grow unbounded.
+
+$CLICKHOUSE_CLIENT -n --query "
+    DROP TABLE IF EXISTS r1;
+    DROP TABLE IF EXISTS r2;
+    CREATE TABLE r1 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/r', '1') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
+    CREATE TABLE r2 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/r', '2') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
+    DETACH TABLE r2;
+"
+
+$CLICKHOUSE_CLIENT --max_block_size 1 --min_insert_block_size_rows 1 --min_insert_block_size_bytes 1 --max_insert_threads 16 --query "INSERT INTO r1 SELECT * FROM numbers_mt(10000)"
+
+
+# Now wait for cleanup thread
+
+for i in {1..60}; do
+    $CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS"
+    [[ $($CLICKHOUSE_CLIENT --query "SELECT sum(toUInt32(extract(message, 'Removed (\d+) old log entries'))) FROM system.text_log WHERE event_date >= yesterday() AND logger_name LIKE '%' || currentDatabase() || '%r1%(ReplicatedMergeTreeCleanupThread)%' AND message LIKE '%Removed % old log entries%'") -gt 9900 ]] && break;
+    sleep 1
+done
+
+
+$CLICKHOUSE_CLIENT --query "SELECT numChildren < 2500 FROM system.zookeeper WHERE path = '/clickhouse/tables/r' AND name = 'log'";
+echo -e '\n---\n';
+$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/r/replicas/1' AND name = 'is_lost'";
+$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/r/replicas/2' AND name = 'is_lost'";
+echo -e '\n---\n';
+
+$CLICKHOUSE_CLIENT --query "ATTACH TABLE r2"
+$CLICKHOUSE_CLIENT --receive_timeout 600 --query "SYSTEM SYNC REPLICA r2" # Need to increase the timeout, otherwise it times out in debug builds
+
+$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/r/replicas/2' AND name = 'is_lost'";
+
+$CLICKHOUSE_CLIENT -n --query "
+    DROP TABLE IF EXISTS r1;
+    DROP TABLE IF EXISTS r2;
+"