ClickHouse/tests/integration/test_table_function_redis/test.py

231 lines
5.1 KiB
Python
Raw Normal View History

2023-05-30 12:31:23 +00:00
import datetime
2023-05-25 04:33:07 +00:00
2023-05-26 02:34:37 +00:00
import redis
2023-05-25 04:33:07 +00:00
import pytest
2023-05-30 12:31:23 +00:00
import sys
import struct
2023-05-25 04:33:07 +00:00
2023-05-26 02:34:37 +00:00
from helpers.client import QueryRuntimeException
2023-05-25 04:33:07 +00:00
from helpers.cluster import ClickHouseCluster
2023-05-26 02:34:37 +00:00
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", with_redis=True)
2023-05-25 04:33:07 +00:00
@pytest.fixture(scope="module")
2023-05-26 02:34:37 +00:00
def started_cluster():
2023-05-25 04:33:07 +00:00
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
2023-05-26 02:34:37 +00:00
def get_redis_connection(db_id=0):
client = redis.Redis(
2023-05-31 10:15:38 +00:00
host="localhost", port=cluster.redis_port, password="clickhouse", db=db_id
2023-05-25 04:33:07 +00:00
)
2023-05-26 02:34:37 +00:00
return client
def get_address_for_ch():
2023-05-31 10:15:38 +00:00
return cluster.redis_host + ":6379"
2023-05-26 02:34:37 +00:00
2023-05-30 12:31:23 +00:00
# see SerializationString.serializeBinary
def serialize_binary_for_string(x):
var_uint_max = (1 << 63) - 1
buf = bytearray()
# write length
length = len(x)
# length = (length << 1) ^ (length >> 63)
if length > var_uint_max:
raise ValueError("Value too large for varint encoding")
for i in range(9):
byte = length & 0x7F
if length > 0x7F:
byte |= 0x80
2023-05-31 10:15:38 +00:00
buf += bytes([byte])
2023-05-30 12:31:23 +00:00
length >>= 7
if not length:
break
# write data
2023-05-31 10:15:38 +00:00
buf += x.encode("utf-8")
2023-05-30 12:31:23 +00:00
return bytes(buf)
# see SerializationNumber.serializeBinary
def serialize_binary_for_uint32(x):
buf = bytearray()
2023-05-31 10:15:38 +00:00
packed_num = struct.pack("I", x)
2023-05-30 12:31:23 +00:00
buf += packed_num
2023-05-31 10:15:38 +00:00
if sys.byteorder != "little":
2023-05-30 12:31:23 +00:00
buf.reverse()
return bytes(buf)
def test_simple_select(started_cluster):
2023-05-26 02:34:37 +00:00
client = get_redis_connection()
address = get_address_for_ch()
# clean all
client.flushall()
data = {}
for i in range(100):
2023-05-30 12:31:23 +00:00
packed = serialize_binary_for_string(str(i))
data[packed] = packed
2023-05-26 02:34:37 +00:00
client.mset(data)
client.close()
2023-05-31 12:32:34 +00:00
response = TSV.toMat(
node.query(
f"""
SELECT
key, value
FROM
redis('{address}', 'key', 'key String, value String', 0, 'clickhouse', 10)
WHERE
key='0'
FORMAT TSV
"""
)
)
2023-05-26 02:34:37 +00:00
2023-05-31 10:15:38 +00:00
assert len(response) == 1
assert response[0] == ["0", "0"]
2023-05-26 02:34:37 +00:00
2023-05-31 12:32:34 +00:00
response = TSV.toMat(
node.query(
f"""
SELECT
*
FROM
redis('{address}', 'key', 'key String, value String', 0, 'clickhouse', 10)
ORDER BY
key
FORMAT TSV
"""
)
)
2023-05-26 02:34:37 +00:00
2023-05-31 10:15:38 +00:00
assert len(response) == 100
assert response[0] == ["0", "0"]
2023-05-26 02:34:37 +00:00
2023-05-30 12:31:23 +00:00
def test_create_table(started_cluster):
2023-05-26 02:34:37 +00:00
client = get_redis_connection()
address = get_address_for_ch()
# clean all
client.flushall()
client.close()
node.query(
f"""
SELECT
*
FROM
2023-05-30 12:31:23 +00:00
redis('{address}', 'k', 'k String, v UInt32', 0, 'clickhouse', 10)
2023-05-31 12:32:34 +00:00
"""
)
2023-05-26 02:34:37 +00:00
2023-05-30 12:31:23 +00:00
# illegal data type
2023-05-25 04:33:07 +00:00
with pytest.raises(QueryRuntimeException):
node.query(
2023-05-26 02:34:37 +00:00
f"""
SELECT
*
FROM
2023-05-30 12:31:23 +00:00
redis('{address}', 'k', 'k not_exist_type, v String', 0, 'clickhouse', 10)
2023-05-31 12:32:34 +00:00
"""
)
2023-05-25 04:33:07 +00:00
2023-05-30 12:31:23 +00:00
# illegal key
2023-05-26 02:34:37 +00:00
with pytest.raises(QueryRuntimeException):
2023-05-25 04:33:07 +00:00
node.query(
2023-05-26 02:34:37 +00:00
f"""
SELECT
*
FROM
2023-05-30 12:31:23 +00:00
redis('{address}', 'not_exist_key', 'k not_exist_type, v String', 0, 'clickhouse', 10)
2023-05-31 14:23:09 +00:00
"""
)
2023-05-29 07:22:29 +00:00
def test_data_type(started_cluster):
client = get_redis_connection()
address = get_address_for_ch()
# string
client.flushall()
2023-05-31 10:15:38 +00:00
value = serialize_binary_for_string("0")
2023-05-30 12:31:23 +00:00
client.set(value, value)
2023-05-29 07:22:29 +00:00
2023-05-31 12:32:34 +00:00
response = TSV.toMat(
node.query(
f"""
SELECT
*
FROM
redis('{address}', 'k', 'k String, v String', 0, 'clickhouse', 10)
WHERE
k='0'
FORMAT TSV
"""
)
)
2023-05-29 07:22:29 +00:00
2023-05-31 10:15:38 +00:00
assert len(response) == 1
assert response[0] == ["0", "0"]
2023-05-29 07:22:29 +00:00
# number
client.flushall()
2023-05-30 12:31:23 +00:00
value = serialize_binary_for_uint32(0)
client.set(value, value)
2023-05-29 07:22:29 +00:00
2023-05-31 12:32:34 +00:00
response = TSV.toMat(
node.query(
f"""
SELECT
*
FROM
redis('{address}', 'k', 'k UInt32, v UInt32', 0, 'clickhouse', 10)
WHERE
k=0
FORMAT TSV
"""
)
)
2023-05-29 07:22:29 +00:00
2023-05-31 10:15:38 +00:00
assert len(response) == 1
assert response[0] == ["0", "0"]
2023-05-29 07:22:29 +00:00
# datetime
client.flushall()
2023-05-30 12:31:23 +00:00
# clickhouse store datatime as uint32 in internal
dt = datetime.datetime(2023, 6, 1, 0, 0, 0)
seconds_since_epoch = dt.timestamp()
value = serialize_binary_for_uint32(int(seconds_since_epoch))
client.set(value, value)
2023-05-29 07:22:29 +00:00
2023-05-31 12:32:34 +00:00
response = TSV.toMat(
node.query(
f"""
SELECT
*
FROM
redis('{address}', 'k', 'k DateTime, v DateTime', 0, 'clickhouse', 10)
WHERE
k='2023-06-01 00:00:00'
FORMAT TSV
"""
)
)
2023-05-29 07:22:29 +00:00
2023-05-31 10:15:38 +00:00
assert len(response) == 1
assert response[0] == ["2023-06-01 00:00:00", "2023-06-01 00:00:00"]