Fix race between DROP MatView and RESTART REPLICAS (#47863)

* fix race between drop mv and restart replicas

* unrelated: fix bad exception messages

* fix test

* fix

* fix

* fix

* fix

* fix test

* fix

* fix test

* Update 02437_drop_mv_restart_replicas.sh

* fix tests
This commit is contained in:
Alexander Tokmakov 2023-04-01 15:26:00 +03:00 committed by GitHub
parent bf02068702
commit 48b23dd012
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 139 additions and 41 deletions

View File

@ -71,9 +71,12 @@ public:
scale(scale_) scale(scale_)
{ {
if (unlikely(precision < 1 || precision > maxPrecision())) if (unlikely(precision < 1 || precision > maxPrecision()))
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Precision {} is out of bounds", std::to_string(precision)); throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"Precision {} is out of bounds (precision range: [1, {}])",
std::to_string(precision), maxPrecision());
if (unlikely(scale > maxPrecision())) if (unlikely(scale > maxPrecision()))
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Scale {} is out of bounds", std::to_string(scale)); throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Scale {} is out of bounds (max scale: {})",
std::to_string(scale), maxPrecision());
} }
TypeIndex getTypeId() const override { return TypeToTypeIndex<T>; } TypeIndex getTypeId() const override { return TypeToTypeIndex<T>; }

View File

@ -116,7 +116,8 @@ inline ReturnType convertDecimalsImpl(const typename FromDataType::FieldType & v
if (common::mulOverflow(static_cast<MaxNativeType>(value.value), converted_value, converted_value)) if (common::mulOverflow(static_cast<MaxNativeType>(value.value), converted_value, converted_value))
{ {
if constexpr (throw_exception) if constexpr (throw_exception)
throw Exception(ErrorCodes::DECIMAL_OVERFLOW, "{} convert overflow", std::string(ToDataType::family_name)); throw Exception(ErrorCodes::DECIMAL_OVERFLOW, "{} convert overflow while multiplying {} by scale {}",
std::string(ToDataType::family_name), toString(value.value), toString(converted_value));
else else
return ReturnType(false); return ReturnType(false);
} }
@ -136,7 +137,10 @@ inline ReturnType convertDecimalsImpl(const typename FromDataType::FieldType & v
converted_value > std::numeric_limits<typename ToFieldType::NativeType>::max()) converted_value > std::numeric_limits<typename ToFieldType::NativeType>::max())
{ {
if constexpr (throw_exception) if constexpr (throw_exception)
throw Exception(ErrorCodes::DECIMAL_OVERFLOW, "{} convert overflow", std::string(ToDataType::family_name)); throw Exception(ErrorCodes::DECIMAL_OVERFLOW, "{} convert overflow: {} is not in range ({}, {})",
std::string(ToDataType::family_name), toString(converted_value),
toString(std::numeric_limits<typename ToFieldType::NativeType>::min()),
toString(std::numeric_limits<typename ToFieldType::NativeType>::max()));
else else
return ReturnType(false); return ReturnType(false);
} }

View File

@ -223,7 +223,8 @@ struct ArrayAggregateImpl
auto result_scale = column_typed->getScale() * array_size; auto result_scale = column_typed->getScale() * array_size;
if (unlikely(result_scale > DecimalUtils::max_precision<AggregationType>)) if (unlikely(result_scale > DecimalUtils::max_precision<AggregationType>))
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Scale {} is out of bounds", result_scale); throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Scale {} is out of bounds (max scale: {})",
result_scale, DecimalUtils::max_precision<AggregationType>);
res[i] = DecimalUtils::convertTo<ResultType>(product, static_cast<UInt32>(result_scale)); res[i] = DecimalUtils::convertTo<ResultType>(product, static_cast<UInt32>(result_scale));
} }
@ -332,7 +333,8 @@ struct ArrayAggregateImpl
auto result_scale = column->getScale() * count; auto result_scale = column->getScale() * count;
if (unlikely(result_scale > DecimalUtils::max_precision<AggregationType>)) if (unlikely(result_scale > DecimalUtils::max_precision<AggregationType>))
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Scale {} is out of bounds", result_scale); throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Scale {} is out of bounds (max scale: {})",
result_scale, DecimalUtils::max_precision<AggregationType>);
res[i] = DecimalUtils::convertTo<ResultType>(aggregate_value, static_cast<UInt32>(result_scale)); res[i] = DecimalUtils::convertTo<ResultType>(aggregate_value, static_cast<UInt32>(result_scale));
} }

View File

@ -71,7 +71,8 @@ private:
/// For array on stack, see below. /// For array on stack, see below.
if (arguments.size() > 10000) if (arguments.size() > 10000)
{ {
throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION, "Number of arguments of function {} is too large.", throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION,
"Number of arguments of function {} is too large (maximum: 10000).",
getName()); getName());
} }

View File

@ -37,7 +37,7 @@ namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN; extern const int ILLEGAL_COLUMN;
@ -87,7 +87,7 @@ public:
{ {
if (arguments.size() < 2) if (arguments.size() < 2)
{ {
throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION, "Too few arguments"); throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires at least 2 arguments", getName());
} }
/** We allow function invocation in one of the following forms: /** We allow function invocation in one of the following forms:

View File

@ -13,8 +13,7 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
} }
class FunctionSvg : public IFunction class FunctionSvg : public IFunction
@ -48,13 +47,9 @@ public:
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{ {
if (arguments.size() > 2) if (arguments.empty() || arguments.size() > 2)
{ {
throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION, "Too many arguments"); throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Incorrect number of arguments: expected 1 or 2 arguments");
}
else if (arguments.empty())
{
throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION, "Too few arguments");
} }
else if (arguments.size() == 2 && checkAndGetDataType<DataTypeString>(arguments[1].get()) == nullptr) else if (arguments.size() == 2 && checkAndGetDataType<DataTypeString>(arguments[1].get()) == nullptr)
{ {

View File

@ -355,19 +355,22 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
/// Flush should not be done if shouldBeEmptyOnDetach() == false, /// Flush should not be done if shouldBeEmptyOnDetach() == false,
/// since in this case getTablesIterator() may do some additional work, /// since in this case getTablesIterator() may do some additional work,
/// see DatabaseMaterializedMySQL::getTablesIterator() /// see DatabaseMaterializedMySQL::getTablesIterator()
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
{
iterator->table()->flush();
}
auto table_context = Context::createCopy(getContext()); auto table_context = Context::createCopy(getContext());
table_context->setInternalQuery(true); table_context->setInternalQuery(true);
/// Do not hold extra shared pointers to tables
std::vector<std::pair<String, bool>> tables_to_drop;
for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next()) for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next())
{ {
iterator->table()->flush();
tables_to_drop.push_back({iterator->name(), iterator->table()->isDictionary()});
}
for (const auto & table : tables_to_drop)
{
query_for_table.setTable(table.first);
query_for_table.is_dictionary = table.second;
DatabasePtr db; DatabasePtr db;
UUID table_to_wait = UUIDHelpers::Nil; UUID table_to_wait = UUIDHelpers::Nil;
query_for_table.setTable(iterator->name());
query_for_table.is_dictionary = iterator->table()->isDictionary();
executeToTableImpl(table_context, query_for_table, db, table_to_wait); executeToTableImpl(table_context, query_for_table, db, table_to_wait);
uuids_to_wait.push_back(table_to_wait); uuids_to_wait.push_back(table_to_wait);
} }
@ -428,7 +431,8 @@ AccessRightsElements InterpreterDropQuery::getRequiredAccessForDDLOnCluster() co
return required_access; return required_access;
} }
void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool sync) void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context,
const StorageID & target_table_id, bool sync, bool ignore_sync_setting)
{ {
if (DatabaseCatalog::instance().tryGetTable(target_table_id, current_context)) if (DatabaseCatalog::instance().tryGetTable(target_table_id, current_context))
{ {
@ -445,6 +449,8 @@ void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr
/// and not allowed to drop inner table explicitly. Allowing to drop inner table without explicit grant /// and not allowed to drop inner table explicitly. Allowing to drop inner table without explicit grant
/// looks like expected behaviour and we have tests for it. /// looks like expected behaviour and we have tests for it.
auto drop_context = Context::createCopy(global_context); auto drop_context = Context::createCopy(global_context);
if (ignore_sync_setting)
drop_context->setSetting("database_atomic_wait_for_drop_and_detach_synchronously", false);
drop_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; drop_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (auto txn = current_context->getZooKeeperMetadataTransaction()) if (auto txn = current_context->getZooKeeperMetadataTransaction())
{ {

View File

@ -24,7 +24,8 @@ public:
/// Drop table or database. /// Drop table or database.
BlockIO execute() override; BlockIO execute() override;
static void executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool sync); static void executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context,
const StorageID & target_table_id, bool sync, bool ignore_sync_setting = false);
bool supportsTransactions() const override; bool supportsTransactions() const override;

View File

@ -604,6 +604,7 @@ void InterpreterSystemQuery::restoreReplica()
StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context, bool need_ddl_guard) StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context, bool need_ddl_guard)
{ {
LOG_TRACE(log, "Restarting replica {}", replica);
auto table_ddl_guard = need_ddl_guard auto table_ddl_guard = need_ddl_guard
? DatabaseCatalog::instance().getDDLGuard(replica.getDatabaseName(), replica.getTableName()) ? DatabaseCatalog::instance().getDDLGuard(replica.getDatabaseName(), replica.getTableName())
: nullptr; : nullptr;
@ -647,6 +648,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
database->attachTable(system_context, replica.table_name, table, data_path); database->attachTable(system_context, replica.table_name, table, data_path);
table->startup(); table->startup();
LOG_TRACE(log, "Restarted replica {}", replica);
return table; return table;
} }
@ -693,11 +695,11 @@ void InterpreterSystemQuery::restartReplicas(ContextMutablePtr system_context)
guard.second = catalog.getDDLGuard(guard.first.database_name, guard.first.table_name); guard.second = catalog.getDDLGuard(guard.first.database_name, guard.first.table_name);
size_t threads = std::min(static_cast<size_t>(getNumberOfPhysicalCPUCores()), replica_names.size()); size_t threads = std::min(static_cast<size_t>(getNumberOfPhysicalCPUCores()), replica_names.size());
LOG_DEBUG(log, "Will restart {} replicas using {} threads", replica_names.size(), threads);
ThreadPool pool(CurrentMetrics::RestartReplicaThreads, CurrentMetrics::RestartReplicaThreadsActive, threads); ThreadPool pool(CurrentMetrics::RestartReplicaThreads, CurrentMetrics::RestartReplicaThreadsActive, threads);
for (auto & replica : replica_names) for (auto & replica : replica_names)
{ {
LOG_TRACE(log, "Restarting replica on {}", replica.getNameForLogs());
pool.scheduleOrThrowOnError([&]() { tryRestartReplica(replica, system_context, false); }); pool.scheduleOrThrowOnError([&]() { tryRestartReplica(replica, system_context, false); });
} }
pool.wait(); pool.wait();

View File

@ -250,7 +250,7 @@ void StorageMaterializedPostgreSQL::dropInnerTableIfAny(bool sync, ContextPtr lo
auto nested_table = tryGetNested() != nullptr; auto nested_table = tryGetNested() != nullptr;
if (nested_table) if (nested_table)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, getNestedStorageID(), sync); InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, getNestedStorageID(), sync, /* ignore_sync_setting */ true);
} }

View File

@ -212,13 +212,26 @@ void StorageMaterializedView::drop()
if (!select_query.select_table_id.empty()) if (!select_query.select_table_id.empty())
DatabaseCatalog::instance().removeViewDependency(select_query.select_table_id, table_id); DatabaseCatalog::instance().removeViewDependency(select_query.select_table_id, table_id);
dropInnerTableIfAny(true, getContext()); /// Sync flag and the setting make sense for Atomic databases only.
/// However, with Atomic databases, IStorage::drop() can be called only from a background task in DatabaseCatalog.
/// Running synchronous DROP from that task leads to deadlock.
/// Usually dropInnerTableIfAny is no-op, because the inner table is dropped before enqueueing a drop task for the MV itself.
/// But there's a race condition with SYSTEM RESTART REPLICA: the inner table might be detached due to RESTART.
/// In this case, dropInnerTableIfAny will not find the inner table and will not drop it during executions of DROP query for the MV itself.
/// DDLGuard does not protect from that, because RESTART REPLICA acquires DDLGuard for the inner table name,
/// but DROP acquires DDLGuard for the name of MV. And we cannot acquire second DDLGuard for the inner name in DROP,
/// because it may lead to lock-order-inversion (DDLGuards must be acquired in lexicographical order).
dropInnerTableIfAny(/* sync */ false, getContext());
} }
void StorageMaterializedView::dropInnerTableIfAny(bool sync, ContextPtr local_context) void StorageMaterializedView::dropInnerTableIfAny(bool sync, ContextPtr local_context)
{ {
/// We will use `sync` argument wneh this function is called from a DROP query
/// and will ignore database_atomic_wait_for_drop_and_detach_synchronously when it's called from drop task.
/// See the comment in StorageMaterializedView::drop
if (has_inner_table && tryGetTargetTable()) if (has_inner_table && tryGetTargetTable())
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, sync); InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id,
sync, /* ignore_sync_setting */ true);
} }
void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &) void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)

View File

@ -1609,7 +1609,7 @@ void StorageWindowView::drop()
{ {
/// Must be guaranteed at this point for database engine Atomic that has_inner_table == false, /// Must be guaranteed at this point for database engine Atomic that has_inner_table == false,
/// because otherwise will be a deadlock. /// because otherwise will be a deadlock.
dropInnerTableIfAny(true, getContext()); dropInnerTableIfAny(false, getContext());
} }
void StorageWindowView::dropInnerTableIfAny(bool sync, ContextPtr local_context) void StorageWindowView::dropInnerTableIfAny(bool sync, ContextPtr local_context)
@ -1623,7 +1623,7 @@ void StorageWindowView::dropInnerTableIfAny(bool sync, ContextPtr local_context)
ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id, sync); ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id, sync);
if (has_inner_target_table) if (has_inner_target_table)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, sync); InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, sync, /* ignore_sync_setting */ true);
} }
catch (...) catch (...)
{ {

View File

@ -149,9 +149,14 @@ function stop()
if [ $check_hang == true ] if [ $check_hang == true ]
then then
# We failed to stop the server with SIGTERM. Maybe it hang, let's collect stacktraces. # We failed to stop the server with SIGTERM. Maybe it hang, let's collect stacktraces.
echo -e "Possible deadlock on shutdown (see gdb.log)$FAIL" >> /test_output/test_results.tsv # Add a special status just in case, so it will be possible to find in the CI DB
echo -e "Warning: server did not stop yet$OK" >> /test_output/test_results.tsv
kill -TERM "$(pidof gdb)" ||: kill -TERM "$(pidof gdb)" ||:
sleep 5 sleep 5
# The server could finally stop while we were terminating gdb, let's recheck if it's still running
kill -s 0 $pid || return
echo -e "Possible deadlock on shutdown (see gdb.log)$FAIL" >> /test_output/test_results.tsv
echo "thread apply all backtrace (on stop)" >> /test_output/gdb.log echo "thread apply all backtrace (on stop)" >> /test_output/gdb.log
timeout 30m gdb -batch -ex 'thread apply all backtrace' -p "$pid" | ts '%Y-%m-%d %H:%M:%S' >> /test_output/gdb.log timeout 30m gdb -batch -ex 'thread apply all backtrace' -p "$pid" | ts '%Y-%m-%d %H:%M:%S' >> /test_output/gdb.log
clickhouse stop --force clickhouse stop --force

View File

@ -278,7 +278,7 @@ def need_retry(args, stdout, stderr, total_time):
def get_processlist_with_stacktraces(args): def get_processlist_with_stacktraces(args):
try: try:
if args.replicated_database: if args.replicated_database:
return clickhouse_execute_json( return clickhouse_execute(
args, args,
""" """
SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, * SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
@ -295,14 +295,14 @@ def get_processlist_with_stacktraces(args):
WHERE query NOT LIKE '%system.processes%' WHERE query NOT LIKE '%system.processes%'
GROUP BY p.* GROUP BY p.*
)) ))
ORDER BY elapsed DESC ORDER BY elapsed DESC FORMAT Vertical
""", """,
settings={ settings={
"allow_introspection_functions": 1, "allow_introspection_functions": 1,
}, },
) )
else: else:
return clickhouse_execute_json( return clickhouse_execute(
args, args,
""" """
SELECT SELECT
@ -315,7 +315,7 @@ def get_processlist_with_stacktraces(args):
JOIN system.stack_trace s USING (query_id) JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%' WHERE query NOT LIKE '%system.processes%'
GROUP BY p.* GROUP BY p.*
ORDER BY elapsed DESC ORDER BY elapsed DESC FORMAT Vertical
""", """,
settings={ settings={
"allow_introspection_functions": 1, "allow_introspection_functions": 1,
@ -2058,7 +2058,10 @@ def reportLogStats(args):
'Table {} is not replicated', '{} {}.{} already exists', 'Attempt to read after eof', 'Table {} is not replicated', '{} {}.{} already exists', 'Attempt to read after eof',
'Replica {} already exists', 'Convert overflow', 'key must be a tuple', 'Division by zero', 'Replica {} already exists', 'Convert overflow', 'key must be a tuple', 'Division by zero',
'No part {} in committed state', 'Files set to {}', 'Bytes set to {}', 'Sharding key {} is not used', 'No part {} in committed state', 'Files set to {}', 'Bytes set to {}', 'Sharding key {} is not used',
'Cannot parse datetime', 'Bad get: has {}, requested {}', 'There is no {} in {}', 'Numeric overflow' 'Cannot parse datetime', 'Bad get: has {}, requested {}', 'There is no {} in {}', 'Numeric overflow',
'Polygon is not valid: {}', 'Decimal math overflow', '{} only accepts maps', 'Dictionary ({}) not found',
'Unknown format {}', 'Invalid IPv4 value', 'Invalid IPv6 value', 'Unknown setting {}',
'Unknown table function {}'
) AS known_short_messages ) AS known_short_messages
SELECT count() AS c, message_format_string, substr(any(message), 1, 120) SELECT count() AS c, message_format_string, substr(any(message), 1, 120)
FROM system.text_log FROM system.text_log

View File

@ -91,8 +91,8 @@ def test_mutate_and_upgrade(start_cluster):
node2.query("OPTIMIZE TABLE mt FINAL") node2.query("OPTIMIZE TABLE mt FINAL")
assert node1.query("SELECT id FROM mt") == "1\n4\n" assert node1.query("SELECT id FROM mt ORDER BY id") == "1\n4\n"
assert node2.query("SELECT id FROM mt") == "1\n4\n" assert node2.query("SELECT id FROM mt ORDER BY id") == "1\n4\n"
for node in [node1, node2]: for node in [node1, node2]:
node.query("DROP TABLE mt") node.query("DROP TABLE mt")

View File

@ -0,0 +1,63 @@
#!/usr/bin/env bash
# Tags: long, zookeeper, race, no-ordinary-database
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "create user u_$CLICKHOUSE_DATABASE"
$CLICKHOUSE_CLIENT -q "grant all on db_$CLICKHOUSE_DATABASE.* to u_$CLICKHOUSE_DATABASE"
# For tests with Replicated
ENGINE=$($CLICKHOUSE_CLIENT -q "select replace(engine_full, '$CLICKHOUSE_DATABASE', 'db_$CLICKHOUSE_DATABASE') from system.databases where name='$CLICKHOUSE_DATABASE' format TSVRaw")
export ENGINE
function thread_ddl()
{
while true; do
$CLICKHOUSE_CLIENT -q "create database if not exists db_$CLICKHOUSE_DATABASE engine=$ENGINE"
$CLICKHOUSE_CLIENT -q "CREATE TABLE if not exists db_$CLICKHOUSE_DATABASE.test (test String, A Int64, B Int64) ENGINE = ReplicatedMergeTree ('/clickhouse/tables/{database}/test_02124/{table}', '1') ORDER BY tuple();"
$CLICKHOUSE_CLIENT -q "CREATE MATERIALIZED VIEW if not exists db_$CLICKHOUSE_DATABASE.test_mv_a Engine=ReplicatedMergeTree ('/clickhouse/tables/{database}/test_02124/{table}', '1') order by tuple() AS SELECT test, A, count() c FROM db_$CLICKHOUSE_DATABASE.test group by test, A;"
$CLICKHOUSE_CLIENT -q "CREATE MATERIALIZED VIEW if not exists db_$CLICKHOUSE_DATABASE.test_mv_b Engine=ReplicatedMergeTree ('/clickhouse/tables/{database}/test_02124/{table}', '1') partition by A order by tuple() AS SELECT test, A, count() c FROM db_$CLICKHOUSE_DATABASE.test group by test, A;"
$CLICKHOUSE_CLIENT -q "CREATE MATERIALIZED VIEW if not exists db_$CLICKHOUSE_DATABASE.test_mv_c Engine=ReplicatedMergeTree ('/clickhouse/tables/{database}/test_02124/{table}', '1') order by tuple() AS SELECT test, A, count() c FROM db_$CLICKHOUSE_DATABASE.test group by test, A;"
sleep 0.$RANDOM;
# A kind of backoff
timeout 5s $CLICKHOUSE_CLIENT -q "select sleepEachRow(0.1) from system.dropped_tables format Null" 2>/dev/null ||:
$CLICKHOUSE_CLIENT -q "drop database if exists db_$CLICKHOUSE_DATABASE"
done
}
function thread_insert()
{
while true; do
$CLICKHOUSE_CLIENT -q "INSERT INTO db_$CLICKHOUSE_DATABASE.test SELECT 'case1', number%3, rand() FROM numbers(5)"
sleep 0.$RANDOM;
done
}
function thread_restart()
{
while true; do
# The simplest way to restart only replicas from a specific database is to use a special user
$CLICKHOUSE_CLIENT --user "u_$CLICKHOUSE_DATABASE" -q "system restart replicas"
sleep 0.$RANDOM;
done
}
export -f thread_ddl;
export -f thread_insert;
export -f thread_restart;
TIMEOUT=15
timeout $TIMEOUT bash -c thread_ddl 2>&1| grep -Fa "Exception: " | grep -Fv -e "TABLE_IS_DROPPED" -e "UNKNOWN_TABLE" -e "DATABASE_NOT_EMPTY" &
timeout $TIMEOUT bash -c thread_insert 2> /dev/null &
timeout $TIMEOUT bash -c thread_restart 2>&1| grep -Fa "Exception: " | grep -Fv -e "is currently dropped or renamed" &
wait
timeout 45s $CLICKHOUSE_CLIENT -q "select sleepEachRow(0.3) from system.dropped_tables format Null" 2>/dev/null ||:
$CLICKHOUSE_CLIENT -q "drop database if exists db_$CLICKHOUSE_DATABASE" 2>&1| grep -Fa "Exception: " | grep -Fv -e "TABLE_IS_DROPPED" -e "UNKNOWN_TABLE" -e "DATABASE_NOT_EMPTY" ||: