This commit is contained in:
proller 2016-11-24 22:57:24 +03:00
parent 89d9b7c835
commit e6d3c8855e
3 changed files with 23 additions and 24 deletions

View File

@ -1,7 +1,6 @@
#pragma once
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/URI.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/ReadBuffer.h>
@ -29,13 +28,10 @@ private:
std::unique_ptr<ReadBuffer> impl;
public:
using OutStreamCallback = std::function<void(std::ostream&)>;
//using Params = std::vector<std::pair<String, String>>;
ReadWriteBufferFromHTTP(
const Poco::URI & uri,
const std::string & method = Poco::Net::HTTPRequest::HTTP_GET,
OutStreamCallback out_stream_callback = {},
const std::string & method = {},
const std::string & post_body = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const HTTPTimeouts & timeouts = {}
);

View File

@ -1,8 +1,9 @@
#include <Poco/Net/HTTPRequest.h>
#include <DB/Dictionaries/HTTPDictionarySource.h>
#include <DB/Interpreters/Context.h>
#include <DB/Dictionaries/OwningBlockInputStream.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/ReadWriteBufferFromHTTP.h>
#include <DB/DataStreams/IBlockOutputStream.h>
@ -35,17 +36,17 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll()
{
LOG_TRACE(log, "loadAll " + toString());
Poco::URI uri(url);
auto in_ptr = std::make_unique<ReadBufferFromHTTP>(uri.getHost(), uri.getPort(), uri.getPathAndQuery(), ReadBufferFromHTTP::Params(), Poco::Net::HTTPRequest::HTTP_GET);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_GET);
auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadBufferFromHTTP>>(stream, std::move(in_ptr));
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(stream, std::move(in_ptr));
}
BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds " + toString());
Poco::URI uri(url);
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & out_stream) {
std::ostringstream out_stream;
{
// copypaste from ExecutableDictionarySource.cpp, todo: make func
ColumnWithTypeAndName column;
column.type = std::make_shared<DataTypeUInt64>();
@ -62,7 +63,9 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
auto stream_out = context.getOutputFormat(format, out_buffer, sample_block);
stream_out->write(block);
};
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback);
Poco::URI uri(url);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream.str());
auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(stream, std::move(in_ptr));
}
@ -72,18 +75,15 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(
{
LOG_TRACE(log, "loadKeys " + toString());
Poco::URI uri(url);
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & out_stream) {
// copypaste from ExecutableDictionarySource.cpp, todo: make func
std::ostringstream out_stream;
{
Block block;
const auto keys_size = key_columns.size();
for (const auto i : ext::range(0, keys_size))
{
const auto & key_description = (*dict_struct.key)[i];
const auto & key = key_columns[i];
ColumnWithTypeAndName column;
column.type = key_description.type;
column.column = key->clone(); // CHECKME !!
@ -95,7 +95,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(
stream_out->write(block);
};
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback);
Poco::URI uri(url);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream.str());
auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(stream, std::move(in_ptr));
}

View File

@ -3,6 +3,7 @@
#include <Poco/URI.h>
#include <Poco/Net/DNS.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/HTTPRequest.h>
#include <DB/IO/ReadBufferFromIStream.h>
@ -36,13 +37,13 @@ static Poco::Net::IPAddress resolveHost(const String & host)
ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(
const Poco::URI & uri,
const std::string & method,
OutStreamCallback out_stream_callback,
const std::string & post_body,
size_t buffer_size_,
const HTTPTimeouts & timeouts
) :
ReadBuffer(nullptr, 0),
uri{uri},
method{method},
method{!method.empty() ? method : post_body.empty() ? Poco::Net::HTTPRequest::HTTP_GET : Poco::Net::HTTPRequest::HTTP_POST},
timeouts{timeouts}
{
session.setHost(resolveHost(uri.getHost()).toString()); /// Cache DNS forever (until server restart)
@ -55,12 +56,13 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(
LOG_TRACE((&Logger::get("ReadBufferFromHTTP")), "Sending request to " << uri.toString());
//request.setContentLength();
if (!post_body.empty() || method == Poco::Net::HTTPRequest::HTTP_POST)
request.setContentLength(post_body.size());
auto & stream_out = session.sendRequest(request);
if (out_stream_callback) {
out_stream_callback(stream_out);
}
if (!post_body.empty())
stream_out << post_body;
istr = &session.receiveResponse(response);