#!/usr/bin/env python3 import os import io import sys import requests import time import pandas as pd import numpy as np from scipy import stats CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1') CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123') CLICKHOUSE_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in [CLICKHOUSE_HOST, CLICKHOUSE_PORT_HTTP]) + "/" class ClickHouseClient: def __init__(self, host = CLICKHOUSE_SERVER_URL_STR): self.host = host def query(self, query, connection_timeout = 1500): NUMBER_OF_TRIES = 30 DELAY = 10 for i in range(NUMBER_OF_TRIES): r = requests.post( self.host, params = {'timeout_before_checking_execution_speed': 120, 'max_execution_time': 6000}, timeout = connection_timeout, data = query) if r.status_code == 200: return r.text else: print('ATTENTION: try #%d failed' % i) if i != (NUMBER_OF_TRIES-1): print(query) print(r.text) time.sleep(DELAY*(i+1)) else: raise ValueError(r.text) def query_return_df(self, query, connection_timeout = 1500): data = self.query(query, connection_timeout) df = pd.read_csv(io.StringIO(data), sep = '\t') return df def query_with_data(self, query, content): content = content.encode('utf-8') r = requests.post(self.host, data=content) result = r.text if r.status_code == 200: return result else: raise ValueError(r.text) def test_and_check(name, a, b, t_stat, p_value): client = ClickHouseClient() client.query("DROP TABLE IF EXISTS ttest;") client.query("CREATE TABLE ttest (left Float64, right Float64) ENGINE = Memory;"); client.query("INSERT INTO ttest VALUES {};".format(", ".join(['({},{})'.format(i, j) for i,j in zip(a, b)]))) real = client.query_return_df( "SELECT roundBankers({}(left, right).1, 16) as t_stat, ".format(name) + "roundBankers({}(left, right).2, 16) as p_value ".format(name) + "FROM ttest FORMAT TabSeparatedWithNames;") real_t_stat = real['t_stat'][0] real_p_value = real['p_value'][0] assert(abs(real_t_stat - np.float64(t_stat) < 1e-2)), "clickhouse_t_stat {}, scipy_t_stat {}".format(real_t_stat, t_stat) assert(abs(real_p_value - np.float64(p_value)) < 1e-2), "clickhouse_p_value {}, scipy_p_value {}".format(real_p_value, p_value) client.query("DROP TABLE IF EXISTS ttest;") def test_student(): rvs1 = np.round(stats.norm.rvs(loc=1, scale=5,size=500), 5) rvs2 = np.round(stats.norm.rvs(loc=10, scale=5,size=500), 5) s, p = stats.ttest_ind(rvs1, rvs2, equal_var = True) test_and_check("studentTTest", rvs1, rvs2, s, p) rvs1 = np.round(stats.norm.rvs(loc=0, scale=5,size=500), 5) rvs2 = np.round(stats.norm.rvs(loc=0, scale=5,size=500), 5) s, p = stats.ttest_ind(rvs1, rvs2, equal_var = True) test_and_check("studentTTest", rvs1, rvs2, s, p) rvs1 = np.round(stats.norm.rvs(loc=0, scale=10,size=65536), 5) rvs2 = np.round(stats.norm.rvs(loc=5, scale=1,size=65536), 5) s, p = stats.ttest_ind(rvs1, rvs2, equal_var = True) test_and_check("studentTTest", rvs1, rvs2, s, p) def test_welch(): rvs1 = np.round(stats.norm.rvs(loc=1, scale=15,size=500), 5) rvs2 = np.round(stats.norm.rvs(loc=10, scale=5,size=500), 5) s, p = stats.ttest_ind(rvs1, rvs2, equal_var = True) test_and_check("studentTTest", rvs1, rvs2, s, p) rvs1 = np.round(stats.norm.rvs(loc=0, scale=7,size=500), 5) rvs2 = np.round(stats.norm.rvs(loc=0, scale=3,size=500), 5) s, p = stats.ttest_ind(rvs1, rvs2, equal_var = True) test_and_check("studentTTest", rvs1, rvs2, s, p) rvs1 = np.round(stats.norm.rvs(loc=0, scale=10,size=65536), 5) rvs2 = np.round(stats.norm.rvs(loc=5, scale=1,size=65536), 5) s, p = stats.ttest_ind(rvs1, rvs2, equal_var = True) test_and_check("studentTTest", rvs1, rvs2, s, p) if __name__ == "__main__": test_student() test_welch() print("Ok.")