Merge pull request #31556 from azat/fix-02010_lc_native

Fix 02010_lc_native flakiness (Query with id = 123456 is already running)
This commit is contained in:
alexey-milovidov 2021-11-21 11:33:35 +03:00 committed by GitHub
commit 6ef001fce5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -3,10 +3,12 @@
import socket import socket
import os import os
import uuid
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1') CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
CLICKHOUSE_PORT = int(os.environ.get('CLICKHOUSE_PORT_TCP', '900000')) CLICKHOUSE_PORT = int(os.environ.get('CLICKHOUSE_PORT_TCP', '900000'))
CLICKHOUSE_DATABASE = os.environ.get('CLICKHOUSE_DATABASE', 'default') CLICKHOUSE_DATABASE = os.environ.get('CLICKHOUSE_DATABASE', 'default')
CLICKHOUSE_QUERY_ID = uuid.uuid4().hex
def writeVarUInt(x, ba): def writeVarUInt(x, ba):
for _ in range(0, 9): for _ in range(0, 9):
@ -111,7 +113,7 @@ def receiveHello(s):
def serializeClientInfo(ba): def serializeClientInfo(ba):
writeStringBinary('default', ba) # initial_user writeStringBinary('default', ba) # initial_user
writeStringBinary('123456', ba) # initial_query_id writeStringBinary(CLICKHOUSE_QUERY_ID, ba) # initial_query_id
writeStringBinary('127.0.0.1:9000', ba) # initial_address writeStringBinary('127.0.0.1:9000', ba) # initial_address
ba.extend([0] * 8) # initial_query_start_time_microseconds ba.extend([0] * 8) # initial_query_start_time_microseconds
ba.append(1) # TCP ba.append(1) # TCP
@ -130,7 +132,7 @@ def serializeClientInfo(ba):
def sendQuery(s, query): def sendQuery(s, query):
ba = bytearray() ba = bytearray()
writeVarUInt(1, ba) # query writeVarUInt(1, ba) # query
writeStringBinary('123456', ba) writeStringBinary(CLICKHOUSE_QUERY_ID, ba)
ba.append(1) # INITIAL_QUERY ba.append(1) # INITIAL_QUERY
@ -163,15 +165,22 @@ def sendEmptyBlock(s):
s.sendall(ba) s.sendall(ba)
def assertPacket(packet, expected):
assert(packet == expected), packet
def readHeader(s): def readHeader(s):
readVarUInt(s) # Data packet_type = readVarUInt(s)
if packet_type == 2: # Exception
raise RuntimeError(readException(s))
assertPacket(packet_type, 1) # Data
readStringBinary(s) # external table name readStringBinary(s) # external table name
# BlockInfo # BlockInfo
readVarUInt(s) # 1 assertPacket(readVarUInt(s), 1) # 1
readUInt8(s) # is_overflows assertPacket(readUInt8(s), 0) # is_overflows
readVarUInt(s) # 2 assertPacket(readVarUInt(s), 2) # 2
readUInt32(s) # bucket_num assertPacket(readUInt32(s), 4294967295) # bucket_num
readVarUInt(s) # 0 assertPacket(readVarUInt(s), 0) # 0
columns = readVarUInt(s) # rows columns = readVarUInt(s) # rows
rows = readVarUInt(s) # columns rows = readVarUInt(s) # columns
print("Rows {} Columns {}".format(rows, columns)) print("Rows {} Columns {}".format(rows, columns))
@ -182,13 +191,12 @@ def readHeader(s):
def readException(s): def readException(s):
assert(readVarUInt(s) == 2)
code = readUInt32(s) code = readUInt32(s)
name = readStringBinary(s) name = readStringBinary(s)
text = readStringBinary(s) text = readStringBinary(s)
readStringBinary(s) # trace readStringBinary(s) # trace
assert(readUInt8(s) == 0) # has_nested assertPacket(readUInt8(s), 0) # has_nested
print("code {}: {}".format(code, text.replace('DB::Exception:', ''))) return "code {}: {}".format(code, text.replace('DB::Exception:', ''))
def insertValidLowCardinalityRow(): def insertValidLowCardinalityRow():
@ -223,7 +231,7 @@ def insertValidLowCardinalityRow():
# Fin block # Fin block
sendEmptyBlock(s) sendEmptyBlock(s)
assert(readVarUInt(s) == 5) # End of stream assertPacket(readVarUInt(s), 5) # End of stream
s.close() s.close()
@ -256,7 +264,8 @@ def insertLowCardinalityRowWithIndexOverflow():
ba.extend([0] * 7 + [1]) # UInt64 index (overflow) ba.extend([0] * 7 + [1]) # UInt64 index (overflow)
s.sendall(ba) s.sendall(ba)
readException(s) assertPacket(readVarUInt(s), 2)
print(readException(s))
s.close() s.close()
@ -289,7 +298,8 @@ def insertLowCardinalityRowWithIncorrectDictType():
ba.extend([0] * 8) # UInt64 index (overflow) ba.extend([0] * 8) # UInt64 index (overflow)
s.sendall(ba) s.sendall(ba)
readException(s) assertPacket(readVarUInt(s), 2)
print(readException(s))
s.close() s.close()