/// NOTE(review): in this copy of the file the operand of every #include directive and the
/// template argument list of every std::make_shared / std::vector is missing.
/// Restore them from version control before building; the code tokens below are otherwise unchanged.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#if USE_CAPNP
#include
#endif

#include


namespace DB
{

/// Error codes used by this file; only declared here (extern).
namespace ErrorCodes
{
    extern const int FORMAT_IS_NOT_SUITABLE_FOR_INPUT;
    extern const int UNKNOWN_FORMAT;
}


/** Builds the input stream for the format called `name`, reading from `buf`.
  *
  * name            - format name; compared by exact string equality against the names listed below.
  * buf             - buffer handed to the created stream.
  * sample          - block handed to the created stream and to the row-stream wrapper.
  * context         - source of the settings; also passed to the "Values" stream and used
  *                   to obtain the schema path for "CapnProto".
  * max_block_size  - forwarded only through wrap_row_stream (i.e. not used for "Native").
  *
  * Throws Exception with FORMAT_IS_NOT_SUITABLE_FOR_INPUT for names that are recognized but rejected for input,
  * and with UNKNOWN_FORMAT for any other name.
  */
BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf,
    const Block & sample, const Context & context, size_t max_block_size) const
{
    const Settings & settings = context.getSettingsRef();

    /// Wraps a row-oriented stream together with `sample`, `max_block_size` and the two
    /// input_format_allow_errors_* settings. Captures by reference; it is only called within this function.
    auto wrap_row_stream = [&](auto && row_stream)
    {
        return std::make_shared(std::move(row_stream), sample, max_block_size,
            settings.input_format_allow_errors_num, settings.input_format_allow_errors_ratio);
    };

    if (name == "Native")
    {
        /// Not wrapped: max_block_size and the allow_errors settings are not applied on this path.
        /// NOTE(review): the meaning of the literal 0 is not visible here - confirm against the stream's constructor.
        return std::make_shared(buf, sample, 0);
    }
    else if (name == "RowBinary")
    {
        return wrap_row_stream(std::make_shared(buf, sample));
    }
    else if (name == "TabSeparated" || name == "TSV")    /// TSV is a synonym/alias for the original TabSeparated format
    {
        return wrap_row_stream(std::make_shared(buf, sample));
    }
    else if (name == "TabSeparatedWithNames" || name == "TSVWithNames")
    {
        /// Same call as for TabSeparated, with one extra `true` flag.
        return wrap_row_stream(std::make_shared(buf, sample, true));
    }
    else if (name == "TabSeparatedWithNamesAndTypes" || name == "TSVWithNamesAndTypes")
    {
        /// Same call as for TabSeparated, with two extra `true` flags.
        return wrap_row_stream(std::make_shared(buf, sample, true, true));
    }
    else if (name == "Values")
    {
        return wrap_row_stream(std::make_shared(buf, sample, context, settings.input_format_values_interpret_expressions));
    }
    else if (name == "CSV" || name == "CSVWithNames")
    {
        /// Both names share one branch; they differ only in the with_names flag.
        char csv_delimiter = settings.format_csv_delimiter;
        bool with_names = name == "CSVWithNames";

        return wrap_row_stream(std::make_shared(buf, sample, csv_delimiter, with_names));
    }
    else if (name == "TSKV")
    {
        return wrap_row_stream(std::make_shared(buf, sample, settings.input_format_skip_unknown_fields));
    }
    else if (name == "JSONEachRow")
    {
        return wrap_row_stream(std::make_shared(buf, sample, settings.input_format_skip_unknown_fields));
    }
#if USE_CAPNP
    else if (name == "CapnProto")
    {
        /// The format_schema setting is split on ':' and must yield exactly two tokens.
        std::vector tokens;
        auto schema_and_root = settings.format_schema.toString();
        boost::split(tokens, schema_and_root, boost::is_any_of(":"));
        /// NOTE(review): unlike the other throws in this function, no error code is passed here.
        if (tokens.size() != 2)
            throw Exception("Format CapnProto requires 'format_schema' setting to have a schema_file:root_object format, e.g. 'schema.capnp:Message'");

        const String & schema_dir = context.getFormatSchemaPath();

        return wrap_row_stream(std::make_shared(buf, sample, schema_dir, tokens[0], tokens[1]));
    }
#endif
    /// Names that are recognized but rejected for input.
    else if (name == "TabSeparatedRaw"
        || name == "TSVRaw"
        || name == "Pretty"
        || name == "PrettyCompact"
        || name == "PrettyCompactMonoBlock"
        || name == "PrettySpace"
        || name == "PrettyNoEscapes"
        || name == "PrettyCompactNoEscapes"
        || name == "PrettySpaceNoEscapes"
        || name == "Vertical"
        || name == "VerticalRaw"
        || name == "Null"
        || name == "JSON"
        || name == "JSONCompact"
        || name == "XML"
        || name == "ODBCDriver")
    {
        throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
    }
    else
        throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
}


/** File-local helper: builds the output stream for the format called `name`, writing to `buf`.
  * Called by FormatFactory::getOutput below.
  *
  * Throws Exception with UNKNOWN_FORMAT if `name` matches none of the names below.
  */
static BlockOutputStreamPtr getOutputImpl(const String & name, WriteBuffer & buf,
    const Block & sample, const Context & context)
{
    const Settings & settings = context.getSettingsRef();
    /// Shared by the JSON, JSONCompact and JSONEachRow branches.
    FormatSettingsJSON json_settings(settings.output_format_json_quote_64bit_integers, settings.output_format_json_quote_denormals);

    if (name == "Native")
        /// NOTE(review): the meaning of the literal 0 is not visible here - confirm against the stream's constructor.
        return std::make_shared(buf, 0, sample);
    else if (name == "RowBinary")
        return std::make_shared(std::make_shared(buf), sample);
    else if (name == "TabSeparated" || name == "TSV")
        return std::make_shared(std::make_shared(buf, sample), sample);
    else if (name == "TabSeparatedWithNames" || name == "TSVWithNames")
        return std::make_shared(std::make_shared(buf, sample, true), sample);
    else if (name == "TabSeparatedWithNamesAndTypes" || name == "TSVWithNamesAndTypes")
        return std::make_shared(std::make_shared(buf, sample, true, true), sample);
    else if (name == "TabSeparatedRaw" || name == "TSVRaw")
        return std::make_shared(std::make_shared(buf, sample), sample);
    else if (name == "CSV" || name == "CSVWithNames")
    {
        /// Both names share one branch; they differ only in the with_names flag.
        char csv_delimiter = settings.format_csv_delimiter;
        bool with_names = name == "CSVWithNames";

        return std::make_shared(std::make_shared(buf, sample, csv_delimiter, with_names), sample);
    }
    /// Pretty family: the third argument is `false` for the plain names and `true` for the *NoEscapes names.
    else if (name == "Pretty")
        return std::make_shared(buf, sample, false, settings.output_format_pretty_max_rows, context);
    else if (name == "PrettyCompact")
        return std::make_shared(buf, sample, false, settings.output_format_pretty_max_rows, context);
    else if (name == "PrettyCompactMonoBlock")
    {
        /// The inner stream `dst` is wrapped by a second stream that receives
        /// output_format_pretty_max_rows and 0, and has flushing disabled before being returned.
        /// NOTE(review): the wrapper's type is not visible in this copy - confirm what the two numeric arguments mean.
        BlockOutputStreamPtr dst = std::make_shared(buf, sample, false, settings.output_format_pretty_max_rows, context);
        auto res = std::make_shared(dst, settings.output_format_pretty_max_rows, 0);
        res->disableFlush();
        return res;
    }
    else if (name == "PrettySpace")
        return std::make_shared(buf, sample, false, settings.output_format_pretty_max_rows, context);
    else if (name == "PrettyNoEscapes")
        return std::make_shared(buf, sample, true, settings.output_format_pretty_max_rows, context);
    else if (name == "PrettyCompactNoEscapes")
        return std::make_shared(buf, sample, true, settings.output_format_pretty_max_rows, context);
    else if (name == "PrettySpaceNoEscapes")
        return std::make_shared(buf, sample, true, settings.output_format_pretty_max_rows, context);
    else if (name == "Vertical")
        return std::make_shared(std::make_shared(
            buf, sample, settings.output_format_pretty_max_rows), sample);
    else if (name == "VerticalRaw")
        return std::make_shared(std::make_shared(
            buf, sample, settings.output_format_pretty_max_rows), sample);
    else if (name == "Values")
        return std::make_shared(std::make_shared(buf), sample);
    else if (name == "JSON")
        return std::make_shared(std::make_shared(
            buf, sample, settings.output_format_write_statistics, json_settings), sample);
    else if (name == "JSONCompact")
        return std::make_shared(std::make_shared(
            buf, sample, settings.output_format_write_statistics, json_settings), sample);
    else if (name == "JSONEachRow")
        return std::make_shared(std::make_shared(
            buf, sample, json_settings), sample);
    else if (name == "XML")
        return std::make_shared(std::make_shared(buf, sample,
            settings.output_format_write_statistics), sample);
    else if (name == "TSKV")
        return std::make_shared(std::make_shared(buf, sample), sample);
    else if (name == "ODBCDriver")
        return std::make_shared(buf, sample);
    else if (name == "Null")
        /// The only branch that does not use `buf`.
        return std::make_shared(sample);
    else
        throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
}


/** Public entry point for output formats: delegates to getOutputImpl and wraps the result.
  * Exceptions thrown by getOutputImpl (UNKNOWN_FORMAT for an unrecognized name) propagate to the caller.
  */
BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer & buf,
    const Block & sample, const Context & context) const
{
    /** Materialization is needed because formats can use `IDataType` functions
      *  that only work with full columns.
      * The inner stream receives materializeBlock(sample); the outer wrapper receives the original `sample`.
      */
    return std::make_shared(getOutputImpl(name, buf, materializeBlock(sample), context), sample);
}

}