Added support for schema inference for hdfsCluster (#35602)

Nikita Mikhaylov 2022-03-31 12:47:36 +02:00 committed by GitHub
parent 8f0919d2a2
commit f6bfdcc0c9
10 changed files with 168 additions and 38 deletions
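In short: the structure argument of the hdfsCluster table function becomes optional. When it is omitted (or set to 'auto'), the column list is inferred from the data, as the plain hdfs table function already does. A minimal usage sketch, using the cluster name and files from the test added at the end of this commit:

select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv');
select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV');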

View File

@@ -32,7 +32,9 @@ RUN apt-get update -y \
mysql-client=8.0* \
postgresql-client \
sqlite3 \
-awscli
+awscli \
+openjdk-11-jre-headless
RUN pip3 install numpy scipy pandas Jinja2
@@ -58,10 +60,16 @@ RUN arch=${TARGETARCH:-amd64} \
&& wget "https://dl.min.io/client/mc/release/linux-${arch}/mc" \
&& chmod +x ./mc
+RUN wget 'https://dlcdn.apache.org/hadoop/common/hadoop-3.2.2/hadoop-3.2.2.tar.gz' \
+    && tar -xvf hadoop-3.2.2.tar.gz \
+    && rm -rf hadoop-3.2.2.tar.gz
ENV MINIO_ROOT_USER="clickhouse"
ENV MINIO_ROOT_PASSWORD="clickhouse"
ENV EXPORT_S3_STORAGE_POLICIES=1
COPY run.sh /
COPY setup_minio.sh /
+COPY setup_hdfs_minicluster.sh /
CMD ["/bin/bash", "/run.sh"]

View File

@@ -19,6 +19,7 @@ ln -s /usr/share/clickhouse-test/clickhouse-test /usr/bin/clickhouse-test
/usr/share/clickhouse-test/config/install.sh
./setup_minio.sh
+./setup_hdfs_minicluster.sh
# For the flaky check we also enable the thread fuzzer
if [ "$NUM_TRIES" -gt "1" ]; then

View File

@@ -0,0 +1,20 @@
#!/bin/bash
set -e -x -a -u
ls -lha
cd hadoop-3.2.2
export JAVA_HOME=/usr
mkdir -p target/test/data
chown clickhouse ./target/test/data
sudo -E -u clickhouse bin/mapred minicluster -format -nomr -nnport 12222 &
while ! nc -z localhost 12222; do
sleep 1
done
lsof -i :12222
sleep 5
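The minicluster started above serves HDFS on localhost:12222, which is the endpoint the new stateless test talks to; for example, the test seeds it with:

insert into table function hdfs('hdfs://localhost:12222/test_1.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 1, 2, 3 settings hdfs_truncate_on_insert=1;
select * from hdfs('hdfs://localhost:12222/test_1.tsv');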

View File

@@ -15,6 +15,7 @@
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <IO/WriteHelpers.h>
#include <IO/CompressionMethod.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/ExpressionAnalyzer.h>
@@ -247,7 +248,7 @@ public:
{
auto path_and_uri = getPathFromUriAndUriWithoutPath(uris_[0]);
HDFSBuilderWrapper builder = createHDFSBuilder(path_and_uri.second + "/", context->getGlobalContext()->getConfigRef());
-HDFSFSPtr fs = createHDFSFS(builder.get());
+auto fs = createHDFSFS(builder.get());
for (const auto & uri : uris_)
{
path_and_uri = getPathFromUriAndUriWithoutPath(uri);
@@ -656,7 +657,7 @@ void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr
const String url = uris[0].substr(0, begin_of_path);
HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef());
-HDFSFSPtr fs = createHDFSFS(builder.get());
+auto fs = createHDFSFS(builder.get());
for (const auto & uri : uris)
{

View File

@@ -2,6 +2,8 @@
#if USE_HDFS
#include <Storages/HDFS/StorageHDFSCluster.h>
#include <Client/Connection.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeString.h>
@@ -10,16 +12,20 @@
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/getTableExpressions.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/narrowBlockInputStreams.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Sources/RemoteSource.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/HDFS/StorageHDFSCluster.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <memory>
@@ -28,6 +34,7 @@ namespace DB
{
StorageHDFSCluster::StorageHDFSCluster(
+ContextPtr context_,
String cluster_name_,
const String & uri_,
const StorageID & table_id_,
@@ -41,8 +48,19 @@ StorageHDFSCluster::StorageHDFSCluster(
, format_name(format_name_)
, compression_method(compression_method_)
{
+context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
+checkHDFSURL(uri_);
StorageInMemoryMetadata storage_metadata;
-storage_metadata.setColumns(columns_);
+if (columns_.empty())
+{
+    auto columns = StorageHDFS::getTableStructureFromData(format_name, uri_, compression_method, context_);
+    storage_metadata.setColumns(columns);
+}
+else
+    storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
}
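The empty-columns branch above is where cluster-side schema inference happens: when no structure is supplied, StorageHDFS::getTableStructureFromData derives the columns from the files and that result becomes the table structure. The effect, taken from the DESC queries in the new test (header-less TSV infers Nullable(String); an explicit structure is used verbatim):

desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV');
-- c1 Nullable(String), c2 Nullable(String), c3 Nullable(String)
desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32');
-- c1 UInt32, c2 UInt32, c3 UInt32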

View File

@@ -34,6 +34,7 @@ public:
protected:
StorageHDFSCluster(
+ContextPtr context_,
String cluster_name_,
const String & uri_,
const StorageID & table_id_,

View File

@@ -31,13 +31,15 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+extern const int BAD_GET;
}
void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
+auto ast_copy = ast_function->clone();
/// Parse args
-ASTs & args_func = ast_function->children;
+ASTs & args_func = ast_copy->children;
if (args_func.size() != 1)
throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@@ -46,41 +48,50 @@ void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, Conte
const auto message = fmt::format(
"The signature of table function {} shall be the following:\n" \
" - cluster, uri, format, structure",
" - cluster, uri\n",\
" - cluster, format\n",\
" - cluster, uri, format, structure\n",\
" - cluster, uri, format, structure, compression_method",
getName());
if (args.size() < 4 || args.size() > 5)
if (args.size() < 2 || args.size() > 5)
throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
-/// This arguments are always the first
+/// This argument is always the first
cluster_name = args[0]->as<ASTLiteral &>().value.safeGet<String>();
-uri = args[1]->as<ASTLiteral &>().value.safeGet<String>();
-format = args[2]->as<ASTLiteral &>().value.safeGet<String>();
-structure = args[3]->as<ASTLiteral &>().value.safeGet<String>();
-if (args.size() >= 5)
-    compression_method = args[4]->as<ASTLiteral &>().value.safeGet<String>();
+if (!context->tryGetCluster(cluster_name))
+    throw Exception(ErrorCodes::BAD_GET, "Requested cluster '{}' not found", cluster_name);
+/// Just cut the first arg (cluster_name) and try to parse other table function arguments as is
+args.erase(args.begin());
+ITableFunctionFileLike::parseArguments(ast_copy, context);
}
ColumnsDescription TableFunctionHDFSCluster::getActualTableStructure(ContextPtr context) const
{
if (structure == "auto")
return StorageHDFS::getTableStructureFromData(format, filename, compression_method, context);
return parseColumnsListFromString(structure, context);
}
-StoragePtr TableFunctionHDFSCluster::executeImpl(
-const ASTPtr & /*function*/, ContextPtr context,
-const std::string & table_name, ColumnsDescription /*cached_columns*/) const
+StoragePtr TableFunctionHDFSCluster::getStorage(
+const String & /*source*/, const String & /*format_*/, const ColumnsDescription &, ContextPtr context,
+const std::string & table_name, const String & /*compression_method_*/) const
{
StoragePtr storage;
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{
/// On worker node this uri won't contain globs
storage = StorageHDFS::create(
-uri,
+filename,
StorageID(getDatabaseName(), table_name),
format,
getActualTableStructure(context),
@@ -94,17 +105,19 @@ StoragePtr TableFunctionHDFSCluster::executeImpl(
else
{
storage = StorageHDFSCluster::create(
-cluster_name, uri, StorageID(getDatabaseName(), table_name),
-format, getActualTableStructure(context), ConstraintsDescription{},
-compression_method);
+context,
+cluster_name,
+filename,
+StorageID(getDatabaseName(), table_name),
+format,
+getActualTableStructure(context),
+ConstraintsDescription{},
+compression_method
+);
}
storage->startup();
return storage;
}
void registerTableFunctionHDFSCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionHDFSCluster>();
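With parseArguments now consuming only the cluster name and delegating the remaining arguments to ITableFunctionFileLike, hdfsCluster accepts anywhere from two arguments (cluster, uri) up to five (cluster, uri, format, structure, compression_method), and the cluster name is validated up front. A small sketch of the new failure mode ('unknown_cluster' is a made-up name for illustration):

-- rejected while parsing arguments, with error code BAD_GET:
select * from hdfsCluster('unknown_cluster', 'hdfs://localhost:12222/test_1.tsv', 'TSV');
-- Requested cluster 'unknown_cluster' not found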

View File

@@ -4,7 +4,7 @@
#if USE_HDFS
-#include <TableFunctions/ITableFunction.h>
+#include <TableFunctions/ITableFunctionFileLike.h>
namespace DB
@@ -20,7 +20,7 @@ class Context;
* On worker node it asks initiator about next task to process, processes it.
* This is repeated until the tasks are finished.
*/
-class TableFunctionHDFSCluster : public ITableFunction
+class TableFunctionHDFSCluster : public ITableFunctionFileLike
{
public:
static constexpr auto name = "hdfsCluster";
@@ -31,11 +31,9 @@ public:
bool hasStaticStructure() const override { return true; }
protected:
-StoragePtr executeImpl(
-const ASTPtr & ast_function,
-ContextPtr context,
-const std::string & table_name,
-ColumnsDescription cached_columns) const override;
+StoragePtr getStorage(
+const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
+const std::string & table_name, const String & compression_method_) const override;
const char * getStorageTypeName() const override { return "HDFSCluster"; }
@@ -43,10 +41,6 @@ protected:
void parseArguments(const ASTPtr &, ContextPtr) override;
String cluster_name;
-String uri;
-String format;
-String structure;
-String compression_method = "auto";
};
}

View File

@@ -0,0 +1,48 @@
1 2 3
4 5 6
7 8 9
1 2 3
4 5 6
7 8 9
1 2 3
4 5 6
7 8 9
1 2 3
4 5 6
7 8 9
1 2 3
4 5 6
7 8 9
1 2 3
4 5 6
7 8 9
1 2 3
4 5 6
7 8 9
1 2 3
4 5 6
7 8 9
c1 Nullable(String)
c2 Nullable(String)
c3 Nullable(String)
c1 Nullable(String)
c2 Nullable(String)
c3 Nullable(String)
c1 UInt32
c2 UInt32
c3 UInt32
c1 UInt32
c2 UInt32
c3 UInt32
c1 Nullable(String)
c2 Nullable(String)
c3 Nullable(String)
c1 Nullable(String)
c2 Nullable(String)
c3 Nullable(String)
c1 UInt32
c2 UInt32
c3 UInt32
c1 UInt32
c2 UInt32
c3 UInt32

View File

@@ -0,0 +1,26 @@
-- Tags: no-fasttest, no-parallel, no-cpu-aarch64
-- Tag no-fasttest: Depends on Java
insert into table function hdfs('hdfs://localhost:12222/test_1.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 1, 2, 3 settings hdfs_truncate_on_insert=1;
insert into table function hdfs('hdfs://localhost:12222/test_2.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 4, 5, 6 settings hdfs_truncate_on_insert=1;
insert into table function hdfs('hdfs://localhost:12222/test_3.tsv', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') select 7, 8, 9 settings hdfs_truncate_on_insert=1;
select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv') order by c1, c2, c3;
select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV') order by c1, c2, c3;
select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32') order by c1, c2, c3;
select * from hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto') order by c1, c2, c3;
select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv') order by c1, c2, c3;
select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV') order by c1, c2, c3;
select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32') order by c1, c2, c3;
select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto') order by c1, c2, c3;
desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv');
desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV');
desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32');
desc hdfs('hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto');
desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv');
desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV');
desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32');
desc hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_{1,2,3}.tsv', 'TSV', 'c1 UInt32, c2 UInt32, c3 UInt32', 'auto');