mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 09:32:01 +00:00
Try to rewrite live view test.
This commit is contained in:
parent
e9070ede0a
commit
5c10bff9c2
@ -12,76 +12,61 @@ CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
|
||||
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
|
||||
|
||||
|
||||
print(CLICKHOUSE_CLIENT)
|
||||
print(CLICKHOUSE_CURL)
|
||||
print(CLICKHOUSE_URL)
|
||||
|
||||
|
||||
def send_query(query):
|
||||
cmd = list(CLICKHOUSE_CLIENT.split())
|
||||
cmd += ['--query', query]
|
||||
print(' '.join(cmd))
|
||||
return subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True).stdout
|
||||
# print(cmd)
|
||||
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
|
||||
|
||||
def send_http_query(query):
|
||||
cmd = list(['curl', '--max-time', '20'])
|
||||
cmd += ['-sS', CLICKHOUSE_URL, '-d', query]
|
||||
return subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True, bufsize=1).stdout
|
||||
cmd = list(CLICKHOUSE_CURL.split()) # list(['curl', '-sSN', '--max-time', '10'])
|
||||
cmd += ['-sSN', CLICKHOUSE_URL, '-d', query]
|
||||
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
|
||||
|
||||
def read_lines_and_push_to_queue(pipe, queue):
|
||||
print('--------')
|
||||
def read_lines_and_push_to_queue(p, queue):
|
||||
pipe = p.stdout
|
||||
sys.stdout.flush()
|
||||
s = ''
|
||||
while True:
|
||||
char = pipe.read(1)
|
||||
if char == '':
|
||||
break
|
||||
|
||||
print('> ' + char)
|
||||
if char == '\n':
|
||||
queue.put(s)
|
||||
s = ''
|
||||
else:
|
||||
s += char
|
||||
for line in iter(pipe.readline, ''):
|
||||
line = line.strip()
|
||||
print(line)
|
||||
sys.stdout.flush()
|
||||
#queue.put(line)
|
||||
queue.put(line)
|
||||
|
||||
if s:
|
||||
queue.put(s)
|
||||
print(None)
|
||||
queue.put(None)
|
||||
p.wait()
|
||||
|
||||
|
||||
def test():
|
||||
print(1)
|
||||
send_query('DROP TABLE IF EXISTS test.lv').read()
|
||||
print(2)
|
||||
send_query('DROP TABLE IF EXISTS test.mt').read()
|
||||
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
|
||||
send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
|
||||
send_query('DROP TABLE IF EXISTS test.lv').stdout.read()
|
||||
send_query('DROP TABLE IF EXISTS test.mt').stdout.read()
|
||||
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').stdout.read()
|
||||
send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').stdout.read()
|
||||
|
||||
q = queue.Queue()
|
||||
pipe = send_http_query('WATCH test.lv')
|
||||
sys.stdout.flush()
|
||||
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(pipe, q))
|
||||
thread.start()
|
||||
print(3)
|
||||
sys.stdout.flush()
|
||||
assert (q.get() == '0\t1')
|
||||
print(3.1)
|
||||
sys.stdout.flush()
|
||||
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
|
||||
print(4)
|
||||
assert (q.get() == '6\t2')
|
||||
print(4.1)
|
||||
send_query('DROP TABLE test.lv').read()
|
||||
print(5)
|
||||
assert (q.get() is None)
|
||||
print(5.2)
|
||||
send_query('DROP TABLE test.mt').read()
|
||||
print(6)
|
||||
|
||||
line = q.get()
|
||||
print(line)
|
||||
assert (line == '0\t1')
|
||||
|
||||
p = send_query('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||
p.stdout.read()
|
||||
p.wait()
|
||||
line = q.get()
|
||||
print(line)
|
||||
assert (line == '6\t2')
|
||||
|
||||
p = send_query('DROP TABLE if exitst test.lv')
|
||||
p.stdout.read()
|
||||
p.wait()
|
||||
p = send_query('DROP TABLE if exitst test.mt')
|
||||
p.stdout.read()
|
||||
p.wait()
|
||||
thread.join()
|
||||
|
||||
test()
|
||||
|
Loading…
Reference in New Issue
Block a user