From ceef414cb85d41aa668fda095a90137ce4df5092 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 12 Jun 2018 18:59:43 +0300 Subject: [PATCH] Add simple tests for table function url and table function engine --- .../0_stateless/00646_url_engine.python | 171 ++++++++++++++++++ .../0_stateless/00646_url_engine.reference | 1 + .../queries/0_stateless/00646_url_engine.sh | 8 + 3 files changed, 180 insertions(+) create mode 100644 dbms/tests/queries/0_stateless/00646_url_engine.python create mode 100644 dbms/tests/queries/0_stateless/00646_url_engine.reference create mode 100755 dbms/tests/queries/0_stateless/00646_url_engine.sh diff --git a/dbms/tests/queries/0_stateless/00646_url_engine.python b/dbms/tests/queries/0_stateless/00646_url_engine.python new file mode 100644 index 00000000000..488c929a210 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00646_url_engine.python @@ -0,0 +1,171 @@ +#!/usr/bin/env python +from __future__ import print_function +import csv +import tempfile +import threading +import os, urllib +from io import StringIO +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer + + +SERVER_ADDRESS = ('127.0.0.1', 51234) +SERVER_ADDRESS_STR = 'http://' + ':'.join(str(s) for s in SERVER_ADDRESS) + "/" +CSV_DATA = os.path.join(tempfile._get_default_tempdir(), next(tempfile._get_candidate_names())) + + +def get_ch_answer(query): + return urllib.urlopen(os.environ.get('CLICKHOUSE_URL', 'http://localhost:' + os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')), data=query).read() + +def check_answers(query, answer): + ch_answer = get_ch_answer(query) + if ch_answer.strip() != answer.strip(): + print("FAIL on query:", query) + print("Expected answer:", answer) + print("Fetched answer :", ch_answer) + raise Exception("Fail on query") + +class CSVHTTPServer(BaseHTTPRequestHandler): + def _set_headers(self): + self.send_response(200) + self.send_header('Content-type', 'text/csv') + self.end_headers() + + def do_GET(self): + self._set_headers() + with open(CSV_DATA, 'r') as fl: + reader = csv.reader(fl, delimiter=',') + for row in reader: + self.wfile.write(', '.join(row) + '\n') + return + + def read_chunk(self): + msg = '' + while True: + sym = self.rfile.read(1) + if sym == '': + break + msg += sym.decode('utf-8') + if msg.endswith('\r\n'): + break + length = int(msg[:-2], 16) + if length == 0: + return '' + content = self.rfile.read(length) + self.rfile.read(2) # read sep \r\n + return content.decode('utf-8') + + def do_POST(self): + data = '' + while True: + chunk = self.read_chunk() + if not chunk: + break + data += chunk + text = "" + with StringIO(data) as fl: + reader = csv.reader(fl, delimiter=',') + with open(CSV_DATA, 'a') as d: + for row in reader: + d.write(','.join(row) + '\n') + self._set_headers() + self.wfile.write("ok") + + def log_message(self, format, *args): + return + +def start_server(requests_amount): + httpd = HTTPServer(SERVER_ADDRESS, CSVHTTPServer) + + def real_func(): + for i in xrange(requests_amount): + httpd.handle_request() + + t = threading.Thread(target=real_func) + return t + +# test section + +def test_select(table_name="", schema="str String,numuint UInt32,numint Int32,double Float64", requests=[], answers=[], test_data=""): + with open(CSV_DATA, 'w') as f: # clear file + f.write('') + + if test_data: + with open(CSV_DATA, 'w') as f: + f.write(test_data + "\n") + + if table_name: + get_ch_answer("drop table if exists {}".format(table_name)) + get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, SERVER_ADDRESS_STR)) + + for i in xrange(len(requests)): + tbl = table_name + if not tbl: + tbl = "url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema) + check_answers(requests[i].format(tbl=tbl), answers[i]) + + if table_name: + get_ch_answer("drop table if exists {}".format(table_name)) + +def test_insert(table_name="", schema="str String,numuint UInt32,numint Int32,double Float64", requests_insert=[], requests_select=[], answers=[]): + with open(CSV_DATA, 'w') as f: # flush test file + f.write('') + + if table_name: + get_ch_answer("drop table if exists {}".format(table_name)) + get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, SERVER_ADDRESS_STR)) + + for req in requests_insert: + tbl = table_name + if not tbl: + tbl = "table function url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema) + get_ch_answer(req.format(tbl=tbl)) + + + for i in xrange(len(requests_select)): + tbl = table_name + if not tbl: + tbl = "url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema) + check_answers(requests_select[i].format(tbl=tbl), answers[i]) + + if table_name: + get_ch_answer("drop table if exists {}".format(table_name)) + + +def main(): + test_data = "Hello,2,-2,7.7\nWorld,2,-5,8.8" + select_table_input = { + "select str,numuint,numint,double from {tbl}" : test_data.replace(',', '\t'), + "select numuint, count(*) from {tbl} group by numuint" : "2\t2", + "select str,numuint,numint,double from {tbl} limit 1": test_data.split("\n")[0].replace(',', '\t'), + } + + insert_requests = [ + "insert into {tbl} values('Hello',10,-2,7.7)('World',10,-5,7.7)", + "insert into {tbl} select 'Buy', number, 9-number, 9.9 from system.numbers limit 10", + ] + + select_requests = { + "select distinct numuint from {tbl} order by numuint": '\n'.join([str(i) for i in xrange(11)]), + "select count(*) from {tbl}": '12', + 'select double, count(*) from {tbl} group by double': "7.7\t2\n9.9\t10" + } + + t = start_server(len(select_table_input) * 2 + (len(insert_requests) + len(select_requests)) * 2) + t.start() + # test table with url engine + test_select(table_name="test_table_select", requests=select_table_input.keys(), answers=select_table_input.values(), test_data=test_data) + # test table function url + test_select(requests=select_table_input.keys(), answers=select_table_input.values(), test_data=test_data) + #test insert into table with url engine + test_insert(table_name="test_table_insert", requests_insert=insert_requests, requests_select=select_requests.keys(), answers=select_requests.values()) + #test insert into table function url + test_insert(requests_insert=insert_requests, requests_select=select_requests.keys(), answers=select_requests.values()) + t.join() + print("PASSED") + + +if __name__ == "__main__": + try: + main() + except: + os._exit(1) diff --git a/dbms/tests/queries/0_stateless/00646_url_engine.reference b/dbms/tests/queries/0_stateless/00646_url_engine.reference new file mode 100644 index 00000000000..53cdf1e9393 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00646_url_engine.reference @@ -0,0 +1 @@ +PASSED diff --git a/dbms/tests/queries/0_stateless/00646_url_engine.sh b/dbms/tests/queries/0_stateless/00646_url_engine.sh new file mode 100755 index 00000000000..e218a41c28b --- /dev/null +++ b/dbms/tests/queries/0_stateless/00646_url_engine.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +# We should have correct env vars from shell_config.sh to run this test + +python $CURDIR/00646_url_engine.python