* Adding JSONEachRowWithProgress format

that can be used with WATCH queries when HTTP connection is used
  in this case progress events are used as a heartbeat
* Adding new tests that use JSONEachRowWithProgress format
* Adding tests for WATCH query heartbeats when HTTP connection is used
* Small fixes to uexpect.py
This commit is contained in:
Vitaliy Zakaznikov 2019-06-07 09:05:14 -04:00
parent b65ff910b6
commit aa3ef47aab
14 changed files with 245 additions and 5 deletions

View File

@ -113,6 +113,7 @@ void registerInputFormatTSKV(FormatFactory & factory);
void registerOutputFormatTSKV(FormatFactory & factory);
void registerInputFormatJSONEachRow(FormatFactory & factory);
void registerOutputFormatJSONEachRow(FormatFactory & factory);
void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory);
void registerInputFormatParquet(FormatFactory & factory);
void registerOutputFormatParquet(FormatFactory & factory);
void registerInputFormatProtobuf(FormatFactory & factory);
@ -153,6 +154,7 @@ FormatFactory::FormatFactory()
registerOutputFormatTSKV(*this);
registerInputFormatJSONEachRow(*this);
registerOutputFormatJSONEachRow(*this);
registerOutputFormatJSONEachRowWithProgress(*this);
registerInputFormatProtobuf(*this);
registerOutputFormatProtobuf(*this);
registerInputFormatCapnProto(*this);

View File

@ -27,7 +27,7 @@ public:
ostr.next();
}
private:
protected:
WriteBuffer & ostr;
size_t field_number = 0;
Names fields;

View File

@ -0,0 +1,47 @@
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Formats/JSONEachRowWithProgressRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
namespace DB
{
void JSONEachRowWithProgressRowOutputStream::writeRowStartDelimiter()
{
writeCString("{\"row\":{", ostr);
}
void JSONEachRowWithProgressRowOutputStream::writeRowEndDelimiter()
{
writeCString("}}\n", ostr);
field_number = 0;
}
void JSONEachRowWithProgressRowOutputStream::onProgress(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
writeCString("{\"progress\":", ostr);
progress.writeJSON(ostr);
writeCString("}\n", ostr);
}
void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory)
{
factory.registerOutputFormat("JSONEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<JSONEachRowWithProgressRowOutputStream>(buf, sample, format_settings), sample);
});
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <IO/Progress.h>
#include <Formats/JSONEachRowRowOutputStream.h>
namespace DB
{
/** The stream for outputting data in JSON format, by object per line
* that includes progress rows. Does not validate UTF-8.
*/
class JSONEachRowWithProgressRowOutputStream : public JSONEachRowRowOutputStream
{
public:
using JSONEachRowRowOutputStream::JSONEachRowRowOutputStream;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void onProgress(const Progress & value) override;
private:
Progress progress;
};
}

View File

@ -9,7 +9,7 @@ CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
SHOW TABLES LIKE 'lv';
SELECT sleep(1);
SELECT sleep(2);
SHOW TABLES LIKE 'lv';
DROP TABLE test.mt;

View File

@ -0,0 +1,5 @@
lv
{"row":{"a":1}}
{"row":{"a":2}}
{"row":{"a":3}}
{"progress":{"read_rows":"3","read_bytes":"36","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
SHOW TABLES LIKE 'lv';
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT * FROM test.lv FORMAT JSONEachRowWithProgress;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -0,0 +1,7 @@
lv
{"row":{"sum(a)":"0","_version":"1"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
{"row":{"sum(a)":"6","_version":"2"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
{"row":{"sum(a)":"21","_version":"3"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}

View File

@ -0,0 +1,20 @@
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
SHOW TABLES LIKE 'lv';
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -0,0 +1,57 @@
#!/usr/bin/env python
import imp
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
uexpect = imp.load_source('uexpect', os.path.join(CURDIR, 'uexpect.py'))
def client(name='', command=None):
if command is None:
client = uexpect.spawn(os.environ.get('CLICKHOUSE_CLIENT'))
else:
client = uexpect.spawn(command)
client.eol('\r')
# Note: uncomment this line for debugging
#client.logger(sys.stdout, prefix=name)
client.timeout(2)
return client
prompt = ':\) '
end_of_block = r'.*\xe2\x94\x82\r\n.*\xe2\x94\x98\r\n'
client1 = client('client1>')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client2 = client('client2>', ['bash', '--noediting'])
client2.expect('\$ ')
client2.send('wget -O- -q "http://localhost:8123/?live_view_heartbeat_interval=1&query=WATCH test.lv EVENTS FORMAT JSONEachRowWithProgress"')
client2.expect('{"progress":{"read_rows":"1","read_bytes":"49","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}\r\n', escape=True)
client2.expect('{"row":{"version":"1","hash":"c9d39b11cce79112219a73aaa319b475"}}', escape=True)
client2.expect('{"progress":{"read_rows":"1","read_bytes":"49","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True)
# heartbeat is provided by progress message
client2.expect('{"progress":{"read_rows":"1","read_bytes":"49","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True)
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('{"row":{"version":"2","hash":"4cd0592103888d4682de9a32a23602e3"}}\r\n', escape=True)
client2.expect('.*2\t.*\r\n')
## send Ctrl-C
os.kill(client2.process.pid,signal.SIGINT)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -0,0 +1,57 @@
#!/usr/bin/env python
import imp
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
uexpect = imp.load_source('uexpect', os.path.join(CURDIR, 'uexpect.py'))
def client(name='', command=None):
if command is None:
client = uexpect.spawn(os.environ.get('CLICKHOUSE_CLIENT'))
else:
client = uexpect.spawn(command)
client.eol('\r')
# Note: uncomment this line for debugging
#client.logger(sys.stdout, prefix=name)
client.timeout(2)
return client
prompt = ':\) '
end_of_block = r'.*\xe2\x94\x82\r\n.*\xe2\x94\x98\r\n'
client1 = client('client1>')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client2 = client('client2>', ['bash', '--noediting'])
client2.expect('\$ ')
client2.send('wget -O- -q "http://localhost:8123/?live_view_heartbeat_interval=1&query=WATCH test.lv FORMAT JSONEachRowWithProgress"')
client2.expect('"progress".*',)
client2.expect('{"row":{"sum(a)":"0","_version":"1"}}\r\n', escape=True)
client2.expect('"progress".*\r\n')
# heartbeat is provided by progress message
client2.expect('"progress".*\r\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('"progress".*"read_rows":"2".*\r\n')
client2.expect('{"row":{"sum(a)":"6","_version":"2"}}\r\n', escape=True)
## send Ctrl-C
os.kill(client2.process.pid,signal.SIGINT)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -132,7 +132,7 @@ class IO(object):
self.buffer = self.buffer[self.match.end():]
break
if self._logger:
self._logger.write(self.before + self.after)
self._logger.write((self.before or '') + (self.after or ''))
self._logger.flush()
return self.match
@ -159,14 +159,18 @@ def spawn(command):
os.close(slave)
queue = Queue()
thread = Thread(target=reader, args=(master, queue))
thread = Thread(target=reader, args=(process, master, queue))
thread.daemon = True
thread.start()
return IO(process, master, queue)
def reader(out, queue):
def reader(process, out, queue):
while True:
if process.poll() is not None:
data = os.read(out)
queue.put(data)
break
data = os.read(out, 65536)
queue.put(data)