mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 01:12:12 +00:00
303 lines
8.3 KiB
Plaintext
303 lines
8.3 KiB
Plaintext
|
#!/usr/bin/env python3
|
||
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
import socket
|
||
|
import os
|
||
|
|
||
|
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
|
||
|
CLICKHOUSE_PORT = int(os.environ.get('CLICKHOUSE_PORT_TCP', '900000'))
|
||
|
CLICKHOUSE_DATABASE = os.environ.get('CLICKHOUSE_DATABASE', 'default')
|
||
|
|
||
|
def writeVarUInt(x, ba):
|
||
|
for _ in range(0, 9):
|
||
|
|
||
|
byte = x & 0x7F
|
||
|
if x > 0x7F:
|
||
|
byte |= 0x80
|
||
|
|
||
|
ba.append(byte)
|
||
|
|
||
|
x >>= 7
|
||
|
if x == 0:
|
||
|
return
|
||
|
|
||
|
|
||
|
def writeStringBinary(s, ba):
|
||
|
b = bytes(s, 'utf-8')
|
||
|
writeVarUInt(len(s), ba)
|
||
|
ba.extend(b)
|
||
|
|
||
|
|
||
|
def readStrict(s, size = 1):
|
||
|
res = bytearray()
|
||
|
while size:
|
||
|
cur = s.recv(size)
|
||
|
# if not res:
|
||
|
# raise "Socket is closed"
|
||
|
size -= len(cur)
|
||
|
res.extend(cur)
|
||
|
|
||
|
return res
|
||
|
|
||
|
|
||
|
def readUInt(s, size=1):
|
||
|
res = readStrict(s, size)
|
||
|
val = 0
|
||
|
for i in range(len(res)):
|
||
|
val += res[i] << (i * 8)
|
||
|
return val
|
||
|
|
||
|
def readUInt8(s):
|
||
|
return readUInt(s)
|
||
|
|
||
|
def readUInt16(s):
|
||
|
return readUInt(s, 2)
|
||
|
|
||
|
def readUInt32(s):
|
||
|
return readUInt(s, 4)
|
||
|
|
||
|
def readUInt64(s):
|
||
|
return readUInt(s, 8)
|
||
|
|
||
|
def readVarUInt(s):
|
||
|
x = 0
|
||
|
for i in range(9):
|
||
|
byte = readStrict(s)[0]
|
||
|
x |= (byte & 0x7F) << (7 * i)
|
||
|
|
||
|
if not byte & 0x80:
|
||
|
return x
|
||
|
|
||
|
return x
|
||
|
|
||
|
|
||
|
def readStringBinary(s):
|
||
|
size = readVarUInt(s)
|
||
|
s = readStrict(s, size)
|
||
|
return s.decode('utf-8')
|
||
|
|
||
|
|
||
|
def sendHello(s):
|
||
|
ba = bytearray()
|
||
|
writeVarUInt(0, ba) # Hello
|
||
|
writeStringBinary('simple native protocol', ba)
|
||
|
writeVarUInt(21, ba)
|
||
|
writeVarUInt(9, ba)
|
||
|
writeVarUInt(54449, ba)
|
||
|
writeStringBinary('default', ba) # database
|
||
|
writeStringBinary('default', ba) # user
|
||
|
writeStringBinary('', ba) # pwd
|
||
|
s.sendall(ba)
|
||
|
|
||
|
|
||
|
def receiveHello(s):
|
||
|
p_type = readVarUInt(s)
|
||
|
assert (p_type == 0) # Hello
|
||
|
server_name = readStringBinary(s)
|
||
|
# print("Server name: ", server_name)
|
||
|
server_version_major = readVarUInt(s)
|
||
|
# print("Major: ", server_version_major)
|
||
|
server_version_minor = readVarUInt(s)
|
||
|
# print("Minor: ", server_version_minor)
|
||
|
server_revision = readVarUInt(s)
|
||
|
# print("Revision: ", server_revision)
|
||
|
server_timezone = readStringBinary(s)
|
||
|
# print("Timezone: ", server_timezone)
|
||
|
server_display_name = readStringBinary(s)
|
||
|
# print("Display name: ", server_display_name)
|
||
|
server_version_patch = readVarUInt(s)
|
||
|
# print("Version patch: ", server_version_patch)
|
||
|
|
||
|
|
||
|
def serializeClientInfo(ba):
|
||
|
writeStringBinary('default', ba) # initial_user
|
||
|
writeStringBinary('123456', ba) # initial_query_id
|
||
|
writeStringBinary('127.0.0.1:9000', ba) # initial_address
|
||
|
ba.extend([0] * 8) # initial_query_start_time_microseconds
|
||
|
ba.append(1) # TCP
|
||
|
writeStringBinary('os_user', ba) # os_user
|
||
|
writeStringBinary('client_hostname', ba) # client_hostname
|
||
|
writeStringBinary('client_name', ba) # client_name
|
||
|
writeVarUInt(21, ba)
|
||
|
writeVarUInt(9, ba)
|
||
|
writeVarUInt(54449, ba)
|
||
|
writeStringBinary('', ba) # quota_key
|
||
|
writeVarUInt(0, ba) # distributed_depth
|
||
|
writeVarUInt(1, ba) # client_version_patch
|
||
|
ba.append(0) # No telemetry
|
||
|
|
||
|
|
||
|
def sendQuery(s, query):
|
||
|
ba = bytearray()
|
||
|
writeVarUInt(1, ba) # query
|
||
|
writeStringBinary('123456', ba)
|
||
|
|
||
|
ba.append(1) # INITIAL_QUERY
|
||
|
|
||
|
# client info
|
||
|
serializeClientInfo(ba)
|
||
|
|
||
|
writeStringBinary('', ba) # No settings
|
||
|
writeStringBinary('', ba) # No interserver secret
|
||
|
writeVarUInt(2, ba) # Stage - Complete
|
||
|
ba.append(0) # No compression
|
||
|
writeStringBinary(query + ' settings input_format_defaults_for_omitted_fields=0', ba) # query, finally
|
||
|
s.sendall(ba)
|
||
|
|
||
|
|
||
|
def serializeBlockInfo(ba):
|
||
|
writeVarUInt(1, ba) # 1
|
||
|
ba.append(0) # is_overflows
|
||
|
writeVarUInt(2, ba) # 2
|
||
|
writeVarUInt(0, ba) # 0
|
||
|
ba.extend([0] * 4) # bucket_num
|
||
|
|
||
|
|
||
|
def sendEmptyBlock(s):
|
||
|
ba = bytearray()
|
||
|
writeVarUInt(2, ba) # Data
|
||
|
writeStringBinary('', ba)
|
||
|
serializeBlockInfo(ba)
|
||
|
writeVarUInt(0, ba) # rows
|
||
|
writeVarUInt(0, ba) # columns
|
||
|
s.sendall(ba)
|
||
|
|
||
|
|
||
|
def readHeader(s):
|
||
|
readVarUInt(s) # Data
|
||
|
readStringBinary(s) # external table name
|
||
|
# BlockInfo
|
||
|
readVarUInt(s) # 1
|
||
|
readUInt8(s) # is_overflows
|
||
|
readVarUInt(s) # 2
|
||
|
readUInt32(s) # bucket_num
|
||
|
readVarUInt(s) # 0
|
||
|
columns = readVarUInt(s) # rows
|
||
|
rows = readVarUInt(s) # columns
|
||
|
print("Rows {} Columns {}".format(rows, columns))
|
||
|
for _ in range(columns):
|
||
|
col_name = readStringBinary(s)
|
||
|
type_name = readStringBinary(s)
|
||
|
print("Column {} type {}".format(col_name, type_name))
|
||
|
|
||
|
|
||
|
def readException(s):
|
||
|
assert(readVarUInt(s) == 2)
|
||
|
code = readUInt32(s)
|
||
|
name = readStringBinary(s)
|
||
|
text = readStringBinary(s)
|
||
|
readStringBinary(s) # trace
|
||
|
assert(readUInt8(s) == 0) # has_nested
|
||
|
print("code {}: {}".format(code, text.replace('DB::Exception:', '')))
|
||
|
|
||
|
|
||
|
def insertValidLowCardinalityRow():
|
||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||
|
s.settimeout(30)
|
||
|
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
|
||
|
sendHello(s)
|
||
|
receiveHello(s)
|
||
|
sendQuery(s, 'insert into {}.tab format TSV'.format(CLICKHOUSE_DATABASE))
|
||
|
|
||
|
# external tables
|
||
|
sendEmptyBlock(s)
|
||
|
readHeader(s)
|
||
|
|
||
|
# Data
|
||
|
ba = bytearray()
|
||
|
writeVarUInt(2, ba) # Data
|
||
|
writeStringBinary('', ba)
|
||
|
serializeBlockInfo(ba)
|
||
|
writeVarUInt(1, ba) # rows
|
||
|
writeVarUInt(1, ba) # columns
|
||
|
writeStringBinary('x', ba)
|
||
|
writeStringBinary('LowCardinality(String)', ba)
|
||
|
ba.extend([1] + [0] * 7) # SharedDictionariesWithAdditionalKeys
|
||
|
ba.extend([3, 2] + [0] * 6) # indexes type: UInt64 [3], with additional keys [2]
|
||
|
ba.extend([1] + [0] * 7) # num_keys in dict
|
||
|
writeStringBinary('hello', ba) # key
|
||
|
ba.extend([1] + [0] * 7) # num_indexes
|
||
|
ba.extend([0] * 8) # UInt64 index (0 for 'hello')
|
||
|
s.sendall(ba)
|
||
|
|
||
|
# Fin block
|
||
|
sendEmptyBlock(s)
|
||
|
|
||
|
assert(readVarUInt(s) == 5) # End of stream
|
||
|
s.close()
|
||
|
|
||
|
|
||
|
def insertLowCardinalityRowWithIndexOverflow():
|
||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||
|
s.settimeout(30)
|
||
|
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
|
||
|
sendHello(s)
|
||
|
receiveHello(s)
|
||
|
sendQuery(s, 'insert into {}.tab format TSV'.format(CLICKHOUSE_DATABASE))
|
||
|
|
||
|
# external tables
|
||
|
sendEmptyBlock(s)
|
||
|
readHeader(s)
|
||
|
|
||
|
# Data
|
||
|
ba = bytearray()
|
||
|
writeVarUInt(2, ba) # Data
|
||
|
writeStringBinary('', ba)
|
||
|
serializeBlockInfo(ba)
|
||
|
writeVarUInt(1, ba) # rows
|
||
|
writeVarUInt(1, ba) # columns
|
||
|
writeStringBinary('x', ba)
|
||
|
writeStringBinary('LowCardinality(String)', ba)
|
||
|
ba.extend([1] + [0] * 7) # SharedDictionariesWithAdditionalKeys
|
||
|
ba.extend([3, 2] + [0] * 6) # indexes type: UInt64 [3], with additional keys [2]
|
||
|
ba.extend([1] + [0] * 7) # num_keys in dict
|
||
|
writeStringBinary('hello', ba) # key
|
||
|
ba.extend([1] + [0] * 7) # num_indexes
|
||
|
ba.extend([0] * 7 + [1]) # UInt64 index (overflow)
|
||
|
s.sendall(ba)
|
||
|
|
||
|
readException(s)
|
||
|
s.close()
|
||
|
|
||
|
|
||
|
def insertLowCardinalityRowWithIncorrectDictType():
|
||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||
|
s.settimeout(30)
|
||
|
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
|
||
|
sendHello(s)
|
||
|
receiveHello(s)
|
||
|
sendQuery(s, 'insert into {}.tab format TSV'.format(CLICKHOUSE_DATABASE))
|
||
|
|
||
|
# external tables
|
||
|
sendEmptyBlock(s)
|
||
|
readHeader(s)
|
||
|
|
||
|
# Data
|
||
|
ba = bytearray()
|
||
|
writeVarUInt(2, ba) # Data
|
||
|
writeStringBinary('', ba)
|
||
|
serializeBlockInfo(ba)
|
||
|
writeVarUInt(1, ba) # rows
|
||
|
writeVarUInt(1, ba) # columns
|
||
|
writeStringBinary('x', ba)
|
||
|
writeStringBinary('LowCardinality(String)', ba)
|
||
|
ba.extend([1] + [0] * 7) # SharedDictionariesWithAdditionalKeys
|
||
|
ba.extend([3, 3] + [0] * 6) # indexes type: UInt64 [3], with global dict and add keys [1 + 2]
|
||
|
ba.extend([1] + [0] * 7) # num_keys in dict
|
||
|
writeStringBinary('hello', ba) # key
|
||
|
ba.extend([1] + [0] * 7) # num_indexes
|
||
|
ba.extend([0] * 8) # UInt64 index (overflow)
|
||
|
s.sendall(ba)
|
||
|
|
||
|
readException(s)
|
||
|
s.close()
|
||
|
|
||
|
|
||
|
def main():
|
||
|
insertValidLowCardinalityRow()
|
||
|
insertLowCardinalityRowWithIndexOverflow()
|
||
|
insertLowCardinalityRowWithIncorrectDictType()
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|