2021-12-30 18:11:12 +00:00
|
|
|
#include <Storages/MeiliSearch/SinkMeiliSearch.h>

#include <algorithm>
#include <cmath>
#include <cstdio>

#include "Core/Field.h"
#include "IO/WriteBufferFromString.h"
#include "Processors/Formats/Impl/JSONRowOutputFormat.h"
#include "base/JSON.h"
#include "base/types.h"
|
2021-12-30 18:11:12 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int MEILISEARCH_EXCEPTION;
|
|
|
|
}
|
|
|
|
|
|
|
|
SinkMeiliSearch::SinkMeiliSearch(
|
2021-12-31 15:23:29 +00:00
|
|
|
const MeiliSearchConfiguration & config_, const Block & sample_block_, ContextPtr local_context_, UInt64 max_block_size_)
|
2021-12-30 18:11:12 +00:00
|
|
|
: SinkToStorage(sample_block_)
|
|
|
|
, connection(config_)
|
2021-12-31 15:23:29 +00:00
|
|
|
, local_context{local_context_}
|
2021-12-30 18:11:12 +00:00
|
|
|
, max_block_size{max_block_size_}
|
2021-12-31 15:23:29 +00:00
|
|
|
, sample_block{sample_block_}
|
|
|
|
{
|
|
|
|
}
|
2021-12-30 18:11:12 +00:00
|
|
|
|
2021-12-31 15:23:29 +00:00
|
|
|
/// Renders the value stored at `row` of column `col` as the text of a JSON value.
///
/// Result by field type:
///  - Null              -> "null";
///  - Int64 / UInt64    -> decimal text;
///  - String            -> the value passed through doubleQuoteString();
///  - Float64 (finite)  -> "%.17g" text, which keeps enough digits to round-trip a double
///                         (std::to_string would print fixed "%f" with 6 decimals and lose precision);
///  - Float64 (NaN/Inf) -> "null", because JSON has no literal for non-finite numbers.
///
/// An empty string is returned when `row` is past the end of the column or when the field
/// type is none of the above (unchanged from the previous behaviour).
String getStringRepresentation(const ColumnWithTypeAndName & col, size_t row)
{
    if (col.column->size() <= row)
    {
        return "";
    }

    Field elem;
    col.column->get(row, elem);

    const auto type = elem.getType();
    if (type == Field::Types::Null)
    {
        return "null";
    }
    else if (type == Field::Types::Int64)
    {
        return std::to_string(elem.get<Int64>());
    }
    else if (type == Field::Types::UInt64)
    {
        return std::to_string(elem.get<UInt64>());
    }
    else if (type == Field::Types::String)
    {
        return doubleQuoteString(elem.get<String>());
    }
    else if (type == Field::Types::Float64)
    {
        const Float64 value = elem.get<Float64>();
        if (!std::isfinite(value))
        {
            return "null";
        }

        /// Sign + 17 significant digits + '.' + exponent fits comfortably in 32 bytes.
        char buf[32];
        const int len = std::snprintf(buf, sizeof(buf), "%.17g", value);
        if (len <= 0 || static_cast<size_t>(len) >= sizeof(buf))
        {
            return "null";
        }
        return String(buf, static_cast<size_t>(len));
    }
    return "";
}
|
|
|
|
|
2021-12-31 15:23:29 +00:00
|
|
|
/// Serializes row `ind` of `block` as one JSON object: `{"name":value,...}`.
///
/// Keys are taken, by position, from `sample_block`; values come from
/// getStringRepresentation() applied to the corresponding column of `block`.
/// A block without columns yields "{}" (previously the opening brace was overwritten
/// and "}" was returned).
String SinkMeiliSearch::getOneElement(const Block & block, int ind) const
{
    String ans = "{";
    size_t position = 0;
    for (const auto & col : block)
    {
        /// Separator goes before every member except the first, so no trailing comma has to be patched.
        if (position != 0)
            ans += ',';
        ans += doubleQuoteString(sample_block.getByPosition(position).name);
        ans += ':';
        ans += getStringRepresentation(col, ind);
        ++position;
    }
    ans += '}';
    return ans;
}
|
|
|
|
|
2021-12-31 15:23:29 +00:00
|
|
|
void SinkMeiliSearch::writeBlockData(const Block & block) const
|
|
|
|
{
|
2021-12-30 18:11:12 +00:00
|
|
|
size_t max_col_size = 0;
|
2021-12-31 15:23:29 +00:00
|
|
|
for (const auto & col : block)
|
|
|
|
{
|
2021-12-30 18:11:12 +00:00
|
|
|
max_col_size = std::max(max_col_size, col.column->size());
|
|
|
|
}
|
|
|
|
String json_array = "[";
|
2021-12-31 15:23:29 +00:00
|
|
|
for (size_t i = 0; i < max_col_size; ++i)
|
|
|
|
{
|
2021-12-30 18:11:12 +00:00
|
|
|
json_array += getOneElement(block, i) + ",";
|
|
|
|
}
|
|
|
|
json_array.back() = ']';
|
|
|
|
auto response = connection.updateQuery(json_array);
|
|
|
|
JSON jres = JSON(response).begin();
|
2021-12-31 15:23:29 +00:00
|
|
|
if (jres.getName() == "message")
|
|
|
|
{
|
2021-12-30 18:11:12 +00:00
|
|
|
throw Exception(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Splits `block` into consecutive blocks of at most `max_rows` rows each.
///
/// - If the block already fits (rows <= max_rows) it is returned as the only element,
///   which avoids cutting every column.
/// - `max_rows == 0` is treated as "no limit" and also returns the block unsplit
///   (previously this divided by zero).
/// - Otherwise every part except possibly the last holds exactly `max_rows` rows and the
///   last one holds the remainder.
///
/// The chunk size is taken from the `max_rows` argument throughout; earlier the number of
/// parts was derived from `max_rows` while offsets/limits used the `max_block_size` member.
Blocks SinkMeiliSearch::splitBlocks(const Block & block, const size_t & max_rows) const
{
    const size_t rows = block.rows();

    /// Avoid excessive copying when the block is small enough.
    if (max_rows == 0 || rows <= max_rows)
        return Blocks{block};

    /// Integer ceiling division; no floating point involved.
    const size_t split_block_size = rows / max_rows + (rows % max_rows != 0 ? 1 : 0);
    const size_t columns = block.columns();

    Blocks split_blocks;
    split_blocks.reserve(split_block_size);

    for (size_t offset = 0; offset < rows; offset += max_rows)
    {
        /// For the last batch the limit is whatever remains.
        const size_t limit = std::min<size_t>(max_rows, rows - offset);

        Block part = block.cloneEmpty();
        for (size_t col_idx = 0; col_idx < columns; ++col_idx)
        {
            part.getByPosition(col_idx).column = block.getByPosition(col_idx).column->cut(offset, limit);
        }
        split_blocks.emplace_back(std::move(part));
    }

    return split_blocks;
}
|
|
|
|
|
2021-12-31 15:23:29 +00:00
|
|
|
/// Turns the incoming chunk into a block with the sink's header, cuts it into parts of
/// at most `max_block_size` rows via splitBlocks(), and writes each part in order.
void SinkMeiliSearch::consume(Chunk chunk)
{
    const auto incoming = getHeader().cloneWithColumns(chunk.detachColumns());

    for (const auto & part : splitBlocks(incoming, max_block_size))
        writeBlockData(part);
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|