diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python index d2e55cd1ea1..2f46aa11046 100755 --- a/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python +++ b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python @@ -12,76 +12,61 @@ CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') -print(CLICKHOUSE_CLIENT) -print(CLICKHOUSE_CURL) -print(CLICKHOUSE_URL) - - def send_query(query): cmd = list(CLICKHOUSE_CLIENT.split()) cmd += ['--query', query] - print(' '.join(cmd)) - return subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True).stdout + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) def send_http_query(query): - cmd = list(['curl', '--max-time', '20']) - cmd += ['-sS', CLICKHOUSE_URL, '-d', query] - return subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True, bufsize=1).stdout + cmd = list(CLICKHOUSE_CURL.split()) # list(['curl', '-sSN', '--max-time', '10']) + cmd += ['-sSN', CLICKHOUSE_URL, '-d', query] + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) -def read_lines_and_push_to_queue(pipe, queue): - print('--------') +def read_lines_and_push_to_queue(p, queue): + pipe = p.stdout sys.stdout.flush() - s = '' - while True: - char = pipe.read(1) - if char == '': - break - print('> ' + char) - if char == '\n': - queue.put(s) - s = '' - else: - s += char + for line in iter(pipe.readline, ''): + line = line.strip() + print(line) sys.stdout.flush() - #queue.put(line) + queue.put(line) - if s: - queue.put(s) - print(None) queue.put(None) + p.wait() def test(): - print(1) - send_query('DROP TABLE IF EXISTS test.lv').read() - print(2) - send_query('DROP TABLE IF EXISTS test.mt').read() - send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() - send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() + send_query('DROP TABLE IF EXISTS test.lv').stdout.read() + send_query('DROP TABLE IF EXISTS test.mt').stdout.read() + send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').stdout.read() + send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').stdout.read() q = queue.Queue() pipe = send_http_query('WATCH test.lv') - sys.stdout.flush() thread = threading.Thread(target=read_lines_and_push_to_queue, args=(pipe, q)) thread.start() - print(3) - sys.stdout.flush() - assert (q.get() == '0\t1') - print(3.1) - sys.stdout.flush() - send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() - print(4) - assert (q.get() == '6\t2') - print(4.1) - send_query('DROP TABLE test.lv').read() - print(5) - assert (q.get() is None) - print(5.2) - send_query('DROP TABLE test.mt').read() - print(6) + + line = q.get() + print(line) + assert (line == '0\t1') + + p = send_query('INSERT INTO test.mt VALUES (1),(2),(3)') + p.stdout.read() + p.wait() + line = q.get() + print(line) + assert (line == '6\t2') + + p = send_query('DROP TABLE if exitst test.lv') + p.stdout.read() + p.wait() + p = send_query('DROP TABLE if exitst test.mt') + p.stdout.read() + p.wait() thread.join() test()