diff --git a/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.py b/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.py new file mode 100755 index 00000000000..de7b3dd10cb --- /dev/null +++ b/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python +import os +import sys +import time +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from httpechoserver import start_server, HTTP_SERVER_URL_STR +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +server = start_server(1) +server.start() + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) AS s FROM test.mt') + client1.expect(prompt) + + client1.send("INSERT INTO FUNCTION url('%s', CSV, 's Int32, version Int32') WATCH test.lv" % HTTP_SERVER_URL_STR) + client1.expect(r'0.*1' + end_of_block) + time.sleep(0.25) + sys.stdout.write("-- first insert --\n") + sys.stdout.write(server.out.read()) + + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client2.expect(prompt) + time.sleep(0.25) + sys.stdout.write("-- second insert --\n") + sys.stdout.write(server.out.read()) + + client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') + client2.expect(prompt) + time.sleep(0.25) + sys.stdout.write("-- third insert --\n") + sys.stdout.write(server.out.read()) + + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) + +server.join() diff --git a/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.reference b/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.reference new file mode 100644 index 00000000000..184d8f8e5a4 --- /dev/null +++ b/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.reference @@ -0,0 +1,9 @@ +-- first insert -- +0,1 +-- second insert -- +0,1 +6,2 +-- third insert -- +0,1 +6,2 +21,3 diff --git a/tests/queries/0_stateless/helpers/httpechoserver.py b/tests/queries/0_stateless/helpers/httpechoserver.py new file mode 100644 index 00000000000..5687e4a94ce --- /dev/null +++ b/tests/queries/0_stateless/helpers/httpechoserver.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python +from __future__ import print_function +import sys +import os +import time +import subprocess +import threading +from io import StringIO, SEEK_END +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer + +CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1') +CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123') + +# IP-address of this host accessible from outside world. +HTTP_SERVER_HOST = subprocess.check_output(['hostname', '-i']).decode('utf-8').strip() +HTTP_SERVER_PORT = int(os.environ.get('CLICKHOUSE_TEST_HOST_EXPOSED_PORT', 51234)) + +# IP address and port of the HTTP server started from this script. +HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT) +HTTP_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in HTTP_SERVER_ADDRESS) + "/" + +ostream = StringIO() +istream = StringIO() + +class EchoCSVHTTPServer(BaseHTTPRequestHandler): + def _set_headers(self): + self.send_response(200) + self.send_header('Content-type', 'text/csv') + self.end_headers() + + def do_GET(self): + self._set_headers() + with open(CSV_DATA, 'r') as fl: + ostream.seek(0) + for row in ostream: + self.wfile.write(row + '\n') + return + + def read_chunk(self): + msg = '' + while True: + sym = self.rfile.read(1) + if sym == '': + break + msg += sym.decode('utf-8') + if msg.endswith('\r\n'): + break + length = int(msg[:-2], 16) + if length == 0: + return '' + content = self.rfile.read(length) + self.rfile.read(2) # read sep \r\n + return content.decode('utf-8') + + def do_POST(self): + data = '' + while True: + chunk = self.read_chunk() + if not chunk: + break + data += chunk + pos = istream.tell() + istream.seek(SEEK_END) + istream.write(data) + istream.seek(pos) + text = "" + self._set_headers() + self.wfile.write("ok") + + def log_message(self, format, *args): + return + +def start_server(requests_amount, test_data="Hello,2,-2,7.7\nWorld,2,-5,8.8"): + ostream = StringIO(test_data.decode("utf-8")) + + httpd = HTTPServer(HTTP_SERVER_ADDRESS, EchoCSVHTTPServer) + + def real_func(): + for i in xrange(requests_amount): + httpd.handle_request() + + t = threading.Thread(target=real_func) + t.out = istream + return t + +def run(): + t = start_server(1) + t.start() + t.join() + +if __name__ == "__main__": + exception_text = '' + for i in range(1, 5): + try: + run() + break + except Exception as ex: + exception_text = str(ex) + time.sleep(0.1) + + if exception_text: + print("Exception: {}".format(exception_text), file=sys.stderr) + os._exit(1) +