#pragma once #include #include #include #include #include #include #include namespace DB { class Block; class Context; struct FormatSettings; class ReadBuffer; class WriteBuffer; class IProcessor; using ProcessorPtr = std::shared_ptr; class IInputFormat; class IOutputFormat; struct RowInputFormatParams; using InputFormatPtr = std::shared_ptr; using OutputFormatPtr = std::shared_ptr; /** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format. * Note: format and compression are independent things. */ class FormatFactory final : private boost::noncopyable { public: /// This callback allows to perform some additional actions after reading a single row. /// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer. using ReadCallback = std::function; /** Fast reading data from buffer and save result to memory. * Reads at least min_chunk_size bytes and some more until the end of the chunk, depends on the format. * Used in SharedReadBuffer. */ using FileSegmentationEngine = std::function & memory, size_t & used_size, size_t min_chunk_size)>; /// This callback allows to perform some additional actions after writing a single row. /// It's initial purpose was to flush Kafka message for each row. using WriteCallback = std::function; private: using InputCreator = std::function; using OutputCreator = std::function; using InputProcessorCreator = std::function; using OutputProcessorCreator = std::function; struct Creators { InputCreator input_creator; OutputCreator output_creator; InputProcessorCreator input_processor_creator; OutputProcessorCreator output_processor_creator; FileSegmentationEngine file_segmentation_engine; }; using FormatsDictionary = std::unordered_map; public: static FormatFactory & instance(); BlockInputStreamPtr getInput( const String & name, ReadBuffer & buf, const Block & sample, const Context & context, UInt64 max_block_size, ReadCallback callback = {}) const; BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}) const; InputFormatPtr getInputFormat( const String & name, ReadBuffer & buf, const Block & sample, const Context & context, UInt64 max_block_size, ReadCallback callback = {}) const; OutputFormatPtr getOutputFormat( const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}) const; /// Register format by its name. void registerInputFormat(const String & name, InputCreator input_creator); void registerOutputFormat(const String & name, OutputCreator output_creator); void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine); void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator); void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator); const FormatsDictionary & getAllFormats() const { return dict; } private: /// FormatsDictionary dict; FormatsDictionary dict; FormatFactory(); const Creators & getCreators(const String & name) const; }; }