write table tructure for table function remote(...)

This commit is contained in:
Alexander Tokmakov 2020-08-19 16:34:38 +03:00
parent 318f14b95e
commit 969940b4c9
11 changed files with 166 additions and 70 deletions

View File

@ -54,9 +54,13 @@ std::pair<String, StoragePtr> createTableFromAST(
if (ast_create_query.as_table_function)
{
const auto & table_function = ast_create_query.as_table_function->as<ASTFunction &>();
const auto & ast_table_function = ast_create_query.as_table_function->as<ASTFunction &>();
const auto & factory = TableFunctionFactory::instance();
StoragePtr storage = factory.get(table_function.name, context)->execute(ast_create_query.as_table_function, context, ast_create_query.table);
auto table_function = factory.get(ast_table_function.name, context);
ColumnsDescription columns;
if (ast_create_query.columns_list && ast_create_query.columns_list->columns)
columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, false);
StoragePtr storage = table_function->execute(ast_create_query.as_table_function, context, ast_create_query.table, std::move(columns));
storage->renameInMemory(ast_create_query);
return {ast_create_query.table, storage};
}

View File

@ -456,6 +456,9 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
if (create.columns_list)
{
if (create.as_table_function && (create.columns_list->indices || create.columns_list->constraints))
throw Exception("Indexes and constraints are not supported for table functions", ErrorCodes::INCORRECT_QUERY);
if (create.columns_list->columns)
{
bool sanity_check_compression_codecs = !create.attach && !context.getSettingsRef().allow_suspicious_codecs;
@ -492,7 +495,13 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
properties.columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
}
else if (create.as_table_function)
return {};
{
/// Table function without columns list.
auto table_function = TableFunctionFactory::instance().get(create.as_table_function->as<ASTFunction &>().name, context);
properties.columns = table_function->getActualTableStructure(create.as_table_function, context);
if (properties.columns.empty()) //FIXME
return {};
}
else
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
@ -761,7 +770,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
{
const auto & table_function = create.as_table_function->as<ASTFunction &>();
const auto & factory = TableFunctionFactory::instance();
res = factory.get(table_function.name, context)->execute(create.as_table_function, context, create.table);
res = factory.get(table_function.name, context)->execute(create.as_table_function, context, create.table, properties.columns);
res->renameInMemory({create.database, create.table, create.uuid});
}
else

View File

@ -282,13 +282,23 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
if (as_table_function)
{
if (columns_list)
{
frame.expression_list_always_start_on_new_line = true;
settings.ostr << (settings.one_line ? " (" : "\n(");
FormatStateStacked frame_nested = frame;
columns_list->formatImpl(settings, state, frame_nested);
settings.ostr << (settings.one_line ? ")" : "\n)");
frame.expression_list_always_start_on_new_line = false;
}
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS " << (settings.hilite ? hilite_none : "");
as_table_function->formatImpl(settings, state, frame);
}
frame.expression_list_always_start_on_new_line = true;
if (columns_list)
if (columns_list && !as_table_function)
{
settings.ostr << (settings.one_line ? " (" : "\n(");
FormatStateStacked frame_nested = frame;

View File

@ -416,7 +416,12 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
return false;
if (!storage_p.parse(pos, storage, expected) && !is_temporary)
return false;
{
if (!s_as.ignore(pos, expected))
return false;
if (!table_function_p.parse(pos, as_table_function, expected))
return false;
}
}
else
{

View File

@ -292,12 +292,14 @@ StorageDistributed::StorageDistributed(
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach_)
bool attach_,
ClusterPtr owned_cluster_)
: IStorage(id_)
, remote_database(remote_database_)
, remote_table(remote_table_)
, global_context(std::make_unique<Context>(context_))
, log(&Poco::Logger::get("StorageDistributed (" + id_.table_name + ")"))
, owned_cluster(std::move(owned_cluster_))
, cluster_name(global_context->getMacros()->expand(cluster_name_))
, has_sharding_key(sharding_key_)
, relative_data_path(relative_data_path_)
@ -341,40 +343,14 @@ StorageDistributed::StorageDistributed(
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, attach)
bool attach,
ClusterPtr owned_cluster_)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, attach, std::move(owned_cluster_))
{
remote_table_function_ptr = std::move(remote_table_function_ptr_);
}
StoragePtr StorageDistributed::createWithOwnCluster(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
ClusterPtr owned_cluster_,
const Context & context_)
{
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), String(), false);
res->owned_cluster = std::move(owned_cluster_);
return res;
}
StoragePtr StorageDistributed::createWithOwnCluster(
const StorageID & table_id_,
const ColumnsDescription & columns_,
ASTPtr & remote_table_function_ptr_,
ClusterPtr & owned_cluster_,
const Context & context_)
{
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), String(), false);
res->owned_cluster = owned_cluster_;
return res;
}
bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
const auto & settings = context.getSettingsRef();

View File

@ -42,21 +42,6 @@ class StorageDistributed final : public ext::shared_ptr_helper<StorageDistribute
public:
~StorageDistributed() override;
static StoragePtr createWithOwnCluster(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
ClusterPtr owned_cluster_,
const Context & context_);
static StoragePtr createWithOwnCluster(
const StorageID & table_id_,
const ColumnsDescription & columns_,
ASTPtr & remote_table_function_ptr_, /// Table function ptr.
ClusterPtr & owned_cluster_,
const Context & context_);
std::string getName() const override { return "Distributed"; }
bool supportsSampling() const override { return true; }
@ -165,7 +150,8 @@ protected:
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach_);
bool attach_,
ClusterPtr owned_cluster_ = {});
StorageDistributed(
const StorageID & id_,
@ -177,7 +163,8 @@ protected:
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach);
bool attach,
ClusterPtr owned_cluster_ = {});
String relative_data_path;

View File

@ -0,0 +1,60 @@
#pragma once
#include <Storages/IStorage.h>
#include <TableFunctions/ITableFunction.h>
namespace DB
{
template<typename StorageT>
class StorageTableFunction : public StorageT
{
public:
using GetStructureFunc = std::function<ColumnsDescription()>;
template<typename... StorageArgs>
StorageTableFunction(GetStructureFunc get_structure_, StorageArgs && ... args)
: StorageT(std::forward<StorageArgs>(args)...), get_structure(std::move(get_structure_))
{
}
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
assertBlocksHaveEqualStructure();
return StorageT::read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
BlockOutputStreamPtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const Context & context)
{
assertBlocksHaveEqualStructure();
return StorageT::write(query, metadata_snapshot, context);
}
private:
void assertSourceStructure()
{
if (!get_structure)
return;
StorageInMemoryMetadata source_metadata;
source_metadata.setColumns(get_structure());
actual_source_structure = source_metadata.getSampleBlock();
assertBlocksHaveEqualStructure(getInMemoryMetadataPtr()->getSampleBlock(), actual_source_structure);
get_structure = {};
}
GetStructureFunc get_structure;
Block actual_source_structure;
};
}

View File

@ -13,10 +13,11 @@ namespace ProfileEvents
namespace DB
{
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns_) const
{
ProfileEvents::increment(ProfileEvents::TableFunctionExecute);
context.checkAccess(AccessType::CREATE_TEMPORARY_TABLE | StorageFactory::instance().getSourceAccessType(getStorageTypeName()));
cached_columns = std::move(cached_columns_);
return executeImpl(ast_function, context, table_name);
}

View File

@ -2,6 +2,7 @@
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/ColumnsDescription.h>
#include <memory>
#include <string>
@ -31,14 +32,19 @@ public:
/// Get the main function name.
virtual std::string getName() const = 0;
virtual ColumnsDescription getActualTableStructure(const ASTPtr & /*ast_function*/, const Context & /*context*/) { return {}; }
/// Create storage according to the query.
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const;
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns_ = {}) const;
virtual ~ITableFunction() {}
private:
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const = 0;
virtual const char * getStorageTypeName() const = 0;
protected:
mutable ColumnsDescription cached_columns;
};
using TableFunctionPtr = std::shared_ptr<ITableFunction>;

View File

@ -27,8 +27,12 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void TableFunctionRemote::prepareClusterInfo(const ASTPtr & ast_function, const Context & context) const
{
if (cluster)
return;
ASTs & args_func = ast_function->children;
if (args_func.size() != 1)
@ -44,7 +48,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
String cluster_description;
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;
//ASTPtr remote_table_function_ptr;
String username;
String password;
@ -136,7 +140,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
for (auto ast : args)
setIdentifierSpecial(ast);
ClusterPtr cluster;
//ClusterPtr cluster;
if (!cluster_name.empty())
{
/// Use an existing cluster from the main config
@ -189,30 +193,54 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
if (!remote_table_function_ptr && remote_table.empty())
throw Exception("The name of remote table cannot be empty", ErrorCodes::BAD_ARGUMENTS);
auto remote_table_id = StorageID::createEmpty();
//auto remote_table_id = StorageID::createEmpty();
remote_table_id.database_name = remote_database;
remote_table_id.table_name = remote_table;
auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
}
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
prepareClusterInfo(ast_function, context);
if (cached_columns.empty())
cached_columns = getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
//auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
StoragePtr res = remote_table_function_ptr
? StorageDistributed::createWithOwnCluster(
? StorageDistributed::create(
StorageID(getDatabaseName(), table_name),
structure_remote_table,
cached_columns,
ConstraintsDescription{},
remote_table_function_ptr,
cluster,
context)
: StorageDistributed::createWithOwnCluster(
String{},
context,
ASTPtr{},
String{},
String{},
false,
cluster)
: StorageDistributed::create(
StorageID(getDatabaseName(), table_name),
structure_remote_table,
remote_database,
remote_table,
cluster,
context);
cached_columns,
ConstraintsDescription{},
remote_table_id.database_name,
remote_table_id.table_name,
String{},
context,
ASTPtr{},
String{},
String{},
false,
cluster);
res->startup();
return res;
}
ColumnsDescription TableFunctionRemote::getActualTableStructure(const ASTPtr & ast_function, const Context & context)
{
prepareClusterInfo(ast_function, context);
return getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
}
TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_)
: name{name_}, secure{secure_}

View File

@ -1,6 +1,8 @@
#pragma once
#include <TableFunctions/ITableFunction.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/StorageID.h>
namespace DB
@ -20,14 +22,22 @@ public:
std::string getName() const override { return name; }
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) override;
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
const char * getStorageTypeName() const override { return "Distributed"; }
void prepareClusterInfo(const ASTPtr & ast_function, const Context & context) const;
std::string name;
bool is_cluster_function;
std::string help_message;
bool secure;
mutable ClusterPtr cluster;
mutable StorageID remote_table_id = StorageID::createEmpty();
mutable ASTPtr remote_table_function_ptr;
};
}