#!/usr/bin/env python
from __future__ import print_function

import csv
import os
import subprocess
import sys
import tempfile
import threading
import time
import urllib
from io import StringIO

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer

CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')

#####################################################################################
# This test starts an HTTP server and serves data to a ClickHouse URL-engine based
# table. For it to work, the ip+port of the HTTP server (given below) must be
# accessible from the ClickHouse server.
#####################################################################################

# IP address of this host accessible from the outside world.
HTTP_SERVER_HOST = subprocess.check_output(['hostname', '-i']).decode('utf-8').strip()
HTTP_SERVER_PORT = 55123

# IP address and port of the HTTP server started from this script.
HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT)
HTTP_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in HTTP_SERVER_ADDRESS) + "/"

# File backing the served table: GET requests read it, POST requests append to it.
# NOTE(review): the path is built from private tempfile helpers and the file is
# never removed by this script.
CSV_DATA = os.path.join(tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))


def get_ch_answer(query):
    """Send `query` as the request body to ClickHouse over HTTP and return the raw response body.

    The target is taken from the CLICKHOUSE_URL environment variable when set,
    otherwise it is built from CLICKHOUSE_HOST and CLICKHOUSE_PORT_HTTP.
    """
    url = os.environ.get(
        'CLICKHOUSE_URL',
        'http://{host}:{port}'.format(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT_HTTP))
    response = urllib.urlopen(url, data=query)
    try:
        return response.read()
    finally:
        # Release the connection even when reading the body fails.
        response.close()


def check_answers(query, answer):
    """Run `query` and raise if the whitespace-stripped result differs from `answer`.

    On mismatch the query, the expected answer and the fetched answer are
    printed to stderr before the exception is raised.
    """
    ch_answer = get_ch_answer(query)
    if ch_answer.strip() != answer.strip():
        print("FAIL on query:", query, file=sys.stderr)
        print("Expected answer:", answer, file=sys.stderr)
        print("Fetched answer :", ch_answer, file=sys.stderr)
        raise Exception("Fail on query")


class CSVHTTPServer(BaseHTTPRequestHandler):
    """Request handler that exposes the CSV_DATA file over HTTP.

    GET returns the rows currently stored in CSV_DATA; POST parses a chunked
    request body as CSV and appends its rows to CSV_DATA.
    """

    def _set_headers(self):
        """Send a 200 status line and a text/csv content type header."""
        self.send_response(200)
        self.send_header('Content-type', 'text/csv')
        self.end_headers()

    def do_GET(self):
        """Write every row of CSV_DATA to the client, fields joined with ', '."""
        self._set_headers()
        with open(CSV_DATA, 'r') as fl:
            reader = csv.reader(fl, delimiter=',')
            for row in reader:
                self.wfile.write(', '.join(row) + '\n')

    def read_chunk(self):
        """Read one chunk of a chunked request body and return it decoded as UTF-8.

        Returns an empty string for the zero-length terminating chunk, and
        also when the stream ends before a complete chunk-size line arrives.
        """
        size_line = ''
        while not size_line.endswith('\r\n'):
            sym = self.rfile.read(1)
            if not sym:
                break
            size_line += sym.decode('utf-8')

        if not size_line.endswith('\r\n'):
            # The peer closed the connection mid-header: treat it as end of body
            # instead of failing on int('', 16).
            return ''

        length = int(size_line[:-2], 16)
        if length == 0:
            return ''

        content = self.rfile.read(length)
        self.rfile.read(2)  # consume the trailing \r\n separator
        return content.decode('utf-8')

    def do_POST(self):
        """Append the CSV rows of the chunked request body to CSV_DATA and reply "ok"."""
        chunks = []
        while True:
            chunk = self.read_chunk()
            if not chunk:
                break
            chunks.append(chunk)

        # Joining onto a unicode literal keeps the value unicode even when the
        # body is empty; io.StringIO rejects a byte str on Python 2.
        data = u''.join(chunks)

        with StringIO(data) as fl:
            reader = csv.reader(fl, delimiter=',')
            with open(CSV_DATA, 'a') as d:
                for row in reader:
                    d.write(','.join(row) + '\n')

        self._set_headers()
        self.wfile.write("ok")

    def log_message(self, format, *args):
        """Suppress the per-request logging of the base handler."""
        return


def start_server(requests_amount):
    """Bind the HTTP server and return an unstarted thread that serves `requests_amount` requests.

    The listening socket is bound immediately, so a bind failure is raised
    from this call. The returned thread handles exactly `requests_amount`
    requests and then closes the server socket.
    """
    httpd = HTTPServer(HTTP_SERVER_ADDRESS, CSVHTTPServer)

    def real_func():
        try:
            for _ in xrange(requests_amount):
                httpd.handle_request()
        finally:
            # Free the listening socket once the expected requests are served.
            httpd.server_close()

    t = threading.Thread(target=real_func)
    return t


def _url_function(schema):
    """Return the `url(...)` table function expression pointing at this script's server."""
    return "url('{addr}', 'CSV', '{schema}')".format(addr=HTTP_SERVER_URL_STR, schema=schema)


def _recreate_table(table_name, schema):
    """Drop `table_name` if it exists and create it with the URL engine."""
    get_ch_answer("drop table if exists {}".format(table_name))
    get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(
        table_name, schema, HTTP_SERVER_URL_STR))


# test section

def test_select(table_name="", schema="str String,numuint UInt32,numint Int32,double Float64",
                requests=(), answers=(), test_data=""):
    """Check select queries against data served from CSV_DATA.

    CSV_DATA is overwritten with `test_data` (or emptied when none is given).
    With a `table_name`, a URL-engine table is created, queried and dropped;
    otherwise the `url` table function is queried directly. `requests[i]` is
    formatted with `tbl` and its result compared with `answers[i]`.
    """
    with open(CSV_DATA, 'w') as f:
        f.write(test_data + "\n" if test_data else '')

    if table_name:
        _recreate_table(table_name, schema)

    tbl = table_name or _url_function(schema)
    for i, request in enumerate(requests):
        check_answers(request.format(tbl=tbl), answers[i])

    if table_name:
        get_ch_answer("drop table if exists {}".format(table_name))


def test_insert(table_name="", schema="str String,numuint UInt32,numint Int32,double Float64",
                requests_insert=(), requests_select=(), answers=()):
    """Run insert queries through the HTTP server, then check select queries.

    CSV_DATA is emptied first. With a `table_name`, a URL-engine table is
    created, used and dropped; otherwise inserts target
    `table function url(...)` and selects target `url(...)`.
    `requests_select[i]` is compared with `answers[i]`.
    """
    with open(CSV_DATA, 'w') as f:  # flush test file
        f.write('')

    if table_name:
        _recreate_table(table_name, schema)

    insert_tbl = table_name or "table function " + _url_function(schema)
    for req in requests_insert:
        get_ch_answer(req.format(tbl=insert_tbl))

    select_tbl = table_name or _url_function(schema)
    for i, request in enumerate(requests_select):
        check_answers(request.format(tbl=select_tbl), answers[i])

    if table_name:
        get_ch_answer("drop table if exists {}".format(table_name))


def main():
    """Start the HTTP server thread and run the select and insert scenarios.

    Each scenario runs once against a URL-engine table and once against the
    `url` table function. Prints "PASSED" when every check succeeds.
    """
    test_data = "Hello,2,-2,7.7\nWorld,2,-5,8.8"

    # keys() and values() of an unmodified dict iterate in matching order, so
    # each query stays paired with its expected answer below.
    select_only_requests = {
        "select str,numuint,numint,double from {tbl}": test_data.replace(',', '\t'),
        "select numuint, count(*) from {tbl} group by numuint": "2\t2",
        "select str,numuint,numint,double from {tbl} limit 1": test_data.split("\n")[0].replace(',', '\t'),
    }

    insert_requests = [
        "insert into {tbl} values('Hello',10,-2,7.7)('World',10,-5,7.7)",
        "insert into {tbl} select 'Buy', number, 9-number, 9.9 from system.numbers limit 10",
    ]

    select_requests = {
        "select distinct numuint from {tbl} order by numuint": '\n'.join(str(i) for i in xrange(11)),
        "select count(*) from {tbl}": '12',
        'select double, count(*) from {tbl} group by double': "7.7\t2\n9.9\t10",
    }

    # Every scenario runs twice (table engine and table function), hence "* 2".
    t = start_server(
        len(select_only_requests) * 2 + (len(insert_requests) + len(select_requests)) * 2)
    t.start()

    # test table with url engine
    test_select(table_name="test_table_select", requests=select_only_requests.keys(),
                answers=select_only_requests.values(), test_data=test_data)
    # test table function url
    test_select(requests=select_only_requests.keys(), answers=select_only_requests.values(),
                test_data=test_data)
    # test insert into table with url engine
    test_insert(table_name="test_table_insert", requests_insert=insert_requests,
                requests_select=select_requests.keys(), answers=select_requests.values())
    # test insert into table function url
    test_insert(requests_insert=insert_requests, requests_select=select_requests.keys(),
                answers=select_requests.values())

    t.join()
    print("PASSED")


if __name__ == "__main__":
    exception_text = ''
    for _ in range(1, 5):
        try:
            main()
            break
        except Exception as ex:
            exception_text = str(ex)
            time.sleep(0.1)
    else:
        # Reached only when no attempt succeeded; an earlier failure followed by
        # a successful retry must not be reported as an error.
        print("Exception: {}".format(exception_text), file=sys.stderr)
        os._exit(1)