Merge branch 'oandrew-avro' of github.com:yandex/ClickHouse into oandrew-avro

This commit is contained in:
Alexey Milovidov 2020-01-23 21:43:06 +03:00
commit 629f5e85e6
10 changed files with 120 additions and 71 deletions

View File

@ -31,7 +31,6 @@ enum class TypeIndex
Float64,
Date,
DateTime,
DateTime32 = DateTime,
DateTime64,
String,
FixedString,
@ -158,8 +157,6 @@ using Decimal32 = Decimal<Int32>;
using Decimal64 = Decimal<Int64>;
using Decimal128 = Decimal<Int128>;
// TODO (nemkov): consider making a strong typedef
//using DateTime32 = time_t;
using DateTime64 = Decimal64;
template <> struct TypeName<Decimal32> { static const char * get() { return "Decimal32"; } };

View File

@ -76,7 +76,7 @@ class InputStreamReadBufferAdapter : public avro::InputStream
public:
InputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {}
bool next(const uint8_t ** data, size_t * len)
bool next(const uint8_t ** data, size_t * len) override
{
if (in.eof())
{
@ -91,11 +91,11 @@ public:
return true;
}
void backup(size_t len) { in.position() -= len; }
void backup(size_t len) override { in.position() -= len; }
void skip(size_t len) { in.tryIgnore(len); }
void skip(size_t len) override { in.tryIgnore(len); }
size_t byteCount() const { return in.count(); }
size_t byteCount() const override { return in.count(); }
private:
ReadBuffer & in;
@ -105,9 +105,53 @@ static void deserializeNoop(IColumn &, avro::Decoder &)
{
}
/// Insert value with conversion to the column of target type.
template <typename T>
static void insertNumber(IColumn & column, WhichDataType type, T value)
{
switch (type.idx)
{
case TypeIndex::UInt8:
assert_cast<ColumnUInt8 &>(column).insertValue(value);
break;
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
assert_cast<ColumnUInt16 &>(column).insertValue(value);
break;
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
assert_cast<ColumnUInt32 &>(column).insertValue(value);
break;
case TypeIndex::DateTime64: [[fallthrough]];
case TypeIndex::UInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(value);
break;
case TypeIndex::Int8:
assert_cast<ColumnInt8 &>(column).insertValue(value);
break;
case TypeIndex::Int16:
assert_cast<ColumnInt16 &>(column).insertValue(value);
break;
case TypeIndex::Int32:
assert_cast<ColumnInt32 &>(column).insertValue(value);
break;
case TypeIndex::Int64:
assert_cast<ColumnInt64 &>(column).insertValue(value);
break;
case TypeIndex::Float32:
assert_cast<ColumnFloat32 &>(column).insertValue(value);
break;
case TypeIndex::Float64:
assert_cast<ColumnFloat64 &>(column).insertValue(value);
break;
default:
throw Exception("Type is not compatible with Avro", ErrorCodes::ILLEGAL_COLUMN);
}
}
AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type)
{
auto logical_type = root_node->logicalType().type();
WhichDataType target(target_type);
switch (root_node->type())
{
@ -123,32 +167,15 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
}
break;
case avro::AVRO_INT:
if (target.isInt32())
return [target](IColumn & column, avro::Decoder & decoder)
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<ColumnInt32 &>(column).insertValue(decoder.decodeInt());
};
}
if (target.isDate() && logical_type == avro::LogicalType::DATE)
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<DataTypeDate::ColumnType &>(column).insertValue(decoder.decodeInt());
};
}
break;
insertNumber(column, target, decoder.decodeInt());
};
case avro::AVRO_LONG:
if (target.isInt64())
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<ColumnInt64 &>(column).insertValue(decoder.decodeLong());
};
}
if (target.isDateTime64())
{
auto date_time_scale = assert_cast<const DataTypeDateTime64 &>(*target_type).getScale();
auto logical_type = root_node->logicalType().type();
if ((logical_type == avro::LogicalType::TIMESTAMP_MILLIS && date_time_scale == 3)
|| (logical_type == avro::LogicalType::TIMESTAMP_MICROS && date_time_scale == 6))
{
@ -158,34 +185,29 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
};
}
}
else
{
return [target](IColumn & column, avro::Decoder & decoder)
{
insertNumber(column, target, decoder.decodeLong());
};
}
break;
case avro::AVRO_FLOAT:
if (target.isFloat32())
return [target](IColumn & column, avro::Decoder & decoder)
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<ColumnFloat32 &>(column).insertValue(decoder.decodeFloat());
};
}
break;
insertNumber(column, target, decoder.decodeFloat());
};
case avro::AVRO_DOUBLE:
if (target.isFloat64())
return [target](IColumn & column, avro::Decoder & decoder)
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<ColumnFloat64 &>(column).insertValue(decoder.decodeDouble());
};
}
break;
insertNumber(column, target, decoder.decodeDouble());
};
case avro::AVRO_BOOL:
if (target.isUInt8())
return [target](IColumn & column, avro::Decoder & decoder)
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<ColumnUInt8 &>(column).insertValue(decoder.decodeBool());
};
}
break;
insertNumber(column, target, decoder.decodeBool());
};
case avro::AVRO_ARRAY:
if (target.isArray())
{
@ -304,14 +326,14 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
}
break;
}
case avro::AVRO_MAP:
case avro::AVRO_RECORD:
case avro::AVRO_MAP: [[fallthrough]];
case avro::AVRO_RECORD: [[fallthrough]];
default:
break;
}
throw Exception(
"Type " + target_type->getName() + " is not compatible" + " with Avro " + avro::ValidSchema(root_node).toJson(false),
"Type " + target_type->getName() + " is not compatible with Avro " + avro::ValidSchema(root_node).toJson(false),
ErrorCodes::ILLEGAL_COLUMN);
}
@ -394,7 +416,7 @@ AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(avro::NodePtr root_node)
};
}
default:
throw Exception("Unsupported Avro type", ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Unsupported Avro type " + root_node->name().fullname() + " (" + toString(int(root_node->type())) + ")", ErrorCodes::ILLEGAL_COLUMN);
}
}
@ -406,16 +428,19 @@ AvroDeserializer::AvroDeserializer(const ColumnsWithTypeAndName & columns, avro:
{
throw Exception("Root schema must be a record", ErrorCodes::TYPE_MISMATCH);
}
field_mapping.resize(schema_root->leaves(), -1);
for (size_t i = 0; i < schema_root->leaves(); ++i)
{
skip_fns.push_back(createSkipFn(schema_root->leafAt(i)));
deserialize_fns.push_back(&deserializeNoop);
}
for (size_t i = 0; i < columns.size(); ++i)
{
const auto & column = columns[i];
size_t field_index;
size_t field_index = 0;
if (!schema_root->nameIndex(column.name, field_index))
{
throw Exception("Field " + column.name + " not found in Avro schema", ErrorCodes::THERE_IS_NO_COLUMN);

View File

@ -30,8 +30,13 @@ private:
static DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type);
static SkipFn createSkipFn(avro::NodePtr root_node);
/// Map from field index in Avro schema to column number in block header. Or -1 if there is no corresponding column.
std::vector<int> field_mapping;
/// How to skip the corresponding field in Avro schema.
std::vector<SkipFn> skip_fns;
/// How to deserialize the corresponding field in Avro schema.
std::vector<DeserializeFn> deserialize_fns;
};

View File

@ -78,8 +78,10 @@ private:
};
AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(DataTypePtr data_type)
AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(DataTypePtr data_type, size_t & type_name_increment)
{
++type_name_increment;
switch (data_type->getTypeId())
{
case TypeIndex::UInt8:
@ -169,7 +171,8 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
}};
case TypeIndex::FixedString:
{
auto schema = avro::FixedSchema(data_type->getSizeOfValueInMemory(), "fixed");
auto size = data_type->getSizeOfValueInMemory();
auto schema = avro::FixedSchema(size, "fixed_" + toString(type_name_increment));
return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const StringRef & s = assert_cast<const ColumnFixedString &>(column).getDataAt(row_num);
@ -178,7 +181,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
}
case TypeIndex::Enum8:
{
auto schema = avro::EnumSchema("enum8");
auto schema = avro::EnumSchema("enum8_" + toString(type_name_increment)); /// type names must be different for different types.
std::unordered_map<DataTypeEnum8::FieldType, size_t> enum_mapping;
const auto & enum_values = assert_cast<const DataTypeEnum8 &>(*data_type).getValues();
for (size_t i = 0; i < enum_values.size(); ++i)
@ -194,7 +197,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
}
case TypeIndex::Enum16:
{
auto schema = avro::EnumSchema("enum16");
auto schema = avro::EnumSchema("enum16" + toString(type_name_increment));
std::unordered_map<DataTypeEnum16::FieldType, size_t> enum_mapping;
const auto & enum_values = assert_cast<const DataTypeEnum16 &>(*data_type).getValues();
for (size_t i = 0; i < enum_values.size(); ++i)
@ -211,7 +214,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
case TypeIndex::Array:
{
const auto & array_type = assert_cast<const DataTypeArray &>(*data_type);
auto nested_mapping = createSchemaWithSerializeFn(array_type.getNestedType());
auto nested_mapping = createSchemaWithSerializeFn(array_type.getNestedType(), type_name_increment);
auto schema = avro::ArraySchema(nested_mapping.schema);
return {schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
@ -237,7 +240,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
case TypeIndex::Nullable:
{
auto nested_type = removeNullable(data_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type, type_name_increment);
if (nested_type->getTypeId() == TypeIndex::Nothing)
{
return nested_mapping;
@ -266,7 +269,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
case TypeIndex::LowCardinality:
{
const auto & nested_type = removeLowCardinality(data_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type, type_name_increment);
return {nested_mapping.schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const auto & col = assert_cast<const ColumnLowCardinality &>(column);
@ -285,11 +288,13 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
AvroSerializer::AvroSerializer(const ColumnsWithTypeAndName & columns)
{
avro::RecordSchema record_schema("row");
size_t type_name_increment = 0;
for (auto & column : columns)
{
try
{
auto field_mapping = createSchemaWithSerializeFn(column.type);
auto field_mapping = createSchemaWithSerializeFn(column.type, type_name_increment);
serialize_fns.push_back(field_mapping.serialize);
//TODO: verify name starts with A-Za-z_
record_schema.addField(column.name, field_mapping.schema);
@ -312,7 +317,7 @@ void AvroSerializer::serializeRow(const Columns & columns, size_t row_num, avro:
}
}
static avro::Codec getCodec(const std::string& codec_name)
static avro::Codec getCodec(const std::string & codec_name)
{
if (codec_name == "")
{

View File

@ -32,7 +32,9 @@ private:
avro::Schema schema;
SerializeFn serialize;
};
static SchemaWithSerializeFn createSchemaWithSerializeFn(DataTypePtr data_type);
/// Type names for different complex types (e.g. enums, fixed strings) must be unique. We use simple incremental number to give them different names.
static SchemaWithSerializeFn createSchemaWithSerializeFn(DataTypePtr data_type, size_t & type_name_increment);
std::vector<SerializeFn> serialize_fns;
avro::ValidSchema schema;

View File

@ -21,7 +21,7 @@
1000
= other
0
not compatible
1000
not found
=== output
= primitive
@ -33,4 +33,4 @@ not found
= other
0
1000
not supported
147

View File

@ -37,7 +37,7 @@ echo = other
#no data
cat $DATA_DIR/empty.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
# type mismatch
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int32' -q 'select count() from table' 2>&1 | grep -i 'not compatible' -o
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int32' -q 'select count() from table'
# field not found
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'b Int64' -q 'select count() from table' 2>&1 | grep -i 'not found' -o
@ -66,5 +66,5 @@ S4="a Int64"
${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(0) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table'
${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(1000) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table'
# type not supported
${CLICKHOUSE_LOCAL} -q "select toInt16(123) as a format Avro" 2>&1 | grep -i 'not supported' -o
# type supported via conversion
${CLICKHOUSE_LOCAL} -q "select toInt16(123) as a format Avro" | wc -c

View File

@ -0,0 +1,2 @@
17300372046749301651
17300372046749301651

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS test.avro;
CREATE TABLE test.avro AS test.hits ENGINE = File(Avro);
INSERT INTO test.avro SELECT * FROM test.hits WHERE intHash64(WatchID) % 100 = 0;
SELECT sum(cityHash64(*)) FROM test.hits WHERE intHash64(WatchID) % 100 = 0;
SELECT sum(cityHash64(*)) FROM test.avro;
DROP TABLE test.avro;

View File

@ -145,6 +145,8 @@ Q2.
Upd. На данный момент исправляются проблемы с регрессиями производительности в отдельных случаях. Кажется, что все проблемы исправлены.
Включение по-умолчанию в Q1, но остаётся вторая часть задачи по корректному выделению async части.
Upd. Включили по-умолчанию. Удаление старого кода не раньше, чем после первого релиза, в котором это включено по-умолчанию и всё ещё можно выключить обратно.
### 2.2. Инфраструктура событий/метрик/ограничений/квот/трассировки.
В очереди. https://gist.github.com/alexey-milovidov/d62d73222d83b9319dc519cbb13aeff6
@ -214,10 +216,12 @@ Upd. На данный момент исправляются проблемы с
Требует 3.1.
### 3.3. Исправить катастрофически отвратительно неприемлемый поиск по документации.
### + 3.3. Исправить катастрофически отвратительно неприемлемый поиск по документации.
[Иван Блинков](https://github.com/blinkov/) - очень хороший человек. Сам сайт документации основан на технологиях, не удовлетворяющих требованиям задачи, и эти технологии трудно исправить. Задачу будет делать первый встретившийся нам frontend разработчик, которого мы сможем заставить это сделать.
Upd. Иван Блинков сделал эту задачу путём замены треш-технологий на нормальные.
### 3.4. + Добавить японский язык в документацию.
Эту задачу сделает [Иван Блинков](https://github.com/blinkov/), до конца декабря 2019. Сделано.