ClickHouse/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp

652 lines
23 KiB
C++
Raw Normal View History

#include <IO/ReadHelpers.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/TokenIterator.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
#include <Core/Block.h>
2021-10-02 07:13:14 +00:00
#include <base/find_symbols.h>
2021-02-17 10:33:41 +00:00
#include <Common/typeid_cast.h>
#include <Common/checkStackSize.h>
2019-09-11 19:55:28 +00:00
#include <Parsers/ASTLiteral.h>
2021-03-09 14:46:52 +00:00
#include <DataTypes/Serializations/SerializationNullable.h>
#include <DataTypes/DataTypeTuple.h>
2021-02-17 10:29:06 +00:00
#include <DataTypes/DataTypeArray.h>
2021-02-21 11:57:03 +00:00
#include <DataTypes/DataTypeMap.h>
2021-08-10 01:33:57 +00:00
#include <DataTypes/ObjectUtils.h>
#include <base/logger_useful.h>
namespace DB
{
/// Error codes referenced by the Values input format and its schema reader.
namespace ErrorCodes
{
    /// Parsing failures
    extern const int SYNTAX_ERROR;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int ARGUMENT_OUT_OF_BOUND;
    /// Type / configuration failures
    extern const int TYPE_MISMATCH;
    extern const int SUPPORT_IS_DISABLED;
    /// Internal invariants
    extern const int LOGICAL_ERROR;
}
2021-10-20 14:17:20 +00:00
/// Convenience constructor: wraps `in_` into a freshly allocated PeekableReadBuffer and
/// delegates to the constructor that takes ownership of that peekable buffer.
/// The remaining arguments are forwarded unchanged.
ValuesBlockInputFormat::ValuesBlockInputFormat(
ReadBuffer & in_,
const Block & header_,
const RowInputFormatParams & params_,
const FormatSettings & format_settings_)
: ValuesBlockInputFormat(std::make_unique<PeekableReadBuffer>(in_), header_, params_, format_settings_)
{
}
/// Main constructor: takes ownership of the peekable buffer and prepares per-column state.
/// Every column starts with the streaming parser; the template-related counters and the
/// template slots are sized to the number of columns in `header_`.
ValuesBlockInputFormat::ValuesBlockInputFormat(
    std::unique_ptr<PeekableReadBuffer> buf_,
    const Block & header_,
    const RowInputFormatParams & params_,
    const FormatSettings & format_settings_)
    : IInputFormat(header_, *buf_)
    , buf(std::move(buf_))
    , params(params_)
    , format_settings(format_settings_)
    , num_columns(header_.columns())
    , parser_type_for_column(num_columns, ParserType::Streaming)
    , attempts_to_deduce_template(num_columns)
    , attempts_to_deduce_template_cached(num_columns)
    , rows_parsed_using_template(num_columns)
    , templates(num_columns)
    , types(header_.getDataTypes())
{
    /// One default serialization per column type, in column order.
    serializations.reserve(types.size());
    for (const auto & column_type : types)
        serializations.push_back(column_type->getDefaultSerialization());
}
/// Reads up to `params.max_block_size` rows and returns them as a Chunk.
/// Returns an empty Chunk (after calling readSuffix()) when no rows were read.
/// Parse errors are annotated with the current row number before being rethrown.
Chunk ValuesBlockInputFormat::generate()
{
    /// The prefix is read only while no row has been parsed yet.
    if (total_rows == 0)
        readPrefix();

    const Block & header = getPort().getHeader();
    MutableColumns columns = header.cloneEmptyColumns();
    block_missing_values.clear();

    size_t row_in_block = 0;
    while (row_in_block < params.max_block_size)
    {
        try
        {
            skipWhitespaceIfAny(*buf);
            const bool end_of_data = buf->eof() || *buf->position() == ';';
            if (end_of_data)
                break;
            readRow(columns, row_in_block);
        }
        catch (Exception & e)
        {
            if (isParseError(e.code()))
                e.addMessage(" at row " + std::to_string(total_rows));
            throw;
        }
        ++row_in_block;
    }

    /// Evaluate expressions, which were parsed using templates, if any
    for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx)
    {
        auto & column_template = templates[column_idx];
        if (!column_template || !column_template->rowsCount())
            continue;

        const auto & expected_type = header.getByPosition(column_idx).type;
        auto & column = columns[column_idx];
        if (column->empty())
        {
            column = IColumn::mutate(column_template->evaluateAll(block_missing_values, column_idx, expected_type));
        }
        else
        {
            /// The column already holds values: pass its current size as the offset and append the evaluated rows.
            ColumnPtr evaluated = column_template->evaluateAll(block_missing_values, column_idx, expected_type, column->size());
            column->insertRangeFrom(*evaluated, 0, evaluated->size());
        }
    }

    if (columns.empty() || columns[0]->empty())
    {
        readSuffix();
        return {};
    }

    finalizeObjectColumns(columns);

    /// Take the row count before the columns are moved into the chunk.
    const size_t num_rows = columns[0]->size();
    return Chunk{std::move(columns), num_rows};
}
2019-10-16 19:52:00 +00:00
/// Reads one row of the form "(v1, v2, ...)" and appends one value to every column.
/// `row_num` is the index of the row inside the current block; it is used to mark
/// values that were not actually read in `block_missing_values`.
/// An optional ',' after the row is consumed, and `total_rows` is incremented.
void ValuesBlockInputFormat::readRow(MutableColumns & columns, size_t row_num)
{
    assertChar('(', *buf);

    for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
    {
        skipWhitespaceIfAny(*buf);
        /// The checkpoint lets the parsers below roll back to the beginning of this value.
        PeekableReadBufferCheckpoint checkpoint{*buf};

        /// Parse value using fast streaming parser for literals and slow SQL parser for expressions.
        /// If there is SQL expression in some row, template of this expression will be deduced,
        /// so it makes possible to parse the following rows much faster
        /// if expressions in the following rows have the same structure
        const auto parser_type = parser_type_for_column[column_idx];
        bool value_was_read = false;
        if (parser_type == ParserType::Streaming)
        {
            value_was_read = tryReadValue(*columns[column_idx], column_idx);
        }
        else if (parser_type == ParserType::BatchTemplate)
        {
            value_was_read = tryParseExpressionUsingTemplate(columns[column_idx], column_idx);
        }
        else
        {
            /// parser_type == ParserType::SingleExpressionEvaluation
            value_was_read = parseExpression(*columns[column_idx], column_idx);
        }

        /// If value_was_read is true, value still may be missing.
        /// Bit mask for these values will be copied from ConstantExpressionTemplate later.
        if (!value_was_read)
            block_missing_values.setBit(column_idx, row_num);
    }

    skipWhitespaceIfAny(*buf);
    const bool row_separator_follows = !buf->eof() && *buf->position() == ',';
    if (row_separator_follows)
        ++buf->position();

    ++total_rows;
}
2019-10-16 19:52:00 +00:00
/// Parses the current value of column `column_idx` with the previously deduced expression template.
/// On success the row is accumulated inside the template and true is returned.
/// If the expression does not match the template, the rows accumulated so far are evaluated
/// and appended to `column`, the template is dropped, the buffer is rolled back to the
/// checkpoint and the value is parsed again by parseExpression(), whose result is returned.
bool ValuesBlockInputFormat::tryParseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx)
{
    /// Try to parse expression using template if one was successfully deduced while parsing the first row.
    /// Bind by reference: plain `auto` would copy the settings object for every parsed value.
    const auto & settings = context->getSettingsRef();
    if (templates[column_idx]->parseExpression(*buf, format_settings, settings))
    {
        ++rows_parsed_using_template[column_idx];
        return true;
    }

    const auto & header = getPort().getHeader();
    const auto & expected_type = header.getByPosition(column_idx).type;

    /// Expression in the current row does not match the template deduced on the first row.
    /// Evaluate expressions, which were parsed using this template.
    if (column->empty())
        column = IColumn::mutate(templates[column_idx]->evaluateAll(block_missing_values, column_idx, expected_type));
    else
    {
        ColumnPtr evaluated = templates[column_idx]->evaluateAll(block_missing_values, column_idx, expected_type, column->size());
        column->insertRangeFrom(*evaluated, 0, evaluated->size());
    }

    /// Do not use this template anymore
    templates[column_idx].reset();
    buf->rollbackToCheckpoint();

    /// It will deduce new template or fallback to slow SQL parser
    return parseExpression(*column, column_idx);
}
2019-10-16 19:52:00 +00:00
/// Reads the current value of column `column_idx` with the streaming (non-SQL) parser.
/// Returns false when a default was inserted instead of a read value (the DEFAULT keyword,
/// or whatever the null_as_default deserialization reports), true otherwise.
/// On a parse error (decimal overflow excluded) the buffer is rolled back to the checkpoint
/// and the value is re-parsed as an SQL expression by parseExpression().
bool ValuesBlockInputFormat::tryReadValue(IColumn & column, size_t column_idx)
{
    /// Set once a value has been appended to `column`, so that it can be removed
    /// again if the delimiter check fails afterwards.
    bool value_inserted = false;
    try
    {
        bool has_value = true;
        const bool is_default_keyword = checkStringByFirstCharacterAndAssertTheRestCaseInsensitive("DEFAULT", *buf);
        if (is_default_keyword)
        {
            column.insertDefault();
            has_value = false;
        }
        else
        {
            const auto & type = types[column_idx];
            const auto & serialization = serializations[column_idx];
            const bool null_becomes_default
                = format_settings.null_as_default && !type->isNullable() && !type->isLowCardinalityNullable();
            if (null_becomes_default)
                has_value = SerializationNullable::deserializeTextQuotedImpl(column, *buf, format_settings, serialization);
            else
                serialization->deserializeTextQuoted(column, *buf, format_settings);
        }

        value_inserted = true;

        skipWhitespaceIfAny(*buf);
        assertDelimiterAfterValue(column_idx);
        return has_value;
    }
    catch (const Exception & e)
    {
        /// Do not consider decimal overflow as parse error to avoid attempts to parse it as expression with float literal
        const bool decimal_overflow = e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND;
        const bool can_retry_as_expression = isParseError(e.code()) && !decimal_overflow;
        if (!can_retry_as_expression)
            throw;

        if (value_inserted)
            column.popBack(1);

        /// Switch to SQL parser and don't try to use streaming parser for complex expressions
        /// Note: Throwing exceptions for each expression may be very slow because of stacktraces
        buf->rollbackToCheckpoint();
        return parseExpression(column, column_idx);
    }
}
namespace
{
2021-02-21 11:57:03 +00:00
void tryToReplaceNullFieldsInComplexTypesWithDefaultValues(Field & value, const IDataType & data_type)
{
2021-02-17 10:33:41 +00:00
checkStackSize();
2021-02-17 10:29:06 +00:00
2021-02-21 11:57:03 +00:00
WhichDataType type(data_type);
2021-02-17 10:29:06 +00:00
2021-02-21 11:57:03 +00:00
if (type.isTuple() && value.getType() == Field::Types::Tuple)
2021-02-17 10:29:06 +00:00
{
2021-02-21 11:57:03 +00:00
const DataTypeTuple & type_tuple = static_cast<const DataTypeTuple &>(data_type);
2021-02-17 10:29:06 +00:00
Tuple & tuple_value = value.get<Tuple>();
2021-02-17 10:29:06 +00:00
size_t src_tuple_size = tuple_value.size();
2021-02-21 11:57:03 +00:00
size_t dst_tuple_size = type_tuple.getElements().size();
2021-02-17 10:29:06 +00:00
if (src_tuple_size != dst_tuple_size)
throw Exception(fmt::format("Bad size of tuple. Expected size: {}, actual size: {}.",
std::to_string(src_tuple_size), std::to_string(dst_tuple_size)), ErrorCodes::TYPE_MISMATCH);
2021-02-17 10:29:06 +00:00
for (size_t i = 0; i < src_tuple_size; ++i)
{
2021-02-21 11:57:03 +00:00
const auto & element_type = *(type_tuple.getElements()[i]);
2021-02-17 10:29:06 +00:00
if (tuple_value[i].isNull() && !element_type.isNullable())
tuple_value[i] = element_type.getDefault();
2021-02-21 11:57:03 +00:00
tryToReplaceNullFieldsInComplexTypesWithDefaultValues(tuple_value[i], element_type);
2021-02-17 10:29:06 +00:00
}
}
2021-02-21 11:57:03 +00:00
else if (type.isArray() && value.getType() == Field::Types::Array)
{
2021-02-21 11:57:03 +00:00
const DataTypeArray & type_aray = static_cast<const DataTypeArray &>(data_type);
const auto & element_type = *(type_aray.getNestedType());
2021-02-17 10:29:06 +00:00
if (element_type.isNullable())
return;
2021-02-17 10:29:06 +00:00
Array & array_value = value.get<Array>();
size_t array_value_size = array_value.size();
for (size_t i = 0; i < array_value_size; ++i)
{
if (array_value[i].isNull())
array_value[i] = element_type.getDefault();
2021-02-21 11:57:03 +00:00
tryToReplaceNullFieldsInComplexTypesWithDefaultValues(array_value[i], element_type);
}
}
else if (type.isMap() && value.getType() == Field::Types::Map)
{
const DataTypeMap & type_map = static_cast<const DataTypeMap &>(data_type);
const auto & key_type = *type_map.getKeyType();
const auto & value_type = *type_map.getValueType();
auto & map = value.get<Map>();
size_t map_size = map.size();
for (size_t i = 0; i < map_size; ++i)
{
auto & map_entry = map[i].get<Tuple>();
auto & entry_key = map_entry[0];
auto & entry_value = map_entry[1];
if (entry_key.isNull() && !key_type.isNullable())
entry_key = key_type.getDefault();
tryToReplaceNullFieldsInComplexTypesWithDefaultValues(entry_key, key_type);
if (entry_value.isNull() && !value_type.isNullable())
entry_value = value_type.getDefault();
tryToReplaceNullFieldsInComplexTypesWithDefaultValues(entry_value, value_type);
2021-02-17 10:29:06 +00:00
}
}
}
}
/// Can be used in fileSegmentationEngine for parallel parsing of Values
///
/// Advances `buf` until the round brackets are balanced (`balance` becomes zero) and at least
/// `min_chunk_bytes` bytes were passed, or until the end of data. Brackets between single quotes
/// are not counted, and the character following a backslash is skipped.
/// `balance` is the number of brackets that are already open when the function is called.
/// A ',' directly at the final position is consumed as well.
/// Returns false (without skipping anything but whitespace) if the data is at eof or at ';'.
static bool skipToNextRow(PeekableReadBuffer * buf, size_t min_chunk_bytes, int balance)
{
    skipWhitespaceIfAny(*buf);
    if (buf->eof() || *buf->position() == ';')
        return false;

    bool inside_quotes = false;
    const size_t start_count = buf->count();

    while (!buf->eof() && (balance || buf->count() - start_count < min_chunk_bytes))
    {
        buf->position() = find_first_symbols<'\\', '\'', ')', '('>(buf->position(), buf->buffer().end());

        /// Nothing interesting in the current buffer; eof() in the loop condition deals with the exhausted buffer.
        if (buf->position() == buf->buffer().end())
            continue;

        /// Every case consumes the symbol that was found.
        const char symbol = *buf->position();
        ++buf->position();

        switch (symbol)
        {
            case '\\':
                /// Skip the escaped character too, if there is one.
                if (!buf->eof())
                    ++buf->position();
                break;
            case '\'':
                inside_quotes = !inside_quotes;
                break;
            case ')':
                if (!inside_quotes)
                    --balance;
                break;
            case '(':
                if (!inside_quotes)
                    ++balance;
                break;
            default:
                break;
        }
    }

    if (!buf->eof() && *buf->position() == ',')
        ++buf->position();
    return true;
}
2019-10-16 19:52:00 +00:00
/// Parses the current value of column `column_idx` as an SQL expression and appends the result to `column`.
///
/// The steps, in order:
///  1. Parse the expression (including the trailing ',' or ')') with the SQL parser.
///  2. If the column is not in streaming mode and the expression is a plain literal,
///     retry the streaming parser; on success switch the column back to streaming mode.
///  3. Otherwise try to deduce an expression template, so that following rows with the same
///     structure can be parsed in batch mode.
///  4. Otherwise evaluate the single constant expression and insert the converted value.
///
/// Returns true if a value was inserted (or accumulated in a template), false if NULL was replaced
/// by the default value because of `null_as_default`.
/// Throws SYNTAX_ERROR if the expression cannot be parsed or (with interpret_expressions disabled)
/// no template can be deduced, SUPPORT_IS_DISABLED if evaluation is required but disabled,
/// TYPE_MISMATCH if NULL is not allowed for the column, LOGICAL_ERROR if a template is still pending.
bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx)
{
    const Block & header = getPort().getHeader();
    const IDataType & type = *header.getByPosition(column_idx).type;
    /// Bind by reference: plain `auto` would copy the settings object for every parsed expression.
    const auto & settings = context->getSettingsRef();

    /// We need continuous memory containing the expression to use Lexer
    skipToNextRow(buf.get(), 0, 1);
    buf->makeContinuousMemoryFromCheckpointToPos();
    buf->rollbackToCheckpoint();

    Expected expected;
    Tokens tokens(buf->position(), buf->buffer().end());
    IParser::Pos token_iterator(tokens, settings.max_parser_depth);
    ASTPtr ast;

    bool parsed = parser.parse(token_iterator, ast, expected);

    /// Consider delimiter after value (',' or ')') as part of expression
    if (column_idx + 1 != num_columns)
        parsed &= token_iterator->type == TokenType::Comma;
    else
        parsed &= token_iterator->type == TokenType::ClosingRoundBracket;

    if (!parsed)
        throw Exception("Cannot parse expression of type " + type.getName() + " here: "
                        + String(buf->position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf->buffer().end() - buf->position())),
                        ErrorCodes::SYNTAX_ERROR);
    /// Step over the delimiter: token_iterator now points to the token following this value.
    ++token_iterator;

    if (parser_type_for_column[column_idx] != ParserType::Streaming && dynamic_cast<const ASTLiteral *>(ast.get()))
    {
        /// It's possible that streaming parsing has failed on some row (e.g. because of '+' sign before integer),
        /// but it still can parse the following rows
        /// Check if we can use fast streaming parser instead of using templates
        bool rollback_on_exception = false;
        bool ok = false;
        try
        {
            const auto & serialization = serializations[column_idx];
            serialization->deserializeTextQuoted(column, *buf, format_settings);
            rollback_on_exception = true;
            skipWhitespaceIfAny(*buf);
            if (checkDelimiterAfterValue(column_idx))
                ok = true;
        }
        catch (const Exception & e)
        {
            /// Parse errors only mean that streaming mode is not applicable; anything else is rethrown.
            bool decimal_overflow = e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND;
            if (!isParseError(e.code()) || decimal_overflow)
                throw;
        }
        if (ok)
        {
            parser_type_for_column[column_idx] = ParserType::Streaming;
            return true;
        }
        else if (rollback_on_exception)
            column.popBack(1);
    }

    parser_type_for_column[column_idx] = ParserType::SingleExpressionEvaluation;

    /// Try to deduce template of expression and use it to parse the following rows
    if (shouldDeduceNewTemplate(column_idx))
    {
        if (templates[column_idx])
            throw DB::Exception("Template for column " + std::to_string(column_idx) + " already exists and it was not evaluated yet",
                                ErrorCodes::LOGICAL_ERROR);

        /// A failure while deducing the template is kept here: it is rethrown only if
        /// single expression evaluation is disabled, otherwise we fall back to evaluation below.
        std::exception_ptr exception;
        try
        {
            bool found_in_cache = false;
            const auto & result_type = header.getByPosition(column_idx).type;
            const char * delimiter = (column_idx + 1 == num_columns) ? ")" : ",";
            auto structure = templates_cache.getFromCacheOrConstruct(
                result_type,
                !result_type->isNullable() && format_settings.null_as_default,
                TokenIterator(tokens),
                token_iterator,
                ast,
                context,
                &found_in_cache,
                delimiter);
            templates[column_idx].emplace(structure);
            if (found_in_cache)
                ++attempts_to_deduce_template_cached[column_idx];
            else
                ++attempts_to_deduce_template[column_idx];

            buf->rollbackToCheckpoint();
            if (templates[column_idx]->parseExpression(*buf, format_settings, settings))
            {
                ++rows_parsed_using_template[column_idx];
                parser_type_for_column[column_idx] = ParserType::BatchTemplate;
                return true;
            }
        }
        catch (...)
        {
            exception = std::current_exception();
        }

        if (!format_settings.values.interpret_expressions)
        {
            if (exception)
                std::rethrow_exception(exception);
            else
            {
                buf->rollbackToCheckpoint();
                size_t len = const_cast<char *>(token_iterator->begin) - buf->position();
                throw Exception("Cannot deduce template of expression: " + std::string(buf->position(), len), ErrorCodes::SYNTAX_ERROR);
            }
        }

        /// Continue parsing without template
        templates[column_idx].reset();
    }

    if (!format_settings.values.interpret_expressions)
        throw Exception("Interpreting expressions is disabled", ErrorCodes::SUPPORT_IS_DISABLED);

    /// Try to evaluate single expression if other parsers don't work
    buf->position() = const_cast<char *>(token_iterator->begin);

    std::pair<Field, DataTypePtr> value_raw = evaluateConstantExpression(ast, context);

    Field & expression_value = value_raw.first;

    if (format_settings.null_as_default)
        tryToReplaceNullFieldsInComplexTypesWithDefaultValues(expression_value, type);

    Field value = convertFieldToType(expression_value, type, value_raw.second.get());

    /// Check that we are indeed allowed to insert a NULL.
    if (value.isNull() && !type.isNullable() && !type.isLowCardinalityNullable())
    {
        if (format_settings.null_as_default)
        {
            type.insertDefaultInto(column);
            return false;
        }
        buf->rollbackToCheckpoint();
        throw Exception{"Cannot insert NULL value into a column of type '" + type.getName() + "'"
                        + " at: " +
                        String(buf->position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf->buffer().end() - buf->position())),
                        ErrorCodes::TYPE_MISMATCH};
    }

    column.insert(value);
    return true;
}
void ValuesBlockInputFormat::assertDelimiterAfterValue(size_t column_idx)
2019-05-16 02:05:44 +00:00
{
2019-09-25 16:08:58 +00:00
if (unlikely(!checkDelimiterAfterValue(column_idx)))
2021-10-20 14:17:20 +00:00
throwAtAssertionFailed((column_idx + 1 == num_columns) ? ")" : ",", *buf);
}
2019-09-04 19:42:01 +00:00
bool ValuesBlockInputFormat::checkDelimiterAfterValue(size_t column_idx)
{
2021-10-20 14:17:20 +00:00
skipWhitespaceIfAny(*buf);
2019-09-04 19:42:01 +00:00
2019-09-25 16:08:58 +00:00
if (likely(column_idx + 1 != num_columns))
2021-10-20 14:17:20 +00:00
return checkChar(',', *buf);
2019-09-04 19:42:01 +00:00
else
2021-10-20 14:17:20 +00:00
return checkChar(')', *buf);
2019-09-04 19:42:01 +00:00
}
bool ValuesBlockInputFormat::shouldDeduceNewTemplate(size_t column_idx)
{
2019-09-11 19:55:28 +00:00
if (!format_settings.values.deduce_templates_of_expressions)
return false;
2019-09-04 19:42:01 +00:00
/// TODO better heuristic
2019-09-13 17:40:48 +00:00
/// Using template from cache is approx 2x faster, than evaluating single expression
/// Construction of new template is approx 1.5x slower, than evaluating single expression
float attempts_weighted = 1.5 * attempts_to_deduce_template[column_idx] + 0.5 * attempts_to_deduce_template_cached[column_idx];
constexpr size_t max_attempts = 100;
if (attempts_weighted < max_attempts)
return true;
2019-09-13 17:40:48 +00:00
if (rows_parsed_using_template[column_idx] / attempts_weighted > 1)
{
/// Try again
2019-09-04 19:42:01 +00:00
attempts_to_deduce_template[column_idx] = 0;
2019-09-13 17:40:48 +00:00
attempts_to_deduce_template_cached[column_idx] = 0;
rows_parsed_using_template[column_idx] = 0;
return true;
}
return false;
}
void ValuesBlockInputFormat::readPrefix()
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
2021-10-20 14:17:20 +00:00
skipBOMIfExists(*buf);
}
2019-09-30 18:21:58 +00:00
void ValuesBlockInputFormat::readSuffix()
{
2021-10-20 14:17:20 +00:00
if (!buf->eof() && *buf->position() == ';')
2021-02-01 12:40:57 +00:00
{
2021-10-20 14:17:20 +00:00
++buf->position();
skipWhitespaceIfAny(*buf);
if (buf->hasUnreadData())
2021-02-01 12:40:57 +00:00
throw Exception("Cannot read data after semicolon", ErrorCodes::CANNOT_READ_ALL_DATA);
return;
}
2021-10-20 14:17:20 +00:00
if (buf->hasUnreadData())
2019-09-30 18:21:58 +00:00
throw Exception("Unread data in PeekableReadBuffer will be lost. Most likely it's a bug.", ErrorCodes::LOGICAL_ERROR);
}
void ValuesBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
2019-12-03 00:50:50 +00:00
// I'm not resetting parser modes here.
2019-12-03 00:51:10 +00:00
// There is a good chance that all messages have the same format.
2021-10-20 14:17:20 +00:00
buf->reset();
total_rows = 0;
}
2021-10-20 14:17:20 +00:00
/// Switches the format to a new input: wraps `in_` into a new PeekableReadBuffer owned by this
/// object (replacing the previous one) and passes the wrapper to IInputFormat::setReadBuffer().
void ValuesBlockInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
/// Creates a schema reader for the Values format.
/// The row limit for schema inference is taken from `format_settings_.max_rows_to_read_for_schema_inference`.
/// NOTE(review): the member `buf` is passed to the IRowSchemaReader base before the member itself
/// is initialized from `in_` (bases are initialized first). This presumably relies on the base
/// only storing the reference and not reading from it in its constructor — confirm.
ValuesSchemaReader::ValuesSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, ContextPtr context_)
: IRowSchemaReader(buf, format_settings_.max_rows_to_read_for_schema_inference), buf(in_), context(context_)
{
}
/// Reads one row "(expr1, expr2, ...)" and returns the data type inferred for every expression.
/// Each expression is parsed with the SQL parser and evaluated as a constant expression;
/// the resulting type is passed through generalizeDataType().
/// Returns an empty list when there are no more rows: at the end of the buffer or at the ';'
/// that terminates the data (the same end-of-data condition the input format itself uses).
/// Throws SYNTAX_ERROR if an expression cannot be parsed.
DataTypes ValuesSchemaReader::readRowAndGetDataTypes()
{
    if (first_row)
    {
        skipBOMIfExists(buf);
        first_row = false;
    }

    skipWhitespaceIfAny(buf);
    /// A ';' ends the data just like eof does; without this check a trailing ';'
    /// would be reported as a missing '(' by the assertion below.
    if (buf.eof() || *buf.position() == ';')
        return {};

    assertChar('(', buf);

    /// Lexer needs the whole row in continuous memory: skip to the end of the row,
    /// make the memory continuous and return to the first expression.
    PeekableReadBufferCheckpoint checkpoint(buf);
    skipToNextRow(&buf, 0, 1);
    buf.makeContinuousMemoryFromCheckpointToPos();
    buf.rollbackToCheckpoint();

    Tokens tokens(buf.position(), buf.buffer().end());
    IParser::Pos token_iterator(tokens, context->getSettingsRef().max_parser_depth);

    DataTypes data_types;
    bool finish = false;
    while (!finish)
    {
        Expected expected;
        ASTPtr ast;

        bool parsed = parser.parse(token_iterator, ast, expected);
        /// Consider delimiter after value (',' or ')') as part of expression
        parsed &= token_iterator->type == TokenType::Comma || token_iterator->type == TokenType::ClosingRoundBracket;

        if (!parsed)
            throw Exception(ErrorCodes::SYNTAX_ERROR, "Cannot parse expression here: {}, token: {}",
                            String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position())), String(token_iterator.get().begin, token_iterator.get().end));

        std::pair<Field, DataTypePtr> result = evaluateConstantExpression(ast, context);
        data_types.push_back(generalizeDataType(result.second));

        /// The closing bracket ends the row.
        if (token_iterator->type == TokenType::ClosingRoundBracket)
            finish = true;

        /// Step over the delimiter and move the buffer to the beginning of the next token.
        ++token_iterator;
        buf.position() = const_cast<char *>(token_iterator->begin);
    }

    skipWhitespaceIfAny(buf);
    if (!buf.eof() && *buf.position() == ',')
        ++buf.position();

    return data_types;
}
2021-10-11 16:11:50 +00:00
/// Registers the "Values" input format: the creator builds a ValuesBlockInputFormat
/// from the arguments supplied by the factory.
void registerInputFormatValues(FormatFactory & factory)
{
    auto create_values_format = [](
        ReadBuffer & in,
        const Block & sample,
        const RowInputFormatParams & row_params,
        const FormatSettings & format_settings)
    {
        return std::make_shared<ValuesBlockInputFormat>(in, sample, row_params, format_settings);
    };

    factory.registerInputFormat("Values", create_values_format);
}
/// Registers the schema reader for the "Values" format: the creator builds a ValuesSchemaReader
/// from the arguments supplied by the factory.
void registerValuesSchemaReader(FormatFactory & factory)
{
    auto create_schema_reader = [](ReadBuffer & in, const FormatSettings & format_settings, ContextPtr query_context)
    {
        return std::make_shared<ValuesSchemaReader>(in, format_settings, query_context);
    };

    factory.registerSchemaReader("Values", create_schema_reader);
}
}