Merge pull request #30456 from CurtizJ/async-inserts-values

Support VALUES format in async inserts
This commit is contained in:
alexey-milovidov 2021-10-26 09:47:27 +03:00 committed by GitHub
commit e7751c59f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 134 additions and 74 deletions

View File

@ -407,14 +407,20 @@ try
}
StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform));
std::unique_ptr<ReadBuffer> buffer;
std::unique_ptr<ReadBuffer> last_buffer;
for (const auto & entry : data->entries)
{
buffer = std::make_unique<ReadBufferFromString>(entry->bytes);
auto buffer = std::make_unique<ReadBufferFromString>(entry->bytes);
current_entry = entry;
total_rows += executor.execute(*buffer);
/// Keep buffer, because it still can be used
/// in destructor, while resetting buffer at next iteration.
last_buffer = std::move(buffer);
}
format->addBuffer(std::move(last_buffer));
auto chunk = Chunk(executor.getResultColumns(), total_rows);
size_t total_bytes = chunk.bytes();

View File

@ -131,18 +131,22 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
Pos before_values = pos;
String format_str;
/// VALUES or FROM INFILE or FORMAT or SELECT
if (!infile && s_values.ignore(pos, expected))
{
/// If VALUES is defined in query, everything except setting will be parsed as data
data = pos->begin;
format_str = "Values";
}
else if (s_format.ignore(pos, expected))
{
/// If FORMAT is defined, read format name
if (!name_p.parse(pos, format, expected))
return false;
tryGetIdentifierNameInto(format, format_str);
}
else if (s_select.ignore(pos, expected) || s_with.ignore(pos,expected))
{
@ -155,6 +159,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
/// FORMAT section is expected if we have input() in SELECT part
if (s_format.ignore(pos, expected) && !name_p.parse(pos, format, expected))
return false;
tryGetIdentifierNameInto(format, format_str);
}
else if (s_watch.ignore(pos, expected))
{
@ -242,9 +248,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
tryGetIdentifierNameInto(table, query->table_id.table_name);
}
tryGetIdentifierNameInto(format, query->format);
query->columns = columns;
query->format = std::move(format_str);
query->select = select;
query->watch = watch;
query->settings_ast = settings_ast;

View File

@ -55,6 +55,8 @@ public:
*/
virtual void resetParser();
virtual void setReadBuffer(ReadBuffer & in_);
virtual const BlockMissingValues & getMissingValues() const
{
static const BlockMissingValues none;
@ -70,7 +72,6 @@ public:
void setCurrentUnitNumber(size_t current_unit_number_) { current_unit_number = current_unit_number_; }
void addBuffer(std::unique_ptr<ReadBuffer> buffer) { owned_buffers.emplace_back(std::move(buffer)); }
void setReadBuffer(ReadBuffer & in_);
protected:
ColumnMappingPtr column_mapping{};

View File

@ -32,13 +32,25 @@ namespace ErrorCodes
}
ValuesBlockInputFormat::ValuesBlockInputFormat(ReadBuffer & in_, const Block & header_, const RowInputFormatParams & params_,
const FormatSettings & format_settings_)
: IInputFormat(header_, buf), buf(in_), params(params_),
format_settings(format_settings_), num_columns(header_.columns()),
parser_type_for_column(num_columns, ParserType::Streaming),
attempts_to_deduce_template(num_columns), attempts_to_deduce_template_cached(num_columns),
rows_parsed_using_template(num_columns), templates(num_columns), types(header_.getDataTypes())
/// Constructs the format on top of an arbitrary ReadBuffer.
/// `in_` is wrapped into a newly allocated PeekableReadBuffer, and construction is
/// delegated to the overload that takes std::unique_ptr<PeekableReadBuffer>;
/// the remaining arguments are forwarded unchanged.
ValuesBlockInputFormat::ValuesBlockInputFormat(
ReadBuffer & in_,
const Block & header_,
const RowInputFormatParams & params_,
const FormatSettings & format_settings_)
: ValuesBlockInputFormat(std::make_unique<PeekableReadBuffer>(in_), header_, params_, format_settings_)
{
    /// Intentionally empty: all initialization happens in the delegated-to constructor.
}
ValuesBlockInputFormat::ValuesBlockInputFormat(
std::unique_ptr<PeekableReadBuffer> buf_,
const Block & header_,
const RowInputFormatParams & params_,
const FormatSettings & format_settings_)
: IInputFormat(header_, *buf_), buf(std::move(buf_)),
params(params_), format_settings(format_settings_), num_columns(header_.columns()),
parser_type_for_column(num_columns, ParserType::Streaming),
attempts_to_deduce_template(num_columns), attempts_to_deduce_template_cached(num_columns),
rows_parsed_using_template(num_columns), templates(num_columns), types(header_.getDataTypes())
{
serializations.resize(types.size());
for (size_t i = 0; i < types.size(); ++i)
@ -58,8 +70,8 @@ Chunk ValuesBlockInputFormat::generate()
{
try
{
skipWhitespaceIfAny(buf);
if (buf.eof() || *buf.position() == ';')
skipWhitespaceIfAny(*buf);
if (buf->eof() || *buf->position() == ';')
break;
readRow(columns, rows_in_block);
}
@ -99,12 +111,12 @@ Chunk ValuesBlockInputFormat::generate()
void ValuesBlockInputFormat::readRow(MutableColumns & columns, size_t row_num)
{
assertChar('(', buf);
assertChar('(', *buf);
for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
{
skipWhitespaceIfAny(buf);
PeekableReadBufferCheckpoint checkpoint{buf};
skipWhitespaceIfAny(*buf);
PeekableReadBufferCheckpoint checkpoint{*buf};
bool read;
/// Parse value using fast streaming parser for literals and slow SQL parser for expressions.
@ -123,9 +135,9 @@ void ValuesBlockInputFormat::readRow(MutableColumns & columns, size_t row_num)
/// If read is true, value still may be missing. Bit mask for these values will be copied from ConstantExpressionTemplate later.
}
skipWhitespaceIfAny(buf);
if (!buf.eof() && *buf.position() == ',')
++buf.position();
skipWhitespaceIfAny(*buf);
if (!buf->eof() && *buf->position() == ',')
++buf->position();
++total_rows;
}
@ -134,7 +146,7 @@ bool ValuesBlockInputFormat::tryParseExpressionUsingTemplate(MutableColumnPtr &
{
/// Try to parse expression using template if one was successfully deduced while parsing the first row
auto settings = context->getSettingsRef();
if (templates[column_idx]->parseExpression(buf, format_settings, settings))
if (templates[column_idx]->parseExpression(*buf, format_settings, settings))
{
++rows_parsed_using_template[column_idx];
return true;
@ -154,7 +166,7 @@ bool ValuesBlockInputFormat::tryParseExpressionUsingTemplate(MutableColumnPtr &
}
/// Do not use this template anymore
templates[column_idx].reset();
buf.rollbackToCheckpoint();
buf->rollbackToCheckpoint();
/// It will deduce new template or fallback to slow SQL parser
return parseExpression(*column, column_idx);
@ -169,13 +181,13 @@ bool ValuesBlockInputFormat::tryReadValue(IColumn & column, size_t column_idx)
const auto & type = types[column_idx];
const auto & serialization = serializations[column_idx];
if (format_settings.null_as_default && !type->isNullable())
read = SerializationNullable::deserializeTextQuotedImpl(column, buf, format_settings, serialization);
read = SerializationNullable::deserializeTextQuotedImpl(column, *buf, format_settings, serialization);
else
serialization->deserializeTextQuoted(column, buf, format_settings);
serialization->deserializeTextQuoted(column, *buf, format_settings);
rollback_on_exception = true;
skipWhitespaceIfAny(buf);
skipWhitespaceIfAny(*buf);
assertDelimiterAfterValue(column_idx);
return read;
}
@ -190,7 +202,7 @@ bool ValuesBlockInputFormat::tryReadValue(IColumn & column, size_t column_idx)
/// Switch to SQL parser and don't try to use streaming parser for complex expressions
/// Note: Throwing exceptions for each expression may be very slow because of stacktraces
buf.rollbackToCheckpoint();
buf->rollbackToCheckpoint();
return parseExpression(column, column_idx);
}
}
@ -284,11 +296,11 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
/// We need continuous memory containing the expression to use Lexer
skipToNextRow(0, 1);
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
buf->makeContinuousMemoryFromCheckpointToPos();
buf->rollbackToCheckpoint();
Expected expected;
Tokens tokens(buf.position(), buf.buffer().end());
Tokens tokens(buf->position(), buf->buffer().end());
IParser::Pos token_iterator(tokens, settings.max_parser_depth);
ASTPtr ast;
@ -302,7 +314,7 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
if (!parsed)
throw Exception("Cannot parse expression of type " + type.getName() + " here: "
+ String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position())),
+ String(buf->position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf->buffer().end() - buf->position())),
ErrorCodes::SYNTAX_ERROR);
++token_iterator;
@ -316,9 +328,9 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
try
{
const auto & serialization = serializations[column_idx];
serialization->deserializeTextQuoted(column, buf, format_settings);
serialization->deserializeTextQuoted(column, *buf, format_settings);
rollback_on_exception = true;
skipWhitespaceIfAny(buf);
skipWhitespaceIfAny(*buf);
if (checkDelimiterAfterValue(column_idx))
ok = true;
}
@ -366,8 +378,8 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
else
++attempts_to_deduce_template[column_idx];
buf.rollbackToCheckpoint();
if (templates[column_idx]->parseExpression(buf, format_settings, settings))
buf->rollbackToCheckpoint();
if (templates[column_idx]->parseExpression(*buf, format_settings, settings))
{
++rows_parsed_using_template[column_idx];
parser_type_for_column[column_idx] = ParserType::BatchTemplate;
@ -384,9 +396,9 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
std::rethrow_exception(exception);
else
{
buf.rollbackToCheckpoint();
size_t len = const_cast<char *>(token_iterator->begin) - buf.position();
throw Exception("Cannot deduce template of expression: " + std::string(buf.position(), len), ErrorCodes::SYNTAX_ERROR);
buf->rollbackToCheckpoint();
size_t len = const_cast<char *>(token_iterator->begin) - buf->position();
throw Exception("Cannot deduce template of expression: " + std::string(buf->position(), len), ErrorCodes::SYNTAX_ERROR);
}
}
/// Continue parsing without template
@ -397,7 +409,7 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
throw Exception("Interpreting expressions is disabled", ErrorCodes::SUPPORT_IS_DISABLED);
/// Try to evaluate single expression if other parsers don't work
buf.position() = const_cast<char *>(token_iterator->begin);
buf->position() = const_cast<char *>(token_iterator->begin);
std::pair<Field, DataTypePtr> value_raw = evaluateConstantExpression(ast, context);
@ -416,10 +428,10 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
type.insertDefaultInto(column);
return false;
}
buf.rollbackToCheckpoint();
buf->rollbackToCheckpoint();
throw Exception{"Cannot insert NULL value into a column of type '" + type.getName() + "'"
+ " at: " +
String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position())),
String(buf->position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf->buffer().end() - buf->position())),
ErrorCodes::TYPE_MISMATCH};
}
@ -430,61 +442,61 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
/// Can be used in fileSegmentationEngine for parallel parsing of Values
bool ValuesBlockInputFormat::skipToNextRow(size_t min_chunk_bytes, int balance)
{
skipWhitespaceIfAny(buf);
if (buf.eof() || *buf.position() == ';')
skipWhitespaceIfAny(*buf);
if (buf->eof() || *buf->position() == ';')
return false;
bool quoted = false;
size_t chunk_begin_buf_count = buf.count();
while (!buf.eof() && (balance || buf.count() - chunk_begin_buf_count < min_chunk_bytes))
size_t chunk_begin_buf_count = buf->count();
while (!buf->eof() && (balance || buf->count() - chunk_begin_buf_count < min_chunk_bytes))
{
buf.position() = find_first_symbols<'\\', '\'', ')', '('>(buf.position(), buf.buffer().end());
if (buf.position() == buf.buffer().end())
buf->position() = find_first_symbols<'\\', '\'', ')', '('>(buf->position(), buf->buffer().end());
if (buf->position() == buf->buffer().end())
continue;
if (*buf.position() == '\\')
if (*buf->position() == '\\')
{
++buf.position();
if (!buf.eof())
++buf.position();
++buf->position();
if (!buf->eof())
++buf->position();
}
else if (*buf.position() == '\'')
else if (*buf->position() == '\'')
{
quoted ^= true;
++buf.position();
++buf->position();
}
else if (*buf.position() == ')')
else if (*buf->position() == ')')
{
++buf.position();
++buf->position();
if (!quoted)
--balance;
}
else if (*buf.position() == '(')
else if (*buf->position() == '(')
{
++buf.position();
++buf->position();
if (!quoted)
++balance;
}
}
if (!buf.eof() && *buf.position() == ',')
++buf.position();
if (!buf->eof() && *buf->position() == ',')
++buf->position();
return true;
}
void ValuesBlockInputFormat::assertDelimiterAfterValue(size_t column_idx)
{
if (unlikely(!checkDelimiterAfterValue(column_idx)))
throwAtAssertionFailed((column_idx + 1 == num_columns) ? ")" : ",", buf);
throwAtAssertionFailed((column_idx + 1 == num_columns) ? ")" : ",", *buf);
}
bool ValuesBlockInputFormat::checkDelimiterAfterValue(size_t column_idx)
{
skipWhitespaceIfAny(buf);
skipWhitespaceIfAny(*buf);
if (likely(column_idx + 1 != num_columns))
return checkChar(',', buf);
return checkChar(',', *buf);
else
return checkChar(')', buf);
return checkChar(')', *buf);
}
bool ValuesBlockInputFormat::shouldDeduceNewTemplate(size_t column_idx)
@ -516,21 +528,21 @@ bool ValuesBlockInputFormat::shouldDeduceNewTemplate(size_t column_idx)
void ValuesBlockInputFormat::readPrefix()
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(buf);
skipBOMIfExists(*buf);
}
void ValuesBlockInputFormat::readSuffix()
{
if (!buf.eof() && *buf.position() == ';')
if (!buf->eof() && *buf->position() == ';')
{
++buf.position();
skipWhitespaceIfAny(buf);
if (buf.hasUnreadData())
++buf->position();
skipWhitespaceIfAny(*buf);
if (buf->hasUnreadData())
throw Exception("Cannot read data after semicolon", ErrorCodes::CANNOT_READ_ALL_DATA);
return;
}
if (buf.hasUnreadData())
if (buf->hasUnreadData())
throw Exception("Unread data in PeekableReadBuffer will be lost. Most likely it's a bug.", ErrorCodes::LOGICAL_ERROR);
}
@ -539,10 +551,16 @@ void ValuesBlockInputFormat::resetParser()
IInputFormat::resetParser();
// I'm not resetting parser modes here.
// There is a good chance that all messages have the same format.
buf.reset();
buf->reset();
total_rows = 0;
}
/// Switches the format to a new input source.
/// A new PeekableReadBuffer is created around `in_` and replaces the one held in `buf`;
/// the base class is then pointed at the new wrapper rather than at `in_` directly.
void ValuesBlockInputFormat::setReadBuffer(ReadBuffer & in_)
{
    /// Build the replacement wrapper first; the previously held buffer is released by the assignment.
    auto peekable = std::make_unique<PeekableReadBuffer>(in_);
    buf = std::move(peekable);

    IInputFormat::setReadBuffer(*buf);
}
void registerInputFormatValues(FormatFactory & factory)
{
factory.registerInputFormat("Values", [](

View File

@ -32,6 +32,7 @@ public:
String getName() const override { return "ValuesBlockInputFormat"; }
void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
/// TODO: remove context somehow.
void setContext(ContextPtr context_) { context = Context::createCopy(context_); }
@ -39,6 +40,9 @@ public:
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
private:
ValuesBlockInputFormat(std::unique_ptr<PeekableReadBuffer> buf_, const Block & header_, const RowInputFormatParams & params_,
const FormatSettings & format_settings_);
enum class ParserType
{
Streaming,
@ -66,7 +70,7 @@ private:
bool skipToNextRow(size_t min_chunk_bytes = 0, int balance = 0);
PeekableReadBuffer buf;
std::unique_ptr<PeekableReadBuffer> buf;
const RowInputFormatParams params;

View File

@ -42,12 +42,12 @@ InputFormatPtr getInputFormatFromASTInsertQuery(
if (ast_insert_query->infile && context->getApplicationType() == Context::ApplicationType::SERVER)
throw Exception("Query has infile and was send directly to server", ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
String format = ast_insert_query->format;
if (format.empty())
if (ast_insert_query->format.empty())
{
if (input_function)
throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT);
format = "Values";
else
throw Exception("Logical error: INSERT query requires format to be set", ErrorCodes::LOGICAL_ERROR);
}
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.
@ -59,7 +59,7 @@ InputFormatPtr getInputFormatFromASTInsertQuery(
: std::make_unique<EmptyReadBuffer>();
/// Create a source from input buffer using format from query
auto source = context->getInputFormat(format, *input_buffer, header, context->getSettings().max_insert_block_size);
auto source = context->getInputFormat(ast_insert_query->format, *input_buffer, header, context->getSettings().max_insert_block_size);
source->addBuffer(std::move(input_buffer));
return source;
}

View File

@ -0,0 +1,6 @@
1 a
2 b
3 c
4 d
5 e
6 f

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
# Test: INSERT ... VALUES sent over HTTP with asynchronous inserts enabled.
# Three inserts are issued concurrently, each using a different row separator style,
# and the table contents are then printed ordered by id.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# CLICKHOUSE_URL, CLICKHOUSE_CLIENT and CLICKHOUSE_CURL are presumably provided by
# shell_config.sh; the command variables are left unquoted in case they carry
# extra arguments -- NOTE(review): confirm against shell_config.sh.
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"

${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = Memory"

# "$url" is quoted so that URL query characters are never subject to word splitting
# or pathname expansion by the shell.
# Rows separated by whitespace only.
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO async_inserts VALUES (1, 'a') (2, 'b')" &
# Rows separated by a comma.
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO async_inserts VALUES (3, 'c'), (4, 'd')" &
# Rows separated by a comma, with a trailing comma after the last row.
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO async_inserts VALUES (5, 'e'), (6, 'f'), " &

# Block until all three background requests have finished.
wait

${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts ORDER BY id"
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts"