#!/usr/bin/env python3 import subprocess import threading import queue as queue import os import sys CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') def send_query(query): cmd = list(CLICKHOUSE_CLIENT.split()) cmd += ['--query', query] # print(cmd) return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout def send_http_query(query): cmd = list(CLICKHOUSE_CURL.split()) # list(['curl', '-sSN', '--max-time', '10']) cmd += ['-sSN', CLICKHOUSE_URL, '-d', query] return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout def read_lines_and_push_to_queue(pipe, queue): for line in iter(pipe.readline, ''): line = line.strip() print(line) sys.stdout.flush() queue.put(line) queue.put(None) def test(): send_query('DROP TABLE IF EXISTS test.lv').read() send_query('DROP TABLE IF EXISTS test.mt').read() send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() q = queue.Queue() pipe = send_http_query('WATCH test.lv') thread = threading.Thread(target=read_lines_and_push_to_queue, args=(pipe, q)) thread.start() line = q.get() print(line) assert (line == '0\t1') send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() line = q.get() print(line) assert (line == '6\t2') send_query('DROP TABLE if exists test.lv').read() send_query('DROP TABLE if exists test.lv').read() thread.join() test()