Support async generators, fix bugs

This commit is contained in:
serxa 2024-11-20 18:41:37 +00:00
parent 3e7174600a
commit 40c2098034
7 changed files with 58 additions and 20 deletions

View File

@ -57,10 +57,14 @@ export class MergeTree {
level: 0,
begin: this.inserted_parts_count,
end: this.inserted_parts_count + 1,
left_bytes: this.inserted_bytes,
right_bytes: this.inserted_bytes + bytes,
is_leftmost: false,
is_rightmost: false,
active: true,
merging: false,
idx: this.parts.length,
source_part_count: 1
source_part_count: 0
};
this.parts.push(result);
this.active_part_count++;
@ -127,6 +131,8 @@ export class MergeTree {
level: 1 + Math.max(...parts_to_merge.map(d => d.level)),
begin: Math.min(...parts_to_merge.map(d => d.begin)),
end: Math.max(...parts_to_merge.map(d => d.end)),
left_bytes: Math.min(...parts_to_merge.map(d => d.left_bytes)),
right_bytes: Math.max(...parts_to_merge.map(d => d.right_bytes)),
active: true,
merging: false,
idx: this.parts.length,
@ -136,11 +142,20 @@ export class MergeTree {
this.active_part_count++;
for (let p of parts_to_merge)
{
p.parent = idx;
if (p.active == false)
throw { message: "Merging inactive part", part: p};
// Update children parts
p.parent = idx;
p.parent_part = result;
p.active = false;
p.merging = false;
if (p.left_bytes == result.left_bytes)
p.is_leftmost = true;
if (p.right_bytes == result.right_bytes)
p.is_rightmost = true;
// Update metrics
this.merging_part_count--;
this.active_part_count--;
}

View File

@ -7,14 +7,18 @@ export class MergeTreeInserter
this.mt = mt; // MergeTree
this.inserter = inserter;
this.signals = signals;
this.#iterateInserter();
}
#iterateInserter()
async start()
{
await this.#iterateInserter();
}
async #iterateInserter()
{
while (true)
{
const { value, done } = this.inserter.next();
const { value, done } = await this.inserter.next();
if (done)
return; // No more inserts
switch (value.type)
@ -27,7 +31,7 @@ export class MergeTreeInserter
case 'sleep':
if (value.delay > 0)
{
this.sim.scheduleAt(this.sim.time + value.delay, "InserterSleep", () => this.#iterateInserter());
this.sim.scheduleAt(this.sim.time + value.delay, "InserterSleep", async () => await this.#iterateInserter());
return;
}
break;

View File

@ -9,19 +9,23 @@ export class MergeTreeMerger
this.selector = selector;
this.signals = signals;
this.merges_running = 0;
if (mt.active_part_count == 0) // Hack to start only after initial parts are inserted
this.sim.postpone("PostponeMergerInit", () => this.#iterateSelector());
else
this.#iterateSelector();
// TODO(serxa): if we are going to use more than 1 selector in parallel we should "subscribe" for pool availability to be able to run more merges when worker is release by another merger
}
#iterateSelector()
async start()
{
if (this.mt.active_part_count == 0) // Hack to start only after initial parts are inserted
this.sim.postpone("PostponeMergerInit", async () => await this.#iterateSelector());
else
await this.#iterateSelector();
}
async #iterateSelector()
{
let value_to_send = null;
loop: while (this.pool.isAvailable())
{
const { value, done } = this.selector.next(value_to_send);
const { value, done } = await this.selector.next(value_to_send);
value_to_send = null;
if (done)
return; // No more merges required
@ -41,7 +45,7 @@ export class MergeTreeMerger
throw { message: "Merge selector wait for zero merges. Run at least one or use 'sleep' instead" };
break loop; // No need to do anything iterateSelector() will be called on the end of any merge
case 'sleep':
this.sim.scheduleAt(this.sim.time + value.delay, "MergerSleep", () => this.#iterateSelector());
this.sim.scheduleAt(this.sim.time + value.delay, "MergerSleep", async () => await this.#iterateSelector());
return;
default:
throw { message: "Unknown merge selector yield type", value };

View File

@ -1,4 +1,6 @@
export function* clickHousePartsInserter({host, user, password, query, table, database, partition})
import { queryClickHouse } from './queryClickHouse.js';
export async function* clickHousePartsInserter({host, user, password, query, table, database, partition})
{
if (!query)
{
@ -12,13 +14,14 @@ export function* clickHousePartsInserter({host, user, password, query, table, da
query = `SELECT * FROM system.parts ${where} ORDER BY min_block_number`;
}
let rows = [];
queryClickHouse({
await queryClickHouse({
host,
user,
password,
query,
for_each_row: (data) => rows.push(data)
});
yield {type: 'sleep', delay: 0};
for (const row of rows)
yield {type: 'insert', bytes}; // TODO(serxa): pass modification_time
yield {type: 'insert', bytes: +row.bytes_on_disk}; // TODO(serxa): pass modification_time
}

View File

@ -1,6 +1,6 @@
let add_http_cors_header = (location.protocol != 'file:');
export async function queryClickHouse({host, user, password, query, is_stopping, for_each_row})
export async function queryClickHouse({host, user, password, query, is_stopping, for_each_row, on_error})
{
// Construct URL
let url = `${host}?default_format=JSONEachRow&enable_http_compression=1`
@ -12,13 +12,20 @@ export async function queryClickHouse({host, user, password, query, is_stopping,
if (password)
url += `&password=${encodeURIComponent(password)}`;
console.log("QUERY", query);
let response, reply, error;
try
{
// Send the query
response = await fetch(url, { method: "POST", body: query });
if (!response.ok)
throw new Error(`HTTP error. Status: ${response.status}`);
{
const reply = JSON.parse(await response.text());
const error = ('exception' in reply) ? reply.exception : reply.toString();
throw new Error(`HTTP Status: ${response.status}. Error: ${error}`);
}
// Initiate stream processing of response body
const reader = response.body.getReader();
@ -48,7 +55,8 @@ export async function queryClickHouse({host, user, password, query, is_stopping,
}
catch (e)
{
console.log(e);
error = e.toString();
console.log("CLICKHOUSE QUERY FAILED", e);
if (on_error)
on_error(e.toString());
}
}

View File

@ -38,6 +38,9 @@ export async function testMergeTreeInserter()
const inserter2 = new MergeTreeInserter(sim, mt, testInserter2());
const inserter3 = new MergeTreeInserter(sim, mt, testInserter3());
await inserter1.start();
await inserter2.start();
await inserter3.start();
await sim.run();
assert.deepEqual(mt.parts.map(d => d.bytes), [1, 42, 42, 42, 1, 13, 13, 2, 3, 4, 666]);

View File

@ -47,6 +47,7 @@ export async function testMergeTreeMerger()
}
const merger = new MergeTreeMerger(sim, mt, pool, testSelector());
await merger.start();
await sim.run();
assert.deepEqual(mt.parts.filter(d => d.active).map(d => d.bytes), [expected_bytes]);