add table function

This commit is contained in:
Mikhail Artemenko 2022-02-28 22:43:43 +03:00
parent 54ea1c172a
commit 96cfc7f07a
10 changed files with 290 additions and 5 deletions

View File

@ -0,0 +1,92 @@
#include <memory>
#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h>
#include <base/JSON.h>
#include "DataTypes/DataTypeArray.h"
#include "DataTypes/DataTypeNullable.h"
#include "DataTypes/DataTypeString.h"
#include "DataTypes/DataTypesNumber.h"
#include "DataTypes/Serializations/ISerialization.h"
namespace DB
{
namespace ErrorCodes
{
extern const int UNSUPPORTED_MEILISEARCH_TYPE;
extern const int CANNOT_READ_ALL_DATA;
extern const int MEILISEARCH_EXCEPTION;
}
MeiliSearchColumnDescriptionFetcher::MeiliSearchColumnDescriptionFetcher(const MeiliSearchConfiguration & config) : connection(config)
{
}
void MeiliSearchColumnDescriptionFetcher::addParam(const String & key, const String & val)
{
query_params[key] = val;
}
bool isDouble(const std::string & val)
{
std::istringstream iss(val);
double f;
iss >> std::noskipws >> f;
return iss.eof() && !iss.fail() && static_cast<Int64>(f) != f;
}
DataTypePtr parseTypeOfField(JSON ptr)
{
if (ptr.isString())
{
return std::make_shared<DataTypeString>();
}
if (ptr.isArray())
{
auto nested_type = parseTypeOfField(ptr.begin());
return std::make_shared<DataTypeArray>(nested_type);
}
if (ptr.isBool())
{
return std::make_shared<DataTypeUInt8>();
}
if (ptr.isNull())
{
DataTypePtr res = std::make_shared<DataTypeNullable>(res);
return res;
}
if (isDouble(ptr.toString()))
{
return std::make_shared<DataTypeFloat64>();
}
if (ptr.isNumber())
{
return std::make_shared<DataTypeInt64>();
}
if (ptr.isObject())
{
return std::make_shared<DataTypeString>();
}
throw Exception(ErrorCodes::UNSUPPORTED_MEILISEARCH_TYPE, "Can't recognize type of some fields");
}
ColumnsDescription MeiliSearchColumnDescriptionFetcher::fetchColumnsDescription() const
{
auto response = connection.searchQuery(query_params);
JSON jres = JSON(response).begin();
if (jres.getName() == "message")
throw Exception(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString());
NamesAndTypesList list;
for (const JSON kv_pair : jres.getValue().begin())
{
if (!kv_pair.isNameValuePair())
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Bad response data");
list.emplace_back(kv_pair.getName(), parseTypeOfField(kv_pair.getValue()));
}
return ColumnsDescription(list);
}
};

View File

@ -0,0 +1,22 @@
#include <unordered_map>
#include <Storages/ColumnsDescription.h>
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
#include <base/types.h>
namespace DB
{
class MeiliSearchColumnDescriptionFetcher
{
public:
explicit MeiliSearchColumnDescriptionFetcher(const MeiliSearchConfiguration & config);
void addParam(const String & key, const String & val);
ColumnsDescription fetchColumnsDescription() const;
private:
std::unordered_map<String, String> query_params;
MeiliSearchConnection connection;
};
};

View File

@ -164,7 +164,8 @@ Chunk MeiliSearchSource::generate()
insertWithTypeId(col, kv_pair.getValue(), type_ptr);
}
if (cnt_fields != columns.size())
throw Exception(ErrorCodes::MEILISEARCH_MISSING_SOME_COLUMNS, "Some columns were not found in the table");
throw Exception(
ErrorCodes::MEILISEARCH_MISSING_SOME_COLUMNS, "Some columns were not found in the table, json = " + json.toString());
}

View File

@ -121,7 +121,7 @@ SinkToStoragePtr StorageMeiliSearch::write(const ASTPtr & /*query*/, const Stora
return std::make_shared<SinkMeiliSearch>(config, metadata_snapshot->getSampleBlock(), local_context);
}
MeiliSearchConfiguration getConfiguration(ASTs engine_args, ContextPtr context)
MeiliSearchConfiguration StorageMeiliSearch::getConfiguration(ASTs engine_args, ContextPtr context)
{
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context))
{
@ -177,7 +177,7 @@ void registerStorageMeiliSearch(StorageFactory & factory)
factory.registerStorage(
"MeiliSearch",
[](const StorageFactory::Arguments & args) {
auto config = getConfiguration(args.engine_args, args.getLocalContext());
auto config = StorageMeiliSearch::getConfiguration(args.engine_args, args.getLocalContext());
return StorageMeiliSearch::create(args.table_id, config, args.columns, args.constraints, args.comment);
},
{

View File

@ -33,10 +33,9 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) override;
MeiliSearchConfiguration static getConfiguration(ASTs engine_args, ContextPtr context);
private:
MeiliSearchConfiguration getConfiguration(ASTs engine_args);
MeiliSearchConfiguration config;
Poco::Logger * log;

View File

@ -0,0 +1,44 @@
#include <memory>
#include <TableFunctions/TableFunctionMeiliSearch.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/MeiliSearch/StorageMeiliSearch.h>
#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h>
#include <Common/Exception.h>
#include <Parsers/ASTFunction.h>
namespace DB
{
StoragePtr TableFunctionMeiliSearch::executeImpl(const ASTPtr & /* ast_function */,
ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
return StorageMeiliSearch::create(
StorageID(getDatabaseName(), table_name),
configuration.value(),
columns,
ConstraintsDescription{},
String{});
}
ColumnsDescription TableFunctionMeiliSearch::getActualTableStructure(ContextPtr /* context */) const
{
MeiliSearchColumnDescriptionFetcher fetcher(configuration.value());
fetcher.addParam(doubleQuoteString("limit"), "1");
return fetcher.fetchColumnsDescription();
}
void TableFunctionMeiliSearch::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
const auto & func_args = ast_function->as<ASTFunction &>();
configuration = StorageMeiliSearch::getConfiguration(func_args.arguments->children, context);
}
void registerTableFunctionMeiliSearch(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionMeiliSearch>();
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <TableFunctions/ITableFunction.h>
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
namespace DB
{
class TableFunctionMeiliSearch : public ITableFunction
{
public:
static constexpr auto name = "MeiliSearch";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context,
const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "MeiliSearch"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
std::optional<MeiliSearchConfiguration> configuration;
};
}

View File

@ -20,6 +20,8 @@ void registerTableFunctions()
registerTableFunctionInput(factory);
registerTableFunctionGenerate(factory);
registerTableFunctionMeiliSearch(factory);
#if USE_AWS_S3
registerTableFunctionS3(factory);
registerTableFunctionS3Cluster(factory);

View File

@ -18,6 +18,8 @@ void registerTableFunctionValues(TableFunctionFactory & factory);
void registerTableFunctionInput(TableFunctionFactory & factory);
void registerTableFunctionGenerate(TableFunctionFactory & factory);
void registerTableFunctionMeiliSearch(TableFunctionFactory & factory);
#if USE_AWS_S3
void registerTableFunctionS3(TableFunctionFactory & factory);
void registerTableFunctionS3Cluster(TableFunctionFactory & factory);

View File

@ -458,3 +458,98 @@ def test_named_collection_secure(started_cluster):
node.query("DROP TABLE simple_meili_table")
node.query("DROP TABLE wrong_meili_table")
table.delete()
@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])
def test_table_function(started_cluster):
client = get_meili_client(started_cluster)
table = client.index("new_table")
data = []
for i in range(0, 100):
data.append({'id': i, 'data': hex(i * i)})
push_data(client, table, data)
node = started_cluster.instances['meili']
assert node.query("SELECT COUNT() FROM MeiliSearch('meili1:7700', 'new_table', '')") == '100\n'
assert node.query("SELECT sum(id) FROM MeiliSearch('meili1:7700', 'new_table', '')") == str(sum(range(0, 100))) + '\n'
assert node.query("SELECT data FROM MeiliSearch('meili1:7700', 'new_table', '') WHERE id = 42") == hex(42 * 42) + '\n'
table.delete()
@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])
def test_table_function_secure(started_cluster):
client = get_meili_secure_client(started_cluster)
table = client.index("new_table")
data = []
for i in range(0, 100):
data.append({'id': i, 'data': hex(i * i)})
push_data(client, table, data)
node = started_cluster.instances['meili']
assert node.query("SELECT COUNT() FROM MeiliSearch('meili_secure:7700', 'new_table', 'password')") == '100\n'
assert node.query("SELECT sum(id) FROM MeiliSearch('meili_secure:7700', 'new_table', 'password')") == str(sum(range(0, 100))) + '\n'
assert node.query("SELECT data FROM MeiliSearch('meili_secure:7700', 'new_table', 'password') WHERE id = 42") == hex(42 * 42) + '\n'
error = node.query_and_get_error("SELECT COUNT() FROM MeiliSearch('meili_secure:7700', 'new_table', 'wrong_password')")
assert("MEILISEARCH_EXCEPTION" in error)
error = node.query_and_get_error("SELECT sum(id) FROM MeiliSearch('meili_secure:7700', 'new_table', 'wrong_password')")
assert("MEILISEARCH_EXCEPTION" in error)
error = node.query_and_get_error("SELECT data FROM MeiliSearch('meili_secure:7700', 'new_table', 'wrong_password') WHERE id = 42")
assert("MEILISEARCH_EXCEPTION" in error)
table.delete()
@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])
def test_types_in_table_function(started_cluster):
client = get_meili_client(started_cluster)
table = client.index("types_table")
data = {
'id' : 1,
'UInt8_test' : 128,
'UInt16_test' : 32768,
'UInt32_test' : 2147483648,
'Int8_test' : -128,
'Int16_test' : -32768,
'Int32_test' : -2147483648,
'Int64_test' : -9223372036854775808,
'String_test' : "abacaba",
'Float32_test' : 42.42,
'Float64_test' : 42.42,
'Array_test' : [['aba', 'caba'], ['2d', 'array']],
'Null_test1' : "value",
'Null_test2' : NULL,
'Bool_test1' : True,
'Bool_test2' : False,
'Json_test' : {"a" : 1, "b" : {"in_json" : "qwerty"}}
}
push_data(client, table, data)
node = started_cluster.instances['meili']
assert node.query("SELECT id FROM MeiliSearch('meili1:7700', 'types_table', '')") == '1\n'
assert node.query("SELECT UInt8_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == '128\n'
assert node.query("SELECT UInt16_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == '32768\n'
assert node.query("SELECT UInt32_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == '2147483648\n'
assert node.query("SELECT Int8_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == '-128\n'
assert node.query("SELECT Int16_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == '-32768\n'
assert node.query("SELECT Int32_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == '-2147483648\n'
assert node.query("SELECT Int64_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == '-9223372036854775808\n'
assert node.query("SELECT String_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == 'abacaba\n'
assert node.query("SELECT Float32_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == '42.42\n'
assert node.query("SELECT Float32_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == '42.42\n'
assert node.query("SELECT Array_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == "[['aba','caba'],['2d','array']]\n"
assert node.query("SELECT Null_test1 FROM MeiliSearch('meili1:7700', 'types_table', '')") == 'value\n'
assert node.query("SELECT Null_test2 FROM MeiliSearch('meili1:7700', 'types_table', '')") == 'NULL\n'
assert node.query("SELECT Bool_test1 FROM MeiliSearch('meili1:7700', 'types_table', '')") == '1\n'
assert node.query("SELECT Bool_test2 FROM MeiliSearch('meili1:7700', 'types_table', '')") == '0\n'
assert node.query("SELECT Json_test FROM MeiliSearch('meili1:7700', 'types_table', '')") == '{"a":1,"b":{"in_json":"qwerty"}}\n'
table.delete()