#!/usr/bin/env python3 import os import sys from math import sqrt, nan from random import randrange from scipy import stats import pandas as pd import numpy as np CURDIR = os.path.dirname(os.path.realpath(__file__)) sys.path.insert(0, os.path.join(CURDIR, 'helpers')) from pure_http_client import ClickHouseClient # unpooled variance z-test for proportions of two samples def twosample_proportion_ztest(s1, s2, t1, t2, alpha): if s1 == 0 or s2 == 0 or s1 > t1 or s2 > t2 or t1 + t2 == 0: return nan, nan, nan, nan p1 = s1 / t1 p2 = s2 / t2 se = sqrt(p1 * (1 - p1) / t1 + p2 * (1 - p2) / t2) if se == 0: return nan, nan, nan, nan z_stat = (p1 - p2) / se one_side = 1 - stats.norm.cdf(abs(z_stat)) p_value = one_side * 2 z = stats.norm.ppf(1 - 0.5 * alpha) ci_lower = (p1 - p2) - z * se ci_upper = (p1 - p2) + z * se return z_stat, p_value, ci_lower, ci_upper def test_and_check(name, z_stat, p_value, ci_lower, ci_upper, precision=1e-2): client = ClickHouseClient() real = client.query_return_df( "SELECT roundBankers({}.1, 16) as z_stat, ".format(name) + "roundBankers({}.2, 16) as p_value, ".format(name) + "roundBankers({}.3, 16) as ci_lower, ".format(name) + "roundBankers({}.4, 16) as ci_upper ".format(name) + "FORMAT TabSeparatedWithNames;") real_z_stat = real['z_stat'][0] real_p_value = real['p_value'][0] real_ci_lower = real['ci_lower'][0] real_ci_upper = real['ci_upper'][0] assert((np.isnan(real_z_stat) and np.isnan(z_stat)) or abs(real_z_stat - np.float64(z_stat)) < precision), "clickhouse_z_stat {}, py_z_stat {}".format(real_z_stat, z_stat) assert((np.isnan(real_p_value) and np.isnan(p_value)) or abs(real_p_value - np.float64(p_value)) < precision), "clickhouse_p_value {}, py_p_value {}".format(real_p_value, p_value) assert((np.isnan(real_ci_lower) and np.isnan(ci_lower)) or abs(real_ci_lower - np.float64(ci_lower)) < precision), "clickhouse_ci_lower {}, py_ci_lower {}".format(real_ci_lower, ci_lower) assert((np.isnan(real_ci_upper) and np.isnan(ci_upper)) or abs(real_ci_upper - np.float64(ci_upper)) < precision), "clickhouse_ci_upper {}, py_ci_upper {}".format(real_ci_upper, ci_upper) def test_mean_ztest(): counts = [0, 0] nobs = [0, 0] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(counts[0], counts[1], nobs[0], nobs[1], 0.05) test_and_check("proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper) z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(10, 10, 10, 10, 0.05) counts = [10, 10] nobs = [10, 10] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(counts[0], counts[1], nobs[0], nobs[1], 0.05) test_and_check("proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper) z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(10, 10, 10, 10, 0.05) counts = [16, 16] nobs = [16, 18] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(counts[0], counts[1], nobs[0], nobs[1], 0.05) test_and_check("proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper) counts = [10, 20] nobs = [30, 40] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(counts[0], counts[1], nobs[0], nobs[1], 0.05) test_and_check("proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper) counts = [20, 10] nobs = [40, 30] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(counts[0], counts[1], nobs[0], nobs[1], 0.05) test_and_check("proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper) counts = [randrange(10,20), randrange(10,20)] nobs = [randrange(counts[0] + 1, counts[0] * 2), randrange(counts[1], counts[1] * 2)] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(counts[0], counts[1], nobs[0], nobs[1], 0.05) test_and_check("proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper) counts = [randrange(1,100), randrange(1,200)] nobs = [randrange(counts[0], counts[0] * 2), randrange(counts[1], counts[1] * 3)] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(counts[0], counts[1], nobs[0], nobs[1], 0.05) test_and_check("proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper) counts = [randrange(1,200), randrange(1,100)] nobs = [randrange(counts[0], counts[0] * 3), randrange(counts[1], counts[1] * 2)] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(counts[0], counts[1], nobs[0], nobs[1], 0.05) test_and_check("proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper) counts = [randrange(1,1000), randrange(1,1000)] nobs = [randrange(counts[0], counts[0] * 2), randrange(counts[1], counts[1] * 2)] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(counts[0], counts[1], nobs[0], nobs[1], 0.05) test_and_check("proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper) if __name__ == "__main__": test_mean_ztest() print("Ok.")