Merge pull request #2102 from yandex/fix-detach-database

Shutdown storages when DETACHing a database
This commit is contained in:
alexey-milovidov 2018-03-29 03:02:45 +03:00 committed by GitHub
commit a2bc046202
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 164 additions and 134 deletions

View File

@ -125,7 +125,6 @@ namespace ErrorCodes
extern const int UNKNOWN_SETTING = 115;
extern const int THERE_IS_NO_DEFAULT_VALUE = 116;
extern const int INCORRECT_DATA = 117;
extern const int TABLE_METADATA_DOESNT_EXIST = 118;
extern const int ENGINE_REQUIRED = 119;
extern const int CANNOT_INSERT_VALUE_OF_DIFFERENT_SIZE_INTO_TUPLE = 120;
extern const int UNKNOWN_SET_DATA_VARIANT = 121;

View File

@ -91,7 +91,7 @@ StoragePtr DatabaseDictionary::tryGetTable(
DatabaseIteratorPtr DatabaseDictionary::getIterator(const Context & /*context*/)
{
return std::make_unique<DatabaseSnaphotIterator>(loadTables());
return std::make_unique<DatabaseSnapshotIterator>(loadTables());
}
bool DatabaseDictionary::empty(const Context & /*context*/) const

View File

@ -8,75 +8,22 @@ namespace DB
namespace ErrorCodes
{
extern const int TABLE_ALREADY_EXISTS;
extern const int UNKNOWN_TABLE;
extern const int LOGICAL_ERROR;
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
}
DatabaseMemory::DatabaseMemory(String name_)
: DatabaseWithOwnTablesBase(std::move(name_))
, log(&Logger::get("DatabaseMemory(" + name + ")"))
{}
void DatabaseMemory::loadTables(
Context & /*context*/,
ThreadPool * /*thread_pool*/,
bool /*has_force_restore_data_flag*/)
{
log = &Logger::get("DatabaseMemory(" + name + ")");
/// Nothing to load.
}
bool DatabaseMemory::isTableExist(
const Context & /*context*/,
const String & table_name) const
{
std::lock_guard<std::mutex> lock(mutex);
return tables.find(table_name) != tables.end();
}
StoragePtr DatabaseMemory::tryGetTable(
const Context & /*context*/,
const String & table_name) const
{
std::lock_guard<std::mutex> lock(mutex);
auto it = tables.find(table_name);
if (it == tables.end())
return {};
return it->second;
}
DatabaseIteratorPtr DatabaseMemory::getIterator(const Context & /*context*/)
{
std::lock_guard<std::mutex> lock(mutex);
return std::make_unique<DatabaseSnaphotIterator>(tables);
}
bool DatabaseMemory::empty(const Context & /*context*/) const
{
std::lock_guard<std::mutex> lock(mutex);
return tables.empty();
}
StoragePtr DatabaseMemory::detachTable(const String & table_name)
{
StoragePtr res;
{
std::lock_guard<std::mutex> lock(mutex);
auto it = tables.find(table_name);
if (it == tables.end())
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
res = it->second;
tables.erase(it);
}
return res;
}
void DatabaseMemory::attachTable(const String & table_name, const StoragePtr & table)
{
std::lock_guard<std::mutex> lock(mutex);
if (!tables.emplace(table_name, table).second)
throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
void DatabaseMemory::createTable(
const Context & /*context*/,
const String & table_name,
@ -125,31 +72,12 @@ ASTPtr DatabaseMemory::getCreateTableQuery(
throw Exception("There is no CREATE TABLE query for DatabaseMemory tables", ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY);
}
ASTPtr DatabaseMemory::getCreateDatabaseQuery(const Context &) const
ASTPtr DatabaseMemory::getCreateDatabaseQuery(
const Context &) const
{
throw Exception("There is no CREATE DATABASE query for DatabaseMemory", ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY);
}
void DatabaseMemory::shutdown()
{
/// You can not hold a lock during shutdown.
/// Because inside `shutdown` function tables can work with database, and mutex is not recursive.
Tables tables_snapshot;
{
std::lock_guard<std::mutex> lock(mutex);
tables_snapshot = tables;
}
for (const auto & kv: tables_snapshot)
{
kv.second->shutdown();
}
std::lock_guard<std::mutex> lock(mutex);
tables.clear();
}
void DatabaseMemory::drop()
{
/// Additional actions to delete database are not required.

View File

@ -1,8 +1,6 @@
#pragma once
#include <mutex>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#include <Databases/DatabasesCommon.h>
namespace Poco { class Logger; }
@ -16,18 +14,10 @@ namespace DB
* All tables are created by calling code.
* TODO: Maybe DatabaseRuntime is more suitable class name.
*/
class DatabaseMemory : public IDatabase
class DatabaseMemory : public DatabaseWithOwnTablesBase
{
protected:
const String name;
mutable std::mutex mutex;
Tables tables;
Poco::Logger * log;
public:
DatabaseMemory(const String & name_) : name(name_) {}
DatabaseMemory(String name_);
String getEngineName() const override { return "Memory"; }
@ -36,18 +26,6 @@ public:
ThreadPool * thread_pool,
bool has_force_restore_data_flag) override;
bool empty(const Context & context) const override;
DatabaseIteratorPtr getIterator(const Context & context) override;
bool isTableExist(
const Context & context,
const String & table_name) const override;
StoragePtr tryGetTable(
const Context & context,
const String & table_name) const override;
void createTable(
const Context & context,
const String & table_name,
@ -58,9 +36,6 @@ public:
const Context & context,
const String & table_name) override;
void attachTable(const String & table_name, const StoragePtr & table) override;
StoragePtr detachTable(const String & table_name) override;
void renameTable(
const Context & context,
const String & table_name,
@ -82,8 +57,10 @@ public:
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
void shutdown() override;
void drop() override;
private:
Poco::Logger * log;
};
}

View File

@ -25,7 +25,6 @@ namespace ErrorCodes
{
extern const int TABLE_ALREADY_EXISTS;
extern const int UNKNOWN_TABLE;
extern const int TABLE_METADATA_DOESNT_EXIST;
extern const int CANNOT_CREATE_TABLE_FROM_METADATA;
extern const int INCORRECT_FILE_NAME;
extern const int FILE_DOESNT_EXIST;
@ -101,8 +100,11 @@ static void loadTable(
}
DatabaseOrdinary::DatabaseOrdinary(const String & name_, const String & metadata_path_, const Context & context)
: DatabaseMemory(name_), metadata_path(metadata_path_), data_path(context.getPath() + "data/" + escapeForFileName(name_) + "/")
DatabaseOrdinary::DatabaseOrdinary(String name_, const String & metadata_path_, const Context & context)
: DatabaseWithOwnTablesBase(std::move(name_))
, metadata_path(metadata_path_)
, data_path(context.getPath() + "data/" + escapeForFileName(name) + "/")
, log(&Logger::get("DatabaseOrdinary (" + name + ")"))
{
Poco::File(data_path).createDirectories();
}
@ -113,8 +115,6 @@ void DatabaseOrdinary::loadTables(
ThreadPool * thread_pool,
bool has_force_restore_data_flag)
{
log = &Logger::get("DatabaseOrdinary (" + name + ")");
using FileNames = std::vector<std::string>;
FileNames file_names;
@ -513,7 +513,6 @@ void DatabaseOrdinary::drop()
/// No additional removal actions are required.
}
void DatabaseOrdinary::alterTable(
const Context & context,
const String & name,

View File

@ -1,6 +1,6 @@
#pragma once
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabasesCommon.h>
namespace DB
@ -10,14 +10,10 @@ namespace DB
* It stores tables list in filesystem using list of .sql files,
* that contain declaration of table represented by SQL ATTACH TABLE query.
*/
class DatabaseOrdinary : public DatabaseMemory
class DatabaseOrdinary : public DatabaseWithOwnTablesBase
{
protected:
const String metadata_path;
const String data_path;
public:
DatabaseOrdinary(const String & name_, const String & metadata_path_, const Context & context);
DatabaseOrdinary(String name_, const String & metadata_path_, const Context & context);
String getEngineName() const override { return "Ordinary"; }
@ -70,6 +66,10 @@ public:
void drop() override;
private:
const String metadata_path;
const String data_path;
Poco::Logger * log;
void startupTables(ThreadPool * thread_pool);
ASTPtr getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const;

View File

@ -16,6 +16,9 @@ namespace DB
namespace ErrorCodes
{
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
extern const int TABLE_ALREADY_EXISTS;
extern const int UNKNOWN_TABLE;
extern const int LOGICAL_ERROR;
}
@ -80,4 +83,90 @@ std::pair<String, StoragePtr> createTableFromDefinition(
};
}
bool DatabaseWithOwnTablesBase::isTableExist(
const Context & /*context*/,
const String & table_name) const
{
std::lock_guard<std::mutex> lock(mutex);
return tables.find(table_name) != tables.end();
}
StoragePtr DatabaseWithOwnTablesBase::tryGetTable(
const Context & /*context*/,
const String & table_name) const
{
std::lock_guard<std::mutex> lock(mutex);
auto it = tables.find(table_name);
if (it == tables.end())
return {};
return it->second;
}
DatabaseIteratorPtr DatabaseWithOwnTablesBase::getIterator(const Context & /*context*/)
{
std::lock_guard<std::mutex> lock(mutex);
return std::make_unique<DatabaseSnapshotIterator>(tables);
}
bool DatabaseWithOwnTablesBase::empty(const Context & /*context*/) const
{
std::lock_guard<std::mutex> lock(mutex);
return tables.empty();
}
StoragePtr DatabaseWithOwnTablesBase::detachTable(const String & table_name)
{
StoragePtr res;
{
std::lock_guard<std::mutex> lock(mutex);
auto it = tables.find(table_name);
if (it == tables.end())
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
res = it->second;
tables.erase(it);
}
return res;
}
void DatabaseWithOwnTablesBase::attachTable(const String & table_name, const StoragePtr & table)
{
std::lock_guard<std::mutex> lock(mutex);
if (!tables.emplace(table_name, table).second)
throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
void DatabaseWithOwnTablesBase::shutdown()
{
/// You can not hold a lock during shutdown.
/// Because inside `shutdown` function tables can work with database, and mutex is not recursive.
Tables tables_snapshot;
{
std::lock_guard<std::mutex> lock(mutex);
tables_snapshot = tables;
}
for (const auto & kv: tables_snapshot)
{
kv.second->shutdown();
}
std::lock_guard<std::mutex> lock(mutex);
tables.clear();
}
DatabaseWithOwnTablesBase::~DatabaseWithOwnTablesBase()
{
try
{
shutdown();
}
catch(...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -36,17 +36,17 @@ std::pair<String, StoragePtr> createTableFromDefinition(
/// Copies list of tables and iterates through such snapshot.
class DatabaseSnaphotIterator : public IDatabaseIterator
class DatabaseSnapshotIterator : public IDatabaseIterator
{
private:
Tables tables;
Tables::iterator it;
public:
DatabaseSnaphotIterator(Tables & tables_)
DatabaseSnapshotIterator(Tables & tables_)
: tables(tables_), it(tables.begin()) {}
DatabaseSnaphotIterator(Tables && tables_)
DatabaseSnapshotIterator(Tables && tables_)
: tables(tables_), it(tables.begin()) {}
void next() override
@ -70,4 +70,38 @@ public:
}
};
/// A base class for databases that manage their own list of tables.
class DatabaseWithOwnTablesBase : public IDatabase
{
public:
bool isTableExist(
const Context & context,
const String & table_name) const override;
StoragePtr tryGetTable(
const Context & context,
const String & table_name) const override;
bool empty(const Context & context) const override;
void attachTable(const String & table_name, const StoragePtr & table) override;
StoragePtr detachTable(const String & table_name) override;
DatabaseIteratorPtr getIterator(const Context & context) override;
void shutdown() override;
virtual ~DatabaseWithOwnTablesBase() override;
protected:
String name;
mutable std::mutex mutex;
Tables tables;
DatabaseWithOwnTablesBase(String name_) : name(std::move(name_)) { }
};
}

View File

@ -74,7 +74,6 @@ namespace ErrorCodes
extern const int TABLE_ALREADY_EXISTS;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int DATABASE_ALREADY_EXISTS;
extern const int TABLE_METADATA_DOESNT_EXIST;
extern const int THERE_IS_NO_SESSION;
extern const int THERE_IS_NO_QUERY;
extern const int NO_ELEMENTS_IN_CONFIG;

View File

@ -46,8 +46,6 @@ namespace DB
namespace ErrorCodes
{
extern const int DIRECTORY_DOESNT_EXIST;
extern const int DIRECTORY_ALREADY_EXISTS;
extern const int TABLE_ALREADY_EXISTS;
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
extern const int INCORRECT_QUERY;

View File

@ -41,7 +41,8 @@ BlockIO InterpreterDropQuery::execute()
if (drop_database && drop.detach)
{
context.detachDatabase(drop.database);
auto database = context.detachDatabase(drop.database);
database->shutdown();
return {};
}

View File

@ -6,7 +6,7 @@
#include <Poco/String.h>
#include <Poco/Logger.h>
#include <Poco/NullChannel.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseMemory.h>
#include <Storages/System/attachSystemTables.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>

View File

@ -35,6 +35,7 @@ class ASTCreateQuery;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
using StorageWeakPtr = std::weak_ptr<IStorage>;
struct Settings;

View File

@ -24,6 +24,7 @@ namespace ErrorCodes
extern const int ABORTED;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int CANNOT_WRITE_TO_OSTREAM;
extern const int UNKNOWN_TABLE;
}
namespace DataPartsExchange
@ -68,6 +69,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
++data.current_table_sends;
SCOPE_EXIT({--data.current_table_sends;});
StoragePtr owned_storage = storage.lock();
if (!owned_storage)
throw Exception("The table was already dropped", ErrorCodes::UNKNOWN_TABLE);
LOG_TRACE(log, "Sending part " << part_name);
@ -122,7 +126,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
}
catch (const NetException & e)
{
/// Network error or error on remote side. No need to enquue part for check.
/// Network error or error on remote side. No need to enqueue part for check.
throw;
}
catch (const Exception & e)

View File

@ -2,6 +2,7 @@
#include <Interpreters/InterserverIOHandler.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/IStorage.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/copyData.h>
#include <IO/ConnectionTimeouts.h>
@ -19,7 +20,7 @@ class Service final : public InterserverIOEndpoint
{
public:
Service(MergeTreeData & data_, StoragePtr & storage_) : data(data_),
owned_storage(storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
storage(storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
@ -32,7 +33,7 @@ private:
private:
MergeTreeData & data;
StoragePtr owned_storage;
StorageWeakPtr storage;
Logger * log;
};