diff --git a/docker/test/stateless/Dockerfile b/docker/test/stateless/Dockerfile
index 68c08c23b3f..c196c6e9562 100644
--- a/docker/test/stateless/Dockerfile
+++ b/docker/test/stateless/Dockerfile
@@ -32,7 +32,9 @@ RUN apt-get update -y \
     mysql-client=8.0* \
     postgresql-client \
     sqlite3 \
-    awscli
+    awscli \
+    openjdk-11-jre-headless
+
 RUN pip3 install numpy scipy pandas Jinja2
@@ -58,10 +60,16 @@ RUN arch=${TARGETARCH:-amd64} \
     && wget "https://dl.min.io/client/mc/release/linux-${arch}/mc" \
     && chmod +x ./mc
+
+RUN wget 'https://dlcdn.apache.org/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz' \
+    && tar -xvf hadoop-3.3.1.tar.gz \
+    && rm -rf hadoop-3.3.1.tar.gz
+
 ENV MINIO_ROOT_USER="clickhouse"
 ENV MINIO_ROOT_PASSWORD="clickhouse"
 ENV EXPORT_S3_STORAGE_POLICIES=1
 
 COPY run.sh /
 COPY setup_minio.sh /
+COPY setup_hdfs_minicluster.sh /
 
 CMD ["/bin/bash", "/run.sh"]
diff --git a/docker/test/stateless/run.sh b/docker/test/stateless/run.sh
index 50632630f1f..4d105061ecc 100755
--- a/docker/test/stateless/run.sh
+++ b/docker/test/stateless/run.sh
@@ -19,6 +19,7 @@ ln -s /usr/share/clickhouse-test/clickhouse-test /usr/bin/clickhouse-test
 /usr/share/clickhouse-test/config/install.sh
 
 ./setup_minio.sh
+./setup_hdfs_minicluster.sh
 
 # For flaky check we also enable thread fuzzer
 if [ "$NUM_TRIES" -gt "1" ]; then
diff --git a/docker/test/stateless/setup_hdfs_minicluster.sh b/docker/test/stateless/setup_hdfs_minicluster.sh
new file mode 100755
index 00000000000..128db96d694
--- /dev/null
+++ b/docker/test/stateless/setup_hdfs_minicluster.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+set -e -x -a -u
+
+ls -lha
+
+cd hadoop-3.3.1
+
+export JAVA_HOME=/usr
+mkdir -p target/test/data
+chown clickhouse ./target/test/data
+sudo -E -u clickhouse bin/mapred minicluster -format -nomr -nnport 12222 &
+
+while ! nc -z localhost 12222; do
+  sleep 1
+done
+
+lsof -i :12222
+
+sleep 5
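Aside, not part of the patch: once setup_hdfs_minicluster.sh has brought the namenode up on port 12222, the minicluster can be sanity-checked by hand with the Hadoop CLI shipped in the same tarball. A minimal sketch, assuming the layout above (hadoop-3.3.1 unpacked in the working directory, JAVA_HOME=/usr); the probe file path is hypothetical:

    # hypothetical manual check, run from the directory containing hadoop-3.3.1
    cd hadoop-3.3.1
    export JAVA_HOME=/usr
    bin/hdfs dfs -ls hdfs://localhost:12222/                 # list the (initially empty) root
    printf '1\t2\t3\n' > /tmp/probe.tsv
    bin/hdfs dfs -put -f /tmp/probe.tsv hdfs://localhost:12222/probe.tsv
    bin/hdfs dfs -cat hdfs://localhost:12222/probe.tsv       # should print the row back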
diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp
index 2e4c608dc9d..2edcbeb9a7e 100644
--- a/src/Storages/HDFS/StorageHDFS.cpp
+++ b/src/Storages/HDFS/StorageHDFS.cpp
@@ -15,6 +15,7 @@
 #include 
 #include 
+#include 
 #include 
 #include 
@@ -238,7 +239,7 @@ public:
     {
         auto path_and_uri = getPathFromUriAndUriWithoutPath(uris_[0]);
         HDFSBuilderWrapper builder = createHDFSBuilder(path_and_uri.second + "/", context->getGlobalContext()->getConfigRef());
-        HDFSFSPtr fs = createHDFSFS(builder.get());
+        auto fs = createHDFSFS(builder.get());
         for (const auto & uri : uris_)
         {
             path_and_uri = getPathFromUriAndUriWithoutPath(uri);
@@ -637,7 +638,7 @@ void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr
     const String url = uris[0].substr(0, begin_of_path);
 
     HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef());
-    HDFSFSPtr fs = createHDFSFS(builder.get());
+    auto fs = createHDFSFS(builder.get());
 
     for (const auto & uri : uris)
     {
diff --git a/src/Storages/HDFS/StorageHDFSCluster.cpp b/src/Storages/HDFS/StorageHDFSCluster.cpp
index fd242169a8a..08dabfccf55 100644
--- a/src/Storages/HDFS/StorageHDFSCluster.cpp
+++ b/src/Storages/HDFS/StorageHDFSCluster.cpp
@@ -2,6 +2,8 @@
 
 #if USE_HDFS
 
+#include 
+
 #include 
 #include 
 #include 
@@ -10,16 +12,19 @@
 #include 
 #include 
 #include 
-#include 
 #include 
 #include 
-#include 
 #include 
+
+#include 
+
+#include 
 #include 
 #include 
+
 #include 
 #include 
-#include 
+#include 
 #include 
@@ -28,6 +33,7 @@ namespace DB
 {
 
 StorageHDFSCluster::StorageHDFSCluster(
+    ContextPtr context_,
     String cluster_name_,
     const String & uri_,
     const StorageID & table_id_,
@@ -41,8 +47,19 @@ StorageHDFSCluster::StorageHDFSCluster(
     , format_name(format_name_)
     , compression_method(compression_method_)
 {
+    context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
+    checkHDFSURL(uri_);
+
     StorageInMemoryMetadata storage_metadata;
-    storage_metadata.setColumns(columns_);
+
+    if (columns_.empty())
+    {
+        auto columns = StorageHDFS::getTableStructureFromData(format_name, uri_, compression_method, context_);
+        storage_metadata.setColumns(columns);
+    }
+    else
+        storage_metadata.setColumns(columns_);
+
     storage_metadata.setConstraints(constraints_);
     setInMemoryMetadata(storage_metadata);
 }
diff --git a/src/Storages/HDFS/StorageHDFSCluster.h b/src/Storages/HDFS/StorageHDFSCluster.h
index 7facc31152e..21ae73c11ea 100644
--- a/src/Storages/HDFS/StorageHDFSCluster.h
+++ b/src/Storages/HDFS/StorageHDFSCluster.h
@@ -20,6 +20,7 @@ class StorageHDFSCluster : public IStorage
 {
 public:
     StorageHDFSCluster(
+        ContextPtr context_,
         String cluster_name_,
         const String & uri_,
         const StorageID & table_id_,
diff --git a/src/TableFunctions/TableFunctionHDFSCluster.cpp b/src/TableFunctions/TableFunctionHDFSCluster.cpp
index 46118cf109a..80f19cd015a 100644
--- a/src/TableFunctions/TableFunctionHDFSCluster.cpp
+++ b/src/TableFunctions/TableFunctionHDFSCluster.cpp
@@ -31,13 +31,15 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+    extern const int BAD_GET;
 }
 
 
 void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, ContextPtr context)
 {
+    auto ast_copy = ast_function->clone();
     /// Parse args
-    ASTs & args_func = ast_function->children;
+    ASTs & args_func = ast_copy->children;
 
     if (args_func.size() != 1)
        throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@@ -46,41 +48,50 @@ void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, ContextPtr context)
     const auto message = fmt::format(
         "The signature of table function {} shall be the following:\n" \
-        " - cluster, uri, format, structure",
+        " - cluster, uri\n" \
+        " - cluster, uri, format\n" \
+        " - cluster, uri, format, structure\n" \
         " - cluster, uri, format, structure, compression_method",
         getName());
 
-    if (args.size() < 4 || args.size() > 5)
+    if (args.size() < 2 || args.size() > 5)
         throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
 
     for (auto & arg : args)
         arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
 
-    /// This arguments are always the first
+    /// This argument is always the first
     cluster_name = args[0]->as<ASTLiteral &>().value.safeGet<String>();
-    uri = args[1]->as<ASTLiteral &>().value.safeGet<String>();
-    format = args[2]->as<ASTLiteral &>().value.safeGet<String>();
-    structure = args[3]->as<ASTLiteral &>().value.safeGet<String>();
-    if (args.size() >= 5)
-        compression_method = args[4]->as<ASTLiteral &>().value.safeGet<String>();
+
+    if (!context->tryGetCluster(cluster_name))
+        throw Exception(ErrorCodes::BAD_GET, "Requested cluster '{}' not found", cluster_name);
+
+    /// Just cut the first arg (cluster_name) and try to parse other table function arguments as is
+    args.erase(args.begin());
+
+    ITableFunctionFileLike::parseArguments(ast_copy, context);
 }
 
 
 ColumnsDescription TableFunctionHDFSCluster::getActualTableStructure(ContextPtr context) const
 {
+    if (structure == "auto")
+        return StorageHDFS::getTableStructureFromData(format, filename, compression_method, context);
+
     return parseColumnsListFromString(structure, context);
 }
 
-StoragePtr TableFunctionHDFSCluster::executeImpl(
-    const ASTPtr & /*function*/, ContextPtr context,
-    const std::string & table_name, ColumnsDescription /*cached_columns*/) const
+
+StoragePtr TableFunctionHDFSCluster::getStorage(
+    const String & /*source*/, const String & /*format_*/, const ColumnsDescription &, ContextPtr context,
+    const std::string & table_name, const String & /*compression_method_*/) const
 {
     StoragePtr storage;
     if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
     {
         /// On worker node this uri won't contains globs
         storage = std::make_shared<StorageHDFS>(
-            uri,
+            filename,
             StorageID(getDatabaseName(), table_name),
             format,
             getActualTableStructure(context),
@@ -94,17 +105,14 @@ StoragePtr TableFunctionHDFSCluster::executeImpl(
     else
     {
         storage = std::make_shared<StorageHDFSCluster>(
-            cluster_name, uri, StorageID(getDatabaseName(), table_name),
+            context,
+            cluster_name, filename, StorageID(getDatabaseName(), table_name),
             format, getActualTableStructure(context), ConstraintsDescription{},
             compression_method);
     }
-
-    storage->startup();
-
     return storage;
 }
 
-
 void registerTableFunctionHDFSCluster(TableFunctionFactory & factory)
 {
     factory.registerFunction<TableFunctionHDFSCluster>();
diff --git a/src/TableFunctions/TableFunctionHDFSCluster.h b/src/TableFunctions/TableFunctionHDFSCluster.h
index b5464e2fd19..f8f86dda939 100644
--- a/src/TableFunctions/TableFunctionHDFSCluster.h
+++ b/src/TableFunctions/TableFunctionHDFSCluster.h
@@ -4,7 +4,7 @@
 
 #if USE_HDFS
 
-#include <TableFunctions/ITableFunction.h>
+#include <TableFunctions/ITableFunctionFileLike.h>
 
 namespace DB
@@ -20,7 +20,7 @@ class Context;
  * On worker node it asks initiator about next task to process, processes it.
  * This is repeated until the tasks are finished.
  */
-class TableFunctionHDFSCluster : public ITableFunction
+class TableFunctionHDFSCluster : public ITableFunctionFileLike
 {
 public:
     static constexpr auto name = "hdfsCluster";
@@ -31,11 +31,9 @@ public:
     bool hasStaticStructure() const override { return true; }
 
 protected:
-    StoragePtr executeImpl(
-        const ASTPtr & ast_function,
-        ContextPtr context,
-        const std::string & table_name,
-        ColumnsDescription cached_columns) const override;
+    StoragePtr getStorage(
+        const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
+        const std::string & table_name, const String & compression_method_) const override;
 
     const char * getStorageTypeName() const override { return "HDFSCluster"; }
 
@@ -45,10 +43,6 @@ protected:
     void parseArguments(const ASTPtr &, ContextPtr) override;
 
     String cluster_name;
-    String uri;
-    String format;
-    String structure;
-    String compression_method = "auto";
 };
 
 }
diff --git a/tests/queries/0_stateless/02244_hdfs_cluster.reference b/tests/queries/0_stateless/02244_hdfs_cluster.reference
new file mode 100644
index 00000000000..4bf4799e904
--- /dev/null
+++ b/tests/queries/0_stateless/02244_hdfs_cluster.reference
@@ -0,0 +1,48 @@
+1 2 3
+4 5 6
+7 8 9
+1 2 3
+4 5 6
+7 8 9
+1 2 3
+4 5 6
+7 8 9
+1 2 3
+4 5 6
+7 8 9
+1 2 3
+4 5 6
+7 8 9
+1 2 3
+4 5 6
+7 8 9
+1 2 3
+4 5 6
+7 8 9
+1 2 3
+4 5 6
+7 8 9
+c1 Nullable(Float64)
+c2 Nullable(Float64)
+c3 Nullable(Float64)
+c1 Nullable(Float64)
+c2 Nullable(Float64)
+c3 Nullable(Float64)
+c1 UInt32
+c2 UInt32
+c3 UInt32
+c1 UInt32
+c2 UInt32
+c3 UInt32
+c1 Nullable(Float64)
+c2 Nullable(Float64)
+c3 Nullable(Float64)
+c1 Nullable(Float64)
+c2 Nullable(Float64)
+c3 Nullable(Float64)
+c1 UInt32
+c2 UInt32
+c3 UInt32
+c1 UInt32
+c2 UInt32
+c3 UInt32
diff --git a/tests/queries/0_stateless/02244_hdfs_cluster.sql b/tests/queries/0_stateless/02244_hdfs_cluster.sql
new file mode 100644
index 00000000000..ffd4a35a506
--- /dev/null
+++ b/tests/queries/0_stateless/02244_hdfs_cluster.sql
@@ -0,0 +1,26 @@
+-- Tags: no-fasttest, no-parallel, no-cpu-aarch64
+-- Tag no-fasttest: Depends on Java
+
+insert into table function hdfs('hdfs://localhost:12222/test_1.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 1, 2, 3 settings hdfs_truncate_on_insert=1;
+insert into table function hdfs('hdfs://localhost:12222/test_2.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 4, 5, 6 settings hdfs_truncate_on_insert=1;
+insert into table function hdfs('hdfs://localhost:12222/test_3.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 7, 8, 9 settings hdfs_truncate_on_insert=1;
+
+select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv') order by c1, c2, c3;
+select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV') order by c1, c2, c3;
+select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32') order by c1, c2, c3;
+select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto') order by c1, c2, c3;
+
+select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv') order by c1, c2, c3;
+select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV') order by c1, c2, c3;
+select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32') order by c1, c2, c3;
+select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto') order by c1, c2, c3;
+
+desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv');
+desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV');
+desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32');
+desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto');
+
+desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv');
+desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV');
+desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32');
+desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto');
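Also not part of the patch: the new test above can be run on its own inside the stateless image once run.sh has started the server and setup_hdfs_minicluster.sh has brought HDFS up on port 12222. A rough sketch, assuming the clickhouse-test symlink created in run.sh is on PATH:

    # run only the new HDFS cluster test by its name filter
    clickhouse-test 02244_hdfs_cluster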