ClickHouse/dbms/src/Databases/DatabaseAtomic.cpp

89 lines
2.9 KiB
C++
Raw Normal View History

2019-10-23 13:46:38 +00:00
#include <Databases/DatabaseAtomic.h>
2019-10-30 12:17:52 +00:00
#include <Databases/DatabaseOnDisk.h>
2019-11-11 11:34:03 +00:00
#include <Poco/File.h>
#include <IO/ReadHelpers.h>
2019-10-23 13:46:38 +00:00
namespace DB
{
2019-11-11 11:34:03 +00:00
namespace ErrorCodes
{
extern const int UNKNOWN_TABLE;
extern const int TABLE_ALREADY_EXISTS;
2019-11-11 14:28:28 +00:00
extern const int FILE_DOESNT_EXIST;
2019-11-11 11:34:03 +00:00
}
2019-10-23 13:46:38 +00:00
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, const Context & context_)
: DatabaseOrdinary(name_, metadata_path_, context_)
{
2019-11-11 11:34:03 +00:00
data_path = "store/";
2019-11-28 19:40:51 +00:00
log = &Logger::get("DatabaseAtomic (" + name_ + ")");
2019-11-11 11:34:03 +00:00
}
String DatabaseAtomic::getDataPath(const String & table_name) const
{
2019-12-02 19:11:18 +00:00
std::lock_guard lock(mutex);
2019-11-11 11:34:03 +00:00
auto it = table_name_to_path.find(table_name);
if (it == table_name_to_path.end())
throw Exception("Table " + table_name + " not found in database " + getDatabaseName(), ErrorCodes::UNKNOWN_TABLE);
return data_path + it->second;
}
String DatabaseAtomic::getDataPath(const ASTCreateQuery & query) const
{
stringToUUID(query.uuid); /// Check UUID is valid
const size_t uuid_prefix_len = 3;
return data_path + query.uuid.substr(0, uuid_prefix_len) + '/' + query.uuid + '/';
}
void DatabaseAtomic::drop(const Context &)
{
Poco::File(getMetadataPath()).remove(false);
}
void DatabaseAtomic::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
{
DatabaseWithDictionaries::attachTable(name, table, relative_table_path);
std::lock_guard lock(mutex);
2019-12-02 19:11:18 +00:00
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
2019-10-23 13:46:38 +00:00
}
2019-11-11 11:34:03 +00:00
StoragePtr DatabaseAtomic::detachTable(const String & name)
{
{
std::lock_guard lock(mutex);
table_name_to_path.erase(name);
}
return DatabaseWithDictionaries::detachTable(name);
}
2019-10-23 13:46:38 +00:00
2019-11-11 14:28:28 +00:00
void DatabaseAtomic::renameTable(const Context & context, const String & table_name, IDatabase & to_database,
2019-12-02 19:11:18 +00:00
const String & to_table_name)
2019-11-11 14:28:28 +00:00
{
if (typeid(*this) != typeid(to_database))
2019-12-02 19:11:18 +00:00
{
if (!typeid_cast<DatabaseOrdinary *>(&to_database))
throw Exception("Moving tables between databases of different engines is not supported", ErrorCodes::NOT_IMPLEMENTED);
/// Allow moving tables from Atomic to Ordinary (with table lock)
DatabaseOnDisk::renameTable(context, table_name, to_database, to_table_name);
return;
}
2019-11-11 14:28:28 +00:00
StoragePtr table = tryGetTable(context, table_name);
if (!table)
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
2019-12-02 19:11:18 +00:00
/// Update database and table name in memory without moving any data on disk
2019-11-11 14:28:28 +00:00
table->renameInMemory(to_database.getDatabaseName(), to_table_name);
/// NOTE Non-atomic.
2019-12-02 19:11:18 +00:00
to_database.attachTable(to_table_name, table, getDataPath(table_name));
detachTable(table_name);
Poco::File(getObjectMetadataPath(table_name)).renameTo(to_database.getObjectMetadataPath(to_table_name));
2019-11-11 14:28:28 +00:00
}
2019-10-23 13:46:38 +00:00
}