commit 581e6b638c
Author: Anton Popov (committed by GitHub)
Date: 2024-08-28 03:47:28 +04:00
26 changed files with 282 additions and 50 deletions


@ -369,6 +369,17 @@ void ColumnArray::popBack(size_t n)
offsets_data.resize_assume_reserved(offsets_data.size() - n);
}
ColumnCheckpointPtr ColumnArray::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), getData().getCheckpoint());
}
void ColumnArray::rollback(const ColumnCheckpoint & checkpoint)
{
getOffsets().resize_assume_reserved(checkpoint.size);
getData().rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
}
int ColumnArray::compareAtImpl(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint, const Collator * collator) const
{
const ColumnArray & rhs = assert_cast<const ColumnArray &>(rhs_);

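For a concrete picture of what ColumnCheckpointWithNested captures for an array column (illustrative example, not part of the commit):

// Array column currently holding [[1, 2], [3]]:
//   checkpoint.size         == 2   // rows, i.e. the size of the offsets column
//   checkpoint.nested->size == 3   // elements in the flat data column
// rollback() trims the offsets back to 2 and delegates to the data column,
// which trims itself back to 3 (recursively, if it has subcolumns of its own).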

@ -161,6 +161,9 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(offsets);


@ -304,6 +304,16 @@ public:
variant_column_ptr->protect();
}
ColumnCheckpointPtr getCheckpoint() const override
{
return variant_column_ptr->getCheckpoint();
}
void rollback(const ColumnCheckpoint & checkpoint) override
{
variant_column_ptr->rollback(checkpoint);
}
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(variant_column);


@ -312,6 +312,16 @@ void ColumnMap::getExtremes(Field & min, Field & max) const
max = std::move(map_max_value);
}
ColumnCheckpointPtr ColumnMap::getCheckpoint() const
{
return nested->getCheckpoint();
}
void ColumnMap::rollback(const ColumnCheckpoint & checkpoint)
{
nested->rollback(checkpoint);
}
void ColumnMap::forEachSubcolumn(MutableColumnCallback callback)
{
callback(nested);


@ -102,6 +102,8 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;


@ -305,6 +305,17 @@ void ColumnNullable::popBack(size_t n)
getNullMapColumn().popBack(n);
}
ColumnCheckpointPtr ColumnNullable::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), nested_column->getCheckpoint());
}
void ColumnNullable::rollback(const ColumnCheckpoint & checkpoint)
{
getNullMapData().resize_assume_reserved(checkpoint.size);
nested_column->rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
}
ColumnPtr ColumnNullable::filter(const Filter & filt, ssize_t result_size_hint) const
{
ColumnPtr filtered_data = getNestedColumn().filter(filt, result_size_hint);


@ -143,6 +143,9 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(nested_column);


@ -30,6 +30,23 @@ const std::shared_ptr<SerializationDynamic> & getDynamicSerialization()
return dynamic_serialization;
}
struct ColumnObjectCheckpoint : public ColumnCheckpoint
{
using CheckpointsMap = std::unordered_map<String, ColumnCheckpointPtr>;
ColumnObjectCheckpoint(size_t size_, CheckpointsMap typed_paths_, CheckpointsMap dynamic_paths_, ColumnCheckpointPtr shared_data_)
: ColumnCheckpoint(size_)
, typed_paths(std::move(typed_paths_))
, dynamic_paths(std::move(dynamic_paths_))
, shared_data(std::move(shared_data_))
{
}
CheckpointsMap typed_paths;
CheckpointsMap dynamic_paths;
ColumnCheckpointPtr shared_data;
};
}
ColumnObject::ColumnObject(
@ -655,6 +672,41 @@ void ColumnObject::popBack(size_t n)
shared_data->popBack(n);
}
ColumnCheckpointPtr ColumnObject::getCheckpoint() const
{
auto get_checkpoints = [](const auto & columns)
{
std::unordered_map<String, ColumnCheckpointPtr> checkpoints;
for (const auto & [name, column] : columns)
checkpoints[name] = column->getCheckpoint();
return checkpoints;
};
return std::make_shared<ColumnObjectCheckpoint>(size(), get_checkpoints(typed_paths), get_checkpoints(dynamic_paths_ptrs), shared_data->getCheckpoint());
}
void ColumnObject::rollback(const ColumnCheckpoint & checkpoint)
{
const auto & object_checkpoint = assert_cast<const ColumnObjectCheckpoint &>(checkpoint);
for (auto & [name, column] : typed_paths)
{
const auto & nested_checkpoint = object_checkpoint.typed_paths.at(name);
chassert(nested_checkpoint);
column->rollback(*nested_checkpoint);
}
for (auto & [name, column] : dynamic_paths_ptrs)
{
const auto & nested_checkpoint = object_checkpoint.dynamic_paths.at(name);
chassert(nested_checkpoint);
column->rollback(*nested_checkpoint);
}
shared_data->rollback(*object_checkpoint.shared_data);
}
StringRef ColumnObject::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
{
StringRef res(begin, 0);

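To make the nested structure above easier to picture, here is an illustrative layout of the checkpoint for a small object column (the path names are invented for the example):

// Object column with a typed path "a.b" and a dynamic path "c":
//
//   ColumnObjectCheckpoint
//   {
//       size          = N,                                   // row count
//       typed_paths   = { "a.b" -> checkpoint of its column },
//       dynamic_paths = { "c"   -> checkpoint of its column },
//       shared_data   = checkpoint of the shared data column,
//   }
//
// rollback() walks the same maps, so every path that was partially filled
// before the exception is truncated back to its checkpointed size.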

@ -159,6 +159,8 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;


@ -308,6 +308,22 @@ void ColumnSparse::popBack(size_t n)
_size = new_size;
}
ColumnCheckpointPtr ColumnSparse::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), values->getCheckpoint());
}
void ColumnSparse::rollback(const ColumnCheckpoint & checkpoint)
{
_size = checkpoint.size;
const auto & nested = *assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested;
chassert(nested.size > 0);
values->rollback(nested);
getOffsetsData().resize_assume_reserved(nested.size - 1);
}
ColumnPtr ColumnSparse::filter(const Filter & filt, ssize_t) const
{
if (_size != filt.size())

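The `nested.size - 1` above is easy to misread; the reasoning, assuming the usual sparse layout in which `values` always holds an extra default element at position 0, is:

// Invariant of ColumnSparse:  values->size() == offsets.size() + 1
// (values[0] is the implicit default value, offsets lists the positions of
// the non-default rows). The checkpoint of `values` therefore records
// offsets.size() + 1 elements, so after rolling `values` back, the offsets
// array is trimmed to nested.size - 1 to restore the invariant;
// chassert(nested.size > 0) guards against an impossible empty values column.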

@ -149,6 +149,9 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;


@ -254,6 +254,27 @@ void ColumnTuple::popBack(size_t n)
column->popBack(n);
}
ColumnCheckpointPtr ColumnTuple::getCheckpoint() const
{
ColumnCheckpoints checkpoints;
checkpoints.reserve(columns.size());
for (const auto & column : columns)
checkpoints.push_back(column->getCheckpoint());
return std::make_shared<ColumnCheckpointWithNestedTuple>(size(), std::move(checkpoints));
}
void ColumnTuple::rollback(const ColumnCheckpoint & checkpoint)
{
column_length = checkpoint.size;
const auto & checkpoints = assert_cast<const ColumnCheckpointWithNestedTuple &>(checkpoint).nested;
chassert(columns.size() == checkpoints.size());
for (size_t i = 0; i < columns.size(); ++i)
columns[i]->rollback(*checkpoints[i]);
}
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
if (columns.empty())


@ -118,6 +118,8 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;


@ -739,6 +739,29 @@ void ColumnVariant::popBack(size_t n)
offsets->popBack(n);
}
ColumnCheckpointPtr ColumnVariant::getCheckpoint() const
{
ColumnCheckpoints checkpoints;
checkpoints.reserve(variants.size());
for (const auto & column : variants)
checkpoints.push_back(column->getCheckpoint());
return std::make_shared<ColumnCheckpointWithNestedTuple>(size(), std::move(checkpoints));
}
void ColumnVariant::rollback(const ColumnCheckpoint & checkpoint)
{
getOffsets().resize_assume_reserved(checkpoint.size);
getLocalDiscriminators().resize_assume_reserved(checkpoint.size);
const auto & checkpoints = assert_cast<const ColumnCheckpointWithNestedTuple &>(checkpoint).nested;
chassert(variants.size() == checkpoints.size());
for (size_t i = 0; i < variants.size(); ++i)
variants[i]->rollback(*checkpoints[i]);
}
StringRef ColumnVariant::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
/// During any serialization/deserialization we should always use global discriminators.


@ -248,6 +248,8 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;


@ -49,6 +49,40 @@ struct EqualRange
using EqualRanges = std::vector<EqualRange>;
/// A checkpoint that contains size of column and all its subcolumns.
/// It can be used to roll back the column to a previous state, for example
/// after failed parsing, when the column may be left in an inconsistent state.
struct ColumnCheckpoint
{
explicit ColumnCheckpoint(size_t size_) : size(size_) {}
size_t size = 0;
};
using ColumnCheckpointPtr = std::shared_ptr<const ColumnCheckpoint>;
using ColumnCheckpoints = std::vector<ColumnCheckpointPtr>;
struct ColumnCheckpointWithNested : public ColumnCheckpoint
{
ColumnCheckpointWithNested(size_t size_, ColumnCheckpointPtr nested_)
: ColumnCheckpoint(size_)
, nested(std::move(nested_))
{
}
ColumnCheckpointPtr nested;
};
struct ColumnCheckpointWithNestedTuple : public ColumnCheckpoint
{
ColumnCheckpointWithNestedTuple(size_t size_, ColumnCheckpoints nested_)
: ColumnCheckpoint(size_)
, nested(std::move(nested_))
{
}
ColumnCheckpoints nested;
};
/// Declares interface to store columns in memory.
class IColumn : public COW<IColumn>
{
@ -509,6 +543,13 @@ public:
/// The operation is slow and performed only for debug builds.
virtual void protect() {}
/// Returns checkpoint of current state of column.
virtual ColumnCheckpointPtr getCheckpoint() const { return std::make_shared<ColumnCheckpoint>(size()); }
/// Rolls back the column to the checkpoint.
/// Unlike 'popBack', this method should work correctly even if the column is in an invalid state.
virtual void rollback(const ColumnCheckpoint & checkpoint) { popBack(size() - checkpoint.size); }
/// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them.
/// Shallow: doesn't do recursive calls; don't do call for itself.

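The intended usage of the new interface is a simple checkpoint/try/rollback pattern around any step that may leave a column half-filled. A minimal sketch (assuming `column` is a MutableColumnPtr already in scope; `parseMoreRows` is a hypothetical stand-in for the failing step, not an API from this commit):

ColumnCheckpointPtr checkpoint = column->getCheckpoint();
try
{
    parseMoreRows(*column);           // may throw after appending some rows
}
catch (...)
{
    // Restore the sizes of the column and all of its subcolumns.
    column->rollback(*checkpoint);
    throw;
}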

@ -975,15 +975,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(header, columns, *format, insert_context);
}
- auto on_error = [&](const MutableColumns & result_columns, Exception & e)
+ auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
current_exception = e.displayText();
LOG_ERROR(logger, "Failed parsing for query '{}' with query id {}. {}",
key.query_str, current_entry->query_id, current_exception);
- for (const auto & column : result_columns)
- if (column->size() > total_rows)
- column->popBack(column->size() - total_rows);
+ for (size_t i = 0; i < result_columns.size(); ++i)
+ result_columns[i]->rollback(*checkpoints[i]);
current_entry->finish(std::current_exception());
return 0;


@ -22,6 +22,7 @@ StreamingFormatExecutor::StreamingFormatExecutor(
, adding_defaults_transform(std::move(adding_defaults_transform_))
, port(format->getPort().getHeader(), format.get())
, result_columns(header.cloneEmptyColumns())
, checkpoints(result_columns.size())
{
connect(format->getPort(), port);
}
@ -53,6 +54,8 @@ size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
size_t StreamingFormatExecutor::execute()
{
setCheckpoints();
try
{
size_t new_rows = 0;
@ -85,19 +88,19 @@ size_t StreamingFormatExecutor::execute()
catch (Exception & e)
{
format->resetParser();
- return on_error(result_columns, e);
+ return on_error(result_columns, checkpoints, e);
}
catch (std::exception & e)
{
format->resetParser();
auto exception = Exception(Exception::CreateFromSTDTag{}, e);
- return on_error(result_columns, exception);
+ return on_error(result_columns, checkpoints, exception);
}
catch (...)
{
format->resetParser();
- auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknowk exception while executing StreamingFormatExecutor with format {}", format->getName());
- return on_error(result_columns, exception);
+ auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception while executing StreamingFormatExecutor with format {}", format->getName());
+ return on_error(result_columns, checkpoints, exception);
}
}
@ -114,4 +117,11 @@ size_t StreamingFormatExecutor::insertChunk(Chunk chunk)
return chunk_rows;
}
void StreamingFormatExecutor::setCheckpoints()
{
for (size_t i = 0; i < result_columns.size(); ++i)
checkpoints[i] = result_columns[i]->getCheckpoint();
}
}


@ -19,12 +19,12 @@ public:
/// and exception to rethrow it or add context to it.
/// Should return number of new rows, which are added in callback
/// to result columns in comparison to previous call of `execute`.
- using ErrorCallback = std::function<size_t(const MutableColumns &, Exception &)>;
+ using ErrorCallback = std::function<size_t(const MutableColumns &, const ColumnCheckpoints &, Exception &)>;
StreamingFormatExecutor(
const Block & header_,
InputFormatPtr format_,
- ErrorCallback on_error_ = [](const MutableColumns &, Exception & e) -> size_t { throw std::move(e); },
+ ErrorCallback on_error_ = [](const MutableColumns &, const ColumnCheckpoints, Exception & e) -> size_t { throw std::move(e); },
SimpleTransformPtr adding_defaults_transform_ = nullptr);
/// Returns numbers of new read rows.
@ -43,6 +43,8 @@ public:
void setQueryParameters(const NameToNameMap & parameters);
private:
void setCheckpoints();
const Block header;
const InputFormatPtr format;
const ErrorCallback on_error;
@ -50,6 +52,7 @@ private:
InputPort port;
MutableColumns result_columns;
ColumnCheckpoints checkpoints;
};
}

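On the caller side, the widened ErrorCallback receives the checkpoints taken by setCheckpoints() at the start of execute(), so recovery is one rollback per result column. A hedged sketch of wiring this up (the `header` and `format` variables are assumed to exist in the caller, as in the storages below):

auto on_error = [](const MutableColumns & result_columns,
                   const ColumnCheckpoints & checkpoints,
                   Exception & e) -> size_t
{
    // Undo whatever the input format managed to append before throwing.
    for (size_t i = 0; i < result_columns.size(); ++i)
        result_columns[i]->rollback(*checkpoints[i]);
    throw std::move(e);   // or log and return 0 to skip the failed chunk
};

StreamingFormatExecutor executor(header, format, on_error);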

@ -86,21 +86,18 @@ Chunk FileLogSource::generate()
std::optional<String> exception_message;
size_t total_rows = 0;
- auto on_error = [&](const MutableColumns & result_columns, Exception & e)
+ auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
- for (const auto & column : result_columns)
+ for (size_t i = 0; i < result_columns.size(); ++i)
{
- // We could already push some rows to result_columns
- // before exception, we need to fix it.
- auto cur_rows = column->size();
- if (cur_rows > total_rows)
- column->popBack(cur_rows - total_rows);
+ // We could already push some rows to result_columns before exception, we need to fix it.
+ result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get default value in case of error.
- column->insertDefault();
+ result_columns[i]->insertDefault();
}
return 1;


@ -108,23 +108,20 @@ Chunk KafkaSource::generateImpl()
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
- auto on_error = [&](const MutableColumns & result_columns, Exception & e)
+ auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
{
exception_message = e.message();
- for (const auto & column : result_columns)
+ for (size_t i = 0; i < result_columns.size(); ++i)
{
- // read_kafka_message could already push some rows to result_columns
- // before exception, we need to fix it.
- auto cur_rows = column->size();
- if (cur_rows > total_rows)
- column->popBack(cur_rows - total_rows);
+ // We could already push some rows to result_columns before exception, we need to fix it.
+ result_columns[i]->rollback(*checkpoints[i]);
// all data columns will get default value in case of error
- column->insertDefault();
+ result_columns[i]->insertDefault();
}
return 1;


@ -817,23 +817,20 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
- auto on_error = [&](const MutableColumns & result_columns, Exception & e)
+ auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
{
exception_message = e.message();
- for (const auto & column : result_columns)
+ for (size_t i = 0; i < result_columns.size(); ++i)
{
- // read_kafka_message could already push some rows to result_columns
- // before exception, we need to fix it.
- auto cur_rows = column->size();
- if (cur_rows > total_rows)
- column->popBack(cur_rows - total_rows);
+ // We could already push some rows to result_columns before exception, we need to fix it.
+ result_columns[i]->rollback(*checkpoints[i]);
// all data columns will get default value in case of error
- column->insertDefault();
+ result_columns[i]->insertDefault();
}
return 1;


@ -102,21 +102,18 @@ Chunk NATSSource::generate()
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
std::optional<String> exception_message;
size_t total_rows = 0;
- auto on_error = [&](const MutableColumns & result_columns, Exception & e)
+ auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
- for (const auto & column : result_columns)
+ for (size_t i = 0; i < result_columns.size(); ++i)
{
- // We could already push some rows to result_columns
- // before exception, we need to fix it.
- auto cur_rows = column->size();
- if (cur_rows > total_rows)
- column->popBack(cur_rows - total_rows);
+ // We could already push some rows to result_columns before exception, we need to fix it.
+ result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get default value in case of error.
- column->insertDefault();
+ result_columns[i]->insertDefault();
}
return 1;


@ -161,21 +161,18 @@ Chunk RabbitMQSource::generateImpl()
std::optional<String> exception_message;
size_t total_rows = 0;
- auto on_error = [&](const MutableColumns & result_columns, Exception & e)
+ auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
- for (const auto & column : result_columns)
+ for (size_t i = 0; i < result_columns.size(); ++i)
{
- // We could already push some rows to result_columns
- // before exception, we need to fix it.
- auto cur_rows = column->size();
- if (cur_rows > total_rows)
- column->popBack(cur_rows - total_rows);
+ // We could already push some rows to result_columns before exception, we need to fix it.
+ result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get default value in case of error.
- column->insertDefault();
+ result_columns[i]->insertDefault();
}
return 1;


@ -0,0 +1,23 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "
DROP TABLE IF EXISTS async_inserts_native;
CREATE TABLE async_inserts_native (m Map(UInt64, UInt64), v UInt64 MATERIALIZED m[4]) ENGINE = Memory;
"
url="${CLICKHOUSE_URL}&async_insert=1&async_insert_busy_timeout_max_ms=1000&async_insert_busy_timeout_min_ms=1000&wait_for_async_insert=1"
# This test runs inserts with memory_tracker_fault_probability > 0 to trigger the memory limit during insertion.
# If the rollback of columns is broken in that case, it may produce a LOGICAL_ERROR, which is caught by termination of the server in debug builds.
for _ in {1..10}; do
${CLICKHOUSE_CLIENT} -q "SELECT (range(number), range(number))::Map(UInt64, UInt64) AS m FROM numbers(1000) FORMAT Native" | \
${CLICKHOUSE_CURL} -sS -X POST "${url}&max_block_size=100&memory_tracker_fault_probability=0.01&query=INSERT+INTO+async_inserts_native+FORMAT+Native" --data-binary @- >/dev/null 2>&1 &
done
wait
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts_native;"