ClickHouse/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.python
2019-08-19 16:35:00 +03:00

82 lines
2.1 KiB
Python

#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
import signal
# Connection settings are taken from the environment; each value is None when
# the corresponding variable is not set.  Only CLICKHOUSE_CLIENT is used by
# the helpers visible in this file (it is split on whitespace to form the
# client command line).
CLICKHOUSE_CLIENT = os.getenv('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.getenv('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.getenv('CLICKHOUSE_URL')
def send_query(query):
    """Run one query through the ClickHouse client and return its output pipe.

    The command line is the whitespace-split CLICKHOUSE_CLIENT value followed
    by ``--query <query>``.  The client's stderr is merged into its stdout.

    Returns the child's stdout file object (not the Popen handle); callers in
    this file call ``.read()`` on it to consume the output up to EOF.
    """
    argv = CLICKHOUSE_CLIENT.split() + ['--query', query]
    client = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return client.stdout
def send_query_in_process_group(query):
    """Start the ClickHouse client for *query* in a session of its own.

    ``os.setsid`` is executed in the child before the client starts, which
    puts it into a fresh process group; the caller can then signal the whole
    group with ``os.killpg``.  stderr is merged into stdout.

    Returns the ``subprocess.Popen`` object so the caller has access to both
    ``pid`` and ``stdout``.
    """
    argv = CLICKHOUSE_CLIENT.split()
    argv.extend(['--query', query])
    # NOTE(review): this echoes the full command line to stdout -- confirm it
    # is intended output and not leftover debugging.
    print(argv)
    return subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        preexec_fn=os.setsid,
    )
def read_lines_and_push_to_queue(pipe, queue):
    """Forward every line read from *pipe* to *queue* until end of stream.

    Each line is stripped of surrounding whitespace, echoed to stdout (and
    flushed immediately), then put on *queue*.  When the stream ends, a final
    ``None`` is put on the queue as an end-of-stream marker.

    Args:
        pipe: file-like object with ``readline()``; may yield text or bytes.
        queue: object with a ``put()`` method (e.g. ``Queue.Queue``).  Note
            that this parameter shadows the module-level ``queue`` import
            inside this function.
    """
    try:
        while True:
            line = pipe.readline()
            if not line:
                # An empty read ('' or b'') means EOF.  Comparing against ''
                # alone would never match a bytes pipe and loop forever.
                break
            if not isinstance(line, str):
                # Bytes pipe: decode so consumers can compare against str.
                line = line.decode('utf-8')
            line = line.strip()
            print(line)
            sys.stdout.flush()
            queue.put(line)
    except KeyboardInterrupt:
        # Ctrl+C is an expected way to stop reading; treat it as EOF.
        pass
    finally:
        # Always publish the sentinel, even when an unexpected error escapes,
        # so a consumer blocked on queue.get() cannot hang forever.
        queue.put(None)
def test():
    """Check that WATCH on a temporary live view streams updates until Ctrl+C.

    Scenario:
      1. Recreate test.mt and a temporary live view test.lv computing sum(a).
      2. Start ``WATCH test.lv`` in its own process group and read its output
         on a background thread.
      3. Expect one output line initially and one more after each INSERT.
         The asserted lines are '<sum>\\t<n>' where <n> grows by one per
         update (presumably the live view's version counter -- TODO confirm).
      4. Send SIGINT to the client's process group, insert again, and expect
         the reader to report end of stream (None) rather than a new line.
      5. Drop both tables and reap the reader thread and the client process.

    Raises:
        AssertionError: if any observed line differs from the expected one.
    """
    send_query('DROP TABLE IF EXISTS test.lv').read()
    send_query('DROP TABLE IF EXISTS test.mt').read()
    send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
    send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()

    q = queue.Queue()
    p = send_query_in_process_group('WATCH test.lv')
    thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
    thread.start()

    # Initial result on the empty table.
    line = q.get()
    print(line)
    assert (line == '0\t1')

    send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
    line = q.get()
    print(line)
    assert (line == '6\t2')

    send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
    line = q.get()
    print(line)
    assert (line == '21\t3')

    # Send Ctrl+C to client.
    os.killpg(os.getpgid(p.pid), signal.SIGINT)

    # This insert shouldn't affect lv.
    send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
    line = q.get()
    print(line)
    assert (line is None)

    # Clean up both objects created above.  Previously test.lv was dropped
    # twice and test.mt was left behind.
    send_query('DROP TABLE if exists test.lv').read()
    send_query('DROP TABLE if exists test.mt').read()

    thread.join()
    # The reader has seen EOF, so the client's output is fully drained and
    # waiting cannot block on a full pipe; reap the child to avoid a zombie.
    p.wait()
# Run the scenario only when executed as a script, so that importing this
# file (e.g. from tooling) does not start ClickHouse clients as a side effect.
if __name__ == '__main__':
    test()