Add loadKeys method

This commit is contained in:
kssenii 2021-03-10 13:10:05 +00:00
parent f6610ceaad
commit 38f7f37468
6 changed files with 126 additions and 12 deletions

View File

@ -34,9 +34,7 @@ namespace
void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(request);
params.read(request.getStream());
if (!params.has("method"))
{
@ -177,7 +175,46 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
}
else if (method == "loadKeys")
{
/// TODO
std::string key_columns_string = params.get("key_columns");
std::shared_ptr<Block> keys_sample_block;
try
{
keys_sample_block = parseColumns(std::move(key_columns_string));
}
catch (const Exception & ex)
{
processError(response, "Invalid 'key_columns' parameter in request body '" + ex.message() + "'");
LOG_WARNING(log, ex.getStackTraceString());
return;
}
std::string columns = params.get("columns");
std::shared_ptr<Block> sample_block;
try
{
sample_block = parseColumns(std::move(columns));
}
catch (const Exception & ex)
{
processError(response, "Invalid 'columns' parameter in request body '" + ex.message() + "'");
LOG_WARNING(log, ex.getStackTraceString());
return;
}
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
auto & read_buf = request.getStream();
auto format = FormatFactory::instance().getInput("RowBinary", read_buf, *keys_sample_block, context, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto block = reader->read();
auto key_columns = block.getColumns();
auto input = library_handler->loadKeys(key_columns, *sample_block);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream("RowBinary", out, *sample_block, context);
copyData(*input, *output);
}
}
catch (...)

View File

@ -139,6 +139,52 @@ BlockInputStreamPtr SharedLibraryHandler::loadIds(const std::string & attributes
}
BlockInputStreamPtr SharedLibraryHandler::loadKeys(const Columns & key_columns, const Block & sample_block)
{
auto holder = std::make_unique<ClickHouseLibrary::Row[]>(key_columns.size());
std::vector<std::unique_ptr<ClickHouseLibrary::Field[]>> column_data_holders;
for (size_t i = 0; i < key_columns.size(); ++i)
{
auto cell_holder = std::make_unique<ClickHouseLibrary::Field[]>(key_columns[i]->size());
for (size_t j = 0; j < key_columns[i]->size(); ++j)
{
auto data_ref = key_columns[i]->getDataAt(j);
cell_holder[j] = ClickHouseLibrary::Field{
.data = static_cast<const void *>(data_ref.data),
.size = data_ref.size};
}
holder[i] = ClickHouseLibrary::Row{
.data = static_cast<ClickHouseLibrary::Field *>(cell_holder.get()),
.size = key_columns[i]->size()};
column_data_holders.push_back(std::move(cell_holder));
}
ClickHouseLibrary::Table request_cols{
.data = static_cast<ClickHouseLibrary::Row *>(holder.get()),
.size = key_columns.size()};
void * data_ptr = nullptr;
/// Get function pointer before dataNew call because library->get may throw.
auto func_load_keys = library->get<void * (*)(
decltype(data_ptr), decltype(&settings_holder->strings), decltype(&request_cols))>("ClickHouseDictionary_v3_loadKeys");
data_ptr = library->get<decltype(data_ptr) (*)(decltype(lib_data))>("ClickHouseDictionary_v3_dataNew")(lib_data);
auto * data = func_load_keys(data_ptr, &settings_holder->strings, &request_cols);
auto block = dataToBlock(sample_block, data);
SCOPE_EXIT(library->get<void (*)(decltype(lib_data), decltype(data_ptr))>("ClickHouseDictionary_v3_dataDelete")(lib_data, data_ptr));
return std::make_shared<OneBlockInputStream>(block);
}
Block SharedLibraryHandler::dataToBlock(const Block & sample_block, const void * data)
{
if (!data)

View File

@ -23,7 +23,7 @@ public:
BlockInputStreamPtr loadIds(const std::string & ids_string, const std::string & attributes_string, const Block & sample_block);
/// TODO: loadKeys
BlockInputStreamPtr loadKeys(const Columns & key_columns, const Block & sample_block);
bool isModified();

View File

@ -4,7 +4,10 @@
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <IO/WriteBufferFromOStream.h>
#include <Formats/FormatFactory.h>
#include <Poco/Path.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -170,16 +173,43 @@ BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string attributes_st
}
/// Not implemented, TODO
BlockInputStreamPtr LibraryBridgeHelper::loadKeys()
BlockInputStreamPtr LibraryBridgeHelper::loadKeys(const Block & key_columns, const Block & sample_block)
{
startBridgeSync();
auto columns = key_columns.getColumns();
auto keys_sample_block = key_columns.cloneEmpty();
auto uri = getDictionaryURI();
uri.addQueryParameter("method", LOAD_KEYS_METHOD);
uri.addQueryParameter("columns", sample_block.getNamesAndTypesList().toString());
uri.addQueryParameter("key_columns", keys_sample_block.getNamesAndTypesList().toString());
ReadWriteBufferFromHTTP read_buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, {});
return {};
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [key_columns, sample_block, this](std::ostream & ostr)
{
WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context.getOutputStream(
LibraryBridgeHelper::DEFAULT_FORMAT, out_buffer, sample_block);
formatBlock(output_stream, key_columns);
};
//Poco::Net::HTTPBasicCredentials credentials;
//ReadWriteBufferFromHTTP::HTTPHeaderEntries header_entries;
//ConnectionTimeouts timeouts;
//auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
// uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts,
// 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries);
//auto input_stream = context.getInputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, *in_ptr, sample_block, DEFAULT_BLOCK_SIZE);
//return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
ReadWriteBufferFromHTTP read_buf(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, {});
auto format = FormatFactory::instance().getInput(LibraryBridgeHelper::DEFAULT_FORMAT, read_buf, sample_block, context, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto block = reader->read();
return std::make_shared<OneBlockInputStream>(block);
}
}

View File

@ -28,7 +28,7 @@ public:
BlockInputStreamPtr loadIds(const std::string attributes_string, const std::string ids_string, const Block & sample_block);
BlockInputStreamPtr loadKeys();
BlockInputStreamPtr loadKeys(const Block & key_columns, const Block & sample_block);
bool isModified();

View File

@ -9,6 +9,7 @@
#include <ext/scope_guard.h>
#include <Common/StringUtils/StringUtils.h>
#include "DictionarySourceFactory.h"
#include "DictionarySourceHelpers.h"
#include "DictionaryStructure.h"
#include "LibraryDictionarySourceExternal.h"
#include "registerDictionaries.h"
@ -109,11 +110,11 @@ BlockInputStreamPtr LibraryDictionarySource::loadIds(const std::vector<UInt64> &
}
/// Not implemented, TODO
BlockInputStreamPtr LibraryDictionarySource::loadKeys(const Columns & /* key_columns */, const std::vector<std::size_t> & requested_rows)
BlockInputStreamPtr LibraryDictionarySource::loadKeys(const Columns & key_columns, const std::vector<std::size_t> & requested_rows)
{
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
return bridge_helper->loadKeys();
auto block = blockForKeys(dict_struct, key_columns, requested_rows);
return bridge_helper->loadKeys(block, description.sample_block);
}