parallel formatting everywhere

This commit is contained in:
Nikita Mikhaylov 2021-03-15 14:51:24 +03:00
parent 588f3ee11e
commit 1f92c8ce58
6 changed files with 7 additions and 7 deletions

View File

@ -1180,7 +1180,7 @@ try
file_in.seek(0, SEEK_SET);
BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size);
BlockOutputStreamPtr output = context.getOutputStream(output_format, file_out, header);
BlockOutputStreamPtr output = context.getOutputStreamParallelIfPossible(output_format, file_out, header);
if (processed_rows + source_rows > limit)
input = std::make_shared<LimitBlockInputStream>(input, limit - processed_rows, 0);

View File

@ -176,7 +176,7 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
std::string query = params.get("query");
LOG_TRACE(log, "Query: {}", query);
BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStream(format, out, *sample_block, context);
BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, out, *sample_block, context);
auto pool = getPool(connection_string);
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size);
copyData(inp, *writer);

View File

@ -136,7 +136,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr)
{
WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context.getOutputStream(format, out_buffer, sample_block);
auto output_stream = context.getOutputStreamParallelIfPossible(format, out_buffer, sample_block);
formatBlock(output_stream, block);
};
@ -157,7 +157,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns,
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr)
{
WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context.getOutputStream(format, out_buffer, sample_block);
auto output_stream = context.getOutputStreamParallelIfPossible(format, out_buffer, sample_block);
formatBlock(output_stream, block);
};

View File

@ -183,7 +183,7 @@ public:
: sample_block(sample_block_)
{
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context.getGlobalContext().getConfigRef()), compression_method, 3);
writer = FormatFactory::instance().getOutputStream(format, *write_buf, sample_block, context);
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context);
}
Block getHeader() const override

View File

@ -155,7 +155,7 @@ namespace
{
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
writer = FormatFactory::instance().getOutputStream(format, *write_buf, sample_block, context);
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context);
}
Block getHeader() const override

View File

@ -155,7 +155,7 @@ StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
compression_method, 3);
writer = FormatFactory::instance().getOutputStream(format, *write_buf, sample_block,
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block,
context, {} /* write callback */, format_settings);
}