Parquet output: refactor functions-fillers

This commit is contained in:
Ivan Zhukov 2018-05-25 01:22:15 +03:00
parent 0111969eaa
commit 72ccc8f978

View File

@ -20,6 +20,28 @@ ParquetBlockOutputStream::ParquetBlockOutputStream(WriteBuffer & ostr_, const Bl
{
}
void checkAppendStatus(arrow::Status & append_status, const std::string & column_name)
{
if (!append_status.ok())
{
throw Exception(
"Error while building a parquet column \"" + column_name + "\": " + append_status.ToString()/*,
ErrorCodes::TODO*/
);
}
}
void checkFinishStatus(arrow::Status & finish_status, const std::string & column_name)
{
if (!finish_status.ok())
{
throw Exception(
"Error while writing a parquet column \"" + column_name + "\": " + finish_status.ToString()/*,
ErrorCodes::TODO*/
);
}
}
template <typename NumericType, typename ArrowBuilderType>
void ParquetBlockOutputStream::fillArrowArrayWithNumericColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array)
{
@ -27,22 +49,10 @@ void ParquetBlockOutputStream::fillArrowArrayWithNumericColumnData(ColumnPtr wri
ArrowBuilderType numeric_builder;
arrow::Status append_status = numeric_builder.AppendValues(internal_data.data(), internal_data.size());
if (!append_status.ok())
{
throw Exception(
"Error while building a parquet column \"" + write_column->getName() + "\": " + append_status.ToString()/*,
ErrorCodes::TODO*/
);
}
checkAppendStatus(append_status, write_column->getName());
arrow::Status finish_status = numeric_builder.Finish(&arrow_array);
if (!finish_status.ok())
{
throw Exception(
"Error while writing a parquet column \"" + write_column->getName() + "\": " + finish_status.ToString()/*,
ErrorCodes::TODO*/
);
}
checkFinishStatus(finish_status, write_column->getName());
}
void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array)
@ -52,26 +62,14 @@ void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData(ColumnPtr writ
for (size_t string_i = 0; string_i != internal_column.size(); ++string_i)
{
StringRef && string_ref = internal_column.getDataAt(string_i);
StringRef string_ref = internal_column.getDataAt(string_i);
arrow::Status append_status = string_builder.Append(string_ref.data, string_ref.size);
if (!append_status.ok())
{
throw Exception(
"Error while building a parquet column \"" + write_column->getName() + "\": " + append_status.ToString()/*,
ErrorCodes::TODO*/
);
}
checkAppendStatus(append_status, write_column->getName());
}
arrow::Status finish_status = string_builder.Finish(&arrow_array);
if (!finish_status.ok())
{
throw Exception(
"Error while writing a parquet column \"" + write_column->getName() + "\": " + finish_status.ToString()/*,
ErrorCodes::TODO*/
);
}
checkFinishStatus(finish_status, write_column->getName());
}
void ParquetBlockOutputStream::fillArrowArrayWithDateColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array)
@ -83,23 +81,11 @@ void ParquetBlockOutputStream::fillArrowArrayWithDateColumnData(ColumnPtr write_
{
/// Implicitly converts UInt16 to Int32
arrow::Status append_status = date32_builder.Append(internal_data[value_i]);
if (!append_status.ok())
{
throw Exception(
"Error while building a parquet column \"" + write_column->getName() + "\": " + append_status.ToString()/*,
ErrorCodes::TODO*/
);
}
checkAppendStatus(append_status, write_column->getName());
}
arrow::Status finish_status = date32_builder.Finish(&arrow_array);
if (!finish_status.ok())
{
throw Exception(
"Error while writing a parquet column \"" + write_column->getName() + "\": " + finish_status.ToString()/*,
ErrorCodes::TODO*/
);
}
checkFinishStatus(finish_status, write_column->getName());
}
#define FOR_INTERNAL_NUMERIC_TYPES(M) \