CREATE TABLE AS table_function()

This commit is contained in:
Dmitry Rubashkin 2019-07-18 21:29:49 +03:00
parent dcd8696466
commit 7382cb41fa
30 changed files with 105 additions and 62 deletions

View File

@ -9,6 +9,7 @@
#include <Storages/IStorage.h>
#include <Storages/StorageFactory.h>
#include <Common/typeid_cast.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <sstream>
@ -68,6 +69,13 @@ std::pair<String, StoragePtr> createTableFromDefinition(
ast_create_query.attach = true;
ast_create_query.database = database_name;
if (ast_create_query.as_table_function)
{
const auto * table_function = ast_create_query.as_table_function->as<ASTFunction>();
const auto & factory = TableFunctionFactory::instance();
StoragePtr storage = factory.get(table_function->name, context)->execute(ast_create_query.as_table_function, context, ast_create_query.table);
return {ast_create_query.table, storage};
}
/// We do not directly use `InterpreterCreateQuery::execute`, because
/// - the database has not been created yet;
/// - the code is simpler, since the query is already brought to a suitable form.

View File

@ -99,7 +99,8 @@ void SelectStreamFactory::createForShard(
if (table_func_ptr)
{
const auto * table_function = table_func_ptr->as<ASTFunction>();
main_table_storage = TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context);
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
}
else
main_table_storage = context.tryGetTable(main_table.database, main_table.table);

View File

@ -963,7 +963,7 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression)
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression->as<ASTFunction>()->name, *this);
/// Run it and remember the result
res = table_function_ptr->execute(table_expression, *this);
res = table_function_ptr->execute(table_expression, *this, table_function_ptr->getName());
}
return res;

View File

@ -46,6 +46,8 @@
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
{
@ -389,6 +391,11 @@ ColumnsDescription InterpreterCreateQuery::setColumns(
columns = as_storage->getColumns();
indices = as_storage->getIndices();
}
else if (create.as_table_function)
{
columns = as_storage->getColumns();
indices = as_storage->getIndices();
}
else if (create.select)
{
columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
@ -518,6 +525,13 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
StoragePtr as_storage;
TableStructureReadLockHolder as_storage_lock;
if (create.as_table_function)
{
const auto * table_function = create.as_table_function->as<ASTFunction>();
const auto & factory = TableFunctionFactory::instance();
as_storage = factory.get(table_function->name, context)->execute(create.as_table_function, context, create.table);
}
if (!as_table_name.empty())
{
as_storage = context.getTable(as_database_name, as_table_name);
@ -585,6 +599,10 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return {};
if (create.as_table_function)
res = as_storage;
else
{
res = StorageFactory::instance().get(create,
data_path,
table_name,
@ -594,6 +612,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
columns,
create.attach,
false);
}
if (create.temporary)
context.getSessionContext().addExternalTable(table_name, res, query_ptr);

View File

@ -79,7 +79,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
const auto & table_function = table_expression.table_function->as<ASTFunction &>();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function.name, context);
/// Run the table function and remember the result
table = table_function_ptr->execute(table_expression.table_function, context);
table = table_function_ptr->execute(table_expression.table_function, context, table_function_ptr->getName());
}
else
{

View File

@ -48,7 +48,8 @@ StoragePtr InterpreterInsertQuery::getTable(const ASTInsertQuery & query)
{
const auto * table_function = query.table_function->as<ASTFunction>();
const auto & factory = TableFunctionFactory::instance();
return factory.get(table_function->name, context)->execute(query.table_function, context);
TableFunctionPtr table_function_ptr = factory.get(table_function->name, context);
return table_function_ptr->execute(query.table_function, context, table_function_ptr->getName());
}
/// Into what table to write.

View File

@ -216,7 +216,11 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
formatOnCluster(settings);
}
if (as_table_function)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS " << (settings.hilite ? hilite_none : "");
as_table_function->formatImpl(settings, state, frame);
}
if (!to_table.empty())
{
settings.ostr
@ -231,7 +235,7 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
<< (!as_database.empty() ? backQuoteIfNeed(as_database) + "." : "") << backQuoteIfNeed(as_table);
}
if (columns_list)
if (columns_list && !as_table_function)
{
settings.ostr << (settings.one_line ? " (" : "\n(");
FormatStateStacked frame_nested = frame;

View File

@ -63,6 +63,7 @@ public:
ASTStorage * storage = nullptr;
String as_database;
String as_table;
ASTPtr as_table_function;
ASTSelectWithUnionQuery * select = nullptr;
/** Get the text that identifies this element. */

View File

@ -319,6 +319,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserIdentifier name_p;
ParserColumnsOrIndicesDeclarationList columns_or_indices_p;
ParserSelectWithUnionQuery select_p;
ParserFunction table_function_p;
ASTPtr database;
ASTPtr table;
@ -328,6 +329,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr storage;
ASTPtr as_database;
ASTPtr as_table;
ASTPtr as_table_function;
ASTPtr select;
String cluster_str;
bool attach = false;
@ -407,6 +409,8 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!s_as.ignore(pos, expected))
return false;
if (!table_function_p.parse(pos, as_table_function, expected))
{
if (!select_p.parse(pos, select, expected)) /// AS SELECT ...
{
/// AS [db.]table
@ -426,6 +430,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
}
}
}
else if (is_temporary)
return false;
else if (s_database.ignore(pos, expected))
@ -526,6 +531,9 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
auto query = std::make_shared<ASTCreateQuery>();
node = query;
if (as_table_function)
query->as_table_function = as_table_function;
query->attach = attach;
query->if_not_exists = if_not_exists;
query->is_view = is_view;

View File

@ -40,7 +40,8 @@ ColumnsDescription getStructureOfRemoteTable(
if (shard_info.isLocal())
{
const auto * table_function = table_func_ptr->as<ASTFunction>();
return TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context)->getColumns();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
return table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName())->getColumns();
}
auto table_func_name = queryToString(table_func_ptr);

View File

@ -10,10 +10,10 @@ namespace ProfileEvents
namespace DB
{
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context) const
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
ProfileEvents::increment(ProfileEvents::TableFunctionExecute);
return executeImpl(ast_function, context);
return executeImpl(ast_function, context, table_name);
}
}

View File

@ -30,12 +30,12 @@ public:
virtual std::string getName() const = 0;
/// Create storage according to the query.
StoragePtr execute(const ASTPtr & ast_function, const Context & context) const;
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const;
virtual ~ITableFunction() {}
private:
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const = 0;
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const = 0;
};
using TableFunctionPtr = std::shared_ptr<ITableFunction>;

View File

@ -19,7 +19,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
// Parse args
ASTs & args_func = ast_function->children;
@ -60,7 +60,7 @@ StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, cons
}
// Create table
StoragePtr storage = getStorage(filename, format, sample_block, const_cast<Context &>(context));
StoragePtr storage = getStorage(filename, format, sample_block, const_cast<Context &>(context), table_name);
storage->startup();

View File

@ -12,8 +12,8 @@ namespace DB
class ITableFunctionFileLike : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
virtual StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const = 0;
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const = 0;
};
}

View File

@ -27,7 +27,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
const auto & args_func = ast_function->as<ASTFunction &>();
@ -45,18 +45,18 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
std::string connection_string;
std::string schema_name;
std::string table_name;
//std::string table_name;
if (args.size() == 3)
{
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
schema_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
//table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
else if (args.size() == 2)
{
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
table_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
//table_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
/* Infer external table structure */

View File

@ -15,7 +15,7 @@ namespace DB
class ITableFunctionXDBC : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
/* A factory method to create bridge helper, that will assist in remote interaction */
virtual BridgeHelperPtr createBridgeHelper(Context & context,

View File

@ -5,12 +5,12 @@
namespace DB
{
StoragePtr TableFunctionFile::getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const
{
return StorageFile::create(source,
-1,
global_context.getUserFilesPath(),
getName(),
table_name,
format,
ColumnsDescription{sample_block.getNamesAndTypesList()},
global_context);

View File

@ -24,6 +24,6 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const override;
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const override;
};
}

View File

@ -8,10 +8,10 @@
namespace DB
{
StoragePtr TableFunctionHDFS::getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const
{
return StorageHDFS::create(source,
getName(),
table_name,
format,
ColumnsDescription{sample_block.getNamesAndTypesList()},
global_context);

View File

@ -25,7 +25,7 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const override;
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const override;
};
}

View File

@ -47,7 +47,7 @@ static NamesAndTypesList chooseColumns(const String & source_database, const Str
}
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
ASTs & args_func = ast_function->children;
@ -70,7 +70,7 @@ StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Co
String table_name_regexp = args[1]->as<ASTLiteral &>().value.safeGet<String>();
auto res = StorageMerge::create(
getName(),
table_name,
ColumnsDescription{chooseColumns(source_database, table_name_regexp, context)},
source_database,
table_name_regexp,

View File

@ -16,7 +16,7 @@ public:
static constexpr auto name = "merge";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
};

View File

@ -37,7 +37,7 @@ namespace ErrorCodes
}
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
const auto & args_func = ast_function->as<ASTFunction &>();
@ -55,7 +55,7 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
std::string host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>();
std::string database_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
std::string table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
// std::string table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
std::string user_name = args[3]->as<ASTLiteral &>().value.safeGet<String>();
std::string password = args[4]->as<ASTLiteral &>().value.safeGet<String>();

View File

@ -19,7 +19,7 @@ public:
return name;
}
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
};
}

View File

@ -17,7 +17,7 @@ namespace ErrorCodes
}
StoragePtr TableFunctionNumbers::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr TableFunctionNumbers::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
if (const auto * function = ast_function->as<ASTFunction>())
{
@ -30,7 +30,7 @@ StoragePtr TableFunctionNumbers::executeImpl(const ASTPtr & ast_function, const
UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0;
UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]);
auto res = StorageSystemNumbers::create(getName(), false, length, offset);
auto res = StorageSystemNumbers::create(table_name, false, length, offset);
res->startup();
return res;
}

View File

@ -17,7 +17,7 @@ public:
static constexpr auto name = "numbers";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
};

View File

@ -25,7 +25,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
ASTs & args_func = ast_function->children;
@ -162,13 +162,13 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
StoragePtr res = remote_table_function_ptr
? StorageDistributed::createWithOwnCluster(
getName(),
table_name,
structure_remote_table,
remote_table_function_ptr,
cluster,
context)
: StorageDistributed::createWithOwnCluster(
getName(),
table_name,
structure_remote_table,
remote_database,
remote_table,

View File

@ -21,7 +21,7 @@ public:
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
std::string name;
bool is_cluster_function;

View File

@ -6,10 +6,10 @@
namespace DB
{
StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const
{
Poco::URI uri(source);
return StorageURL::create(uri, getName(), format, ColumnsDescription{sample_block.getNamesAndTypesList()}, global_context);
return StorageURL::create(uri, table_name, format, ColumnsDescription{sample_block.getNamesAndTypesList()}, global_context);
}
void registerTableFunctionURL(TableFunctionFactory & factory)

View File

@ -20,6 +20,6 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const override;
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const override;
};
}