httd dict wip

This commit is contained in:
proller 2016-11-19 03:07:58 +03:00
parent 636cbe4323
commit fd3a9da076
4 changed files with 181 additions and 5 deletions

View File

@ -502,6 +502,7 @@ add_library (dbms
include/DB/IO/ReadHelpers.h
include/DB/IO/WriteBufferFromPocoSocket.h
include/DB/IO/ReadBufferFromHTTP.h
include/DB/IO/ReadWriteBufferFromHTTP.h
include/DB/IO/WriteBufferFromHTTPServerResponse.h
include/DB/IO/createReadBufferFromFileBase.h
include/DB/IO/WriteIntText.h
@ -673,6 +674,7 @@ add_library (dbms
src/IO/InterserverWriteBuffer.cpp
src/IO/ReadBufferFromHTTP.cpp
src/IO/ReadWriteBufferFromHTTP.cpp
src/Columns/ColumnConst.cpp
src/Columns/ColumnArray.cpp

View File

@ -0,0 +1,59 @@
#pragma once
//#include <memory>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/ReadBuffer.h>
//#include <DB/Core/Types.h>
namespace DB
{
struct HTTPLocation {
using Params = std::vector<std::pair<String, String>>;
std::string protocol = "http";
// user
// password
std::string host = "::1";
unsigned short port = 80;
std::string path = "/";
Params params;
//std::string query = "";
std::string method = Poco::Net::HTTPRequest::HTTP_GET;
};
struct HTTPTimeouts {
Poco::Timespan connection_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, 0);
Poco::Timespan send_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0);
Poco::Timespan receive_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0);
};
/** Perform HTTP POST request and provide response to read.
*/
class ReadWriteBufferFromHTTP : public ReadBuffer
{
private:
HTTPLocation location;
HTTPTimeouts timeouts;
Poco::Net::HTTPClientSession session;
std::istream * istr; /// owned by session
std::unique_ptr<ReadBuffer> impl;
public:
//using Params = std::vector<std::pair<String, String>>;
ReadWriteBufferFromHTTP(
HTTPLocation http_query,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
HTTPTimeouts timeouts = HTTPTimeouts()
);
bool nextImpl() override;
};
}

View File

@ -2,18 +2,19 @@
#include <DB/Interpreters/Context.h>
#include <DB/Dictionaries/OwningBufferBlockInputStream.h>
#include <DB/Dictionaries/OwningBlockInputStream.h>
#include <DB/IO/ReadBufferFromHTTP.h>
//#include <Poco/Net/HTTPRequest.h> // HTTP_GET
#include <DB/IO/ReadWriteBufferFromHTTP.h>
namespace DB
{
HTTPDictionarySource::HTTPDictionarySource(const DictionaryStructure & dict_struct_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block, const Context & context) :
dict_struct{dict_struct_},
host{config.getString(config_prefix + ".host")},
port{std::stoi(config.getString(config_prefix + ".port"))},
path{config.getString(config_prefix + ".path")},
method{config.getString(config_prefix + ".method")},
host{config.getString(config_prefix + ".host", "::1")},
port{std::stoi(config.getString(config_prefix + ".port", "80"))},
path{config.getString(config_prefix + ".path", "")},
//method{config.getString(config_prefix + ".method", "")},
format{config.getString(config_prefix + ".format")},
sample_block{sample_block},
context(context)
@ -33,6 +34,7 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other) :
BlockInputStreamPtr HTTPDictionarySource::loadAll()
{
LOG_TRACE(log, "loadAll " + toString());
auto in_ptr = std::make_unique<ReadBufferFromHTTP>(host, port, path, ReadBufferFromHTTP::Params(), method);
auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBufferBlockInputStream>(stream, std::move(in_ptr));
@ -40,12 +42,24 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll()
BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds " + toString());
throw Exception{"Method unsupported", ErrorCodes::NOT_IMPLEMENTED};
HTTPLocation http_location;
http_location.host = host;
http_location.port = port;
http_location.path = path;
http_location.method = method;
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(http_location);
auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(stream, std::move(in_ptr));
}
BlockInputStreamPtr HTTPDictionarySource::loadKeys(
const ConstColumnPlainPtrs & key_columns, const std::vector<std::size_t> & requested_rows)
{
LOG_TRACE(log, "loadKeys " + toString());
throw Exception{"Method unsupported", ErrorCodes::NOT_IMPLEMENTED};
}

View File

@ -0,0 +1,101 @@
#include <DB/IO/ReadWriteBufferFromHTTP.h>
#include <Poco/URI.h>
#include <Poco/Net/DNS.h>
//#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/Common/SimpleCache.h>
#include <common/logger_useful.h>
namespace DB
{
// copypaste from ReadBufferFromHTTP.cpp
namespace ErrorCodes
{
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
}
static Poco::Net::IPAddress resolveHostImpl(const String & host)
{
return Poco::Net::DNS::resolveOne(host);
}
static Poco::Net::IPAddress resolveHost(const String & host)
{
static SimpleCache<decltype(resolveHostImpl), &resolveHostImpl> cache;
return cache(host);
}
// ==========
ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(
HTTPLocation location,
size_t buffer_size_,
HTTPTimeouts timeouts
) :
ReadBuffer(nullptr, 0),
location{location},
timeouts{timeouts}
{
std::stringstream uri;
std::stringstream path_params;
path_params << location.path;
bool first = true;
for (const auto & it : location.params)
{
path_params << (first ? "?" : "&");
first = false;
String encoded_key;
String encoded_value;
Poco::URI::encode(it.first, "=&#", encoded_key);
Poco::URI::encode(it.second, "&#", encoded_value);
path_params << encoded_key << "=" << encoded_value;
}
uri << "http://" << location.host << ":" << location.port << path_params.str();
session.setHost(resolveHost(location.host).toString()); /// Cache DNS forever (until server restart)
session.setPort(location.port);
session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
Poco::Net::HTTPRequest request(location.method, path_params.str());
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("ReadBufferFromHTTP")), "Sending request to " << uri.str());
session.sendRequest(request);
istr = &session.receiveResponse(response);
auto status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream error_message;
error_message << "Received error from remote server " << uri.str() << ". HTTP status code: "
<< status << ", body: " << istr->rdbuf();
throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
}
bool ReadWriteBufferFromHTTP::nextImpl()
{
if (!impl->next())
return false;
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
return true;
}
}