// ClickHouse/src/Processors/Formats/Impl/XMLRowOutputFormat.cpp
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/XMLRowOutputFormat.h>
#include <Formats/FormatFactory.h>
namespace DB
{
/** Creates an XML row output format.
  *
  * Copies the names and types of the main port header into `fields`, chooses an XML
  * element name for every column, and selects the stream all output is written to:
  * if at least one column type reports that its text may contain something other than
  * valid UTF-8, the output buffer is wrapped in WriteBufferValidUTF8; otherwise the
  * output buffer is used directly.
  *
  * @param out_             destination buffer, forwarded to IRowOutputFormat
  * @param header_          header block, forwarded to IRowOutputFormat
  * @param params_          row output parameters, forwarded to IRowOutputFormat
  * @param format_settings_ stored in `format_settings` and later passed to serializeTextXML
  */
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
    : IRowOutputFormat(header_, out_, params_), format_settings(format_settings_)
{
    const auto & sample = getPort(PortKind::Main).getHeader();
    NamesAndTypesList columns(sample.getNamesAndTypesList());
    fields.assign(columns.begin(), columns.end());
    field_tag_names.resize(sample.columns());

    /// As element names, we will use the column name if it has a valid form, or "field", otherwise.
    /// A name is accepted only if it is non-empty, starts with an ASCII letter or '_',
    /// and continues with ASCII letters, digits, '_', '-' or '.'.
    /// This is more strict than the XML standard requires, but every accepted name is a
    /// well-formed XML name: the standard forbids a digit, '-' or '.' in the first position,
    /// so e.g. a column called "-1" must not be emitted as <-1>.
    auto is_suitable_tag_name = [](const auto & name)
    {
        if (name.empty())
            return false;

        for (size_t pos = 0; pos < name.size(); ++pos)
        {
            const char c = name[pos];
            const bool allowed_anywhere = isAlphaASCII(c) || c == '_';
            const bool allowed_after_first = pos != 0 && (isNumericASCII(c) || c == '-' || c == '.');
            if (!allowed_anywhere && !allowed_after_first)
                return false;
        }
        return true;
    };

    bool need_validate_utf8 = false;
    for (size_t i = 0; i < sample.columns(); ++i)
    {
        if (!sample.getByPosition(i).type->textCanContainOnlyValidUTF8())
            need_validate_utf8 = true;

        field_tag_names[i] = is_suitable_tag_name(fields[i].name)
            ? fields[i].name
            : "field";
    }

    if (need_validate_utf8)
    {
        /// The wrapper is owned by this object; `ostr` only points at it.
        validating_ostr = std::make_unique<WriteBufferValidUTF8>(out);
        ostr = validating_ostr.get();
    }
    else
        ostr = &out;
}
void XMLRowOutputFormat::writePrefix()
{
writeCString("<?xml version='1.0' encoding='UTF-8' ?>\n", *ostr);
writeCString("<result>\n", *ostr);
writeCString("\t<meta>\n", *ostr);
writeCString("\t\t<columns>\n", *ostr);
for (const auto & field : fields)
{
writeCString("\t\t\t<column>\n", *ostr);
writeCString("\t\t\t\t<name>", *ostr);
2020-12-02 09:00:27 +00:00
writeXMLStringForTextElement(field.name, *ostr);
2019-02-19 18:41:18 +00:00
writeCString("</name>\n", *ostr);
writeCString("\t\t\t\t<type>", *ostr);
2020-12-02 09:00:27 +00:00
writeXMLStringForTextElement(field.type->getName(), *ostr);
2019-02-19 18:41:18 +00:00
writeCString("</type>\n", *ostr);
writeCString("\t\t\t</column>\n", *ostr);
}
writeCString("\t\t</columns>\n", *ostr);
writeCString("\t</meta>\n", *ostr);
writeCString("\t<data>\n", *ostr);
}
2021-03-09 14:46:52 +00:00
void XMLRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
2019-02-19 18:41:18 +00:00
{
writeCString("\t\t\t<", *ostr);
writeString(field_tag_names[field_number], *ostr);
writeCString(">", *ostr);
2021-03-09 14:46:52 +00:00
serialization.serializeTextXML(column, row_num, *ostr, format_settings);
2019-02-19 18:41:18 +00:00
writeCString("</", *ostr);
writeString(field_tag_names[field_number], *ostr);
writeCString(">\n", *ostr);
++field_number;
}
/// Opens the <row> element of a data row.
void XMLRowOutputFormat::writeRowStartDelimiter()
{
    auto & dest = *ostr;
    writeCString("\t\t<row>\n", dest);
}
/// Closes the <row> element, resets the per-row field index so the next row starts
/// from the first column again, and counts the finished row.
void XMLRowOutputFormat::writeRowEndDelimiter()
{
    auto & dest = *ostr;
    writeCString("\t\t</row>\n", dest);

    field_number = 0;
    row_count += 1;
}
/// Closes the <data> element opened by writePrefix().
void XMLRowOutputFormat::writeSuffix()
{
    auto & dest = *ostr;
    writeCString("\t</data>\n", dest);
}
/// Opens the <totals> element.
void XMLRowOutputFormat::writeBeforeTotals()
{
    auto & dest = *ostr;
    writeCString("\t<totals>\n", dest);
}
void XMLRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
size_t totals_columns = columns.size();
2020-04-22 06:34:20 +00:00
const auto & header = getPort(PortKind::Totals).getHeader();
2019-02-19 18:41:18 +00:00
for (size_t i = 0; i < totals_columns; ++i)
{
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
writeCString("\t\t<", *ostr);
writeString(field_tag_names[i], *ostr);
writeCString(">", *ostr);
2021-03-09 14:46:52 +00:00
column.type->getDefaultSerialization()->serializeTextXML(*columns[i], row_num, *ostr, format_settings);
2019-02-19 18:41:18 +00:00
writeCString("</", *ostr);
writeString(field_tag_names[i], *ostr);
writeCString(">\n", *ostr);
}
}
/// Closes the <totals> element.
void XMLRowOutputFormat::writeAfterTotals()
{
    auto & dest = *ostr;
    writeCString("\t</totals>\n", dest);
}
/// Opens the <extremes> element.
void XMLRowOutputFormat::writeBeforeExtremes()
{
    auto & dest = *ostr;
    writeCString("\t<extremes>\n", dest);
}
/// Writes row `row_num` of the extremes columns inside a <min> element.
void XMLRowOutputFormat::writeMinExtreme(const Columns & columns, size_t row_num)
{
    const char * const element_title = "min";
    writeExtremesElement(element_title, columns, row_num);
}
/// Writes row `row_num` of the extremes columns inside a <max> element.
void XMLRowOutputFormat::writeMaxExtreme(const Columns & columns, size_t row_num)
{
    const char * const element_title = "max";
    writeExtremesElement(element_title, columns, row_num);
}
/// Closes the <extremes> element.
void XMLRowOutputFormat::writeAfterExtremes()
{
    auto & dest = *ostr;
    writeCString("\t</extremes>\n", dest);
}
void XMLRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num)
{
2020-04-22 06:34:20 +00:00
const auto & header = getPort(PortKind::Extremes).getHeader();
2019-02-19 18:41:18 +00:00
writeCString("\t\t<", *ostr);
writeCString(title, *ostr);
writeCString(">\n", *ostr);
size_t extremes_columns = columns.size();
for (size_t i = 0; i < extremes_columns; ++i)
{
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
writeCString("\t\t\t<", *ostr);
writeString(field_tag_names[i], *ostr);
writeCString(">", *ostr);
2021-03-09 14:46:52 +00:00
column.type->getDefaultSerialization()->serializeTextXML(*columns[i], row_num, *ostr, format_settings);
2019-02-19 18:41:18 +00:00
writeCString("</", *ostr);
writeString(field_tag_names[i], *ostr);
writeCString(">\n", *ostr);
}
writeCString("\t\t</", *ostr);
writeCString(title, *ostr);
writeCString(">\n", *ostr);
}
/// Adds the reported progress increment to the accumulated `progress`,
/// whose counters are later printed by writeStatistics().
void XMLRowOutputFormat::onProgress(const Progress & value)
{
    progress.incrementPiecewiseAtomically(value);
}
void XMLRowOutputFormat::writeLastSuffix()
{
writeCString("\t<rows>", *ostr);
writeIntText(row_count, *ostr);
writeCString("</rows>\n", *ostr);
writeRowsBeforeLimitAtLeast();
if (format_settings.write_statistics)
writeStatistics();
writeCString("</result>\n", *ostr);
ostr->next();
}
/// Writes the <rows_before_limit_at_least> element with the value of `rows_before_limit`.
/// Nothing is written unless `applied_limit` is set.
void XMLRowOutputFormat::writeRowsBeforeLimitAtLeast()
{
    if (!applied_limit)
        return;

    auto & dest = *ostr;
    writeCString("\t<rows_before_limit_at_least>", dest);
    writeIntText(rows_before_limit, dest);
    writeCString("</rows_before_limit_at_least>\n", dest);
}
void XMLRowOutputFormat::writeStatistics()
{
writeCString("\t<statistics>\n", *ostr);
writeCString("\t\t<elapsed>", *ostr);
writeText(watch.elapsedSeconds(), *ostr);
writeCString("</elapsed>\n", *ostr);
writeCString("\t\t<rows_read>", *ostr);
2019-06-25 17:19:32 +00:00
writeText(progress.read_rows.load(), *ostr);
2019-02-19 18:41:18 +00:00
writeCString("</rows_read>\n", *ostr);
writeCString("\t\t<bytes_read>", *ostr);
2019-06-25 17:19:32 +00:00
writeText(progress.read_bytes.load(), *ostr);
2019-02-19 18:41:18 +00:00
writeCString("</bytes_read>\n", *ostr);
writeCString("\t</statistics>\n", *ostr);
}
/// Registers the "XML" output format in `factory`.
/// The registered creator builds an XMLRowOutputFormat from the buffer, header block,
/// row output parameters and format settings it is given.
void registerOutputFormatProcessorXML(FormatFactory & factory)
{
    auto create_xml_format = [](
        WriteBuffer & buf,
        const Block & sample,
        const RowOutputFormatParams & params,
        const FormatSettings & settings)
    {
        return std::make_shared<XMLRowOutputFormat>(buf, sample, params, settings);
    };

    factory.registerOutputFormatProcessor("XML", create_xml_format);
}
}