From 5f27f690f471c1c54a472d9b37402d08f858d1b0 Mon Sep 17 00:00:00 2001
From: Nikita Mikhaylov
Date: Thu, 31 Mar 2022 14:19:56 +0200
Subject: [PATCH] Revert "Added support for schema inference for `hdfsCluster`
 (#35602)"

This reverts commit f6bfdcc0c91ddc6e7280f0bbf2efb0f3a6a4d5a0.
---
 docker/test/stateless/Dockerfile               | 10 +---
 docker/test/stateless/run.sh                   |  1 -
 .../test/stateless/setup_hdfs_minicluster.sh   | 20 -------
 src/Storages/HDFS/StorageHDFS.cpp              |  5 +-
 src/Storages/HDFS/StorageHDFSCluster.cpp       | 26 ++-------
 src/Storages/HDFS/StorageHDFSCluster.h         |  1 -
 .../TableFunctionHDFSCluster.cpp               | 53 +++++++------------
 src/TableFunctions/TableFunctionHDFSCluster.h  | 16 ++++--
 .../0_stateless/02244_hdfs_cluster.reference   | 48 -----------------
 .../0_stateless/02244_hdfs_cluster.sql         | 26 ---------
 10 files changed, 38 insertions(+), 168 deletions(-)
 delete mode 100755 docker/test/stateless/setup_hdfs_minicluster.sh
 delete mode 100644 tests/queries/0_stateless/02244_hdfs_cluster.reference
 delete mode 100644 tests/queries/0_stateless/02244_hdfs_cluster.sql

diff --git a/docker/test/stateless/Dockerfile b/docker/test/stateless/Dockerfile
index c1c50fe6b4e..68c08c23b3f 100644
--- a/docker/test/stateless/Dockerfile
+++ b/docker/test/stateless/Dockerfile
@@ -32,9 +32,7 @@ RUN apt-get update -y \
     mysql-client=8.0* \
     postgresql-client \
     sqlite3 \
-    awscli \
-    openjdk-11-jre-headless
-
+    awscli
 
 
 RUN pip3 install numpy scipy pandas Jinja2
@@ -60,16 +58,10 @@ RUN arch=${TARGETARCH:-amd64} \
     && wget "https://dl.min.io/client/mc/release/linux-${arch}/mc" \
     && chmod +x ./mc
 
-
-RUN wget 'https://dlcdn.apache.org/hadoop/common/hadoop-3.2.2/hadoop-3.2.2.tar.gz' \
-    && tar -xvf hadoop-3.2.2.tar.gz \
-    && rm -rf hadoop-3.2.2.tar.gz
-
 ENV MINIO_ROOT_USER="clickhouse"
 ENV MINIO_ROOT_PASSWORD="clickhouse"
 ENV EXPORT_S3_STORAGE_POLICIES=1
 
 COPY run.sh /
 COPY setup_minio.sh /
-COPY setup_hdfs_minicluster.sh /
 CMD ["/bin/bash", "/run.sh"]
diff --git a/docker/test/stateless/run.sh b/docker/test/stateless/run.sh
index e9752061dca..f8b73791388 100755
--- a/docker/test/stateless/run.sh
+++ b/docker/test/stateless/run.sh
@@ -19,7 +19,6 @@ ln -s /usr/share/clickhouse-test/clickhouse-test /usr/bin/clickhouse-test
 /usr/share/clickhouse-test/config/install.sh
 
 ./setup_minio.sh
-./setup_hdfs_minicluster.sh
 
 # For flaky check we also enable thread fuzzer
 if [ "$NUM_TRIES" -gt "1" ]; then
diff --git a/docker/test/stateless/setup_hdfs_minicluster.sh b/docker/test/stateless/setup_hdfs_minicluster.sh
deleted file mode 100755
index 9754364321f..00000000000
--- a/docker/test/stateless/setup_hdfs_minicluster.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-
-set -e -x -a -u
-
-ls -lha
-
-cd hadoop-3.2.2
-
-export JAVA_HOME=/usr
-mkdir -p target/test/data
-chown clickhouse ./target/test/data
-sudo -E -u clickhouse bin/mapred minicluster -format -nomr -nnport 12222 &
-
-while ! nc -z localhost 12222; do
-    sleep 1
-done
-
-lsof -i :12222
-
-sleep 5
diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp
index b681a31c7c8..74f6937dbae 100644
--- a/src/Storages/HDFS/StorageHDFS.cpp
+++ b/src/Storages/HDFS/StorageHDFS.cpp
@@ -15,7 +15,6 @@
 
 #include
 #include
-#include
 #include
 #include
 
@@ -248,7 +247,7 @@ public:
     {
         auto path_and_uri = getPathFromUriAndUriWithoutPath(uris_[0]);
         HDFSBuilderWrapper builder = createHDFSBuilder(path_and_uri.second + "/", context->getGlobalContext()->getConfigRef());
-        auto fs = createHDFSFS(builder.get());
+        HDFSFSPtr fs = createHDFSFS(builder.get());
         for (const auto & uri : uris_)
         {
             path_and_uri = getPathFromUriAndUriWithoutPath(uri);
@@ -657,7 +656,7 @@ void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr
     const String url = uris[0].substr(0, begin_of_path);
 
     HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef());
-    auto fs = createHDFSFS(builder.get());
+    HDFSFSPtr fs = createHDFSFS(builder.get());
 
     for (const auto & uri : uris)
     {
diff --git a/src/Storages/HDFS/StorageHDFSCluster.cpp b/src/Storages/HDFS/StorageHDFSCluster.cpp
index c11ebe11183..b039caa4330 100644
--- a/src/Storages/HDFS/StorageHDFSCluster.cpp
+++ b/src/Storages/HDFS/StorageHDFSCluster.cpp
@@ -2,8 +2,6 @@
 
 #if USE_HDFS
 
-#include
-
 #include
 #include
 #include
@@ -12,20 +10,16 @@
 #include
 #include
 #include
-
+#include
 #include
 #include
-#include
-
-#include
-
 #include
+#include
 #include
 #include
-
 #include
 #include
-#include
+#include
 
 
 #include
@@ -34,7 +28,6 @@ namespace DB
 {
 
 StorageHDFSCluster::StorageHDFSCluster(
-    ContextPtr context_,
     String cluster_name_,
     const String & uri_,
     const StorageID & table_id_,
@@ -48,19 +41,8 @@ StorageHDFSCluster::StorageHDFSCluster(
     , format_name(format_name_)
     , compression_method(compression_method_)
 {
-    context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
-    checkHDFSURL(uri_);
-
     StorageInMemoryMetadata storage_metadata;
-
-    if (columns_.empty())
-    {
-        auto columns = StorageHDFS::getTableStructureFromData(format_name, uri_, compression_method, context_);
-        storage_metadata.setColumns(columns);
-    }
-    else
-        storage_metadata.setColumns(columns_);
-
+    storage_metadata.setColumns(columns_);
     storage_metadata.setConstraints(constraints_);
     setInMemoryMetadata(storage_metadata);
 }
diff --git a/src/Storages/HDFS/StorageHDFSCluster.h b/src/Storages/HDFS/StorageHDFSCluster.h
index 677a00f661c..953311de056 100644
--- a/src/Storages/HDFS/StorageHDFSCluster.h
+++ b/src/Storages/HDFS/StorageHDFSCluster.h
@@ -34,7 +34,6 @@ public:
 
 protected:
     StorageHDFSCluster(
-        ContextPtr context_,
         String cluster_name_,
         const String & uri_,
         const StorageID & table_id_,
diff --git a/src/TableFunctions/TableFunctionHDFSCluster.cpp b/src/TableFunctions/TableFunctionHDFSCluster.cpp
index f1d3567efa7..ca1ac6a11cd 100644
--- a/src/TableFunctions/TableFunctionHDFSCluster.cpp
+++ b/src/TableFunctions/TableFunctionHDFSCluster.cpp
@@ -31,15 +31,13 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
-    extern const int BAD_GET;
 }
 
 
 void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, ContextPtr context)
 {
-    auto ast_copy = ast_function->clone();
     /// Parse args
-    ASTs & args_func = ast_copy->children;
+    ASTs & args_func = ast_function->children;
 
     if (args_func.size() != 1)
         throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@@ -48,50 +46,41 @@ void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, Conte
 
     const auto message = fmt::format(
         "The signature of table function {} shall be the following:\n" \
-        " - cluster, uri\n",\
-        " - cluster, format\n",\
-        " - cluster, uri, format, structure\n",\
+        " - cluster, uri, format, structure",
         " - cluster, uri, format, structure, compression_method",
         getName());
 
-    if (args.size() < 2 || args.size() > 5)
+    if (args.size() < 4 || args.size() > 5)
         throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
 
     for (auto & arg : args)
         arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
 
-    /// This argument is always the first
+    /// This arguments are always the first
     cluster_name = args[0]->as<ASTLiteral &>().value.safeGet<String>();
-
-    if (!context->tryGetCluster(cluster_name))
-        throw Exception(ErrorCodes::BAD_GET, "Requested cluster '{}' not found", cluster_name);
-
-    /// Just cut the first arg (cluster_name) and try to parse other table function arguments as is
-    args.erase(args.begin());
-
-    ITableFunctionFileLike::parseArguments(ast_copy, context);
+    uri = args[1]->as<ASTLiteral &>().value.safeGet<String>();
+    format = args[2]->as<ASTLiteral &>().value.safeGet<String>();
+    structure = args[3]->as<ASTLiteral &>().value.safeGet<String>();
+    if (args.size() >= 5)
+        compression_method = args[4]->as<ASTLiteral &>().value.safeGet<String>();
 }
 
 
 ColumnsDescription TableFunctionHDFSCluster::getActualTableStructure(ContextPtr context) const
 {
-    if (structure == "auto")
-        return StorageHDFS::getTableStructureFromData(format, filename, compression_method, context);
-
     return parseColumnsListFromString(structure, context);
 }
 
-
-StoragePtr TableFunctionHDFSCluster::getStorage(
-    const String & /*source*/, const String & /*format_*/, const ColumnsDescription &, ContextPtr context,
-    const std::string & table_name, const String & /*compression_method_*/) const
+StoragePtr TableFunctionHDFSCluster::executeImpl(
+    const ASTPtr & /*function*/, ContextPtr context,
+    const std::string & table_name, ColumnsDescription /*cached_columns*/) const
 {
     StoragePtr storage;
     if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
     {
         /// On worker node this uri won't contains globs
         storage = StorageHDFS::create(
-            filename,
+            uri,
             StorageID(getDatabaseName(), table_name),
             format,
             getActualTableStructure(context),
@@ -105,19 +94,17 @@ StoragePtr TableFunctionHDFSCluster::getStorage(
     else
     {
         storage = StorageHDFSCluster::create(
-            context,
-            cluster_name,
-            filename,
-            StorageID(getDatabaseName(), table_name),
-            format,
-            getActualTableStructure(context),
-            ConstraintsDescription{},
-            compression_method
-        );
+            cluster_name, uri, StorageID(getDatabaseName(), table_name),
+            format, getActualTableStructure(context), ConstraintsDescription{},
+            compression_method);
     }
+
+    storage->startup();
+
     return storage;
 }
 
+
 void registerTableFunctionHDFSCluster(TableFunctionFactory & factory)
 {
     factory.registerFunction<TableFunctionHDFSCluster>();
diff --git a/src/TableFunctions/TableFunctionHDFSCluster.h b/src/TableFunctions/TableFunctionHDFSCluster.h
index 0560d103303..58d1c3d9b05 100644
--- a/src/TableFunctions/TableFunctionHDFSCluster.h
+++ b/src/TableFunctions/TableFunctionHDFSCluster.h
@@ -4,7 +4,7 @@
 
 #if USE_HDFS
 
-#include
+#include
 
 
 namespace DB
@@ -20,7 +20,7 @@ class Context;
  * On worker node it asks initiator about next task to process, processes it.
  * This is repeated until the tasks are finished.
  */
-class TableFunctionHDFSCluster : public ITableFunctionFileLike
+class TableFunctionHDFSCluster : public ITableFunction
 {
 public:
     static constexpr auto name = "hdfsCluster";
@@ -31,9 +31,11 @@ public:
     bool hasStaticStructure() const override { return true; }
 
 protected:
-    StoragePtr getStorage(
-        const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
-        const std::string & table_name, const String & compression_method_) const override;
+    StoragePtr executeImpl(
+        const ASTPtr & ast_function,
+        ContextPtr context,
+        const std::string & table_name,
+        ColumnsDescription cached_columns) const override;
 
     const char * getStorageTypeName() const override { return "HDFSCluster"; }
 
@@ -41,6 +43,10 @@ protected:
     void parseArguments(const ASTPtr &, ContextPtr) override;
 
     String cluster_name;
+    String uri;
+    String format;
+    String structure;
+    String compression_method = "auto";
 };
 
 }
diff --git a/tests/queries/0_stateless/02244_hdfs_cluster.reference b/tests/queries/0_stateless/02244_hdfs_cluster.reference
deleted file mode 100644
index 390627138ef..00000000000
--- a/tests/queries/0_stateless/02244_hdfs_cluster.reference
+++ /dev/null
@@ -1,48 +0,0 @@
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-c1 Nullable(String)
-c2 Nullable(String)
-c3 Nullable(String)
-c1 Nullable(String)
-c2 Nullable(String)
-c3 Nullable(String)
-c1 UInt32
-c2 UInt32
-c3 UInt32
-c1 UInt32
-c2 UInt32
-c3 UInt32
-c1 Nullable(String)
-c2 Nullable(String)
-c3 Nullable(String)
-c1 Nullable(String)
-c2 Nullable(String)
-c3 Nullable(String)
-c1 UInt32
-c2 UInt32
-c3 UInt32
-c1 UInt32
-c2 UInt32
-c3 UInt32
diff --git a/tests/queries/0_stateless/02244_hdfs_cluster.sql b/tests/queries/0_stateless/02244_hdfs_cluster.sql
deleted file mode 100644
index ffd4a35a506..00000000000
--- a/tests/queries/0_stateless/02244_hdfs_cluster.sql
+++ /dev/null
@@ -1,26 +0,0 @@
--- Tags: no-fasttest, no-parallel, no-cpu-aarch64
--- Tag no-fasttest: Depends on Java
-
-insert into table function hdfs('hdfs://localhost:12222/test_1.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 1, 2, 3 settings hdfs_truncate_on_insert=1;
-insert into table function hdfs('hdfs://localhost:12222/test_2.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 4, 5, 6 settings hdfs_truncate_on_insert=1;
-insert into table function hdfs('hdfs://localhost:12222/test_3.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 7, 8, 9 settings hdfs_truncate_on_insert=1;
-
-select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv') order by c1, c2, c3;
-select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV') order by c1, c2, c3;
-select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32') order by c1, c2, c3;
-select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto') order by c1, c2, c3;
-
-select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv') order by c1, c2, c3;
-select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV') order by c1, c2, c3;
-select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32') order by c1, c2, c3;
-select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto') order by c1, c2, c3;
-
-desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv');
-desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV');
-desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32');
-desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto');
-
-desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv');
-desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV');
-desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32');
-desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto');
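
Usage after this revert: hdfsCluster() again requires the format and structure arguments
to be spelled out, i.e. hdfsCluster(cluster, uri, format, structure[, compression_method]).
A minimal sketch, reusing the cluster name and paths from the deleted test above (they are
only valid inside that test environment):

    -- explicit format and structure are mandatory once schema inference is removed
    select *
    from hdfsCluster(
        'test_cluster_two_shards_localhost',
        'hdfs://localhost:12222/test_{1,2,3}.tsv',
        'TSV',
        'c1 UInt32, c2 UInt32, c3 UInt32')
    order by c1, c2, c3;

Two- and three-argument calls such as hdfsCluster(cluster, uri) or
hdfsCluster(cluster, uri, format) relied on schema inference and now fail with
NUMBER_OF_ARGUMENTS_DOESNT_MATCH, since parseArguments() only accepts 4 or 5 arguments.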