Remove streams from formats.

This commit is contained in:
Nikolai Kochetov 2021-10-11 19:11:50 +03:00
parent a95c28ec4b
commit ec18340351
107 changed files with 519 additions and 616 deletions

View File

@ -44,8 +44,6 @@
#include <IO/WriteBufferFromOStream.h> #include <IO/WriteBufferFromOStream.h>
#include <IO/UseSSL.h> #include <IO/UseSSL.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTDropQuery.h> #include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTSetQuery.h> #include <Parsers/ASTSetQuery.h>
@ -61,7 +59,6 @@
#include <Functions/registerFunctions.h> #include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h> #include <AggregateFunctions/registerAggregateFunctions.h>
#include <Formats/registerFormats.h> #include <Formats/registerFormats.h>
#include <Formats/FormatFactory.h>
#include "TestTags.h" #include "TestTags.h"
#ifndef __clang__ #ifndef __clang__

View File

@ -52,7 +52,6 @@
#include <DataStreams/RemoteQueryExecutor.h> #include <DataStreams/RemoteQueryExecutor.h>
#include <DataStreams/SquashingBlockInputStream.h> #include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h> #include <DataStreams/copyData.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <IO/ConnectionTimeouts.h> #include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>

View File

@ -11,6 +11,10 @@
#include <Poco/Net/HTMLForm.h> #include <Poco/Net/HTMLForm.h>
#include <Poco/ThreadPool.h> #include <Poco/ThreadPool.h>
#include <Processors/Formats/InputStreamFromInputFormat.h> #include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Server/HTTP/HTMLForm.h> #include <Server/HTTP/HTMLForm.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
@ -63,6 +67,17 @@ namespace
} }
static void writeData(Block data, OutputFormatPtr format)
{
auto source = std::make_shared<SourceFromSingleChunk>(std::move(data));
QueryPipeline pipeline(std::move(source));
pipeline.complete(std::move(format));
CompletedPipelineExecutor executor(pipeline);
executor.execute();
}
void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{ {
LOG_TRACE(log, "Request URI: {}", request.getURI()); LOG_TRACE(log, "Request URI: {}", request.getURI());
@ -173,7 +188,7 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
} }
ReadBufferFromString read_block_buf(params.get("null_values")); ReadBufferFromString read_block_buf(params.get("null_values"));
auto format = FormatFactory::instance().getInput(FORMAT, read_block_buf, *sample_block, getContext(), DEFAULT_BLOCK_SIZE); auto format = getContext()->getInputFormat(FORMAT, read_block_buf, *sample_block, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format); auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto sample_block_with_nulls = reader->read(); auto sample_block_with_nulls = reader->read();
@ -221,8 +236,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
auto input = library_handler->loadAll(); auto input = library_handler->loadAll();
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id); LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext()); auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
copyData(*input, *output); writeData(std::move(input), std::move(output));
} }
else if (method == "loadIds") else if (method == "loadIds")
{ {
@ -239,8 +254,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
auto input = library_handler->loadIds(ids); auto input = library_handler->loadIds(ids);
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id); LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext()); auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
copyData(*input, *output); writeData(std::move(input), std::move(output));
} }
else if (method == "loadKeys") else if (method == "loadKeys")
{ {
@ -265,7 +280,7 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
} }
auto & read_buf = request.getStream(); auto & read_buf = request.getStream();
auto format = FormatFactory::instance().getInput(FORMAT, read_buf, *requested_sample_block, getContext(), DEFAULT_BLOCK_SIZE); auto format = getContext()->getInputFormat(FORMAT, read_buf, *requested_sample_block, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format); auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto block = reader->read(); auto block = reader->read();
@ -278,8 +293,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
auto input = library_handler->loadKeys(block.getColumns()); auto input = library_handler->loadKeys(block.getColumns());
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id); LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext()); auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
copyData(*input, *output); writeData(std::move(input), std::move(output));
} }
} }
catch (...) catch (...)

View File

@ -92,7 +92,7 @@ bool SharedLibraryHandler::supportsSelectiveLoad()
} }
BlockInputStreamPtr SharedLibraryHandler::loadAll() Block SharedLibraryHandler::loadAll()
{ {
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(attributes_names.size()); auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(attributes_names.size());
ClickHouseLibrary::CStrings columns{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), attributes_names.size()}; ClickHouseLibrary::CStrings columns{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), attributes_names.size()};
@ -107,13 +107,11 @@ BlockInputStreamPtr SharedLibraryHandler::loadAll()
SCOPE_EXIT(data_delete_func(lib_data, data_ptr)); SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_all_func(data_ptr, &settings_holder->strings, &columns); ClickHouseLibrary::RawClickHouseLibraryTable data = load_all_func(data_ptr, &settings_holder->strings, &columns);
auto block = dataToBlock(data); return dataToBlock(data);
return std::make_shared<OneBlockInputStream>(block);
} }
BlockInputStreamPtr SharedLibraryHandler::loadIds(const std::vector<uint64_t> & ids) Block SharedLibraryHandler::loadIds(const std::vector<uint64_t> & ids)
{ {
const ClickHouseLibrary::VectorUInt64 ids_data{bit_cast<decltype(ClickHouseLibrary::VectorUInt64::data)>(ids.data()), ids.size()}; const ClickHouseLibrary::VectorUInt64 ids_data{bit_cast<decltype(ClickHouseLibrary::VectorUInt64::data)>(ids.data()), ids.size()};
@ -128,13 +126,11 @@ BlockInputStreamPtr SharedLibraryHandler::loadIds(const std::vector<uint64_t> &
SCOPE_EXIT(data_delete_func(lib_data, data_ptr)); SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_ids_func(data_ptr, &settings_holder->strings, &columns_pass, &ids_data); ClickHouseLibrary::RawClickHouseLibraryTable data = load_ids_func(data_ptr, &settings_holder->strings, &columns_pass, &ids_data);
auto block = dataToBlock(data); return dataToBlock(data);
return std::make_shared<OneBlockInputStream>(block);
} }
BlockInputStreamPtr SharedLibraryHandler::loadKeys(const Columns & key_columns) Block SharedLibraryHandler::loadKeys(const Columns & key_columns)
{ {
auto holder = std::make_unique<ClickHouseLibrary::Row[]>(key_columns.size()); auto holder = std::make_unique<ClickHouseLibrary::Row[]>(key_columns.size());
std::vector<std::unique_ptr<ClickHouseLibrary::Field[]>> column_data_holders; std::vector<std::unique_ptr<ClickHouseLibrary::Field[]>> column_data_holders;
@ -171,9 +167,7 @@ BlockInputStreamPtr SharedLibraryHandler::loadKeys(const Columns & key_columns)
SCOPE_EXIT(data_delete_func(lib_data, data_ptr)); SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_keys_func(data_ptr, &settings_holder->strings, &request_cols); ClickHouseLibrary::RawClickHouseLibraryTable data = load_keys_func(data_ptr, &settings_holder->strings, &request_cols);
auto block = dataToBlock(data); return dataToBlock(data);
return std::make_shared<OneBlockInputStream>(block);
} }

View File

@ -27,11 +27,11 @@ public:
~SharedLibraryHandler(); ~SharedLibraryHandler();
BlockInputStreamPtr loadAll(); Block loadAll();
BlockInputStreamPtr loadIds(const std::vector<uint64_t> & ids); Block loadIds(const std::vector<uint64_t> & ids);
BlockInputStreamPtr loadKeys(const Columns & key_columns); Block loadKeys(const Columns & key_columns);
bool isModified(); bool isModified();

View File

@ -24,10 +24,10 @@
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <Formats/registerFormats.h> #include <Formats/registerFormats.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include <Processors/QueryPipelineBuilder.h> #include <Processors/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Core/Block.h> #include <Core/Block.h>
#include <base/StringRef.h> #include <base/StringRef.h>
#include <base/DateLUT.h> #include <base/DateLUT.h>
@ -1160,7 +1160,7 @@ try
if (!silent) if (!silent)
std::cerr << "Training models\n"; std::cerr << "Training models\n";
Pipe pipe(FormatFactory::instance().getInput(input_format, file_in, header, context, max_block_size)); Pipe pipe(context->getInputFormat(input_format, file_in, header, max_block_size));
QueryPipeline pipeline(std::move(pipe)); QueryPipeline pipeline(std::move(pipe));
PullingPipelineExecutor executor(pipeline); PullingPipelineExecutor executor(pipeline);
@ -1189,7 +1189,7 @@ try
file_in.seek(0, SEEK_SET); file_in.seek(0, SEEK_SET);
Pipe pipe(FormatFactory::instance().getInput(input_format, file_in, header, context, max_block_size)); Pipe pipe(context->getInputFormat(input_format, file_in, header, max_block_size));
if (processed_rows + source_rows > limit) if (processed_rows + source_rows > limit)
{ {
@ -1199,23 +1199,25 @@ try
}); });
} }
QueryPipeline pipeline(std::move(pipe)); QueryPipeline in_pipeline(std::move(pipe));
BlockOutputStreamPtr output = context->getOutputStreamParallelIfPossible(output_format, file_out, header); auto output = context->getOutputFormatParallelIfPossible(output_format, file_out, header);
QueryPipeline out_pipeline(std::move(output));
PullingPipelineExecutor executor(pipeline); PullingPipelineExecutor in_executor(in_pipeline);
PushingPipelineExecutor out_executor(out_pipeline);
output->writePrefix();
Block block; Block block;
while (executor.pull(block)) out_executor.start();
while (in_executor.pull(block))
{ {
Columns columns = obfuscator.generate(block.getColumns()); Columns columns = obfuscator.generate(block.getColumns());
output->write(header.cloneWithColumns(columns)); out_executor.push(header.cloneWithColumns(columns));
processed_rows += block.rows(); processed_rows += block.rows();
if (!silent) if (!silent)
std::cerr << "Processed " << processed_rows << " rows\n"; std::cerr << "Processed " << processed_rows << " rows\n";
} }
output->writeSuffix(); out_executor.finish();
obfuscator.updateSeed(); obfuscator.updateSeed();
} }

View File

@ -16,6 +16,8 @@
#include <Poco/Net/HTMLForm.h> #include <Poco/Net/HTMLForm.h>
#include <Poco/ThreadPool.h> #include <Poco/ThreadPool.h>
#include <Processors/Formats/InputStreamFromInputFormat.h> #include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <base/logger_useful.h> #include <base/logger_useful.h>
#include <Server/HTTP/HTMLForm.h> #include <Server/HTTP/HTMLForm.h>
@ -133,10 +135,15 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
quoting_style = getQuotingStyle(connection_handler); quoting_style = getQuotingStyle(connection_handler);
#endif #endif
auto & read_buf = request.getStream(); auto & read_buf = request.getStream();
auto input_format = FormatFactory::instance().getInput(format, read_buf, *sample_block, getContext(), max_block_size); auto input_format = getContext()->getInputFormat(format, read_buf, *sample_block, max_block_size);
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format); auto sink = std::make_shared<ODBCSink>(std::move(connection_handler), db_name, table_name, *sample_block, getContext(), quoting_style);
ODBCBlockOutputStream output_stream(std::move(connection_handler), db_name, table_name, *sample_block, getContext(), quoting_style);
copyData(*input_stream, output_stream); QueryPipeline pipeline(std::move(input_format));
pipeline.complete(std::move(sink));
CompletedPipelineExecutor executor(pipeline);
executor.execute();
writeStringBinary("Ok.", out); writeStringBinary("Ok.", out);
} }
else else
@ -144,9 +151,14 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
std::string query = params.get("query"); std::string query = params.get("query");
LOG_TRACE(log, "Query: {}", query); LOG_TRACE(log, "Query: {}", query);
BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, out, *sample_block, getContext()); auto writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, out, *sample_block, getContext());
ODBCBlockInputStream inp(std::move(connection_handler), query, *sample_block, max_block_size); auto source = std::make_shared<ODBCSource>(std::move(connection_handler), query, *sample_block, max_block_size);
copyData(inp, *writer);
QueryPipeline pipeline(std::move(source));
pipeline.complete(std::move(writer));
CompletedPipelineExecutor executor(pipeline);
executor.execute();
} }
} }
catch (...) catch (...)

View File

@ -19,9 +19,10 @@ namespace ErrorCodes
} }
ODBCBlockInputStream::ODBCBlockInputStream( ODBCSource::ODBCSource(
nanodbc::ConnectionHolderPtr connection_holder, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_) nanodbc::ConnectionHolderPtr connection_holder, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_)
: log(&Poco::Logger::get("ODBCBlockInputStream")) : ISource(sample_block)
, log(&Poco::Logger::get("ODBCBlockInputStream"))
, max_block_size{max_block_size_} , max_block_size{max_block_size_}
, query(query_str) , query(query_str)
{ {
@ -31,10 +32,10 @@ ODBCBlockInputStream::ODBCBlockInputStream(
} }
Block ODBCBlockInputStream::readImpl() Chunk ODBCSource::generate()
{ {
if (finished) if (is_finished)
return Block(); return {};
MutableColumns columns(description.sample_block.cloneEmptyColumns()); MutableColumns columns(description.sample_block.cloneEmptyColumns());
size_t num_rows = 0; size_t num_rows = 0;
@ -43,7 +44,7 @@ Block ODBCBlockInputStream::readImpl()
{ {
if (!result.next()) if (!result.next())
{ {
finished = true; is_finished = true;
break; break;
} }
@ -75,11 +76,11 @@ Block ODBCBlockInputStream::readImpl()
break; break;
} }
return description.sample_block.cloneWithColumns(std::move(columns)); return Chunk(std::move(columns), num_rows);
} }
void ODBCBlockInputStream::insertValue( void ODBCSource::insertValue(
IColumn & column, const DataTypePtr data_type, const ValueType type, nanodbc::result & row, size_t idx) IColumn & column, const DataTypePtr data_type, const ValueType type, nanodbc::result & row, size_t idx)
{ {
switch (type) switch (type)

View File

@ -2,7 +2,7 @@
#include <string> #include <string>
#include <Core/Block.h> #include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h> #include <Processors/ISource.h>
#include <Core/ExternalResultDescription.h> #include <Core/ExternalResultDescription.h>
#include "ODBCConnectionFactory.h" #include "ODBCConnectionFactory.h"
@ -10,22 +10,20 @@
namespace DB namespace DB
{ {
/// Allows processing results of a query to ODBC source as a sequence of Blocks, simplifies chaining /// Allows processing results of a query to ODBC source as a sequence of Blocks, simplifies chaining
class ODBCBlockInputStream final : public IBlockInputStream class ODBCSource final : public ISource
{ {
public: public:
ODBCBlockInputStream(nanodbc::ConnectionHolderPtr connection, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_); ODBCSource(nanodbc::ConnectionHolderPtr connection, const std::string & query_str, const Block & sample_block, UInt64 max_block_size_);
String getName() const override { return "ODBC"; } String getName() const override { return "ODBC"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private: private:
using QueryResult = std::shared_ptr<nanodbc::result>; using QueryResult = std::shared_ptr<nanodbc::result>;
using ValueType = ExternalResultDescription::ValueType; using ValueType = ExternalResultDescription::ValueType;
Block readImpl() override; Chunk generate() override;
static void insertValue(IColumn & column, const DataTypePtr data_type, const ValueType type, nanodbc::result & row, size_t idx); static void insertValue(IColumn & column, DataTypePtr data_type, ValueType type, nanodbc::result & row, size_t idx);
static void insertDefaultValue(IColumn & column, const IColumn & sample_column) static void insertDefaultValue(IColumn & column, const IColumn & sample_column)
{ {
@ -38,7 +36,7 @@ private:
nanodbc::result result; nanodbc::result result;
String query; String query;
bool finished = false; bool is_finished = false;
}; };
} }

View File

@ -8,7 +8,7 @@
#include "getIdentifierQuote.h" #include "getIdentifierQuote.h"
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <Formats/FormatFactory.h> #include <Processors/Formats/IOutputFormat.h>
#include <Parsers/getInsertQuery.h> #include <Parsers/getInsertQuery.h>
@ -16,13 +16,15 @@ namespace DB
{ {
ODBCBlockOutputStream::ODBCBlockOutputStream(nanodbc::ConnectionHolderPtr connection_holder_, ODBCSink::ODBCSink(
nanodbc::ConnectionHolderPtr connection_holder_,
const std::string & remote_database_name_, const std::string & remote_database_name_,
const std::string & remote_table_name_, const std::string & remote_table_name_,
const Block & sample_block_, const Block & sample_block_,
ContextPtr local_context_, ContextPtr local_context_,
IdentifierQuotingStyle quoting_) IdentifierQuotingStyle quoting_)
: log(&Poco::Logger::get("ODBCBlockOutputStream")) : ISink(sample_block_)
, log(&Poco::Logger::get("ODBCBlockOutputStream"))
, connection_holder(std::move(connection_holder_)) , connection_holder(std::move(connection_holder_))
, db_name(remote_database_name_) , db_name(remote_database_name_)
, table_name(remote_table_name_) , table_name(remote_table_name_)
@ -33,15 +35,12 @@ ODBCBlockOutputStream::ODBCBlockOutputStream(nanodbc::ConnectionHolderPtr connec
description.init(sample_block); description.init(sample_block);
} }
Block ODBCBlockOutputStream::getHeader() const
{
return sample_block;
}
void ODBCBlockOutputStream::write(const Block & block) void ODBCSink::consume(Chunk chunk)
{ {
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
WriteBufferFromOwnString values_buf; WriteBufferFromOwnString values_buf;
auto writer = FormatFactory::instance().getOutputStream("Values", values_buf, sample_block, local_context); auto writer = local_context->getOutputFormat("Values", values_buf, sample_block);
writer->write(block); writer->write(block);
std::string query = getInsertQuery(db_name, table_name, block.getColumnsWithTypeAndName(), quoting) + values_buf.str(); std::string query = getInsertQuery(db_name, table_name, block.getColumnsWithTypeAndName(), quoting) + values_buf.str();

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <Core/Block.h> #include <Core/Block.h>
#include <DataStreams/IBlockOutputStream.h> #include <Processors/ISink.h>
#include <Core/ExternalResultDescription.h> #include <Core/ExternalResultDescription.h>
#include <Parsers/IdentifierQuotingStyle.h> #include <Parsers/IdentifierQuotingStyle.h>
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
@ -11,12 +11,12 @@
namespace DB namespace DB
{ {
class ODBCBlockOutputStream : public IBlockOutputStream class ODBCSink final : public ISink
{ {
using ValueType = ExternalResultDescription::ValueType; using ValueType = ExternalResultDescription::ValueType;
public: public:
ODBCBlockOutputStream( ODBCSink(
nanodbc::ConnectionHolderPtr connection_, nanodbc::ConnectionHolderPtr connection_,
const std::string & remote_database_name_, const std::string & remote_database_name_,
const std::string & remote_table_name_, const std::string & remote_table_name_,
@ -24,8 +24,10 @@ public:
ContextPtr local_context_, ContextPtr local_context_,
IdentifierQuotingStyle quoting); IdentifierQuotingStyle quoting);
Block getHeader() const override; String getName() const override { return "ODBCSink"; }
void write(const Block & block) override;
protected:
void consume(Chunk chunk) override;
private: private:
Poco::Logger * log; Poco::Logger * log;

View File

@ -8,7 +8,6 @@
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h> #include <Common/ShellCommand.h>
#include <base/logger_useful.h> #include <base/logger_useful.h>
@ -134,8 +133,8 @@ ReadWriteBufferFromHTTP::OutStreamCallback LibraryBridgeHelper::getInitLibraryCa
{ {
/// Sample block must contain null values /// Sample block must contain null values
WriteBufferFromOwnString out; WriteBufferFromOwnString out;
auto output_stream = getContext()->getOutputStream(LibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block); auto output_format = getContext()->getOutputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block);
formatBlock(output_stream, sample_block); formatBlock(output_format, sample_block);
auto block_string = out.str(); auto block_string = out.str();
return [block_string, this](std::ostream & os) return [block_string, this](std::ostream & os)
@ -226,8 +225,8 @@ Pipe LibraryBridgeHelper::loadKeys(const Block & requested_block)
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [requested_block, this](std::ostream & os) ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [requested_block, this](std::ostream & os)
{ {
WriteBufferFromOStream out_buffer(os); WriteBufferFromOStream out_buffer(os);
auto output_stream = getContext()->getOutputStream(LibraryBridgeHelper::DEFAULT_FORMAT, out_buffer, sample_block); auto output_format = getContext()->getOutputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, out_buffer, sample_block);
formatBlock(output_stream, requested_block); formatBlock(output_format, requested_block);
}; };
return loadBase(uri, out_stream_callback); return loadBase(uri, out_stream_callback);
} }
@ -259,8 +258,7 @@ Pipe LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTT
DBMS_DEFAULT_BUFFER_SIZE, DBMS_DEFAULT_BUFFER_SIZE,
ReadWriteBufferFromHTTP::HTTPHeaderEntries{}); ReadWriteBufferFromHTTP::HTTPHeaderEntries{});
auto input_stream = getContext()->getInputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, DEFAULT_BLOCK_SIZE); auto source = getContext()->getInputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, DEFAULT_BLOCK_SIZE);
auto source = FormatFactory::instance().getInput(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, getContext(), DEFAULT_BLOCK_SIZE);
source->addBuffer(std::move(read_buf_ptr)); source->addBuffer(std::move(read_buf_ptr));
return Pipe(std::move(source)); return Pipe(std::move(source));
} }

View File

@ -38,16 +38,15 @@
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <Formats/FormatFactory.h> #include <Formats/NullFormat.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/QueryPipeline.h> #include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h> #include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h> #include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h> #include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <IO/WriteBufferFromOStream.h> #include <IO/WriteBufferFromOStream.h>
#include <IO/CompressionMethod.h> #include <IO/CompressionMethod.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/InternalTextLogs.h> #include <DataStreams/InternalTextLogs.h>
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -233,7 +232,7 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
initBlockOutputStream(block, parsed_query); initBlockOutputStream(block, parsed_query);
/// The header block containing zero rows was used to initialize /// The header block containing zero rows was used to initialize
/// block_out_stream, do not output it. /// output_format, do not output it.
/// Also do not output too much data if we're fuzzing. /// Also do not output too much data if we're fuzzing.
if (block.rows() == 0 || (query_fuzzer_runs != 0 && processed_rows >= 100)) if (block.rows() == 0 || (query_fuzzer_runs != 0 && processed_rows >= 100))
return; return;
@ -241,11 +240,11 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
if (need_render_progress && (stdout_is_a_tty || is_interactive)) if (need_render_progress && (stdout_is_a_tty || is_interactive))
progress_indication.clearProgressOutput(); progress_indication.clearProgressOutput();
block_out_stream->write(block); output_format->write(block);
written_first_block = true; written_first_block = true;
/// Received data block is immediately displayed to the user. /// Received data block is immediately displayed to the user.
block_out_stream->flush(); output_format->flush();
/// Restore progress bar after data block. /// Restore progress bar after data block.
if (need_render_progress && (stdout_is_a_tty || is_interactive)) if (need_render_progress && (stdout_is_a_tty || is_interactive))
@ -265,14 +264,14 @@ void ClientBase::onLogData(Block & block)
void ClientBase::onTotals(Block & block, ASTPtr parsed_query) void ClientBase::onTotals(Block & block, ASTPtr parsed_query)
{ {
initBlockOutputStream(block, parsed_query); initBlockOutputStream(block, parsed_query);
block_out_stream->setTotals(block); output_format->setTotals(block);
} }
void ClientBase::onExtremes(Block & block, ASTPtr parsed_query) void ClientBase::onExtremes(Block & block, ASTPtr parsed_query)
{ {
initBlockOutputStream(block, parsed_query); initBlockOutputStream(block, parsed_query);
block_out_stream->setExtremes(block); output_format->setExtremes(block);
} }
@ -286,19 +285,19 @@ void ClientBase::onReceiveExceptionFromServer(std::unique_ptr<Exception> && e)
void ClientBase::onProfileInfo(const BlockStreamProfileInfo & profile_info) void ClientBase::onProfileInfo(const BlockStreamProfileInfo & profile_info)
{ {
if (profile_info.hasAppliedLimit() && block_out_stream) if (profile_info.hasAppliedLimit() && output_format)
block_out_stream->setRowsBeforeLimit(profile_info.getRowsBeforeLimit()); output_format->setRowsBeforeLimit(profile_info.getRowsBeforeLimit());
} }
void ClientBase::initBlockOutputStream(const Block & block, ASTPtr parsed_query) void ClientBase::initBlockOutputStream(const Block & block, ASTPtr parsed_query)
{ {
if (!block_out_stream) if (!output_format)
{ {
/// Ignore all results when fuzzing as they can be huge. /// Ignore all results when fuzzing as they can be huge.
if (query_fuzzer_runs) if (query_fuzzer_runs)
{ {
block_out_stream = std::make_shared<NullBlockOutputStream>(block); output_format = std::make_shared<NullOutputFormat>(block);
return; return;
} }
@ -360,11 +359,11 @@ void ClientBase::initBlockOutputStream(const Block & block, ASTPtr parsed_query)
/// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly. /// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly.
if (!need_render_progress) if (!need_render_progress)
block_out_stream = global_context->getOutputStreamParallelIfPossible(current_format, out_file_buf ? *out_file_buf : *out_buf, block); output_format = global_context->getOutputFormatParallelIfPossible(current_format, out_file_buf ? *out_file_buf : *out_buf, block);
else else
block_out_stream = global_context->getOutputStream(current_format, out_file_buf ? *out_file_buf : *out_buf, block); output_format = global_context->getOutputFormat(current_format, out_file_buf ? *out_file_buf : *out_buf, block);
block_out_stream->writePrefix(); output_format->doWritePrefix();
} }
} }
@ -626,8 +625,8 @@ void ClientBase::onProgress(const Progress & value)
return; return;
} }
if (block_out_stream) if (output_format)
block_out_stream->onProgress(value); output_format->onProgress(value);
if (need_render_progress) if (need_render_progress)
progress_indication.writeProgress(); progress_indication.writeProgress();
@ -638,8 +637,8 @@ void ClientBase::onEndOfStream()
{ {
progress_indication.clearProgressOutput(); progress_indication.clearProgressOutput();
if (block_out_stream) if (output_format)
block_out_stream->writeSuffix(); output_format->doWriteSuffix();
resetOutput(); resetOutput();
@ -654,7 +653,7 @@ void ClientBase::onEndOfStream()
/// Flush all buffers. /// Flush all buffers.
void ClientBase::resetOutput() void ClientBase::resetOutput()
{ {
block_out_stream.reset(); output_format.reset();
logs_out_stream.reset(); logs_out_stream.reset();
if (pager_cmd) if (pager_cmd)
@ -848,7 +847,7 @@ void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDes
current_format = insert->format; current_format = insert->format;
} }
auto source = FormatFactory::instance().getInput(current_format, buf, sample, global_context, insert_format_max_block_size); auto source = global_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
Pipe pipe(source); Pipe pipe(source);
if (columns_description.hasDefaults()) if (columns_description.hasDefaults())

View File

@ -177,7 +177,7 @@ protected:
/// The user can specify to redirect query output to a file. /// The user can specify to redirect query output to a file.
std::unique_ptr<WriteBuffer> out_file_buf; std::unique_ptr<WriteBuffer> out_file_buf;
BlockOutputStreamPtr block_out_stream; std::shared_ptr<IOutputFormat> output_format;
/// The user could specify special file for server logs (stderr by default) /// The user could specify special file for server logs (stderr by default)
std::unique_ptr<WriteBuffer> out_logs_buf; std::unique_ptr<WriteBuffer> out_logs_buf;

View File

@ -9,7 +9,6 @@
#include <IO/Operators.h> #include <IO/Operators.h>
#include <Functions/FunctionFactory.h> #include <Functions/FunctionFactory.h>
#include <TableFunctions/TableFunctionFactory.h> #include <TableFunctions/TableFunctionFactory.h>
#include <Formats/FormatFactory.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>

View File

@ -1,7 +1,6 @@
#include <re2/re2.h> #include <re2/re2.h>
#include <Poco/URI.h> #include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Formats/FormatFactory.h>
#include <Common/RemoteHostFilter.h> #include <Common/RemoteHostFilter.h>
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <Common/Exception.h> #include <Common/Exception.h>

View File

@ -19,7 +19,6 @@
#include <Core/ExternalTable.h> #include <Core/ExternalTable.h>
#include <Poco/Net/MessageHeader.h> #include <Poco/Net/MessageHeader.h>
#include <Formats/FormatFactory.h>
#include <base/find_symbols.h> #include <base/find_symbols.h>
@ -36,7 +35,7 @@ ExternalTableDataPtr BaseExternalTable::getData(ContextPtr context)
{ {
initReadBuffer(); initReadBuffer();
initSampleBlock(); initSampleBlock();
auto input = FormatFactory::instance().getInput(format, *read_buffer, sample_block, context, DEFAULT_BLOCK_SIZE); auto input = context->getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);
auto data = std::make_unique<ExternalTableData>(); auto data = std::make_unique<ExternalTableData>();
data->pipe = std::make_unique<Pipe>(std::move(input)); data->pipe = std::make_unique<Pipe>(std::move(input));

View File

@ -1,22 +0,0 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
namespace DB
{
/** Does nothing. Used for debugging and benchmarks.
*/
class NullBlockOutputStream : public IBlockOutputStream
{
public:
NullBlockOutputStream(const Block & header_) : header(header_) {}
Block getHeader() const override { return header; }
void write(const Block &) override {}
private:
Block header;
};
}

View File

@ -9,7 +9,6 @@
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <Formats/FormatFactory.h>
#include <Processors/ISimpleTransform.h> #include <Processors/ISimpleTransform.h>
#include <Processors/Sources/SourceWithProgress.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
@ -97,7 +96,7 @@ public:
max_block_size = configuration.number_of_rows_to_read; max_block_size = configuration.number_of_rows_to_read;
} }
pipeline = QueryPipeline(Pipe(FormatFactory::instance().getInput(format, command->out, sample_block, context, max_block_size))); pipeline = QueryPipeline(Pipe(context->getInputFormat(format, command->out, sample_block, max_block_size)));
executor = std::make_unique<PullingPipelineExecutor>(pipeline); executor = std::make_unique<PullingPipelineExecutor>(pipeline);
} }

View File

@ -1,15 +1,20 @@
#include <Core/Block.h> #include <Core/Block.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/formatBlock.h> #include <DataStreams/formatBlock.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
namespace DB namespace DB
{ {
void formatBlock(BlockOutputStreamPtr & out, const Block & block) void formatBlock(OutputFormatPtr out, const Block & block)
{ {
out->writePrefix(); auto source = std::make_shared<SourceFromSingleChunk>(block);
out->write(block); QueryPipeline pipeline(source);
out->writeSuffix(); pipeline.complete(out);
CompletedPipelineExecutor executor(pipeline);
executor.execute();
out->flush(); out->flush();
} }

View File

@ -1,12 +1,14 @@
#pragma once #pragma once
#include <memory>
#include <DataStreams/IBlockStream_fwd.h>
namespace DB namespace DB
{ {
class Block; class Block;
void formatBlock(BlockOutputStreamPtr & out, const Block & block); class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
void formatBlock(OutputFormatPtr out, const Block & block);
} }

View File

@ -127,8 +127,8 @@ Pipe ExecutableDictionarySource::getStreamForBlock(const Block & block)
writeChar('\n', out); writeChar('\n', out);
} }
auto output_stream = context->getOutputStream(configuration.format, out, block.cloneEmpty()); auto output_format = context->getOutputFormat(configuration.format, out, block.cloneEmpty());
formatBlock(output_stream, block); formatBlock(output_format, block);
out.close(); out.close();
}}; }};
std::vector<ShellCommandSource::SendDataTask> tasks = {std::move(task)}; std::vector<ShellCommandSource::SendDataTask> tasks = {std::move(task)};

View File

@ -119,8 +119,8 @@ Pipe ExecutablePoolDictionarySource::getStreamForBlock(const Block & block)
writeChar('\n', out); writeChar('\n', out);
} }
auto output_stream = context->getOutputStream(configuration.format, out, block.cloneEmpty()); auto output_format = context->getOutputFormat(configuration.format, out, block.cloneEmpty());
formatBlock(output_stream, block); formatBlock(output_format, block);
}; };
std::vector<ShellCommandSource::SendDataTask> tasks = {std::move(task)}; std::vector<ShellCommandSource::SendDataTask> tasks = {std::move(task)};

View File

@ -4,7 +4,6 @@
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include "DictionarySourceFactory.h" #include "DictionarySourceFactory.h"
#include "DictionaryStructure.h" #include "DictionaryStructure.h"
@ -51,7 +50,7 @@ Pipe FileDictionarySource::loadAll()
{ {
LOG_TRACE(&Poco::Logger::get("FileDictionary"), "loadAll {}", toString()); LOG_TRACE(&Poco::Logger::get("FileDictionary"), "loadAll {}", toString());
auto in_ptr = std::make_unique<ReadBufferFromFile>(filepath); auto in_ptr = std::make_unique<ReadBufferFromFile>(filepath);
auto source = FormatFactory::instance().getInput(format, *in_ptr, sample_block, context, max_block_size); auto source = context->getInputFormat(format, *in_ptr, sample_block, max_block_size);
source->addBuffer(std::move(in_ptr)); source->addBuffer(std::move(in_ptr));
last_modification = getLastModification(); last_modification = getLastModification();

View File

@ -7,7 +7,6 @@
#include <IO/WriteBufferFromOStream.h> #include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Poco/Net/HTTPRequest.h> #include <Poco/Net/HTTPRequest.h>
@ -69,7 +68,7 @@ Pipe HTTPDictionarySource::createWrappedBuffer(std::unique_ptr<ReadWriteBufferFr
String http_request_compression_method_str = http_buffer_ptr->getCompressionMethod(); String http_request_compression_method_str = http_buffer_ptr->getCompressionMethod();
auto in_ptr_wrapped auto in_ptr_wrapped
= wrapReadBufferWithCompressionMethod(std::move(http_buffer_ptr), chooseCompressionMethod(uri.getPath(), http_request_compression_method_str)); = wrapReadBufferWithCompressionMethod(std::move(http_buffer_ptr), chooseCompressionMethod(uri.getPath(), http_request_compression_method_str));
auto source = FormatFactory::instance().getInput(configuration.format, *in_ptr_wrapped, sample_block, context, max_block_size); auto source = context->getInputFormat(configuration.format, *in_ptr_wrapped, sample_block, max_block_size);
source->addBuffer(std::move(in_ptr_wrapped)); source->addBuffer(std::move(in_ptr_wrapped));
return Pipe(std::move(source)); return Pipe(std::move(source));
} }
@ -135,8 +134,8 @@ Pipe HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr) ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr)
{ {
WriteBufferFromOStream out_buffer(ostr); WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context->getOutputStreamParallelIfPossible(configuration.format, out_buffer, sample_block); auto output_format = context->getOutputFormatParallelIfPossible(configuration.format, out_buffer, sample_block);
formatBlock(output_stream, block); formatBlock(output_format, block);
}; };
Poco::URI uri(configuration.url); Poco::URI uri(configuration.url);
@ -162,8 +161,8 @@ Pipe HTTPDictionarySource::loadKeys(const Columns & key_columns, const std::vect
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr) ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr)
{ {
WriteBufferFromOStream out_buffer(ostr); WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context->getOutputStreamParallelIfPossible(configuration.format, out_buffer, sample_block); auto output_format = context->getOutputFormatParallelIfPossible(configuration.format, out_buffer, sample_block);
formatBlock(output_stream, block); formatBlock(output_format, block);
}; };
Poco::URI uri(configuration.url); Poco::URI uri(configuration.url);

View File

@ -3,7 +3,6 @@
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Processors/Sources/SourceWithProgress.h> #include <Processors/Sources/SourceWithProgress.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/InputStreamFromInputFormat.h> #include <Processors/Formats/InputStreamFromInputFormat.h>
#include <IO/ReadWriteBufferFromHTTP.h> #include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -216,7 +215,7 @@ Pipe XDBCDictionarySource::loadFromQuery(const Poco::URI & url, const Block & re
}; };
auto read_buf = std::make_unique<ReadWriteBufferFromHTTP>(url, Poco::Net::HTTPRequest::HTTP_POST, write_body_callback, timeouts); auto read_buf = std::make_unique<ReadWriteBufferFromHTTP>(url, Poco::Net::HTTPRequest::HTTP_POST, write_body_callback, timeouts);
auto format = FormatFactory::instance().getInput(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, required_sample_block, getContext(), max_block_size); auto format = getContext()->getInputFormat(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, required_sample_block, max_block_size);
format->addBuffer(std::move(read_buf)); format->addBuffer(std::move(read_buf));
return Pipe(std::move(format)); return Pipe(std::move(format));

View File

@ -140,7 +140,7 @@ InputFormatPtr FormatFactory::getInput(
auto format_settings = _format_settings auto format_settings = _format_settings
? *_format_settings : getFormatSettings(context); ? *_format_settings : getFormatSettings(context);
if (!getCreators(name).input_processor_creator) if (!getCreators(name).input_creator)
{ {
throw Exception("Format " + name + " is not suitable for input (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT); throw Exception("Format " + name + " is not suitable for input (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
} }
@ -168,7 +168,7 @@ InputFormatPtr FormatFactory::getInput(
if (parallel_parsing) if (parallel_parsing)
{ {
const auto & input_getter = getCreators(name).input_processor_creator; const auto & input_getter = getCreators(name).input_creator;
RowInputFormatParams row_input_format_params; RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size; row_input_format_params.max_block_size = max_block_size;
@ -193,69 +193,6 @@ InputFormatPtr FormatFactory::getInput(
return format; return format;
} }
BlockOutputStreamPtr FormatFactory::getOutputStreamParallelIfPossible(
const String & name,
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & output_getter = getCreators(name).output_processor_creator;
const Settings & settings = context->getSettingsRef();
bool parallel_formatting = settings.output_format_parallel_formatting;
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
if (output_getter && parallel_formatting && getCreators(name).supports_parallel_formatting
&& !settings.output_format_json_array_of_rows)
{
auto formatter_creator = [output_getter, sample, callback, format_settings]
(WriteBuffer & output) -> OutputFormatPtr
{ return output_getter(output, sample, {std::move(callback)}, format_settings);};
ParallelFormattingOutputFormat::Params params{buf, sample, formatter_creator, settings.max_threads};
auto format = std::make_shared<ParallelFormattingOutputFormat>(params);
/// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query.
if (format_settings.enable_streaming)
format->setAutoFlush();
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}
return getOutputStream(name, buf, sample, context, callback, _format_settings);
}
BlockOutputStreamPtr FormatFactory::getOutputStream(
const String & name,
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const
{
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
if (!getCreators(name).output_processor_creator)
{
const auto & output_getter = getCreators(name).output_creator;
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
/** Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(
output_getter(buf, sample, std::move(callback), format_settings),
sample);
}
auto format = getOutputFormat(name, buf, sample, context, std::move(callback), _format_settings);
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}
InputFormatPtr FormatFactory::getInputFormat( InputFormatPtr FormatFactory::getInputFormat(
const String & name, const String & name,
@ -265,7 +202,7 @@ InputFormatPtr FormatFactory::getInputFormat(
UInt64 max_block_size, UInt64 max_block_size,
const std::optional<FormatSettings> & _format_settings) const const std::optional<FormatSettings> & _format_settings) const
{ {
const auto & input_getter = getCreators(name).input_processor_creator; const auto & input_getter = getCreators(name).input_creator;
if (!input_getter) if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT); throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
@ -299,7 +236,7 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
WriteCallback callback, WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const const std::optional<FormatSettings> & _format_settings) const
{ {
const auto & output_getter = getCreators(name).output_processor_creator; const auto & output_getter = getCreators(name).output_creator;
if (!output_getter) if (!output_getter)
throw Exception(ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT, "Format {} is not suitable for output (with processors)", name); throw Exception(ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT, "Format {} is not suitable for output (with processors)", name);
@ -334,7 +271,7 @@ OutputFormatPtr FormatFactory::getOutputFormat(
WriteCallback callback, WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const const std::optional<FormatSettings> & _format_settings) const
{ {
const auto & output_getter = getCreators(name).output_processor_creator; const auto & output_getter = getCreators(name).output_creator;
if (!output_getter) if (!output_getter)
throw Exception(ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT, "Format {} is not suitable for output (with processors)", name); throw Exception(ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT, "Format {} is not suitable for output (with processors)", name);
@ -362,7 +299,6 @@ OutputFormatPtr FormatFactory::getOutputFormat(
return format; return format;
} }
void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator) void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator)
{ {
auto & target = dict[name].input_creator; auto & target = dict[name].input_creator;
@ -371,22 +307,6 @@ void FormatFactory::registerInputFormat(const String & name, InputCreator input_
target = std::move(input_creator); target = std::move(input_creator);
} }
void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator)
{
auto & target = dict[name].output_creator;
if (target)
throw Exception("FormatFactory: Output format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = std::move(output_creator);
}
void FormatFactory::registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator)
{
auto & target = dict[name].input_processor_creator;
if (target)
throw Exception("FormatFactory: Input format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = std::move(input_creator);
}
void FormatFactory::registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker) void FormatFactory::registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker)
{ {
auto & target = dict[name].non_trivial_prefix_and_suffix_checker; auto & target = dict[name].non_trivial_prefix_and_suffix_checker;
@ -395,9 +315,9 @@ void FormatFactory::registerNonTrivialPrefixAndSuffixChecker(const String & name
target = std::move(non_trivial_prefix_and_suffix_checker); target = std::move(non_trivial_prefix_and_suffix_checker);
} }
void FormatFactory::registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator) void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator)
{ {
auto & target = dict[name].output_processor_creator; auto & target = dict[name].output_creator;
if (target) if (target)
throw Exception("FormatFactory: Output format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR); throw Exception("FormatFactory: Output format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = std::move(output_creator); target = std::move(output_creator);
@ -439,13 +359,13 @@ bool FormatFactory::checkIfFormatIsColumnOriented(const String & name)
bool FormatFactory::isInputFormat(const String & name) const bool FormatFactory::isInputFormat(const String & name) const
{ {
auto it = dict.find(name); auto it = dict.find(name);
return it != dict.end() && (it->second.input_creator || it->second.input_processor_creator); return it != dict.end() && it->second.input_creator;
} }
bool FormatFactory::isOutputFormat(const String & name) const bool FormatFactory::isOutputFormat(const String & name) const
{ {
auto it = dict.find(name); auto it = dict.find(name);
return it != dict.end() && (it->second.output_creator || it->second.output_processor_creator); return it != dict.end() && it->second.output_creator;
} }
FormatFactory & FormatFactory::instance() FormatFactory & FormatFactory::instance()

View File

@ -66,28 +66,16 @@ public:
size_t row)>; size_t row)>;
private: private:
using InputCreator = std::function<BlockInputStreamPtr(
ReadBuffer & buf,
const Block & sample,
UInt64 max_block_size,
ReadCallback callback,
const FormatSettings & settings)>;
using OutputCreator = std::function<BlockOutputStreamPtr( using InputCreatorFunc = InputFormatPtr(
WriteBuffer & buf,
const Block & sample,
WriteCallback callback,
const FormatSettings & settings)>;
using InputProcessorCreatorFunc = InputFormatPtr(
ReadBuffer & buf, ReadBuffer & buf,
const Block & header, const Block & header,
const RowInputFormatParams & params, const RowInputFormatParams & params,
const FormatSettings & settings); const FormatSettings & settings);
using InputProcessorCreator = std::function<InputProcessorCreatorFunc>; using InputCreator = std::function<InputCreatorFunc>;
using OutputProcessorCreator = std::function<OutputFormatPtr( using OutputCreator = std::function<OutputFormatPtr(
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -102,8 +90,6 @@ private:
{ {
InputCreator input_creator; InputCreator input_creator;
OutputCreator output_creator; OutputCreator output_creator;
InputProcessorCreator input_processor_creator;
OutputProcessorCreator output_processor_creator;
FileSegmentationEngine file_segmentation_engine; FileSegmentationEngine file_segmentation_engine;
bool supports_parallel_formatting{false}; bool supports_parallel_formatting{false};
bool is_column_oriented{false}; bool is_column_oriented{false};
@ -123,25 +109,6 @@ public:
UInt64 max_block_size, UInt64 max_block_size,
const std::optional<FormatSettings> & format_settings = std::nullopt) const; const std::optional<FormatSettings> & format_settings = std::nullopt) const;
/// Checks all preconditions. Returns ordinary stream if parallel formatting cannot be done.
/// Currently used only in Client. Don't use it something else! Better look at getOutputFormatParallelIfPossible.
BlockOutputStreamPtr getOutputStreamParallelIfPossible(
const String & name,
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
/// Currently used only in Client. Don't use it something else! Better look at getOutputFormat.
BlockOutputStreamPtr getOutputStream(
const String & name,
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
InputFormatPtr getInputFormat( InputFormatPtr getInputFormat(
const String & name, const String & name,
ReadBuffer & buf, ReadBuffer & buf,
@ -167,15 +134,13 @@ public:
WriteCallback callback = {}, WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const; const std::optional<FormatSettings> & format_settings = std::nullopt) const;
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);
void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine); void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);
void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker); void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);
void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator); /// Register format by its name.
void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator); void registerInputFormat(const String & name, InputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);
void markOutputFormatSupportsParallelFormatting(const String & name); void markOutputFormatSupportsParallelFormatting(const String & name);
void markFormatAsColumnOriented(const String & name); void markFormatAsColumnOriented(const String & name);

View File

@ -84,7 +84,7 @@ private:
void registerInputFormatNative(FormatFactory & factory) void registerInputFormatNative(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("Native", []( factory.registerInputFormat("Native", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const RowInputFormatParams &, const RowInputFormatParams &,
@ -96,7 +96,7 @@ void registerInputFormatNative(FormatFactory & factory)
void registerOutputFormatNative(FormatFactory & factory) void registerOutputFormatNative(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("Native", []( factory.registerOutputFormat("Native", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams &, const RowOutputFormatParams &,

View File

@ -1,19 +1,22 @@
#include <DataStreams/NullBlockOutputStream.h> #include <Formats/NullFormat.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <IO/WriteBuffer.h>
namespace DB namespace DB
{ {
WriteBuffer NullOutputFormat::empty_buffer(nullptr, 0);
void registerOutputFormatNull(FormatFactory & factory) void registerOutputFormatNull(FormatFactory & factory)
{ {
factory.registerOutputFormat("Null", []( factory.registerOutputFormat("Null", [](
WriteBuffer &, WriteBuffer &,
const Block & sample, const Block & sample,
FormatFactory::WriteCallback, const RowOutputFormatParams &,
const FormatSettings &) const FormatSettings &)
{ {
return std::make_shared<NullBlockOutputStream>(sample); return std::make_shared<NullOutputFormat>(sample);
}); });
} }

20
src/Formats/NullFormat.h Normal file
View File

@ -0,0 +1,20 @@
#include <Processors/Formats/IOutputFormat.h>
namespace DB
{
class NullOutputFormat final : public IOutputFormat
{
public:
explicit NullOutputFormat(const Block & header) : IOutputFormat(header, empty_buffer) {}
String getName() const override { return "Null"; }
protected:
void consume(Chunk) override {}
private:
static WriteBuffer empty_buffer;
};
}

View File

@ -21,61 +21,61 @@ void registerFileSegmentationEngineJSONAsString(FormatFactory & factory);
void registerInputFormatNative(FormatFactory & factory); void registerInputFormatNative(FormatFactory & factory);
void registerOutputFormatNative(FormatFactory & factory); void registerOutputFormatNative(FormatFactory & factory);
void registerInputFormatProcessorRowBinary(FormatFactory & factory); void registerInputFormatRowBinary(FormatFactory & factory);
void registerOutputFormatProcessorRowBinary(FormatFactory & factory); void registerOutputFormatRowBinary(FormatFactory & factory);
void registerInputFormatProcessorTabSeparated(FormatFactory & factory); void registerInputFormatTabSeparated(FormatFactory & factory);
void registerOutputFormatProcessorTabSeparated(FormatFactory & factory); void registerOutputFormatTabSeparated(FormatFactory & factory);
void registerInputFormatProcessorValues(FormatFactory & factory); void registerInputFormatValues(FormatFactory & factory);
void registerOutputFormatProcessorValues(FormatFactory & factory); void registerOutputFormatValues(FormatFactory & factory);
void registerInputFormatProcessorCSV(FormatFactory & factory); void registerInputFormatCSV(FormatFactory & factory);
void registerOutputFormatProcessorCSV(FormatFactory & factory); void registerOutputFormatCSV(FormatFactory & factory);
void registerInputFormatProcessorTSKV(FormatFactory & factory); void registerInputFormatTSKV(FormatFactory & factory);
void registerOutputFormatProcessorTSKV(FormatFactory & factory); void registerOutputFormatTSKV(FormatFactory & factory);
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory); void registerInputFormatJSONEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory); void registerOutputFormatJSONEachRow(FormatFactory & factory);
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory); void registerInputFormatJSONCompactEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory); void registerOutputFormatJSONCompactEachRow(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory); void registerInputFormatProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory); void registerOutputFormatProtobuf(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory); void registerInputFormatTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory & factory); void registerOutputFormatTemplate(FormatFactory & factory);
void registerInputFormatProcessorMsgPack(FormatFactory & factory); void registerInputFormatMsgPack(FormatFactory & factory);
void registerOutputFormatProcessorMsgPack(FormatFactory & factory); void registerOutputFormatMsgPack(FormatFactory & factory);
void registerInputFormatProcessorORC(FormatFactory & factory); void registerInputFormatORC(FormatFactory & factory);
void registerOutputFormatProcessorORC(FormatFactory & factory); void registerOutputFormatORC(FormatFactory & factory);
void registerInputFormatProcessorParquet(FormatFactory & factory); void registerInputFormatParquet(FormatFactory & factory);
void registerOutputFormatProcessorParquet(FormatFactory & factory); void registerOutputFormatParquet(FormatFactory & factory);
void registerInputFormatProcessorArrow(FormatFactory & factory); void registerInputFormatArrow(FormatFactory & factory);
void registerOutputFormatProcessorArrow(FormatFactory & factory); void registerOutputFormatArrow(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory); void registerInputFormatAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory); void registerOutputFormatAvro(FormatFactory & factory);
void registerInputFormatProcessorRawBLOB(FormatFactory & factory); void registerInputFormatRawBLOB(FormatFactory & factory);
void registerOutputFormatProcessorRawBLOB(FormatFactory & factory); void registerOutputFormatRawBLOB(FormatFactory & factory);
/// Output only (presentational) formats. /// Output only (presentational) formats.
void registerOutputFormatNull(FormatFactory & factory); void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatProcessorPretty(FormatFactory & factory); void registerOutputFormatPretty(FormatFactory & factory);
void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory); void registerOutputFormatPrettyCompact(FormatFactory & factory);
void registerOutputFormatProcessorPrettySpace(FormatFactory & factory); void registerOutputFormatPrettySpace(FormatFactory & factory);
void registerOutputFormatProcessorVertical(FormatFactory & factory); void registerOutputFormatVertical(FormatFactory & factory);
void registerOutputFormatProcessorJSON(FormatFactory & factory); void registerOutputFormatJSON(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompact(FormatFactory & factory); void registerOutputFormatJSONCompact(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory); void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory);
void registerOutputFormatProcessorXML(FormatFactory & factory); void registerOutputFormatXML(FormatFactory & factory);
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory); void registerOutputFormatODBCDriver2(FormatFactory & factory);
void registerOutputFormatProcessorNull(FormatFactory & factory); void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatProcessorMySQLWire(FormatFactory & factory); void registerOutputFormatMySQLWire(FormatFactory & factory);
void registerOutputFormatProcessorMarkdown(FormatFactory & factory); void registerOutputFormatMarkdown(FormatFactory & factory);
void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory); void registerOutputFormatPostgreSQLWire(FormatFactory & factory);
/// Input only formats. /// Input only formats.
void registerInputFormatProcessorRegexp(FormatFactory & factory); void registerInputFormatRegexp(FormatFactory & factory);
void registerInputFormatProcessorJSONAsString(FormatFactory & factory); void registerInputFormatJSONAsString(FormatFactory & factory);
void registerInputFormatProcessorLineAsString(FormatFactory & factory); void registerInputFormatLineAsString(FormatFactory & factory);
void registerInputFormatProcessorCapnProto(FormatFactory & factory); void registerInputFormatCapnProto(FormatFactory & factory);
/// Non trivial prefix and suffix checkers for disabling parallel parsing. /// Non trivial prefix and suffix checkers for disabling parallel parsing.
void registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(FormatFactory & factory); void registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(FormatFactory & factory);
@ -94,62 +94,60 @@ void registerFormats()
registerInputFormatNative(factory); registerInputFormatNative(factory);
registerOutputFormatNative(factory); registerOutputFormatNative(factory);
registerInputFormatProcessorRowBinary(factory); registerInputFormatRowBinary(factory);
registerOutputFormatProcessorRowBinary(factory); registerOutputFormatRowBinary(factory);
registerInputFormatProcessorTabSeparated(factory); registerInputFormatTabSeparated(factory);
registerOutputFormatProcessorTabSeparated(factory); registerOutputFormatTabSeparated(factory);
registerInputFormatProcessorValues(factory); registerInputFormatValues(factory);
registerOutputFormatProcessorValues(factory); registerOutputFormatValues(factory);
registerInputFormatProcessorCSV(factory); registerInputFormatCSV(factory);
registerOutputFormatProcessorCSV(factory); registerOutputFormatCSV(factory);
registerInputFormatProcessorTSKV(factory); registerInputFormatTSKV(factory);
registerOutputFormatProcessorTSKV(factory); registerOutputFormatTSKV(factory);
registerInputFormatProcessorJSONEachRow(factory); registerInputFormatJSONEachRow(factory);
registerOutputFormatProcessorJSONEachRow(factory); registerOutputFormatJSONEachRow(factory);
registerInputFormatProcessorJSONCompactEachRow(factory); registerInputFormatJSONCompactEachRow(factory);
registerOutputFormatProcessorJSONCompactEachRow(factory); registerOutputFormatJSONCompactEachRow(factory);
registerInputFormatProcessorProtobuf(factory); registerInputFormatProtobuf(factory);
registerOutputFormatProcessorProtobuf(factory); registerOutputFormatProtobuf(factory);
registerInputFormatProcessorTemplate(factory); registerInputFormatTemplate(factory);
registerOutputFormatProcessorTemplate(factory); registerOutputFormatTemplate(factory);
registerInputFormatProcessorMsgPack(factory); registerInputFormatMsgPack(factory);
registerOutputFormatProcessorMsgPack(factory); registerOutputFormatMsgPack(factory);
registerInputFormatProcessorRawBLOB(factory); registerInputFormatRawBLOB(factory);
registerOutputFormatProcessorRawBLOB(factory); registerOutputFormatRawBLOB(factory);
registerInputFormatProcessorORC(factory); registerInputFormatORC(factory);
registerOutputFormatProcessorORC(factory); registerOutputFormatORC(factory);
registerInputFormatProcessorParquet(factory); registerInputFormatParquet(factory);
registerOutputFormatProcessorParquet(factory); registerOutputFormatParquet(factory);
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
registerInputFormatProcessorAvro(factory); registerInputFormatAvro(factory);
registerOutputFormatProcessorAvro(factory); registerOutputFormatAvro(factory);
#endif #endif
registerInputFormatProcessorArrow(factory); registerInputFormatArrow(factory);
registerOutputFormatProcessorArrow(factory); registerOutputFormatArrow(factory);
registerOutputFormatPretty(factory);
registerOutputFormatPrettyCompact(factory);
registerOutputFormatPrettySpace(factory);
registerOutputFormatVertical(factory);
registerOutputFormatJSON(factory);
registerOutputFormatJSONCompact(factory);
registerOutputFormatJSONEachRowWithProgress(factory);
registerOutputFormatXML(factory);
registerOutputFormatODBCDriver2(factory);
registerOutputFormatNull(factory); registerOutputFormatNull(factory);
registerOutputFormatMySQLWire(factory);
registerOutputFormatMarkdown(factory);
registerOutputFormatPostgreSQLWire(factory);
registerOutputFormatProcessorPretty(factory); registerInputFormatRegexp(factory);
registerOutputFormatProcessorPrettyCompact(factory); registerInputFormatJSONAsString(factory);
registerOutputFormatProcessorPrettySpace(factory); registerInputFormatLineAsString(factory);
registerOutputFormatProcessorVertical(factory);
registerOutputFormatProcessorJSON(factory);
registerOutputFormatProcessorJSONCompact(factory);
registerOutputFormatProcessorJSONEachRowWithProgress(factory);
registerOutputFormatProcessorXML(factory);
registerOutputFormatProcessorODBCDriver2(factory);
registerOutputFormatProcessorNull(factory);
registerOutputFormatProcessorMySQLWire(factory);
registerOutputFormatProcessorMarkdown(factory);
registerOutputFormatProcessorPostgreSQLWire(factory);
registerInputFormatProcessorRegexp(factory);
registerInputFormatProcessorJSONAsString(factory);
registerInputFormatProcessorLineAsString(factory);
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
registerInputFormatProcessorCapnProto(factory); registerInputFormatCapnProto(factory);
#endif #endif
registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(factory); registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(factory);

View File

@ -2428,20 +2428,14 @@ void Context::checkPartitionCanBeDropped(const String & database, const String &
} }
BlockInputStreamPtr Context::getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size) const InputFormatPtr Context::getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size, const std::optional<FormatSettings> & format_settings) const
{ {
return std::make_shared<InputStreamFromInputFormat>( return FormatFactory::instance().getInput(name, buf, sample, shared_from_this(), max_block_size, format_settings);
FormatFactory::instance().getInput(name, buf, sample, shared_from_this(), max_block_size));
} }
BlockOutputStreamPtr Context::getOutputStreamParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const OutputFormatPtr Context::getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const
{ {
return FormatFactory::instance().getOutputStreamParallelIfPossible(name, buf, sample, shared_from_this()); return FormatFactory::instance().getOutputFormat(name, buf, sample, shared_from_this());
}
BlockOutputStreamPtr Context::getOutputStream(const String & name, WriteBuffer & buf, const Block & sample) const
{
return FormatFactory::instance().getOutputStream(name, buf, sample, shared_from_this());
} }
OutputFormatPtr Context::getOutputFormatParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const OutputFormatPtr Context::getOutputFormatParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const

View File

@ -116,7 +116,9 @@ using PartUUIDsPtr = std::shared_ptr<PartUUIDs>;
class KeeperDispatcher; class KeeperDispatcher;
class Session; class Session;
class IInputFormat;
class IOutputFormat; class IOutputFormat;
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>; using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
class IVolume; class IVolume;
using VolumePtr = std::shared_ptr<IVolume>; using VolumePtr = std::shared_ptr<IVolume>;
@ -571,12 +573,9 @@ public:
#endif #endif
/// I/O formats. /// I/O formats.
BlockInputStreamPtr getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size) const; InputFormatPtr getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size, const std::optional<FormatSettings> & format_settings = std::nullopt) const;
/// Don't use streams. Better look at getOutputFormat...
BlockOutputStreamPtr getOutputStreamParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const;
BlockOutputStreamPtr getOutputStream(const String & name, WriteBuffer & buf, const Block & sample) const;
OutputFormatPtr getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const;
OutputFormatPtr getOutputFormatParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const; OutputFormatPtr getOutputFormatParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const;
InterserverIOHandler & getInterserverIOHandler(); InterserverIOHandler & getInterserverIOHandler();

View File

@ -100,8 +100,8 @@ public:
writeChar('\n', out); writeChar('\n', out);
} }
auto output_stream = context->getOutputStream(configuration.format, out, arguments_block.cloneEmpty()); auto output_format = context->getOutputFormat(configuration.format, out, arguments_block.cloneEmpty());
formatBlock(output_stream, arguments_block); formatBlock(output_format, arguments_block);
if (!is_executable_pool_function) if (!is_executable_pool_function)
out.close(); out.close();
}}; }};

View File

@ -112,9 +112,9 @@ void ArrowBlockInputFormat::prepareReader()
record_batch_current = 0; record_batch_current = 0;
} }
void registerInputFormatProcessorArrow(FormatFactory & factory) void registerInputFormatArrow(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor( factory.registerInputFormat(
"Arrow", "Arrow",
[](ReadBuffer & buf, [](ReadBuffer & buf,
const Block & sample, const Block & sample,
@ -124,7 +124,7 @@ void registerInputFormatProcessorArrow(FormatFactory & factory)
return std::make_shared<ArrowBlockInputFormat>(buf, sample, false, format_settings); return std::make_shared<ArrowBlockInputFormat>(buf, sample, false, format_settings);
}); });
factory.markFormatAsColumnOriented("Arrow"); factory.markFormatAsColumnOriented("Arrow");
factory.registerInputFormatProcessor( factory.registerInputFormat(
"ArrowStream", "ArrowStream",
[](ReadBuffer & buf, [](ReadBuffer & buf,
const Block & sample, const Block & sample,
@ -141,7 +141,7 @@ void registerInputFormatProcessorArrow(FormatFactory & factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerInputFormatProcessorArrow(FormatFactory &) void registerInputFormatArrow(FormatFactory &)
{ {
} }
} }

View File

@ -82,9 +82,9 @@ void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema>
writer = *writer_status; writer = *writer_status;
} }
void registerOutputFormatProcessorArrow(FormatFactory & factory) void registerOutputFormatArrow(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor( factory.registerOutputFormat(
"Arrow", "Arrow",
[](WriteBuffer & buf, [](WriteBuffer & buf,
const Block & sample, const Block & sample,
@ -94,7 +94,7 @@ void registerOutputFormatProcessorArrow(FormatFactory & factory)
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings); return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
}); });
factory.registerOutputFormatProcessor( factory.registerOutputFormat(
"ArrowStream", "ArrowStream",
[](WriteBuffer & buf, [](WriteBuffer & buf,
const Block & sample, const Block & sample,
@ -112,7 +112,7 @@ void registerOutputFormatProcessorArrow(FormatFactory & factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerOutputFormatProcessorArrow(FormatFactory &) void registerOutputFormatArrow(FormatFactory &)
{ {
} }
} }

View File

@ -806,9 +806,9 @@ const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(Sc
return it->second; return it->second;
} }
void registerInputFormatProcessorAvro(FormatFactory & factory) void registerInputFormatAvro(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("Avro", []( factory.registerInputFormat("Avro", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const RowInputFormatParams & params, const RowInputFormatParams & params,
@ -817,7 +817,7 @@ void registerInputFormatProcessorAvro(FormatFactory & factory)
return std::make_shared<AvroRowInputFormat>(sample, buf, params, settings); return std::make_shared<AvroRowInputFormat>(sample, buf, params, settings);
}); });
factory.registerInputFormatProcessor("AvroConfluent",[]( factory.registerInputFormat("AvroConfluent",[](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const RowInputFormatParams & params, const RowInputFormatParams & params,
@ -834,7 +834,7 @@ void registerInputFormatProcessorAvro(FormatFactory & factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerInputFormatProcessorAvro(FormatFactory &) void registerInputFormatAvro(FormatFactory &)
{ {
} }
} }

View File

@ -413,9 +413,9 @@ void AvroRowOutputFormat::writeSuffix()
file_writer.close(); file_writer.close();
} }
void registerOutputFormatProcessorAvro(FormatFactory & factory) void registerOutputFormatAvro(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("Avro", []( factory.registerOutputFormat("Avro", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -432,7 +432,7 @@ void registerOutputFormatProcessorAvro(FormatFactory & factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerOutputFormatProcessorAvro(FormatFactory &) void registerOutputFormatAvro(FormatFactory &)
{ {
} }
} }

View File

@ -56,9 +56,9 @@ void BinaryRowInputFormat::readPrefix()
} }
void registerInputFormatProcessorRowBinary(FormatFactory & factory) void registerInputFormatRowBinary(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("RowBinary", []( factory.registerInputFormat("RowBinary", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const IRowInputFormat::Params & params, const IRowInputFormat::Params & params,
@ -67,7 +67,7 @@ void registerInputFormatProcessorRowBinary(FormatFactory & factory)
return std::make_shared<BinaryRowInputFormat>(buf, sample, params, false, false); return std::make_shared<BinaryRowInputFormat>(buf, sample, params, false, false);
}); });
factory.registerInputFormatProcessor("RowBinaryWithNamesAndTypes", []( factory.registerInputFormat("RowBinaryWithNamesAndTypes", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const IRowInputFormat::Params & params, const IRowInputFormat::Params & params,

View File

@ -47,9 +47,9 @@ void BinaryRowOutputFormat::writeField(const IColumn & column, const ISerializat
} }
void registerOutputFormatProcessorRowBinary(FormatFactory & factory) void registerOutputFormatRowBinary(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("RowBinary", []( factory.registerOutputFormat("RowBinary", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -58,7 +58,7 @@ void registerOutputFormatProcessorRowBinary(FormatFactory & factory)
return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false, params); return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false, params);
}); });
factory.registerOutputFormatProcessor("RowBinaryWithNamesAndTypes", []( factory.registerOutputFormat("RowBinaryWithNamesAndTypes", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -420,11 +420,11 @@ void CSVRowInputFormat::resetParser()
} }
void registerInputFormatProcessorCSV(FormatFactory & factory) void registerInputFormatCSV(FormatFactory & factory)
{ {
for (bool with_names : {false, true}) for (bool with_names : {false, true})
{ {
factory.registerInputFormatProcessor(with_names ? "CSVWithNames" : "CSV", [=]( factory.registerInputFormat(with_names ? "CSVWithNames" : "CSV", [=](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,

View File

@ -70,11 +70,11 @@ void CSVRowOutputFormat::writeBeforeExtremes()
} }
void registerOutputFormatProcessorCSV(FormatFactory & factory) void registerOutputFormatCSV(FormatFactory & factory)
{ {
for (bool with_names : {false, true}) for (bool with_names : {false, true})
{ {
factory.registerOutputFormatProcessor(with_names ? "CSVWithNames" : "CSV", [=]( factory.registerOutputFormat(with_names ? "CSVWithNames" : "CSV", [=](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -295,9 +295,9 @@ bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
return true; return true;
} }
void registerInputFormatProcessorCapnProto(FormatFactory & factory) void registerInputFormatCapnProto(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor( factory.registerInputFormat(
"CapnProto", "CapnProto",
[](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings) [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings)
{ {
@ -314,7 +314,7 @@ void registerInputFormatProcessorCapnProto(FormatFactory & factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerInputFormatProcessorCapnProto(FormatFactory &) {} void registerInputFormatCapnProto(FormatFactory &) {}
} }
#endif // USE_CAPNP #endif // USE_CAPNP

View File

@ -171,9 +171,9 @@ bool JSONAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
return !buf.eof(); return !buf.eof();
} }
void registerInputFormatProcessorJSONAsString(FormatFactory & factory) void registerInputFormatJSONAsString(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("JSONAsString", []( factory.registerInputFormat("JSONAsString", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const RowInputFormatParams & params, const RowInputFormatParams & params,

View File

@ -236,9 +236,9 @@ void JSONCompactEachRowRowInputFormat::syncAfterError()
skipToUnescapedNextLineOrEOF(*in); skipToUnescapedNextLineOrEOF(*in);
} }
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory) void registerInputFormatJSONCompactEachRow(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("JSONCompactEachRow", []( factory.registerInputFormat("JSONCompactEachRow", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
@ -247,7 +247,7 @@ void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, false, false); return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, false, false);
}); });
factory.registerInputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", []( factory.registerInputFormat("JSONCompactEachRowWithNamesAndTypes", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
@ -256,7 +256,7 @@ void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, true, false); return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, true, false);
}); });
factory.registerInputFormatProcessor("JSONCompactStringsEachRow", []( factory.registerInputFormat("JSONCompactStringsEachRow", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
@ -265,7 +265,7 @@ void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, false, true); return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, false, true);
}); });
factory.registerInputFormatProcessor("JSONCompactStringsEachRowWithNamesAndTypes", []( factory.registerInputFormat("JSONCompactStringsEachRowWithNamesAndTypes", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,

View File

@ -98,9 +98,9 @@ void JSONCompactEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk)
IRowOutputFormat::consumeTotals(std::move(chunk)); IRowOutputFormat::consumeTotals(std::move(chunk));
} }
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory) void registerOutputFormatJSONCompactEachRow(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("JSONCompactEachRow", []( factory.registerOutputFormat("JSONCompactEachRow", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -110,7 +110,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
}); });
factory.markOutputFormatSupportsParallelFormatting("JSONCompactEachRow"); factory.markOutputFormatSupportsParallelFormatting("JSONCompactEachRow");
factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", []( factory.registerOutputFormat("JSONCompactEachRowWithNamesAndTypes", [](
WriteBuffer &buf, WriteBuffer &buf,
const Block &sample, const Block &sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -120,7 +120,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
}); });
factory.markOutputFormatSupportsParallelFormatting("JSONCompactEachRowWithNamesAndTypes"); factory.markOutputFormatSupportsParallelFormatting("JSONCompactEachRowWithNamesAndTypes");
factory.registerOutputFormatProcessor("JSONCompactStringsEachRow", []( factory.registerOutputFormat("JSONCompactStringsEachRow", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -130,7 +130,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
}); });
factory.markOutputFormatSupportsParallelFormatting("JSONCompactStringsEachRow"); factory.markOutputFormatSupportsParallelFormatting("JSONCompactStringsEachRow");
factory.registerOutputFormatProcessor("JSONCompactStringsEachRowWithNamesAndTypes", []( factory.registerOutputFormat("JSONCompactStringsEachRowWithNamesAndTypes", [](
WriteBuffer &buf, WriteBuffer &buf,
const Block &sample, const Block &sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -88,9 +88,9 @@ void JSONCompactRowOutputFormat::writeExtremesElement(const char * title, const
writeChar(']', *ostr); writeChar(']', *ostr);
} }
void registerOutputFormatProcessorJSONCompact(FormatFactory & factory) void registerOutputFormatJSONCompact(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("JSONCompact", []( factory.registerOutputFormat("JSONCompact", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -99,7 +99,7 @@ void registerOutputFormatProcessorJSONCompact(FormatFactory & factory)
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, false); return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, false);
}); });
factory.registerOutputFormatProcessor("JSONCompactStrings", []( factory.registerOutputFormat("JSONCompactStrings", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -332,9 +332,9 @@ void JSONEachRowRowInputFormat::readSuffix()
} }
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory) void registerInputFormatJSONEachRow(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("JSONEachRow", []( factory.registerInputFormat("JSONEachRow", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
@ -343,7 +343,7 @@ void registerInputFormatProcessorJSONEachRow(FormatFactory & factory)
return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings, false); return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings, false);
}); });
factory.registerInputFormatProcessor("JSONStringsEachRow", []( factory.registerInputFormat("JSONStringsEachRow", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,

View File

@ -125,9 +125,9 @@ void JSONEachRowRowOutputFormat::writeSuffix()
} }
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory) void registerOutputFormatJSONEachRow(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("JSONEachRow", []( factory.registerOutputFormat("JSONEachRow", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -140,7 +140,7 @@ void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
}); });
factory.markOutputFormatSupportsParallelFormatting("JSONEachRow"); factory.markOutputFormatSupportsParallelFormatting("JSONEachRow");
factory.registerOutputFormatProcessor("JSONStringsEachRow", []( factory.registerOutputFormat("JSONStringsEachRow", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -28,9 +28,9 @@ void JSONEachRowWithProgressRowOutputFormat::onProgress(const Progress & value)
} }
void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory) void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("JSONEachRowWithProgress", []( factory.registerOutputFormat("JSONEachRowWithProgress", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -42,7 +42,7 @@ void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factor
sample, params, settings); sample, params, settings);
}); });
factory.registerOutputFormatProcessor("JSONStringsEachRowWithProgress", []( factory.registerOutputFormat("JSONStringsEachRowWithProgress", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -266,9 +266,9 @@ void JSONRowOutputFormat::onProgress(const Progress & value)
} }
void registerOutputFormatProcessorJSON(FormatFactory & factory) void registerOutputFormatJSON(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("JSON", []( factory.registerOutputFormat("JSON", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -277,7 +277,7 @@ void registerOutputFormatProcessorJSON(FormatFactory & factory)
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, false); return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, false);
}); });
factory.registerOutputFormatProcessor("JSONStrings", []( factory.registerOutputFormat("JSONStrings", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -61,9 +61,9 @@ bool LineAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
return true; return true;
} }
void registerInputFormatProcessorLineAsString(FormatFactory & factory) void registerInputFormatLineAsString(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("LineAsString", []( factory.registerInputFormat("LineAsString", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const RowInputFormatParams & params, const RowInputFormatParams & params,

View File

@ -55,9 +55,9 @@ void MarkdownRowOutputFormat::writeField(const IColumn & column, const ISerializ
serialization.serializeTextEscaped(column, row_num, out, format_settings); serialization.serializeTextEscaped(column, row_num, out, format_settings);
} }
void registerOutputFormatProcessorMarkdown(FormatFactory & factory) void registerOutputFormatMarkdown(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("Markdown", []( factory.registerOutputFormat("Markdown", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -363,9 +363,9 @@ bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
return true; return true;
} }
void registerInputFormatProcessorMsgPack(FormatFactory & factory) void registerInputFormatMsgPack(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("MsgPack", []( factory.registerInputFormat("MsgPack", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const RowInputFormatParams & params, const RowInputFormatParams & params,
@ -382,7 +382,7 @@ void registerInputFormatProcessorMsgPack(FormatFactory & factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerInputFormatProcessorMsgPack(FormatFactory &) void registerInputFormatMsgPack(FormatFactory &)
{ {
} }
} }

View File

@ -180,10 +180,10 @@ void MsgPackRowOutputFormat::write(const Columns & columns, size_t row_num)
} }
void registerOutputFormatProcessorMsgPack(FormatFactory & factory) void registerOutputFormatMsgPack(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("MsgPack", []( factory.registerOutputFormat("MsgPack", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -200,7 +200,7 @@ void registerOutputFormatProcessorMsgPack(FormatFactory & factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerOutputFormatProcessorMsgPack(FormatFactory &) void registerOutputFormatMsgPack(FormatFactory &)
{ {
} }
} }

View File

@ -105,9 +105,9 @@ void MySQLOutputFormat::flush()
packet_endpoint->out->next(); packet_endpoint->out->next();
} }
void registerOutputFormatProcessorMySQLWire(FormatFactory & factory) void registerOutputFormatMySQLWire(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor( factory.registerOutputFormat(
"MySQLWire", "MySQLWire",
[](WriteBuffer & buf, [](WriteBuffer & buf,
const Block & sample, const Block & sample,

View File

@ -16,9 +16,9 @@ protected:
void consume(Chunk) override {} void consume(Chunk) override {}
}; };
void registerOutputFormatProcessorNull(FormatFactory & factory) void registerOutputFormatNull(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("Null", []( factory.registerOutputFormat("Null", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams &, const RowOutputFormatParams &,

View File

@ -110,9 +110,9 @@ void ODBCDriver2BlockOutputFormat::writePrefix()
} }
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory) void registerOutputFormatODBCDriver2(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor( factory.registerOutputFormat(
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const RowOutputFormatParams &, const FormatSettings & format_settings) "ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const RowOutputFormatParams &, const FormatSettings & format_settings)
{ {
return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings); return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings);

View File

@ -124,9 +124,9 @@ void ORCBlockInputFormat::prepareReader()
} }
} }
void registerInputFormatProcessorORC(FormatFactory &factory) void registerInputFormatORC(FormatFactory &factory)
{ {
factory.registerInputFormatProcessor( factory.registerInputFormat(
"ORC", "ORC",
[](ReadBuffer &buf, [](ReadBuffer &buf,
const Block &sample, const Block &sample,
@ -144,7 +144,7 @@ void registerInputFormatProcessorORC(FormatFactory &factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerInputFormatProcessorORC(FormatFactory &) void registerInputFormatORC(FormatFactory &)
{ {
} }
} }

View File

@ -514,9 +514,9 @@ void ORCBlockOutputFormat::prepareWriter()
writer = orc::createWriter(*schema, &output_stream, options); writer = orc::createWriter(*schema, &output_stream, options);
} }
void registerOutputFormatProcessorORC(FormatFactory & factory) void registerOutputFormatORC(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("ORC", []( factory.registerOutputFormat("ORC", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams &, const RowOutputFormatParams &,
@ -533,7 +533,7 @@ void registerOutputFormatProcessorORC(FormatFactory & factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerOutputFormatProcessorORC(FormatFactory &) void registerOutputFormatORC(FormatFactory &)
{ {
} }
} }

View File

@ -123,9 +123,9 @@ void ParquetBlockInputFormat::prepareReader()
} }
} }
void registerInputFormatProcessorParquet(FormatFactory &factory) void registerInputFormatParquet(FormatFactory &factory)
{ {
factory.registerInputFormatProcessor( factory.registerInputFormat(
"Parquet", "Parquet",
[](ReadBuffer &buf, [](ReadBuffer &buf,
const Block &sample, const Block &sample,
@ -144,7 +144,7 @@ void registerInputFormatProcessorParquet(FormatFactory &factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerInputFormatProcessorParquet(FormatFactory &) void registerInputFormatParquet(FormatFactory &)
{ {
} }
} }

View File

@ -74,9 +74,9 @@ void ParquetBlockOutputFormat::finalize()
throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION}; throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
} }
void registerOutputFormatProcessorParquet(FormatFactory & factory) void registerOutputFormatParquet(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor( factory.registerOutputFormat(
"Parquet", "Parquet",
[](WriteBuffer & buf, [](WriteBuffer & buf,
const Block & sample, const Block & sample,
@ -94,7 +94,7 @@ void registerOutputFormatProcessorParquet(FormatFactory & factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerOutputFormatProcessorParquet(FormatFactory &) void registerOutputFormatParquet(FormatFactory &)
{ {
} }
} }

View File

@ -68,9 +68,9 @@ void PostgreSQLOutputFormat::flush()
message_transport.flush(); message_transport.flush();
} }
void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory) void registerOutputFormatPostgreSQLWire(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor( factory.registerOutputFormat(
"PostgreSQLWire", "PostgreSQLWire",
[](WriteBuffer & buf, [](WriteBuffer & buf,
const Block & sample, const Block & sample,

View File

@ -407,9 +407,9 @@ void PrettyBlockOutputFormat::finalize()
} }
void registerOutputFormatProcessorPretty(FormatFactory & factory) void registerOutputFormatPretty(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("Pretty", []( factory.registerOutputFormat("Pretty", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams &, const RowOutputFormatParams &,
@ -418,7 +418,7 @@ void registerOutputFormatProcessorPretty(FormatFactory & factory)
return std::make_shared<PrettyBlockOutputFormat>(buf, sample, format_settings); return std::make_shared<PrettyBlockOutputFormat>(buf, sample, format_settings);
}); });
factory.registerOutputFormatProcessor("PrettyNoEscapes", []( factory.registerOutputFormat("PrettyNoEscapes", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams &, const RowOutputFormatParams &,

View File

@ -255,11 +255,11 @@ void PrettyCompactBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind po
} }
void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory) void registerOutputFormatPrettyCompact(FormatFactory & factory)
{ {
for (const auto & [name, mono_block] : {std::make_pair("PrettyCompact", false), std::make_pair("PrettyCompactMonoBlock", true)}) for (const auto & [name, mono_block] : {std::make_pair("PrettyCompact", false), std::make_pair("PrettyCompactMonoBlock", true)})
{ {
factory.registerOutputFormatProcessor(name, [mono_block = mono_block]( factory.registerOutputFormat(name, [mono_block = mono_block](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams &, const RowOutputFormatParams &,
@ -269,7 +269,7 @@ void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory)
}); });
} }
factory.registerOutputFormatProcessor("PrettyCompactNoEscapes", []( factory.registerOutputFormat("PrettyCompactNoEscapes", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams &, const RowOutputFormatParams &,

View File

@ -113,9 +113,9 @@ void PrettySpaceBlockOutputFormat::writeSuffix()
} }
void registerOutputFormatProcessorPrettySpace(FormatFactory & factory) void registerOutputFormatPrettySpace(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("PrettySpace", []( factory.registerOutputFormat("PrettySpace", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams &, const RowOutputFormatParams &,
@ -124,7 +124,7 @@ void registerOutputFormatProcessorPrettySpace(FormatFactory & factory)
return std::make_shared<PrettySpaceBlockOutputFormat>(buf, sample, format_settings); return std::make_shared<PrettySpaceBlockOutputFormat>(buf, sample, format_settings);
}); });
factory.registerOutputFormatProcessor("PrettySpaceNoEscapes", []( factory.registerOutputFormat("PrettySpaceNoEscapes", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams &, const RowOutputFormatParams &,

View File

@ -56,11 +56,11 @@ void ProtobufRowInputFormat::syncAfterError()
reader->endMessage(true); reader->endMessage(true);
} }
void registerInputFormatProcessorProtobuf(FormatFactory & factory) void registerInputFormatProtobuf(FormatFactory & factory)
{ {
for (bool with_length_delimiter : {false, true}) for (bool with_length_delimiter : {false, true})
{ {
factory.registerInputFormatProcessor(with_length_delimiter ? "Protobuf" : "ProtobufSingle", [with_length_delimiter]( factory.registerInputFormat(with_length_delimiter ? "Protobuf" : "ProtobufSingle", [with_length_delimiter](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
@ -81,7 +81,7 @@ void registerInputFormatProcessorProtobuf(FormatFactory & factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerInputFormatProcessorProtobuf(FormatFactory &) {} void registerInputFormatProtobuf(FormatFactory &) {}
} }
#endif #endif

View File

@ -51,11 +51,11 @@ void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
} }
void registerOutputFormatProcessorProtobuf(FormatFactory & factory) void registerOutputFormatProtobuf(FormatFactory & factory)
{ {
for (bool with_length_delimiter : {false, true}) for (bool with_length_delimiter : {false, true})
{ {
factory.registerOutputFormatProcessor( factory.registerOutputFormat(
with_length_delimiter ? "Protobuf" : "ProtobufSingle", with_length_delimiter ? "Protobuf" : "ProtobufSingle",
[with_length_delimiter](WriteBuffer & buf, [with_length_delimiter](WriteBuffer & buf,
const Block & header, const Block & header,
@ -80,7 +80,7 @@ void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
namespace DB namespace DB
{ {
class FormatFactory; class FormatFactory;
void registerOutputFormatProcessorProtobuf(FormatFactory &) {} void registerOutputFormatProtobuf(FormatFactory &) {}
} }
#endif #endif

View File

@ -39,9 +39,9 @@ bool RawBLOBRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
return false; return false;
} }
void registerInputFormatProcessorRawBLOB(FormatFactory & factory) void registerInputFormatRawBLOB(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("RawBLOB", []( factory.registerInputFormat("RawBLOB", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const RowInputFormatParams & params, const RowInputFormatParams & params,

View File

@ -22,9 +22,9 @@ void RawBLOBRowOutputFormat::writeField(const IColumn & column, const ISerializa
} }
void registerOutputFormatProcessorRawBLOB(FormatFactory & factory) void registerOutputFormatRawBLOB(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("RawBLOB", []( factory.registerOutputFormat("RawBLOB", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -163,9 +163,9 @@ bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
return true; return true;
} }
void registerInputFormatProcessorRegexp(FormatFactory & factory) void registerInputFormatRegexp(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("Regexp", []( factory.registerInputFormat("Regexp", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,

View File

@ -210,9 +210,9 @@ void TSKVRowInputFormat::resetParser()
name_buf.clear(); name_buf.clear();
} }
void registerInputFormatProcessorTSKV(FormatFactory & factory) void registerInputFormatTSKV(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("TSKV", []( factory.registerInputFormat("TSKV", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,

View File

@ -39,9 +39,9 @@ void TSKVRowOutputFormat::writeRowEndDelimiter()
} }
void registerOutputFormatProcessorTSKV(FormatFactory & factory) void registerOutputFormatTSKV(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("TSKV", []( factory.registerOutputFormat("TSKV", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -383,11 +383,11 @@ void TabSeparatedRowInputFormat::resetParser()
columns_to_fill_with_default_values.clear(); columns_to_fill_with_default_values.clear();
} }
void registerInputFormatProcessorTabSeparated(FormatFactory & factory) void registerInputFormatTabSeparated(FormatFactory & factory)
{ {
for (const auto * name : {"TabSeparated", "TSV"}) for (const auto * name : {"TabSeparated", "TSV"})
{ {
factory.registerInputFormatProcessor(name, []( factory.registerInputFormat(name, [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
@ -399,7 +399,7 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
for (const auto * name : {"TabSeparatedRaw", "TSVRaw"}) for (const auto * name : {"TabSeparatedRaw", "TSVRaw"})
{ {
factory.registerInputFormatProcessor(name, []( factory.registerInputFormat(name, [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
@ -411,7 +411,7 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
for (const auto * name : {"TabSeparatedWithNames", "TSVWithNames"}) for (const auto * name : {"TabSeparatedWithNames", "TSVWithNames"})
{ {
factory.registerInputFormatProcessor(name, []( factory.registerInputFormat(name, [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
@ -423,7 +423,7 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
for (const auto * name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"}) for (const auto * name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"})
{ {
factory.registerInputFormatProcessor(name, []( factory.registerInputFormat(name, [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,

View File

@ -73,11 +73,11 @@ void TabSeparatedRowOutputFormat::writeBeforeExtremes()
} }
void registerOutputFormatProcessorTabSeparated(FormatFactory & factory) void registerOutputFormatTabSeparated(FormatFactory & factory)
{ {
for (const auto * name : {"TabSeparated", "TSV"}) for (const auto * name : {"TabSeparated", "TSV"})
{ {
factory.registerOutputFormatProcessor(name, []( factory.registerOutputFormat(name, [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -90,7 +90,7 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
for (const auto * name : {"TabSeparatedRaw", "TSVRaw"}) for (const auto * name : {"TabSeparatedRaw", "TSVRaw"})
{ {
factory.registerOutputFormatProcessor(name, []( factory.registerOutputFormat(name, [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -103,7 +103,7 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
for (const auto * name : {"TabSeparatedWithNames", "TSVWithNames"}) for (const auto * name : {"TabSeparatedWithNames", "TSVWithNames"})
{ {
factory.registerOutputFormatProcessor(name, []( factory.registerOutputFormat(name, [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,
@ -116,7 +116,7 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
for (const auto * name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"}) for (const auto * name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"})
{ {
factory.registerOutputFormatProcessor(name, []( factory.registerOutputFormat(name, [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -227,9 +227,9 @@ void TemplateBlockOutputFormat::finalize()
} }
void registerOutputFormatProcessorTemplate(FormatFactory & factory) void registerOutputFormatTemplate(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("Template", []( factory.registerOutputFormat("Template", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams &, const RowOutputFormatParams &,
@ -267,7 +267,7 @@ void registerOutputFormatProcessorTemplate(FormatFactory & factory)
return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter); return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter);
}); });
factory.registerOutputFormatProcessor("CustomSeparated", []( factory.registerOutputFormat("CustomSeparated", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams &, const RowOutputFormatParams &,

View File

@ -503,11 +503,11 @@ void TemplateRowInputFormat::resetParser()
buf.reset(); buf.reset();
} }
void registerInputFormatProcessorTemplate(FormatFactory & factory) void registerInputFormatTemplate(FormatFactory & factory)
{ {
for (bool ignore_spaces : {false, true}) for (bool ignore_spaces : {false, true})
{ {
factory.registerInputFormatProcessor(ignore_spaces ? "TemplateIgnoreSpaces" : "Template", [=]( factory.registerInputFormat(ignore_spaces ? "TemplateIgnoreSpaces" : "Template", [=](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,
@ -551,7 +551,7 @@ void registerInputFormatProcessorTemplate(FormatFactory & factory)
for (bool ignore_spaces : {false, true}) for (bool ignore_spaces : {false, true})
{ {
factory.registerInputFormatProcessor(ignore_spaces ? "CustomSeparatedIgnoreSpaces" : "CustomSeparated", [=]( factory.registerInputFormat(ignore_spaces ? "CustomSeparatedIgnoreSpaces" : "CustomSeparated", [=](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
IRowInputFormat::Params params, IRowInputFormat::Params params,

View File

@ -543,9 +543,9 @@ void ValuesBlockInputFormat::resetParser()
total_rows = 0; total_rows = 0;
} }
void registerInputFormatProcessorValues(FormatFactory & factory) void registerInputFormatValues(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("Values", []( factory.registerInputFormat("Values", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & header, const Block & header,
const RowInputFormatParams & params, const RowInputFormatParams & params,

View File

@ -41,9 +41,9 @@ void ValuesRowOutputFormat::writeRowBetweenDelimiter()
} }
void registerOutputFormatProcessorValues(FormatFactory & factory) void registerOutputFormatValues(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("Values", []( factory.registerOutputFormat("Values", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -161,9 +161,9 @@ void VerticalRowOutputFormat::writeSpecialRow(const Columns & columns, size_t ro
} }
} }
void registerOutputFormatProcessorVertical(FormatFactory & factory) void registerOutputFormatVertical(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("Vertical", []( factory.registerOutputFormat("Vertical", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -240,9 +240,9 @@ void XMLRowOutputFormat::writeStatistics()
} }
void registerOutputFormatProcessorXML(FormatFactory & factory) void registerOutputFormatXML(FormatFactory & factory)
{ {
factory.registerOutputFormatProcessor("XML", []( factory.registerOutputFormat("XML", [](
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const RowOutputFormatParams & params, const RowOutputFormatParams & params,

View File

@ -1,7 +1,6 @@
#include <Parsers/ASTInsertQuery.h> #include <Parsers/ASTInsertQuery.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/InterpreterSetQuery.h> #include <Interpreters/InterpreterSetQuery.h>
#include <Formats/FormatFactory.h>
#include <IO/ConcatReadBuffer.h> #include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h> #include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
@ -60,7 +59,7 @@ InputFormatPtr getInputFormatFromASTInsertQuery(
: std::make_unique<EmptyReadBuffer>(); : std::make_unique<EmptyReadBuffer>();
/// Create a source from input buffer using format from query /// Create a source from input buffer using format from query
auto source = FormatFactory::instance().getInput(format, *input_buffer, header, context, context->getSettings().max_insert_block_size); auto source = context->getInputFormat(format, *input_buffer, header, context->getSettings().max_insert_block_size);
source->addBuffer(std::move(input_buffer)); source->addBuffer(std::move(input_buffer));
return source; return source;
} }

View File

@ -27,10 +27,10 @@
#include <Processors/Executors/PushingPipelineExecutor.h> #include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Executors/CompletedPipelineExecutor.h> #include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sinks/SinkToStorage.h> #include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Sinks/EmptySink.h> #include <Processors/Sinks/EmptySink.h>
#include <Processors/QueryPipelineBuilder.h> #include <Processors/QueryPipelineBuilder.h>
#include <Formats/FormatFactory.h>
#include <Server/IServer.h> #include <Server/IServer.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Poco/FileStream.h> #include <Poco/FileStream.h>
@ -634,7 +634,7 @@ namespace
std::optional<WriteBufferFromString> write_buffer; std::optional<WriteBufferFromString> write_buffer;
std::unique_ptr<QueryPipeline> pipeline; std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> pipeline_executor; std::unique_ptr<PullingPipelineExecutor> pipeline_executor;
BlockOutputStreamPtr block_output_stream; std::shared_ptr<IOutputFormat> output_format_processor;
bool need_input_data_from_insert_query = true; bool need_input_data_from_insert_query = true;
bool need_input_data_from_query_info = true; bool need_input_data_from_query_info = true;
bool need_input_data_delimiter = false; bool need_input_data_delimiter = false;
@ -943,8 +943,8 @@ namespace
}); });
assert(!pipeline); assert(!pipeline);
auto source = FormatFactory::instance().getInput( auto source = query_context->getInputFormat(
input_format, *read_buffer, header, query_context, query_context->getSettings().max_insert_block_size); input_format, *read_buffer, header, query_context->getSettings().max_insert_block_size);
QueryPipelineBuilder builder; QueryPipelineBuilder builder;
builder.init(Pipe(source)); builder.init(Pipe(source));
@ -1030,9 +1030,9 @@ namespace
external_table_context->checkSettingsConstraints(settings_changes); external_table_context->checkSettingsConstraints(settings_changes);
external_table_context->applySettingsChanges(settings_changes); external_table_context->applySettingsChanges(settings_changes);
} }
auto in = FormatFactory::instance().getInput( auto in = external_table_context->getInputFormat(
format, data, metadata_snapshot->getSampleBlock(), format, data, metadata_snapshot->getSampleBlock(),
external_table_context, external_table_context->getSettings().max_insert_block_size); external_table_context->getSettings().max_insert_block_size);
QueryPipelineBuilder cur_pipeline; QueryPipelineBuilder cur_pipeline;
cur_pipeline.init(Pipe(std::move(in))); cur_pipeline.init(Pipe(std::move(in)));
@ -1086,8 +1086,8 @@ namespace
header = io.pipeline.getHeader(); header = io.pipeline.getHeader();
write_buffer.emplace(*result.mutable_output()); write_buffer.emplace(*result.mutable_output());
block_output_stream = query_context->getOutputStream(output_format, *write_buffer, header); output_format_processor = query_context->getOutputFormat(output_format, *write_buffer, header);
block_output_stream->writePrefix(); output_format_processor->doWritePrefix();
Stopwatch after_send_progress; Stopwatch after_send_progress;
/// Unless the input() function is used we are not going to receive input data anymore. /// Unless the input() function is used we are not going to receive input data anymore.
@ -1118,7 +1118,7 @@ namespace
break; break;
if (block && !io.null_format) if (block && !io.null_format)
block_output_stream->write(block); output_format_processor->write(block);
if (after_send_progress.elapsedMicroseconds() >= interactive_delay) if (after_send_progress.elapsedMicroseconds() >= interactive_delay)
{ {
@ -1166,7 +1166,7 @@ namespace
executor->execute(); executor->execute();
} }
block_output_stream->writeSuffix(); output_format_processor->doWriteSuffix();
} }
void Call::finishQuery() void Call::finishQuery()
@ -1240,7 +1240,7 @@ namespace
responder.reset(); responder.reset();
pipeline_executor.reset(); pipeline_executor.reset();
pipeline.reset(); pipeline.reset();
block_output_stream.reset(); output_format_processor.reset();
read_buffer.reset(); read_buffer.reset();
write_buffer.reset(); write_buffer.reset();
io = {}; io = {};
@ -1362,10 +1362,10 @@ namespace
return; return;
WriteBufferFromString buf{*result.mutable_totals()}; WriteBufferFromString buf{*result.mutable_totals()};
auto stream = query_context->getOutputStream(output_format, buf, totals); auto format = query_context->getOutputFormat(output_format, buf, totals);
stream->writePrefix(); format->doWritePrefix();
stream->write(totals); format->write(totals);
stream->writeSuffix(); format->doWriteSuffix();
} }
void Call::addExtremesToResult(const Block & extremes) void Call::addExtremesToResult(const Block & extremes)
@ -1374,10 +1374,10 @@ namespace
return; return;
WriteBufferFromString buf{*result.mutable_extremes()}; WriteBufferFromString buf{*result.mutable_extremes()};
auto stream = query_context->getOutputStream(output_format, buf, extremes); auto format = query_context->getOutputFormat(output_format, buf, extremes);
stream->writePrefix(); format->doWritePrefix();
stream->write(extremes); format->write(extremes);
stream->writeSuffix(); format->doWriteSuffix();
} }
void Call::addProfileInfoToResult(const BlockStreamProfileInfo & info) void Call::addProfileInfoToResult(const BlockStreamProfileInfo & info)

View File

@ -14,6 +14,7 @@
#include <Storages/HDFS/HDFSCommon.h> #include <Storages/HDFS/HDFSCommon.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Processors/Formats/InputStreamFromInputFormat.h> #include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <Processors/Sinks/SinkToStorage.h> #include <Processors/Sinks/SinkToStorage.h>
#include <Common/parseGlobs.h> #include <Common/parseGlobs.h>
@ -122,7 +123,7 @@ public:
auto compression = chooseCompressionMethod(path, compression_method); auto compression = chooseCompressionMethod(path, compression_method);
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri, path, getContext()->getGlobalContext()->getConfigRef()), compression); read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri, path, getContext()->getGlobalContext()->getConfigRef()), compression);
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size); auto input_format = getContext()->getInputFormat(format, *read_buf, sample_block, max_block_size);
reader = std::make_shared<InputStreamFromInputFormat>(input_format); reader = std::make_shared<InputStreamFromInputFormat>(input_format);
reader->readPrefix(); reader->readPrefix();
@ -182,7 +183,7 @@ public:
: SinkToStorage(sample_block) : SinkToStorage(sample_block)
{ {
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context->getGlobalContext()->getConfigRef()), compression_method, 3); write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context->getGlobalContext()->getConfigRef()), compression_method, 3);
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context); writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context);
} }
String getName() const override { return "HDFSSink"; } String getName() const override { return "HDFSSink"; }
@ -191,7 +192,7 @@ public:
{ {
if (is_first_chunk) if (is_first_chunk)
{ {
writer->writePrefix(); writer->doWritePrefix();
is_first_chunk = false; is_first_chunk = false;
} }
writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
@ -201,7 +202,7 @@ public:
{ {
try try
{ {
writer->writeSuffix(); writer->doWriteSuffix();
writer->flush(); writer->flush();
write_buf->sync(); write_buf->sync();
write_buf->finalize(); write_buf->finalize();
@ -215,7 +216,7 @@ public:
private: private:
std::unique_ptr<WriteBuffer> write_buf; std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer; OutputFormatPtr writer;
bool is_first_chunk = true; bool is_first_chunk = true;
}; };

View File

@ -1,6 +1,7 @@
#include <Storages/Kafka/KafkaBlockOutputStream.h> #include <Storages/Kafka/KafkaBlockOutputStream.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Storages/Kafka/WriteBufferToKafkaProducer.h> #include <Storages/Kafka/WriteBufferToKafkaProducer.h>
namespace DB namespace DB
@ -24,7 +25,7 @@ void KafkaSink::onStart()
auto format_settings = getFormatSettings(context); auto format_settings = getFormatSettings(context);
format_settings.protobuf.allow_multiple_rows_without_delimiter = true; format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer, format = FormatFactory::instance().getOutputFormat(storage.getFormatName(), *buffer,
getHeader(), context, getHeader(), context,
[this](const Columns & columns, size_t row) [this](const Columns & columns, size_t row)
{ {
@ -35,13 +36,13 @@ void KafkaSink::onStart()
void KafkaSink::consume(Chunk chunk) void KafkaSink::consume(Chunk chunk)
{ {
child->write(getHeader().cloneWithColumns(chunk.detachColumns())); format->write(getHeader().cloneWithColumns(chunk.detachColumns()));
} }
void KafkaSink::onFinish() void KafkaSink::onFinish()
{ {
if (child) if (format)
child->writeSuffix(); format->doWriteSuffix();
//flush(); //flush();
if (buffer) if (buffer)

View File

@ -6,6 +6,9 @@
namespace DB namespace DB
{ {
class IOutputFormat;
using IOutputFormatPtr = std::shared_ptr<IOutputFormat>;
class KafkaSink : public SinkToStorage class KafkaSink : public SinkToStorage
{ {
public: public:
@ -26,7 +29,7 @@ private:
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
const ContextPtr context; const ContextPtr context;
ProducerBufferPtr buffer; ProducerBufferPtr buffer;
BlockOutputStreamPtr child; IOutputFormatPtr format;
}; };
} }

View File

@ -78,8 +78,8 @@ Chunk KafkaSource::generateImpl()
auto put_error_to_stream = handle_error_mode == HandleKafkaErrorMode::STREAM; auto put_error_to_stream = handle_error_mode == HandleKafkaErrorMode::STREAM;
auto input_format = FormatFactory::instance().getInputFormat( auto input_format = context->getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size); storage.getFormatName(), *buffer, non_virtual_header, max_block_size);
std::optional<std::string> exception_message; std::optional<std::string> exception_message;
size_t total_rows = 0; size_t total_rows = 0;

View File

@ -15,7 +15,6 @@
#include <DataTypes/DataTypeTuple.h> #include <DataTypes/DataTypeTuple.h>
#include <DataTypes/NestedUtils.h> #include <DataTypes/NestedUtils.h>
#include <Disks/TemporaryFileOnDisk.h> #include <Disks/TemporaryFileOnDisk.h>
#include <Formats/FormatFactory.h>
#include <Functions/FunctionFactory.h> #include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h> #include <Functions/IFunction.h>
#include <IO/ConcatReadBuffer.h> #include <IO/ConcatReadBuffer.h>
@ -3477,11 +3476,10 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
buf.appendBuffer(std::make_unique<ReadBufferFromMemory>(partition_ast.fields_str.data(), partition_ast.fields_str.size())); buf.appendBuffer(std::make_unique<ReadBufferFromMemory>(partition_ast.fields_str.data(), partition_ast.fields_str.size()));
buf.appendBuffer(std::make_unique<ReadBufferFromMemory>(")", 1)); buf.appendBuffer(std::make_unique<ReadBufferFromMemory>(")", 1));
auto input_format = FormatFactory::instance().getInput( auto input_format = local_context->getInputFormat(
"Values", "Values",
buf, buf,
metadata_snapshot->getPartitionKey().sample_block, metadata_snapshot->getPartitionKey().sample_block,
local_context,
local_context->getSettingsRef().max_block_size); local_context->getSettingsRef().max_block_size);
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format); auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);

View File

@ -2,6 +2,7 @@
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h> #include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h> #include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>
#include <base/logger_useful.h> #include <base/logger_useful.h>
@ -31,7 +32,7 @@ void RabbitMQSink::onStart()
auto format_settings = getFormatSettings(context); auto format_settings = getFormatSettings(context);
format_settings.protobuf.allow_multiple_rows_without_delimiter = true; format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer, getHeader(), context, format = FormatFactory::instance().getOutputFormat(storage.getFormatName(), *buffer, getHeader(), context,
[this](const Columns & /* columns */, size_t /* rows */) [this](const Columns & /* columns */, size_t /* rows */)
{ {
buffer->countRow(); buffer->countRow();
@ -42,13 +43,13 @@ void RabbitMQSink::onStart()
void RabbitMQSink::consume(Chunk chunk) void RabbitMQSink::consume(Chunk chunk)
{ {
child->write(getHeader().cloneWithColumns(chunk.detachColumns())); format->write(getHeader().cloneWithColumns(chunk.detachColumns()));
} }
void RabbitMQSink::onFinish() void RabbitMQSink::onFinish()
{ {
child->writeSuffix(); format->doWriteSuffix();
if (buffer) if (buffer)
buffer->updateMaxWait(); buffer->updateMaxWait();

View File

@ -7,6 +7,9 @@
namespace DB namespace DB
{ {
class IOutputFormat;
using IOutputFormatPtr = std::shared_ptr<IOutputFormat>;
class RabbitMQSink : public SinkToStorage class RabbitMQSink : public SinkToStorage
{ {
@ -24,6 +27,6 @@ private:
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
ContextPtr context; ContextPtr context;
ProducerBufferPtr buffer; ProducerBufferPtr buffer;
BlockOutputStreamPtr child; IOutputFormatPtr format;
}; };
} }

View File

@ -1,6 +1,5 @@
#include <Storages/RabbitMQ/RabbitMQSource.h> #include <Storages/RabbitMQ/RabbitMQSource.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Processors/Formats/InputStreamFromInputFormat.h> #include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Executors/StreamingFormatExecutor.h> #include <Processors/Executors/StreamingFormatExecutor.h>
@ -119,8 +118,8 @@ Chunk RabbitMQSource::generateImpl()
is_finished = true; is_finished = true;
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat( auto input_format = context->getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size); storage.getFormatName(), *buffer, non_virtual_header, max_block_size);
StreamingFormatExecutor executor(non_virtual_header, input_format); StreamingFormatExecutor executor(non_virtual_header, input_format);

View File

@ -15,7 +15,6 @@
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/ISimpleTransform.h> #include <Processors/ISimpleTransform.h>
#include <Processors/Executors/CompletedPipelineExecutor.h> #include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h> #include <Processors/Formats/IOutputFormat.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h> #include <Interpreters/InterpreterSelectWithUnionQuery.h>
@ -186,7 +185,7 @@ Pipe StorageExecutable::read(
auto pipeline = std::make_shared<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(inputs[i]))); auto pipeline = std::make_shared<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(inputs[i])));
auto out = FormatFactory::instance().getOutputFormat(format, *write_buffer, materializeBlock(pipeline->getHeader()), context); auto out = context->getOutputFormat(format, *write_buffer, materializeBlock(pipeline->getHeader()));
out->setAutoFlush(); out->setAutoFlush();
pipeline->complete(std::move(out)); pipeline->complete(std::move(out));

View File

@ -6,7 +6,6 @@
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Common/parseAddress.h> #include <Common/parseAddress.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>

View File

@ -35,6 +35,7 @@
#include <Storages/Distributed/DirectoryMonitor.h> #include <Storages/Distributed/DirectoryMonitor.h>
#include <Processors/Sources/SourceWithProgress.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Formats/InputStreamFromInputFormat.h> #include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/NullSource.h> #include <Processors/Sources/NullSource.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>
@ -390,8 +391,8 @@ public:
return metadata_snapshot->getSampleBlock(); return metadata_snapshot->getSampleBlock();
}; };
auto format = FormatFactory::instance().getInput( auto format = context->getInputFormat(
storage->format_name, *read_buf, get_block_for_format(), context, max_block_size, storage->format_settings); storage->format_name, *read_buf, get_block_for_format(), max_block_size, storage->format_settings);
QueryPipelineBuilder builder; QueryPipelineBuilder builder;
builder.init(Pipe(format)); builder.init(Pipe(format));
@ -574,7 +575,7 @@ public:
write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3); write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(storage.format_name, writer = FormatFactory::instance().getOutputFormatParallelIfPossible(storage.format_name,
*write_buf, metadata_snapshot->getSampleBlock(), context, *write_buf, metadata_snapshot->getSampleBlock(), context,
{}, format_settings); {}, format_settings);
} }
@ -584,7 +585,7 @@ public:
void onStart() override void onStart() override
{ {
if (!prefix_written) if (!prefix_written)
writer->writePrefix(); writer->doWritePrefix();
prefix_written = true; prefix_written = true;
} }
@ -595,7 +596,7 @@ public:
void onFinish() override void onFinish() override
{ {
writer->writeSuffix(); writer->doWriteSuffix();
} }
// void flush() override // void flush() override
@ -608,7 +609,7 @@ private:
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_timed_mutex> lock; std::unique_lock<std::shared_timed_mutex> lock;
std::unique_ptr<WriteBuffer> write_buf; std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer; OutputFormatPtr writer;
bool prefix_written{false}; bool prefix_written{false};
}; };

View File

@ -11,6 +11,7 @@
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h> #include <DataStreams/IBlockOutputStream.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Common/parseAddress.h> #include <Common/parseAddress.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -162,7 +163,7 @@ public:
sqlbuf << backQuoteMySQL(remote_table_name); sqlbuf << backQuoteMySQL(remote_table_name);
sqlbuf << " (" << dumpNamesWithBackQuote(block) << ") VALUES "; sqlbuf << " (" << dumpNamesWithBackQuote(block) << ") VALUES ";
auto writer = FormatFactory::instance().getOutputStream("Values", sqlbuf, metadata_snapshot->getSampleBlock(), storage.getContext()); auto writer = FormatFactory::instance().getOutputFormat("Values", sqlbuf, metadata_snapshot->getSampleBlock(), storage.getContext());
writer->write(block); writer->write(block);
if (!storage.on_duplicate_clause.empty()) if (!storage.on_duplicate_clause.empty())

View File

@ -4,7 +4,6 @@
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <Processors/Sources/NullSource.h> #include <Processors/Sources/NullSource.h>
#include <Processors/Sinks/SinkToStorage.h> #include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>

View File

@ -22,7 +22,6 @@
#include <Columns/ColumnDecimal.h> #include <Columns/ColumnDecimal.h>
#include <Columns/ColumnNullable.h> #include <Columns/ColumnNullable.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h> #include <Formats/FormatSettings.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>

View File

@ -32,6 +32,7 @@
#include <DataStreams/IBlockOutputStream.h> #include <DataStreams/IBlockOutputStream.h>
#include <Processors/Transforms/AddingDefaultsTransform.h> #include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <DataStreams/narrowBlockInputStreams.h> #include <DataStreams/narrowBlockInputStreams.h>
#include <Processors/QueryPipelineBuilder.h> #include <Processors/QueryPipelineBuilder.h>
@ -74,6 +75,10 @@ namespace ErrorCodes
extern const int S3_ERROR; extern const int S3_ERROR;
extern const int UNEXPECTED_EXPRESSION; extern const int UNEXPECTED_EXPRESSION;
} }
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
class StorageS3Source::DisclosedGlobIterator::Impl class StorageS3Source::DisclosedGlobIterator::Impl
{ {
@ -232,7 +237,7 @@ bool StorageS3Source::initialize()
read_buf = wrapReadBufferWithCompressionMethod( read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(client, bucket, current_key, max_single_read_retries, DBMS_DEFAULT_BUFFER_SIZE), std::make_unique<ReadBufferFromS3>(client, bucket, current_key, max_single_read_retries, DBMS_DEFAULT_BUFFER_SIZE),
chooseCompressionMethod(current_key, compression_hint)); chooseCompressionMethod(current_key, compression_hint));
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size, format_settings); auto input_format = getContext()->getInputFormat(format, *read_buf, sample_block, max_block_size, format_settings);
QueryPipelineBuilder builder; QueryPipelineBuilder builder;
builder.init(Pipe(input_format)); builder.init(Pipe(input_format));
@ -309,7 +314,7 @@ public:
{ {
write_buf = wrapWriteBufferWithCompressionMethod( write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3); std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings); writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings);
} }
String getName() const override { return "StorageS3Sink"; } String getName() const override { return "StorageS3Sink"; }
@ -318,7 +323,7 @@ public:
{ {
if (is_first_chunk) if (is_first_chunk)
{ {
writer->writePrefix(); writer->doWritePrefix();
is_first_chunk = false; is_first_chunk = false;
} }
writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
@ -328,7 +333,7 @@ public:
{ {
try try
{ {
writer->writeSuffix(); writer->doWriteSuffix();
writer->flush(); writer->flush();
write_buf->finalize(); write_buf->finalize();
} }
@ -344,7 +349,7 @@ private:
Block sample_block; Block sample_block;
std::optional<FormatSettings> format_settings; std::optional<FormatSettings> format_settings;
std::unique_ptr<WriteBuffer> write_buf; std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer; OutputFormatPtr writer;
bool is_first_chunk = true; bool is_first_chunk = true;
}; };

View File

@ -22,7 +22,6 @@
#include <Interpreters/SelectQueryOptions.h> #include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h> #include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/getTableExpressions.h> #include <Interpreters/getTableExpressions.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/IBlockOutputStream.h> #include <DataStreams/IBlockOutputStream.h>
#include <Processors/Transforms/AddingDefaultsTransform.h> #include <Processors/Transforms/AddingDefaultsTransform.h>
#include <DataStreams/narrowBlockInputStreams.h> #include <DataStreams/narrowBlockInputStreams.h>

Some files were not shown because too many files have changed in this diff Show More