diff --git a/dbms/src/Common/RWLock.cpp b/dbms/src/Common/RWLock.cpp index e343ce0b0cd..3c7034b5c23 100644 --- a/dbms/src/Common/RWLock.cpp +++ b/dbms/src/Common/RWLock.cpp @@ -37,7 +37,6 @@ class RWLockImpl::LockHolderImpl RWLock parent; GroupsContainer::iterator it_group; ClientsContainer::iterator it_client; - ThreadToHolder::key_type thread_id; QueryIdToHolder::key_type query_id; CurrentMetrics::Increment active_client_increment; @@ -74,17 +73,12 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & /// Check if the same query is acquiring previously acquired lock LockHolder existing_holder_ptr; - auto this_thread_id = std::this_thread::get_id(); - auto it_thread = thread_to_holder.find(this_thread_id); - - auto it_query = query_id_to_holder.end(); if (query_id != RWLockImpl::NO_QUERY) - it_query = query_id_to_holder.find(query_id); - - if (it_thread != thread_to_holder.end()) - existing_holder_ptr = it_thread->second.lock(); - else if (it_query != query_id_to_holder.end()) - existing_holder_ptr = it_query->second.lock(); + { + auto it_query = query_id_to_holder.find(query_id); + if (it_query != query_id_to_holder.end()) + existing_holder_ptr = it_query->second.lock(); + } if (existing_holder_ptr) { @@ -125,10 +119,7 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & /// Wait a notification until we will be the only in the group. it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); }); - /// Insert myself (weak_ptr to the holder) to threads set to implement recursive lock - thread_to_holder.emplace(this_thread_id, res); - res->thread_id = this_thread_id; - + /// Insert myself (weak_ptr to the holder) to queries set to implement recursive lock if (query_id != RWLockImpl::NO_QUERY) query_id_to_holder.emplace(query_id, res); res->query_id = query_id; @@ -143,7 +134,6 @@ RWLockImpl::LockHolderImpl::~LockHolderImpl() std::unique_lock lock(parent->mutex); /// Remove weak_ptrs to the holder, since there are no owners of the current lock - parent->thread_to_holder.erase(thread_id); parent->query_id_to_holder.erase(query_id); /// Removes myself from client list of our group diff --git a/dbms/src/Common/RWLock.h b/dbms/src/Common/RWLock.h index 0467901fa27..c94dd3fe1a4 100644 --- a/dbms/src/Common/RWLock.h +++ b/dbms/src/Common/RWLock.h @@ -6,7 +6,6 @@ #include #include #include -#include #include #include @@ -19,7 +18,7 @@ using RWLock = std::shared_ptr; /// Implements shared lock with FIFO service -/// Can be acquired recursively (several calls for the same query or the same OS thread) in Read mode +/// Can be acquired recursively (several calls for the same query) in Read mode /// /// NOTE: it is important to allow acquiring the same lock in Read mode without waiting if it is already /// acquired by another thread of the same query. Otherwise the following deadlock is possible: @@ -55,7 +54,6 @@ private: struct Group; using GroupsContainer = std::list; using ClientsContainer = std::list; - using ThreadToHolder = std::map>; using QueryIdToHolder = std::map>; /// Group of clients that should be executed concurrently @@ -73,7 +71,6 @@ private: mutable std::mutex mutex; GroupsContainer queue; - ThreadToHolder thread_to_holder; QueryIdToHolder query_id_to_holder; }; diff --git a/dbms/src/Common/tests/gtest_rw_lock.cpp b/dbms/src/Common/tests/gtest_rw_lock.cpp index 68927c8bc4a..373b417e7a0 100644 --- a/dbms/src/Common/tests/gtest_rw_lock.cpp +++ b/dbms/src/Common/tests/gtest_rw_lock.cpp @@ -94,7 +94,7 @@ TEST(Common, RWLock_Recursive) { for (int i = 0; i < 2 * cycles; ++i) { - auto lock = fifo_lock->getLock(RWLockImpl::Write, RWLockImpl::NO_QUERY); + auto lock = fifo_lock->getLock(RWLockImpl::Write, "q1"); auto sleep_for = std::chrono::duration(std::uniform_int_distribution<>(1, 100)(gen)); std::this_thread::sleep_for(sleep_for); @@ -105,17 +105,17 @@ TEST(Common, RWLock_Recursive) { for (int i = 0; i < cycles; ++i) { - auto lock1 = fifo_lock->getLock(RWLockImpl::Read, RWLockImpl::NO_QUERY); + auto lock1 = fifo_lock->getLock(RWLockImpl::Read, "q2"); auto sleep_for = std::chrono::duration(std::uniform_int_distribution<>(1, 100)(gen)); std::this_thread::sleep_for(sleep_for); - auto lock2 = fifo_lock->getLock(RWLockImpl::Read, RWLockImpl::NO_QUERY); + auto lock2 = fifo_lock->getLock(RWLockImpl::Read, "q2"); - EXPECT_ANY_THROW({fifo_lock->getLock(RWLockImpl::Write, RWLockImpl::NO_QUERY);}); + EXPECT_ANY_THROW({fifo_lock->getLock(RWLockImpl::Write, "q2");}); } - fifo_lock->getLock(RWLockImpl::Write, RWLockImpl::NO_QUERY); + fifo_lock->getLock(RWLockImpl::Write, "q2"); }); t1.join(); diff --git a/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py b/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py deleted file mode 100755 index 2095683720e..00000000000 --- a/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python -import os -import sys -import signal - -CURDIR = os.path.dirname(os.path.realpath(__file__)) -sys.path.insert(0, os.path.join(CURDIR, 'helpers')) - -from client import client, prompt, end_of_block - -log = None -# uncomment the line below for debugging -#log=sys.stdout - -with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: - client1.expect(prompt) - client2.expect(prompt) - - client1.send('SET allow_experimental_live_view = 1') - client1.expect(prompt) - client2.send('SET allow_experimental_live_view = 1') - client2.expect(prompt) - - client1.send('DROP TABLE IF EXISTS test.lv') - client1.expect(prompt) - client1.send(' DROP TABLE IF EXISTS test.mt') - client1.expect(prompt) - client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') - client1.expect(prompt) - client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') - client1.expect(prompt) - client1.send('WATCH test.lv EVENTS') - client1.expect('1.*' + end_of_block) - client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') - client1.expect('2.*' + end_of_block) - client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') - client1.expect('3.*' + end_of_block) - # send Ctrl-C - client1.send('\x03', eol='') - match = client1.expect('(%s)|([#\$] )' % prompt) - if match.groups()[1]: - client1.send(client1.command) - client1.expect(prompt) - client1.send('DROP TABLE test.lv') - client1.expect(prompt) - client1.send('DROP TABLE test.mt') - client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.reference b/dbms/tests/queries/0_stateless/00960_live_view_watch_events_live.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.reference b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.reference deleted file mode 100644 index 6fbbedf1b21..00000000000 --- a/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.reference +++ /dev/null @@ -1,3 +0,0 @@ -0 1 -6 2 -21 3 diff --git a/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql b/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql deleted file mode 100644 index 7992da92f97..00000000000 --- a/dbms/tests/queries/0_stateless/00961_temporary_live_view_watch.sql +++ /dev/null @@ -1,20 +0,0 @@ -SET allow_experimental_live_view = 1; - -DROP TABLE IF EXISTS test.lv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); -CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; - -WATCH test.lv LIMIT 0; - -INSERT INTO test.mt VALUES (1),(2),(3); - -WATCH test.lv LIMIT 0; - -INSERT INTO test.mt VALUES (4),(5),(6); - -WATCH test.lv LIMIT 0; - -DROP TABLE test.lv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py b/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py deleted file mode 100755 index 3dbec01b29a..00000000000 --- a/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python -import os -import sys -import signal - -CURDIR = os.path.dirname(os.path.realpath(__file__)) -sys.path.insert(0, os.path.join(CURDIR, 'helpers')) - -from client import client, prompt, end_of_block - -log = None -# uncomment the line below for debugging -#log=sys.stdout - -with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: - client1.expect(prompt) - client2.expect(prompt) - - client1.send('SET allow_experimental_live_view = 1') - client1.expect(prompt) - client2.send('SET allow_experimental_live_view = 1') - client2.expect(prompt) - - client1.send('DROP TABLE IF EXISTS test.lv') - client1.expect(prompt) - client1.send('DROP TABLE IF EXISTS test.mt') - client1.expect(prompt) - client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') - client1.expect(prompt) - client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') - client1.expect(prompt) - client1.send('WATCH test.lv') - client1.expect(r'0.*1' + end_of_block) - client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') - client1.expect(r'6.*2' + end_of_block) - client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') - client1.expect(r'21.*3' + end_of_block) - # send Ctrl-C - client1.send('\x03', eol='') - match = client1.expect('(%s)|([#\$] )' % prompt) - if match.groups()[1]: - client1.send(client1.command) - client1.expect(prompt) - client1.send('DROP TABLE test.lv') - client1.expect(prompt) - client1.send('DROP TABLE test.mt') - client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.reference b/dbms/tests/queries/0_stateless/00962_temporary_live_view_watch_live.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled b/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled deleted file mode 100755 index b324c1b90cc..00000000000 --- a/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.py.disabled +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python -import os -import sys -import signal - -CURDIR = os.path.dirname(os.path.realpath(__file__)) -sys.path.insert(0, os.path.join(CURDIR, 'helpers')) - -from client import client, prompt, end_of_block - -log = None -# uncomment the line below for debugging -#log=sys.stdout - -with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: - client1.expect(prompt) - client2.expect(prompt) - - client1.send('SET allow_experimental_live_view = 1') - client1.expect(prompt) - client2.send('SET allow_experimental_live_view = 1') - client2.expect(prompt) - - client1.send('DROP TABLE IF EXISTS test.lv') - client1.expect(prompt) - client1.send('DROP TABLE IF EXISTS test.mt') - client1.expect(prompt) - client1.send('SET temporary_live_view_timeout=1') - client1.expect(prompt) - client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') - client1.expect(prompt) - client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') - client1.expect(prompt) - client1.send('WATCH test.lv') - client1.expect(r'0.*1' + end_of_block) - client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') - client2.expect(prompt) - client1.expect(r'6.*2' + end_of_block) - client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') - client2.expect(prompt) - client1.expect(r'21.*3' + end_of_block) - # send Ctrl-C - client1.send('\x03', eol='') - match = client1.expect('(%s)|([#\$] )' % prompt) - if match.groups()[1]: - client1.send(client1.command) - client1.expect(prompt) - client1.send('SELECT sleep(1)') - client1.expect(prompt) - client1.send('DROP TABLE test.lv') - client1.expect('Table test.lv doesn\'t exist') - client1.expect(prompt) - client1.send('DROP TABLE test.mt') - client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.reference b/dbms/tests/queries/0_stateless/00963_temporary_live_view_watch_live_timeout.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py b/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py deleted file mode 100755 index 528f18839bb..00000000000 --- a/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.py +++ /dev/null @@ -1,49 +0,0 @@ -#!/usr/bin/env python -import os -import sys -import signal - -CURDIR = os.path.dirname(os.path.realpath(__file__)) -sys.path.insert(0, os.path.join(CURDIR, 'helpers')) - -from client import client, prompt, end_of_block - -log = None -# uncomment the line below for debugging -#log=sys.stdout - -with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: - client1.expect(prompt) - client2.expect(prompt) - - client1.send('SET allow_experimental_live_view = 1') - client1.expect(prompt) - client2.send('SET allow_experimental_live_view = 1') - client2.expect(prompt) - - client1.send('DROP TABLE IF EXISTS test.lv') - client1.expect(prompt) - client1.send(' DROP TABLE IF EXISTS test.mt') - client1.expect(prompt) - client1.send('SET live_view_heartbeat_interval=1') - client1.expect(prompt) - client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') - client1.expect(prompt) - client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') - client1.expect(prompt) - client1.send('WATCH test.lv EVENTS') - client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') - client1.expect('2.*' + end_of_block) - client1.expect('Progress: 2.00 rows.*\)') - # wait for heartbeat - client1.expect('Progress: 2.00 rows.*\)') - # send Ctrl-C - client1.send('\x03', eol='') - match = client1.expect('(%s)|([#\$] )' % prompt) - if match.groups()[1]: - client1.send(client1.command) - client1.expect(prompt) - client1.send('DROP TABLE test.lv') - client1.expect(prompt) - client1.send('DROP TABLE test.mt') - client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.reference b/dbms/tests/queries/0_stateless/00964_live_view_watch_events_heartbeat.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py b/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py deleted file mode 100755 index 2723936f876..00000000000 --- a/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.py +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python -import os -import sys -import signal - -CURDIR = os.path.dirname(os.path.realpath(__file__)) -sys.path.insert(0, os.path.join(CURDIR, 'helpers')) - -from client import client, prompt, end_of_block - -log = None -# uncomment the line below for debugging -#log=sys.stdout - -with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: - client1.expect(prompt) - client2.expect(prompt) - - client1.send('SET allow_experimental_live_view = 1') - client1.expect(prompt) - client2.send('SET allow_experimental_live_view = 1') - client2.expect(prompt) - - client1.send('DROP TABLE IF EXISTS test.lv') - client1.expect(prompt) - client1.send(' DROP TABLE IF EXISTS test.mt') - client1.expect(prompt) - client1.send('SET live_view_heartbeat_interval=1') - client1.expect(prompt) - client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') - client1.expect(prompt) - client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') - client1.expect(prompt) - client1.send('WATCH test.lv') - client1.expect(r'0.*1' + end_of_block) - client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') - client1.expect(r'6.*2' + end_of_block) - client1.expect('Progress: 2.00 rows.*\)') - # wait for heartbeat - client1.expect('Progress: 2.00 rows.*\)') - # send Ctrl-C - client1.send('\x03', eol='') - match = client1.expect('(%s)|([#\$] )' % prompt) - if match.groups()[1]: - client1.send(client1.command) - client1.expect(prompt) - client1.send('DROP TABLE test.lv') - client1.expect(prompt) - client1.send('DROP TABLE test.mt') - client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.reference b/dbms/tests/queries/0_stateless/00965_live_view_watch_heartbeat.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py b/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py deleted file mode 100755 index 72ab3ea8818..00000000000 --- a/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python -import os -import sys - -CURDIR = os.path.dirname(os.path.realpath(__file__)) -sys.path.insert(0, os.path.join(CURDIR, 'helpers')) - -from client import client, prompt, end_of_block -from httpclient import client as http_client - -log = None -# uncomment the line below for debugging -#log=sys.stdout - -with client(name='client1>', log=log) as client1: - client1.expect(prompt) - - client1.send('SET allow_experimental_live_view = 1') - client1.expect(prompt) - - client1.send('DROP TABLE IF EXISTS test.lv') - client1.expect(prompt) - client1.send(' DROP TABLE IF EXISTS test.mt') - client1.expect(prompt) - client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') - client1.expect(prompt) - client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') - client1.expect(prompt) - - - with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&query=WATCH%20test.lv%20EVENTS'}, name='client2>', log=log) as client2: - client2.expect('.*1\n') - client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') - client1.expect(prompt) - client2.expect('.*2\n') - - client1.send('DROP TABLE test.lv') - client1.expect(prompt) - client1.send('DROP TABLE test.mt') - client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.reference b/dbms/tests/queries/0_stateless/00966_live_view_watch_events_http.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py b/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py deleted file mode 100755 index e2f33971c3d..00000000000 --- a/dbms/tests/queries/0_stateless/00967_live_view_watch_http.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python -import os -import sys - -CURDIR = os.path.dirname(os.path.realpath(__file__)) -sys.path.insert(0, os.path.join(CURDIR, 'helpers')) - -from client import client, prompt, end_of_block -from httpclient import client as http_client - -log = None -# uncomment the line below for debugging -#log=sys.stdout - -with client(name='client1>', log=log) as client1: - client1.expect(prompt) - - client1.send('SET allow_experimental_live_view = 1') - client1.expect(prompt) - - client1.send('DROP TABLE IF EXISTS test.lv') - client1.expect(prompt) - client1.send(' DROP TABLE IF EXISTS test.mt') - client1.expect(prompt) - client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') - client1.expect(prompt) - client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') - client1.expect(prompt) - - - with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&query=WATCH%20test.lv'}, name='client2>', log=log) as client2: - client2.expect('.*0\t1\n') - client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') - client1.expect(prompt) - client2.expect('.*6\t2\n') - - client1.send('DROP TABLE test.lv') - client1.expect(prompt) - client1.send('DROP TABLE test.mt') - client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00967_live_view_watch_http.reference b/dbms/tests/queries/0_stateless/00967_live_view_watch_http.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.reference b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.reference deleted file mode 100644 index 5ae423d90d1..00000000000 --- a/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.reference +++ /dev/null @@ -1,4 +0,0 @@ -{"row":{"a":1}} -{"row":{"a":2}} -{"row":{"a":3}} -{"progress":{"read_rows":"3","read_bytes":"36","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} diff --git a/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql b/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql deleted file mode 100644 index 1023cdf6b29..00000000000 --- a/dbms/tests/queries/0_stateless/00968_live_view_select_format_jsoneachrowwithprogress.sql +++ /dev/null @@ -1,14 +0,0 @@ -SET allow_experimental_live_view = 1; - -DROP TABLE IF EXISTS test.lv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); -CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; - -INSERT INTO test.mt VALUES (1),(2),(3); - -SELECT * FROM test.lv FORMAT JSONEachRowWithProgress; - -DROP TABLE test.lv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.reference b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.reference deleted file mode 100644 index 287a1ced92d..00000000000 --- a/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.reference +++ /dev/null @@ -1,6 +0,0 @@ -{"row":{"sum(a)":"0","_version":"1"}} -{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} -{"row":{"sum(a)":"6","_version":"2"}} -{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} -{"row":{"sum(a)":"21","_version":"3"}} -{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}} diff --git a/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql b/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql deleted file mode 100644 index 3e46d55c014..00000000000 --- a/dbms/tests/queries/0_stateless/00969_live_view_watch_format_jsoneachrowwithprogress.sql +++ /dev/null @@ -1,20 +0,0 @@ -SET allow_experimental_live_view = 1; - -DROP TABLE IF EXISTS test.lv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); -CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; - -WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress; - -INSERT INTO test.mt VALUES (1),(2),(3); - -WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress; - -INSERT INTO test.mt VALUES (4),(5),(6); - -WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress; - -DROP TABLE test.lv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py b/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py deleted file mode 100755 index 8435cdc147a..00000000000 --- a/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python -import os -import sys - -CURDIR = os.path.dirname(os.path.realpath(__file__)) -sys.path.insert(0, os.path.join(CURDIR, 'helpers')) - -from client import client, prompt, end_of_block -from httpclient import client as http_client - -log = None -# uncomment the line below for debugging -#log=sys.stdout - -with client(name='client1>', log=log) as client1: - client1.expect(prompt) - - client1.send('SET allow_experimental_live_view = 1') - client1.expect(prompt) - - client1.send('DROP TABLE IF EXISTS test.lv') - client1.expect(prompt) - client1.send(' DROP TABLE IF EXISTS test.mt') - client1.expect(prompt) - client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') - client1.expect(prompt) - client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') - client1.expect(prompt) - - with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20EVENTS%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2: - client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}\n', escape=True) - client2.expect('{"row":{"version":"1"}', escape=True) - client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True) - # heartbeat is provided by progress message - client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True) - - client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') - client1.expect(prompt) - - client2.expect('{"row":{"version":"2"}}\n', escape=True) - - client1.send('DROP TABLE test.lv') - client1.expect(prompt) - client1.send('DROP TABLE test.mt') - client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.reference b/dbms/tests/queries/0_stateless/00970_live_view_watch_events_http_heartbeat.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py b/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py deleted file mode 100755 index 2317d705efe..00000000000 --- a/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env python -import os -import sys - -CURDIR = os.path.dirname(os.path.realpath(__file__)) -sys.path.insert(0, os.path.join(CURDIR, 'helpers')) - -from client import client, prompt, end_of_block -from httpclient import client as http_client - -log = None -# uncomment the line below for debugging -#log=sys.stdout - -with client(name='client1>', log=log) as client1: - client1.expect(prompt) - - client1.send('SET allow_experimental_live_view = 1') - client1.expect(prompt) - - client1.send('DROP TABLE IF EXISTS test.lv') - client1.expect(prompt) - client1.send(' DROP TABLE IF EXISTS test.mt') - client1.expect(prompt) - client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') - client1.expect(prompt) - client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') - client1.expect(prompt) - - with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2: - client2.expect('"progress".*',) - client2.expect('{"row":{"sum(a)":"0","_version":"1"}}\n', escape=True) - client2.expect('"progress".*\n') - # heartbeat is provided by progress message - client2.expect('"progress".*\n') - - client1.send('INSERT INTO test.mt VALUES (1),(2),(3)') - client1.expect(prompt) - - client2.expect('"progress".*"read_rows":"2".*\n') - client2.expect('{"row":{"sum(a)":"6","_version":"2"}}\n', escape=True) - - client1.send('DROP TABLE test.lv') - client1.expect(prompt) - client1.send('DROP TABLE test.mt') - client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.reference b/dbms/tests/queries/0_stateless/00971_live_view_watch_http_heartbeat.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00972_live_view_select_1.reference b/dbms/tests/queries/0_stateless/00972_live_view_select_1.reference deleted file mode 100644 index d00491fd7e5..00000000000 --- a/dbms/tests/queries/0_stateless/00972_live_view_select_1.reference +++ /dev/null @@ -1 +0,0 @@ -1 diff --git a/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql b/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql deleted file mode 100644 index 135516b0cd3..00000000000 --- a/dbms/tests/queries/0_stateless/00972_live_view_select_1.sql +++ /dev/null @@ -1,9 +0,0 @@ -SET allow_experimental_live_view = 1; - -DROP TABLE IF EXISTS test.lv; - -CREATE LIVE VIEW test.lv AS SELECT 1; - -SELECT * FROM test.lv; - -DROP TABLE test.lv; diff --git a/dbms/tests/queries/0_stateless/00973_live_view_select.reference b/dbms/tests/queries/0_stateless/00973_live_view_select.reference deleted file mode 100644 index 75236c0daf7..00000000000 --- a/dbms/tests/queries/0_stateless/00973_live_view_select.reference +++ /dev/null @@ -1,4 +0,0 @@ -6 1 -6 1 -12 2 -12 2 diff --git a/dbms/tests/queries/0_stateless/00973_live_view_select.sql b/dbms/tests/queries/0_stateless/00973_live_view_select.sql deleted file mode 100644 index 4b5ca0a2dd7..00000000000 --- a/dbms/tests/queries/0_stateless/00973_live_view_select.sql +++ /dev/null @@ -1,20 +0,0 @@ -SET allow_experimental_live_view = 1; - -DROP TABLE IF EXISTS test.lv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); -CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; - -INSERT INTO test.mt VALUES (1),(2),(3); - -SELECT *,_version FROM test.lv; -SELECT *,_version FROM test.lv; - -INSERT INTO test.mt VALUES (1),(2),(3); - -SELECT *,_version FROM test.lv; -SELECT *,_version FROM test.lv; - -DROP TABLE test.lv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.reference b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.reference deleted file mode 100644 index 6d50f0e9c3a..00000000000 --- a/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.reference +++ /dev/null @@ -1,2 +0,0 @@ -6 -21 diff --git a/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql b/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql deleted file mode 100644 index 3faaec8f623..00000000000 --- a/dbms/tests/queries/0_stateless/00974_live_view_select_with_aggregation.sql +++ /dev/null @@ -1,18 +0,0 @@ -SET allow_experimental_live_view = 1; - -DROP TABLE IF EXISTS test.lv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); -CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; - -INSERT INTO test.mt VALUES (1),(2),(3); - -SELECT sum(a) FROM test.lv; - -INSERT INTO test.mt VALUES (4),(5),(6); - -SELECT sum(a) FROM test.lv; - -DROP TABLE test.lv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00975_live_view_create.reference b/dbms/tests/queries/0_stateless/00975_live_view_create.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00975_live_view_create.sql b/dbms/tests/queries/0_stateless/00975_live_view_create.sql deleted file mode 100644 index 02c1644d193..00000000000 --- a/dbms/tests/queries/0_stateless/00975_live_view_create.sql +++ /dev/null @@ -1,9 +0,0 @@ -SET allow_experimental_live_view = 1; - -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); -CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; - -DROP TABLE test.lv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00976_live_view_select_version.reference b/dbms/tests/queries/0_stateless/00976_live_view_select_version.reference deleted file mode 100644 index 453bd800469..00000000000 --- a/dbms/tests/queries/0_stateless/00976_live_view_select_version.reference +++ /dev/null @@ -1,3 +0,0 @@ -1 1 -2 1 -3 1 diff --git a/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql b/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql deleted file mode 100644 index ae1c59a92d7..00000000000 --- a/dbms/tests/queries/0_stateless/00976_live_view_select_version.sql +++ /dev/null @@ -1,14 +0,0 @@ -SET allow_experimental_live_view = 1; - -DROP TABLE IF EXISTS test.lv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); -CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt; - -INSERT INTO test.mt VALUES (1),(2),(3); - -SELECT *,_version FROM test.lv; - -DROP TABLE test.lv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00977_live_view_watch_events.reference b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.reference deleted file mode 100644 index 01e79c32a8c..00000000000 --- a/dbms/tests/queries/0_stateless/00977_live_view_watch_events.reference +++ /dev/null @@ -1,3 +0,0 @@ -1 -2 -3 diff --git a/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql b/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql deleted file mode 100644 index 3e0d066fb8d..00000000000 --- a/dbms/tests/queries/0_stateless/00977_live_view_watch_events.sql +++ /dev/null @@ -1,20 +0,0 @@ -SET allow_experimental_live_view = 1; - -DROP TABLE IF EXISTS test.lv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); -CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; - -WATCH test.lv EVENTS LIMIT 0; - -INSERT INTO test.mt VALUES (1),(2),(3); - -WATCH test.lv EVENTS LIMIT 0; - -INSERT INTO test.mt VALUES (4),(5),(6); - -WATCH test.lv EVENTS LIMIT 0; - -DROP TABLE test.lv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00978_live_view_watch.reference b/dbms/tests/queries/0_stateless/00978_live_view_watch.reference deleted file mode 100644 index 6fbbedf1b21..00000000000 --- a/dbms/tests/queries/0_stateless/00978_live_view_watch.reference +++ /dev/null @@ -1,3 +0,0 @@ -0 1 -6 2 -21 3 diff --git a/dbms/tests/queries/0_stateless/00978_live_view_watch.sql b/dbms/tests/queries/0_stateless/00978_live_view_watch.sql deleted file mode 100644 index b8d0d93ccab..00000000000 --- a/dbms/tests/queries/0_stateless/00978_live_view_watch.sql +++ /dev/null @@ -1,20 +0,0 @@ -SET allow_experimental_live_view = 1; - -DROP TABLE IF EXISTS test.lv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); -CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; - -WATCH test.lv LIMIT 0; - -INSERT INTO test.mt VALUES (1),(2),(3); - -WATCH test.lv LIMIT 0; - -INSERT INTO test.mt VALUES (4),(5),(6); - -WATCH test.lv LIMIT 0; - -DROP TABLE test.lv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py deleted file mode 100755 index 8c5bc5b8eb2..00000000000 --- a/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env python -import os -import sys -import signal - -CURDIR = os.path.dirname(os.path.realpath(__file__)) -sys.path.insert(0, os.path.join(CURDIR, 'helpers')) - -from client import client, prompt, end_of_block - -log = None -# uncomment the line below for debugging -#log=sys.stdout - -with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: - client1.expect(prompt) - client2.expect(prompt) - - client1.send('SET allow_experimental_live_view = 1') - client1.expect(prompt) - client2.send('SET allow_experimental_live_view = 1') - client2.expect(prompt) - - client1.send('DROP TABLE IF EXISTS test.lv') - client1.expect(prompt) - client1.send(' DROP TABLE IF EXISTS test.mt') - client1.expect(prompt) - client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') - client1.expect(prompt) - client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt') - client1.expect(prompt) - client1.send('WATCH test.lv') - client1.expect(r'0.*1' + end_of_block) - client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') - client1.expect(r'6.*2' + end_of_block) - client2.expect(prompt) - client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') - client1.expect(r'21.*3' + end_of_block) - client2.expect(prompt) - for i in range(1,129): - client2.send('INSERT INTO test.mt VALUES (1)') - client1.expect(r'%d.*%d' % (21+i, 3+i) + end_of_block) - client2.expect(prompt) - # send Ctrl-C - client1.send('\x03', eol='') - match = client1.expect('(%s)|([#\$] )' % prompt) - if match.groups()[1]: - client1.send(client1.command) - client1.expect(prompt) - client1.send('DROP TABLE test.lv') - client1.expect(prompt) - client1.send('DROP TABLE test.mt') - client1.expect(prompt) diff --git a/dbms/tests/queries/0_stateless/00979_live_view_watch_live.reference b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.reference b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.reference deleted file mode 100644 index 7f9fcbb2e9c..00000000000 --- a/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.reference +++ /dev/null @@ -1,3 +0,0 @@ -temporary_live_view_timeout 5 -live_view_heartbeat_interval 15 -0 diff --git a/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql b/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql deleted file mode 100644 index 037c2a9e587..00000000000 --- a/dbms/tests/queries/0_stateless/00980_create_temporary_live_view.sql +++ /dev/null @@ -1,17 +0,0 @@ -SET allow_experimental_live_view = 1; - -DROP TABLE IF EXISTS test.lv; -DROP TABLE IF EXISTS test.mt; - -SELECT name, value from system.settings WHERE name = 'temporary_live_view_timeout'; -SELECT name, value from system.settings WHERE name = 'live_view_heartbeat_interval'; - -SET temporary_live_view_timeout=1; -CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple(); -CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt; - -SHOW TABLES LIKE 'lv'; -SELECT sleep(2); -SHOW TABLES LIKE 'lv'; - -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.python b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.python deleted file mode 100644 index 782671cdfaf..00000000000 --- a/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.python +++ /dev/null @@ -1,81 +0,0 @@ -#!/usr/bin/env python - -import subprocess -import threading -import Queue as queue -import os -import sys -import signal - - -CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') -CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') -CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') - - -def send_query(query): - cmd = list(CLICKHOUSE_CLIENT.split()) - cmd += ['--query', query] - # print(cmd) - return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout - - -def send_query_in_process_group(query): - cmd = list(CLICKHOUSE_CLIENT.split()) - cmd += ['--query', query] - # print(cmd) - return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid) - - -def read_lines_and_push_to_queue(pipe, queue): - try: - for line in iter(pipe.readline, ''): - line = line.strip() - print(line) - sys.stdout.flush() - queue.put(line) - except KeyboardInterrupt: - pass - - queue.put(None) - - -def test(): - send_query('DROP TABLE IF EXISTS test.lv').read() - send_query('DROP TABLE IF EXISTS test.mt').read() - send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() - send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() - - q = queue.Queue() - p = send_query_in_process_group('WATCH test.lv') - thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q)) - thread.start() - - line = q.get() - print(line) - assert (line == '0\t1') - - send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() - line = q.get() - print(line) - assert (line == '6\t2') - - send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read() - line = q.get() - print(line) - assert (line == '21\t3') - - # Send Ctrl+C to client. - os.killpg(os.getpgid(p.pid), signal.SIGINT) - # This insert shouldn't affect lv. - send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read() - line = q.get() - print(line) - assert (line is None) - - send_query('DROP TABLE if exists test.lv').read() - send_query('DROP TABLE if exists test.lv').read() - - thread.join() - -test() diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.reference b/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.reference deleted file mode 100644 index 1e94cdade41..00000000000 --- a/dbms/tests/queries/0_stateless/00991_live_view_watch_event_live.reference +++ /dev/null @@ -1,7 +0,0 @@ -0 1 -0 1 -6 2 -6 2 -21 3 -21 3 -None diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python deleted file mode 100755 index 938547ca0cb..00000000000 --- a/dbms/tests/queries/0_stateless/00991_live_view_watch_http.python +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python - -import subprocess -import threading -import Queue as queue -import os -import sys - - -CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') -CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') -CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') - - -def send_query(query): - cmd = list(CLICKHOUSE_CLIENT.split()) - cmd += ['--query', query] - # print(cmd) - return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout - - -def send_http_query(query): - cmd = list(CLICKHOUSE_CURL.split()) # list(['curl', '-sSN', '--max-time', '10']) - cmd += ['-sSN', CLICKHOUSE_URL, '-d', query] - return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout - - -def read_lines_and_push_to_queue(pipe, queue): - for line in iter(pipe.readline, ''): - line = line.strip() - print(line) - sys.stdout.flush() - queue.put(line) - - queue.put(None) - - -def test(): - send_query('DROP TABLE IF EXISTS test.lv').read() - send_query('DROP TABLE IF EXISTS test.mt').read() - send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() - send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() - - q = queue.Queue() - pipe = send_http_query('WATCH test.lv') - thread = threading.Thread(target=read_lines_and_push_to_queue, args=(pipe, q)) - thread.start() - - line = q.get() - print(line) - assert (line == '0\t1') - - send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() - line = q.get() - print(line) - assert (line == '6\t2') - - send_query('DROP TABLE if exists test.lv').read() - send_query('DROP TABLE if exists test.lv').read() - - thread.join() - -test() diff --git a/dbms/tests/queries/0_stateless/00991_live_view_watch_http.reference b/dbms/tests/queries/0_stateless/00991_live_view_watch_http.reference deleted file mode 100644 index 489457d751b..00000000000 --- a/dbms/tests/queries/0_stateless/00991_live_view_watch_http.reference +++ /dev/null @@ -1,4 +0,0 @@ -0 1 -0 1 -6 2 -6 2 diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.python b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.python deleted file mode 100644 index 70063adc6e3..00000000000 --- a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.python +++ /dev/null @@ -1,83 +0,0 @@ -#!/usr/bin/env python - -import subprocess -import threading -import Queue as queue -import os -import sys -import signal - - -CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') -CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') -CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') - - -def send_query(query): - cmd = list(CLICKHOUSE_CLIENT.split()) - cmd += ['--query', query] - # print(cmd) - return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout - - -def send_query_in_process_group(query): - cmd = list(CLICKHOUSE_CLIENT.split()) - cmd += ['--query', query, '--live_view_heartbeat_interval=1', '--progress'] - # print(cmd) - return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid) - - -def read_lines_and_push_to_queue(pipe, queue): - try: - for line in iter(pipe.readline, ''): - line = line.strip() - # print(line) - sys.stdout.flush() - queue.put(line) - except KeyboardInterrupt: - pass - - queue.put(None) - - -def test(): - send_query('DROP TABLE IF EXISTS test.lv').read() - send_query('DROP TABLE IF EXISTS test.mt').read() - send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() - send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() - - q = queue.Queue() - p = send_query_in_process_group('WATCH test.lv') - thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q)) - thread.start() - - line = q.get() - # print(line) - assert (line.endswith('0\t1')) - assert ('Progress: 0.00 rows' in line) - - send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() - line = q.get() - assert (line.endswith('6\t2')) - assert ('Progress: 1.00 rows' in line) - - # send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read() - # line = q.get() - # print(line) - # assert (line.endswith('6\t2')) - # assert ('Progress: 1.00 rows' in line) - - # Send Ctrl+C to client. - os.killpg(os.getpgid(p.pid), signal.SIGINT) - # This insert shouldn't affect lv. - send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read() - line = q.get() - # print(line) - # assert (line is None) - - send_query('DROP TABLE if exists test.lv').read() - send_query('DROP TABLE if exists test.lv').read() - - thread.join() - -test() diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.reference b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_events_heartbeat.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.python b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.python deleted file mode 100644 index d290018a02c..00000000000 --- a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.python +++ /dev/null @@ -1,81 +0,0 @@ -#!/usr/bin/env python - -import subprocess -import threading -import Queue as queue -import os -import sys -import signal - - -CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT') -CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL') -CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') - - -def send_query(query): - cmd = list(CLICKHOUSE_CLIENT.split()) - cmd += ['--query', query] - # print(cmd) - return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout - - -def send_query_in_process_group(query): - cmd = list(CLICKHOUSE_CLIENT.split()) - cmd += ['--query', query] - # print(cmd) - return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid) - - -def read_lines_and_push_to_queue(pipe, queue): - try: - for line in iter(pipe.readline, ''): - line = line.strip() - print(line) - sys.stdout.flush() - queue.put(line) - except KeyboardInterrupt: - pass - - queue.put(None) - - -def test(): - send_query('DROP TABLE IF EXISTS test.lv').read() - send_query('DROP TABLE IF EXISTS test.mt').read() - send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read() - send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read() - - q = queue.Queue() - p = send_query_in_process_group('WATCH test.lv') - thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q)) - thread.start() - - line = q.get() - print(line) - assert (line == '0\t1') - - send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read() - line = q.get() - print(line) - assert (line == '6\t2') - - send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read() - line = q.get() - print(line) - assert (line == '21\t3') - - # Send Ctrl+C to client. - os.killpg(os.getpgid(p.pid), signal.SIGINT) - # This insert shouldn't affect lv. - send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read() - line = q.get() - print(line) - assert (line is None) - - send_query('DROP TABLE if exists test.lv').read() - send_query('DROP TABLE if exists test.lv').read() - - thread.join() - -test() diff --git a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.reference b/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.reference deleted file mode 100644 index 1e94cdade41..00000000000 --- a/dbms/tests/queries/0_stateless/00991_temporary_live_view_watch_live.reference +++ /dev/null @@ -1,7 +0,0 @@ -0 1 -0 1 -6 2 -6 2 -21 3 -21 3 -None