Merge pull request #34068 from CurtizJ/fix-async-insert-native

Fix asynchronous inserts with `Native` format
This commit is contained in:
Anton Popov 2022-01-29 01:24:53 +03:00 committed by GitHub
commit b950a12cb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 141 additions and 20 deletions

View File

@ -15,21 +15,22 @@ namespace DB
class NativeInputFormat final : public IInputFormat
{
public:
NativeInputFormat(ReadBuffer & buf, const Block & header)
: IInputFormat(header, buf)
, reader(buf, header, 0) {}
NativeInputFormat(ReadBuffer & buf, const Block & header_)
: IInputFormat(header_, buf)
, reader(std::make_unique<NativeReader>(buf, header_, 0))
, header(header_) {}
String getName() const override { return "Native"; }
void resetParser() override
{
IInputFormat::resetParser();
reader.resetParser();
reader->resetParser();
}
Chunk generate() override
{
auto block = reader.read();
auto block = reader->read();
if (!block)
return {};
@ -40,8 +41,15 @@ public:
return Chunk(block.getColumns(), num_rows);
}
void setReadBuffer(ReadBuffer & in_) override
{
reader = std::make_unique<NativeReader>(in_, header, 0);
IInputFormat::setReadBuffer(in_);
}
private:
NativeReader reader;
std::unique_ptr<NativeReader> reader;
Block header;
};
class NativeOutputFormat final : public IOutputFormat

View File

@ -0,0 +1,50 @@
#!/usr/bin/env python3
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
CLICKHOUSE_TMP = os.environ.get('CLICKHOUSE_TMP')
from pure_http_client import ClickHouseClient
client = ClickHouseClient()
def run_test(data_format, gen_data_template, settings):
print(data_format)
client.query("TRUNCATE TABLE t_async_insert")
expected = client.query(gen_data_template.format("TSV")).strip()
data = client.query(gen_data_template.format(data_format), settings=settings,binary_result=True)
insert_query = "INSERT INTO t_async_insert FORMAT {}".format(data_format)
client.query_with_data(insert_query, data, settings=settings)
result = client.query("SELECT * FROM t_async_insert FORMAT TSV").strip()
if result != expected:
print("Failed for format {}.\nExpected:\n{}\nGot:\n{}\n".format(data_format, expected, result))
exit(1)
formats = client.query("SELECT name FROM system.formats WHERE is_input AND is_output \
AND name NOT IN ('CapnProto', 'RawBLOB', 'Template', 'ProtobufSingle', 'LineAsString', 'Protobuf') ORDER BY name").strip().split('\n')
# Generic formats
client.query("DROP TABLE IF EXISTS t_async_insert")
client.query("CREATE TABLE t_async_insert (id UInt64, s String, arr Array(UInt64)) ENGINE = Memory")
gen_data_query = "SELECT number AS id, toString(number) AS s, range(number) AS arr FROM numbers(10) FORMAT {}"
for data_format in formats:
run_test(data_format, gen_data_query, settings={"async_insert": 1, "wait_for_async_insert": 1})
# LineAsString
client.query("DROP TABLE IF EXISTS t_async_insert")
client.query("CREATE TABLE t_async_insert (s String) ENGINE = Memory")
gen_data_query = "SELECT toString(number) AS s FROM numbers(10) FORMAT {}"
run_test('LineAsString', gen_data_query, settings={"async_insert": 1, "wait_for_async_insert": 1})
# TODO: add CapnProto and Protobuf
print("OK")

View File

@ -0,0 +1,40 @@
Arrow
ArrowStream
Avro
CSV
CSVWithNames
CSVWithNamesAndTypes
CustomSeparated
CustomSeparatedWithNames
CustomSeparatedWithNamesAndTypes
JSONCompactEachRow
JSONCompactEachRowWithNames
JSONCompactEachRowWithNamesAndTypes
JSONCompactStringsEachRow
JSONCompactStringsEachRowWithNames
JSONCompactStringsEachRowWithNamesAndTypes
JSONEachRow
JSONStringsEachRow
MsgPack
Native
ORC
Parquet
RowBinary
RowBinaryWithNames
RowBinaryWithNamesAndTypes
TSKV
TSV
TSVRaw
TSVRawWithNames
TSVRawWithNamesAndTypes
TSVWithNames
TSVWithNamesAndTypes
TabSeparated
TabSeparatedRaw
TabSeparatedRawWithNames
TabSeparatedRawWithNamesAndTypes
TabSeparatedWithNames
TabSeparatedWithNamesAndTypes
Values
LineAsString
OK

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Tags: no-fasttest, long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python3 "$CURDIR"/02187_async_inserts_all_formats.python

View File

@ -14,22 +14,23 @@ class ClickHouseClient:
def __init__(self, host = CLICKHOUSE_SERVER_URL_STR):
self.host = host
def query(self, query, connection_timeout = 1500):
def query(self, query, connection_timeout=1500, settings=dict(), binary_result=False):
NUMBER_OF_TRIES = 30
DELAY = 10
params = {
'timeout_before_checking_execution_speed': 120,
'max_execution_time': 6000,
'database': CLICKHOUSE_DATABASE,
}
# Add extra settings to params
params = {**params, **settings}
for i in range(NUMBER_OF_TRIES):
r = requests.post(
self.host,
params = {
'timeout_before_checking_execution_speed': 120,
'max_execution_time': 6000,
'database': CLICKHOUSE_DATABASE
},
timeout = connection_timeout,
data = query)
r = requests.post(self.host, params=params, timeout=connection_timeout, data=query)
if r.status_code == 200:
return r.text
return r.content if binary_result else r.text
else:
print('ATTENTION: try #%d failed' % i)
if i != (NUMBER_OF_TRIES-1):
@ -44,9 +45,22 @@ class ClickHouseClient:
df = pd.read_csv(io.StringIO(data), sep = '\t')
return df
def query_with_data(self, query, content):
content = content.encode('utf-8')
r = requests.post(self.host, data=content)
def query_with_data(self, query, data, connection_timeout=1500, settings=dict()):
params = {
'query': query,
'timeout_before_checking_execution_speed': 120,
'max_execution_time': 6000,
'database': CLICKHOUSE_DATABASE,
}
headers = {
"Content-Type": "application/binary"
}
# Add extra settings to params
params = {**params, **settings}
r = requests.post(self.host, params=params, timeout=connection_timeout, data=data, headers=headers)
result = r.text
if r.status_code == 200:
return result