This commit is contained in:
Alexander Tokmakov 2020-04-22 23:43:10 +03:00
parent fefbbd37df
commit 1833ac7f16
9 changed files with 91 additions and 43 deletions

View File

@ -33,6 +33,8 @@ DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, Context & co
: DatabaseOrdinary(name_, metadata_path_, "store/", "DatabaseAtomic (" + name_ + ")", context_)
, path_to_table_symlinks(context_.getPath() + "data/" + escapeForFileName(name_) + "/")
{
/// Symlinks in data/db_name/ directory are not used by ClickHouse,
/// it's needed only for convenient introspection.
Poco::File(path_to_table_symlinks).createDirectories();
}
@ -63,10 +65,10 @@ void DatabaseAtomic::attachTable(const String & name, const StoragePtr & table,
{
assert(relative_table_path != data_path && !relative_table_path.empty());
DetachedTables not_in_use;
std::lock_guard lock(mutex);
std::unique_lock lock(mutex);
not_in_use = cleenupDetachedTables();
assertDetachedTableNotInUse(table->getStorageID().uuid);
DatabaseWithDictionaries::attachTableUnlocked(name, table, relative_table_path);
DatabaseWithDictionaries::attachTableUnlocked(name, table, lock);
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
tryCreateSymlink(name, relative_table_path);
}
@ -74,8 +76,8 @@ void DatabaseAtomic::attachTable(const String & name, const StoragePtr & table,
StoragePtr DatabaseAtomic::detachTable(const String & name)
{
DetachedTables not_in_use;
std::lock_guard lock(mutex);
auto table = DatabaseWithDictionaries::detachTableUnlocked(name);
std::unique_lock lock(mutex);
auto table = DatabaseWithDictionaries::detachTableUnlocked(name, lock);
table_name_to_path.erase(name);
detached_tables.emplace(table->getStorageID().uuid, table);
not_in_use = cleenupDetachedTables();
@ -89,14 +91,16 @@ void DatabaseAtomic::dropTable(const Context &, const String & table_name, bool
String table_metadata_path_drop;
StoragePtr table;
{
std::lock_guard lock(mutex);
table = getTableUnlocked(table_name);
std::unique_lock lock(mutex);
table = getTableUnlocked(table_name, lock);
table_metadata_path_drop = DatabaseCatalog::instance().getPathForDroppedMetadata(table->getStorageID());
Poco::File(table_metadata_path).renameTo(table_metadata_path_drop);
DatabaseWithDictionaries::detachTableUnlocked(table_name); /// Should never throw
Poco::File(table_metadata_path).renameTo(table_metadata_path_drop); /// Mark table as dropped
DatabaseWithDictionaries::detachTableUnlocked(table_name, lock); /// Should never throw
table_name_to_path.erase(table_name);
}
tryRemoveSymlink(table_name);
/// Notify DatabaseCatalog that table was dropped. It will remove table data in background.
/// Cleanup is performed outside of database to allow easily DROP DATABASE without waiting for cleanup to complete.
DatabaseCatalog::instance().enqueueDroppedTableCleanup(table->getStorageID(), table, table_metadata_path_drop, no_delay);
}
@ -163,20 +167,22 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
db_lock = std::unique_lock{mutex};
}
StoragePtr table = getTableUnlocked(table_name);
StoragePtr table = getTableUnlocked(table_name, db_lock);
assert_can_move_mat_view(table);
StoragePtr other_table;
if (exchange)
{
other_table = other_db.getTableUnlocked(to_table_name);
other_table = other_db.getTableUnlocked(to_table_name, other_db_lock);
assert_can_move_mat_view(other_table);
}
/// Table renaming actually begins here
if (exchange)
renameExchange(old_metadata_path, new_metadata_path);
else
renameNoReplace(old_metadata_path, new_metadata_path);
/// After metadata was successfully moved, the following methods should not throw (if them do, it's a logical error)
table_data_path = detach(*this, table_name);
if (exchange)
other_table_data_path = detach(other_db, to_table_name);
@ -204,11 +210,11 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
auto table_data_path = getTableDataPath(query);
try
{
std::lock_guard lock{mutex};
std::unique_lock lock{mutex};
not_in_use = cleenupDetachedTables();
assertDetachedTableNotInUse(query.uuid);
renameNoReplace(table_metadata_tmp_path, table_metadata_path);
attachTableUnlocked(query.table, table, table_data_path); /// Should never throw
attachTableUnlocked(query.table, table, lock); /// Should never throw
table_name_to_path.emplace(query.table, table_data_path);
}
catch (...)
@ -223,8 +229,8 @@ void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String &
{
SCOPE_EXIT({ Poco::File(table_metadata_tmp_path).remove(); });
std::lock_guard lock{mutex};
auto actual_table_id = getTableUnlocked(table_id.table_name)->getStorageID();
std::unique_lock lock{mutex};
auto actual_table_id = getTableUnlocked(table_id.table_name, lock)->getStorageID();
if (table_id.uuid != actual_table_id.uuid)
throw Exception("Cannot alter table because it was renamed", ErrorCodes::CANNOT_ASSIGN_ALTER);
@ -234,6 +240,12 @@ void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String &
void DatabaseAtomic::assertDetachedTableNotInUse(const UUID & uuid)
{
/// Without this check the following race is possible since table RWLocks are not used:
/// 1. INSERT INTO table ...;
/// 2. DETACH TABLE table; (INSERT still in progress, it holds StoragePtr)
/// 3. ATTACH TABLE table; (new instance of Storage with the same UUID is created, instances share data on disk)
/// 4. INSERT INTO table ...; (both Storage instances writes data without any synchronization)
/// To avoid it, we remember UUIDs of detached tables and does not allow ATTACH table with such UUID until detached instance still in use.
if (detached_tables.count(uuid))
throw Exception("Cannot attach table with UUID " + toString(uuid) +
", because it was detached but still used by come query. Retry later.", ErrorCodes::TABLE_ALREADY_EXISTS);
@ -253,6 +265,7 @@ DatabaseAtomic::DetachedTables DatabaseAtomic::cleenupDetachedTables()
else
++it;
}
/// It should be destroyed in caller with released database mutex
return not_in_use;
}

View File

@ -8,6 +8,15 @@
namespace DB
{
/// All tables in DatabaseAtomic have persistent UUID and store data in
/// /clickhouse_path/store/xxx/xxxyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy/
/// where xxxyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy is UUID of the table.
/// RENAMEs are performed without changing UUID and moving table data.
/// Tables in Atomic databases can be accessed by UUID through DatabaseCatalog.
/// On DROP TABLE no data is removed, DatabaseAtomic just marks table as dropped
/// by moving metadata to /clickhouse_path/metadata_dropped/ and notifies DatabaseCatalog.
/// Running queries still may use dropped table. Table will be actually removed when it's not in use.
/// Allows to execute RENAME and DROP without IStorage-level RWLocks
class DatabaseAtomic : public DatabaseOrdinary
{
public:
@ -48,7 +57,7 @@ private:
void assertDetachedTableNotInUse(const UUID & uuid);
typedef std::unordered_map<UUID, StoragePtr> DetachedTables;
DetachedTables cleenupDetachedTables();
[[nodiscard]] DetachedTables cleenupDetachedTables();
void tryCreateSymlink(const String & table_name, const String & actual_data_path);
void tryRemoveSymlink(const String & table_name);

View File

@ -27,8 +27,8 @@ void DatabaseMemory::createTable(
const StoragePtr & table,
const ASTPtr & query)
{
std::lock_guard lock{mutex};
attachTableUnlocked(table_name, table);
std::unique_lock lock{mutex};
attachTableUnlocked(table_name, table, lock);
create_queries.emplace(table_name, query);
}
@ -37,8 +37,8 @@ void DatabaseMemory::dropTable(
const String & table_name,
bool /*no_delay*/)
{
std::lock_guard lock{mutex};
auto table = detachTableUnlocked(table_name);
std::unique_lock lock{mutex};
auto table = detachTableUnlocked(table_name, lock);
try
{
table->drop();
@ -48,7 +48,7 @@ void DatabaseMemory::dropTable(
}
catch (...)
{
attachTableUnlocked(table_name, table);
attachTableUnlocked(table_name, table, lock);
throw;
}
table->is_dropped = true;

View File

@ -36,7 +36,7 @@ void DatabaseWithDictionaries::attachDictionary(const String & dictionary_name,
{
String full_name = getDatabaseName() + "." + dictionary_name;
{
std::lock_guard lock(mutex);
std::unique_lock lock(mutex);
auto [it, inserted] = dictionaries.emplace(dictionary_name, attach_info);
if (!inserted)
throw Exception("Dictionary " + full_name + " already exists.", ErrorCodes::DICTIONARY_ALREADY_EXISTS);
@ -49,7 +49,8 @@ void DatabaseWithDictionaries::attachDictionary(const String & dictionary_name,
StorageDictionary::create(
StorageID(getDatabaseName(), dictionary_name),
full_name,
ExternalDictionariesLoader::getDictionaryStructure(*attach_info.config)));
ExternalDictionariesLoader::getDictionaryStructure(*attach_info.config)),
lock);
}
catch (...)
{
@ -76,7 +77,7 @@ void DatabaseWithDictionaries::detachDictionaryImpl(const String & dictionary_na
String full_name = getDatabaseName() + "." + dictionary_name;
{
std::lock_guard lock(mutex);
std::unique_lock lock(mutex);
auto it = dictionaries.find(dictionary_name);
if (it == dictionaries.end())
throw Exception("Dictionary " + full_name + " doesn't exist.", ErrorCodes::UNKNOWN_DICTIONARY);
@ -86,7 +87,7 @@ void DatabaseWithDictionaries::detachDictionaryImpl(const String & dictionary_na
/// Detach the dictionary as table too.
try
{
detachTableUnlocked(dictionary_name);
detachTableUnlocked(dictionary_name, lock);
}
catch (...)
{

View File

@ -64,11 +64,11 @@ bool DatabaseWithOwnTablesBase::empty(const Context & /*context*/) const
StoragePtr DatabaseWithOwnTablesBase::detachTable(const String & table_name)
{
std::lock_guard lock(mutex);
return detachTableUnlocked(table_name);
std::unique_lock lock(mutex);
return detachTableUnlocked(table_name, lock);
}
StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_name)
StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_name, std::unique_lock<std::mutex> &)
{
StoragePtr res;
@ -88,13 +88,13 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
return res;
}
void DatabaseWithOwnTablesBase::attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path)
void DatabaseWithOwnTablesBase::attachTable(const String & table_name, const StoragePtr & table, const String &)
{
std::lock_guard lock(mutex);
attachTableUnlocked(table_name, table, relative_table_path);
std::unique_lock lock(mutex);
attachTableUnlocked(table_name, table, lock);
}
void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, const StoragePtr & table, const String &)
void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, const StoragePtr & table, std::unique_lock<std::mutex> &)
{
if (!tables.emplace(table_name, table).second)
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
@ -144,7 +144,7 @@ DatabaseWithOwnTablesBase::~DatabaseWithOwnTablesBase()
}
}
StoragePtr DatabaseWithOwnTablesBase::getTableUnlocked(const String & table_name) const
StoragePtr DatabaseWithOwnTablesBase::getTableUnlocked(const String & table_name, std::unique_lock<std::mutex> &) const
{
auto it = tables.find(table_name);
if (it != tables.end())

View File

@ -46,9 +46,9 @@ protected:
DatabaseWithOwnTablesBase(const String & name_, const String & logger);
void attachTableUnlocked(const String & table_name, const StoragePtr & table, const String & relative_table_path = {});
StoragePtr detachTableUnlocked(const String & table_name);
StoragePtr getTableUnlocked(const String & table_name) const;
void attachTableUnlocked(const String & table_name, const StoragePtr & table, std::unique_lock<std::mutex> & lock);
StoragePtr detachTableUnlocked(const String & table_name, std::unique_lock<std::mutex> & lock);
StoragePtr getTableUnlocked(const String & table_name, std::unique_lock<std::mutex> & lock) const;
};
}

View File

@ -106,7 +106,7 @@ StoragePtr TemporaryTableHolder::getTable() const
void DatabaseCatalog::loadDatabases()
{
drop_delay_s = global_context->getConfigRef().getInt("database_atomic_delay_before_drop_table_s", 60);
drop_delay_sec = global_context->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", 60);
auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE, *global_context);
attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables);
@ -499,11 +499,22 @@ DatabaseAndTable DatabaseCatalog::tryGetDatabaseAndTable(const StorageID & table
void DatabaseCatalog::loadMarkedAsDroppedTables()
{
/// /clickhouse_root/metadata_dropped/ contains files with metadata of tables,
/// which where marked as dropped by Atomic databases.
/// Data directories of such tables still exists in store/
/// and metadata still exists in ZooKeeper for ReplicatedMergeTree tables.
/// If server restarts before such tables was completely dropped,
/// we should load them and enqueue cleanup to remove data from store/ and metadata from ZooKeeper
std::map<String, StorageID> dropped_metadata;
String path = global_context->getPath() + "metadata_dropped/";
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(path); it != dir_end; ++it)
{
/// File name has the following format:
/// database_name.table_name.uuid.sql
/// Ignore unexpected files
if (!it.name().ends_with(".sql"))
continue;
@ -555,12 +566,13 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
assert(!table || table->getStorageID().uuid == table_id.uuid);
assert(dropped_metadata_path == getPathForDroppedMetadata(table_id));
/// Table was removed from database. Enqueue removal of its data from disk.
time_t drop_time;
if (table)
drop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
else
{
/// Try load table from metadata to drop it correctly (e.g. remove metadata from zk)
/// Try load table from metadata to drop it correctly (e.g. remove metadata from zk or remove data from all volumes)
LOG_INFO(log, "Trying load partially dropped table " << table_id.getNameForLogs() << " from " << dropped_metadata_path);
ASTPtr ast = DatabaseOnDisk::parseQueryFromMetadata(log, *global_context, dropped_metadata_path, /*throw_on_error*/ false, /*remove_empty*/false);
auto create = typeid_cast<ASTCreateQuery *>(ast.get());
@ -599,10 +611,17 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
tables_marked_dropped.push_front({table_id, table, dropped_metadata_path, 0});
else
tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time});
/// If list of dropped tables was empty, start a drop task
if (tables_marked_dropped.size() == 1)
(*drop_task)->schedule();
}
void DatabaseCatalog::dropTableDataTask()
{
/// Background task that removes data of tables which were marked as dropped by Atomic databases.
/// Table can be removed when it's not used by queries and drop_delay_sec elapsed since it was marked as dropped.
bool need_reschedule = true;
TableMarkedAsDropped table;
try
{
@ -611,7 +630,7 @@ void DatabaseCatalog::dropTableDataTask()
auto it = std::find_if(tables_marked_dropped.begin(), tables_marked_dropped.end(), [&](const auto & elem)
{
bool not_in_use = !elem.table || elem.table.unique();
bool old_enough = elem.drop_time + drop_delay_s < current_time;
bool old_enough = elem.drop_time + drop_delay_sec < current_time;
return not_in_use && old_enough;
});
if (it != tables_marked_dropped.end())
@ -620,6 +639,7 @@ void DatabaseCatalog::dropTableDataTask()
LOG_INFO(log, "Will try drop " + table.table_id.getNameForLogs());
tables_marked_dropped.erase(it);
}
need_reschedule = !tables_marked_dropped.empty();
}
catch (...)
{
@ -640,10 +660,15 @@ void DatabaseCatalog::dropTableDataTask()
{
std::lock_guard lock(tables_marked_dropped_mutex);
tables_marked_dropped.emplace_back(std::move(table));
/// If list of dropped tables was empty, schedule a task to retry deletion.
if (tables_marked_dropped.size() == 1)
need_reschedule = true;
}
}
}
/// Do not schedule a task if there is no tables to drop
if (need_reschedule)
(*drop_task)->scheduleAfter(reschedule_time_ms);
}

View File

@ -224,7 +224,7 @@ private:
mutable std::mutex tables_marked_dropped_mutex;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> drop_task;
time_t drop_delay_s = 60;
time_t drop_delay_sec = 60;
};
}

View File

@ -1,5 +1,5 @@
DROP DATABASE IF EXISTS dict_db_01254;
CREATE DATABASE dict_db_01254;
CREATE DATABASE dict_db_01254 ENGINE=Ordinary;
CREATE TABLE dict_db_01254.dict_data (key UInt64, val UInt64) Engine=Memory();
CREATE DICTIONARY dict_db_01254.dict
@ -13,7 +13,7 @@ LIFETIME(MIN 0 MAX 0)
LAYOUT(FLAT());
DETACH DATABASE dict_db_01254;
ATTACH DATABASE dict_db_01254;
ATTACH DATABASE dict_db_01254 ENGINE=Ordinary;
SELECT query_count, status FROM system.dictionaries WHERE database = 'dict_db_01254' AND name = 'dict';
SYSTEM RELOAD DICTIONARY dict_db_01254.dict;