Merge pull request #6514 from yandex/added-test-for-race-conditions

Added a test for race conditions.
This commit is contained in:
alexey-milovidov 2019-08-21 01:33:34 +03:00 committed by GitHub
commit 58358ec856
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 292 additions and 31 deletions

View File

@ -67,7 +67,8 @@ BlockIO InterpreterAlterQuery::execute()
if (!mutation_commands.empty())
{
MutationsInterpreter(table, mutation_commands, context).validate();
auto table_lock_holder = table->lockStructureForShare(false /* because mutation is executed asyncronously */, context.getCurrentQueryId());
MutationsInterpreter(table, mutation_commands, context).validate(table_lock_holder);
table->mutate(mutation_commands, context);
}

View File

@ -23,7 +23,6 @@ BlockIO InterpreterOptimizeQuery::execute()
return executeDDLQueryOnCluster(query_ptr, context, {ast.database});
StoragePtr table = context.getTable(ast.database, ast.table);
auto table_lock = table->lockStructureForShare(true, context.getCurrentQueryId());
table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);
return {};
}

View File

@ -458,7 +458,7 @@ BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::ve
return in;
}
void MutationsInterpreter::validate()
void MutationsInterpreter::validate(TableStructureReadLockHolder &)
{
prepare(/* dry_run = */ true);
Block first_stage_header = interpreter_select->getSampleBlock();
@ -466,7 +466,7 @@ void MutationsInterpreter::validate()
addStreamsForLaterStages(stages, in)->getHeader();
}
BlockInputStreamPtr MutationsInterpreter::execute()
BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &)
{
prepare(/* dry_run = */ false);
BlockInputStreamPtr in = interpreter_select->execute().in;

View File

@ -25,13 +25,13 @@ public:
{
}
void validate();
void validate(TableStructureReadLockHolder & table_lock_holder);
/// Return false if the data isn't going to be changed by mutations.
bool isStorageTouchedByMutations() const;
/// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices.
BlockInputStreamPtr execute();
BlockInputStreamPtr execute(TableStructureReadLockHolder & table_lock_holder);
/// Only changed columns.
const Block & getUpdatedHeader() const;
@ -44,7 +44,6 @@ private:
std::unique_ptr<InterpreterSelectQuery> prepareInterpreterSelect(std::vector<Stage> & prepared_stages, bool dry_run);
BlockInputStreamPtr addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const;
private:
StoragePtr storage;
std::vector<MutationCommand> commands;
const Context & context;

View File

@ -521,7 +521,7 @@ public:
/// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
const FutureMergedMutatedPart & future_part, MergeList::Entry & merge_entry,
const FutureMergedMutatedPart & future_part, MergeList::Entry & merge_entry, TableStructureReadLockHolder &,
time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation, bool deduplicate, bool force_ttl)
{
static const String TMP_PREFIX = "tmp_merge_";
@ -883,7 +883,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
const FutureMergedMutatedPart & future_part,
const std::vector<MutationCommand> & commands,
MergeListEntry & merge_entry,
const Context & context)
const Context & context,
TableStructureReadLockHolder & table_lock_holder)
{
auto check_not_cancelled = [&]()
{
@ -918,7 +919,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
command.partition, context_for_reading);
});
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading);
if (!mutations_interpreter.isStorageTouchedByMutations())
@ -949,7 +949,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
Poco::File(new_part_tmp_path).createDirectories();
auto in = mutations_interpreter.execute();
auto in = mutations_interpreter.execute(table_lock_holder);
const auto & updated_header = mutations_interpreter.getUpdatedHeader();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();

View File

@ -94,14 +94,14 @@ public:
*/
MergeTreeData::MutableDataPartPtr mergePartsToTemporaryPart(
const FutureMergedMutatedPart & future_part,
MergeListEntry & merge_entry, time_t time_of_merge,
MergeListEntry & merge_entry, TableStructureReadLockHolder & table_lock_holder, time_t time_of_merge,
DiskSpaceMonitor::Reservation * disk_reservation, bool deduplication, bool force_ttl);
/// Mutate a single data part with the specified commands. Will create and return a temporary part.
MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart(
const FutureMergedMutatedPart & future_part,
const std::vector<MutationCommand> & commands,
MergeListEntry & merge_entry, const Context & context);
MergeListEntry & merge_entry, const Context & context, TableStructureReadLockHolder & table_lock_holder);
MergeTreeData::DataPartPtr renameMergedTemporaryPart(
MergeTreeData::MutableDataPartPtr & new_data_part,

View File

@ -1281,9 +1281,9 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
}
if (!finished.empty())
{
zookeeper->set(replica_path + "/mutation_pointer", finished.back()->znode_name);
{
std::lock_guard lock(state_mutex);
mutation_pointer = finished.back()->znode_name;
@ -1655,7 +1655,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const
{
/// Assigning mutations is easier than assigning merges because mutations appear in the same order as
/// the order of their version numbers (see StorageReplicatedMergeTree::mutate()).
/// the order of their version numbers (see StorageReplicatedMergeTree::mutate).
/// This means that if we have loaded the mutation with version number X then all mutations with
/// the version numbers less than X are also loaded and if there is no merge or mutation assigned to
/// the part (checked by querying queue.virtual_parts), we can confidently assign a mutation to

View File

@ -206,29 +206,35 @@ std::vector<MergeTreeData::AlterDataPartTransactionPtr> StorageMergeTree::prepar
const Settings & settings_ = context.getSettingsRef();
size_t thread_pool_size = std::min<size_t>(parts.size(), settings_.max_alter_threads);
ThreadPool thread_pool(thread_pool_size);
std::optional<ThreadPool> thread_pool;
if (thread_pool_size > 1)
thread_pool.emplace(thread_pool_size);
for (const auto & part : parts)
{
transactions.push_back(std::make_unique<MergeTreeData::AlterDataPartTransaction>(part));
thread_pool.schedule(
[this, & transaction = transactions.back(), & columns_for_parts, & new_indices = new_indices.indices]
{
this->alterDataPart(columns_for_parts, new_indices, false, transaction);
}
);
auto job = [this, & transaction = transactions.back(), & columns_for_parts, & new_indices = new_indices.indices]
{
this->alterDataPart(columns_for_parts, new_indices, false, transaction);
};
if (thread_pool)
thread_pool->schedule(job);
else
job();
}
thread_pool.wait();
if (thread_pool)
thread_pool->wait();
auto erase_pos = std::remove_if(transactions.begin(), transactions.end(),
[](const MergeTreeData::AlterDataPartTransactionPtr & transaction)
{
return !transaction->isValid();
}
);
});
transactions.erase(erase_pos, transactions.end());
return transactions;
@ -596,7 +602,7 @@ bool StorageMergeTree::merge(
bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL()));
new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, *merge_entry, time(nullptr),
future_part, *merge_entry, table_lock_holder, time(nullptr),
merging_tagger->reserved_space.get(), deduplicate, force_ttl);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
removeEmptyColumnsFromPart(new_part);
@ -714,7 +720,7 @@ bool StorageMergeTree::tryMutatePart()
try
{
new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, *merge_entry, global_context);
new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, *merge_entry, global_context, table_lock_holder);
renameTempPartAndReplace(new_part);
tagger->is_successful = true;
write_part_log({});

View File

@ -1087,7 +1087,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
try
{
part = merger_mutator.mergePartsToTemporaryPart(
future_merged_part, *merge_entry, entry.create_time, reserved_space.get(), entry.deduplicate, entry.force_ttl);
future_merged_part, *merge_entry, table_lock, entry.create_time, reserved_space.get(), entry.deduplicate, entry.force_ttl);
merger_mutator.renameMergedTemporaryPart(part, parts, &transaction);
removeEmptyColumnsFromPart(part);
@ -1216,7 +1216,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
try
{
new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, *merge_entry, global_context);
new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, *merge_entry, global_context, table_lock);
renameTempPartAndReplace(new_part, nullptr, &transaction);
try
@ -3110,9 +3110,10 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
}
}
/// TODO: Bad setting name for such purpose
if (query_context.getSettingsRef().replication_alter_partitions_sync != 0)
{
/// NOTE Table lock must not be held while waiting. Some combination of R-W-R locks from different threads will yield to deadlock.
/// TODO Check all other "wait" places.
for (auto & merge_entry : merge_entries)
waitForAllReplicasToProcessLogEntry(merge_entry);
}

View File

@ -470,10 +470,15 @@ private:
/** Wait until all replicas, including this, execute the specified action from the log.
* If replicas are added at the same time, it can not wait the added replica .
*
* NOTE: This method must be called without table lock held.
* Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock.
* TODO: There are wrong usages of this method that are not fixed yet.
*/
void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry);
/** Wait until the specified replica executes the specified action from the log.
* NOTE: See comment about locks above.
*/
void waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry);

View File

@ -40,7 +40,7 @@ ninja
## Start ClickHouse and run tests
```
sudo -u clickhouse TSAN_OPTIONS='halt_on_error=1,suppressions=../dbms/tests/tsan_suppressions.txt' ./clickhouse-tsan server --config /etc/clickhouse-server/config.xml
sudo -u clickhouse TSAN_OPTIONS='halt_on_error=1' ./clickhouse-tsan server --config /etc/clickhouse-server/config.xml
```

View File

@ -0,0 +1,76 @@
#!/usr/bin/env bash
# This test is disabled because it triggers internal assert in Thread Sanitizer.
# Thread Sanitizer does not support for more than 64 mutexes to be locked in a single thread.
# https://github.com/google/sanitizers/issues/950
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
set -e
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS alter_table"
$CLICKHOUSE_CLIENT -q "CREATE TABLE alter_table (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = MergeTree ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1"
function thread1()
{
while true; do $CLICKHOUSE_CLIENT --query "SELECT * FROM system.parts FORMAT Null"; done
}
function thread2()
{
while true; do $CLICKHOUSE_CLIENT -n --query "ALTER TABLE alter_table ADD COLUMN h String; ALTER TABLE alter_table MODIFY COLUMN h UInt64; ALTER TABLE alter_table DROP COLUMN h;"; done
}
function thread3()
{
while true; do $CLICKHOUSE_CLIENT -q "INSERT INTO alter_table SELECT rand(1), rand(2), 1 / rand(3), toString(rand(4)), [rand(5), rand(6)], rand(7) % 2 ? NULL : generateUUIDv4(), (rand(8), rand(9)) FROM numbers(100000)"; done
}
function thread4()
{
while true; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE alter_table FINAL"; done
}
function thread5()
{
while true; do $CLICKHOUSE_CLIENT -q "ALTER TABLE alter_table DELETE WHERE rand() % 2 = 1"; done
}
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f thread1;
export -f thread2;
export -f thread3;
export -f thread4;
export -f thread5;
TIMEOUT=30
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
wait
$CLICKHOUSE_CLIENT -q "DROP TABLE alter_table"

View File

@ -0,0 +1,74 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
set -e
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS alter_table"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS alter_table2"
$CLICKHOUSE_CLIENT -q "CREATE TABLE alter_table (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/alter_table', 'r1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1"
$CLICKHOUSE_CLIENT -q "CREATE TABLE alter_table2 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/alter_table', 'r2') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1"
function thread1()
{
while true; do $CLICKHOUSE_CLIENT --query "SELECT * FROM system.parts FORMAT Null"; done
}
function thread2()
{
while true; do $CLICKHOUSE_CLIENT -n --query "ALTER TABLE alter_table ADD COLUMN h String; ALTER TABLE alter_table MODIFY COLUMN h UInt64; ALTER TABLE alter_table DROP COLUMN h;"; done
}
function thread3()
{
while true; do $CLICKHOUSE_CLIENT -q "INSERT INTO alter_table SELECT rand(1), rand(2), 1 / rand(3), toString(rand(4)), [rand(5), rand(6)], rand(7) % 2 ? NULL : generateUUIDv4(), (rand(8), rand(9)) FROM numbers(100000)"; done
}
function thread4()
{
while true; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE alter_table FINAL"; done
}
function thread5()
{
while true; do $CLICKHOUSE_CLIENT -q "ALTER TABLE alter_table DELETE WHERE rand() % 2 = 1"; done
}
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f thread1;
export -f thread2;
export -f thread3;
export -f thread4;
export -f thread5;
TIMEOUT=10
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
wait
$CLICKHOUSE_CLIENT -q "DROP TABLE alter_table"
$CLICKHOUSE_CLIENT -q "DROP TABLE alter_table2"

View File

@ -0,0 +1,100 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
set -e
function thread1()
{
while true; do
$CLICKHOUSE_CLIENT --query "SELECT * FROM system.parts FORMAT Null";
done
}
function thread2()
{
while true; do
REPLICA=$(($RANDOM % 10))
$CLICKHOUSE_CLIENT -n --query "ALTER TABLE alter_table_$REPLICA ADD COLUMN h String; ALTER TABLE alter_table_$REPLICA MODIFY COLUMN h UInt64; ALTER TABLE alter_table_$REPLICA DROP COLUMN h;";
done
}
function thread3()
{
while true; do
REPLICA=$(($RANDOM % 10))
$CLICKHOUSE_CLIENT -q "INSERT INTO alter_table_$REPLICA SELECT rand(1), rand(2), 1 / rand(3), toString(rand(4)), [rand(5), rand(6)], rand(7) % 2 ? NULL : generateUUIDv4(), (rand(8), rand(9)) FROM numbers(100000)";
done
}
function thread4()
{
while true; do
REPLICA=$(($RANDOM % 10))
$CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE alter_table_$REPLICA FINAL";
sleep 0.$RANDOM;
done
}
function thread5()
{
while true; do
REPLICA=$(($RANDOM % 10))
$CLICKHOUSE_CLIENT -q "ALTER TABLE alter_table_$REPLICA DELETE WHERE rand() % 2 = 1";
sleep 0.$RANDOM;
done
}
function thread6()
{
while true; do
REPLICA=$(($RANDOM % 10))
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS alter_table_$REPLICA;
CREATE TABLE alter_table_$REPLICA (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/alter_table', 'r_$REPLICA') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1;";
sleep 0.$RANDOM;
done
}
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f thread1;
export -f thread2;
export -f thread3;
export -f thread4;
export -f thread5;
export -f thread6;
TIMEOUT=30
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread6 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread6 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread6 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread6 2> /dev/null &
wait
for i in {0..9}; do $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS alter_table_$i"; done