works with HTTP requests [#CONV-7549]

This commit is contained in:
Vyacheslav Alipov 2013-05-20 12:21:51 +00:00
parent a67e8a8d35
commit 70a13e3812
15 changed files with 63 additions and 21 deletions

View File

@ -19,7 +19,7 @@ public:
Block & sample, size_t max_block_size, const DataTypeFactory & data_type_factory) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
Block & sample) const;
Block & sample, const BlockInputStreamPtr & input_stream = NULL) const;
};
}

View File

@ -26,15 +26,19 @@ struct BlockStreamProfileInfo
size_t blocks;
size_t bytes;
bool applied_limit; /// Применялся ли LIMIT
size_t rows_before_limit; /// Число строк до выполнения LIMIT
/// Информация о вложенных потоках - для выделения чистого времени работы.
typedef std::vector<const BlockStreamProfileInfo *> BlockStreamProfileInfos;
BlockStreamProfileInfos nested_infos;
String column_names;
BlockStreamProfileInfo() : started(false), rows(0), blocks(0), bytes(0) {}
BlockStreamProfileInfo() : started(false), rows(0), blocks(0), bytes(0), applied_limit(false), rows_before_limit(0) {}
void update(Block & block);
void updateRowsBeforeLimit(Block & block);
void print(std::ostream & ostr) const;
};

View File

@ -11,12 +11,12 @@
namespace DB
{
/** Поток для вывода данных в формате JSON.
/** Поток для вывода данных в формате JSONCompact.
*/
class JSONCompactRowOutputStream : public JSONRowOutputStream
{
public:
JSONCompactRowOutputStream(WriteBuffer & ostr_, const Block & sample_);
JSONCompactRowOutputStream(WriteBuffer & ostr_, const Block & sample_, const BlockInputStreamPtr & input_stream_ = NULL);
void writeField(const Field & field);
void writeFieldDelimiter();

View File

@ -6,6 +6,7 @@
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/WriteBufferValidUTF8.h>
#include <DB/DataStreams/IRowOutputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -16,7 +17,7 @@ namespace DB
class JSONRowOutputStream : public IRowOutputStream
{
public:
JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_);
JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_, const BlockInputStreamPtr & input_stream_ = NULL);
void writeField(const Field & field);
void writeFieldDelimiter();
@ -26,11 +27,15 @@ public:
void writeSuffix();
protected:
void writeRowsBeforeLimitAtLeast();
typedef std::vector<NameAndTypePair> NamesAndTypesVector;
WriteBufferValidUTF8 ostr;
size_t field_number;
size_t row_count;
BlockInputStreamPtr input_stream;
NamesAndTypesVector fields;
};

View File

@ -40,7 +40,7 @@ public:
String format_name = format_ast ? dynamic_cast<ASTIdentifier &>(*format_ast).name : "TabSeparated";
BlockInputStreamPtr in = executeImpl();
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample);
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample, in);
copyData(*in, *out);

View File

@ -38,7 +38,7 @@ public:
String format_name = format_ast ? dynamic_cast<ASTIdentifier &>(*format_ast).name : "TabSeparated";
BlockInputStreamPtr in = executeImpl();
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample);
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample, in);
copyData(*in, *out);

View File

@ -40,7 +40,7 @@ public:
String format_name = format_ast ? dynamic_cast<ASTIdentifier &>(*format_ast).name : "TabSeparated";
BlockInputStreamPtr in = executeImpl();
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample);
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample, in);
copyData(*in, *out);

View File

@ -695,7 +695,7 @@ private:
if (ASTIdentifier * id = dynamic_cast<ASTIdentifier *>(&*query_with_output->format))
current_format = id->name;
block_std_out = context.getFormatFactory().getOutput(current_format, std_out, block);
block_std_out = context.getFormatFactory().getOutput(current_format, std_out, block, block_std_in);
block_std_out->writePrefix();
}

View File

@ -46,7 +46,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer & buf,
Block & sample) const
Block & sample, const BlockInputStreamPtr & input_stream) const
{
if (name == "Native")
return new NativeBlockOutputStream(buf);
@ -79,9 +79,9 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer &
else if (name == "Values")
return new BlockOutputStreamFromRowOutputStream(new ValuesRowOutputStream(buf, sample));
else if (name == "JSON")
return new BlockOutputStreamFromRowOutputStream(new JSONRowOutputStream(buf, sample));
return new BlockOutputStreamFromRowOutputStream(new JSONRowOutputStream(buf, sample, input_stream));
else if (name == "JSONCompact")
return new BlockOutputStreamFromRowOutputStream(new JSONCompactRowOutputStream(buf, sample));
return new BlockOutputStreamFromRowOutputStream(new JSONCompactRowOutputStream(buf, sample, input_stream));
else if (name == "Null")
return new NullBlockOutputStream;
else

View File

@ -21,6 +21,13 @@ void BlockStreamProfileInfo::update(Block & block)
}
void BlockStreamProfileInfo::updateRowsBeforeLimit(Block & block)
{
applied_limit = true;
rows_before_limit += block.rows();
}
void BlockStreamProfileInfo::print(std::ostream & ostr) const
{
UInt64 elapsed = work_stopwatch.elapsed();

View File

@ -9,8 +9,8 @@ namespace DB
using Poco::SharedPtr;
JSONCompactRowOutputStream::JSONCompactRowOutputStream(WriteBuffer & ostr_, const Block & sample_)
: JSONRowOutputStream(ostr_, sample_)
JSONCompactRowOutputStream::JSONCompactRowOutputStream(WriteBuffer & ostr_, const Block & sample_, const BlockInputStreamPtr & input_stream_)
: JSONRowOutputStream(ostr_, sample_, input_stream_)
{
}

View File

@ -9,8 +9,8 @@ namespace DB
using Poco::SharedPtr;
JSONRowOutputStream::JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_)
: ostr(ostr_), field_number(0), row_count(0)
JSONRowOutputStream::JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_, const BlockInputStreamPtr & input_stream_)
: ostr(ostr_), field_number(0), row_count(0), input_stream(input_stream_)
{
NamesAndTypesList columns(sample_.getColumnsList());
fields.assign(columns.begin(), columns.end());
@ -87,9 +87,30 @@ void JSONRowOutputStream::writeSuffix()
writeChar('\n', ostr);
writeCString("\t\"rows\": ", ostr);
writeIntText(row_count, ostr);
writeRowsBeforeLimitAtLeast();
writeChar('\n', ostr);
writeCString("}\n", ostr);
ostr.next();
}
void JSONRowOutputStream::writeRowsBeforeLimitAtLeast()
{
if (input_stream.isNull())
return;
if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*input_stream))
{
const BlockStreamProfileInfo & info = input->getInfo();
if (info.applied_limit)
{
writeCString(",\n", ostr);
writeChar('\n', ostr);
writeCString("\t\"rows_before_limit_at_least\": ", ostr);
writeIntText(info.rows_before_limit, ostr);
}
}
}
}

View File

@ -20,6 +20,9 @@ Block LimitBlockInputStream::readImpl()
Block res;
size_t rows = 0;
/// укажем, что LIMIT применялся
info.updateRowsBeforeLimit(res);
/// pos - сколько строк было прочитано, включая последний прочитанный блок
if (pos >= offset + limit)
@ -30,6 +33,7 @@ Block LimitBlockInputStream::readImpl()
res = children.back()->read();
if (!res)
return res;
info.updateRowsBeforeLimit(res);
rows = res.rows();
pos += rows;
} while (pos <= offset);

View File

@ -572,7 +572,7 @@ BlockInputStreamPtr InterpreterSelectQuery::executeAndFormat(WriteBuffer & buf)
String format_name = query.format ? dynamic_cast<ASTIdentifier &>(*query.format).name : "TabSeparated";
BlockInputStreamPtr in = execute();
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample);
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample, in);
copyData(*in, *out);

View File

@ -435,7 +435,8 @@ void TCPHandler::sendData(Block & block)
state.block_out = query_context.getFormatFactory().getOutput(
"Native",
*state.maybe_compressed_out,
state.io.in_sample);
state.io.in_sample,
state.io.in);
}
writeVarUInt(Protocol::Server::Data, *out);