update ddl guard

This commit is contained in:
CurtizJ 2018-09-18 21:33:15 +03:00
parent c1533f6176
commit 3177f3f6a9
10 changed files with 70 additions and 74 deletions

View File

@ -896,50 +896,30 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression)
}
DDLGuard::DDLGuard(Map & map_, std::mutex & mutex_, std::unique_lock<std::mutex> && /*lock*/, const String & elem, const String & message)
: map(map_), mutex(mutex_)
DDLGuard::DDLGuard(Map & map_, std::mutex & guards_mutex_, std::unique_lock<std::mutex> && lock, const String & elem)
: map(map_), guards_mutex(guards_mutex_)
{
bool inserted;
std::tie(it, inserted) = map.emplace(elem, message);
if (!inserted)
throw Exception(it->second, ErrorCodes::DDL_GUARD_IS_ACTIVE);
it = map.emplace(elem, Entry{std::make_unique<std::mutex>(), 0}).first;
++it->second.counter;
lock.unlock();
table_lock = std::unique_lock<std::mutex>(*it->second.mutex);
}
DDLGuard::~DDLGuard()
{
std::lock_guard<std::mutex> lock(mutex);
map.erase(it);
std::lock_guard<std::mutex> lock(guards_mutex);
--it->second.counter;
if (!it->second.counter)
{
table_lock.unlock();
map.erase(it);
}
}
std::unique_ptr<DDLGuard> Context::getDDLGuard(const String & database, const String & table, const String & message) const
std::unique_ptr<DDLGuard> Context::getDDLGuard(const String & database, const String & table) const
{
std::unique_lock<std::mutex> lock(shared->ddl_guards_mutex);
return std::make_unique<DDLGuard>(shared->ddl_guards[database], shared->ddl_guards_mutex, std::move(lock), table, message);
}
std::unique_ptr<DDLGuard> Context::getDDLGuardIfTableDoesntExist(const String & database, const String & table, const String & message) const
{
auto lock = getLock();
Databases::const_iterator it = shared->databases.find(database);
if (shared->databases.end() != it && it->second->isTableExist(*this, table))
return {};
return getDDLGuard(database, table, message);
}
std::unique_ptr<DDLGuard> Context::getDDLGuardIfDatabaseDoesntExist(const String & database, const String & message) const
{
auto lock = getLock();
auto it = shared->ddl_guards.find(database);
if (it != shared->ddl_guards.end() && !it->second.empty())
return {};
return getDDLGuard(database, "", message);
return std::make_unique<DDLGuard>(shared->ddl_guards[database], shared->ddl_guards_mutex, std::move(lock), table);
}

View File

@ -225,10 +225,7 @@ public:
/// Get an object that protects the table from concurrently executing multiple DDL operations.
/// If such an object already exists, an exception is thrown.
std::unique_ptr<DDLGuard> getDDLGuard(const String & database, const String & table, const String & message) const;
/// If the table already exists, it returns nullptr, otherwise guard is created.
std::unique_ptr<DDLGuard> getDDLGuardIfTableDoesntExist(const String & database, const String & table, const String & message) const;
std::unique_ptr<DDLGuard> getDDLGuardIfDatabaseDoesntExist(const String & database, const String & message) const;
std::unique_ptr<DDLGuard> getDDLGuard(const String & database, const String & table) const;
String getCurrentDatabase() const;
String getCurrentQueryId() const;
@ -473,17 +470,23 @@ private:
class DDLGuard
{
public:
struct Entry {
std::unique_ptr<std::mutex> mutex;
UInt32 counter;
};
/// Element name -> message.
/// NOTE: using std::map here (and not std::unordered_map) to avoid iterator invalidation on insertion.
using Map = std::map<String, String>;
using Map = std::map<String, Entry>;
DDLGuard(Map & map_, std::mutex & mutex_, std::unique_lock<std::mutex> && lock, const String & elem, const String & message);
DDLGuard(Map & map_, std::mutex & guards_mutex_, std::unique_lock<std::mutex> && guards_lock, const String & elem);
~DDLGuard();
private:
Map & map;
Map::iterator it;
std::mutex & mutex;
std::mutex & guards_mutex;
std::unique_lock<std::mutex> table_lock;
};

View File

@ -72,19 +72,16 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
String database_name = create.database;
auto guard = context.getDDLGuardIfDatabaseDoesntExist(database_name, "Database " + database_name + " is creating right now");
if (!guard)
auto guard = context.getDDLGuard(database_name, "");
/// Database can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard
if (context.isDatabaseExist(database_name))
{
if (create.if_not_exists)
return {};
else
throw Exception("Database " + database_name + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS);
}
else
{
if (create.if_not_exists && context.isDatabaseExist(database_name))
return {};
}
String database_engine_name;
if (!create.storage)
@ -555,15 +552,13 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
database = context.getDatabase(database_name);
data_path = database->getDataPath();
/** If the table already exists, and the request specifies IF NOT EXISTS,
* then we allow concurrent CREATE queries (which do nothing).
* Otherwise, concurrent queries for creating a table, if the table does not exist,
* can throw an exception, even if IF NOT EXISTS is specified.
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
* If table doesnt exist, one thread is creating table, while others wait in DDLGuard.
*/
guard = context.getDDLGuardIfTableDoesntExist(database_name, table_name,
"Table " + database_name + "." + table_name + " is creating or attaching right now");
guard = context.getDDLGuard(database_name, table_name);
if (!guard)
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
if (database->isTableExist(context, table_name))
{
if (create.if_not_exists)
return {};

View File

@ -58,13 +58,12 @@ BlockIO InterpreterDropQuery::executeToTable(String & database_name_, String & t
String database_name = database_name_.empty() ? context.getCurrentDatabase() : database_name_;
auto ddl_guard = context.getDDLGuard(database_name, table_name);
DatabaseAndTable database_and_table = tryGetDatabaseAndTable(database_name, table_name, if_exists);
if (database_and_table.first && database_and_table.second)
{
auto ddl_guard = context.getDDLGuard(
database_name, table_name, "Table " + database_name + "." + table_name + " is dropping or detaching right now");
if (kind == ASTDropQuery::Kind::Detach)
{
database_and_table.second->shutdown();
@ -146,6 +145,8 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(String & table_name, ASTDr
BlockIO InterpreterDropQuery::executeToDatabase(String & database_name, ASTDropQuery::Kind kind, bool if_exists)
{
auto ddl_guard = context.getDDLGuard(database_name, "");
if (auto database = tryGetDatabase(database_name, if_exists))
{
if (kind == ASTDropQuery::Kind::Truncate)

View File

@ -90,18 +90,10 @@ BlockIO InterpreterRenameQuery::execute()
unique_tables_from.emplace(from);
if (!table_guards.count(from))
table_guards.emplace(from,
context.getDDLGuard(
from.database_name,
from.table_name,
"Table " + from.database_name + "." + from.table_name + " is being renamed right now"));
table_guards.emplace(from, context.getDDLGuard(from.database_name, from.table_name));
if (!table_guards.count(to))
table_guards.emplace(to,
context.getDDLGuard(
to.database_name,
to.table_name,
"Some table right now is being renamed to " + to.database_name + "." + to.table_name));
table_guards.emplace(to, context.getDDLGuard(to.database_name, to.table_name));
}
std::vector<TableFullWriteLock> locks;

View File

@ -225,7 +225,8 @@ BlockIO InterpreterSystemQuery::execute()
StoragePtr InterpreterSystemQuery::tryRestartReplica(const String & database_name, const String & table_name, Context & system_context)
{
auto database = system_context.getDatabase(database_name);
auto table_ddl_guard = system_context.getDDLGuard(database_name, table_name, "Table " + database_name + "." + table_name + " is restarting right now");
auto table_ddl_guard = system_context.getDDLGuard(database_name, table_name);
LOG_DEBUG(&Logger::get("System"), "trying restart replica");
ASTPtr create_ast;
/// Detach actions

View File

@ -4,13 +4,13 @@ set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
${CLICKHOUSE_CLIENT} --query "DROP DATABASE IF EXISTS db_create_race"
${CLICKHOUSE_CLIENT} --query "DROP DATABASE IF EXISTS d_create_race"
function query()
{
for i in {1..100}; do
${CLICKHOUSE_CLIENT} --query "CREATE DATABASE IF NOT EXISTS db_create_race"
${CLICKHOUSE_CLIENT} --query "DROP DATABASE IF EXISTS db_create_race"
${CLICKHOUSE_CLIENT} --query "CREATE DATABASE IF NOT EXISTS d_create_race"
${CLICKHOUSE_CLIENT} --query "DROP DATABASE IF EXISTS d_create_race"
done
}
@ -20,4 +20,4 @@ done
wait
${CLICKHOUSE_CLIENT} --query "DROP DATABASE IF EXISTS db_create_race"
${CLICKHOUSE_CLIENT} --query "DROP DATABASE IF EXISTS d_create_race"

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS db_create_race"
function query()
{
for i in {1..100}; do
${CLICKHOUSE_CLIENT} --query "CREATE TABLE IF NOT EXISTS db_create_race(a Int) ENGINE = Memory"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS db_create_race"
echo i > 2
done
}
for i in {1..2}; do
query &
done
wait
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS db_create_race"