mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 10:31:57 +00:00
Revert "Merge pull request #6770 from yandex/temporary-remove-live-view-tests"
This reverts commit6cf5327269
, reversing changes made to4155771106
.
This commit is contained in:
parent
4952c6bc65
commit
ffe8931cda
47
dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py
Executable file
47
dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py
Executable file
@ -0,0 +1,47 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import signal
|
||||||
|
|
||||||
|
CURDIR = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
|
||||||
|
|
||||||
|
from client import client, prompt, end_of_block
|
||||||
|
|
||||||
|
log = None
|
||||||
|
# uncomment the line below for debugging
|
||||||
|
#log=sys.stdout
|
||||||
|
|
||||||
|
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('SET allow_experimental_live_view = 1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.send('SET allow_experimental_live_view = 1')
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send(' DROP TABLE IF EXISTS test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('WATCH test.lv EVENTS')
|
||||||
|
client1.expect('1.*' + end_of_block)
|
||||||
|
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
|
client1.expect('2.*' + end_of_block)
|
||||||
|
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
|
||||||
|
client1.expect('3.*' + end_of_block)
|
||||||
|
# send Ctrl-C
|
||||||
|
client1.send('\x03', eol='')
|
||||||
|
match = client1.expect('(%s)|([#\$] )' % prompt)
|
||||||
|
if match.groups()[1]:
|
||||||
|
client1.send(client1.command)
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.mt')
|
||||||
|
client1.expect(prompt)
|
@ -0,0 +1,3 @@
|
|||||||
|
0 1
|
||||||
|
6 2
|
||||||
|
21 3
|
@ -0,0 +1,20 @@
|
|||||||
|
SET allow_experimental_live_view = 1;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.lv;
|
||||||
|
DROP TABLE IF EXISTS test.mt;
|
||||||
|
|
||||||
|
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
|
||||||
|
CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
|
||||||
|
|
||||||
|
WATCH test.lv LIMIT 0;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (1),(2),(3);
|
||||||
|
|
||||||
|
WATCH test.lv LIMIT 0;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (4),(5),(6);
|
||||||
|
|
||||||
|
WATCH test.lv LIMIT 0;
|
||||||
|
|
||||||
|
DROP TABLE test.lv;
|
||||||
|
DROP TABLE test.mt;
|
47
dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py
Executable file
47
dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py
Executable file
@ -0,0 +1,47 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import signal
|
||||||
|
|
||||||
|
CURDIR = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
|
||||||
|
|
||||||
|
from client import client, prompt, end_of_block
|
||||||
|
|
||||||
|
log = None
|
||||||
|
# uncomment the line below for debugging
|
||||||
|
#log=sys.stdout
|
||||||
|
|
||||||
|
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('SET allow_experimental_live_view = 1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.send('SET allow_experimental_live_view = 1')
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('WATCH test.lv')
|
||||||
|
client1.expect(r'0.*1' + end_of_block)
|
||||||
|
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
|
client1.expect(r'6.*2' + end_of_block)
|
||||||
|
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
|
||||||
|
client1.expect(r'21.*3' + end_of_block)
|
||||||
|
# send Ctrl-C
|
||||||
|
client1.send('\x03', eol='')
|
||||||
|
match = client1.expect('(%s)|([#\$] )' % prompt)
|
||||||
|
if match.groups()[1]:
|
||||||
|
client1.send(client1.command)
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.mt')
|
||||||
|
client1.expect(prompt)
|
@ -0,0 +1,54 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import signal
|
||||||
|
|
||||||
|
CURDIR = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
|
||||||
|
|
||||||
|
from client import client, prompt, end_of_block
|
||||||
|
|
||||||
|
log = None
|
||||||
|
# uncomment the line below for debugging
|
||||||
|
#log=sys.stdout
|
||||||
|
|
||||||
|
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('SET allow_experimental_live_view = 1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.send('SET allow_experimental_live_view = 1')
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('SET temporary_live_view_timeout=1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('WATCH test.lv')
|
||||||
|
client1.expect(r'0.*1' + end_of_block)
|
||||||
|
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
|
client2.expect(prompt)
|
||||||
|
client1.expect(r'6.*2' + end_of_block)
|
||||||
|
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
|
||||||
|
client2.expect(prompt)
|
||||||
|
client1.expect(r'21.*3' + end_of_block)
|
||||||
|
# send Ctrl-C
|
||||||
|
client1.send('\x03', eol='')
|
||||||
|
match = client1.expect('(%s)|([#\$] )' % prompt)
|
||||||
|
if match.groups()[1]:
|
||||||
|
client1.send(client1.command)
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('SELECT sleep(1)')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.lv')
|
||||||
|
client1.expect('Table test.lv doesn\'t exist')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.mt')
|
||||||
|
client1.expect(prompt)
|
49
dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py
Executable file
49
dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py
Executable file
@ -0,0 +1,49 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import signal
|
||||||
|
|
||||||
|
CURDIR = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
|
||||||
|
|
||||||
|
from client import client, prompt, end_of_block
|
||||||
|
|
||||||
|
log = None
|
||||||
|
# uncomment the line below for debugging
|
||||||
|
#log=sys.stdout
|
||||||
|
|
||||||
|
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('SET allow_experimental_live_view = 1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.send('SET allow_experimental_live_view = 1')
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send(' DROP TABLE IF EXISTS test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('SET live_view_heartbeat_interval=1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('WATCH test.lv EVENTS')
|
||||||
|
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
|
client1.expect('2.*' + end_of_block)
|
||||||
|
client1.expect('Progress: 2.00 rows.*\)')
|
||||||
|
# wait for heartbeat
|
||||||
|
client1.expect('Progress: 2.00 rows.*\)')
|
||||||
|
# send Ctrl-C
|
||||||
|
client1.send('\x03', eol='')
|
||||||
|
match = client1.expect('(%s)|([#\$] )' % prompt)
|
||||||
|
if match.groups()[1]:
|
||||||
|
client1.send(client1.command)
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.mt')
|
||||||
|
client1.expect(prompt)
|
50
dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py
Executable file
50
dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py
Executable file
@ -0,0 +1,50 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import signal
|
||||||
|
|
||||||
|
CURDIR = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
|
||||||
|
|
||||||
|
from client import client, prompt, end_of_block
|
||||||
|
|
||||||
|
log = None
|
||||||
|
# uncomment the line below for debugging
|
||||||
|
#log=sys.stdout
|
||||||
|
|
||||||
|
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('SET allow_experimental_live_view = 1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.send('SET allow_experimental_live_view = 1')
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send(' DROP TABLE IF EXISTS test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('SET live_view_heartbeat_interval=1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('WATCH test.lv')
|
||||||
|
client1.expect(r'0.*1' + end_of_block)
|
||||||
|
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
|
client1.expect(r'6.*2' + end_of_block)
|
||||||
|
client1.expect('Progress: 2.00 rows.*\)')
|
||||||
|
# wait for heartbeat
|
||||||
|
client1.expect('Progress: 2.00 rows.*\)')
|
||||||
|
# send Ctrl-C
|
||||||
|
client1.send('\x03', eol='')
|
||||||
|
match = client1.expect('(%s)|([#\$] )' % prompt)
|
||||||
|
if match.groups()[1]:
|
||||||
|
client1.send(client1.command)
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.mt')
|
||||||
|
client1.expect(prompt)
|
40
dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py
Executable file
40
dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py
Executable file
@ -0,0 +1,40 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
CURDIR = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
|
||||||
|
|
||||||
|
from client import client, prompt, end_of_block
|
||||||
|
from httpclient import client as http_client
|
||||||
|
|
||||||
|
log = None
|
||||||
|
# uncomment the line below for debugging
|
||||||
|
#log=sys.stdout
|
||||||
|
|
||||||
|
with client(name='client1>', log=log) as client1:
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('SET allow_experimental_live_view = 1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send(' DROP TABLE IF EXISTS test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
|
||||||
|
with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&query=WATCH%20test.lv%20EVENTS'}, name='client2>', log=log) as client2:
|
||||||
|
client2.expect('.*1\n')
|
||||||
|
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.expect('.*2\n')
|
||||||
|
|
||||||
|
client1.send('DROP TABLE test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.mt')
|
||||||
|
client1.expect(prompt)
|
40
dbms/tests/queries/0_stateless/00967_live_view_watch_http.py
Executable file
40
dbms/tests/queries/0_stateless/00967_live_view_watch_http.py
Executable file
@ -0,0 +1,40 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
CURDIR = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
|
||||||
|
|
||||||
|
from client import client, prompt, end_of_block
|
||||||
|
from httpclient import client as http_client
|
||||||
|
|
||||||
|
log = None
|
||||||
|
# uncomment the line below for debugging
|
||||||
|
#log=sys.stdout
|
||||||
|
|
||||||
|
with client(name='client1>', log=log) as client1:
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('SET allow_experimental_live_view = 1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send(' DROP TABLE IF EXISTS test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
|
||||||
|
with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&query=WATCH%20test.lv'}, name='client2>', log=log) as client2:
|
||||||
|
client2.expect('.*0\t1\n')
|
||||||
|
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.expect('.*6\t2\n')
|
||||||
|
|
||||||
|
client1.send('DROP TABLE test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.mt')
|
||||||
|
client1.expect(prompt)
|
@ -0,0 +1,4 @@
|
|||||||
|
{"row":{"a":1}}
|
||||||
|
{"row":{"a":2}}
|
||||||
|
{"row":{"a":3}}
|
||||||
|
{"progress":{"read_rows":"3","read_bytes":"36","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
|
@ -0,0 +1,14 @@
|
|||||||
|
SET allow_experimental_live_view = 1;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.lv;
|
||||||
|
DROP TABLE IF EXISTS test.mt;
|
||||||
|
|
||||||
|
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
|
||||||
|
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (1),(2),(3);
|
||||||
|
|
||||||
|
SELECT * FROM test.lv FORMAT JSONEachRowWithProgress;
|
||||||
|
|
||||||
|
DROP TABLE test.lv;
|
||||||
|
DROP TABLE test.mt;
|
@ -0,0 +1,6 @@
|
|||||||
|
{"row":{"sum(a)":"0","_version":"1"}}
|
||||||
|
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
|
||||||
|
{"row":{"sum(a)":"6","_version":"2"}}
|
||||||
|
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
|
||||||
|
{"row":{"sum(a)":"21","_version":"3"}}
|
||||||
|
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
|
@ -0,0 +1,20 @@
|
|||||||
|
SET allow_experimental_live_view = 1;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.lv;
|
||||||
|
DROP TABLE IF EXISTS test.mt;
|
||||||
|
|
||||||
|
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
|
||||||
|
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
|
||||||
|
|
||||||
|
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (1),(2),(3);
|
||||||
|
|
||||||
|
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (4),(5),(6);
|
||||||
|
|
||||||
|
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
|
||||||
|
|
||||||
|
DROP TABLE test.lv;
|
||||||
|
DROP TABLE test.mt;
|
@ -0,0 +1,45 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
CURDIR = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
|
||||||
|
|
||||||
|
from client import client, prompt, end_of_block
|
||||||
|
from httpclient import client as http_client
|
||||||
|
|
||||||
|
log = None
|
||||||
|
# uncomment the line below for debugging
|
||||||
|
#log=sys.stdout
|
||||||
|
|
||||||
|
with client(name='client1>', log=log) as client1:
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('SET allow_experimental_live_view = 1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send(' DROP TABLE IF EXISTS test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20EVENTS%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2:
|
||||||
|
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}\n', escape=True)
|
||||||
|
client2.expect('{"row":{"version":"1"}', escape=True)
|
||||||
|
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True)
|
||||||
|
# heartbeat is provided by progress message
|
||||||
|
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True)
|
||||||
|
|
||||||
|
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
client2.expect('{"row":{"version":"2"}}\n', escape=True)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.mt')
|
||||||
|
client1.expect(prompt)
|
46
dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py
Executable file
46
dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py
Executable file
@ -0,0 +1,46 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
CURDIR = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
|
||||||
|
|
||||||
|
from client import client, prompt, end_of_block
|
||||||
|
from httpclient import client as http_client
|
||||||
|
|
||||||
|
log = None
|
||||||
|
# uncomment the line below for debugging
|
||||||
|
#log=sys.stdout
|
||||||
|
|
||||||
|
with client(name='client1>', log=log) as client1:
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('SET allow_experimental_live_view = 1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send(' DROP TABLE IF EXISTS test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2:
|
||||||
|
client2.expect('"progress".*',)
|
||||||
|
client2.expect('{"row":{"sum(a)":"0","_version":"1"}}\n', escape=True)
|
||||||
|
client2.expect('"progress".*\n')
|
||||||
|
# heartbeat is provided by progress message
|
||||||
|
client2.expect('"progress".*\n')
|
||||||
|
|
||||||
|
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
|
client1.expect(prompt)
|
||||||
|
|
||||||
|
client2.expect('"progress".*"read_rows":"2".*\n')
|
||||||
|
client2.expect('{"row":{"sum(a)":"6","_version":"2"}}\n', escape=True)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.mt')
|
||||||
|
client1.expect(prompt)
|
@ -0,0 +1 @@
|
|||||||
|
1
|
@ -0,0 +1,9 @@
|
|||||||
|
SET allow_experimental_live_view = 1;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.lv;
|
||||||
|
|
||||||
|
CREATE LIVE VIEW test.lv AS SELECT 1;
|
||||||
|
|
||||||
|
SELECT * FROM test.lv;
|
||||||
|
|
||||||
|
DROP TABLE test.lv;
|
@ -0,0 +1,4 @@
|
|||||||
|
6 1
|
||||||
|
6 1
|
||||||
|
12 2
|
||||||
|
12 2
|
20
dbms/tests/queries/0_stateless/00973_live_view_select.sql
Normal file
20
dbms/tests/queries/0_stateless/00973_live_view_select.sql
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
SET allow_experimental_live_view = 1;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.lv;
|
||||||
|
DROP TABLE IF EXISTS test.mt;
|
||||||
|
|
||||||
|
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
|
||||||
|
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (1),(2),(3);
|
||||||
|
|
||||||
|
SELECT *,_version FROM test.lv;
|
||||||
|
SELECT *,_version FROM test.lv;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (1),(2),(3);
|
||||||
|
|
||||||
|
SELECT *,_version FROM test.lv;
|
||||||
|
SELECT *,_version FROM test.lv;
|
||||||
|
|
||||||
|
DROP TABLE test.lv;
|
||||||
|
DROP TABLE test.mt;
|
@ -0,0 +1,2 @@
|
|||||||
|
6
|
||||||
|
21
|
@ -0,0 +1,18 @@
|
|||||||
|
SET allow_experimental_live_view = 1;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.lv;
|
||||||
|
DROP TABLE IF EXISTS test.mt;
|
||||||
|
|
||||||
|
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
|
||||||
|
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (1),(2),(3);
|
||||||
|
|
||||||
|
SELECT sum(a) FROM test.lv;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (4),(5),(6);
|
||||||
|
|
||||||
|
SELECT sum(a) FROM test.lv;
|
||||||
|
|
||||||
|
DROP TABLE test.lv;
|
||||||
|
DROP TABLE test.mt;
|
@ -0,0 +1,9 @@
|
|||||||
|
SET allow_experimental_live_view = 1;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.mt;
|
||||||
|
|
||||||
|
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
|
||||||
|
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
|
||||||
|
|
||||||
|
DROP TABLE test.lv;
|
||||||
|
DROP TABLE test.mt;
|
@ -0,0 +1,3 @@
|
|||||||
|
1 1
|
||||||
|
2 1
|
||||||
|
3 1
|
@ -0,0 +1,14 @@
|
|||||||
|
SET allow_experimental_live_view = 1;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.lv;
|
||||||
|
DROP TABLE IF EXISTS test.mt;
|
||||||
|
|
||||||
|
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
|
||||||
|
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (1),(2),(3);
|
||||||
|
|
||||||
|
SELECT *,_version FROM test.lv;
|
||||||
|
|
||||||
|
DROP TABLE test.lv;
|
||||||
|
DROP TABLE test.mt;
|
@ -0,0 +1,3 @@
|
|||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
@ -0,0 +1,20 @@
|
|||||||
|
SET allow_experimental_live_view = 1;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.lv;
|
||||||
|
DROP TABLE IF EXISTS test.mt;
|
||||||
|
|
||||||
|
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
|
||||||
|
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
|
||||||
|
|
||||||
|
WATCH test.lv EVENTS LIMIT 0;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (1),(2),(3);
|
||||||
|
|
||||||
|
WATCH test.lv EVENTS LIMIT 0;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (4),(5),(6);
|
||||||
|
|
||||||
|
WATCH test.lv EVENTS LIMIT 0;
|
||||||
|
|
||||||
|
DROP TABLE test.lv;
|
||||||
|
DROP TABLE test.mt;
|
@ -0,0 +1,3 @@
|
|||||||
|
0 1
|
||||||
|
6 2
|
||||||
|
21 3
|
20
dbms/tests/queries/0_stateless/00978_live_view_watch.sql
Normal file
20
dbms/tests/queries/0_stateless/00978_live_view_watch.sql
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
SET allow_experimental_live_view = 1;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.lv;
|
||||||
|
DROP TABLE IF EXISTS test.mt;
|
||||||
|
|
||||||
|
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
|
||||||
|
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
|
||||||
|
|
||||||
|
WATCH test.lv LIMIT 0;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (1),(2),(3);
|
||||||
|
|
||||||
|
WATCH test.lv LIMIT 0;
|
||||||
|
|
||||||
|
INSERT INTO test.mt VALUES (4),(5),(6);
|
||||||
|
|
||||||
|
WATCH test.lv LIMIT 0;
|
||||||
|
|
||||||
|
DROP TABLE test.lv;
|
||||||
|
DROP TABLE test.mt;
|
53
dbms/tests/queries/0_stateless/00979_live_view_watch_live.py
Executable file
53
dbms/tests/queries/0_stateless/00979_live_view_watch_live.py
Executable file
@ -0,0 +1,53 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import signal
|
||||||
|
|
||||||
|
CURDIR = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
|
||||||
|
|
||||||
|
from client import client, prompt, end_of_block
|
||||||
|
|
||||||
|
log = None
|
||||||
|
# uncomment the line below for debugging
|
||||||
|
#log=sys.stdout
|
||||||
|
|
||||||
|
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('SET allow_experimental_live_view = 1')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client2.send('SET allow_experimental_live_view = 1')
|
||||||
|
client2.expect(prompt)
|
||||||
|
|
||||||
|
client1.send('DROP TABLE IF EXISTS test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send(' DROP TABLE IF EXISTS test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('WATCH test.lv')
|
||||||
|
client1.expect(r'0.*1' + end_of_block)
|
||||||
|
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
|
client1.expect(r'6.*2' + end_of_block)
|
||||||
|
client2.expect(prompt)
|
||||||
|
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
|
||||||
|
client1.expect(r'21.*3' + end_of_block)
|
||||||
|
client2.expect(prompt)
|
||||||
|
for i in range(1,129):
|
||||||
|
client2.send('INSERT INTO test.mt VALUES (1)')
|
||||||
|
client1.expect(r'%d.*%d' % (21+i, 3+i) + end_of_block)
|
||||||
|
client2.expect(prompt)
|
||||||
|
# send Ctrl-C
|
||||||
|
client1.send('\x03', eol='')
|
||||||
|
match = client1.expect('(%s)|([#\$] )' % prompt)
|
||||||
|
if match.groups()[1]:
|
||||||
|
client1.send(client1.command)
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.lv')
|
||||||
|
client1.expect(prompt)
|
||||||
|
client1.send('DROP TABLE test.mt')
|
||||||
|
client1.expect(prompt)
|
@ -0,0 +1,3 @@
|
|||||||
|
temporary_live_view_timeout 5
|
||||||
|
live_view_heartbeat_interval 15
|
||||||
|
0
|
@ -0,0 +1,17 @@
|
|||||||
|
SET allow_experimental_live_view = 1;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.lv;
|
||||||
|
DROP TABLE IF EXISTS test.mt;
|
||||||
|
|
||||||
|
SELECT name, value from system.settings WHERE name = 'temporary_live_view_timeout';
|
||||||
|
SELECT name, value from system.settings WHERE name = 'live_view_heartbeat_interval';
|
||||||
|
|
||||||
|
SET temporary_live_view_timeout=1;
|
||||||
|
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
|
||||||
|
CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
|
||||||
|
|
||||||
|
SHOW TABLES LIKE 'lv';
|
||||||
|
SELECT sleep(2);
|
||||||
|
SHOW TABLES LIKE 'lv';
|
||||||
|
|
||||||
|
DROP TABLE test.mt;
|
@ -0,0 +1,81 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
import Queue as queue
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import signal
|
||||||
|
|
||||||
|
|
||||||
|
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
|
||||||
|
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
|
||||||
|
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
|
||||||
|
|
||||||
|
|
||||||
|
def send_query(query):
|
||||||
|
cmd = list(CLICKHOUSE_CLIENT.split())
|
||||||
|
cmd += ['--query', query]
|
||||||
|
# print(cmd)
|
||||||
|
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
|
||||||
|
|
||||||
|
|
||||||
|
def send_query_in_process_group(query):
|
||||||
|
cmd = list(CLICKHOUSE_CLIENT.split())
|
||||||
|
cmd += ['--query', query]
|
||||||
|
# print(cmd)
|
||||||
|
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
|
||||||
|
|
||||||
|
|
||||||
|
def read_lines_and_push_to_queue(pipe, queue):
|
||||||
|
try:
|
||||||
|
for line in iter(pipe.readline, ''):
|
||||||
|
line = line.strip()
|
||||||
|
print(line)
|
||||||
|
sys.stdout.flush()
|
||||||
|
queue.put(line)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
queue.put(None)
|
||||||
|
|
||||||
|
|
||||||
|
def test():
|
||||||
|
send_query('DROP TABLE IF EXISTS test.lv').read()
|
||||||
|
send_query('DROP TABLE IF EXISTS test.mt').read()
|
||||||
|
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
|
||||||
|
send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
|
||||||
|
|
||||||
|
q = queue.Queue()
|
||||||
|
p = send_query_in_process_group('WATCH test.lv')
|
||||||
|
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
line = q.get()
|
||||||
|
print(line)
|
||||||
|
assert (line == '0\t1')
|
||||||
|
|
||||||
|
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
|
||||||
|
line = q.get()
|
||||||
|
print(line)
|
||||||
|
assert (line == '6\t2')
|
||||||
|
|
||||||
|
send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
|
||||||
|
line = q.get()
|
||||||
|
print(line)
|
||||||
|
assert (line == '21\t3')
|
||||||
|
|
||||||
|
# Send Ctrl+C to client.
|
||||||
|
os.killpg(os.getpgid(p.pid), signal.SIGINT)
|
||||||
|
# This insert shouldn't affect lv.
|
||||||
|
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
|
||||||
|
line = q.get()
|
||||||
|
print(line)
|
||||||
|
assert (line is None)
|
||||||
|
|
||||||
|
send_query('DROP TABLE if exists test.lv').read()
|
||||||
|
send_query('DROP TABLE if exists test.lv').read()
|
||||||
|
|
||||||
|
thread.join()
|
||||||
|
|
||||||
|
test()
|
@ -0,0 +1,7 @@
|
|||||||
|
0 1
|
||||||
|
0 1
|
||||||
|
6 2
|
||||||
|
6 2
|
||||||
|
21 3
|
||||||
|
21 3
|
||||||
|
None
|
63
dbms/tests/queries/0_stateless/00991_live_view_watch_http.python
Executable file
63
dbms/tests/queries/0_stateless/00991_live_view_watch_http.python
Executable file
@ -0,0 +1,63 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
import Queue as queue
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
|
||||||
|
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
|
||||||
|
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
|
||||||
|
|
||||||
|
|
||||||
|
def send_query(query):
|
||||||
|
cmd = list(CLICKHOUSE_CLIENT.split())
|
||||||
|
cmd += ['--query', query]
|
||||||
|
# print(cmd)
|
||||||
|
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
|
||||||
|
|
||||||
|
|
||||||
|
def send_http_query(query):
|
||||||
|
cmd = list(CLICKHOUSE_CURL.split()) # list(['curl', '-sSN', '--max-time', '10'])
|
||||||
|
cmd += ['-sSN', CLICKHOUSE_URL, '-d', query]
|
||||||
|
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
|
||||||
|
|
||||||
|
|
||||||
|
def read_lines_and_push_to_queue(pipe, queue):
|
||||||
|
for line in iter(pipe.readline, ''):
|
||||||
|
line = line.strip()
|
||||||
|
print(line)
|
||||||
|
sys.stdout.flush()
|
||||||
|
queue.put(line)
|
||||||
|
|
||||||
|
queue.put(None)
|
||||||
|
|
||||||
|
|
||||||
|
def test():
|
||||||
|
send_query('DROP TABLE IF EXISTS test.lv').read()
|
||||||
|
send_query('DROP TABLE IF EXISTS test.mt').read()
|
||||||
|
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
|
||||||
|
send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
|
||||||
|
|
||||||
|
q = queue.Queue()
|
||||||
|
pipe = send_http_query('WATCH test.lv')
|
||||||
|
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(pipe, q))
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
line = q.get()
|
||||||
|
print(line)
|
||||||
|
assert (line == '0\t1')
|
||||||
|
|
||||||
|
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
|
||||||
|
line = q.get()
|
||||||
|
print(line)
|
||||||
|
assert (line == '6\t2')
|
||||||
|
|
||||||
|
send_query('DROP TABLE if exists test.lv').read()
|
||||||
|
send_query('DROP TABLE if exists test.lv').read()
|
||||||
|
|
||||||
|
thread.join()
|
||||||
|
|
||||||
|
test()
|
@ -0,0 +1,4 @@
|
|||||||
|
0 1
|
||||||
|
0 1
|
||||||
|
6 2
|
||||||
|
6 2
|
@ -0,0 +1,83 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
import Queue as queue
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import signal
|
||||||
|
|
||||||
|
|
||||||
|
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
|
||||||
|
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
|
||||||
|
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
|
||||||
|
|
||||||
|
|
||||||
|
def send_query(query):
|
||||||
|
cmd = list(CLICKHOUSE_CLIENT.split())
|
||||||
|
cmd += ['--query', query]
|
||||||
|
# print(cmd)
|
||||||
|
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
|
||||||
|
|
||||||
|
|
||||||
|
def send_query_in_process_group(query):
|
||||||
|
cmd = list(CLICKHOUSE_CLIENT.split())
|
||||||
|
cmd += ['--query', query, '--live_view_heartbeat_interval=1', '--progress']
|
||||||
|
# print(cmd)
|
||||||
|
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
|
||||||
|
|
||||||
|
|
||||||
|
def read_lines_and_push_to_queue(pipe, queue):
|
||||||
|
try:
|
||||||
|
for line in iter(pipe.readline, ''):
|
||||||
|
line = line.strip()
|
||||||
|
# print(line)
|
||||||
|
sys.stdout.flush()
|
||||||
|
queue.put(line)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
queue.put(None)
|
||||||
|
|
||||||
|
|
||||||
|
def test():
|
||||||
|
send_query('DROP TABLE IF EXISTS test.lv').read()
|
||||||
|
send_query('DROP TABLE IF EXISTS test.mt').read()
|
||||||
|
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
|
||||||
|
send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
|
||||||
|
|
||||||
|
q = queue.Queue()
|
||||||
|
p = send_query_in_process_group('WATCH test.lv')
|
||||||
|
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
line = q.get()
|
||||||
|
# print(line)
|
||||||
|
assert (line.endswith('0\t1'))
|
||||||
|
assert ('Progress: 0.00 rows' in line)
|
||||||
|
|
||||||
|
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
|
||||||
|
line = q.get()
|
||||||
|
assert (line.endswith('6\t2'))
|
||||||
|
assert ('Progress: 1.00 rows' in line)
|
||||||
|
|
||||||
|
# send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
|
||||||
|
# line = q.get()
|
||||||
|
# print(line)
|
||||||
|
# assert (line.endswith('6\t2'))
|
||||||
|
# assert ('Progress: 1.00 rows' in line)
|
||||||
|
|
||||||
|
# Send Ctrl+C to client.
|
||||||
|
os.killpg(os.getpgid(p.pid), signal.SIGINT)
|
||||||
|
# This insert shouldn't affect lv.
|
||||||
|
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
|
||||||
|
line = q.get()
|
||||||
|
# print(line)
|
||||||
|
# assert (line is None)
|
||||||
|
|
||||||
|
send_query('DROP TABLE if exists test.lv').read()
|
||||||
|
send_query('DROP TABLE if exists test.lv').read()
|
||||||
|
|
||||||
|
thread.join()
|
||||||
|
|
||||||
|
test()
|
@ -0,0 +1,81 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
import Queue as queue
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import signal
|
||||||
|
|
||||||
|
|
||||||
|
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
|
||||||
|
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
|
||||||
|
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
|
||||||
|
|
||||||
|
|
||||||
|
def send_query(query):
|
||||||
|
cmd = list(CLICKHOUSE_CLIENT.split())
|
||||||
|
cmd += ['--query', query]
|
||||||
|
# print(cmd)
|
||||||
|
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
|
||||||
|
|
||||||
|
|
||||||
|
def send_query_in_process_group(query):
|
||||||
|
cmd = list(CLICKHOUSE_CLIENT.split())
|
||||||
|
cmd += ['--query', query]
|
||||||
|
# print(cmd)
|
||||||
|
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
|
||||||
|
|
||||||
|
|
||||||
|
def read_lines_and_push_to_queue(pipe, queue):
|
||||||
|
try:
|
||||||
|
for line in iter(pipe.readline, ''):
|
||||||
|
line = line.strip()
|
||||||
|
print(line)
|
||||||
|
sys.stdout.flush()
|
||||||
|
queue.put(line)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
queue.put(None)
|
||||||
|
|
||||||
|
|
||||||
|
def test():
|
||||||
|
send_query('DROP TABLE IF EXISTS test.lv').read()
|
||||||
|
send_query('DROP TABLE IF EXISTS test.mt').read()
|
||||||
|
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
|
||||||
|
send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
|
||||||
|
|
||||||
|
q = queue.Queue()
|
||||||
|
p = send_query_in_process_group('WATCH test.lv')
|
||||||
|
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
line = q.get()
|
||||||
|
print(line)
|
||||||
|
assert (line == '0\t1')
|
||||||
|
|
||||||
|
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
|
||||||
|
line = q.get()
|
||||||
|
print(line)
|
||||||
|
assert (line == '6\t2')
|
||||||
|
|
||||||
|
send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
|
||||||
|
line = q.get()
|
||||||
|
print(line)
|
||||||
|
assert (line == '21\t3')
|
||||||
|
|
||||||
|
# Send Ctrl+C to client.
|
||||||
|
os.killpg(os.getpgid(p.pid), signal.SIGINT)
|
||||||
|
# This insert shouldn't affect lv.
|
||||||
|
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
|
||||||
|
line = q.get()
|
||||||
|
print(line)
|
||||||
|
assert (line is None)
|
||||||
|
|
||||||
|
send_query('DROP TABLE if exists test.lv').read()
|
||||||
|
send_query('DROP TABLE if exists test.lv').read()
|
||||||
|
|
||||||
|
thread.join()
|
||||||
|
|
||||||
|
test()
|
@ -0,0 +1,7 @@
|
|||||||
|
0 1
|
||||||
|
0 1
|
||||||
|
6 2
|
||||||
|
6 2
|
||||||
|
21 3
|
||||||
|
21 3
|
||||||
|
None
|
Loading…
Reference in New Issue
Block a user