This commit is contained in:
Alexander Tokmakov 2020-03-28 01:58:03 +03:00
parent d826e4f766
commit d012266440
9 changed files with 52 additions and 16 deletions

View File

@ -54,11 +54,17 @@ namespace ErrorCodes
#endif
/// Renames `old_path` to `new_path` by forwarding both paths to the `renameat2` helper
/// together with the RENAME_NOREPLACE flag.
/// NOTE(review): the flag name suggests the rename fails instead of overwriting an existing
/// `new_path` -- confirm against the helper, which is defined outside this fragment.
#if !defined(__NR_renameat2)
// The function is declared [[noreturn]] only when the __NR_renameat2 syscall number is not defined.
// Presumably the fallback `renameat2` helper always throws in that configuration -- TODO confirm.
[[noreturn]]
#endif
void renameNoReplace(const std::string & old_path, const std::string & new_path)
{
renameat2(old_path, new_path, RENAME_NOREPLACE);
}
#if !defined(__NR_renameat2)
[[noreturn]]
#endif
void renameExchange(const std::string & old_path, const std::string & new_path)
{
renameat2(old_path, new_path, RENAME_EXCHANGE);

View File

@ -38,6 +38,8 @@ void DatabaseLazy::loadStoredObjects(
Context & context,
bool /* has_force_restore_data_flag */)
{
Poco::File(context.getPath() + getDataPath()).createDirectories();
Poco::File(getMetadataPath()).createDirectories();
iterateMetadataFiles(context, [this](const String & file_name)
{
const std::string table_name = file_name.substr(0, file_name.size() - 4);

View File

@ -123,13 +123,11 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query)
return statement_stream.str();
}
DatabaseOnDisk::DatabaseOnDisk(const String & name, const String & metadata_path_, const String & logger, const Context & context)
DatabaseOnDisk::DatabaseOnDisk(const String & name, const String & metadata_path_, const String & logger, const Context &)
: DatabaseWithOwnTablesBase(name, logger)
, metadata_path(metadata_path_)
, data_path("data/" + escapeForFileName(database_name) + "/")
{
Poco::File(context.getPath() + getDataPath()).createDirectories();
Poco::File(getMetadataPath()).createDirectories();
}

View File

@ -113,6 +113,8 @@ void DatabaseOrdinary::loadStoredObjects(
Context & context,
bool has_force_restore_data_flag)
{
Poco::File(context.getPath() + getDataPath()).createDirectories();
Poco::File(getMetadataPath()).createDirectories();
/** Tables load faster if they are loaded in sorted (by name) order.
* Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
* which does not correspond to the order of table creation and does not correspond to the order of their location on disk.

View File

@ -139,6 +139,12 @@ void DatabaseCatalog::shutdown()
for (auto & database : current_databases)
database.second->shutdown();
{
std::lock_guard lock(tables_marked_droped_mutex);
for (auto & elem : tables_marked_droped)
if (elem.need_shutdown)
elem.table->shutdown();
}
std::lock_guard lock(databases_mutex);
assert(std::find_if_not(uuid_map.begin(), uuid_map.end(), [](const auto & elem) { return elem.map.empty(); }) == uuid_map.end());
@ -514,6 +520,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
assert(dropped_metadata_path == getPathForDroppedMetadata(table_id));
time_t drop_time;
bool need_shutdown = true;
if (table)
drop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
else
@ -550,13 +557,14 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
}
drop_time = Poco::File(dropped_metadata_path).getLastModified().epochTime();
need_shutdown = false;
}
std::lock_guard lock(tables_marked_droped_mutex);
if (ignore_delay)
tables_marked_droped.push_front({table_id, table, dropped_metadata_path, 0});
tables_marked_droped.push_front({table_id, table, dropped_metadata_path, 0, need_shutdown});
else
tables_marked_droped.push_back({table_id, table, dropped_metadata_path, drop_time});
tables_marked_droped.push_back({table_id, table, dropped_metadata_path, drop_time, need_shutdown});
}
void DatabaseCatalog::dropTableDataTask()
@ -573,14 +581,21 @@ void DatabaseCatalog::dropTableDataTask()
//LOG_INFO(log, "Check table " + elem.table_id.getNameForLogs() + ": " +
// "refcount = " + std::to_string(elem.table.use_count()) + ", " +
// "time elapsed = " + std::to_string(current_time - elem.drop_time));
return (!elem.table || elem.table.unique()) && elem.drop_time + drop_delay_s < current_time;
bool not_in_use = !elem.table || elem.table.unique();
bool old_enough = elem.drop_time + drop_delay_s < current_time;
return (not_in_use && old_enough) || elem.need_shutdown;
});
if (it != tables_marked_droped.end())
if (it != tables_marked_droped.end() && !it->need_shutdown)
{
table = std::move(*it);
LOG_INFO(log, "Will try drop " + table.table_id.getNameForLogs());
tables_marked_droped.erase(it);
}
else if (it->need_shutdown)
{
table = *it;
it->need_shutdown = false;
}
}
catch (...)
{
@ -589,6 +604,13 @@ void DatabaseCatalog::dropTableDataTask()
if (table.table_id)
{
if (table.need_shutdown)
{
table.table->shutdown();
(*drop_task)->scheduleAfter(0);
return;
}
try
{
dropTableFinally(table);

View File

@ -185,6 +185,7 @@ private:
StoragePtr table;
String metadata_path;
time_t drop_time;
bool need_shutdown{true};
};
using TablesMarkedAsDropped = std::list<TableMarkedAsDropped>;

View File

@ -92,8 +92,11 @@ BlockIO InterpreterDropQuery::executeToTable(
{
context.checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
table->shutdown();
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
TableStructureWriteLockHolder table_lock;
if (database->getEngineName() != "Atomic")
{
table_lock = table->lockExclusively(context.getCurrentQueryId());
}
/// Drop table from memory, don't touch data and metadata
database->detachTable(table_id.table_name);
}
@ -112,12 +115,12 @@ BlockIO InterpreterDropQuery::executeToTable(
context.checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
table->checkTableCanBeDropped();
table->shutdown();
/// If table was already dropped by anyone, an exception will be thrown
TableStructureWriteLockHolder table_lock;
if (database->getEngineName() != "Atomic")
{
table->shutdown();
table_lock = table->lockExclusively(context.getCurrentQueryId());
}
database->dropTable(context, table_id.table_name, query.no_delay);
}

View File

@ -1,4 +1,3 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default/>

View File

@ -83,7 +83,8 @@ def drop_table(cluster):
node = cluster.instances["node"]
minio = cluster.minio_client
node.query("DROP TABLE IF EXISTS s3_test")
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
time.sleep(1)
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
@ -301,7 +302,8 @@ def test_move_replace_partition_to_another_table(cluster):
time.sleep(3)
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD*2 + FILES_OVERHEAD_PER_PART_WIDE*4
node.query("DROP TABLE s3_clone")
node.query("DROP TABLE s3_clone NO DELAY")
time.sleep(1)
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
# Data should remain in S3
@ -311,7 +313,8 @@ def test_move_replace_partition_to_another_table(cluster):
# Number of S3 objects should be unchanged.
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE*4
node.query("DROP TABLE s3_test")
node.query("DROP TABLE s3_test NO DELAY")
time.sleep(1)
# Backup data should remain in S3.
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD_PER_PART_WIDE*4