ClickHouse/dbms/src/Formats/FormatFactory.h

203 lines
7.5 KiB
C++
Raw Normal View History

#pragma once
#include <Core/Types.h>
2020-02-03 10:02:52 +00:00
#include <Columns/IColumn.h>
#include <DataStreams/IBlockStream_fwd.h>
2019-10-01 10:48:46 +00:00
#include <IO/BufferWithOwnMemory.h>
#include <functional>
#include <memory>
#include <unordered_map>
#include <boost/noncopyable.hpp>
namespace DB
{
/// Forward declarations: this header only needs the names, not the full definitions.
class Block;
class Context;
struct FormatSettings;
class ReadBuffer;
class WriteBuffer;

/// Processor-based (IProcessor) pipeline types.
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;

class IInputFormat;
class IOutputFormat;
struct RowInputFormatParams;

/// Shared-ownership handles returned by the processor-based creators.
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
/** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format.
* Note: format and compression are independent things.
*/
class FormatFactory final : private boost::noncopyable
{
public:
2019-06-14 17:19:02 +00:00
/// This callback allows to perform some additional actions after reading a single row.
/// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
using ReadCallback = std::function<void()>;
2019-10-01 10:48:46 +00:00
/** Fast reading data from buffer and save result to memory.
2019-11-18 19:25:17 +00:00
* Reads at least min_chunk_bytes and some more until the end of the chunk, depends on the format.
* Used in ParallelParsingBlockInputStream.
2019-10-01 10:48:46 +00:00
*/
using FileSegmentationEngine = std::function<bool(
ReadBuffer & buf,
DB::Memory<> & memory,
2019-11-18 19:25:17 +00:00
size_t min_chunk_bytes)>;
2019-10-01 10:48:46 +00:00
/// This callback allows to perform some additional actions after writing a single row.
/// It's initial purpose was to flush Kafka message for each row.
2020-02-03 10:02:52 +00:00
using WriteCallback = std::function<void(
const Columns & columns,
size_t row)>;
private:
using InputCreator = std::function<BlockInputStreamPtr(
ReadBuffer & buf,
const Block & sample,
2019-02-10 16:55:12 +00:00
UInt64 max_block_size,
ReadCallback callback,
const FormatSettings & settings)>;
using OutputCreator = std::function<BlockOutputStreamPtr(
WriteBuffer & buf,
const Block & sample,
WriteCallback callback,
const FormatSettings & settings)>;
2019-02-19 18:41:18 +00:00
using InputProcessorCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const RowInputFormatParams & params,
const FormatSettings & settings)>;
using OutputProcessorCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
const Block & sample,
WriteCallback callback,
2019-02-19 18:41:18 +00:00
const FormatSettings & settings)>;
2019-08-02 14:41:19 +00:00
struct Creators
{
2019-10-01 10:48:46 +00:00
InputCreator input_creator;
2019-08-02 14:41:19 +00:00
OutputCreator output_creator;
InputProcessorCreator input_processor_creator;
OutputProcessorCreator output_processor_creator;
2019-10-01 10:48:46 +00:00
FileSegmentationEngine file_segmentation_engine;
2019-08-02 14:41:19 +00:00
};
using FormatsDictionary = std::unordered_map<String, Creators>;
public:
static FormatFactory & instance();
BlockInputStreamPtr getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback = {}) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context, WriteCallback callback = {}) const;
InputFormatPtr getInputFormat(
const String & name,
ReadBuffer & buf,
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback = {}) const;
2019-02-19 18:41:18 +00:00
OutputFormatPtr getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}) const;
2019-02-19 18:41:18 +00:00
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);
2019-10-01 10:48:46 +00:00
void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);
2019-02-19 18:41:18 +00:00
void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator);
void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator);
2019-08-02 14:41:19 +00:00
const FormatsDictionary & getAllFormats() const
2018-07-20 15:59:11 +00:00
{
2019-08-02 14:41:19 +00:00
return dict;
}
private:
/// FormatsDictionary dict;
2019-08-02 14:41:19 +00:00
FormatsDictionary dict;
FormatFactory();
2019-08-02 14:41:19 +00:00
const Creators & getCreators(const String & name) const;
};
2019-12-15 06:34:43 +00:00
/// Formats for both input/output.
void registerInputFormatNative(FormatFactory & factory);
void registerOutputFormatNative(FormatFactory & factory);
void registerInputFormatProcessorNative(FormatFactory & factory);
void registerOutputFormatProcessorNative(FormatFactory & factory);
void registerInputFormatProcessorRowBinary(FormatFactory & factory);
void registerOutputFormatProcessorRowBinary(FormatFactory & factory);
void registerInputFormatProcessorTabSeparated(FormatFactory & factory);
void registerOutputFormatProcessorTabSeparated(FormatFactory & factory);
void registerInputFormatProcessorValues(FormatFactory & factory);
void registerOutputFormatProcessorValues(FormatFactory & factory);
void registerInputFormatProcessorCSV(FormatFactory & factory);
void registerOutputFormatProcessorCSV(FormatFactory & factory);
void registerInputFormatProcessorTSKV(FormatFactory & factory);
void registerOutputFormatProcessorTSKV(FormatFactory & factory);
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerInputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorORC(FormatFactory & factory);
void registerOutputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory);
2019-12-15 06:34:43 +00:00
void registerInputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory &factory);
/// File Segmentation Engines for parallel reading
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory);
void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
/// Output only (presentational) formats.
void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatProcessorPretty(FormatFactory & factory);
void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory);
void registerOutputFormatProcessorPrettySpace(FormatFactory & factory);
void registerOutputFormatProcessorVertical(FormatFactory & factory);
void registerOutputFormatProcessorJSON(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompact(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory);
void registerOutputFormatProcessorXML(FormatFactory & factory);
void registerOutputFormatProcessorODBCDriver(FormatFactory & factory);
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory);
void registerOutputFormatProcessorNull(FormatFactory & factory);
void registerOutputFormatProcessorMySQLWrite(FormatFactory & factory);
/// Input only formats.
void registerInputFormatProcessorCapnProto(FormatFactory & factory);
}