#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int DATABASE_ACCESS_DENIED; } StoragePtr TableFunctionFile::executeImpl(const ASTPtr & ast_function, const Context & context) const { // Parse args ASTs & args_func = typeid_cast(*ast_function).children; if (args_func.size() != 1) throw Exception("Table function 'file' must have arguments.", ErrorCodes::LOGICAL_ERROR); ASTs & args = typeid_cast(*args_func.at(0)).children; if (args.size() != 3 && args.size() != 4) throw Exception("Table function 'file' requires exactly 3 or 4 arguments: path, format, structure and useStorageMemory.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); for (size_t i = 0; i < 3; ++i) args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context); std::string path = static_cast(*args[0]).value.safeGet(); std::string format = static_cast(*args[1]).value.safeGet(); std::string structure = static_cast(*args[2]).value.safeGet(); uint8_t useStorageMemory = 0; if (args.size() == 4) useStorageMemory = static_cast(*args[2]).value.safeGet(); std::string db_data_path = context.getPath() + "data/" + escapeForFileName(context.getCurrentDatabase()); Poco::Path poco_path = Poco::Path(path); if (poco_path.isRelative()) poco_path = Poco::Path(db_data_path, poco_path); std::string absolute_path = poco_path.absolute().toString(); // Create sample block std::vector structure_vals; boost::split(structure_vals, structure, boost::algorithm::is_any_of(" ,"), boost::algorithm::token_compress_on); if (structure_vals.size() & 1) throw Exception("Odd number of attributes in section structure", ErrorCodes::LOGICAL_ERROR); Block sample_block = Block(); const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); for (size_t i = 0; i < structure_vals.size(); i += 2) { ColumnWithTypeAndName column; column.name = structure_vals[i]; column.type = data_type_factory.get(structure_vals[i + 1]); column.column = column.type->createColumn(); sample_block.insert(std::move(column)); } // Create table ColumnsDescription columns = ColumnsDescription{sample_block.getNamesAndTypesList()}; StoragePtr storage; if (useStorageMemory) { // Validate path if (!startsWith(absolute_path, db_data_path)) throw Exception("Part path " + absolute_path + " is not inside " + db_data_path, ErrorCodes::DATABASE_ACCESS_DENIED); // Create Storage Memory storage = StorageMemory::create(getName(), columns); storage->startup(); BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef()); // Write data std::unique_ptr read_buffer = std::make_unique(absolute_path); BlockInputStreamPtr data = std::make_shared(context.getInputFormat( format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE)); data->readPrefix(); output->writePrefix(); while(Block block = data->read()) output->write(block); data->readSuffix(); output->writeSuffix(); } else { Context var_context = context; storage = StorageFile::create(absolute_path, -1, db_data_path, getName(), format, columns, var_context); storage->startup(); } return storage; } void registerTableFunctionFile(TableFunctionFactory & factory) { factory.registerFunction(); } }