Merge pull request #10498 from vzakaznikov/insert_into_watch_from_live_view

Adding support for INSERT INTO table WATCH query to build streaming systems using LIVE VIEW tables
This commit is contained in:
Nikolai Kochetov 2020-04-29 12:15:54 +03:00 committed by GitHub
commit a9d63d4520
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 415 additions and 15 deletions

View File

@ -813,7 +813,7 @@ private:
insert->tryFindInputFunction(input_function);
/// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
if (insert && (!insert->select || input_function))
if (insert && (!insert->select || input_function) && !insert->watch)
{
if (input_function && insert->format.empty())
throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT);

View File

@ -178,6 +178,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingString, count_distinct_implementation, "uniqExact", "What aggregate function to use for implementation of count(DISTINCT ...)", 0) \
\
M(SettingBool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \
M(SettingBool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \
\
M(SettingBool, add_http_cors_header, false, "Write add http CORS header.", 0) \

View File

@ -91,6 +91,7 @@ static FormatSettings getInputFormatSetting(const Settings & settings, const Con
static FormatSettings getOutputFormatSetting(const Settings & settings, const Context & context)
{
FormatSettings format_settings;
format_settings.enable_streaming = settings.output_format_enable_streaming;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
@ -278,6 +279,10 @@ OutputFormatPtr FormatFactory::getOutputFormat(
*/
auto format = output_getter(buf, sample, std::move(callback), format_settings);
/// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query.
if (format_settings.enable_streaming)
format->setAutoFlush();
/// It's a kludge. Because I cannot remove context from MySQL format.
if (auto * mysql = typeid_cast<MySQLOutputFormat *>(format.get()))
mysql->setContext(context);

View File

@ -13,6 +13,10 @@ namespace DB
*/
struct FormatSettings
{
/// The format will be used for streaming. Not every format supports it.
/// The option means that each chunk of data needs to be formatted independently. Each chunk is also flushed at the end of its processing.
bool enable_streaming = false;
struct JSON
{
bool quote_64bit_integers = true;

View File

@ -16,6 +16,7 @@
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Access/AccessFlags.h>
#include <Interpreters/JoinedTables.h>
#include <Parsers/ASTFunction.h>
@ -185,7 +186,7 @@ BlockIO InterpreterInsertQuery::execute()
}
}
if (!is_distributed_insert_select)
if (!is_distributed_insert_select || query.watch)
{
size_t out_streams_size = 1;
if (query.select)
@ -206,6 +207,14 @@ BlockIO InterpreterInsertQuery::execute()
res.out = nullptr;
}
}
else if (query.watch)
{
InterpreterWatchQuery interpreter_watch{ query.watch, context };
res = interpreter_watch.execute();
in_streams.emplace_back(res.in);
res.in = nullptr;
res.out = nullptr;
}
for (size_t i = 0; i < out_streams_size; i++)
{
@ -221,7 +230,7 @@ BlockIO InterpreterInsertQuery::execute()
/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash)
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash && !query.watch)
{
out = std::make_shared<SquashingBlockOutputStream>(
out,
@ -246,8 +255,8 @@ BlockIO InterpreterInsertQuery::execute()
}
}
/// What type of query: INSERT or INSERT SELECT?
if (query.select)
/// What type of query: INSERT or INSERT SELECT or INSERT WATCH?
if (query.select || query.watch)
{
for (auto & in_stream : in_streams)
{
@ -259,7 +268,7 @@ BlockIO InterpreterInsertQuery::execute()
if (in_streams.size() > 1)
{
for (size_t i = 1; i < in_streams.size(); ++i)
assertBlocksHaveEqualStructure(in_streams[i]->getHeader(), in_header, "INSERT SELECT");
assertBlocksHaveEqualStructure(in_streams[i]->getHeader(), in_header, query.select ? "INSERT SELECT" : "INSERT WATCH");
}
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(in_streams, out_streams);

View File

@ -30,14 +30,14 @@ using StoragePtr = std::shared_ptr<IStorage>;
class InterpreterWatchQuery : public IInterpreter
{
public:
InterpreterWatchQuery(const ASTPtr & query_ptr_, Context & context_)
InterpreterWatchQuery(const ASTPtr & query_ptr_, const Context & context_)
: query_ptr(query_ptr_), context(context_) {}
BlockIO execute() override;
private:
ASTPtr query_ptr;
Context & context;
const Context & context;
/// Table from where to read data, if not subquery.
StoragePtr storage;

View File

@ -180,6 +180,14 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
query_log->add(elem);
}
/// Adjusts context settings that are implied by the kind of query in `ast`.
/// The only case handled here is an INSERT query carrying a WATCH part, for which
/// the `output_format_enable_streaming` setting is switched on.
static void setQuerySpecificSettings(ASTPtr & ast, Context & context)
{
    auto * insert_query = dynamic_cast<ASTInsertQuery *>(ast.get());

    /// Not an INSERT at all, or an INSERT without WATCH: nothing to adjust.
    if (!insert_query || !insert_query->watch)
        return;

    context.setSetting("output_format_enable_streaming", 1);
}
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const char * begin,
@ -246,6 +254,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
throw;
}
setQuerySpecificSettings(ast, context);
/// Copy query into string. It will be written to log and presented in processlist. If an INSERT query, string will not include data to insertion.
String query(begin, query_end);
BlockIO res;

View File

@ -40,6 +40,11 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
settings.ostr << " ";
select->formatImpl(settings, state, frame);
}
else if (watch)
{
settings.ostr << " ";
watch->formatImpl(settings, state, frame);
}
else
{
if (!format.empty())

View File

@ -16,6 +16,7 @@ public:
ASTPtr columns;
String format;
ASTPtr select;
ASTPtr watch;
ASTPtr table_function;
ASTPtr settings_ast;
@ -39,6 +40,7 @@ public:
if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); }
if (select) { res->select = select->clone(); res->children.push_back(res->select); }
if (watch) { res->watch = watch->clone(); res->children.push_back(res->watch); }
if (table_function) { res->table_function = table_function->clone(); res->children.push_back(res->table_function); }
if (settings_ast) { res->settings_ast = settings_ast->clone(); res->children.push_back(res->settings_ast); }

View File

@ -6,6 +6,7 @@
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/ParserWatchQuery.h>
#include <Parsers/ParserInsertQuery.h>
#include <Parsers/ParserSetQuery.h>
#include <Common/typeid_cast.h>
@ -30,6 +31,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_format("FORMAT");
ParserKeyword s_settings("SETTINGS");
ParserKeyword s_select("SELECT");
ParserKeyword s_watch("WATCH");
ParserKeyword s_with("WITH");
ParserToken s_lparen(TokenType::OpeningRoundBracket);
ParserToken s_rparen(TokenType::ClosingRoundBracket);
@ -42,6 +44,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr columns;
ASTPtr format;
ASTPtr select;
ASTPtr watch;
ASTPtr table_function;
ASTPtr settings_ast;
/// Insertion data
@ -80,7 +83,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
}
Pos before_select = pos;
Pos before_values = pos;
/// VALUES or FORMAT or SELECT
if (s_values.ignore(pos, expected))
@ -94,7 +97,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
else if (s_select.ignore(pos, expected) || s_with.ignore(pos,expected))
{
pos = before_select;
pos = before_values;
ParserSelectWithUnionQuery select_p;
select_p.parse(pos, select, expected);
@ -102,6 +105,16 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (s_format.ignore(pos, expected) && !name_p.parse(pos, format, expected))
return false;
}
else if (s_watch.ignore(pos, expected))
{
pos = before_values;
ParserWatchQuery watch_p;
watch_p.parse(pos, watch, expected);
/// An optional FORMAT section may follow the WATCH part; if the keyword is present, a format name is required.
if (s_format.ignore(pos, expected) && !name_p.parse(pos, format, expected))
return false;
}
else
{
return false;
@ -159,6 +172,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->columns = columns;
query->select = select;
query->watch = watch;
query->settings_ast = settings_ast;
query->data = data != end ? data : nullptr;
query->end = end;
@ -167,6 +181,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->children.push_back(columns);
if (select)
query->children.push_back(select);
if (watch)
query->children.push_back(watch);
if (settings_ast)
query->children.push_back(settings_ast);

View File

@ -18,9 +18,9 @@ namespace DB
* INSERT INTO [db.]table (c1, c2, c3) FORMAT format \n ...
* INSERT INTO [db.]table FORMAT format \n ...
*
* Insert the result of the SELECT query.
* INSERT INTO [db.]table (c1, c2, c3) SELECT ...
* INSERT INTO [db.]table SELECT ...
* Insert the result of the SELECT or WATCH query.
* INSERT INTO [db.]table (c1, c2, c3) SELECT | WATCH ...
* INSERT INTO [db.]table SELECT | WATCH ...
*/
class ParserInsertQuery : public IParserBase
{

View File

@ -80,5 +80,13 @@ void IOutputFormat::flush()
out.next();
}
/// IBlockOutputStream-compatible entry point: wraps the block's columns and row count
/// into a Chunk and passes it to consume(). When auto_flush is set, flush() is called
/// right after the block has been consumed.
void IOutputFormat::write(const Block & block)
{
    consume(Chunk(block.getColumns(), block.rows()));

    if (!auto_flush)
        return;

    flush();
}
}

View File

@ -70,10 +70,9 @@ public:
InputPort & getPort(PortKind kind) { return *std::next(inputs.begin(), kind); }
public:
/// Compatible to IBlockOutputStream interface
void write(const Block & block) { consume(Chunk(block.getColumns(), block.rows())); }
void write(const Block & block);
virtual void doWritePrefix() {}
virtual void doWriteSuffix() { finalize(); }

View File

@ -0,0 +1,66 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
def _run(cl, query):
    """Send `query` through `cl` and wait until its prompt is shown again."""
    cl.send(query)
    cl.expect(prompt)


# Each step is (INSERT issued by client2, CSV rows the watcher must then receive, in order).
# The trailing number of every expected row grows by one with each update.
_watch_steps = [
    ("INSERT INTO test.mt VALUES ('2019-01-01 00:00:00','New York',60),('2019-01-01 00:10:00','New York',70)",
     [r'"2019-01-01 00:00:00","New York",65,2']),
    ("INSERT INTO test.mt VALUES ('2019-01-01 00:00:00','Moscow',30),('2019-01-01 00:10:00', 'Moscow', 40)",
     [r'"2019-01-01 00:00:00","Moscow",35,3',
      r'"2019-01-01 00:00:00","New York",65,3']),
    ("INSERT INTO test.mt VALUES ('2019-01-02 00:00:00','New York',50),('2019-01-02 00:10:00','New York',60)",
     [r'"2019-01-01 00:00:00","Moscow",35,4',
      r'"2019-01-01 00:00:00","New York",65,4',
      r'"2019-01-02 00:00:00","New York",55,4']),
    ("INSERT INTO test.mt VALUES ('2019-01-02 00:00:00','Moscow',20),('2019-01-02 00:10:00', 'Moscow', 30)",
     [r'"2019-01-01 00:00:00","Moscow",35,5',
      r'"2019-01-01 00:00:00","New York",65,5',
      r'"2019-01-02 00:00:00","Moscow",25,5',
      r'"2019-01-02 00:00:00","New York",55,5']),
    ("INSERT INTO test.mt VALUES ('2019-01-02 00:03:00','New York',40),('2019-01-02 00:06:00','New York',30)",
     [r'"2019-01-01 00:00:00","Moscow",35,6',
      r'"2019-01-01 00:00:00","New York",65,6',
      r'"2019-01-02 00:00:00","Moscow",25,6',
      r'"2019-01-02 00:00:00","New York",45,6']),
]

with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
    # Wait for both clients to come up, then enable live views in each session.
    for cl in (client1, client2):
        cl.expect(prompt)
    for cl in (client1, client2):
        _run(cl, 'SET allow_experimental_live_view = 1')

    # Start from a clean state and create the source table and the live view on top of it.
    _run(client1, 'DROP TABLE IF EXISTS test.lv')
    _run(client1, 'DROP TABLE IF EXISTS test.mt')
    _run(client1, 'CREATE TABLE test.mt (time DateTime, location String, temperature UInt32) Engine=MergeTree order by tuple()')
    _run(client1, 'CREATE LIVE VIEW test.lv AS SELECT toStartOfDay(time) AS day, location, avg(temperature) FROM test.mt GROUP BY day, location ORDER BY day, location')

    # The WATCH query keeps running, so no prompt is expected after sending it.
    client1.send('WATCH test.lv FORMAT CSV')

    for insert_query, expected_rows in _watch_steps:
        _run(client2, insert_query)
        for row in expected_rows:
            client1.expect(row)

    # send Ctrl-C
    client1.send('\x03', eol='')
    match = client1.expect('(%s)|([#\$] )' % prompt)
    # The second group matches a shell prompt; in that case the client command is sent again.
    if match.groups()[1]:
        _run(client1, client1.command)

    _run(client1, 'DROP TABLE test.lv')
    _run(client1, 'DROP TABLE test.mt')

View File

@ -0,0 +1,71 @@
#!/usr/bin/env python
import os
import sys
import time
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
def _run(cl, query):
    """Send `query` through `cl` and wait until its prompt is shown again."""
    cl.send(query)
    cl.expect(prompt)


def _expect_latest_sum(cl, csv_row):
    """After a short pause, select test.sums through `cl` and expect `csv_row` in the output."""
    time.sleep(0.25)
    cl.send('SELECT * FROM test.sums ORDER BY version FORMAT CSV')
    cl.expect(csv_row)
    cl.expect(prompt)


with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
    # Wait for both clients to come up, then enable live views in each session.
    for cl in (client1, client2):
        cl.expect(prompt)
    for cl in (client1, client2):
        _run(cl, 'SET allow_experimental_live_view = 1')

    # Clean state, then the source table, the live view and the destination table.
    for table in ('test.lv', 'test.mt', 'test.sums'):
        _run(client1, 'DROP TABLE IF EXISTS ' + table)
    _run(client1, 'CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
    _run(client1, 'CREATE LIVE VIEW test.lv AS SELECT sum(a) AS s FROM test.mt')
    _run(client1, 'CREATE TABLE test.sums (s Int32, version Int32) Engine=MergeTree ORDER BY tuple()')

    # INSERT ... WATCH keeps running; the first block is expected to be displayed by client1.
    client1.send('INSERT INTO test.sums WATCH test.lv')
    client1.expect(r'0.*1' + end_of_block)
    _expect_latest_sum(client2, '0,1\r\n')

    # Every insert into the source table must add a new row to test.sums.
    _run(client2, 'INSERT INTO test.mt VALUES (1),(2),(3)')
    _expect_latest_sum(client2, '6,2\r\n')

    _run(client2, 'INSERT INTO test.mt VALUES (4),(5),(6)')
    _expect_latest_sum(client2, '21,3\r\n')

    # send Ctrl-C
    client1.send('\x03', eol='')
    match = client1.expect('(%s)|([#\$] )' % prompt)
    # The second group matches a shell prompt; in that case the client command is sent again.
    if match.groups()[1]:
        _run(client1, client1.command)

    for table in ('test.sums', 'test.lv', 'test.mt'):
        _run(client1, 'DROP TABLE ' + table)

View File

@ -0,0 +1,75 @@
#!/usr/bin/env python
import os
import sys
import time
import signal
import requests
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from httpechoserver import start_server, HTTP_SERVER_URL_STR
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
def _run(cl, query):
    """Send `query` through `cl` and wait until its prompt is shown again."""
    cl.send(query)
    cl.expect(prompt)


def _print_received(title):
    """After a short pause, print `title` followed by whatever the echo server has received since the last read."""
    time.sleep(0.25)
    sys.stdout.write(title)
    sys.stdout.write(server.out.read() + "\n")


# The echo server thread handles three requests: one per output format below.
server = start_server(3)
server.start()

try:
    for output_format in ['CSV', 'JSONEachRow', 'JSONEachRowWithProgress']:
        with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
            for cl in (client1, client2):
                cl.expect(prompt)

            _run(client1, 'SET allow_experimental_live_view = 1')
            _run(client1, 'DROP TABLE IF EXISTS test.lv')
            _run(client1, 'DROP TABLE IF EXISTS test.mt')
            _run(client1, 'CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
            _run(client1, 'CREATE LIVE VIEW test.lv AS SELECT sum(a) AS s FROM test.mt')

            # INSERT ... WATCH keeps running; the first block is expected to be displayed by client1.
            client1.send("INSERT INTO FUNCTION url('%s', %s, 's Int32, version Int32') WATCH test.lv" % (HTTP_SERVER_URL_STR, output_format))
            client1.expect(r'0.*1' + end_of_block)
            _print_received("-- first insert --\n")

            _run(client2, 'INSERT INTO test.mt VALUES (1),(2),(3)')
            _print_received("-- second insert --\n")

            _run(client2, 'INSERT INTO test.mt VALUES (4),(5),(6)')
            _print_received("-- third insert --\n")

            # send Ctrl-C
            client1.send('\x03', eol='')
            match = client1.expect('(%s)|([#\$] )' % prompt)
            # The second group matches a shell prompt; in that case the client command is sent again.
            if match.groups()[1]:
                _run(client1, client1.command)

            _run(client1, 'DROP TABLE test.lv')
            _run(client1, 'DROP TABLE test.mt')
finally:
    # Best effort: post up to three requests so the server thread is not left
    # waiting for requests that never arrived; stop at the first failure.
    try:
        for _ in range(3):
            requests.post(HTTP_SERVER_URL_STR, data=b"0\r\n", timeout=1)
    except Exception:
        pass
    finally:
        server.join()

View File

@ -0,0 +1,27 @@
-- first insert --
0,1
-- second insert --
6,2
-- third insert --
21,3
-- first insert --
{"s":0,"version":1}
-- second insert --
{"s":6,"version":2}
-- third insert --
{"s":21,"version":3}
-- first insert --
{"row":{"s":0,"version":1}}
-- second insert --
{"row":{"s":6,"version":2}}
-- third insert --
{"row":{"s":21,"version":3}}

View File

@ -0,0 +1,102 @@
#!/usr/bin/env python
from __future__ import print_function
import sys
import os
import time
import subprocess
import threading
from io import StringIO, SEEK_END
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')

# IP-address of this host accessible from outside world.
# `hostname -i` is consulted only when HTTP_SERVER_HOST is not set in the
# environment, so an explicit override does not depend on that command working.
HTTP_SERVER_HOST = os.environ.get('HTTP_SERVER_HOST')
if HTTP_SERVER_HOST is None:
    HTTP_SERVER_HOST = subprocess.check_output(['hostname', '-i']).decode('utf-8').strip()
HTTP_SERVER_PORT = int(os.environ.get('CLICKHOUSE_TEST_HOST_EXPOSED_PORT', 51234))

# IP address and port of the HTTP server started from this script.
HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT)
HTTP_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in HTTP_SERVER_ADDRESS) + "/"

# In-memory streams shared with the request handler:
# `ostream` holds the rows served on GET, `istream` collects the bodies received via POST.
ostream = StringIO()
istream = StringIO()
class EchoCSVHTTPServer(BaseHTTPRequestHandler):
    """Request handler that stores chunked POST bodies in the module-level
    ``istream`` and serves the rows of the module-level ``ostream`` on GET."""

    def _set_headers(self):
        """Send a 200 status line and a plain-text content type header."""
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()

    def do_GET(self):
        """Write every row of ``ostream`` to the response, each followed by a newline."""
        self._set_headers()
        # The rows live in the in-memory stream; there is no file to open here.
        ostream.seek(0)
        for row in ostream:
            # Encode explicitly so that non-ASCII rows do not depend on an implicit conversion.
            self.wfile.write((row + '\n').encode('utf-8'))

    def read_chunk(self):
        """Read one chunk of a chunked request body from ``self.rfile``.

        Returns the decoded chunk content, or an empty string for the
        terminating zero-length chunk or when the input ends before a complete
        size line has been received.
        """
        size_line = b''
        while not size_line.endswith(b'\r\n'):
            sym = self.rfile.read(1)
            if not sym:
                # Input ended in the middle of (or before) the size line:
                # treat it as the end of the body instead of parsing garbage.
                return ''
            size_line += sym

        # The size line is "<hex length>\r\n".
        length = int(size_line[:-2].decode('utf-8'), 16)
        if length == 0:
            return ''

        content = self.rfile.read(length)
        self.rfile.read(2)  # read sep \r\n
        return content.decode('utf-8')

    def do_POST(self):
        """Append every received chunk to ``istream`` and reply with "ok"."""
        while True:
            chunk = self.read_chunk()
            if not chunk:
                break
            # Append at the end but keep the read position, so that a reader
            # of `istream` continues from where it stopped last time.
            pos = istream.tell()
            istream.seek(0, SEEK_END)
            istream.write(chunk)
            istream.seek(pos)
        self._set_headers()
        self.wfile.write(b"ok")

    def log_message(self, format, *args):
        """Suppress the default per-request logging."""
        return
def start_server(requests_amount, test_data="Hello,2,-2,7.7\nWorld,2,-5,8.8"):
    """Create (but do not start) a thread serving `requests_amount` HTTP requests.

    `test_data` becomes the content of the module-level `ostream`, i.e. the rows
    the handler serves on GET. The returned thread has an extra attribute `out`
    referring to `istream`, the stream that collects POSTed data.
    """
    # Rebind the module-level stream; a plain assignment would only create a
    # local variable and the handler would keep serving the old, empty stream.
    global ostream
    if isinstance(test_data, bytes):
        test_data = test_data.decode("utf-8")
    ostream = StringIO(test_data)

    httpd = HTTPServer(HTTP_SERVER_ADDRESS, EchoCSVHTTPServer)

    def real_func():
        try:
            for _ in range(requests_amount):
                httpd.handle_request()
        finally:
            # Release the listening socket once all requests have been served.
            httpd.server_close()

    t = threading.Thread(target=real_func)
    t.out = istream
    return t
def run():
    """Serve a single HTTP request on a server thread and wait for it to finish."""
    server_thread = start_server(1)
    server_thread.start()
    server_thread.join()
if __name__ == "__main__":
    # Try up to four times. Failure is reported only when the last attempt made
    # has raised; an earlier error followed by a successful run is not a failure.
    last_error = None
    for _ in range(1, 5):
        try:
            run()
            last_error = None
            break
        except Exception as ex:
            last_error = ex
            time.sleep(0.1)

    if last_error is not None:
        print("Exception: {}".format(str(last_error)), file=sys.stderr)
        os._exit(1)