Merge branch 'flush_in_memory_part' of github.com:nautaa/ClickHouse into flush_in_memory_part

nautaa 2021-12-14 19:42:15 +08:00
commit 5545793555
20 changed files with 465 additions and 153 deletions

View File

@ -421,6 +421,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
NameSet updated_columns;
bool materialize_ttl_recalculate_only = materializeTTLRecalculateOnly(storage);
for (const MutationCommand & command : commands)
{
if (command.type == MutationCommand::Type::UPDATE
@ -631,7 +632,9 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
dependencies.insert(dependency);
}
}
else if (metadata_snapshot->hasRowsTTL())
else if (metadata_snapshot->hasRowsTTL()
|| metadata_snapshot->hasAnyRowsWhereTTL()
|| metadata_snapshot->hasAnyGroupByTTL())
{
for (const auto & column : all_columns)
dependencies.emplace(column.name, ColumnDependency::TTL_TARGET);

View File

@ -30,8 +30,21 @@ CustomSeparatedRowInputFormat::CustomSeparatedRowInputFormat(
bool with_types_,
bool ignore_spaces_,
const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes(header_, buf, params_, with_names_, with_types_, updateFormatSettings(format_settings_))
, buf(in_)
: CustomSeparatedRowInputFormat(
header_, std::make_unique<PeekableReadBuffer>(in_), params_, with_names_, with_types_, ignore_spaces_, format_settings_)
{
}
CustomSeparatedRowInputFormat::CustomSeparatedRowInputFormat(
const Block & header_,
std::unique_ptr<PeekableReadBuffer> buf_,
const Params & params_,
bool with_names_,
bool with_types_,
bool ignore_spaces_,
const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes(header_, *buf_, params_, with_names_, with_types_, updateFormatSettings(format_settings_))
, buf(std::move(buf_))
, ignore_spaces(ignore_spaces_)
, escaping_rule(format_settings_.custom.escaping_rule)
{
@ -51,57 +64,57 @@ CustomSeparatedRowInputFormat::CustomSeparatedRowInputFormat(
void CustomSeparatedRowInputFormat::skipPrefixBeforeHeader()
{
skipSpaces();
assertString(format_settings.custom.result_before_delimiter, buf);
assertString(format_settings.custom.result_before_delimiter, *buf);
}
void CustomSeparatedRowInputFormat::skipRowStartDelimiter()
{
skipSpaces();
assertString(format_settings.custom.row_before_delimiter, buf);
assertString(format_settings.custom.row_before_delimiter, *buf);
}
void CustomSeparatedRowInputFormat::skipFieldDelimiter()
{
skipSpaces();
assertString(format_settings.custom.field_delimiter, buf);
assertString(format_settings.custom.field_delimiter, *buf);
}
void CustomSeparatedRowInputFormat::skipRowEndDelimiter()
{
skipSpaces();
assertString(format_settings.custom.row_after_delimiter, buf);
assertString(format_settings.custom.row_after_delimiter, *buf);
}
void CustomSeparatedRowInputFormat::skipRowBetweenDelimiter()
{
skipSpaces();
assertString(format_settings.custom.row_between_delimiter, buf);
assertString(format_settings.custom.row_between_delimiter, *buf);
}
void CustomSeparatedRowInputFormat::skipField()
{
skipSpaces();
skipFieldByEscapingRule(buf, escaping_rule, format_settings);
skipFieldByEscapingRule(*buf, escaping_rule, format_settings);
}
bool CustomSeparatedRowInputFormat::checkEndOfRow()
{
PeekableReadBufferCheckpoint checkpoint{buf, true};
PeekableReadBufferCheckpoint checkpoint{*buf, true};
skipSpaces();
if (!checkString(format_settings.custom.row_after_delimiter, buf))
if (!checkString(format_settings.custom.row_after_delimiter, *buf))
return false;
skipSpaces();
/// At the end of row after row_after_delimiter we expect result_after_delimiter or row_between_delimiter.
if (checkString(format_settings.custom.row_between_delimiter, buf))
if (checkString(format_settings.custom.row_between_delimiter, *buf))
return true;
buf.rollbackToCheckpoint();
buf->rollbackToCheckpoint();
skipSpaces();
buf.ignore(format_settings.custom.row_after_delimiter.size());
buf->ignore(format_settings.custom.row_after_delimiter.size());
return checkForSuffixImpl(true);
}
@ -114,7 +127,7 @@ std::vector<String> CustomSeparatedRowInputFormat::readHeaderRow()
if (!values.empty())
skipFieldDelimiter();
skipSpaces();
values.push_back(readStringByEscapingRule(buf, escaping_rule, format_settings));
values.push_back(readStringByEscapingRule(*buf, escaping_rule, format_settings));
}
while (!checkEndOfRow());
@ -138,7 +151,7 @@ void CustomSeparatedRowInputFormat::skipHeaderRow()
bool CustomSeparatedRowInputFormat::readField(IColumn & column, const DataTypePtr & type, const SerializationPtr & serialization, bool, const String &)
{
skipSpaces();
return deserializeFieldByEscapingRule(type, serialization, column, buf, escaping_rule, format_settings);
return deserializeFieldByEscapingRule(type, serialization, column, *buf, escaping_rule, format_settings);
}
bool CustomSeparatedRowInputFormat::checkForSuffixImpl(bool check_eof)
@ -149,16 +162,16 @@ bool CustomSeparatedRowInputFormat::checkForSuffixImpl(bool check_eof)
if (!check_eof)
return false;
return buf.eof();
return buf->eof();
}
if (unlikely(checkString(format_settings.custom.result_after_delimiter, buf)))
if (unlikely(checkString(format_settings.custom.result_after_delimiter, *buf)))
{
skipSpaces();
if (!check_eof)
return true;
if (buf.eof())
if (buf->eof())
return true;
}
return false;
@ -166,25 +179,25 @@ bool CustomSeparatedRowInputFormat::checkForSuffixImpl(bool check_eof)
bool CustomSeparatedRowInputFormat::tryParseSuffixWithDiagnosticInfo(WriteBuffer & out)
{
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
if (checkForSuffixImpl(false))
{
if (buf.eof())
if (buf->eof())
out << "<End of stream>\n";
else
out << " There is some data after suffix\n";
return false;
}
buf.rollbackToCheckpoint();
buf->rollbackToCheckpoint();
return true;
}
bool CustomSeparatedRowInputFormat::checkForSuffix()
{
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
if (checkForSuffixImpl(true))
return true;
buf.rollbackToCheckpoint();
buf->rollbackToCheckpoint();
return false;
}
@ -196,37 +209,43 @@ bool CustomSeparatedRowInputFormat::allowSyncAfterError() const
void CustomSeparatedRowInputFormat::syncAfterError()
{
skipToNextRowOrEof(buf, format_settings.custom.row_after_delimiter, format_settings.custom.row_between_delimiter, ignore_spaces);
end_of_stream = buf.eof();
/// It can happen that buf.position() is not at the beginning of row
skipToNextRowOrEof(*buf, format_settings.custom.row_after_delimiter, format_settings.custom.row_between_delimiter, ignore_spaces);
end_of_stream = buf->eof();
/// It can happen that buf->position() is not at the beginning of row
/// if some delimiters is similar to row_format.delimiters.back() and row_between_delimiter.
/// It will cause another parsing error.
}
bool CustomSeparatedRowInputFormat::parseRowStartWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.row_before_delimiter, "delimiter before first field", ignore_spaces);
return parseDelimiterWithDiagnosticInfo(out, *buf, format_settings.custom.row_before_delimiter, "delimiter before first field", ignore_spaces);
}
bool CustomSeparatedRowInputFormat::parseFieldDelimiterWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.field_delimiter, "delimiter between fields", ignore_spaces);
return parseDelimiterWithDiagnosticInfo(out, *buf, format_settings.custom.field_delimiter, "delimiter between fields", ignore_spaces);
}
bool CustomSeparatedRowInputFormat::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.row_after_delimiter, "delimiter after last field", ignore_spaces);
return parseDelimiterWithDiagnosticInfo(out, *buf, format_settings.custom.row_after_delimiter, "delimiter after last field", ignore_spaces);
}
bool CustomSeparatedRowInputFormat::parseRowBetweenDelimiterWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.row_between_delimiter, "delimiter between rows", ignore_spaces);
return parseDelimiterWithDiagnosticInfo(out, *buf, format_settings.custom.row_between_delimiter, "delimiter between rows", ignore_spaces);
}
void CustomSeparatedRowInputFormat::resetParser()
{
RowInputFormatWithNamesAndTypes::resetParser();
buf.reset();
buf->reset();
}
void CustomSeparatedRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
void registerInputFormatCustomSeparated(FormatFactory & factory)

View File

@ -20,6 +20,8 @@ public:
void resetParser() override;
String getName() const override { return "CustomSeparatedRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
private:
CustomSeparatedRowInputFormat(
const Block & header_,
@ -59,9 +61,9 @@ private:
bool checkEndOfRow();
bool checkForSuffixImpl(bool check_eof);
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(buf); }
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(*buf); }
PeekableReadBuffer buf;
std::unique_ptr<PeekableReadBuffer> buf;
bool ignore_spaces;
EscapingRule escaping_rule;
};
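The CustomSeparated hunks above (and the matching changes to the JSONAsString, MsgPack, Regexp and Template readers below) all apply the same refactoring: the format no longer holds its PeekableReadBuffer by value but owns it through a std::unique_ptr, so the new setReadBuffer() override can rebuild the wrapper when the underlying input is swapped. A minimal, self-contained sketch of that ownership pattern, using hypothetical stand-in types rather than the real ClickHouse classes:

// Sketch only: Source, PeekableSource and RowReader are illustrative stand-ins,
// not the actual ReadBuffer / PeekableReadBuffer / IRowInputFormat classes.
#include <iostream>
#include <memory>
#include <string>

struct Source { std::string data; };

struct PeekableSource
{
    explicit PeekableSource(Source & in_) : in(&in_) {}
    Source * in;
};

class RowReader
{
public:
    explicit RowReader(Source & in_) : buf(std::make_unique<PeekableSource>(in_)) {}

    // With by-value ownership this re-seating was not possible; owning the wrapper
    // through unique_ptr lets us simply rebuild it over the new source, while all
    // reads keep going through *buf.
    void setReadBuffer(Source & in_) { buf = std::make_unique<PeekableSource>(in_); }

    const std::string & current() const { return buf->in->data; }

private:
    std::unique_ptr<PeekableSource> buf;   // previously: PeekableSource buf;
};

int main()
{
    Source first{"chunk-1"}, second{"chunk-2"};
    RowReader reader(first);
    std::cout << reader.current() << '\n';   // chunk-1
    reader.setReadBuffer(second);            // re-point the reader at a different source
    std::cout << reader.current() << '\n';   // chunk-2
}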

View File

@ -14,8 +14,11 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
}
JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) :
IRowInputFormat(header_, buf, std::move(params_)), buf(in_)
JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: JSONAsStringRowInputFormat(header_, std::make_unique<PeekableReadBuffer>(in_), params_) {}
JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_) :
IRowInputFormat(header_, *buf_, std::move(params_)), buf(std::move(buf_))
{
if (header_.columns() > 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -31,113 +34,113 @@ JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, Re
void JSONAsStringRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
buf->reset();
}
void JSONAsStringRowInputFormat::readPrefix()
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(buf);
skipBOMIfExists(*buf);
skipWhitespaceIfAny(buf);
if (!buf.eof() && *buf.position() == '[')
skipWhitespaceIfAny(*buf);
if (!buf->eof() && *buf->position() == '[')
{
++buf.position();
++buf->position();
data_in_square_brackets = true;
}
}
void JSONAsStringRowInputFormat::readSuffix()
{
skipWhitespaceIfAny(buf);
skipWhitespaceIfAny(*buf);
if (data_in_square_brackets)
{
assertChar(']', buf);
skipWhitespaceIfAny(buf);
assertChar(']', *buf);
skipWhitespaceIfAny(*buf);
}
if (!buf.eof() && *buf.position() == ';')
if (!buf->eof() && *buf->position() == ';')
{
++buf.position();
skipWhitespaceIfAny(buf);
++buf->position();
skipWhitespaceIfAny(*buf);
}
assertEOF(buf);
assertEOF(*buf);
}
void JSONAsStringRowInputFormat::readJSONObject(IColumn & column)
{
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
size_t balance = 0;
bool quotes = false;
if (*buf.position() != '{')
if (*buf->position() != '{')
throw Exception("JSON object must begin with '{'.", ErrorCodes::INCORRECT_DATA);
++buf.position();
++buf->position();
++balance;
char * pos;
while (balance)
{
if (buf.eof())
if (buf->eof())
throw Exception("Unexpected end of file while parsing JSON object.", ErrorCodes::INCORRECT_DATA);
if (quotes)
{
pos = find_first_symbols<'"', '\\'>(buf.position(), buf.buffer().end());
buf.position() = pos;
if (buf.position() == buf.buffer().end())
pos = find_first_symbols<'"', '\\'>(buf->position(), buf->buffer().end());
buf->position() = pos;
if (buf->position() == buf->buffer().end())
continue;
if (*buf.position() == '"')
if (*buf->position() == '"')
{
quotes = false;
++buf.position();
++buf->position();
}
else if (*buf.position() == '\\')
else if (*buf->position() == '\\')
{
++buf.position();
if (!buf.eof())
++buf->position();
if (!buf->eof())
{
++buf.position();
++buf->position();
}
}
}
else
{
pos = find_first_symbols<'"', '{', '}', '\\'>(buf.position(), buf.buffer().end());
buf.position() = pos;
if (buf.position() == buf.buffer().end())
pos = find_first_symbols<'"', '{', '}', '\\'>(buf->position(), buf->buffer().end());
buf->position() = pos;
if (buf->position() == buf->buffer().end())
continue;
if (*buf.position() == '{')
if (*buf->position() == '{')
{
++balance;
++buf.position();
++buf->position();
}
else if (*buf.position() == '}')
else if (*buf->position() == '}')
{
--balance;
++buf.position();
++buf->position();
}
else if (*buf.position() == '\\')
else if (*buf->position() == '\\')
{
++buf.position();
if (!buf.eof())
++buf->position();
if (!buf->eof())
{
++buf.position();
++buf->position();
}
}
else if (*buf.position() == '"')
else if (*buf->position() == '"')
{
quotes = true;
++buf.position();
++buf->position();
}
}
}
buf.makeContinuousMemoryFromCheckpointToPos();
char * end = buf.position();
buf.rollbackToCheckpoint();
column.insertData(buf.position(), end - buf.position());
buf.position() = end;
buf->makeContinuousMemoryFromCheckpointToPos();
char * end = buf->position();
buf->rollbackToCheckpoint();
column.insertData(buf->position(), end - buf->position());
buf->position() = end;
}
bool JSONAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
@ -145,30 +148,36 @@ bool JSONAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
if (!allow_new_rows)
return false;
skipWhitespaceIfAny(buf);
if (!buf.eof())
skipWhitespaceIfAny(*buf);
if (!buf->eof())
{
if (!data_in_square_brackets && *buf.position() == ';')
if (!data_in_square_brackets && *buf->position() == ';')
{
/// ';' means the end of query, but it cannot be before ']'.
return allow_new_rows = false;
}
else if (data_in_square_brackets && *buf.position() == ']')
else if (data_in_square_brackets && *buf->position() == ']')
{
/// ']' means the end of query.
return allow_new_rows = false;
}
}
if (!buf.eof())
if (!buf->eof())
readJSONObject(*columns[0]);
skipWhitespaceIfAny(buf);
if (!buf.eof() && *buf.position() == ',')
++buf.position();
skipWhitespaceIfAny(buf);
skipWhitespaceIfAny(*buf);
if (!buf->eof() && *buf->position() == ',')
++buf->position();
skipWhitespaceIfAny(*buf);
return !buf.eof();
return !buf->eof();
}
void JSONAsStringRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
void registerInputFormatJSONAsString(FormatFactory & factory)

View File

@ -20,8 +20,11 @@ public:
String getName() const override { return "JSONAsStringRowInputFormat"; }
void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
private:
JSONAsStringRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readPrefix() override;
@ -29,7 +32,7 @@ private:
void readJSONObject(IColumn & column);
PeekableReadBuffer buf;
std::unique_ptr<PeekableReadBuffer> buf;
/// This flag is needed to know if data is in square brackets.
bool data_in_square_brackets = false;

View File

@ -29,12 +29,15 @@ namespace ErrorCodes
}
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, buf, std::move(params_)), buf(in_), parser(visitor), data_types(header_.getDataTypes()) {}
: MsgPackRowInputFormat(header_, std::make_unique<PeekableReadBuffer>(in_), params_) {}
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_)
: IRowInputFormat(header_, *buf_, std::move(params_)), buf(std::move(buf_)), parser(visitor), data_types(header_.getDataTypes()) {}
void MsgPackRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
buf->reset();
visitor.reset();
}
@ -325,21 +328,21 @@ void MsgPackVisitor::parse_error(size_t, size_t) // NOLINT
bool MsgPackRowInputFormat::readObject()
{
if (buf.eof())
if (buf->eof())
return false;
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
size_t offset = 0;
while (!parser.execute(buf.position(), buf.available(), offset))
while (!parser.execute(buf->position(), buf->available(), offset))
{
buf.position() = buf.buffer().end();
if (buf.eof())
buf->position() = buf->buffer().end();
if (buf->eof())
throw Exception("Unexpected end of file while parsing msgpack object.", ErrorCodes::INCORRECT_DATA);
buf.position() = buf.buffer().end();
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
buf->position() = buf->buffer().end();
buf->makeContinuousMemoryFromCheckpointToPos();
buf->rollbackToCheckpoint();
}
buf.position() += offset;
buf->position() += offset;
return true;
}
@ -363,6 +366,12 @@ bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
return true;
}
void MsgPackRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
void registerInputFormatMsgPack(FormatFactory & factory)
{
factory.registerInputFormat("MsgPack", [](

View File

@ -61,13 +61,16 @@ public:
String getName() const override { return "MagPackRowInputFormat"; }
void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
private:
MsgPackRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool readObject();
PeekableReadBuffer buf;
std::unique_ptr<PeekableReadBuffer> buf;
MsgPackVisitor visitor;
msgpack::detail::parse_helper<MsgPackVisitor> parser;
const DataTypes data_types;

View File

@ -14,10 +14,15 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
RegexpRowInputFormat::RegexpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: RegexpRowInputFormat(std::make_unique<PeekableReadBuffer>(in_), header_, params_, format_settings_)
{
}
RegexpRowInputFormat::RegexpRowInputFormat(
ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, buf, std::move(params_))
, buf(in_)
std::unique_ptr<PeekableReadBuffer> buf_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, *buf_, std::move(params_))
, buf(std::move(buf_))
, format_settings(format_settings_)
, escaping_rule(format_settings_.regexp.escaping_rule)
, regexp(format_settings_.regexp.regexp)
@ -39,7 +44,7 @@ RegexpRowInputFormat::RegexpRowInputFormat(
void RegexpRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
buf->reset();
}
bool RegexpRowInputFormat::readField(size_t index, MutableColumns & columns)
@ -71,45 +76,51 @@ void RegexpRowInputFormat::readFieldsFromMatch(MutableColumns & columns, RowRead
bool RegexpRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
if (buf.eof())
if (buf->eof())
return false;
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
size_t line_size = 0;
do
{
char * pos = find_first_symbols<'\n', '\r'>(buf.position(), buf.buffer().end());
line_size += pos - buf.position();
buf.position() = pos;
} while (buf.position() == buf.buffer().end() && !buf.eof());
char * pos = find_first_symbols<'\n', '\r'>(buf->position(), buf->buffer().end());
line_size += pos - buf->position();
buf->position() = pos;
} while (buf->position() == buf->buffer().end() && !buf->eof());
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
buf->makeContinuousMemoryFromCheckpointToPos();
buf->rollbackToCheckpoint();
bool match = RE2::FullMatchN(re2::StringPiece(buf.position(), line_size), regexp, re2_arguments_ptrs.data(), re2_arguments_ptrs.size());
bool match = RE2::FullMatchN(re2::StringPiece(buf->position(), line_size), regexp, re2_arguments_ptrs.data(), re2_arguments_ptrs.size());
bool read_line = true;
if (!match)
{
if (!format_settings.regexp.skip_unmatched)
throw Exception("Line \"" + std::string(buf.position(), line_size) + "\" doesn't match the regexp.", ErrorCodes::INCORRECT_DATA);
throw Exception("Line \"" + std::string(buf->position(), line_size) + "\" doesn't match the regexp.", ErrorCodes::INCORRECT_DATA);
read_line = false;
}
if (read_line)
readFieldsFromMatch(columns, ext);
buf.position() += line_size;
buf->position() += line_size;
checkChar('\r', buf);
if (!buf.eof() && !checkChar('\n', buf))
checkChar('\r', *buf);
if (!buf->eof() && !checkChar('\n', *buf))
throw Exception("No \\n after \\r at the end of line.", ErrorCodes::INCORRECT_DATA);
return true;
}
void RegexpRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
void registerInputFormatRegexp(FormatFactory & factory)
{
factory.registerInputFormat("Regexp", [](

View File

@ -31,14 +31,17 @@ public:
String getName() const override { return "RegexpRowInputFormat"; }
void resetParser() override;
void setReadBuffer(ReadBuffer & in_) override;
private:
RegexpRowInputFormat(std::unique_ptr<PeekableReadBuffer> buf_, const Block & header_, Params params_, const FormatSettings & format_settings_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool readField(size_t index, MutableColumns & columns);
void readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext);
PeekableReadBuffer buf;
std::unique_ptr<PeekableReadBuffer> buf;
const FormatSettings format_settings;
const EscapingRule escaping_rule;

View File

@ -20,11 +20,25 @@ extern const int SYNTAX_ERROR;
}
TemplateRowInputFormat::TemplateRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
TemplateRowInputFormat::TemplateRowInputFormat(
const Block & header_,
ReadBuffer & in_,
const Params & params_,
FormatSettings settings_,
bool ignore_spaces_,
ParsedTemplateFormatString format_,
ParsedTemplateFormatString row_format_,
std::string row_between_delimiter_)
: TemplateRowInputFormat(
header_, std::make_unique<PeekableReadBuffer>(in_), params_, settings_, ignore_spaces_, format_, row_format_, row_between_delimiter_)
{
}
TemplateRowInputFormat::TemplateRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_,
FormatSettings settings_, bool ignore_spaces_,
ParsedTemplateFormatString format_, ParsedTemplateFormatString row_format_,
std::string row_between_delimiter_)
: RowInputFormatWithDiagnosticInfo(header_, buf, params_), buf(in_), data_types(header_.getDataTypes()),
: RowInputFormatWithDiagnosticInfo(header_, *buf_, params_), buf(std::move(buf_)), data_types(header_.getDataTypes()),
settings(std::move(settings_)), ignore_spaces(ignore_spaces_),
format(std::move(format_)), row_format(std::move(row_format_)),
default_csv_delimiter(settings.csv.delimiter), row_between_delimiter(std::move(row_between_delimiter_))
@ -101,10 +115,10 @@ ReturnType TemplateRowInputFormat::tryReadPrefixOrSuffix(size_t & input_part_beg
skipSpaces();
if constexpr (throw_exception)
assertString(format.delimiters[input_part_beg], buf);
assertString(format.delimiters[input_part_beg], *buf);
else
{
if (likely(!checkString(format.delimiters[input_part_beg], buf)))
if (likely(!checkString(format.delimiters[input_part_beg], *buf)))
return ReturnType(false);
}
@ -133,10 +147,10 @@ ReturnType TemplateRowInputFormat::tryReadPrefixOrSuffix(size_t & input_part_beg
skipSpaces();
if constexpr (throw_exception)
assertString(format.delimiters[input_part_beg], buf);
assertString(format.delimiters[input_part_beg], *buf);
else
{
if (likely(!checkString(format.delimiters[input_part_beg], buf)))
if (likely(!checkString(format.delimiters[input_part_beg], *buf)))
return ReturnType(false);
}
}
@ -162,14 +176,14 @@ bool TemplateRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
updateDiagnosticInfo();
if (likely(row_num != 1))
assertString(row_between_delimiter, buf);
assertString(row_between_delimiter, *buf);
extra.read_columns.assign(columns.size(), false);
for (size_t i = 0; i < row_format.columnsCount(); ++i)
{
skipSpaces();
assertString(row_format.delimiters[i], buf);
assertString(row_format.delimiters[i], *buf);
skipSpaces();
if (row_format.format_idx_to_column_idx[i])
{
@ -182,7 +196,7 @@ bool TemplateRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
}
skipSpaces();
assertString(row_format.delimiters.back(), buf);
assertString(row_format.delimiters.back(), *buf);
for (const auto & idx : always_default_columns)
data_types[idx]->insertDefaultInto(*columns[idx]);
@ -200,7 +214,7 @@ bool TemplateRowInputFormat::deserializeField(const DataTypePtr & type,
row_format.delimiters[file_column + 1].front();
try
{
return deserializeFieldByEscapingRule(type, serialization, column, buf, escaping_rule, settings);
return deserializeFieldByEscapingRule(type, serialization, column, *buf, escaping_rule, settings);
}
catch (Exception & e)
{
@ -214,7 +228,7 @@ void TemplateRowInputFormat::skipField(TemplateRowInputFormat::EscapingRule esca
{
try
{
skipFieldByEscapingRule(buf, escaping_rule, settings);
skipFieldByEscapingRule(*buf, escaping_rule, settings);
}
catch (Exception & e)
{
@ -228,7 +242,7 @@ void TemplateRowInputFormat::skipField(TemplateRowInputFormat::EscapingRule esca
/// Otherwise returns false
bool TemplateRowInputFormat::checkForSuffix()
{
PeekableReadBufferCheckpoint checkpoint{buf};
PeekableReadBufferCheckpoint checkpoint{*buf};
bool suffix_found = false;
size_t last_successfully_parsed_idx = format_data_idx + 1;
try
@ -246,11 +260,11 @@ bool TemplateRowInputFormat::checkForSuffix()
if (unlikely(suffix_found))
{
skipSpaces();
if (buf.eof())
if (buf->eof())
return true;
}
buf.rollbackToCheckpoint();
buf->rollbackToCheckpoint();
return false;
}
@ -258,11 +272,11 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col
{
out << "Suffix does not match: ";
size_t last_successfully_parsed_idx = format_data_idx + 1;
const ReadBuffer::Position row_begin_pos = buf.position();
const ReadBuffer::Position row_begin_pos = buf->position();
bool caught = false;
try
{
PeekableReadBufferCheckpoint checkpoint{buf, true};
PeekableReadBufferCheckpoint checkpoint{*buf, true};
tryReadPrefixOrSuffix<void>(last_successfully_parsed_idx, format.columnsCount());
}
catch (Exception & e)
@ -273,12 +287,12 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col
if (!caught)
{
out << " There is some data after suffix (EOF expected, got ";
verbosePrintString(buf.position(), std::min(buf.buffer().end(), buf.position() + 16), out);
verbosePrintString(buf->position(), std::min(buf->buffer().end(), buf->position() + 16), out);
out << "). ";
}
out << " Format string (from format_schema): \n" << format.dump() << "\n";
if (row_begin_pos != buf.position())
if (row_begin_pos != buf->position())
{
/// Pointers to buffer memory were invalidated during checking for suffix
out << "\nCannot print more diagnostic info.";
@ -287,12 +301,12 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col
out << "\nUsing format string (from format_schema_rows): " << row_format.dump() << "\n";
out << "\nTrying to parse next row, because suffix does not match:\n";
if (likely(row_num != 1) && !parseDelimiterWithDiagnosticInfo(out, buf, row_between_delimiter, "delimiter between rows", ignore_spaces))
if (likely(row_num != 1) && !parseDelimiterWithDiagnosticInfo(out, *buf, row_between_delimiter, "delimiter between rows", ignore_spaces))
return false;
for (size_t i = 0; i < row_format.columnsCount(); ++i)
{
if (!parseDelimiterWithDiagnosticInfo(out, buf, row_format.delimiters[i], "delimiter before field " + std::to_string(i), ignore_spaces))
if (!parseDelimiterWithDiagnosticInfo(out, *buf, row_format.delimiters[i], "delimiter before field " + std::to_string(i), ignore_spaces))
return false;
skipSpaces();
@ -318,7 +332,7 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col
}
}
return parseDelimiterWithDiagnosticInfo(out, buf, row_format.delimiters.back(), "delimiter after last field", ignore_spaces);
return parseDelimiterWithDiagnosticInfo(out, *buf, row_format.delimiters.back(), "delimiter after last field", ignore_spaces);
}
bool parseDelimiterWithDiagnosticInfo(WriteBuffer & out, ReadBuffer & buf, const String & delimiter, const String & description, bool skip_spaces)
@ -366,9 +380,9 @@ bool TemplateRowInputFormat::allowSyncAfterError() const
void TemplateRowInputFormat::syncAfterError()
{
skipToNextRowOrEof(buf, row_format.delimiters.back(), row_between_delimiter, ignore_spaces);
end_of_stream = buf.eof();
/// It can happen that buf.position() is not at the beginning of row
skipToNextRowOrEof(*buf, row_format.delimiters.back(), row_between_delimiter, ignore_spaces);
end_of_stream = buf->eof();
/// It can happen that buf->position() is not at the beginning of row
/// if some delimiters is similar to row_format.delimiters.back() and row_between_delimiter.
/// It will cause another parsing error.
}
@ -384,7 +398,13 @@ void TemplateRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
end_of_stream = false;
buf.reset();
buf->reset();
}
void TemplateRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
IInputFormat::setReadBuffer(*buf);
}
void registerInputFormatTemplate(FormatFactory & factory)

View File

@ -25,6 +25,11 @@ public:
void resetParser() override;
private:
TemplateRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, const Params & params_,
FormatSettings settings_, bool ignore_spaces_,
ParsedTemplateFormatString format_, ParsedTemplateFormatString row_format_,
std::string row_between_delimiter);
bool readRow(MutableColumns & columns, RowReadExtension & extra) override;
void readPrefix() override;
@ -36,7 +41,7 @@ private:
const SerializationPtr & serialization, IColumn & column, size_t file_column);
void skipField(EscapingRule escaping_rule);
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(buf); }
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(*buf); }
template <typename ReturnType = void>
ReturnType tryReadPrefixOrSuffix(size_t & input_part_beg, size_t input_part_end);
@ -48,7 +53,9 @@ private:
bool isGarbageAfterField(size_t after_col_idx, ReadBuffer::Position pos) override;
PeekableReadBuffer buf;
void setReadBuffer(ReadBuffer & in_) override;
std::unique_ptr<PeekableReadBuffer> buf;
const DataTypes data_types;
FormatSettings settings;

View File

@ -254,16 +254,24 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet
for (const auto & projection : getProjections())
add_dependent_columns(&projection, projections_columns);
if (hasRowsTTL())
auto add_for_rows_ttl = [&](const auto & expression, auto & to_set)
{
auto rows_expression = getRowsTTL().expression;
if (add_dependent_columns(rows_expression, required_ttl_columns) && include_ttl_target)
if (add_dependent_columns(expression, to_set) && include_ttl_target)
{
/// Filter all columns, if rows TTL expression have to be recalculated.
for (const auto & column : getColumns().getAllPhysical())
updated_ttl_columns.insert(column.name);
}
}
};
if (hasRowsTTL())
add_for_rows_ttl(getRowsTTL().expression, required_ttl_columns);
for (const auto & entry : getRowsWhereTTLs())
add_for_rows_ttl(entry.expression, required_ttl_columns);
for (const auto & entry : getGroupByTTLs())
add_for_rows_ttl(entry.expression, required_ttl_columns);
for (const auto & entry : getRecompressionTTLs())
add_dependent_columns(entry.expression, required_ttl_columns);

View File

@ -66,10 +66,17 @@ void StorageMongoDB::connectIfNotConnected()
if (!authenticated)
{
Poco::URI poco_uri(uri);
auto query_params = poco_uri.getQueryParameters();
auto auth_source = std::find_if(query_params.begin(), query_params.end(),
[&](const std::pair<std::string, std::string> & param) { return param.first == "authSource"; });
auto auth_db = database_name;
if (auth_source != query_params.end())
auth_db = auth_source->second;
# if POCO_VERSION >= 0x01070800
if (!username.empty() && !password.empty())
{
Poco::MongoDB::Database poco_db(database_name);
Poco::MongoDB::Database poco_db(auth_db);
if (!poco_db.authenticate(*connection, username, password, Poco::MongoDB::Database::AUTH_SCRAM_SHA1))
throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
}
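The StorageMongoDB hunk above makes authentication honour an authSource query parameter in the connection URI: when it is present, the driver authenticates against that database instead of the table's database. A minimal standalone sketch of the same lookup, with a hypothetical QueryParameters alias standing in for what Poco::URI::getQueryParameters() returns:

// Sketch only: QueryParameters and chooseAuthDatabase are illustrative names,
// not part of the ClickHouse or Poco APIs.
#include <algorithm>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using QueryParameters = std::vector<std::pair<std::string, std::string>>;

// Authenticate against authSource=<db> when the URI provides it, otherwise
// fall back to the target database, mirroring the hunk above.
std::string chooseAuthDatabase(const QueryParameters & params, const std::string & database_name)
{
    auto it = std::find_if(params.begin(), params.end(),
        [](const auto & param) { return param.first == "authSource"; });
    return it != params.end() ? it->second : database_name;
}

int main()
{
    QueryParameters params{{"ssl", "true"}, {"authSource", "admin"}};
    std::cout << chooseAuthDatabase(params, "test") << '\n';  // admin
    std::cout << chooseAuthDatabase({}, "test") << '\n';      // test
}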

View File

@ -2156,7 +2156,7 @@ class ClickHouseInstance:
def wait_start(self, start_wait_sec):
start_time = time.time()
last_err = None
while time.time() <= start_time + start_wait_sec:
while True:
try:
pid = self.get_process_pid("clickhouse")
if pid is None:
@ -2170,6 +2170,8 @@ class ClickHouseInstance:
logging.warning(f"ERROR {err}")
else:
raise Exception("ClickHouse server is not running. Check logs.")
if time.time() > start_time + start_wait_sec:
break
logging.error(f"No time left to start. But process is still running. Will dump threads.")
ps_clickhouse = self.exec_in_container(["bash", "-c", "ps -C clickhouse"], nothrow=True, user='root')
logging.info(f"PS RESULT:\n{ps_clickhouse}")

View File

@ -46,6 +46,20 @@ def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
assert get_large_objects_count(cluster, size=size) == expected
def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
deadline = time.monotonic() + timeout
num_parts = 0
while time.monotonic() < deadline:
num_parts_str = node.query("select count() from system.parts where table = '{}' and active".format(table_name))
num_parts = int(num_parts_str.strip())
if num_parts == num_expected_parts:
return
time.sleep(0.2)
assert num_parts == num_expected_parts
@pytest.mark.parametrize(
"policy", ["s3"]
)
@ -248,3 +262,50 @@ def test_s3_zero_copy_with_ttl_delete(cluster, large_data, iterations):
node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
def test_s3_zero_copy_concurrent_merge(cluster):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS concurrent_merge NO DELAY")
node2.query("DROP TABLE IF EXISTS concurrent_merge NO DELAY")
for node in (node1, node2):
node.query(
"""
CREATE TABLE concurrent_merge (id UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/concurrent_merge', '{replica}')
ORDER BY id
SETTINGS index_granularity=2, storage_policy='s3', remote_fs_execute_merges_on_single_replica_time_threshold=1
"""
)
node1.query("system stop merges")
node2.query("system stop merges")
# This will generate two parts with 20 granules each
node1.query("insert into concurrent_merge select number from numbers(40)")
node1.query("insert into concurrent_merge select number + 1 from numbers(40)")
wait_for_active_parts(node2, 2, 'concurrent_merge')
# Merge will materialize default column, it should sleep every granule and take 20 * 2 * 0.1 = 4 sec.
node1.query("alter table concurrent_merge add column x UInt32 default sleep(0.1)")
node1.query("system start merges")
node2.query("system start merges")
# Now, the merge should start.
# Because of remote_fs_execute_merges_on_single_replica_time_threshold=1,
# only one replica will start merge instantly.
# The other replica should wait for 1 sec and also start it.
# That could in principle cause a data race on S3 storage.
# For now it does not happen (every blob has a random name, so we just end up with duplicated data).
node1.query("optimize table concurrent_merge final")
wait_for_active_parts(node1, 1, 'concurrent_merge')
wait_for_active_parts(node2, 1, 'concurrent_merge')
for node in (node1, node2):
assert node.query('select sum(id) from concurrent_merge').strip() == '1600'

View File

@ -159,3 +159,27 @@ def test_no_credentials(started_cluster):
node.query("create table simple_mongo_table_2(key UInt64, data String) engine = MongoDB('mongo2:27017', 'test', 'simple_table', '', '')")
assert node.query("SELECT count() FROM simple_mongo_table_2") == '100\n'
simple_mongo_table.drop()
@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])
def test_auth_source(started_cluster):
mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
admin_db = mongo_connection['admin']
admin_db.add_user('root', 'clickhouse', roles=[{ 'role': "userAdminAnyDatabase", 'db': "admin" }, "readWriteAnyDatabase"])
simple_mongo_table = admin_db['simple_table']
data = []
for i in range(0, 50):
data.append({'key': i, 'data': hex(i * i)})
simple_mongo_table.insert_many(data)
db = mongo_connection['test']
simple_mongo_table = db['simple_table']
data = []
for i in range(0, 100):
data.append({'key': i, 'data': hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances['node']
node.query("create table simple_mongo_table_fail(key UInt64, data String) engine = MongoDB('mongo2:27017', 'test', 'simple_table', 'root', 'clickhouse')")
node.query_and_get_error("SELECT count() FROM simple_mongo_table_fail")
node.query("create table simple_mongo_table_ok(key UInt64, data String) engine = MongoDB('mongo2:27017', 'test', 'simple_table', 'root', 'clickhouse', 'authSource=admin')")
assert node.query("SELECT count() FROM simple_mongo_table_ok") == '100\n'
simple_mongo_table.drop()

View File

@ -0,0 +1,41 @@
0 2021-01-01 0
0 2021-01-01 0
1 2021-01-01 0
1 2021-01-01 0
2 2021-01-01 0
2 2021-01-01 0
3 2021-01-01 0
3 2021-01-01 0
4 2021-01-01 0
4 2021-01-01 0
5 2021-01-01 0
5 2021-01-01 0
6 2021-01-01 0
6 2021-01-01 0
7 2021-01-01 0
7 2021-01-01 0
8 2021-01-01 0
8 2021-01-01 0
9 2021-01-01 0
9 2021-01-01 0
==========
0 2021-01-01 0
0 2021-01-01 0
1 2021-01-01 0
1 2021-01-01 0
2 2021-01-01 0
2 2021-01-01 0
3 2021-01-01 0
3 2021-01-01 0
4 2021-01-01 0
4 2021-01-01 0
5 2021-01-01 0
5 2021-01-01 0
6 2021-01-01 0
6 2021-01-01 0
7 2021-01-01 0
7 2021-01-01 0
8 2021-01-01 0
8 2021-01-01 0
9 2021-01-01 0
9 2021-01-01 0

View File

@ -0,0 +1,31 @@
drop table if exists ttl_test_02129;
create table ttl_test_02129(a Int64, b String, d Date)
Engine=MergeTree partition by d order by a
settings min_bytes_for_wide_part = 0, min_rows_for_wide_part = 0, materialize_ttl_recalculate_only = 0;
insert into ttl_test_02129 select number, '', '2021-01-01' from numbers(10);
alter table ttl_test_02129 add column c Int64 settings mutations_sync=2;
insert into ttl_test_02129 select number, '', '2021-01-01', 0 from numbers(10);
alter table ttl_test_02129 modify TTL (d + INTERVAL 1 MONTH) DELETE WHERE c=1 settings mutations_sync=2;
select * from ttl_test_02129 order by a, b, d, c;
drop table ttl_test_02129;
drop table if exists ttl_test_02129;
select '==========';
create table ttl_test_02129(a Int64, b String, d Date)
Engine=MergeTree partition by d order by a
settings min_bytes_for_wide_part = 0, min_rows_for_wide_part = 0, materialize_ttl_recalculate_only = 1;
insert into ttl_test_02129 select number, '', '2021-01-01' from numbers(10);
alter table ttl_test_02129 add column c Int64 settings mutations_sync=2;
insert into ttl_test_02129 select number, '', '2021-01-01', 0 from numbers(10);
alter table ttl_test_02129 modify TTL (d + INTERVAL 1 MONTH) DELETE WHERE c=1 settings mutations_sync=2;
select * from ttl_test_02129 order by a, b, d, c;
drop table ttl_test_02129;

View File

@ -0,0 +1,10 @@
1 a
2 b
3 a
4 b
5 a
6 b
7 a
8 b
all_1_1_0 4 0
all_2_2_0 4 0

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = MergeTree ORDER BY id"
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO async_inserts FORMAT CustomSeparated settings format_custom_escaping_rule='CSV', format_custom_field_delimiter=','
1,\"a\"
2,\"b\"
" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO async_inserts FORMAT CustomSeparated settings format_custom_escaping_rule='CSV', format_custom_field_delimiter=','
3,\"a\"
4,\"b\"
" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO async_inserts FORMAT CustomSeparatedWithNames settings format_custom_escaping_rule='CSV', format_custom_field_delimiter=','
\"id\",\"s\"
5,\"a\"
6,\"b\"
" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO async_inserts FORMAT CustomSeparatedWithNames settings format_custom_escaping_rule='CSV', format_custom_field_delimiter=','
\"id\",\"s\"
7,\"a\"
8,\"b\"
" &
wait
${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts ORDER BY id"
${CLICKHOUSE_CLIENT} -q "SELECT name, rows, level FROM system.parts WHERE table = 'async_inserts' AND database = '$CLICKHOUSE_DATABASE' ORDER BY name"
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts"