Merge pull request #65997 from yariks5s/hive_style_partitioning

Implementing Hive-style partitioning
Yarik Briukhovetskyi 2024-08-13 10:04:21 +00:00 committed by GitHub
commit 086c0f03a6
GPG Key ID: B5690EEEBB952194
51 changed files with 817 additions and 103 deletions

View File

@ -99,10 +99,9 @@ upload_data() {
# iterating over globs will cause redundant file variable to be
# a path to a file, not a filename
# shellcheck disable=SC2045
for file in $(ls "${data_path}"); do
echo "${file}";
./mc cp "${data_path}"/"${file}" clickminio/test/"${file}";
done
if [ -d "${data_path}" ]; then
./mc cp --recursive "${data_path}"/ clickminio/test/
fi
}
setup_aws_credentials() {

View File

@ -5627,6 +5627,12 @@ Disable all insert and mutations (alter table update / alter table delete / alte
Default value: `false`.
## use_hive_partitioning
When enabled, ClickHouse detects Hive-style partitioning in the path (`/name=value/`) for file-like table engines [File](../../engines/table-engines/special/file.md#hive-style-partitioning)/[S3](../../engines/table-engines/integrations/s3.md#hive-style-partitioning)/[URL](../../engines/table-engines/special/url.md#hive-style-partitioning)/[HDFS](../../engines/table-engines/integrations/hdfs.md#hive-style-partitioning)/[AzureBlobStorage](../../engines/table-engines/integrations/azureBlobStorage.md#hive-style-partitioning) and lets you use the partition columns as virtual columns in the query. These virtual columns have the same names as in the partitioned path, but prefixed with `_`.
Default value: `false`.
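For illustration, a minimal sketch of querying with this setting enabled (the `data/path/...` layout is hypothetical; the engine-specific pages linked above show the same pattern for each engine):
``` sql
SET use_hive_partitioning = 1;
-- each `name=value` segment in the path becomes a virtual column prefixed with `_`
SELECT _date, count() FROM file('data/path/date=*/country=*/*.parquet') WHERE _country = 'Netherlands' GROUP BY _date;
```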
## allow_experimental_time_series_table {#allow-experimental-time-series-table}
Allows creation of tables with the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine.

View File

@ -77,3 +77,16 @@ SELECT count(*) FROM azureBlobStorage('DefaultEndpointsProtocol=https;AccountNam
**See Also**
- [AzureBlobStorage Table Engine](/docs/en/engines/table-engines/integrations/azureBlobStorage.md)
## Hive-style partitioning {#hive-style-partitioning}
When the setting `use_hive_partitioning` is set to 1, ClickHouse detects Hive-style partitioning in the path (`/name=value/`) and lets you use the partition columns as virtual columns in the query. These virtual columns have the same names as in the partitioned path, but prefixed with `_`.
**Example**
Use virtual columns created by Hive-style partitioning:
``` sql
SET use_hive_partitioning = 1;
SELECT * FROM azureBlobStorage(config, storage_account_url='...', container='...', blob_path='data/path/date=*/country=*/code=*/*.parquet') WHERE _date > '2020-01-01' AND _country = 'Netherlands' AND _code = 42;
```

View File

@ -206,6 +206,19 @@ SELECT count(*) FROM file('big_dir/**/file002', 'CSV', 'name String, value UInt3
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the file size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
## Hive-style partitioning {#hive-style-partitioning}
When the setting `use_hive_partitioning` is set to 1, ClickHouse detects Hive-style partitioning in the path (`/name=value/`) and lets you use the partition columns as virtual columns in the query. These virtual columns have the same names as in the partitioned path, but prefixed with `_`.
**Example**
Use virtual columns created by Hive-style partitioning:
``` sql
SET use_hive_partitioning = 1;
SELECT * FROM file('data/path/date=*/country=*/code=*/*.parquet') WHERE _date > '2020-01-01' AND _country = 'Netherlands' AND _code = 42;
```
## Settings {#settings}
- [engine_file_empty_if_not_exists](/docs/en/operations/settings/settings.md#engine-file-empty_if-not-exists) - allows to select empty data from a file that doesn't exist. Disabled by default.

View File

@ -100,6 +100,19 @@ FROM hdfs('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name Strin
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
## Hive-style partitioning {#hive-style-partitioning}
When the setting `use_hive_partitioning` is set to 1, ClickHouse detects Hive-style partitioning in the path (`/name=value/`) and lets you use the partition columns as virtual columns in the query. These virtual columns have the same names as in the partitioned path, but prefixed with `_`.
**Example**
Use virtual columns created by Hive-style partitioning:
``` sql
SET use_hive_partitioning = 1;
SELECT * FROM hdfs('hdfs://hdfs1:9000/data/path/date=*/country=*/code=*/*.parquet') WHERE _date > '2020-01-01' AND _country = 'Netherlands' AND _code = 42;
```
## Storage Settings {#storage-settings}
- [hdfs_truncate_on_insert](/docs/en/operations/settings/settings.md#hdfs_truncate_on_insert) - allows to truncate file before insert into it. Disabled by default.

View File

@ -274,6 +274,19 @@ FROM s3(
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the file size is unknown, the value is `NULL`. In case of archive shows uncompressed file size of the file inside the archive.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
## Hive-style partitioning {#hive-style-partitioning}
When the setting `use_hive_partitioning` is set to 1, ClickHouse detects Hive-style partitioning in the path (`/name=value/`) and lets you use the partition columns as virtual columns in the query. These virtual columns have the same names as in the partitioned path, but prefixed with `_`.
**Example**
Use virtual columns created by Hive-style partitioning:
``` sql
SET use_hive_partitioning = 1;
SELECT * FROM s3('s3://data/path/date=*/country=*/code=*/*.parquet') WHERE _date > '2020-01-01' AND _country = 'Netherlands' AND _code = 42;
```
## Storage Settings {#storage-settings}
- [s3_truncate_on_insert](/docs/en/operations/settings/settings.md#s3_truncate_on_insert) - allows to truncate file before insert into it. Disabled by default.

View File

@ -55,6 +55,19 @@ Character `|` inside patterns is used to specify failover addresses. They are it
- `_size` — Size of the resource in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
## Hive-style partitioning {#hive-style-partitioning}
When the setting `use_hive_partitioning` is set to 1, ClickHouse detects Hive-style partitioning in the path (`/name=value/`) and lets you use the partition columns as virtual columns in the query. These virtual columns have the same names as in the partitioned path, but prefixed with `_`.
**Example**
Use virtual columns created by Hive-style partitioning:
``` sql
SET use_hive_partitioning = 1;
SELECT * FROM url('http://data/path/date=*/country=*/code=*/*.parquet') WHERE _date > '2020-01-01' AND _country = 'Netherlands' AND _code = 42;
```
## Storage Settings {#storage-settings}
- [engine_url_skip_empty_files](/docs/en/operations/settings/settings.md#engine_url_skip_empty_files) - allows to skip empty files while reading. Disabled by default.

View File

@ -1307,6 +1307,7 @@ try
throw ErrnoException(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Input must be seekable file (it will be read twice)");
SingleReadBufferIterator read_buffer_iterator(std::move(file));
schema_columns = readSchemaFromFormat(input_format, {}, read_buffer_iterator, context_const);
}
else

View File

@ -921,6 +921,7 @@ class IColumn;
M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
M(Timezone, session_timezone, "", "This setting can be removed in the future due to potential caveats. It is experimental and is not suitable for production usage. The default timezone for current session or query. The server default timezone if empty.", 0) \
M(Bool, use_hive_partitioning, false, "Allows the use of Hive-style partitioning for the File, URL, S3, AzureBlobStorage and HDFS engines.", 0) \
\
M(Bool, allow_statistics_optimize, false, "Allows using statistics to optimize queries", 0) ALIAS(allow_statistic_optimize) \
M(Bool, allow_experimental_statistics, false, "Allows using statistics", 0) ALIAS(allow_experimental_statistic) \

View File

@ -80,6 +80,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"restore_replace_external_engines_to_null", false, false, "New setting."},
{"input_format_json_max_depth", 1000000, 1000, "It was unlimited in previous versions, but that was unsafe."},
{"merge_tree_min_bytes_per_task_for_remote_reading", 4194304, 2097152, "Value is unified with `filesystem_prefetch_min_bytes_for_single_read_task`"},
{"use_hive_partitioning", false, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines."},
{"allow_experimental_kafka_offsets_storage_in_keeper", false, false, "Allow the usage of experimental Kafka storage engine that stores the committed offsets in ClickHouse Keeper"},
{"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."},
{"allow_experimental_time_series_table", false, false, "Added new setting to allow the TimeSeries table engine"},

View File

@ -164,7 +164,7 @@ try
return {*iterator_data.cached_columns, *format_name};
}
schemas_for_union_mode.emplace_back(iterator_data.cached_columns->getAll(), read_buffer_iterator.getLastFileName());
schemas_for_union_mode.emplace_back(iterator_data.cached_columns->getAll(), read_buffer_iterator.getLastFilePath());
continue;
}
@ -250,7 +250,7 @@ try
if (!names_and_types.empty())
read_buffer_iterator.setSchemaToLastFile(ColumnsDescription(names_and_types));
schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFileName());
schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFilePath());
}
catch (...)
{
@ -411,7 +411,7 @@ try
throw Exception(ErrorCodes::CANNOT_DETECT_FORMAT, "The data format cannot be detected by the contents of the files. You can specify the format manually");
read_buffer_iterator.setSchemaToLastFile(ColumnsDescription(names_and_types));
schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFileName());
schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFilePath());
}
if (format_name && mode == SchemaInferenceMode::DEFAULT)
@ -527,9 +527,9 @@ try
}
catch (Exception & e)
{
auto file_name = read_buffer_iterator.getLastFileName();
if (!file_name.empty())
e.addMessage(fmt::format("(in file/uri {})", file_name));
auto file_path = read_buffer_iterator.getLastFilePath();
if (!file_path.empty())
e.addMessage(fmt::format("(in file/uri {})", file_path));
throw;
}

View File

@ -56,8 +56,8 @@ struct IReadBufferIterator
/// Set auto detected format name.
virtual void setFormatName(const String & /*format_name*/) {}
/// Get last processed file name for better exception messages.
virtual String getLastFileName() const { return ""; }
/// Get last processed file path for better exception messages.
virtual String getLastFilePath() const { return ""; }
/// Return true if method recreateLastReadBuffer is implemented.
virtual bool supportsLastReadBufferRecreation() const { return false; }

View File

@ -445,7 +445,7 @@ StorageHive::StorageHive(
storage_metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_ast, storage_metadata.columns, getContext());
setInMemoryMetadata(storage_metadata);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns()));
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), getContext()));
}
void StorageHive::lazyInitialize()

View File

@ -91,8 +91,9 @@ public:
{
ConfigurationPtr configuration = base_configuration->clone();
configuration->setPaths(metadata->getDataFiles());
std::string sample_path;
return Storage::resolveSchemaFromData(
object_storage_, configuration, format_settings_, local_context);
object_storage_, configuration, format_settings_, sample_path, local_context);
}
}

View File

@ -132,7 +132,7 @@ void ReadBufferIterator::setFormatName(const String & format_name)
format = format_name;
}
String ReadBufferIterator::getLastFileName() const
String ReadBufferIterator::getLastFilePath() const
{
if (current_object_info)
return current_object_info->getPath();

View File

@ -33,7 +33,7 @@ public:
void setResultingSchema(const ColumnsDescription & columns) override;
String getLastFileName() const override;
String getLastFilePath() const override;
void setFormatName(const String & format_name) override;

View File

@ -1,4 +1,5 @@
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Settings.h>
#include <Formats/FormatFactory.h>
@ -33,6 +34,33 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
String StorageObjectStorage::getPathSample(StorageInMemoryMetadata metadata, ContextPtr context)
{
auto query_settings = configuration->getQuerySettings(context);
/// We don't want to throw an exception if there are no files with specified path.
query_settings.throw_on_zero_files_match = false;
bool local_distributed_processing = distributed_processing;
if (context->getSettingsRef().use_hive_partitioning)
local_distributed_processing = false;
auto file_iterator = StorageObjectStorageSource::createFileIterator(
configuration,
query_settings,
object_storage,
local_distributed_processing,
context,
{}, // predicate
metadata.getColumns().getAll(), // virtual_columns
nullptr, // read_keys
{} // file_progress_callback
);
if (auto file = file_iterator->next(0))
return file->getPath();
return "";
}
StorageObjectStorage::StorageObjectStorage(
ConfigurationPtr configuration_,
ObjectStoragePtr object_storage_,
@ -53,7 +81,9 @@ StorageObjectStorage::StorageObjectStorage(
, log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName())))
{
ColumnsDescription columns{columns_};
resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, context);
std::string sample_path;
resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context);
configuration->check(context);
StorageInMemoryMetadata metadata;
@ -61,7 +91,10 @@ StorageObjectStorage::StorageObjectStorage(
metadata.setConstraints(constraints_);
metadata.setComment(comment);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns()));
if (sample_path.empty() && context->getSettingsRef().use_hive_partitioning)
sample_path = getPathSample(metadata, context);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns(), context, sample_path, format_settings));
setInMemoryMetadata(metadata);
}
@ -198,7 +231,7 @@ private:
return;
auto context = getContext();
iterator_wrapper = StorageObjectStorageSource::createFileIterator(
configuration, object_storage, distributed_processing,
configuration, configuration->getQuerySettings(context), object_storage, distributed_processing,
context, predicate, virtual_columns, nullptr, context->getFileProgressCallback());
}
};
@ -350,6 +383,7 @@ std::unique_ptr<ReadBufferIterator> StorageObjectStorage::createReadBufferIterat
{
auto file_iterator = StorageObjectStorageSource::createFileIterator(
configuration,
configuration->getQuerySettings(context),
object_storage,
false/* distributed_processing */,
context,
@ -366,33 +400,41 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
std::string & sample_path,
const ContextPtr & context)
{
ObjectInfos read_keys;
auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
return readSchemaFromFormat(configuration->format, format_settings, *iterator, context);
auto schema = readSchemaFromFormat(configuration->format, format_settings, *iterator, context);
sample_path = iterator->getLastFilePath();
return schema;
}
std::string StorageObjectStorage::resolveFormatFromData(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
std::string & sample_path,
const ContextPtr & context)
{
ObjectInfos read_keys;
auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
return detectFormatAndReadSchema(format_settings, *iterator, context).second;
auto format_and_schema = detectFormatAndReadSchema(format_settings, *iterator, context).second;
sample_path = iterator->getLastFilePath();
return format_and_schema;
}
std::pair<ColumnsDescription, std::string> StorageObjectStorage::resolveSchemaAndFormatFromData(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
std::string & sample_path,
const ContextPtr & context)
{
ObjectInfos read_keys;
auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
auto [columns, format] = detectFormatAndReadSchema(format_settings, *iterator, context);
sample_path = iterator->getLastFilePath();
configuration->format = format;
return std::pair(columns, format);
}

View File

@ -102,23 +102,28 @@ public:
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
std::string & sample_path,
const ContextPtr & context);
static std::string resolveFormatFromData(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
std::string & sample_path,
const ContextPtr & context);
static std::pair<ColumnsDescription, std::string> resolveSchemaAndFormatFromData(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
std::string & sample_path,
const ContextPtr & context);
protected:
virtual void updateConfiguration(ContextPtr local_context);
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
virtual ReadFromFormatInfo prepareReadingFromFormat(
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,

View File

@ -1,6 +1,8 @@
#include "Storages/ObjectStorage/StorageObjectStorageCluster.h"
#include <Common/Exception.h>
#include <Core/Settings.h>
#include <Formats/FormatFactory.h>
#include <Parsers/queryToString.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
@ -19,6 +21,28 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
String StorageObjectStorageCluster::getPathSample(StorageInMemoryMetadata metadata, ContextPtr context)
{
auto query_settings = configuration->getQuerySettings(context);
/// We don't want to throw an exception if there are no files with specified path.
query_settings.throw_on_zero_files_match = false;
auto file_iterator = StorageObjectStorageSource::createFileIterator(
configuration,
query_settings,
object_storage,
false, // distributed_processing
context,
{}, // predicate
metadata.getColumns().getAll(), // virtual_columns
nullptr, // read_keys
{} // file_progress_callback
);
if (auto file = file_iterator->next(0))
return file->getPath();
return "";
}
StorageObjectStorageCluster::StorageObjectStorageCluster(
const String & cluster_name_,
ConfigurationPtr configuration_,
@ -33,14 +57,18 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
, object_storage(object_storage_)
{
ColumnsDescription columns{columns_};
resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, {}, context_);
std::string sample_path;
resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, {}, sample_path, context_);
configuration->check(context_);
StorageInMemoryMetadata metadata;
metadata.setColumns(columns);
metadata.setConstraints(constraints_);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns()));
if (sample_path.empty() && context_->getSettingsRef().use_hive_partitioning)
sample_path = getPathSample(metadata, context_);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns(), context_, sample_path));
setInMemoryMetadata(metadata);
}
@ -83,8 +111,8 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
const ActionsDAG::Node * predicate, const ContextPtr & local_context) const
{
auto iterator = StorageObjectStorageSource::createFileIterator(
configuration, object_storage, /* distributed_processing */false, local_context,
predicate, virtual_columns, nullptr, local_context->getFileProgressCallback());
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback());
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
{

View File

@ -27,6 +27,8 @@ public:
RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
private:
void updateQueryToSendIfNeeded(
ASTPtr & query,

View File

@ -99,6 +99,7 @@ std::string StorageObjectStorageSource::getUniqueStoragePathIdentifier(
std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSource::createFileIterator(
ConfigurationPtr configuration,
const StorageObjectStorage::QuerySettings & query_settings,
ObjectStoragePtr object_storage,
bool distributed_processing,
const ContextPtr & local_context,
@ -116,7 +117,6 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Expression can not have wildcards inside {} name", configuration->getNamespaceType());
auto settings = configuration->getQuerySettings(local_context);
const bool is_archive = configuration->isArchive();
std::unique_ptr<IIterator> iterator;
@ -125,8 +125,8 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
/// Iterate through disclosed globs and make a source for each file
iterator = std::make_unique<GlobIterator>(
object_storage, configuration, predicate, virtual_columns,
local_context, is_archive ? nullptr : read_keys, settings.list_object_keys_size,
settings.throw_on_zero_files_match, file_progress_callback);
local_context, is_archive ? nullptr : read_keys, query_settings.list_object_keys_size,
query_settings.throw_on_zero_files_match, file_progress_callback);
}
else
{
@ -148,7 +148,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
iterator = std::make_unique<KeysIterator>(
object_storage, copy_configuration, virtual_columns, is_archive ? nullptr : read_keys,
settings.ignore_non_existent_file, file_progress_callback);
query_settings.ignore_non_existent_file, file_progress_callback);
}
if (is_archive)
@ -198,15 +198,17 @@ Chunk StorageObjectStorageSource::generate()
const auto & object_info = reader.getObjectInfo();
const auto & filename = object_info->getFileName();
chassert(object_info->metadata);
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk,
read_from_format_info.requested_virtual_columns,
{.path = getUniqueStoragePathIdentifier(*configuration, *object_info, false),
.size = object_info->isArchive() ? object_info->fileSizeInArchive() : object_info->metadata->size_bytes,
.filename = &filename,
.last_modified = object_info->metadata->last_modified,
.etag = &(object_info->metadata->etag)
});
{
.path = getUniqueStoragePathIdentifier(*configuration, *object_info, false),
.size = object_info->isArchive() ? object_info->fileSizeInArchive() : object_info->metadata->size_bytes,
.filename = &filename,
.last_modified = object_info->metadata->last_modified,
.etag = &(object_info->metadata->etag)
}, getContext(), read_from_format_info.columns_description);
const auto & partition_columns = configuration->getPartitionColumns();
if (!partition_columns.empty() && chunk_size && chunk.hasColumns())

View File

@ -52,6 +52,7 @@ public:
static std::shared_ptr<IIterator> createFileIterator(
ConfigurationPtr configuration,
const StorageObjectStorage::QuerySettings & query_settings,
ObjectStoragePtr object_storage,
bool distributed_processing,
const ContextPtr & local_context,

View File

@ -49,19 +49,20 @@ void resolveSchemaAndFormat(
ObjectStoragePtr object_storage,
const StorageObjectStorage::ConfigurationPtr & configuration,
std::optional<FormatSettings> format_settings,
std::string & sample_path,
const ContextPtr & context)
{
if (columns.empty())
{
if (format == "auto")
std::tie(columns, format) =
StorageObjectStorage::resolveSchemaAndFormatFromData(object_storage, configuration, format_settings, context);
StorageObjectStorage::resolveSchemaAndFormatFromData(object_storage, configuration, format_settings, sample_path, context);
else
columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, context);
columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, sample_path, context);
}
else if (format == "auto")
{
format = StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, context);
format = StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, sample_path, context);
}
if (!columns.hasOnlyOrdinary())

View File

@ -19,6 +19,7 @@ void resolveSchemaAndFormat(
ObjectStoragePtr object_storage,
const StorageObjectStorage::ConfigurationPtr & configuration,
std::optional<FormatSettings> format_settings,
std::string & sample_path,
const ContextPtr & context);
}

View File

@ -524,7 +524,7 @@ Chunk ObjectStorageQueueSource::generateImpl()
{
.path = path,
.size = reader.getObjectInfo()->metadata->size_bytes
});
}, getContext(), read_from_format_info.columns_description);
return chunk;
}

View File

@ -161,14 +161,15 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
configuration->check(context_);
ColumnsDescription columns{columns_};
resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, context_);
std::string sample_path;
resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context_);
configuration->check(context_);
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns()));
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), context_));
setInMemoryMetadata(storage_metadata);
LOG_INFO(log, "Using zookeeper path: {}", zk_path.string());

View File

@ -53,6 +53,7 @@
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/re2.h>
#include <Formats/SchemaInferenceUtils.h>
#include "base/defines.h"
#include <Core/Settings.h>
@ -516,7 +517,7 @@ namespace
StorageFile::getSchemaCache(getContext()).addManyColumns(cache_keys, columns);
}
String getLastFileName() const override
String getLastFilePath() const override
{
if (current_index != 0)
return paths[current_index - 1];
@ -793,7 +794,7 @@ namespace
format = format_name;
}
String getLastFileName() const override
String getLastFilePath() const override
{
return last_read_file_path;
}
@ -1112,7 +1113,8 @@ void StorageFile::setStorageMetadata(CommonArguments args)
storage_metadata.setConstraints(args.constraints);
storage_metadata.setComment(args.comment);
setInMemoryMetadata(storage_metadata);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns()));
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), args.getContext(), paths.empty() ? "" : paths[0], format_settings));
}
@ -1466,7 +1468,7 @@ Chunk StorageFileSource::generate()
.size = current_file_size,
.filename = (filename_override.has_value() ? &filename_override.value() : nullptr),
.last_modified = current_file_last_modified
});
}, getContext(), columns_description);
return chunk;
}

View File

@ -61,7 +61,7 @@ StorageFileCluster::StorageFileCluster(
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns()));
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), context, paths.empty() ? "" : paths[0]));
}
void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const DB::ContextPtr & context)

View File

@ -36,6 +36,8 @@
#include <Common/thread_local_rng.h>
#include <Common/logger_useful.h>
#include <Common/re2.h>
#include <Formats/SchemaInferenceUtils.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
#include <IO/ReadWriteBufferFromHTTP.h>
@ -90,11 +92,22 @@ static const std::vector<std::shared_ptr<re2::RE2>> optional_regex_keys = {
std::make_shared<re2::RE2>(R"(headers.header\[[0-9]*\].value)"),
};
static bool urlWithGlobs(const String & uri)
bool urlWithGlobs(const String & uri)
{
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) || uri.find('|') != std::string::npos;
}
String getSampleURI(String uri, ContextPtr context)
{
if (urlWithGlobs(uri))
{
auto uris = parseRemoteDescription(uri, 0, uri.size(), ',', context->getSettingsRef().glob_expansion_max_elements);
if (!uris.empty())
return uris[0];
}
return uri;
}
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context)
{
return ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings().keep_alive_timeout);
@ -153,7 +166,8 @@ IStorageURLBase::IStorageURLBase(
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns()));
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), context_, getSampleURI(uri, context_), format_settings));
}
@ -414,13 +428,14 @@ Chunk StorageURLSource::generate()
size_t chunk_size = 0;
if (input_format)
chunk_size = input_format->getApproxBytesReadForChunk();
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, requested_virtual_columns,
{
.path = curr_uri.getPath(),
.size = current_file_size
});
.size = current_file_size,
}, getContext(), columns_description);
return chunk;
}
@ -839,7 +854,7 @@ namespace
format = format_name;
}
String getLastFileName() const override { return current_url_option; }
String getLastFilePath() const override { return current_url_option; }
bool supportsLastReadBufferRecreation() const override { return true; }
@ -1160,6 +1175,7 @@ void ReadFromURL::createIterator(const ActionsDAG::Node * predicate)
void ReadFromURL::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createIterator(nullptr);
const auto & settings = context->getSettingsRef();
if (is_empty_glob)
{
@ -1170,7 +1186,6 @@ void ReadFromURL::initializePipeline(QueryPipelineBuilder & pipeline, const Buil
Pipes pipes;
pipes.reserve(num_streams);
const auto & settings = context->getSettingsRef();
const size_t max_parsing_threads = num_streams >= settings.max_parsing_threads ? 1 : (settings.max_parsing_threads / num_streams);
for (size_t i = 0; i < num_streams; ++i)

View File

@ -141,6 +141,9 @@ private:
virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0;
};
bool urlWithGlobs(const String & uri);
String getSampleURI(String uri, ContextPtr context);
class StorageURLSource : public SourceWithKeyCondition, WithContext
{

View File

@ -76,7 +76,7 @@ StorageURLCluster::StorageURLCluster(
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns()));
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), context, getSampleURI(uri, context)));
}
void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context)

View File

@ -1,51 +1,66 @@
#include <Storages/VirtualColumnUtils.h>
#include <memory>
#include <stack>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/FilterDescription.h>
#include <Core/NamesAndTypes.h>
#include <Core/TypeId.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionsLogical.h>
#include <Functions/IFunction.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/indexHint.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ActionsVisitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/makeASTForLogicalFunction.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/FilterDescription.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/VirtualColumnUtils.h>
#include <IO/WriteHelpers.h>
#include <Common/re2.h>
#include <Common/typeid_cast.h>
#include <Formats/SchemaInferenceUtils.h>
#include <Formats/EscapingRuleUtils.h>
#include <Formats/FormatFactory.h>
#include <Core/Settings.h>
#include "Functions/FunctionsLogical.h"
#include "Functions/IFunction.h"
#include "Functions/IFunctionAdaptors.h"
#include "Functions/indexHint.h"
#include <Interpreters/convertFieldToType.h>
#include <Parsers/makeASTForLogicalFunction.h>
#include <Columns/ColumnSet.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/ActionsVisitor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
namespace VirtualColumnUtils
{
@ -114,7 +129,29 @@ NameSet getVirtualNamesForFileLikeStorage()
return {"_path", "_file", "_size", "_time", "_etag"};
}
VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns)
std::unordered_map<std::string, std::string> parseHivePartitioningKeysAndValues(const String & path, const ColumnsDescription & storage_columns)
{
std::string pattern = "([^/]+)=([^/]+)/";
re2::StringPiece input_piece(path);
std::unordered_map<std::string, std::string> key_values;
std::string key, value;
std::unordered_set<String> used_keys;
while (RE2::FindAndConsume(&input_piece, pattern, &key, &value))
{
if (used_keys.contains(key))
throw Exception(ErrorCodes::INCORRECT_DATA, "Path '{}' to file with enabled hive-style partitioning contains duplicated partition key {}, only unique keys are allowed", path, key);
used_keys.insert(key);
auto col_name = "_" + key;
while (storage_columns.has(col_name))
col_name = "_" + col_name;
key_values[col_name] = value;
}
return key_values;
}
VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns, const ContextPtr & context, const std::string & path, std::optional<FormatSettings> format_settings_)
{
VirtualColumnsDescription desc;
@ -132,6 +169,22 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
add_virtual("_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
if (context->getSettingsRef().use_hive_partitioning)
{
auto map = parseHivePartitioningKeysAndValues(path, storage_columns);
auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
for (auto & item : map)
{
auto type = tryInferDataTypeByEscapingRule(item.second, format_settings, FormatSettings::EscapingRule::Raw);
if (type == nullptr)
type = std::make_shared<DataTypeString>();
if (type->canBeInsideLowCardinality())
add_virtual(item.first, std::make_shared<DataTypeLowCardinality>(type));
else
add_virtual(item.first, type);
}
}
return desc;
}
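As a user-facing illustration of the parsing and type inference added above, a minimal SQL sketch under an assumed layout `sales/date=2020-01-01/code=42/part-0.parquet`: each `key=value` segment becomes a `_key` virtual column (additional underscores are prepended if the name clashes with a storage column), its type is inferred from the value with a fallback to `String`, and a path that repeats a partition key is rejected with an `INCORRECT_DATA` error.
``` sql
SET use_hive_partitioning = 1;
-- hypothetical layout: sales/date=2020-01-01/code=42/part-0.parquet
-- `_date` and `_code` are virtual columns taken from the path; `_code` is typically inferred as an integer
SELECT _date, _code, count() FROM file('sales/date=*/code=*/*.parquet') GROUP BY _date, _code;
```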
@ -191,8 +244,12 @@ ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const
void addRequestedFileLikeStorageVirtualsToChunk(
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns,
VirtualsForFileLikeStorage virtual_values)
VirtualsForFileLikeStorage virtual_values, ContextPtr context, const ColumnsDescription & columns)
{
std::unordered_map<std::string, std::string> hive_map;
if (context->getSettingsRef().use_hive_partitioning)
hive_map = parseHivePartitioningKeysAndValues(virtual_values.path, columns);
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
@ -226,6 +283,10 @@ void addRequestedFileLikeStorageVirtualsToChunk(
else
chunk.addColumn(virtual_column.type->createColumnConstWithDefaultValue(chunk.getNumRows())->convertToFullColumnIfConst());
}
else if (auto it = hive_map.find(virtual_column.getNameInStorage()); it != hive_map.end())
{
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), convertFieldToType(Field(it->second), *virtual_column.type))->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_etag")
{
if (virtual_values.etag)

View File

@ -5,6 +5,7 @@
#include <Parsers/IAST_fwd.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/VirtualColumnsDescription.h>
#include <Formats/FormatSettings.h>
#include <unordered_set>
@ -68,7 +69,11 @@ auto extractSingleValueFromBlock(const Block & block, const String & name)
}
NameSet getVirtualNamesForFileLikeStorage();
VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns);
VirtualColumnsDescription getVirtualsForFileLikeStorage(
const ColumnsDescription & storage_columns,
const ContextPtr & context,
const std::string & sample_path = "",
std::optional<FormatSettings> format_settings_ = std::nullopt);
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
@ -100,7 +105,7 @@ struct VirtualsForFileLikeStorage
void addRequestedFileLikeStorageVirtualsToChunk(
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns,
VirtualsForFileLikeStorage virtual_values);
VirtualsForFileLikeStorage virtual_values, ContextPtr context, const ColumnsDescription & columns);
}
}

View File

@ -84,7 +84,8 @@ ColumnsDescription TableFunctionObjectStorage<
context->checkAccess(getSourceAccessType());
ColumnsDescription columns;
auto storage = getObjectStorage(context, !is_insert_query);
resolveSchemaAndFormat(columns, configuration->format, storage, configuration, std::nullopt, context);
std::string sample_path;
resolveSchemaAndFormat(columns, configuration->format, storage, configuration, std::nullopt, sample_path, context);
return columns;
}
else

View File

@ -5,6 +5,7 @@ import json
import logging
import os
import io
import re
import random
import threading
import time
@ -134,6 +135,7 @@ def test_create_table_connection_string(cluster):
Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_create_connection_string', 'CSV')
""",
)
azure_query(node, "DROP TABLE IF EXISTS test_create_table_conn_string")
def test_create_table_account_string(cluster):
@ -143,6 +145,7 @@ def test_create_table_account_string(cluster):
f"CREATE TABLE test_create_table_account_url (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f"'cont', 'test_create_connection_string', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')",
)
azure_query(node, "DROP TABLE IF EXISTS test_create_table_account_url")
def test_simple_write_account_string(cluster):
@ -156,6 +159,7 @@ def test_simple_write_account_string(cluster):
azure_query(node, "INSERT INTO test_simple_write VALUES (1, 'a')")
print(get_azure_file_content("test_simple_write.csv", port))
assert get_azure_file_content("test_simple_write.csv", port) == '1,"a"\n'
azure_query(node, "DROP TABLE test_simple_write")
def test_simple_write_connection_string(cluster):
@ -169,6 +173,7 @@ def test_simple_write_connection_string(cluster):
azure_query(node, "INSERT INTO test_simple_write_connection_string VALUES (1, 'a')")
print(get_azure_file_content("test_simple_write_c.csv", port))
assert get_azure_file_content("test_simple_write_c.csv", port) == '1,"a"\n'
azure_query(node, "DROP TABLE test_simple_write_connection_string")
def test_simple_write_named_collection_1(cluster):
@ -184,7 +189,7 @@ def test_simple_write_named_collection_1(cluster):
)
print(get_azure_file_content("test_simple_write_named.csv", port))
assert get_azure_file_content("test_simple_write_named.csv", port) == '1,"a"\n'
azure_query(node, "TRUNCATE TABLE test_simple_write_named_collection_1")
azure_query(node, "DROP TABLE test_simple_write_named_collection_1")
def test_simple_write_named_collection_2(cluster):
@ -201,6 +206,7 @@ def test_simple_write_named_collection_2(cluster):
)
print(get_azure_file_content("test_simple_write_named_2.csv", port))
assert get_azure_file_content("test_simple_write_named_2.csv", port) == '1,"a"\n'
azure_query(node, "DROP TABLE test_simple_write_named_collection_2")
def test_partition_by(cluster):
@ -222,6 +228,7 @@ def test_partition_by(cluster):
assert "1,2,3\n" == get_azure_file_content("test_3.csv", port)
assert "3,2,1\n" == get_azure_file_content("test_1.csv", port)
assert "78,43,45\n" == get_azure_file_content("test_45.csv", port)
azure_query(node, "DROP TABLE test_partitioned_write")
def test_partition_by_string_column(cluster):
@ -242,6 +249,7 @@ def test_partition_by_string_column(cluster):
assert '1,"foo/bar"\n' == get_azure_file_content("test_foo/bar.csv", port)
assert '3,"йцук"\n' == get_azure_file_content("test_йцук.csv", port)
assert '78,"你好"\n' == get_azure_file_content("test_你好.csv", port)
azure_query(node, "DROP TABLE test_partitioned_string_write")
def test_partition_by_const_column(cluster):
@ -260,6 +268,7 @@ def test_partition_by_const_column(cluster):
)
azure_query(node, f"INSERT INTO test_partitioned_const_write VALUES {values}")
assert values_csv == get_azure_file_content("test_88.csv", port)
azure_query(node, "DROP TABLE test_partitioned_const_write")
def test_truncate(cluster):
@ -275,6 +284,7 @@ def test_truncate(cluster):
azure_query(node, "TRUNCATE TABLE test_truncate")
with pytest.raises(Exception):
print(get_azure_file_content("test_truncate.csv", port))
azure_query(node, "DROP TABLE test_truncate")
def test_simple_read_write(cluster):
@ -291,6 +301,7 @@ def test_simple_read_write(cluster):
assert get_azure_file_content("test_simple_read_write.csv", port) == '1,"a"\n'
print(azure_query(node, "SELECT * FROM test_simple_read_write"))
assert azure_query(node, "SELECT * FROM test_simple_read_write") == "1\ta\n"
azure_query(node, "DROP TABLE test_simple_read_write")
def test_create_new_files_on_insert(cluster):
@ -343,6 +354,7 @@ def test_overwrite(cluster):
result = azure_query(node, f"select count() from test_overwrite")
assert int(result) == 200
azure_query(node, f"DROP TABLE test_overwrite")
def test_insert_with_path_with_globs(cluster):
@ -355,6 +367,7 @@ def test_insert_with_path_with_globs(cluster):
node.query_and_get_error(
f"insert into table function test_insert_globs SELECT number, randomString(100) FROM numbers(500)"
)
azure_query(node, f"DROP TABLE test_insert_globs")
def test_put_get_with_globs(cluster):
@ -363,6 +376,7 @@ def test_put_get_with_globs(cluster):
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
max_path = ""
used_names = []
for i in range(10):
for j in range(10):
path = "{}/{}_{}/{}.csv".format(
@ -371,6 +385,8 @@ def test_put_get_with_globs(cluster):
max_path = max(path, max_path)
values = f"({i},{j},{i + j})"
used_names.append(f"test_put_{i}_{j}")
azure_query(
node,
f"CREATE TABLE test_put_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
@ -391,6 +407,9 @@ def test_put_get_with_globs(cluster):
bucket="cont", max_path=max_path
)
]
azure_query(node, "DROP TABLE test_glob_select")
for name in used_names:
azure_query(node, f"DROP TABLE {name}")
def test_azure_glob_scheherazade(cluster):
@ -399,12 +418,14 @@ def test_azure_glob_scheherazade(cluster):
values = "(1, 1, 1)"
nights_per_job = 1001 // 30
jobs = []
used_names = []
for night in range(0, 1001, nights_per_job):
def add_tales(start, end):
for i in range(start, end):
path = "night_{}/tale.csv".format(i)
unique_num = random.randint(1, 10000)
used_names.append(f"test_scheherazade_{i}_{unique_num}")
azure_query(
node,
f"CREATE TABLE test_scheherazade_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
@ -432,6 +453,9 @@ def test_azure_glob_scheherazade(cluster):
)
query = "select count(), sum(column1), sum(column2), sum(column3) from test_glob_select_scheherazade"
assert azure_query(node, query).splitlines() == ["1001\t1001\t1001\t1001"]
azure_query(node, "DROP TABLE test_glob_select_scheherazade")
for name in used_names:
azure_query(node, f"DROP TABLE {name}")
@pytest.mark.parametrize(
@ -505,6 +529,8 @@ def test_schema_inference_no_globs(cluster):
assert azure_query(node, query).splitlines() == [
"499500\t2890\t332833500\ttest_schema_inference_no_globs.csv\tcont/test_schema_inference_no_globs.csv"
]
azure_query(node, f"DROP TABLE test_schema_inference_src")
azure_query(node, f"DROP TABLE test_select_inference")
def test_schema_inference_from_globs(cluster):
@ -513,6 +539,7 @@ def test_schema_inference_from_globs(cluster):
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
max_path = ""
used_names = []
for i in range(10):
for j in range(10):
path = "{}/{}_{}/{}.csv".format(
@ -520,6 +547,7 @@ def test_schema_inference_from_globs(cluster):
)
max_path = max(path, max_path)
values = f"({i},{j},{i + j})"
used_names.append(f"test_schema_{i}_{j}")
azure_query(
node,
@ -545,6 +573,9 @@ def test_schema_inference_from_globs(cluster):
bucket="cont", max_path=max_path
)
]
azure_query(node, "DROP TABLE test_glob_select_inference")
for name in used_names:
azure_query(node, f"DROP TABLE {name}")
def test_simple_write_account_string_table_function(cluster):
@ -594,7 +625,7 @@ def test_simple_write_named_collection_1_table_function(cluster):
azure_query(
node,
"TRUNCATE TABLE drop_table",
"DROP TABLE drop_table",
)
@ -605,6 +636,7 @@ def test_simple_write_named_collection_2_table_function(cluster):
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" container='cont', blob_path='test_simple_write_named_2_tf.csv', format='CSV', structure='key UInt64, data String') VALUES (1, 'a')",
settings={"azure_truncate_on_insert": 1},
)
print(get_azure_file_content("test_simple_write_named_2_tf.csv", port))
assert get_azure_file_content("test_simple_write_named_2_tf.csv", port) == '1,"a"\n'
@ -628,6 +660,7 @@ def test_put_get_with_globs_tf(cluster):
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}",
settings={"azure_truncate_on_insert": 1},
)
query = (
f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, "
@ -648,7 +681,7 @@ def test_schema_inference_no_globs_tf(cluster):
query = (
f"insert into table function azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"container='cont', blob_path='test_schema_inference_no_globs_tf.csv', format='CSVWithNames', structure='{table_format}') "
f"SELECT number, toString(number), number * number FROM numbers(1000)"
f"SELECT number, toString(number), number * number FROM numbers(1000) SETTINGS azure_truncate_on_insert=1"
)
azure_query(node, query)
@ -681,7 +714,7 @@ def test_schema_inference_from_globs_tf(cluster):
f"insert into table function azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"container='cont', blob_path='{path}', format='CSVWithNames', structure='{table_format}') VALUES {values}"
)
azure_query(node, query)
azure_query(node, query, settings={"azure_truncate_on_insert": 1})
query = (
f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, "
@ -708,6 +741,7 @@ def test_partition_by_tf(cluster):
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', "
f"'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
settings={"azure_truncate_on_insert": 1},
)
assert "1,2,3\n" == get_azure_file_content("test_partition_tf_3.csv", port)
@ -727,6 +761,7 @@ def test_filter_using_file(cluster):
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', 'cont', '{filename}', "
f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', "
f"'{table_format}') PARTITION BY {partition_by} VALUES {values}",
settings={"azure_truncate_on_insert": 1},
)
query = (
@ -744,7 +779,7 @@ def test_read_subcolumns(cluster):
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_subcolumns.tsv', "
f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto',"
f" 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)",
f" 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3) SETTINGS azure_truncate_on_insert=1",
)
azure_query(
@ -794,7 +829,7 @@ def test_read_subcolumn_time(cluster):
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_subcolumn_time.tsv', "
f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto',"
f" 'a UInt32') select (42)",
f" 'a UInt32') select (42) SETTINGS azure_truncate_on_insert=1",
)
res = node.query(
@ -825,6 +860,7 @@ def test_function_signatures(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_signature.csv', '{account_name}', '{account_key}', 'CSV', 'auto', 'column1 UInt32') VALUES (1),(2),(3)",
settings={"azure_truncate_on_insert": 1},
)
# " - connection_string, container_name, blobpath\n"
@ -939,11 +975,13 @@ def test_union_schema_inference_mode(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_union_schema_inference1.jsonl', '{account_name}', '{account_key}', 'JSONEachRow', 'auto', 'a UInt32') VALUES (1)",
settings={"azure_truncate_on_insert": 1},
)
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_union_schema_inference2.jsonl', '{account_name}', '{account_key}', 'JSONEachRow', 'auto', 'b UInt32') VALUES (2)",
settings={"azure_truncate_on_insert": 1},
)
node.query("system drop schema cache for azure")
@ -981,6 +1019,7 @@ def test_union_schema_inference_mode(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_union_schema_inference3.jsonl', '{account_name}', '{account_key}', 'CSV', 'auto', 's String') VALUES ('Error')",
settings={"azure_truncate_on_insert": 1},
)
error = azure_query(
@ -1002,7 +1041,7 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.jsonl', '{account_name}', '{account_key}') "
f"select * from numbers(100)",
f"select * from numbers(100) SETTINGS azure_truncate_on_insert=1",
)
time.sleep(1)
@ -1209,19 +1248,19 @@ def test_filtering_by_file_or_path(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}','cont', 'test_filter1.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 1",
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 1 SETTINGS azure_truncate_on_insert=1",
)
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}','cont', 'test_filter2.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 2",
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 2 SETTINGS azure_truncate_on_insert=1",
)
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_filter3.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 3",
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 3 SETTINGS azure_truncate_on_insert=1",
)
node.query(
@ -1233,7 +1272,7 @@ def test_filtering_by_file_or_path(cluster):
node.query("SYSTEM FLUSH LOGS")
result = node.query(
f"SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query ilike '%select%azure%test_filter%' AND type='QueryFinish'"
f"SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query ilike '%select%azure%test_filter%' AND type='QueryFinish' ORDER BY event_time_microseconds DESC LIMIT 1"
)
assert int(result) == 1
@ -1245,19 +1284,19 @@ def test_size_virtual_column(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}','cont', 'test_size_virtual_column1.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 1",
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 1 SETTINGS azure_truncate_on_insert=1",
)
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}','cont', 'test_size_virtual_column2.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 11",
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 11 SETTINGS azure_truncate_on_insert=1",
)
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_size_virtual_column3.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 111",
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 111 SETTINGS azure_truncate_on_insert=1",
)
result = azure_query(
@ -1280,7 +1319,7 @@ def test_format_detection(cluster):
account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_format_detection0', '{account_name}', '{account_key}', 'JSONEachRow', 'auto', 'x UInt64, y String') select number as x, 'str_' || toString(number) from numbers(0)",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_format_detection0', '{account_name}', '{account_key}', 'JSONEachRow', 'auto', 'x UInt64, y String') select number as x, 'str_' || toString(number) from numbers(0) SETTINGS azure_truncate_on_insert=1",
)
azure_query(
@ -1350,7 +1389,7 @@ def test_write_to_globbed_partitioned_path(cluster):
account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
error = azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_data_*_{{_partition_id}}', '{account_name}', '{account_key}', 'CSV', 'auto', 'x UInt64') partition by 42 select 42",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_data_*_{{_partition_id}}', '{account_name}', '{account_key}', 'CSV', 'auto', 'x UInt64') partition by 42 select 42 SETTINGS azure_truncate_on_insert=1",
expect_error="true",
)
@ -1462,3 +1501,115 @@ def test_insert_create_new_file(cluster):
assert TSV(res) == TSV(
"test_create_new_file.csv\t1\ntest_create_new_file.1.csv\t2\n"
)
def test_hive_partitioning_with_one_parameter(cluster):
# type: (ClickHouseCluster) -> None
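# A single Hive-style key in the blob path (column1=Elizabeth) should surface as the _column1 virtual column once use_hive_partitioning=1.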
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 String, column2 String"
values = f"('Elizabeth', 'Gordon')"
path = "a/column1=Elizabeth/sample.csv"
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}",
settings={"azure_truncate_on_insert": 1},
)
query = (
f"SELECT column1, column2, _file, _path, _column1 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}')"
)
assert azure_query(
node, query, settings={"use_hive_partitioning": 1}
).splitlines() == [
"Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth".format(
bucket="cont", max_path=path
)
]
query = (
f"SELECT column2 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
)
assert azure_query(
node, query, settings={"use_hive_partitioning": 1}
).splitlines() == ["Gordon"]
def test_hive_partitioning_with_two_parameters(cluster):
# type: (ClickHouseCluster) -> None
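# Two Hive-style keys in the path (column1=Elizabeth/column2=Gordon) should yield _column1 and _column2 virtual columns that can be used in WHERE.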
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 String, column2 String"
values_1 = f"('Elizabeth', 'Gordon')"
values_2 = f"('Emilia', 'Gregor')"
path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
settings={"azure_truncate_on_insert": 1},
)
query = (
f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
)
assert azure_query(
node, query, settings={"use_hive_partitioning": 1}
).splitlines() == [
"Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth\tGordon".format(
bucket="cont", max_path=path
)
]
query = (
f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2;"
)
assert azure_query(
node, query, settings={"use_hive_partitioning": 1}
).splitlines() == ["Elizabeth"]
query = (
f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2 AND column1=_column1;"
)
assert azure_query(
node, query, settings={"use_hive_partitioning": 1}
).splitlines() == ["Elizabeth"]
def test_hive_partitioning_without_setting(cluster):
# type: (ClickHouseCluster) -> None
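# With use_hive_partitioning=0 the _column1 virtual column is not created, so the query is expected to fail with "Unknown expression identifier".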
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 String, column2 String"
values_1 = f"('Elizabeth', 'Gordon')"
values_2 = f"('Emilia', 'Gregor')"
path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
settings={"azure_truncate_on_insert": 1},
)
query = (
f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
)
pattern = re.compile(
r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL
)
with pytest.raises(Exception, match=pattern):
azure_query(node, query, settings={"use_hive_partitioning": 0})

View File

@ -72,6 +72,7 @@ def test_select_all(cluster):
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',"
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', 'key UInt64, data String') "
f"VALUES (1, 'a'), (2, 'b')",
settings={"azure_truncate_on_insert": 1},
)
print(get_azure_file_content("test_cluster_select_all.csv", port))
@ -101,6 +102,7 @@ def test_count(cluster):
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_count.csv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', "
f"'auto', 'key UInt64') VALUES (1), (2)",
settings={"azure_truncate_on_insert": 1},
)
print(get_azure_file_content("test_cluster_count.csv", port))
@ -129,6 +131,7 @@ def test_union_all(cluster):
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_parquet_union_all', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet', "
f"'auto', 'a Int32, b String') VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')",
settings={"azure_truncate_on_insert": 1},
)
pure_azure = azure_query(
@ -180,6 +183,7 @@ def test_skip_unavailable_shards(cluster):
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_skip_unavailable.csv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
f"'auto', 'a UInt64') VALUES (1), (2)",
settings={"azure_truncate_on_insert": 1},
)
result = azure_query(
node,
@ -200,6 +204,7 @@ def test_unset_skip_unavailable_shards(cluster):
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_unset_skip_unavailable.csv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
f"'auto', 'a UInt64') VALUES (1), (2)",
settings={"azure_truncate_on_insert": 1},
)
result = azure_query(
node,
@ -218,6 +223,7 @@ def test_cluster_with_named_collection(cluster):
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_with_named_collection.csv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
f"'auto', 'a UInt64') VALUES (1), (2)",
settings={"azure_truncate_on_insert": 1},
)
pure_azure = azure_query(
@ -249,6 +255,7 @@ def test_partition_parallel_reading_with_cluster(cluster):
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', '{filename}', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') "
f"PARTITION BY {partition_by} VALUES {values}",
settings={"azure_truncate_on_insert": 1},
)
assert "1,2,3\n" == get_azure_file_content("test_tf_3.csv", port)
@ -272,12 +279,12 @@ def test_format_detection(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_format_detection0', '{account_name}', '{account_key}', 'JSONEachRow', 'auto', 'x UInt32, y String') select number as x, 'str_' || toString(number) from numbers(10)",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_format_detection0', '{account_name}', '{account_key}', 'JSONEachRow', 'auto', 'x UInt32, y String') select number as x, 'str_' || toString(number) from numbers(10) SETTINGS azure_truncate_on_insert=1",
)
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_format_detection1', '{account_name}', '{account_key}', 'JSONEachRow', 'auto', 'x UInt32, y String') select number as x, 'str_' || toString(number) from numbers(10, 10)",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_format_detection1', '{account_name}', '{account_key}', 'JSONEachRow', 'auto', 'x UInt32, y String') select number as x, 'str_' || toString(number) from numbers(10, 10) SETTINGS azure_truncate_on_insert=1",
)
expected_desc_result = azure_query(

View File

@ -3,7 +3,9 @@ import os
import pytest
import uuid
import time
import re
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.client import QueryRuntimeException
from helpers.test_tools import TSV
from pyhdfs import HdfsClient
@ -1255,6 +1257,55 @@ def test_respect_object_existence_on_partitioned_write(started_cluster):
assert int(result) == 44
def test_hive_partitioning_with_one_parameter(started_cluster):
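# A single Hive-style key in the HDFS path (column0=Elizabeth) should be exposed as the _column0 virtual column.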
hdfs_api = started_cluster.hdfs_api
hdfs_api.write_data(f"/column0=Elizabeth/parquet_1", f"Elizabeth\tGordon\n")
assert hdfs_api.read_data(f"/column0=Elizabeth/parquet_1") == f"Elizabeth\tGordon\n"
r = node1.query(
"SELECT _column0 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/parquet_1', 'TSV')",
settings={"use_hive_partitioning": 1},
)
assert r == f"Elizabeth\n"
def test_hive_partitioning_with_two_parameters(started_cluster):
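# Two Hive-style keys (column0=Elizabeth/column1=Gordon); the test reads the second one through the _column1 virtual column.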
hdfs_api = started_cluster.hdfs_api
hdfs_api.write_data(
"/column0=Elizabeth/column1=Gordon/parquet_2", "Elizabeth\tGordon\n"
)
assert (
hdfs_api.read_data("/column0=Elizabeth/column1=Gordon/parquet_2")
== "Elizabeth\tGordon\n"
)
r = node1.query(
"SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');",
settings={"use_hive_partitioning": 1},
)
assert r == f"Gordon\n"
def test_hive_partitioning_without_setting(started_cluster):
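# Without use_hive_partitioning the _column1 virtual column does not exist and the query should fail with "Unknown expression identifier".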
hdfs_api = started_cluster.hdfs_api
hdfs_api.write_data(
"/column0=Elizabeth/column1=Gordon/parquet_2", "Elizabeth\tGordon\n"
)
assert (
hdfs_api.read_data("/column0=Elizabeth/column1=Gordon/parquet_2")
== "Elizabeth\tGordon\n"
)
pattern = re.compile(
r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL
)
with pytest.raises(QueryRuntimeException, match=pattern):
node1.query(
f"SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');",
settings={"use_hive_partitioning": 0},
)
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -0,0 +1,120 @@
TESTING THE FILE HIVE PARTITIONING
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
Eva Schmidt Elizabeth Schmidt
Samuel Schmidt Elizabeth Schmidt
Eva Schmidt Elizabeth
Samuel Schmidt Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
42 2020-01-01
[1,2,3] 42.42
Array(Int64) LowCardinality(Float64)
101
2070
4081
2070
2070
1
1
TESTING THE URL PARTITIONING
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
Eva Schmidt Elizabeth Schmidt
Samuel Schmidt Elizabeth Schmidt
Eva Schmidt Elizabeth
Samuel Schmidt Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
1
TESTING THE S3 PARTITIONING
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
Eva Schmidt Elizabeth Schmidt
Samuel Schmidt Elizabeth Schmidt
Eva Schmidt Elizabeth
Samuel Schmidt Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
OK
TESTING THE S3CLUSTER PARTITIONING
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
Eva Schmidt Elizabeth Schmidt
Samuel Schmidt Elizabeth Schmidt
Eva Schmidt Elizabeth
Samuel Schmidt Elizabeth

View File

@ -0,0 +1,136 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL -q "SELECT 'TESTING THE FILE HIVE PARTITIONING'"
$CLICKHOUSE_LOCAL -n -q """
set use_hive_partitioning = 1;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/sample.parquet') WHERE column0 = _column0;
SELECT *, _column0, _column1 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0, _column1 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _non_existing_column FROM file('$CURDIR/data_hive/partitioning/non_existing_column=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=*/sample.parquet') WHERE column0 = _column0;
SELECT _number, _date FROM file('$CURDIR/data_hive/partitioning/number=42/date=2020-01-01/sample.parquet') LIMIT 1;
SELECT _array, _float FROM file('$CURDIR/data_hive/partitioning/array=[1,2,3]/float=42.42/sample.parquet') LIMIT 1;
SELECT toTypeName(_array), toTypeName(_float) FROM file('$CURDIR/data_hive/partitioning/array=[1,2,3]/float=42.42/sample.parquet') LIMIT 1;
SELECT count(*) FROM file('$CURDIR/data_hive/partitioning/number=42/date=2020-01-01/sample.parquet') WHERE _number = 42;
"""
$CLICKHOUSE_LOCAL -n -q """
set use_hive_partitioning = 1;
SELECT _identifier FROM file('$CURDIR/data_hive/partitioning/identifier=*/email.csv') LIMIT 2;
SELECT __identifier FROM file('$CURDIR/data_hive/partitioning/identifier=*/email.csv') LIMIT 2;
"""
$CLICKHOUSE_LOCAL -n -q """
set use_hive_partitioning = 1;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column0=Elizabeth/sample.parquet') LIMIT 10;
""" 2>&1 | grep -c "INCORRECT_DATA"
$CLICKHOUSE_LOCAL -n -q """
set use_hive_partitioning = 0;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;
""" 2>&1 | grep -c "UNKNOWN_IDENTIFIER"
$CLICKHOUSE_LOCAL -q "SELECT 'TESTING THE URL PARTITIONING'"
$CLICKHOUSE_LOCAL -n -q """
set use_hive_partitioning = 1;
SELECT *, _column0 FROM url('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM url('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/sample.parquet') WHERE column0 = _column0;
SELECT *, _column0, _column1 FROM url('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM url('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM url('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM url('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0, _column1 FROM url('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM url('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM url('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM url('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _non_existing_column FROM url('http://localhost:11111/test/hive_partitioning/non_existing_column=Elizabeth/sample.parquet') LIMIT 10;"""
$CLICKHOUSE_LOCAL -n -q """
set use_hive_partitioning = 0;
SELECT *, _column0 FROM url('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;
""" 2>&1 | grep -c "UNKNOWN_IDENTIFIER"
$CLICKHOUSE_LOCAL -q "SELECT 'TESTING THE S3 PARTITIONING'"
$CLICKHOUSE_CLIENT -n -q """
set use_hive_partitioning = 1;
SELECT *, _column0 FROM s3('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM s3('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/sample.parquet') WHERE column0 = _column0;
SELECT *, _column0, _column1 FROM s3('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM s3('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM s3('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM s3('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0, _column1 FROM s3('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM s3('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM s3('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM s3('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _non_existing_column FROM s3('http://localhost:11111/test/hive_partitioning/non_existing_column=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM s3('http://localhost:11111/test/hive_partitioning/column0=*/sample.parquet') WHERE column0 = _column0;
"""
$CLICKHOUSE_CLIENT -n -q """
set use_hive_partitioning = 0;
SELECT *, _column0 FROM s3('http://localhost:11111/test/hive_partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;
""" 2>&1 | grep -F -q "UNKNOWN_IDENTIFIER" && echo "OK" || echo "FAIL";
$CLICKHOUSE_LOCAL -q "SELECT 'TESTING THE S3CLUSTER PARTITIONING'"
$CLICKHOUSE_CLIENT -n -q """
set use_hive_partitioning = 1;
SELECT *, _column0 FROM s3Cluster(test_cluster_one_shard_three_replicas_localhost, 'http://localhost:11111/test/hive_partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM s3Cluster(test_cluster_one_shard_three_replicas_localhost, 'http://localhost:11111/test/hive_partitioning/column0=Elizabeth/sample.parquet') WHERE column0 = _column0;
SELECT *, _column0, _column1 FROM s3Cluster(test_cluster_one_shard_three_replicas_localhost, 'http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM s3Cluster(test_cluster_one_shard_three_replicas_localhost, 'http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM s3Cluster(test_cluster_one_shard_three_replicas_localhost, 'http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM s3Cluster(test_cluster_one_shard_three_replicas_localhost, 'http://localhost:11111/test/hive_partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
"""

View File

@ -0,0 +1,5 @@
_login_email,_identifier,_first_name,_last_name
laura@example.com,2070,Laura,Grey
craig@example.com,4081,Craig,Johnson
mary@example.com,9346,Mary,Jenkins
jamie@example.com,5079,Jamie,Smith