#!/usr/bin/env python import subprocess import threading import Queue as queue import os import sys import signal CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') def send_query(query): cmd = list(CLICKHOUSE_CLIENT.split()) cmd += ['--query', query] # print(cmd) return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout def send_query_in_process_group(query): cmd = list(CLICKHOUSE_CLIENT.split()) cmd += ['--query', query] # print(cmd) return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid) def read_lines_and_push_to_queue(pipe, queue): try: for line in iter(pipe.readline, ''): line = line.strip() print(line) sys.stdout.flush() queue.put(line) except KeyboardInterrupt: pass queue.put(None) def test(): send_query('DROP TABLE IF EXISTS test.lv').read() send_query('DROP TABLE IF EXISTS test.mt').read() send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() q = queue.Queue() p = send_query_in_process_group('WATCH test.lv') thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q)) thread.start() line = q.get() print(line) assert (line == '0\t1') send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() line = q.get() print(line) assert (line == '6\t2') send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read() line = q.get() print(line) assert (line == '21\t3') # Send Ctrl+C to client. os.killpg(os.getpgid(p.pid), signal.SIGINT) # This insert shouldn't affect lv. send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read() line = q.get() print(line) assert (line is None) send_query('DROP TABLE if exists test.lv').read() send_query('DROP TABLE if exists test.lv').read() thread.join() test()