Fix header in library bridge

This commit is contained in:
Nikolai Kochetov 2021-10-13 14:24:12 +03:00
parent a5fa5c7ea3
commit bb9fa38369

View File

@ -8,6 +8,7 @@
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h> #include <Common/ShellCommand.h>
#include <base/logger_useful.h> #include <base/logger_useful.h>
@ -79,6 +80,7 @@ bool LibraryBridgeHelper::bridgeHandShake()
} }
catch (...) catch (...)
{ {
tryLogCurrentException(log);
return false; return false;
} }
@ -225,7 +227,7 @@ Pipe LibraryBridgeHelper::loadKeys(const Block & requested_block)
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [requested_block, this](std::ostream & os) ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [requested_block, this](std::ostream & os)
{ {
WriteBufferFromOStream out_buffer(os); WriteBufferFromOStream out_buffer(os);
auto output_format = getContext()->getOutputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, out_buffer, sample_block); auto output_format = getContext()->getOutputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, out_buffer, requested_block.cloneEmpty());
formatBlock(output_format, requested_block); formatBlock(output_format, requested_block);
}; };
return loadBase(uri, out_stream_callback); return loadBase(uri, out_stream_callback);
@ -258,7 +260,7 @@ Pipe LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTT
DBMS_DEFAULT_BUFFER_SIZE, DBMS_DEFAULT_BUFFER_SIZE,
ReadWriteBufferFromHTTP::HTTPHeaderEntries{}); ReadWriteBufferFromHTTP::HTTPHeaderEntries{});
auto source = getContext()->getInputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, DEFAULT_BLOCK_SIZE); auto source = FormatFactory::instance().getInput(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, getContext(), DEFAULT_BLOCK_SIZE);
source->addBuffer(std::move(read_buf_ptr)); source->addBuffer(std::move(read_buf_ptr));
return Pipe(std::move(source)); return Pipe(std::move(source));
} }