change ValuesBlockInputStream to ValuesBlockInputFormat

This commit is contained in:
Alexander Tokmakov 2019-09-02 19:26:22 +03:00
parent 8adf13047b
commit d31358a280
7 changed files with 81 additions and 75 deletions

View File

@ -48,7 +48,7 @@ static bool handleOverflowMode(OverflowMode mode, const String & message, int co
}
static bool checkTimeLimit(const IRowInputFormat::Params & params, const Stopwatch & stopwatch)
bool checkTimeLimit(const IRowInputFormat::Params & params, const Stopwatch & stopwatch)
{
if (params.max_execution_time != 0
&& stopwatch.elapsed() > static_cast<UInt64>(params.max_execution_time.totalMicroseconds()) * 1000)

View File

@ -37,6 +37,7 @@ struct RowInputFormatParams
};
bool isParseError(int code);
bool checkTimeLimit(const RowInputFormatParams & params, const Stopwatch & stopwatch);
///Row oriented input format: reads data row by row.
class IRowInputFormat : public IInputFormat

View File

@ -14,7 +14,7 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/CommonParsers.h>
#include <Formats/ConstantExpressionTemplate.h>
#include <Processors/Formats/Impl/ConstantExpressionTemplate.h>
#include <Parsers/ExpressionElementParsers.h>

View File

@ -4,9 +4,8 @@
#include <Interpreters/convertFieldToType.h>
#include <Parsers/TokenIterator.h>
#include <Parsers/ExpressionListParsers.h>
#include <Formats/ValuesBlockInputStream.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Common/FieldVisitors.h>
#include <Core/Block.h>
#include <Common/typeid_cast.h>
@ -32,41 +31,42 @@ namespace ErrorCodes
}
ValuesBlockInputStream::ValuesBlockInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_,
const FormatSettings & format_settings_, UInt64 max_block_size_, UInt64 rows_portion_size_)
: istr(istr_), header(header_), context(std::make_unique<Context>(context_)), format_settings(format_settings_),
max_block_size(max_block_size_), rows_portion_size(rows_portion_size_), num_columns(header.columns()),
attempts_to_generate_template(num_columns), rows_parsed_using_template(num_columns)
ValuesBlockInputFormat::ValuesBlockInputFormat(ReadBuffer & in_, const Block & header_, const RowInputFormatParams & params_,
const Context & context_, const FormatSettings & format_settings_)
: IInputFormat(header_, buf), buf(in_), params(params_), context(std::make_unique<Context>(context_)),
format_settings(format_settings_), num_columns(header_.columns()),
attempts_to_generate_template(num_columns), rows_parsed_using_template(num_columns), templates(num_columns)
{
templates.resize(header.columns());
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(istr);
skipBOMIfExists(buf);
/// TODO remove before merge
const_cast<FormatSettings&>(this->format_settings).values.interpret_expressions = false;
}
Block ValuesBlockInputStream::readImpl()
Chunk ValuesBlockInputFormat::generate()
{
const Block & header = getPort().getHeader();
MutableColumns columns = header.cloneEmptyColumns();
for (size_t rows_in_block = 0, batch = 0; rows_in_block < max_block_size; ++rows_in_block, ++batch)
for (size_t rows_in_block = 0, batch = 0; rows_in_block < params.max_block_size; ++rows_in_block, ++batch)
{
if (rows_portion_size && batch == rows_portion_size)
if (params.rows_portion_size && batch == params.rows_portion_size)
{
batch = 0;
if (!checkTimeLimit() || isCancelled())
if (!checkTimeLimit(params, total_stopwatch) || isCancelled())
break;
}
try
{
skipWhitespaceIfAny(istr);
if (istr.eof() || *istr.position() == ';')
skipWhitespaceIfAny(buf);
if (buf.eof() || *buf.position() == ';')
break;
assertChar('(', istr);
assertChar('(', buf);
for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
{
skipWhitespaceIfAny(istr);
PeekableReadBufferCheckpoint checkpoint{istr};
skipWhitespaceIfAny(buf);
PeekableReadBufferCheckpoint checkpoint{buf};
bool parse_separate_value = !parseExpressionUsingTemplate(columns[column_idx], column_idx);
@ -77,9 +77,9 @@ Block ValuesBlockInputStream::readImpl()
readValue(*columns[column_idx], column_idx, shouldGenerateNewTemplate(column_idx));
}
skipWhitespaceIfAny(istr);
if (!istr.eof() && *istr.position() == ',')
++istr.position();
skipWhitespaceIfAny(buf);
if (!buf.eof() && *buf.position() == ',')
++buf.position();
++total_rows;
}
@ -106,20 +106,24 @@ Block ValuesBlockInputStream::readImpl()
}
if (columns.empty() || columns[0]->empty())
{
readSuffix();
return {};
}
return header.cloneWithColumns(std::move(columns));
size_t rows_in_block = columns[0]->size();
return Chunk{std::move(columns), rows_in_block};
}
bool ValuesBlockInputStream::parseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx)
bool ValuesBlockInputFormat::parseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx)
{
if (templates[column_idx])
{
/// Try to parse expression using template if one was successfully generated while parsing the first row
try
{
templates[column_idx].value().parseExpression(istr, format_settings);
assertDelimAfterValue(column_idx);
templates[column_idx].value().parseExpression(buf, format_settings);
assertDelimiterAfterValue(column_idx);
++rows_parsed_using_template[column_idx];
return true;
}
@ -139,23 +143,24 @@ bool ValuesBlockInputStream::parseExpressionUsingTemplate(MutableColumnPtr & col
/// Do not use the template anymore and fallback to slow SQL parser
templates[column_idx].reset();
++attempts_to_generate_template[column_idx];
istr.rollbackToCheckpoint();
buf.rollbackToCheckpoint();
}
}
return false;
}
void ValuesBlockInputStream::readValue(IColumn & column, size_t column_idx, bool generate_template)
void ValuesBlockInputFormat::readValue(IColumn & column, size_t column_idx, bool generate_template)
{
bool rollback_on_exception = false;
try
{
header.getByPosition(column_idx).type->deserializeAsTextQuoted(column, istr, format_settings);
const Block & header = getPort().getHeader();
header.getByPosition(column_idx).type->deserializeAsTextQuoted(column, buf, format_settings);
rollback_on_exception = true;
skipWhitespaceIfAny(istr);
skipWhitespaceIfAny(buf);
assertDelimAfterValue(column_idx);
assertDelimiterAfterValue(column_idx);
}
catch (const Exception & e)
{
@ -176,7 +181,7 @@ void ValuesBlockInputStream::readValue(IColumn & column, size_t column_idx, bool
if (rollback_on_exception)
column.popBack(1);
istr.rollbackToCheckpoint();
buf.rollbackToCheckpoint();
parseExpression(column, column_idx, generate_template);
}
else
@ -185,22 +190,23 @@ void ValuesBlockInputStream::readValue(IColumn & column, size_t column_idx, bool
}
void
ValuesBlockInputStream::parseExpression(IColumn & column, size_t column_idx, bool generate_template)
ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx, bool generate_template)
{
const Block & header = getPort().getHeader();
const IDataType & type = *header.getByPosition(column_idx).type;
Expected expected;
// TODO make tokenizer to work with buffers, not only with continuous memory
Tokens tokens(istr.position(), istr.buffer().end());
Tokens tokens(buf.position(), buf.buffer().end());
IParser::Pos token_iterator(tokens);
ASTPtr ast;
if (!parser.parse(token_iterator, ast, expected))
{
istr.rollbackToCheckpoint();
buf.rollbackToCheckpoint();
throw Exception("Cannot parse expression of type " + type.getName() + " here: "
+ String(istr.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, istr.buffer().end() - istr.position())),
+ String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position())),
ErrorCodes::SYNTAX_ERROR);
}
@ -210,15 +216,15 @@ ValuesBlockInputStream::parseExpression(IColumn & column, size_t column_idx, boo
/// Check that we are indeed allowed to insert a NULL.
if (value.isNull() && !type.isNullable())
{
istr.rollbackToCheckpoint();
buf.rollbackToCheckpoint();
throw Exception{"Expression returns value " + applyVisitor(FieldVisitorToString(), value)
+ ", that is out of range of type " + type.getName()
+ ", at: " +
String(istr.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, istr.buffer().end() - istr.position())),
String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position())),
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE};
}
istr.position() = const_cast<char *>(token_iterator->begin);
buf.position() = const_cast<char *>(token_iterator->begin);
if (format_settings.values.deduce_templates_of_expressions && generate_template)
{
@ -228,9 +234,9 @@ ValuesBlockInputStream::parseExpression(IColumn & column, size_t column_idx, boo
try
{
templates[column_idx] = ConstantExpressionTemplate(type, TokenIterator(tokens), token_iterator, ast, *context);
istr.rollbackToCheckpoint();
templates[column_idx].value().parseExpression(istr, format_settings);
assertDelimAfterValue(column_idx);
buf.rollbackToCheckpoint();
templates[column_idx].value().parseExpression(buf, format_settings);
assertDelimiterAfterValue(column_idx);
return;
}
catch (...)
@ -239,25 +245,25 @@ ValuesBlockInputStream::parseExpression(IColumn & column, size_t column_idx, boo
throw;
/// Continue parsing without template
templates[column_idx].reset();
istr.position() = const_cast<char *>(token_iterator->begin);
buf.position() = const_cast<char *>(token_iterator->begin);
}
}
assertDelimAfterValue(column_idx);
assertDelimiterAfterValue(column_idx);
column.insert(value);
}
void ValuesBlockInputStream::assertDelimAfterValue(size_t column_idx)
void ValuesBlockInputFormat::assertDelimiterAfterValue(size_t column_idx)
{
skipWhitespaceIfAny(istr);
skipWhitespaceIfAny(buf);
if (column_idx + 1 != num_columns)
assertChar(',', istr);
assertChar(',', buf);
else
assertChar(')', istr);
assertChar(')', buf);
}
bool ValuesBlockInputStream::shouldGenerateNewTemplate(size_t column_idx)
bool ValuesBlockInputFormat::shouldGenerateNewTemplate(size_t column_idx)
{
// TODO better heuristic
constexpr size_t max_attempts = 3;
@ -275,18 +281,16 @@ bool ValuesBlockInputStream::shouldGenerateNewTemplate(size_t column_idx)
}
void registerInputFormatValues(FormatFactory & factory)
void registerInputFormatProcessorValues(FormatFactory & factory)
{
factory.registerInputFormat("Values", [](
factory.registerInputFormatProcessor("Values", [](
ReadBuffer & buf,
const Block & sample,
const Block & header,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback,
const RowInputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<ValuesBlockInputStream>(buf, sample, context, settings, max_block_size, rows_portion_size);
return std::make_shared<ValuesBlockInputFormat>(buf, header, params, context, settings);
});
}

View File

@ -1,9 +1,10 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatSettings.h>
#include <Formats/ConstantExpressionTemplate.h>
#include <Processors/Formats/Impl/ConstantExpressionTemplate.h>
#include <IO/PeekableReadBuffer.h>
#include <Parsers/ExpressionListParsers.h>
@ -17,41 +18,41 @@ class ReadBuffer;
/** Stream to read data in VALUES format (as in INSERT query).
*/
class ValuesBlockInputStream : public IBlockInputStream
class ValuesBlockInputFormat : public IInputFormat
{
public:
/** Data is parsed using fast, streaming parser.
* If interpret_expressions is true, it will, in addition, try to use SQL parser and interpreter
* in case when streaming parser could not parse field (this is very slow).
*/
ValuesBlockInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, const FormatSettings & format_settings_, UInt64 max_block_size_, UInt64 rows_portion_size_);
ValuesBlockInputFormat(ReadBuffer & in_, const Block & header_, const RowInputFormatParams & params_,
const Context & context_, const FormatSettings & format_settings_);
String getName() const override { return "ValuesBlockOutputStream"; }
Block getHeader() const override { return header; }
void readPrefix() override { }
void readSuffix() override { }
String getName() const override { return "ValuesBlockInputFormat"; }
private:
typedef std::vector<std::optional<ConstantExpressionTemplate>> ConstantExpressionTemplates;
Block readImpl() override;
Chunk generate() override;
bool parseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx);
void readValue(IColumn & column, size_t column_idx, bool generate_template);
void parseExpression(IColumn & column, size_t column_idx, bool generate_template);
inline void assertDelimAfterValue(size_t column_idx);
inline void assertDelimiterAfterValue(size_t column_idx);
bool shouldGenerateNewTemplate(size_t column_idx);
void readSuffix() { buf.assertCanBeDestructed(); }
private:
PeekableReadBuffer istr;
Block header;
PeekableReadBuffer buf;
RowInputFormatParams params;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
std::unique_ptr<Context> context; /// pimpl
const FormatSettings format_settings;
UInt64 max_block_size;
UInt64 rows_portion_size;
size_t num_columns;
size_t total_rows = 0;

View File

@ -149,9 +149,9 @@ bool ValuesRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
}
void registerInputFormatProcessorValues(FormatFactory & factory)
void _registerInputFormatProcessorValues(FormatFactory & factory)
{
factory.registerInputFormatProcessor("Values", [](
factory.registerInputFormatProcessor("_Values", [](
ReadBuffer & buf,
const Block & sample,
const Context & context,