diff --git a/dbms/tests/integration/test_storage_s3/test.py b/dbms/tests/integration/test_storage_s3/test.py index 68765f4a6df..9cedc5b8b4f 100644 --- a/dbms/tests/integration/test_storage_s3/test.py +++ b/dbms/tests/integration/test_storage_s3/test.py @@ -14,54 +14,36 @@ def started_cluster(): cluster.shutdown() +import json import os -import socket -import subprocess -import sys -import tempfile -import threading import time -try: - import urllib.parse as urlparse -except ImportError: - import urlparse - -try: - from BaseHTTPServer import BaseHTTPRequestHandler -except ImportError: - from http.server import BaseHTTPRequestHandler - -try: - from BaseHTTPServer import HTTPServer -except ImportError: - from http.server import HTTPServer - - -received_data = [] -received_data_completed = False - - def test_sophisticated_default(started_cluster): instance = started_cluster.instances['dummy'] - - def GetFreeTCPPortsAndIP(n): - result = [] - sockets = [] - for i in range(n): - tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - tcp.bind(('localhost', 0)) - addr, port = tcp.getsockname() - result.append(port) - sockets.append(tcp) - [ s.close() for s in sockets ] - return result, addr + instance.copy_file_to_container(os.path.join(os.path.dirname(__file__), 'test_server.py'), 'test_server.py') + communication_path = '/test_sophisticated_default' + instance.exec_in_container(['python', 'test_server.py', communication_path], detach=True) format = 'column1 UInt32, column2 UInt32, column3 UInt32' values = '(1, 2, 3), (3, 2, 1), (78, 43, 45)' other_values = '(1, 1, 1), (1, 1, 1), (11, 11, 11)' - (redirecting_to_http_port, redirecting_to_https_port, preserving_data_port, redirecting_preserving_data_port), localhost = GetFreeTCPPortsAndIP(4) + for i in range(10): + try: + raw = instance.exec_in_container(['cat', communication_path]) + data = json.loads(instance.exec_in_container(['cat', communication_path])) + redirecting_to_http_port = data['redirecting_to_http_port'] + redirecting_to_https_port = data['redirecting_to_https_port'] + preserving_data_port = data['preserving_data_port'] + redirecting_preserving_data_port = data['redirecting_preserving_data_port'] + localhost = data['localhost'] + except: + time.sleep(0.5) + else: + break + else: + assert False, 'Could not initialize mock server' + str(raw) + redirecting_host = localhost bucket = 'abc' @@ -88,189 +70,6 @@ def test_sophisticated_default(started_cluster): "select *, column1*column2*column3 from s3('http://{}:{}/{}/test.csv', 'CSV', '{}')".format(localhost, preserving_data_port, bucket, format), ] - - class RedirectingToHTTPHandler(BaseHTTPRequestHandler): - def do_GET(self): - self.send_response(307) - self.send_header('Content-type', 'text/xml') - self.send_header('Location', 'http://storage.yandexcloud.net/milovidov/test.csv') - self.end_headers() - self.wfile.write(r''' - - TemporaryRedirect - Please re-send this request to the specified temporary endpoint. - Continue to use the original request endpoint for future requests. - storage.yandexcloud.net - '''.encode()) - self.finish() - - - class RedirectingToHTTPSHandler(BaseHTTPRequestHandler): - def do_GET(self): - self.send_response(307) - self.send_header('Content-type', 'text/xml') - self.send_header('Location', 'https://storage.yandexcloud.net/milovidov/test.csv') - self.end_headers() - self.wfile.write(r''' - - TemporaryRedirect - Please re-send this request to the specified temporary endpoint. - Continue to use the original request endpoint for future requests. - storage.yandexcloud.net - '''.encode()) - self.finish() - - - class PreservingDataHandler(BaseHTTPRequestHandler): - protocol_version = 'HTTP/1.1' - - def parse_request(self): - result = BaseHTTPRequestHandler.parse_request(self) - # Adaptation to Python 3. - if sys.version_info.major == 2 and result == True: - expect = self.headers.get('Expect', "") - if (expect.lower() == "100-continue" and self.protocol_version >= "HTTP/1.1" and self.request_version >= "HTTP/1.1"): - if not self.handle_expect_100(): - return False - return result - - def send_response_only(self, code, message=None): - if message is None: - if code in self.responses: - message = self.responses[code][0] - else: - message = '' - if self.request_version != 'HTTP/0.9': - self.wfile.write("%s %d %s\r\n" % (self.protocol_version, code, message)) - - def handle_expect_100(self): - print('Received Expect-100') - self.send_response_only(100) - self.end_headers() - return True - - def do_POST(self): - self.send_response(200) - query = urlparse.urlparse(self.path).query - print('POST', query) - if query == 'uploads': - data = r''' - TEST'''.encode() - self.send_header('Content-length', str(len(data))) - self.send_header('Content-type', 'text/plain') - self.end_headers() - self.wfile.write(data) - else: - data = self.rfile.read(int(self.headers.get('Content-Length'))) - assert query == 'uploadId=TEST' - assert data == b'1hello-etag' - self.send_header('Content-type', 'text/plain') - self.end_headers() - global received_data_completed - received_data_completed = True - self.finish() - - def do_PUT(self): - self.send_response(200) - self.send_header('Content-type', 'text/plain') - self.send_header('ETag', 'hello-etag') - self.end_headers() - query = urlparse.urlparse(self.path).query - path = urlparse.urlparse(self.path).path - print('Content-Length =', self.headers.get('Content-Length')) - print('PUT', query) - assert self.headers.get('Content-Length') - assert self.headers['Expect'] == '100-continue' - data = self.rfile.read() - received_data.append(data) - print('PUT to {}'.format(path)) - self.server.storage[path] = data - self.finish() - - def do_GET(self): - path = urlparse.urlparse(self.path).path - if path in self.server.storage: - self.send_response(200) - self.send_header('Content-type', 'text/plain') - self.send_header('Content-length', str(len(self.server.storage[path]))) - self.end_headers() - self.wfile.write(self.server.storage[path]) - else: - self.send_response(404) - self.end_headers() - self.finish() - - - class RedirectingPreservingDataHandler(BaseHTTPRequestHandler): - protocol_version = 'HTTP/1.1' - - def parse_request(self): - result = BaseHTTPRequestHandler.parse_request(self) - # Adaptation to Python 3. - if sys.version_info.major == 2 and result == True: - expect = self.headers.get('Expect', "") - if (expect.lower() == "100-continue" and self.protocol_version >= "HTTP/1.1" and self.request_version >= "HTTP/1.1"): - if not self.handle_expect_100(): - return False - return result - - def send_response_only(self, code, message=None): - if message is None: - if code in self.responses: - message = self.responses[code][0] - else: - message = '' - if self.request_version != 'HTTP/0.9': - self.wfile.write("%s %d %s\r\n" % (self.protocol_version, code, message)) - - def handle_expect_100(self): - print('Received Expect-100') - return True - - def do_POST(self): - query = urlparse.urlparse(self.path).query - if query: - query = '?{}'.format(query) - self.send_response(307) - self.send_header('Content-type', 'text/xml') - self.send_header('Location', 'http://{host}:{port}/{bucket}/test.csv{query}'.format(host=localhost, port=preserving_data_port, bucket=bucket, query=query)) - self.end_headers() - self.wfile.write(r''' - - TemporaryRedirect - Please re-send this request to the specified temporary endpoint. - Continue to use the original request endpoint for future requests. - {host}:{port} - '''.format(host=localhost, port=preserving_data_port).encode()) - self.finish() - - def do_PUT(self): - query = urlparse.urlparse(self.path).query - if query: - query = '?{}'.format(query) - self.send_response(307) - self.send_header('Content-type', 'text/xml') - self.send_header('Location', 'http://{host}:{port}/{bucket}/test.csv{query}'.format(host=localhost, port=preserving_data_port, bucket=bucket, query=query)) - self.end_headers() - self.wfile.write(r''' - - TemporaryRedirect - Please re-send this request to the specified temporary endpoint. - Continue to use the original request endpoint for future requests. - {host}:{port} - '''.format(host=localhost, port=preserving_data_port).encode()) - self.finish() - - - servers = [] - servers.append(HTTPServer((redirecting_host, redirecting_to_https_port), RedirectingToHTTPSHandler)) - servers.append(HTTPServer((redirecting_host, redirecting_to_http_port), RedirectingToHTTPHandler)) - servers.append(HTTPServer((redirecting_host, preserving_data_port), PreservingDataHandler)) - servers[-1].storage = {} - servers.append(HTTPServer((redirecting_host, redirecting_preserving_data_port), RedirectingPreservingDataHandler)) - jobs = [ threading.Thread(target=server.serve_forever) for server in servers ] - [ job.start() for job in jobs ] - try: print('Phase 1') for query in prepare_put_queries: @@ -287,11 +86,24 @@ def test_sophisticated_default(started_cluster): print('Phase 3') query = put_query - global received_data_completed - received_data_completed = False run_query(query) + for i in range(10): + try: + data = json.loads(instance.exec_in_container(['cat', communication_path])) + received_data_completed = data['received_data_completed'] + received_data = data['received_data'] + finalize_data = data['finalize_data'] + finalize_data_query = data['finalize_data_query'] + except: + time.sleep(0.5) + else: + break + else: + assert False, 'Could not read data from mock server'+str(data) assert received_data[-1].decode() == '1,2,3\n3,2,1\n78,43,45\n' assert received_data_completed + assert finalize_data == '1hello-etag' + assert finalize_data_query == 'uploadId=TEST' print('Phase 4') query = redirect_put_query @@ -307,8 +119,4 @@ def test_sophisticated_default(started_cluster): ] finally: - print('Shutting down') - [ server.shutdown() for server in servers ] - print('Joining threads') - [ job.join() for job in jobs ] print('Done') diff --git a/dbms/tests/integration/test_storage_s3/test_server.py b/dbms/tests/integration/test_storage_s3/test_server.py new file mode 100644 index 00000000000..9b2ac3bdb60 --- /dev/null +++ b/dbms/tests/integration/test_storage_s3/test_server.py @@ -0,0 +1,250 @@ +try: + from BaseHTTPServer import BaseHTTPRequestHandler +except ImportError: + from http.server import BaseHTTPRequestHandler + +try: + from BaseHTTPServer import HTTPServer +except ImportError: + from http.server import HTTPServer + +try: + import urllib.parse as urlparse +except ImportError: + import urlparse + +import json +import logging +import os +import socket +import sys +import threading +import time + + +logging.getLogger().setLevel(logging.INFO) +file_handler = logging.FileHandler('/var/log/clickhouse-server/test-server.log', 'a', encoding='utf-8') +file_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s')) +logging.getLogger().addHandler(file_handler) +logging.getLogger().addHandler(logging.StreamHandler()) + +comm_path = sys.argv[1] + +def GetFreeTCPPortsAndIP(n): + result = [] + sockets = [] + for i in range(n): + tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + tcp.bind((socket.gethostname(), 0)) + addr, port = tcp.getsockname() + result.append(port) + sockets.append(tcp) + [ s.close() for s in sockets ] + return result, addr + +(redirecting_to_http_port, redirecting_to_https_port, preserving_data_port, redirecting_preserving_data_port), localhost = GetFreeTCPPortsAndIP(4) +data = { + 'redirecting_to_http_port': redirecting_to_http_port, + 'redirecting_to_https_port': redirecting_to_https_port, + 'preserving_data_port': preserving_data_port, + 'redirecting_preserving_data_port': redirecting_preserving_data_port, + 'localhost': localhost +} +redirecting_host = localhost + +with open(comm_path, 'w') as f: + f.write(json.dumps(data)) + + +class RedirectingToHTTPHandler(BaseHTTPRequestHandler): + def do_GET(self): + self.send_response(307) + self.send_header('Content-type', 'text/xml') + self.send_header('Location', 'http://storage.yandexcloud.net/milovidov/test.csv') + self.end_headers() + self.wfile.write(r''' + + TemporaryRedirect + Please re-send this request to the specified temporary endpoint. + Continue to use the original request endpoint for future requests. + storage.yandexcloud.net +'''.encode()) + self.finish() + + +class RedirectingToHTTPSHandler(BaseHTTPRequestHandler): + def do_GET(self): + self.send_response(307) + self.send_header('Content-type', 'text/xml') + self.send_header('Location', 'https://storage.yandexcloud.net/milovidov/test.csv') + self.end_headers() + self.wfile.write(r''' + + TemporaryRedirect + Please re-send this request to the specified temporary endpoint. + Continue to use the original request endpoint for future requests. + storage.yandexcloud.net +'''.encode()) + self.finish() + + +class PreservingDataHandler(BaseHTTPRequestHandler): + protocol_version = 'HTTP/1.1' + + def parse_request(self): + result = BaseHTTPRequestHandler.parse_request(self) + # Adaptation to Python 3. + if sys.version_info.major == 2 and result == True: + expect = self.headers.get('Expect', "") + if (expect.lower() == "100-continue" and self.protocol_version >= "HTTP/1.1" and self.request_version >= "HTTP/1.1"): + if not self.handle_expect_100(): + return False + return result + + def send_response_only(self, code, message=None): + if message is None: + if code in self.responses: + message = self.responses[code][0] + else: + message = '' + if self.request_version != 'HTTP/0.9': + self.wfile.write("%s %d %s\r\n" % (self.protocol_version, code, message)) + + def handle_expect_100(self): + logging.info('Received Expect-100') + self.send_response_only(100) + self.end_headers() + return True + + def do_POST(self): + self.send_response(200) + query = urlparse.urlparse(self.path).query + logging.info('POST ' + query) + if query == 'uploads': + post_data = r''' +TEST'''.encode() + self.send_header('Content-length', str(len(post_data))) + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(post_data) + else: + post_data = self.rfile.read(int(self.headers.get('Content-Length'))) + self.send_header('Content-type', 'text/plain') + self.end_headers() + data['received_data_completed'] = True + data['finalize_data'] = post_data + data['finalize_data_query'] = query + with open(comm_path, 'w') as f: + f.write(json.dumps(data)) + self.finish() + + def do_PUT(self): + self.send_response(200) + self.send_header('Content-type', 'text/plain') + self.send_header('ETag', 'hello-etag') + self.end_headers() + query = urlparse.urlparse(self.path).query + path = urlparse.urlparse(self.path).path + logging.info('Content-Length = ' + self.headers.get('Content-Length')) + logging.info('PUT ' + query) + assert self.headers.get('Content-Length') + assert self.headers['Expect'] == '100-continue' + put_data = self.rfile.read() + data.setdefault('received_data', []).append(put_data) + with open(comm_path, 'w') as f: + f.write(json.dumps(data)) + logging.info('PUT to {}'.format(path)) + self.server.storage[path] = put_data + self.finish() + + def do_GET(self): + path = urlparse.urlparse(self.path).path + if path in self.server.storage: + self.send_response(200) + self.send_header('Content-type', 'text/plain') + self.send_header('Content-length', str(len(self.server.storage[path]))) + self.end_headers() + self.wfile.write(self.server.storage[path]) + else: + self.send_response(404) + self.end_headers() + self.finish() + + +class RedirectingPreservingDataHandler(BaseHTTPRequestHandler): + protocol_version = 'HTTP/1.1' + + def parse_request(self): + result = BaseHTTPRequestHandler.parse_request(self) + # Adaptation to Python 3. + if sys.version_info.major == 2 and result == True: + expect = self.headers.get('Expect', "") + if (expect.lower() == "100-continue" and self.protocol_version >= "HTTP/1.1" and self.request_version >= "HTTP/1.1"): + if not self.handle_expect_100(): + return False + return result + + def send_response_only(self, code, message=None): + if message is None: + if code in self.responses: + message = self.responses[code][0] + else: + message = '' + if self.request_version != 'HTTP/0.9': + self.wfile.write("%s %d %s\r\n" % (self.protocol_version, code, message)) + + def handle_expect_100(self): + logging.info('Received Expect-100') + return True + + def do_POST(self): + query = urlparse.urlparse(self.path).query + if query: + query = '?{}'.format(query) + self.send_response(307) + self.send_header('Content-type', 'text/xml') + self.send_header('Location', 'http://{host}:{port}/{bucket}/test.csv{query}'.format(host=localhost, port=preserving_data_port, bucket=bucket, query=query)) + self.end_headers() + self.wfile.write(r''' + + TemporaryRedirect + Please re-send this request to the specified temporary endpoint. + Continue to use the original request endpoint for future requests. + {host}:{port} +'''.format(host=localhost, port=preserving_data_port).encode()) + self.finish() + + def do_PUT(self): + query = urlparse.urlparse(self.path).query + if query: + query = '?{}'.format(query) + self.send_response(307) + self.send_header('Content-type', 'text/xml') + self.send_header('Location', 'http://{host}:{port}/{bucket}/test.csv{query}'.format(host=localhost, port=preserving_data_port, bucket=bucket, query=query)) + self.end_headers() + self.wfile.write(r''' + + TemporaryRedirect + Please re-send this request to the specified temporary endpoint. + Continue to use the original request endpoint for future requests. + {host}:{port} +'''.format(host=localhost, port=preserving_data_port).encode()) + self.finish() + + +servers = [] +servers.append(HTTPServer((redirecting_host, redirecting_to_https_port), RedirectingToHTTPSHandler)) +servers.append(HTTPServer((redirecting_host, redirecting_to_http_port), RedirectingToHTTPHandler)) +servers.append(HTTPServer((redirecting_host, preserving_data_port), PreservingDataHandler)) +servers[-1].storage = {} +servers.append(HTTPServer((redirecting_host, redirecting_preserving_data_port), RedirectingPreservingDataHandler)) +jobs = [ threading.Thread(target=server.serve_forever) for server in servers ] +[ job.start() for job in jobs ] + +time.sleep(60) # Timeout + +logging.info('Shutting down') +[ server.shutdown() for server in servers ] +logging.info('Joining threads') +[ job.join() for job in jobs ] +logging.info('Done')