Formats/CapnProtoRowInputStream: support Nested and Tuple, fix alignment issues

This updated contrib/capnproto to a newer version that fixes problems with
unaligned access to message frames.

It also adds support for parsing Struct types as Tuple (named or unnamed),
and Nested array types.

The `struct X { a @0 :UInt64; b @1 :Text }` in Cap'nProto is equivalent to
`x Tuple(a UInt64, b String)` in ClickHouse.

Arrays of Struct types such as `y List(X)` are equivalent to `y Nested(a UInt64, b String)`.
This commit is contained in:
Marek Vavruša 2018-09-25 22:22:03 -07:00
parent b326b95592
commit cbbcb6d9df
3 changed files with 59 additions and 14 deletions

2
contrib/capnproto vendored

@ -1 +1 @@
Subproject commit 7173ab638fdf144032411dc69fb1082cd473e08f Subproject commit a00ccd91b3746ef2ab51d40fe3265829949d1ace

View File

@ -68,13 +68,24 @@ Field convertNodeToField(capnp::DynamicValue::Reader value)
auto listValue = value.as<capnp::DynamicList>(); auto listValue = value.as<capnp::DynamicList>();
Array res(listValue.size()); Array res(listValue.size());
for (auto i : kj::indices(listValue)) for (auto i : kj::indices(listValue))
res[i] = convertNodeToField(listValue[i]); res[i] = convertNodeToField(listValue[i]);
return res; return res;
} }
case capnp::DynamicValue::ENUM: case capnp::DynamicValue::ENUM:
return UInt64(value.as<capnp::DynamicEnum>().getRaw()); return UInt64(value.as<capnp::DynamicEnum>().getRaw());
case capnp::DynamicValue::STRUCT: case capnp::DynamicValue::STRUCT:
throw Exception("STRUCT type not supported, read individual fields instead"); {
auto structValue = value.as<capnp::DynamicStruct>();
const auto & fields = structValue.getSchema().getFields();
Field field = Tuple(TupleBackend(fields.size()));
TupleBackend & tuple = get<Tuple &>(field).toUnderType();
for (auto i : kj::indices(fields))
tuple[i] = convertNodeToField(structValue.get(fields[i]));
return field;
}
case capnp::DynamicValue::CAPABILITY: case capnp::DynamicValue::CAPABILITY:
throw Exception("CAPABILITY type not supported"); throw Exception("CAPABILITY type not supported");
case capnp::DynamicValue::ANY_POINTER: case capnp::DynamicValue::ANY_POINTER:
@ -88,9 +99,8 @@ capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::
KJ_IF_MAYBE(child, node.findFieldByName(field)) KJ_IF_MAYBE(child, node.findFieldByName(field))
return *child; return *child;
else else
throw Exception("Field " + field + " doesn't exist in schema."); throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr());
} }
void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader) void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader)
{ {
String last; String last;
@ -110,13 +120,28 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields
// Descend to a nested structure // Descend to a nested structure
for (; level < field.tokens.size() - 1; ++level) for (; level < field.tokens.size() - 1; ++level)
{ {
last = field.tokens[level]; auto node = getFieldOrThrow(reader, field.tokens[level]);
parent = getFieldOrThrow(reader, last); if (node.getType().isStruct()) {
reader = parent.getType().asStruct(); // Descend to field structure
actions.push_back({Action::PUSH, parent}); last = field.tokens[level];
parent = node;
reader = parent.getType().asStruct();
actions.push_back({Action::PUSH, parent});
} else if (node.getType().isList()) {
break; // Collect list
} else
throw Exception("Field " + field.tokens[level] + "is neither Struct nor List");
} }
// Read field from the structure // Read field from the structure
actions.push_back({Action::READ, getFieldOrThrow(reader, field.tokens[level]), field.pos}); auto node = getFieldOrThrow(reader, field.tokens[level]);
if (node.getType().isList() && actions.size() > 0 && actions.back().field == node) {
// The field list here flattens Nested elements into multiple arrays
// In order to map Nested types in Cap'nProto back, they need to be collected
actions.back().columns.push_back(field.pos);
} else {
actions.push_back({Action::READ, node, {field.pos}});
}
} }
} }
@ -176,7 +201,7 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns)
array = heap_array.asPtr(); array = heap_array.asPtr();
} }
capnp::FlatArrayMessageReader msg(array); capnp::UnalignedFlatArrayMessageReader msg(array);
std::vector<capnp::DynamicStruct::Reader> stack; std::vector<capnp::DynamicStruct::Reader> stack;
stack.push_back(msg.getRoot<capnp::DynamicStruct>(root)); stack.push_back(msg.getRoot<capnp::DynamicStruct>(root));
@ -186,9 +211,28 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns)
{ {
case Action::READ: case Action::READ:
{ {
auto & col = columns[action.column];
Field value = convertNodeToField(stack.back().get(action.field)); Field value = convertNodeToField(stack.back().get(action.field));
col->insert(value); if (action.columns.size() > 1) {
// Nested columns must be flattened into several arrays
// e.g. Array(Tuple(x ..., y ...)) -> Array(x ...), Array(y ...)
const Array & collected = DB::get<const Array &>(value);
size_t size = collected.size();
// The flattened array contains an array of a part of the nested tuple
Array flattened(size);
for (size_t column_index = 0; column_index < action.columns.size(); ++column_index) {
// Populate array with a single tuple elements
for (size_t off = 0; off < size; ++off) {
const TupleBackend & tuple = DB::get<const Tuple &>(collected[off]).toUnderType();
flattened[off] = tuple[column_index];
}
auto & col = columns[action.columns[column_index]];
col->insert(flattened);
}
} else {
auto & col = columns[action.columns[0]];
col->insert(value);
}
break; break;
} }
case Action::POP: case Action::POP:

View File

@ -41,12 +41,13 @@ private:
void createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader); void createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader);
/* Action for state machine for traversing nested structures. */ /* Action for state machine for traversing nested structures. */
using BlockPositionList = std::vector<size_t>;
struct Action struct Action
{ {
enum Type { POP, PUSH, READ }; enum Type { POP, PUSH, READ };
Type type; Type type;
capnp::StructSchema::Field field = {}; capnp::StructSchema::Field field = {};
size_t column = 0; BlockPositionList columns = {};
}; };
// Wrapper for classes that could throw in destructor // Wrapper for classes that could throw in destructor