mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
add tests to redis engine
This commit is contained in:
parent
ff961834d6
commit
8c822a7edf
@ -1,5 +1,9 @@
|
||||
#include "RedisCommon.h"
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/parseAddress.h>
|
||||
#include <Storages/NamedCollectionsHelpers.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -169,7 +173,7 @@ RedisColumnType getRedisColumnType(RedisStorageType storage_type, const Names &
|
||||
}
|
||||
}
|
||||
|
||||
RedisConfiguration getRedisConfiguration(const ASTs & engine_args, ContextPtr context)
|
||||
RedisConfiguration getRedisConfiguration(ASTs & engine_args, ContextPtr context)
|
||||
{
|
||||
RedisConfiguration configuration;
|
||||
configuration.db_index = 0;
|
||||
|
@ -84,7 +84,7 @@ RedisArrayPtr getRedisHashMapKeys(const RedisConnectionPtr & connection, RedisAr
|
||||
RedisColumnType getRedisColumnType(RedisStorageType storage_type, const Names & all_columns, const String & column);
|
||||
|
||||
/// parse redis table engine/function configuration from engine_args
|
||||
RedisConfiguration getRedisConfiguration(const ASTs & engine_args, ContextPtr context);
|
||||
RedisConfiguration getRedisConfiguration(ASTs & engine_args, ContextPtr context);
|
||||
|
||||
/// checking Redis table/table-function when creating
|
||||
void checkRedisTableStructure(const ColumnsDescription & columns, const RedisConfiguration & configuration);
|
||||
|
@ -168,7 +168,7 @@ void registerStorageRedis(StorageFactory & factory)
|
||||
"Redis",
|
||||
[](const StorageFactory::Arguments & args)
|
||||
{
|
||||
auto configuration = StorageRedis::getConfiguration(args.engine_args, args.getLocalContext());
|
||||
auto configuration = getRedisConfiguration(args.engine_args, args.getLocalContext());
|
||||
|
||||
checkRedisTableStructure(args.columns, configuration);
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <TableFunctions/registerTableFunctions.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -43,19 +44,6 @@ StoragePtr TableFunctionRedis::executeImpl(
|
||||
/// TODO support user customized table structure
|
||||
ColumnsDescription TableFunctionRedis::getActualTableStructure(ContextPtr context) const
|
||||
{
|
||||
/// generate table structure by storage type.
|
||||
String structure;
|
||||
switch (configuration->storage_type)
|
||||
{
|
||||
case RedisStorageType::SIMPLE:
|
||||
structure = "key String, value String";
|
||||
break;
|
||||
case RedisStorageType::HASH_MAP:
|
||||
structure = "key String, field String, value String";
|
||||
break;
|
||||
case RedisStorageType::UNKNOWN:
|
||||
throw Exception(ErrorCodes::INVALID_REDIS_STORAGE_TYPE, "Invalid Redis storage type.");
|
||||
}
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
@ -66,15 +54,25 @@ void TableFunctionRedis::parseArguments(const ASTPtr & ast_function, ContextPtr
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'redis' must have arguments.");
|
||||
|
||||
ASTs & args = func_args.arguments->children;
|
||||
|
||||
if (args.size() != 5)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
|
||||
"Table function 'Redis' requires from 5 parameters: "
|
||||
"redis('host:port', db_index, 'password', 'storage_type', 'pool_size')");
|
||||
}
|
||||
configuration = getRedisConfiguration(args, context);
|
||||
|
||||
if (args.size() > 5)
|
||||
structure = checkAndGetLiteralArgument<String>(args[5], "structure");
|
||||
|
||||
if (structure.empty())
|
||||
{
|
||||
switch (configuration->storage_type)
|
||||
{
|
||||
case RedisStorageType::SIMPLE:
|
||||
structure = "key String, value String";
|
||||
break;
|
||||
case RedisStorageType::HASH_MAP:
|
||||
structure = "key String, field String, value String";
|
||||
break;
|
||||
case RedisStorageType::UNKNOWN:
|
||||
throw Exception(ErrorCodes::INVALID_REDIS_STORAGE_TYPE, "Invalid Redis storage type.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -24,6 +24,7 @@ private:
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
std::optional<RedisConfiguration> configuration;
|
||||
String structure;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
import time
|
||||
|
||||
## sudo -H pip install redis
|
||||
import redis
|
||||
import pytest
|
||||
|
||||
@ -61,11 +62,11 @@ def test_storage_simple_select(started_cluster):
|
||||
"""
|
||||
)
|
||||
|
||||
response = TSV.toMat(node.query("SELECT k, v from test_storage_simple_select where k='0' FORMAT TSV"))
|
||||
response = TSV.toMat(node.query("SELECT k, v FROM test_storage_simple_select WHERE k='0' FORMAT TSV"))
|
||||
assert (len(response) == 1)
|
||||
assert (response[0] == ['0', '0'])
|
||||
|
||||
response = TSV.toMat(node.query("SELECT * from test_storage_simple_select order by k FORMAT TSV"))
|
||||
response = TSV.toMat(node.query("SELECT * FROM test_storage_simple_select ORDER BY k FORMAT TSV"))
|
||||
assert (len(response) == 100)
|
||||
assert (response[0] == ['0', '0'])
|
||||
|
||||
@ -97,11 +98,11 @@ def test_storage_hash_map_select(started_cluster):
|
||||
"""
|
||||
)
|
||||
|
||||
response = TSV.toMat(node.query("SELECT k, f, v from test_storage_hash_map_select where f='0' FORMAT TSV"))
|
||||
response = TSV.toMat(node.query("SELECT k, f, v FROM test_storage_hash_map_select WHERE f='0' FORMAT TSV"))
|
||||
assert (len(response) == 1)
|
||||
assert (response[0] == ['k', '0', '0'])
|
||||
|
||||
response = TSV.toMat(node.query("SELECT * from test_storage_hash_map_select FORMAT TSV"))
|
||||
response = TSV.toMat(node.query("SELECT * FROM test_storage_hash_map_select ORDER BY f FORMAT TSV"))
|
||||
assert (len(response) == 100)
|
||||
assert (response[0] == ['k', '0', '0'])
|
||||
|
||||
|
@ -1,8 +0,0 @@
|
||||
<clickhouse>
|
||||
<openSSL>
|
||||
<client>
|
||||
<!-- For self-signed certificate -->
|
||||
<verificationMode>none</verificationMode>
|
||||
</client>
|
||||
</openSSL>
|
||||
</clickhouse>
|
@ -1,276 +1,159 @@
|
||||
import pymongo
|
||||
import time
|
||||
|
||||
import redis
|
||||
import pytest
|
||||
from helpers.client import QueryRuntimeException
|
||||
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import TSV
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node = cluster.add_instance("node", with_redis=True)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster(request):
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node = cluster.add_instance(
|
||||
"node",
|
||||
with_mongo=True,
|
||||
main_configs=[
|
||||
"configs_secure/config.d/ssl_conf.xml",
|
||||
],
|
||||
with_mongo_secure=request.param,
|
||||
)
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def get_mongo_connection(started_cluster, secure=False, with_credentials=True):
|
||||
connection_str = ""
|
||||
if with_credentials:
|
||||
connection_str = "mongodb://root:clickhouse@localhost:{}".format(
|
||||
started_cluster.mongo_port
|
||||
)
|
||||
else:
|
||||
connection_str = "mongodb://localhost:{}".format(
|
||||
started_cluster.mongo_no_cred_port
|
||||
)
|
||||
if secure:
|
||||
connection_str += "/?tls=true&tlsAllowInvalidCertificates=true"
|
||||
return pymongo.MongoClient(connection_str)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
|
||||
def test_simple_select(started_cluster):
|
||||
mongo_connection = get_mongo_connection(started_cluster)
|
||||
db = mongo_connection["test"]
|
||||
db.add_user("root", "clickhouse")
|
||||
simple_mongo_table = db["simple_table"]
|
||||
|
||||
node = started_cluster.instances["node"]
|
||||
for i in range(0, 100):
|
||||
node.query(
|
||||
"INSERT INTO FUNCTION mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String') (key, data) VALUES ({}, '{}')".format(
|
||||
i, hex(i * i)
|
||||
)
|
||||
)
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
|
||||
)
|
||||
== "100\n"
|
||||
def get_redis_connection(db_id=0):
|
||||
client = redis.Redis(
|
||||
host='localhost', port=cluster.redis_port, password="clickhouse", db=db_id
|
||||
)
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
|
||||
)
|
||||
== str(sum(range(0, 100))) + "\n"
|
||||
)
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', 'key UInt64, data String')"
|
||||
)
|
||||
== str(sum(range(0, 100))) + "\n"
|
||||
)
|
||||
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT data from mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String') where key = 42"
|
||||
)
|
||||
== hex(42 * 42) + "\n"
|
||||
)
|
||||
simple_mongo_table.drop()
|
||||
return client
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
|
||||
def test_complex_data_type(started_cluster):
|
||||
mongo_connection = get_mongo_connection(started_cluster)
|
||||
db = mongo_connection["test"]
|
||||
db.add_user("root", "clickhouse")
|
||||
incomplete_mongo_table = db["complex_table"]
|
||||
data = []
|
||||
for i in range(0, 100):
|
||||
data.append({"key": i, "data": hex(i * i), "dict": {"a": i, "b": str(i)}})
|
||||
incomplete_mongo_table.insert_many(data)
|
||||
|
||||
node = started_cluster.instances["node"]
|
||||
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)')"
|
||||
)
|
||||
== "100\n"
|
||||
)
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)')"
|
||||
)
|
||||
== str(sum(range(0, 100))) + "\n"
|
||||
)
|
||||
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT data from mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)') where key = 42"
|
||||
)
|
||||
== hex(42 * 42) + "\n"
|
||||
)
|
||||
incomplete_mongo_table.drop()
|
||||
def get_address_for_ch():
|
||||
return cluster.redis_host + ':6379'
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
|
||||
def test_incorrect_data_type(started_cluster):
|
||||
mongo_connection = get_mongo_connection(started_cluster)
|
||||
db = mongo_connection["test"]
|
||||
db.add_user("root", "clickhouse")
|
||||
strange_mongo_table = db["strange_table"]
|
||||
data = []
|
||||
for i in range(0, 100):
|
||||
data.append({"key": i, "data": hex(i * i), "aaaa": "Hello"})
|
||||
strange_mongo_table.insert_many(data)
|
||||
def test_storage_simple(started_cluster):
|
||||
client = get_redis_connection()
|
||||
address = get_address_for_ch()
|
||||
|
||||
node = started_cluster.instances["node"]
|
||||
# clean all
|
||||
client.flushall()
|
||||
|
||||
data = {}
|
||||
for i in range(100):
|
||||
data[str(i)] = str(i)
|
||||
|
||||
client.mset(data)
|
||||
client.close()
|
||||
|
||||
response = TSV.toMat(node.query(
|
||||
f"""
|
||||
SELECT
|
||||
key, value
|
||||
FROM
|
||||
redis('{address}', 0, 'clickhouse')
|
||||
WHERE
|
||||
key='0'
|
||||
FORMAT TSV
|
||||
"""))
|
||||
|
||||
assert (len(response) == 1)
|
||||
assert (response[0] == ['0', '0'])
|
||||
|
||||
response = TSV.toMat(node.query(
|
||||
f"""
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
redis('{address}', 0, 'clickhouse')
|
||||
ORDER BY
|
||||
key
|
||||
FORMAT TSV
|
||||
"""))
|
||||
|
||||
assert (len(response) == 100)
|
||||
assert (response[0] == ['0', '0'])
|
||||
|
||||
|
||||
def test_storage_hash_map(started_cluster):
|
||||
client = get_redis_connection()
|
||||
address = get_address_for_ch()
|
||||
|
||||
# clean all
|
||||
client.flushall()
|
||||
|
||||
key = 'k'
|
||||
data = {}
|
||||
for i in range(100):
|
||||
data[str(i)] = str(i)
|
||||
|
||||
client.hset(key, mapping=data)
|
||||
client.close()
|
||||
|
||||
response = TSV.toMat(node.query(
|
||||
f"""
|
||||
SELECT
|
||||
key, field, value
|
||||
FROM
|
||||
redis('{address}', 0, 'clickhouse','hash_map')
|
||||
WHERE
|
||||
field='0'
|
||||
FORMAT TSV
|
||||
"""))
|
||||
|
||||
assert (len(response) == 1)
|
||||
assert (response[0] == ['k', '0', '0'])
|
||||
|
||||
response = TSV.toMat(node.query(
|
||||
f"""
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
redis('{address}', 0, 'clickhouse','hash_map')
|
||||
ORDER BY
|
||||
field
|
||||
FORMAT TSV
|
||||
"""))
|
||||
|
||||
assert (len(response) == 100)
|
||||
assert (response[0] == ['k', '0', '0'])
|
||||
|
||||
|
||||
def test_customized_table_structure(started_cluster):
|
||||
address = get_address_for_ch()
|
||||
|
||||
node.query(
|
||||
f"""
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
redis('{address}', 0, 'clickhouse', "simple", 10, "k String, v UInt8")
|
||||
""")
|
||||
|
||||
node.query(
|
||||
f"""
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
redis('{address}', 0, 'clickhouse', "hash_map", 10, "k String, f UInt8, v String")
|
||||
""")
|
||||
|
||||
# illegal columns
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node.query(
|
||||
"SELECT aaaa FROM mongodb('mongo1:27017', 'test', 'strange_table', 'root', 'clickhouse', structure='key UInt64, data String')"
|
||||
)
|
||||
f"""
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
redis('{address}', 0, 'clickhouse', "hash_map", 10, "k String, v String")
|
||||
""")
|
||||
|
||||
strange_mongo_table.drop()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [True], indirect=["started_cluster"])
|
||||
def test_secure_connection(started_cluster):
|
||||
mongo_connection = get_mongo_connection(started_cluster, secure=True)
|
||||
db = mongo_connection["test"]
|
||||
db.add_user("root", "clickhouse")
|
||||
simple_mongo_table = db["simple_table"]
|
||||
data = []
|
||||
for i in range(0, 100):
|
||||
data.append({"key": i, "data": hex(i * i)})
|
||||
simple_mongo_table.insert_many(data)
|
||||
|
||||
node = started_cluster.instances["node"]
|
||||
|
||||
assert (
|
||||
# illegal data type
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node.query(
|
||||
"SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true')"
|
||||
)
|
||||
== "100\n"
|
||||
)
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true')"
|
||||
)
|
||||
== str(sum(range(0, 100))) + "\n"
|
||||
)
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', 'key UInt64, data String', 'ssl=true')"
|
||||
)
|
||||
== str(sum(range(0, 100))) + "\n"
|
||||
)
|
||||
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT data from mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true') where key = 42"
|
||||
)
|
||||
== hex(42 * 42) + "\n"
|
||||
)
|
||||
simple_mongo_table.drop()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
|
||||
def test_predefined_connection_configuration(started_cluster):
|
||||
mongo_connection = get_mongo_connection(started_cluster)
|
||||
db = mongo_connection["test"]
|
||||
db.add_user("root", "clickhouse")
|
||||
simple_mongo_table = db["simple_table"]
|
||||
data = []
|
||||
for i in range(0, 100):
|
||||
data.append({"key": i, "data": hex(i * i)})
|
||||
simple_mongo_table.insert_many(data)
|
||||
|
||||
node = started_cluster.instances["node"]
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT count() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
|
||||
)
|
||||
== "100\n"
|
||||
)
|
||||
simple_mongo_table.drop()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
|
||||
def test_no_credentials(started_cluster):
|
||||
mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
|
||||
db = mongo_connection["test"]
|
||||
simple_mongo_table = db["simple_table"]
|
||||
data = []
|
||||
for i in range(0, 100):
|
||||
data.append({"key": i, "data": hex(i * i)})
|
||||
simple_mongo_table.insert_many(data)
|
||||
|
||||
node = started_cluster.instances["node"]
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', '', '', structure='key UInt64, data String')"
|
||||
)
|
||||
== "100\n"
|
||||
)
|
||||
simple_mongo_table.drop()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
|
||||
def test_auth_source(started_cluster):
|
||||
mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
|
||||
admin_db = mongo_connection["admin"]
|
||||
admin_db.add_user(
|
||||
"root",
|
||||
"clickhouse",
|
||||
roles=[{"role": "userAdminAnyDatabase", "db": "admin"}, "readWriteAnyDatabase"],
|
||||
)
|
||||
simple_mongo_table = admin_db["simple_table"]
|
||||
data = []
|
||||
for i in range(0, 50):
|
||||
data.append({"key": i, "data": hex(i * i)})
|
||||
simple_mongo_table.insert_many(data)
|
||||
db = mongo_connection["test"]
|
||||
simple_mongo_table = db["simple_table"]
|
||||
data = []
|
||||
for i in range(0, 100):
|
||||
data.append({"key": i, "data": hex(i * i)})
|
||||
simple_mongo_table.insert_many(data)
|
||||
|
||||
node = started_cluster.instances["node"]
|
||||
|
||||
node.query_and_get_error(
|
||||
"SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
|
||||
)
|
||||
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='authSource=admin')"
|
||||
)
|
||||
== "100\n"
|
||||
)
|
||||
simple_mongo_table.drop()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
|
||||
def test_missing_columns(started_cluster):
|
||||
mongo_connection = get_mongo_connection(started_cluster)
|
||||
db = mongo_connection["test"]
|
||||
db.add_user("root", "clickhouse")
|
||||
simple_mongo_table = db["simple_table"]
|
||||
data = []
|
||||
for i in range(0, 10):
|
||||
data.append({"key": i, "data": hex(i * i)})
|
||||
for i in range(0, 10):
|
||||
data.append({"key": i})
|
||||
simple_mongo_table.insert_many(data)
|
||||
|
||||
node = started_cluster.instances["node"]
|
||||
result = node.query(
|
||||
"SELECT count() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data Nullable(String)') WHERE isNull(data)"
|
||||
)
|
||||
assert result == "10\n"
|
||||
simple_mongo_table.drop()
|
||||
f"""
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
redis('{address}', 0, 'clickhouse', "simple", 10, "k Ss, v String")
|
||||
""")
|
||||
|
Loading…
Reference in New Issue
Block a user