fix asynchronous inserts with Native format

This commit is contained in:
Anton Popov 2022-01-28 03:25:15 +03:00
parent 88629657ca
commit 6c0959b907
5 changed files with 141 additions and 20 deletions

View File

@ -15,21 +15,22 @@ namespace DB
class NativeInputFormat final : public IInputFormat class NativeInputFormat final : public IInputFormat
{ {
public: public:
NativeInputFormat(ReadBuffer & buf, const Block & header) NativeInputFormat(ReadBuffer & buf, const Block & header_)
: IInputFormat(header, buf) : IInputFormat(header_, buf)
, reader(buf, header, 0) {} , reader(std::make_unique<NativeReader>(buf, header_, 0))
, header(header_) {}
String getName() const override { return "Native"; } String getName() const override { return "Native"; }
void resetParser() override void resetParser() override
{ {
IInputFormat::resetParser(); IInputFormat::resetParser();
reader.resetParser(); reader->resetParser();
} }
Chunk generate() override Chunk generate() override
{ {
auto block = reader.read(); auto block = reader->read();
if (!block) if (!block)
return {}; return {};
@ -40,8 +41,15 @@ public:
return Chunk(block.getColumns(), num_rows); return Chunk(block.getColumns(), num_rows);
} }
void setReadBuffer(ReadBuffer & in_) override
{
reader = std::make_unique<NativeReader>(in_, header, 0);
IInputFormat::setReadBuffer(in_);
}
private: private:
NativeReader reader; std::unique_ptr<NativeReader> reader;
Block header;
}; };
class NativeOutputFormat final : public IOutputFormat class NativeOutputFormat final : public IOutputFormat

View File

@ -0,0 +1,50 @@
#!/usr/bin/env python3
import os
import sys
# Directory containing this test script (realpath resolves symlinks first).
CURDIR = os.path.dirname(os.path.realpath(__file__))
# Put the sibling 'helpers' directory first on the import path. This has to run
# before the pure_http_client import below (presumably that module lives in
# 'helpers' -- confirm against the test tree).
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
# Taken from the environment; each is None when the variable is unset.
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
CLICKHOUSE_TMP = os.environ.get('CLICKHOUSE_TMP')
from pure_http_client import ClickHouseClient
# Single client instance shared by run_test and the top-level statements below.
client = ClickHouseClient()
def run_test(data_format, gen_data_template, settings):
    """Round-trip generated rows through an INSERT in `data_format` and verify them.

    The table `t_async_insert` is truncated, the rows produced by
    `gen_data_template` are fetched once as TSV (the expected result) and once
    in `data_format` (the payload), the payload is inserted, and the table
    contents are compared with the expected TSV.

    Args:
        data_format: Name of the format to test; printed to stdout first.
        gen_data_template: SELECT query text with a single `{}` placeholder
            that is filled with the output format name.
        settings: Dict of extra settings passed to both the payload-generating
            query and the INSERT.

    Exits the process with status 1 (after printing a diagnostic) when the
    table contents differ from the expected rows.
    """
    print(data_format)
    client.query("TRUNCATE TABLE t_async_insert")

    # TSV rendering of the generated rows is the baseline for the comparison.
    expected = client.query(gen_data_template.format("TSV")).strip()
    # binary_result=True is passed so the payload is not handled as text,
    # which matters for binary formats.
    data = client.query(gen_data_template.format(data_format), settings=settings, binary_result=True)

    insert_query = "INSERT INTO t_async_insert FORMAT {}".format(data_format)
    client.query_with_data(insert_query, data, settings=settings)

    result = client.query("SELECT * FROM t_async_insert FORMAT TSV").strip()
    if result != expected:
        print("Failed for format {}.\nExpected:\n{}\nGot:\n{}\n".format(data_format, expected, result))
        # sys.exit instead of the builtin exit(): the latter is injected by the
        # `site` module for interactive use and is not guaranteed to exist.
        sys.exit(1)
# Formats usable for both input and output, minus the ones listed in NOT IN.
# LineAsString gets its own single-column run below; CapnProto and Protobuf are
# still TODO. NOTE(review): the reason for excluding the rest is not visible here.
format_list_query = (
    "SELECT name FROM system.formats WHERE is_input AND is_output "
    "AND name NOT IN ('CapnProto', 'RawBLOB', 'Template', 'ProtobufSingle', 'LineAsString', 'Protobuf') ORDER BY name"
)
formats = client.query(format_list_query).strip().split('\n')

# Every run uses the same async-insert settings.
async_settings = {"async_insert": 1, "wait_for_async_insert": 1}

# Generic formats
client.query("DROP TABLE IF EXISTS t_async_insert")
client.query("CREATE TABLE t_async_insert (id UInt64, s String, arr Array(UInt64)) ENGINE = Memory")
gen_data_query = "SELECT number AS id, toString(number) AS s, range(number) AS arr FROM numbers(10) FORMAT {}"

for data_format in formats:
    run_test(data_format, gen_data_query, settings=async_settings)

# LineAsString
client.query("DROP TABLE IF EXISTS t_async_insert")
client.query("CREATE TABLE t_async_insert (s String) ENGINE = Memory")
gen_data_query = "SELECT toString(number) AS s FROM numbers(10) FORMAT {}"

run_test('LineAsString', gen_data_query, settings=async_settings)

# TODO: add CapnProto and Protobuf

print("OK")

View File

@ -0,0 +1,40 @@
Arrow
ArrowStream
Avro
CSV
CSVWithNames
CSVWithNamesAndTypes
CustomSeparated
CustomSeparatedWithNames
CustomSeparatedWithNamesAndTypes
JSONCompactEachRow
JSONCompactEachRowWithNames
JSONCompactEachRowWithNamesAndTypes
JSONCompactStringsEachRow
JSONCompactStringsEachRowWithNames
JSONCompactStringsEachRowWithNamesAndTypes
JSONEachRow
JSONStringsEachRow
MsgPack
Native
ORC
Parquet
RowBinary
RowBinaryWithNames
RowBinaryWithNamesAndTypes
TSKV
TSV
TSVRaw
TSVRawWithNames
TSVRawWithNamesAndTypes
TSVWithNames
TSVWithNamesAndTypes
TabSeparated
TabSeparatedRaw
TabSeparatedRawWithNames
TabSeparatedRawWithNamesAndTypes
TabSeparatedWithNames
TabSeparatedWithNamesAndTypes
Values
LineAsString
OK

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Absolute path of the directory holding this script.
CURDIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=../shell_config.sh
source "${CURDIR}/../shell_config.sh"
# shell_config.sh must be sourced first: the Python test needs the env vars it sets.
python3 "${CURDIR}/02187_async_inserts_all_formats.python"

View File

@ -14,22 +14,23 @@ class ClickHouseClient:
def __init__(self, host = CLICKHOUSE_SERVER_URL_STR): def __init__(self, host = CLICKHOUSE_SERVER_URL_STR):
self.host = host self.host = host
def query(self, query, connection_timeout = 1500): def query(self, query, connection_timeout=1500, settings=dict(), binary_result=False):
NUMBER_OF_TRIES = 30 NUMBER_OF_TRIES = 30
DELAY = 10 DELAY = 10
params = {
'timeout_before_checking_execution_speed': 120,
'max_execution_time': 6000,
'database': CLICKHOUSE_DATABASE,
}
# Add extra settings to params
params = {**params, **settings}
for i in range(NUMBER_OF_TRIES): for i in range(NUMBER_OF_TRIES):
r = requests.post( r = requests.post(self.host, params=params, timeout=connection_timeout, data=query)
self.host,
params = {
'timeout_before_checking_execution_speed': 120,
'max_execution_time': 6000,
'database': CLICKHOUSE_DATABASE
},
timeout = connection_timeout,
data = query)
if r.status_code == 200: if r.status_code == 200:
return r.text return r.content if binary_result else r.text
else: else:
print('ATTENTION: try #%d failed' % i) print('ATTENTION: try #%d failed' % i)
if i != (NUMBER_OF_TRIES-1): if i != (NUMBER_OF_TRIES-1):
@ -44,9 +45,22 @@ class ClickHouseClient:
df = pd.read_csv(io.StringIO(data), sep = '\t') df = pd.read_csv(io.StringIO(data), sep = '\t')
return df return df
def query_with_data(self, query, content): def query_with_data(self, query, data, connection_timeout=1500, settings=dict()):
content = content.encode('utf-8') params = {
r = requests.post(self.host, data=content) 'query': query,
'timeout_before_checking_execution_speed': 120,
'max_execution_time': 6000,
'database': CLICKHOUSE_DATABASE,
}
headers = {
"Content-Type": "application/binary"
}
# Add extra settings to params
params = {**params, **settings}
r = requests.post(self.host, params=params, timeout=connection_timeout, data=data, headers=headers)
result = r.text result = r.text
if r.status_code == 200: if r.status_code == 200:
return result return result