Merge remote-tracking branch 'upstream/master' into fix25

This commit is contained in:
proller 2019-09-02 14:41:58 +03:00
commit bcee0dd0ad
53 changed files with 12 additions and 1035 deletions

View File

@ -37,7 +37,6 @@ class RWLockImpl::LockHolderImpl
RWLock parent;
GroupsContainer::iterator it_group;
ClientsContainer::iterator it_client;
ThreadToHolder::key_type thread_id;
QueryIdToHolder::key_type query_id;
CurrentMetrics::Increment active_client_increment;
@ -74,17 +73,12 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
/// Check if the same query is acquiring previously acquired lock
LockHolder existing_holder_ptr;
auto this_thread_id = std::this_thread::get_id();
auto it_thread = thread_to_holder.find(this_thread_id);
auto it_query = query_id_to_holder.end();
if (query_id != RWLockImpl::NO_QUERY)
it_query = query_id_to_holder.find(query_id);
if (it_thread != thread_to_holder.end())
existing_holder_ptr = it_thread->second.lock();
else if (it_query != query_id_to_holder.end())
existing_holder_ptr = it_query->second.lock();
{
auto it_query = query_id_to_holder.find(query_id);
if (it_query != query_id_to_holder.end())
existing_holder_ptr = it_query->second.lock();
}
if (existing_holder_ptr)
{
@ -125,10 +119,7 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
/// Wait a notification until we will be the only in the group.
it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); });
/// Insert myself (weak_ptr to the holder) to threads set to implement recursive lock
thread_to_holder.emplace(this_thread_id, res);
res->thread_id = this_thread_id;
/// Insert myself (weak_ptr to the holder) to queries set to implement recursive lock
if (query_id != RWLockImpl::NO_QUERY)
query_id_to_holder.emplace(query_id, res);
res->query_id = query_id;
@ -143,7 +134,6 @@ RWLockImpl::LockHolderImpl::~LockHolderImpl()
std::unique_lock lock(parent->mutex);
/// Remove weak_ptrs to the holder, since there are no owners of the current lock
parent->thread_to_holder.erase(thread_id);
parent->query_id_to_holder.erase(query_id);
/// Removes myself from client list of our group

View File

@ -6,7 +6,6 @@
#include <vector>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <map>
#include <string>
@ -19,7 +18,7 @@ using RWLock = std::shared_ptr<RWLockImpl>;
/// Implements shared lock with FIFO service
/// Can be acquired recursively (several calls for the same query or the same OS thread) in Read mode
/// Can be acquired recursively (several calls for the same query) in Read mode
///
/// NOTE: it is important to allow acquiring the same lock in Read mode without waiting if it is already
/// acquired by another thread of the same query. Otherwise the following deadlock is possible:
@ -55,7 +54,6 @@ private:
struct Group;
using GroupsContainer = std::list<Group>;
using ClientsContainer = std::list<Type>;
using ThreadToHolder = std::map<std::thread::id, std::weak_ptr<LockHolderImpl>>;
using QueryIdToHolder = std::map<String, std::weak_ptr<LockHolderImpl>>;
/// Group of clients that should be executed concurrently
@ -73,7 +71,6 @@ private:
mutable std::mutex mutex;
GroupsContainer queue;
ThreadToHolder thread_to_holder;
QueryIdToHolder query_id_to_holder;
};

View File

@ -94,7 +94,7 @@ TEST(Common, RWLock_Recursive)
{
for (int i = 0; i < 2 * cycles; ++i)
{
auto lock = fifo_lock->getLock(RWLockImpl::Write, RWLockImpl::NO_QUERY);
auto lock = fifo_lock->getLock(RWLockImpl::Write, "q1");
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
std::this_thread::sleep_for(sleep_for);
@ -105,17 +105,17 @@ TEST(Common, RWLock_Recursive)
{
for (int i = 0; i < cycles; ++i)
{
auto lock1 = fifo_lock->getLock(RWLockImpl::Read, RWLockImpl::NO_QUERY);
auto lock1 = fifo_lock->getLock(RWLockImpl::Read, "q2");
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
std::this_thread::sleep_for(sleep_for);
auto lock2 = fifo_lock->getLock(RWLockImpl::Read, RWLockImpl::NO_QUERY);
auto lock2 = fifo_lock->getLock(RWLockImpl::Read, "q2");
EXPECT_ANY_THROW({fifo_lock->getLock(RWLockImpl::Write, RWLockImpl::NO_QUERY);});
EXPECT_ANY_THROW({fifo_lock->getLock(RWLockImpl::Write, "q2");});
}
fifo_lock->getLock(RWLockImpl::Write, RWLockImpl::NO_QUERY);
fifo_lock->getLock(RWLockImpl::Write, "q2");
});
t1.join();

View File

@ -1,47 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv EVENTS')
client1.expect('1.*' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect('2.*' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client1.expect('3.*' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,20 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv LIMIT 0;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,47 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(r'6.*2' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client1.expect(r'21.*3' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,54 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET temporary_live_view_timeout=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client2.expect(prompt)
client1.expect(r'6.*2' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client2.expect(prompt)
client1.expect(r'21.*3' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('SELECT sleep(1)')
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect('Table test.lv doesn\'t exist')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,49 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET live_view_heartbeat_interval=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv EVENTS')
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect('2.*' + end_of_block)
client1.expect('Progress: 2.00 rows.*\)')
# wait for heartbeat
client1.expect('Progress: 2.00 rows.*\)')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,50 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET live_view_heartbeat_interval=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(r'6.*2' + end_of_block)
client1.expect('Progress: 2.00 rows.*\)')
# wait for heartbeat
client1.expect('Progress: 2.00 rows.*\)')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,40 +0,0 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&query=WATCH%20test.lv%20EVENTS'}, name='client2>', log=log) as client2:
client2.expect('.*1\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('.*2\n')
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,40 +0,0 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&query=WATCH%20test.lv'}, name='client2>', log=log) as client2:
client2.expect('.*0\t1\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('.*6\t2\n')
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,4 +0,0 @@
{"row":{"a":1}}
{"row":{"a":2}}
{"row":{"a":3}}
{"progress":{"read_rows":"3","read_bytes":"36","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}

View File

@ -1,14 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT * FROM test.lv FORMAT JSONEachRowWithProgress;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,6 +0,0 @@
{"row":{"sum(a)":"0","_version":"1"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
{"row":{"sum(a)":"6","_version":"2"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
{"row":{"sum(a)":"21","_version":"3"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}

View File

@ -1,20 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,45 +0,0 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20EVENTS%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2:
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}\n', escape=True)
client2.expect('{"row":{"version":"1"}', escape=True)
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True)
# heartbeat is provided by progress message
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True)
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('{"row":{"version":"2"}}\n', escape=True)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,46 +0,0 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2:
client2.expect('"progress".*',)
client2.expect('{"row":{"sum(a)":"0","_version":"1"}}\n', escape=True)
client2.expect('"progress".*\n')
# heartbeat is provided by progress message
client2.expect('"progress".*\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('"progress".*"read_rows":"2".*\n')
client2.expect('{"row":{"sum(a)":"6","_version":"2"}}\n', escape=True)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,9 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
CREATE LIVE VIEW test.lv AS SELECT 1;
SELECT * FROM test.lv;
DROP TABLE test.lv;

View File

@ -1,4 +0,0 @@
6 1
6 1
12 2
12 2

View File

@ -1,20 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT *,_version FROM test.lv;
SELECT *,_version FROM test.lv;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT *,_version FROM test.lv;
SELECT *,_version FROM test.lv;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,18 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT sum(a) FROM test.lv;
INSERT INTO test.mt VALUES (4),(5),(6);
SELECT sum(a) FROM test.lv;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,9 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,14 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT *,_version FROM test.lv;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,20 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv EVENTS LIMIT 0;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv EVENTS LIMIT 0;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv EVENTS LIMIT 0;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,3 +0,0 @@
0 1
6 2
21 3

View File

@ -1,20 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv LIMIT 0;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,53 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(r'6.*2' + end_of_block)
client2.expect(prompt)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client1.expect(r'21.*3' + end_of_block)
client2.expect(prompt)
for i in range(1,129):
client2.send('INSERT INTO test.mt VALUES (1)')
client1.expect(r'%d.*%d' % (21+i, 3+i) + end_of_block)
client2.expect(prompt)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,3 +0,0 @@
temporary_live_view_timeout 5
live_view_heartbeat_interval 15
0

View File

@ -1,17 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
SELECT name, value from system.settings WHERE name = 'temporary_live_view_timeout';
SELECT name, value from system.settings WHERE name = 'live_view_heartbeat_interval';
SET temporary_live_view_timeout=1;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
SHOW TABLES LIKE 'lv';
SELECT sleep(2);
SHOW TABLES LIKE 'lv';
DROP TABLE test.mt;

View File

@ -1,81 +0,0 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
import signal
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_query_in_process_group(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
def read_lines_and_push_to_queue(pipe, queue):
try:
for line in iter(pipe.readline, ''):
line = line.strip()
print(line)
sys.stdout.flush()
queue.put(line)
except KeyboardInterrupt:
pass
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
p = send_query_in_process_group('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
thread.start()
line = q.get()
print(line)
assert (line == '0\t1')
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
print(line)
assert (line == '6\t2')
send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
line = q.get()
print(line)
assert (line == '21\t3')
# Send Ctrl+C to client.
os.killpg(os.getpgid(p.pid), signal.SIGINT)
# This insert shouldn't affect lv.
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
line = q.get()
print(line)
assert (line is None)
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -1,7 +0,0 @@
0 1
0 1
6 2
6 2
21 3
21 3
None

View File

@ -1,63 +0,0 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_http_query(query):
cmd = list(CLICKHOUSE_CURL.split()) # list(['curl', '-sSN', '--max-time', '10'])
cmd += ['-sSN', CLICKHOUSE_URL, '-d', query]
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def read_lines_and_push_to_queue(pipe, queue):
for line in iter(pipe.readline, ''):
line = line.strip()
print(line)
sys.stdout.flush()
queue.put(line)
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
pipe = send_http_query('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(pipe, q))
thread.start()
line = q.get()
print(line)
assert (line == '0\t1')
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
print(line)
assert (line == '6\t2')
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -1,4 +0,0 @@
0 1
0 1
6 2
6 2

View File

@ -1,83 +0,0 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
import signal
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_query_in_process_group(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query, '--live_view_heartbeat_interval=1', '--progress']
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
def read_lines_and_push_to_queue(pipe, queue):
try:
for line in iter(pipe.readline, ''):
line = line.strip()
# print(line)
sys.stdout.flush()
queue.put(line)
except KeyboardInterrupt:
pass
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
p = send_query_in_process_group('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
thread.start()
line = q.get()
# print(line)
assert (line.endswith('0\t1'))
assert ('Progress: 0.00 rows' in line)
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
assert (line.endswith('6\t2'))
assert ('Progress: 1.00 rows' in line)
# send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
# line = q.get()
# print(line)
# assert (line.endswith('6\t2'))
# assert ('Progress: 1.00 rows' in line)
# Send Ctrl+C to client.
os.killpg(os.getpgid(p.pid), signal.SIGINT)
# This insert shouldn't affect lv.
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
line = q.get()
# print(line)
# assert (line is None)
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -1,81 +0,0 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
import signal
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_query_in_process_group(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
def read_lines_and_push_to_queue(pipe, queue):
try:
for line in iter(pipe.readline, ''):
line = line.strip()
print(line)
sys.stdout.flush()
queue.put(line)
except KeyboardInterrupt:
pass
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
p = send_query_in_process_group('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
thread.start()
line = q.get()
print(line)
assert (line == '0\t1')
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
print(line)
assert (line == '6\t2')
send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
line = q.get()
print(line)
assert (line == '21\t3')
# Send Ctrl+C to client.
os.killpg(os.getpgid(p.pid), signal.SIGINT)
# This insert shouldn't affect lv.
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
line = q.get()
print(line)
assert (line is None)
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -1,7 +0,0 @@
0 1
0 1
6 2
6 2
21 3
21 3
None