#include #include #include #include namespace DB { XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_) : IRowOutputFormat(header_, out_, params_), format_settings(format_settings_) { const auto & sample = getPort(PortKind::Main).getHeader(); NamesAndTypesList columns(sample.getNamesAndTypesList()); fields.assign(columns.begin(), columns.end()); field_tag_names.resize(sample.columns()); bool need_validate_utf8 = false; for (size_t i = 0; i < sample.columns(); ++i) { if (!sample.getByPosition(i).type->textCanContainOnlyValidUTF8()) need_validate_utf8 = true; /// As element names, we will use the column name if it has a valid form, or "field", otherwise. /// The condition below is more strict than the XML standard requires. bool is_column_name_suitable = true; const char * begin = fields[i].name.data(); const char * end = begin + fields[i].name.size(); for (const char * pos = begin; pos != end; ++pos) { char c = *pos; if (!(isAlphaASCII(c) || (pos != begin && isNumericASCII(c)) || c == '_' || c == '-' || c == '.')) { is_column_name_suitable = false; break; } } field_tag_names[i] = is_column_name_suitable ? fields[i].name : "field"; } if (need_validate_utf8) { validating_ostr = std::make_unique(out); ostr = validating_ostr.get(); } else ostr = &out; } void XMLRowOutputFormat::writePrefix() { writeCString("\n", *ostr); writeCString("\n", *ostr); writeCString("\t\n", *ostr); writeCString("\t\t\n", *ostr); for (const auto & field : fields) { writeCString("\t\t\t\n", *ostr); writeCString("\t\t\t\t", *ostr); writeXMLString(field.name, *ostr); writeCString("\n", *ostr); writeCString("\t\t\t\t", *ostr); writeXMLString(field.type->getName(), *ostr); writeCString("\n", *ostr); writeCString("\t\t\t\n", *ostr); } writeCString("\t\t\n", *ostr); writeCString("\t\n", *ostr); writeCString("\t\n", *ostr); } void XMLRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num) { writeCString("\t\t\t<", *ostr); writeString(field_tag_names[field_number], *ostr); writeCString(">", *ostr); type.serializeAsTextXML(column, row_num, *ostr, format_settings); writeCString("\n", *ostr); ++field_number; } void XMLRowOutputFormat::writeRowStartDelimiter() { writeCString("\t\t\n", *ostr); } void XMLRowOutputFormat::writeRowEndDelimiter() { writeCString("\t\t\n", *ostr); field_number = 0; ++row_count; } void XMLRowOutputFormat::writeSuffix() { writeCString("\t\n", *ostr); } void XMLRowOutputFormat::writeBeforeTotals() { writeCString("\t\n", *ostr); } void XMLRowOutputFormat::writeTotals(const Columns & columns, size_t row_num) { size_t totals_columns = columns.size(); const auto & header = getPort(PortKind::Totals).getHeader(); for (size_t i = 0; i < totals_columns; ++i) { const ColumnWithTypeAndName & column = header.safeGetByPosition(i); writeCString("\t\t<", *ostr); writeString(field_tag_names[i], *ostr); writeCString(">", *ostr); column.type->serializeAsTextXML(*columns[i], row_num, *ostr, format_settings); writeCString("\n", *ostr); } } void XMLRowOutputFormat::writeAfterTotals() { writeCString("\t\n", *ostr); } void XMLRowOutputFormat::writeBeforeExtremes() { writeCString("\t\n", *ostr); } void XMLRowOutputFormat::writeMinExtreme(const Columns & columns, size_t row_num) { writeExtremesElement("min", columns, row_num); } void XMLRowOutputFormat::writeMaxExtreme(const Columns & columns, size_t row_num) { writeExtremesElement("max", columns, row_num); } void XMLRowOutputFormat::writeAfterExtremes() { writeCString("\t\n", *ostr); } void XMLRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num) { const auto & header = getPort(PortKind::Extremes).getHeader(); writeCString("\t\t<", *ostr); writeCString(title, *ostr); writeCString(">\n", *ostr); size_t extremes_columns = columns.size(); for (size_t i = 0; i < extremes_columns; ++i) { const ColumnWithTypeAndName & column = header.safeGetByPosition(i); writeCString("\t\t\t<", *ostr); writeString(field_tag_names[i], *ostr); writeCString(">", *ostr); column.type->serializeAsTextXML(*columns[i], row_num, *ostr, format_settings); writeCString("\n", *ostr); } writeCString("\t\t\n", *ostr); } void XMLRowOutputFormat::onProgress(const Progress & value) { progress.incrementPiecewiseAtomically(value); } void XMLRowOutputFormat::writeLastSuffix() { writeCString("\t", *ostr); writeIntText(row_count, *ostr); writeCString("\n", *ostr); writeRowsBeforeLimitAtLeast(); if (format_settings.write_statistics) writeStatistics(); writeCString("\n", *ostr); ostr->next(); } void XMLRowOutputFormat::writeRowsBeforeLimitAtLeast() { if (applied_limit) { writeCString("\t", *ostr); writeIntText(rows_before_limit, *ostr); writeCString("\n", *ostr); } } void XMLRowOutputFormat::writeStatistics() { writeCString("\t\n", *ostr); writeCString("\t\t", *ostr); writeText(watch.elapsedSeconds(), *ostr); writeCString("\n", *ostr); writeCString("\t\t", *ostr); writeText(progress.read_rows.load(), *ostr); writeCString("\n", *ostr); writeCString("\t\t", *ostr); writeText(progress.read_bytes.load(), *ostr); writeCString("\n", *ostr); writeCString("\t\n", *ostr); } void registerOutputFormatProcessorXML(FormatFactory & factory) { factory.registerOutputFormatProcessor("XML", []( WriteBuffer & buf, const Block & sample, const RowOutputFormatParams & params, const FormatSettings & settings) { return std::make_shared(buf, sample, params, settings); }); } }