ClickHouse/tests/queries/0_stateless/02750_settings_alias_tcp_protocol.python
2023-10-21 03:14:22 +02:00

220 lines
5.1 KiB
Python

#!/usr/bin/env python3
import socket
import os
import uuid
import json
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "127.0.0.1")
CLICKHOUSE_PORT = int(os.environ.get("CLICKHOUSE_PORT_TCP", "900000"))
CLICKHOUSE_DATABASE = os.environ.get("CLICKHOUSE_DATABASE", "default")
CLIENT_NAME = "simple native protocol"
def writeVarUInt(x, ba):
for _ in range(0, 9):
byte = x & 0x7F
if x > 0x7F:
byte |= 0x80
ba.append(byte)
x >>= 7
if x == 0:
return
def writeStringBinary(s, ba):
b = bytes(s, "utf-8")
writeVarUInt(len(s), ba)
ba.extend(b)
def readStrict(s, size=1):
res = bytearray()
while size:
cur = s.recv(size)
# if not res:
# raise "Socket is closed"
size -= len(cur)
res.extend(cur)
return res
def readUInt(s, size=1):
res = readStrict(s, size)
val = 0
for i in range(len(res)):
val += res[i] << (i * 8)
return val
def readUInt8(s):
return readUInt(s)
def readUInt16(s):
return readUInt(s, 2)
def readUInt32(s):
return readUInt(s, 4)
def readUInt64(s):
return readUInt(s, 8)
def readVarUInt(s):
x = 0
for i in range(9):
byte = readStrict(s)[0]
x |= (byte & 0x7F) << (7 * i)
if not byte & 0x80:
return x
return x
def readStringBinary(s):
size = readVarUInt(s)
s = readStrict(s, size)
return s.decode("utf-8")
def sendHello(s):
ba = bytearray()
writeVarUInt(0, ba) # Hello
writeStringBinary(CLIENT_NAME, ba)
writeVarUInt(21, ba)
writeVarUInt(9, ba)
writeVarUInt(54449, ba)
writeStringBinary(CLICKHOUSE_DATABASE, ba) # database
writeStringBinary("default", ba) # user
writeStringBinary("", ba) # pwd
s.sendall(ba)
def receiveHello(s):
p_type = readVarUInt(s)
assert p_type == 0 # Hello
_server_name = readStringBinary(s)
_server_version_major = readVarUInt(s)
_server_version_minor = readVarUInt(s)
_server_revision = readVarUInt(s)
_server_timezone = readStringBinary(s)
_server_display_name = readStringBinary(s)
_server_version_patch = readVarUInt(s)
def serializeClientInfo(ba, query_id):
writeStringBinary("default", ba) # initial_user
writeStringBinary(query_id, ba) # initial_query_id
writeStringBinary("127.0.0.1:9000", ba) # initial_address
ba.extend([0] * 8) # initial_query_start_time_microseconds
ba.append(1) # TCP
writeStringBinary("os_user", ba) # os_user
writeStringBinary("client_hostname", ba) # client_hostname
writeStringBinary(CLIENT_NAME, ba) # client_name
writeVarUInt(21, ba)
writeVarUInt(9, ba)
writeVarUInt(54449, ba)
writeStringBinary("", ba) # quota_key
writeVarUInt(0, ba) # distributed_depth
writeVarUInt(1, ba) # client_version_patch
ba.append(0) # No telemetry
def sendQuery(s, query, settings):
ba = bytearray()
query_id = uuid.uuid4().hex
writeVarUInt(1, ba) # query
writeStringBinary(query_id, ba)
ba.append(1) # INITIAL_QUERY
# client info
serializeClientInfo(ba, query_id)
# Settings
for key, value in settings.items():
writeStringBinary(key, ba)
writeVarUInt(1, ba) # is_important
writeStringBinary(str(value), ba)
writeStringBinary("", ba) # End of settings
writeStringBinary("", ba) # No interserver secret
writeVarUInt(2, ba) # Stage - Complete
ba.append(0) # No compression
writeStringBinary(query, ba) # query, finally
s.sendall(ba)
def serializeBlockInfo(ba):
writeVarUInt(1, ba) # 1
ba.append(0) # is_overflows
writeVarUInt(2, ba) # 2
writeVarUInt(0, ba) # 0
ba.extend([0] * 4) # bucket_num
def sendEmptyBlock(s):
ba = bytearray()
writeVarUInt(2, ba) # Data
writeStringBinary("", ba)
serializeBlockInfo(ba)
writeVarUInt(0, ba) # rows
writeVarUInt(0, ba) # columns
s.sendall(ba)
def assertPacket(packet, expected):
assert packet == expected, "Got: {}, expected: {}".format(packet, expected)
def readResponse(s):
packet_type = readVarUInt(s)
if packet_type == 2: # Exception
raise RuntimeError(readException(s))
if packet_type == 1: # Data
return None
if packet_type == 3: # Progress
return None
if packet_type == 5: # End stream
return None
raise RuntimeError("Unexpected packet: {}".format(packet_type))
def readException(s):
code = readUInt32(s)
_name = readStringBinary(s)
text = readStringBinary(s)
readStringBinary(s) # trace
assertPacket(readUInt8(s), 0) # has_nested
return "code {}: {}".format(code, text.replace("DB::Exception:", ""))
def main():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(30)
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
sendHello(s)
receiveHello(s)
sendQuery(s, "select 1", {"replication_alter_partitions_sync": 1})
# external tables
sendEmptyBlock(s)
while readResponse(s) is not None:
pass
s.close()
print("OK")
if __name__ == "__main__":
main()