ClickHouse/tests/queries/0_stateless/02158_proportions_ztest_cmp.python
2022-03-16 13:17:07 +01:00

109 lines
5.8 KiB
Python

#!/usr/bin/env python3
import os
import sys
from math import sqrt, nan
from random import randrange
from scipy import stats
import pandas as pd
import numpy as np
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from pure_http_client import ClickHouseClient
# unpooled variance z-test for proportions of two samples
def twosample_proportion_ztest(s1, s2, t1, t2, alpha):
    """Reference two-sample z-test for proportions using unpooled variance.

    Args:
        s1: Number of successes in the first sample.
        s2: Number of successes in the second sample.
        t1: Number of trials in the first sample.
        t2: Number of trials in the second sample.
        alpha: Significance level; the interval is built from the
            ``1 - alpha / 2`` quantile of the standard normal distribution.

    Returns:
        A tuple ``(z_stat, p_value, ci_lower, ci_upper)`` where ``p_value`` is
        two-sided and the interval bounds are for ``s1 / t1 - s2 / t2``.
        All four values are NaN when either success count is zero, when a
        success count exceeds its trial count, when there are no trials at
        all, or when the standard error is zero.
    """
    undefined = (nan, nan, nan, nan)

    degenerate_input = (
        s1 == 0 or s2 == 0 or s1 > t1 or s2 > t2 or t1 + t2 == 0
    )
    if degenerate_input:
        return undefined

    rate_1 = s1 / t1
    rate_2 = s2 / t2

    # Unpooled: each sample contributes its own variance term.
    variance_1 = rate_1 * (1 - rate_1) / t1
    variance_2 = rate_2 * (1 - rate_2) / t2
    std_err = sqrt(variance_1 + variance_2)
    if std_err == 0:
        # Both proportions sit at 0 or 1, so the statistic is undefined.
        return undefined

    delta = rate_1 - rate_2
    z_stat = delta / std_err
    p_value = (1 - stats.norm.cdf(abs(z_stat))) * 2

    margin = stats.norm.ppf(1 - 0.5 * alpha) * std_err
    return z_stat, p_value, delta - margin, delta + margin
def test_and_check(name, z_stat, p_value, ci_lower, ci_upper, precision=1e-2):
    """Run a ClickHouse expression and assert it matches the expected values.

    Args:
        name: SQL expression whose result exposes four elements, read as
            ``.1`` through ``.4`` (z statistic, p-value, lower and upper
            interval bounds).
        z_stat: Expected z statistic.
        p_value: Expected p-value.
        ci_lower: Expected lower interval bound.
        ci_upper: Expected upper interval bound.
        precision: Maximum allowed absolute difference per value.

    Raises:
        AssertionError: If a value differs by ``precision`` or more, or if
            only one side of a comparison is NaN.
    """
    client = ClickHouseClient()
    query = "".join([
        "SELECT roundBankers({}.1, 16) as z_stat, ".format(name),
        "roundBankers({}.2, 16) as p_value, ".format(name),
        "roundBankers({}.3, 16) as ci_lower, ".format(name),
        "roundBankers({}.4, 16) as ci_upper ".format(name),
        "FORMAT TabSeparatedWithNames;",
    ])
    # NOTE(review): query_return_df presumably returns a pandas DataFrame
    # keyed by the column aliases above; confirm against pure_http_client.
    frame = client.query_return_df(query)

    comparisons = (
        ("z_stat", z_stat, "clickhouse_z_stat {}, py_z_stat {}"),
        ("p_value", p_value, "clickhouse_p_value {}, py_p_value {}"),
        ("ci_lower", ci_lower, "clickhouse_ci_lower {}, py_ci_lower {}"),
        ("ci_upper", ci_upper, "clickhouse_ci_upper {}, py_ci_upper {}"),
    )

    # Read every column first so a missing column surfaces before any
    # comparison is made.
    observed = {column: frame[column][0] for column, _, _ in comparisons}

    for column, expected, message in comparisons:
        actual = observed[column]
        # NaN never compares close to anything, so treat "both NaN" as a match.
        both_nan = np.isnan(actual) and np.isnan(expected)
        matches = both_nan or abs(actual - np.float64(expected)) < precision
        assert matches, message.format(actual, expected)
def test_mean_ztest():
    """Compare ClickHouse ``proportionsZTest`` (unpooled) with the reference.

    Each case is a pair of success counts and a pair of trial counts. The
    expected values come from ``twosample_proportion_ztest`` and are checked
    against the server through ``test_and_check``. Fixed cases run first,
    followed by four randomly generated ones.

    Raises:
        AssertionError: Propagated from ``test_and_check`` on any mismatch.
    """

    def check(counts, nobs):
        # The reference takes alpha = 0.05 while the SQL call is given 0.95;
        # the two are expected to describe the same interval.
        expected = twosample_proportion_ztest(
            counts[0], counts[1], nobs[0], nobs[1], 0.05
        )
        test_and_check(
            "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')"
            % (counts[0], counts[1], nobs[0], nobs[1]),
            *expected
        )

    fixed_cases = (
        ([0, 0], [0, 0]),        # zero successes: reference yields NaN
        ([10, 10], [10, 10]),    # both proportions are 1: zero standard error
        ([16, 16], [16, 18]),
        ([10, 20], [30, 40]),
        ([20, 10], [40, 30]),    # previous case with the samples swapped
    )
    for counts, nobs in fixed_cases:
        check(counts, nobs)

    # Random cases: trial counts are drawn at or above the success counts.
    counts = [randrange(10, 20), randrange(10, 20)]
    nobs = [
        randrange(counts[0] + 1, counts[0] * 2),
        randrange(counts[1], counts[1] * 2),
    ]
    check(counts, nobs)

    counts = [randrange(1, 100), randrange(1, 200)]
    nobs = [
        randrange(counts[0], counts[0] * 2),
        randrange(counts[1], counts[1] * 3),
    ]
    check(counts, nobs)

    counts = [randrange(1, 200), randrange(1, 100)]
    nobs = [
        randrange(counts[0], counts[0] * 3),
        randrange(counts[1], counts[1] * 2),
    ]
    check(counts, nobs)

    counts = [randrange(1, 1000), randrange(1, 1000)]
    nobs = [
        randrange(counts[0], counts[0] * 2),
        randrange(counts[1], counts[1] * 2),
    ]
    check(counts, nobs)
if __name__ == "__main__":
    test_mean_ztest()
    # Reached only if no assertion in the comparison run above failed.
    print("Ok.")