rewrite SinkMeiliSearch using JSONRowOutputFormat

This commit is contained in:
Mikhail Artemenko 2022-01-08 14:08:17 +03:00
parent 5a9ad15df7
commit e24d2b4d34
6 changed files with 34 additions and 87 deletions

View File

@ -38,9 +38,7 @@ String MeiliSearchConnection::searchQuery(const std::unordered_map<String, Strin
std::string post_fields = "{"; std::string post_fields = "{";
for (const auto & q_attr : query_params) for (const auto & q_attr : query_params)
{
post_fields += q_attr.first + ":" + q_attr.second + ","; post_fields += q_attr.first + ":" + q_attr.second + ",";
}
post_fields.back() = '}'; post_fields.back() = '}';
@ -65,14 +63,12 @@ String MeiliSearchConnection::searchQuery(const std::unordered_map<String, Strin
curl_slist_free_all(slist1); curl_slist_free_all(slist1);
if (ret_code != 0) if (ret_code != 0)
{
throw Exception(ErrorCodes::NETWORK_ERROR, curl_easy_strerror(ret_code)); throw Exception(ErrorCodes::NETWORK_ERROR, curl_easy_strerror(ret_code));
}
return response_buffer; return response_buffer;
} }
String MeiliSearchConnection::updateQuery(const String & data) const String MeiliSearchConnection::updateQuery(std::string_view data) const
{ {
CURLcode ret_code; CURLcode ret_code;
CURL * hnd; CURL * hnd;
@ -89,7 +85,7 @@ String MeiliSearchConnection::updateQuery(const String & data) const
curl_easy_setopt(hnd, CURLOPT_BUFFERSIZE, 102400L); curl_easy_setopt(hnd, CURLOPT_BUFFERSIZE, 102400L);
curl_easy_setopt(hnd, CURLOPT_URL, url.c_str()); curl_easy_setopt(hnd, CURLOPT_URL, url.c_str());
curl_easy_setopt(hnd, CURLOPT_NOPROGRESS, 1L); curl_easy_setopt(hnd, CURLOPT_NOPROGRESS, 1L);
curl_easy_setopt(hnd, CURLOPT_POSTFIELDS, data.c_str()); curl_easy_setopt(hnd, CURLOPT_POSTFIELDS, data.data());
curl_easy_setopt(hnd, CURLOPT_POSTFIELDSIZE_LARGE, data.size()); curl_easy_setopt(hnd, CURLOPT_POSTFIELDSIZE_LARGE, data.size());
curl_easy_setopt(hnd, CURLOPT_HTTPHEADER, slist1); curl_easy_setopt(hnd, CURLOPT_HTTPHEADER, slist1);
curl_easy_setopt(hnd, CURLOPT_MAXREDIRS, 50L); curl_easy_setopt(hnd, CURLOPT_MAXREDIRS, 50L);
@ -104,9 +100,7 @@ String MeiliSearchConnection::updateQuery(const String & data) const
curl_slist_free_all(slist1); curl_slist_free_all(slist1);
if (ret_code != 0) if (ret_code != 0)
{
throw Exception(ErrorCodes::NETWORK_ERROR, curl_easy_strerror(ret_code)); throw Exception(ErrorCodes::NETWORK_ERROR, curl_easy_strerror(ret_code));
}
return response_buffer; return response_buffer;
} }

View File

@ -4,6 +4,7 @@
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <string> #include <string>
#include <string_view>
#include <unordered_map> #include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -35,7 +36,7 @@ public:
String searchQuery(const std::unordered_map<String, String> & query_params) const; String searchQuery(const std::unordered_map<String, String> & query_params) const;
String updateQuery(const String & data) const; String updateQuery(std::string_view data) const;
private: private:
MeiliConfig config; MeiliConfig config;

View File

@ -1,5 +1,6 @@
#include <Storages/MeiliSearch/SinkMeiliSearch.h> #include <Storages/MeiliSearch/SinkMeiliSearch.h>
#include "Core/Field.h" #include "Core/Field.h"
#include "Formats/FormatFactory.h"
#include "IO/WriteBufferFromString.h" #include "IO/WriteBufferFromString.h"
#include "Processors/Formats/Impl/JSONRowOutputFormat.h" #include "Processors/Formats/Impl/JSONRowOutputFormat.h"
#include "base/JSON.h" #include "base/JSON.h"
@ -22,64 +23,35 @@ SinkMeiliSearch::SinkMeiliSearch(
{ {
} }
String getStringRepresentation(const ColumnWithTypeAndName & col, size_t row) void extractData(std::string_view& view) {
{ int ind = view.find("\"data\":") + 9;
Field elem; view.remove_prefix(ind);
if (col.column->size() <= row) int bal = ind = 1;
{ while (bal > 0) {
return ""; if (view[ind] == '[') ++bal;
else if (view[ind] == ']') --bal;
++ind;
} }
col.column->get(row, elem); view.remove_suffix(view.size() - ind);
if (elem.getType() == Field::Types::Int64)
{
return std::to_string(elem.get<Int64>());
}
else if (elem.getType() == Field::Types::UInt64)
{
return std::to_string(elem.get<UInt64>());
}
else if (elem.getType() == Field::Types::String)
{
return doubleQuoteString(elem.get<String>());
}
else if (elem.getType() == Field::Types::Float64)
{
return std::to_string(elem.get<Float64>());
}
return "";
}
String SinkMeiliSearch::getOneElement(const Block & block, int ind) const
{
String ans = "{";
int id = 0;
for (const auto & col : block)
{
ans += doubleQuoteString(sample_block.getByPosition(id++).name) + ":" + getStringRepresentation(col, ind) + ",";
}
ans.back() = '}';
return ans;
} }
void SinkMeiliSearch::writeBlockData(const Block & block) const void SinkMeiliSearch::writeBlockData(const Block & block) const
{ {
size_t max_col_size = 0; FormatSettings settings = getFormatSettings(local_context);
for (const auto & col : block) settings.json.quote_64bit_integers = false;
{ WriteBufferFromOwnString buf;
max_col_size = std::max(max_col_size, col.column->size()); auto writer = FormatFactory::instance().getOutputFormat("JSON", buf, sample_block, local_context, {}, settings);
} writer->write(block);
String json_array = "["; writer->flush();
for (size_t i = 0; i < max_col_size; ++i) writer->finalize();
{
json_array += getOneElement(block, i) + ","; std::string_view vbuf(buf.str());
} extractData(vbuf);
json_array.back() = ']';
auto response = connection.updateQuery(json_array); auto response = connection.updateQuery(vbuf);
JSON jres = JSON(response).begin(); auto jres = JSON(response).begin();
if (jres.getName() == "message") if (jres.getName() == "message")
{
throw Exception(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString()); throw Exception(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString());
}
} }
Blocks SinkMeiliSearch::splitBlocks(const Block & block, const size_t & max_rows) const Blocks SinkMeiliSearch::splitBlocks(const Block & block, const size_t & max_rows) const
@ -104,9 +76,8 @@ Blocks SinkMeiliSearch::splitBlocks(const Block & block, const size_t & max_rows
if (idx == split_block_size - 1) if (idx == split_block_size - 1)
limits = rows - offsets; limits = rows - offsets;
for (size_t col_idx = 0; col_idx < columns; ++col_idx) for (size_t col_idx = 0; col_idx < columns; ++col_idx)
{
split_blocks[idx].getByPosition(col_idx).column = block.getByPosition(col_idx).column->cut(offsets, limits); split_blocks[idx].getByPosition(col_idx).column = block.getByPosition(col_idx).column->cut(offsets, limits);
}
offsets += max_block_size; offsets += max_block_size;
} }
@ -118,9 +89,7 @@ void SinkMeiliSearch::consume(Chunk chunk)
auto block = getHeader().cloneWithColumns(chunk.detachColumns()); auto block = getHeader().cloneWithColumns(chunk.detachColumns());
auto blocks = splitBlocks(block, max_block_size); auto blocks = splitBlocks(block, max_block_size);
for (const auto & b : blocks) for (const auto & b : blocks)
{
writeBlockData(b); writeBlockData(b);
}
} }

View File

@ -22,8 +22,6 @@ public:
Blocks splitBlocks(const Block & block, const size_t & max_rows) const; Blocks splitBlocks(const Block & block, const size_t & max_rows) const;
private: private:
String getOneElement(const Block & block, int ind) const;
MeiliSearchConnection connection; MeiliSearchConnection connection;
ContextPtr local_context; ContextPtr local_context;
const UInt64 max_block_size; const UInt64 max_block_size;

View File

@ -32,9 +32,8 @@ MeiliSearchSource::MeiliSearchSource(
String columns_to_get = "["; String columns_to_get = "[";
for (const auto & col : description.sample_block) for (const auto & col : description.sample_block)
{
columns_to_get += doubleQuoteString(col.name) + ","; columns_to_get += doubleQuoteString(col.name) + ",";
}
columns_to_get.back() = ']'; columns_to_get.back() = ']';
query_params[doubleQuoteString("attributesToRetrieve")] = columns_to_get; query_params[doubleQuoteString("attributesToRetrieve")] = columns_to_get;
@ -71,9 +70,7 @@ void insertWithTypeId(MutableColumnPtr & column, JSON kv_pair, int type_id)
Chunk MeiliSearchSource::generate() Chunk MeiliSearchSource::generate()
{ {
if (all_read) if (all_read)
{
return {}; return {};
}
MutableColumns columns(description.sample_block.columns()); MutableColumns columns(description.sample_block.columns());
const size_t size = columns.size(); const size_t size = columns.size();
@ -87,9 +84,7 @@ Chunk MeiliSearchSource::generate()
JSON jres = JSON(response).begin(); JSON jres = JSON(response).begin();
if (jres.getName() == "message") if (jres.getName() == "message")
{
throw Exception(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString()); throw Exception(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString());
}
size_t cnt_match = 0; size_t cnt_match = 0;
String def; String def;

View File

@ -62,29 +62,22 @@ std::string convertASTtoStr(ASTPtr ptr)
ASTPtr getFunctionParams(ASTPtr node, const std::string & name) ASTPtr getFunctionParams(ASTPtr node, const std::string & name)
{ {
if (!node) if (!node)
{
return nullptr; return nullptr;
}
const auto * ptr = node->as<ASTFunction>(); const auto * ptr = node->as<ASTFunction>();
if (ptr && ptr->name == name) if (ptr && ptr->name == name)
{ {
if (node->children.size() == 1) if (node->children.size() == 1)
{
return node->children[0]; return node->children[0];
}
else else
{
return nullptr; return nullptr;
} }
}
for (const auto & next : node->children) for (const auto & next : node->children)
{ {
auto res = getFunctionParams(next, name); auto res = getFunctionParams(next, name);
if (res != nullptr) if (res != nullptr)
{
return res; return res;
} }
}
return nullptr; return nullptr;
} }
@ -112,9 +105,8 @@ Pipe StorageMeiliSearch::read(
auto str = el->getColumnName(); auto str = el->getColumnName();
auto it = find(str.begin(), str.end(), '='); auto it = find(str.begin(), str.end(), '=');
if (it == str.end()) if (it == str.end())
{
throw Exception("meiliMatch function must have parameters of the form \'key=value\'", ErrorCodes::BAD_QUERY_PARAMETER); throw Exception("meiliMatch function must have parameters of the form \'key=value\'", ErrorCodes::BAD_QUERY_PARAMETER);
}
String key(str.begin() + 1, it); String key(str.begin() + 1, it);
String value(it + 1, str.end() - 1); String value(it + 1, str.end() - 1);
kv_pairs_params[key] = value; kv_pairs_params[key] = value;
@ -126,9 +118,7 @@ Pipe StorageMeiliSearch::read(
} }
for (const auto & el : kv_pairs_params) for (const auto & el : kv_pairs_params)
{
LOG_TRACE(log, "Parsed parameter: key = " + el.first + ", value = " + el.second); LOG_TRACE(log, "Parsed parameter: key = " + el.first + ", value = " + el.second);
}
Block sample_block; Block sample_block;
for (const String & column_name : column_names) for (const String & column_name : column_names)