Cloud databases: development [#METR-19998].

This commit is contained in:
Alexey Milovidov 2016-03-28 14:19:14 +03:00
parent 605e9c4afb
commit 5c2f335cc0
7 changed files with 145 additions and 14 deletions

View File

@ -103,6 +103,7 @@ public:
ASTPtr getCreateQuery(const String & table_name) const override; ASTPtr getCreateQuery(const String & table_name) const override;
void shutdown() override; void shutdown() override;
void drop() override;
using Hash = UInt128; using Hash = UInt128;

View File

@ -44,6 +44,7 @@ public:
ASTPtr getCreateQuery(const String & table_name) const override; ASTPtr getCreateQuery(const String & table_name) const override;
void shutdown() override; void shutdown() override;
void drop() override;
}; };
} }

View File

@ -74,6 +74,9 @@ public:
/// Попросить все таблицы завершить фоновые потоки, которые они используют, и удалить все объекты таблиц. /// Попросить все таблицы завершить фоновые потоки, которые они используют, и удалить все объекты таблиц.
virtual void shutdown() = 0; virtual void shutdown() = 0;
/// Удалить метаданные, удаление которых отличается от рекурсивного удаления директории, если такие есть.
virtual void drop() = 0;
virtual ~IDatabase() {} virtual ~IDatabase() {}
}; };

View File

@ -1,17 +1,12 @@
#pragma once #pragma once
#include <DB/Common/OptimizedRegularExpression.h> #include <DB/Common/OptimizedRegularExpression.h>
#include <DB/Interpreters/Context.h>
#include <DB/Storages/IStorage.h> #include <DB/Storages/IStorage.h>
namespace DB namespace DB
{ {
class StorageMerge;
typedef Poco::SharedPtr<StorageMerge> StorageMergePtr;
/** Таблица, представляющая собой объединение произвольного количества других таблиц. /** Таблица, представляющая собой объединение произвольного количества других таблиц.
* У всех таблиц должна быть одинаковая структура. * У всех таблиц должна быть одинаковая структура.
*/ */

View File

@ -109,9 +109,10 @@ String DatabaseCloud::getNameOfNodeWithTables(const String & table_name) const
{ {
Hash hash = getTableHash(table_name); Hash hash = getTableHash(table_name);
String res; String res;
WriteBufferFromString out(res); {
writeText(hash.first % TABLE_TO_NODE_DIVISOR, out); WriteBufferFromString out(res);
out.next(); writeText(hash.first % TABLE_TO_NODE_DIVISOR, out);
}
return res; return res;
} }
@ -119,11 +120,11 @@ String DatabaseCloud::getNameOfNodeWithTables(const String & table_name) const
static String hashToHex(Hash hash) static String hashToHex(Hash hash)
{ {
String res; String res;
WriteBufferFromString str_out(res); {
HexWriteBuffer hex_out(str_out); WriteBufferFromString str_out(res);
writePODBinary(hash, hex_out); HexWriteBuffer hex_out(str_out);
hex_out.next(); writePODBinary(hash, hex_out);
str_out.next(); }
return res; return res;
} }
@ -177,6 +178,16 @@ struct TableSet
read(in); read(in);
} }
String toString() const
{
String res;
{
WriteBufferFromString out(res);
write(out);
}
return res;
}
void write(WriteBuffer & buf) const void write(WriteBuffer & buf) const
{ {
writeCString("Version 1\n", buf); writeCString("Version 1\n", buf);
@ -221,6 +232,16 @@ struct LocalTableSet
read(in); read(in);
} }
String toString() const
{
String res;
{
WriteBufferFromString out(res);
write(out);
}
return res;
}
void write(WriteBuffer & buf) const void write(WriteBuffer & buf) const
{ {
writeCString("Version 1\n", buf); writeCString("Version 1\n", buf);
@ -469,6 +490,101 @@ ASTPtr DatabaseCloud::getCreateQuery(const String & table_name) const
} }
void DatabaseCloud::attachTable(const String & table_name, const StoragePtr & table)
{
throw Exception("Attaching tables to cloud database is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
StoragePtr DatabaseCloud::detachTable(const String & table_name)
{
throw Exception("Detaching tables from cloud database is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DatabaseCloud::removeTable(const String & table_name)
{
zkutil::ZooKeeperPtr zookeeper = context.getZooKeeper();
/// Ищем локальную таблицу.
/// Если не нашли - ищем облачную таблицу в ZooKeeper.
String table_name_escaped = escapeForFileName(table_name);
if (Poco::File(data_path + table_name_escaped).exists())
{
/// Удаляем информация о локальной таблице из ZK.
while (true)
{
zkutil::Stat stat;
String local_tables_node_path = zookeeper_path + "/local_tables/" + name + "/" + getNameOfNodeWithTables(table_name);
String old_local_tables_value = zookeeper->get(local_tables_node_path, &stat);
LocalTableSet local_tables_info(old_local_tables_value);
Hash table_hash = getTableHash(table_name);
auto it = local_tables_info.map.find(table_hash);
if (it == local_tables_info.map.end())
break; /// Таблицу уже удалили.
local_tables_info.map.erase(it);
String new_local_tables_value = local_tables_info.toString();
auto code = zookeeper->trySet(local_tables_node_path, new_local_tables_value, stat.version);
if (code == ZOK)
break;
else if (code == ZBADVERSION)
continue; /// Узел успели поменять - попробуем ещё раз.
else
throw zkutil::KeeperException(code, local_tables_node_path);
}
/// Удаляем локальную таблицу из кэша.
{
std::lock_guard<std::mutex> lock(local_tables_mutex);
local_tables_cache.erase(table_name);
}
}
else
{
/// Удаляем таблицу из ZK, а также запоминаем список серверов, на которых расположены локальные таблицы.
TableDescription description;
while (true)
{
zkutil::Stat stat;
String tables_node_path = zookeeper_path + "/tables/" + name + "/" + getNameOfNodeWithTables(table_name);
String old_tables_value = zookeeper->get(tables_node_path, &stat);
TableSet tables_info(old_tables_value);
auto it = tables_info.map.find(table_name);
if (it == tables_info.map.end())
break; /// Таблицу уже удалили.
description = it->second;
tables_info.map.erase(it);
String new_tables_value = tables_info.toString();
auto code = zookeeper->trySet(tables_node_path, new_tables_value, stat.version);
if (code == ZOK)
break;
else if (code == ZBADVERSION)
continue; /// Узел успели поменять - попробуем ещё раз.
else
throw zkutil::KeeperException(code, tables_node_path);
}
if (!description.local_table_name.empty() && !description.hosts.empty())
{
/// Удаление локальных таблиц.
}
}
}
void DatabaseCloud::shutdown() void DatabaseCloud::shutdown()
{ {
/// Нельзя удерживать блокировку во время shutdown. /// Нельзя удерживать блокировку во время shutdown.
@ -488,4 +604,10 @@ void DatabaseCloud::shutdown()
} }
} }
void DatabaseCloud::drop()
{
}
} }

View File

@ -428,4 +428,10 @@ void DatabaseOrdinary::shutdown()
tables.clear(); tables.clear();
} }
void DatabaseOrdinary::drop()
{
/// Дополнительных действий по удалению не требуется.
}
} }

View File

@ -126,7 +126,10 @@ BlockIO InterpreterDropQuery::execute()
throw Exception("New table appeared in database being dropped. Try dropping it again.", ErrorCodes::DATABASE_NOT_EMPTY); throw Exception("New table appeared in database being dropped. Try dropping it again.", ErrorCodes::DATABASE_NOT_EMPTY);
/// Удаляем информацию о БД из оперативки /// Удаляем информацию о БД из оперативки
context.detachDatabase(database_name); auto database = context.detachDatabase(database_name);
/// Удаляем БД.
database->drop();
Poco::File(data_path).remove(false); Poco::File(data_path).remove(false);
Poco::File(metadata_path).remove(false); Poco::File(metadata_path).remove(false);