ClickHouse/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python
2019-08-16 19:28:25 +03:00

88 lines
2.1 KiB
Python
Executable File

#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
print(CLICKHOUSE_CLIENT)
print(CLICKHOUSE_CURL)
print(CLICKHOUSE_URL)
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
print(' '.join(cmd))
return subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True).stdout
def send_http_query(query):
cmd = list(['curl', '--max-time', '20'])
cmd += ['-sS', CLICKHOUSE_URL, '-d', query]
return subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True, bufsize=1).stdout
def read_lines_and_push_to_queue(pipe, queue):
print('--------')
sys.stdout.flush()
s = ''
while True:
char = pipe.read(1)
if char == '':
break
print('> ' + char)
if char == '\n':
queue.put(s)
s = ''
else:
s += char
sys.stdout.flush()
#queue.put(line)
if s:
queue.put(s)
print(None)
queue.put(None)
def test():
print(1)
send_query('DROP TABLE IF EXISTS test.lv').read()
print(2)
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
pipe = send_http_query('WATCH test.lv')
sys.stdout.flush()
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(pipe, q))
thread.start()
print(3)
sys.stdout.flush()
assert (q.get() == '0\t1')
print(3.1)
sys.stdout.flush()
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
print(4)
assert (q.get() == '6\t2')
print(4.1)
send_query('DROP TABLE test.lv').read()
print(5)
assert (q.get() is None)
print(5.2)
send_query('DROP TABLE test.mt').read()
print(6)
thread.join()
test()