diff --git a/src/Processors/Formats/Impl/NativeFormat.cpp b/src/Processors/Formats/Impl/NativeFormat.cpp index 19e2ede6b65..bd95cfd6376 100644 --- a/src/Processors/Formats/Impl/NativeFormat.cpp +++ b/src/Processors/Formats/Impl/NativeFormat.cpp @@ -15,21 +15,22 @@ namespace DB class NativeInputFormat final : public IInputFormat { public: - NativeInputFormat(ReadBuffer & buf, const Block & header) - : IInputFormat(header, buf) - , reader(buf, header, 0) {} + NativeInputFormat(ReadBuffer & buf, const Block & header_) + : IInputFormat(header_, buf) + , reader(std::make_unique(buf, header_, 0)) + , header(header_) {} String getName() const override { return "Native"; } void resetParser() override { IInputFormat::resetParser(); - reader.resetParser(); + reader->resetParser(); } Chunk generate() override { - auto block = reader.read(); + auto block = reader->read(); if (!block) return {}; @@ -40,8 +41,15 @@ public: return Chunk(block.getColumns(), num_rows); } + void setReadBuffer(ReadBuffer & in_) override + { + reader = std::make_unique(in_, header, 0); + IInputFormat::setReadBuffer(in_); + } + private: - NativeReader reader; + std::unique_ptr reader; + Block header; }; class NativeOutputFormat final : public IOutputFormat diff --git a/tests/queries/0_stateless/02187_async_inserts_all_formats.python b/tests/queries/0_stateless/02187_async_inserts_all_formats.python new file mode 100644 index 00000000000..0a909451259 --- /dev/null +++ b/tests/queries/0_stateless/02187_async_inserts_all_formats.python @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +import os +import sys + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, 'helpers')) + +CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL') +CLICKHOUSE_TMP = os.environ.get('CLICKHOUSE_TMP') + +from pure_http_client import ClickHouseClient + +client = ClickHouseClient() + +def run_test(data_format, gen_data_template, settings): + print(data_format) + client.query("TRUNCATE TABLE t_async_insert") + + expected = client.query(gen_data_template.format("TSV")).strip() + data = client.query(gen_data_template.format(data_format), settings=settings,binary_result=True) + + insert_query = "INSERT INTO t_async_insert FORMAT {}".format(data_format) + client.query_with_data(insert_query, data, settings=settings) + + result = client.query("SELECT * FROM t_async_insert FORMAT TSV").strip() + if result != expected: + print("Failed for format {}.\nExpected:\n{}\nGot:\n{}\n".format(data_format, expected, result)) + exit(1) + +formats = client.query("SELECT name FROM system.formats WHERE is_input AND is_output \ + AND name NOT IN ('CapnProto', 'RawBLOB', 'Template', 'ProtobufSingle', 'LineAsString', 'Protobuf') ORDER BY name").strip().split('\n') + +# Generic formats +client.query("DROP TABLE IF EXISTS t_async_insert") +client.query("CREATE TABLE t_async_insert (id UInt64, s String, arr Array(UInt64)) ENGINE = Memory") +gen_data_query = "SELECT number AS id, toString(number) AS s, range(number) AS arr FROM numbers(10) FORMAT {}" + +for data_format in formats: + run_test(data_format, gen_data_query, settings={"async_insert": 1, "wait_for_async_insert": 1}) + +# LineAsString +client.query("DROP TABLE IF EXISTS t_async_insert") +client.query("CREATE TABLE t_async_insert (s String) ENGINE = Memory") +gen_data_query = "SELECT toString(number) AS s FROM numbers(10) FORMAT {}" + +run_test('LineAsString', gen_data_query, settings={"async_insert": 1, "wait_for_async_insert": 1}) + +# TODO: add CapnProto and Protobuf + +print("OK") diff --git a/tests/queries/0_stateless/02187_async_inserts_all_formats.reference b/tests/queries/0_stateless/02187_async_inserts_all_formats.reference new file mode 100644 index 00000000000..b4a5b6c3a42 --- /dev/null +++ b/tests/queries/0_stateless/02187_async_inserts_all_formats.reference @@ -0,0 +1,40 @@ +Arrow +ArrowStream +Avro +CSV +CSVWithNames +CSVWithNamesAndTypes +CustomSeparated +CustomSeparatedWithNames +CustomSeparatedWithNamesAndTypes +JSONCompactEachRow +JSONCompactEachRowWithNames +JSONCompactEachRowWithNamesAndTypes +JSONCompactStringsEachRow +JSONCompactStringsEachRowWithNames +JSONCompactStringsEachRowWithNamesAndTypes +JSONEachRow +JSONStringsEachRow +MsgPack +Native +ORC +Parquet +RowBinary +RowBinaryWithNames +RowBinaryWithNamesAndTypes +TSKV +TSV +TSVRaw +TSVRawWithNames +TSVRawWithNamesAndTypes +TSVWithNames +TSVWithNamesAndTypes +TabSeparated +TabSeparatedRaw +TabSeparatedRawWithNames +TabSeparatedRawWithNamesAndTypes +TabSeparatedWithNames +TabSeparatedWithNamesAndTypes +Values +LineAsString +OK diff --git a/tests/queries/0_stateless/02187_async_inserts_all_formats.sh b/tests/queries/0_stateless/02187_async_inserts_all_formats.sh new file mode 100755 index 00000000000..0031f72fbe5 --- /dev/null +++ b/tests/queries/0_stateless/02187_async_inserts_all_formats.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +# We should have correct env vars from shell_config.sh to run this test +python3 "$CURDIR"/02187_async_inserts_all_formats.python diff --git a/tests/queries/0_stateless/helpers/pure_http_client.py b/tests/queries/0_stateless/helpers/pure_http_client.py index 9f79c4ac529..3335f141bb5 100644 --- a/tests/queries/0_stateless/helpers/pure_http_client.py +++ b/tests/queries/0_stateless/helpers/pure_http_client.py @@ -14,22 +14,23 @@ class ClickHouseClient: def __init__(self, host = CLICKHOUSE_SERVER_URL_STR): self.host = host - def query(self, query, connection_timeout = 1500): + def query(self, query, connection_timeout=1500, settings=dict(), binary_result=False): NUMBER_OF_TRIES = 30 DELAY = 10 + params = { + 'timeout_before_checking_execution_speed': 120, + 'max_execution_time': 6000, + 'database': CLICKHOUSE_DATABASE, + } + + # Add extra settings to params + params = {**params, **settings} + for i in range(NUMBER_OF_TRIES): - r = requests.post( - self.host, - params = { - 'timeout_before_checking_execution_speed': 120, - 'max_execution_time': 6000, - 'database': CLICKHOUSE_DATABASE - }, - timeout = connection_timeout, - data = query) + r = requests.post(self.host, params=params, timeout=connection_timeout, data=query) if r.status_code == 200: - return r.text + return r.content if binary_result else r.text else: print('ATTENTION: try #%d failed' % i) if i != (NUMBER_OF_TRIES-1): @@ -44,9 +45,22 @@ class ClickHouseClient: df = pd.read_csv(io.StringIO(data), sep = '\t') return df - def query_with_data(self, query, content): - content = content.encode('utf-8') - r = requests.post(self.host, data=content) + def query_with_data(self, query, data, connection_timeout=1500, settings=dict()): + params = { + 'query': query, + 'timeout_before_checking_execution_speed': 120, + 'max_execution_time': 6000, + 'database': CLICKHOUSE_DATABASE, + } + + headers = { + "Content-Type": "application/binary" + } + + # Add extra settings to params + params = {**params, **settings} + + r = requests.post(self.host, params=params, timeout=connection_timeout, data=data, headers=headers) result = r.text if r.status_code == 200: return result