Merge pull request #43323 from ClickHouse/fix-describe-delta-lake-hudi

Fix `DESCRIBE` for `deltaLake` and `hudi` table functions
This commit is contained in:
Antonio Andelic 2022-11-22 09:09:39 +01:00 committed by GitHub
commit 843401c3a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 287 additions and 174 deletions

View File

@ -1,7 +1,7 @@
#include "config.h"
#if USE_AWS_S3
#include <Storages/StorageDelta.h>
#include <Storages/StorageDeltaLake.h>
#include <Common/logger_useful.h>
#include <IO/ReadBufferFromS3.h>
@ -47,7 +47,7 @@ void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */)
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid table metadata, tried to remove {} before adding it", filename);
}
std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
std::vector<String> DeltaLakeMetadata::listCurrentFiles() &&
{
std::vector<String> keys;
keys.reserve(file_update_time.size());
@ -61,10 +61,10 @@ std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context)
: base_configuration(configuration_), table_path(table_path_)
{
Init(context);
init(context);
}
void JsonMetadataGetter::Init(ContextPtr context)
void JsonMetadataGetter::init(ContextPtr context)
{
auto keys = getJsonLogFiles();
@ -180,7 +180,53 @@ void JsonMetadataGetter::handleJSON(const JSON & json)
}
}
StorageDelta::StorageDelta(
namespace
{
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
{
return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers};
}
// DeltaLake stores data in parts in different files
// keys is vector of parts with latest version
// generateQueryFromKeys constructs query from parts filenames for
// underlying StorageS3 engine
String generateQueryFromKeys(const std::vector<String> & keys)
{
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
return new_query;
}
StorageS3Configuration getAdjustedS3Configuration(
const ContextPtr & context,
StorageS3::S3Configuration & base_configuration,
const StorageS3Configuration & configuration,
const std::string & table_path,
Poco::Logger * log)
{
JsonMetadataGetter getter{base_configuration, table_path, context};
auto keys = getter.getFiles();
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys);
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
// set new url in configuration
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
new_configuration.format = configuration.format;
return new_configuration;
}
}
StorageDeltaLake::StorageDeltaLake(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
@ -189,28 +235,14 @@ StorageDelta::StorageDelta(
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, base_configuration{getBaseConfiguration(configuration_)}
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
JsonMetadataGetter getter{base_configuration, table_path, context_};
auto keys = getter.getFiles();
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys));
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
// set new url in configuration
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
new_configuration.format = configuration_.format;
auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, configuration_, table_path, log);
if (columns_.empty())
{
@ -238,7 +270,7 @@ StorageDelta::StorageDelta(
nullptr);
}
Pipe StorageDelta::read(
Pipe StorageDeltaLake::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -252,16 +284,18 @@ Pipe StorageDelta::read(
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
String StorageDelta::generateQueryFromKeys(std::vector<String> && keys)
ColumnsDescription StorageDeltaLake::getTableStructureFromData(
const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
{
// DeltaLake store data parts in different files
// keys are filenames of parts
// for StorageS3 to read all parts we need format {key1,key2,key3,...keyn}
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
return new_query;
auto base_configuration = getBaseConfiguration(configuration);
StorageS3::updateS3Configuration(ctx, base_configuration);
auto new_configuration = getAdjustedS3Configuration(
ctx, base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake"));
return StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
}
void registerStorageDelta(StorageFactory & factory)
void registerStorageDeltaLake(StorageFactory & factory)
{
factory.registerStorage(
"DeltaLake",
@ -287,7 +321,7 @@ void registerStorageDelta(StorageFactory & factory)
configuration.format = "Parquet";
}
return std::make_shared<StorageDelta>(
return std::make_shared<StorageDeltaLake>(
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), std::nullopt);
},
{

View File

@ -32,7 +32,7 @@ public:
void setLastModifiedTime(const String & filename, uint64_t timestamp);
void remove(const String & filename, uint64_t timestamp);
std::vector<String> ListCurrentFiles() &&;
std::vector<String> listCurrentFiles() &&;
private:
std::unordered_map<String, uint64_t> file_update_time;
@ -44,10 +44,10 @@ class JsonMetadataGetter
public:
JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context);
std::vector<String> getFiles() { return std::move(metadata).ListCurrentFiles(); }
std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); }
private:
void Init(ContextPtr context);
void init(ContextPtr context);
std::vector<String> getJsonLogFiles();
@ -60,13 +60,13 @@ private:
DeltaLakeMetadata metadata;
};
class StorageDelta : public IStorage
class StorageDeltaLake : public IStorage
{
public:
// 1. Parses internal file structure of table
// 2. Finds out parts with latest version
// 3. Creates url for underlying StorageS3 enigne to handle reads
StorageDelta(
StorageDeltaLake(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
@ -87,14 +87,12 @@ public:
size_t max_block_size,
size_t num_streams) override;
static ColumnsDescription getTableStructureFromData(
const StorageS3Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
private:
void Init();
// DeltaLake stores data in parts in different files
// keys is vector of parts with latest version
// generateQueryFromKeys constructs query from parts filenames for
// underlying StorageS3 engine
static String generateQueryFromKeys(std::vector<String> && keys);
void init();
StorageS3::S3Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine;

View File

@ -28,115 +28,20 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
StorageHudi::StorageHudi(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
namespace
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
auto keys = getKeysFromS3();
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration_.format);
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
new_configuration.format = configuration_.format;
if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
s3engine = std::make_shared<StorageS3>(
new_configuration,
table_id_,
columns_,
constraints_,
comment,
context_,
format_settings_,
/* distributed_processing_ */ false,
nullptr);
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
{
return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers};
}
Pipe StorageHudi::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
StorageS3::updateS3Configuration(context, base_configuration);
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
std::vector<std::string> StorageHudi::getKeysFromS3()
{
std::vector<std::string> keys;
const auto & client = base_configuration.client;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};
const auto bucket{base_configuration.uri.bucket};
request.SetBucket(bucket);
request.SetPrefix(table_path);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(table_path),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix.
keys.push_back(filename);
LOG_DEBUG(log, "Found file: {}", filename);
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return keys;
}
String StorageHudi::generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
/// Apache Hudi store parts of data in different files.
/// Every part file has timestamp in it.
/// Every partition(directory) in Apache Hudi has different versions of part.
/// To find needed parts we need to find out latest part file for every partition.
/// Part format is usually parquet, but can differ.
String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
{
/// For each partition path take only latest file.
struct FileInfo
@ -187,6 +92,138 @@ String StorageHudi::generateQueryFromKeys(const std::vector<std::string> & keys,
return "{" + list_of_keys + "}";
}
std::vector<std::string> getKeysFromS3(const StorageS3::S3Configuration & base_configuration, const std::string & table_path, Poco::Logger * log)
{
std::vector<std::string> keys;
const auto & client = base_configuration.client;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};
const auto bucket{base_configuration.uri.bucket};
request.SetBucket(bucket);
request.SetPrefix(table_path);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(table_path),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix.
keys.push_back(filename);
LOG_DEBUG(log, "Found file: {}", filename);
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return keys;
}
StorageS3Configuration getAdjustedS3Configuration(
StorageS3::S3Configuration & base_configuration,
const StorageS3Configuration & configuration,
const std::string & table_path,
Poco::Logger * log)
{
auto keys = getKeysFromS3(base_configuration, table_path, log);
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration.format);
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
new_configuration.format = configuration.format;
return new_configuration;
}
}
StorageHudi::StorageHudi(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{getBaseConfiguration(configuration_)}
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
auto new_configuration = getAdjustedS3Configuration(base_configuration, configuration_, table_path, log);
if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
s3engine = std::make_shared<StorageS3>(
new_configuration,
table_id_,
columns_,
constraints_,
comment,
context_,
format_settings_,
/* distributed_processing_ */ false,
nullptr);
}
Pipe StorageHudi::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
StorageS3::updateS3Configuration(context, base_configuration);
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
ColumnsDescription StorageHudi::getTableStructureFromData(
const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
{
auto base_configuration = getBaseConfiguration(configuration);
StorageS3::updateS3Configuration(ctx, base_configuration);
auto new_configuration = getAdjustedS3Configuration(
base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake"));
return StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
}
void registerStorageHudi(StorageFactory & factory)
{

View File

@ -48,16 +48,11 @@ public:
size_t max_block_size,
size_t num_streams) override;
static ColumnsDescription getTableStructureFromData(
const StorageS3Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
private:
std::vector<std::string> getKeysFromS3();
/// Apache Hudi store parts of data in different files.
/// Every part file has timestamp in it.
/// Every partition(directory) in Apache Hudi has different versions of part.
/// To find needed parts we need to find out latest part file for every partition.
/// Part format is usually parquet, but can differ.
static String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format);
StorageS3::S3Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine;
Poco::Logger * log;

View File

@ -214,7 +214,7 @@ private:
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;
friend class StorageHudi;
friend class StorageDelta;
friend class StorageDeltaLake;
S3Configuration s3_configuration;
std::vector<String> keys;

View File

@ -34,7 +34,7 @@ void registerStorageS3(StorageFactory & factory);
void registerStorageCOS(StorageFactory & factory);
void registerStorageOSS(StorageFactory & factory);
void registerStorageHudi(StorageFactory & factory);
void registerStorageDelta(StorageFactory & factory);
void registerStorageDeltaLake(StorageFactory & factory);
#endif
#if USE_HDFS
@ -123,7 +123,7 @@ void registerStorages()
registerStorageCOS(factory);
registerStorageOSS(factory);
registerStorageHudi(factory);
registerStorageDelta(factory);
registerStorageDeltaLake(factory);
#endif
#if USE_HDFS

View File

@ -10,7 +10,7 @@
# include <Interpreters/evaluateConstantExpression.h>
# include <Interpreters/parseColumnsListForTableFunction.h>
# include <Parsers/ASTLiteral.h>
# include <Storages/StorageDelta.h>
# include <Storages/StorageDeltaLake.h>
# include <Storages/StorageURL.h>
# include <Storages/checkAndGetLiteralArgument.h>
# include <TableFunctions/TableFunctionDeltaLake.h>
@ -27,7 +27,7 @@ namespace ErrorCodes
}
void TableFunctionDelta::parseArgumentsImpl(
void TableFunctionDeltaLake::parseArgumentsImpl(
const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & base_configuration)
{
if (args.empty() || args.size() > 6)
@ -100,7 +100,7 @@ void TableFunctionDelta::parseArgumentsImpl(
= checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
}
void TableFunctionDelta::parseArguments(const ASTPtr & ast_function, ContextPtr context)
void TableFunctionDeltaLake::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
/// Parse args
ASTs & args_func = ast_function->children;
@ -125,18 +125,18 @@ void TableFunctionDelta::parseArguments(const ASTPtr & ast_function, ContextPtr
parseArgumentsImpl(message, args, context, configuration);
}
ColumnsDescription TableFunctionDelta::getActualTableStructure(ContextPtr context) const
ColumnsDescription TableFunctionDeltaLake::getActualTableStructure(ContextPtr context) const
{
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
return StorageDeltaLake::getTableStructureFromData(configuration, std::nullopt, context);
}
return parseColumnsListFromString(configuration.structure, context);
}
StoragePtr TableFunctionDelta::executeImpl(
StoragePtr TableFunctionDeltaLake::executeImpl(
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
Poco::URI uri(configuration.url);
@ -146,7 +146,7 @@ StoragePtr TableFunctionDelta::executeImpl(
if (configuration.structure != "auto")
columns = parseColumnsListFromString(configuration.structure, context);
StoragePtr storage = std::make_shared<StorageDelta>(
StoragePtr storage = std::make_shared<StorageDeltaLake>(
configuration, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, String{}, context, std::nullopt);
storage->startup();
@ -155,9 +155,9 @@ StoragePtr TableFunctionDelta::executeImpl(
}
void registerTableFunctionDelta(TableFunctionFactory & factory)
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDelta>(
factory.registerFunction<TableFunctionDeltaLake>(
{.documentation
= {R"(The table function can be used to read the DeltaLake table stored on object store.)",
Documentation::Examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)"}},

View File

@ -16,7 +16,7 @@ class TableFunctionS3Cluster;
/* deltaLake(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary DeltaLake table on S3.
*/
class TableFunctionDelta : public ITableFunction
class TableFunctionDeltaLake : public ITableFunction
{
public:
static constexpr auto name = "deltaLake";

View File

@ -130,7 +130,7 @@ ColumnsDescription TableFunctionHudi::getActualTableStructure(ContextPtr context
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
return StorageHudi::getTableStructureFromData(configuration, std::nullopt, context);
}
return parseColumnsListFromString(configuration.structure, context);

View File

@ -28,7 +28,7 @@ void registerTableFunctions()
registerTableFunctionS3Cluster(factory);
registerTableFunctionCOS(factory);
registerTableFunctionHudi(factory);
registerTableFunctionDelta(factory);
registerTableFunctionDeltaLake(factory);
registerTableFunctionOSS(factory);
#endif

View File

@ -25,7 +25,7 @@ void registerTableFunctionS3(TableFunctionFactory & factory);
void registerTableFunctionS3Cluster(TableFunctionFactory & factory);
void registerTableFunctionCOS(TableFunctionFactory & factory);
void registerTableFunctionHudi(TableFunctionFactory & factory);
void registerTableFunctionDelta(TableFunctionFactory & factory);
void registerTableFunctionDeltaLake(TableFunctionFactory & factory);
void registerTableFunctionOSS(TableFunctionFactory & factory);
#endif

View File

@ -1,7 +1,6 @@
import logging
import os
import json
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster
@ -143,3 +142,25 @@ def test_select_query(started_cluster):
),
).splitlines()
assert len(result) > 0
def test_describe_query(started_cluster):
instance = started_cluster.instances["main_server"]
bucket = started_cluster.minio_bucket
result = instance.query(
f"DESCRIBE deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV",
)
assert result == TSV(
[
["begin_lat", "Nullable(Float64)"],
["begin_lon", "Nullable(Float64)"],
["driver", "Nullable(String)"],
["end_lat", "Nullable(Float64)"],
["end_lon", "Nullable(Float64)"],
["fare", "Nullable(Float64)"],
["rider", "Nullable(String)"],
["ts", "Nullable(Int64)"],
["uuid", "Nullable(String)"],
]
)

View File

@ -161,7 +161,7 @@ def test_select_query(started_cluster):
result = run_query(instance, distinct_select_query)
result_table_function = run_query(
instance,
distinct_select_query.format(
distinct_select_table_function_query.format(
ip=started_cluster.minio_ip, port=started_cluster.minio_port, bucket=bucket
),
)
@ -173,3 +173,31 @@ def test_select_query(started_cluster):
assert TSV(result) == TSV(expected)
assert TSV(result_table_function) == TSV(expected)
def test_describe_query(started_cluster):
instance = started_cluster.instances["main_server"]
bucket = started_cluster.minio_bucket
result = instance.query(
f"DESCRIBE hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV",
)
assert result == TSV(
[
["_hoodie_commit_time", "Nullable(String)"],
["_hoodie_commit_seqno", "Nullable(String)"],
["_hoodie_record_key", "Nullable(String)"],
["_hoodie_partition_path", "Nullable(String)"],
["_hoodie_file_name", "Nullable(String)"],
["begin_lat", "Nullable(Float64)"],
["begin_lon", "Nullable(Float64)"],
["driver", "Nullable(String)"],
["end_lat", "Nullable(Float64)"],
["end_lon", "Nullable(Float64)"],
["fare", "Nullable(Float64)"],
["partitionpath", "Nullable(String)"],
["rider", "Nullable(String)"],
["ts", "Nullable(Int64)"],
["uuid", "Nullable(String)"],
]
)