Merge pull request #45483 from Avogar/fix-hdfs-cluster

Fix schema inference from insertion table in hdfsCluster
This commit is contained in:
Kruglov Pavel 2023-01-25 20:24:51 +01:00 committed by GitHub
commit cd3d3fdd68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 24 additions and 11 deletions

View File

@ -42,15 +42,17 @@ StorageHDFSCluster::StorageHDFSCluster(
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & compression_method_)
const String & compression_method_,
bool structure_argument_was_provided_)
: IStorageCluster(table_id_)
, cluster_name(cluster_name_)
, uri(uri_)
, format_name(format_name_)
, compression_method(compression_method_)
, structure_argument_was_provided(structure_argument_was_provided_)
{
context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
checkHDFSURL(uri_);
context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
StorageInMemoryMetadata storage_metadata;
@ -58,7 +60,6 @@ StorageHDFSCluster::StorageHDFSCluster(
{
auto columns = StorageHDFS::getTableStructureFromData(format_name, uri_, compression_method, context_);
storage_metadata.setColumns(columns);
add_columns_structure_to_query = true;
}
else
storage_metadata.setColumns(columns_);
@ -91,7 +92,7 @@ Pipe StorageHDFSCluster::read(
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
auto query_to_send = query_info.original_query->clone();
if (add_columns_structure_to_query)
if (!structure_argument_was_provided)
addColumnsStructureToQueryWithClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 3, getName());

View File

@ -28,7 +28,8 @@ public:
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & compression_method_);
const String & compression_method_,
bool structure_argument_was_provided_);
std::string getName() const override { return "HDFSCluster"; }
@ -48,7 +49,7 @@ private:
String uri;
String format_name;
String compression_method;
bool add_columns_structure_to_query = false;
bool structure_argument_was_provided;
};

View File

@ -83,7 +83,7 @@ ColumnsDescription TableFunctionHDFSCluster::getActualTableStructure(ContextPtr
StoragePtr TableFunctionHDFSCluster::getStorage(
const String & /*source*/, const String & /*format_*/, const ColumnsDescription &, ContextPtr context,
const String & /*source*/, const String & /*format_*/, const ColumnsDescription & columns, ContextPtr context,
const std::string & table_name, const String & /*compression_method_*/) const
{
StoragePtr storage;
@ -94,7 +94,7 @@ StoragePtr TableFunctionHDFSCluster::getStorage(
filename,
StorageID(getDatabaseName(), table_name),
format,
getActualTableStructure(context),
columns,
ConstraintsDescription{},
String{},
context,
@ -107,8 +107,8 @@ StoragePtr TableFunctionHDFSCluster::getStorage(
storage = std::make_shared<StorageHDFSCluster>(
context,
cluster_name, filename, StorageID(getDatabaseName(), table_name),
format, getActualTableStructure(context), ConstraintsDescription{},
compression_method);
format, columns, ConstraintsDescription{},
compression_method, structure != "auto");
}
return storage;
}

View File

@ -28,7 +28,6 @@ public:
{
return name;
}
bool hasStaticStructure() const override { return true; }
protected:
StoragePtr getStorage(

View File

@ -0,0 +1,11 @@
-- Tags: no-fasttest, no-parallel, no-cpu-aarch64
-- Tag no-fasttest: Depends on Java
-- Checks that hdfsCluster, called without a structure argument, works in INSERT ... SELECT
-- when use_structure_from_insertion_table_in_table_functions is enabled.

-- Prepare the data: one line containing a JSON object with a nested object `x`.
-- The line is written with the TSV format, but the file is named *.jsonl and is read below
-- without a format argument (presumably the format is then detected from the file extension - confirm).
insert into table function hdfs('hdfs://localhost:12222/test_02536.jsonl', 'TSV') select '{"x" : {"a" : 1, "b" : 2}}' settings hdfs_truncate_on_insert=1;
drop table if exists test;
-- Destination table: `x` is a Tuple with named elements `a` and `b`.
create table test (x Tuple(a UInt32, b UInt32)) engine=Memory();
-- With the setting disabled the insert is expected to fail with TYPE_MISMATCH
-- (presumably because the type inferred for `x` from the data does not match the Tuple column - confirm).
insert into test select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_02536.jsonl') settings use_structure_from_insertion_table_in_table_functions=0; -- {serverError TYPE_MISMATCH}
-- With the setting enabled the same insert is expected to succeed (no error hint on this statement).
insert into test select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_02536.jsonl') settings use_structure_from_insertion_table_in_table_functions=1;
-- Output the inserted rows.
select * from test;
drop table test;