dbms: Server: Script cleanup. [#METR-17276]

Alexey Arno 2015-09-02 19:21:24 +03:00
parent 047f492011
commit 6c4cd4a0aa
2 changed files with 82 additions and 63 deletions

View File

@@ -34,8 +34,8 @@ class UniqueRandomGenerator:
return self.prime - residue
# Create a table containing unique values.
def generate_data_source(host, port, http_port, begin, end, count):
chunk_size = round((end - begin) / float(count))
def generate_data_source(host, port, http_port, min_cardinality, max_cardinality, count):
chunk_size = round((max_cardinality - min_cardinality) / float(count))
used_values = 0
cur_count = 0
@@ -46,11 +46,22 @@ def generate_data_source(host, port, http_port, begin, end, count):
n2 = random.randrange(0, sup)
urng = UniqueRandomGenerator(n1, n2)
is_first = True
with tempfile.TemporaryDirectory() as tmp_dir:
filename = tmp_dir + '/table.txt'
with open(filename, 'w+b') as file_handle:
while cur_count < count:
next_size += chunk_size
if is_first == True:
is_first = False
if min_cardinality != 0:
next_size = min_cardinality + 1
else:
next_size = chunk_size
else:
next_size += chunk_size
while used_values < next_size:
h = urng.next()
used_values = used_values + 1
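The reworked sizing above controls which cardinalities the data source steps through: the first chunk jumps straight to min_cardinality + 1, and every further chunk adds chunk_size values. A standalone sketch of that schedule, using the script's new defaults purely as an illustration:

# Sketch of the chunk schedule implied by the logic above (illustration only;
# the numbers follow the new defaults of the script).
min_cardinality, max_cardinality, count = 16384, 655360, 1000
chunk_size = round((max_cardinality - min_cardinality) / float(count))  # 639

schedule = []
next_size = 0
for i in range(count):
    if i == 0:
        # first chunk starts right above the minimal cardinality
        # (or at chunk_size when no minimal cardinality is requested)
        next_size = min_cardinality + 1 if min_cardinality != 0 else chunk_size
    else:
        next_size += chunk_size
    schedule.append(next_size)

print(schedule[:3], schedule[-1])  # [16385, 17024, 17663] 654746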
@@ -107,7 +118,7 @@ def generate_sample(raw_estimates, biases, n_samples):
min_card = raw_estimates[0]
max_card = raw_estimates[len(raw_estimates) - 1]
step = (max_card - min_card) / n_samples
step = (max_card - min_card) / (n_samples - 1)
for i in range(0, n_samples + 1):
x = min_card + i * step
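With the corrected step the divisor is n_samples - 1, so the sample at index n_samples - 1 lands exactly on max_card instead of stopping one step short of it. A quick check with illustrative numbers:

# Quick check of the interpolation step (values are made up for the example).
min_card, max_card, n_samples = 16385.0, 654746.0, 200
step = (max_card - min_card) / (n_samples - 1)
# index 0 gives min_card, index n_samples - 1 gives exactly max_card
print(min_card + 0 * step, min_card + (n_samples - 1) * step)  # 16385.0 654746.0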
@@ -194,16 +205,16 @@ def generate_sample(raw_estimates, biases, n_samples):
return final_result
def dump_arrays(stats):
def dump_arrays(data):
print("Size of each array: {0}\n".format(len(stats)))
print("Size of each array: {0}\n".format(len(data)))
is_first = True
sep = ''
print("raw_estimates = ")
print("{")
for row in stats:
for row in data:
print("\t{0}{1}".format(sep, row[0]))
if is_first == True:
is_first = False
@@ -215,7 +226,7 @@ def dump_arrays(stats):
print("\nbiases = ")
print("{")
for row in stats:
for row in data:
print("\t{0}{1}".format(sep, row[1]))
if is_first == True:
is_first = False
@@ -228,8 +239,9 @@ def start():
parser.add_argument("-p", "--port", type=int, default=9000, help="ClickHouse server TCP port");
parser.add_argument("-t", "--http_port", type=int, default=8123, help="ClickHouse server HTTP port");
parser.add_argument("-i", "--iterations", type=int, default=5000, help="number of iterations");
parser.add_argument("-s", "--generated", type=int, default=700000, help="number of generated values");
parser.add_argument("-g", "--samples", type=int, default=200, help="number of sampled values");
parser.add_argument("-m", "--min_cardinality", type=int, default=16384, help="minimal cardinality");
parser.add_argument("-M", "--max_cardinality", type=int, default=655360, help="maximal cardinality");
parser.add_argument("-s", "--samples", type=int, default=200, help="number of sampled values");
args = parser.parse_args()
accumulated_data = []
@@ -238,13 +250,13 @@ def start():
print(i + 1)
sys.stdout.flush()
generate_data_source(args.host, str(args.port), str(args.http_port), 0, args.generated, 1000)
generate_data_source(args.host, str(args.port), str(args.http_port), args.min_cardinality, args.max_cardinality, 1000)
response = perform_query(args.host, str(args.port))
data = parse_clickhouse_response(response)
accumulated_data = accumulate_data(accumulated_data, data)
result = generate_raw_result(accumulated_data, args.iterations)
sample = generate_sample(result[0], result[1], args.samples)
dump_arrays(sample)
sampled_data = generate_sample(result[0], result[1], args.samples)
dump_arrays(sampled_data)
if __name__ == "__main__": start()

View File

@@ -9,6 +9,7 @@ import subprocess
import bisect
from copy import deepcopy
# Pseudo-random generator of unique numbers.
# http://preshing.com/20121224/how-to-generate-a-sequence-of-unique-random-integers/
class UniqueRandomGenerator:
prime = 4294967291
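The linked article describes producing a non-repeating pseudo-random sequence of 32-bit integers via quadratic residues of a prime p ≡ 3 (mod 4); 4294967291 is the largest such prime below 2^32. The class is only partially visible in this diff, so here is a minimal self-contained sketch of that technique (constants and names are illustrative, not necessarily those of the script):

# Sketch of the unique-random-integer technique from the article above.
class UniqueRandomSketch:
    prime = 4294967291  # largest prime below 2**32 with prime % 4 == 3

    def __init__(self, seed_base, seed_offset):
        self.index = self.permute((self.permute(seed_base) + 0x682f0161) & 0xFFFFFFFF)
        self.offset = self.permute((self.permute(seed_offset) + 0x46790905) & 0xFFFFFFFF)

    def permute(self, x):
        # x -> x*x mod prime maps the lower half of [0, prime) to distinct
        # quadratic residues; mirroring the upper half makes it a permutation.
        if x >= self.prime:
            return x  # the few values in [prime, 2**32) map to themselves
        residue = (x * x) % self.prime
        return residue if x <= self.prime // 2 else self.prime - residue

    def next(self):
        # each step is a bijection of [0, 2**32) applied to an increasing
        # counter, so outputs do not repeat within a full period
        value = self.permute(((self.permute(self.index) + self.offset) & 0xFFFFFFFF) ^ 0x5bf03635)
        self.index = (self.index + 1) & 0xFFFFFFFF
        return value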
@@ -32,8 +33,9 @@ class UniqueRandomGenerator:
else:
return self.prime - residue
def generate_data_source(host, port, http_port, begin, end, count):
chunk_size = round((end - begin) / float(count))
# Create a table containing unique values.
def generate_data_source(host, port, http_port, min_cardinality, max_cardinality, count):
chunk_size = round((max_cardinality - (min_cardinality + 1)) / float(count))
used_values = 0
cur_count = 0
@@ -44,30 +46,36 @@ def generate_data_source(host, port, http_port, begin, end, count):
n2 = random.randrange(0, sup)
urng = UniqueRandomGenerator(n1, n2)
is_first = True
with tempfile.TemporaryDirectory() as tmp_dir:
filename = tmp_dir + '/table.txt'
file_handle = open(filename, 'w+b')
with open(filename, 'w+b') as file_handle:
while cur_count < count:
while cur_count < count:
next_size += chunk_size
if is_first == True:
is_first = False
if min_cardinality != 0:
next_size = min_cardinality + 1
else:
next_size = chunk_size
else:
next_size += chunk_size
while used_values < next_size:
h = urng.next()
used_values = used_values + 1
outstr = str(h) + "\t" + str(cur_count) + "\n";
file_handle.write(bytes(outstr, 'UTF-8'));
while used_values < next_size:
h = urng.next()
used_values = used_values + 1
out = str(h) + "\t" + str(cur_count) + "\n";
file_handle.write(bytes(out, 'UTF-8'));
cur_count = cur_count + 1
cur_count = cur_count + 1
file_handle.close()
query = 'DROP TABLE IF EXISTS data_source'
query = "DROP TABLE IF EXISTS data_source"
subprocess.check_output(["clickhouse-client", "--host", host, "--port", str(port), "--query", query])
query = 'CREATE TABLE data_source(UserID UInt64, KeyID UInt64) ENGINE=TinyLog'
query = "CREATE TABLE data_source(UserID UInt64, KeyID UInt64) ENGINE=TinyLog"
subprocess.check_output(["clickhouse-client", "--host", host, "--port", str(port), "--query", query])
cat = subprocess.Popen(("cat", filename), stdout=subprocess.PIPE)
subprocess.check_output(("POST", "http://localhost:{0}/?query=INSERT INTO data_source FORMAT TabSeparated".format(http_port)), stdin=cat.stdout)
subprocess.check_output(("POST", "http://{0}:{1}/?query=INSERT INTO data_source FORMAT TabSeparated".format(host, http_port)), stdin=cat.stdout)
cat.wait()
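The cat | POST pipeline above streams the generated TSV file into ClickHouse's HTTP interface. For readers without the POST utility, roughly the same request can be made with the standard library; a sketch, not part of the script:

# Sketch: send the TSV file to the HTTP interface with urllib instead of POST.
import urllib.parse
import urllib.request

def insert_tab_separated(host, http_port, filename):
    # the INSERT query goes in the URL, the TSV data in the request body
    query = urllib.parse.quote("INSERT INTO data_source FORMAT TabSeparated")
    url = "http://{0}:{1}/?query={2}".format(host, http_port, query)
    with open(filename, "rb") as data_file:
        request = urllib.request.Request(url, data=data_file.read())
    urllib.request.urlopen(request).read()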
def perform_query(host, port):
@@ -78,66 +86,65 @@ def perform_query(host, port):
query += "FROM data_source GROUP BY KeyID"
return subprocess.check_output(["clickhouse-client", "--host", host, "--port", port, "--query", query])
def parse_clickhouse_response(output):
def parse_clickhouse_response(response):
parsed = []
lines = output.decode().split("\n")
lines = response.decode().split("\n")
for cur_line in lines:
rows = cur_line.split("\t")
if len(rows) == 4:
parsed.append([float(rows[0]), float(rows[1]), float(rows[2]), float(rows[3])])
return parsed
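perform_query returns one tab-separated line per KeyID, and parse_clickhouse_response keeps only lines with exactly four numeric columns. An illustrative round trip with made-up numbers (the column meaning, exact count followed by three estimates, is an assumption here):

# Illustration of the expected response format and how it is parsed.
response = b"1000\t1012.4\t998.7\t1001.3\n2000\t2031.0\t1995.2\t2003.8\n\n"
parsed = []
for cur_line in response.decode().split("\n"):
    rows = cur_line.split("\t")
    if len(rows) == 4:
        parsed.append([float(value) for value in rows])
print(parsed)  # [[1000.0, 1012.4, 998.7, 1001.3], [2000.0, 2031.0, 1995.2, 2003.8]]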
def accumulate_data(stats, data):
if not stats:
stats = deepcopy(data)
def accumulate_data(accumulated_data, data):
if not accumulated_data:
accumulated_data = deepcopy(data)
else:
for row1, row2 in zip(stats, data):
for row1, row2 in zip(accumulated_data, data):
row1[1] += row2[1];
row1[2] += row2[2];
row1[3] += row2[3];
return stats
return accumulated_data
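accumulate_data keeps the exact cardinality in column 0 and sums the three estimate columns across iterations; the caller later divides by the iteration count to obtain averages. A tiny illustration with made-up numbers:

# Two iterations are summed column-wise; averaging happens in dump_graphs.
from copy import deepcopy

iterations = [
    [[1000.0, 1010.0, 995.0, 1002.0]],
    [[1000.0, 990.0, 1001.0, 999.0]],
]
accumulated = []
for data in iterations:
    if not accumulated:
        accumulated = deepcopy(data)
    else:
        for row1, row2 in zip(accumulated, data):
            row1[1] += row2[1]
            row1[2] += row2[2]
            row1[3] += row2[3]
print(accumulated)  # [[1000.0, 2000.0, 1996.0, 2001.0]]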
def dump_graphs(stats, count):
fh1 = open("raw_graph.txt", "w+b")
fh2 = open("linear_counting_graph.txt", "w+b")
fh3 = open("bias_corrected_graph.txt", "w+b")
def dump_graphs(data, count):
with open("raw_graph.txt", "w+b") as fh1, open("linear_counting_graph.txt", "w+b") as fh2, open("bias_corrected_graph.txt", "w+b") as fh3:
expected_tab = []
bias_tab = []
for row in data:
exact = row[0]
raw = row[1] / count;
linear_counting = row[2] / count;
bias_corrected = row[3] / count;
expected_tab = []
bias_tab = []
for row in stats:
exact = row[0]
raw = row[1] / count;
linear_counting = row[2] / count;
bias_corrected = row[3] / count;
outstr = "{0}\t{1}\n".format(exact, abs(raw - exact) / exact)
fh1.write(bytes(outstr, 'UTF-8'))
outstr = "{0}\t{1}\n".format(exact, abs(raw - exact) / exact)
fh1.write(bytes(outstr, 'UTF-8'))
outstr = "{0}\t{1}\n".format(exact, abs(linear_counting - exact) / exact)
fh2.write(bytes(outstr, 'UTF-8'))
outstr = "{0}\t{1}\n".format(exact, abs(linear_counting - exact) / exact)
fh2.write(bytes(outstr, 'UTF-8'))
outstr = "{0}\t{1}\n".format(exact, abs(bias_corrected - exact) / exact)
fh3.write(bytes(outstr, 'UTF-8'))
outstr = "{0}\t{1}\n".format(exact, abs(bias_corrected - exact) / exact)
fh3.write(bytes(outstr, 'UTF-8'))
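dump_graphs writes, for each exact cardinality, the relative error of the raw, linear counting and bias-corrected estimates into three two-column files. Those files can be inspected with any plotting tool; a sketch with matplotlib (an assumption, the script itself only writes the data):

# Sketch: plot the three relative-error curves written by dump_graphs.
import matplotlib.pyplot as plt

for name in ("raw_graph.txt", "linear_counting_graph.txt", "bias_corrected_graph.txt"):
    xs, ys = [], []
    with open(name) as graph_file:
        for line in graph_file:
            exact, rel_error = line.split("\t")
            xs.append(float(exact))
            ys.append(float(rel_error))
    plt.plot(xs, ys, label=name)
plt.xlabel("exact cardinality")
plt.ylabel("relative error")
plt.legend()
plt.show()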
def start():
parser = argparse.ArgumentParser(description = "Generate bias correction tables.")
parser = argparse.ArgumentParser(description = "Generate graphs that help to determine the linear counting threshold.")
parser.add_argument("-x", "--host", default="127.0.0.1", help="clickhouse host name");
parser.add_argument("-p", "--port", type=int, default=9000, help="clickhouse client TCP port");
parser.add_argument("-t", "--http_port", type=int, default=8123, help="clickhouse HTTP port");
parser.add_argument("-i", "--iterations", type=int, default=5000, help="number of iterations");
parser.add_argument("-s", "--generated", type=int, default=700000, help="number of generated values");
parser.add_argument("-m", "--min_cardinality", type=int, default=16384, help="minimal cardinality");
parser.add_argument("-M", "--max_cardinality", type=int, default=655360, help="maximal cardinality");
args = parser.parse_args()
stats = []
accumulated_data = []
for i in range(0, args.iterations):
print(i + 1)
sys.stdout.flush()
generate_data_source(args.host, str(args.port), str(args.http_port), 0, args.generated, 1000)
output = perform_query(args.host, str(args.port))
data = parse_clickhouse_response(output)
stats = accumulate_data(stats, data)
dump_graphs(stats, args.iterations)
generate_data_source(args.host, str(args.port), str(args.http_port), args.min_cardinality, args.max_cardinality, 1000)
response = perform_query(args.host, str(args.port))
data = parse_clickhouse_response(response)
accumulated_data = accumulate_data(accumulated_data, data)
dump_graphs(accumulated_data, args.iterations)
if __name__ == "__main__": start()