#!/usr/bin/env python3 import os import sys from math import sqrt, nan from random import randrange from scipy import stats import pandas as pd import numpy as np CURDIR = os.path.dirname(os.path.realpath(__file__)) sys.path.insert(0, os.path.join(CURDIR, "helpers")) from pure_http_client import ClickHouseClient # unpooled variance z-test for proportions of two samples def twosample_proportion_ztest(s1, s2, t1, t2, alpha): if s1 == 0 or s2 == 0 or s1 > t1 or s2 > t2 or t1 + t2 == 0: return nan, nan, nan, nan p1 = s1 / t1 p2 = s2 / t2 se = sqrt(p1 * (1 - p1) / t1 + p2 * (1 - p2) / t2) if se == 0: return nan, nan, nan, nan z_stat = (p1 - p2) / se one_side = 1 - stats.norm.cdf(abs(z_stat)) p_value = one_side * 2 z = stats.norm.ppf(1 - 0.5 * alpha) ci_lower = (p1 - p2) - z * se ci_upper = (p1 - p2) + z * se return z_stat, p_value, ci_lower, ci_upper def test_and_check(name, z_stat, p_value, ci_lower, ci_upper, precision=1e-2): client = ClickHouseClient() real = client.query_return_df( "SELECT roundBankers({}.1, 16) as z_stat, ".format(name) + "roundBankers({}.2, 16) as p_value, ".format(name) + "roundBankers({}.3, 16) as ci_lower, ".format(name) + "roundBankers({}.4, 16) as ci_upper ".format(name) + "FORMAT TabSeparatedWithNames;" ) real_z_stat = real["z_stat"][0] real_p_value = real["p_value"][0] real_ci_lower = real["ci_lower"][0] real_ci_upper = real["ci_upper"][0] assert (np.isnan(real_z_stat) and np.isnan(z_stat)) or abs( real_z_stat - np.float64(z_stat) ) < precision, "clickhouse_z_stat {}, py_z_stat {}".format(real_z_stat, z_stat) assert (np.isnan(real_p_value) and np.isnan(p_value)) or abs( real_p_value - np.float64(p_value) ) < precision, "clickhouse_p_value {}, py_p_value {}".format(real_p_value, p_value) assert (np.isnan(real_ci_lower) and np.isnan(ci_lower)) or abs( real_ci_lower - np.float64(ci_lower) ) < precision, "clickhouse_ci_lower {}, py_ci_lower {}".format( real_ci_lower, ci_lower ) assert (np.isnan(real_ci_upper) and np.isnan(ci_upper)) or abs( real_ci_upper - np.float64(ci_upper) ) < precision, "clickhouse_ci_upper {}, py_ci_upper {}".format( real_ci_upper, ci_upper ) def test_mean_ztest(): counts = [0, 0] nobs = [0, 0] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest( counts[0], counts[1], nobs[0], nobs[1], 0.05 ) test_and_check( "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper, ) z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest( 10, 10, 10, 10, 0.05 ) counts = [10, 10] nobs = [10, 10] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest( counts[0], counts[1], nobs[0], nobs[1], 0.05 ) test_and_check( "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper, ) z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest( 10, 10, 10, 10, 0.05 ) counts = [16, 16] nobs = [16, 18] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest( counts[0], counts[1], nobs[0], nobs[1], 0.05 ) test_and_check( "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper, ) counts = [10, 20] nobs = [30, 40] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest( counts[0], counts[1], nobs[0], nobs[1], 0.05 ) test_and_check( "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper, ) counts = [20, 10] nobs = [40, 30] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest( counts[0], counts[1], nobs[0], nobs[1], 0.05 ) test_and_check( "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper, ) counts = [randrange(10, 20), randrange(10, 20)] nobs = [ randrange(counts[0] + 1, counts[0] * 2), randrange(counts[1], counts[1] * 2), ] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest( counts[0], counts[1], nobs[0], nobs[1], 0.05 ) test_and_check( "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper, ) counts = [randrange(1, 100), randrange(1, 200)] nobs = [randrange(counts[0], counts[0] * 2), randrange(counts[1], counts[1] * 3)] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest( counts[0], counts[1], nobs[0], nobs[1], 0.05 ) test_and_check( "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper, ) counts = [randrange(1, 200), randrange(1, 100)] nobs = [randrange(counts[0], counts[0] * 3), randrange(counts[1], counts[1] * 2)] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest( counts[0], counts[1], nobs[0], nobs[1], 0.05 ) test_and_check( "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper, ) counts = [randrange(1, 1000), randrange(1, 1000)] nobs = [randrange(counts[0], counts[0] * 2), randrange(counts[1], counts[1] * 2)] z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest( counts[0], counts[1], nobs[0], nobs[1], 0.05 ) test_and_check( "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')" % (counts[0], counts[1], nobs[0], nobs[1]), z_stat, p_value, ci_lower, ci_upper, ) if __name__ == "__main__": test_mean_ztest() print("Ok.")