ClickHouse/programs/library-bridge/LibraryBridgeHandlers.cpp

403 lines
15 KiB
C++
Raw Normal View History

#include "LibraryBridgeHandlers.h"
#include "ExternalDictionaryLibraryHandlerFactory.h"
2021-03-05 09:38:00 +00:00
#include <Formats/FormatFactory.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
2021-03-05 09:38:00 +00:00
#include <IO/ReadHelpers.h>
#include <Common/BridgeProtocolVersion.h>
2021-03-05 09:38:00 +00:00
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
2021-03-05 09:38:00 +00:00
#include <Poco/ThreadPool.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <QueryPipeline/QueryPipeline.h>
2021-10-11 16:11:50 +00:00
#include <Processors/Executors/CompletedPipelineExecutor.h>
2021-10-14 10:25:43 +00:00
#include <Processors/Executors/PullingPipelineExecutor.h>
2021-10-11 16:11:50 +00:00
#include <Processors/Sources/SourceFromSingleChunk.h>
2021-10-16 14:03:50 +00:00
#include <QueryPipeline/Pipe.h>
2021-03-05 09:38:00 +00:00
#include <Server/HTTP/HTMLForm.h>
#include <IO/ReadBufferFromString.h>
2021-03-05 09:38:00 +00:00
namespace DB
{
2021-08-02 07:18:51 +00:00
namespace ErrorCodes
{
extern const int BAD_REQUEST_PARAMETER;
}
2021-03-05 09:38:00 +00:00
namespace
{
/// Reports a failed request to the client and to the log.
/// Sets the response status to HTTP 500, writes `message` followed by a newline (and a flush)
/// into the response body unless the response has already been sent, then logs `message`
/// as a warning under the "LibraryBridge" logger.
void processError(HTTPServerResponse & response, const std::string & message)
{
    response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);

    const bool already_sent = response.sent();
    if (!already_sent)
    {
        /// Keep the returned stream handle alive while writing into it.
        auto body_stream = response.send();
        *body_stream << message << std::endl;
    }

    LOG_WARNING(&Poco::Logger::get("LibraryBridge"), fmt::runtime(message));
}
std::shared_ptr<Block> parseColumns(std::string && column_string)
2021-03-05 09:38:00 +00:00
{
auto sample_block = std::make_shared<Block>();
auto names_and_types = NamesAndTypesList::parse(column_string);
for (const NameAndTypePair & column_data : names_and_types)
sample_block->insert({column_data.type, column_data.name});
return sample_block;
}
2021-03-24 07:53:15 +00:00
2021-08-01 08:51:40 +00:00
/// Reads a vector of 64-bit ids from `buf` using readVectorBinary() and returns it.
std::vector<uint64_t> parseIdsFromBinary(ReadBuffer & buf)
{
    std::vector<uint64_t> parsed_ids;
    readVectorBinary(parsed_ids, buf);
    return parsed_ids;
}
2021-03-24 07:53:15 +00:00
std::vector<std::string> parseNamesFromBinary(const std::string & names_string)
2021-03-24 07:53:15 +00:00
{
2021-03-24 19:32:31 +00:00
ReadBufferFromString buf(names_string);
std::vector<std::string> names;
2021-03-24 19:32:31 +00:00
readVectorBinary(names, buf);
return names;
2021-03-24 07:53:15 +00:00
}
2021-03-05 09:38:00 +00:00
}
2021-10-11 16:11:50 +00:00
/// Writes `data` through `format`.
/// The block becomes the single source of a pipeline, `format` completes that pipeline,
/// and the pipeline is executed to the end before the function returns.
static void writeData(Block data, OutputFormatPtr format)
{
    QueryPipeline pipeline(std::make_shared<SourceFromSingleChunk>(std::move(data)));
    pipeline.complete(std::move(format));

    CompletedPipelineExecutor pipeline_executor(pipeline);
    pipeline_executor.execute();
}
/// Creates a request handler bound to `context_`.
/// `keep_alive_timeout_` is stored and later passed to the HTTP response write buffer;
/// the logger is named after the handler class.
ExternalDictionaryLibraryBridgeRequestHandler::ExternalDictionaryLibraryBridgeRequestHandler(
    size_t keep_alive_timeout_,
    ContextPtr context_)
    : WithContext(context_)
    , log(&Poco::Logger::get("ExternalDictionaryLibraryBridgeRequestHandler"))
    , keep_alive_timeout(keep_alive_timeout_)
{
}
2021-10-11 16:11:50 +00:00
/// Handles a single library-dictionary request sent to the bridge.
///
/// Request parameters (read through HTMLForm):
///  - 'version'       : must be equal to LIBRARY_BRIDGE_PROTOCOL_VERSION; treated as 0 when absent.
///  - 'method'        : the operation to run (see below).
///  - 'dictionary_id' : id of the library handler the operation refers to.
///
/// Supported methods:
///  - extDict_libClone             : clones the handler of 'from_dictionary_id'; falls back to extDict_libNew
///                                   if cloning is not possible.
///  - extDict_libNew               : creates a handler from 'library_path', 'library_settings',
///                                   'attributes_names', 'sample_block' and 'null_values'.
///  - extDict_libDelete            : removes the handler (a missing id only produces a warning).
///  - extDict_isModified           : writes the result of isModified().
///  - extDict_supportsSelectiveLoad: writes the result of supportsSelectiveLoad().
///  - extDict_loadAll / extDict_loadIds / extDict_loadKeys : write the loaded data in FORMAT.
///
/// Missing or malformed parameters are reported through processError(). Exceptions thrown while
/// executing a method are logged, the response status is set to HTTP 500 and the exception
/// message is written into the response body.
void ExternalDictionaryLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
    LOG_TRACE(log, "Request URI: {}", request.getURI());
    HTMLForm params(getContext()->getSettingsRef(), request);

    /// Version 0 is assumed for too old servers which do not send a version.
    size_t version = 0;
    if (params.has("version"))
    {
        String version_str = params.get("version");
        if (!tryParse(version, version_str))
        {
            processError(response, "Unable to parse 'version' string in request URL: '" + version_str + "' Check if the server and library-bridge have the same version.");
            return;
        }
    }

    if (version != LIBRARY_BRIDGE_PROTOCOL_VERSION)
    {
        /// backwards compatibility is considered unnecessary for now, just let the user know that the server and the bridge must be upgraded together
        processError(response, "Server and library-bridge have different versions: '" + std::to_string(version) + "' vs. '" + std::to_string(LIBRARY_BRIDGE_PROTOCOL_VERSION) + "'");
        return;
    }

    if (!params.has("method"))
    {
        processError(response, "No 'method' in request URL");
        return;
    }

    if (!params.has("dictionary_id"))
    {
        processError(response, "No 'dictionary_id' in request URL");
        return;
    }

    std::string method = params.get("method");
    std::string dictionary_id = params.get("dictionary_id");

    LOG_TRACE(log, "Library method: '{}', dictionary id: {}", method, dictionary_id);
    WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);

    try
    {
        bool lib_new = (method == "extDict_libNew");

        if (method == "extDict_libClone")
        {
            if (!params.has("from_dictionary_id"))
            {
                processError(response, "No 'from_dictionary_id' in request URL");
                return;
            }

            std::string from_dictionary_id = params.get("from_dictionary_id");
            const bool cloned = ExternalDictionaryLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);

            if (cloned)
            {
                writeStringBinary("1", out);
            }
            else
            {
                LOG_TRACE(log, "Cannot clone from dictionary with id: {}, will call extDict_libNew instead", from_dictionary_id);
                lib_new = true;
            }
        }

        if (lib_new)
        {
            /// The remaining parameters are read from the request stream into the same form.
            auto & read_buf = request.getStream();
            params.read(read_buf);

            if (!params.has("library_path"))
            {
                processError(response, "No 'library_path' in request URL");
                return;
            }

            std::string library_path = params.get("library_path");

            if (!params.has("library_settings"))
            {
                processError(response, "No 'library_settings' in request URL");
                return;
            }

            const auto & settings_string = params.get("library_settings");

            LOG_DEBUG(log, "Parsing library settings from binary string");
            std::vector<std::string> library_settings = parseNamesFromBinary(settings_string);

            /// Needed for library dictionary
            if (!params.has("attributes_names"))
            {
                processError(response, "No 'attributes_names' in request URL");
                return;
            }

            const auto & attributes_string = params.get("attributes_names");

            LOG_DEBUG(log, "Parsing attributes names from binary string");
            std::vector<std::string> attributes_names = parseNamesFromBinary(attributes_string);

            /// Needed to parse block from binary string format
            if (!params.has("sample_block"))
            {
                processError(response, "No 'sample_block' in request URL");
                return;
            }

            std::string sample_block_string = params.get("sample_block");

            std::shared_ptr<Block> sample_block;
            try
            {
                sample_block = parseColumns(std::move(sample_block_string));
            }
            catch (const Exception & ex)
            {
                processError(response, "Invalid 'sample_block' parameter in request body '" + ex.message() + "'");
                LOG_WARNING(log, fmt::runtime(ex.getStackTraceString()));
                return;
            }

            if (!params.has("null_values"))
            {
                processError(response, "No 'null_values' in request URL");
                return;
            }

            /// 'null_values' is parsed in FORMAT using the structure of the sample block.
            ReadBufferFromString read_block_buf(params.get("null_values"));
            auto format = getContext()->getInputFormat(FORMAT, read_block_buf, *sample_block, DEFAULT_BLOCK_SIZE);
            QueryPipeline pipeline(Pipe(std::move(format)));
            PullingPipelineExecutor executor(pipeline);
            Block sample_block_with_nulls;
            executor.pull(sample_block_with_nulls);

            LOG_DEBUG(log, "Dictionary sample block with null values: {}", sample_block_with_nulls.dumpStructure());

            ExternalDictionaryLibraryHandlerFactory::instance().create(dictionary_id, library_path, library_settings, sample_block_with_nulls, attributes_names);
            writeStringBinary("1", out);
        }
        else if (method == "extDict_libDelete")
        {
            bool deleted = ExternalDictionaryLibraryHandlerFactory::instance().remove(dictionary_id);

            /// Do not throw, a warning is ok.
            if (!deleted)
                LOG_WARNING(log, "Cannot delete library with dictionary id: {}, because such id was not found.", dictionary_id);

            writeStringBinary("1", out);
        }
        else if (method == "extDict_isModified")
        {
            auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
            if (!library_handler)
                throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);

            bool res = library_handler->isModified();
            writeStringBinary(std::to_string(res), out);
        }
        else if (method == "extDict_supportsSelectiveLoad")
        {
            auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
            if (!library_handler)
                throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);

            bool res = library_handler->supportsSelectiveLoad();
            writeStringBinary(std::to_string(res), out);
        }
        else if (method == "extDict_loadAll")
        {
            auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
            if (!library_handler)
                throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);

            const auto & sample_block = library_handler->getSampleBlock();
            LOG_DEBUG(log, "Calling extDict_loadAll() for dictionary id: {}", dictionary_id);
            auto input = library_handler->loadAll();

            LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
            auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
            writeData(std::move(input), std::move(output));
        }
        else if (method == "extDict_loadIds")
        {
            LOG_DEBUG(log, "Getting dictionary ids for dictionary with id: {}", dictionary_id);
            /// The ids are read in binary form directly from the request stream.
            std::vector<uint64_t> ids = parseIdsFromBinary(request.getStream());

            auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
            if (!library_handler)
                throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);

            const auto & sample_block = library_handler->getSampleBlock();
            LOG_DEBUG(log, "Calling extDict_loadIds() for dictionary id: {}", dictionary_id);
            auto input = library_handler->loadIds(ids);

            LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
            auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
            writeData(std::move(input), std::move(output));
        }
        else if (method == "extDict_loadKeys")
        {
            if (!params.has("requested_block_sample"))
            {
                processError(response, "No 'requested_block_sample' in request URL");
                return;
            }

            std::string requested_block_string = params.get("requested_block_sample");

            std::shared_ptr<Block> requested_sample_block;
            try
            {
                requested_sample_block = parseColumns(std::move(requested_block_string));
            }
            catch (const Exception & ex)
            {
                processError(response, "Invalid 'requested_block' parameter in request body '" + ex.message() + "'");
                LOG_WARNING(log, fmt::runtime(ex.getStackTraceString()));
                return;
            }

            /// The key columns are read from the request stream in FORMAT using the requested structure.
            auto & read_buf = request.getStream();
            auto format = getContext()->getInputFormat(FORMAT, read_buf, *requested_sample_block, DEFAULT_BLOCK_SIZE);
            QueryPipeline pipeline(std::move(format));
            PullingPipelineExecutor executor(pipeline);
            Block block;
            executor.pull(block);

            auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
            if (!library_handler)
                throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);

            const auto & sample_block = library_handler->getSampleBlock();
            LOG_DEBUG(log, "Calling extDict_loadKeys() for dictionary id: {}", dictionary_id);
            auto input = library_handler->loadKeys(block.getColumns());

            LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
            auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
            writeData(std::move(input), std::move(output));
        }
        else if (method != "extDict_libClone")
        {
            /// A successful extDict_libClone has already been answered above and ends up here only
            /// because it is not part of this chain; it must not be reported as an unknown method.
            LOG_WARNING(log, "Unknown library method: '{}'", method);
        }
    }
    catch (...)
    {
        auto message = getCurrentExceptionMessage(true);
        LOG_ERROR(log, "Failed to process request for dictionary_id: {}. Error: {}", dictionary_id, message);

        response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because of too soon response sending

        try
        {
            writeStringBinary(message, out);
            out.finalize();
        }
        catch (...)
        {
            tryLogCurrentException(log);
        }
    }

    try
    {
        out.finalize();
    }
    catch (...)
    {
        tryLogCurrentException(log);
    }
}
/// Creates an "exists" handler bound to `context_`.
/// `keep_alive_timeout_` is stored and later used for the default response headers;
/// the logger is named after the handler class.
ExternalDictionaryLibraryBridgeExistsHandler::ExternalDictionaryLibraryBridgeExistsHandler(
    size_t keep_alive_timeout_,
    ContextPtr context_)
    : WithContext(context_)
    , keep_alive_timeout(keep_alive_timeout_)
    , log(&Poco::Logger::get("ExternalDictionaryLibraryBridgeExistsHandler"))
{
}
2021-03-05 09:38:00 +00:00
/// Answers whether a library handler is registered for the requested 'dictionary_id'.
/// The response body is "1" if the factory returns a handler for the id and "0" otherwise.
/// A request without 'dictionary_id' is reported through processError().
/// Any exception is logged under the name "PingHandler" and not rethrown.
void ExternalDictionaryLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
    try
    {
        LOG_TRACE(log, "Request URI: {}", request.getURI());

        HTMLForm params(getContext()->getSettingsRef(), request);
        if (!params.has("dictionary_id"))
        {
            processError(response, "No 'dictionary_id' in request URL");
            return;
        }

        std::string dictionary_id = params.get("dictionary_id");
        auto handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);

        String reply = "0";
        if (handler)
            reply = "1";

        setResponseDefaultHeaders(response, keep_alive_timeout);
        LOG_TRACE(log, "Sending ping response: {} (dictionary id: {})", reply, dictionary_id);
        response.sendBuffer(reply.data(), reply.size());
    }
    catch (...)
    {
        tryLogCurrentException("PingHandler");
    }
}
}