Support schema inference for INSERT INTO FUNCTION

This commit is contained in:
avogar 2022-02-18 16:19:42 +00:00
parent 652980eec3
commit 653d769d34
11 changed files with 85 additions and 0 deletions

View File

@ -60,6 +60,18 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
{
const auto & factory = TableFunctionFactory::instance();
TableFunctionPtr table_function_ptr = factory.get(query.table_function, getContext());
/// If table function needs structure hint from select query
/// we can create a temporary pipeline and get the header.
if (query.select && table_function_ptr->needStructureHint())
{
InterpreterSelectWithUnionQuery interpreter_select{
query.select, getContext(), SelectQueryOptions(QueryProcessingStage::Complete, 1)};
QueryPipelineBuilder tmp_pipeline = interpreter_select.buildQueryPipeline();
ColumnsDescription structure_hint{tmp_pipeline.getHeader().getNamesAndTypesList()};
table_function_ptr->setStructureHint(structure_hint);
}
return table_function_ptr->execute(query.table_function, getContext(), table_function_ptr->getName());
}

View File

@ -52,6 +52,16 @@ public:
/// Returns actual table structure probably requested from remote server, may fail
virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/) const = 0;
/// Tells whether this table function wants a structure hint taken from the SELECT part of
/// INSERT INTO FUNCTION ... SELECT ...
/// The hint is used for schema inference. The base implementation never asks for one.
virtual bool needStructureHint() const
{
    return false;
}
/// Receives the structure hint taken from the SELECT part of
/// INSERT INTO FUNCTION ... SELECT ...
/// With the hint the schema does not have to be repeated in the function arguments.
/// The base implementation ignores the hint.
virtual void setStructureHint(const ColumnsDescription & /*structure_hint*/)
{
}
/// Create storage according to the query.
StoragePtr
execute(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns_ = {}, bool use_global_context = false) const;

View File

@ -95,6 +95,9 @@ StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/,
ColumnsDescription columns;
if (structure != "auto")
columns = parseColumnsListFromString(structure, context);
else if (!structure_hint.empty())
columns = structure_hint;
StoragePtr storage = getStorage(filename, format, columns, context, table_name, compression_method);
storage->startup();
return storage;

View File

@ -12,6 +12,10 @@ class Context;
*/
class ITableFunctionFileLike : public ITableFunction
{
public:
/// A hint is requested only while the structure is still left as "auto".
bool needStructureHint() const override
{
    return structure == "auto";
}
/// Keeps a copy of the hint in the structure_hint member.
void setStructureHint(const ColumnsDescription & structure_hint_) override
{
    structure_hint = structure_hint_;
}
protected:
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
@ -20,6 +24,7 @@ protected:
String format = "auto";
String structure = "auto";
String compression_method = "auto";
ColumnsDescription structure_hint;
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;

View File

@ -41,6 +41,7 @@ ColumnsDescription TableFunctionFile::getActualTableStructure(ContextPtr context
return StorageFile::getTableStructureFromFile(format, paths, compression_method, std::nullopt, context);
}
return parseColumnsListFromString(structure, context);
}

View File

@ -168,6 +168,8 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
ColumnsDescription columns;
if (s3_configuration->structure != "auto")
columns = parseColumnsListFromString(s3_configuration->structure, context);
else if (!structure_hint.empty())
columns = structure_hint;
StoragePtr storage = StorageS3::create(
s3_uri,

View File

@ -25,6 +25,10 @@ public:
}
bool hasStaticStructure() const override { return s3_configuration->structure != "auto"; }
/// A hint is requested only while the configured S3 structure is still "auto".
bool needStructureHint() const override
{
    return s3_configuration->structure == "auto";
}
/// Keeps a copy of the hint in the structure_hint member.
void setStructureHint(const ColumnsDescription & structure_hint_) override
{
    structure_hint = structure_hint_;
}
protected:
StoragePtr executeImpl(
const ASTPtr & ast_function,
@ -38,6 +42,7 @@ protected:
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
std::optional<StorageS3Configuration> s3_configuration;
ColumnsDescription structure_hint;
};
class TableFunctionCOS : public TableFunctionS3

View File

@ -421,6 +421,16 @@ def test_schema_inference_with_globs(started_cluster):
assert(sorted(result.split()) == ['0', '\\N'])
def test_insert_select_schema_inference(started_cluster):
    """Check that INSERT INTO FUNCTION hdfs(...) SELECT works without an explicit structure.

    No structure argument is passed to hdfs(), so the written file is expected to
    carry the column name and type produced by the SELECT (x UInt64).
    """
    # The HDFS URI must be a properly closed SQL string literal, otherwise the query does not parse.
    node1.query(f"insert into table function hdfs('hdfs://hdfs1:9000/test.native.zst') select toUInt64(1) as x")

    result = node1.query(f"desc hdfs('hdfs://hdfs1:9000/test.native.zst')")
    assert result.strip() == 'x\tUInt64'

    result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test.native.zst')")
    assert int(result) == 1

if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -1038,3 +1038,22 @@ def test_signatures(started_cluster):
result = instance.query(f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'minio', 'minio123', 'Arrow')")
assert(int(result) == 1)
def test_insert_select_schema_inference(started_cluster):
    """Check that INSERT INTO FUNCTION s3(...)/url(...) SELECT works without an explicit structure.

    Both inserts write the result of `select toUInt64(1) as x`, so the files read
    back are expected to describe a single column named `x` of type UInt64.
    """
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    # s3 table function
    instance.query(f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.native.zst') select toUInt64(1) as x")
    result = instance.query(f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.native.zst')")
    # The inserted column is aliased as `x`, so that is the name the schema must report.
    assert result.strip() == 'x\tUInt64'

    result = instance.query(f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.native.zst') format CSV")
    assert int(result) == 1

    # url table function, pointed at the same storage endpoint
    instance.query(f"insert into function url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test2.native.zst') select toUInt64(1) as x")
    result = instance.query(f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test2.native.zst')")
    assert result.strip() == 'x\tUInt64'

    result = instance.query(f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test2.native.zst') format CSV")
    assert int(result) == 1

View File

@ -0,0 +1,13 @@
x UInt32
y String
d Date
0 0 1970-01-01
1 1 1970-01-02
2 2 1970-01-03
3 3 1970-01-04
4 4 1970-01-05
5 5 1970-01-06
6 6 1970-01-07
7 7 1970-01-08
8 8 1970-01-09
9 9 1970-01-10

View File

@ -0,0 +1,5 @@
-- Source table with three columns of different types, filled with ten rows.
DROP TABLE IF EXISTS test;
CREATE TABLE test (x UInt32, y String, d Date) ENGINE = Memory()
AS SELECT
    number AS x,
    toString(number) AS y,
    toDate(number) AS d
FROM numbers(10);

-- No structure is given to file(); it is expected to be taken from the SELECT.
INSERT INTO TABLE FUNCTION file('data.native.zst') SELECT * FROM test;

-- Read the schema and the rows back from the written file.
DESC file('data.native.zst');
SELECT * FROM file('data.native.zst');