ClickHouse/programs/library-bridge/Handlers.cpp

366 lines
13 KiB
C++
Raw Normal View History

2021-03-05 09:38:00 +00:00
#include "Handlers.h"
2021-03-06 18:21:40 +00:00
#include "SharedLibraryHandlerFactory.h"
2021-03-05 09:38:00 +00:00
#include <Formats/FormatFactory.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
#include <Poco/ThreadPool.h>
2021-10-11 16:11:50 +00:00
#include <Processors/Formats/IOutputFormat.h>
2021-10-14 10:25:43 +00:00
#include <Processors/Formats/IInputFormat.h>
2021-10-16 14:03:50 +00:00
#include <QueryPipeline/QueryPipeline.h>
2021-10-11 16:11:50 +00:00
#include <Processors/Executors/CompletedPipelineExecutor.h>
2021-10-14 10:25:43 +00:00
#include <Processors/Executors/PullingPipelineExecutor.h>
2021-10-11 16:11:50 +00:00
#include <Processors/Sources/SourceFromSingleChunk.h>
2021-10-16 14:03:50 +00:00
#include <QueryPipeline/Pipe.h>
2021-03-05 09:38:00 +00:00
#include <Server/HTTP/HTMLForm.h>
2021-03-24 07:53:15 +00:00
#include <IO/ReadBufferFromString.h>
2021-03-05 09:38:00 +00:00
namespace DB
{
2021-08-02 07:18:51 +00:00
/// Error codes thrown by the handlers in this file.
/// Only declared here (`extern`); the definitions live in another translation unit.
namespace ErrorCodes
{
extern const int BAD_REQUEST_PARAMETER;
}
2021-03-05 09:38:00 +00:00
namespace
{
/// Reports a request-level failure to the client and to the log.
/// Marks the response as HTTP 500, writes `message` as the body if the response
/// has not been sent yet, and logs `message` as a warning under "LibraryBridge".
void processError(HTTPServerResponse & response, const std::string & message)
{
    response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);

    const bool already_sent = response.sent();
    if (!already_sent)
    {
        /// Keep the returned stream handle alive for the whole write.
        const auto body_stream = response.send();
        *body_stream << message << std::endl;
    }

    LOG_WARNING(&Poco::Logger::get("LibraryBridge"), fmt::runtime(message));
}
2021-03-05 09:38:00 +00:00
/// Builds a sample block from a textual names-and-types list.
/// `column_string` is parsed with NamesAndTypesList::parse(); every parsed
/// (name, type) pair becomes one entry of the returned block, in list order.
/// Exceptions thrown by the parser propagate to the caller.
std::shared_ptr<Block> parseColumns(std::string && column_string)
{
    const auto parsed_columns = NamesAndTypesList::parse(column_string);

    auto header = std::make_shared<Block>();
    for (const auto & column : parsed_columns)
    {
        header->insert({column.type, column.name});
    }

    return header;
}
2021-03-24 07:53:15 +00:00
2021-08-01 08:51:40 +00:00
/// Deserializes a list of 64-bit unsigned ids from `buf` using readVectorBinary().
/// Returns the ids in the order they were read.
std::vector<uint64_t> parseIdsFromBinary(ReadBuffer & buf)
{
    std::vector<uint64_t> parsed_ids;

    readVectorBinary(parsed_ids, buf);

    return parsed_ids;
}
2021-03-24 07:53:15 +00:00
2021-03-24 19:32:31 +00:00
std::vector<std::string> parseNamesFromBinary(const std::string & names_string)
2021-03-24 07:53:15 +00:00
{
2021-03-24 19:32:31 +00:00
ReadBufferFromString buf(names_string);
std::vector<std::string> names;
readVectorBinary(names, buf);
return names;
2021-03-24 07:53:15 +00:00
}
2021-03-05 09:38:00 +00:00
}
2021-10-11 16:11:50 +00:00
/// Writes a single block through the given output format.
/// The block becomes the only chunk of a source, the pipeline is completed with
/// `format`, and the pipeline is then executed to the end.
static void writeData(Block data, OutputFormatPtr format)
{
    QueryPipeline single_block_pipeline(std::make_shared<SourceFromSingleChunk>(std::move(data)));
    single_block_pipeline.complete(std::move(format));

    CompletedPipelineExecutor pipeline_executor(single_block_pipeline);
    pipeline_executor.execute();
}
2021-03-05 09:38:00 +00:00
/// Dispatches a library-bridge request according to its 'method' parameter.
///
/// 'method' and 'dictionary_id' are mandatory. Handled methods:
///   - libNew                - creates a library handler from the posted parameters;
///   - libClone              - clones an existing handler, falling back to libNew if cloning fails;
///   - libDelete             - removes the handler (a missing id only produces a warning);
///   - isModified            - writes the result of isModified();
///   - supportsSelectiveLoad - writes the result of supportsSelectiveLoad();
///   - loadAll / loadIds / loadKeys - write the loaded block in FORMAT.
///
/// Missing or malformed parameters are reported through processError() (HTTP 500 with a
/// text message). Any exception thrown while executing a method is logged, and its message
/// is written to the response with HTTP 500. Nothing is propagated to the caller.
void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
    LOG_TRACE(log, "Request URI: {}", request.getURI());
    HTMLForm params(getContext()->getSettingsRef(), request);

    if (!params.has("method"))
    {
        processError(response, "No 'method' in request URL");
        return;
    }

    if (!params.has("dictionary_id"))
    {
        processError(response, "No 'dictionary_id' in request URL");
        return;
    }

    std::string method = params.get("method");
    std::string dictionary_id = params.get("dictionary_id");
    LOG_TRACE(log, "Library method: '{}', dictionary id: {}", method, dictionary_id);

    WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);

    try
    {
        bool lib_new = (method == "libNew");

        if (method == "libClone")
        {
            if (!params.has("from_dictionary_id"))
            {
                processError(response, "No 'from_dictionary_id' in request URL");
                return;
            }

            std::string from_dictionary_id = params.get("from_dictionary_id");
            const bool cloned = SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);

            if (cloned)
            {
                writeStringBinary("1", out);
            }
            else
            {
                /// The source dictionary is unknown: create the library from scratch instead.
                LOG_TRACE(log, "Cannot clone from dictionary with id: {}, will call libNew instead", from_dictionary_id);
                lib_new = true;
            }
        }

        if (lib_new)
        {
            /// The remaining parameters are read from the request body.
            auto & read_buf = request.getStream();
            params.read(read_buf);

            if (!params.has("library_path"))
            {
                processError(response, "No 'library_path' in request URL");
                return;
            }

            if (!params.has("library_settings"))
            {
                processError(response, "No 'library_settings' in request URL");
                return;
            }

            std::string library_path = params.get("library_path");
            const auto & settings_string = params.get("library_settings");

            LOG_DEBUG(log, "Parsing library settings from binary string");
            std::vector<std::string> library_settings = parseNamesFromBinary(settings_string);

            /// Needed for library dictionary
            if (!params.has("attributes_names"))
            {
                processError(response, "No 'attributes_names' in request URL");
                return;
            }

            const auto & attributes_string = params.get("attributes_names");

            LOG_DEBUG(log, "Parsing attributes names from binary string");
            std::vector<std::string> attributes_names = parseNamesFromBinary(attributes_string);

            /// Needed to parse block from binary string format
            if (!params.has("sample_block"))
            {
                processError(response, "No 'sample_block' in request URL");
                return;
            }

            std::string sample_block_string = params.get("sample_block");

            std::shared_ptr<Block> sample_block;
            try
            {
                sample_block = parseColumns(std::move(sample_block_string));
            }
            catch (const Exception & ex)
            {
                processError(response, "Invalid 'sample_block' parameter in request body '" + ex.message() + "'");
                LOG_WARNING(log, fmt::runtime(ex.getStackTraceString()));
                return;
            }

            if (!params.has("null_values"))
            {
                processError(response, "No 'null_values' in request URL");
                return;
            }

            /// 'null_values' carries one block in FORMAT with the structure of the sample block.
            ReadBufferFromString read_block_buf(params.get("null_values"));
            auto format = getContext()->getInputFormat(FORMAT, read_block_buf, *sample_block, DEFAULT_BLOCK_SIZE);
            QueryPipeline pipeline(Pipe(std::move(format)));
            PullingPipelineExecutor executor(pipeline);
            Block sample_block_with_nulls;
            executor.pull(sample_block_with_nulls);

            LOG_DEBUG(log, "Dictionary sample block with null values: {}", sample_block_with_nulls.dumpStructure());

            SharedLibraryHandlerFactory::instance().create(dictionary_id, library_path, library_settings, sample_block_with_nulls, attributes_names);
            writeStringBinary("1", out);
        }
        else if (method == "libDelete")
        {
            auto deleted = SharedLibraryHandlerFactory::instance().remove(dictionary_id);

            /// Do not throw, a warning is ok.
            if (!deleted)
                LOG_WARNING(log, "Cannot delete library for with dictionary id: {}, because such id was not found.", dictionary_id);

            writeStringBinary("1", out);
        }
        else if (method == "isModified")
        {
            auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
            if (!library_handler)
                throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);

            bool res = library_handler->isModified();
            writeStringBinary(std::to_string(res), out);
        }
        else if (method == "supportsSelectiveLoad")
        {
            auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
            if (!library_handler)
                throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);

            bool res = library_handler->supportsSelectiveLoad();
            writeStringBinary(std::to_string(res), out);
        }
        else if (method == "loadAll")
        {
            auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
            if (!library_handler)
                throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);

            const auto & sample_block = library_handler->getSampleBlock();
            LOG_DEBUG(log, "Calling loadAll() for dictionary id: {}", dictionary_id);
            auto input = library_handler->loadAll();

            LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
            auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
            writeData(std::move(input), std::move(output));
        }
        else if (method == "loadIds")
        {
            LOG_DEBUG(log, "Getting dictionary ids for dictionary with id: {}", dictionary_id);
            /// The ids are read from the request body.
            std::vector<uint64_t> ids = parseIdsFromBinary(request.getStream());

            auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
            if (!library_handler)
                throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);

            const auto & sample_block = library_handler->getSampleBlock();
            LOG_DEBUG(log, "Calling loadIds() for dictionary id: {}", dictionary_id);
            auto input = library_handler->loadIds(ids);

            LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
            auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
            writeData(std::move(input), std::move(output));
        }
        else if (method == "loadKeys")
        {
            if (!params.has("requested_block_sample"))
            {
                processError(response, "No 'requested_block_sample' in request URL");
                return;
            }

            std::string requested_block_string = params.get("requested_block_sample");

            std::shared_ptr<Block> requested_sample_block;
            try
            {
                requested_sample_block = parseColumns(std::move(requested_block_string));
            }
            catch (const Exception & ex)
            {
                processError(response, "Invalid 'requested_block' parameter in request body '" + ex.message() + "'");
                LOG_WARNING(log, fmt::runtime(ex.getStackTraceString()));
                return;
            }

            /// The key columns are read from the request body in FORMAT.
            auto & read_buf = request.getStream();
            auto format = getContext()->getInputFormat(FORMAT, read_buf, *requested_sample_block, DEFAULT_BLOCK_SIZE);
            QueryPipeline pipeline(std::move(format));
            PullingPipelineExecutor executor(pipeline);
            Block block;
            executor.pull(block);

            auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
            if (!library_handler)
                throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);

            const auto & sample_block = library_handler->getSampleBlock();
            LOG_DEBUG(log, "Calling loadKeys() for dictionary id: {}", dictionary_id);
            auto input = library_handler->loadKeys(block.getColumns());

            LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
            auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
            writeData(std::move(input), std::move(output));
        }
        else if (method != "libClone")
        {
            /// A successful libClone was already answered above; anything else reaching this
            /// point is a method this handler does not know. The response is left unchanged,
            /// but the request must not go unnoticed.
            LOG_WARNING(log, "Unknown library method: '{}'", method);
        }
    }
    catch (...)
    {
        auto message = getCurrentExceptionMessage(true);
        LOG_ERROR(log, "Failed to process request for dictionary_id: {}. Error: {}", dictionary_id, message);

        response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because of too soon response sending

        try
        {
            writeStringBinary(message, out);
            out.finalize();
        }
        catch (...)
        {
            tryLogCurrentException(log);
        }
    }

    /// NOTE(review): on the error path finalize() has already been called above; this relies on
    /// finalize() being safe to call twice - confirm against WriteBuffer.
    try
    {
        out.finalize();
    }
    catch (...)
    {
        tryLogCurrentException(log);
    }
}
2021-08-02 07:18:51 +00:00
/// Answers whether a library handler is registered for the requested 'dictionary_id'.
///
/// Sends "1" if SharedLibraryHandlerFactory returns a handler for the id and "0" otherwise.
/// A missing 'dictionary_id' parameter is reported through processError().
/// Exceptions are logged with this handler's logger and are not propagated.
void LibraryExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
    try
    {
        LOG_TRACE(log, "Request URI: {}", request.getURI());
        HTMLForm params(getContext()->getSettingsRef(), request);

        if (!params.has("dictionary_id"))
        {
            processError(response, "No 'dictionary_id' in request URL");
            return;
        }

        std::string dictionary_id = params.get("dictionary_id");
        auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
        const String res = library_handler ? "1" : "0";

        setResponseDefaultHeaders(response, keep_alive_timeout);
        LOG_TRACE(log, "Sending ping response: {} (dictionary id: {})", res, dictionary_id);
        response.sendBuffer(res.data(), res.size());
    }
    catch (...)
    {
        /// Log under this handler's own logger, as LibraryRequestHandler does.
        tryLogCurrentException(log);
    }
}
}