add support for some new types

This commit is contained in:
Mikhail Artemenko 2022-01-27 19:22:05 +03:00
parent 698364f7b0
commit f349f0e006
10 changed files with 234 additions and 102 deletions

View File

@ -602,6 +602,8 @@
M(632, UNEXPECTED_DATA_AFTER_PARSED_VALUE) \
M(633, QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW) \
M(634, MEILISEARCH_EXCEPTION) \
M(635, UNSUPPORTED_MEILISEARCH_TYPE) \
M(636, MEILISEARCH_MISSING_SOME_COLUMNS) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -1,5 +1,5 @@
#include "MeiliSearchConnection.h"
#include <string_view>
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
#include <curl/curl.h>
#include <Common/Exception.h>
@ -20,10 +20,7 @@ static size_t writeCallback(void * contents, size_t size, size_t nmemb, void * u
return size * nmemb;
}
CURLcode MeiliSearchConnection::execQuery(
std::string_view url,
std::string_view post_fields,
std::string& response_buffer) const
CURLcode MeiliSearchConnection::execQuery(std::string_view url, std::string_view post_fields, std::string & response_buffer) const
{
CURLcode ret_code;
CURL * handle;

View File

@ -4,7 +4,7 @@
#include <string_view>
#include <unordered_map>
#include <base/types.h>
#include "curl/curl.h"
#include <curl/curl.h>
namespace DB
{
@ -33,11 +33,7 @@ public:
String updateQuery(std::string_view data) const;
private:
CURLcode execQuery(
std::string_view url,
std::string_view post_fields,
std::string& response_buffer) const;
CURLcode execQuery(std::string_view url, std::string_view post_fields, std::string & response_buffer) const;
MeiliConfig config;
};

View File

@ -1,10 +1,9 @@
#include <Formats/FormatFactory.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
#include <Storages/MeiliSearch/SinkMeiliSearch.h>
#include "Core/Field.h"
#include "Formats/FormatFactory.h"
#include "IO/WriteBufferFromString.h"
#include "Processors/Formats/Impl/JSONRowOutputFormat.h"
#include "base/JSON.h"
#include "base/types.h"
#include <base/JSON.h>
#include <base/types.h>
namespace DB
{
@ -13,25 +12,28 @@ namespace ErrorCodes
extern const int MEILISEARCH_EXCEPTION;
}
/// Initialises the base sink with the header block and keeps the connection
/// (built from `config_`), the query context and a copy of the header block
/// for later use. No other work is done at construction time.
SinkMeiliSearch::SinkMeiliSearch(
    const MeiliSearchConfiguration & config_,
    const Block & sample_block_,
    ContextPtr local_context_)
    : SinkToStorage(sample_block_)
    , connection(config_)
    , local_context{local_context_}
    , sample_block{sample_block_}
{
}
// Narrows `view` to the JSON array stored under the "data" key of a document
// written by the JSON output format:
//     ... "data": [{...}, {...}, {...}] ...   ->   [{...}, {...}, {...}]
// Brackets are matched by nesting depth, so arrays nested inside rows stay
// intact, and brackets that occur inside string literals are not counted.
// If the key, the opening bracket or the matching closing bracket cannot be
// found, `view` is set to an empty view instead of reading past the end of
// the underlying buffer.
void extractData(std::string_view & view)
{
    constexpr std::string_view data_key = "\"data\":";

    const size_t key_pos = view.find(data_key);
    const size_t array_begin
        = key_pos == std::string_view::npos ? std::string_view::npos : view.find('[', key_pos + data_key.size());
    if (array_begin == std::string_view::npos)
    {
        view = {};
        return;
    }

    size_t depth = 0;
    bool in_string = false;
    bool escaped = false;
    for (size_t pos = array_begin; pos < view.size(); ++pos)
    {
        const char c = view[pos];

        if (in_string)
        {
            // Inside a string literal only an unescaped quote matters.
            if (escaped)
                escaped = false;
            else if (c == '\\')
                escaped = true;
            else if (c == '"')
                in_string = false;
            continue;
        }

        if (c == '"')
        {
            in_string = true;
        }
        else if (c == '[')
        {
            ++depth;
        }
        else if (c == ']' && --depth == 0)
        {
            // `pos` is the bracket that closes the one at `array_begin`.
            view = view.substr(array_begin, pos - array_begin + 1);
            return;
        }
    }

    // The array was never closed: there is nothing trustworthy to hand back.
    view = {};
}

View File

@ -1,18 +1,17 @@
#pragma once
#include <Core/ExternalResultDescription.h>
#include <Interpreters/Context.h>
#include <Interpreters/Context_fwd.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
#include "Interpreters/Context_fwd.h"
namespace DB
{
class SinkMeiliSearch : public SinkToStorage
{
public:
SinkMeiliSearch(
const MeiliSearchConfiguration & config_, const Block & sample_block_, ContextPtr local_context_);
SinkMeiliSearch(const MeiliSearchConfiguration & config_, const Block & sample_block_, ContextPtr local_context_);
String getName() const override { return "SinkMeiliSearch"; }

View File

@ -1,19 +1,32 @@
#include "SourceMeiliSearch.h"
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
#include <Core/ExternalResultDescription.h>
#include <Core/Field.h>
#include <Core/Types.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/Serializations/ISerialization.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Storages/MeiliSearch/SourceMeiliSearch.h>
#include <base/JSON.h>
#include <base/range.h>
#include "Common/Exception.h"
#include "Common/quoteString.h"
#include "Columns/ColumnString.h"
#include "Columns/ColumnVector.h"
#include "Columns/IColumn.h"
#include "Core/Field.h"
#include "base/types.h"
#include <base/types.h>
#include <magic_enum.hpp>
#include <Common/Exception.h>
#include <Common/quoteString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int MEILISEARCH_EXCEPTION;
extern const int UNSUPPORTED_MEILISEARCH_TYPE;
extern const int MEILISEARCH_MISSING_SOME_COLUMNS;
}
MeiliSearchSource::MeiliSearchSource(
@ -32,7 +45,7 @@ MeiliSearchSource::MeiliSearchSource(
String columns_to_get = "[";
for (const auto & col : description.sample_block)
columns_to_get += doubleQuoteString(col.name) + ",";
columns_to_get.back() = ']';
query_params[doubleQuoteString("attributesToRetrieve")] = columns_to_get;
@ -42,35 +55,81 @@ MeiliSearchSource::MeiliSearchSource(
MeiliSearchSource::~MeiliSearchSource() = default;
void insertWithTypeId(MutableColumnPtr & column, JSON kv_pair, ExternalResultDescription::ValueType type_id)
Field getField(JSON value, DataTypePtr type_ptr)
{
if (type_id == ExternalResultDescription::ValueType::vtUInt64 ||
type_id == ExternalResultDescription::ValueType::vtUInt32 ||
type_id == ExternalResultDescription::ValueType::vtUInt16 ||
type_id == ExternalResultDescription::ValueType::vtUInt8)
TypeIndex type_id = type_ptr->getTypeId();
if (type_id == TypeIndex::UInt64 || type_id == TypeIndex::UInt32 || type_id == TypeIndex::UInt16 || type_id == TypeIndex::UInt8)
{
auto value = kv_pair.getValue().get<UInt64>();
column->insert(value);
if (value.isBool())
return value.getBool();
else
return value.get<UInt64>();
}
else if (type_id == ExternalResultDescription::ValueType::vtInt64 ||
type_id == ExternalResultDescription::ValueType::vtInt32 ||
type_id == ExternalResultDescription::ValueType::vtInt16 ||
type_id == ExternalResultDescription::ValueType::vtInt8)
else if (type_id == TypeIndex::Int64 || type_id == TypeIndex::Int32 || type_id == TypeIndex::Int16 || type_id == TypeIndex::Int8)
{
auto value = kv_pair.getValue().get<Int64>();
column->insert(value);
return value.get<Int64>();
}
else if (type_id == ExternalResultDescription::ValueType::vtString)
else if (type_id == TypeIndex::String)
{
auto value = kv_pair.getValue().get<String>();
column->insert(value);
if (value.isObject())
return value.toString();
else
return value.get<String>();
}
else if (type_id == ExternalResultDescription::ValueType::vtFloat64 ||
type_id == ExternalResultDescription::ValueType::vtFloat32)
else if (type_id == TypeIndex::Float64 || type_id == TypeIndex::Float32)
{
auto value = kv_pair.getValue().get<Float64>();
column->insert(value);
return value.get<Float64>();
}
else if (type_id == TypeIndex::Date)
{
return UInt16{LocalDate{String(value.toString())}.getDayNum()};
}
else if (type_id == TypeIndex::Date32)
{
return Int32{LocalDate{String(value.toString())}.getExtenedDayNum()};
}
else if (type_id == TypeIndex::DateTime)
{
ReadBufferFromString in(value.toString());
time_t time = 0;
readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(type_ptr.get())->getTimeZone());
if (time < 0)
time = 0;
return time;
}
else if (type_id == TypeIndex::Nullable)
{
if (value.isNull())
return Null();
const auto * null_type = typeid_cast<const DataTypeNullable *>(type_ptr.get());
DataTypePtr nested = null_type->getNestedType();
return getField(value, nested);
}
else if (type_id == TypeIndex::Array)
{
const auto * array_type = typeid_cast<const DataTypeArray *>(type_ptr.get());
DataTypePtr nested = array_type->getNestedType();
Array array;
for (const auto el : value)
array.push_back(getField(el, nested));
return array;
}
else
{
const std::string_view type_name = magic_enum::enum_name(type_id);
const String err_msg = "MeiliSearch storage doesn't support type: ";
throw Exception(ErrorCodes::UNSUPPORTED_MEILISEARCH_TYPE, err_msg + type_name.data());
}
}
/// Converts `value` to a Field of type `type_ptr` via getField and appends it to `column`.
void insertWithTypeId(MutableColumnPtr & column, JSON value, DataTypePtr type_ptr)
{
    const Field converted = getField(value, type_ptr);
    column->insert(converted);
}
Chunk MeiliSearchSource::generate()
@ -94,16 +153,21 @@ Chunk MeiliSearchSource::generate()
for (const auto json : jres.getValue())
{
++cnt_match;
size_t cnt_fields = 0;
for (const auto kv_pair : json)
{
++cnt_fields;
const auto & name = kv_pair.getName();
int pos = description.sample_block.getPositionByName(name);
auto & col = columns[pos];
ExternalResultDescription::ValueType type_id = description.types[pos].first;
insertWithTypeId(col, kv_pair, type_id);
MutableColumnPtr & col = columns[pos];
DataTypePtr type_ptr = description.sample_block.getByPosition(pos).type;
insertWithTypeId(col, kv_pair.getValue(), type_ptr);
}
if (cnt_fields != columns.size())
throw Exception(ErrorCodes::MEILISEARCH_MISSING_SOME_COLUMNS, "Some columns were not found in the table");
}
offset += cnt_match;
if (cnt_match == 0)

View File

@ -1,10 +1,12 @@
#pragma once
#include <unordered_map>
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/ExternalResultDescription.h>
#include <Processors/Chunk.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
#include <base/JSON.h>
namespace DB
{

View File

@ -1,4 +1,8 @@
#include <Core/Types.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/IAST_fwd.h>
#include <Processors/Formats/IOutputFormat.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/IStorage.h>
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
@ -11,10 +15,6 @@
#include <Storages/transformQueryForExternalDatabase.h>
#include <base/logger_useful.h>
#include <Common/parseAddress.h>
#include "Core/Types.h"
#include "Parsers/ASTFunction.h"
#include "Parsers/IAST_fwd.h"
#include "Processors/Formats/IOutputFormat.h"
namespace DB
{
@ -39,16 +39,7 @@ StorageMeiliSearch::StorageMeiliSearch(
setInMemoryMetadata(storage_metadata);
}
/// Formats `ptr` into a local in-memory buffer with the identifier quoting style set to
/// BackticksMySQL. The formatted text is neither returned nor stored anywhere.
void printAST(ASTPtr ptr)
{
    WriteBufferFromOwnString buffer;

    IAST::FormatSettings format_settings(buffer, true);
    format_settings.identifier_quoting_style = IdentifierQuotingStyle::BackticksMySQL;
    format_settings.always_quote_identifiers = IdentifierQuotingStyle::BackticksMySQL != IdentifierQuotingStyle::None;

    ptr->format(format_settings);
}
std::string convertASTtoStr(ASTPtr ptr)
String convertASTtoStr(ASTPtr ptr)
{
WriteBufferFromOwnString out;
IAST::FormatSettings settings(out, true);
@ -58,17 +49,17 @@ std::string convertASTtoStr(ASTPtr ptr)
return out.str();
}
ASTPtr getFunctionParams(ASTPtr node, const std::string & name)
ASTPtr getFunctionParams(ASTPtr node, const String & name)
{
if (!node)
return nullptr;
const auto * ptr = node->as<ASTFunction>();
if (ptr && ptr->name == name)
{
if (node->children.size() == 1)
if (node->children.size() == 1)
return node->children[0];
else
else
return nullptr;
}
for (const auto & next : node->children)
@ -105,7 +96,7 @@ Pipe StorageMeiliSearch::read(
auto it = find(str.begin(), str.end(), '=');
if (it == str.end())
throw Exception("meiliMatch function must have parameters of the form \'key=value\'", ErrorCodes::BAD_QUERY_PARAMETER);
String key(str.begin() + 1, it);
String value(it + 1, str.end() - 1);
kv_pairs_params[key] = value;

View File

@ -1,10 +1,9 @@
#pragma once
#include <base/shared_ptr_helper.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/IStorage.h>
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
#include <base/shared_ptr_helper.h>
namespace DB

View File

@ -1,9 +1,12 @@
import json
import os
from pydoc import cli
from time import sleep
import meilisearch
from pymysql import NULL
import pytest
from sympy import true
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
@ -154,8 +157,8 @@ def test_incorrect_data_type(started_cluster):
node = started_cluster.instances['meili']
node.query("CREATE TABLE strange_meili_table(id UInt64, data String, bbbb String) ENGINE = MeiliSearch('meili1:7700', 'new_table', '')")
with pytest.raises(QueryRuntimeException):
node.query("SELECT bbbb FROM strange_meili_table")
error = node.query_and_get_error("SELECT bbbb FROM strange_meili_table")
assert("MEILISEARCH_MISSING_SOME_COLUMNS" in error)
node.query("DROP TABLE strange_meili_table")
table.delete()
@ -182,14 +185,14 @@ def test_simple_select_secure(started_cluster):
assert node.query("SELECT sum(id) FROM simple_meili_table") == str(sum(range(0, 100))) + '\n'
assert node.query("SELECT data FROM simple_meili_table WHERE id = 42") == hex(42 * 42) + '\n'
with pytest.raises(QueryRuntimeException):
node.query("SELECT COUNT() FROM wrong_meili_table")
error = node.query_and_get_error("SELECT COUNT() FROM wrong_meili_table")
assert("MEILISEARCH_EXCEPTION" in error)
with pytest.raises(QueryRuntimeException):
node.query("SELECT sum(id) FROM wrong_meili_table")
error = node.query_and_get_error("SELECT sum(id) FROM wrong_meili_table")
assert("MEILISEARCH_EXCEPTION" in error)
with pytest.raises(QueryRuntimeException):
node.query("SELECT data FROM wrong_meili_table WHERE id = 42")
error = node.query_and_get_error("SELECT data FROM wrong_meili_table WHERE id = 42")
assert("MEILISEARCH_EXCEPTION" in error)
node.query("DROP TABLE simple_meili_table")
node.query("DROP TABLE wrong_meili_table")
@ -252,8 +255,8 @@ def test_incorrect_data_type_secure(started_cluster):
node = started_cluster.instances['meili']
node.query("CREATE TABLE strange_meili_table(id UInt64, data String, bbbb String) ENGINE = MeiliSearch('meili_secure:7700', 'new_table', 'password')")
with pytest.raises(QueryRuntimeException):
node.query("SELECT bbbb FROM strange_meili_table")
error = node.query_and_get_error("SELECT bbbb FROM strange_meili_table")
assert("MEILISEARCH_MISSING_SOME_COLUMNS" in error)
node.query("DROP TABLE strange_meili_table")
table.delete()
@ -306,8 +309,8 @@ def test_security_levels(started_cluster):
node.query(f"CREATE TABLE read_table(id UInt64, data String) ENGINE = MeiliSearch('meili_secure:7700', 'new_table', '{search_key}')")
node.query(f"CREATE TABLE write_table(id UInt64, data String) ENGINE = MeiliSearch('meili_secure:7700', 'new_table', '{admin_key}')")
with pytest.raises(QueryRuntimeException):
node.query("INSERT INTO read_table (id, data) VALUES " + values)
error = node.query_and_get_error("INSERT INTO read_table (id, data) VALUES " + values)
assert("MEILISEARCH_EXCEPTION" in error)
node.query("INSERT INTO write_table (id, data) VALUES " + values)
sleep(1)
@ -326,3 +329,80 @@ def test_security_levels(started_cluster):
node.query("DROP TABLE write_table")
client.index("new_table").delete()
@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])
def test_types(started_cluster):
    """Push one document with a field per column type and read every field back.

    Each column of the ClickHouse table is selected on its own and compared
    with the text ClickHouse is expected to print for the pushed value.
    """
    client = get_meili_client(started_cluster)
    table = client.index("types_table")

    data = {
        'id': 1,
        'UInt8_test': 128,
        'UInt16_test': 32768,
        'UInt32_test': 2147483648,
        'UInt64_test': 9223372036854775808,
        'Int8_test': -128,
        'Int16_test': -32768,
        'Int32_test': -2147483648,
        'Int64_test': -9223372036854775808,
        'String_test': "abacaba",
        'Float32_test': 42.42,
        'Float64_test': 42.42,
        'Array_test': [['aba', 'caba'], ['2d', 'array']],
        'Null_test1': "value",
        # NOTE(review): NULL is the name imported from pymysql, not Python's None --
        # confirm which value actually ends up in the pushed document.
        'Null_test2': NULL,
        'Bool_test1': True,
        'Bool_test2': False,
        'Json_test': {"a": 1, "b": {"in_json": "qwerty"}}
    }

    push_data(client, table, data)

    node = started_cluster.instances['meili']
    node.query(
        "CREATE TABLE types_table("
        "id UInt64,"
        "UInt8_test UInt8,"
        "UInt16_test UInt16,"
        "UInt32_test UInt32,"
        "UInt64_test UInt64,"
        "Int8_test Int8,"
        "Int16_test Int16,"
        "Int32_test Int32,"
        "Int64_test Int64,"
        "String_test String,"
        "Float32_test Float32,"
        "Float64_test Float64,"
        "Array_test Array(Array(String)),"
        "Null_test1 Nullable(String),"
        "Null_test2 Nullable(String),"
        "Bool_test1 Boolean,"
        "Bool_test2 Boolean,"
        "Json_test String"
        ") ENGINE = MeiliSearch('meili1:7700', 'types_table', '')")

    assert node.query("SELECT id FROM types_table") == '1\n'
    assert node.query("SELECT UInt8_test FROM types_table") == '128\n'
    assert node.query("SELECT UInt16_test FROM types_table") == '32768\n'
    assert node.query("SELECT UInt32_test FROM types_table") == '2147483648\n'
    assert node.query("SELECT UInt64_test FROM types_table") == '9223372036854775808\n'
    assert node.query("SELECT Int8_test FROM types_table") == '-128\n'
    assert node.query("SELECT Int16_test FROM types_table") == '-32768\n'
    assert node.query("SELECT Int32_test FROM types_table") == '-2147483648\n'
    assert node.query("SELECT Int64_test FROM types_table") == '-9223372036854775808\n'
    assert node.query("SELECT String_test FROM types_table") == 'abacaba\n'
    assert node.query("SELECT Float32_test FROM types_table") == '42.42\n'
    # The Float64 column was never checked: the Float32 assertion was duplicated instead.
    assert node.query("SELECT Float64_test FROM types_table") == '42.42\n'
    assert node.query("SELECT Array_test FROM types_table") == "[['aba','caba'],['2d','array']]\n"
    assert node.query("SELECT Null_test1 FROM types_table") == 'value\n'
    assert node.query("SELECT Null_test2 FROM types_table") == 'NULL\n'
    assert node.query("SELECT Bool_test1 FROM types_table") == 'true\n'
    assert node.query("SELECT Bool_test2 FROM types_table") == 'false\n'
    assert node.query("SELECT Json_test FROM types_table") == '{"a":1,"b":{"in_json":"qwerty"}}\n'

    node.query("DROP TABLE types_table")
    table.delete()