Addition to prev. revision

This commit is contained in:
Alexey Milovidov 2020-01-23 03:41:37 +03:00
parent b213f08f1c
commit 0c18bcf415

View File

@ -76,7 +76,7 @@ class InputStreamReadBufferAdapter : public avro::InputStream
public:
InputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {}
bool next(const uint8_t ** data, size_t * len)
bool next(const uint8_t ** data, size_t * len) override
{
if (in.eof())
{
@ -91,11 +91,11 @@ public:
return true;
}
void backup(size_t len) { in.position() -= len; }
void backup(size_t len) override { in.position() -= len; }
void skip(size_t len) { in.tryIgnore(len); }
void skip(size_t len) override { in.tryIgnore(len); }
size_t byteCount() const { return in.count(); }
size_t byteCount() const override { return in.count(); }
private:
ReadBuffer & in;
@ -126,33 +126,32 @@ static void insertNumber(IColumn & column, WhichDataType type, T value)
case TypeIndex::UInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(value);
break;
case TypeIndex::Int8,
case TypeIndex::Int8:
assert_cast<ColumnInt8 &>(column).insertValue(value);
break;
case TypeIndex::Int16,
case TypeIndex::Int16:
assert_cast<ColumnInt16 &>(column).insertValue(value);
break;
case TypeIndex::Int32,
case TypeIndex::Int32:
assert_cast<ColumnInt32 &>(column).insertValue(value);
break;
case TypeIndex::Int64,
case TypeIndex::Int64:
assert_cast<ColumnInt64 &>(column).insertValue(value);
break;
case TypeIndex::Float32,
case TypeIndex::Float32:
assert_cast<ColumnFloat32 &>(column).insertValue(value);
break;
case TypeIndex::Float64,
case TypeIndex::Float64:
assert_cast<ColumnFloat64 &>(column).insertValue(value);
break;
default:
throw Exception("Type " + type->getName() + " is not compatible with Avro", ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Type is not compatible with Avro", ErrorCodes::ILLEGAL_COLUMN);
}
}
AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type)
{
auto logical_type = root_node->logicalType().type();
WhichDataType target(target_type);
switch (root_node->type())
{
@ -170,33 +169,28 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
case avro::AVRO_INT:
return [target](IColumn & column, avro::Decoder & decoder)
{
insertValue(column, target, decoder.decodeInt());
insertNumber(column, target, decoder.decodeInt());
};
break;
case avro::AVRO_LONG:
return [target](IColumn & column, avro::Decoder & decoder)
{
insertValue(column, target, decoder.decodeLong());
insertNumber(column, target, decoder.decodeLong());
};
break;
case avro::AVRO_FLOAT:
return [target](IColumn & column, avro::Decoder & decoder)
{
insertValue(column, target, decoder.decodeFloat());
insertNumber(column, target, decoder.decodeFloat());
};
break;
case avro::AVRO_DOUBLE:
return [target](IColumn & column, avro::Decoder & decoder)
{
insertValue(column, target, decoder.decodeDouble());
insertNumber(column, target, decoder.decodeDouble());
};
break;
case avro::AVRO_BOOL:
return [target](IColumn & column, avro::Decoder & decoder)
{
insertValue(column, target, decoder.decodeBool());
insertNumber(column, target, decoder.decodeBool());
};
break;
case avro::AVRO_ARRAY:
if (target.isArray())
{