Actualize TODOs

This commit is contained in:
Ivan Zhukov 2018-05-26 23:44:51 +03:00
parent 72ccc8f978
commit 882e246e4f
4 changed files with 20 additions and 23 deletions

View File

@ -171,13 +171,14 @@ Block ParquetBlockInputStream::readImpl()
}
// TODO: is it possible to read metadata only and then read columns one by one?
// TODO: seems like row groups are especially for that (kinda)
arrow::Buffer buffer(file_data);
// TODO: maybe use parquet::RandomAccessSource?
auto reader = parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
parquet::arrow::FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
std::shared_ptr<arrow::Table> table;
// TODO: also catch a ParquetException?
// TODO: also catch a ParquetException thrown by filereader?
arrow::Status read_status = filereader.ReadTable(&table);
if (!read_status.ok())
throw Exception("Error while reading parquet data: " + read_status.ToString()/*, ErrorCodes::TODO*/);
@ -205,10 +206,6 @@ Block ParquetBlockInputStream::readImpl()
// TODO: What if some columns are not present? Insert NULLs? What if a column is not nullable?
throw Exception("Column \"" + header_column.name + "\" is not presented in input data" /*, ErrorCodes::TODO*/);
// TODO: timezones?
// TODO: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
// TODO: how to interpret a JSON doc?
std::shared_ptr<arrow::Column> arrow_column = name_to_column_ptr[header_column.name];
arrow::Type::type arrow_type = arrow_column->type()->id();
@ -252,6 +249,7 @@ Block ParquetBlockInputStream::readImpl()
#undef DISPATCH
// TODO: support TIMESTAMP_MICROS and TIMESTAMP_MILLIS with truncated micro- and milliseconds?
// TODO: read JSON as a string?
// TODO: read UUID as a string?
default:
throw Exception("Unsupported parquet type " + arrow_column->type()->name()/*, ErrorCodes::TODO*/);

View File

@ -8,13 +8,10 @@
#include <DataTypes/DataTypeDate.h>
// TODO: refine includes
#include <arrow/api.h>
/* #include <DataStreams/MarkInCompressedFile.h> */
/* #include <Common/PODArray.h> */
namespace DB
{
// TODO: move the common parts of parquet and arrow to something like ArrowBlockInputStream
class ParquetBlockInputStream : public IProfilingBlockInputStream
{
public:
@ -53,7 +50,10 @@ private:
{arrow::Type::DATE32, std::make_shared<DataTypeDate>()},
{arrow::Type::STRING, std::make_shared<DataTypeString>()}//,
// TODO: add other types
// TODO: add other types that are convertible to internal ones:
// 0. ENUM?
// 1. UUID -> String
// 2. JSON -> String
};
// TODO: check that this class implements every part of its parent

View File

@ -20,6 +20,11 @@ ParquetBlockOutputStream::ParquetBlockOutputStream(WriteBuffer & ostr_, const Bl
{
}
// Flushes this output stream by calling next() on the output buffer `ostr`.
// NOTE(review): presumably next() hands the bytes buffered so far on to the destination - confirm against WriteBuffer.
void ParquetBlockOutputStream::flush()
{
ostr.next();
}
void checkAppendStatus(arrow::Status & append_status, const std::string & column_name)
{
if (!append_status.ok())
@ -101,9 +106,6 @@ void ParquetBlockOutputStream::fillArrowArrayWithDateColumnData(ColumnPtr write_
M(Float64, arrow::DoubleBuilder)
// TODO: create a better row_group_size estimation
/* static constexpr const UInt64 GiB_in_bytes = 1 << 30; */
void ParquetBlockOutputStream::write(const Block & block)
{
block.checkNumberOfRows();
@ -122,7 +124,6 @@ void ParquetBlockOutputStream::write(const Block & block)
// TODO: support NULLs
arrow_fields.emplace_back(new arrow::Field(column.name, internal_type_to_arrow_type[column.type->getName()], /*nullable = */false));
// TODO: !keep in mind that arrow and parquet types are not interchangeable!
std::shared_ptr<arrow::Array> arrow_array;
String internal_type_name = column.type->getName();
@ -143,7 +144,9 @@ void ParquetBlockOutputStream::write(const Block & block)
{
fillArrowArrayWithDateColumnData(column.column, arrow_array); \
}
// TODO: are there internal types that are convertible to parquet/arrow ones?
// TODO: there are also internal types that are convertible to parquet/arrow ones:
// 1. FixedString(N)
// 2. DateTime
else
{
throw Exception(
@ -161,8 +164,7 @@ void ParquetBlockOutputStream::write(const Block & block)
// TODO: get rid of extra copying
std::shared_ptr<parquet::InMemoryOutputStream> sink = std::make_shared<parquet::InMemoryOutputStream>();
// TODO: calculate row_group_size
/* const UInt64 row_group_size = std::min(1, GiB_in_bytes / sizeof(UInt64) / arrow_table->num_rows()); */
// TODO: calculate row_group_size depending on a number of rows and table size
arrow::Status write_status = parquet::arrow::WriteTable(
*arrow_table, arrow::default_memory_pool(), sink,
@ -176,9 +178,4 @@ void ParquetBlockOutputStream::write(const Block & block)
writeString(reinterpret_cast<const char *>(table_buffer->data()), table_buffer->size(), ostr);
}
/* void ParquetBlockOutputStream::flush() */
/* { */
/* ostr.next(); */
/* } */
};

View File

@ -14,7 +14,7 @@ public:
Block getHeader() const override { return header; }
void write(const Block & block) override;
/* void flush() override; */
void flush() override;
String getContentType() const override { return "application/octet-stream"; }
@ -43,7 +43,9 @@ private:
{"Date", arrow::date32()},
{"String", arrow::utf8()}//,
// TODO: add other types
// TODO: add other types:
// 1. FixedString
// 2. DateTime
};
};