mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Fix DESCRIBE for deltaLake and hudi table functions
This commit is contained in:
parent
aa855d86a1
commit
d5848d53cc
@ -47,7 +47,7 @@ void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */)
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid table metadata, tried to remove {} before adding it", filename);
|
||||
}
|
||||
|
||||
std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
|
||||
std::vector<String> DeltaLakeMetadata::listCurrentFiles() &&
|
||||
{
|
||||
std::vector<String> keys;
|
||||
keys.reserve(file_update_time.size());
|
||||
@ -61,10 +61,10 @@ std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
|
||||
JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context)
|
||||
: base_configuration(configuration_), table_path(table_path_)
|
||||
{
|
||||
Init(context);
|
||||
init(context);
|
||||
}
|
||||
|
||||
void JsonMetadataGetter::Init(ContextPtr context)
|
||||
void JsonMetadataGetter::init(ContextPtr context)
|
||||
{
|
||||
auto keys = getJsonLogFiles();
|
||||
|
||||
@ -178,6 +178,52 @@ void JsonMetadataGetter::handleJSON(const JSON & json)
|
||||
}
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
|
||||
{
|
||||
return {configuration.url, configuration.auth_settings, configuration.rw_settings, configuration.headers};
|
||||
}
|
||||
|
||||
// DeltaLake stores data in parts in different files
|
||||
// keys is vector of parts with latest version
|
||||
// generateQueryFromKeys constructs query from parts filenames for
|
||||
// underlying StorageS3 engine
|
||||
String generateQueryFromKeys(const std::vector<String> & keys)
|
||||
{
|
||||
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
|
||||
return new_query;
|
||||
}
|
||||
|
||||
|
||||
StorageS3Configuration getAdjustedS3Configuration(
|
||||
const ContextPtr & context,
|
||||
StorageS3::S3Configuration & base_configuration,
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::string & table_path,
|
||||
Poco::Logger * log)
|
||||
{
|
||||
JsonMetadataGetter getter{base_configuration, table_path, context};
|
||||
|
||||
auto keys = getter.getFiles();
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys);
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
// set new url in configuration
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration.format;
|
||||
|
||||
return new_configuration;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
StorageDelta::StorageDelta(
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
@ -187,28 +233,14 @@ StorageDelta::StorageDelta(
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_)
|
||||
: IStorage(table_id_)
|
||||
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
|
||||
, base_configuration{getBaseConfiguration(configuration_)}
|
||||
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
|
||||
, table_path(base_configuration.uri.key)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
StorageS3::updateS3Configuration(context_, base_configuration);
|
||||
|
||||
JsonMetadataGetter getter{base_configuration, table_path, context_};
|
||||
|
||||
auto keys = getter.getFiles();
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys));
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
// set new url in configuration
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration_.format;
|
||||
|
||||
auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, configuration_, table_path, log);
|
||||
|
||||
if (columns_.empty())
|
||||
{
|
||||
@ -250,13 +282,15 @@ Pipe StorageDelta::read(
|
||||
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
String StorageDelta::generateQueryFromKeys(std::vector<String> && keys)
|
||||
ColumnsDescription StorageDelta::getTableStructureFromData(
|
||||
const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
|
||||
{
|
||||
// DeltaLake store data parts in different files
|
||||
// keys are filenames of parts
|
||||
// for StorageS3 to read all parts we need format {key1,key2,key3,...keyn}
|
||||
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
|
||||
return new_query;
|
||||
auto base_configuration = getBaseConfiguration(configuration);
|
||||
StorageS3::updateS3Configuration(ctx, base_configuration);
|
||||
auto new_configuration = getAdjustedS3Configuration(
|
||||
ctx, base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake"));
|
||||
return StorageS3::getTableStructureFromData(
|
||||
new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
|
||||
}
|
||||
|
||||
void registerStorageDelta(StorageFactory & factory)
|
||||
|
@ -32,7 +32,7 @@ public:
|
||||
void setLastModifiedTime(const String & filename, uint64_t timestamp);
|
||||
void remove(const String & filename, uint64_t timestamp);
|
||||
|
||||
std::vector<String> ListCurrentFiles() &&;
|
||||
std::vector<String> listCurrentFiles() &&;
|
||||
|
||||
private:
|
||||
std::unordered_map<String, uint64_t> file_update_time;
|
||||
@ -44,10 +44,10 @@ class JsonMetadataGetter
|
||||
public:
|
||||
JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context);
|
||||
|
||||
std::vector<String> getFiles() { return std::move(metadata).ListCurrentFiles(); }
|
||||
std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); }
|
||||
|
||||
private:
|
||||
void Init(ContextPtr context);
|
||||
void init(ContextPtr context);
|
||||
|
||||
std::vector<String> getJsonLogFiles();
|
||||
|
||||
@ -87,14 +87,12 @@ public:
|
||||
size_t max_block_size,
|
||||
size_t num_streams) override;
|
||||
|
||||
static ColumnsDescription getTableStructureFromData(
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
ContextPtr ctx);
|
||||
private:
|
||||
void Init();
|
||||
|
||||
// DeltaLake stores data in parts in different files
|
||||
// keys is vector of parts with latest version
|
||||
// generateQueryFromKeys constructs query from parts filenames for
|
||||
// underlying StorageS3 engine
|
||||
static String generateQueryFromKeys(std::vector<String> && keys);
|
||||
void init();
|
||||
|
||||
StorageS3::S3Configuration base_configuration;
|
||||
std::shared_ptr<StorageS3> s3engine;
|
||||
|
@ -28,115 +28,20 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
StorageHudi::StorageHudi(
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
ColumnsDescription columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_)
|
||||
: IStorage(table_id_)
|
||||
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
|
||||
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
|
||||
, table_path(base_configuration.uri.key)
|
||||
namespace
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
StorageS3::updateS3Configuration(context_, base_configuration);
|
||||
|
||||
auto keys = getKeysFromS3();
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration_.format);
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration_.format;
|
||||
|
||||
if (columns_.empty())
|
||||
{
|
||||
columns_ = StorageS3::getTableStructureFromData(
|
||||
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
|
||||
storage_metadata.setColumns(columns_);
|
||||
}
|
||||
else
|
||||
storage_metadata.setColumns(columns_);
|
||||
|
||||
storage_metadata.setConstraints(constraints_);
|
||||
storage_metadata.setComment(comment);
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
s3engine = std::make_shared<StorageS3>(
|
||||
new_configuration,
|
||||
table_id_,
|
||||
columns_,
|
||||
constraints_,
|
||||
comment,
|
||||
context_,
|
||||
format_settings_,
|
||||
/* distributed_processing_ */ false,
|
||||
nullptr);
|
||||
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
|
||||
{
|
||||
return {configuration.url, configuration.auth_settings, configuration.rw_settings, configuration.headers};
|
||||
}
|
||||
|
||||
Pipe StorageHudi::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
size_t num_streams)
|
||||
{
|
||||
StorageS3::updateS3Configuration(context, base_configuration);
|
||||
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
std::vector<std::string> StorageHudi::getKeysFromS3()
|
||||
{
|
||||
std::vector<std::string> keys;
|
||||
|
||||
const auto & client = base_configuration.client;
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
Aws::S3::Model::ListObjectsV2Outcome outcome;
|
||||
|
||||
bool is_finished{false};
|
||||
const auto bucket{base_configuration.uri.bucket};
|
||||
|
||||
request.SetBucket(bucket);
|
||||
request.SetPrefix(table_path);
|
||||
|
||||
while (!is_finished)
|
||||
{
|
||||
outcome = client->ListObjectsV2(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception(
|
||||
ErrorCodes::S3_ERROR,
|
||||
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
|
||||
quoteString(bucket),
|
||||
quoteString(table_path),
|
||||
backQuote(outcome.GetError().GetExceptionName()),
|
||||
quoteString(outcome.GetError().GetMessage()));
|
||||
|
||||
const auto & result_batch = outcome.GetResult().GetContents();
|
||||
for (const auto & obj : result_batch)
|
||||
{
|
||||
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix.
|
||||
keys.push_back(filename);
|
||||
LOG_DEBUG(log, "Found file: {}", filename);
|
||||
}
|
||||
|
||||
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
|
||||
is_finished = !outcome.GetResult().GetIsTruncated();
|
||||
}
|
||||
|
||||
return keys;
|
||||
}
|
||||
|
||||
String StorageHudi::generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
|
||||
/// Apache Hudi store parts of data in different files.
|
||||
/// Every part file has timestamp in it.
|
||||
/// Every partition(directory) in Apache Hudi has different versions of part.
|
||||
/// To find needed parts we need to find out latest part file for every partition.
|
||||
/// Part format is usually parquet, but can differ.
|
||||
String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
|
||||
{
|
||||
/// For each partition path take only latest file.
|
||||
struct FileInfo
|
||||
@ -187,6 +92,138 @@ String StorageHudi::generateQueryFromKeys(const std::vector<std::string> & keys,
|
||||
return "{" + list_of_keys + "}";
|
||||
}
|
||||
|
||||
std::vector<std::string> getKeysFromS3(const StorageS3::S3Configuration & base_configuration, const std::string & table_path, Poco::Logger * log)
|
||||
{
|
||||
std::vector<std::string> keys;
|
||||
|
||||
const auto & client = base_configuration.client;
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
Aws::S3::Model::ListObjectsV2Outcome outcome;
|
||||
|
||||
bool is_finished{false};
|
||||
const auto bucket{base_configuration.uri.bucket};
|
||||
|
||||
request.SetBucket(bucket);
|
||||
request.SetPrefix(table_path);
|
||||
|
||||
while (!is_finished)
|
||||
{
|
||||
outcome = client->ListObjectsV2(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception(
|
||||
ErrorCodes::S3_ERROR,
|
||||
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
|
||||
quoteString(bucket),
|
||||
quoteString(table_path),
|
||||
backQuote(outcome.GetError().GetExceptionName()),
|
||||
quoteString(outcome.GetError().GetMessage()));
|
||||
|
||||
const auto & result_batch = outcome.GetResult().GetContents();
|
||||
for (const auto & obj : result_batch)
|
||||
{
|
||||
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix.
|
||||
keys.push_back(filename);
|
||||
LOG_DEBUG(log, "Found file: {}", filename);
|
||||
}
|
||||
|
||||
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
|
||||
is_finished = !outcome.GetResult().GetIsTruncated();
|
||||
}
|
||||
|
||||
return keys;
|
||||
}
|
||||
|
||||
|
||||
StorageS3Configuration getAdjustedS3Configuration(
|
||||
StorageS3::S3Configuration & base_configuration,
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::string & table_path,
|
||||
Poco::Logger * log)
|
||||
{
|
||||
auto keys = getKeysFromS3(base_configuration, table_path, log);
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration.format);
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration.format;
|
||||
|
||||
return new_configuration;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
StorageHudi::StorageHudi(
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
ColumnsDescription columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_)
|
||||
: IStorage(table_id_)
|
||||
, base_configuration{getBaseConfiguration(configuration_)}
|
||||
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
|
||||
, table_path(base_configuration.uri.key)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
StorageS3::updateS3Configuration(context_, base_configuration);
|
||||
|
||||
auto new_configuration = getAdjustedS3Configuration(base_configuration, configuration_, table_path, log);
|
||||
|
||||
if (columns_.empty())
|
||||
{
|
||||
columns_ = StorageS3::getTableStructureFromData(
|
||||
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
|
||||
storage_metadata.setColumns(columns_);
|
||||
}
|
||||
else
|
||||
storage_metadata.setColumns(columns_);
|
||||
|
||||
storage_metadata.setConstraints(constraints_);
|
||||
storage_metadata.setComment(comment);
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
s3engine = std::make_shared<StorageS3>(
|
||||
new_configuration,
|
||||
table_id_,
|
||||
columns_,
|
||||
constraints_,
|
||||
comment,
|
||||
context_,
|
||||
format_settings_,
|
||||
/* distributed_processing_ */ false,
|
||||
nullptr);
|
||||
}
|
||||
|
||||
Pipe StorageHudi::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
size_t num_streams)
|
||||
{
|
||||
StorageS3::updateS3Configuration(context, base_configuration);
|
||||
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
ColumnsDescription StorageHudi::getTableStructureFromData(
|
||||
const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
|
||||
{
|
||||
auto base_configuration = getBaseConfiguration(configuration);
|
||||
StorageS3::updateS3Configuration(ctx, base_configuration);
|
||||
auto new_configuration = getAdjustedS3Configuration(
|
||||
base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake"));
|
||||
return StorageS3::getTableStructureFromData(
|
||||
new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
|
||||
}
|
||||
|
||||
void registerStorageHudi(StorageFactory & factory)
|
||||
{
|
||||
|
@ -48,16 +48,11 @@ public:
|
||||
size_t max_block_size,
|
||||
size_t num_streams) override;
|
||||
|
||||
static ColumnsDescription getTableStructureFromData(
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
ContextPtr ctx);
|
||||
private:
|
||||
std::vector<std::string> getKeysFromS3();
|
||||
|
||||
/// Apache Hudi store parts of data in different files.
|
||||
/// Every part file has timestamp in it.
|
||||
/// Every partition(directory) in Apache Hudi has different versions of part.
|
||||
/// To find needed parts we need to find out latest part file for every partition.
|
||||
/// Part format is usually parquet, but can differ.
|
||||
static String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format);
|
||||
|
||||
StorageS3::S3Configuration base_configuration;
|
||||
std::shared_ptr<StorageS3> s3engine;
|
||||
Poco::Logger * log;
|
||||
|
@ -130,7 +130,7 @@ ColumnsDescription TableFunctionDelta::getActualTableStructure(ContextPtr contex
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
|
||||
return StorageDelta::getTableStructureFromData(configuration, std::nullopt, context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(configuration.structure, context);
|
||||
|
@ -130,7 +130,7 @@ ColumnsDescription TableFunctionHudi::getActualTableStructure(ContextPtr context
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
|
||||
return StorageHudi::getTableStructureFromData(configuration, std::nullopt, context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(configuration.structure, context);
|
||||
|
@ -1,7 +1,6 @@
|
||||
import logging
|
||||
import os
|
||||
import json
|
||||
|
||||
import helpers.client
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
@ -143,3 +142,25 @@ def test_select_query(started_cluster):
|
||||
),
|
||||
).splitlines()
|
||||
assert len(result) > 0
|
||||
|
||||
|
||||
def test_describe_query(started_cluster):
|
||||
instance = started_cluster.instances["main_server"]
|
||||
bucket = started_cluster.minio_bucket
|
||||
result = instance.query(
|
||||
f"DESCRIBE deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV",
|
||||
)
|
||||
|
||||
assert result == TSV(
|
||||
[
|
||||
["begin_lat", "Nullable(Float64)"],
|
||||
["begin_lon", "Nullable(Float64)"],
|
||||
["driver", "Nullable(String)"],
|
||||
["end_lat", "Nullable(Float64)"],
|
||||
["end_lon", "Nullable(Float64)"],
|
||||
["fare", "Nullable(Float64)"],
|
||||
["rider", "Nullable(String)"],
|
||||
["ts", "Nullable(Int64)"],
|
||||
["uuid", "Nullable(String)"],
|
||||
]
|
||||
)
|
||||
|
@ -161,7 +161,7 @@ def test_select_query(started_cluster):
|
||||
result = run_query(instance, distinct_select_query)
|
||||
result_table_function = run_query(
|
||||
instance,
|
||||
distinct_select_query.format(
|
||||
distinct_select_table_function_query.format(
|
||||
ip=started_cluster.minio_ip, port=started_cluster.minio_port, bucket=bucket
|
||||
),
|
||||
)
|
||||
@ -173,3 +173,30 @@ def test_select_query(started_cluster):
|
||||
|
||||
assert TSV(result) == TSV(expected)
|
||||
assert TSV(result_table_function) == TSV(expected)
|
||||
|
||||
def test_describe_query(started_cluster):
|
||||
instance = started_cluster.instances["main_server"]
|
||||
bucket = started_cluster.minio_bucket
|
||||
result = instance.query(
|
||||
f"DESCRIBE hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV",
|
||||
)
|
||||
|
||||
assert result == TSV(
|
||||
[
|
||||
["_hoodie_commit_time", "Nullable(String)"],
|
||||
["_hoodie_commit_seqno", "Nullable(String)"],
|
||||
["_hoodie_record_key", "Nullable(String)"],
|
||||
["_hoodie_partition_path", "Nullable(String)"],
|
||||
["_hoodie_file_name", "Nullable(String)"],
|
||||
["begin_lat", "Nullable(Float64)"],
|
||||
["begin_lon", "Nullable(Float64)"],
|
||||
["driver", "Nullable(String)"],
|
||||
["end_lat", "Nullable(Float64)"],
|
||||
["end_lon", "Nullable(Float64)"],
|
||||
["fare", "Nullable(Float64)"],
|
||||
["partitionpath", "Nullable(String)"],
|
||||
["rider", "Nullable(String)"],
|
||||
["ts", "Nullable(Int64)"],
|
||||
["uuid", "Nullable(String)"],
|
||||
]
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user