mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
263 lines
8.7 KiB
Python
Executable File
263 lines
8.7 KiB
Python
Executable File
#!/usr/bin/python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import sys
|
|
import argparse
|
|
import tempfile
|
|
import random
|
|
import subprocess
|
|
import bisect
|
|
from copy import deepcopy
|
|
|
|
# Псевдослучайный генератор уникальных чисел.
|
|
# http://preshing.com/20121224/how-to-generate-a-sequence-of-unique-random-integers/
|
|
class UniqueRandomGenerator:
|
|
prime = 4294967291
|
|
|
|
def __init__(self, seed_base, seed_offset):
|
|
self.index = self.permutePQR(self.permutePQR(seed_base) + 0x682f0161)
|
|
self.intermediate_offset = self.permutePQR(self.permutePQR(seed_offset) + 0x46790905)
|
|
|
|
def next(self):
|
|
val = self.permutePQR((self.permutePQR(self.index) + self.intermediate_offset) ^ 0x5bf03635)
|
|
self.index = self.index + 1
|
|
return val
|
|
|
|
def permutePQR(self, x):
|
|
if x >=self.prime:
|
|
return x
|
|
else:
|
|
residue = (x * x) % self.prime
|
|
if x <= self.prime/2:
|
|
return residue
|
|
else:
|
|
return self.prime - residue
|
|
|
|
# Создать таблицу содержащую уникальные значения.
|
|
def generate_data_source(host, port, http_port, min_cardinality, max_cardinality, count):
|
|
chunk_size = round((max_cardinality - min_cardinality) / float(count))
|
|
used_values = 0
|
|
|
|
cur_count = 0
|
|
next_size = 0
|
|
|
|
sup = 32768
|
|
n1 = random.randrange(0, sup)
|
|
n2 = random.randrange(0, sup)
|
|
urng = UniqueRandomGenerator(n1, n2)
|
|
|
|
is_first = True
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
filename = tmp_dir + '/table.txt'
|
|
with open(filename, 'w+b') as file_handle:
|
|
while cur_count < count:
|
|
|
|
if is_first == True:
|
|
is_first = False
|
|
if min_cardinality != 0:
|
|
next_size = min_cardinality + 1
|
|
else:
|
|
next_size = chunk_size
|
|
else:
|
|
next_size += chunk_size
|
|
|
|
while used_values < next_size:
|
|
h = urng.next()
|
|
used_values = used_values + 1
|
|
out = str(h) + "\t" + str(cur_count) + "\n";
|
|
file_handle.write(bytes(out, 'UTF-8'));
|
|
cur_count = cur_count + 1
|
|
|
|
query = "DROP TABLE IF EXISTS data_source"
|
|
subprocess.check_output(["clickhouse-client", "--host", host, "--port", str(port), "--query", query])
|
|
query = "CREATE TABLE data_source(UserID UInt64, KeyID UInt64) ENGINE=TinyLog"
|
|
subprocess.check_output(["clickhouse-client", "--host", host, "--port", str(port), "--query", query])
|
|
|
|
cat = subprocess.Popen(("cat", filename), stdout=subprocess.PIPE)
|
|
subprocess.check_output(("POST", "http://{0}:{1}/?query=INSERT INTO data_source FORMAT TabSeparated".format(host, http_port)), stdin=cat.stdout)
|
|
cat.wait()
|
|
|
|
def perform_query(host, port):
|
|
query = "SELECT runningAccumulate(uniqExactState(UserID)) AS exact, "
|
|
query += "runningAccumulate(uniqCombinedRawState(UserID)) AS approx "
|
|
query += "FROM data_source GROUP BY KeyID"
|
|
return subprocess.check_output(["clickhouse-client", "--host", host, "--port", port, "--query", query])
|
|
|
|
def parse_clickhouse_response(response):
|
|
parsed = []
|
|
lines = response.decode().split("\n")
|
|
for cur_line in lines:
|
|
rows = cur_line.split("\t")
|
|
if len(rows) == 2:
|
|
parsed.append([float(rows[0]), float(rows[1])])
|
|
return parsed
|
|
|
|
def accumulate_data(accumulated_data, data):
|
|
if not accumulated_data:
|
|
accumulated_data = deepcopy(data)
|
|
else:
|
|
for row1, row2 in zip(accumulated_data, data):
|
|
row1[1] += row2[1];
|
|
return accumulated_data
|
|
|
|
def generate_raw_result(accumulated_data, count):
|
|
expected_tab = []
|
|
bias_tab = []
|
|
for row in accumulated_data:
|
|
exact = row[0]
|
|
expected = row[1] / count
|
|
bias = expected - exact
|
|
|
|
expected_tab.append(expected)
|
|
bias_tab.append(bias)
|
|
return [ expected_tab, bias_tab ]
|
|
|
|
def generate_sample(raw_estimates, biases, n_samples):
|
|
result = []
|
|
|
|
min_card = raw_estimates[0]
|
|
max_card = raw_estimates[len(raw_estimates) - 1]
|
|
step = (max_card - min_card) / (n_samples - 1)
|
|
|
|
for i in range(0, n_samples + 1):
|
|
x = min_card + i * step
|
|
j = bisect.bisect_left(raw_estimates, x)
|
|
|
|
if j == len(raw_estimates):
|
|
result.append((raw_estimates[j - 1], biases[j - 1]))
|
|
elif raw_estimates[j] == x:
|
|
result.append((raw_estimates[j], biases[j]))
|
|
else:
|
|
# Найти 6 ближайших соседей. Вычислить среднее арифметическое.
|
|
|
|
# 6 точек слева x [j-6 j-5 j-4 j-3 j-2 j-1]
|
|
|
|
begin = max(j - 6, 0) - 1
|
|
end = j - 1
|
|
|
|
T = []
|
|
for k in range(end, begin, -1):
|
|
T.append(x - raw_estimates[k])
|
|
|
|
# 6 точек справа x [j j+1 j+2 j+3 j+4 j+5]
|
|
|
|
begin = j
|
|
end = min(j + 5, len(raw_estimates) - 1) + 1
|
|
|
|
U = []
|
|
for k in range(begin, end):
|
|
U.append(raw_estimates[k] - x)
|
|
|
|
# Сливаем расстояния.
|
|
|
|
V = []
|
|
|
|
lim = min(len(T), len(U))
|
|
k1 = 0
|
|
k2 = 0
|
|
|
|
while k1 < lim and k2 < lim:
|
|
if T[k1] == U[k2]:
|
|
V.append(j - k1 - 1)
|
|
V.append(j + k2)
|
|
k1 = k1 + 1
|
|
k2 = k2 + 1
|
|
elif T[k1] < U[k2]:
|
|
V.append(j - k1 - 1)
|
|
k1 = k1 + 1
|
|
else:
|
|
V.append(j + k2)
|
|
k2 = k2 + 1
|
|
|
|
if k1 < len(T):
|
|
while k1 < len(T):
|
|
V.append(j - k1 - 1)
|
|
k1 = k1 + 1
|
|
elif k2 < len(U):
|
|
while k2 < len(U):
|
|
V.append(j + k2)
|
|
k2 = k2 + 1
|
|
|
|
# Выбираем 6 ближайших точек.
|
|
# Вычисляем средние.
|
|
|
|
begin = 0
|
|
end = min(len(V), 6)
|
|
|
|
sum = 0
|
|
bias = 0
|
|
for k in range(begin, end):
|
|
sum += raw_estimates[V[k]]
|
|
bias += biases[V[k]]
|
|
sum /= float(end)
|
|
bias /= float(end)
|
|
|
|
result.append((sum, bias))
|
|
|
|
# Пропустить последовательные результаты, чьи оценки одинаковые.
|
|
final_result = []
|
|
last = -1
|
|
for entry in result:
|
|
if entry[0] != last:
|
|
final_result.append((entry[0], entry[1]))
|
|
last = entry[0]
|
|
|
|
return final_result
|
|
|
|
def dump_arrays(data):
|
|
|
|
print("Size of each array: {0}\n".format(len(data)))
|
|
|
|
is_first = True
|
|
sep = ''
|
|
|
|
print("raw_estimates = ")
|
|
print("{")
|
|
for row in data:
|
|
print("\t{0}{1}".format(sep, row[0]))
|
|
if is_first == True:
|
|
is_first = False
|
|
sep = ","
|
|
print("};")
|
|
|
|
is_first = True
|
|
sep = ""
|
|
|
|
print("\nbiases = ")
|
|
print("{")
|
|
for row in data:
|
|
print("\t{0}{1}".format(sep, row[1]))
|
|
if is_first == True:
|
|
is_first = False
|
|
sep = ","
|
|
print("};")
|
|
|
|
def start():
|
|
parser = argparse.ArgumentParser(description = "Generate bias correction tables for HyperLogLog-based functions.")
|
|
parser.add_argument("-x", "--host", default="localhost", help="ClickHouse server host name");
|
|
parser.add_argument("-p", "--port", type=int, default=9000, help="ClickHouse server TCP port");
|
|
parser.add_argument("-t", "--http_port", type=int, default=8123, help="ClickHouse server HTTP port");
|
|
parser.add_argument("-i", "--iterations", type=int, default=5000, help="number of iterations");
|
|
parser.add_argument("-m", "--min_cardinality", type=int, default=16384, help="minimal cardinality");
|
|
parser.add_argument("-M", "--max_cardinality", type=int, default=655360, help="maximal cardinality");
|
|
parser.add_argument("-s", "--samples", type=int, default=200, help="number of sampled values");
|
|
args = parser.parse_args()
|
|
|
|
accumulated_data = []
|
|
|
|
for i in range(0, args.iterations):
|
|
print(i + 1)
|
|
sys.stdout.flush()
|
|
|
|
generate_data_source(args.host, str(args.port), str(args.http_port), args.min_cardinality, args.max_cardinality, 1000)
|
|
response = perform_query(args.host, str(args.port))
|
|
data = parse_clickhouse_response(response)
|
|
accumulated_data = accumulate_data(accumulated_data, data)
|
|
|
|
result = generate_raw_result(accumulated_data, args.iterations)
|
|
sampled_data = generate_sample(result[0], result[1], args.samples)
|
|
dump_arrays(sampled_data)
|
|
|
|
if __name__ == "__main__": start()
|