#!/usr/bin/env python3 # -*- coding: utf-8 -*- import socket import os CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1') CLICKHOUSE_PORT = int(os.environ.get('CLICKHOUSE_PORT_TCP', '900000')) CLICKHOUSE_DATABASE = os.environ.get('CLICKHOUSE_DATABASE', 'default') def writeVarUInt(x, ba): for _ in range(0, 9): byte = x & 0x7F if x > 0x7F: byte |= 0x80 ba.append(byte) x >>= 7 if x == 0: return def writeStringBinary(s, ba): b = bytes(s, 'utf-8') writeVarUInt(len(s), ba) ba.extend(b) def readStrict(s, size = 1): res = bytearray() while size: cur = s.recv(size) # if not res: # raise "Socket is closed" size -= len(cur) res.extend(cur) return res def readUInt(s, size=1): res = readStrict(s, size) val = 0 for i in range(len(res)): val += res[i] << (i * 8) return val def readUInt8(s): return readUInt(s) def readUInt16(s): return readUInt(s, 2) def readUInt32(s): return readUInt(s, 4) def readUInt64(s): return readUInt(s, 8) def readVarUInt(s): x = 0 for i in range(9): byte = readStrict(s)[0] x |= (byte & 0x7F) << (7 * i) if not byte & 0x80: return x return x def readStringBinary(s): size = readVarUInt(s) s = readStrict(s, size) return s.decode('utf-8') def sendHello(s): ba = bytearray() writeVarUInt(0, ba) # Hello writeStringBinary('simple native protocol', ba) writeVarUInt(21, ba) writeVarUInt(9, ba) writeVarUInt(54449, ba) writeStringBinary('default', ba) # database writeStringBinary('default', ba) # user writeStringBinary('', ba) # pwd s.sendall(ba) def receiveHello(s): p_type = readVarUInt(s) assert (p_type == 0) # Hello server_name = readStringBinary(s) # print("Server name: ", server_name) server_version_major = readVarUInt(s) # print("Major: ", server_version_major) server_version_minor = readVarUInt(s) # print("Minor: ", server_version_minor) server_revision = readVarUInt(s) # print("Revision: ", server_revision) server_timezone = readStringBinary(s) # print("Timezone: ", server_timezone) server_display_name = readStringBinary(s) # print("Display name: ", server_display_name) server_version_patch = readVarUInt(s) # print("Version patch: ", server_version_patch) def serializeClientInfo(ba): writeStringBinary('default', ba) # initial_user writeStringBinary('123456', ba) # initial_query_id writeStringBinary('127.0.0.1:9000', ba) # initial_address ba.extend([0] * 8) # initial_query_start_time_microseconds ba.append(1) # TCP writeStringBinary('os_user', ba) # os_user writeStringBinary('client_hostname', ba) # client_hostname writeStringBinary('client_name', ba) # client_name writeVarUInt(21, ba) writeVarUInt(9, ba) writeVarUInt(54449, ba) writeStringBinary('', ba) # quota_key writeVarUInt(0, ba) # distributed_depth writeVarUInt(1, ba) # client_version_patch ba.append(0) # No telemetry def sendQuery(s, query): ba = bytearray() writeVarUInt(1, ba) # query writeStringBinary('123456', ba) ba.append(1) # INITIAL_QUERY # client info serializeClientInfo(ba) writeStringBinary('', ba) # No settings writeStringBinary('', ba) # No interserver secret writeVarUInt(2, ba) # Stage - Complete ba.append(0) # No compression writeStringBinary(query + ' settings input_format_defaults_for_omitted_fields=0', ba) # query, finally s.sendall(ba) def serializeBlockInfo(ba): writeVarUInt(1, ba) # 1 ba.append(0) # is_overflows writeVarUInt(2, ba) # 2 writeVarUInt(0, ba) # 0 ba.extend([0] * 4) # bucket_num def sendEmptyBlock(s): ba = bytearray() writeVarUInt(2, ba) # Data writeStringBinary('', ba) serializeBlockInfo(ba) writeVarUInt(0, ba) # rows writeVarUInt(0, ba) # columns s.sendall(ba) def readHeader(s): readVarUInt(s) # Data readStringBinary(s) # external table name # BlockInfo readVarUInt(s) # 1 readUInt8(s) # is_overflows readVarUInt(s) # 2 readUInt32(s) # bucket_num readVarUInt(s) # 0 columns = readVarUInt(s) # rows rows = readVarUInt(s) # columns print("Rows {} Columns {}".format(rows, columns)) for _ in range(columns): col_name = readStringBinary(s) type_name = readStringBinary(s) print("Column {} type {}".format(col_name, type_name)) def readException(s): assert(readVarUInt(s) == 2) code = readUInt32(s) name = readStringBinary(s) text = readStringBinary(s) readStringBinary(s) # trace assert(readUInt8(s) == 0) # has_nested print("code {}: {}".format(code, text.replace('DB::Exception:', ''))) def insertValidLowCardinalityRow(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(30) s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT)) sendHello(s) receiveHello(s) sendQuery(s, 'insert into {}.tab format TSV'.format(CLICKHOUSE_DATABASE)) # external tables sendEmptyBlock(s) readHeader(s) # Data ba = bytearray() writeVarUInt(2, ba) # Data writeStringBinary('', ba) serializeBlockInfo(ba) writeVarUInt(1, ba) # rows writeVarUInt(1, ba) # columns writeStringBinary('x', ba) writeStringBinary('LowCardinality(String)', ba) ba.extend([1] + [0] * 7) # SharedDictionariesWithAdditionalKeys ba.extend([3, 2] + [0] * 6) # indexes type: UInt64 [3], with additional keys [2] ba.extend([1] + [0] * 7) # num_keys in dict writeStringBinary('hello', ba) # key ba.extend([1] + [0] * 7) # num_indexes ba.extend([0] * 8) # UInt64 index (0 for 'hello') s.sendall(ba) # Fin block sendEmptyBlock(s) assert(readVarUInt(s) == 5) # End of stream s.close() def insertLowCardinalityRowWithIndexOverflow(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(30) s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT)) sendHello(s) receiveHello(s) sendQuery(s, 'insert into {}.tab format TSV'.format(CLICKHOUSE_DATABASE)) # external tables sendEmptyBlock(s) readHeader(s) # Data ba = bytearray() writeVarUInt(2, ba) # Data writeStringBinary('', ba) serializeBlockInfo(ba) writeVarUInt(1, ba) # rows writeVarUInt(1, ba) # columns writeStringBinary('x', ba) writeStringBinary('LowCardinality(String)', ba) ba.extend([1] + [0] * 7) # SharedDictionariesWithAdditionalKeys ba.extend([3, 2] + [0] * 6) # indexes type: UInt64 [3], with additional keys [2] ba.extend([1] + [0] * 7) # num_keys in dict writeStringBinary('hello', ba) # key ba.extend([1] + [0] * 7) # num_indexes ba.extend([0] * 7 + [1]) # UInt64 index (overflow) s.sendall(ba) readException(s) s.close() def insertLowCardinalityRowWithIncorrectDictType(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(30) s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT)) sendHello(s) receiveHello(s) sendQuery(s, 'insert into {}.tab format TSV'.format(CLICKHOUSE_DATABASE)) # external tables sendEmptyBlock(s) readHeader(s) # Data ba = bytearray() writeVarUInt(2, ba) # Data writeStringBinary('', ba) serializeBlockInfo(ba) writeVarUInt(1, ba) # rows writeVarUInt(1, ba) # columns writeStringBinary('x', ba) writeStringBinary('LowCardinality(String)', ba) ba.extend([1] + [0] * 7) # SharedDictionariesWithAdditionalKeys ba.extend([3, 3] + [0] * 6) # indexes type: UInt64 [3], with global dict and add keys [1 + 2] ba.extend([1] + [0] * 7) # num_keys in dict writeStringBinary('hello', ba) # key ba.extend([1] + [0] * 7) # num_indexes ba.extend([0] * 8) # UInt64 index (overflow) s.sendall(ba) readException(s) s.close() def main(): insertValidLowCardinalityRow() insertLowCardinalityRowWithIndexOverflow() insertLowCardinalityRowWithIncorrectDictType() if __name__ == "__main__": main()