Database engines: development [#METR-19997].

This commit is contained in:
Alexey Milovidov 2016-03-16 08:00:58 +03:00
parent 0ce482c4d5
commit c60d5116c5
10 changed files with 96 additions and 45 deletions

View File

@ -33,6 +33,8 @@ public:
ASTPtr getCreateQuery(const String & name) const override;
void drop() override;
void shutdown() override;
};

View File

@ -13,7 +13,12 @@ namespace DB
class IDatabaseIterator
{
public:
virtual StoragePtr next() = 0;
virtual void next() = 0;
virtual bool isValid() const = 0;
virtual const String & name() const = 0;
virtual StoragePtr & table() const = 0;
virtual ~IDatabaseIterator() {}
};
@ -51,6 +56,9 @@ public:
/// Получить запрос CREATE TABLE для таблицы.
virtual ASTPtr getCreateQuery(const String & name) const = 0;
/// Удалить все таблицы.
virtual void drop() = 0;
/// Попросить все таблицы завершить фоновые потоки, которые они используют, и удалить все объекты таблиц.
virtual void shutdown() = 0;

View File

@ -209,12 +209,15 @@ public:
/// Получить запрос на CREATE таблицы.
ASTPtr getCreateQuery(const String & database_name, const String & table_name) const;
/// Для методов ниже может быть необходимо захватывать mutex самостоятельно.
Poco::Mutex & getMutex() const;
const DatabasePtr getDatabase(const String & database_name) const;
DatabasePtr getDatabase(const String & database_name);
const Databases getDatabases() const;
Databases getDatabases();
/// Для методов ниже может быть необходимо захватывать mutex самостоятельно.
Poco::Mutex & getMutex() const;
Context & getSessionContext();
Context & getGlobalContext();
@ -224,6 +227,7 @@ public:
const Settings & getSettingsRef() const { return settings; };
Settings & getSettingsRef() { return settings; };
void setProgressCallback(ProgressCallback callback);
/// Используется в InterpreterSelectQuery, чтобы передать его в IProfilingBlockInputStream.
ProgressCallback getProgressCallback() const;

View File

@ -67,17 +67,18 @@ private:
StoragePtr any_table;
{
/// Список таблиц могут менять в другом потоке.
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
context.assertDatabaseExists(source_database);
const Tables & tables = context.getDatabases().at(source_database);
for (Tables::const_iterator it = tables.begin(); it != tables.end(); ++it)
auto database = context.getDatabase(source_database);
auto iterator = database->getIterator();
while (iterator->isValid())
{
if (table_name_regexp.match(it->first))
if (table_name_regexp.match(iterator->name()))
{
any_table = it->second;
any_table = iterator->table();
break;
}
iterator->next();
}
}

View File

@ -225,6 +225,11 @@ ASTPtr DatabaseOrdinary::getCreateQuery(const String & name) const
}
void DatabaseOrdinary::drop()
{
}
void DatabaseOrdinary::shutdown()
{
std::lock_guard<std::mutex> lock(mutex);

View File

@ -179,14 +179,41 @@ const TableFunctionFactory & Context::getTableFunctionFactory() const { return
const AggregateFunctionFactory & Context::getAggregateFunctionFactory() const { return shared->aggregate_function_factory; }
InterserverIOHandler & Context::getInterserverIOHandler() { return shared->interserver_io_handler; }
Poco::Mutex & Context::getMutex() const { return shared->mutex; }
const Databases & Context::getDatabases() const { return shared->databases; }
Databases & Context::getDatabases() { return shared->databases; }
ProcessList & Context::getProcessList() { return shared->process_list; }
const ProcessList & Context::getProcessList() const { return shared->process_list; }
MergeList & Context::getMergeList() { return shared->merge_list; }
const MergeList & Context::getMergeList() const { return shared->merge_list; }
const Databases Context::getDatabases() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return shared->databases;
}
Databases Context::getDatabases()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return shared->databases;
}
const DatabasePtr Context::getDatabase(const String & database_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
assertDatabaseExists(db);
return shared->databases[db];
}
DatabasePtr Context::getDatabase(const String & database_name)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
assertDatabaseExists(db);
return shared->databases[db];
}
String Context::getPath() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);

View File

@ -27,30 +27,28 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
/// Даже в случае, когда отставание небольшое, выводить подробную информацию об отставании.
bool verbose = params.get("verbose", "") == "1";
/// Собираем набор реплицируемых таблиц.
Databases replicated_tables;
{
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
for (const auto & db : context.getDatabases())
for (const auto & table : db.second)
if (typeid_cast<const StorageReplicatedMergeTree *>(table.second.get()))
replicated_tables[db.first][table.first] = table.second;
}
const MergeTreeSettings & settings = context.getMergeTreeSettings();
bool ok = true;
std::stringstream message;
for (const auto & db : replicated_tables)
auto databases = context.getDatabases();
/// Перебираем все реплицируемые таблицы.
for (const auto & db : databases)
{
for (auto & table : db.second)
for (auto iterator = db.second->getIterator(); iterator.isValid(); iterator.next())
{
const auto & table = iterator.table();
const StorageReplicatedMergeTree * table_replicated = typeid_cast<const StorageReplicatedMergeTree *>(table.get());
if (!table_replicated)
continue;
time_t absolute_delay = 0;
time_t relative_delay = 0;
static_cast<StorageReplicatedMergeTree &>(*table.second).getReplicaDelays(absolute_delay, relative_delay);
table_replicated->getReplicaDelays(absolute_delay, relative_delay);
if ((settings.min_absolute_delay_to_close && absolute_delay >= static_cast<time_t>(settings.min_absolute_delay_to_close))
|| (settings.min_relative_delay_to_close && relative_delay >= static_cast<time_t>(settings.min_relative_delay_to_close)))

View File

@ -204,14 +204,20 @@ Block StorageMerge::getBlockWithVirtualColumns(const std::vector<StoragePtr> & s
void StorageMerge::getSelectedTables(StorageVector & selected_tables) const
{
/// Список таблиц могут менять в другом потоке.
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
context.assertDatabaseExists(source_database);
auto database = context.getDatabase(source_database);
auto iterator = database->getIterator();
const Tables & tables = context.getDatabases().at(source_database);
for (const auto & name_table_pair : tables)
if (name_table_pair.second.get() != this && table_name_regexp.match(name_table_pair.first))
selected_tables.push_back(name_table_pair.second);
while (iterator->isValid())
{
if (table_name_regexp.match(iterator->name()))
{
auto & table = iterator->table();
if (table.get() != this)
selected_tables.emplace_back(table);
}
iterator->next();
}
}

View File

@ -7,6 +7,8 @@
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Common/VirtualColumnUtils.h>
#include <DB/Databases/IDatabase.h>
namespace DB
{
@ -47,9 +49,7 @@ BlockInputStreams StorageSystemColumns::read(
std::map<std::pair<std::string, std::string>, StoragePtr> storages;
{
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
const Databases & databases = context.getDatabases();
Databases databases = context.getDatabases();
/// Добавляем столбец database.
ColumnPtr database_column = new ColumnString;
@ -72,13 +72,14 @@ BlockInputStreams StorageSystemColumns::read(
for (size_t i = 0; i < rows; ++i)
{
const std::string database_name = (*database_column)[i].get<std::string>();
const Tables & tables = databases.at(database_name);
const DatabasePtr database = databases.at(database_name);
offsets[i] = i ? offsets[i - 1] : 0;
for (const auto & table : tables)
for (auto iterator = database->getIterator(); iterator->isValid(); iterator->next())
{
storages.insert(std::make_pair(std::make_pair(database_name, table.first), table.second));
table_column->insert(table.first);
const String & table_name = iterator->name();
storages.emplace({ { database_name, table_name }, iterator->table() });
table_column->insert(table_name);
offsets[i] += 1;
}
}

View File

@ -39,10 +39,9 @@ BlockInputStreams StorageSystemDatabases::read(
col_name.column = new ColumnString;
block.insert(col_name);
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
for (Databases::const_iterator it = context.getDatabases().begin(); it != context.getDatabases().end(); ++it)
col_name.column->insert(it->first);
auto databases = context.getDatabases();
for (const auto & database : databases)
col_name.column->insert(database.first);
return BlockInputStreams(1, new OneBlockInputStream(block));
}