ClickHouse/tests/queries/0_stateless/00646_url_engine.python

187 lines
6.9 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python
from __future__ import print_function
import csv
import sys
import tempfile
import threading
import os, urllib
import subprocess
from io import StringIO
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
# Host and HTTP port of the ClickHouse server that queries are sent to.
# Both can be overridden through the environment.
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')

#####################################################################################
# This test starts an HTTP server and serves data to a ClickHouse URL-engine based table.
# For it to work, the IP address and port of the HTTP server (given below) must be
# reachable from the ClickHouse server.
#####################################################################################

# IP address of this host, taken from the output of `hostname -i`; it has to be
# accessible from the outside world (i.e. from the ClickHouse server).
HTTP_SERVER_HOST = subprocess.check_output(['hostname', '-i']).decode('utf-8').strip()
# Port the HTTP server listens on: CLICKHOUSE_TEST_HOST_EXPOSED_PORT when set, else 51234.
HTTP_SERVER_PORT = int(os.environ.get('CLICKHOUSE_TEST_HOST_EXPOSED_PORT', 51234))

# IP address and port of the HTTP server started from this script.
HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT)
# Base URL of that server in the form "http://<host>:<port>/".
HTTP_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in HTTP_SERVER_ADDRESS) + "/"

# Path of the scratch CSV file that the HTTP handler reads from and appends to.
# NOTE(review): built from private tempfile helpers (_get_default_tempdir,
# _get_candidate_names); the file itself is only created when first opened for writing.
CSV_DATA = os.path.join(tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
def get_ch_answer(query):
    """Send `query` to ClickHouse over HTTP and return the raw response body.

    The target URL is taken from the CLICKHOUSE_URL environment variable when
    it is set; otherwise it is built from CLICKHOUSE_HOST and
    CLICKHOUSE_PORT_HTTP. The query text is passed as the request data.
    """
    url = os.environ.get(
        'CLICKHOUSE_URL',
        'http://{host}:{port}'.format(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT_HTTP))
    response = urllib.urlopen(url, data=query)
    try:
        return response.read()
    finally:
        # Close explicitly so the connection is released right away instead of
        # whenever the response object happens to be garbage collected.
        response.close()
def check_answers(query, answer):
    """Run `query` against ClickHouse and compare the result with `answer`.

    Both sides are compared after stripping surrounding whitespace. On a
    mismatch the query, the expected answer and the fetched answer are
    printed to stderr and an Exception is raised.
    """
    fetched = get_ch_answer(query)
    if fetched.strip() == answer.strip():
        return

    report = (
        ("FAIL on query:", query),
        ("Expected answer:", answer),
        ("Fetched answer :", fetched),
    )
    for label, value in report:
        print(label, value, file=sys.stderr)
    raise Exception("Fail on query")
class CSVHTTPServer(BaseHTTPRequestHandler):
    """HTTP request handler backed by the CSV_DATA file.

    GET sends the rows of the file back as text/csv; POST parses a
    chunk-encoded CSV request body and appends its rows to the file.
    """

    def _set_headers(self):
        """Send a 200 response with a text/csv content type and end the headers."""
        self.send_response(200)
        self.send_header('Content-type', 'text/csv')
        self.end_headers()

    def do_GET(self):
        """Write every row of CSV_DATA to the client, one row per line."""
        self._set_headers()
        with open(CSV_DATA, 'r') as fl:
            reader = csv.reader(fl, delimiter=',')
            for row in reader:
                # NOTE: fields are re-joined with ", " (comma followed by a
                # space), unlike do_POST which stores them with a bare ",".
                self.wfile.write(', '.join(row) + '\n')

    def read_chunk(self):
        """Read one chunk of a chunked request body from self.rfile.

        Returns the decoded chunk text, or '' for the terminating zero-length
        chunk. '' is also returned when the stream ends or the size line is
        blank, instead of failing on int('', 16).
        """
        size_line = self.rfile.readline()
        # Drop an optional ";extension" suffix and the trailing CRLF.
        size_text = size_line.decode('utf-8').split(';', 1)[0].strip()
        if not size_text:
            return ''
        length = int(size_text, 16)
        if length == 0:
            return ''
        content = self.rfile.read(length)
        self.rfile.read(2)  # consume the \r\n separator that follows the chunk data
        return content.decode('utf-8')

    def do_POST(self):
        """Append the CSV rows of the chunked request body to CSV_DATA and reply "ok"."""
        # read_chunk() yields '' once the body is exhausted, which ends the iteration.
        chunks = list(iter(self.read_chunk, ''))
        # A unicode separator keeps the result unicode even when there are no
        # chunks; io.StringIO rejects byte strings on Python 2.
        data = u''.join(chunks)
        with StringIO(data) as fl:
            reader = csv.reader(fl, delimiter=',')
            with open(CSV_DATA, 'a') as d:
                for row in reader:
                    d.write(','.join(row) + '\n')
        self._set_headers()
        self.wfile.write("ok")

    def log_message(self, format, *args):
        """Discard the log message so request logging produces no output."""
        return
def start_server(requests_amount):
    """Return a not-yet-started thread that serves `requests_amount` HTTP requests.

    The HTTP server is created on HTTP_SERVER_ADDRESS here, in the caller's
    thread. The returned thread handles exactly `requests_amount` requests
    with CSVHTTPServer and then closes the server socket. The caller is
    responsible for calling start() and join() on the thread.
    """
    httpd = HTTPServer(HTTP_SERVER_ADDRESS, CSVHTTPServer)

    def real_func():
        try:
            for _ in xrange(requests_amount):
                httpd.handle_request()
        finally:
            # Release the listening socket once serving is over (or has failed).
            httpd.server_close()

    t = threading.Thread(target=real_func)
    return t
# test section
def test_select(table_name="", schema="str String,numuint UInt32,numint Int32,double Float64", requests=(), answers=(), test_data=""):
    """Check SELECT queries that read CSV data through the HTTP server.

    table_name: when non-empty, a table with engine=URL is (re)created under
        this name and queried; otherwise the url() table function is queried.
    schema: column definitions used for the table / table function.
    requests: query templates containing a "{tbl}" placeholder.
    answers: expected results, matched to `requests` by position; must hold
        at least as many entries as `requests`.
    test_data: CSV text stored in CSV_DATA before the queries run.

    Raises whatever check_answers raises when a result does not match. The
    table (if any) is dropped even in that case.
    """
    # Mode 'w' truncates, so a single open both clears the file and stores the new data.
    with open(CSV_DATA, 'w') as f:
        if test_data:
            f.write(test_data + "\n")

    if table_name:
        get_ch_answer("drop table if exists {}".format(table_name))
        get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, HTTP_SERVER_URL_STR))
        tbl = table_name
    else:
        tbl = "url('{addr}', 'CSV', '{schema}')".format(addr=HTTP_SERVER_URL_STR, schema=schema)

    # Materialise so that any iterable (not only an indexable list) is accepted.
    answers = list(answers)
    try:
        for i, request in enumerate(requests):
            check_answers(request.format(tbl=tbl), answers[i])
    finally:
        if table_name:
            get_ch_answer("drop table if exists {}".format(table_name))
def test_insert(table_name="", schema="str String,numuint UInt32,numint Int32,double Float64", requests_insert=(), requests_select=(), answers=()):
    """Check INSERT queries that write CSV data through the HTTP server.

    table_name: when non-empty, a table with engine=URL is (re)created under
        this name and used; otherwise the url() table function is used.
    schema: column definitions used for the table / table function.
    requests_insert: INSERT templates containing a "{tbl}" placeholder; their
        responses are not inspected.
    requests_select: SELECT templates run afterwards to verify the stored data.
    answers: expected results, matched to `requests_select` by position; must
        hold at least as many entries as `requests_select`.

    Raises whatever check_answers raises when a result does not match. The
    table (if any) is dropped even in that case.
    """
    # Opening in 'w' mode truncates the file, so the test starts from empty data.
    with open(CSV_DATA, 'w'):
        pass

    if table_name:
        get_ch_answer("drop table if exists {}".format(table_name))
        get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, HTTP_SERVER_URL_STR))
        insert_target = select_source = table_name
    else:
        select_source = "url('{addr}', 'CSV', '{schema}')".format(addr=HTTP_SERVER_URL_STR, schema=schema)
        # INSERT needs the "table function" keyword in front of url(...).
        insert_target = "table function " + select_source

    # Materialise so that any iterable (not only an indexable list) is accepted.
    answers = list(answers)
    try:
        for req in requests_insert:
            get_ch_answer(req.format(tbl=insert_target))
        for i, req in enumerate(requests_select):
            check_answers(req.format(tbl=select_source), answers[i])
    finally:
        if table_name:
            get_ch_answer("drop table if exists {}".format(table_name))
def main():
    """Run the select and insert scenarios against both the URL table engine
    and the url() table function, then print "PASSED".

    The HTTP server thread is sized to the number of requests the scenarios
    are expected to produce and is joined before success is reported.
    """
    test_data = "Hello,2,-2,7.7\nWorld,2,-5,8.8"
    first_row = test_data.split("\n")[0]

    # query template -> expected tab-separated answer
    select_only_requests = {
        "select str,numuint,numint,double from {tbl}": test_data.replace(',', '\t'),
        "select numuint, count(*) from {tbl} group by numuint": "2\t2",
        "select str,numuint,numint,double from {tbl} limit 1": first_row.replace(',', '\t'),
    }

    insert_requests = [
        "insert into {tbl} values('Hello',10,-2,7.7)('World',10,-5,7.7)",
        "insert into {tbl} select 'Buy', number, 9-number, 9.9 from system.numbers limit 10",
    ]

    # query template -> expected answer once both inserts have been applied
    select_requests = {
        "select distinct numuint from {tbl} order by numuint": '\n'.join(str(n) for n in xrange(11)),
        "select count(*) from {tbl}": '12',
        'select double, count(*) from {tbl} group by double': "7.7\t2\n9.9\t10",
    }

    # Every scenario runs twice: once via the table engine, once via the table function.
    total_requests = 2 * (len(select_only_requests) + len(insert_requests) + len(select_requests))
    server_thread = start_server(total_requests)
    server_thread.start()

    # test table with url engine
    test_select(
        table_name="test_table_select",
        requests=select_only_requests.keys(),
        answers=select_only_requests.values(),
        test_data=test_data,
    )
    # test table function url
    test_select(
        requests=select_only_requests.keys(),
        answers=select_only_requests.values(),
        test_data=test_data,
    )
    # test insert into table with url engine
    test_insert(
        table_name="test_table_insert",
        requests_insert=insert_requests,
        requests_select=select_requests.keys(),
        answers=select_requests.values(),
    )
    # test insert into table function url
    test_insert(
        requests_insert=insert_requests,
        requests_select=select_requests.keys(),
        answers=select_requests.values(),
    )

    server_thread.join()
    print("PASSED")
if __name__ == "__main__":
    try:
        main()
    except BaseException:
        # Deliberately broad: any failure must end the process with status 1,
        # but report the cause first instead of exiting silently.
        import traceback
        traceback.print_exc()
        sys.stderr.flush()  # os._exit() does not flush stdio buffers
        # Hard exit; presumably chosen over sys.exit() so that the HTTP server
        # thread, which may still be waiting for a request, cannot keep the
        # process alive.
        os._exit(1)