#!/usr/bin/env bash # shellcheck source=./mergetree_mutations.lib . "$CURDIR"/mergetree_mutations.lib function try_sync_replicas() { table_name_prefix=$1 time_left=$2 readarray -t empty_partitions_arr < <(${CLICKHOUSE_CLIENT} -q \ "SELECT DISTINCT substr(new_part_name, 1, position(new_part_name, '_') - 1) AS partition_id FROM system.replication_queue WHERE (database = currentDatabase()) AND (table LIKE '$table_name_prefix%') AND (last_exception LIKE '%No active replica has part%') AND (partition_id NOT IN ( SELECT partition_id FROM system.parts WHERE (database = currentDatabase()) AND (table LIKE '$table_name_prefix%') ))") readarray -t tables_arr < <(${CLICKHOUSE_CLIENT} -q "SELECT name FROM system.tables WHERE database=currentDatabase() AND name like '$table_name_prefix%' AND engine like '%Replicated%'") for t in "${tables_arr[@]}" do for p in "${empty_partitions_arr[@]}" do # Avoid "Empty part ... is not created instead of lost part because there are no parts in partition" $CLICKHOUSE_CLIENT -q "ALTER TABLE $t DROP PARTITION ID '$p'" 2>/dev/null done done i=0 for t in "${tables_arr[@]}" do # Do not start new merges (it can make SYNC a bit faster) $CLICKHOUSE_CLIENT -q "ALTER TABLE $t MODIFY SETTING max_replicated_merges_in_queue=0" $CLICKHOUSE_CLIENT --receive_timeout $time_left -q "SYSTEM SYNC REPLICA $t STRICT" || ($CLICKHOUSE_CLIENT -q \ "select 'sync failed, queue:', * from system.replication_queue where database=currentDatabase() and table='$t' order by database, table, node_name" && exit 1) & pids[${i}]=$! i=$((i + 1)) done for pid in "${pids[@]}"; do wait $pid || (echo "Failed to sync some replicas" && exit 1) done echo "Replication did not hang: synced all replicas of $table_name_prefix" } function check_replication_consistency() { table_name_prefix=$1 check_query_part=$2 # Wait for all queries to finish (query may still be running if thread is killed by timeout) num_tries=0 while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%'") -ne 1 ]]; do sleep 1; num_tries=$((num_tries+1)) if [ $num_tries -eq 250 ]; then echo "Queries for $table_name_prefix did not finish automatically after 250+ seconds" echo "==================== QUERIES ====================" $CLICKHOUSE_CLIENT -q "SELECT * FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%' FORMAT Vertical" echo "==================== STACK TRACES ====================" $CLICKHOUSE_CLIENT -q "SELECT query_id, thread_name, thread_id, arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), trace), '\n') FROM system.stack_trace where query_id IN (SELECT query_id FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%') SETTINGS allow_introspection_functions=1 FORMAT Vertical" echo "==================== MUTATIONS ====================" $CLICKHOUSE_CLIENT -q "SELECT * FROM system.mutations WHERE database=currentDatabase() FORMAT Vertical" break fi done # Do not check anything if all replicas are readonly, # because is this case all replicas are probably lost (it may happen and it's not a bug) res=$($CLICKHOUSE_CLIENT -q "SELECT count() - sum(is_readonly) FROM system.replicas WHERE database=currentDatabase() AND table LIKE '$table_name_prefix%'") if [ $res -eq 0 ]; then # Print dummy lines echo "Replication did not hang: synced all replicas of $table_name_prefix" echo "Consistency: 1" return 0 fi # Touch all data to check that it's readable (and trigger PartCheckThread if needed) while ! $CLICKHOUSE_CLIENT -q "SELECT * FROM merge(currentDatabase(), '$table_name_prefix') FORMAT Null" 2>/dev/null; do sleep 1; num_tries=$((num_tries+1)) if [ $num_tries -eq 250 ]; then break fi done time_left=$((300 - num_tries)) # Trigger pullLogsToQueue(...) and updateMutations(...) on some replica to make it pull all mutations, so it will be possible to kill them some_table=$($CLICKHOUSE_CLIENT -q "SELECT name FROM system.tables WHERE database=currentDatabase() AND name like '$table_name_prefix%' ORDER BY rand() LIMIT 1") $CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA $some_table PULL" 1>/dev/null 2>/dev/null ||: some_table=$($CLICKHOUSE_CLIENT -q "SELECT name FROM system.tables WHERE database=currentDatabase() AND name like '$table_name_prefix%' ORDER BY rand() LIMIT 1") $CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA $some_table PULL" 1>/dev/null 2>/dev/null ||: # Forcefully cancel mutations to avoid waiting for them to finish ${CLICKHOUSE_CLIENT} -q "KILL MUTATION WHERE database=currentDatabase() AND table like '$table_name_prefix%'" > /dev/null # SYNC REPLICA is not enough if some MUTATE_PARTs are not assigned yet wait_for_all_mutations "$table_name_prefix%" try_sync_replicas "$table_name_prefix" "$time_left" || exit 1 res=$($CLICKHOUSE_CLIENT -q \ "SELECT if((countDistinct(data) as c) == 0, 1, c) FROM ( SELECT _table, ($check_query_part) AS data FROM merge(currentDatabase(), '$table_name_prefix') GROUP BY _table )") echo "Consistency: $res" if [ $res -ne 1 ]; then echo "Replicas have diverged:" $CLICKHOUSE_CLIENT -q "select 'data', _table, $check_query_part, arraySort(groupArrayDistinct(_part)) from merge(currentDatabase(), '$table_name_prefix') group by _table order by _table" $CLICKHOUSE_CLIENT -q "select 'queue', * from system.replication_queue where database=currentDatabase() and table like '$table_name_prefix%' order by database, table, node_name" $CLICKHOUSE_CLIENT -q "select 'mutations', * from system.mutations where database=currentDatabase() and table like '$table_name_prefix%' order by database, table, mutation_id" $CLICKHOUSE_CLIENT -q "select 'parts', * from system.parts where database=currentDatabase() and table like '$table_name_prefix%' order by database, table, name" echo "Good luck with debugging..." exit 1 fi } # vi: ft=bash