From 1f92c8ce581bfb8c820a7220a63ae005ed13316c Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Mon, 15 Mar 2021 14:51:24 +0300 Subject: [PATCH] parallel formatting everywhere --- programs/obfuscator/Obfuscator.cpp | 2 +- programs/odbc-bridge/MainHandler.cpp | 2 +- src/Dictionaries/HTTPDictionarySource.cpp | 4 ++-- src/Storages/HDFS/StorageHDFS.cpp | 2 +- src/Storages/StorageS3.cpp | 2 +- src/Storages/StorageURL.cpp | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/programs/obfuscator/Obfuscator.cpp b/programs/obfuscator/Obfuscator.cpp index 950db4e4f05..3ccbfd44357 100644 --- a/programs/obfuscator/Obfuscator.cpp +++ b/programs/obfuscator/Obfuscator.cpp @@ -1180,7 +1180,7 @@ try file_in.seek(0, SEEK_SET); BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size); - BlockOutputStreamPtr output = context.getOutputStream(output_format, file_out, header); + BlockOutputStreamPtr output = context.getOutputStreamParallelIfPossible(output_format, file_out, header); if (processed_rows + source_rows > limit) input = std::make_shared(input, limit - processed_rows, 0); diff --git a/programs/odbc-bridge/MainHandler.cpp b/programs/odbc-bridge/MainHandler.cpp index 4fcc9deea6a..079fc371ab4 100644 --- a/programs/odbc-bridge/MainHandler.cpp +++ b/programs/odbc-bridge/MainHandler.cpp @@ -176,7 +176,7 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse std::string query = params.get("query"); LOG_TRACE(log, "Query: {}", query); - BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStream(format, out, *sample_block, context); + BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, out, *sample_block, context); auto pool = getPool(connection_string); ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size); copyData(inp, *writer); diff --git a/src/Dictionaries/HTTPDictionarySource.cpp b/src/Dictionaries/HTTPDictionarySource.cpp index ddcac117e58..62bf478afc4 100644 --- a/src/Dictionaries/HTTPDictionarySource.cpp +++ b/src/Dictionaries/HTTPDictionarySource.cpp @@ -136,7 +136,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector & id ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr) { WriteBufferFromOStream out_buffer(ostr); - auto output_stream = context.getOutputStream(format, out_buffer, sample_block); + auto output_stream = context.getOutputStreamParallelIfPossible(format, out_buffer, sample_block); formatBlock(output_stream, block); }; @@ -157,7 +157,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr) { WriteBufferFromOStream out_buffer(ostr); - auto output_stream = context.getOutputStream(format, out_buffer, sample_block); + auto output_stream = context.getOutputStreamParallelIfPossible(format, out_buffer, sample_block); formatBlock(output_stream, block); }; diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index f7afd4a497d..e26d3375c33 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -183,7 +183,7 @@ public: : sample_block(sample_block_) { write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique(uri, context.getGlobalContext().getConfigRef()), compression_method, 3); - writer = FormatFactory::instance().getOutputStream(format, *write_buf, sample_block, context); + writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context); } Block getHeader() const override diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index a31a7fa0944..1cbbe14d09f 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -155,7 +155,7 @@ namespace { write_buf = wrapWriteBufferWithCompressionMethod( std::make_unique(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3); - writer = FormatFactory::instance().getOutputStream(format, *write_buf, sample_block, context); + writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context); } Block getHeader() const override diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index ca984f9ece9..8b16a08b957 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -155,7 +155,7 @@ StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri, write_buf = wrapWriteBufferWithCompressionMethod( std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts), compression_method, 3); - writer = FormatFactory::instance().getOutputStream(format, *write_buf, sample_block, + writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context, {} /* write callback */, format_settings); }