Merge pull request #6656 from vzakaznikov/fix_live_view_no_users_thread

Fix live view no users thread
This commit is contained in:
alexey-milovidov 2019-08-28 16:33:36 +03:00 committed by GitHub
commit 696d505a1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
62 changed files with 1417 additions and 38 deletions

View File

@ -31,7 +31,12 @@ public:
const bool has_limit_, const UInt64 limit_,
const UInt64 heartbeat_interval_sec_,
const UInt64 temporary_live_view_timeout_sec_)
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
active_ptr(std::move(active_ptr_)),
has_limit(has_limit_), limit(limit_),
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
{
/// grab active pointer
active = active_ptr.lock();

View File

@ -51,7 +51,12 @@ public:
const bool has_limit_, const UInt64 limit_,
const UInt64 heartbeat_interval_sec_,
const UInt64 temporary_live_view_timeout_sec_)
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
active_ptr(std::move(active_ptr_)), has_limit(has_limit_),
limit(limit_),
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
{
/// grab active pointer
active = active_ptr.lock();

View File

@ -365,25 +365,25 @@ void StorageLiveView::checkTableCanBeDropped() const
}
}
void StorageLiveView::noUsersThread(const UInt64 & timeout)
void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout)
{
if (shutdown_called)
return;
bool drop_table = false;
if (storage->shutdown_called)
return;
{
while (1)
{
std::unique_lock lock(no_users_thread_mutex);
if (!no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return no_users_thread_wakeup; }))
std::unique_lock lock(storage->no_users_thread_mutex);
if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; }))
{
no_users_thread_wakeup = false;
if (shutdown_called)
storage->no_users_thread_wakeup = false;
if (storage->shutdown_called)
return;
if (hasUsers())
if (storage->hasUsers())
return;
if (!global_context.getDependencies(database_name, table_name).empty())
if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty())
continue;
drop_table = true;
}
@ -393,17 +393,17 @@ void StorageLiveView::noUsersThread(const UInt64 & timeout)
if (drop_table)
{
if (global_context.tryGetTable(database_name, table_name))
if (storage->global_context.tryGetTable(storage->database_name, storage->table_name))
{
try
{
/// We create and execute `drop` query for this table
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->database = database_name;
drop_query->table = table_name;
drop_query->database = storage->database_name;
drop_query->table = storage->table_name;
drop_query->kind = ASTDropQuery::Kind::Drop;
ASTPtr ast_drop_query = drop_query;
InterpreterDropQuery drop_interpreter(ast_drop_query, global_context);
InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context);
drop_interpreter.execute();
}
catch (...)
@ -419,9 +419,6 @@ void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
if (!start_no_users_thread_called.compare_exchange_strong(expected, true))
return;
if (is_dropped)
return;
if (is_temporary)
{
if (no_users_thread.joinable())
@ -438,8 +435,10 @@ void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
no_users_thread_wakeup = false;
}
if (!is_dropped)
no_users_thread = std::thread(&StorageLiveView::noUsersThread, this, timeout);
no_users_thread = std::thread(&StorageLiveView::noUsersThread,
std::static_pointer_cast<StorageLiveView>(shared_from_this()), timeout);
}
start_no_users_thread_called = false;
}
@ -456,19 +455,19 @@ void StorageLiveView::shutdown()
if (no_users_thread.joinable())
{
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
/// Must detach the no users thread
/// as we can't join it as it will result
/// in a deadlock
no_users_thread.detach(); /// TODO Not viable at all.
{
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
}
}
}
StorageLiveView::~StorageLiveView()
{
shutdown();
if (no_users_thread.joinable())
no_users_thread.detach();
}
void StorageLiveView::drop()
@ -534,8 +533,11 @@ BlockInputStreams StorageLiveView::watch(
if (query.is_watch_events)
{
auto reader = std::make_shared<LiveViewEventsBlockInputStream>(std::static_pointer_cast<StorageLiveView>(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
auto reader = std::make_shared<LiveViewEventsBlockInputStream>(
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
if (no_users_thread.joinable())
{
@ -559,8 +561,11 @@ BlockInputStreams StorageLiveView::watch(
}
else
{
auto reader = std::make_shared<LiveViewBlockInputStream>(std::static_pointer_cast<StorageLiveView>(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
auto reader = std::make_shared<LiveViewBlockInputStream>(
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
if (no_users_thread.joinable())
{

View File

@ -71,11 +71,10 @@ public:
{
return active_ptr.use_count() > 1;
}
/// Background thread for temporary tables
/// which drops this table if there are no users
/// No users thread mutex, predicate and wake up condition
void startNoUsersThread(const UInt64 & timeout);
std::mutex no_users_thread_mutex;
bool no_users_thread_wakeup{false};
bool no_users_thread_wakeup = false;
std::condition_variable no_users_thread_condition;
/// Get blocks hash
/// must be called with mutex locked
@ -149,7 +148,7 @@ private:
String database_name;
ASTPtr inner_query;
Context & global_context;
bool is_temporary {false};
bool is_temporary = false;
mutable Block sample_block;
/// Mutex for the blocks and ready condition
@ -166,10 +165,12 @@ private:
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
BlocksPtrs mergeable_blocks;
void noUsersThread(const UInt64 & timeout);
/// Background thread for temporary tables
/// which drops this table if there are no users
static void noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout);
std::thread no_users_thread;
std::atomic<bool> shutdown_called{false};
std::atomic<bool> start_no_users_thread_called{false};
std::atomic<bool> shutdown_called = false;
std::atomic<bool> start_no_users_thread_called = false;
UInt64 temporary_live_view_timeout;
StorageLiveView(

View File

@ -0,0 +1,47 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv EVENTS')
client1.expect('1.*' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect('2.*' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client1.expect('3.*' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -0,0 +1,3 @@
0 1
6 2
21 3

View File

@ -0,0 +1,20 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv LIMIT 0;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -0,0 +1,47 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(r'6.*2' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client1.expect(r'21.*3' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -0,0 +1,54 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET temporary_live_view_timeout=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client2.expect(prompt)
client1.expect(r'6.*2' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client2.expect(prompt)
client1.expect(r'21.*3' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('SELECT sleep(1)')
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect('Table test.lv doesn\'t exist')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -0,0 +1,49 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET live_view_heartbeat_interval=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv EVENTS')
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect('2.*' + end_of_block)
client1.expect('Progress: 2.00 rows.*\)')
# wait for heartbeat
client1.expect('Progress: 2.00 rows.*\)')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -0,0 +1,50 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET live_view_heartbeat_interval=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(r'6.*2' + end_of_block)
client1.expect('Progress: 2.00 rows.*\)')
# wait for heartbeat
client1.expect('Progress: 2.00 rows.*\)')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -0,0 +1,40 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&query=WATCH%20test.lv%20EVENTS'}, name='client2>', log=log) as client2:
client2.expect('.*1\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('.*2\n')
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -0,0 +1,40 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&query=WATCH%20test.lv'}, name='client2>', log=log) as client2:
client2.expect('.*0\t1\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('.*6\t2\n')
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -0,0 +1,4 @@
{"row":{"a":1}}
{"row":{"a":2}}
{"row":{"a":3}}
{"progress":{"read_rows":"3","read_bytes":"36","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}

View File

@ -0,0 +1,14 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT * FROM test.lv FORMAT JSONEachRowWithProgress;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -0,0 +1,6 @@
{"row":{"sum(a)":"0","_version":"1"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
{"row":{"sum(a)":"6","_version":"2"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
{"row":{"sum(a)":"21","_version":"3"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}

View File

@ -0,0 +1,20 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -0,0 +1,45 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20EVENTS%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2:
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}\n', escape=True)
client2.expect('{"row":{"version":"1"}', escape=True)
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True)
# heartbeat is provided by progress message
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True)
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('{"row":{"version":"2"}}\n', escape=True)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -0,0 +1,46 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&live_view_heartbeat_interval=1&query=WATCH%20test.lv%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2:
client2.expect('"progress".*',)
client2.expect('{"row":{"sum(a)":"0","_version":"1"}}\n', escape=True)
client2.expect('"progress".*\n')
# heartbeat is provided by progress message
client2.expect('"progress".*\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('"progress".*"read_rows":"2".*\n')
client2.expect('{"row":{"sum(a)":"6","_version":"2"}}\n', escape=True)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -0,0 +1,9 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
CREATE LIVE VIEW test.lv AS SELECT 1;
SELECT * FROM test.lv;
DROP TABLE test.lv;

View File

@ -0,0 +1,4 @@
6 1
6 1
12 2
12 2

View File

@ -0,0 +1,20 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT *,_version FROM test.lv;
SELECT *,_version FROM test.lv;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT *,_version FROM test.lv;
SELECT *,_version FROM test.lv;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -0,0 +1,18 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT sum(a) FROM test.lv;
INSERT INTO test.mt VALUES (4),(5),(6);
SELECT sum(a) FROM test.lv;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -0,0 +1,9 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -0,0 +1,3 @@
1 1
2 1
3 1

View File

@ -0,0 +1,14 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT *,_version FROM test.lv;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -0,0 +1,3 @@
1
2
3

View File

@ -0,0 +1,20 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv EVENTS LIMIT 0;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv EVENTS LIMIT 0;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv EVENTS LIMIT 0;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -0,0 +1,3 @@
0 1
6 2
21 3

View File

@ -0,0 +1,20 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv LIMIT 0;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(r'6.*2' + end_of_block)
client2.expect(prompt)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client1.expect(r'21.*3' + end_of_block)
client2.expect(prompt)
for i in range(1,129):
client2.send('INSERT INTO test.mt VALUES (1)')
client1.expect(r'%d.*%d' % (21+i, 3+i) + end_of_block)
client2.expect(prompt)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -0,0 +1,3 @@
temporary_live_view_timeout 5
live_view_heartbeat_interval 15
0

View File

@ -0,0 +1,17 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
SELECT name, value from system.settings WHERE name = 'temporary_live_view_timeout';
SELECT name, value from system.settings WHERE name = 'live_view_heartbeat_interval';
SET temporary_live_view_timeout=1;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
SHOW TABLES LIKE 'lv';
SELECT sleep(2);
SHOW TABLES LIKE 'lv';
DROP TABLE test.mt;

View File

@ -0,0 +1,81 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
import signal
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_query_in_process_group(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
def read_lines_and_push_to_queue(pipe, queue):
try:
for line in iter(pipe.readline, ''):
line = line.strip()
print(line)
sys.stdout.flush()
queue.put(line)
except KeyboardInterrupt:
pass
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
p = send_query_in_process_group('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
thread.start()
line = q.get()
print(line)
assert (line == '0\t1')
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
print(line)
assert (line == '6\t2')
send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
line = q.get()
print(line)
assert (line == '21\t3')
# Send Ctrl+C to client.
os.killpg(os.getpgid(p.pid), signal.SIGINT)
# This insert shouldn't affect lv.
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
line = q.get()
print(line)
assert (line is None)
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -0,0 +1,7 @@
0 1
0 1
6 2
6 2
21 3
21 3
None

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
python $CURDIR/00991_live_view_watch_event_live.python

View File

@ -0,0 +1,63 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_http_query(query):
cmd = list(CLICKHOUSE_CURL.split()) # list(['curl', '-sSN', '--max-time', '10'])
cmd += ['-sSN', CLICKHOUSE_URL, '-d', query]
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def read_lines_and_push_to_queue(pipe, queue):
for line in iter(pipe.readline, ''):
line = line.strip()
print(line)
sys.stdout.flush()
queue.put(line)
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
pipe = send_http_query('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(pipe, q))
thread.start()
line = q.get()
print(line)
assert (line == '0\t1')
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
print(line)
assert (line == '6\t2')
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -0,0 +1,4 @@
0 1
0 1
6 2
6 2

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
python $CURDIR/00991_live_view_watch_http.python

View File

@ -0,0 +1,83 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
import signal
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_query_in_process_group(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query, '--live_view_heartbeat_interval=1', '--progress']
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
def read_lines_and_push_to_queue(pipe, queue):
try:
for line in iter(pipe.readline, ''):
line = line.strip()
# print(line)
sys.stdout.flush()
queue.put(line)
except KeyboardInterrupt:
pass
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
p = send_query_in_process_group('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
thread.start()
line = q.get()
# print(line)
assert (line.endswith('0\t1'))
assert ('Progress: 0.00 rows' in line)
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
assert (line.endswith('6\t2'))
assert ('Progress: 1.00 rows' in line)
# send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
# line = q.get()
# print(line)
# assert (line.endswith('6\t2'))
# assert ('Progress: 1.00 rows' in line)
# Send Ctrl+C to client.
os.killpg(os.getpgid(p.pid), signal.SIGINT)
# This insert shouldn't affect lv.
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
line = q.get()
# print(line)
# assert (line is None)
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
python $CURDIR/00991_temporary_live_view_watch_events_heartbeat.python

View File

@ -0,0 +1,81 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
import signal
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_query_in_process_group(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
def read_lines_and_push_to_queue(pipe, queue):
try:
for line in iter(pipe.readline, ''):
line = line.strip()
print(line)
sys.stdout.flush()
queue.put(line)
except KeyboardInterrupt:
pass
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
p = send_query_in_process_group('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
thread.start()
line = q.get()
print(line)
assert (line == '0\t1')
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
print(line)
assert (line == '6\t2')
send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
line = q.get()
print(line)
assert (line == '21\t3')
# Send Ctrl+C to client.
os.killpg(os.getpgid(p.pid), signal.SIGINT)
# This insert shouldn't affect lv.
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
line = q.get()
print(line)
assert (line is None)
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -0,0 +1,7 @@
0 1
0 1
6 2
6 2
21 3
21 3
None

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
python $CURDIR/00991_temporary_live_view_watch_live.python

View File

@ -0,0 +1,36 @@
import os
import sys
import time
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR))
import uexpect
prompt = ':\) '
end_of_block = r'.*\r\n.*\r\n'
class client(object):
def __init__(self, command=None, name='', log=None):
self.client = uexpect.spawn(['/bin/bash','--noediting'])
if command is None:
command = os.environ.get('CLICKHOUSE_BINARY', 'clickhouse') + '-client'
self.client.command = command
self.client.eol('\r')
self.client.logger(log, prefix=name)
self.client.timeout(20)
self.client.expect('[#\$] ', timeout=2)
self.client.send(command)
def __enter__(self):
return self.client.__enter__()
def __exit__(self, type, value, traceback):
self.client.reader['kill_event'].set()
# send Ctrl-C
self.client.send('\x03', eol='')
time.sleep(0.3)
self.client.send('quit', eol='\r')
self.client.send('\x03', eol='')
return self.client.__exit__(type, value, traceback)

View File

@ -0,0 +1,14 @@
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR))
import httpexpect
def client(request, name='', log=None):
client = httpexpect.spawn({'host':'localhost','port':8123}, request)
client.logger(log, prefix=name)
client.timeout(20)
return client

View File

@ -0,0 +1,73 @@
# Copyright (c) 2019 Vitaliy Zakaznikov
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import httplib
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, CURDIR)
import uexpect
from threading import Thread, Event
from Queue import Queue, Empty
class IO(uexpect.IO):
def __init__(self, connection, response, queue, reader):
self.connection = connection
self.response = response
super(IO, self).__init__(None, None, queue, reader)
def write(self, data):
raise NotImplementedError
def close(self, force=True):
self.reader['kill_event'].set()
self.connection.close()
if self._logger:
self._logger.write('\n')
self._logger.flush()
def reader(response, queue, kill_event):
while True:
try:
if kill_event.is_set():
break
data = response.read(1)
queue.put(data)
except Exception, e:
if kill_event.is_set():
break
raise
def spawn(connection, request):
connection = httplib.HTTPConnection(**connection)
connection.request(**request)
response = connection.getresponse()
queue = Queue()
reader_kill_event = Event()
thread = Thread(target=reader, args=(response, queue, reader_kill_event))
thread.daemon = True
thread.start()
return IO(connection, response, queue, reader={'thread':thread, 'kill_event':reader_kill_event})
if __name__ == '__main__':
with http({'host':'localhost','port':8123},{'method':'GET', 'url':'?query=SELECT%201'}) as client:
client.logger(sys.stdout)
client.timeout(2)
print client.response.status, client.response.reason
client.expect('1\n')

View File

@ -0,0 +1,206 @@
# Copyright (c) 2019 Vitaliy Zakaznikov
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pty
import time
import sys
import re
from threading import Thread, Event
from subprocess import Popen
from Queue import Queue, Empty
class TimeoutError(Exception):
def __init__(self, timeout):
self.timeout = timeout
def __str__(self):
return 'Timeout %.3fs' % float(self.timeout)
class ExpectTimeoutError(Exception):
def __init__(self, pattern, timeout, buffer):
self.pattern = pattern
self.timeout = timeout
self.buffer = buffer
def __str__(self):
s = 'Timeout %.3fs ' % float(self.timeout)
if self.pattern:
s += 'for %s ' % repr(self.pattern.pattern)
if self.buffer:
s += 'buffer %s ' % repr(self.buffer[:])
s += 'or \'%s\'' % ','.join(['%x' % ord(c) for c in self.buffer[:]])
return s
class IO(object):
class EOF(object):
pass
class Timeout(object):
pass
EOF = EOF
TIMEOUT = Timeout
class Logger(object):
def __init__(self, logger, prefix=''):
self._logger = logger
self._prefix = prefix
def write(self, data):
self._logger.write(('\n' + data).replace('\n','\n' + self._prefix))
def flush(self):
self._logger.flush()
def __init__(self, process, master, queue, reader):
self.process = process
self.master = master
self.queue = queue
self.buffer = None
self.before = None
self.after = None
self.match = None
self.pattern = None
self.reader = reader
self._timeout = None
self._logger = None
self._eol = ''
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def logger(self, logger=None, prefix=''):
if logger:
self._logger = self.Logger(logger, prefix=prefix)
return self._logger
def timeout(self, timeout=None):
if timeout:
self._timeout = timeout
return self._timeout
def eol(self, eol=None):
if eol:
self._eol = eol
return self._eol
def close(self, force=True):
self.reader['kill_event'].set()
os.system('pkill -TERM -P %d' % self.process.pid)
if force:
self.process.kill()
else:
self.process.terminate()
os.close(self.master)
if self._logger:
self._logger.write('\n')
self._logger.flush()
def send(self, data, eol=None):
if eol is None:
eol = self._eol
return self.write(data + eol)
def write(self, data):
return os.write(self.master, data)
def expect(self, pattern, timeout=None, escape=False):
self.match = None
self.before = None
self.after = None
if escape:
pattern = re.escape(pattern)
pattern = re.compile(pattern)
if timeout is None:
timeout = self._timeout
timeleft = timeout
while True:
start_time = time.time()
if self.buffer is not None:
self.match = pattern.search(self.buffer, 0)
if self.match is not None:
self.after = self.buffer[self.match.start():self.match.end()]
self.before = self.buffer[:self.match.start()]
self.buffer = self.buffer[self.match.end():]
break
if timeleft < 0:
break
try:
data = self.read(timeout=timeleft, raise_exception=True)
except TimeoutError:
if self._logger:
self._logger.write((self.buffer or '') + '\n')
self._logger.flush()
exception = ExpectTimeoutError(pattern, timeout, self.buffer)
self.buffer = None
raise exception
timeleft -= (time.time() - start_time)
if data:
self.buffer = (self.buffer + data) if self.buffer else data
if self._logger:
self._logger.write((self.before or '') + (self.after or ''))
self._logger.flush()
if self.match is None:
exception = ExpectTimeoutError(pattern, timeout, self.buffer)
self.buffer = None
raise exception
return self.match
def read(self, timeout=0, raise_exception=False):
data = ''
timeleft = timeout
try:
while timeleft >= 0 :
start_time = time.time()
data += self.queue.get(timeout=timeleft)
if data:
break
timeleft -= (time.time() - start_time)
except Empty:
if data:
return data
if raise_exception:
raise TimeoutError(timeout)
pass
if not data and raise_exception:
raise TimeoutError(timeout)
return data
def spawn(command):
master, slave = pty.openpty()
process = Popen(command, preexec_fn=os.setsid, stdout=slave, stdin=slave, stderr=slave, bufsize=1)
os.close(slave)
queue = Queue()
reader_kill_event = Event()
thread = Thread(target=reader, args=(process, master, queue, reader_kill_event))
thread.daemon = True
thread.start()
return IO(process, master, queue, reader={'thread':thread, 'kill_event':reader_kill_event})
def reader(process, out, queue, kill_event):
while True:
try:
data = os.read(out, 65536)
queue.put(data)
except:
if kill_event.is_set():
break
raise