#pragma once

// NOTE(review): in this copy of the header every angle-bracket span is missing -- the
// `#include` targets, the `template` parameter lists, and the arguments of
// `std::shared_ptr`, `std::function`, `std::optional` and `std::unordered_map`.
// It looks as if they were stripped as markup. The header cannot compile in this
// state; restore the missing pieces from the original file instead of guessing them.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace DB
{

/// Forward declarations: this header only names these types in signatures and aliases.
class Block;
struct Settings;
struct FormatFactorySettings;

class ReadBuffer;
class WriteBuffer;

class IProcessor;
using ProcessorPtr = std::shared_ptr;

class IInputFormat;
class IOutputFormat;

struct RowInputFormatParams;
struct RowOutputFormatParams;

class ISchemaReader;
class IExternalSchemaReader;
using SchemaReaderPtr = std::shared_ptr;
using ExternalSchemaReaderPtr = std::shared_ptr;

using InputFormatPtr = std::shared_ptr;
using OutputFormatPtr = std::shared_ptr;

template
struct Memory;

/// Builds a FormatSettings value from the given context (defined outside this header).
FormatSettings getFormatSettings(ContextPtr context);

/// Same, but takes an explicit settings object of type T in addition to the context.
template
FormatSettings getFormatSettings(ContextPtr context, const T & settings);

/** Creates an IInputFormat or IOutputFormat by the name of the format.
  * Note: format and compression are independent things.
  *
  * The class is final and non-copyable; the factory object is obtained through instance().
  */
class FormatFactory final : private boost::noncopyable
{
public:
    /// This callback allows performing additional actions after reading a single row.
    /// Its initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
    using ReadCallback = std::function;

    /** Fast reading of data from a buffer, saving the result to memory.
      * Reads at least min_chunk_bytes and some more until the end of the chunk, depending on the format.
      * Used in ParallelParsingBlockInputStream.
      * NOTE(review): the return type and the argument of DB::Memory are missing in this copy.
      */
    using FileSegmentationEngine = std::function(
        ReadBuffer & buf,
        DB::Memory> & memory,
        size_t min_chunk_bytes)>;

    /// This callback allows performing additional actions after writing a single row.
    /// Its initial purpose was to flush a Kafka message for each row.
    using WriteCallback = std::function;

private:
    /// Callbacks that construct an input / output format.
    /// NOTE(review): their signatures are not visible in this copy.
    using InputCreator = std::function;

    using OutputCreator = std::function;

    /// Some input formats can have non trivial readPrefix() and readSuffix(),
    /// so in some cases there is no possibility to use parallel parsing.
    /// The checker should return true if parallel parsing should be disabled.
    using NonTrivialPrefixAndSuffixChecker = std::function;

    /// Callbacks that construct a schema reader / external schema reader.
    using SchemaReaderCreator = std::function;
    using ExternalSchemaReaderCreator = std::function;

    /// Everything stored for one format: its callbacks plus two capability flags.
    /// Both flags default to false. Presumably the members are filled in by the
    /// register*/mark* methods below -- their definitions are not in this header.
    struct Creators
    {
        InputCreator input_creator;
        OutputCreator output_creator;
        FileSegmentationEngine file_segmentation_engine;
        SchemaReaderCreator schema_reader_creator;
        ExternalSchemaReaderCreator external_schema_reader_creator;
        bool supports_parallel_formatting{false};
        bool is_column_oriented{false};
        NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
    };

    /// NOTE(review): key/value types are missing in this copy; presumably
    /// format name -> Creators and file extension -> format name. Confirm.
    using FormatsDictionary = std::unordered_map;
    using FileExtensionFormats = std::unordered_map;

public:
    /// Returns a reference to the factory object.
    static FormatFactory & instance();

    /// NOTE(review): getInput and getInputFormat have identical signatures here; what
    /// distinguishes them is not visible in this header -- check the implementation.
    InputFormatPtr getInput(
        const String & name,
        ReadBuffer & buf,
        const Block & sample,
        ContextPtr context,
        UInt64 max_block_size,
        const std::optional & format_settings = std::nullopt) const;

    InputFormatPtr getInputFormat(
        const String & name,
        ReadBuffer & buf,
        const Block & sample,
        ContextPtr context,
        UInt64 max_block_size,
        const std::optional & format_settings = std::nullopt) const;

    /// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done.
    OutputFormatPtr getOutputFormatParallelIfPossible(
        const String & name,
        WriteBuffer & buf,
        const Block & sample,
        ContextPtr context,
        WriteCallback callback = {},
        const std::optional & format_settings = std::nullopt) const;

    /// NOTE(review): the last parameter is spelled `_format_settings` here, unlike
    /// `format_settings` in every sibling declaration.
    OutputFormatPtr getOutputFormat(
        const String & name,
        WriteBuffer & buf,
        const Block & sample,
        ContextPtr context,
        WriteCallback callback = {},
        const std::optional & _format_settings = std::nullopt) const;

    String getContentType(
        const String & name,
        ContextPtr context,
        const std::optional & format_settings = std::nullopt) const;

    SchemaReaderPtr getSchemaReader(
        const String & name,
        ReadBuffer & buf,
        ContextPtr context,
        const std::optional & format_settings = std::nullopt) const;

    ExternalSchemaReaderPtr getExternalSchemaReader(
        const String & name,
        ContextPtr context,
        const std::optional & format_settings = std::nullopt) const;

    void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);

    void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);

    /// Register format by its name.
    void registerInputFormat(const String & name, InputCreator input_creator);
    void registerOutputFormat(const String & name, OutputCreator output_creator);

    /// Register a file extension for a format.
    void registerFileExtension(const String & extension, const String & format_name);
    /// Takes the file name by value.
    String getFormatFromFileName(String file_name);

    /// Register schema readers for a format by its name.
    void registerSchemaReader(const String & name, SchemaReaderCreator schema_reader_creator);
    void registerExternalSchemaReader(const String & name, ExternalSchemaReaderCreator external_schema_reader_creator);

    void markOutputFormatSupportsParallelFormatting(const String & name);
    void markFormatAsColumnOriented(const String & name);

    /// NOTE(review): these four queries are not declared const, unlike
    /// isInputFormat/isOutputFormat below -- confirm whether that is intentional.
    bool checkIfFormatIsColumnOriented(const String & name);
    bool checkIfFormatHasSchemaReader(const String & name);
    bool checkIfFormatHasExternalSchemaReader(const String & name);
    bool checkIfFormatHasAnySchemaReader(const String & name);

    /// Read-only access to the whole dictionary of registered formats.
    const FormatsDictionary & getAllFormats() const
    {
        return dict;
    }

    bool isInputFormat(const String & name) const;
    bool isOutputFormat(const String & name) const;

private:
    FormatsDictionary dict;
    FileExtensionFormats file_extension_formats;

    const Creators & getCreators(const String & name) const;
};

}