#include #include #include namespace DB { XMLRowOutputStream::XMLRowOutputStream(WriteBuffer & ostr_, const Block & sample_) : dst_ostr(ostr_) { NamesAndTypesList columns(sample_.getColumnsList()); fields.assign(columns.begin(), columns.end()); field_tag_names.resize(sample_.columns()); bool have_non_numeric_columns = false; for (size_t i = 0; i < sample_.columns(); ++i) { if (!sample_.unsafeGetByPosition(i).type->isNumeric()) have_non_numeric_columns = true; /// В качестве имён элементов будем использовать имя столбца, если оно имеет допустимый вид, или "field", иначе. /// Условие, приведённое ниже, более строгое, чем того требует стандарт XML. bool is_column_name_suitable = true; const char * begin = fields[i].name.data(); const char * end = begin + fields[i].name.size(); for (const char * pos = begin; pos != end; ++pos) { char c = *pos; if (!( isAlphaASCII(c) || (pos != begin && isNumericASCII(c)) || c == '_' || c == '-' || c == '.')) { is_column_name_suitable = false; break; } } field_tag_names[i] = is_column_name_suitable ? fields[i].name : "field"; } if (have_non_numeric_columns) { validating_ostr.reset(new WriteBufferValidUTF8(dst_ostr)); ostr = validating_ostr.get(); } else ostr = &dst_ostr; } void XMLRowOutputStream::writePrefix() { writeCString("\n", *ostr); writeCString("\n", *ostr); writeCString("\t\n", *ostr); writeCString("\t\t\n", *ostr); for (size_t i = 0; i < fields.size(); ++i) { writeCString("\t\t\t\n", *ostr); writeCString("\t\t\t\t", *ostr); writeXMLString(fields[i].name, *ostr); writeCString("\n", *ostr); writeCString("\t\t\t\t", *ostr); writeXMLString(fields[i].type->getName(), *ostr); writeCString("\n", *ostr); writeCString("\t\t\t\n", *ostr); } writeCString("\t\t\n", *ostr); writeCString("\t\n", *ostr); writeCString("\t\n", *ostr); } void XMLRowOutputStream::writeField(const IColumn & column, const IDataType & type, size_t row_num) { writeCString("\t\t\t<", *ostr); writeString(field_tag_names[field_number], *ostr); writeCString(">", *ostr); type.serializeTextXML(column, row_num, *ostr); writeCString("\n", *ostr); ++field_number; } void XMLRowOutputStream::writeRowStartDelimiter() { writeCString("\t\t\n", *ostr); } void XMLRowOutputStream::writeRowEndDelimiter() { writeCString("\t\t\n", *ostr); field_number = 0; ++row_count; } void XMLRowOutputStream::writeSuffix() { writeCString("\t\n", *ostr); writeTotals(); writeExtremes(); writeCString("\t", *ostr); writeIntText(row_count, *ostr); writeCString("\n", *ostr); writeRowsBeforeLimitAtLeast(); writeStatistics(); writeCString("\n", *ostr); ostr->next(); } void XMLRowOutputStream::writeRowsBeforeLimitAtLeast() { if (applied_limit) { writeCString("\t", *ostr); writeIntText(rows_before_limit, *ostr); writeCString("\n", *ostr); } } void XMLRowOutputStream::writeTotals() { if (totals) { writeCString("\t\n", *ostr); size_t totals_columns = totals.columns(); for (size_t i = 0; i < totals_columns; ++i) { const ColumnWithTypeAndName & column = totals.getByPosition(i); writeCString("\t\t<", *ostr); writeString(field_tag_names[i], *ostr); writeCString(">", *ostr); column.type->serializeTextXML(*column.column.get(), 0, *ostr); writeCString("\n", *ostr); } writeCString("\t\n", *ostr); } } static void writeExtremesElement(const char * title, const Block & extremes, size_t row_num, const Names & field_tag_names, WriteBuffer & ostr) { writeCString("\t\t<", ostr); writeCString(title, ostr); writeCString(">\n", ostr); size_t extremes_columns = extremes.columns(); for (size_t i = 0; i < extremes_columns; ++i) { const ColumnWithTypeAndName & column = extremes.getByPosition(i); writeCString("\t\t\t<", ostr); writeString(field_tag_names[i], ostr); writeCString(">", ostr); column.type->serializeTextXML(*column.column.get(), row_num, ostr); writeCString("\n", ostr); } writeCString("\t\t\n", ostr); } void XMLRowOutputStream::writeExtremes() { if (extremes) { writeCString("\t\n", *ostr); writeExtremesElement("min", extremes, 0, field_tag_names, *ostr); writeExtremesElement("max", extremes, 1, field_tag_names, *ostr); writeCString("\t\n", *ostr); } } void XMLRowOutputStream::onProgress(const Progress & value) { progress.incrementPiecewiseAtomically(value); } void XMLRowOutputStream::writeStatistics() { writeCString("\t\n", *ostr); writeCString("\t\t", *ostr); writeText(watch.elapsedSeconds(), *ostr); writeCString("\n", *ostr); writeCString("\t\t", *ostr); writeText(progress.rows.load(), *ostr); writeCString("\n", *ostr); writeCString("\t\t", *ostr); writeText(progress.bytes.load(), *ostr); writeCString("\n", *ostr); writeCString("\t", *ostr); } }