Backport #63353 to 24.3: Fix logical error during SELECT query after ALTER in rare case

This commit is contained in:
robot-clickhouse 2024-05-10 11:09:53 +00:00
parent 04fc0aba29
commit 042b24b8e6
7 changed files with 193 additions and 11 deletions

View File

@ -264,7 +264,8 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
/// Move columns from block. /// Move columns from block.
name_and_type = requested_columns.begin(); name_and_type = requested_columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type) for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
res_columns[pos] = std::move(copy_block.getByName(name_and_type->name).column); if (copy_block.has(name_and_type->name))
res_columns[pos] = std::move(copy_block.getByName(name_and_type->name).column);
} }
catch (Exception & e) catch (Exception & e)
{ {

View File

@ -1004,6 +1004,10 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
filterColumns(columns, read_result.final_filter); filterColumns(columns, read_result.final_filter);
} }
/// If columns not empty, then apply on-fly alter conversions if any required
if (!prewhere_info || prewhere_info->perform_alter_conversions)
merge_tree_reader->performRequiredConversions(columns);
/// If some columns absent in part, then evaluate default values /// If some columns absent in part, then evaluate default values
if (should_evaluate_missing_defaults) if (should_evaluate_missing_defaults)
{ {
@ -1014,10 +1018,6 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
addDummyColumnWithRowCount(additional_columns, read_result.num_rows); addDummyColumnWithRowCount(additional_columns, read_result.num_rows);
merge_tree_reader->evaluateMissingDefaults(additional_columns, columns); merge_tree_reader->evaluateMissingDefaults(additional_columns, columns);
} }
/// If columns not empty, then apply on-fly alter conversions if any required
if (!prewhere_info || prewhere_info->perform_alter_conversions)
merge_tree_reader->performRequiredConversions(columns);
} }
read_result.columns.reserve(read_result.columns.size() + columns.size()); read_result.columns.reserve(read_result.columns.size() + columns.size());
@ -1043,14 +1043,14 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
bool should_evaluate_missing_defaults; bool should_evaluate_missing_defaults;
merge_tree_reader->fillMissingColumns(columns, should_evaluate_missing_defaults, read_result.num_rows); merge_tree_reader->fillMissingColumns(columns, should_evaluate_missing_defaults, read_result.num_rows);
/// If some columns absent in part, then evaluate default values
if (should_evaluate_missing_defaults)
merge_tree_reader->evaluateMissingDefaults({}, columns);
/// If result not empty, then apply on-fly alter conversions if any required /// If result not empty, then apply on-fly alter conversions if any required
if (!prewhere_info || prewhere_info->perform_alter_conversions) if (!prewhere_info || prewhere_info->perform_alter_conversions)
merge_tree_reader->performRequiredConversions(columns); merge_tree_reader->performRequiredConversions(columns);
/// If some columns absent in part, then evaluate default values
if (should_evaluate_missing_defaults)
merge_tree_reader->evaluateMissingDefaults({}, columns);
for (size_t i = 0; i < columns.size(); ++i) for (size_t i = 0; i < columns.size(); ++i)
read_result.columns[i] = std::move(columns[i]); read_result.columns[i] = std::move(columns[i]);
} }

View File

@ -240,11 +240,11 @@ try
bool should_evaluate_missing_defaults = false; bool should_evaluate_missing_defaults = false;
reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_read); reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_read);
reader->performRequiredConversions(columns);
if (should_evaluate_missing_defaults) if (should_evaluate_missing_defaults)
reader->evaluateMissingDefaults({}, columns); reader->evaluateMissingDefaults({}, columns);
reader->performRequiredConversions(columns);
/// Reorder columns and fill result block. /// Reorder columns and fill result block.
size_t num_columns = sample.size(); size_t num_columns = sample.size();
Columns res_columns; Columns res_columns;

View File

@ -0,0 +1,10 @@
0 0_42
1 1_42
2 2_42
3 3_42
4 4_42
5 5_42
6 6_42
7 7_42
8 8_42
9 9_42

View File

@ -0,0 +1,11 @@
drop table if exists tab;
create table tab (x UInt32) engine = MergeTree order by tuple();
insert into tab select number from numbers(10);
set alter_sync = 0;
alter table tab update x = x + sleepEachRow(0.1) where 1;
alter table tab modify column x String;
alter table tab add column y String default x || '_42';
select x, y from tab order by x;

View File

@ -0,0 +1,11 @@
Starting alters
Finishing alters
Equal number of columns
Replication did not hang: synced all replicas of concurrent_alter_add_drop_steroids_
Consistency: 1
0
0
0
0
0
0

View File

@ -0,0 +1,149 @@
#!/usr/bin/env bash
# Tags: zookeeper, no-parallel, no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./replication.lib
. "$CURDIR"/replication.lib
REPLICAS=3
for i in $(seq $REPLICAS); do
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_add_drop_steroids_$i"
done
for i in $(seq $REPLICAS); do
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_add_drop_steroids_$i (key UInt64, value0 UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_alter_add_drop_steroids_column', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192, index_granularity_bytes = '10Mi'"
done
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_add_drop_steroids_1 SELECT number, number + 10 from numbers(100000)"
for i in $(seq $REPLICAS); do
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_add_drop_steroids_$i"
done
function alter_thread()
{
while true; do
REPLICA=$(($RANDOM % 3 + 1))
ADD=$(($RANDOM % 5 + 1))
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_add_drop_steroids_$REPLICA ADD COLUMN value$ADD UInt32 DEFAULT 42 SETTINGS replication_alter_partitions_sync=0"; # additionaly we don't wait anything for more heavy concurrency
DROP=$(($RANDOM % 5 + 1))
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_add_drop_steroids_$REPLICA DROP COLUMN value$DROP SETTINGS replication_alter_partitions_sync=0"; # additionaly we don't wait anything for more heavy concurrency
sleep 0.$RANDOM
done
}
function alter_thread_1()
{
while true; do
REPLICA=$(($RANDOM % 3 + 1))
${CLICKHOUSE_CLIENT} --query "ALTER TABLE concurrent_alter_add_drop_steroids_1 MODIFY COLUMN value0 String SETTINGS mutations_sync = 0"
sleep 1.$RANDOM
${CLICKHOUSE_CLIENT} --query "ALTER TABLE concurrent_alter_add_drop_steroids_1 MODIFY COLUMN value0 UInt8 SETTINGS mutations_sync = 0"
sleep 1.$RANDOM
done
}
function optimize_thread()
{
while true; do
REPLICA=$(($RANDOM % 3 + 1))
$CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE concurrent_alter_add_drop_steroids_$REPLICA FINAL SETTINGS replication_alter_partitions_sync=0";
sleep 0.$RANDOM
done
}
function insert_thread()
{
while true; do
REPLICA=$(($RANDOM % 3 + 1))
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_add_drop_steroids_$REPLICA VALUES($RANDOM, 7)"
sleep 0.$RANDOM
done
}
function select_thread()
{
while true; do
REPLICA=$(($RANDOM % 3 + 1))
$CLICKHOUSE_CLIENT --query "SELECT * FROM merge(currentDatabase(), 'concurrent_alter_add_drop_steroids_') FORMAT Null"
sleep 0.$RANDOM
done
}
echo "Starting alters"
export -f alter_thread;
export -f alter_thread_1;
export -f select_thread;
export -f optimize_thread;
export -f insert_thread;
TIMEOUT=30
# Sometimes we detach and attach tables
timeout $TIMEOUT bash -c alter_thread 2> /dev/null &
timeout $TIMEOUT bash -c alter_thread 2> /dev/null &
timeout $TIMEOUT bash -c alter_thread 2> /dev/null &
timeout $TIMEOUT bash -c alter_thread_1 2> /dev/null &
timeout $TIMEOUT bash -c alter_thread_1 2> /dev/null &
timeout $TIMEOUT bash -c alter_thread_1 2> /dev/null &
timeout $TIMEOUT bash -c select_thread 2> /dev/null &
timeout $TIMEOUT bash -c select_thread 2> /dev/null &
timeout $TIMEOUT bash -c select_thread 2> /dev/null &
timeout $TIMEOUT bash -c optimize_thread 2> /dev/null &
timeout $TIMEOUT bash -c optimize_thread 2> /dev/null &
timeout $TIMEOUT bash -c optimize_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
wait
echo "Finishing alters"
columns1=$($CLICKHOUSE_CLIENT --query "select count() from system.columns where table='concurrent_alter_add_drop_steroids_1' and database='$CLICKHOUSE_DATABASE'" 2> /dev/null)
columns2=$($CLICKHOUSE_CLIENT --query "select count() from system.columns where table='concurrent_alter_add_drop_steroids_2' and database='$CLICKHOUSE_DATABASE'" 2> /dev/null)
columns3=$($CLICKHOUSE_CLIENT --query "select count() from system.columns where table='concurrent_alter_add_drop_steroids_3' and database='$CLICKHOUSE_DATABASE'" 2> /dev/null)
while [ "$columns1" != "$columns2" ] || [ "$columns2" != "$columns3" ]; do
columns1=$($CLICKHOUSE_CLIENT --query "select count() from system.columns where table='concurrent_alter_add_drop_steroids_1' and database='$CLICKHOUSE_DATABASE'" 2> /dev/null)
columns2=$($CLICKHOUSE_CLIENT --query "select count() from system.columns where table='concurrent_alter_add_drop_steroids_2' and database='$CLICKHOUSE_DATABASE'" 2> /dev/null)
columns3=$($CLICKHOUSE_CLIENT --query "select count() from system.columns where table='concurrent_alter_add_drop_steroids_3' and database='$CLICKHOUSE_DATABASE'" 2> /dev/null)
sleep 1
done
echo "Equal number of columns"
# This alter will finish all previous, but replica 1 maybe still not up-to-date
while [[ $(timeout 120 ${CLICKHOUSE_CLIENT} --query "ALTER TABLE concurrent_alter_add_drop_steroids_1 MODIFY COLUMN value0 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do
sleep 1
done
check_replication_consistency "concurrent_alter_add_drop_steroids_" "count(), sum(key), sum(cityHash64(value0))"
for i in $(seq $REPLICAS); do
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_add_drop_steroids_$i"
$CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done = 0 and table = 'concurrent_alter_add_drop_steroids_$i'"
$CLICKHOUSE_CLIENT --query "SELECT * FROM system.mutations WHERE is_done = 0 and table = 'concurrent_alter_add_drop_steroids_$i'"
$CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.replication_queue WHERE table = 'concurrent_alter_add_drop_steroids_$i'"
$CLICKHOUSE_CLIENT --query "SELECT * FROM system.replication_queue WHERE table = 'concurrent_alter_add_drop_steroids_$i' and (type = 'ALTER_METADATA' or type = 'MUTATE_PART')"
$CLICKHOUSE_CLIENT --query "DETACH TABLE concurrent_alter_add_drop_steroids_$i"
$CLICKHOUSE_CLIENT --query "ATTACH TABLE concurrent_alter_add_drop_steroids_$i"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_add_drop_steroids_$i"
done