diff --git a/src/Formats/ReadSchemaUtils.cpp b/src/Formats/ReadSchemaUtils.cpp index 1ac660fdc9d..f065ddeb44d 100644 --- a/src/Formats/ReadSchemaUtils.cpp +++ b/src/Formats/ReadSchemaUtils.cpp @@ -52,6 +52,7 @@ ColumnsDescription readSchemaFromFormat( const String & format_name, const std::optional & format_settings, ReadBufferIterator & read_buffer_iterator, + bool retry, ContextPtr & context, std::unique_ptr & buf_out) { @@ -59,20 +60,27 @@ ColumnsDescription readSchemaFromFormat( if (FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format_name)) { auto external_schema_reader = FormatFactory::instance().getExternalSchemaReader(format_name, context, format_settings); - names_and_types = external_schema_reader->readSchema(); + try + { + names_and_types = external_schema_reader->readSchema(); + } + catch (const DB::Exception & e) + { + throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, e.message()); + } } else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name)) { std::string exception_messages; SchemaReaderPtr schema_reader; std::unique_ptr buf; - while ((buf = read_buffer_iterator.next())) + while ((buf = read_buffer_iterator())) { if (buf->eof()) { auto exception_message = fmt::format("Cannot extract table structure from {} format file, file is empty\n", format_name); - if (read_buffer_iterator.isSingle()) + if (!retry) throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, exception_message); exception_messages += "\n" + exception_message; @@ -90,7 +98,7 @@ ColumnsDescription readSchemaFromFormat( { auto exception_message = getCurrentExceptionMessage(false); - if (read_buffer_iterator.isSingle() || !isRetryableSchemaInferenceError(getCurrentExceptionCode())) + if (!retry || !isRetryableSchemaInferenceError(getCurrentExceptionCode())) throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, exception_message); exception_messages += "\n" + exception_message; @@ -122,10 +130,10 @@ ColumnsDescription readSchemaFromFormat( return ColumnsDescription(names_and_types); } -ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional & format_settings, ReadBufferIterator & read_buffer_iterator, ContextPtr & context) +ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional & format_settings, ReadBufferIterator & read_buffer_iterator, bool retry, ContextPtr & context) { std::unique_ptr buf_out; - return readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, context, buf_out); + return readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, retry, context, buf_out); } DataTypePtr makeNullableRecursivelyAndCheckForNothing(DataTypePtr type) diff --git a/src/Formats/ReadSchemaUtils.h b/src/Formats/ReadSchemaUtils.h index 33ae49983f3..b7558d5c22b 100644 --- a/src/Formats/ReadSchemaUtils.h +++ b/src/Formats/ReadSchemaUtils.h @@ -6,64 +6,7 @@ namespace DB { -class ReadBufferIterator -{ -public: - virtual std::unique_ptr next() = 0; - virtual bool isSingle() const = 0; - virtual ~ReadBufferIterator() = default; -}; - -template -class ReadBufferListIterator : public ReadBufferIterator -{ -public: - ReadBufferListIterator(ListIterator begin_, ListIterator end_, BufferCreator buffer_creator_) - : it(begin_), end(end_), buffer_creator(buffer_creator_), is_single(std::distance(begin_, end_) == 1) - { - } - - std::unique_ptr next() override - { - if (it == end) - return nullptr; - - auto res = buffer_creator(it); - ++it; - return res; - } - - bool isSingle() const override { return is_single; } - -private: - ListIterator it; - ListIterator end; - BufferCreator buffer_creator; - bool is_single; -}; - -template -class ReadBufferSingleIterator : public ReadBufferIterator -{ -public: - ReadBufferSingleIterator(BufferCreator buffer_creator_) : buffer_creator(buffer_creator_) - { - } - - std::unique_ptr next() override - { - if (done) - return nullptr; - done = true; - return buffer_creator(); - } - - bool isSingle() const override { return true; } - -private: - BufferCreator buffer_creator; - bool done = false; -}; +using ReadBufferIterator = std::function()>; /// Try to determine the schema of the data in specifying format. /// For formats that have an external schema reader, it will @@ -79,6 +22,7 @@ ColumnsDescription readSchemaFromFormat( const String & format_name, const std::optional & format_settings, ReadBufferIterator & read_buffer_iterator, + bool retry, ContextPtr & context); /// If ReadBuffer is created, it will be written to buf_out. @@ -86,6 +30,7 @@ ColumnsDescription readSchemaFromFormat( const String & format_name, const std::optional & format_settings, ReadBufferIterator & read_buffer_iterator, + bool retry, ContextPtr & context, std::unique_ptr & buf_out); diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index 090d9942522..87d6c297b5a 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -189,14 +189,15 @@ ColumnsDescription StorageHDFS::getTableStructureFromData( "specify table structure manually", format); - auto read_buffer_creator = [&, uri_without_path = uri_without_path](std::vector::const_iterator & it) -> std::unique_ptr + ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()]() mutable -> std::unique_ptr { + if (it == paths.end()) + return nullptr; auto compression = chooseCompressionMethod(*it, compression_method); return wrapReadBufferWithCompressionMethod( - std::make_unique(uri_without_path, *it, ctx->getGlobalContext()->getConfigRef()), compression); + std::make_unique(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef()), compression); }; - ReadBufferListIterator read_buffer_iterator(paths.cbegin(), paths.cend(), read_buffer_creator); - return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, ctx); + return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx); } class HDFSSource::DisclosedGlobIterator::Impl diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 97958f21c8f..3c2cadc9fd8 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -237,7 +237,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c /// in case of file descriptor we have a stream of data and we cannot /// start reading data from the beginning after reading some data for /// schema inference. - auto read_buffer_creator = [&]() + ReadBufferIterator read_buffer_iterator = [&]() { /// We will use PeekableReadBuffer to create a checkpoint, so we need a place /// where we can store the original read buffer. @@ -247,8 +247,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c return read_buf; }; - ReadBufferSingleIterator read_buffer_iterator(read_buffer_creator); - auto columns = readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, context, peekable_read_buffer_from_fd); + auto columns = readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, false, context, peekable_read_buffer_from_fd); if (peekable_read_buffer_from_fd) { /// If we have created read buffer in readSchemaFromFormat we should rollback to checkpoint. @@ -282,13 +281,15 @@ ColumnsDescription StorageFile::getTableStructureFromFile( "table structure manually", format); - auto read_buffer_creator = [&](std::vector::const_iterator & it) + ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()]() mutable -> std::unique_ptr { - return createReadBuffer(*it, false, "File", -1, compression_method, context); + if (it == paths.end()) + return nullptr; + + return createReadBuffer(*it++, false, "File", -1, compression_method, context); }; - ReadBufferListIterator read_buffer_iterator(paths.begin(), paths.end(), read_buffer_creator); - return readSchemaFromFormat(format, format_settings, read_buffer_iterator, context); + return readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context); } bool StorageFile::isColumnOriented() const diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 234bd4cd52c..0acee24c1b2 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -962,30 +962,29 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( ContextPtr ctx) { auto file_iterator = createFileIterator(client_auth, {client_auth.uri.key}, is_key_with_globs, distributed_processing, ctx); - std::vector keys; - String key = (*file_iterator)(); - while (!key.empty()) - { - keys.push_back(key); - key = (*file_iterator)(); - } - if (keys.empty() && !FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format)) - throw Exception( - ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, - "Cannot extract table structure from {} format file, because there are no files with provided path in S3. You must specify " - "table structure manually", - format); - - auto read_buffer_creator = [&](std::vector::iterator & it) + ReadBufferIterator read_buffer_iterator = [&, first = false]() mutable -> std::unique_ptr { + auto key = (*file_iterator)(); + if (key.empty()) + { + if (first) + throw Exception( + ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, + "Cannot extract table structure from {} format file, because there are no files with provided path in S3. You must specify " + "table structure manually", + format); + return nullptr; + } + + first = false; return wrapReadBufferWithCompressionMethod( std::make_unique( - client_auth.client, client_auth.uri.bucket, *it, max_single_read_retries, ctx->getReadSettings()), - chooseCompressionMethod(*it, compression_method)); + client_auth.client, client_auth.uri.bucket, key, max_single_read_retries, ctx->getReadSettings()), + chooseCompressionMethod(key, compression_method)); }; - ReadBufferListIterator read_buffer_iterator(keys.begin(), keys.end(), read_buffer_creator); - return readSchemaFromFormat(format, format_settings, read_buffer_iterator, ctx); + + return readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx); } diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 8ff197099b6..96dd20ab9f0 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -552,8 +552,11 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( } - auto read_buffer_creator = [&](std::vector::const_iterator & it) + ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()]() mutable -> std::unique_ptr { + if (it == urls_to_check.cend()) + return nullptr; + return StorageURLSource::getFirstAvailableURLReadBuffer( it, urls_to_check.cend(), @@ -570,8 +573,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( context->getSettingsRef().max_download_threads); }; - ReadBufferListIterator read_buffer_iterator(urls_to_check.cbegin(), urls_to_check.cend(), read_buffer_creator); - return readSchemaFromFormat(format, format_settings, read_buffer_iterator, context); + return readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context); } bool IStorageURLBase::isColumnOriented() const diff --git a/src/TableFunctions/TableFunctionFormat.cpp b/src/TableFunctions/TableFunctionFormat.cpp index f63b4bbee67..9a2de735353 100644 --- a/src/TableFunctions/TableFunctionFormat.cpp +++ b/src/TableFunctions/TableFunctionFormat.cpp @@ -49,12 +49,11 @@ void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const { - auto read_buffer_creator = [&]() + ReadBufferIterator read_buffer_iterator = [&]() { return std::make_unique(data); }; - ReadBufferSingleIterator read_buffer_iterator(read_buffer_creator); - return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, context); + return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, false, context); } Block TableFunctionFormat::parseData(ColumnsDescription columns, ContextPtr context) const