Merge pull request #21438 from arenadata/ADQM-170

This commit is contained in:
Vladimir 2021-03-19 17:06:55 +03:00 committed by GitHub
commit c8b5be636f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 69 additions and 21 deletions

View File

@ -554,7 +554,7 @@ AvroDeserializer::Action AvroDeserializer::createAction(const Block & header, co
}
}
AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schema, const FormatSettings & format_settings)
AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields)
{
const auto & schema_root = schema.root();
if (schema_root->type() != avro::AVRO_RECORD)
@ -565,7 +565,7 @@ AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schem
column_found.resize(header.columns());
row_action = createAction(header, schema_root);
// fail on missing fields when allow_missing_fields = false
if (!format_settings.avro.allow_missing_fields)
if (!allow_missing_fields)
{
for (size_t i = 0; i < header.columns(); ++i)
{
@ -592,19 +592,24 @@ void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder &
AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, params_)
, file_reader(std::make_unique<InputStreamReadBufferAdapter>(in_))
, deserializer(output.getHeader(), file_reader.dataSchema(), format_settings_)
: IRowInputFormat(header_, in_, params_),
allow_missing_fields(format_settings_.avro.allow_missing_fields)
{
file_reader.init();
}
/// Sets up the Avro container reader and the row deserializer for the current input buffer.
/// The reader and deserializer are created here instead of in the constructor.
void AvroRowInputFormat::readPrefix()
{
    /// Wrap the ClickHouse read buffer so the Avro library can consume it as an input stream.
    auto input_adapter = std::make_unique<InputStreamReadBufferAdapter>(in);
    file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::move(input_adapter));

    /// The deserializer is built from the output header and the schema reported by the reader.
    /// NOTE(review): dataSchema() is queried before init(); presumably the reader's constructor
    /// has already parsed the container header - confirm against the Avro C++ API.
    deserializer_ptr = std::make_unique<AvroDeserializer>(
        output.getHeader(),
        file_reader_ptr->dataSchema(),
        allow_missing_fields);

    file_reader_ptr->init();
}
bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &ext)
{
if (file_reader.hasMore())
if (file_reader_ptr->hasMore())
{
file_reader.decr();
deserializer.deserializeRow(columns, file_reader.decoder(), ext);
file_reader_ptr->decr();
deserializer_ptr->deserializeRow(columns, file_reader_ptr->decoder(), ext);
return true;
}
return false;
@ -781,7 +786,7 @@ const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(Sc
if (it == deserializer_cache.end())
{
auto schema = schema_registry->getSchema(schema_id);
AvroDeserializer deserializer(output.getHeader(), schema, format_settings);
AvroDeserializer deserializer(output.getHeader(), schema, format_settings.avro.allow_missing_fields);
it = deserializer_cache.emplace(schema_id, deserializer).first;
}
return it->second;

View File

@ -25,7 +25,7 @@ namespace DB
class AvroDeserializer
{
public:
AvroDeserializer(const Block & header, avro::ValidSchema schema, const FormatSettings & format_settings);
AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields);
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const;
private:
@ -107,12 +107,15 @@ class AvroRowInputFormat : public IRowInputFormat
{
public:
AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_);
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readPrefix() override;
String getName() const override { return "AvroRowInputFormat"; }
private:
avro::DataFileReaderBase file_reader;
AvroDeserializer deserializer;
std::unique_ptr<avro::DataFileReaderBase> file_reader_ptr;
std::unique_ptr<AvroDeserializer> deserializer_ptr;
bool allow_missing_fields;
};
/// Confluent framing + Avro binary datum encoding. Mainly used for Kafka.

View File

@ -5,8 +5,11 @@ import socket
import subprocess
import threading
import time
import io
import avro.schema
import avro.io
import avro.datafile
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from confluent_kafka import admin
@ -140,6 +143,37 @@ def kafka_produce_protobuf_social(topic, start_index, num_messages):
producer.flush()
print(("Produced {} messages for topic {}".format(num_messages, topic)))
def avro_message(value):
    """Serialize one record, or a list of records, into a single Avro data file.

    The schema is a record named 'row' with the fields id (long), blockNo (int),
    val1 (string), val2 (float) and val3 (int).

    :param value: a single record, or a list of records; every list element is
        appended to the same data file.
    :return: the raw content of the in-memory data file.
    """
    record_schema = avro.schema.make_avsc_object({
        'name': 'row',
        'type': 'record',
        'fields': [
            {'name': 'id', 'type': 'long'},
            {'name': 'blockNo', 'type': 'int'},
            {'name': 'val1', 'type': 'string'},
            {'name': 'val2', 'type': 'float'},
            {'name': 'val3', 'type': 'int'}
        ]
    })

    buffer = io.BytesIO()
    # A DataFileWriter is used instead of a bare DatumWriter + BinaryEncoder pair:
    # it seems to be required in order to get the schema encoded into the output.
    file_writer = avro.datafile.DataFileWriter(buffer, avro.io.DatumWriter(), record_schema)

    # Treat a single record as a one-element batch so there is only one append path.
    records = value if isinstance(value, list) else [value]
    for record in records:
        file_writer.append(record)
    file_writer.flush()

    # The bytes are captured before the writer and the buffer are closed.
    raw_bytes = buffer.getvalue()
    file_writer.close()
    buffer.close()
    return raw_bytes
def avro_confluent_message(schema_registry_client, value):
# type: (CachedSchemaRegistryClient, dict) -> str
@ -572,13 +606,6 @@ def test_kafka_formats(kafka_cluster):
# # ''
# # ],
# },
# 'Avro' : {
# 'data_sample' : [
# b'\x4f\x62\x6a\x01\x04\x16\x61\x76\x72\x6f\x2e\x73\x63\x68\x65\x6d\x61\x82\x03\x7b\x22\x74\x79\x70\x65\x22\x3a\x22\x72\x65\x63\x6f\x72\x64\x22\x2c\x22\x6e\x61\x6d\x65\x22\x3a\x22\x72\x6f\x77\x22\x2c\x22\x66\x69\x65\x6c\x64\x73\x22\x3a\x5b\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x69\x64\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x6c\x6f\x6e\x67\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x62\x6c\x6f\x63\x6b\x4e\x6f\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x69\x6e\x74\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x76\x61\x6c\x31\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x73\x74\x72\x69\x6e\x67\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x76\x61\x6c\x32\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x66\x6c\x6f\x61\x74\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x76\x61\x6c\x33\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x69\x6e\x74\x22\x7d\x5d\x7d\x14\x61\x76\x72\x6f\x2e\x63\x6f\x64\x65\x63\x08\x6e\x75\x6c\x6c\x00\x8d\x1f\xf2\x17\x71\xa4\x2e\xe4\xc9\x0a\x23\x67\x12\xaa\xc6\xc0\x02\x14\x00\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x8d\x1f\xf2\x17\x71\xa4\x2e\xe4\xc9\x0a\x23\x67\x12\xaa\xc6\xc0',
# b'\x4f\x62\x6a\x01\x04\x16\x61\x76\x72\x6f\x2e\x73\x63\x68\x65\x6d\x61\x82\x03\x7b\x22\x74\x79\x70\x65\x22\x3a\x22\x72\x65\x63\x6f\x72\x64\x22\x2c\x22\x6e\x61\x6d\x65\x22\x3a\x22\x72\x6f\x77\x22\x2c\x22\x66\x69\x65\x6c\x64\x73\x22\x3a\x5b\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x69\x64\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x6c\x6f\x6e\x67\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x62\x6c\x6f\x63\x6b\x4e\x6f\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x69\x6e\x74\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x76\x61\x6c\x31\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x73\x74\x72\x69\x6e\x67\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x76\x61\x6c\x32\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x66\x6c\x6f\x61\x74\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x76\x61\x6c\x33\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x69\x6e\x74\x22\x7d\x5d\x7d\x14\x61\x76\x72\x6f\x2e\x63\x6f\x64\x65\x63\x08\x6e\x75\x6c\x6c\x00\xeb\x9d\x51\x82\xf2\x11\x3d\x0b\xc5\x92\x97\xb2\x07\x6d\x72\x5a\x1e\xac\x02\x02\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x04\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x06\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x08\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x0a\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x0c\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x0e\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x10\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x12\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x14\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x16\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x18\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x1a\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x1c\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x1e\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\xeb\x9d\x51\x82\xf2\x11\x3d\x0b\xc5\x92\x97\xb2\x07\x6d\x72\x5a',
# b'\x4f\x62\x6a\x01\x04\x16\x61\x76\x72\x6f\x2e\x73\x63\x68\x65\x6d\x61\x82\x03\x7b\x22\x74\x79\x70\x65\x22\x3a\x22\x72\x65\x63\x6f\x72\x64\x22\x2c\x22\x6e\x61\x6d\x65\x22\x3a\x22\x72\x6f\x77\x22\x2c\x22\x66\x69\x65\x6c\x64\x73\x22\x3a\x5b\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x69\x64\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x6c\x6f\x6e\x67\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x62\x6c\x6f\x63\x6b\x4e\x6f\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x69\x6e\x74\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x76\x61\x6c\x31\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x73\x74\x72\x69\x6e\x67\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x76\x61\x6c\x32\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x66\x6c\x6f\x61\x74\x22\x7d\x2c\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x76\x61\x6c\x33\x22\x2c\x22\x74\x79\x70\x65\x22\x3a\x22\x69\x6e\x74\x22\x7d\x5d\x7d\x14\x61\x76\x72\x6f\x2e\x63\x6f\x64\x65\x63\x08\x6e\x75\x6c\x6c\x00\x73\x65\x4f\x7c\xd9\x33\xe1\x18\xdd\x30\xe8\x22\x2a\x58\x20\x6f\x02\x14\x00\x00\x04\x41\x4d\x00\x00\x00\x3f\x02\x73\x65\x4f\x7c\xd9\x33\xe1\x18\xdd\x30\xe8\x22\x2a\x58\x20\x6f',
# ],
# },
'AvroConfluent': {
'data_sample': [
avro_confluent_message(cluster.schema_registry_client,
@ -596,6 +623,19 @@ def test_kafka_formats(kafka_cluster):
cluster.schema_registry_port
),
'supports_empty_value': True,
},
'Avro': {
# It seems impossible to send more than one Avro file per message
# because of the nature of Avro: blocks go one after another
'data_sample': [
avro_message({'id': 0, 'blockNo': 0, 'val1': str('AM'), 'val2': 0.5, "val3": 1}),
avro_message([{'id': id, 'blockNo': 0, 'val1': str('AM'),
'val2': 0.5, "val3": 1} for id in range(1, 16)]),
avro_message({'id': 0, 'blockNo': 0, 'val1': str('AM'), 'val2': 0.5, "val3": 1}),
],
'supports_empty_value': False,
}
# 'Arrow' : {
# # Not working at all: DB::Exception: Error while opening a table: Invalid: File is too small: 0, Stack trace (when copying this message, always include the lines below):