diff --git a/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py b/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py new file mode 100755 index 00000000000..2095683720e --- /dev/null +++ b/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv EVENTS') + client1.expect('1.*' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect('2.*' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') + client1.expect('3.*' + end_of_block) + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.reference b/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.reference b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.reference new file mode 100644 index 00000000000..6fbbedf1b21 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.reference @@ -0,0 +1,3 @@ +0 1 +6 2 +21 3 diff --git a/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql new file mode 100644 index 00000000000..7992da92f97 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql @@ -0,0 +1,20 @@ +SET allow_experimental_live_view = 1; + +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +WATCH test.lv LIMIT 0; + +INSERT INTO test.mt VALUES (1),(2),(3); + +WATCH test.lv LIMIT 0; + +INSERT INTO test.mt VALUES (4),(5),(6); + +WATCH test.lv LIMIT 0; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py b/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py new file mode 100755 index 00000000000..3dbec01b29a --- /dev/null +++ b/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv') + client1.expect(r'0.*1' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(r'6.*2' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') + client1.expect(r'21.*3' + end_of_block) + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.reference b/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled b/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled new file mode 100755 index 00000000000..b324c1b90cc --- /dev/null +++ b/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled @@ -0,0 +1,54 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('SET temporary_live_view_timeout=1') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv') + client1.expect(r'0.*1' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client2.expect(prompt) + client1.expect(r'6.*2' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') + client2.expect(prompt) + client1.expect(r'21.*3' + end_of_block) + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('SELECT sleep(1)') + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect('Table test.lv doesn\'t exist') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.reference b/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py b/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py new file mode 100755 index 00000000000..528f18839bb --- /dev/null +++ b/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('SET live_view_heartbeat_interval=1') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv EVENTS') + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect('2.*' + end_of_block) + client1.expect('Progress: 2.00 rows.*\)') + # wait for heartbeat + client1.expect('Progress: 2.00 rows.*\)') + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.reference b/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py b/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py new file mode 100755 index 00000000000..2723936f876 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('SET live_view_heartbeat_interval=1') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv') + client1.expect(r'0.*1' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(r'6.*2' + end_of_block) + client1.expect('Progress: 2.00 rows.*\)') + # wait for heartbeat + client1.expect('Progress: 2.00 rows.*\)') + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.reference b/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py b/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py new file mode 100755 index 00000000000..72ab3ea8818 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block +from httpclient import client as http_client + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1: + client1.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + + + with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&query=WATCH%20test.lv%20EVENTS'}, name='client2>', log=log) as client2: + client2.expect('.*1\n') + client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(prompt) + client2.expect('.*2\n') + + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.reference b/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py b/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py new file mode 100755 index 00000000000..e2f33971c3d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block +from httpclient import client as http_client + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1: + client1.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + + + with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&query=WATCH%20test.lv'}, name='client2>', log=log) as client2: + client2.expect('.*0\t1\n') + client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(prompt) + client2.expect('.*6\t2\n') + + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00967_live_view_watch_http.reference b/dbms/tests/queries/0_stateless/00967_live_view_watch_http.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.reference b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.reference new file mode 100644 index 00000000000..5ae423d90d1 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.reference @@ -0,0 +1,4 @@ +{"row":{"a":1}} +{"row":{"a":2}} +{"row":{"a":3}} +{"progress":{"read_rows":"3","read_bytes":"36","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} diff --git a/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql new file mode 100644 index 00000000000..1023cdf6b29 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql @@ -0,0 +1,14 @@ +SET allow_experimental_live_view = 1; + +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; + +INSERT INTO test.mt VALUES (1),(2),(3); + +SELECT * FROM test.lv FORMAT JSONEachRowWithProgress; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.reference b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.reference new file mode 100644 index 00000000000..287a1ced92d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.reference @@ -0,0 +1,6 @@ +{"row":{"sum(a)":"0","_version":"1"}} +{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} +{"row":{"sum(a)":"6","_version":"2"}} +{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} +{"row":{"sum(a)":"21","_version":"3"}} +{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} diff --git a/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql new file mode 100644 index 00000000000..3e46d55c014 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql @@ -0,0 +1,20 @@ +SET allow_experimental_live_view = 1; + +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress; + +INSERT INTO test.mt VALUES (1),(2),(3); + +WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress; + +INSERT INTO test.mt VALUES (4),(5),(6); + +WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py b/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py new file mode 100755 index 00000000000..8435cdc147a --- /dev/null +++ b/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block +from httpclient import client as http_client + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1: + client1.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + + with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20EVENTS%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2: + client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}\n', escape=True) + client2.expect('{"row":{"version":"1"}', escape=True) + client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True) + # heartbeat is provided by progress message + client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True) + + client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(prompt) + + client2.expect('{"row":{"version":"2"}}\n', escape=True) + + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.reference b/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py b/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py new file mode 100755 index 00000000000..2317d705efe --- /dev/null +++ b/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block +from httpclient import client as http_client + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1: + client1.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + + with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2: + client2.expect('"progress".*',) + client2.expect('{"row":{"sum(a)":"0","_version":"1"}}\n', escape=True) + client2.expect('"progress".*\n') + # heartbeat is provided by progress message + client2.expect('"progress".*\n') + + client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(prompt) + + client2.expect('"progress".*"read_rows":"2".*\n') + client2.expect('{"row":{"sum(a)":"6","_version":"2"}}\n', escape=True) + + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.reference b/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00972_live_view_select_1.reference b/dbms/tests/queries/0_stateless/00972_live_view_select_1.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00972_live_view_select_1.reference @@ -0,0 +1 @@ +1 diff --git a/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql b/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql new file mode 100644 index 00000000000..135516b0cd3 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql @@ -0,0 +1,9 @@ +SET allow_experimental_live_view = 1; + +DROP TABLE IF EXISTS test.lv; + +CREATE LIVE VIEW test.lv AS SELECT 1; + +SELECT * FROM test.lv; + +DROP TABLE test.lv; diff --git a/dbms/tests/queries/0_stateless/00973_live_view_select.reference b/dbms/tests/queries/0_stateless/00973_live_view_select.reference new file mode 100644 index 00000000000..75236c0daf7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00973_live_view_select.reference @@ -0,0 +1,4 @@ +6 1 +6 1 +12 2 +12 2 diff --git a/dbms/tests/queries/0_stateless/00973_live_view_select.sql b/dbms/tests/queries/0_stateless/00973_live_view_select.sql new file mode 100644 index 00000000000..4b5ca0a2dd7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00973_live_view_select.sql @@ -0,0 +1,20 @@ +SET allow_experimental_live_view = 1; + +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +INSERT INTO test.mt VALUES (1),(2),(3); + +SELECT *,_version FROM test.lv; +SELECT *,_version FROM test.lv; + +INSERT INTO test.mt VALUES (1),(2),(3); + +SELECT *,_version FROM test.lv; +SELECT *,_version FROM test.lv; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.reference b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.reference new file mode 100644 index 00000000000..6d50f0e9c3a --- /dev/null +++ b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.reference @@ -0,0 +1,2 @@ +6 +21 diff --git a/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql new file mode 100644 index 00000000000..3faaec8f623 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql @@ -0,0 +1,18 @@ +SET allow_experimental_live_view = 1; + +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; + +INSERT INTO test.mt VALUES (1),(2),(3); + +SELECT sum(a) FROM test.lv; + +INSERT INTO test.mt VALUES (4),(5),(6); + +SELECT sum(a) FROM test.lv; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00975_live_view_create.reference b/dbms/tests/queries/0_stateless/00975_live_view_create.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00975_live_view_create.sql b/dbms/tests/queries/0_stateless/00975_live_view_create.sql new file mode 100644 index 00000000000..02c1644d193 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00975_live_view_create.sql @@ -0,0 +1,9 @@ +SET allow_experimental_live_view = 1; + +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00976_live_view_select_version.reference b/dbms/tests/queries/0_stateless/00976_live_view_select_version.reference new file mode 100644 index 00000000000..453bd800469 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00976_live_view_select_version.reference @@ -0,0 +1,3 @@ +1 1 +2 1 +3 1 diff --git a/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql b/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql new file mode 100644 index 00000000000..ae1c59a92d7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql @@ -0,0 +1,14 @@ +SET allow_experimental_live_view = 1; + +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; + +INSERT INTO test.mt VALUES (1),(2),(3); + +SELECT *,_version FROM test.lv; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00977_live_view_watch_events.reference b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.reference new file mode 100644 index 00000000000..01e79c32a8c --- /dev/null +++ b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.reference @@ -0,0 +1,3 @@ +1 +2 +3 diff --git a/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql new file mode 100644 index 00000000000..3e0d066fb8d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql @@ -0,0 +1,20 @@ +SET allow_experimental_live_view = 1; + +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +WATCH test.lv EVENTS LIMIT 0; + +INSERT INTO test.mt VALUES (1),(2),(3); + +WATCH test.lv EVENTS LIMIT 0; + +INSERT INTO test.mt VALUES (4),(5),(6); + +WATCH test.lv EVENTS LIMIT 0; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00978_live_view_watch.reference b/dbms/tests/queries/0_stateless/00978_live_view_watch.reference new file mode 100644 index 00000000000..6fbbedf1b21 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00978_live_view_watch.reference @@ -0,0 +1,3 @@ +0 1 +6 2 +21 3 diff --git a/dbms/tests/queries/0_stateless/00978_live_view_watch.sql b/dbms/tests/queries/0_stateless/00978_live_view_watch.sql new file mode 100644 index 00000000000..b8d0d93ccab --- /dev/null +++ b/dbms/tests/queries/0_stateless/00978_live_view_watch.sql @@ -0,0 +1,20 @@ +SET allow_experimental_live_view = 1; + +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +WATCH test.lv LIMIT 0; + +INSERT INTO test.mt VALUES (1),(2),(3); + +WATCH test.lv LIMIT 0; + +INSERT INTO test.mt VALUES (4),(5),(6); + +WATCH test.lv LIMIT 0; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py new file mode 100755 index 00000000000..8c5bc5b8eb2 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv') + client1.expect(r'0.*1' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(r'6.*2' + end_of_block) + client2.expect(prompt) + client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') + client1.expect(r'21.*3' + end_of_block) + client2.expect(prompt) + for i in range(1,129): + client2.send('INSERT INTO test.mt VALUES (1)') + client1.expect(r'%d.*%d' % (21+i, 3+i) + end_of_block) + client2.expect(prompt) + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00979_live_view_watch_live.reference b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.reference b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.reference new file mode 100644 index 00000000000..7f9fcbb2e9c --- /dev/null +++ b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.reference @@ -0,0 +1,3 @@ +temporary_live_view_timeout 5 +live_view_heartbeat_interval 15 +0 diff --git a/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql new file mode 100644 index 00000000000..037c2a9e587 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql @@ -0,0 +1,17 @@ +SET allow_experimental_live_view = 1; + +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +SELECT name, value from system.settings WHERE name = 'temporary_live_view_timeout'; +SELECT name, value from system.settings WHERE name = 'live_view_heartbeat_interval'; + +SET temporary_live_view_timeout=1; +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +SHOW TABLES LIKE 'lv'; +SELECT sleep(2); +SHOW TABLES LIKE 'lv'; + +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.python b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.python new file mode 100644 index 00000000000..782671cdfaf --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.python @@ -0,0 +1,81 @@ +#!/usr/bin/env python + +import subprocess +import threading +import Queue as queue +import os +import sys +import signal + + +CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') +CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') +CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') + + +def send_query(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + + +def send_query_in_process_group(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid) + + +def read_lines_and_push_to_queue(pipe, queue): + try: + for line in iter(pipe.readline, ''): + line = line.strip() + print(line) + sys.stdout.flush() + queue.put(line) + except KeyboardInterrupt: + pass + + queue.put(None) + + +def test(): + send_query('DROP TABLE IF EXISTS test.lv').read() + send_query('DROP TABLE IF EXISTS test.mt').read() + send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() + send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() + + q = queue.Queue() + p = send_query_in_process_group('WATCH test.lv') + thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q)) + thread.start() + + line = q.get() + print(line) + assert (line == '0\t1') + + send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() + line = q.get() + print(line) + assert (line == '6\t2') + + send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read() + line = q.get() + print(line) + assert (line == '21\t3') + + # Send Ctrl+C to client. + os.killpg(os.getpgid(p.pid), signal.SIGINT) + # This insert shouldn't affect lv. + send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read() + line = q.get() + print(line) + assert (line is None) + + send_query('DROP TABLE if exists test.lv').read() + send_query('DROP TABLE if exists test.lv').read() + + thread.join() + +test() diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.reference b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.reference new file mode 100644 index 00000000000..1e94cdade41 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.reference @@ -0,0 +1,7 @@ +0 1 +0 1 +6 2 +6 2 +21 3 +21 3 +None diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python new file mode 100755 index 00000000000..938547ca0cb --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python @@ -0,0 +1,63 @@ +#!/usr/bin/env python + +import subprocess +import threading +import Queue as queue +import os +import sys + + +CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') +CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') +CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') + + +def send_query(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + + +def send_http_query(query): + cmd = list(CLICKHOUSE_CURL.split()) # list(['curl', '-sSN', '--max-time', '10']) + cmd += ['-sSN', CLICKHOUSE_URL, '-d', query] + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + + +def read_lines_and_push_to_queue(pipe, queue): + for line in iter(pipe.readline, ''): + line = line.strip() + print(line) + sys.stdout.flush() + queue.put(line) + + queue.put(None) + + +def test(): + send_query('DROP TABLE IF EXISTS test.lv').read() + send_query('DROP TABLE IF EXISTS test.mt').read() + send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() + send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() + + q = queue.Queue() + pipe = send_http_query('WATCH test.lv') + thread = threading.Thread(target=read_lines_and_push_to_queue, args=(pipe, q)) + thread.start() + + line = q.get() + print(line) + assert (line == '0\t1') + + send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() + line = q.get() + print(line) + assert (line == '6\t2') + + send_query('DROP TABLE if exists test.lv').read() + send_query('DROP TABLE if exists test.lv').read() + + thread.join() + +test() diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_http.reference b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.reference new file mode 100644 index 00000000000..489457d751b --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.reference @@ -0,0 +1,4 @@ +0 1 +0 1 +6 2 +6 2 diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.python b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.python new file mode 100644 index 00000000000..70063adc6e3 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.python @@ -0,0 +1,83 @@ +#!/usr/bin/env python + +import subprocess +import threading +import Queue as queue +import os +import sys +import signal + + +CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') +CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') +CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') + + +def send_query(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + + +def send_query_in_process_group(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query, '--live_view_heartbeat_interval=1', '--progress'] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid) + + +def read_lines_and_push_to_queue(pipe, queue): + try: + for line in iter(pipe.readline, ''): + line = line.strip() + # print(line) + sys.stdout.flush() + queue.put(line) + except KeyboardInterrupt: + pass + + queue.put(None) + + +def test(): + send_query('DROP TABLE IF EXISTS test.lv').read() + send_query('DROP TABLE IF EXISTS test.mt').read() + send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() + send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() + + q = queue.Queue() + p = send_query_in_process_group('WATCH test.lv') + thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q)) + thread.start() + + line = q.get() + # print(line) + assert (line.endswith('0\t1')) + assert ('Progress: 0.00 rows' in line) + + send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() + line = q.get() + assert (line.endswith('6\t2')) + assert ('Progress: 1.00 rows' in line) + + # send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read() + # line = q.get() + # print(line) + # assert (line.endswith('6\t2')) + # assert ('Progress: 1.00 rows' in line) + + # Send Ctrl+C to client. + os.killpg(os.getpgid(p.pid), signal.SIGINT) + # This insert shouldn't affect lv. + send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read() + line = q.get() + # print(line) + # assert (line is None) + + send_query('DROP TABLE if exists test.lv').read() + send_query('DROP TABLE if exists test.lv').read() + + thread.join() + +test() diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.reference b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.python b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.python new file mode 100644 index 00000000000..d290018a02c --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.python @@ -0,0 +1,81 @@ +#!/usr/bin/env python + +import subprocess +import threading +import Queue as queue +import os +import sys +import signal + + +CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') +CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') +CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') + + +def send_query(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + + +def send_query_in_process_group(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid) + + +def read_lines_and_push_to_queue(pipe, queue): + try: + for line in iter(pipe.readline, ''): + line = line.strip() + print(line) + sys.stdout.flush() + queue.put(line) + except KeyboardInterrupt: + pass + + queue.put(None) + + +def test(): + send_query('DROP TABLE IF EXISTS test.lv').read() + send_query('DROP TABLE IF EXISTS test.mt').read() + send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() + send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() + + q = queue.Queue() + p = send_query_in_process_group('WATCH test.lv') + thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q)) + thread.start() + + line = q.get() + print(line) + assert (line == '0\t1') + + send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() + line = q.get() + print(line) + assert (line == '6\t2') + + send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read() + line = q.get() + print(line) + assert (line == '21\t3') + + # Send Ctrl+C to client. + os.killpg(os.getpgid(p.pid), signal.SIGINT) + # This insert shouldn't affect lv. + send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read() + line = q.get() + print(line) + assert (line is None) + + send_query('DROP TABLE if exists test.lv').read() + send_query('DROP TABLE if exists test.lv').read() + + thread.join() + +test() diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.reference b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.reference new file mode 100644 index 00000000000..1e94cdade41 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.reference @@ -0,0 +1,7 @@ +0 1 +0 1 +6 2 +6 2 +21 3 +21 3 +None