dbms: fixed error (progress message in the middle of data, in JSON formats) [#METR-11125].

This commit is contained in:
Alexey Milovidov 2014-08-15 00:27:41 +04:00
parent fd9668be12
commit 65cf115313
15 changed files with 34 additions and 10 deletions

View File

@ -33,6 +33,8 @@ public:
output->write(res);
}
void flush() { output->flush(); }
private:
BlockOutputStreamPtr output;
NamesAndTypesListPtr required_columns;

View File

@ -20,6 +20,8 @@ public:
void writeField(const Field & field);
void writeRowEndDelimiter();
void flush() { ostr.next(); }
protected:
WriteBuffer & ostr;
const Block sample;

View File

@ -17,6 +17,8 @@ public:
void write(const Block & block);
void writePrefix() { row_output->writePrefix(); }
void writeSuffix() { row_output->writeSuffix(); }
void flush() { row_output->flush(); }
void setRowsBeforeLimit(size_t rows_before_limit);
void setTotals(const Block & totals);

View File

@ -31,6 +31,10 @@ public:
*/
virtual void writePrefix() {}
virtual void writeSuffix() {}
/** Сбросить имеющиеся буферы для записи.
*/
virtual void flush() {}
/** Методы для установки дополнительной информации для вывода в поддерживающих её форматах.
*/

View File

@ -32,6 +32,9 @@ public:
virtual void writePrefix() {}; /// разделитель перед началом результата
virtual void writeSuffix() {}; /// разделитель после конца результата
/** Сбросить имеющиеся буферы для записи. */
virtual void flush() {}
/** Методы для установки дополнительной информации для вывода в поддерживающих её форматах.
*/
virtual void setRowsBeforeLimit(size_t rows_before_limit) {}

View File

@ -25,6 +25,8 @@ public:
void writeRowEndDelimiter();
void writePrefix();
void writeSuffix();
void flush() { ostr.next(); dst_ostr.next(); }
void setRowsBeforeLimit(size_t rows_before_limit_)
{
@ -41,7 +43,8 @@ protected:
virtual void writeTotals();
virtual void writeExtremes();
WriteBufferValidUTF8 ostr;
WriteBuffer & dst_ostr;
WriteBufferValidUTF8 ostr; /// Валидирует и пишет в dst_ostr.
size_t field_number;
size_t row_count;
bool applied_limit;

View File

@ -15,6 +15,8 @@ public:
NativeBlockOutputStream(WriteBuffer & ostr_) : ostr(ostr_) {}
void write(const Block & block);
void flush() { ostr.next(); }
private:
WriteBuffer & ostr;
};

View File

@ -18,6 +18,8 @@ public:
void write(const Block & block);
void writeSuffix();
void flush() { ostr.next(); }
void setTotals(const Block & totals_) { totals = totals_; }
void setExtremes(const Block & extremes_) { extremes = extremes_; }

View File

@ -18,8 +18,8 @@ namespace DB
class PushingToViewsBlockOutputStream : public IBlockOutputStream
{
public:
PushingToViewsBlockOutputStream(String database_, String table_, const Context &context_, ASTPtr query_ptr_)
:database(database_), table(table_), context(context_), query_ptr(query_ptr_)
PushingToViewsBlockOutputStream(String database_, String table_, const Context & context_, ASTPtr query_ptr_)
: database(database_), table(table_), context(context_), query_ptr(query_ptr_)
{
if (database.empty())
database = context.getCurrentDatabase();

View File

@ -16,6 +16,8 @@ public:
TabSeparatedBlockOutputStream(WriteBuffer & ostr_) : ostr(ostr_) {}
void write(const Block & block);
void flush() { ostr.next(); }
private:
WriteBuffer & ostr;
};

View File

@ -26,6 +26,8 @@ public:
void writePrefix();
void writeSuffix();
void flush() { ostr.next(); }
void setTotals(const Block & totals_) { totals = totals_; }
void setExtremes(const Block & extremes_) { extremes = extremes_; }

View File

@ -26,6 +26,8 @@ public:
void writeRowEndDelimiter();
void writeRowBetweenDelimiter();
void flush() { ostr.next(); }
private:
WriteBuffer & ostr;
const Block sample;

View File

@ -25,6 +25,8 @@ public:
void writeRowStartDelimiter();
void writeRowBetweenDelimiter();
void flush() { ostr.next(); }
private:
WriteBuffer & ostr;
const Block sample;

View File

@ -788,12 +788,8 @@ private:
written_first_block = true;
}
/** Это обычно приводит к тому, что полученный блок данных выводится клиенту.
* Но не всегда. Например, JSONRowOutputStream пишет данные сначала в WriteBufferValidUTF8,
* которые ещё немного буферизует данные перед записью в std_out.
* Поэтому, вызов std_out.next() может записать не все данные.
*/
std_out.next();
/// Полученный блок данных сразу выводится клиенту.
block_std_out->flush();
}

View File

@ -10,7 +10,7 @@ using Poco::SharedPtr;
JSONRowOutputStream::JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_)
: ostr(ostr_), field_number(0), row_count(0), applied_limit(false), rows_before_limit(0)
: dst_ostr(ostr_), ostr(dst_ostr), field_number(0), row_count(0), applied_limit(false), rows_before_limit(0)
{
NamesAndTypesList columns(sample_.getColumnsList());
fields.assign(columns.begin(), columns.end());