#include #if USE_AZURE_BLOB_STORAGE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace Azure::Storage::Blobs; namespace CurrentMetrics { extern const Metric ObjectStorageAzureThreads; extern const Metric ObjectStorageAzureThreadsActive; } namespace DB { namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int BAD_ARGUMENTS; extern const int DATABASE_ACCESS_DENIED; extern const int CANNOT_COMPILE_REGEXP; extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; } namespace { static const std::unordered_set required_configuration_keys = { "blob_path", "container", }; static const std::unordered_set optional_configuration_keys = { "format", "compression", "compression_method", "account_name", "account_key", "connection_string", "storage_account_url", }; bool isConnectionString(const std::string & candidate) { return candidate.starts_with("DefaultEndpointsProtocol"); } void processNamedCollectionResult(StorageAzure::Configuration & configuration, const NamedCollection & collection) { validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys); if (collection.has("connection_string")) { configuration.connection_url = collection.get("connection_string"); configuration.is_connection_string = true; } if (collection.has("storage_account_url")) { configuration.connection_url = collection.get("storage_account_url"); configuration.is_connection_string = false; } configuration.container = collection.get("container"); configuration.blob_path = collection.get("blob_path"); if (collection.has("account_name")) configuration.account_name = collection.get("account_name"); if (collection.has("account_key")) configuration.account_key = collection.get("account_key"); configuration.format = collection.getOrDefault("format", configuration.format); configuration.compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); } } StorageAzure::Configuration StorageAzure::getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file) { LOG_INFO(&Poco::Logger::get("StorageAzure"), "get_format_from_file = {}", get_format_from_file); StorageAzure::Configuration configuration; /// Supported signatures: /// /// Azure(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression]) /// if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context)) { processNamedCollectionResult(configuration, *named_collection); configuration.blobs_paths = {configuration.blob_path}; if (configuration.format == "auto" && get_format_from_file) configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true); return configuration; } if (engine_args.size() < 3 || engine_args.size() > 7) throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Storage Azure requires 3 to 7 arguments: " "Azure(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])"); for (auto & engine_arg : engine_args) engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context); std::unordered_map engine_args_to_idx; configuration.connection_url = checkAndGetLiteralArgument(engine_args[0], "connection_string/storage_account_url"); configuration.is_connection_string = isConnectionString(configuration.connection_url); configuration.container = checkAndGetLiteralArgument(engine_args[1], "container"); configuration.blob_path = checkAndGetLiteralArgument(engine_args[2], "blobpath"); LOG_INFO(&Poco::Logger::get("StorageAzure"), "connection_url = {}", configuration.connection_url); LOG_INFO(&Poco::Logger::get("StorageAzure"), "container = {}", configuration.container); LOG_INFO(&Poco::Logger::get("StorageAzure"), "blobpath = {}", configuration.blob_path); auto is_format_arg = [] (const std::string & s) -> bool { return s == "auto" || FormatFactory::instance().getAllFormats().contains(s); }; if (engine_args.size() == 4) { //'c1 UInt64, c2 UInt64 auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); if (is_format_arg(fourth_arg)) { configuration.format = fourth_arg; } else { throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format or account name specified without account key"); } } else if (engine_args.size() == 5) { auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); if (is_format_arg(fourth_arg)) { configuration.format = fourth_arg; configuration.compression_method = checkAndGetLiteralArgument(engine_args[4], "compression"); } else { configuration.account_name = fourth_arg; configuration.account_key = checkAndGetLiteralArgument(engine_args[4], "account_key"); } } else if (engine_args.size() == 6) { auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg)) { throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format and compression must be last arguments"); } else { configuration.account_name = fourth_arg; configuration.account_key = checkAndGetLiteralArgument(engine_args[4], "account_key"); auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format/account_name"); if (!is_format_arg(sixth_arg)) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); configuration.format = sixth_arg; } } else if (engine_args.size() == 7) { auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg)) { throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format and compression must be last arguments"); } else { configuration.account_name = fourth_arg; configuration.account_key = checkAndGetLiteralArgument(engine_args[4], "account_key"); auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format/account_name"); if (!is_format_arg(sixth_arg)) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); configuration.format = sixth_arg; configuration.compression_method = checkAndGetLiteralArgument(engine_args[6], "compression"); } } configuration.blobs_paths = {configuration.blob_path}; LOG_INFO(&Poco::Logger::get("StorageAzure"), "get_format_from_file = {}", get_format_from_file); if (configuration.format == "auto" && get_format_from_file) configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true); return configuration; } void registerStorageAzure(StorageFactory & factory) { factory.registerStorage("Azure", [](const StorageFactory::Arguments & args) { auto & engine_args = args.engine_args; if (engine_args.empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); auto configuration = StorageAzure::getConfiguration(engine_args, args.getLocalContext()); auto client = StorageAzure::createClient(configuration); // Use format settings from global server context + settings from // the SETTINGS clause of the create query. Settings from current // session and user are ignored. std::optional format_settings; if (args.storage_def->settings) { FormatFactorySettings user_format_settings; // Apply changed settings from global context, but ignore the // unknown ones, because we only have the format settings here. const auto & changes = args.getContext()->getSettingsRef().changes(); for (const auto & change : changes) { if (user_format_settings.has(change.name)) user_format_settings.set(change.name, change.value); } // Apply changes from SETTINGS clause, with validation. user_format_settings.applyChanges(args.storage_def->settings->changes); format_settings = getFormatSettings(args.getContext(), user_format_settings); } else { format_settings = getFormatSettings(args.getContext()); } ASTPtr partition_by; if (args.storage_def->partition_by) partition_by = args.storage_def->partition_by->clone(); const auto & context_settings = args.getContext()->getSettingsRef(); auto settings = std::make_unique(); settings->max_single_part_upload_size = context_settings.azure_max_single_part_upload_size; settings->max_single_read_retries = context_settings.azure_max_single_read_retries; settings->list_object_keys_size = static_cast(context_settings.azure_list_object_keys_size); return std::make_shared( std::move(configuration), std::make_unique("AzureStorage", std::move(client), std::move(settings)), args.getContext(), args.table_id, args.columns, args.constraints, args.comment, format_settings, partition_by); }, { .supports_settings = true, .supports_sort_order = true, // for partition by .supports_schema_inference = true, .source_access_type = AccessType::AZURE, }); } AzureClientPtr StorageAzure::createClient(StorageAzure::Configuration configuration) { AzureClientPtr result; if (configuration.is_connection_string) { LOG_INFO(&Poco::Logger::get("StorageAzure"), "createClient is_connection_string "); result = std::make_unique(BlobContainerClient::CreateFromConnectionString(configuration.connection_url, configuration.container)); result->CreateIfNotExists(); } else { if (configuration.account_name.has_value() && configuration.account_key.has_value()) { auto storage_shared_key_credential = std::make_shared(*configuration.account_name, *configuration.account_key); auto blob_service_client = std::make_unique(configuration.connection_url, storage_shared_key_credential); try { result = std::make_unique(blob_service_client->CreateBlobContainer(configuration.container).Value); } catch (const Azure::Storage::StorageException & e) { if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict) { auto final_url = configuration.connection_url + (configuration.connection_url.back() == '/' ? "" : "/") + configuration.container; result = std::make_unique(final_url, storage_shared_key_credential); } else { throw; } } } else { auto managed_identity_credential = std::make_shared(); auto blob_service_client = std::make_unique(configuration.connection_url, managed_identity_credential); try { result = std::make_unique(blob_service_client->CreateBlobContainer(configuration.container).Value); } catch (const Azure::Storage::StorageException & e) { if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict) { auto final_url = configuration.connection_url + (configuration.connection_url.back() == '/' ? "" : "/") + configuration.container; result = std::make_unique(final_url, managed_identity_credential); } else { throw; } } } } return result; } Poco::URI StorageAzure::Configuration::getConnectionURL() const { if (!is_connection_string) return Poco::URI(connection_url); auto parsed_connection_string = Azure::Storage::_internal::ParseConnectionString(connection_url); return Poco::URI(parsed_connection_string.BlobServiceUrl.GetAbsoluteUrl()); } StorageAzure::StorageAzure( const Configuration & configuration_, std::unique_ptr && object_storage_, ContextPtr context, const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const String & comment, std::optional format_settings_, ASTPtr partition_by_) : IStorage(table_id_) , name("AzureBlobStorage") , configuration(configuration_) , object_storage(std::move(object_storage_)) , distributed_processing(false) , format_settings(format_settings_) , partition_by(partition_by_) { FormatFactory::instance().checkFormatName(configuration.format); context->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.getConnectionURL()); StorageInMemoryMetadata storage_metadata; if (columns_.empty()) { auto columns = getTableStructureFromDataImpl(context); storage_metadata.setColumns(columns); } else storage_metadata.setColumns(columns_); storage_metadata.setConstraints(constraints_); storage_metadata.setComment(comment); setInMemoryMetadata(storage_metadata); StoredObjects objects; for (const auto & key : configuration.blobs_paths) objects.emplace_back(key); for (auto obj : objects) { LOG_INFO(&Poco::Logger::get("StorageAzure"), "constructor obj.remote_paths = {}", obj.remote_path); if (object_storage->exists(obj)) { LOG_INFO(&Poco::Logger::get("StorageAzure"), "constructor exists obj.remote_paths = {}", obj.remote_path); // auto read_buffer = object_storage->readObject(obj); // LOG_INFO(&Poco::Logger::get("StorageAzure"), "constructor read size obj.remote_paths = {} , size = {}", obj.remote_path, read_buffer->getFileSize()); } } auto default_virtuals = NamesAndTypesList{ {"_path", std::make_shared(std::make_shared())}, {"_file", std::make_shared(std::make_shared())}}; auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList(); virtual_columns = getVirtualsForStorage(columns, default_virtuals); for (const auto & column : virtual_columns) virtual_block.insert({column.type->createColumn(), column.type, column.name}); } void StorageAzure::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) { if (configuration.withGlobs()) { throw Exception( ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", configuration.blob_path); } StoredObjects objects; for (const auto & key : configuration.blobs_paths) objects.emplace_back(key); object_storage->removeObjectsIfExist(objects); } namespace { class StorageAzureSink : public SinkToStorage { public: StorageAzureSink( const String & format, const Block & sample_block_, ContextPtr context, std::optional format_settings_, const CompressionMethod compression_method, AzureObjectStorage * object_storage, const String & blob_path) : SinkToStorage(sample_block_) , sample_block(sample_block_) , format_settings(format_settings_) { StoredObject object(blob_path); write_buf = wrapWriteBufferWithCompressionMethod(object_storage->writeObject(object, WriteMode::Rewrite), compression_method, 3); writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings); } String getName() const override { return "StorageS3Sink"; } void consume(Chunk chunk) override { std::lock_guard lock(cancel_mutex); if (cancelled) return; writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); } void onCancel() override { std::lock_guard lock(cancel_mutex); finalize(); cancelled = true; } void onException() override { std::lock_guard lock(cancel_mutex); finalize(); } void onFinish() override { std::lock_guard lock(cancel_mutex); finalize(); } private: void finalize() { if (!writer) return; try { writer->finalize(); writer->flush(); write_buf->finalize(); } catch (...) { /// Stop ParallelFormattingOutputFormat correctly. writer.reset(); write_buf->finalize(); throw; } } Block sample_block; std::optional format_settings; std::unique_ptr write_buf; OutputFormatPtr writer; bool cancelled = false; std::mutex cancel_mutex; }; class PartitionedStorageAzureSink : public PartitionedSink { public: PartitionedStorageAzureSink( const ASTPtr & partition_by, const String & format_, const Block & sample_block_, ContextPtr context_, std::optional format_settings_, const CompressionMethod compression_method_, AzureObjectStorage * object_storage_, const String & blob_) : PartitionedSink(partition_by, context_, sample_block_) , format(format_) , sample_block(sample_block_) , context(context_) , compression_method(compression_method_) , object_storage(object_storage_) , blob(blob_) , format_settings(format_settings_) { } SinkPtr createSinkForPartition(const String & partition_id) override { auto partition_key = replaceWildcards(blob, partition_id); validateKey(partition_key); return std::make_shared( format, sample_block, context, format_settings, compression_method, object_storage, partition_key ); } private: const String format; const Block sample_block; const ContextPtr context; const CompressionMethod compression_method; AzureObjectStorage * object_storage; const String blob; const std::optional format_settings; ExpressionActionsPtr partition_by_expr; static void validateKey(const String & str) { validatePartitionKey(str, true); } }; } Pipe StorageAzure::read( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, QueryProcessingStage::Enum /*processed_stage*/, size_t max_block_size, size_t num_streams) { if (partition_by && configuration.withWildcard()) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned Azure storage is not implemented yet"); Pipes pipes; std::unordered_set column_names_set(column_names.begin(), column_names.end()); std::vector requested_virtual_columns; for (const auto & virtual_column : getVirtuals()) { if (column_names_set.contains(virtual_column.name)) requested_virtual_columns.push_back(virtual_column); } std::shared_ptr iterator_wrapper; if (configuration.withGlobs()) { /// Iterate through disclosed globs and make a source for each file iterator_wrapper = std::make_shared( object_storage.get(), configuration.container, std::nullopt, configuration.blob_path, query_info.query, virtual_block, local_context, nullptr); } else { iterator_wrapper = std::make_shared( object_storage.get(), configuration.container, configuration.blobs_paths, std::nullopt, query_info.query, virtual_block, local_context, nullptr); } ColumnsDescription columns_description; Block block_for_format; if (supportsSubsetOfColumns()) { auto fetch_columns = column_names; const auto & virtuals = getVirtuals(); std::erase_if( fetch_columns, [&](const String & col) { return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); }); if (fetch_columns.empty()) fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name); columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns); block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical()); } else { columns_description = storage_snapshot->metadata->getColumns(); block_for_format = storage_snapshot->metadata->getSampleBlock(); } for (size_t i = 0; i < num_streams; ++i) { pipes.emplace_back(std::make_shared( requested_virtual_columns, configuration.format, getName(), block_for_format, local_context, format_settings, columns_description, max_block_size, configuration.compression_method, object_storage.get(), configuration.container, iterator_wrapper)); } return Pipe::unitePipes(std::move(pipes)); } SinkToStoragePtr StorageAzure::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) { auto sample_block = metadata_snapshot->getSampleBlock(); auto chosen_compression_method = chooseCompressionMethod(configuration.blobs_paths.back(), configuration.compression_method); auto insert_query = std::dynamic_pointer_cast(query); auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr; bool is_partitioned_implementation = partition_by_ast && configuration.withWildcard(); if (is_partitioned_implementation) { return std::make_shared( partition_by_ast, configuration.format, sample_block, local_context, format_settings, chosen_compression_method, object_storage.get(), configuration.blobs_paths.back()); } else { if (configuration.withGlobs()) throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Azure key '{}' contains globs, so the table is in readonly mode", configuration.blob_path); bool truncate_in_insert = local_context->getSettingsRef().azure_truncate_on_insert; if (!truncate_in_insert && object_storage->exists(StoredObject(configuration.blob_path))) { if (local_context->getSettingsRef().azure_create_new_file_on_insert) { size_t index = configuration.blobs_paths.size(); const auto & first_key = configuration.blobs_paths[0]; auto pos = first_key.find_first_of('.'); String new_key; do { new_key = first_key.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos)); ++index; } while (object_storage->exists(StoredObject(new_key))); configuration.blobs_paths.push_back(new_key); } else { throw Exception( ErrorCodes::BAD_ARGUMENTS, "Object in bucket {} with key {} already exists. " "If you want to overwrite it, enable setting azure_truncate_on_insert, if you " "want to create a new file on each insert, enable setting azure_create_new_file_on_insert", configuration.container, configuration.blobs_paths.back()); } } return std::make_shared( configuration.format, sample_block, local_context, format_settings, chosen_compression_method, object_storage.get(), configuration.blobs_paths.back()); } } NamesAndTypesList StorageAzure::getVirtuals() const { return virtual_columns; } bool StorageAzure::supportsPartitionBy() const { return true; } bool StorageAzure::supportsSubcolumns() const { return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format); } bool StorageAzure::supportsSubsetOfColumns() const { return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format); } bool StorageAzure::prefersLargeBlocks() const { return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration.format); } bool StorageAzure::parallelizeOutputAfterReading(ContextPtr context) const { return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration.format, context); } static void addPathToVirtualColumns(Block & block, const String & path, size_t idx) { if (block.has("_path")) block.getByName("_path").column->assumeMutableRef().insert(path); if (block.has("_file")) { auto pos = path.find_last_of('/'); assert(pos != std::string::npos); auto file = path.substr(pos + 1); block.getByName("_file").column->assumeMutableRef().insert(file); } block.getByName("_idx").column->assumeMutableRef().insert(idx); } StorageAzureSource::Iterator::Iterator( AzureObjectStorage * object_storage_, const std::string & container_, std::optional keys_, std::optional blob_path_with_globs_, ASTPtr query_, const Block & virtual_header_, ContextPtr context_, RelativePathsWithMetadata * outer_blobs_) : WithContext(context_) , object_storage(object_storage_) , container(container_) , keys(keys_) , blob_path_with_globs(blob_path_with_globs_) , query(query_) , virtual_header(virtual_header_) , outer_blobs(outer_blobs_) { if (keys.has_value() && blob_path_with_globs.has_value()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot specify keys and glob simulatenously it's a bug"); if (!keys.has_value() && !blob_path_with_globs.has_value()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Both keys and glob mask are not specified"); if (keys) { Strings all_keys = *keys; blobs_with_metadata.emplace(); /// Create a virtual block with one row to construct filter if (query && virtual_header && !all_keys.empty()) { /// Append "idx" column as the filter result virtual_header.insert({ColumnUInt64::create(), std::make_shared(), "_idx"}); auto block = virtual_header.cloneEmpty(); addPathToVirtualColumns(block, fs::path(container) / all_keys.front(), 0); VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast); if (filter_ast) { block = virtual_header.cloneEmpty(); for (size_t i = 0; i < all_keys.size(); ++i) addPathToVirtualColumns(block, fs::path(container) / all_keys[i], i); VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); const auto & idxs = typeid_cast(*block.getByName("_idx").column); Strings filtered_keys; filtered_keys.reserve(block.rows()); for (UInt64 idx : idxs.getData()) filtered_keys.emplace_back(std::move(all_keys[idx])); all_keys = std::move(filtered_keys); } } for (auto && key : all_keys) { ObjectMetadata object_metadata = object_storage->getObjectMetadata(key); total_size += object_metadata.size_bytes; blobs_with_metadata->emplace_back(RelativePathWithMetadata{key, object_metadata}); if (outer_blobs) outer_blobs->emplace_back(blobs_with_metadata->back()); } } else { LOG_DEBUG(&Poco::Logger::get("DEBUG"), "GLOBS BRANCH"); const String key_prefix = blob_path_with_globs->substr(0, blob_path_with_globs->find_first_of("*?{")); /// We don't have to list bucket, because there is no asterisks. if (key_prefix.size() == blob_path_with_globs->size()) { ObjectMetadata object_metadata = object_storage->getObjectMetadata(*blob_path_with_globs); blobs_with_metadata->emplace_back(*blob_path_with_globs, object_metadata); if (outer_blobs) outer_blobs->emplace_back(blobs_with_metadata->back()); return; } LOG_DEBUG(&Poco::Logger::get("DEBUG"), "KEY PREFIX {}", key_prefix); object_storage_iterator = object_storage->iterate(key_prefix); LOG_DEBUG(&Poco::Logger::get("DEBUG"), "BLOBS BLOBS{}", *blob_path_with_globs); LOG_DEBUG(&Poco::Logger::get("DEBUG"), "REGEXP PATTERN {}", makeRegexpPatternFromGlobs(*blob_path_with_globs)); matcher = std::make_unique(makeRegexpPatternFromGlobs(*blob_path_with_globs)); if (!matcher->ok()) throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP, "Cannot compile regex from glob ({}): {}", *blob_path_with_globs, matcher->error()); recursive = *blob_path_with_globs == "/**" ? true : false; } } RelativePathWithMetadata StorageAzureSource::Iterator::next() { if (is_finished) return {}; if (keys) { size_t current_index = index.fetch_add(1, std::memory_order_relaxed); if (current_index >= blobs_with_metadata->size()) { is_finished = true; return {}; } return (*blobs_with_metadata)[current_index]; } else { if (!blobs_with_metadata || index >= blobs_with_metadata->size()) { RelativePathsWithMetadata new_batch; while (new_batch.empty()) { if (object_storage_iterator->isValid()) { new_batch = object_storage_iterator->currentBatch(); object_storage_iterator->nextBatch(); } else { is_finished = true; return {}; } for (auto it = new_batch.begin(); it != new_batch.end();) { if (!recursive && !re2::RE2::FullMatch(it->relative_path, *matcher)) it = new_batch.erase(it); else ++it; } } index.store(0, std::memory_order_relaxed); if (!is_initialized) { createFilterAST(new_batch.front().relative_path); is_initialized = true; } if (filter_ast) { auto block = virtual_header.cloneEmpty(); for (size_t i = 0; i < new_batch.size(); ++i) addPathToVirtualColumns(block, fs::path(container) / new_batch[i].relative_path, i); VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); const auto & idxs = typeid_cast(*block.getByName("_idx").column); blob_path_with_globs.reset(); blob_path_with_globs.emplace(); for (UInt64 idx : idxs.getData()) { total_size.fetch_add(new_batch[idx].metadata.size_bytes, std::memory_order_relaxed); blobs_with_metadata->emplace_back(std::move(new_batch[idx])); if (outer_blobs) outer_blobs->emplace_back(blobs_with_metadata->back()); } } else { if (outer_blobs) outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end()); blobs_with_metadata = std::move(new_batch); for (const auto & [_, info] : *blobs_with_metadata) total_size.fetch_add(info.size_bytes, std::memory_order_relaxed); } } size_t current_index = index.fetch_add(1, std::memory_order_relaxed); return (*blobs_with_metadata)[current_index]; } } size_t StorageAzureSource::Iterator::getTotalSize() const { return total_size.load(std::memory_order_relaxed); } void StorageAzureSource::Iterator::createFilterAST(const String & any_key) { if (!query || !virtual_header) return; /// Create a virtual block with one row to construct filter /// Append "idx" column as the filter result virtual_header.insert({ColumnUInt64::create(), std::make_shared(), "_idx"}); auto block = virtual_header.cloneEmpty(); addPathToVirtualColumns(block, fs::path(container) / any_key, 0); VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast); } Chunk StorageAzureSource::generate() { while (true) { if (isCancelled() || !reader) { if (reader) reader->cancel(); break; } Chunk chunk; if (reader->pull(chunk)) { UInt64 num_rows = chunk.getNumRows(); const auto & file_path = reader.getPath(); size_t total_size = file_iterator->getTotalSize(); if (num_rows && total_size) { updateRowsProgressApprox( *this, chunk, total_size, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max); } for (const auto & virtual_column : requested_virtual_columns) { if (virtual_column.name == "_path") { chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst()); } else if (virtual_column.name == "_file") { size_t last_slash_pos = file_path.find_last_of('/'); auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1)); chunk.addColumn(column->convertToFullColumnIfConst()); } } return chunk; } assert(reader_future.valid()); reader = reader_future.get(); if (!reader) break; /// Even if task is finished the thread may be not freed in pool. /// So wait until it will be freed before scheduling a new task. create_reader_pool.wait(); reader_future = createReaderAsync(); } return {}; } Block StorageAzureSource::getHeader(Block sample_block, const std::vector & requested_virtual_columns) { for (const auto & virtual_column : requested_virtual_columns) sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name}); return sample_block; } StorageAzureSource::StorageAzureSource( const std::vector & requested_virtual_columns_, const String & format_, String name_, const Block & sample_block_, ContextPtr context_, std::optional format_settings_, const ColumnsDescription & columns_, UInt64 max_block_size_, String compression_hint_, AzureObjectStorage * object_storage_, const String & container_, std::shared_ptr file_iterator_) :ISource(getHeader(sample_block_, requested_virtual_columns_)) , WithContext(context_) , requested_virtual_columns(requested_virtual_columns_) , format(format_) , name(std::move(name_)) , sample_block(sample_block_) , format_settings(format_settings_) , columns_desc(columns_) , max_block_size(max_block_size_) , compression_hint(compression_hint_) , object_storage(std::move(object_storage_)) , container(container_) , file_iterator(file_iterator_) , create_reader_pool(CurrentMetrics::ObjectStorageAzureThreads, CurrentMetrics::ObjectStorageAzureThreadsActive, 1) , create_reader_scheduler(threadPoolCallbackRunner(create_reader_pool, "AzureReader")) { reader = createReader(); if (reader) reader_future = createReaderAsync(); } StorageAzureSource::~StorageAzureSource() { create_reader_pool.wait(); } String StorageAzureSource::getName() const { return name; } StorageAzureSource::ReaderHolder StorageAzureSource::createReader() { auto [current_key, info] = file_iterator->next(); LOG_DEBUG(log, "KEY {} SIZE {}", current_key, info.size_bytes); if (current_key.empty()) return {}; size_t object_size = info.size_bytes != 0 ? info.size_bytes : object_storage->getObjectMetadata(current_key).size_bytes; auto compression_method = chooseCompressionMethod(current_key, compression_hint); auto read_buf = createAzureReadBuffer(current_key, object_size); auto input_format = FormatFactory::instance().getInput( format, *read_buf, sample_block, getContext(), max_block_size, format_settings, std::nullopt, std::nullopt, /* is_remote_fs */ true, compression_method); LOG_DEBUG(log, "FORMAT {}", format); QueryPipelineBuilder builder; builder.init(Pipe(input_format)); if (columns_desc.hasDefaults()) { builder.addSimpleTransform( [&](const Block & header) { return std::make_shared(header, columns_desc, *input_format, getContext()); }); } auto pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder))); auto current_reader = std::make_unique(*pipeline); return ReaderHolder{fs::path(container) / current_key, std::move(read_buf), std::move(pipeline), std::move(current_reader)}; } std::future StorageAzureSource::createReaderAsync() { return create_reader_scheduler([this] { return createReader(); }, Priority{}); } std::unique_ptr StorageAzureSource::createAzureReadBuffer(const String & key, size_t object_size) { auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size); read_settings.enable_filesystem_cache = false; //auto download_buffer_size = getContext()->getSettings().max_download_buffer_size; //const bool object_too_small = object_size <= 2 * download_buffer_size; ///// Create a read buffer that will prefetch the first ~1 MB of the file. ///// When reading lots of tiny files, this prefetching almost doubles the throughput. ///// For bigger files, parallel reading is more useful. //if (object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool) //{ // LOG_TRACE(log, "Downloading object {} of size {} from S3 with initial prefetch", key, object_size); // return object_storage->readObjects({StoredObject(key)}, read_settings, {}, object_size); //} return object_storage->readObject(StoredObject(key), read_settings, {}, object_size); } ColumnsDescription StorageAzure::getTableStructureFromDataImpl(ContextPtr ctx) { RelativePathsWithMetadata read_keys; std::shared_ptr file_iterator; if (configuration.withGlobs()) { file_iterator = std::make_shared( object_storage.get(), configuration.container, std::nullopt, configuration.blob_path, nullptr, virtual_block, ctx, &read_keys); } else { file_iterator = std::make_shared( object_storage.get(), configuration.container, configuration.blobs_paths, std::nullopt, nullptr, virtual_block, ctx, &read_keys); } std::optional columns_from_cache; size_t prev_read_keys_size = read_keys.size(); if (ctx->getSettingsRef().schema_inference_use_cache_for_azure) columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), ctx); ReadBufferIterator read_buffer_iterator = [&, first = true](ColumnsDescription & cached_columns) mutable -> std::unique_ptr { auto [key, metadata] = file_iterator->next(); if (key.empty()) { if (first) throw Exception( ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file, because there are no files with provided path " "in S3. You must specify table structure manually", configuration.format); return nullptr; } /// S3 file iterator could get new keys after new iteration, check them in schema cache. if (ctx->getSettingsRef().schema_inference_use_cache_for_azure && read_keys.size() > prev_read_keys_size) { columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), ctx); prev_read_keys_size = read_keys.size(); if (columns_from_cache) { cached_columns = *columns_from_cache; return nullptr; } } first = false; int zstd_window_log_max = static_cast(ctx->getSettingsRef().zstd_window_log_max); return wrapReadBufferWithCompressionMethod( object_storage->readObject(StoredObject(key), ctx->getReadSettings(), {}, metadata.size_bytes), chooseCompressionMethod(key, configuration.compression_method), zstd_window_log_max); }; ColumnsDescription columns; if (columns_from_cache) columns = *columns_from_cache; else columns = readSchemaFromFormat(configuration.format, format_settings, read_buffer_iterator, configuration.withGlobs(), ctx); if (ctx->getSettingsRef().schema_inference_use_cache_for_azure) addColumnsToCache(read_keys, columns, configuration.format, ctx); return columns; } std::optional StorageAzure::tryGetColumnsFromCache( const RelativePathsWithMetadata::const_iterator & begin, const RelativePathsWithMetadata::const_iterator & end, const ContextPtr & ctx) { auto & schema_cache = getSchemaCache(ctx); for (auto it = begin; it < end; ++it) { auto get_last_mod_time = [&] -> time_t { return it->metadata.last_modified->epochTime(); }; auto host_and_bucket = configuration.connection_url + '/' + configuration.container; String source = host_and_bucket + '/' + it->relative_path; auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, ctx); auto columns = schema_cache.tryGet(cache_key, get_last_mod_time); if (columns) return columns; } return std::nullopt; } void StorageAzure::addColumnsToCache( const RelativePathsWithMetadata & keys, const ColumnsDescription & columns, const String & format_name, const ContextPtr & ctx) { auto host_and_bucket = configuration.connection_url + '/' + configuration.container; Strings sources; sources.reserve(keys.size()); std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket + '/' + elem.relative_path; }); auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx); auto & schema_cache = getSchemaCache(ctx); schema_cache.addMany(cache_keys, columns); } SchemaCache & StorageAzure::getSchemaCache(const ContextPtr & ctx) { static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_azure", DEFAULT_SCHEMA_CACHE_ELEMENTS)); return schema_cache; } } #endif