mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #3216 from vavrusa/capnproto-tuple-nested
Formats/CapnProtoRowInputStream: support Nested and Tuple, fix alignm…
This commit is contained in:
commit
33578cb1b1
2
contrib/capnproto
vendored
2
contrib/capnproto
vendored
@ -1 +1 @@
|
||||
Subproject commit 7173ab638fdf144032411dc69fb1082cd473e08f
|
||||
Subproject commit a00ccd91b3746ef2ab51d40fe3265829949d1ace
|
@ -68,13 +68,24 @@ Field convertNodeToField(capnp::DynamicValue::Reader value)
|
||||
auto listValue = value.as<capnp::DynamicList>();
|
||||
Array res(listValue.size());
|
||||
for (auto i : kj::indices(listValue))
|
||||
res[i] = convertNodeToField(listValue[i]);
|
||||
res[i] = convertNodeToField(listValue[i]);
|
||||
|
||||
return res;
|
||||
}
|
||||
case capnp::DynamicValue::ENUM:
|
||||
return UInt64(value.as<capnp::DynamicEnum>().getRaw());
|
||||
case capnp::DynamicValue::STRUCT:
|
||||
throw Exception("STRUCT type not supported, read individual fields instead");
|
||||
{
|
||||
auto structValue = value.as<capnp::DynamicStruct>();
|
||||
const auto & fields = structValue.getSchema().getFields();
|
||||
|
||||
Field field = Tuple(TupleBackend(fields.size()));
|
||||
TupleBackend & tuple = get<Tuple &>(field).toUnderType();
|
||||
for (auto i : kj::indices(fields))
|
||||
tuple[i] = convertNodeToField(structValue.get(fields[i]));
|
||||
|
||||
return field;
|
||||
}
|
||||
case capnp::DynamicValue::CAPABILITY:
|
||||
throw Exception("CAPABILITY type not supported");
|
||||
case capnp::DynamicValue::ANY_POINTER:
|
||||
@ -88,7 +99,7 @@ capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::
|
||||
KJ_IF_MAYBE(child, node.findFieldByName(field))
|
||||
return *child;
|
||||
else
|
||||
throw Exception("Field " + field + " doesn't exist in schema.");
|
||||
throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr());
|
||||
}
|
||||
|
||||
void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader)
|
||||
@ -110,13 +121,35 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields
|
||||
// Descend to a nested structure
|
||||
for (; level < field.tokens.size() - 1; ++level)
|
||||
{
|
||||
last = field.tokens[level];
|
||||
parent = getFieldOrThrow(reader, last);
|
||||
reader = parent.getType().asStruct();
|
||||
actions.push_back({Action::PUSH, parent});
|
||||
auto node = getFieldOrThrow(reader, field.tokens[level]);
|
||||
if (node.getType().isStruct())
|
||||
{
|
||||
// Descend to field structure
|
||||
last = field.tokens[level];
|
||||
parent = node;
|
||||
reader = parent.getType().asStruct();
|
||||
actions.push_back({Action::PUSH, parent});
|
||||
}
|
||||
else if (node.getType().isList())
|
||||
{
|
||||
break; // Collect list
|
||||
}
|
||||
else
|
||||
throw Exception("Field " + field.tokens[level] + "is neither Struct nor List");
|
||||
}
|
||||
|
||||
// Read field from the structure
|
||||
actions.push_back({Action::READ, getFieldOrThrow(reader, field.tokens[level]), field.pos});
|
||||
auto node = getFieldOrThrow(reader, field.tokens[level]);
|
||||
if (node.getType().isList() && actions.size() > 0 && actions.back().field == node)
|
||||
{
|
||||
// The field list here flattens Nested elements into multiple arrays
|
||||
// In order to map Nested types in Cap'nProto back, they need to be collected
|
||||
actions.back().columns.push_back(field.pos);
|
||||
}
|
||||
else
|
||||
{
|
||||
actions.push_back({Action::READ, node, {field.pos}});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -176,7 +209,7 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns)
|
||||
array = heap_array.asPtr();
|
||||
}
|
||||
|
||||
capnp::FlatArrayMessageReader msg(array);
|
||||
capnp::UnalignedFlatArrayMessageReader msg(array);
|
||||
std::vector<capnp::DynamicStruct::Reader> stack;
|
||||
stack.push_back(msg.getRoot<capnp::DynamicStruct>(root));
|
||||
|
||||
@ -186,9 +219,33 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns)
|
||||
{
|
||||
case Action::READ:
|
||||
{
|
||||
auto & col = columns[action.column];
|
||||
Field value = convertNodeToField(stack.back().get(action.field));
|
||||
col->insert(value);
|
||||
if (action.columns.size() > 1)
|
||||
{
|
||||
// Nested columns must be flattened into several arrays
|
||||
// e.g. Array(Tuple(x ..., y ...)) -> Array(x ...), Array(y ...)
|
||||
const Array & collected = DB::get<const Array &>(value);
|
||||
size_t size = collected.size();
|
||||
// The flattened array contains an array of a part of the nested tuple
|
||||
Array flattened(size);
|
||||
for (size_t column_index = 0; column_index < action.columns.size(); ++column_index)
|
||||
{
|
||||
// Populate array with a single tuple elements
|
||||
for (size_t off = 0; off < size; ++off)
|
||||
{
|
||||
const TupleBackend & tuple = DB::get<const Tuple &>(collected[off]).toUnderType();
|
||||
flattened[off] = tuple[column_index];
|
||||
}
|
||||
auto & col = columns[action.columns[column_index]];
|
||||
col->insert(flattened);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & col = columns[action.columns[0]];
|
||||
col->insert(value);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case Action::POP:
|
||||
|
@ -41,12 +41,13 @@ private:
|
||||
void createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader);
|
||||
|
||||
/* Action for state machine for traversing nested structures. */
|
||||
using BlockPositionList = std::vector<size_t>;
|
||||
struct Action
|
||||
{
|
||||
enum Type { POP, PUSH, READ };
|
||||
Type type;
|
||||
capnp::StructSchema::Field field = {};
|
||||
size_t column = 0;
|
||||
BlockPositionList columns = {};
|
||||
};
|
||||
|
||||
// Wrapper for classes that could throw in destructor
|
||||
|
Loading…
Reference in New Issue
Block a user