diff --git a/programs/obfuscator/Obfuscator.cpp b/programs/obfuscator/Obfuscator.cpp
index 688ae1a1143..11e85bc1302 100644
--- a/programs/obfuscator/Obfuscator.cpp
+++ b/programs/obfuscator/Obfuscator.cpp
@@ -1307,7 +1307,9 @@ try
             throw ErrnoException(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Input must be seekable file (it will be read twice)");
 
         SingleReadBufferIterator read_buffer_iterator(std::move(file));
-        schema_columns = readSchemaFromFormat(input_format, {}, read_buffer_iterator, context_const);
+
+        std::string sample_string;
+        schema_columns = readSchemaFromFormat(input_format, {}, read_buffer_iterator, sample_string, context_const);
     }
     else
     {
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 8399d3925db..65b93b893b6 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -935,6 +935,7 @@ class IColumn;
     M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. Note that it can lead to incorrect result", 0) \
    M(Bool, allow_deprecated_error_prone_window_functions, false, "Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)", 0) \
    M(Bool, allow_deprecated_snowflake_conversion_functions, false, "Enables deprecated functions snowflakeToDateTime[64] and dateTime[64]ToSnowflake.", 0) \
+    M(Bool, use_hive_partitioning, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines.", 0)\
 
     // End of COMMON_SETTINGS
     // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS.
@@ -1106,11 +1107,6 @@ class IColumn;
     M(Bool, input_format_tsv_skip_trailing_empty_lines, false, "Skip trailing empty lines in TSV format", 0) \
     M(Bool, input_format_custom_skip_trailing_empty_lines, false, "Skip trailing empty lines in CustomSeparated format", 0) \
     M(Bool, input_format_tsv_crlf_end_of_line, false, "If it is set true, file function will read TSV format with \\r\\n instead of \\n.", 0) \
-    M(Bool, file_hive_partitioning, false, "Allows to use hive partitioning for file format", 0)\
-    M(Bool, url_hive_partitioning, false, "Allows to use hive partitioning for url format", 0)\
-    M(Bool, s3_hive_partitioning, false, "Allows to use hive partitioning for s3 format", 0)\
-    M(Bool, azure_blob_storage_hive_partitioning, false, "Allows to use hive partitioning for AzureBlobStorage format", 0)\
-    M(Bool, hdfs_hive_partitioning, false, "Allows to use hive partitioning for hdfs format", 0)\
     \
     M(Bool, input_format_native_allow_types_conversion, true, "Allow data types conversion in Native input format", 0) \
     \
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 607f9b6d858..b676cd85ce6 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -59,11 +59,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> settings_changes_history_initializer =
 {
     {"24.7", {
-        {"file_hive_partitioning", false, false, "Allows to use hive partitioning for file format"},
-        {"url_hive_partitioning", false, false, "Allows to use hive partitioning for url format"},
-        {"s3_hive_partitioning", false, false, "Allows to use hive partitioning for s3 format"},
-        {"azure_blob_storage_hive_partitioning", false, false, "Allows to use hive partitioning for AzureBlobStorage format"},
-        {"hdfs_hive_partitioning", false, false, "Allows to use hive partitioning for hdfs format"},
+        {"use_hive_partitioning", false, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines."},
diff --git a/src/Formats/ReadSchemaUtils.cpp b/src/Formats/ReadSchemaUtils.cpp
--- a/src/Formats/ReadSchemaUtils.cpp
+++ b/src/Formats/ReadSchemaUtils.cpp
@@ std::pair<ColumnsDescription, String> readSchemaFromFormatImpl(
     std::optional<String> format_name,
     const std::optional<FormatSettings> & format_settings,
     IReadBufferIterator & read_buffer_iterator,
+    std::string & sample_path,
     const ContextPtr & context)
 try
 {
@@ -143,6 +144,10 @@ try
     {
         iterator_data = read_buffer_iterator.next();
 
+        /// Extract the file path for hive-style partitioning.
+        if (sample_path.empty())
+            sample_path = read_buffer_iterator.getLastFilePath();
+
         /// Read buffer iterator can determine the data format if it's unknown.
         /// For example by scanning schema cache or by finding new file with format extension.
         if (!format_name && iterator_data.format_name)
@@ -163,7 +168,7 @@ try
                     return {*iterator_data.cached_columns, *format_name};
                 }
 
-                schemas_for_union_mode.emplace_back(iterator_data.cached_columns->getAll(), read_buffer_iterator.getLastFileName());
+                schemas_for_union_mode.emplace_back(iterator_data.cached_columns->getAll(), read_buffer_iterator.getLastFilePath());
                 continue;
             }
@@ -249,7 +254,7 @@ try
             if (!names_and_types.empty())
                 read_buffer_iterator.setSchemaToLastFile(ColumnsDescription(names_and_types));
 
-            schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFileName());
+            schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFilePath());
         }
         catch (...)
         {
@@ -410,7 +415,7 @@ try
             throw Exception(ErrorCodes::CANNOT_DETECT_FORMAT, "The data format cannot be detected by the contents of the files. You can specify the format manually");
 
         read_buffer_iterator.setSchemaToLastFile(ColumnsDescription(names_and_types));
-        schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFileName());
+        schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFilePath());
     }
 
     if (format_name && mode == SchemaInferenceMode::DEFAULT)
@@ -526,9 +531,9 @@ try
 }
 catch (Exception & e)
 {
-    auto file_name = read_buffer_iterator.getLastFileName();
-    if (!file_name.empty())
-        e.addMessage(fmt::format("(in file/uri {})", file_name));
+    auto file_path = read_buffer_iterator.getLastFilePath();
+    if (!file_path.empty())
+        e.addMessage(fmt::format("(in file/uri {})", file_path));
     throw;
 }
 
@@ -536,17 +541,19 @@ ColumnsDescription readSchemaFromFormat(
     const String & format_name,
     const std::optional<FormatSettings> & format_settings,
     IReadBufferIterator & read_buffer_iterator,
+    std::string & sample_path,
     const ContextPtr & context)
 {
-    return readSchemaFromFormatImpl(format_name, format_settings, read_buffer_iterator, context).first;
+    return readSchemaFromFormatImpl(format_name, format_settings, read_buffer_iterator, sample_path, context).first;
 }
 
 std::pair<ColumnsDescription, String> detectFormatAndReadSchema(
     const std::optional<FormatSettings> & format_settings,
     IReadBufferIterator & read_buffer_iterator,
+    std::string & sample_path,
     const ContextPtr & context)
 {
-    return readSchemaFromFormatImpl(std::nullopt, format_settings, read_buffer_iterator, context);
+    return readSchemaFromFormatImpl(std::nullopt, format_settings, read_buffer_iterator, sample_path, context);
 }
 
 SchemaCache::Key getKeyForSchemaCache(
diff --git a/src/Formats/ReadSchemaUtils.h b/src/Formats/ReadSchemaUtils.h
index bb5e068f696..6c562a06bf0 100644
--- a/src/Formats/ReadSchemaUtils.h
+++ b/src/Formats/ReadSchemaUtils.h
@@ -56,8 +56,8 @@ struct IReadBufferIterator
     /// Set auto detected format name.
     virtual void setFormatName(const String & /*format_name*/) {}
 
-    /// Get last processed file name for better exception messages.
-    virtual String getLastFileName() const { return ""; }
+    /// Get last processed file path for better exception messages.
+    virtual String getLastFilePath() const { return ""; }
 
     /// Return true if method recreateLastReadBuffer is implemented.
     virtual bool supportsLastReadBufferRecreation() const { return false; }
@@ -122,6 +122,7 @@ ColumnsDescription readSchemaFromFormat(
     const String & format_name,
     const std::optional<FormatSettings> & format_settings,
     IReadBufferIterator & read_buffer_iterator,
+    std::string & sample_path,
     const ContextPtr & context);
 
 /// Try to detect the format of the data and it's schema.
@@ -131,6 +132,7 @@ ColumnsDescription readSchemaFromFormat(
 std::pair<ColumnsDescription, String> detectFormatAndReadSchema(
     const std::optional<FormatSettings> & format_settings,
     IReadBufferIterator & read_buffer_iterator,
+    std::string & sample_path,
     const ContextPtr & context);
 
 SchemaCache::Key getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
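The change above threads a sample_path out-parameter through schema inference so the caller learns which file actually seeded the inferred schema; the rename getLastFileName() -> getLastFilePath() matters because hive-style partitioning needs the directory components, not just the basename. A minimal self-contained sketch of the pattern, with toy types rather than the ClickHouse API:

    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    // Toy stand-in for IReadBufferIterator: yields file paths one by one.
    struct ToyReadBufferIterator
    {
        std::vector<std::string> paths;
        size_t pos = 0;

        std::optional<std::string> next()
        {
            if (pos < paths.size())
                return paths[pos++];
            return std::nullopt;
        }

        // Full path (with partition directories), not just the file name.
        std::string getLastFilePath() const { return pos ? paths[pos - 1] : ""; }
    };

    // Toy stand-in for readSchemaFromFormat(): fills sample_path as a side channel.
    std::string inferSchema(ToyReadBufferIterator & it, std::string & sample_path)
    {
        while (auto file = it.next())
        {
            if (sample_path.empty())
                sample_path = it.getLastFilePath();
            // ... real code would try to infer the schema from *file here ...
        }
        return "column1 String, column2 String";
    }

    int main()
    {
        ToyReadBufferIterator it{{"data/year=2024/month=7/a.csv", "data/year=2024/month=8/b.csv"}};
        std::string sample_path;
        inferSchema(it, sample_path);
        std::cout << sample_path << '\n'; // data/year=2024/month=7/a.csv
    }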
diff --git a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h
index 83865c47eb8..5c40cda442b 100644
--- a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h
+++ b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h
@@ -89,8 +89,9 @@ public:
         {
             ConfigurationPtr configuration = base_configuration->clone();
             configuration->setPaths(metadata->getDataFiles());
+            std::string sample_string;
             return Storage::resolveSchemaFromData(
-                object_storage_, configuration, format_settings_, local_context);
+                object_storage_, configuration, format_settings_, sample_string, local_context);
         }
     }
 
diff --git a/src/Storages/ObjectStorage/ReadBufferIterator.cpp b/src/Storages/ObjectStorage/ReadBufferIterator.cpp
index 78cdc442f64..a47049791ae 100644
--- a/src/Storages/ObjectStorage/ReadBufferIterator.cpp
+++ b/src/Storages/ObjectStorage/ReadBufferIterator.cpp
@@ -131,7 +131,7 @@ void ReadBufferIterator::setFormatName(const String & format_name)
     format = format_name;
 }
 
-String ReadBufferIterator::getLastFileName() const
+String ReadBufferIterator::getLastFilePath() const
 {
     if (current_object_info)
         return current_object_info->getPath();
diff --git a/src/Storages/ObjectStorage/ReadBufferIterator.h b/src/Storages/ObjectStorage/ReadBufferIterator.h
index 6eeb52ec2ed..b81aebb7b07 100644
--- a/src/Storages/ObjectStorage/ReadBufferIterator.h
+++ b/src/Storages/ObjectStorage/ReadBufferIterator.h
@@ -33,7 +33,7 @@ public:
 
     void setResultingSchema(const ColumnsDescription & columns) override;
 
-    String getLastFileName() const override;
+    String getLastFilePath() const override;
 
     void setFormatName(const String & format_name) override;
 
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
index ae7c211330c..717f48983f3 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -33,17 +33,22 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }
 
-
-bool checkIfHiveSettingEnabled(const ContextPtr & context, const std::string & storage_type_name)
+std::string StorageObjectStorage::getPathSample(StorageInMemoryMetadata metadata, ContextPtr context)
 {
-    if (storage_type_name == "s3")
-        return context->getSettings().s3_hive_partitioning;
-    else if (storage_type_name == "hdfs")
-        return context->getSettings().hdfs_hive_partitioning;
-    else if (storage_type_name == "azure")
-        return context->getSettings().azure_blob_storage_hive_partitioning;
-    else
-        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported storage type: {}", storage_type_name);
+    auto file_iterator = StorageObjectStorageSource::createFileIterator(
+        configuration,
+        object_storage,
+        distributed_processing,
+        context,
+        {}, // predicate
+        metadata.getColumns().getAll(), // virtual_columns
+        nullptr, // read_keys
+        {} // file_progress_callback
+    );
+
+    if (auto file = file_iterator->next(0))
+        return file->getPath();
+    return "";
 }
 
 StorageObjectStorage::StorageObjectStorage(
@@ -66,7 +71,9 @@ StorageObjectStorage::StorageObjectStorage(
     , log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName())))
 {
     ColumnsDescription columns{columns_};
-    resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, context);
+
+    std::string sample_path;
+    resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context);
     configuration->check(context);
 
     StorageInMemoryMetadata metadata;
@@ -74,23 +81,13 @@ StorageObjectStorage::StorageObjectStorage(
     metadata.setConstraints(constraints_);
     metadata.setComment(comment);
 
-    auto file_iterator = StorageObjectStorageSource::createFileIterator(
-        configuration,
-        object_storage,
-        distributed_processing_,
-        context,
-        {}, // predicate
-        metadata.getColumns().getAll(), // virtual_columns
-        nullptr, // read_keys
-        {} // file_progress_callback
-    );
+
+    if (sample_path.empty() && context->getSettings().use_hive_partitioning)
+        sample_path = getPathSample(metadata, context);
+    else if (!context->getSettings().use_hive_partitioning)
+        sample_path = "";
 
-    Strings paths;
-
-    if (checkIfHiveSettingEnabled(context, configuration->getTypeName()))
-        if (auto file = file_iterator->next(0))
-            paths = {file->getPath()};
-    setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns(), paths));
+    setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns(), sample_path));
     setInMemoryMetadata(metadata);
 }
 
@@ -386,33 +383,36 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(
     const ObjectStoragePtr & object_storage,
     const ConfigurationPtr & configuration,
     const std::optional<FormatSettings> & format_settings,
+    std::string & sample_path,
     const ContextPtr & context)
 {
     ObjectInfos read_keys;
     auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
-    return readSchemaFromFormat(configuration->format, format_settings, *iterator, context);
+    return readSchemaFromFormat(configuration->format, format_settings, *iterator, sample_path, context);
 }
 
 std::string StorageObjectStorage::resolveFormatFromData(
     const ObjectStoragePtr & object_storage,
     const ConfigurationPtr & configuration,
     const std::optional<FormatSettings> & format_settings,
+    std::string & sample_path,
     const ContextPtr & context)
 {
     ObjectInfos read_keys;
     auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
-    return detectFormatAndReadSchema(format_settings, *iterator, context).second;
+    return detectFormatAndReadSchema(format_settings, *iterator, sample_path, context).second;
 }
 
 std::pair<ColumnsDescription, std::string> StorageObjectStorage::resolveSchemaAndFormatFromData(
     const ObjectStoragePtr & object_storage,
     const ConfigurationPtr & configuration,
     const std::optional<FormatSettings> & format_settings,
+    std::string & sample_path,
     const ContextPtr & context)
 {
     ObjectInfos read_keys;
     auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
-    auto [columns, format] = detectFormatAndReadSchema(format_settings, *iterator, context);
+    auto [columns, format] = detectFormatAndReadSchema(format_settings, *iterator, sample_path, context);
     configuration->format = format;
     return std::pair(columns, format);
 }
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h
index cf8ec113653..dd7ec31c970 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.h
@@ -42,6 +42,7 @@ public:
         size_t list_object_keys_size;
         bool throw_on_zero_files_match;
         bool ignore_non_existent_file;
+        bool use_hive_partitioning;
     };
 
     StorageObjectStorage(
@@ -100,23 +101,28 @@ public:
         const ObjectStoragePtr & object_storage,
         const ConfigurationPtr & configuration,
         const std::optional<FormatSettings> & format_settings,
+        std::string & sample_path,
         const ContextPtr & context);
 
     static std::string resolveFormatFromData(
         const ObjectStoragePtr & object_storage,
         const ConfigurationPtr & configuration,
         const std::optional<FormatSettings> & format_settings,
+        std::string & sample_path,
         const ContextPtr & context);
 
     static std::pair<ColumnsDescription, std::string> resolveSchemaAndFormatFromData(
         const ObjectStoragePtr & object_storage,
         const ConfigurationPtr & configuration,
         const std::optional<FormatSettings> & format_settings,
+        std::string & sample_path,
         const ContextPtr & context);
 
 protected:
     virtual void updateConfiguration(ContextPtr local_context);
 
+    std::string getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
+
     static std::unique_ptr<ReadBufferIterator> createReadBufferIterator(
         const ObjectStoragePtr & object_storage,
         const ConfigurationPtr & configuration,
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 78f568d8ae2..0dc4b845a47 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -33,7 +33,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
     , object_storage(object_storage_)
 {
     ColumnsDescription columns{columns_};
-    resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, {}, context_);
+    std::string sample_path;
+    resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, {}, sample_path, context_);
     configuration->check(context_);
 
     StorageInMemoryMetadata metadata;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index afb23961312..ecb3ff9d856 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -197,16 +197,6 @@ Chunk StorageObjectStorageSource::generate()
             const auto & filename = object_info->getFileName();
             chassert(object_info->metadata);
 
-            auto hive_map = VirtualColumnUtils::parsePartitionMapFromPath(object_info->getPath());
-            bool contains_virtual_column = std::any_of(hive_map.begin(), hive_map.end(),
-                [&](const auto& pair)
-                {
-                    return read_from_format_info.requested_virtual_columns.contains(pair.first);
-                });
-
-            if (!contains_virtual_column)
-                hive_map.clear(); // If we cannot find any virtual column in requested, we don't add any of them to chunk
-
             VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
                 chunk, read_from_format_info.requested_virtual_columns,
                 {
@@ -214,8 +204,7 @@ Chunk StorageObjectStorageSource::generate()
                     .size = object_info->metadata->size_bytes,
                     .filename = &filename,
                     .last_modified = object_info->metadata->last_modified,
-                    .hive_partitioning_map = hive_map
-                });
+                }, object_info->getPath());
 
             return chunk;
         }
diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp
index e49e14d2a0c..73410d959e0 100644
--- a/src/Storages/ObjectStorage/Utils.cpp
+++ b/src/Storages/ObjectStorage/Utils.cpp
@@ -49,19 +49,20 @@ void resolveSchemaAndFormat(
     ObjectStoragePtr object_storage,
     const StorageObjectStorage::ConfigurationPtr & configuration,
     std::optional<FormatSettings> format_settings,
+    std::string & sample_path,
     const ContextPtr & context)
 {
     if (columns.empty())
     {
         if (format == "auto")
             std::tie(columns, format) =
-                StorageObjectStorage::resolveSchemaAndFormatFromData(object_storage, configuration, format_settings, context);
+                StorageObjectStorage::resolveSchemaAndFormatFromData(object_storage, configuration, format_settings, sample_path, context);
         else
-            columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, context);
+            columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, sample_path, context);
     }
     else if (format == "auto")
     {
-        format = StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, context);
+        format = StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, sample_path, context);
     }
 
     if (!columns.hasOnlyOrdinary())
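The any_of guard deleted from StorageObjectStorageSource::generate() above is not gone: as the VirtualColumnUtils.cpp hunk further below shows, the same check now runs inside addRequestedFileLikeStorageVirtualsToChunk(), so every file-like storage gets it for free. The essential logic, reduced to standard containers (a sketch, not the real signatures):

    #include <algorithm>
    #include <iostream>
    #include <map>
    #include <set>
    #include <string>

    // Keep hive columns only if at least one of them was actually requested.
    std::map<std::string, std::string> filterHiveColumns(
        std::map<std::string, std::string> hive_map,
        const std::set<std::string> & requested)
    {
        bool any_requested = std::any_of(hive_map.begin(), hive_map.end(),
            [&](const auto & kv) { return requested.count(kv.first) > 0; });
        if (!any_requested)
            hive_map.clear(); // nothing requested -> add no partition columns at all
        return hive_map;
    }

    int main()
    {
        std::map<std::string, std::string> hive{{"_year", "2024"}, {"_month", "7"}};
        auto kept = filterHiveColumns(hive, {"_year"});
        auto dropped = filterHiveColumns(hive, {"_path"});
        std::cout << kept.size() << ' ' << dropped.size() << '\n'; // 2 0
    }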
diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h
index 2077999df41..7ee14f50979 100644
--- a/src/Storages/ObjectStorage/Utils.h
+++ b/src/Storages/ObjectStorage/Utils.h
@@ -19,6 +19,7 @@ void resolveSchemaAndFormat(
     ObjectStoragePtr object_storage,
     const StorageObjectStorage::ConfigurationPtr & configuration,
     std::optional<FormatSettings> format_settings,
+    std::string & sample_path,
     const ContextPtr & context);
 
 }
diff --git a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp
index 95265cde9ea..c12cdddeec7 100644
--- a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp
+++ b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp
@@ -160,7 +160,8 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
     configuration->check(context_);
 
     ColumnsDescription columns{columns_};
-    resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, context_);
+    std::string sample_path;
+    resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context_);
     configuration->check(context_);
 
     StorageInMemoryMetadata storage_metadata;
diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp
index 0c32f29cb34..9751d596fff 100644
--- a/src/Storages/StorageFile.cpp
+++ b/src/Storages/StorageFile.cpp
@@ -502,7 +502,7 @@ namespace
         StorageFile::getSchemaCache(getContext()).addManyColumns(cache_keys, columns);
     }
 
-    String getLastFileName() const override
+    String getLastFilePath() const override
     {
         if (current_index != 0)
             return paths[current_index - 1];
@@ -777,7 +777,7 @@ namespace
         format = format_name;
     }
 
-    String getLastFileName() const override
+    String getLastFilePath() const override
     {
         return last_read_file_path;
     }
@@ -880,10 +880,11 @@ std::pair<ColumnsDescription, String> StorageFile::getTableStructureAndFormatFro
     auto read_buffer_iterator = SingleReadBufferIterator(std::move(read_buf));
 
     ColumnsDescription columns;
+    std::string sample_path;
     if (format)
-        columns = readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context);
+        columns = readSchemaFromFormat(*format, format_settings, read_buffer_iterator, sample_path, context);
     else
-        std::tie(columns, format) = detectFormatAndReadSchema(format_settings, read_buffer_iterator, context);
+        std::tie(columns, format) = detectFormatAndReadSchema(format_settings, read_buffer_iterator, sample_path, context);
 
     peekable_read_buffer_from_fd = read_buffer_iterator.releaseBuffer();
     if (peekable_read_buffer_from_fd)
@@ -928,20 +929,21 @@ std::pair<ColumnsDescription, String> StorageFile::getTableStructureAndFormatFro
     }
 
+    std::string sample_path;
     if (archive_info)
     {
         ReadBufferFromArchiveIterator read_buffer_iterator(*archive_info, format, format_settings, context);
         if (format)
-            return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format};
+            return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, sample_path, context), *format};
 
-        return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context);
+        return detectFormatAndReadSchema(format_settings, read_buffer_iterator, sample_path, context);
     }
 
     ReadBufferFromFileIterator read_buffer_iterator(paths, format, compression_method, format_settings, context);
     if (format)
-        return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format};
+        return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, sample_path, context), *format};
 
-    return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context);
+    return detectFormatAndReadSchema(format_settings, read_buffer_iterator, sample_path, context);
 }
 
 ColumnsDescription StorageFile::getTableStructureFromFile(
@@ -1097,10 +1099,10 @@ void StorageFile::setStorageMetadata(CommonArguments args)
     storage_metadata.setComment(args.comment);
     setInMemoryMetadata(storage_metadata);
 
-    Strings paths_for_virtuals;
-    if (args.getContext()->getSettingsRef().file_hive_partitioning)
-        paths_for_virtuals = paths;
-    setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), paths_for_virtuals));
+    std::string path_for_virtuals;
+    if (args.getContext()->getSettingsRef().use_hive_partitioning && !paths.empty())
+        path_for_virtuals = paths[0];
+    setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), path_for_virtuals, format_settings.value_or(FormatSettings{})));
 }
 
 
@@ -1442,14 +1444,9 @@ Chunk StorageFileSource::generate()
                     chunk_size = input_format->getApproxBytesReadForChunk();
                 progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
 
-                std::map<std::string, std::string> hive_map;
-                if (getContext()->getSettingsRef().file_hive_partitioning)
-                {
-                    hive_map = VirtualColumnUtils::parsePartitionMapFromPath(current_path);
-
-                    for (const auto& item : hive_map)
-                        requested_virtual_columns.push_back(NameAndTypePair(item.first, std::make_shared<DataTypeString>()));
-                }
+                std::string hive_partitioning_path;
+                if (getContext()->getSettingsRef().use_hive_partitioning)
+                    hive_partitioning_path = current_path;
 
                 /// Enrich with virtual columns.
                 VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
@@ -1459,8 +1456,7 @@ Chunk StorageFileSource::generate()
                         .size = current_file_size,
                         .filename = (filename_override.has_value() ? &filename_override.value() : nullptr),
                         .last_modified = current_file_last_modified,
-                        .hive_partitioning_map = hive_map
-                    });
+                    }, hive_partitioning_path);
 
                 return chunk;
             }
@@ -1636,16 +1632,6 @@ void ReadFromFile::createIterator(const ActionsDAG::Node * predicate)
         storage->distributed_processing);
 }
 
-void addPartitionColumnsToInfoHeader(Strings paths, ReadFromFormatInfo & info)
-{
-    for (const auto& path : paths)
-    {
-        auto map = VirtualColumnUtils::parsePartitionMapFromPath(path);
-        for (const auto& item : map)
-            info.source_header.insertUnique(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), item.first));
-    }
-}
-
 void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
 {
     createIterator(nullptr);
@@ -1665,9 +1651,6 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui
         paths = storage->paths;
     }
 
-    if (getContext()->getSettingsRef().file_hive_partitioning)
-        addPartitionColumnsToInfoHeader(paths, info);
-
     if (max_num_streams > files_to_read)
         num_streams = files_to_read;
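setStorageMetadata() now forwards FormatSettings so the partition values sniffed from paths[0] can receive a real type via tryInferDataTypeForSingleField(), falling back to String when inference fails (see the VirtualColumnUtils.cpp hunk below). A rough, portable standalone analogue of that fallback order; the real inference is far richer (dates, bools, etc.):

    #include <cstdlib>
    #include <iostream>
    #include <string>

    // Crude stand-in for tryInferDataTypeForSingleField(): Int64, Float64 or String.
    std::string inferTypeName(const std::string & value)
    {
        if (value.empty())
            return "String";
        char * end = nullptr;
        std::strtoll(value.c_str(), &end, 10);
        if (end && *end == '\0')
            return "Int64";
        std::strtod(value.c_str(), &end);
        if (end && *end == '\0')
            return "Float64";
        return "String"; // fallback, as in the patch when inference returns nullptr
    }

    int main()
    {
        std::cout << inferTypeName("2024") << ' '        // Int64
                  << inferTypeName("3.5") << ' '         // Float64
                  << inferTypeName("Elizabeth") << '\n'; // String
    }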
diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp
index f6374701fc2..59c5465a381 100644
--- a/src/Storages/StorageURL.cpp
+++ b/src/Storages/StorageURL.cpp
@@ -153,10 +153,10 @@ IStorageURLBase::IStorageURLBase(
     storage_metadata.setComment(comment);
     setInMemoryMetadata(storage_metadata);
 
-    Strings uri_for_partitioning;
-    if (context_->getSettingsRef().url_hive_partitioning)
-        uri_for_partitioning = {uri};
-    setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), uri_for_partitioning));
+    std::string uri_for_partitioning;
+    if (context_->getSettingsRef().use_hive_partitioning)
+        uri_for_partitioning = uri;
+    setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), uri_for_partitioning, format_settings.value_or(FormatSettings{})));
 }
 
 
@@ -415,9 +415,9 @@ Chunk StorageURLSource::generate()
             size_t chunk_size = 0;
             if (input_format)
                 chunk_size = input_format->getApproxBytesReadForChunk();
-            std::map<std::string, std::string> hive_map;
-            if (getContext()->getSettingsRef().url_hive_partitioning)
-                hive_map = VirtualColumnUtils::parsePartitionMapFromPath(curr_uri.getPath());
+            std::string hive_partitioning_path;
+            if (getContext()->getSettingsRef().use_hive_partitioning)
+                hive_partitioning_path = curr_uri.getPath();
 
             progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
             VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
@@ -425,8 +425,7 @@ Chunk StorageURLSource::generate()
                 {
                     .path = curr_uri.getPath(),
                     .size = current_file_size,
-                    .hive_partitioning_map = hive_map
-                });
+                }, hive_partitioning_path);
             return chunk;
         }
 
@@ -859,7 +858,7 @@ namespace
         format = format_name;
     }
 
-    String getLastFileName() const override { return current_url_option; }
+    String getLastFilePath() const override { return current_url_option; }
 
     bool supportsLastReadBufferRecreation() const override { return true; }
 
@@ -960,9 +959,10 @@ std::pair<ColumnsDescription, String> IStorageURLBase::getTableStructureAndForma
         urls_to_check = {uri};
 
     ReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context);
+    std::string sample_path;
     if (format)
-        return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format};
-    return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context);
+        return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, sample_path, context), *format};
+    return detectFormatAndReadSchema(format_settings, read_buffer_iterator, sample_path, context);
 }
 
 ColumnsDescription IStorageURLBase::getTableStructureFromData(
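StorageURLSource hands over curr_uri.getPath() rather than the full URL, so the scheme, host, query string and fragment never reach the partition parser (a "?year=2024" query parameter should not look like a partition directory). A small plain-std::string sketch of that extraction; the storage itself gets this from its URI class, and this helper is only an illustration of the caveat:

    #include <iostream>
    #include <string>

    // Return only the path component of a URL: no scheme, host, query or fragment.
    std::string pathComponent(const std::string & url)
    {
        size_t start = 0;
        if (auto scheme = url.find("://"); scheme != std::string::npos)
        {
            start = url.find('/', scheme + 3); // first '/' after the host
            if (start == std::string::npos)
                return "/";
        }
        size_t end = url.find_first_of("?#", start); // cut query string / fragment
        return url.substr(start, end == std::string::npos ? std::string::npos : end - start);
    }

    int main()
    {
        std::cout << pathComponent("https://example.com/data/year=2024/f.csv?token=x") << '\n';
        // /data/year=2024/f.csv
    }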
diff --git a/src/Storages/VirtualColumnUtils.cpp b/src/Storages/VirtualColumnUtils.cpp
index 0b79e3b7a16..379b14d8e51 100644
--- a/src/Storages/VirtualColumnUtils.cpp
+++ b/src/Storages/VirtualColumnUtils.cpp
@@ -38,6 +38,7 @@
 #include
 #include
 #include
+#include <Formats/SchemaInferenceUtils.h>
 #include "Functions/FunctionsLogical.h"
 #include "Functions/IFunction.h"
 #include "Functions/IFunctionAdaptors.h"
@@ -115,22 +116,19 @@ NameSet getVirtualNamesForFileLikeStorage()
     return {"_path", "_file", "_size", "_time"};
 }
 
-Strings parseVirtualColumnNameFromPath(const std::string & path)
+std::map<std::string, std::string> parseFromPath(const std::string& path)
 {
     std::string pattern = "/([^/]+)=([^/]+)";
-    // Map to store the key-value pairs
-    std::map<std::string, std::string> key_values;
-
     re2::StringPiece input_piece(path);
-    std::string key;
-    Strings result;
-    while (RE2::FindAndConsume(&input_piece, pattern, &key))
-        result.push_back(key);
-    return result;
+
+    std::map<std::string, std::string> key_values;
+    std::string key, value;
+    while (RE2::FindAndConsume(&input_piece, pattern, &key, &value))
+        key_values["_" + key] = value;
+    return key_values;
 }
 
-VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns, Strings paths)
+VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns, std::string path, FormatSettings settings)
 {
     VirtualColumnsDescription desc;
 
@@ -147,11 +145,13 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription
     add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
     add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
 
-    for (const auto& path : paths)
+    auto map = parseFromPath(path);
+    for (const auto& item : map)
     {
-        auto names = parseVirtualColumnNameFromPath(path);
-        for (const auto& name : names)
-            add_virtual("_" + name, std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
+        auto type = tryInferDataTypeForSingleField(item.second, settings);
+        if (type == nullptr)
+            type = std::make_shared<DataTypeString>();
+        add_virtual(item.first, std::make_shared<DataTypeLowCardinality>(type));
     }
 
     return desc;
@@ -213,25 +213,11 @@ ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const
     return block.getByName("_idx").column;
 }
 
-std::map<std::string, std::string> parsePartitionMapFromPath(const std::string & path)
-{
-    std::string pattern = "/([^/]+)=([^/]+)"; // Regex to capture key=value pairs
-    // Map to store the key-value pairs
-    std::map<std::string, std::string> key_values;
-
-    re2::StringPiece input_piece(path);
-    std::string key;
-    std::string value;
-    while (RE2::FindAndConsume(&input_piece, pattern, &key, &value))
-        key_values["_" + key] = value;
-
-    return key_values;
-}
-
 void addRequestedFileLikeStorageVirtualsToChunk(
     Chunk & chunk, const NamesAndTypesList & requested_virtual_columns,
-    VirtualsForFileLikeStorage virtual_values)
+    VirtualsForFileLikeStorage virtual_values, const std::string & hive_partitioning_path)
 {
+    auto hive_map = parseFromPath(hive_partitioning_path);
     for (const auto & virtual_column : requested_virtual_columns)
     {
         if (virtual_column.name == "_path")
@@ -265,13 +251,22 @@ void addRequestedFileLikeStorageVirtualsToChunk(
             else
                 chunk.addColumn(virtual_column.type->createColumnConstWithDefaultValue(chunk.getNumRows())->convertToFullColumnIfConst());
         }
-        else
+        else if (!hive_map.empty())
         {
-            auto it = virtual_values.hive_partitioning_map.find(virtual_column.getNameInStorage());
-            if (it != virtual_values.hive_partitioning_map.end())
+            bool contains_virtual_column = std::any_of(hive_map.begin(), hive_map.end(),
+                [&](const auto& pair)
+                {
+                    return requested_virtual_columns.contains(pair.first);
+                });
+
+            if (!contains_virtual_column)
+                hive_map.clear(); // If we cannot find any virtual column in requested, we don't add any of them to the chunk
+
+            auto it = hive_map.find(virtual_column.getNameInStorage());
+            if (it != hive_map.end())
             {
                 chunk.addColumn(virtual_column.getTypeInStorage()->createColumnConst(chunk.getNumRows(), it->second)->convertToFullColumnIfConst());
-                virtual_values.hive_partitioning_map.erase(it);
+                hive_map.erase(it);
             }
         }
     }
diff --git a/src/Storages/VirtualColumnUtils.h b/src/Storages/VirtualColumnUtils.h
index f9b49cc48ed..72922be60bd 100644
--- a/src/Storages/VirtualColumnUtils.h
+++ b/src/Storages/VirtualColumnUtils.h
@@ -5,6 +5,7 @@
 #include
 #include
 #include
+#include <Formats/FormatSettings.h>
 #include
 #include
 
@@ -49,7 +50,7 @@ auto extractSingleValueFromBlock(const Block & block, const String & name)
 }
 
 NameSet getVirtualNamesForFileLikeStorage();
-VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns, Strings paths = {});
+VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns, std::string path = "", FormatSettings settings = FormatSettings());
 
 ActionsDAGPtr createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
 
@@ -76,15 +77,13 @@ struct VirtualsForFileLikeStorage
     std::optional<size_t> size { std::nullopt };
     const String * filename { nullptr };
     std::optional<Poco::Timestamp> last_modified { std::nullopt };
-    std::map<std::string, std::string> hive_partitioning_map {};
-
 };
 
-std::map<std::string, std::string> parsePartitionMapFromPath(const std::string & path);
+std::map<std::string, std::string> parseFromPath(const std::string& path);
 
 void addRequestedFileLikeStorageVirtualsToChunk(
     Chunk & chunk, const NamesAndTypesList & requested_virtual_columns,
-    VirtualsForFileLikeStorage virtual_values);
+    VirtualsForFileLikeStorage virtual_values, const std::string & hive_partitioning_path = "");
 }
 
 }
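parseFromPath() is the heart of the feature: every "/key=value" directory pair becomes a virtual column named "_key", which is what makes the tests' "SELECT _column1 ... WHERE column1 = _column1" work. The sketch below reproduces the behavior with std::regex purely for self-containment; ClickHouse itself uses RE2 with exactly the pattern shown in the hunk above:

    #include <iostream>
    #include <map>
    #include <regex>
    #include <string>

    // Emulates VirtualColumnUtils::parseFromPath(): "/key=value" pairs -> {"_key": "value"}.
    std::map<std::string, std::string> parseFromPath(const std::string & path)
    {
        static const std::regex pattern("/([^/]+)=([^/]+)");
        std::map<std::string, std::string> key_values;
        for (auto it = std::sregex_iterator(path.begin(), path.end(), pattern);
             it != std::sregex_iterator(); ++it)
            key_values["_" + (*it)[1].str()] = (*it)[2].str();
        return key_values;
    }

    int main()
    {
        for (const auto & [k, v] : parseFromPath("cont/a/column1=Elizabeth/column2=Gordon/sample.csv"))
            std::cout << k << " = " << v << '\n';
        // _column1 = Elizabeth
        // _column2 = Gordon
    }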
diff --git a/src/TableFunctions/TableFunctionFormat.cpp b/src/TableFunctions/TableFunctionFormat.cpp
index ad2a142a140..66152cb0c91 100644
--- a/src/TableFunctions/TableFunctionFormat.cpp
+++ b/src/TableFunctions/TableFunctionFormat.cpp
@@ -85,9 +85,10 @@ ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr conte
     if (structure == "auto")
     {
         SingleReadBufferIterator read_buffer_iterator(std::make_unique<ReadBufferFromString>(data));
+        std::string sample_path;
         if (format == "auto")
-            return detectFormatAndReadSchema(std::nullopt, read_buffer_iterator, context).first;
-        return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, context);
+            return detectFormatAndReadSchema(std::nullopt, read_buffer_iterator, sample_path, context).first;
+        return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, sample_path, context);
     }
     return parseColumnsListFromString(structure, context);
 }
@@ -131,11 +132,12 @@ StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, Con
     String format_name = format;
     if (structure == "auto")
     {
+        std::string sample_path;
         SingleReadBufferIterator read_buffer_iterator(std::make_unique<ReadBufferFromString>(data));
         if (format_name == "auto")
-            std::tie(columns, format_name) = detectFormatAndReadSchema(std::nullopt, read_buffer_iterator, context);
+            std::tie(columns, format_name) = detectFormatAndReadSchema(std::nullopt, read_buffer_iterator, sample_path, context);
         else
-            columns = readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, context);
+            columns = readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, sample_path, context);
     }
     else
     {
diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp
index 550d9cc799b..39392a4c44c 100644
--- a/src/TableFunctions/TableFunctionObjectStorage.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorage.cpp
@@ -84,7 +84,8 @@ ColumnsDescription TableFunctionObjectStorage<
         context->checkAccess(getSourceAccessType());
         ColumnsDescription columns;
         auto storage = getObjectStorage(context, !is_insert_query);
-        resolveSchemaAndFormat(columns, configuration->format, storage, configuration, std::nullopt, context);
+        std::string sample_path;
+        resolveSchemaAndFormat(columns, configuration->format, storage, configuration, std::nullopt, sample_path, context);
         return columns;
     }
     else
diff --git a/tests/integration/test_hive_style_partitioning_hdfs_azure/__init__.py b/tests/integration/test_hive_style_partitioning_hdfs_azure/__init__.py
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/cluster_azure.xml b/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/cluster_azure.xml
deleted file mode 100644
index ffa4673c9ee..00000000000
--- a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/cluster_azure.xml
+++ /dev/null
@@ -1,39 +0,0 @@
[39 deleted lines: XML cluster config defining replicas node_0/node_1/node_2 on ports 9000/19000 and a "simple_cluster" alias; the XML markup is not recoverable]
\ No newline at end of file
diff --git a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/cluster_hdfs.xml b/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/cluster_hdfs.xml
deleted file mode 100644
index b99b21ea40b..00000000000
--- a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/cluster_hdfs.xml
+++ /dev/null
@@ -1,33 +0,0 @@
[33 deleted lines: XML cluster config with node1 on ports 9000/19000 and hosts 127.0.0.1/127.0.0.2 on port 9000; the XML markup is not recoverable]
diff --git a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/disable_profilers_azure.xml b/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/disable_profilers_azure.xml
deleted file mode 100644
index a39badbf8ec..00000000000
--- a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/disable_profilers_azure.xml
+++ /dev/null
@@ -1,9 +0,0 @@
[9 deleted lines: XML profile disabling profilers (two settings set to 0); the XML markup is not recoverable]
diff --git a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/macro_hdfs.xml b/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/macro_hdfs.xml
deleted file mode 100644
index c2e11b47a5e..00000000000
--- a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/macro_hdfs.xml
+++ /dev/null
@@ -1,5 +0,0 @@
[5 deleted lines: XML macro set to "test_cluster_two_shards"; the XML markup is not recoverable]
\ No newline at end of file
diff --git a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/named_collections_azure.xml b/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/named_collections_azure.xml
deleted file mode 100644
index bd7f9ff97f1..00000000000
--- a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/named_collections_azure.xml
+++ /dev/null
@@ -1,14 +0,0 @@
[14 deleted lines: XML named collection with container "cont", blob path "test_simple_write_named.csv", structure "key UInt64, data String", format CSV, account devstoreaccount1 with the standard Azurite key; the XML markup is not recoverable]
diff --git a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/schema_cache_azure.xml b/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/schema_cache_azure.xml
deleted file mode 100644
index e2168ecd06d..00000000000
--- a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/schema_cache_azure.xml
+++ /dev/null
@@ -1,3 +0,0 @@
[3 deleted lines: XML schema cache limit set to 2; the XML markup is not recoverable]
\ No newline at end of file
diff --git a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/schema_cache_hdfs.xml b/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/schema_cache_hdfs.xml
deleted file mode 100644
index 37639649b5f..00000000000
--- a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/schema_cache_hdfs.xml
+++ /dev/null
@@ -1,3 +0,0 @@
[3 deleted lines: XML schema cache limit set to 2; the XML markup is not recoverable]
\ No newline at end of file
diff --git a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/users_azure.xml b/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/users_azure.xml
deleted file mode 100644
index 4b6ba057ecb..00000000000
--- a/tests/integration/test_hive_style_partitioning_hdfs_azure/configs/users_azure.xml
+++ /dev/null
@@ -1,9 +0,0 @@
[9 deleted lines: XML per-user override for the "default" user with one setting set to 1; the XML markup is not recoverable]
diff --git a/tests/integration/test_hive_style_partitioning_hdfs_azure/test_azure.py b/tests/integration/test_hive_style_partitioning_hdfs_azure/test_azure.py
deleted file mode 100644
index 0be697821f0..00000000000
--- a/tests/integration/test_hive_style_partitioning_hdfs_azure/test_azure.py
+++ /dev/null
@@ -1,219 +0,0 @@
-#!/usr/bin/env python3
-
-import pytest
-import time
-
-from helpers.cluster import ClickHouseCluster, is_arm
-import re
-
-from azure.storage.blob import BlobServiceClient
-from helpers.cluster import ClickHouseCluster, ClickHouseInstance
-
-if is_arm():
-    pytestmark = pytest.mark.skip
-
-
-@pytest.fixture(scope="module")
-def cluster():
-    try:
-        cluster = ClickHouseCluster(__file__)
-        cluster.add_instance(
-            "node",
-            main_configs=[
-                "configs/named_collections_azure.xml",
-                "configs/schema_cache_azure.xml",
-            ],
-            user_configs=[
-                "configs/disable_profilers_azure.xml",
-                "configs/users_azure.xml",
-            ],
-            with_azurite=True,
-        )
-        cluster.start()
-        container_client = cluster.blob_service_client.get_container_client("cont")
-        container_client.create_container()
-        yield cluster
-    finally:
-        cluster.shutdown()
-
-
-def azure_query(
-    node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None
-):
-    for i in range(try_num):
-        try:
-            if expect_error:
-                return node.query_and_get_error(query, settings=settings)
-            else:
-                return node.query(query, settings=settings)
-        except Exception as ex:
-            retriable_errors = [
-                "DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
-                "DB::Exception: Azure::Core::Http::TransportException: Connection closed before getting full response or response is less than expected",
-                "DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
-                "DB::Exception: Azure::Core::Http::TransportException: Error while polling for socket ready read",
-                "Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
-                "Azure::Core::Http::TransportException, e.what() = Connection closed before getting full response or response is less than expected",
-                "Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
-                "Azure::Core::Http::TransportException, e.what() = Error while polling for socket ready read",
-            ]
-            retry = False
-            for error in retriable_errors:
-                if error in str(ex):
-                    retry = True
-                    print(f"Try num: {i}. Having retriable error: {ex}")
-                    time.sleep(i)
-                    break
-            if not retry or i == try_num - 1:
-                raise Exception(ex)
-            if query_on_retry is not None:
-                node.query(query_on_retry)
-            continue
-
-
-def get_azure_file_content(filename, port):
-    container_name = "cont"
-    connection_string = (
-        f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
-        f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
-        f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
-    )
-    blob_service_client = BlobServiceClient.from_connection_string(
-        str(connection_string)
-    )
-    container_client = blob_service_client.get_container_client(container_name)
-    blob_client = container_client.get_blob_client(filename)
-    download_stream = blob_client.download_blob()
-    return download_stream.readall().decode("utf-8")
-
-
-@pytest.fixture(autouse=True, scope="function")
-def delete_all_files(cluster):
-    port = cluster.env_variables["AZURITE_PORT"]
-    connection_string = (
-        f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
-        f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
-        f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
-    )
-    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
-    containers = blob_service_client.list_containers()
-    for container in containers:
-        container_client = blob_service_client.get_container_client(container)
-        blob_list = container_client.list_blobs()
-        for blob in blob_list:
-            print(blob)
-            blob_client = container_client.get_blob_client(blob)
-            blob_client.delete_blob()
-
-        assert len(list(container_client.list_blobs())) == 0
-
-    yield
-
-
-def test_azure_partitioning_with_one_parameter(cluster):
-    # type: (ClickHouseCluster) -> None
-    node = cluster.instances["node"]  # type: ClickHouseInstance
-    table_format = "column1 String, column2 String"
-    values = f"('Elizabeth', 'Gordon')"
-    path = "a/column1=Elizabeth/sample.csv"
-
-    azure_query(
-        node,
-        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
-        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}",
-    )
-
-    query = (
-        f"SELECT column1, column2, _file, _path, _column1 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}')"
-    )
-    assert azure_query(
-        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
-    ).splitlines() == [
-        "Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth".format(
-            bucket="cont", max_path=path
-        )
-    ]
-
-    query = (
-        f"SELECT column2 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
-    )
-    assert azure_query(
-        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
-    ).splitlines() == ["Gordon"]
-
-
-def test_azure_partitioning_with_two_parameters(cluster):
-    # type: (ClickHouseCluster) -> None
-    node = cluster.instances["node"]  # type: ClickHouseInstance
-    table_format = "column1 String, column2 String"
-    values_1 = f"('Elizabeth', 'Gordon')"
-    values_2 = f"('Emilia', 'Gregor')"
-    path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
-
-    azure_query(
-        node,
-        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
-        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
-    )
-
-    query = (
-        f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
-    )
-    assert azure_query(
-        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
-    ).splitlines() == [
-        "Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth\tGordon".format(
-            bucket="cont", max_path=path
-        )
-    ]
-
-    query = (
-        f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2;"
-    )
-    assert azure_query(
-        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
-    ).splitlines() == ["Elizabeth"]
-
-    query = (
-        f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2 AND column1=_column1;"
-    )
-    assert azure_query(
-        node, query, settings={"azure_blob_storage_hive_partitioning": 1}
-    ).splitlines() == ["Elizabeth"]
-
-
-def test_azure_partitioning_without_setting(cluster):
-    # type: (ClickHouseCluster) -> None
-    node = cluster.instances["node"]  # type: ClickHouseInstance
-    table_format = "column1 String, column2 String"
-    values_1 = f"('Elizabeth', 'Gordon')"
-    values_2 = f"('Emilia', 'Gregor')"
-    path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
-
-    azure_query(
-        node,
-        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
-        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
-    )
-
-    query = (
-        f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
-        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
-        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
-    )
-    pattern = re.compile(
-        r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL
-    )
-
-    with pytest.raises(Exception, match=pattern):
-        azure_query(node, query, settings={"azure_blob_storage_hive_partitioning": 0})
diff --git a/tests/integration/test_hive_style_partitioning_hdfs_azure/test_hdfs.py b/tests/integration/test_hive_style_partitioning_hdfs_azure/test_hdfs.py
deleted file mode 100644
index 4667d18688a..00000000000
--- a/tests/integration/test_hive_style_partitioning_hdfs_azure/test_hdfs.py
+++ /dev/null
@@ -1,87 +0,0 @@
-#!/usr/bin/env python3
-
-import pytest
-
-from helpers.client import QueryRuntimeException
-from helpers.cluster import ClickHouseCluster, is_arm
-import re
-
-from helpers.cluster import ClickHouseCluster
-
-if is_arm():
-    pytestmark = pytest.mark.skip
-
-cluster = ClickHouseCluster(__file__)
-node1 = cluster.add_instance(
-    "node1",
-    main_configs=[
-        "configs/macro_hdfs.xml",
-        "configs/schema_cache_hdfs.xml",
-        "configs/cluster_hdfs.xml",
-    ],
-    with_hdfs=True,
-)
-
-
-@pytest.fixture(scope="module")
-def started_cluster():
-    try:
-        cluster.start()
-        yield cluster
-    finally:
-        cluster.shutdown()
-
-
-def test_hdfs_partitioning_with_one_parameter(started_cluster):
-    hdfs_api = started_cluster.hdfs_api
-    hdfs_api.write_data(f"/column0=Elizabeth/parquet_1", f"Elizabeth\tGordon\n")
-    assert hdfs_api.read_data(f"/column0=Elizabeth/parquet_1") == f"Elizabeth\tGordon\n"
-
-    r = node1.query(
-        "SELECT _column0 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/parquet_1', 'TSV')",
-        settings={"hdfs_hive_partitioning": 1},
-    )
-    assert r == f"Elizabeth\n"
-
-
-def test_hdfs_partitioning_with_two_parameters(started_cluster):
-    hdfs_api = started_cluster.hdfs_api
-    hdfs_api.write_data(
-        f"/column0=Elizabeth/column1=Gordon/parquet_2", f"Elizabeth\tGordon\n"
-    )
-    assert (
-        hdfs_api.read_data(f"/column0=Elizabeth/column1=Gordon/parquet_2")
-        == f"Elizabeth\tGordon\n"
-    )
-
-    r = node1.query(
-        "SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');",
-        settings={"hdfs_hive_partitioning": 1},
-    )
-    assert r == f"Gordon\n"
-
-
-def test_hdfs_partitioning_without_setting(started_cluster):
-    hdfs_api = started_cluster.hdfs_api
-    hdfs_api.write_data(
-        f"/column0=Elizabeth/column1=Gordon/parquet_2", f"Elizabeth\tGordon\n"
-    )
-    assert (
-        hdfs_api.read_data(f"/column0=Elizabeth/column1=Gordon/parquet_2")
-        == f"Elizabeth\tGordon\n"
-    )
-    pattern = re.compile(
-        r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL
-    )
-
-    with pytest.raises(QueryRuntimeException, match=pattern):
-        node1.query(
-            f"SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');",
-            settings={"hdfs_hive_partitioning": 0},
-        )
-
-
-if __name__ == "__main__":
-    cluster.start()
-    input("Cluster created, press any key to destroy...")
-    cluster.shutdown()
diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py
index 20b004a7605..893df6d23aa 100644
--- a/tests/integration/test_storage_azure_blob_storage/test.py
+++ b/tests/integration/test_storage_azure_blob_storage/test.py
@@ -5,6 +5,7 @@ import json
 import logging
 import os
 import io
+import re
 import random
 import threading
 import time
@@ -1462,3 +1463,112 @@ def test_insert_create_new_file(cluster):
     assert TSV(res) == TSV(
         "test_create_new_file.csv\t1\ntest_create_new_file.1.csv\t2\n"
     )
+
+
+def test_hive_partitioning_with_one_parameter(cluster):
+    # type: (ClickHouseCluster) -> None
+    node = cluster.instances["node"]  # type: ClickHouseInstance
+    table_format = "column1 String, column2 String"
+    values = f"('Elizabeth', 'Gordon')"
+    path = "a/column1=Elizabeth/sample.csv"
+
+    azure_query(
+        node,
+        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
+        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}",
+    )
+
+    query = (
+        f"SELECT column1, column2, _file, _path, _column1 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}')"
+    )
+    assert azure_query(
+        node, query, settings={"use_hive_partitioning": 1}
+    ).splitlines() == [
+        "Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth".format(
+            bucket="cont", max_path=path
+        )
+    ]
+
+    query = (
+        f"SELECT column2 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
+    )
+    assert azure_query(
+        node, query, settings={"use_hive_partitioning": 1}
+    ).splitlines() == ["Gordon"]
+
+
+def test_hive_partitioning_with_two_parameters(cluster):
+    # type: (ClickHouseCluster) -> None
+    node = cluster.instances["node"]  # type: ClickHouseInstance
+    table_format = "column1 String, column2 String"
+    values_1 = f"('Elizabeth', 'Gordon')"
+    values_2 = f"('Emilia', 'Gregor')"
+    path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
+
+    azure_query(
+        node,
+        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
+        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
+    )
+
+    query = (
+        f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
+    )
+    assert azure_query(
+        node, query, settings={"use_hive_partitioning": 1}
+    ).splitlines() == [
+        "Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth\tGordon".format(
+            bucket="cont", max_path=path
+        )
+    ]
+
+    query = (
+        f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2;"
+    )
+    assert azure_query(
+        node, query, settings={"use_hive_partitioning": 1}
+    ).splitlines() == ["Elizabeth"]
+
+    query = (
+        f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2 AND column1=_column1;"
+    )
+    assert azure_query(
+        node, query, settings={"use_hive_partitioning": 1}
+    ).splitlines() == ["Elizabeth"]
+
+
+def test_hive_partitioning_without_setting(cluster):
+    # type: (ClickHouseCluster) -> None
+    node = cluster.instances["node"]  # type: ClickHouseInstance
+    table_format = "column1 String, column2 String"
+    values_1 = f"('Elizabeth', 'Gordon')"
+    values_2 = f"('Emilia', 'Gregor')"
+    path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
+
+    azure_query(
+        node,
+        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
+        f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
+    )
+
+    query = (
+        f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
+        f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
+        f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
+    )
+    pattern = re.compile(
+        r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL
+    )
+
+    with pytest.raises(Exception, match=pattern):
+        azure_query(node, query, settings={"use_hive_partitioning": 0})
diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py
index 47d8f44c0b7..8071b520a4f 100644
--- a/tests/integration/test_storage_hdfs/test.py
+++ b/tests/integration/test_storage_hdfs/test.py
@@ -3,6 +3,8 @@ import os
 
 import pytest
 import time
+import re
 from helpers.cluster import ClickHouseCluster, is_arm
+from helpers.client import QueryRuntimeException
 from helpers.test_tools import TSV
 
 from pyhdfs import HdfsClient
@@ -1180,6 +1181,54 @@ def test_respect_object_existence_on_partitioned_write(started_cluster):
 
     assert int(result) == 44
 
 
+def test_hive_partitioning_with_one_parameter(started_cluster):
+    hdfs_api = started_cluster.hdfs_api
+    hdfs_api.write_data(f"/column0=Elizabeth/parquet_1", f"Elizabeth\tGordon\n")
+    assert hdfs_api.read_data(f"/column0=Elizabeth/parquet_1") == f"Elizabeth\tGordon\n"
+
+    r = node1.query(
+        "SELECT _column0 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/parquet_1', 'TSV')",
+        settings={"use_hive_partitioning": 1},
+    )
+    assert r == f"Elizabeth\n"
+
+
+def test_hive_partitioning_with_two_parameters(started_cluster):
+    hdfs_api = started_cluster.hdfs_api
+    hdfs_api.write_data(
+        f"/column0=Elizabeth/column1=Gordon/parquet_2", f"Elizabeth\tGordon\n"
+    )
+    assert (
+        hdfs_api.read_data(f"/column0=Elizabeth/column1=Gordon/parquet_2")
+        == f"Elizabeth\tGordon\n"
+    )
+
+    r = node1.query(
+        "SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');",
+        settings={"use_hive_partitioning": 1},
+    )
+    assert r == f"Gordon\n"
+
+
+def test_hive_partitioning_without_setting(started_cluster):
+    hdfs_api = started_cluster.hdfs_api
+    hdfs_api.write_data(
+        f"/column0=Elizabeth/column1=Gordon/parquet_2", f"Elizabeth\tGordon\n"
+    )
+    assert (
+        hdfs_api.read_data(f"/column0=Elizabeth/column1=Gordon/parquet_2")
+        == f"Elizabeth\tGordon\n"
+    )
+    pattern = re.compile(
+        r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL
+    )
+
+    with pytest.raises(QueryRuntimeException, match=pattern):
+        node1.query(
+            f"SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');",
+            settings={"use_hive_partitioning": 0},
+        )
+
+
 if __name__ == "__main__":
     cluster.start()
     input("Cluster created, press any key to destroy...")