mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Add simple tests for table function url and table function engine
This commit is contained in:
parent
df4f8bea35
commit
ceef414cb8
171
dbms/tests/queries/0_stateless/00646_url_engine.python
Normal file
171
dbms/tests/queries/0_stateless/00646_url_engine.python
Normal file
@ -0,0 +1,171 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
from __future__ import print_function
|
||||||
|
import csv
|
||||||
|
import tempfile
|
||||||
|
import threading
|
||||||
|
import os, urllib
|
||||||
|
from io import StringIO
|
||||||
|
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
|
||||||
|
|
||||||
|
|
||||||
|
SERVER_ADDRESS = ('127.0.0.1', 51234)
|
||||||
|
SERVER_ADDRESS_STR = 'http://' + ':'.join(str(s) for s in SERVER_ADDRESS) + "/"
|
||||||
|
CSV_DATA = os.path.join(tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
|
||||||
|
|
||||||
|
|
||||||
|
def get_ch_answer(query):
|
||||||
|
return urllib.urlopen(os.environ.get('CLICKHOUSE_URL', 'http://localhost:' + os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')), data=query).read()
|
||||||
|
|
||||||
|
def check_answers(query, answer):
|
||||||
|
ch_answer = get_ch_answer(query)
|
||||||
|
if ch_answer.strip() != answer.strip():
|
||||||
|
print("FAIL on query:", query)
|
||||||
|
print("Expected answer:", answer)
|
||||||
|
print("Fetched answer :", ch_answer)
|
||||||
|
raise Exception("Fail on query")
|
||||||
|
|
||||||
|
class CSVHTTPServer(BaseHTTPRequestHandler):
|
||||||
|
def _set_headers(self):
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header('Content-type', 'text/csv')
|
||||||
|
self.end_headers()
|
||||||
|
|
||||||
|
def do_GET(self):
|
||||||
|
self._set_headers()
|
||||||
|
with open(CSV_DATA, 'r') as fl:
|
||||||
|
reader = csv.reader(fl, delimiter=',')
|
||||||
|
for row in reader:
|
||||||
|
self.wfile.write(', '.join(row) + '\n')
|
||||||
|
return
|
||||||
|
|
||||||
|
def read_chunk(self):
|
||||||
|
msg = ''
|
||||||
|
while True:
|
||||||
|
sym = self.rfile.read(1)
|
||||||
|
if sym == '':
|
||||||
|
break
|
||||||
|
msg += sym.decode('utf-8')
|
||||||
|
if msg.endswith('\r\n'):
|
||||||
|
break
|
||||||
|
length = int(msg[:-2], 16)
|
||||||
|
if length == 0:
|
||||||
|
return ''
|
||||||
|
content = self.rfile.read(length)
|
||||||
|
self.rfile.read(2) # read sep \r\n
|
||||||
|
return content.decode('utf-8')
|
||||||
|
|
||||||
|
def do_POST(self):
|
||||||
|
data = ''
|
||||||
|
while True:
|
||||||
|
chunk = self.read_chunk()
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
data += chunk
|
||||||
|
text = ""
|
||||||
|
with StringIO(data) as fl:
|
||||||
|
reader = csv.reader(fl, delimiter=',')
|
||||||
|
with open(CSV_DATA, 'a') as d:
|
||||||
|
for row in reader:
|
||||||
|
d.write(','.join(row) + '\n')
|
||||||
|
self._set_headers()
|
||||||
|
self.wfile.write("ok")
|
||||||
|
|
||||||
|
def log_message(self, format, *args):
|
||||||
|
return
|
||||||
|
|
||||||
|
def start_server(requests_amount):
|
||||||
|
httpd = HTTPServer(SERVER_ADDRESS, CSVHTTPServer)
|
||||||
|
|
||||||
|
def real_func():
|
||||||
|
for i in xrange(requests_amount):
|
||||||
|
httpd.handle_request()
|
||||||
|
|
||||||
|
t = threading.Thread(target=real_func)
|
||||||
|
return t
|
||||||
|
|
||||||
|
# test section
|
||||||
|
|
||||||
|
def test_select(table_name="", schema="str String,numuint UInt32,numint Int32,double Float64", requests=[], answers=[], test_data=""):
|
||||||
|
with open(CSV_DATA, 'w') as f: # clear file
|
||||||
|
f.write('')
|
||||||
|
|
||||||
|
if test_data:
|
||||||
|
with open(CSV_DATA, 'w') as f:
|
||||||
|
f.write(test_data + "\n")
|
||||||
|
|
||||||
|
if table_name:
|
||||||
|
get_ch_answer("drop table if exists {}".format(table_name))
|
||||||
|
get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, SERVER_ADDRESS_STR))
|
||||||
|
|
||||||
|
for i in xrange(len(requests)):
|
||||||
|
tbl = table_name
|
||||||
|
if not tbl:
|
||||||
|
tbl = "url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema)
|
||||||
|
check_answers(requests[i].format(tbl=tbl), answers[i])
|
||||||
|
|
||||||
|
if table_name:
|
||||||
|
get_ch_answer("drop table if exists {}".format(table_name))
|
||||||
|
|
||||||
|
def test_insert(table_name="", schema="str String,numuint UInt32,numint Int32,double Float64", requests_insert=[], requests_select=[], answers=[]):
|
||||||
|
with open(CSV_DATA, 'w') as f: # flush test file
|
||||||
|
f.write('')
|
||||||
|
|
||||||
|
if table_name:
|
||||||
|
get_ch_answer("drop table if exists {}".format(table_name))
|
||||||
|
get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, SERVER_ADDRESS_STR))
|
||||||
|
|
||||||
|
for req in requests_insert:
|
||||||
|
tbl = table_name
|
||||||
|
if not tbl:
|
||||||
|
tbl = "table function url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema)
|
||||||
|
get_ch_answer(req.format(tbl=tbl))
|
||||||
|
|
||||||
|
|
||||||
|
for i in xrange(len(requests_select)):
|
||||||
|
tbl = table_name
|
||||||
|
if not tbl:
|
||||||
|
tbl = "url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema)
|
||||||
|
check_answers(requests_select[i].format(tbl=tbl), answers[i])
|
||||||
|
|
||||||
|
if table_name:
|
||||||
|
get_ch_answer("drop table if exists {}".format(table_name))
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
test_data = "Hello,2,-2,7.7\nWorld,2,-5,8.8"
|
||||||
|
select_table_input = {
|
||||||
|
"select str,numuint,numint,double from {tbl}" : test_data.replace(',', '\t'),
|
||||||
|
"select numuint, count(*) from {tbl} group by numuint" : "2\t2",
|
||||||
|
"select str,numuint,numint,double from {tbl} limit 1": test_data.split("\n")[0].replace(',', '\t'),
|
||||||
|
}
|
||||||
|
|
||||||
|
insert_requests = [
|
||||||
|
"insert into {tbl} values('Hello',10,-2,7.7)('World',10,-5,7.7)",
|
||||||
|
"insert into {tbl} select 'Buy', number, 9-number, 9.9 from system.numbers limit 10",
|
||||||
|
]
|
||||||
|
|
||||||
|
select_requests = {
|
||||||
|
"select distinct numuint from {tbl} order by numuint": '\n'.join([str(i) for i in xrange(11)]),
|
||||||
|
"select count(*) from {tbl}": '12',
|
||||||
|
'select double, count(*) from {tbl} group by double': "7.7\t2\n9.9\t10"
|
||||||
|
}
|
||||||
|
|
||||||
|
t = start_server(len(select_table_input) * 2 + (len(insert_requests) + len(select_requests)) * 2)
|
||||||
|
t.start()
|
||||||
|
# test table with url engine
|
||||||
|
test_select(table_name="test_table_select", requests=select_table_input.keys(), answers=select_table_input.values(), test_data=test_data)
|
||||||
|
# test table function url
|
||||||
|
test_select(requests=select_table_input.keys(), answers=select_table_input.values(), test_data=test_data)
|
||||||
|
#test insert into table with url engine
|
||||||
|
test_insert(table_name="test_table_insert", requests_insert=insert_requests, requests_select=select_requests.keys(), answers=select_requests.values())
|
||||||
|
#test insert into table function url
|
||||||
|
test_insert(requests_insert=insert_requests, requests_select=select_requests.keys(), answers=select_requests.values())
|
||||||
|
t.join()
|
||||||
|
print("PASSED")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
main()
|
||||||
|
except:
|
||||||
|
os._exit(1)
|
@ -0,0 +1 @@
|
|||||||
|
PASSED
|
8
dbms/tests/queries/0_stateless/00646_url_engine.sh
Executable file
8
dbms/tests/queries/0_stateless/00646_url_engine.sh
Executable file
@ -0,0 +1,8 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
. $CURDIR/../shell_config.sh
|
||||||
|
|
||||||
|
# We should have correct env vars from shell_config.sh to run this test
|
||||||
|
|
||||||
|
python $CURDIR/00646_url_engine.python
|
Loading…
Reference in New Issue
Block a user