table functions: main interface implementation, function merge implementation, plug for remote function [METR-9750]

This commit is contained in:
Sergey Fedorov 2014-01-28 16:45:10 +00:00
parent 8beae0f470
commit a7edba5032
10 changed files with 254 additions and 15 deletions

View File

@ -16,6 +16,7 @@
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
#include <DB/Interpreters/Settings.h>
#include <DB/Interpreters/Users.h>
#include <DB/Interpreters/Quota.h>
@ -28,6 +29,8 @@
namespace DB
{
class TableFunctionFactory;
using Poco::SharedPtr;
/// имя таблицы -> таблица
@ -72,6 +75,7 @@ struct ContextShared
String path; /// Путь к директории с данными, со слешем на конце.
Databases databases; /// Список БД и таблиц в них.
DatabaseDroppers database_droppers; /// Reference counter'ы для ленивого удаления БД.
TableFunctionFactory table_function_factory; /// Табличные функции.
FunctionFactory function_factory; /// Обычные функции.
AggregateFunctionFactory aggregate_function_factory; /// Агрегатные функции.
DataTypeFactory data_type_factory; /// Типы данных.
@ -203,6 +207,7 @@ public:
/// Установить настройку по имени.
void setSetting(const String & name, const Field & value);
const TableFunctionFactory & getTableFunctionFactory() const { return shared->table_function_factory; }
const FunctionFactory & getFunctionFactory() const { return shared->function_factory; }
const AggregateFunctionFactory & getAggregateFunctionFactory() const { return shared->aggregate_function_factory; }
const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; }

View File

@ -5,7 +5,7 @@
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/TableFunctions/ITableFunction.h>
namespace DB
{
@ -77,6 +77,7 @@ private:
size_t subquery_depth;
ExpressionAnalyzerPtr query_analyzer;
BlockInputStreams streams;
StoragePtr table_function_storage;
Logger * log;
};

View File

@ -0,0 +1,58 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Storages/StoragePtr.h>
#include <DB/Parsers/ASTFunction.h>
#include <DB/Interpreters/Context.h>
namespace DB
{
/** Интерфейс для табличных функций.
*
* Табличные функции не имеют отношения к другим функциям.
* Табличная функция может быть указана в секции FROM вместо [db.]table
* Табличная функция возвращает временный объект StoragePtr, который используется для выполнения запроса.
*
* Пример:
* SELECT count() FROM remote('example01-01-1', merge, hits)
* - пойти на example01-01-1, в БД merge, таблицу hits.
*/
class ITableFunction
{
public:
/// Получить основное имя функции.
virtual std::string getName() const = 0;
/// Создать storage в соответствии с запросом
virtual StoragePtr execute(ASTPtr ast_function, Context & context)
{
throw Exception("execute is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
protected:
/// Сгенерировать уникальное имя для временной таблицы.
String ChooseName () const {
String result = "TemproraryTable" + getName() + "Id";
for (size_t i = 0; i < 10; ++i)
{
int x = rand() % 62;
char now;
if (x < 10)
now = '0' + rand() % 10;
else if (x < 36)
now = 'a' + x - 10;
else
now = 'A' + x - 36;
result += now;
}
return result;
}
};
typedef SharedPtr<ITableFunction> TableFunctionPtr;
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <DB/TableFunctions/ITableFunction.h>
namespace DB
{
class Context;
class ITableFunction;
/** Позволяет получить табличную функцию по ее имени.
*/
class TableFunctionFactory
{
public:
TableFunctionPtr get(
const String & name,
const Context & context) const;
};
}

View File

@ -0,0 +1,71 @@
#pragma once
#include <statdaemons/OptimizedRegularExpression.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Storages/StorageMerge.h>
#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTLiteral.h>
#include <DB/TableFunctions/ITableFunction.h>
namespace DB
{
/*
* merge(db_name, tables_regexp)- создаёт временный StorageMerge.
* руктура таблицы берётся из первой попавшейся таблицы, подходящей под регексп.
* Если такой таблицы нет - кидается исключение.
*/
class TableFunctionMerge: public ITableFunction
{
public:
std::string getName() const { return "merge"; }
StoragePtr execute(ASTPtr ast_function, Context & context)
{
ASTs & args_func = dynamic_cast<ASTFunction &>(*ast_function).children;
if (args_func.size() != 1)
throw Exception("Storage Merge requires exactly 2 parameters"
" - name of source database and regexp for table names.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 2)
throw Exception("Storage Merge requires exactly 2 parameters"
" - name of source database and regexp for table names.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String table_name_regexp = safeGet<const String &>(dynamic_cast<ASTLiteral &>(*args[1]).value);
String source_database = dynamic_cast<ASTIdentifier &>(*args[0]).name;
/// В InterpreterSelectQuery будет создан ExpressionAnalzyer, который при обработке запроса наткнется на этот Identifier.
/// Нам необходимо его пометить как имя базы данных, посколку по умолчанию стоит значение column
dynamic_cast<ASTIdentifier &>(*args[0]).kind = ASTIdentifier::Database;
return StorageMerge::create(ChooseName(), ChooseColumns(source_database, table_name_regexp, context), source_database, table_name_regexp, context);
}
private:
NamesAndTypesListPtr ChooseColumns(const String & source_database, const String & table_name_regexp_, Context & context)
{
OptimizedRegularExpression table_name_regexp(table_name_regexp_);
/// Список таблиц могут менять в другом потоке.
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
context.assertDatabaseExists(source_database);
const Tables & tables = context.getDatabases().at(source_database);
for (Tables::const_iterator it = tables.begin(); it != tables.end(); ++it)
if (table_name_regexp.match(it->first))
return new NamesAndTypesList((it->second)->getColumnsList());
throw Exception("Error whyle creating table function merge. In database " + source_database + " no one matches regular expression: " + table_name_regexp_, ErrorCodes::UNKNOWN_TABLE);
}
};
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <DB/TableFunctions/ITableFunction.h>
namespace DB
{
/*
* remote('address', db, table) - создаёт временный StorageDistributed.
* Чтобы получить структуру таблицы, делается запрос DESC TABLE на удалённый сервер.
* Например:
* SELECT count() FROM remote('example01-01-1', merge, hits) - пойти на example01-01-1, в БД merge, таблицу hits. *
*/
/// Пока не реализована.
class TableFunctionRemote: public ITableFunction
{
public:
std::string getName() const { return "remote"; }
StoragePtr execute(ASTPtr ast_function, Context & context)
{
return StoragePtr();
}
};
}

View File

@ -199,7 +199,7 @@ StoragePtr ExpressionAnalyzer::getTable()
{
if (const ASTSelectQuery * select = dynamic_cast<const ASTSelectQuery *>(&*ast))
{
if (select->table && !dynamic_cast<const ASTSelectQuery *>(&*select->table))
if (select->table && !dynamic_cast<const ASTSelectQuery *>(&*select->table) && !dynamic_cast<const ASTFunction *>(&*select->table))
{
String database = select->database ?
dynamic_cast<const ASTIdentifier &>(*select->database).name :

View File

@ -22,6 +22,8 @@
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Storages/StorageView.h>
#include <DB/TableFunctions/ITableFunction.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
namespace DB
@ -35,9 +37,21 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input_)
throw Exception("Too deep subqueries. Maximum: " + toString(settings.limits.max_subquery_depth),
ErrorCodes::TOO_DEEP_SUBQUERIES);
context.setColumns(!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table)
? getTable()->getColumnsList()
: InterpreterSelectQuery(query.table, context).getSampleBlock().getColumnsList());
/// Если имееем дело с табличной функцией
if (query.table && dynamic_cast<ASTFunction *>(&*query.table))
{
/// Получить табличную функцию
TableFunctionPtr table_function_ptr = context.getTableFunctionFactory().get(dynamic_cast<ASTFunction *>(&*query.table)->name, context);
/// Выполнить ее и запомнить результат
table_function_storage = table_function_ptr->execute(query.table, context);
}
if (table_function_storage)
context.setColumns(table_function_storage->getColumnsList());
else
context.setColumns(!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table)
? getTable()->getColumnsList()
: InterpreterSelectQuery(query.table, context).getSampleBlock().getColumnsList());
if (context.getColumns().empty())
throw Exception("There are no available columns", ErrorCodes::THERE_IS_NO_COLUMN);
@ -378,7 +392,9 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
/// Список столбцов, которых нужно прочитать, чтобы выполнить запрос.
Names required_columns = query_analyzer->getRequiredColumns();
if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
if (table_function_storage)
table = table_function_storage; /// Если в запросе была указана табличная функция, данные читаем из нее.
else if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
table = getTable();
else if (dynamic_cast<ASTSelectQuery *>(&*query.table))
interpreter_subquery = new InterpreterSelectQuery(query.table, context, required_columns, QueryProcessingStage::Complete, subquery_depth + 1);

View File

@ -5,6 +5,7 @@
#include <DB/Parsers/ExpressionElementParsers.h>
#include <DB/Parsers/ExpressionListParsers.h>
#include <DB/Parsers/ParserSelectQuery.h>
#include <DB/Parsers/ParserCreateQuery.h>
namespace DB
@ -70,6 +71,8 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
ParserString s_rparen(")");
ParserString s_dot(".");
ParserIdentifier ident;
ParserIdentifierWithOptionalParameters table_function;
Pos before = pos;
if (s_lparen.ignore(pos, end, expected))
{
@ -88,20 +91,30 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
}
else if (ident.parse(pos, end, select_query->table, expected))
{
ws.ignore(pos, end);
if (s_dot.ignore(pos, end, expected))
/// Если сразу после identifier идет скобка, значит это должна быть табличная функция
if (s_lparen.ignore(pos, end, expected))
{
select_query->database = select_query->table;
if (!ident.parse(pos, end, select_query->table, expected))
pos = before;
if (!table_function.parse(pos, end, select_query->table, expected))
return false;
ws.ignore(pos, end);
}
else
{
ws.ignore(pos, end);
if (s_dot.ignore(pos, end, expected))
{
select_query->database = select_query->table;
if (!ident.parse(pos, end, select_query->table, expected))
return false;
if (select_query->database)
dynamic_cast<ASTIdentifier &>(*select_query->database).kind = ASTIdentifier::Database;
dynamic_cast<ASTIdentifier &>(*select_query->table).kind = ASTIdentifier::Table;
ws.ignore(pos, end);
}
if (select_query->database)
dynamic_cast<ASTIdentifier &>(*select_query->database).kind = ASTIdentifier::Database;
dynamic_cast<ASTIdentifier &>(*select_query->table).kind = ASTIdentifier::Table;
}
}
else
return false;

View File

@ -0,0 +1,24 @@
#include <boost/assign/list_inserter.hpp>
#include <DB/TableFunctions/TableFunctionMerge.h>
#include <DB/TableFunctions/TableFunctionRemote.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
namespace DB
{
TableFunctionPtr TableFunctionFactory::get(
const String & name,
const Context & context) const
{
/// Немного неоптимально.
if (name == "merge") return new TableFunctionMerge;
else if (name == "remote") return new TableFunctionRemote;
else
throw Exception("Unknown function " + name, ErrorCodes::UNKNOWN_FUNCTION);
}
}