ClickHouse/dbms/tests/queries/0_stateless/uexpect.py
Vitaliy Zakaznikov aa3ef47aab * Adding JSONEachRowWithProgress format
that can be used with WATCH queries when HTTP connection is used
  in this case progress events are used as a heartbeat
* Adding new tests that use JSONEachRowWithProgress format
* Adding tests for WATCH query heartbeats when HTTP connection is used
* Small fixes to uexpect.py
2019-06-07 09:05:14 -04:00

194 lines
5.5 KiB
Python

# Copyright (c) 2019 Vitaliy Zakaznikov
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pty
import time
import sys
import re
from threading import Thread
from subprocess import Popen
from Queue import Queue, Empty
class TimeoutError(Exception):
    """Raised when no data could be read within the allowed time.

    The number of seconds is kept in ``timeout`` and is also forwarded to
    ``Exception.__init__`` so that ``args``, ``repr()`` and pickling carry it.
    """

    def __init__(self, timeout):
        # Without this call ``args`` stays empty, which makes ``repr()``
        # uninformative and breaks pickle/copy round-trips.
        super(TimeoutError, self).__init__(timeout)
        self.timeout = timeout

    def __str__(self):
        return 'Timeout %.3fs' % float(self.timeout)
class ExpectTimeoutError(Exception):
    """Raised when an expected pattern was not found in time.

    Attributes:
        pattern: compiled regular expression that was being waited for
            (``__str__`` reads its ``.pattern`` attribute).
        timeout: number of seconds reported in the message.
        buffer: output that was pending when the wait gave up; may be
            ``None`` when nothing had been received.
    """

    def __init__(self, pattern, timeout, buffer):
        # Populate ``args`` so that repr() and pickling keep the details.
        super(ExpectTimeoutError, self).__init__(pattern, timeout, buffer)
        self.pattern = pattern
        self.timeout = timeout
        self.buffer = buffer

    def __str__(self):
        # ``buffer`` is None when no output arrived at all; treat it as empty
        # instead of failing with a TypeError while formatting the error.
        tail = (self.buffer or '')[-80:]
        return ('Timeout %.3fs ' % float(self.timeout) +
                'for %s ' % repr(self.pattern.pattern) +
                'buffer ends with %s ' % repr(tail) +
                'or \'%s\'' % ','.join(['%x' % ord(c) for c in tail]))
class IO(object):
    """Expect-style interface to a process whose output arrives via a queue.

    ``master`` is the file descriptor input is written to and ``queue``
    yields chunks of the process output.

    After a successful ``expect`` call:
      * ``match``  - the regex match object,
      * ``before`` - output that preceded the match,
      * ``after``  - the matched text,
      * ``buffer`` - output received after the match, kept for the next call.
    """

    class EOF(object):
        pass

    class Timeout(object):
        pass

    EOF = EOF
    TIMEOUT = Timeout

    class Logger(object):
        """Wrap a file-like object and prefix every line written to it."""

        def __init__(self, logger, prefix=''):
            self._logger = logger
            self._prefix = prefix

        def write(self, data):
            # A newline is prepended so the first line receives the prefix too.
            self._logger.write(('\n' + data).replace('\n', '\n' + self._prefix))

        def flush(self):
            self._logger.flush()

    def __init__(self, process, master, queue):
        self.process = process
        self.master = master
        self.queue = queue
        self.buffer = None
        self.before = None
        self.after = None
        self.match = None
        self.pattern = None
        self._timeout = None
        self._logger = None
        self._eol = ''

    def logger(self, logger=None, prefix=''):
        """Install ``logger`` (if given) and return the current Logger."""
        if logger:
            self._logger = self.Logger(logger, prefix=prefix)
        return self._logger

    def timeout(self, timeout=None):
        """Set the default timeout (falsy values are ignored) and return it."""
        if timeout:
            self._timeout = timeout
        return self._timeout

    def eol(self, eol=None):
        """Set the end-of-line suffix (falsy values are ignored) and return it."""
        if eol:
            self._eol = eol
        return self._eol

    def close(self):
        """Terminate the log output with a newline and flush it."""
        if self._logger:
            self._logger.write('\n')
            self._logger.flush()

    def send(self, data, eol=None):
        """Write ``data`` followed by ``eol`` (default: the configured one)."""
        if eol is None:
            eol = self._eol
        return self.write(data + eol)

    def write(self, data):
        """Write ``data`` to the master descriptor; return ``os.write``'s result."""
        return os.write(self.master, data)

    def _expect_timeout(self, pattern, timeout):
        """Log and drop the pending buffer; return the error to raise."""
        if self._logger:
            # The buffer is None when nothing was received at all.
            self._logger.write((self.buffer or '') + '\n')
            self._logger.flush()
        exception = ExpectTimeoutError(pattern, timeout, self.buffer)
        self.buffer = None
        return exception

    def expect(self, pattern, timeout=None, escape=False):
        """Wait until ``pattern`` shows up in the output and return the match.

        Args:
            pattern: regular expression (string or compiled pattern).
            timeout: seconds to wait; defaults to the value set via
                ``timeout()``.
            escape: when true, ``pattern`` is matched literally.

        Raises:
            ExpectTimeoutError: the pattern was not found in time; the
                pending buffer is discarded.
            ValueError: no timeout was passed and no default is configured.
        """
        self.match = None
        self.before = None
        self.after = None
        if escape:
            pattern = re.escape(pattern)
        pattern = re.compile(pattern)
        if timeout is None:
            timeout = self._timeout
        if timeout is None:
            raise ValueError('expect() needs a timeout: pass one or set a default with timeout()')
        timeleft = timeout
        while True:
            start_time = time.time()
            # Search what is already buffered before blocking on new data:
            # a previous read may have delivered more than one match.
            if self.buffer is not None:
                self.match = pattern.search(self.buffer, 0)
                if self.match is not None:
                    self.after = self.buffer[self.match.start():self.match.end()]
                    self.before = self.buffer[:self.match.start()]
                    self.buffer = self.buffer[self.match.end():]
                    break
            if timeleft < 0:
                break
            try:
                data = self.read(timeout=timeleft, raise_exception=True)
            except TimeoutError:
                raise self._expect_timeout(pattern, timeout)
            timeleft -= (time.time() - start_time)
            if data:
                self.buffer = self.buffer + data if self.buffer else data
        if self.match is None:
            # The deadline passed between reads; report it like any timeout
            # instead of silently returning None.
            raise self._expect_timeout(pattern, timeout)
        if self._logger:
            self._logger.write((self.before or '') + (self.after or ''))
            self._logger.flush()
        return self.match

    def read(self, timeout=0, raise_exception=False):
        """Return the next non-empty chunk from the queue.

        Waits up to ``timeout`` seconds. When the queue stays empty, returns
        ``''`` or, if ``raise_exception`` is true, raises TimeoutError.
        """
        data = ''
        try:
            while timeout >= 0:
                start_time = time.time()
                data += self.queue.get(timeout=timeout)
                if data:
                    break
                timeout -= (time.time() - start_time)
        except Empty:
            if data:
                return data
            if raise_exception:
                raise TimeoutError(timeout)
        return data
def spawn(command):
    """Start ``command`` attached to a new pseudo-terminal.

    The child gets the pty slave as stdin/stdout/stderr and runs in its own
    session (``os.setsid``). A daemon thread running ``reader`` moves the
    output read from the pty master into a queue.

    Returns:
        IO: wrapper around the process, the pty master and the output queue.

    Raises:
        Whatever ``Popen`` raises when the command cannot be started; both
        pty descriptors are closed in that case.
    """
    master, slave = pty.openpty()
    try:
        process = Popen(command, preexec_fn=os.setsid, stdout=slave, stdin=slave, stderr=slave, bufsize=1)
    except Exception:
        # Nobody will ever read from the master if the child did not start.
        os.close(master)
        raise
    finally:
        # The parent never uses the slave end; the child holds its own copy.
        os.close(slave)

    queue = Queue()
    thread = Thread(target=reader, args=(process, master, queue))
    thread.daemon = True
    thread.start()

    return IO(process, master, queue)
def reader(process, out, queue):
    """Copy chunks read from descriptor ``out`` into ``queue``.

    Keeps reading until ``process.poll()`` reports that the process has
    exited; one more chunk is read after that so that output still pending
    at exit is not lost.
    """
    chunk_size = 65536
    while True:
        exited = process.poll() is not None
        # os.read() requires an explicit byte count; the final read after
        # exit used to omit it and died with a TypeError.
        queue.put(os.read(out, chunk_size))
        if exited:
            break
if __name__ == '__main__':
    # Manual smoke test: drive clickhouse-client from a bash session and
    # echo the whole exchange to stdout.
    io = spawn(['/bin/bash', '--noediting'])
    # Raw strings: '\$' and '\)' are regex escapes, not valid string escapes.
    prompt = r'\$ '
    io.logger(sys.stdout)
    io.timeout(2)
    io.eol('\r')
    try:
        io.expect(prompt)
        io.send('clickhouse-client')
        prompt = r':\) '
        io.expect(prompt)
        io.send('SELECT 1')
        io.expect(prompt)
        io.send('SHOW TABLES')
        io.expect('.*\r\n.*')
        io.expect(prompt)
    finally:
        # Finish the log output even when one of the expectations fails.
        io.close()