ClickHouse/tests/queries/0_stateless/02010_lc_native.python
2021-11-24 17:19:59 +01:00

313 lines
8.8 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import os
import uuid
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
CLICKHOUSE_PORT = int(os.environ.get('CLICKHOUSE_PORT_TCP', '900000'))
CLICKHOUSE_DATABASE = os.environ.get('CLICKHOUSE_DATABASE', 'default')
def writeVarUInt(x, ba):
for _ in range(0, 9):
byte = x & 0x7F
if x > 0x7F:
byte |= 0x80
ba.append(byte)
x >>= 7
if x == 0:
return
def writeStringBinary(s, ba):
b = bytes(s, 'utf-8')
writeVarUInt(len(s), ba)
ba.extend(b)
def readStrict(s, size = 1):
res = bytearray()
while size:
cur = s.recv(size)
# if not res:
# raise "Socket is closed"
size -= len(cur)
res.extend(cur)
return res
def readUInt(s, size=1):
res = readStrict(s, size)
val = 0
for i in range(len(res)):
val += res[i] << (i * 8)
return val
def readUInt8(s):
return readUInt(s)
def readUInt16(s):
return readUInt(s, 2)
def readUInt32(s):
return readUInt(s, 4)
def readUInt64(s):
return readUInt(s, 8)
def readVarUInt(s):
x = 0
for i in range(9):
byte = readStrict(s)[0]
x |= (byte & 0x7F) << (7 * i)
if not byte & 0x80:
return x
return x
def readStringBinary(s):
size = readVarUInt(s)
s = readStrict(s, size)
return s.decode('utf-8')
def sendHello(s):
ba = bytearray()
writeVarUInt(0, ba) # Hello
writeStringBinary('simple native protocol', ba)
writeVarUInt(21, ba)
writeVarUInt(9, ba)
writeVarUInt(54449, ba)
writeStringBinary('default', ba) # database
writeStringBinary('default', ba) # user
writeStringBinary('', ba) # pwd
s.sendall(ba)
def receiveHello(s):
p_type = readVarUInt(s)
assert (p_type == 0) # Hello
server_name = readStringBinary(s)
# print("Server name: ", server_name)
server_version_major = readVarUInt(s)
# print("Major: ", server_version_major)
server_version_minor = readVarUInt(s)
# print("Minor: ", server_version_minor)
server_revision = readVarUInt(s)
# print("Revision: ", server_revision)
server_timezone = readStringBinary(s)
# print("Timezone: ", server_timezone)
server_display_name = readStringBinary(s)
# print("Display name: ", server_display_name)
server_version_patch = readVarUInt(s)
# print("Version patch: ", server_version_patch)
def serializeClientInfo(ba, query_id):
writeStringBinary('default', ba) # initial_user
writeStringBinary(query_id, ba) # initial_query_id
writeStringBinary('127.0.0.1:9000', ba) # initial_address
ba.extend([0] * 8) # initial_query_start_time_microseconds
ba.append(1) # TCP
writeStringBinary('os_user', ba) # os_user
writeStringBinary('client_hostname', ba) # client_hostname
writeStringBinary('client_name', ba) # client_name
writeVarUInt(21, ba)
writeVarUInt(9, ba)
writeVarUInt(54449, ba)
writeStringBinary('', ba) # quota_key
writeVarUInt(0, ba) # distributed_depth
writeVarUInt(1, ba) # client_version_patch
ba.append(0) # No telemetry
def sendQuery(s, query):
ba = bytearray()
query_id = uuid.uuid4().hex
writeVarUInt(1, ba) # query
writeStringBinary(query_id, ba)
ba.append(1) # INITIAL_QUERY
# client info
serializeClientInfo(ba, query_id)
writeStringBinary('', ba) # No settings
writeStringBinary('', ba) # No interserver secret
writeVarUInt(2, ba) # Stage - Complete
ba.append(0) # No compression
writeStringBinary(query + ' settings input_format_defaults_for_omitted_fields=0', ba) # query, finally
s.sendall(ba)
def serializeBlockInfo(ba):
writeVarUInt(1, ba) # 1
ba.append(0) # is_overflows
writeVarUInt(2, ba) # 2
writeVarUInt(0, ba) # 0
ba.extend([0] * 4) # bucket_num
def sendEmptyBlock(s):
ba = bytearray()
writeVarUInt(2, ba) # Data
writeStringBinary('', ba)
serializeBlockInfo(ba)
writeVarUInt(0, ba) # rows
writeVarUInt(0, ba) # columns
s.sendall(ba)
def assertPacket(packet, expected):
assert(packet == expected), packet
def readHeader(s):
packet_type = readVarUInt(s)
if packet_type == 2: # Exception
raise RuntimeError(readException(s))
assertPacket(packet_type, 1) # Data
readStringBinary(s) # external table name
# BlockInfo
assertPacket(readVarUInt(s), 1) # 1
assertPacket(readUInt8(s), 0) # is_overflows
assertPacket(readVarUInt(s), 2) # 2
assertPacket(readUInt32(s), 4294967295) # bucket_num
assertPacket(readVarUInt(s), 0) # 0
columns = readVarUInt(s) # rows
rows = readVarUInt(s) # columns
print("Rows {} Columns {}".format(rows, columns))
for _ in range(columns):
col_name = readStringBinary(s)
type_name = readStringBinary(s)
print("Column {} type {}".format(col_name, type_name))
def readException(s):
code = readUInt32(s)
name = readStringBinary(s)
text = readStringBinary(s)
readStringBinary(s) # trace
assertPacket(readUInt8(s), 0) # has_nested
return "code {}: {}".format(code, text.replace('DB::Exception:', ''))
def insertValidLowCardinalityRow():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(30)
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
sendHello(s)
receiveHello(s)
sendQuery(s, 'insert into {}.tab format TSV'.format(CLICKHOUSE_DATABASE))
# external tables
sendEmptyBlock(s)
readHeader(s)
# Data
ba = bytearray()
writeVarUInt(2, ba) # Data
writeStringBinary('', ba)
serializeBlockInfo(ba)
writeVarUInt(1, ba) # rows
writeVarUInt(1, ba) # columns
writeStringBinary('x', ba)
writeStringBinary('LowCardinality(String)', ba)
ba.extend([1] + [0] * 7) # SharedDictionariesWithAdditionalKeys
ba.extend([3, 2] + [0] * 6) # indexes type: UInt64 [3], with additional keys [2]
ba.extend([1] + [0] * 7) # num_keys in dict
writeStringBinary('hello', ba) # key
ba.extend([1] + [0] * 7) # num_indexes
ba.extend([0] * 8) # UInt64 index (0 for 'hello')
s.sendall(ba)
# Fin block
sendEmptyBlock(s)
assertPacket(readVarUInt(s), 5) # End of stream
s.close()
def insertLowCardinalityRowWithIndexOverflow():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(30)
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
sendHello(s)
receiveHello(s)
sendQuery(s, 'insert into {}.tab format TSV'.format(CLICKHOUSE_DATABASE))
# external tables
sendEmptyBlock(s)
readHeader(s)
# Data
ba = bytearray()
writeVarUInt(2, ba) # Data
writeStringBinary('', ba)
serializeBlockInfo(ba)
writeVarUInt(1, ba) # rows
writeVarUInt(1, ba) # columns
writeStringBinary('x', ba)
writeStringBinary('LowCardinality(String)', ba)
ba.extend([1] + [0] * 7) # SharedDictionariesWithAdditionalKeys
ba.extend([3, 2] + [0] * 6) # indexes type: UInt64 [3], with additional keys [2]
ba.extend([1] + [0] * 7) # num_keys in dict
writeStringBinary('hello', ba) # key
ba.extend([1] + [0] * 7) # num_indexes
ba.extend([0] * 7 + [1]) # UInt64 index (overflow)
s.sendall(ba)
assertPacket(readVarUInt(s), 2)
print(readException(s))
s.close()
def insertLowCardinalityRowWithIncorrectDictType():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(30)
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
sendHello(s)
receiveHello(s)
sendQuery(s, 'insert into {}.tab format TSV'.format(CLICKHOUSE_DATABASE))
# external tables
sendEmptyBlock(s)
readHeader(s)
# Data
ba = bytearray()
writeVarUInt(2, ba) # Data
writeStringBinary('', ba)
serializeBlockInfo(ba)
writeVarUInt(1, ba) # rows
writeVarUInt(1, ba) # columns
writeStringBinary('x', ba)
writeStringBinary('LowCardinality(String)', ba)
ba.extend([1] + [0] * 7) # SharedDictionariesWithAdditionalKeys
ba.extend([3, 3] + [0] * 6) # indexes type: UInt64 [3], with global dict and add keys [1 + 2]
ba.extend([1] + [0] * 7) # num_keys in dict
writeStringBinary('hello', ba) # key
ba.extend([1] + [0] * 7) # num_indexes
ba.extend([0] * 8) # UInt64 index (overflow)
s.sendall(ba)
assertPacket(readVarUInt(s), 2)
print(readException(s))
s.close()
def main():
insertValidLowCardinalityRow()
insertLowCardinalityRowWithIndexOverflow()
insertLowCardinalityRowWithIncorrectDictType()
if __name__ == "__main__":
main()