diff --git a/contrib/capnproto b/contrib/capnproto index 7173ab638fd..a00ccd91b37 160000 --- a/contrib/capnproto +++ b/contrib/capnproto @@ -1 +1 @@ -Subproject commit 7173ab638fdf144032411dc69fb1082cd473e08f +Subproject commit a00ccd91b3746ef2ab51d40fe3265829949d1ace diff --git a/dbms/src/Formats/CapnProtoRowInputStream.cpp b/dbms/src/Formats/CapnProtoRowInputStream.cpp index 72fbd376399..598f33f49cb 100644 --- a/dbms/src/Formats/CapnProtoRowInputStream.cpp +++ b/dbms/src/Formats/CapnProtoRowInputStream.cpp @@ -68,13 +68,24 @@ Field convertNodeToField(capnp::DynamicValue::Reader value) auto listValue = value.as(); Array res(listValue.size()); for (auto i : kj::indices(listValue)) - res[i] = convertNodeToField(listValue[i]); + res[i] = convertNodeToField(listValue[i]); + return res; } case capnp::DynamicValue::ENUM: return UInt64(value.as().getRaw()); case capnp::DynamicValue::STRUCT: - throw Exception("STRUCT type not supported, read individual fields instead"); + { + auto structValue = value.as(); + const auto & fields = structValue.getSchema().getFields(); + + Field field = Tuple(TupleBackend(fields.size())); + TupleBackend & tuple = get(field).toUnderType(); + for (auto i : kj::indices(fields)) + tuple[i] = convertNodeToField(structValue.get(fields[i])); + + return field; + } case capnp::DynamicValue::CAPABILITY: throw Exception("CAPABILITY type not supported"); case capnp::DynamicValue::ANY_POINTER: @@ -88,9 +99,8 @@ capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std:: KJ_IF_MAYBE(child, node.findFieldByName(field)) return *child; else - throw Exception("Field " + field + " doesn't exist in schema."); + throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr()); } - void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader) { String last; @@ -110,13 +120,28 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields // Descend to a nested structure for (; level < field.tokens.size() - 1; ++level) { - last = field.tokens[level]; - parent = getFieldOrThrow(reader, last); - reader = parent.getType().asStruct(); - actions.push_back({Action::PUSH, parent}); + auto node = getFieldOrThrow(reader, field.tokens[level]); + if (node.getType().isStruct()) { + // Descend to field structure + last = field.tokens[level]; + parent = node; + reader = parent.getType().asStruct(); + actions.push_back({Action::PUSH, parent}); + } else if (node.getType().isList()) { + break; // Collect list + } else + throw Exception("Field " + field.tokens[level] + "is neither Struct nor List"); } + // Read field from the structure - actions.push_back({Action::READ, getFieldOrThrow(reader, field.tokens[level]), field.pos}); + auto node = getFieldOrThrow(reader, field.tokens[level]); + if (node.getType().isList() && actions.size() > 0 && actions.back().field == node) { + // The field list hereĀ flattens Nested elements into multiple arrays + // In order to map Nested types in Cap'nProto back, they need to be collected + actions.back().columns.push_back(field.pos); + } else { + actions.push_back({Action::READ, node, {field.pos}}); + } } } @@ -176,7 +201,7 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns) array = heap_array.asPtr(); } - capnp::FlatArrayMessageReader msg(array); + capnp::UnalignedFlatArrayMessageReader msg(array); std::vector stack; stack.push_back(msg.getRoot(root)); @@ -186,9 +211,28 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns) { case Action::READ: { - auto & col = columns[action.column]; Field value = convertNodeToField(stack.back().get(action.field)); - col->insert(value); + if (action.columns.size() > 1) { + // Nested columns must be flattened into several arrays + // e.g. Array(Tuple(x ..., y ...)) -> Array(x ...), Array(y ...) + const Array & collected = DB::get(value); + size_t size = collected.size(); + // The flattened array contains an array of a part of the nested tuple + Array flattened(size); + for (size_t column_index = 0; column_index < action.columns.size(); ++column_index) { + // Populate array with a single tuple elements + for (size_t off = 0; off < size; ++off) { + const TupleBackend & tuple = DB::get(collected[off]).toUnderType(); + flattened[off] = tuple[column_index]; + } + auto & col = columns[action.columns[column_index]]; + col->insert(flattened); + } + } else { + auto & col = columns[action.columns[0]]; + col->insert(value); + } + break; } case Action::POP: diff --git a/dbms/src/Formats/CapnProtoRowInputStream.h b/dbms/src/Formats/CapnProtoRowInputStream.h index 9152649bcbb..a7fcce49143 100644 --- a/dbms/src/Formats/CapnProtoRowInputStream.h +++ b/dbms/src/Formats/CapnProtoRowInputStream.h @@ -41,12 +41,13 @@ private: void createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader); /* Action for state machine for traversing nested structures. */ + using BlockPositionList = std::vector; struct Action { enum Type { POP, PUSH, READ }; Type type; capnp::StructSchema::Field field = {}; - size_t column = 0; + BlockPositionList columns = {}; }; // Wrapper for classes that could throw in destructor