diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 64d80eeaee9..86e51ee396f 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -813,7 +813,7 @@ private: insert->tryFindInputFunction(input_function); /// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately. - if (insert && (!insert->select || input_function)) + if (insert && (!insert->select || input_function) && !insert->watch) { if (input_function && insert->format.empty()) throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 2ce8553864c..61fcf658ba8 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -178,6 +178,7 @@ struct Settings : public SettingsCollection \ M(SettingString, count_distinct_implementation, "uniqExact", "What aggregate function to use for implementation of count(DISTINCT ...)", 0) \ \ + M(SettingBool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \ M(SettingBool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \ \ M(SettingBool, add_http_cors_header, false, "Write add http CORS header.", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 91cfb7eade9..c01ecde8542 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -91,6 +91,7 @@ static FormatSettings getInputFormatSetting(const Settings & settings, const Con static FormatSettings getOutputFormatSetting(const Settings & settings, const Context & context) { FormatSettings format_settings; + format_settings.enable_streaming = settings.output_format_enable_streaming; format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers; format_settings.json.quote_denormals = settings.output_format_json_quote_denormals; 
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes; @@ -278,6 +279,10 @@ OutputFormatPtr FormatFactory::getOutputFormat( */ auto format = output_getter(buf, sample, std::move(callback), format_settings); + /// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query. + if (format_settings.enable_streaming) + format->setAutoFlush(); + /// It's a kludge. Because I cannot remove context from MySQL format. if (auto * mysql = typeid_cast(format.get())) mysql->setContext(context); diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index 1eb95ce2dbf..af722e52c72 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -13,6 +13,10 @@ namespace DB */ struct FormatSettings { + /// Format will be used for streaming. Not every format supports it. + /// Option means that each chunk of data needs to be formatted independently. Also, each chunk will be flushed at the end of processing. + bool enable_streaming = false; + struct JSON { bool quote_64bit_integers = true; diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index d1a7568581b..f3b116e490c 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -185,7 +186,7 @@ BlockIO InterpreterInsertQuery::execute() } } - if (!is_distributed_insert_select) + if (!is_distributed_insert_select || query.watch) { size_t out_streams_size = 1; if (query.select) @@ -206,6 +207,14 @@ BlockIO InterpreterInsertQuery::execute() res.out = nullptr; } } + else if (query.watch) + { + InterpreterWatchQuery interpreter_watch{ query.watch, context }; + res = interpreter_watch.execute(); + in_streams.emplace_back(res.in); + res.in = nullptr; + res.out = nullptr; + } for (size_t i = 0; i < out_streams_size; i++) { @@ -221,7 +230,7 @@ BlockIO 
InterpreterInsertQuery::execute() /// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side. /// Client-side bufferization might cause excessive timeouts (especially in case of big blocks). - if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash) + if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash && !query.watch) { out = std::make_shared( out, @@ -246,8 +255,8 @@ BlockIO InterpreterInsertQuery::execute() } } - /// What type of query: INSERT or INSERT SELECT? - if (query.select) + /// What type of query: INSERT or INSERT SELECT or INSERT WATCH? + if (query.select || query.watch) { for (auto & in_stream : in_streams) { @@ -259,7 +268,7 @@ BlockIO InterpreterInsertQuery::execute() if (in_streams.size() > 1) { for (size_t i = 1; i < in_streams.size(); ++i) - assertBlocksHaveEqualStructure(in_streams[i]->getHeader(), in_header, "INSERT SELECT"); + assertBlocksHaveEqualStructure(in_streams[i]->getHeader(), in_header, query.select ? "INSERT SELECT" : "INSERT WATCH"); } res.in = std::make_shared(in_streams, out_streams); diff --git a/src/Interpreters/InterpreterWatchQuery.h b/src/Interpreters/InterpreterWatchQuery.h index 7a5d57a1cf5..7929b86b1c8 100644 --- a/src/Interpreters/InterpreterWatchQuery.h +++ b/src/Interpreters/InterpreterWatchQuery.h @@ -30,14 +30,14 @@ using StoragePtr = std::shared_ptr; class InterpreterWatchQuery : public IInterpreter { public: - InterpreterWatchQuery(const ASTPtr & query_ptr_, Context & context_) + InterpreterWatchQuery(const ASTPtr & query_ptr_, const Context & context_) : query_ptr(query_ptr_), context(context_) {} BlockIO execute() override; private: ASTPtr query_ptr; - Context & context; + const Context & context; /// Table from where to read data, if not subquery. 
StoragePtr storage; diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index aa05e582150..62ef4152a2e 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -180,6 +180,14 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c query_log->add(elem); } +static void setQuerySpecificSettings(ASTPtr & ast, Context & context) +{ + if (auto * ast_insert_into = dynamic_cast(ast.get())) + { + if (ast_insert_into->watch) + context.setSetting("output_format_enable_streaming", 1); + } +} static std::tuple executeQueryImpl( const char * begin, @@ -246,6 +254,8 @@ static std::tuple executeQueryImpl( throw; } + setQuerySpecificSettings(ast, context); + /// Copy query into string. It will be written to log and presented in processlist. If an INSERT query, string will not include data to insertion. String query(begin, query_end); BlockIO res; diff --git a/src/Parsers/ASTInsertQuery.cpp b/src/Parsers/ASTInsertQuery.cpp index 5945d2752f3..dc9b9f092ac 100644 --- a/src/Parsers/ASTInsertQuery.cpp +++ b/src/Parsers/ASTInsertQuery.cpp @@ -40,6 +40,11 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s settings.ostr << " "; select->formatImpl(settings, state, frame); } + else if (watch) + { + settings.ostr << " "; + watch->formatImpl(settings, state, frame); + } else { if (!format.empty()) diff --git a/src/Parsers/ASTInsertQuery.h b/src/Parsers/ASTInsertQuery.h index 91ab29d4d85..982f310fdb3 100644 --- a/src/Parsers/ASTInsertQuery.h +++ b/src/Parsers/ASTInsertQuery.h @@ -16,6 +16,7 @@ public: ASTPtr columns; String format; ASTPtr select; + ASTPtr watch; ASTPtr table_function; ASTPtr settings_ast; @@ -39,6 +40,7 @@ public: if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); } if (select) { res->select = select->clone(); res->children.push_back(res->select); } + if (watch) { res->watch = watch->clone(); 
res->children.push_back(res->watch); } if (table_function) { res->table_function = table_function->clone(); res->children.push_back(res->table_function); } if (settings_ast) { res->settings_ast = settings_ast->clone(); res->children.push_back(res->settings_ast); } diff --git a/src/Parsers/ParserInsertQuery.cpp b/src/Parsers/ParserInsertQuery.cpp index 26219bc82cb..dc25954c71f 100644 --- a/src/Parsers/ParserInsertQuery.cpp +++ b/src/Parsers/ParserInsertQuery.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -30,6 +31,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserKeyword s_format("FORMAT"); ParserKeyword s_settings("SETTINGS"); ParserKeyword s_select("SELECT"); + ParserKeyword s_watch("WATCH"); ParserKeyword s_with("WITH"); ParserToken s_lparen(TokenType::OpeningRoundBracket); ParserToken s_rparen(TokenType::ClosingRoundBracket); @@ -42,6 +44,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ASTPtr columns; ASTPtr format; ASTPtr select; + ASTPtr watch; ASTPtr table_function; ASTPtr settings_ast; /// Insertion data @@ -80,7 +83,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) return false; } - Pos before_select = pos; + Pos before_values = pos; /// VALUES or FORMAT or SELECT if (s_values.ignore(pos, expected)) @@ -94,7 +97,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) } else if (s_select.ignore(pos, expected) || s_with.ignore(pos,expected)) { - pos = before_select; + pos = before_values; ParserSelectWithUnionQuery select_p; select_p.parse(pos, select, expected); @@ -102,6 +105,16 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) if (s_format.ignore(pos, expected) && !name_p.parse(pos, format, expected)) return false; } + else if (s_watch.ignore(pos, expected)) + { + pos = before_values; + ParserWatchQuery watch_p; + watch_p.parse(pos, watch, 
expected); + + /// An optional FORMAT section may follow the WATCH query + if (s_format.ignore(pos, expected) && !name_p.parse(pos, format, expected)) + return false; + } else { return false; @@ -159,6 +172,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) query->columns = columns; query->select = select; + query->watch = watch; query->settings_ast = settings_ast; query->data = data != end ? data : nullptr; query->end = end; @@ -167,6 +181,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) query->children.push_back(columns); if (select) query->children.push_back(select); + if (watch) + query->children.push_back(watch); if (settings_ast) query->children.push_back(settings_ast); diff --git a/src/Parsers/ParserInsertQuery.h b/src/Parsers/ParserInsertQuery.h index 5669d48ffc6..b69bc645c15 100644 --- a/src/Parsers/ParserInsertQuery.h +++ b/src/Parsers/ParserInsertQuery.h @@ -18,9 +18,9 @@ namespace DB * INSERT INTO [db.]table (c1, c2, c3) FORMAT format \n ... * INSERT INTO [db.]table FORMAT format \n ... * - * Insert the result of the SELECT query. - * INSERT INTO [db.]table (c1, c2, c3) SELECT ... - * INSERT INTO [db.]table SELECT ... + * Insert the result of the SELECT or WATCH query. + * INSERT INTO [db.]table (c1, c2, c3) SELECT | WATCH ... + * INSERT INTO [db.]table SELECT | WATCH ... 
*/ class ParserInsertQuery : public IParserBase { diff --git a/src/Processors/Formats/IOutputFormat.cpp b/src/Processors/Formats/IOutputFormat.cpp index 543a854f75e..334843036dc 100644 --- a/src/Processors/Formats/IOutputFormat.cpp +++ b/src/Processors/Formats/IOutputFormat.cpp @@ -80,5 +80,13 @@ void IOutputFormat::flush() out.next(); } +void IOutputFormat::write(const Block & block) +{ + consume(Chunk(block.getColumns(), block.rows())); + + if (auto_flush) + flush(); +} + } diff --git a/src/Processors/Formats/IOutputFormat.h b/src/Processors/Formats/IOutputFormat.h index 71a0d2f0066..2e3db50ee6e 100644 --- a/src/Processors/Formats/IOutputFormat.h +++ b/src/Processors/Formats/IOutputFormat.h @@ -70,10 +70,9 @@ public: InputPort & getPort(PortKind kind) { return *std::next(inputs.begin(), kind); } -public: /// Compatible to IBlockOutputStream interface - void write(const Block & block) { consume(Chunk(block.getColumns(), block.rows())); } + void write(const Block & block); virtual void doWritePrefix() {} virtual void doWriteSuffix() { finalize(); } diff --git a/tests/queries/0_stateless/00979_live_view_watch_continuous_aggregates.py b/tests/queries/0_stateless/00979_live_view_watch_continuous_aggregates.py new file mode 100755 index 00000000000..876b4334217 --- /dev/null +++ b/tests/queries/0_stateless/00979_live_view_watch_continuous_aggregates.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python +import os +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + + 
client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (time DateTime, location String, temperature UInt32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT toStartOfDay(time) AS day, location, avg(temperature) FROM test.mt GROUP BY day, location ORDER BY day, location') + client1.expect(prompt) + client1.send('WATCH test.lv FORMAT CSV') + client2.send("INSERT INTO test.mt VALUES ('2019-01-01 00:00:00','New York',60),('2019-01-01 00:10:00','New York',70)") + client2.expect(prompt) + client1.expect(r'"2019-01-01 00:00:00","New York",65,2') + client2.send("INSERT INTO test.mt VALUES ('2019-01-01 00:00:00','Moscow',30),('2019-01-01 00:10:00', 'Moscow', 40)") + client2.expect(prompt) + client1.expect(r'"2019-01-01 00:00:00","Moscow",35,3') + client1.expect(r'"2019-01-01 00:00:00","New York",65,3') + client2.send("INSERT INTO test.mt VALUES ('2019-01-02 00:00:00','New York',50),('2019-01-02 00:10:00','New York',60)") + client2.expect(prompt) + client1.expect(r'"2019-01-01 00:00:00","Moscow",35,4') + client1.expect(r'"2019-01-01 00:00:00","New York",65,4') + client1.expect(r'"2019-01-02 00:00:00","New York",55,4') + client2.send("INSERT INTO test.mt VALUES ('2019-01-02 00:00:00','Moscow',20),('2019-01-02 00:10:00', 'Moscow', 30)") + client2.expect(prompt) + client1.expect(r'"2019-01-01 00:00:00","Moscow",35,5') + client1.expect(r'"2019-01-01 00:00:00","New York",65,5') + client1.expect(r'"2019-01-02 00:00:00","Moscow",25,5') + client1.expect(r'"2019-01-02 00:00:00","New York",55,5') + client2.send("INSERT INTO test.mt VALUES ('2019-01-02 00:03:00','New York',40),('2019-01-02 00:06:00','New York',30)") + client2.expect(prompt) + client1.expect(r'"2019-01-01 00:00:00","Moscow",35,6') + client1.expect(r'"2019-01-01 00:00:00","New York",65,6') + client1.expect(r'"2019-01-02 
00:00:00","Moscow",25,6') + client1.expect(r'"2019-01-02 00:00:00","New York",45,6') + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/tests/queries/0_stateless/00979_live_view_watch_continuous_aggregates.reference b/tests/queries/0_stateless/00979_live_view_watch_continuous_aggregates.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01246_insert_into_watch_live_view.py b/tests/queries/0_stateless/01246_insert_into_watch_live_view.py new file mode 100755 index 00000000000..591302d42cf --- /dev/null +++ b/tests/queries/0_stateless/01246_insert_into_watch_live_view.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +import os +import sys +import time +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from client import client, prompt, end_of_block + +log = None +# uncomment the line below for debugging +#log=sys.stdout + +with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + client2.send('SET allow_experimental_live_view = 1') + client2.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.sums') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) AS s FROM test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.sums (s 
Int32, version Int32) Engine=MergeTree ORDER BY tuple()') + client1.expect(prompt) + + client1.send('INSERT INTO test.sums WATCH test.lv') + client1.expect(r'0.*1' + end_of_block) + time.sleep(0.25) + client2.send('SELECT * FROM test.sums ORDER BY version FORMAT CSV') + client2.expect('0,1\r\n') + client2.expect(prompt) + + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client2.expect(prompt) + time.sleep(0.25) + client2.send('SELECT * FROM test.sums ORDER BY version FORMAT CSV') + client2.expect('6,2\r\n') + client2.expect(prompt) + + client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') + client2.expect(prompt) + time.sleep(0.25) + client2.send('SELECT * FROM test.sums ORDER BY version FORMAT CSV') + client2.expect('21,3\r\n') + client2.expect(prompt) + + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + + client1.send('DROP TABLE test.sums') + client1.expect(prompt) + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) diff --git a/tests/queries/0_stateless/01246_insert_into_watch_live_view.reference b/tests/queries/0_stateless/01246_insert_into_watch_live_view.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.py b/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.py new file mode 100755 index 00000000000..ed4e3218438 --- /dev/null +++ b/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +import os +import sys +import time +import signal +import requests + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +from httpechoserver import start_server, HTTP_SERVER_URL_STR +from client import client, prompt, end_of_block + +log = None +# uncomment 
the line below for debugging +#log=sys.stdout + +server = start_server(3) +server.start() + +try: + for output_format in ['CSV', 'JSONEachRow', 'JSONEachRowWithProgress']: + with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2: + client1.expect(prompt) + client2.expect(prompt) + + client1.send('SET allow_experimental_live_view = 1') + client1.expect(prompt) + + client1.send('DROP TABLE IF EXISTS test.lv') + client1.expect(prompt) + client1.send('DROP TABLE IF EXISTS test.mt') + client1.expect(prompt) + client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') + client1.expect(prompt) + client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) AS s FROM test.mt') + client1.expect(prompt) + + client1.send("INSERT INTO FUNCTION url('%s', %s, 's Int32, version Int32') WATCH test.lv" % (HTTP_SERVER_URL_STR, output_format)) + client1.expect(r'0.*1' + end_of_block) + time.sleep(0.25) + sys.stdout.write("-- first insert --\n") + sys.stdout.write(server.out.read() + "\n") + + client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') + client2.expect(prompt) + time.sleep(0.25) + sys.stdout.write("-- second insert --\n") + sys.stdout.write(server.out.read() + "\n") + + client2.send('INSERT INTO test.mt VALUES (4),(5),(6)') + client2.expect(prompt) + time.sleep(0.25) + sys.stdout.write("-- third insert --\n") + sys.stdout.write(server.out.read() + "\n") + + # send Ctrl-C + client1.send('\x03', eol='') + match = client1.expect('(%s)|([#\$] )' % prompt) + if match.groups()[1]: + client1.send(client1.command) + client1.expect(prompt) + + client1.send('DROP TABLE test.lv') + client1.expect(prompt) + client1.send('DROP TABLE test.mt') + client1.expect(prompt) +finally: + try: + for i in range(3): + requests.post(HTTP_SERVER_URL_STR, data=b"0\r\n", timeout=1) + except Exception: + pass + finally: + server.join() diff --git a/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.reference 
b/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.reference new file mode 100644 index 00000000000..aa3146539cd --- /dev/null +++ b/tests/queries/0_stateless/01247_insert_into_watch_live_view_url.reference @@ -0,0 +1,27 @@ +-- first insert -- +0,1 + +-- second insert -- +6,2 + +-- third insert -- +21,3 + +-- first insert -- +{"s":0,"version":1} + +-- second insert -- +{"s":6,"version":2} + +-- third insert -- +{"s":21,"version":3} + +-- first insert -- +{"row":{"s":0,"version":1}} + +-- second insert -- +{"row":{"s":6,"version":2}} + +-- third insert -- +{"row":{"s":21,"version":3}} + diff --git a/tests/queries/0_stateless/helpers/httpechoserver.py b/tests/queries/0_stateless/helpers/httpechoserver.py new file mode 100644 index 00000000000..938f1b4bba1 --- /dev/null +++ b/tests/queries/0_stateless/helpers/httpechoserver.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +from __future__ import print_function +import sys +import os +import time +import subprocess +import threading +from io import StringIO, SEEK_END +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer + +CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1') +CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123') + +# IP-address of this host accessible from outside world. +HTTP_SERVER_HOST = os.environ.get('HTTP_SERVER_HOST', subprocess.check_output(['hostname', '-i']).decode('utf-8').strip()) +HTTP_SERVER_PORT = int(os.environ.get('CLICKHOUSE_TEST_HOST_EXPOSED_PORT', 51234)) + +# IP address and port of the HTTP server started from this script. 
+HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT) +HTTP_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in HTTP_SERVER_ADDRESS) + "/" + +ostream = StringIO() +istream = StringIO() + +class EchoCSVHTTPServer(BaseHTTPRequestHandler): + def _set_headers(self): + self.send_response(200) + self.send_header('Content-type', 'text/plain') + self.end_headers() + + def do_GET(self): + self._set_headers() + with open(CSV_DATA, 'r') as fl: + ostream.seek(0) + for row in ostream: + self.wfile.write(row + '\n') + return + + def read_chunk(self): + msg = '' + while True: + sym = self.rfile.read(1) + if sym == '': + break + msg += sym.decode('utf-8') + if msg.endswith('\r\n'): + break + length = int(msg[:-2], 16) + if length == 0: + return '' + content = self.rfile.read(length) + self.rfile.read(2) # read sep \r\n + return content.decode('utf-8') + + def do_POST(self): + while True: + chunk = self.read_chunk() + if not chunk: + break + pos = istream.tell() + istream.seek(0, SEEK_END) + istream.write(chunk) + istream.seek(pos) + text = "" + self._set_headers() + self.wfile.write("ok") + + def log_message(self, format, *args): + return + +def start_server(requests_amount, test_data="Hello,2,-2,7.7\nWorld,2,-5,8.8"): + ostream = StringIO(test_data.decode("utf-8")) + + httpd = HTTPServer(HTTP_SERVER_ADDRESS, EchoCSVHTTPServer) + + def real_func(): + for i in xrange(requests_amount): + httpd.handle_request() + + t = threading.Thread(target=real_func) + t.out = istream + return t + +def run(): + t = start_server(1) + t.start() + t.join() + +if __name__ == "__main__": + exception_text = '' + for i in range(1, 5): + try: + run() + break + except Exception as ex: + exception_text = str(ex) + time.sleep(0.1) + + if exception_text: + print("Exception: {}".format(exception_text), file=sys.stderr) + os._exit(1) +