mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-24 02:30:51 +00:00
Merged with master.
This commit is contained in:
parent
abc63f8a06
commit
cddbd6db3c
@ -1,332 +0,0 @@
|
||||
#include "CapnProtoRowInputStream.h"
|
||||
|
||||
#if USE_CAPNP
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Formats/BlockInputStreamFromRowInputStream.h>
|
||||
#include <Formats/FormatSchemaInfo.h>
|
||||
#include <capnp/serialize.h>
|
||||
#include <capnp/dynamic.h>
|
||||
#include <capnp/common.h>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/range/join.hpp>
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_TYPE_OF_FIELD;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int THERE_IS_NO_COLUMN;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
CapnProtoRowInputStream::NestedField split(const Block & header, size_t i)
|
||||
{
|
||||
CapnProtoRowInputStream::NestedField field = {{}, i};
|
||||
|
||||
// Remove leading dot in field definition, e.g. ".msg" -> "msg"
|
||||
String name(header.safeGetByPosition(i).name);
|
||||
if (name.size() > 0 && name[0] == '.')
|
||||
name.erase(0, 1);
|
||||
|
||||
boost::split(field.tokens, name, boost::is_any_of("._"));
|
||||
return field;
|
||||
}
|
||||
|
||||
|
||||
Field convertNodeToField(capnp::DynamicValue::Reader value)
|
||||
{
|
||||
switch (value.getType())
|
||||
{
|
||||
case capnp::DynamicValue::UNKNOWN:
|
||||
throw Exception("Unknown field type", ErrorCodes::BAD_TYPE_OF_FIELD);
|
||||
case capnp::DynamicValue::VOID:
|
||||
return Field();
|
||||
case capnp::DynamicValue::BOOL:
|
||||
return value.as<bool>() ? 1u : 0u;
|
||||
case capnp::DynamicValue::INT:
|
||||
return value.as<int64_t>();
|
||||
case capnp::DynamicValue::UINT:
|
||||
return value.as<uint64_t>();
|
||||
case capnp::DynamicValue::FLOAT:
|
||||
return value.as<double>();
|
||||
case capnp::DynamicValue::TEXT:
|
||||
{
|
||||
auto arr = value.as<capnp::Text>();
|
||||
return String(arr.begin(), arr.size());
|
||||
}
|
||||
case capnp::DynamicValue::DATA:
|
||||
{
|
||||
auto arr = value.as<capnp::Data>().asChars();
|
||||
return String(arr.begin(), arr.size());
|
||||
}
|
||||
case capnp::DynamicValue::LIST:
|
||||
{
|
||||
auto listValue = value.as<capnp::DynamicList>();
|
||||
Array res(listValue.size());
|
||||
for (auto i : kj::indices(listValue))
|
||||
res[i] = convertNodeToField(listValue[i]);
|
||||
|
||||
return res;
|
||||
}
|
||||
case capnp::DynamicValue::ENUM:
|
||||
return value.as<capnp::DynamicEnum>().getRaw();
|
||||
case capnp::DynamicValue::STRUCT:
|
||||
{
|
||||
auto structValue = value.as<capnp::DynamicStruct>();
|
||||
const auto & fields = structValue.getSchema().getFields();
|
||||
|
||||
Field field = Tuple(TupleBackend(fields.size()));
|
||||
TupleBackend & tuple = get<Tuple &>(field).toUnderType();
|
||||
for (auto i : kj::indices(fields))
|
||||
tuple[i] = convertNodeToField(structValue.get(fields[i]));
|
||||
|
||||
return field;
|
||||
}
|
||||
case capnp::DynamicValue::CAPABILITY:
|
||||
throw Exception("CAPABILITY type not supported", ErrorCodes::BAD_TYPE_OF_FIELD);
|
||||
case capnp::DynamicValue::ANY_POINTER:
|
||||
throw Exception("ANY_POINTER type not supported", ErrorCodes::BAD_TYPE_OF_FIELD);
|
||||
}
|
||||
return Field();
|
||||
}
|
||||
|
||||
capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::string & field)
|
||||
{
|
||||
KJ_IF_MAYBE(child, node.findFieldByName(field))
|
||||
return *child;
|
||||
else
|
||||
throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr(), ErrorCodes::THERE_IS_NO_COLUMN);
|
||||
}
|
||||
|
||||
|
||||
void CapnProtoRowInputStream::createActions(const NestedFieldList & sorted_fields, capnp::StructSchema reader)
|
||||
{
|
||||
/// Columns in a table can map to fields in Cap'n'Proto or to structs.
|
||||
|
||||
/// Store common parents and their tokens in order to backtrack.
|
||||
std::vector<capnp::StructSchema::Field> parents;
|
||||
std::vector<std::string> parent_tokens;
|
||||
|
||||
capnp::StructSchema cur_reader = reader;
|
||||
|
||||
for (const auto & field : sorted_fields)
|
||||
{
|
||||
if (field.tokens.empty())
|
||||
throw Exception("Logical error in CapnProtoRowInputStream", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
// Backtrack to common parent
|
||||
while (field.tokens.size() < parent_tokens.size() + 1
|
||||
|| !std::equal(parent_tokens.begin(), parent_tokens.end(), field.tokens.begin()))
|
||||
{
|
||||
actions.push_back({Action::POP});
|
||||
parents.pop_back();
|
||||
parent_tokens.pop_back();
|
||||
|
||||
if (parents.empty())
|
||||
{
|
||||
cur_reader = reader;
|
||||
break;
|
||||
}
|
||||
else
|
||||
cur_reader = parents.back().getType().asStruct();
|
||||
}
|
||||
|
||||
// Go forward
|
||||
while (parent_tokens.size() + 1 < field.tokens.size())
|
||||
{
|
||||
const auto & token = field.tokens[parents.size()];
|
||||
auto node = getFieldOrThrow(cur_reader, token);
|
||||
if (node.getType().isStruct())
|
||||
{
|
||||
// Descend to field structure
|
||||
parents.emplace_back(node);
|
||||
parent_tokens.emplace_back(token);
|
||||
cur_reader = node.getType().asStruct();
|
||||
actions.push_back({Action::PUSH, node});
|
||||
}
|
||||
else if (node.getType().isList())
|
||||
{
|
||||
break; // Collect list
|
||||
}
|
||||
else
|
||||
throw Exception("Field " + token + " is neither Struct nor List", ErrorCodes::BAD_TYPE_OF_FIELD);
|
||||
}
|
||||
|
||||
// Read field from the structure
|
||||
auto node = getFieldOrThrow(cur_reader, field.tokens[parents.size()]);
|
||||
if (node.getType().isList() && actions.size() > 0 && actions.back().field == node)
|
||||
{
|
||||
// The field list here flattens Nested elements into multiple arrays
|
||||
// In order to map Nested types in Cap'nProto back, they need to be collected
|
||||
// Since the field names are sorted, the order of field positions must be preserved
|
||||
// For example, if the fields are { b @0 :Text, a @1 :Text }, the `a` would come first
|
||||
// even though it's position is second.
|
||||
auto & columns = actions.back().columns;
|
||||
auto it = std::upper_bound(columns.cbegin(), columns.cend(), field.pos);
|
||||
columns.insert(it, field.pos);
|
||||
}
|
||||
else
|
||||
{
|
||||
actions.push_back({Action::READ, node, {field.pos}});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSchemaInfo& info)
|
||||
: istr(istr_), header(header_), parser(std::make_shared<SchemaParser>())
|
||||
{
|
||||
// Parse the schema and fetch the root object
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
|
||||
auto schema = parser->impl.parseDiskFile(info.schemaPath(), info.absoluteSchemaPath(), {});
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
root = schema.getNested(info.messageName()).asStruct();
|
||||
|
||||
/**
|
||||
* The schema typically consists of fields in various nested structures.
|
||||
* Here we gather the list of fields and sort them in a way so that fields in the same structure are adjacent,
|
||||
* and the nesting level doesn't decrease to make traversal easier.
|
||||
*/
|
||||
NestedFieldList list;
|
||||
size_t num_columns = header.columns();
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
list.push_back(split(header, i));
|
||||
|
||||
// Order list first by value of strings then by length of string vector.
|
||||
std::sort(list.begin(), list.end(), [](const NestedField & a, const NestedField & b) { return a.tokens < b.tokens; });
|
||||
createActions(list, root);
|
||||
}
|
||||
|
||||
kj::Array<capnp::word> CapnProtoRowInputStream::readMessage()
|
||||
{
|
||||
uint32_t segment_count;
|
||||
istr.readStrict(reinterpret_cast<char*>(&segment_count), sizeof(uint32_t));
|
||||
|
||||
// one for segmentCount and one because segmentCount starts from 0
|
||||
const auto prefix_size = (2 + segment_count) * sizeof(uint32_t);
|
||||
const auto words_prefix_size = (segment_count + 1) / 2 + 1;
|
||||
auto prefix = kj::heapArray<capnp::word>(words_prefix_size);
|
||||
auto prefix_chars = prefix.asChars();
|
||||
::memcpy(prefix_chars.begin(), &segment_count, sizeof(uint32_t));
|
||||
|
||||
// read size of each segment
|
||||
for (size_t i = 0; i <= segment_count; ++i)
|
||||
istr.readStrict(prefix_chars.begin() + ((i + 1) * sizeof(uint32_t)), sizeof(uint32_t));
|
||||
|
||||
// calculate size of message
|
||||
const auto expected_words = capnp::expectedSizeInWordsFromPrefix(prefix);
|
||||
const auto expected_bytes = expected_words * sizeof(capnp::word);
|
||||
const auto data_size = expected_bytes - prefix_size;
|
||||
auto msg = kj::heapArray<capnp::word>(expected_words);
|
||||
auto msg_chars = msg.asChars();
|
||||
|
||||
// read full message
|
||||
::memcpy(msg_chars.begin(), prefix_chars.begin(), prefix_size);
|
||||
istr.readStrict(msg_chars.begin() + prefix_size, data_size);
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &)
|
||||
{
|
||||
if (istr.eof())
|
||||
return false;
|
||||
|
||||
auto array = readMessage();
|
||||
|
||||
#if CAPNP_VERSION >= 8000
|
||||
capnp::UnalignedFlatArrayMessageReader msg(array);
|
||||
#else
|
||||
capnp::FlatArrayMessageReader msg(array);
|
||||
#endif
|
||||
std::vector<capnp::DynamicStruct::Reader> stack;
|
||||
stack.push_back(msg.getRoot<capnp::DynamicStruct>(root));
|
||||
|
||||
for (auto action : actions)
|
||||
{
|
||||
switch (action.type)
|
||||
{
|
||||
case Action::READ:
|
||||
{
|
||||
Field value = convertNodeToField(stack.back().get(action.field));
|
||||
if (action.columns.size() > 1)
|
||||
{
|
||||
// Nested columns must be flattened into several arrays
|
||||
// e.g. Array(Tuple(x ..., y ...)) -> Array(x ...), Array(y ...)
|
||||
const Array & collected = DB::get<const Array &>(value);
|
||||
size_t size = collected.size();
|
||||
// The flattened array contains an array of a part of the nested tuple
|
||||
Array flattened(size);
|
||||
for (size_t column_index = 0; column_index < action.columns.size(); ++column_index)
|
||||
{
|
||||
// Populate array with a single tuple elements
|
||||
for (size_t off = 0; off < size; ++off)
|
||||
{
|
||||
const TupleBackend & tuple = DB::get<const Tuple &>(collected[off]).toUnderType();
|
||||
flattened[off] = tuple[column_index];
|
||||
}
|
||||
auto & col = columns[action.columns[column_index]];
|
||||
col->insert(flattened);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & col = columns[action.columns[0]];
|
||||
col->insert(value);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case Action::POP:
|
||||
stack.pop_back();
|
||||
break;
|
||||
case Action::PUSH:
|
||||
stack.push_back(stack.back().get(action.field).as<capnp::DynamicStruct>());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void registerInputFormatCapnProto(FormatFactory & factory)
|
||||
{
|
||||
factory.registerInputFormat(
|
||||
"CapnProto",
|
||||
[](ReadBuffer & buf,
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 max_block_size,
|
||||
UInt64 rows_portion_size,
|
||||
FormatFactory::ReadCallback callback,
|
||||
const FormatSettings & settings)
|
||||
{
|
||||
return std::make_shared<BlockInputStreamFromRowInputStream>(
|
||||
std::make_shared<CapnProtoRowInputStream>(buf, sample, FormatSchemaInfo(context, "CapnProto")),
|
||||
sample,
|
||||
max_block_size,
|
||||
rows_portion_size,
|
||||
callback,
|
||||
settings);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class FormatFactory;
|
||||
void registerInputFormatCapnProto(FormatFactory &) {}
|
||||
}
|
||||
|
||||
#endif // USE_CAPNP
|
@ -1,494 +0,0 @@
|
||||
#include "ParquetBlockInputStream.h"
|
||||
|
||||
#if USE_PARQUET
|
||||
# include <algorithm>
|
||||
# include <iterator>
|
||||
# include <vector>
|
||||
// TODO: clear includes
|
||||
# include <Columns/ColumnNullable.h>
|
||||
# include <Columns/ColumnString.h>
|
||||
# include <Columns/ColumnsNumber.h>
|
||||
# include <Columns/IColumn.h>
|
||||
# include <Core/ColumnWithTypeAndName.h>
|
||||
# include <DataTypes/DataTypeDate.h>
|
||||
# include <DataTypes/DataTypeDateTime.h>
|
||||
# include <DataTypes/DataTypeFactory.h>
|
||||
# include <DataTypes/DataTypeNullable.h>
|
||||
# include <DataTypes/DataTypeString.h>
|
||||
# include <DataTypes/DataTypesDecimal.h>
|
||||
# include <DataTypes/DataTypesNumber.h>
|
||||
# include <Formats/FormatFactory.h>
|
||||
# include <IO/BufferBase.h>
|
||||
# include <IO/ReadBufferFromMemory.h>
|
||||
# include <IO/WriteBufferFromString.h>
|
||||
# include <IO/WriteHelpers.h>
|
||||
# include <IO/copyData.h>
|
||||
# include <Interpreters/castColumn.h>
|
||||
# include <arrow/api.h>
|
||||
# include <parquet/arrow/reader.h>
|
||||
# include <parquet/file_reader.h>
|
||||
# include <common/DateLUTImpl.h>
|
||||
# include <ext/range.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_TYPE;
|
||||
extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE;
|
||||
extern const int CANNOT_READ_ALL_DATA;
|
||||
extern const int EMPTY_DATA_PASSED;
|
||||
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
|
||||
extern const int CANNOT_CONVERT_TYPE;
|
||||
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
|
||||
extern const int THERE_IS_NO_COLUMN;
|
||||
}
|
||||
|
||||
ParquetBlockInputStream::ParquetBlockInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_)
|
||||
: istr{istr_}, header{header_}, context{context_}
|
||||
{
|
||||
}
|
||||
|
||||
Block ParquetBlockInputStream::getHeader() const
|
||||
{
|
||||
return header;
|
||||
}
|
||||
|
||||
/// Inserts numeric data right into internal column data to reduce an overhead
|
||||
template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
|
||||
void fillColumnWithNumericData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
||||
{
|
||||
auto & column_data = static_cast<VectorType &>(*internal_column).getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
|
||||
/// buffers[0] is a null bitmap and buffers[1] are actual values
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
|
||||
|
||||
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data());
|
||||
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
|
||||
}
|
||||
}
|
||||
|
||||
/// Inserts chars and offsets right into internal column data to reduce an overhead.
|
||||
/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
|
||||
/// Also internal strings are null terminated.
|
||||
void fillColumnWithStringData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
||||
{
|
||||
PaddedPODArray<UInt8> & column_chars_t = static_cast<ColumnString &>(*internal_column).getChars();
|
||||
PaddedPODArray<UInt64> & column_offsets = static_cast<ColumnString &>(*internal_column).getOffsets();
|
||||
|
||||
size_t chars_t_size = 0;
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||
const size_t chunk_length = chunk.length();
|
||||
|
||||
chars_t_size += chunk.value_offset(chunk_length - 1) + chunk.value_length(chunk_length - 1);
|
||||
chars_t_size += chunk_length; /// additional space for null bytes
|
||||
}
|
||||
|
||||
column_chars_t.reserve(chars_t_size);
|
||||
column_offsets.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
|
||||
const size_t chunk_length = chunk.length();
|
||||
|
||||
for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i)
|
||||
{
|
||||
if (!chunk.IsNull(offset_i) && buffer)
|
||||
{
|
||||
const UInt8 * raw_data = buffer->data() + chunk.value_offset(offset_i);
|
||||
column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i));
|
||||
}
|
||||
column_chars_t.emplace_back('\0');
|
||||
|
||||
column_offsets.emplace_back(column_chars_t.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void fillColumnWithBooleanData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
||||
{
|
||||
auto & column_data = static_cast<ColumnVector<UInt8> &>(*internal_column).getData();
|
||||
column_data.resize(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::BooleanArray & chunk = static_cast<arrow::BooleanArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||
/// buffers[0] is a null bitmap and buffers[1] are actual values
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
|
||||
|
||||
for (size_t bool_i = 0; bool_i != static_cast<size_t>(chunk.length()); ++bool_i)
|
||||
column_data[bool_i] = chunk.Value(bool_i);
|
||||
}
|
||||
}
|
||||
|
||||
/// Arrow stores Parquet::DATE in Int32, while ClickHouse stores Date in UInt16. Therefore, it should be checked before saving
|
||||
void fillColumnWithDate32Data(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
||||
{
|
||||
PaddedPODArray<UInt16> & column_data = static_cast<ColumnVector<UInt16> &>(*internal_column).getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
arrow::Date32Array & chunk = static_cast<arrow::Date32Array &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||
|
||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||
{
|
||||
UInt32 days_num = static_cast<UInt32>(chunk.Value(value_i));
|
||||
if (days_num > DATE_LUT_MAX_DAY_NUM)
|
||||
{
|
||||
// TODO: will it rollback correctly?
|
||||
throw Exception{"Input value " + std::to_string(days_num) + " of a column \"" + arrow_column->name()
|
||||
+ "\" is greater than "
|
||||
"max allowed Date value, which is "
|
||||
+ std::to_string(DATE_LUT_MAX_DAY_NUM),
|
||||
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE};
|
||||
}
|
||||
|
||||
column_data.emplace_back(days_num);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Arrow stores Parquet::DATETIME in Int64, while ClickHouse stores DateTime in UInt32. Therefore, it should be checked before saving
|
||||
void fillColumnWithDate64Data(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
||||
{
|
||||
auto & column_data = static_cast<ColumnVector<UInt32> &>(*internal_column).getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
auto & chunk = static_cast<arrow::Date64Array &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||
{
|
||||
auto timestamp = static_cast<UInt32>(chunk.Value(value_i) / 1000); // Always? in ms
|
||||
column_data.emplace_back(timestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void fillColumnWithTimestampData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
||||
{
|
||||
auto & column_data = static_cast<ColumnVector<UInt32> &>(*internal_column).getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
auto & chunk = static_cast<arrow::TimestampArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||
const auto & type = static_cast<const ::arrow::TimestampType &>(*chunk.type());
|
||||
|
||||
UInt32 divide = 1;
|
||||
const auto unit = type.unit();
|
||||
switch (unit)
|
||||
{
|
||||
case arrow::TimeUnit::SECOND:
|
||||
divide = 1;
|
||||
break;
|
||||
case arrow::TimeUnit::MILLI:
|
||||
divide = 1000;
|
||||
break;
|
||||
case arrow::TimeUnit::MICRO:
|
||||
divide = 1000000;
|
||||
break;
|
||||
case arrow::TimeUnit::NANO:
|
||||
divide = 1000000000;
|
||||
break;
|
||||
}
|
||||
|
||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||
{
|
||||
auto timestamp = static_cast<UInt32>(chunk.Value(value_i) / divide); // ms! TODO: check other 's' 'ns' ...
|
||||
column_data.emplace_back(timestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void fillColumnWithDecimalData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
||||
{
|
||||
auto & column = static_cast<ColumnDecimal<Decimal128> &>(*internal_column);
|
||||
auto & column_data = column.getData();
|
||||
column_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
||||
{
|
||||
auto & chunk = static_cast<arrow::DecimalArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||
{
|
||||
column_data.emplace_back(
|
||||
chunk.IsNull(value_i) ? Decimal128(0) : *reinterpret_cast<const Decimal128 *>(chunk.Value(value_i))); // TODO: copy column
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a null bytemap from arrow's null bitmap
|
||||
void fillByteMapFromArrowColumn(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & bytemap)
|
||||
{
|
||||
PaddedPODArray<UInt8> & bytemap_data = static_cast<ColumnVector<UInt8> &>(*bytemap).getData();
|
||||
bytemap_data.reserve(arrow_column->length());
|
||||
|
||||
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
|
||||
{
|
||||
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
|
||||
|
||||
for (size_t value_i = 0; value_i != static_cast<size_t>(chunk->length()); ++value_i)
|
||||
bytemap_data.emplace_back(chunk->IsNull(value_i));
|
||||
}
|
||||
}
|
||||
|
||||
# define FOR_ARROW_NUMERIC_TYPES(M) \
|
||||
M(arrow::Type::UINT8, UInt8) \
|
||||
M(arrow::Type::INT8, Int8) \
|
||||
M(arrow::Type::UINT16, UInt16) \
|
||||
M(arrow::Type::INT16, Int16) \
|
||||
M(arrow::Type::UINT32, UInt32) \
|
||||
M(arrow::Type::INT32, Int32) \
|
||||
M(arrow::Type::UINT64, UInt64) \
|
||||
M(arrow::Type::INT64, Int64) \
|
||||
M(arrow::Type::FLOAT, Float32) \
|
||||
M(arrow::Type::DOUBLE, Float64)
|
||||
//M(arrow::Type::HALF_FLOAT, Float32) // TODO
|
||||
|
||||
|
||||
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::Column>>;
|
||||
|
||||
|
||||
Block ParquetBlockInputStream::readImpl()
|
||||
{
|
||||
static const std::unordered_map<arrow::Type::type, std::shared_ptr<IDataType>> arrow_type_to_internal_type = {
|
||||
//{arrow::Type::DECIMAL, std::make_shared<DataTypeDecimal>()},
|
||||
{arrow::Type::UINT8, std::make_shared<DataTypeUInt8>()},
|
||||
{arrow::Type::INT8, std::make_shared<DataTypeInt8>()},
|
||||
{arrow::Type::UINT16, std::make_shared<DataTypeUInt16>()},
|
||||
{arrow::Type::INT16, std::make_shared<DataTypeInt16>()},
|
||||
{arrow::Type::UINT32, std::make_shared<DataTypeUInt32>()},
|
||||
{arrow::Type::INT32, std::make_shared<DataTypeInt32>()},
|
||||
{arrow::Type::UINT64, std::make_shared<DataTypeUInt64>()},
|
||||
{arrow::Type::INT64, std::make_shared<DataTypeInt64>()},
|
||||
{arrow::Type::HALF_FLOAT, std::make_shared<DataTypeFloat32>()},
|
||||
{arrow::Type::FLOAT, std::make_shared<DataTypeFloat32>()},
|
||||
{arrow::Type::DOUBLE, std::make_shared<DataTypeFloat64>()},
|
||||
|
||||
{arrow::Type::BOOL, std::make_shared<DataTypeUInt8>()},
|
||||
//{arrow::Type::DATE32, std::make_shared<DataTypeDate>()},
|
||||
{arrow::Type::DATE32, std::make_shared<DataTypeDate>()},
|
||||
//{arrow::Type::DATE32, std::make_shared<DataTypeDateTime>()},
|
||||
{arrow::Type::DATE64, std::make_shared<DataTypeDateTime>()},
|
||||
{arrow::Type::TIMESTAMP, std::make_shared<DataTypeDateTime>()},
|
||||
//{arrow::Type::TIME32, std::make_shared<DataTypeDateTime>()},
|
||||
|
||||
|
||||
{arrow::Type::STRING, std::make_shared<DataTypeString>()},
|
||||
{arrow::Type::BINARY, std::make_shared<DataTypeString>()},
|
||||
//{arrow::Type::FIXED_SIZE_BINARY, std::make_shared<DataTypeString>()},
|
||||
//{arrow::Type::UUID, std::make_shared<DataTypeString>()},
|
||||
|
||||
|
||||
// TODO: add other types that are convertable to internal ones:
|
||||
// 0. ENUM?
|
||||
// 1. UUID -> String
|
||||
// 2. JSON -> String
|
||||
// Full list of types: contrib/arrow/cpp/src/arrow/type.h
|
||||
};
|
||||
|
||||
|
||||
Block res;
|
||||
|
||||
if (!istr.eof())
|
||||
{
|
||||
/*
|
||||
First we load whole stream into string (its very bad and limiting .parquet file size to half? of RAM)
|
||||
Then producing blocks for every row_group (dont load big .parquet files with one row_group - it can eat x10+ RAM from .parquet file size)
|
||||
*/
|
||||
|
||||
if (row_group_current < row_group_total)
|
||||
throw Exception{"Got new data, but data from previous chunks not readed " + std::to_string(row_group_current) + "/"
|
||||
+ std::to_string(row_group_total),
|
||||
ErrorCodes::CANNOT_READ_ALL_DATA};
|
||||
|
||||
file_data.clear();
|
||||
{
|
||||
WriteBufferFromString file_buffer(file_data);
|
||||
copyData(istr, file_buffer);
|
||||
}
|
||||
|
||||
buffer = std::make_unique<arrow::Buffer>(file_data);
|
||||
// TODO: maybe use parquet::RandomAccessSource?
|
||||
auto reader = parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(*buffer));
|
||||
file_reader = std::make_unique<parquet::arrow::FileReader>(::arrow::default_memory_pool(), std::move(reader));
|
||||
row_group_total = file_reader->num_row_groups();
|
||||
row_group_current = 0;
|
||||
}
|
||||
if (row_group_current >= row_group_total)
|
||||
return res;
|
||||
|
||||
// TODO: also catch a ParquetException thrown by filereader?
|
||||
//arrow::Status read_status = filereader.ReadTable(&table);
|
||||
std::shared_ptr<arrow::Table> table;
|
||||
arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, &table);
|
||||
|
||||
if (!read_status.ok())
|
||||
throw Exception{"Error while reading parquet data: " + read_status.ToString(), ErrorCodes::CANNOT_READ_ALL_DATA};
|
||||
|
||||
if (0 == table->num_rows())
|
||||
throw Exception{"Empty table in input data", ErrorCodes::EMPTY_DATA_PASSED};
|
||||
|
||||
if (header.columns() > static_cast<size_t>(table->num_columns()))
|
||||
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
|
||||
throw Exception{"Number of columns is less than the table has", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH};
|
||||
|
||||
++row_group_current;
|
||||
|
||||
NameToColumnPtr name_to_column_ptr;
|
||||
for (size_t i = 0, num_columns = static_cast<size_t>(table->num_columns()); i < num_columns; ++i)
|
||||
{
|
||||
std::shared_ptr<arrow::Column> arrow_column = table->column(i);
|
||||
name_to_column_ptr[arrow_column->name()] = arrow_column;
|
||||
}
|
||||
|
||||
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
|
||||
{
|
||||
ColumnWithTypeAndName header_column = header.getByPosition(column_i);
|
||||
|
||||
if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end())
|
||||
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
|
||||
throw Exception{"Column \"" + header_column.name + "\" is not presented in input data", ErrorCodes::THERE_IS_NO_COLUMN};
|
||||
|
||||
std::shared_ptr<arrow::Column> arrow_column = name_to_column_ptr[header_column.name];
|
||||
arrow::Type::type arrow_type = arrow_column->type()->id();
|
||||
|
||||
// TODO: check if a column is const?
|
||||
if (!header_column.type->isNullable() && arrow_column->null_count())
|
||||
{
|
||||
throw Exception{"Can not insert NULL data into non-nullable column \"" + header_column.name + "\"",
|
||||
ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN};
|
||||
}
|
||||
|
||||
const bool target_column_is_nullable = header_column.type->isNullable() || arrow_column->null_count();
|
||||
|
||||
DataTypePtr internal_nested_type;
|
||||
|
||||
if (arrow_type == arrow::Type::DECIMAL)
|
||||
{
|
||||
const auto decimal_type = static_cast<arrow::DecimalType *>(arrow_column->type().get());
|
||||
internal_nested_type = std::make_shared<DataTypeDecimal<Decimal128>>(decimal_type->precision(), decimal_type->scale());
|
||||
}
|
||||
else if (arrow_type_to_internal_type.find(arrow_type) != arrow_type_to_internal_type.end())
|
||||
{
|
||||
internal_nested_type = arrow_type_to_internal_type.at(arrow_type);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception{"The type \"" + arrow_column->type()->name() + "\" of an input column \"" + arrow_column->name()
|
||||
+ "\" is not supported for conversion from a Parquet data format",
|
||||
ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||
}
|
||||
|
||||
const DataTypePtr internal_type = target_column_is_nullable ? makeNullable(internal_nested_type) : internal_nested_type;
|
||||
const std::string internal_nested_type_name = internal_nested_type->getName();
|
||||
|
||||
const DataTypePtr column_nested_type = header_column.type->isNullable()
|
||||
? static_cast<const DataTypeNullable *>(header_column.type.get())->getNestedType()
|
||||
: header_column.type;
|
||||
|
||||
const DataTypePtr column_type = header_column.type;
|
||||
|
||||
const std::string column_nested_type_name = column_nested_type->getName();
|
||||
|
||||
ColumnWithTypeAndName column;
|
||||
column.name = header_column.name;
|
||||
column.type = internal_type;
|
||||
|
||||
/// Data
|
||||
MutableColumnPtr read_column = internal_nested_type->createColumn();
|
||||
switch (arrow_type)
|
||||
{
|
||||
case arrow::Type::STRING:
|
||||
case arrow::Type::BINARY:
|
||||
//case arrow::Type::FIXED_SIZE_BINARY:
|
||||
fillColumnWithStringData(arrow_column, read_column);
|
||||
break;
|
||||
case arrow::Type::BOOL:
|
||||
fillColumnWithBooleanData(arrow_column, read_column);
|
||||
break;
|
||||
case arrow::Type::DATE32:
|
||||
fillColumnWithDate32Data(arrow_column, read_column);
|
||||
break;
|
||||
case arrow::Type::DATE64:
|
||||
fillColumnWithDate64Data(arrow_column, read_column);
|
||||
break;
|
||||
case arrow::Type::TIMESTAMP:
|
||||
fillColumnWithTimestampData(arrow_column, read_column);
|
||||
break;
|
||||
case arrow::Type::DECIMAL:
|
||||
//fillColumnWithNumericData<Decimal128, ColumnDecimal<Decimal128>>(arrow_column, read_column); // Have problems with trash values under NULL, but faster
|
||||
fillColumnWithDecimalData(arrow_column, read_column /*, internal_nested_type*/);
|
||||
break;
|
||||
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
|
||||
case ARROW_NUMERIC_TYPE: \
|
||||
fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, read_column); \
|
||||
break;
|
||||
|
||||
FOR_ARROW_NUMERIC_TYPES(DISPATCH)
|
||||
# undef DISPATCH
|
||||
// TODO: support TIMESTAMP_MICROS and TIMESTAMP_MILLIS with truncated micro- and milliseconds?
|
||||
// TODO: read JSON as a string?
|
||||
// TODO: read UUID as a string?
|
||||
default:
|
||||
throw Exception{"Unsupported parquet type \"" + arrow_column->type()->name() + "\" of an input column \""
|
||||
+ arrow_column->name() + "\"",
|
||||
ErrorCodes::UNKNOWN_TYPE};
|
||||
}
|
||||
|
||||
if (column.type->isNullable())
|
||||
{
|
||||
MutableColumnPtr null_bytemap = DataTypeUInt8().createColumn();
|
||||
fillByteMapFromArrowColumn(arrow_column, null_bytemap);
|
||||
column.column = ColumnNullable::create(std::move(read_column), std::move(null_bytemap));
|
||||
}
|
||||
else
|
||||
{
|
||||
column.column = std::move(read_column);
|
||||
}
|
||||
|
||||
column.column = castColumn(column, column_type, context);
|
||||
column.type = column_type;
|
||||
|
||||
res.insert(std::move(column));
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
void registerInputFormatParquet(FormatFactory & factory)
|
||||
{
|
||||
factory.registerInputFormat(
|
||||
"Parquet",
|
||||
[](ReadBuffer & buf,
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 /* max_block_size */,
|
||||
UInt64 /* rows_portion_size */,
|
||||
FormatFactory::ReadCallback /* callback */,
|
||||
const FormatSettings & /* settings */) { return std::make_shared<ParquetBlockInputStream>(buf, sample, context); });
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class FormatFactory;
|
||||
void registerInputFormatParquet(FormatFactory &)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
@ -1,450 +0,0 @@
|
||||
#include "ParquetBlockOutputStream.h"
|
||||
|
||||
#if USE_PARQUET
|
||||
// TODO: clean includes
|
||||
# include <Columns/ColumnDecimal.h>
|
||||
# include <Columns/ColumnFixedString.h>
|
||||
# include <Columns/ColumnNullable.h>
|
||||
# include <Columns/ColumnString.h>
|
||||
# include <Columns/ColumnVector.h>
|
||||
# include <Columns/ColumnsNumber.h>
|
||||
# include <Core/ColumnWithTypeAndName.h>
|
||||
# include <Core/callOnTypeIndex.h>
|
||||
# include <DataTypes/DataTypeDateTime.h>
|
||||
# include <DataTypes/DataTypeNullable.h>
|
||||
# include <DataTypes/DataTypesDecimal.h>
|
||||
# include <DataStreams/SquashingBlockOutputStream.h>
|
||||
# include <Formats/FormatFactory.h>
|
||||
# include <IO/WriteHelpers.h>
|
||||
# include <arrow/api.h>
|
||||
# include <arrow/io/api.h>
|
||||
# include <arrow/util/decimal.h>
|
||||
# include <parquet/arrow/writer.h>
|
||||
# include <parquet/exception.h>
|
||||
# include <parquet/util/memory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_EXCEPTION;
|
||||
extern const int UNKNOWN_TYPE;
|
||||
}
|
||||
|
||||
ParquetBlockOutputStream::ParquetBlockOutputStream(WriteBuffer & ostr, const Block & header, const FormatSettings & format_settings) : ostr{ostr}, header{header}, format_settings{format_settings}
|
||||
{
|
||||
}
|
||||
|
||||
void ParquetBlockOutputStream::flush()
|
||||
{
|
||||
ostr.next();
|
||||
}
|
||||
|
||||
void checkStatus(arrow::Status & status, const std::string & column_name)
|
||||
{
|
||||
if (!status.ok())
|
||||
throw Exception{"Error with a parquet column \"" + column_name + "\": " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
|
||||
}
|
||||
|
||||
template <typename NumericType, typename ArrowBuilderType>
|
||||
void fillArrowArrayWithNumericColumnData(
|
||||
ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
|
||||
{
|
||||
const PaddedPODArray<NumericType> & internal_data = static_cast<const ColumnVector<NumericType> &>(*write_column).getData();
|
||||
ArrowBuilderType builder;
|
||||
arrow::Status status;
|
||||
|
||||
const UInt8 * arrow_null_bytemap_raw_ptr = nullptr;
|
||||
PaddedPODArray<UInt8> arrow_null_bytemap;
|
||||
if (null_bytemap)
|
||||
{
|
||||
/// Invert values since Arrow interprets 1 as a non-null value, while CH as a null
|
||||
arrow_null_bytemap.reserve(null_bytemap->size());
|
||||
for (size_t i = 0, size = null_bytemap->size(); i < size; ++i)
|
||||
arrow_null_bytemap.emplace_back(1 ^ (*null_bytemap)[i]);
|
||||
|
||||
arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data();
|
||||
}
|
||||
|
||||
status = builder.AppendValues(internal_data.data(), internal_data.size(), arrow_null_bytemap_raw_ptr);
|
||||
checkStatus(status, write_column->getName());
|
||||
|
||||
status = builder.Finish(&arrow_array);
|
||||
checkStatus(status, write_column->getName());
|
||||
}
|
||||
|
||||
template <typename ColumnType>
|
||||
void fillArrowArrayWithStringColumnData(
|
||||
ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
|
||||
{
|
||||
const auto & internal_column = static_cast<const ColumnType &>(*write_column);
|
||||
arrow::StringBuilder builder;
|
||||
arrow::Status status;
|
||||
|
||||
for (size_t string_i = 0, size = internal_column.size(); string_i < size; ++string_i)
|
||||
{
|
||||
if (null_bytemap && (*null_bytemap)[string_i])
|
||||
{
|
||||
status = builder.AppendNull();
|
||||
}
|
||||
else
|
||||
{
|
||||
StringRef string_ref = internal_column.getDataAt(string_i);
|
||||
status = builder.Append(string_ref.data, string_ref.size);
|
||||
}
|
||||
|
||||
checkStatus(status, write_column->getName());
|
||||
}
|
||||
|
||||
status = builder.Finish(&arrow_array);
|
||||
checkStatus(status, write_column->getName());
|
||||
}
|
||||
|
||||
void fillArrowArrayWithDateColumnData(
|
||||
ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
|
||||
{
|
||||
const PaddedPODArray<UInt16> & internal_data = static_cast<const ColumnVector<UInt16> &>(*write_column).getData();
|
||||
//arrow::Date32Builder date_builder;
|
||||
arrow::UInt16Builder builder;
|
||||
arrow::Status status;
|
||||
|
||||
for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i)
|
||||
{
|
||||
if (null_bytemap && (*null_bytemap)[value_i])
|
||||
status = builder.AppendNull();
|
||||
else
|
||||
/// Implicitly converts UInt16 to Int32
|
||||
status = builder.Append(internal_data[value_i]);
|
||||
checkStatus(status, write_column->getName());
|
||||
}
|
||||
|
||||
status = builder.Finish(&arrow_array);
|
||||
checkStatus(status, write_column->getName());
|
||||
}
|
||||
|
||||
void fillArrowArrayWithDateTimeColumnData(
|
||||
ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
|
||||
{
|
||||
auto & internal_data = static_cast<const ColumnVector<UInt32> &>(*write_column).getData();
|
||||
//arrow::Date64Builder builder;
|
||||
arrow::UInt32Builder builder;
|
||||
arrow::Status status;
|
||||
|
||||
for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i)
|
||||
{
|
||||
if (null_bytemap && (*null_bytemap)[value_i])
|
||||
status = builder.AppendNull();
|
||||
else
|
||||
/// Implicitly converts UInt16 to Int32
|
||||
//status = date_builder.Append(static_cast<int64_t>(internal_data[value_i]) * 1000); // now ms. TODO check other units
|
||||
status = builder.Append(internal_data[value_i]);
|
||||
|
||||
checkStatus(status, write_column->getName());
|
||||
}
|
||||
|
||||
status = builder.Finish(&arrow_array);
|
||||
checkStatus(status, write_column->getName());
|
||||
}
|
||||
|
||||
template <typename DataType>
|
||||
void fillArrowArrayWithDecimalColumnData(
|
||||
ColumnPtr write_column,
|
||||
std::shared_ptr<arrow::Array> & arrow_array,
|
||||
const PaddedPODArray<UInt8> * null_bytemap,
|
||||
const DataType * decimal_type)
|
||||
{
|
||||
const auto & column = static_cast<const typename DataType::ColumnType &>(*write_column);
|
||||
arrow::DecimalBuilder builder(arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()));
|
||||
arrow::Status status;
|
||||
|
||||
for (size_t value_i = 0, size = column.size(); value_i < size; ++value_i)
|
||||
{
|
||||
if (null_bytemap && (*null_bytemap)[value_i])
|
||||
status = builder.AppendNull();
|
||||
else
|
||||
status = builder.Append(
|
||||
arrow::Decimal128(reinterpret_cast<const uint8_t *>(&column.getElement(value_i).value))); // TODO: try copy column
|
||||
|
||||
checkStatus(status, write_column->getName());
|
||||
}
|
||||
status = builder.Finish(&arrow_array);
|
||||
checkStatus(status, write_column->getName());
|
||||
|
||||
/* TODO column copy
|
||||
const auto & internal_data = static_cast<const typename DataType::ColumnType &>(*write_column).getData();
|
||||
//ArrowBuilderType numeric_builder;
|
||||
arrow::DecimalBuilder builder(arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()));
|
||||
arrow::Status status;
|
||||
|
||||
const uint8_t * arrow_null_bytemap_raw_ptr = nullptr;
|
||||
PaddedPODArray<UInt8> arrow_null_bytemap;
|
||||
if (null_bytemap)
|
||||
{
|
||||
/// Invert values since Arrow interprets 1 as a non-null value, while CH as a null
|
||||
arrow_null_bytemap.reserve(null_bytemap->size());
|
||||
for (size_t i = 0, size = null_bytemap->size(); i < size; ++i)
|
||||
arrow_null_bytemap.emplace_back(1 ^ (*null_bytemap)[i]);
|
||||
|
||||
arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data();
|
||||
}
|
||||
|
||||
status = builder.AppendValues(reinterpret_cast<const uint8_t*>(internal_data.data()), internal_data.size(), arrow_null_bytemap_raw_ptr);
|
||||
checkStatus(status, write_column->getName());
|
||||
|
||||
status = builder.Finish(&arrow_array);
|
||||
checkStatus(status, write_column->getName());
|
||||
*/
|
||||
}
|
||||
|
||||
# define FOR_INTERNAL_NUMERIC_TYPES(M) \
|
||||
M(UInt8, arrow::UInt8Builder) \
|
||||
M(Int8, arrow::Int8Builder) \
|
||||
M(UInt16, arrow::UInt16Builder) \
|
||||
M(Int16, arrow::Int16Builder) \
|
||||
M(UInt32, arrow::UInt32Builder) \
|
||||
M(Int32, arrow::Int32Builder) \
|
||||
M(UInt64, arrow::UInt64Builder) \
|
||||
M(Int64, arrow::Int64Builder) \
|
||||
M(Float32, arrow::FloatBuilder) \
|
||||
M(Float64, arrow::DoubleBuilder)
|
||||
|
||||
const std::unordered_map<String, std::shared_ptr<arrow::DataType>> internal_type_to_arrow_type = {
|
||||
{"UInt8", arrow::uint8()},
|
||||
{"Int8", arrow::int8()},
|
||||
{"UInt16", arrow::uint16()},
|
||||
{"Int16", arrow::int16()},
|
||||
{"UInt32", arrow::uint32()},
|
||||
{"Int32", arrow::int32()},
|
||||
{"UInt64", arrow::uint64()},
|
||||
{"Int64", arrow::int64()},
|
||||
{"Float32", arrow::float32()},
|
||||
{"Float64", arrow::float64()},
|
||||
|
||||
//{"Date", arrow::date64()},
|
||||
//{"Date", arrow::date32()},
|
||||
{"Date", arrow::uint16()}, // CHECK
|
||||
//{"DateTime", arrow::date64()}, // BUG! saves as date32
|
||||
{"DateTime", arrow::uint32()},
|
||||
|
||||
// TODO: ClickHouse can actually store non-utf8 strings!
|
||||
{"String", arrow::utf8()},
|
||||
{"FixedString", arrow::utf8()},
|
||||
};
|
||||
|
||||
const PaddedPODArray<UInt8> * extractNullBytemapPtr(ColumnPtr column)
|
||||
{
|
||||
ColumnPtr null_column = static_cast<const ColumnNullable &>(*column).getNullMapColumnPtr();
|
||||
const PaddedPODArray<UInt8> & null_bytemap = static_cast<const ColumnVector<UInt8> &>(*null_column).getData();
|
||||
return &null_bytemap;
|
||||
}
|
||||
|
||||
|
||||
class OstreamOutputStream : public parquet::OutputStream
|
||||
{
|
||||
public:
|
||||
explicit OstreamOutputStream(WriteBuffer & ostr_) : ostr(ostr_) {}
|
||||
virtual ~OstreamOutputStream() {}
|
||||
virtual void Close() {}
|
||||
virtual int64_t Tell() { return total_length; }
|
||||
virtual void Write(const uint8_t * data, int64_t length)
|
||||
{
|
||||
ostr.write(reinterpret_cast<const char *>(data), length);
|
||||
total_length += length;
|
||||
}
|
||||
|
||||
private:
|
||||
WriteBuffer & ostr;
|
||||
int64_t total_length = 0;
|
||||
|
||||
PARQUET_DISALLOW_COPY_AND_ASSIGN(OstreamOutputStream);
|
||||
};
|
||||
|
||||
|
||||
void ParquetBlockOutputStream::write(const Block & block)
|
||||
{
|
||||
block.checkNumberOfRows();
|
||||
|
||||
const size_t columns_num = block.columns();
|
||||
|
||||
/// For arrow::Schema and arrow::Table creation
|
||||
std::vector<std::shared_ptr<arrow::Field>> arrow_fields;
|
||||
std::vector<std::shared_ptr<arrow::Array>> arrow_arrays;
|
||||
arrow_fields.reserve(columns_num);
|
||||
arrow_arrays.reserve(columns_num);
|
||||
|
||||
for (size_t column_i = 0; column_i < columns_num; ++column_i)
|
||||
{
|
||||
// TODO: constructed every iteration
|
||||
const ColumnWithTypeAndName & column = block.safeGetByPosition(column_i);
|
||||
|
||||
const bool is_column_nullable = column.type->isNullable();
|
||||
const auto & column_nested_type
|
||||
= is_column_nullable ? static_cast<const DataTypeNullable *>(column.type.get())->getNestedType() : column.type;
|
||||
const std::string column_nested_type_name = column_nested_type->getFamilyName();
|
||||
|
||||
if (isDecimal(column_nested_type))
|
||||
{
|
||||
const auto add_decimal_field = [&](const auto & types) -> bool {
|
||||
using Types = std::decay_t<decltype(types)>;
|
||||
using ToDataType = typename Types::LeftType;
|
||||
|
||||
if constexpr (
|
||||
std::is_same_v<
|
||||
ToDataType,
|
||||
DataTypeDecimal<
|
||||
Decimal32>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal64>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal128>>)
|
||||
{
|
||||
const auto & decimal_type = static_cast<const ToDataType *>(column_nested_type.get());
|
||||
arrow_fields.emplace_back(std::make_shared<arrow::Field>(
|
||||
column.name, arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()), is_column_nullable));
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
callOnIndexAndDataType<void>(column_nested_type->getTypeId(), add_decimal_field);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (internal_type_to_arrow_type.find(column_nested_type_name) == internal_type_to_arrow_type.end())
|
||||
{
|
||||
throw Exception{"The type \"" + column_nested_type_name + "\" of a column \"" + column.name
|
||||
+ "\""
|
||||
" is not supported for conversion into a Parquet data format",
|
||||
ErrorCodes::UNKNOWN_TYPE};
|
||||
}
|
||||
|
||||
arrow_fields.emplace_back(std::make_shared<arrow::Field>(column.name, internal_type_to_arrow_type.at(column_nested_type_name), is_column_nullable));
|
||||
}
|
||||
|
||||
std::shared_ptr<arrow::Array> arrow_array;
|
||||
|
||||
ColumnPtr nested_column
|
||||
= is_column_nullable ? static_cast<const ColumnNullable &>(*column.column).getNestedColumnPtr() : column.column;
|
||||
const PaddedPODArray<UInt8> * null_bytemap = is_column_nullable ? extractNullBytemapPtr(column.column) : nullptr;
|
||||
|
||||
if ("String" == column_nested_type_name)
|
||||
{
|
||||
fillArrowArrayWithStringColumnData<ColumnString>(nested_column, arrow_array, null_bytemap);
|
||||
}
|
||||
else if ("FixedString" == column_nested_type_name)
|
||||
{
|
||||
fillArrowArrayWithStringColumnData<ColumnFixedString>(nested_column, arrow_array, null_bytemap);
|
||||
}
|
||||
else if ("Date" == column_nested_type_name)
|
||||
{
|
||||
fillArrowArrayWithDateColumnData(nested_column, arrow_array, null_bytemap);
|
||||
}
|
||||
else if ("DateTime" == column_nested_type_name)
|
||||
{
|
||||
fillArrowArrayWithDateTimeColumnData(nested_column, arrow_array, null_bytemap);
|
||||
}
|
||||
|
||||
else if (isDecimal(column_nested_type))
|
||||
{
|
||||
auto fill_decimal = [&](const auto & types) -> bool
|
||||
{
|
||||
using Types = std::decay_t<decltype(types)>;
|
||||
using ToDataType = typename Types::LeftType;
|
||||
if constexpr (
|
||||
std::is_same_v<
|
||||
ToDataType,
|
||||
DataTypeDecimal<
|
||||
Decimal32>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal64>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal128>>)
|
||||
{
|
||||
const auto & decimal_type = static_cast<const ToDataType *>(column_nested_type.get());
|
||||
fillArrowArrayWithDecimalColumnData(nested_column, arrow_array, null_bytemap, decimal_type);
|
||||
}
|
||||
return false;
|
||||
};
|
||||
callOnIndexAndDataType<void>(column_nested_type->getTypeId(), fill_decimal);
|
||||
}
|
||||
# define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \
|
||||
else if (#CPP_NUMERIC_TYPE == column_nested_type_name) \
|
||||
{ \
|
||||
fillArrowArrayWithNumericColumnData<CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE>(nested_column, arrow_array, null_bytemap); \
|
||||
}
|
||||
|
||||
FOR_INTERNAL_NUMERIC_TYPES(DISPATCH)
|
||||
# undef DISPATCH
|
||||
else
|
||||
{
|
||||
throw Exception{"Internal type \"" + column_nested_type_name + "\" of a column \"" + column.name
|
||||
+ "\""
|
||||
" is not supported for conversion into a Parquet data format",
|
||||
ErrorCodes::UNKNOWN_TYPE};
|
||||
}
|
||||
|
||||
|
||||
arrow_arrays.emplace_back(std::move(arrow_array));
|
||||
}
|
||||
|
||||
std::shared_ptr<arrow::Schema> arrow_schema = std::make_shared<arrow::Schema>(std::move(arrow_fields));
|
||||
|
||||
std::shared_ptr<arrow::Table> arrow_table = arrow::Table::Make(arrow_schema, arrow_arrays);
|
||||
|
||||
auto sink = std::make_shared<OstreamOutputStream>(ostr);
|
||||
|
||||
if (!file_writer)
|
||||
{
|
||||
|
||||
parquet::WriterProperties::Builder builder;
|
||||
#if USE_SNAPPY
|
||||
builder.compression(parquet::Compression::SNAPPY);
|
||||
#endif
|
||||
auto props = builder.build();
|
||||
auto status = parquet::arrow::FileWriter::Open(
|
||||
*arrow_table->schema(),
|
||||
arrow::default_memory_pool(),
|
||||
sink,
|
||||
props, /*parquet::default_writer_properties(),*/
|
||||
parquet::arrow::default_arrow_writer_properties(),
|
||||
&file_writer);
|
||||
if (!status.ok())
|
||||
throw Exception{"Error while opening a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
|
||||
}
|
||||
|
||||
// TODO: calculate row_group_size depending on a number of rows and table size
|
||||
auto status = file_writer->WriteTable(*arrow_table, format_settings.parquet.row_group_size);
|
||||
|
||||
if (!status.ok())
|
||||
throw Exception{"Error while writing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
|
||||
}
|
||||
|
||||
void ParquetBlockOutputStream::writeSuffix()
|
||||
{
|
||||
if (file_writer)
|
||||
{
|
||||
auto status = file_writer->Close();
|
||||
if (!status.ok())
|
||||
throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void registerOutputFormatParquet(FormatFactory & factory)
|
||||
{
|
||||
factory.registerOutputFormat(
|
||||
"Parquet", [](WriteBuffer & buf, const Block & sample, const Context & /*context*/, const FormatSettings & format_settings)
|
||||
{
|
||||
BlockOutputStreamPtr impl = std::make_shared<ParquetBlockOutputStream>(buf, sample, format_settings);
|
||||
auto res = std::make_shared<SquashingBlockOutputStream>(impl, impl->getHeader(), format_settings.parquet.row_group_size, 0);
|
||||
res->disableFlush();
|
||||
return res;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
#else
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class FormatFactory;
|
||||
void registerOutputFormatParquet(FormatFactory &)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#endif
|
@ -1,94 +0,0 @@
|
||||
#include "ProtobufRowInputStream.h"
|
||||
|
||||
#if USE_PROTOBUF
|
||||
#include <Core/Block.h>
|
||||
#include <Formats/BlockInputStreamFromRowInputStream.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Formats/FormatSchemaInfo.h>
|
||||
#include <Formats/ProtobufSchemas.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ProtobufRowInputStream::ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & format_schema)
|
||||
: data_types(header.getDataTypes()), reader(in_, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames())
|
||||
{
|
||||
}
|
||||
|
||||
ProtobufRowInputStream::~ProtobufRowInputStream() = default;
|
||||
|
||||
bool ProtobufRowInputStream::read(MutableColumns & columns, RowReadExtension & extra)
|
||||
{
|
||||
if (!reader.startMessage())
|
||||
return false; // EOF reached, no more messages.
|
||||
|
||||
// Set of columns for which the values were read. The rest will be filled with default values.
|
||||
auto & read_columns = extra.read_columns;
|
||||
read_columns.assign(columns.size(), false);
|
||||
|
||||
// Read values from this message and put them to the columns while it's possible.
|
||||
size_t column_index;
|
||||
while (reader.readColumnIndex(column_index))
|
||||
{
|
||||
bool allow_add_row = !static_cast<bool>(read_columns[column_index]);
|
||||
do
|
||||
{
|
||||
bool row_added;
|
||||
data_types[column_index]->deserializeProtobuf(*columns[column_index], reader, allow_add_row, row_added);
|
||||
if (row_added)
|
||||
{
|
||||
read_columns[column_index] = true;
|
||||
allow_add_row = false;
|
||||
}
|
||||
} while (reader.canReadMoreValues());
|
||||
}
|
||||
|
||||
// Fill non-visited columns with the default values.
|
||||
for (column_index = 0; column_index < read_columns.size(); ++column_index)
|
||||
if (!read_columns[column_index])
|
||||
data_types[column_index]->insertDefaultInto(*columns[column_index]);
|
||||
|
||||
reader.endMessage();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ProtobufRowInputStream::allowSyncAfterError() const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
void ProtobufRowInputStream::syncAfterError()
|
||||
{
|
||||
reader.endMessage(true);
|
||||
}
|
||||
|
||||
|
||||
void registerInputFormatProtobuf(FormatFactory & factory)
|
||||
{
|
||||
factory.registerInputFormat("Protobuf", [](
|
||||
ReadBuffer & buf,
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 max_block_size,
|
||||
UInt64 rows_portion_size,
|
||||
FormatFactory::ReadCallback callback,
|
||||
const FormatSettings & settings)
|
||||
{
|
||||
return std::make_shared<BlockInputStreamFromRowInputStream>(
|
||||
std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "Protobuf")),
|
||||
sample, max_block_size, rows_portion_size, callback, settings);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class FormatFactory;
|
||||
void registerInputFormatProtobuf(FormatFactory &) {}
|
||||
}
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user