mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge branch 'master' into revert_SingleValueDataString
This commit is contained in:
commit
cab33c02af
75
docs/en/sql-reference/table-functions/format.md
Normal file
75
docs/en/sql-reference/table-functions/format.md
Normal file
@ -0,0 +1,75 @@
|
||||
---
|
||||
slug: /en/sql-reference/table-functions/format
|
||||
sidebar_position: 56
|
||||
sidebar_label: format
|
||||
---
|
||||
|
||||
# format
|
||||
|
||||
Extracts the table structure from the data and parses the data according to the specified input format.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
format(format_name, data)
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
- `format_name` — The [format](../../interfaces/formats.md#formats) of the data.
|
||||
- `data` — String literal or constant expression that returns a string containing data in the specified format.
|
||||
|
||||
**Returned value**
|
||||
|
||||
A table with data parsed from the `data` argument according to the specified format and the extracted schema.
|
||||
|
||||
**Examples**
|
||||
|
||||
**Query:**
|
||||
``` sql
|
||||
:) select * from format(JSONEachRow,
|
||||
$$
|
||||
{"a": "Hello", "b": 111}
|
||||
{"a": "World", "b": 123}
|
||||
{"a": "Hello", "b": 112}
|
||||
{"a": "World", "b": 124}
|
||||
$$)
|
||||
```
|
||||
|
||||
**Result:**
|
||||
|
||||
```text
|
||||
┌───b─┬─a─────┐
|
||||
│ 111 │ Hello │
|
||||
│ 123 │ World │
|
||||
│ 112 │ Hello │
|
||||
│ 124 │ World │
|
||||
└─────┴───────┘
|
||||
```
|
||||
|
||||
**Query:**
|
||||
```sql
|
||||
|
||||
:) desc format(JSONEachRow,
|
||||
$$
|
||||
{"a": "Hello", "b": 111}
|
||||
{"a": "World", "b": 123}
|
||||
{"a": "Hello", "b": 112}
|
||||
{"a": "World", "b": 124}
|
||||
$$)
|
||||
```
|
||||
|
||||
**Result:**
|
||||
|
||||
```text
|
||||
┌─name─┬─type──────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
|
||||
│ b │ Nullable(Float64) │ │ │ │ │ │
|
||||
│ a │ Nullable(String) │ │ │ │ │ │
|
||||
└──────┴───────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘
|
||||
```
|
||||
|
||||
**See Also**
|
||||
|
||||
- [Formats](../../interfaces/formats.md)
|
||||
|
||||
[Original article](https://clickhouse.com/docs/en/sql-reference/table-functions/format) <!--hide-->
|
1
docs/ru/sql-reference/table-functions/format.md
Symbolic link
1
docs/ru/sql-reference/table-functions/format.md
Symbolic link
@ -0,0 +1 @@
|
||||
../../../en/sql-reference/table-functions/format.md
|
1
docs/zh/sql-reference/table-functions/format.md
Symbolic link
1
docs/zh/sql-reference/table-functions/format.md
Symbolic link
@ -0,0 +1 @@
|
||||
../../../en/sql-reference/table-functions/format.md
|
@ -15,6 +15,7 @@
|
||||
#include <Common/formatReadable.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/ErrorCodes.h>
|
||||
#include <Common/SensitiveDataMasker.h>
|
||||
#include <Common/LockMemoryExceptionInThread.h>
|
||||
#include <filesystem>
|
||||
|
||||
@ -63,11 +64,18 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool
|
||||
ErrorCodes::increment(code, remote, msg, trace);
|
||||
}
|
||||
|
||||
Exception::Exception(const std::string & msg, int code, bool remote_)
|
||||
: Poco::Exception(msg, code)
|
||||
Exception::MessageMasked::MessageMasked(const std::string & msg_)
|
||||
: msg(msg_)
|
||||
{
|
||||
if (auto * masker = SensitiveDataMasker::getInstance())
|
||||
masker->wipeSensitiveData(msg);
|
||||
}
|
||||
|
||||
Exception::Exception(const MessageMasked & msg_masked, int code, bool remote_)
|
||||
: Poco::Exception(msg_masked.msg, code)
|
||||
, remote(remote_)
|
||||
{
|
||||
handle_error_code(msg, code, remote, getStackFramePointers());
|
||||
handle_error_code(msg_masked.msg, code, remote, getStackFramePointers());
|
||||
}
|
||||
|
||||
Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc)
|
||||
|
@ -27,7 +27,19 @@ public:
|
||||
using FramePointers = std::vector<void *>;
|
||||
|
||||
Exception() = default;
|
||||
Exception(const std::string & msg, int code, bool remote_ = false);
|
||||
|
||||
// used to remove the sensitive information from exceptions if query_masking_rules is configured
|
||||
struct MessageMasked
|
||||
{
|
||||
std::string msg;
|
||||
MessageMasked(const std::string & msg_);
|
||||
};
|
||||
|
||||
Exception(const MessageMasked & msg_masked, int code, bool remote_);
|
||||
|
||||
// delegating constructor to mask sensitive information from the message
|
||||
Exception(const std::string & msg, int code, bool remote_ = false): Exception(MessageMasked(msg), code, remote_)
|
||||
{}
|
||||
|
||||
Exception(int code, const std::string & message)
|
||||
: Exception(message, code)
|
||||
@ -54,12 +66,17 @@ public:
|
||||
template <typename... Args>
|
||||
void addMessage(fmt::format_string<Args...> format, Args &&... args)
|
||||
{
|
||||
extendedMessage(fmt::format(format, std::forward<Args>(args)...));
|
||||
addMessage(fmt::format(format, std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
void addMessage(const std::string& message)
|
||||
{
|
||||
extendedMessage(message);
|
||||
addMessage(MessageMasked(message));
|
||||
}
|
||||
|
||||
void addMessage(const MessageMasked & msg_masked)
|
||||
{
|
||||
extendedMessage(msg_masked.msg);
|
||||
}
|
||||
|
||||
/// Used to distinguish local exceptions from the one that was received from remote node.
|
||||
|
@ -925,7 +925,7 @@ public:
|
||||
, ErrorCodes::SYNTAX_ERROR);
|
||||
}
|
||||
|
||||
if (allow_function_parameters && ParserToken(TokenType::OpeningRoundBracket).ignore(pos, expected))
|
||||
if (allow_function_parameters && !parameters && ParserToken(TokenType::OpeningRoundBracket).ignore(pos, expected))
|
||||
{
|
||||
parameters = std::make_shared<ASTExpressionList>();
|
||||
std::swap(parameters->children, elements);
|
||||
|
@ -22,7 +22,7 @@ bool ParserTableExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|
||||
auto res = std::make_shared<ASTTableExpression>();
|
||||
|
||||
if (!ParserWithOptionalAlias(std::make_unique<ParserSubquery>(), true).parse(pos, res->subquery, expected)
|
||||
&& !ParserWithOptionalAlias(std::make_unique<ParserFunction>(true, true), true).parse(pos, res->table_function, expected)
|
||||
&& !ParserWithOptionalAlias(std::make_unique<ParserFunction>(false, true), true).parse(pos, res->table_function, expected)
|
||||
&& !ParserWithOptionalAlias(std::make_unique<ParserCompoundIdentifier>(true, true), true)
|
||||
.parse(pos, res->database_and_table_name, expected))
|
||||
return false;
|
||||
|
@ -99,32 +99,24 @@ Pipe StorageHDFSCluster::read(
|
||||
addColumnsStructureToQueryWithClusterEngine(
|
||||
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 3, getName());
|
||||
|
||||
for (const auto & replicas : cluster->getShardsAddresses())
|
||||
const auto & current_settings = context->getSettingsRef();
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
|
||||
for (const auto & shard_info : cluster->getShardsInfo())
|
||||
{
|
||||
/// There will be only one replica, because we consider each replica as a shard
|
||||
for (const auto & node : replicas)
|
||||
auto try_results = shard_info.pool->getMany(timeouts, &current_settings, PoolMode::GET_MANY);
|
||||
for (auto & try_result : try_results)
|
||||
{
|
||||
auto connection = std::make_shared<Connection>(
|
||||
node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(),
|
||||
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
|
||||
"HDFSClusterInititiator",
|
||||
node.compression,
|
||||
node.secure
|
||||
);
|
||||
|
||||
|
||||
/// For unknown reason global context is passed to IStorage::read() method
|
||||
/// So, task_identifier is passed as constructor argument. It is more obvious.
|
||||
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||
connection,
|
||||
queryToString(query_to_send),
|
||||
header,
|
||||
context,
|
||||
/*throttler=*/nullptr,
|
||||
scalars,
|
||||
Tables(),
|
||||
processed_stage,
|
||||
RemoteQueryExecutor::Extension{.task_iterator = callback});
|
||||
shard_info.pool,
|
||||
std::vector<IConnectionPool::Entry>{try_result},
|
||||
queryToString(query_to_send),
|
||||
header,
|
||||
context,
|
||||
/*throttler=*/nullptr,
|
||||
scalars,
|
||||
Tables(),
|
||||
processed_stage,
|
||||
RemoteQueryExecutor::Extension{.task_iterator = callback});
|
||||
|
||||
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
|
||||
}
|
||||
|
@ -315,7 +315,9 @@ MergeTreeReadTaskColumns getReadTaskColumns(
|
||||
/// 1. Columns for row level filter
|
||||
if (prewhere_info->row_level_filter)
|
||||
{
|
||||
Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames();
|
||||
Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames();
|
||||
injectRequiredColumns(
|
||||
data_part_info_for_reader, storage_snapshot, with_subcolumns, row_filter_column_names);
|
||||
result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, row_filter_column_names));
|
||||
pre_name_set.insert(row_filter_column_names.begin(), row_filter_column_names.end());
|
||||
}
|
||||
@ -323,7 +325,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
|
||||
/// 2. Columns for prewhere
|
||||
Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
|
||||
|
||||
const auto injected_pre_columns = injectRequiredColumns(
|
||||
injectRequiredColumns(
|
||||
data_part_info_for_reader, storage_snapshot, with_subcolumns, all_pre_column_names);
|
||||
|
||||
for (const auto & name : all_pre_column_names)
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include "config.h"
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include <Storages/StorageDelta.h>
|
||||
#include <Storages/StorageDeltaLake.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
@ -47,7 +47,7 @@ void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */)
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid table metadata, tried to remove {} before adding it", filename);
|
||||
}
|
||||
|
||||
std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
|
||||
std::vector<String> DeltaLakeMetadata::listCurrentFiles() &&
|
||||
{
|
||||
std::vector<String> keys;
|
||||
keys.reserve(file_update_time.size());
|
||||
@ -61,10 +61,10 @@ std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
|
||||
JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context)
|
||||
: base_configuration(configuration_), table_path(table_path_)
|
||||
{
|
||||
Init(context);
|
||||
init(context);
|
||||
}
|
||||
|
||||
void JsonMetadataGetter::Init(ContextPtr context)
|
||||
void JsonMetadataGetter::init(ContextPtr context)
|
||||
{
|
||||
auto keys = getJsonLogFiles();
|
||||
|
||||
@ -180,7 +180,53 @@ void JsonMetadataGetter::handleJSON(const JSON & json)
|
||||
}
|
||||
}
|
||||
|
||||
StorageDelta::StorageDelta(
|
||||
namespace
|
||||
{
|
||||
|
||||
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
|
||||
{
|
||||
return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers};
|
||||
}
|
||||
|
||||
// DeltaLake stores data in parts in different files
|
||||
// keys is vector of parts with latest version
|
||||
// generateQueryFromKeys constructs query from parts filenames for
|
||||
// underlying StorageS3 engine
|
||||
String generateQueryFromKeys(const std::vector<String> & keys)
|
||||
{
|
||||
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
|
||||
return new_query;
|
||||
}
|
||||
|
||||
|
||||
StorageS3Configuration getAdjustedS3Configuration(
|
||||
const ContextPtr & context,
|
||||
StorageS3::S3Configuration & base_configuration,
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::string & table_path,
|
||||
Poco::Logger * log)
|
||||
{
|
||||
JsonMetadataGetter getter{base_configuration, table_path, context};
|
||||
|
||||
auto keys = getter.getFiles();
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys);
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
// set new url in configuration
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration.format;
|
||||
|
||||
return new_configuration;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
StorageDeltaLake::StorageDeltaLake(
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
ColumnsDescription columns_,
|
||||
@ -189,28 +235,14 @@ StorageDelta::StorageDelta(
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_)
|
||||
: IStorage(table_id_)
|
||||
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
|
||||
, base_configuration{getBaseConfiguration(configuration_)}
|
||||
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
|
||||
, table_path(base_configuration.uri.key)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
StorageS3::updateS3Configuration(context_, base_configuration);
|
||||
|
||||
JsonMetadataGetter getter{base_configuration, table_path, context_};
|
||||
|
||||
auto keys = getter.getFiles();
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys));
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
// set new url in configuration
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration_.format;
|
||||
|
||||
auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, configuration_, table_path, log);
|
||||
|
||||
if (columns_.empty())
|
||||
{
|
||||
@ -238,7 +270,7 @@ StorageDelta::StorageDelta(
|
||||
nullptr);
|
||||
}
|
||||
|
||||
Pipe StorageDelta::read(
|
||||
Pipe StorageDeltaLake::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -252,16 +284,18 @@ Pipe StorageDelta::read(
|
||||
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
String StorageDelta::generateQueryFromKeys(std::vector<String> && keys)
|
||||
ColumnsDescription StorageDeltaLake::getTableStructureFromData(
|
||||
const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
|
||||
{
|
||||
// DeltaLake store data parts in different files
|
||||
// keys are filenames of parts
|
||||
// for StorageS3 to read all parts we need format {key1,key2,key3,...keyn}
|
||||
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
|
||||
return new_query;
|
||||
auto base_configuration = getBaseConfiguration(configuration);
|
||||
StorageS3::updateS3Configuration(ctx, base_configuration);
|
||||
auto new_configuration = getAdjustedS3Configuration(
|
||||
ctx, base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake"));
|
||||
return StorageS3::getTableStructureFromData(
|
||||
new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
|
||||
}
|
||||
|
||||
void registerStorageDelta(StorageFactory & factory)
|
||||
void registerStorageDeltaLake(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage(
|
||||
"DeltaLake",
|
||||
@ -287,7 +321,7 @@ void registerStorageDelta(StorageFactory & factory)
|
||||
configuration.format = "Parquet";
|
||||
}
|
||||
|
||||
return std::make_shared<StorageDelta>(
|
||||
return std::make_shared<StorageDeltaLake>(
|
||||
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), std::nullopt);
|
||||
},
|
||||
{
|
@ -32,7 +32,7 @@ public:
|
||||
void setLastModifiedTime(const String & filename, uint64_t timestamp);
|
||||
void remove(const String & filename, uint64_t timestamp);
|
||||
|
||||
std::vector<String> ListCurrentFiles() &&;
|
||||
std::vector<String> listCurrentFiles() &&;
|
||||
|
||||
private:
|
||||
std::unordered_map<String, uint64_t> file_update_time;
|
||||
@ -44,10 +44,10 @@ class JsonMetadataGetter
|
||||
public:
|
||||
JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context);
|
||||
|
||||
std::vector<String> getFiles() { return std::move(metadata).ListCurrentFiles(); }
|
||||
std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); }
|
||||
|
||||
private:
|
||||
void Init(ContextPtr context);
|
||||
void init(ContextPtr context);
|
||||
|
||||
std::vector<String> getJsonLogFiles();
|
||||
|
||||
@ -60,13 +60,13 @@ private:
|
||||
DeltaLakeMetadata metadata;
|
||||
};
|
||||
|
||||
class StorageDelta : public IStorage
|
||||
class StorageDeltaLake : public IStorage
|
||||
{
|
||||
public:
|
||||
// 1. Parses internal file structure of table
|
||||
// 2. Finds out parts with latest version
|
||||
// 3. Creates url for underlying StorageS3 engine to handle reads
|
||||
StorageDelta(
|
||||
StorageDeltaLake(
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
ColumnsDescription columns_,
|
||||
@ -87,14 +87,12 @@ public:
|
||||
size_t max_block_size,
|
||||
size_t num_streams) override;
|
||||
|
||||
static ColumnsDescription getTableStructureFromData(
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
ContextPtr ctx);
|
||||
private:
|
||||
void Init();
|
||||
|
||||
// DeltaLake stores data in parts in different files
|
||||
// keys is vector of parts with latest version
|
||||
// generateQueryFromKeys constructs query from parts filenames for
|
||||
// underlying StorageS3 engine
|
||||
static String generateQueryFromKeys(std::vector<String> && keys);
|
||||
void init();
|
||||
|
||||
StorageS3::S3Configuration base_configuration;
|
||||
std::shared_ptr<StorageS3> s3engine;
|
@ -28,115 +28,20 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
StorageHudi::StorageHudi(
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
ColumnsDescription columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_)
|
||||
: IStorage(table_id_)
|
||||
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
|
||||
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
|
||||
, table_path(base_configuration.uri.key)
|
||||
namespace
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
StorageS3::updateS3Configuration(context_, base_configuration);
|
||||
|
||||
auto keys = getKeysFromS3();
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration_.format);
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration_.format;
|
||||
|
||||
if (columns_.empty())
|
||||
{
|
||||
columns_ = StorageS3::getTableStructureFromData(
|
||||
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
|
||||
storage_metadata.setColumns(columns_);
|
||||
}
|
||||
else
|
||||
storage_metadata.setColumns(columns_);
|
||||
|
||||
storage_metadata.setConstraints(constraints_);
|
||||
storage_metadata.setComment(comment);
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
s3engine = std::make_shared<StorageS3>(
|
||||
new_configuration,
|
||||
table_id_,
|
||||
columns_,
|
||||
constraints_,
|
||||
comment,
|
||||
context_,
|
||||
format_settings_,
|
||||
/* distributed_processing_ */ false,
|
||||
nullptr);
|
||||
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
|
||||
{
|
||||
return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers};
|
||||
}
|
||||
|
||||
Pipe StorageHudi::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
size_t num_streams)
|
||||
{
|
||||
StorageS3::updateS3Configuration(context, base_configuration);
|
||||
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
std::vector<std::string> StorageHudi::getKeysFromS3()
|
||||
{
|
||||
std::vector<std::string> keys;
|
||||
|
||||
const auto & client = base_configuration.client;
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
Aws::S3::Model::ListObjectsV2Outcome outcome;
|
||||
|
||||
bool is_finished{false};
|
||||
const auto bucket{base_configuration.uri.bucket};
|
||||
|
||||
request.SetBucket(bucket);
|
||||
request.SetPrefix(table_path);
|
||||
|
||||
while (!is_finished)
|
||||
{
|
||||
outcome = client->ListObjectsV2(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception(
|
||||
ErrorCodes::S3_ERROR,
|
||||
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
|
||||
quoteString(bucket),
|
||||
quoteString(table_path),
|
||||
backQuote(outcome.GetError().GetExceptionName()),
|
||||
quoteString(outcome.GetError().GetMessage()));
|
||||
|
||||
const auto & result_batch = outcome.GetResult().GetContents();
|
||||
for (const auto & obj : result_batch)
|
||||
{
|
||||
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix.
|
||||
keys.push_back(filename);
|
||||
LOG_DEBUG(log, "Found file: {}", filename);
|
||||
}
|
||||
|
||||
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
|
||||
is_finished = !outcome.GetResult().GetIsTruncated();
|
||||
}
|
||||
|
||||
return keys;
|
||||
}
|
||||
|
||||
String StorageHudi::generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
|
||||
/// Apache Hudi stores parts of data in different files.
|
||||
/// Every part file has timestamp in it.
|
||||
/// Every partition (directory) in Apache Hudi has different versions of a part.
|
||||
/// To find the needed parts we need to find the latest part file for every partition.
|
||||
/// Part format is usually parquet, but can differ.
|
||||
String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
|
||||
{
|
||||
/// For each partition path take only latest file.
|
||||
struct FileInfo
|
||||
@ -187,6 +92,138 @@ String StorageHudi::generateQueryFromKeys(const std::vector<std::string> & keys,
|
||||
return "{" + list_of_keys + "}";
|
||||
}
|
||||
|
||||
std::vector<std::string> getKeysFromS3(const StorageS3::S3Configuration & base_configuration, const std::string & table_path, Poco::Logger * log)
|
||||
{
|
||||
std::vector<std::string> keys;
|
||||
|
||||
const auto & client = base_configuration.client;
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
Aws::S3::Model::ListObjectsV2Outcome outcome;
|
||||
|
||||
bool is_finished{false};
|
||||
const auto bucket{base_configuration.uri.bucket};
|
||||
|
||||
request.SetBucket(bucket);
|
||||
request.SetPrefix(table_path);
|
||||
|
||||
while (!is_finished)
|
||||
{
|
||||
outcome = client->ListObjectsV2(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception(
|
||||
ErrorCodes::S3_ERROR,
|
||||
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
|
||||
quoteString(bucket),
|
||||
quoteString(table_path),
|
||||
backQuote(outcome.GetError().GetExceptionName()),
|
||||
quoteString(outcome.GetError().GetMessage()));
|
||||
|
||||
const auto & result_batch = outcome.GetResult().GetContents();
|
||||
for (const auto & obj : result_batch)
|
||||
{
|
||||
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix.
|
||||
keys.push_back(filename);
|
||||
LOG_DEBUG(log, "Found file: {}", filename);
|
||||
}
|
||||
|
||||
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
|
||||
is_finished = !outcome.GetResult().GetIsTruncated();
|
||||
}
|
||||
|
||||
return keys;
|
||||
}
|
||||
|
||||
|
||||
StorageS3Configuration getAdjustedS3Configuration(
|
||||
StorageS3::S3Configuration & base_configuration,
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::string & table_path,
|
||||
Poco::Logger * log)
|
||||
{
|
||||
auto keys = getKeysFromS3(base_configuration, table_path, log);
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration.format);
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration.format;
|
||||
|
||||
return new_configuration;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
StorageHudi::StorageHudi(
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
ColumnsDescription columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_)
|
||||
: IStorage(table_id_)
|
||||
, base_configuration{getBaseConfiguration(configuration_)}
|
||||
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
|
||||
, table_path(base_configuration.uri.key)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
StorageS3::updateS3Configuration(context_, base_configuration);
|
||||
|
||||
auto new_configuration = getAdjustedS3Configuration(base_configuration, configuration_, table_path, log);
|
||||
|
||||
if (columns_.empty())
|
||||
{
|
||||
columns_ = StorageS3::getTableStructureFromData(
|
||||
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
|
||||
storage_metadata.setColumns(columns_);
|
||||
}
|
||||
else
|
||||
storage_metadata.setColumns(columns_);
|
||||
|
||||
storage_metadata.setConstraints(constraints_);
|
||||
storage_metadata.setComment(comment);
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
s3engine = std::make_shared<StorageS3>(
|
||||
new_configuration,
|
||||
table_id_,
|
||||
columns_,
|
||||
constraints_,
|
||||
comment,
|
||||
context_,
|
||||
format_settings_,
|
||||
/* distributed_processing_ */ false,
|
||||
nullptr);
|
||||
}
|
||||
|
||||
Pipe StorageHudi::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
size_t num_streams)
|
||||
{
|
||||
StorageS3::updateS3Configuration(context, base_configuration);
|
||||
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
/// Infers the column structure of a Hudi table stored in S3 without creating the storage.
///
/// Steps:
///  1. Build the base S3 configuration from `configuration` and refresh it for `ctx`.
///  2. Build the adjusted configuration whose URL is the base URI plus the query generated
///     from the objects listed under the table path (`base_configuration.uri.key`).
///  3. Delegate schema inference to StorageS3 using that adjusted configuration.
///
/// @param configuration    User-supplied S3 configuration (url, auth settings, format, ...).
/// @param format_settings  Optional format settings forwarded to StorageS3.
/// @param ctx              Query context used to update the S3 configuration and to read data.
/// @return                 Columns description obtained from StorageS3::getTableStructureFromData.
ColumnsDescription StorageHudi::getTableStructureFromData(
    const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
{
    auto base_configuration = getBaseConfiguration(configuration);
    StorageS3::updateS3Configuration(ctx, base_configuration);

    /// Log under this engine's own name: the messages emitted while listing keys and building the URI
    /// belong to Hudi, so they must not be attributed to the "StorageDeltaLake" logger.
    auto new_configuration = getAdjustedS3Configuration(
        base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageHudi"));

    return StorageS3::getTableStructureFromData(
        new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
}
|
||||
|
||||
void registerStorageHudi(StorageFactory & factory)
|
||||
{
|
||||
|
@ -48,16 +48,11 @@ public:
|
||||
size_t max_block_size,
|
||||
size_t num_streams) override;
|
||||
|
||||
static ColumnsDescription getTableStructureFromData(
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
ContextPtr ctx);
|
||||
private:
|
||||
std::vector<std::string> getKeysFromS3();
|
||||
|
||||
/// Apache Hudi store parts of data in different files.
|
||||
/// Every part file has timestamp in it.
|
||||
/// Every partition(directory) in Apache Hudi has different versions of part.
|
||||
/// To find needed parts we need to find out latest part file for every partition.
|
||||
/// Part format is usually parquet, but can differ.
|
||||
static String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format);
|
||||
|
||||
StorageS3::S3Configuration base_configuration;
|
||||
std::shared_ptr<StorageS3> s3engine;
|
||||
Poco::Logger * log;
|
||||
|
@ -214,7 +214,7 @@ private:
|
||||
friend class StorageS3Cluster;
|
||||
friend class TableFunctionS3Cluster;
|
||||
friend class StorageHudi;
|
||||
friend class StorageDelta;
|
||||
friend class StorageDeltaLake;
|
||||
|
||||
S3Configuration s3_configuration;
|
||||
std::vector<String> keys;
|
||||
|
@ -34,7 +34,7 @@ void registerStorageS3(StorageFactory & factory);
|
||||
void registerStorageCOS(StorageFactory & factory);
|
||||
void registerStorageOSS(StorageFactory & factory);
|
||||
void registerStorageHudi(StorageFactory & factory);
|
||||
void registerStorageDelta(StorageFactory & factory);
|
||||
void registerStorageDeltaLake(StorageFactory & factory);
|
||||
#endif
|
||||
|
||||
#if USE_HDFS
|
||||
@ -123,7 +123,7 @@ void registerStorages()
|
||||
registerStorageCOS(factory);
|
||||
registerStorageOSS(factory);
|
||||
registerStorageHudi(factory);
|
||||
registerStorageDelta(factory);
|
||||
registerStorageDeltaLake(factory);
|
||||
#endif
|
||||
|
||||
#if USE_HDFS
|
||||
|
@ -10,7 +10,7 @@
|
||||
# include <Interpreters/evaluateConstantExpression.h>
|
||||
# include <Interpreters/parseColumnsListForTableFunction.h>
|
||||
# include <Parsers/ASTLiteral.h>
|
||||
# include <Storages/StorageDelta.h>
|
||||
# include <Storages/StorageDeltaLake.h>
|
||||
# include <Storages/StorageURL.h>
|
||||
# include <Storages/checkAndGetLiteralArgument.h>
|
||||
# include <TableFunctions/TableFunctionDeltaLake.h>
|
||||
@ -27,7 +27,7 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
void TableFunctionDelta::parseArgumentsImpl(
|
||||
void TableFunctionDeltaLake::parseArgumentsImpl(
|
||||
const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & base_configuration)
|
||||
{
|
||||
if (args.empty() || args.size() > 6)
|
||||
@ -100,7 +100,7 @@ void TableFunctionDelta::parseArgumentsImpl(
|
||||
= checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
|
||||
}
|
||||
|
||||
void TableFunctionDelta::parseArguments(const ASTPtr & ast_function, ContextPtr context)
|
||||
void TableFunctionDeltaLake::parseArguments(const ASTPtr & ast_function, ContextPtr context)
|
||||
{
|
||||
/// Parse args
|
||||
ASTs & args_func = ast_function->children;
|
||||
@ -125,18 +125,18 @@ void TableFunctionDelta::parseArguments(const ASTPtr & ast_function, ContextPtr
|
||||
parseArgumentsImpl(message, args, context, configuration);
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionDelta::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionDeltaLake::getActualTableStructure(ContextPtr context) const
|
||||
{
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
|
||||
return StorageDeltaLake::getTableStructureFromData(configuration, std::nullopt, context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(configuration.structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionDelta::executeImpl(
|
||||
StoragePtr TableFunctionDeltaLake::executeImpl(
|
||||
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
Poco::URI uri(configuration.url);
|
||||
@ -146,7 +146,7 @@ StoragePtr TableFunctionDelta::executeImpl(
|
||||
if (configuration.structure != "auto")
|
||||
columns = parseColumnsListFromString(configuration.structure, context);
|
||||
|
||||
StoragePtr storage = std::make_shared<StorageDelta>(
|
||||
StoragePtr storage = std::make_shared<StorageDeltaLake>(
|
||||
configuration, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, String{}, context, std::nullopt);
|
||||
|
||||
storage->startup();
|
||||
@ -155,9 +155,9 @@ StoragePtr TableFunctionDelta::executeImpl(
|
||||
}
|
||||
|
||||
|
||||
void registerTableFunctionDelta(TableFunctionFactory & factory)
|
||||
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction<TableFunctionDelta>(
|
||||
factory.registerFunction<TableFunctionDeltaLake>(
|
||||
{.documentation
|
||||
= {R"(The table function can be used to read the DeltaLake table stored on object store.)",
|
||||
Documentation::Examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)"}},
|
||||
|
@ -16,7 +16,7 @@ class TableFunctionS3Cluster;
|
||||
|
||||
/* deltaLake(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary DeltaLake table on S3.
|
||||
*/
|
||||
class TableFunctionDelta : public ITableFunction
|
||||
class TableFunctionDeltaLake : public ITableFunction
|
||||
{
|
||||
public:
|
||||
static constexpr auto name = "deltaLake";
|
||||
|
@ -89,9 +89,72 @@ StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, Con
|
||||
return res;
|
||||
}
|
||||
|
||||
/// Built-in documentation for the `format` table function: a description (syntax, parameters,
/// returned value), two query/result examples, and the documentation categories.
/// Passed to TableFunctionFactory::registerFunction in registerTableFunctionFormat().
static const Documentation format_table_function_documentation =
|
||||
{
|
||||
R"(
|
||||
Extracts table structure from data and parses it according to specified input format.
|
||||
Syntax: `format(format_name, data)`.
|
||||
Parameters:
|
||||
- `format_name` - the format of the data.
|
||||
- `data ` - String literal or constant expression that returns a string containing data in specified format.
|
||||
Returned value: A table with data parsed from `data` argument according specified format and extracted schema.
|
||||
)",
|
||||
Documentation::Examples
|
||||
{
|
||||
{
|
||||
"First example",
|
||||
R"(
|
||||
Query:
|
||||
```
|
||||
:) select * from format(JSONEachRow,
|
||||
$$
|
||||
{"a": "Hello", "b": 111}
|
||||
{"a": "World", "b": 123}
|
||||
{"a": "Hello", "b": 112}
|
||||
{"a": "World", "b": 124}
|
||||
$$)
|
||||
```
|
||||
|
||||
Result:
|
||||
```
|
||||
┌───b─┬─a─────┐
|
||||
│ 111 │ Hello │
|
||||
│ 123 │ World │
|
||||
│ 112 │ Hello │
|
||||
│ 124 │ World │
|
||||
└─────┴───────┘
|
||||
```
|
||||
)"
|
||||
},
|
||||
{
|
||||
"Second example",
|
||||
R"(
|
||||
Query:
|
||||
```
|
||||
:) desc format(JSONEachRow,
|
||||
$$
|
||||
{"a": "Hello", "b": 111}
|
||||
{"a": "World", "b": 123}
|
||||
{"a": "Hello", "b": 112}
|
||||
{"a": "World", "b": 124}
|
||||
$$)
|
||||
```
|
||||
|
||||
Result:
|
||||
```
|
||||
┌─name─┬─type──────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
|
||||
│ b │ Nullable(Float64) │ │ │ │ │ │
|
||||
│ a │ Nullable(String) │ │ │ │ │ │
|
||||
└──────┴───────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘
|
||||
```
|
||||
)"
|
||||
},
|
||||
},
|
||||
Documentation::Categories{"format", "table-functions"}
|
||||
};
|
||||
|
||||
void registerTableFunctionFormat(TableFunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction<TableFunctionFormat>({}, TableFunctionFactory::CaseInsensitive);
|
||||
factory.registerFunction<TableFunctionFormat>({format_table_function_documentation, false}, TableFunctionFactory::CaseInsensitive);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ ColumnsDescription TableFunctionHudi::getActualTableStructure(ContextPtr context
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
|
||||
return StorageHudi::getTableStructureFromData(configuration, std::nullopt, context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(configuration.structure, context);
|
||||
|
@ -28,7 +28,7 @@ void registerTableFunctions()
|
||||
registerTableFunctionS3Cluster(factory);
|
||||
registerTableFunctionCOS(factory);
|
||||
registerTableFunctionHudi(factory);
|
||||
registerTableFunctionDelta(factory);
|
||||
registerTableFunctionDeltaLake(factory);
|
||||
registerTableFunctionOSS(factory);
|
||||
|
||||
#endif
|
||||
|
@ -25,7 +25,7 @@ void registerTableFunctionS3(TableFunctionFactory & factory);
|
||||
void registerTableFunctionS3Cluster(TableFunctionFactory & factory);
|
||||
void registerTableFunctionCOS(TableFunctionFactory & factory);
|
||||
void registerTableFunctionHudi(TableFunctionFactory & factory);
|
||||
void registerTableFunctionDelta(TableFunctionFactory & factory);
|
||||
void registerTableFunctionDeltaLake(TableFunctionFactory & factory);
|
||||
void registerTableFunctionOSS(TableFunctionFactory & factory);
|
||||
#endif
|
||||
|
||||
|
@ -84,6 +84,8 @@ def test_restart_zookeeper(start_cluster):
|
||||
time.sleep(5)
|
||||
|
||||
for table_id in range(NUM_TABLES):
|
||||
node1.query(
|
||||
f"INSERT INTO test_table_{table_id} VALUES (6), (7), (8), (9), (10);"
|
||||
node1.query_with_retry(
|
||||
sql=f"INSERT INTO test_table_{table_id} VALUES (6), (7), (8), (9), (10);",
|
||||
retry_count=10,
|
||||
sleep_time=1,
|
||||
)
|
||||
|
@ -1,7 +1,6 @@
|
||||
import logging
|
||||
import os
|
||||
import json
|
||||
|
||||
import helpers.client
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
@ -143,3 +142,25 @@ def test_select_query(started_cluster):
|
||||
),
|
||||
).splitlines()
|
||||
assert len(result) > 0
|
||||
|
||||
|
||||
def test_describe_query(started_cluster):
    """Check that DESCRIBE on the deltaLake table function reports the expected
    column names and types for `test_table` stored in the MinIO bucket."""
    main_server = started_cluster.instances["main_server"]
    bucket = started_cluster.minio_bucket

    # Only url + credentials are passed, so the structure comes from the table function itself.
    describe_sql = f"DESCRIBE deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV"
    result = main_server.query(describe_sql)

    expected_columns = [
        ["begin_lat", "Nullable(Float64)"],
        ["begin_lon", "Nullable(Float64)"],
        ["driver", "Nullable(String)"],
        ["end_lat", "Nullable(Float64)"],
        ["end_lon", "Nullable(Float64)"],
        ["fare", "Nullable(Float64)"],
        ["rider", "Nullable(String)"],
        ["ts", "Nullable(Int64)"],
        ["uuid", "Nullable(String)"],
    ]
    assert result == TSV(expected_columns)
|
||||
|
18
tests/integration/test_storage_hdfs/configs/cluster.xml
Normal file
18
tests/integration/test_storage_hdfs/configs/cluster.xml
Normal file
@ -0,0 +1,18 @@
|
||||
<clickhouse>
|
||||
<remote_servers>
|
||||
<cluster_non_existent_port>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>19000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</cluster_non_existent_port>
|
||||
</remote_servers>
|
||||
</clickhouse>
|
@ -9,7 +9,11 @@ from pyhdfs import HdfsClient
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node1 = cluster.add_instance(
|
||||
"node1",
|
||||
main_configs=["configs/macro.xml", "configs/schema_cache.xml"],
|
||||
main_configs=[
|
||||
"configs/macro.xml",
|
||||
"configs/schema_cache.xml",
|
||||
"configs/cluster.xml",
|
||||
],
|
||||
with_hdfs=True,
|
||||
)
|
||||
|
||||
@ -783,6 +787,32 @@ def test_schema_inference_cache(started_cluster):
|
||||
check_cache_misses(node1, files, 4)
|
||||
|
||||
|
||||
def test_hdfsCluster_skip_unavailable_shards(started_cluster):
    """With `skip_unavailable_shards = 1`, hdfsCluster over `cluster_non_existent_port`
    must still return the full contents of the file written to HDFS.

    NOTE(review): the cluster name suggests one of its shards is unreachable by design;
    the sibling "unskip" test expects NETWORK_ERROR for the same cluster without the setting.
    """
    hdfs_api = started_cluster.hdfs_api
    # Use the instance obtained from the fixture (as the sibling test does) instead of the
    # module-level `node1`, so the local is not left unused.
    node = started_cluster.instances["node1"]
    data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    hdfs_api.write_data("/skip_unavailable_shards", data)

    assert (
        node.query(
            "select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/skip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64') settings skip_unavailable_shards = 1"
        )
        == data
    )
|
||||
|
||||
|
||||
def test_hdfsCluster_unskip_unavailable_shards(started_cluster):
    """Without `skip_unavailable_shards`, the same hdfsCluster query over
    `cluster_non_existent_port` is expected to fail with NETWORK_ERROR."""
    hdfs = started_cluster.hdfs_api
    server = started_cluster.instances["node1"]

    rows = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    hdfs.write_data("/unskip_unavailable_shards", rows)

    failing_query = "select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/unskip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64')"
    error = server.query_and_get_error(failing_query)

    assert "NETWORK_ERROR" in error
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cluster.start()
|
||||
input("Cluster created, press any key to destroy...")
|
||||
|
@ -161,7 +161,7 @@ def test_select_query(started_cluster):
|
||||
result = run_query(instance, distinct_select_query)
|
||||
result_table_function = run_query(
|
||||
instance,
|
||||
distinct_select_query.format(
|
||||
distinct_select_table_function_query.format(
|
||||
ip=started_cluster.minio_ip, port=started_cluster.minio_port, bucket=bucket
|
||||
),
|
||||
)
|
||||
@ -173,3 +173,31 @@ def test_select_query(started_cluster):
|
||||
|
||||
assert TSV(result) == TSV(expected)
|
||||
assert TSV(result_table_function) == TSV(expected)
|
||||
|
||||
|
||||
def test_describe_query(started_cluster):
    """Check that DESCRIBE on the hudi table function reports the expected column
    names and types for `test_table`, including the `_hoodie_*` columns."""
    main_server = started_cluster.instances["main_server"]
    bucket = started_cluster.minio_bucket

    # Only url + credentials are passed, so the structure comes from the table function itself.
    describe_sql = f"DESCRIBE hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV"
    result = main_server.query(describe_sql)

    expected_columns = [
        ["_hoodie_commit_time", "Nullable(String)"],
        ["_hoodie_commit_seqno", "Nullable(String)"],
        ["_hoodie_record_key", "Nullable(String)"],
        ["_hoodie_partition_path", "Nullable(String)"],
        ["_hoodie_file_name", "Nullable(String)"],
        ["begin_lat", "Nullable(Float64)"],
        ["begin_lon", "Nullable(Float64)"],
        ["driver", "Nullable(String)"],
        ["end_lat", "Nullable(Float64)"],
        ["end_lon", "Nullable(Float64)"],
        ["fare", "Nullable(Float64)"],
        ["partitionpath", "Nullable(String)"],
        ["rider", "Nullable(String)"],
        ["ts", "Nullable(Int64)"],
        ["uuid", "Nullable(String)"],
    ]
    assert result == TSV(expected_columns)
|
||||
|
@ -1,11 +1,14 @@
|
||||
1
|
||||
2
|
||||
3
|
||||
3.1
|
||||
4
|
||||
5
|
||||
5.1
|
||||
6
|
||||
7
|
||||
7.1
|
||||
7.2
|
||||
8
|
||||
9
|
||||
text_log non empty
|
||||
|
@ -37,12 +37,20 @@ rm -f "$tmp_file" >/dev/null 2>&1
|
||||
echo 3
|
||||
# failure before query start
|
||||
$CLICKHOUSE_CLIENT \
|
||||
--query="SELECT 'find_me_TOPSECRET=TOPSECRET' FROM non_existing_table FORMAT Null" \
|
||||
--query="SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" \
|
||||
--log_queries=1 --ignore-error --multiquery |& grep -v '^(query: ' > "$tmp_file"
|
||||
|
||||
grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3a'
|
||||
grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3b'
|
||||
|
||||
echo '3.1'
|
||||
echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @- >"$tmp_file" 2>&1
|
||||
|
||||
grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3.1a'
|
||||
grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3.1b'
|
||||
|
||||
#echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | curl -sSg http://172.17.0.3:8123/ -d @-
|
||||
|
||||
rm -f "$tmp_file" >/dev/null 2>&1
|
||||
echo 4
|
||||
# failure at the end of query
|
||||
@ -100,6 +108,21 @@ $CLICKHOUSE_CLIENT \
|
||||
--server_logs_file=/dev/null \
|
||||
--query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and query like '%TOPSECRET%';"
|
||||
|
||||
echo '7.1'
|
||||
# query_log exceptions
|
||||
$CLICKHOUSE_CLIENT \
|
||||
--server_logs_file=/dev/null \
|
||||
--query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and exception like '%TOPSECRET%'"
|
||||
|
||||
echo '7.2'
|
||||
|
||||
# not perfect: when run in parallel with other tests, this check can give a false-negative result
|
||||
# because other tests can overwrite the last_error_message, where we check the absence of sensitive data.
|
||||
# But it's still good enough for CI - in case of regressions it will start flapping (normally it shouldn't)
|
||||
$CLICKHOUSE_CLIENT \
|
||||
--server_logs_file=/dev/null \
|
||||
--query="select * from system.errors where last_error_message like '%TOPSECRET%';"
|
||||
|
||||
|
||||
rm -f "$tmp_file" >/dev/null 2>&1
|
||||
echo 8
|
||||
|
@ -3,7 +3,6 @@ clusterAllReplicas
|
||||
dictionary
|
||||
executable
|
||||
file
|
||||
format
|
||||
generateRandom
|
||||
input
|
||||
jdbc
|
||||
|
@ -0,0 +1,16 @@
|
||||
-- { echoOn }
|
||||
|
||||
SELECT a, c FROM test_rlp WHERE c%2 == 0 AND b < 5;
|
||||
0 10
|
||||
2 12
|
||||
4 14
|
||||
DROP POLICY IF EXISTS test_rlp_policy ON test_rlp;
|
||||
CREATE ROW POLICY test_rlp_policy ON test_rlp FOR SELECT USING c%2 == 0 TO default;
|
||||
SELECT a, c FROM test_rlp WHERE b < 5 SETTINGS optimize_move_to_prewhere = 0;
|
||||
0 10
|
||||
2 12
|
||||
4 14
|
||||
SELECT a, c FROM test_rlp PREWHERE b < 5;
|
||||
0 10
|
||||
2 12
|
||||
4 14
|
@ -0,0 +1,25 @@
|
||||
DROP TABLE IF EXISTS test_rlp;
|
||||
|
||||
CREATE TABLE test_rlp (a Int32, b Int32) ENGINE=MergeTree() ORDER BY a SETTINGS index_granularity=5;
|
||||
|
||||
INSERT INTO test_rlp SELECT number, number FROM numbers(15);
|
||||
|
||||
ALTER TABLE test_rlp ADD COLUMN c Int32 DEFAULT b+10;
|
||||
|
||||
-- { echoOn }
|
||||
|
||||
SELECT a, c FROM test_rlp WHERE c%2 == 0 AND b < 5;
|
||||
|
||||
DROP POLICY IF EXISTS test_rlp_policy ON test_rlp;
|
||||
|
||||
CREATE ROW POLICY test_rlp_policy ON test_rlp FOR SELECT USING c%2 == 0 TO default;
|
||||
|
||||
SELECT a, c FROM test_rlp WHERE b < 5 SETTINGS optimize_move_to_prewhere = 0;
|
||||
|
||||
SELECT a, c FROM test_rlp PREWHERE b < 5;
|
||||
|
||||
-- { echoOff }
|
||||
|
||||
DROP POLICY test_rlp_policy ON test_rlp;
|
||||
|
||||
DROP TABLE test_rlp;
|
@ -0,0 +1,2 @@
|
||||
SELECT func(1)(2)(3); -- { clientError SYNTAX_ERROR }
|
||||
SELECT * FROM VALUES(1)(2); -- { clientError SYNTAX_ERROR }
|
Loading…
Reference in New Issue
Block a user