Add table function format(format_name, data)

This commit is contained in:
avogar 2022-01-28 19:27:53 +03:00
parent 107246e214
commit 89e471924c
7 changed files with 264 additions and 2 deletions

View File

@ -0,0 +1,107 @@
#include <Formats/ReadSchemaUtils.h>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/StorageValues.h>
#include <TableFunctions/TableFunctionFormat.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
ASTs & args_func = ast_function->children;
if (args_func.size() != 1)
throw Exception("Table function '" + getName() + "' must have arguments", ErrorCodes::LOGICAL_ERROR);
ASTs & args = args_func.at(0)->children;
if (args.size() != 2)
throw Exception("Table function '" + getName() + "' requires 2 arguments: format and data", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
format = args[0]->as<ASTLiteral &>().value.safeGet<String>();
data = args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const
{
auto read_buffer_creator = [&]()
{
return std::make_unique<ReadBufferFromString>(data);
};
return readSchemaFromFormat(format, std::nullopt, read_buffer_creator, context);
}
Block TableFunctionFormat::parseData(ColumnsDescription columns, ContextPtr context) const
{
Block result;
for (const auto & name_and_type : columns.getAllPhysical())
result.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
auto read_buf = std::make_unique<ReadBufferFromString>(data);
auto input_format = context->getInputFormat(format, *read_buf, result, context->getSettingsRef().max_block_size);
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
reader->pull(result);
/// In case when data contains more then 1 block we read all blocks
/// and combine them all to one big block (this is considered a rare case).
Block new_block;
while (reader->pull(new_block))
{
for (size_t i = 0; i != result.columns(); ++i)
{
auto & result_column = result.getByPosition(i);
const auto & new_column = new_block.getByPosition(i);
auto mutable_column = IColumn::mutate(result_column.column);
mutable_column->insertManyFrom(*new_column.column, 0, new_column.column->size());
result_column.column = std::move(mutable_column);
}
}
return result;
}
StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
Block res_block = parseData(columns, context);
auto res = StorageValues::create(StorageID(getDatabaseName(), table_name), columns, res_block);
res->startup();
return res;
}
void registerTableFunctionFormat(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionFormat>(TableFunctionFactory::CaseInsensitive);
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <TableFunctions/ITableFunction.h>
namespace DB
{
class Context;
/* format(format_name, data) - ...
*/
class TableFunctionFormat : public ITableFunction
{
public:
static constexpr auto name = "format";
std::string getName() const override { return name; }
bool hasStaticStructure() const override { return false; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Values"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
Block parseData(ColumnsDescription columns, ContextPtr context) const;
String format;
String data;
};
}

View File

@ -5,11 +5,9 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <Storages/StorageInput.h>
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <boost/algorithm/string.hpp>
#include "registerTableFunctions.h"

View File

@ -49,6 +49,8 @@ void registerTableFunctions()
#endif
registerTableFunctionDictionary(factory);
registerTableFunctionFormat(factory);
}
}

View File

@ -48,6 +48,8 @@ void registerTableFunctionSQLite(TableFunctionFactory & factory);
void registerTableFunctionDictionary(TableFunctionFactory & factory);
void registerTableFunctionFormat(TableFunctionFactory & factory);
void registerTableFunctions();
}

View File

@ -0,0 +1,52 @@
111 Hello
123 World
111 Hello
123 World
111 Hello
123 World
111 Hello
123 World
111 Hello
123 World
111 Hello
123 World
111 Hello
123 World
111 Hello
123 World
111 Hello
123 World
111 Hello
123 World
111 Hello
123 World
111 Hello
123 World
111 Hello
123 World
123 World
123 World
123 World
123 World
111 Hello
111 Hello
111 Hello
111 Hello
111 Hello
123 World
123 World
123 World
123 World
123 World
1 2 [1,2,3] [['abc'],[],['d','e']]
c1 Nullable(Float64)
c2 Nullable(Float64)
c3 Array(Nullable(Float64))
c4 Array(Array(Nullable(String)))
111 Hello
123 World
111 Hello
131 Hello
123 World
b Nullable(Float64)
a Nullable(String)

View File

@ -0,0 +1,68 @@
select * from format(JSONEachRow,
$$
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
$$);
set max_block_size=5;
select * from format(JSONEachRow,
$$
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
$$);
select * from format(CSV, '1,2,"[1,2,3]","[[\'abc\'], [], [\'d\', \'e\']]"');
desc format(CSV, '1,2,"[1,2,3]","[[\'abc\'], [], [\'d\', \'e\']]"');
drop table if exists test;
create table test as format(JSONEachRow,
$$
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 111}
{"a": "Hello", "b": 131}
{"a": "World", "b": 123}
$$);
select * from test;
desc table test;
drop table test;