Merge branch 'master' into fix_special_build_check

This commit is contained in:
alesapin 2021-12-14 19:08:28 +03:00
commit d7663b2179
17 changed files with 357 additions and 150 deletions

View File

@ -421,6 +421,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
NameSet updated_columns;
bool materialize_ttl_recalculate_only = materializeTTLRecalculateOnly(storage);
for (const MutationCommand & command : commands)
{
if (command.type == MutationCommand::Type::UPDATE
@ -631,7 +632,9 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
dependencies.insert(dependency);
}
}
else if (metadata_snapshot->hasRowsTTL())
else if (metadata_snapshot->hasRowsTTL()
|| metadata_snapshot->hasAnyRowsWhereTTL()
|| metadata_snapshot->hasAnyGroupByTTL())
{
for (const auto & column : all_columns)
dependencies.emplace(column.name, ColumnDependency::TTL_TARGET);

View File

@ -64,57 +64,57 @@ CustomSeparatedRowInputFormat::CustomSeparatedRowInputFormat(
void CustomSeparatedRowInputFormat::skipPrefixBeforeHeader()
{
skipSpaces();
assertString(format_settings.custom.result_before_delimiter, buf);
assertString(format_settings.custom.result_before_delimiter, *buf);
}
void CustomSeparatedRowInputFormat::skipRowStartDelimiter()
{
skipSpaces();
assertString(format_settings.custom.row_before_delimiter, buf);
assertString(format_settings.custom.row_before_delimiter, *buf);
}
void CustomSeparatedRowInputFormat::skipFieldDelimiter()
{
skipSpaces();
assertString(format_settings.custom.field_delimiter, buf);
assertString(format_settings.custom.field_delimiter, *buf);
}
void CustomSeparatedRowInputFormat::skipRowEndDelimiter()
{
skipSpaces();
assertString(format_settings.custom.row_after_delimiter, buf);
assertString(format_settings.custom.row_after_delimiter, *buf);
}
void CustomSeparatedRowInputFormat::skipRowBetweenDelimiter()
{
skipSpaces();
assertString(format_settings.custom.row_between_delimiter, buf);
assertString(format_settings.custom.row_between_delimiter, *buf);
}
void CustomSeparatedRowInputFormat::skipField()
{
skipSpaces();
skipFieldByEscapingRule(buf, escaping_rule, format_settings);
skipFieldByEscapingRule(*buf, escaping_rule, format_settings);
}
bool CustomSeparatedRowInputFormat::checkEndOfRow()
{
PeekableReadBufferCheckpoint checkpoint{buf, true};
PeekableReadBufferCheckpoint checkpoint{*buf, true};
skipSpaces();
if (!checkString(format_settings.custom.row_after_delimiter, buf))
if (!checkString(format_settings.custom.row_after_delimiter, *buf))
return false;
skipSpaces();
/// At the end of row after row_after_delimiter we expect result_after_delimiter or row_between_delimiter.
if (checkString(format_settings.custom.row_between_delimiter, buf))
if (checkString(format_settings.custom.row_between_delimiter, *buf))
return true;
buf.rollbackToCheckpoint();
buf->rollbackToCheckpoint();
skipSpaces();
buf.ignore(format_settings.custom.row_after_delimiter.size());
buf->ignore(format_settings.custom.row_after_delimiter.size());
return checkForSuffixImpl(true);
}
@ -127,7 +127,7 @@ std::vector<String> CustomSeparatedRowInputFormat::readHeaderRow()
if (!values.empty())
skipFieldDelimiter();
skipSpaces();
values.push_back(readStringByEscapingRule(buf, escaping_rule, format_settings));
values.push_back(readStringByEscapingRule(*buf, escaping_rule, format_settings));
}
while (!checkEndOfRow());
@ -151,7 +151,7 @@ void CustomSeparatedRowInputFormat::skipHeaderRow()
bool CustomSeparatedRowInputFormat::readField(IColumn & column, const DataTypePtr & type, const SerializationPtr & serialization, bool, const String &)
{
skipSpaces();
return deserializeFieldByEscapingRule(type, serialization, column, buf, escaping_rule, format_settings);
return deserializeFieldByEscapingRule(type, serialization, column, *buf, escaping_rule, format_settings);
}
bool CustomSeparatedRowInputFormat::checkForSuffixImpl(bool check_eof)
@ -162,16 +162,16 @@ bool CustomSeparatedRowInputFormat::checkForSuffixImpl(bool check_eof)
if (!check_eof)
return false;
return buf.eof();
return buf->eof();
}
if (unlikely(checkString(format_settings.custom.result_after_delimiter, buf)))
if (unlikely(checkString(format_settings.custom.result_after_delimiter, *buf)))
{
skipSpaces();
if (!check_eof)
return true;
if (buf.eof())
if (buf->eof())
return true;
}
return false;
@ -179,25 +179,25 @@ bool CustomSeparatedRowInputFormat::checkForSuffixImpl(bool check_eof)
bool CustomSeparatedRowInputFormat::tryParseSuffixWithDiagnosticInfo(WriteBuffer & out)
{
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
if (checkForSuffixImpl(false))
{
if (buf.eof())
if (buf->eof())
out << "<End of stream>\n";
else
out << " There is some data after suffix\n";
return false;
}
buf.rollbackToCheckpoint();
buf->rollbackToCheckpoint();
return true;
}
bool CustomSeparatedRowInputFormat::checkForSuffix()
{
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
if (checkForSuffixImpl(true))
return true;
buf.rollbackToCheckpoint();
buf->rollbackToCheckpoint();
return false;
}
@ -209,37 +209,43 @@ bool CustomSeparatedRowInputFormat::allowSyncAfterError() const
void CustomSeparatedRowInputFormat::syncAfterError()
{
skipToNextRowOrEof(buf, format_settings.custom.row_after_delimiter, format_settings.custom.row_between_delimiter, ignore_spaces);
end_of_stream = buf.eof();
/// It can happen that buf.position() is not at the beginning of row
skipToNextRowOrEof(*buf, format_settings.custom.row_after_delimiter, format_settings.custom.row_between_delimiter, ignore_spaces);
end_of_stream = buf->eof();
/// It can happen that buf->position() is not at the beginning of row
/// if some delimiters is similar to row_format.delimiters.back() and row_between_delimiter.
/// It will cause another parsing error.
}
bool CustomSeparatedRowInputFormat::parseRowStartWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.row_before_delimiter, "delimiter before first field", ignore_spaces);
return parseDelimiterWithDiagnosticInfo(out, *buf, format_settings.custom.row_before_delimiter, "delimiter before first field", ignore_spaces);
}
bool CustomSeparatedRowInputFormat::parseFieldDelimiterWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.field_delimiter, "delimiter between fields", ignore_spaces);
return parseDelimiterWithDiagnosticInfo(out, *buf, format_settings.custom.field_delimiter, "delimiter between fields", ignore_spaces);
}
bool CustomSeparatedRowInputFormat::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.row_after_delimiter, "delimiter after last field", ignore_spaces);
return parseDelimiterWithDiagnosticInfo(out, *buf, format_settings.custom.row_after_delimiter, "delimiter after last field", ignore_spaces);
}
bool CustomSeparatedRowInputFormat::parseRowBetweenDelimiterWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.row_between_delimiter, "delimiter between rows", ignore_spaces);
return parseDelimiterWithDiagnosticInfo(out, *buf, format_settings.custom.row_between_delimiter, "delimiter between rows", ignore_spaces);
}
void CustomSeparatedRowInputFormat::resetParser()
{
RowInputFormatWithNamesAndTypes::resetParser();
buf.reset();
buf->reset();
}
void CustomSeparatedRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
void registerInputFormatCustomSeparated(FormatFactory & factory)

View File

@ -20,6 +20,8 @@ public:
void resetParser() override;
String getName() const override { return "CustomSeparatedRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
private:
CustomSeparatedRowInputFormat(
const Block & header_,
@ -59,9 +61,9 @@ private:
bool checkEndOfRow();
bool checkForSuffixImpl(bool check_eof);
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(buf); }
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(*buf); }
PeekableReadBuffer buf;
std::unique_ptr<PeekableReadBuffer> buf;
bool ignore_spaces;
EscapingRule escaping_rule;
};

View File

@ -14,8 +14,11 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
}
JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) :
IRowInputFormat(header_, buf, std::move(params_)), buf(in_)
JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: JSONAsStringRowInputFormat(header_, std::make_unique<PeekableReadBuffer>(in_), params_) {}
JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_) :
IRowInputFormat(header_, *buf_, std::move(params_)), buf(std::move(buf_))
{
if (header_.columns() > 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -31,113 +34,113 @@ JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, Re
void JSONAsStringRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
buf->reset();
}
void JSONAsStringRowInputFormat::readPrefix()
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(buf);
skipBOMIfExists(*buf);
skipWhitespaceIfAny(buf);
if (!buf.eof() && *buf.position() == '[')
skipWhitespaceIfAny(*buf);
if (!buf->eof() && *buf->position() == '[')
{
++buf.position();
++buf->position();
data_in_square_brackets = true;
}
}
void JSONAsStringRowInputFormat::readSuffix()
{
skipWhitespaceIfAny(buf);
skipWhitespaceIfAny(*buf);
if (data_in_square_brackets)
{
assertChar(']', buf);
skipWhitespaceIfAny(buf);
assertChar(']', *buf);
skipWhitespaceIfAny(*buf);
}
if (!buf.eof() && *buf.position() == ';')
if (!buf->eof() && *buf->position() == ';')
{
++buf.position();
skipWhitespaceIfAny(buf);
++buf->position();
skipWhitespaceIfAny(*buf);
}
assertEOF(buf);
assertEOF(*buf);
}
void JSONAsStringRowInputFormat::readJSONObject(IColumn & column)
{
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
size_t balance = 0;
bool quotes = false;
if (*buf.position() != '{')
if (*buf->position() != '{')
throw Exception("JSON object must begin with '{'.", ErrorCodes::INCORRECT_DATA);
++buf.position();
++buf->position();
++balance;
char * pos;
while (balance)
{
if (buf.eof())
if (buf->eof())
throw Exception("Unexpected end of file while parsing JSON object.", ErrorCodes::INCORRECT_DATA);
if (quotes)
{
pos = find_first_symbols<'"', '\\'>(buf.position(), buf.buffer().end());
buf.position() = pos;
if (buf.position() == buf.buffer().end())
pos = find_first_symbols<'"', '\\'>(buf->position(), buf->buffer().end());
buf->position() = pos;
if (buf->position() == buf->buffer().end())
continue;
if (*buf.position() == '"')
if (*buf->position() == '"')
{
quotes = false;
++buf.position();
++buf->position();
}
else if (*buf.position() == '\\')
else if (*buf->position() == '\\')
{
++buf.position();
if (!buf.eof())
++buf->position();
if (!buf->eof())
{
++buf.position();
++buf->position();
}
}
}
else
{
pos = find_first_symbols<'"', '{', '}', '\\'>(buf.position(), buf.buffer().end());
buf.position() = pos;
if (buf.position() == buf.buffer().end())
pos = find_first_symbols<'"', '{', '}', '\\'>(buf->position(), buf->buffer().end());
buf->position() = pos;
if (buf->position() == buf->buffer().end())
continue;
if (*buf.position() == '{')
if (*buf->position() == '{')
{
++balance;
++buf.position();
++buf->position();
}
else if (*buf.position() == '}')
else if (*buf->position() == '}')
{
--balance;
++buf.position();
++buf->position();
}
else if (*buf.position() == '\\')
else if (*buf->position() == '\\')
{
++buf.position();
if (!buf.eof())
++buf->position();
if (!buf->eof())
{
++buf.position();
++buf->position();
}
}
else if (*buf.position() == '"')
else if (*buf->position() == '"')
{
quotes = true;
++buf.position();
++buf->position();
}
}
}
buf.makeContinuousMemoryFromCheckpointToPos();
char * end = buf.position();
buf.rollbackToCheckpoint();
column.insertData(buf.position(), end - buf.position());
buf.position() = end;
buf->makeContinuousMemoryFromCheckpointToPos();
char * end = buf->position();
buf->rollbackToCheckpoint();
column.insertData(buf->position(), end - buf->position());
buf->position() = end;
}
bool JSONAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
@ -145,30 +148,36 @@ bool JSONAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
if (!allow_new_rows)
return false;
skipWhitespaceIfAny(buf);
if (!buf.eof())
skipWhitespaceIfAny(*buf);
if (!buf->eof())
{
if (!data_in_square_brackets && *buf.position() == ';')
if (!data_in_square_brackets && *buf->position() == ';')
{
/// ';' means the end of query, but it cannot be before ']'.
return allow_new_rows = false;
}
else if (data_in_square_brackets && *buf.position() == ']')
else if (data_in_square_brackets && *buf->position() == ']')
{
/// ']' means the end of query.
return allow_new_rows = false;
}
}
if (!buf.eof())
if (!buf->eof())
readJSONObject(*columns[0]);
skipWhitespaceIfAny(buf);
if (!buf.eof() && *buf.position() == ',')
++buf.position();
skipWhitespaceIfAny(buf);
skipWhitespaceIfAny(*buf);
if (!buf->eof() && *buf->position() == ',')
++buf->position();
skipWhitespaceIfAny(*buf);
return !buf.eof();
return !buf->eof();
}
void JSONAsStringRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
void registerInputFormatJSONAsString(FormatFactory & factory)

View File

@ -20,8 +20,11 @@ public:
String getName() const override { return "JSONAsStringRowInputFormat"; }
void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
private:
JSONAsStringRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readPrefix() override;
@ -29,7 +32,7 @@ private:
void readJSONObject(IColumn & column);
PeekableReadBuffer buf;
std::unique_ptr<PeekableReadBuffer> buf;
/// This flag is needed to know if data is in square brackets.
bool data_in_square_brackets = false;

View File

@ -29,12 +29,15 @@ namespace ErrorCodes
}
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, buf, std::move(params_)), buf(in_), parser(visitor), data_types(header_.getDataTypes()) {}
: MsgPackRowInputFormat(header_, std::make_unique<PeekableReadBuffer>(in_), params_) {}
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_)
: IRowInputFormat(header_, *buf_, std::move(params_)), buf(std::move(buf_)), parser(visitor), data_types(header_.getDataTypes()) {}
void MsgPackRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
buf->reset();
visitor.reset();
}
@ -325,21 +328,21 @@ void MsgPackVisitor::parse_error(size_t, size_t) // NOLINT
bool MsgPackRowInputFormat::readObject()
{
if (buf.eof())
if (buf->eof())
return false;
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
size_t offset = 0;
while (!parser.execute(buf.position(), buf.available(), offset))
while (!parser.execute(buf->position(), buf->available(), offset))
{
buf.position() = buf.buffer().end();
if (buf.eof())
buf->position() = buf->buffer().end();
if (buf->eof())
throw Exception("Unexpected end of file while parsing msgpack object.", ErrorCodes::INCORRECT_DATA);
buf.position() = buf.buffer().end();
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
buf->position() = buf->buffer().end();
buf->makeContinuousMemoryFromCheckpointToPos();
buf->rollbackToCheckpoint();
}
buf.position() += offset;
buf->position() += offset;
return true;
}
@ -363,6 +366,12 @@ bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
return true;
}
void MsgPackRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
void registerInputFormatMsgPack(FormatFactory & factory)
{
factory.registerInputFormat("MsgPack", [](

View File

@ -61,13 +61,16 @@ public:
String getName() const override { return "MagPackRowInputFormat"; }
void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
private:
MsgPackRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool readObject();
PeekableReadBuffer buf;
std::unique_ptr<PeekableReadBuffer> buf;
MsgPackVisitor visitor;
msgpack::detail::parse_helper<MsgPackVisitor> parser;
const DataTypes data_types;

View File

@ -14,10 +14,15 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
RegexpRowInputFormat::RegexpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: RegexpRowInputFormat(std::make_unique<PeekableReadBuffer>(in_), header_, params_, format_settings_)
{
}
RegexpRowInputFormat::RegexpRowInputFormat(
ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, buf, std::move(params_))
, buf(in_)
std::unique_ptr<PeekableReadBuffer> buf_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, *buf_, std::move(params_))
, buf(std::move(buf_))
, format_settings(format_settings_)
, escaping_rule(format_settings_.regexp.escaping_rule)
, regexp(format_settings_.regexp.regexp)
@ -39,7 +44,7 @@ RegexpRowInputFormat::RegexpRowInputFormat(
void RegexpRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
buf->reset();
}
bool RegexpRowInputFormat::readField(size_t index, MutableColumns & columns)
@ -71,45 +76,51 @@ void RegexpRowInputFormat::readFieldsFromMatch(MutableColumns & columns, RowRead
bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
if (buf.eof())
if (buf->eof())
return false;
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
size_t line_size = 0;
do
{
char * pos = find_first_symbols<'\n', '\r'>(buf.position(), buf.buffer().end());
line_size += pos - buf.position();
buf.position() = pos;
} while (buf.position() == buf.buffer().end() && !buf.eof());
char * pos = find_first_symbols<'\n', '\r'>(buf->position(), buf->buffer().end());
line_size += pos - buf->position();
buf->position() = pos;
} while (buf->position() == buf->buffer().end() && !buf->eof());
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
buf->makeContinuousMemoryFromCheckpointToPos();
buf->rollbackToCheckpoint();
bool match = RE2::FullMatchN(re2::StringPiece(buf.position(), line_size), regexp, re2_arguments_ptrs.data(), re2_arguments_ptrs.size());
bool match = RE2::FullMatchN(re2::StringPiece(buf->position(), line_size), regexp, re2_arguments_ptrs.data(), re2_arguments_ptrs.size());
bool read_line = true;
if (!match)
{
if (!format_settings.regexp.skip_unmatched)
throw Exception("Line \"" + std::string(buf.position(), line_size) + "\" doesn't match the regexp.", ErrorCodes::INCORRECT_DATA);
throw Exception("Line \"" + std::string(buf->position(), line_size) + "\" doesn't match the regexp.", ErrorCodes::INCORRECT_DATA);
read_line = false;
}
if (read_line)
readFieldsFromMatch(columns, ext);
buf.position() += line_size;
buf->position() += line_size;
checkChar('\r', buf);
if (!buf.eof() && !checkChar('\n', buf))
checkChar('\r', *buf);
if (!buf->eof() && !checkChar('\n', *buf))
throw Exception("No \\n after \\r at the end of line.", ErrorCodes::INCORRECT_DATA);
return true;
}
void RegexpRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
void registerInputFormatRegexp(FormatFactory & factory)
{
factory.registerInputFormat("Regexp", [](

View File

@ -31,14 +31,17 @@ public:
String getName() const override { return "RegexpRowInputFormat"; }
void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
private:
RegexpRowInputFormat(std::unique_ptr<PeekableReadBuffer> buf_, const Block & header_, Params params_, const FormatSettings & format_settings_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool readField(size_t index, MutableColumns & columns);
void readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext);
PeekableReadBuffer buf;
std::unique_ptr<PeekableReadBuffer> buf;
const FormatSettings format_settings;
const EscapingRule escaping_rule;

View File

@ -20,11 +20,25 @@ extern const int SYNTAX_ERROR;
}
TemplateRowInputFormat::TemplateRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
TemplateRowInputFormat::TemplateRowInputFormat(
const Block & header_,
ReadBuffer & in_,
const Params & params_,
FormatSettings settings_,
bool ignore_spaces_,
ParsedTemplateFormatString format_,
ParsedTemplateFormatString row_format_,
std::string row_between_delimiter_)
: TemplateRowInputFormat(
header_, std::make_unique<PeekableReadBuffer>(in_), params_, settings_, ignore_spaces_, format_, row_format_, row_between_delimiter_)
{
}
TemplateRowInputFormat::TemplateRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_,
FormatSettings settings_, bool ignore_spaces_,
ParsedTemplateFormatString format_, ParsedTemplateFormatString row_format_,
std::string row_between_delimiter_)
: RowInputFormatWithDiagnosticInfo(header_, buf, params_), buf(in_), data_types(header_.getDataTypes()),
: RowInputFormatWithDiagnosticInfo(header_, *buf_, params_), buf(std::move(buf_)), data_types(header_.getDataTypes()),
settings(std::move(settings_)), ignore_spaces(ignore_spaces_),
format(std::move(format_)), row_format(std::move(row_format_)),
default_csv_delimiter(settings.csv.delimiter), row_between_delimiter(std::move(row_between_delimiter_))
@ -101,10 +115,10 @@ ReturnType TemplateRowInputFormat::tryReadPrefixOrSuffix(size_t & input_part_beg
skipSpaces();
if constexpr (throw_exception)
assertString(format.delimiters[input_part_beg], buf);
assertString(format.delimiters[input_part_beg], *buf);
else
{
if (likely(!checkString(format.delimiters[input_part_beg], buf)))
if (likely(!checkString(format.delimiters[input_part_beg], *buf)))
return ReturnType(false);
}
@ -133,10 +147,10 @@ ReturnType TemplateRowInputFormat::tryReadPrefixOrSuffix(size_t & input_part_beg
skipSpaces();
if constexpr (throw_exception)
assertString(format.delimiters[input_part_beg], buf);
assertString(format.delimiters[input_part_beg], *buf);
else
{
if (likely(!checkString(format.delimiters[input_part_beg], buf)))
if (likely(!checkString(format.delimiters[input_part_beg], *buf)))
return ReturnType(false);
}
}
@ -162,14 +176,14 @@ bool TemplateRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
updateDiagnosticInfo();
if (likely(row_num != 1))
assertString(row_between_delimiter, buf);
assertString(row_between_delimiter, *buf);
extra.read_columns.assign(columns.size(), false);
for (size_t i = 0; i < row_format.columnsCount(); ++i)
{
skipSpaces();
assertString(row_format.delimiters[i], buf);
assertString(row_format.delimiters[i], *buf);
skipSpaces();
if (row_format.format_idx_to_column_idx[i])
{
@ -182,7 +196,7 @@ bool TemplateRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
}
skipSpaces();
assertString(row_format.delimiters.back(), buf);
assertString(row_format.delimiters.back(), *buf);
for (const auto & idx : always_default_columns)
data_types[idx]->insertDefaultInto(*columns[idx]);
@ -200,7 +214,7 @@ bool TemplateRowInputFormat::deserializeField(const DataTypePtr & type,
row_format.delimiters[file_column + 1].front();
try
{
return deserializeFieldByEscapingRule(type, serialization, column, buf, escaping_rule, settings);
return deserializeFieldByEscapingRule(type, serialization, column, *buf, escaping_rule, settings);
}
catch (Exception & e)
{
@ -214,7 +228,7 @@ void TemplateRowInputFormat::skipField(TemplateRowInputFormat::EscapingRule esca
{
try
{
skipFieldByEscapingRule(buf, escaping_rule, settings);
skipFieldByEscapingRule(*buf, escaping_rule, settings);
}
catch (Exception & e)
{
@ -228,7 +242,7 @@ void TemplateRowInputFormat::skipField(TemplateRowInputFormat::EscapingRule esca
/// Otherwise returns false
bool TemplateRowInputFormat::checkForSuffix()
{
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
bool suffix_found = false;
size_t last_successfully_parsed_idx = format_data_idx + 1;
try
@ -246,11 +260,11 @@ bool TemplateRowInputFormat::checkForSuffix()
if (unlikely(suffix_found))
{
skipSpaces();
if (buf.eof())
if (buf->eof())
return true;
}
buf.rollbackToCheckpoint();
buf->rollbackToCheckpoint();
return false;
}
@ -258,11 +272,11 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col
{
out << "Suffix does not match: ";
size_t last_successfully_parsed_idx = format_data_idx + 1;
const ReadBuffer::Position row_begin_pos = buf.position();
const ReadBuffer::Position row_begin_pos = buf->position();
bool caught = false;
try
{
PeekableReadBufferCheckpoint checkpoint{buf, true};
PeekableReadBufferCheckpoint checkpoint{*buf, true};
tryReadPrefixOrSuffix<void>(last_successfully_parsed_idx, format.columnsCount());
}
catch (Exception & e)
@ -273,12 +287,12 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col
if (!caught)
{
out << " There is some data after suffix (EOF expected, got ";
verbosePrintString(buf.position(), std::min(buf.buffer().end(), buf.position() + 16), out);
verbosePrintString(buf->position(), std::min(buf->buffer().end(), buf->position() + 16), out);
out << "). ";
}
out << " Format string (from format_schema): \n" << format.dump() << "\n";
if (row_begin_pos != buf.position())
if (row_begin_pos != buf->position())
{
/// Pointers to buffer memory were invalidated during checking for suffix
out << "\nCannot print more diagnostic info.";
@ -287,12 +301,12 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col
out << "\nUsing format string (from format_schema_rows): " << row_format.dump() << "\n";
out << "\nTrying to parse next row, because suffix does not match:\n";
if (likely(row_num != 1) && !parseDelimiterWithDiagnosticInfo(out, buf, row_between_delimiter, "delimiter between rows", ignore_spaces))
if (likely(row_num != 1) && !parseDelimiterWithDiagnosticInfo(out, *buf, row_between_delimiter, "delimiter between rows", ignore_spaces))
return false;
for (size_t i = 0; i < row_format.columnsCount(); ++i)
{
if (!parseDelimiterWithDiagnosticInfo(out, buf, row_format.delimiters[i], "delimiter before field " + std::to_string(i), ignore_spaces))
if (!parseDelimiterWithDiagnosticInfo(out, *buf, row_format.delimiters[i], "delimiter before field " + std::to_string(i), ignore_spaces))
return false;
skipSpaces();
@ -318,7 +332,7 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col
}
}
return parseDelimiterWithDiagnosticInfo(out, buf, row_format.delimiters.back(), "delimiter after last field", ignore_spaces);
return parseDelimiterWithDiagnosticInfo(out, *buf, row_format.delimiters.back(), "delimiter after last field", ignore_spaces);
}
bool parseDelimiterWithDiagnosticInfo(WriteBuffer & out, ReadBuffer & buf, const String & delimiter, const String & description, bool skip_spaces)
@ -366,9 +380,9 @@ bool TemplateRowInputFormat::allowSyncAfterError() const
void TemplateRowInputFormat::syncAfterError()
{
skipToNextRowOrEof(buf, row_format.delimiters.back(), row_between_delimiter, ignore_spaces);
end_of_stream = buf.eof();
/// It can happen that buf.position() is not at the beginning of row
skipToNextRowOrEof(*buf, row_format.delimiters.back(), row_between_delimiter, ignore_spaces);
end_of_stream = buf->eof();
/// It can happen that buf->position() is not at the beginning of row
/// if some delimiters is similar to row_format.delimiters.back() and row_between_delimiter.
/// It will cause another parsing error.
}
@ -384,7 +398,13 @@ void TemplateRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
end_of_stream = false;
buf.reset();
buf->reset();
}
void TemplateRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
void registerInputFormatTemplate(FormatFactory & factory)

View File

@ -25,6 +25,11 @@ public:
void resetParser() override;
private:
TemplateRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_,
FormatSettings settings_, bool ignore_spaces_,
ParsedTemplateFormatString format_, ParsedTemplateFormatString row_format_,
std::string row_between_delimiter);
bool readRow(MutableColumns & columns, RowReadExtension & extra) override;
void readPrefix() override;
@ -36,7 +41,7 @@ private:
const SerializationPtr & serialization, IColumn & column, size_t file_column);
void skipField(EscapingRule escaping_rule);
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(buf); }
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(*buf); }
template <typename ReturnType = void>
ReturnType tryReadPrefixOrSuffix(size_t & input_part_beg, size_t input_part_end);
@ -48,7 +53,9 @@ private:
bool isGarbageAfterField(size_t after_col_idx, ReadBuffer::Position pos) override;
PeekableReadBuffer buf;
void setReadBuffer(ReadBuffer & in_) override;
std::unique_ptr<PeekableReadBuffer> buf;
const DataTypes data_types;
FormatSettings settings;

View File

@ -254,16 +254,24 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet
for (const auto & projection : getProjections())
add_dependent_columns(&projection, projections_columns);
if (hasRowsTTL())
auto add_for_rows_ttl = [&](const auto & expression, auto & to_set)
{
auto rows_expression = getRowsTTL().expression;
if (add_dependent_columns(rows_expression, required_ttl_columns) && include_ttl_target)
if (add_dependent_columns(expression, to_set) && include_ttl_target)
{
/// Filter all columns, if rows TTL expression have to be recalculated.
for (const auto & column : getColumns().getAllPhysical())
updated_ttl_columns.insert(column.name);
}
}
};
if (hasRowsTTL())
add_for_rows_ttl(getRowsTTL().expression, required_ttl_columns);
for (const auto & entry : getRowsWhereTTLs())
add_for_rows_ttl(entry.expression, required_ttl_columns);
for (const auto & entry : getGroupByTTLs())
add_for_rows_ttl(entry.expression, required_ttl_columns);
for (const auto & entry : getRecompressionTTLs())
add_dependent_columns(entry.expression, required_ttl_columns);

View File

@ -2156,7 +2156,7 @@ class ClickHouseInstance:
def wait_start(self, start_wait_sec):
start_time = time.time()
last_err = None
while time.time() <= start_time + start_wait_sec:
while True:
try:
pid = self.get_process_pid("clickhouse")
if pid is None:
@ -2170,6 +2170,8 @@ class ClickHouseInstance:
logging.warning(f"ERROR {err}")
else:
raise Exception("ClickHouse server is not running. Check logs.")
if time.time() > start_time + start_wait_sec:
break
logging.error(f"No time left to start. But process is still running. Will dump threads.")
ps_clickhouse = self.exec_in_container(["bash", "-c", "ps -C clickhouse"], nothrow=True, user='root')
logging.info(f"PS RESULT:\n{ps_clickhouse}")

View File

@ -0,0 +1,41 @@
0 2021-01-01 0
0 2021-01-01 0
1 2021-01-01 0
1 2021-01-01 0
2 2021-01-01 0
2 2021-01-01 0
3 2021-01-01 0
3 2021-01-01 0
4 2021-01-01 0
4 2021-01-01 0
5 2021-01-01 0
5 2021-01-01 0
6 2021-01-01 0
6 2021-01-01 0
7 2021-01-01 0
7 2021-01-01 0
8 2021-01-01 0
8 2021-01-01 0
9 2021-01-01 0
9 2021-01-01 0
==========
0 2021-01-01 0
0 2021-01-01 0
1 2021-01-01 0
1 2021-01-01 0
2 2021-01-01 0
2 2021-01-01 0
3 2021-01-01 0
3 2021-01-01 0
4 2021-01-01 0
4 2021-01-01 0
5 2021-01-01 0
5 2021-01-01 0
6 2021-01-01 0
6 2021-01-01 0
7 2021-01-01 0
7 2021-01-01 0
8 2021-01-01 0
8 2021-01-01 0
9 2021-01-01 0
9 2021-01-01 0

View File

@ -0,0 +1,31 @@
drop table if exists ttl_test_02129;
create table ttl_test_02129(a Int64, b String, d Date)
Engine=MergeTree partition by d order by a
settings min_bytes_for_wide_part = 0, min_rows_for_wide_part = 0, materialize_ttl_recalculate_only = 0;
insert into ttl_test_02129 select number, '', '2021-01-01' from numbers(10);
alter table ttl_test_02129 add column c Int64 settings mutations_sync=2;
insert into ttl_test_02129 select number, '', '2021-01-01', 0 from numbers(10);
alter table ttl_test_02129 modify TTL (d + INTERVAL 1 MONTH) DELETE WHERE c=1 settings mutations_sync=2;
select * from ttl_test_02129 order by a, b, d, c;
drop table ttl_test_02129;
drop table if exists ttl_test_02129;
select '==========';
create table ttl_test_02129(a Int64, b String, d Date)
Engine=MergeTree partition by d order by a
settings min_bytes_for_wide_part = 0, min_rows_for_wide_part = 0, materialize_ttl_recalculate_only = 1;
insert into ttl_test_02129 select number, '', '2021-01-01' from numbers(10);
alter table ttl_test_02129 add column c Int64 settings mutations_sync=2;
insert into ttl_test_02129 select number, '', '2021-01-01', 0 from numbers(10);
alter table ttl_test_02129 modify TTL (d + INTERVAL 1 MONTH) DELETE WHERE c=1 settings mutations_sync=2;
select * from ttl_test_02129 order by a, b, d, c;
drop table ttl_test_02129;

View File

@ -0,0 +1,10 @@
1 a
2 b
3 a
4 b
5 a
6 b
7 a
8 b
all_1_1_0 4 0
all_2_2_0 4 0

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = MergeTree ORDER BY id"
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO async_inserts FORMAT CustomSeparated settings format_custom_escaping_rule='CSV', format_custom_field_delimiter=','
1,\"a\"
2,\"b\"
" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO async_inserts FORMAT CustomSeparated settings format_custom_escaping_rule='CSV', format_custom_field_delimiter=','
3,\"a\"
4,\"b\"
" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO async_inserts FORMAT CustomSeparatedWithNames settings format_custom_escaping_rule='CSV', format_custom_field_delimiter=','
\"id\",\"s\"
5,\"a\"
6,\"b\"
" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO async_inserts FORMAT CustomSeparatedWithNames settings format_custom_escaping_rule='CSV', format_custom_field_delimiter=','
\"id\",\"s\"
7,\"a\"
8,\"b\"
" &
wait
${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts ORDER BY id"
${CLICKHOUSE_CLIENT} -q "SELECT name, rows, level FROM system.parts WHERE table = 'async_inserts' AND database = '$CLICKHOUSE_DATABASE' ORDER BY name"
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts"