From f959c29be60b9a70eef34ecff4135e871c45201e Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Sat, 24 Aug 2019 21:38:50 -0400 Subject: [PATCH 1/8] Revert "Temporarily disable all LIVE VIEW tests because this feature has subtle bugs that manifestate under TSan" This reverts commit 96869d405fc132a7443cfbc13d1e583079503db8. --- .../00960_live_view_watch_events_live.py | 42 ++++ ...0960_live_view_watch_events_live.reference | 0 .../00961_temporary_live_view_watch.reference | 3 + .../00961_temporary_live_view_watch.sql | 18 ++ .../00962_temporary_live_view_watch_live.py | 42 ++++ ...2_temporary_live_view_watch_live.reference | 0 ...y_live_view_watch_live_timeout.py.disabled | 49 +++++ ...ary_live_view_watch_live_timeout.reference | 0 .../00964_live_view_watch_events_heartbeat.py | 44 ++++ ...live_view_watch_events_heartbeat.reference | 0 .../00965_live_view_watch_heartbeat.py | 45 ++++ .../00965_live_view_watch_heartbeat.reference | 0 .../00966_live_view_watch_events_http.py | 37 ++++ ...0966_live_view_watch_events_http.reference | 0 .../0_stateless/00967_live_view_watch_http.py | 37 ++++ .../00967_live_view_watch_http.reference | 0 ...t_format_jsoneachrowwithprogress.reference | 4 + ..._select_format_jsoneachrowwithprogress.sql | 12 + ...h_format_jsoneachrowwithprogress.reference | 6 + ...w_watch_format_jsoneachrowwithprogress.sql | 18 ++ ...0_live_view_watch_events_http_heartbeat.py | 43 ++++ ...view_watch_events_http_heartbeat.reference | 0 .../00971_live_view_watch_http_heartbeat.py | 43 ++++ ...1_live_view_watch_http_heartbeat.reference | 0 .../00972_live_view_select_1.reference | 1 + .../0_stateless/00972_live_view_select_1.sql | 7 + .../00973_live_view_select.reference | 4 + .../0_stateless/00973_live_view_select.sql | 18 ++ ...ive_view_select_with_aggregation.reference | 2 + ...0974_live_view_select_with_aggregation.sql | 16 ++ .../00975_live_view_create.reference | 0 .../0_stateless/00975_live_view_create.sql | 7 + .../00976_live_view_select_version.reference | 3 + .../00976_live_view_select_version.sql | 12 + .../00977_live_view_watch_events.reference | 3 + .../00977_live_view_watch_events.sql | 18 ++ .../00978_live_view_watch.reference | 3 + .../0_stateless/00978_live_view_watch.sql | 18 ++ .../0_stateless/00979_live_view_watch_live.py | 48 ++++ .../00979_live_view_watch_live.reference | 0 ...00980_create_temporary_live_view.reference | 3 + .../00980_create_temporary_live_view.sql | 15 ++ .../00991_live_view_watch_event_live.python | 81 +++++++ ...00991_live_view_watch_event_live.reference | 7 + ...991_live_view_watch_event_live.sh.disabled | 6 + .../00991_live_view_watch_http.python | 63 ++++++ .../00991_live_view_watch_http.reference | 4 + .../00991_live_view_watch_http.sh.disabled | 6 + ...ry_live_view_watch_events_heartbeat.python | 83 +++++++ ...live_view_watch_events_heartbeat.reference | 0 ...ve_view_watch_events_heartbeat.sh.disabled | 6 + ...0991_temporary_live_view_watch_live.python | 81 +++++++ ...1_temporary_live_view_watch_live.reference | 7 + ...temporary_live_view_watch_live.sh.disabled | 6 + .../queries/0_stateless/helpers/client.py | 36 +++ .../queries/0_stateless/helpers/httpclient.py | 14 ++ .../queries/0_stateless/helpers/httpexpect.py | 73 +++++++ .../queries/0_stateless/helpers/uexpect.py | 206 ++++++++++++++++++ 58 files changed, 1300 insertions(+) create mode 100755 dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py create mode 100644 dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.reference create mode 100644 dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.reference create mode 100644 dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql create mode 100755 dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py create mode 100644 dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.reference create mode 100755 dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled create mode 100644 dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.reference create mode 100755 dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py create mode 100644 dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.reference create mode 100755 dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py create mode 100644 dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.reference create mode 100755 dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py create mode 100644 dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.reference create mode 100755 dbms/tests/queries/0_stateless/00967_live_view_watch_http.py create mode 100644 dbms/tests/queries/0_stateless/00967_live_view_watch_http.reference create mode 100644 dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.reference create mode 100644 dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql create mode 100644 dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.reference create mode 100644 dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql create mode 100755 dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py create mode 100644 dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.reference create mode 100755 dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py create mode 100644 dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.reference create mode 100644 dbms/tests/queries/0_stateless/00972_live_view_select_1.reference create mode 100644 dbms/tests/queries/0_stateless/00972_live_view_select_1.sql create mode 100644 dbms/tests/queries/0_stateless/00973_live_view_select.reference create mode 100644 dbms/tests/queries/0_stateless/00973_live_view_select.sql create mode 100644 dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.reference create mode 100644 dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql create mode 100644 dbms/tests/queries/0_stateless/00975_live_view_create.reference create mode 100644 dbms/tests/queries/0_stateless/00975_live_view_create.sql create mode 100644 dbms/tests/queries/0_stateless/00976_live_view_select_version.reference create mode 100644 dbms/tests/queries/0_stateless/00976_live_view_select_version.sql create mode 100644 dbms/tests/queries/0_stateless/00977_live_view_watch_events.reference create mode 100644 dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql create mode 100644 dbms/tests/queries/0_stateless/00978_live_view_watch.reference create mode 100644 dbms/tests/queries/0_stateless/00978_live_view_watch.sql create mode 100755 dbms/tests/queries/0_stateless/00979_live_view_watch_live.py create mode 100644 dbms/tests/queries/0_stateless/00979_live_view_watch_live.reference create mode 100644 dbms/tests/queries/0_stateless/00980_create_temporary_live_view.reference create mode 100644 dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql create mode 100644 dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.python create mode 100644 dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.reference create mode 100755 dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.sh.disabled create mode 100755 dbms/tests/queries/0_stateless/00991_live_view_watch_http.python create mode 100644 dbms/tests/queries/0_stateless/00991_live_view_watch_http.reference create mode 100755 dbms/tests/queries/0_stateless/00991_live_view_watch_http.sh.disabled create mode 100644 dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.python create mode 100644 dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.reference create mode 100755 dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.sh.disabled create mode 100644 dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.python create mode 100644 dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.reference create mode 100755 dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.sh.disabled create mode 100644 dbms/tests/queries/0_stateless/helpers/client.py create mode 100644 dbms/tests/queries/0_stateless/helpers/httpclient.py create mode 100644 dbms/tests/queries/0_stateless/helpers/httpexpect.py create mode 100644 dbms/tests/queries/0_stateless/helpers/uexpect.py diff --git a/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py b/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py new file mode 100755 index 00000000000..b7fc3f4e3a6 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv EVENTS') + client1.expect('1.*' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect('2.*' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') + client1.expect('3.*' + end_of_block) + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.reference b/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.reference b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.reference new file mode 100644 index 00000000000..6fbbedf1b21 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.reference @@ -0,0 +1,3 @@ +0 1 +6 2 +21 3 diff --git a/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql new file mode 100644 index 00000000000..c3e2ab8d102 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql @@ -0,0 +1,18 @@ +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +WATCH test.lv LIMIT 0; + +INSERT INTO test.mt VALUES (1),(2),(3); + +WATCH test.lv LIMIT 0; + +INSERT INTO test.mt VALUES (4),(5),(6); + +WATCH test.lv LIMIT 0; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py b/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py new file mode 100755 index 00000000000..f27b1213c70 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv') + client1.expect(r'0.*1' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(r'6.*2' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') + client1.expect(r'21.*3' + end_of_block) + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.reference b/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled b/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled new file mode 100755 index 00000000000..df627c84e49 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled @@ -0,0 +1,49 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('SET temporary_live_view_timeout=1') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv') + client1.expect(r'0.*1' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client2.expect(prompt) + client1.expect(r'6.*2' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') + client2.expect(prompt) + client1.expect(r'21.*3' + end_of_block) + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('SELECT sleep(1)') + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect('Table test.lv doesn\'t exist') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.reference b/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py b/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py new file mode 100755 index 00000000000..5664c0e6c6d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('SET live_view_heartbeat_interval=1') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv EVENTS') + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect('2.*' + end_of_block) + client1.expect('Progress: 2.00 rows.*\)') + # wait for heartbeat + client1.expect('Progress: 2.00 rows.*\)') + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.reference b/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py b/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py new file mode 100755 index 00000000000..03e22175dff --- /dev/null +++ b/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('SET live_view_heartbeat_interval=1') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv') + client1.expect(r'0.*1' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(r'6.*2' + end_of_block) + client1.expect('Progress: 2.00 rows.*\)') + # wait for heartbeat + client1.expect('Progress: 2.00 rows.*\)') + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.reference b/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py b/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py new file mode 100755 index 00000000000..bb9d6152200 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block +from httpclient import client as http_client + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1: + client1.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + + + with http_client({'method':'GET', 'url': '/?query=WATCH%20test.lv%20EVENTS'}, name='client2>', log=log) as client2: + client2.expect('.*1\n') + client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(prompt) + client2.expect('.*2\n') + + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.reference b/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py b/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py new file mode 100755 index 00000000000..d3439431eb3 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block +from httpclient import client as http_client + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1: + client1.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + + + with http_client({'method':'GET', 'url':'/?query=WATCH%20test.lv'}, name='client2>', log=log) as client2: + client2.expect('.*0\t1\n') + client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(prompt) + client2.expect('.*6\t2\n') + + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00967_live_view_watch_http.reference b/dbms/tests/queries/0_stateless/00967_live_view_watch_http.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.reference b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.reference new file mode 100644 index 00000000000..5ae423d90d1 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.reference @@ -0,0 +1,4 @@ +{"row":{"a":1}} +{"row":{"a":2}} +{"row":{"a":3}} +{"progress":{"read_rows":"3","read_bytes":"36","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} diff --git a/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql new file mode 100644 index 00000000000..8c6f4197d54 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql @@ -0,0 +1,12 @@ +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; + +INSERT INTO test.mt VALUES (1),(2),(3); + +SELECT * FROM test.lv FORMAT JSONEachRowWithProgress; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.reference b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.reference new file mode 100644 index 00000000000..287a1ced92d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.reference @@ -0,0 +1,6 @@ +{"row":{"sum(a)":"0","_version":"1"}} +{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} +{"row":{"sum(a)":"6","_version":"2"}} +{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} +{"row":{"sum(a)":"21","_version":"3"}} +{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} diff --git a/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql new file mode 100644 index 00000000000..725a4ad00ed --- /dev/null +++ b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql @@ -0,0 +1,18 @@ +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress; + +INSERT INTO test.mt VALUES (1),(2),(3); + +WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress; + +INSERT INTO test.mt VALUES (4),(5),(6); + +WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py b/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py new file mode 100755 index 00000000000..63628c4a76f --- /dev/null +++ b/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block +from httpclient import client as http_client + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1: + client1.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + + + with http_client({'method':'GET', 'url': '/?live_view_heartbeat_interval=1&query=WATCH%20test.lv%20EVENTS%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2: + client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}\n', escape=True) + client2.expect('{"row":{"version":"1"}', escape=True) + client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True) + # heartbeat is provided by progress message + client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True) + + client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(prompt) + + client2.expect('{"row":{"version":"2"}}\n', escape=True) + + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.reference b/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py b/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py new file mode 100755 index 00000000000..7bdb47b7caa --- /dev/null +++ b/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block +from httpclient import client as http_client + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1: + client1.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + + with http_client({'method':'GET', 'url':'/?live_view_heartbeat_interval=1&query=WATCH%20test.lv%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2: + client2.expect('"progress".*',) + client2.expect('{"row":{"sum(a)":"0","_version":"1"}}\n', escape=True) + client2.expect('"progress".*\n') + # heartbeat is provided by progress message + client2.expect('"progress".*\n') + + client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(prompt) + + client2.expect('"progress".*"read_rows":"2".*\n') + client2.expect('{"row":{"sum(a)":"6","_version":"2"}}\n', escape=True) + + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.reference b/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00972_live_view_select_1.reference b/dbms/tests/queries/0_stateless/00972_live_view_select_1.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00972_live_view_select_1.reference @@ -0,0 +1 @@ +1 diff --git a/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql b/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql new file mode 100644 index 00000000000..661080b577b --- /dev/null +++ b/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS test.lv; + +CREATE LIVE VIEW test.lv AS SELECT 1; + +SELECT * FROM test.lv; + +DROP TABLE test.lv; diff --git a/dbms/tests/queries/0_stateless/00973_live_view_select.reference b/dbms/tests/queries/0_stateless/00973_live_view_select.reference new file mode 100644 index 00000000000..75236c0daf7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00973_live_view_select.reference @@ -0,0 +1,4 @@ +6 1 +6 1 +12 2 +12 2 diff --git a/dbms/tests/queries/0_stateless/00973_live_view_select.sql b/dbms/tests/queries/0_stateless/00973_live_view_select.sql new file mode 100644 index 00000000000..ff4a45ffcc1 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00973_live_view_select.sql @@ -0,0 +1,18 @@ +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +INSERT INTO test.mt VALUES (1),(2),(3); + +SELECT *,_version FROM test.lv; +SELECT *,_version FROM test.lv; + +INSERT INTO test.mt VALUES (1),(2),(3); + +SELECT *,_version FROM test.lv; +SELECT *,_version FROM test.lv; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.reference b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.reference new file mode 100644 index 00000000000..6d50f0e9c3a --- /dev/null +++ b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.reference @@ -0,0 +1,2 @@ +6 +21 diff --git a/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql new file mode 100644 index 00000000000..3c11f855c9d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql @@ -0,0 +1,16 @@ +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; + +INSERT INTO test.mt VALUES (1),(2),(3); + +SELECT sum(a) FROM test.lv; + +INSERT INTO test.mt VALUES (4),(5),(6); + +SELECT sum(a) FROM test.lv; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00975_live_view_create.reference b/dbms/tests/queries/0_stateless/00975_live_view_create.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00975_live_view_create.sql b/dbms/tests/queries/0_stateless/00975_live_view_create.sql new file mode 100644 index 00000000000..1c929b15b00 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00975_live_view_create.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00976_live_view_select_version.reference b/dbms/tests/queries/0_stateless/00976_live_view_select_version.reference new file mode 100644 index 00000000000..453bd800469 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00976_live_view_select_version.reference @@ -0,0 +1,3 @@ +1 1 +2 1 +3 1 diff --git a/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql b/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql new file mode 100644 index 00000000000..5f3ab1f7546 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql @@ -0,0 +1,12 @@ +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; + +INSERT INTO test.mt VALUES (1),(2),(3); + +SELECT *,_version FROM test.lv; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00977_live_view_watch_events.reference b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.reference new file mode 100644 index 00000000000..01e79c32a8c --- /dev/null +++ b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.reference @@ -0,0 +1,3 @@ +1 +2 +3 diff --git a/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql new file mode 100644 index 00000000000..a3b84e8d4c1 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql @@ -0,0 +1,18 @@ +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +WATCH test.lv EVENTS LIMIT 0; + +INSERT INTO test.mt VALUES (1),(2),(3); + +WATCH test.lv EVENTS LIMIT 0; + +INSERT INTO test.mt VALUES (4),(5),(6); + +WATCH test.lv EVENTS LIMIT 0; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00978_live_view_watch.reference b/dbms/tests/queries/0_stateless/00978_live_view_watch.reference new file mode 100644 index 00000000000..6fbbedf1b21 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00978_live_view_watch.reference @@ -0,0 +1,3 @@ +0 1 +6 2 +21 3 diff --git a/dbms/tests/queries/0_stateless/00978_live_view_watch.sql b/dbms/tests/queries/0_stateless/00978_live_view_watch.sql new file mode 100644 index 00000000000..abe4a6c32ae --- /dev/null +++ b/dbms/tests/queries/0_stateless/00978_live_view_watch.sql @@ -0,0 +1,18 @@ +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +WATCH test.lv LIMIT 0; + +INSERT INTO test.mt VALUES (1),(2),(3); + +WATCH test.lv LIMIT 0; + +INSERT INTO test.mt VALUES (4),(5),(6); + +WATCH test.lv LIMIT 0; + +DROP TABLE test.lv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py new file mode 100755 index 00000000000..948e4c93662 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send(' DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') + client1.expect(prompt) + client1.send('WATCH test.lv') + client1.expect(r'0.*1' + end_of_block) + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client1.expect(r'6.*2' + end_of_block) + client2.expect(prompt) + client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') + client1.expect(r'21.*3' + end_of_block) + client2.expect(prompt) + for i in range(1,129): + client2.send('INSERT INTO test.mt VALUES (1)') + client1.expect(r'%d.*%d' % (21+i, 3+i) + end_of_block) + client2.expect(prompt) + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00979_live_view_watch_live.reference b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.reference b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.reference new file mode 100644 index 00000000000..7f9fcbb2e9c --- /dev/null +++ b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.reference @@ -0,0 +1,3 @@ +temporary_live_view_timeout 5 +live_view_heartbeat_interval 15 +0 diff --git a/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql new file mode 100644 index 00000000000..8cd6ee06ace --- /dev/null +++ b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql @@ -0,0 +1,15 @@ +DROP TABLE IF EXISTS test.lv; +DROP TABLE IF EXISTS test.mt; + +SELECT name, value from system.settings WHERE name = 'temporary_live_view_timeout'; +SELECT name, value from system.settings WHERE name = 'live_view_heartbeat_interval'; + +SET temporary_live_view_timeout=1; +CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); +CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; + +SHOW TABLES LIKE 'lv'; +SELECT sleep(2); +SHOW TABLES LIKE 'lv'; + +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.python b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.python new file mode 100644 index 00000000000..782671cdfaf --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.python @@ -0,0 +1,81 @@ +#!/usr/bin/env python + +import subprocess +import threading +import Queue as queue +import os +import sys +import signal + + +CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') +CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') +CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') + + +def send_query(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + + +def send_query_in_process_group(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid) + + +def read_lines_and_push_to_queue(pipe, queue): + try: + for line in iter(pipe.readline, ''): + line = line.strip() + print(line) + sys.stdout.flush() + queue.put(line) + except KeyboardInterrupt: + pass + + queue.put(None) + + +def test(): + send_query('DROP TABLE IF EXISTS test.lv').read() + send_query('DROP TABLE IF EXISTS test.mt').read() + send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() + send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() + + q = queue.Queue() + p = send_query_in_process_group('WATCH test.lv') + thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q)) + thread.start() + + line = q.get() + print(line) + assert (line == '0\t1') + + send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() + line = q.get() + print(line) + assert (line == '6\t2') + + send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read() + line = q.get() + print(line) + assert (line == '21\t3') + + # Send Ctrl+C to client. + os.killpg(os.getpgid(p.pid), signal.SIGINT) + # This insert shouldn't affect lv. + send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read() + line = q.get() + print(line) + assert (line is None) + + send_query('DROP TABLE if exists test.lv').read() + send_query('DROP TABLE if exists test.lv').read() + + thread.join() + +test() diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.reference b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.reference new file mode 100644 index 00000000000..1e94cdade41 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.reference @@ -0,0 +1,7 @@ +0 1 +0 1 +6 2 +6 2 +21 3 +21 3 +None diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.sh.disabled b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.sh.disabled new file mode 100755 index 00000000000..10e4e98b2e3 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.sh.disabled @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +python $CURDIR/00991_live_view_watch_event_live.python diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python new file mode 100755 index 00000000000..938547ca0cb --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python @@ -0,0 +1,63 @@ +#!/usr/bin/env python + +import subprocess +import threading +import Queue as queue +import os +import sys + + +CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') +CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') +CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') + + +def send_query(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + + +def send_http_query(query): + cmd = list(CLICKHOUSE_CURL.split()) # list(['curl', '-sSN', '--max-time', '10']) + cmd += ['-sSN', CLICKHOUSE_URL, '-d', query] + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + + +def read_lines_and_push_to_queue(pipe, queue): + for line in iter(pipe.readline, ''): + line = line.strip() + print(line) + sys.stdout.flush() + queue.put(line) + + queue.put(None) + + +def test(): + send_query('DROP TABLE IF EXISTS test.lv').read() + send_query('DROP TABLE IF EXISTS test.mt').read() + send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() + send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() + + q = queue.Queue() + pipe = send_http_query('WATCH test.lv') + thread = threading.Thread(target=read_lines_and_push_to_queue, args=(pipe, q)) + thread.start() + + line = q.get() + print(line) + assert (line == '0\t1') + + send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() + line = q.get() + print(line) + assert (line == '6\t2') + + send_query('DROP TABLE if exists test.lv').read() + send_query('DROP TABLE if exists test.lv').read() + + thread.join() + +test() diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_http.reference b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.reference new file mode 100644 index 00000000000..489457d751b --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.reference @@ -0,0 +1,4 @@ +0 1 +0 1 +6 2 +6 2 diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_http.sh.disabled b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.sh.disabled new file mode 100755 index 00000000000..88cce77f595 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.sh.disabled @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +python $CURDIR/00991_live_view_watch_http.python diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.python b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.python new file mode 100644 index 00000000000..70063adc6e3 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.python @@ -0,0 +1,83 @@ +#!/usr/bin/env python + +import subprocess +import threading +import Queue as queue +import os +import sys +import signal + + +CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') +CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') +CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') + + +def send_query(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + + +def send_query_in_process_group(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query, '--live_view_heartbeat_interval=1', '--progress'] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid) + + +def read_lines_and_push_to_queue(pipe, queue): + try: + for line in iter(pipe.readline, ''): + line = line.strip() + # print(line) + sys.stdout.flush() + queue.put(line) + except KeyboardInterrupt: + pass + + queue.put(None) + + +def test(): + send_query('DROP TABLE IF EXISTS test.lv').read() + send_query('DROP TABLE IF EXISTS test.mt').read() + send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() + send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() + + q = queue.Queue() + p = send_query_in_process_group('WATCH test.lv') + thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q)) + thread.start() + + line = q.get() + # print(line) + assert (line.endswith('0\t1')) + assert ('Progress: 0.00 rows' in line) + + send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() + line = q.get() + assert (line.endswith('6\t2')) + assert ('Progress: 1.00 rows' in line) + + # send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read() + # line = q.get() + # print(line) + # assert (line.endswith('6\t2')) + # assert ('Progress: 1.00 rows' in line) + + # Send Ctrl+C to client. + os.killpg(os.getpgid(p.pid), signal.SIGINT) + # This insert shouldn't affect lv. + send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read() + line = q.get() + # print(line) + # assert (line is None) + + send_query('DROP TABLE if exists test.lv').read() + send_query('DROP TABLE if exists test.lv').read() + + thread.join() + +test() diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.reference b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.sh.disabled b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.sh.disabled new file mode 100755 index 00000000000..f7aa13d52b3 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.sh.disabled @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +python $CURDIR/00991_temporary_live_view_watch_events_heartbeat.python diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.python b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.python new file mode 100644 index 00000000000..d290018a02c --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.python @@ -0,0 +1,81 @@ +#!/usr/bin/env python + +import subprocess +import threading +import Queue as queue +import os +import sys +import signal + + +CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') +CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') +CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') + + +def send_query(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + + +def send_query_in_process_group(query): + cmd = list(CLICKHOUSE_CLIENT.split()) + cmd += ['--query', query] + # print(cmd) + return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid) + + +def read_lines_and_push_to_queue(pipe, queue): + try: + for line in iter(pipe.readline, ''): + line = line.strip() + print(line) + sys.stdout.flush() + queue.put(line) + except KeyboardInterrupt: + pass + + queue.put(None) + + +def test(): + send_query('DROP TABLE IF EXISTS test.lv').read() + send_query('DROP TABLE IF EXISTS test.mt').read() + send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() + send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() + + q = queue.Queue() + p = send_query_in_process_group('WATCH test.lv') + thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q)) + thread.start() + + line = q.get() + print(line) + assert (line == '0\t1') + + send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() + line = q.get() + print(line) + assert (line == '6\t2') + + send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read() + line = q.get() + print(line) + assert (line == '21\t3') + + # Send Ctrl+C to client. + os.killpg(os.getpgid(p.pid), signal.SIGINT) + # This insert shouldn't affect lv. + send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read() + line = q.get() + print(line) + assert (line is None) + + send_query('DROP TABLE if exists test.lv').read() + send_query('DROP TABLE if exists test.lv').read() + + thread.join() + +test() diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.reference b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.reference new file mode 100644 index 00000000000..1e94cdade41 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.reference @@ -0,0 +1,7 @@ +0 1 +0 1 +6 2 +6 2 +21 3 +21 3 +None diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.sh.disabled b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.sh.disabled new file mode 100755 index 00000000000..4d01d1c3a8e --- /dev/null +++ b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.sh.disabled @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +python $CURDIR/00991_temporary_live_view_watch_live.python diff --git a/dbms/tests/queries/0_stateless/helpers/client.py b/dbms/tests/queries/0_stateless/helpers/client.py new file mode 100644 index 00000000000..f3938d3bf63 --- /dev/null +++ b/dbms/tests/queries/0_stateless/helpers/client.py @@ -0,0 +1,36 @@ +import os +import sys +import time + +CURDIR = os.path.dirname(os.path.realpath(__file__)) + +sys.path.insert(0, os.path.join(CURDIR)) + +import uexpect + +prompt = ':\) ' +end_of_block = r'.*\r\n.*\r\n' + +class client(object): + def __init__(self, command=None, name='', log=None): + self.client = uexpect.spawn(['/bin/bash','--noediting']) + if command is None: + command = os.environ.get('CLICKHOUSE_BINARY', 'clickhouse') + '-client' + self.client.command = command + self.client.eol('\r') + self.client.logger(log, prefix=name) + self.client.timeout(20) + self.client.expect('[#\$] ', timeout=2) + self.client.send(command) + + def __enter__(self): + return self.client.__enter__() + + def __exit__(self, type, value, traceback): + self.client.reader['kill_event'].set() + # send Ctrl-C + self.client.send('\x03', eol='') + time.sleep(0.3) + self.client.send('quit', eol='\r') + self.client.send('\x03', eol='') + return self.client.__exit__(type, value, traceback) diff --git a/dbms/tests/queries/0_stateless/helpers/httpclient.py b/dbms/tests/queries/0_stateless/helpers/httpclient.py new file mode 100644 index 00000000000..a42fad2cbc3 --- /dev/null +++ b/dbms/tests/queries/0_stateless/helpers/httpclient.py @@ -0,0 +1,14 @@ +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) + +sys.path.insert(0, os.path.join(CURDIR)) + +import httpexpect + +def client(request, name='', log=None): + client = httpexpect.spawn({'host':'localhost','port':8123}, request) + client.logger(log, prefix=name) + client.timeout(20) + return client diff --git a/dbms/tests/queries/0_stateless/helpers/httpexpect.py b/dbms/tests/queries/0_stateless/helpers/httpexpect.py new file mode 100644 index 00000000000..e440dafce4e --- /dev/null +++ b/dbms/tests/queries/0_stateless/helpers/httpexpect.py @@ -0,0 +1,73 @@ +# Copyright (c) 2019 Vitaliy Zakaznikov +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import sys +import httplib + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, CURDIR) + +import uexpect + +from threading import Thread, Event +from Queue import Queue, Empty + +class IO(uexpect.IO): + def __init__(self, connection, response, queue, reader): + self.connection = connection + self.response = response + super(IO, self).__init__(None, None, queue, reader) + + def write(self, data): + raise NotImplementedError + + def close(self, force=True): + self.reader['kill_event'].set() + self.connection.close() + if self._logger: + self._logger.write('\n') + self._logger.flush() + + +def reader(response, queue, kill_event): + while True: + try: + if kill_event.is_set(): + break + data = response.read(1) + queue.put(data) + except Exception, e: + if kill_event.is_set(): + break + raise + +def spawn(connection, request): + connection = httplib.HTTPConnection(**connection) + connection.request(**request) + response = connection.getresponse() + + queue = Queue() + reader_kill_event = Event() + thread = Thread(target=reader, args=(response, queue, reader_kill_event)) + thread.daemon = True + thread.start() + + return IO(connection, response, queue, reader={'thread':thread, 'kill_event':reader_kill_event}) + +if __name__ == '__main__': + with http({'host':'localhost','port':8123},{'method':'GET', 'url':'?query=SELECT%201'}) as client: + client.logger(sys.stdout) + client.timeout(2) + print client.response.status, client.response.reason + client.expect('1\n') diff --git a/dbms/tests/queries/0_stateless/helpers/uexpect.py b/dbms/tests/queries/0_stateless/helpers/uexpect.py new file mode 100644 index 00000000000..f71b32a53e1 --- /dev/null +++ b/dbms/tests/queries/0_stateless/helpers/uexpect.py @@ -0,0 +1,206 @@ +# Copyright (c) 2019 Vitaliy Zakaznikov +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import pty +import time +import sys +import re + +from threading import Thread, Event +from subprocess import Popen +from Queue import Queue, Empty + +class TimeoutError(Exception): + def __init__(self, timeout): + self.timeout = timeout + + def __str__(self): + return 'Timeout %.3fs' % float(self.timeout) + +class ExpectTimeoutError(Exception): + def __init__(self, pattern, timeout, buffer): + self.pattern = pattern + self.timeout = timeout + self.buffer = buffer + + def __str__(self): + s = 'Timeout %.3fs ' % float(self.timeout) + if self.pattern: + s += 'for %s ' % repr(self.pattern.pattern) + if self.buffer: + s += 'buffer %s ' % repr(self.buffer[:]) + s += 'or \'%s\'' % ','.join(['%x' % ord(c) for c in self.buffer[:]]) + return s + +class IO(object): + class EOF(object): + pass + + class Timeout(object): + pass + + EOF = EOF + TIMEOUT = Timeout + + class Logger(object): + def __init__(self, logger, prefix=''): + self._logger = logger + self._prefix = prefix + + def write(self, data): + self._logger.write(('\n' + data).replace('\n','\n' + self._prefix)) + + def flush(self): + self._logger.flush() + + def __init__(self, process, master, queue, reader): + self.process = process + self.master = master + self.queue = queue + self.buffer = None + self.before = None + self.after = None + self.match = None + self.pattern = None + self.reader = reader + self._timeout = None + self._logger = None + self._eol = '' + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + + def logger(self, logger=None, prefix=''): + if logger: + self._logger = self.Logger(logger, prefix=prefix) + return self._logger + + def timeout(self, timeout=None): + if timeout: + self._timeout = timeout + return self._timeout + + def eol(self, eol=None): + if eol: + self._eol = eol + return self._eol + + def close(self, force=True): + self.reader['kill_event'].set() + os.system('pkill -TERM -P %d' % self.process.pid) + if force: + self.process.kill() + else: + self.process.terminate() + os.close(self.master) + if self._logger: + self._logger.write('\n') + self._logger.flush() + + def send(self, data, eol=None): + if eol is None: + eol = self._eol + return self.write(data + eol) + + def write(self, data): + return os.write(self.master, data) + + def expect(self, pattern, timeout=None, escape=False): + self.match = None + self.before = None + self.after = None + if escape: + pattern = re.escape(pattern) + pattern = re.compile(pattern) + if timeout is None: + timeout = self._timeout + timeleft = timeout + while True: + start_time = time.time() + if self.buffer is not None: + self.match = pattern.search(self.buffer, 0) + if self.match is not None: + self.after = self.buffer[self.match.start():self.match.end()] + self.before = self.buffer[:self.match.start()] + self.buffer = self.buffer[self.match.end():] + break + if timeleft < 0: + break + try: + data = self.read(timeout=timeleft, raise_exception=True) + except TimeoutError: + if self._logger: + self._logger.write((self.buffer or '') + '\n') + self._logger.flush() + exception = ExpectTimeoutError(pattern, timeout, self.buffer) + self.buffer = None + raise exception + timeleft -= (time.time() - start_time) + if data: + self.buffer = (self.buffer + data) if self.buffer else data + if self._logger: + self._logger.write((self.before or '') + (self.after or '')) + self._logger.flush() + if self.match is None: + exception = ExpectTimeoutError(pattern, timeout, self.buffer) + self.buffer = None + raise exception + return self.match + + def read(self, timeout=0, raise_exception=False): + data = '' + timeleft = timeout + try: + while timeleft >= 0 : + start_time = time.time() + data += self.queue.get(timeout=timeleft) + if data: + break + timeleft -= (time.time() - start_time) + except Empty: + if data: + return data + if raise_exception: + raise TimeoutError(timeout) + pass + if not data and raise_exception: + raise TimeoutError(timeout) + + return data + +def spawn(command): + master, slave = pty.openpty() + process = Popen(command, preexec_fn=os.setsid, stdout=slave, stdin=slave, stderr=slave, bufsize=1) + os.close(slave) + + queue = Queue() + reader_kill_event = Event() + thread = Thread(target=reader, args=(process, master, queue, reader_kill_event)) + thread.daemon = True + thread.start() + + return IO(process, master, queue, reader={'thread':thread, 'kill_event':reader_kill_event}) + +def reader(process, out, queue, kill_event): + while True: + try: + data = os.read(out, 65536) + queue.put(data) + except: + if kill_event.is_set(): + break + raise From 62988800e61042c131bec717b36a369f8a419ca0 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Sat, 24 Aug 2019 21:40:24 -0400 Subject: [PATCH 2/8] Rewriting implementation of LIVE VIEW no users thread. --- .../LiveView/LiveViewBlockInputStream.h | 21 +++--- .../LiveView/LiveViewEventsBlockInputStream.h | 20 +++--- .../src/Storages/LiveView/StorageLiveView.cpp | 66 +++++++++---------- dbms/src/Storages/LiveView/StorageLiveView.h | 16 ++--- 4 files changed, 63 insertions(+), 60 deletions(-) diff --git a/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h b/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h index 345fceaf095..60839f3e66f 100644 --- a/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h +++ b/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h @@ -18,10 +18,13 @@ using NonBlockingResult = std::pair; public: ~LiveViewBlockInputStream() override { - /// Start storage no users thread - /// if we are the last active user - if (!storage->is_dropped && blocks_ptr.use_count() < 3) - storage->startNoUsersThread(temporary_live_view_timeout_sec); + /// Wake up no users thread + { + std::cerr << "DEBUG: live view block input stream ... send wake up thread\n"; + std::lock_guard lock(storage->no_users_thread_mutex); + storage->no_users_thread_wakeup = true; + storage->no_users_thread_condition.notify_one(); + } } LiveViewBlockInputStream(std::shared_ptr storage_, @@ -29,9 +32,12 @@ public: std::shared_ptr blocks_metadata_ptr_, std::shared_ptr active_ptr_, const bool has_limit_, const UInt64 limit_, - const UInt64 heartbeat_interval_sec_, - const UInt64 temporary_live_view_timeout_sec_) - : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_) + const UInt64 heartbeat_interval_sec_) + : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), + blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), + active_ptr(std::move(active_ptr_)), + has_limit(has_limit_), limit(limit_), + heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000) { /// grab active pointer active = active_ptr.lock(); @@ -200,7 +206,6 @@ private: Int64 num_updates = -1; bool end_of_blocks = false; UInt64 heartbeat_interval_usec; - UInt64 temporary_live_view_timeout_sec; UInt64 last_event_timestamp_usec = 0; Poco::Timestamp timestamp; }; diff --git a/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h b/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h index 120d0098536..e0e6ff78d21 100644 --- a/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h +++ b/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h @@ -37,10 +37,12 @@ using NonBlockingResult = std::pair; public: ~LiveViewEventsBlockInputStream() override { - /// Start storage no users thread - /// if we are the last active user - if (!storage->is_dropped && blocks_ptr.use_count() < 3) - storage->startNoUsersThread(temporary_live_view_timeout_sec); + /// Wake up no users thread + { + std::lock_guard lock(storage->no_users_thread_mutex); + storage->no_users_thread_wakeup = true; + storage->no_users_thread_condition.notify_one(); + } } /// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update /// and LIMIT 0 just returns data without waiting for any updates @@ -49,9 +51,12 @@ public: std::shared_ptr blocks_metadata_ptr_, std::shared_ptr active_ptr_, const bool has_limit_, const UInt64 limit_, - const UInt64 heartbeat_interval_sec_, - const UInt64 temporary_live_view_timeout_sec_) - : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_) + const UInt64 heartbeat_interval_sec_) + : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), + blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), + active_ptr(std::move(active_ptr_)), has_limit(has_limit_), + limit(limit_), + heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000) { /// grab active pointer active = active_ptr.lock(); @@ -236,7 +241,6 @@ private: Int64 num_updates = -1; bool end_of_blocks = false; UInt64 heartbeat_interval_usec; - UInt64 temporary_live_view_timeout_sec; UInt64 last_event_timestamp_usec = 0; Poco::Timestamp timestamp; }; diff --git a/dbms/src/Storages/LiveView/StorageLiveView.cpp b/dbms/src/Storages/LiveView/StorageLiveView.cpp index 3c0d205fa3f..d5de3b4a914 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.cpp +++ b/dbms/src/Storages/LiveView/StorageLiveView.cpp @@ -363,45 +363,49 @@ void StorageLiveView::checkTableCanBeDropped() const } } -void StorageLiveView::noUsersThread(const UInt64 & timeout) +void StorageLiveView::noUsersThread(std::shared_ptr storage, const UInt64 & timeout) { - if (shutdown_called) - return; - bool drop_table = false; + if (storage->shutdown_called || storage->is_dropped) + return; + { while (1) { - std::unique_lock lock(no_users_thread_mutex); - if (!no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return no_users_thread_wakeup; })) + std::unique_lock lock(storage->no_users_thread_mutex); + if(!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; })) { - no_users_thread_wakeup = false; - if (shutdown_called) + storage->no_users_thread_wakeup = false; + if (storage->shutdown_called || storage->is_dropped) return; - if (hasUsers()) - return; - if (!global_context.getDependencies(database_name, table_name).empty()) + if (storage->hasUsers()) + continue; + if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty()) continue; drop_table = true; } + else { + storage->no_users_thread_wakeup = false; + continue; + } break; } } if (drop_table) { - if (global_context.tryGetTable(database_name, table_name)) + if (storage->global_context.tryGetTable(storage->database_name, storage->table_name)) { try { /// We create and execute `drop` query for this table auto drop_query = std::make_shared(); - drop_query->database = database_name; - drop_query->table = table_name; + drop_query->database = storage->database_name; + drop_query->table = storage->table_name; drop_query->kind = ASTDropQuery::Kind::Drop; ASTPtr ast_drop_query = drop_query; - InterpreterDropQuery drop_interpreter(ast_drop_query, global_context); + InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context); drop_interpreter.execute(); } catch (...) @@ -413,13 +417,6 @@ void StorageLiveView::noUsersThread(const UInt64 & timeout) void StorageLiveView::startNoUsersThread(const UInt64 & timeout) { - bool expected = false; - if (!start_no_users_thread_called.compare_exchange_strong(expected, true)) - return; - - if (is_dropped) - return; - if (is_temporary) { if (no_users_thread.joinable()) @@ -435,10 +432,9 @@ void StorageLiveView::startNoUsersThread(const UInt64 & timeout) std::lock_guard lock(no_users_thread_mutex); no_users_thread_wakeup = false; } - if (!is_dropped) - no_users_thread = std::thread(&StorageLiveView::noUsersThread, this, timeout); + no_users_thread = std::thread(&StorageLiveView::noUsersThread, + std::static_pointer_cast(shared_from_this()), timeout); } - start_no_users_thread_called = false; } void StorageLiveView::startup() @@ -454,19 +450,19 @@ void StorageLiveView::shutdown() if (no_users_thread.joinable()) { - std::lock_guard lock(no_users_thread_mutex); - no_users_thread_wakeup = true; - no_users_thread_condition.notify_one(); - /// Must detach the no users thread - /// as we can't join it as it will result - /// in a deadlock - no_users_thread.detach(); /// TODO Not viable at all. + { + std::lock_guard lock(no_users_thread_mutex); + no_users_thread_wakeup = true; + no_users_thread_condition.notify_one(); + } } } StorageLiveView::~StorageLiveView() { shutdown(); + if (no_users_thread.joinable()) + no_users_thread.detach(); } void StorageLiveView::drop() @@ -532,8 +528,7 @@ BlockInputStreams StorageLiveView::watch( if (query.is_watch_events) { - auto reader = std::make_shared(std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(), - context.getSettingsRef().temporary_live_view_timeout.totalSeconds()); + auto reader = std::make_shared(std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds()); if (no_users_thread.joinable()) { @@ -557,8 +552,7 @@ BlockInputStreams StorageLiveView::watch( } else { - auto reader = std::make_shared(std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(), - context.getSettingsRef().temporary_live_view_timeout.totalSeconds()); + auto reader = std::make_shared(std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds()); if (no_users_thread.joinable()) { diff --git a/dbms/src/Storages/LiveView/StorageLiveView.h b/dbms/src/Storages/LiveView/StorageLiveView.h index 9930d8d6154..a9a8985b4f8 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.h +++ b/dbms/src/Storages/LiveView/StorageLiveView.h @@ -71,11 +71,9 @@ public: { return active_ptr.use_count() > 1; } - /// Background thread for temporary tables - /// which drops this table if there are no users - void startNoUsersThread(const UInt64 & timeout); + /// No users thread mutex, predicate and wake up condition std::mutex no_users_thread_mutex; - bool no_users_thread_wakeup{false}; + bool no_users_thread_wakeup = false; std::condition_variable no_users_thread_condition; /// Get blocks hash /// must be called with mutex locked @@ -149,7 +147,7 @@ private: String database_name; ASTPtr inner_query; Context & global_context; - bool is_temporary {false}; + bool is_temporary = false; mutable Block sample_block; /// Mutex for the blocks and ready condition @@ -166,10 +164,12 @@ private: std::shared_ptr blocks_metadata_ptr; BlocksPtrs mergeable_blocks; - void noUsersThread(const UInt64 & timeout); + /// Background thread for temporary tables + /// which drops this table if there are no users + void startNoUsersThread(const UInt64 & timeout); + static void noUsersThread(std::shared_ptr storage, const UInt64 & timeout); std::thread no_users_thread; - std::atomic shutdown_called{false}; - std::atomic start_no_users_thread_called{false}; + std::atomic shutdown_called = false; UInt64 temporary_live_view_timeout; StorageLiveView( From 2342d64d1b8f2c2b7fa2946d65664432a6a890b9 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Sun, 25 Aug 2019 07:36:08 -0400 Subject: [PATCH 3/8] * Updating no users thread to sleep 3 times longer when users are present to reduce the number of times thread wakes up. * Updating startNoUsersThread to return if the thread is already running. --- .../src/Storages/LiveView/StorageLiveView.cpp | 33 +++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/dbms/src/Storages/LiveView/StorageLiveView.cpp b/dbms/src/Storages/LiveView/StorageLiveView.cpp index d5de3b4a914..6ceabf3478c 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.cpp +++ b/dbms/src/Storages/LiveView/StorageLiveView.cpp @@ -366,27 +366,40 @@ void StorageLiveView::checkTableCanBeDropped() const void StorageLiveView::noUsersThread(std::shared_ptr storage, const UInt64 & timeout) { bool drop_table = false; + UInt64 next_timeout = timeout; - if (storage->shutdown_called || storage->is_dropped) + if (storage->shutdown_called) return; { while (1) { std::unique_lock lock(storage->no_users_thread_mutex); - if(!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; })) + if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(next_timeout), [&] { return storage->no_users_thread_wakeup; })) { storage->no_users_thread_wakeup = false; - if (storage->shutdown_called || storage->is_dropped) + if (storage->shutdown_called) return; if (storage->hasUsers()) + { + /// Thread woke up but there are still users so sleep for 3 times longer than + /// the original timeout to reduce the number of times thread wakes up. + /// Wait until we are explicitely woken up when a user goes away to + /// reset wait time to the original timeout. + next_timeout = timeout * 3; continue; + } if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty()) continue; drop_table = true; } - else { + else + { + /// Thread was explicitly awaken so reset timeout to the original + next_timeout = timeout; storage->no_users_thread_wakeup = false; + if (storage->shutdown_called) + return; continue; } break; @@ -421,12 +434,12 @@ void StorageLiveView::startNoUsersThread(const UInt64 & timeout) { if (no_users_thread.joinable()) { - { - std::lock_guard lock(no_users_thread_mutex); - no_users_thread_wakeup = true; - no_users_thread_condition.notify_one(); - } - no_users_thread.join(); + /// If the thread is already running then + /// wake it up and just return + std::lock_guard lock(no_users_thread_mutex); + no_users_thread_wakeup = true; + no_users_thread_condition.notify_one(); + return; } { std::lock_guard lock(no_users_thread_mutex); From 7fb13b12f9e15eb01ecac959c4e9b0e9a5f6eb53 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Sun, 25 Aug 2019 08:27:47 -0400 Subject: [PATCH 4/8] Reverting to previous no users thread functionality to avoid keeping no users thread always alive for each live view. --- .../LiveView/LiveViewBlockInputStream.h | 18 +++--- .../LiveView/LiveViewEventsBlockInputStream.h | 17 +++--- .../src/Storages/LiveView/StorageLiveView.cpp | 56 +++++++++---------- dbms/src/Storages/LiveView/StorageLiveView.h | 3 +- 4 files changed, 47 insertions(+), 47 deletions(-) diff --git a/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h b/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h index 60839f3e66f..f73991ddb77 100644 --- a/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h +++ b/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h @@ -18,13 +18,10 @@ using NonBlockingResult = std::pair; public: ~LiveViewBlockInputStream() override { - /// Wake up no users thread - { - std::cerr << "DEBUG: live view block input stream ... send wake up thread\n"; - std::lock_guard lock(storage->no_users_thread_mutex); - storage->no_users_thread_wakeup = true; - storage->no_users_thread_condition.notify_one(); - } + /// Start storage no users thread + /// if we are the last active user + if (!storage->is_dropped && blocks_ptr.use_count() < 3) + storage->startNoUsersThread(temporary_live_view_timeout_sec); } LiveViewBlockInputStream(std::shared_ptr storage_, @@ -32,12 +29,14 @@ public: std::shared_ptr blocks_metadata_ptr_, std::shared_ptr active_ptr_, const bool has_limit_, const UInt64 limit_, - const UInt64 heartbeat_interval_sec_) + const UInt64 heartbeat_interval_sec_, + const UInt64 temporary_live_view_timeout_sec_) : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), - heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000) + heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), + temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_) { /// grab active pointer active = active_ptr.lock(); @@ -206,6 +205,7 @@ private: Int64 num_updates = -1; bool end_of_blocks = false; UInt64 heartbeat_interval_usec; + UInt64 temporary_live_view_timeout_sec; UInt64 last_event_timestamp_usec = 0; Poco::Timestamp timestamp; }; diff --git a/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h b/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h index e0e6ff78d21..3308ff2858b 100644 --- a/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h +++ b/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h @@ -37,12 +37,10 @@ using NonBlockingResult = std::pair; public: ~LiveViewEventsBlockInputStream() override { - /// Wake up no users thread - { - std::lock_guard lock(storage->no_users_thread_mutex); - storage->no_users_thread_wakeup = true; - storage->no_users_thread_condition.notify_one(); - } + /// Start storage no users thread + /// if we are the last active user + if (!storage->is_dropped && blocks_ptr.use_count() < 3) + storage->startNoUsersThread(temporary_live_view_timeout_sec); } /// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update /// and LIMIT 0 just returns data without waiting for any updates @@ -51,12 +49,14 @@ public: std::shared_ptr blocks_metadata_ptr_, std::shared_ptr active_ptr_, const bool has_limit_, const UInt64 limit_, - const UInt64 heartbeat_interval_sec_) + const UInt64 heartbeat_interval_sec_, + const UInt64 temporary_live_view_timeout_sec_) : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), - heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000) + heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), + temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_) { /// grab active pointer active = active_ptr.lock(); @@ -241,6 +241,7 @@ private: Int64 num_updates = -1; bool end_of_blocks = false; UInt64 heartbeat_interval_usec; + UInt64 temporary_live_view_timeout_sec; UInt64 last_event_timestamp_usec = 0; Poco::Timestamp timestamp; }; diff --git a/dbms/src/Storages/LiveView/StorageLiveView.cpp b/dbms/src/Storages/LiveView/StorageLiveView.cpp index 6ceabf3478c..6c949424f68 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.cpp +++ b/dbms/src/Storages/LiveView/StorageLiveView.cpp @@ -366,7 +366,6 @@ void StorageLiveView::checkTableCanBeDropped() const void StorageLiveView::noUsersThread(std::shared_ptr storage, const UInt64 & timeout) { bool drop_table = false; - UInt64 next_timeout = timeout; if (storage->shutdown_called) return; @@ -375,33 +374,17 @@ void StorageLiveView::noUsersThread(std::shared_ptr storage, co while (1) { std::unique_lock lock(storage->no_users_thread_mutex); - if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(next_timeout), [&] { return storage->no_users_thread_wakeup; })) + if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; })) { storage->no_users_thread_wakeup = false; if (storage->shutdown_called) return; if (storage->hasUsers()) - { - /// Thread woke up but there are still users so sleep for 3 times longer than - /// the original timeout to reduce the number of times thread wakes up. - /// Wait until we are explicitely woken up when a user goes away to - /// reset wait time to the original timeout. - next_timeout = timeout * 3; - continue; - } + return; if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty()) continue; drop_table = true; } - else - { - /// Thread was explicitly awaken so reset timeout to the original - next_timeout = timeout; - storage->no_users_thread_wakeup = false; - if (storage->shutdown_called) - return; - continue; - } break; } } @@ -430,24 +413,31 @@ void StorageLiveView::noUsersThread(std::shared_ptr storage, co void StorageLiveView::startNoUsersThread(const UInt64 & timeout) { + bool expected = false; + if (!start_no_users_thread_called.compare_exchange_strong(expected, true)) + return; + if (is_temporary) { if (no_users_thread.joinable()) { - /// If the thread is already running then - /// wake it up and just return - std::lock_guard lock(no_users_thread_mutex); - no_users_thread_wakeup = true; - no_users_thread_condition.notify_one(); - return; + { + std::lock_guard lock(no_users_thread_mutex); + no_users_thread_wakeup = true; + no_users_thread_condition.notify_one(); + } + no_users_thread.join(); } { std::lock_guard lock(no_users_thread_mutex); no_users_thread_wakeup = false; } - no_users_thread = std::thread(&StorageLiveView::noUsersThread, - std::static_pointer_cast(shared_from_this()), timeout); + if (!is_dropped) + no_users_thread = std::thread(&StorageLiveView::noUsersThread, + std::static_pointer_cast(shared_from_this()), timeout); } + + start_no_users_thread_called = false; } void StorageLiveView::startup() @@ -541,7 +531,11 @@ BlockInputStreams StorageLiveView::watch( if (query.is_watch_events) { - auto reader = std::make_shared(std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds()); + auto reader = std::make_shared( + std::static_pointer_cast(shared_from_this()), + blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, + context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(), + context.getSettingsRef().temporary_live_view_timeout.totalSeconds()); if (no_users_thread.joinable()) { @@ -565,7 +559,11 @@ BlockInputStreams StorageLiveView::watch( } else { - auto reader = std::make_shared(std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds()); + auto reader = std::make_shared( + std::static_pointer_cast(shared_from_this()), + blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, + context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(), + context.getSettingsRef().temporary_live_view_timeout.totalSeconds()); if (no_users_thread.joinable()) { diff --git a/dbms/src/Storages/LiveView/StorageLiveView.h b/dbms/src/Storages/LiveView/StorageLiveView.h index a9a8985b4f8..f71d0758f03 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.h +++ b/dbms/src/Storages/LiveView/StorageLiveView.h @@ -72,6 +72,7 @@ public: return active_ptr.use_count() > 1; } /// No users thread mutex, predicate and wake up condition + void startNoUsersThread(const UInt64 & timeout); std::mutex no_users_thread_mutex; bool no_users_thread_wakeup = false; std::condition_variable no_users_thread_condition; @@ -166,10 +167,10 @@ private: /// Background thread for temporary tables /// which drops this table if there are no users - void startNoUsersThread(const UInt64 & timeout); static void noUsersThread(std::shared_ptr storage, const UInt64 & timeout); std::thread no_users_thread; std::atomic shutdown_called = false; + std::atomic start_no_users_thread_called = false; UInt64 temporary_live_view_timeout; StorageLiveView( From 12cb72175b8e5ad156a03ea9f8d45dd453d62a1d Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Sun, 25 Aug 2019 20:44:03 -0400 Subject: [PATCH 5/8] Updating test server config to enable live views using the allow_experimental_live_view option. --- dbms/tests/server-test.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/tests/server-test.xml b/dbms/tests/server-test.xml index d68cbca53c1..4a4e0a333ff 100644 --- a/dbms/tests/server-test.xml +++ b/dbms/tests/server-test.xml @@ -43,6 +43,7 @@ + 1 3 /tmp/clickhouse/data/ /tmp/clickhouse/tmp/ From c70f656d5d6e6c83f6b3e3720d14bfc3277372b8 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Sun, 25 Aug 2019 21:57:32 -0400 Subject: [PATCH 6/8] Another attempt to enable allow_experimental_live_view options during tests. --- dbms/tests/clickhouse-client.xml | 1 + dbms/tests/server-test.xml | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/tests/clickhouse-client.xml b/dbms/tests/clickhouse-client.xml index b6003ca2d09..ebce35127e5 100644 --- a/dbms/tests/clickhouse-client.xml +++ b/dbms/tests/clickhouse-client.xml @@ -1,3 +1,4 @@ + 1 100000 diff --git a/dbms/tests/server-test.xml b/dbms/tests/server-test.xml index 4a4e0a333ff..d68cbca53c1 100644 --- a/dbms/tests/server-test.xml +++ b/dbms/tests/server-test.xml @@ -43,7 +43,6 @@ - 1 3 /tmp/clickhouse/data/ /tmp/clickhouse/tmp/ From 5fb5c8dffcbc773d46d96d3ec43fc4febca4d468 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Mon, 26 Aug 2019 19:50:37 -0400 Subject: [PATCH 7/8] Updating all live view tests to set the allow_experimental_live_view option. --- .../0_stateless/00960_live_view_watch_events_live.py | 5 +++++ .../queries/0_stateless/00961_temporary_live_view_watch.sql | 2 ++ .../0_stateless/00962_temporary_live_view_watch_live.py | 5 +++++ ...00963_temporary_live_view_watch_live_timeout.py.disabled | 5 +++++ .../0_stateless/00964_live_view_watch_events_heartbeat.py | 5 +++++ .../queries/0_stateless/00965_live_view_watch_heartbeat.py | 5 +++++ .../0_stateless/00966_live_view_watch_events_http.py | 5 ++++- .../tests/queries/0_stateless/00967_live_view_watch_http.py | 5 ++++- ...0968_live_view_select_format_jsoneachrowwithprogress.sql | 2 ++ ...00969_live_view_watch_format_jsoneachrowwithprogress.sql | 2 ++ .../00970_live_view_watch_events_http_heartbeat.py | 6 ++++-- .../0_stateless/00971_live_view_watch_http_heartbeat.py | 5 ++++- dbms/tests/queries/0_stateless/00972_live_view_select_1.sql | 2 ++ dbms/tests/queries/0_stateless/00973_live_view_select.sql | 2 ++ .../0_stateless/00974_live_view_select_with_aggregation.sql | 2 ++ dbms/tests/queries/0_stateless/00975_live_view_create.sql | 2 ++ .../queries/0_stateless/00976_live_view_select_version.sql | 2 ++ .../queries/0_stateless/00977_live_view_watch_events.sql | 2 ++ dbms/tests/queries/0_stateless/00978_live_view_watch.sql | 2 ++ .../tests/queries/0_stateless/00979_live_view_watch_live.py | 5 +++++ .../0_stateless/00980_create_temporary_live_view.sql | 2 ++ 21 files changed, 68 insertions(+), 5 deletions(-) diff --git a/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py b/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py index b7fc3f4e3a6..2095683720e 100755 --- a/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py +++ b/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py @@ -16,6 +16,11 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo client1.expect(prompt) client2.expect(prompt) + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) client1.send(' DROP TABLE IF EXISTS test.mt') diff --git a/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql index c3e2ab8d102..7992da92f97 100644 --- a/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql +++ b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql @@ -1,3 +1,5 @@ +SET allow_experimental_live_view = 1; + DROP TABLE IF EXISTS test.lv; DROP TABLE IF EXISTS test.mt; diff --git a/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py b/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py index f27b1213c70..3dbec01b29a 100755 --- a/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py +++ b/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py @@ -16,6 +16,11 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo client1.expect(prompt) client2.expect(prompt) + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) client1.send('DROP TABLE IF EXISTS test.mt') diff --git a/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled b/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled index df627c84e49..b324c1b90cc 100755 --- a/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled +++ b/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled @@ -16,6 +16,11 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo client1.expect(prompt) client2.expect(prompt) + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) client1.send('DROP TABLE IF EXISTS test.mt') diff --git a/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py b/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py index 5664c0e6c6d..528f18839bb 100755 --- a/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py +++ b/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py @@ -16,6 +16,11 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo client1.expect(prompt) client2.expect(prompt) + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) client1.send(' DROP TABLE IF EXISTS test.mt') diff --git a/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py b/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py index 03e22175dff..2723936f876 100755 --- a/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py +++ b/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py @@ -16,6 +16,11 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo client1.expect(prompt) client2.expect(prompt) + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) client1.send(' DROP TABLE IF EXISTS test.mt') diff --git a/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py b/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py index bb9d6152200..72ab3ea8818 100755 --- a/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py +++ b/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py @@ -15,6 +15,9 @@ log = None with client(name='client1>', log=log) as client1: client1.expect(prompt) + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) client1.send(' DROP TABLE IF EXISTS test.mt') @@ -25,7 +28,7 @@ with client(name='client1>', log=log) as client1: client1.expect(prompt) - with http_client({'method':'GET', 'url': '/?query=WATCH%20test.lv%20EVENTS'}, name='client2>', log=log) as client2: + with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&query=WATCH%20test.lv%20EVENTS'}, name='client2>', log=log) as client2: client2.expect('.*1\n') client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py b/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py index d3439431eb3..e2f33971c3d 100755 --- a/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py +++ b/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py @@ -15,6 +15,9 @@ log = None with client(name='client1>', log=log) as client1: client1.expect(prompt) + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) client1.send(' DROP TABLE IF EXISTS test.mt') @@ -25,7 +28,7 @@ with client(name='client1>', log=log) as client1: client1.expect(prompt) - with http_client({'method':'GET', 'url':'/?query=WATCH%20test.lv'}, name='client2>', log=log) as client2: + with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&query=WATCH%20test.lv'}, name='client2>', log=log) as client2: client2.expect('.*0\t1\n') client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql index 8c6f4197d54..1023cdf6b29 100644 --- a/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql +++ b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql @@ -1,3 +1,5 @@ +SET allow_experimental_live_view = 1; + DROP TABLE IF EXISTS test.lv; DROP TABLE IF EXISTS test.mt; diff --git a/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql index 725a4ad00ed..3e46d55c014 100644 --- a/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql +++ b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql @@ -1,3 +1,5 @@ +SET allow_experimental_live_view = 1; + DROP TABLE IF EXISTS test.lv; DROP TABLE IF EXISTS test.mt; diff --git a/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py b/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py index 63628c4a76f..8435cdc147a 100755 --- a/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py +++ b/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py @@ -15,6 +15,9 @@ log = None with client(name='client1>', log=log) as client1: client1.expect(prompt) + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) client1.send(' DROP TABLE IF EXISTS test.mt') @@ -24,8 +27,7 @@ with client(name='client1>', log=log) as client1: client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') client1.expect(prompt) - - with http_client({'method':'GET', 'url': '/?live_view_heartbeat_interval=1&query=WATCH%20test.lv%20EVENTS%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2: + with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20EVENTS%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2: client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}\n', escape=True) client2.expect('{"row":{"version":"1"}', escape=True) client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True) diff --git a/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py b/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py index 7bdb47b7caa..2317d705efe 100755 --- a/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py +++ b/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py @@ -15,6 +15,9 @@ log = None with client(name='client1>', log=log) as client1: client1.expect(prompt) + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) client1.send(' DROP TABLE IF EXISTS test.mt') @@ -24,7 +27,7 @@ with client(name='client1>', log=log) as client1: client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') client1.expect(prompt) - with http_client({'method':'GET', 'url':'/?live_view_heartbeat_interval=1&query=WATCH%20test.lv%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2: + with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2: client2.expect('"progress".*',) client2.expect('{"row":{"sum(a)":"0","_version":"1"}}\n', escape=True) client2.expect('"progress".*\n') diff --git a/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql b/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql index 661080b577b..135516b0cd3 100644 --- a/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql +++ b/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql @@ -1,3 +1,5 @@ +SET allow_experimental_live_view = 1; + DROP TABLE IF EXISTS test.lv; CREATE LIVE VIEW test.lv AS SELECT 1; diff --git a/dbms/tests/queries/0_stateless/00973_live_view_select.sql b/dbms/tests/queries/0_stateless/00973_live_view_select.sql index ff4a45ffcc1..4b5ca0a2dd7 100644 --- a/dbms/tests/queries/0_stateless/00973_live_view_select.sql +++ b/dbms/tests/queries/0_stateless/00973_live_view_select.sql @@ -1,3 +1,5 @@ +SET allow_experimental_live_view = 1; + DROP TABLE IF EXISTS test.lv; DROP TABLE IF EXISTS test.mt; diff --git a/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql index 3c11f855c9d..3faaec8f623 100644 --- a/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql +++ b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql @@ -1,3 +1,5 @@ +SET allow_experimental_live_view = 1; + DROP TABLE IF EXISTS test.lv; DROP TABLE IF EXISTS test.mt; diff --git a/dbms/tests/queries/0_stateless/00975_live_view_create.sql b/dbms/tests/queries/0_stateless/00975_live_view_create.sql index 1c929b15b00..02c1644d193 100644 --- a/dbms/tests/queries/0_stateless/00975_live_view_create.sql +++ b/dbms/tests/queries/0_stateless/00975_live_view_create.sql @@ -1,3 +1,5 @@ +SET allow_experimental_live_view = 1; + DROP TABLE IF EXISTS test.mt; CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); diff --git a/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql b/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql index 5f3ab1f7546..ae1c59a92d7 100644 --- a/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql +++ b/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql @@ -1,3 +1,5 @@ +SET allow_experimental_live_view = 1; + DROP TABLE IF EXISTS test.lv; DROP TABLE IF EXISTS test.mt; diff --git a/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql index a3b84e8d4c1..3e0d066fb8d 100644 --- a/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql +++ b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql @@ -1,3 +1,5 @@ +SET allow_experimental_live_view = 1; + DROP TABLE IF EXISTS test.lv; DROP TABLE IF EXISTS test.mt; diff --git a/dbms/tests/queries/0_stateless/00978_live_view_watch.sql b/dbms/tests/queries/0_stateless/00978_live_view_watch.sql index abe4a6c32ae..b8d0d93ccab 100644 --- a/dbms/tests/queries/0_stateless/00978_live_view_watch.sql +++ b/dbms/tests/queries/0_stateless/00978_live_view_watch.sql @@ -1,3 +1,5 @@ +SET allow_experimental_live_view = 1; + DROP TABLE IF EXISTS test.lv; DROP TABLE IF EXISTS test.mt; diff --git a/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py index 948e4c93662..8c5bc5b8eb2 100755 --- a/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py +++ b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py @@ -16,6 +16,11 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo client1.expect(prompt) client2.expect(prompt) + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) client1.send(' DROP TABLE IF EXISTS test.mt') diff --git a/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql index 8cd6ee06ace..037c2a9e587 100644 --- a/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql +++ b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql @@ -1,3 +1,5 @@ +SET allow_experimental_live_view = 1; + DROP TABLE IF EXISTS test.lv; DROP TABLE IF EXISTS test.mt; From e80ff65a0f10ab1abe6792a79b56196aad74a661 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Tue, 27 Aug 2019 16:23:12 -0400 Subject: [PATCH 8/8] Removing allow_experimental_live_view option from clickhouse-client.xml. --- dbms/tests/clickhouse-client.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/tests/clickhouse-client.xml b/dbms/tests/clickhouse-client.xml index ebce35127e5..b6003ca2d09 100644 --- a/dbms/tests/clickhouse-client.xml +++ b/dbms/tests/clickhouse-client.xml @@ -1,4 +1,3 @@ - 1 100000