#include <Common/config.h>

#if USE_HDFS

#include <Storages/HDFS/StorageHDFSCluster.h>

#include <DataTypes/DataTypeString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/ClientInfo.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionHDFSCluster.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Storages/HDFS/StorageHDFS.h>

#include "registerTableFunctions.h"

#include <memory>
#include <thread>

namespace DB
{

namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}


void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
    /// Parse args
    ASTs & args_func = ast_function->children;

    if (args_func.size() != 1)
        throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

    ASTs & args = args_func.at(0)->children;

    const auto message = fmt::format(
        "The signature of table function {} shall be the following:\n"
        " - cluster, uri, format, structure\n"
        " - cluster, uri, format, structure, compression_method",
        getName());

    if (args.size() < 4 || args.size() > 5)
        throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

    for (auto & arg : args)
        arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);

    /// These arguments always come first
    cluster_name = args[0]->as<ASTLiteral &>().value.safeGet<String>();
    uri = args[1]->as<ASTLiteral &>().value.safeGet<String>();
    format = args[2]->as<ASTLiteral &>().value.safeGet<String>();
    structure = args[3]->as<ASTLiteral &>().value.safeGet<String>();
    if (args.size() >= 5)
        compression_method = args[4]->as<ASTLiteral &>().value.safeGet<String>();
}


ColumnsDescription TableFunctionHDFSCluster::getActualTableStructure(ContextPtr context) const
{
    return parseColumnsListFromString(structure, context);
}


StoragePtr TableFunctionHDFSCluster::executeImpl(
    const ASTPtr & /*function*/, ContextPtr context,
    const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
    StoragePtr storage;
    if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
    {
        /// On a worker node this uri won't contain globs:
        /// the worker only reads the files handed to it by the initiator.
        storage = StorageHDFS::create(
            uri,
            StorageID(getDatabaseName(), table_name),
            format,
            getActualTableStructure(context),
            ConstraintsDescription{},
            String{},
            context,
            compression_method,
            /*distributed_processing=*/true,
            nullptr);
    }
    else
    {
        /// On the initiator, create the cluster storage that dispatches
        /// the (possibly globbed) uri across the nodes of the cluster.
        storage = StorageHDFSCluster::create(
            cluster_name, uri,
            StorageID(getDatabaseName(), table_name),
            format, getActualTableStructure(context), ConstraintsDescription{},
            compression_method);
    }

    storage->startup();

    return storage;
}


void registerTableFunctionHDFSCluster(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionHDFSCluster>();
}

}

#endif