mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-10-20 07:21:02 +00:00
ffe8931cda
This reverts commit6cf5327269
, reversing changes made to4155771106
.
84 lines
2.3 KiB
Python
84 lines
2.3 KiB
Python
#!/usr/bin/env python
|
|
|
|
import subprocess
|
|
import threading
|
|
import Queue as queue
|
|
import os
|
|
import sys
|
|
import signal
|
|
|
|
|
|
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
|
|
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
|
|
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
|
|
|
|
|
|
def send_query(query):
|
|
cmd = list(CLICKHOUSE_CLIENT.split())
|
|
cmd += ['--query', query]
|
|
# print(cmd)
|
|
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
|
|
|
|
|
|
def send_query_in_process_group(query):
|
|
cmd = list(CLICKHOUSE_CLIENT.split())
|
|
cmd += ['--query', query, '--live_view_heartbeat_interval=1', '--progress']
|
|
# print(cmd)
|
|
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
|
|
|
|
|
|
def read_lines_and_push_to_queue(pipe, queue):
|
|
try:
|
|
for line in iter(pipe.readline, ''):
|
|
line = line.strip()
|
|
# print(line)
|
|
sys.stdout.flush()
|
|
queue.put(line)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
queue.put(None)
|
|
|
|
|
|
def test():
|
|
send_query('DROP TABLE IF EXISTS test.lv').read()
|
|
send_query('DROP TABLE IF EXISTS test.mt').read()
|
|
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
|
|
send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
|
|
|
|
q = queue.Queue()
|
|
p = send_query_in_process_group('WATCH test.lv')
|
|
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
|
|
thread.start()
|
|
|
|
line = q.get()
|
|
# print(line)
|
|
assert (line.endswith('0\t1'))
|
|
assert ('Progress: 0.00 rows' in line)
|
|
|
|
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
|
|
line = q.get()
|
|
assert (line.endswith('6\t2'))
|
|
assert ('Progress: 1.00 rows' in line)
|
|
|
|
# send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
|
|
# line = q.get()
|
|
# print(line)
|
|
# assert (line.endswith('6\t2'))
|
|
# assert ('Progress: 1.00 rows' in line)
|
|
|
|
# Send Ctrl+C to client.
|
|
os.killpg(os.getpgid(p.pid), signal.SIGINT)
|
|
# This insert shouldn't affect lv.
|
|
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
|
|
line = q.get()
|
|
# print(line)
|
|
# assert (line is None)
|
|
|
|
send_query('DROP TABLE if exists test.lv').read()
|
|
send_query('DROP TABLE if exists test.lv').read()
|
|
|
|
thread.join()
|
|
|
|
test()
|