Revert "Added support for schema inference for hdfsCluster (#35602)"

This reverts commit f6bfdcc0c9.
Nikita Mikhaylov, 2022-03-31 14:19:56 +02:00, committed by GitHub
parent bd2ab32e3f
commit 5f27f690f4
10 changed files with 38 additions and 168 deletions
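In effect, this revert makes the uri, format and structure arguments of hdfsCluster mandatory again (4 or 5 arguments, see the args.size() check in the diff below) and removes schema inference for the function. A sketch of the user-visible change, reusing the cluster name and paths from the deleted test:

-- short forms that relied on schema inference are rejected again after this revert:
-- select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv');
-- select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV');
-- the explicit form keeps working:
select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32') order by c1, c2, c3;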

@@ -32,9 +32,7 @@ RUN apt-get update -y \
     mysql-client=8.0* \
     postgresql-client \
     sqlite3 \
-    awscli \
-    openjdk-11-jre-headless
+    awscli
 
 RUN pip3 install numpy scipy pandas Jinja2
@@ -60,16 +58,10 @@ RUN arch=${TARGETARCH:-amd64} \
     && wget "https://dl.min.io/client/mc/release/linux-${arch}/mc" \
     && chmod +x ./mc
 
-RUN wget 'https://dlcdn.apache.org/hadoop/common/hadoop-3.2.2/hadoop-3.2.2.tar.gz' \
-    && tar -xvf hadoop-3.2.2.tar.gz \
-    && rm -rf hadoop-3.2.2.tar.gz
-
 ENV MINIO_ROOT_USER="clickhouse"
 ENV MINIO_ROOT_PASSWORD="clickhouse"
 ENV EXPORT_S3_STORAGE_POLICIES=1
 
 COPY run.sh /
 COPY setup_minio.sh /
-COPY setup_hdfs_minicluster.sh /
 
 CMD ["/bin/bash", "/run.sh"]

@@ -19,7 +19,6 @@ ln -s /usr/share/clickhouse-test/clickhouse-test /usr/bin/clickhouse-test
 /usr/share/clickhouse-test/config/install.sh
 
 ./setup_minio.sh
-./setup_hdfs_minicluster.sh
 
 # For flaky check we also enable thread fuzzer
 if [ "$NUM_TRIES" -gt "1" ]; then

@@ -1,20 +0,0 @@
-#!/bin/bash
-set -e -x -a -u
-ls -lha
-cd hadoop-3.2.2
-export JAVA_HOME=/usr
-mkdir -p target/test/data
-chown clickhouse ./target/test/data
-sudo -E -u clickhouse bin/mapred minicluster -format -nomr -nnport 12222 &
-while ! nc -z localhost 12222; do
-  sleep 1
-done
-lsof -i :12222
-sleep 5

@@ -15,7 +15,6 @@
 #include <Processors/Transforms/AddingDefaultsTransform.h>
 #include <IO/WriteHelpers.h>
-#include <IO/CompressionMethod.h>
 #include <Interpreters/evaluateConstantExpression.h>
 #include <Interpreters/ExpressionAnalyzer.h>
@@ -248,7 +247,7 @@ public:
     {
         auto path_and_uri = getPathFromUriAndUriWithoutPath(uris_[0]);
         HDFSBuilderWrapper builder = createHDFSBuilder(path_and_uri.second + "/", context->getGlobalContext()->getConfigRef());
-        auto fs = createHDFSFS(builder.get());
+        HDFSFSPtr fs = createHDFSFS(builder.get());
         for (const auto & uri : uris_)
         {
             path_and_uri = getPathFromUriAndUriWithoutPath(uri);
@@ -657,7 +656,7 @@ void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr
     const String url = uris[0].substr(0, begin_of_path);
     HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef());
-    auto fs = createHDFSFS(builder.get());
+    HDFSFSPtr fs = createHDFSFS(builder.get());
     for (const auto & uri : uris)
     {

@@ -2,8 +2,6 @@
 #if USE_HDFS
 
-#include <Storages/HDFS/StorageHDFSCluster.h>
-
 #include <Client/Connection.h>
 #include <Core/QueryProcessingStage.h>
 #include <DataTypes/DataTypeString.h>
@@ -12,20 +10,16 @@
 #include <Interpreters/SelectQueryOptions.h>
 #include <Interpreters/InterpreterSelectQuery.h>
 #include <Interpreters/getTableExpressions.h>
-#include <Processors/Transforms/AddingDefaultsTransform.h>
 #include <QueryPipeline/narrowBlockInputStreams.h>
 #include <QueryPipeline/Pipe.h>
-#include <QueryPipeline/RemoteQueryExecutor.h>
+#include <Processors/Transforms/AddingDefaultsTransform.h>
 #include <Processors/Sources/RemoteSource.h>
+#include <QueryPipeline/RemoteQueryExecutor.h>
 #include <Parsers/queryToString.h>
 #include <Parsers/ASTTablesInSelectQuery.h>
 #include <Storages/IStorage.h>
 #include <Storages/SelectQueryInfo.h>
-#include <Storages/HDFS/HDFSCommon.h>
+#include <Storages/HDFS/StorageHDFSCluster.h>
 #include <memory>
@@ -34,7 +28,6 @@ namespace DB
 {
 
 StorageHDFSCluster::StorageHDFSCluster(
-    ContextPtr context_,
     String cluster_name_,
     const String & uri_,
     const StorageID & table_id_,
@@ -48,19 +41,8 @@ StorageHDFSCluster::StorageHDFSCluster(
     , format_name(format_name_)
     , compression_method(compression_method_)
 {
-    context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
-    checkHDFSURL(uri_);
-
     StorageInMemoryMetadata storage_metadata;
-
-    if (columns_.empty())
-    {
-        auto columns = StorageHDFS::getTableStructureFromData(format_name, uri_, compression_method, context_);
-        storage_metadata.setColumns(columns);
-    }
-    else
-        storage_metadata.setColumns(columns_);
+    storage_metadata.setColumns(columns_);
     storage_metadata.setConstraints(constraints_);
     setInMemoryMetadata(storage_metadata);
 }

@@ -34,7 +34,6 @@ public:
 protected:
     StorageHDFSCluster(
-        ContextPtr context_,
         String cluster_name_,
         const String & uri_,
         const StorageID & table_id_,

@@ -31,15 +31,13 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
-    extern const int BAD_GET;
 }
 
 void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, ContextPtr context)
 {
-    auto ast_copy = ast_function->clone();
     /// Parse args
-    ASTs & args_func = ast_copy->children;
+    ASTs & args_func = ast_function->children;
 
     if (args_func.size() != 1)
         throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@@ -48,50 +46,41 @@ void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, ContextPtr context)
     const auto message = fmt::format(
         "The signature of table function {} shall be the following:\n" \
-        " - cluster, uri\n",\
-        " - cluster, format\n",\
-        " - cluster, uri, format, structure\n",\
+        " - cluster, uri, format, structure",
         " - cluster, uri, format, structure, compression_method",
         getName());
 
-    if (args.size() < 2 || args.size() > 5)
+    if (args.size() < 4 || args.size() > 5)
         throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
 
     for (auto & arg : args)
         arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
 
-    /// This argument is always the first
+    /// This arguments are always the first
     cluster_name = args[0]->as<ASTLiteral &>().value.safeGet<String>();
-
-    if (!context->tryGetCluster(cluster_name))
-        throw Exception(ErrorCodes::BAD_GET, "Requested cluster '{}' not found", cluster_name);
-
-    /// Just cut the first arg (cluster_name) and try to parse other table function arguments as is
-    args.erase(args.begin());
-
-    ITableFunctionFileLike::parseArguments(ast_copy, context);
+    uri = args[1]->as<ASTLiteral &>().value.safeGet<String>();
+    format = args[2]->as<ASTLiteral &>().value.safeGet<String>();
+    structure = args[3]->as<ASTLiteral &>().value.safeGet<String>();
+    if (args.size() >= 5)
+        compression_method = args[4]->as<ASTLiteral &>().value.safeGet<String>();
 }
 
 ColumnsDescription TableFunctionHDFSCluster::getActualTableStructure(ContextPtr context) const
 {
-    if (structure == "auto")
-        return StorageHDFS::getTableStructureFromData(format, filename, compression_method, context);
-
     return parseColumnsListFromString(structure, context);
 }
 
-StoragePtr TableFunctionHDFSCluster::getStorage(
-    const String & /*source*/, const String & /*format_*/, const ColumnsDescription &, ContextPtr context,
-    const std::string & table_name, const String & /*compression_method_*/) const
+StoragePtr TableFunctionHDFSCluster::executeImpl(
+    const ASTPtr & /*function*/, ContextPtr context,
+    const std::string & table_name, ColumnsDescription /*cached_columns*/) const
 {
     StoragePtr storage;
     if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
     {
         /// On worker node this uri won't contains globs
         storage = StorageHDFS::create(
-            filename,
+            uri,
             StorageID(getDatabaseName(), table_name),
             format,
             getActualTableStructure(context),
@@ -105,19 +94,17 @@ StoragePtr TableFunctionHDFSCluster::getStorage(
     else
     {
         storage = StorageHDFSCluster::create(
-            context,
-            cluster_name,
-            filename,
-            StorageID(getDatabaseName(), table_name),
-            format,
-            getActualTableStructure(context),
-            ConstraintsDescription{},
-            compression_method
-        );
+            cluster_name, uri, StorageID(getDatabaseName(), table_name),
+            format, getActualTableStructure(context), ConstraintsDescription{},
+            compression_method);
     }
 
+    storage->startup();
     return storage;
 }
 
 void registerTableFunctionHDFSCluster(TableFunctionFactory & factory)
 {
     factory.registerFunction<TableFunctionHDFSCluster>();

@@ -4,7 +4,7 @@
 #if USE_HDFS
 
-#include <TableFunctions/ITableFunctionFileLike.h>
+#include <TableFunctions/ITableFunction.h>
 
 namespace DB
@@ -20,7 +20,7 @@ class Context;
  * On worker node it asks initiator about next task to process, processes it.
  * This is repeated until the tasks are finished.
  */
-class TableFunctionHDFSCluster : public ITableFunctionFileLike
+class TableFunctionHDFSCluster : public ITableFunction
 {
 public:
     static constexpr auto name = "hdfsCluster";
@@ -31,9 +31,11 @@ public:
     bool hasStaticStructure() const override { return true; }
 
 protected:
-    StoragePtr getStorage(
-        const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
-        const std::string & table_name, const String & compression_method_) const override;
+    StoragePtr executeImpl(
+        const ASTPtr & ast_function,
+        ContextPtr context,
+        const std::string & table_name,
+        ColumnsDescription cached_columns) const override;
 
     const char * getStorageTypeName() const override { return "HDFSCluster"; }
@@ -41,6 +43,10 @@ protected:
     void parseArguments(const ASTPtr &, ContextPtr) override;
 
     String cluster_name;
+    String uri;
+    String format;
+    String structure;
+    String compression_method = "auto";
 };
 
 }

@@ -1,48 +0,0 @@
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-1 2 3
-4 5 6
-7 8 9
-c1 Nullable(String)
-c2 Nullable(String)
-c3 Nullable(String)
-c1 Nullable(String)
-c2 Nullable(String)
-c3 Nullable(String)
-c1 UInt32
-c2 UInt32
-c3 UInt32
-c1 UInt32
-c2 UInt32
-c3 UInt32
-c1 Nullable(String)
-c2 Nullable(String)
-c3 Nullable(String)
-c1 Nullable(String)
-c2 Nullable(String)
-c3 Nullable(String)
-c1 UInt32
-c2 UInt32
-c3 UInt32
-c1 UInt32
-c2 UInt32
-c3 UInt32

@@ -1,26 +0,0 @@
--- Tags: no-fasttest, no-parallel, no-cpu-aarch64
--- Tag no-fasttest: Depends on Java
-insert into table function hdfs('hdfs://localhost:12222/test_1.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 1, 2, 3 settings hdfs_truncate_on_insert=1;
-insert into table function hdfs('hdfs://localhost:12222/test_2.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 4, 5, 6 settings hdfs_truncate_on_insert=1;
-insert into table function hdfs('hdfs://localhost:12222/test_3.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 7, 8, 9 settings hdfs_truncate_on_insert=1;
-select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv') order by c1, c2, c3;
-select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV') order by c1, c2, c3;
-select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32') order by c1, c2, c3;
-select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto') order by c1, c2, c3;
-select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv') order by c1, c2, c3;
-select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV') order by c1, c2, c3;
-select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32') order by c1, c2, c3;
-select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto') order by c1, c2, c3;
-desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv');
-desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV');
-desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32');
-desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto');
-desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv');
-desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV');
-desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32');
-desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto');
desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto');