mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
better rollbacks of columns
This commit is contained in:
parent
0837a51313
commit
c39d7092d0
@ -369,6 +369,17 @@ void ColumnArray::popBack(size_t n)
|
||||
offsets_data.resize_assume_reserved(offsets_data.size() - n);
|
||||
}
|
||||
|
||||
ColumnCheckpointPtr ColumnArray::getCheckpoint() const
|
||||
{
|
||||
return std::make_shared<ColumnCheckpointWithNested>(size(), getData().getCheckpoint());
|
||||
}
|
||||
|
||||
void ColumnArray::rollback(const ColumnCheckpoint & checkpoint)
|
||||
{
|
||||
getOffsets().resize_assume_reserved(checkpoint.size);
|
||||
getData().rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
|
||||
}
|
||||
|
||||
int ColumnArray::compareAtImpl(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint, const Collator * collator) const
|
||||
{
|
||||
const ColumnArray & rhs = assert_cast<const ColumnArray &>(rhs_);
|
||||
|
@ -161,6 +161,9 @@ public:
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
|
||||
ColumnCheckpointPtr getCheckpoint() const override;
|
||||
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||
|
||||
void forEachSubcolumn(MutableColumnCallback callback) override
|
||||
{
|
||||
callback(offsets);
|
||||
|
@ -304,6 +304,16 @@ public:
|
||||
variant_column_ptr->protect();
|
||||
}
|
||||
|
||||
ColumnCheckpointPtr getCheckpoint() const override
|
||||
{
|
||||
return variant_column_ptr->getCheckpoint();
|
||||
}
|
||||
|
||||
void rollback(const ColumnCheckpoint & checkpoint) override
|
||||
{
|
||||
variant_column_ptr->rollback(checkpoint);
|
||||
}
|
||||
|
||||
void forEachSubcolumn(MutableColumnCallback callback) override
|
||||
{
|
||||
callback(variant_column);
|
||||
|
@ -312,6 +312,16 @@ void ColumnMap::getExtremes(Field & min, Field & max) const
|
||||
max = std::move(map_max_value);
|
||||
}
|
||||
|
||||
ColumnCheckpointPtr ColumnMap::getCheckpoint() const
|
||||
{
|
||||
return nested->getCheckpoint();
|
||||
}
|
||||
|
||||
void ColumnMap::rollback(const ColumnCheckpoint & checkpoint)
|
||||
{
|
||||
nested->rollback(checkpoint);
|
||||
}
|
||||
|
||||
void ColumnMap::forEachSubcolumn(MutableColumnCallback callback)
|
||||
{
|
||||
callback(nested);
|
||||
|
@ -102,6 +102,8 @@ public:
|
||||
size_t byteSizeAt(size_t n) const override;
|
||||
size_t allocatedBytes() const override;
|
||||
void protect() override;
|
||||
ColumnCheckpointPtr getCheckpoint() const override;
|
||||
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||
bool structureEquals(const IColumn & rhs) const override;
|
||||
|
@ -305,6 +305,17 @@ void ColumnNullable::popBack(size_t n)
|
||||
getNullMapColumn().popBack(n);
|
||||
}
|
||||
|
||||
ColumnCheckpointPtr ColumnNullable::getCheckpoint() const
|
||||
{
|
||||
return std::make_shared<ColumnCheckpointWithNested>(size(), nested_column->getCheckpoint());
|
||||
}
|
||||
|
||||
void ColumnNullable::rollback(const ColumnCheckpoint & checkpoint)
|
||||
{
|
||||
getNullMapData().resize_assume_reserved(checkpoint.size);
|
||||
nested_column->rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
|
||||
}
|
||||
|
||||
ColumnPtr ColumnNullable::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
ColumnPtr filtered_data = getNestedColumn().filter(filt, result_size_hint);
|
||||
|
@ -143,6 +143,9 @@ public:
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
|
||||
ColumnCheckpointPtr getCheckpoint() const override;
|
||||
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||
|
||||
void forEachSubcolumn(MutableColumnCallback callback) override
|
||||
{
|
||||
callback(nested_column);
|
||||
|
@ -30,6 +30,23 @@ const std::shared_ptr<SerializationDynamic> & getDynamicSerialization()
|
||||
return dynamic_serialization;
|
||||
}
|
||||
|
||||
struct ColumnObjectCheckpoint : public ColumnCheckpoint
|
||||
{
|
||||
using CheckpointsMap = std::unordered_map<String, ColumnCheckpointPtr>;
|
||||
|
||||
ColumnObjectCheckpoint(size_t size_, CheckpointsMap typed_paths_, CheckpointsMap dynamic_paths_, ColumnCheckpointPtr shared_data_)
|
||||
: ColumnCheckpoint(size_)
|
||||
, typed_paths(std::move(typed_paths_))
|
||||
, dynamic_paths(std::move(dynamic_paths_))
|
||||
, shared_data(std::move(shared_data_))
|
||||
{
|
||||
}
|
||||
|
||||
CheckpointsMap typed_paths;
|
||||
CheckpointsMap dynamic_paths;
|
||||
ColumnCheckpointPtr shared_data;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
ColumnObject::ColumnObject(
|
||||
@ -655,6 +672,41 @@ void ColumnObject::popBack(size_t n)
|
||||
shared_data->popBack(n);
|
||||
}
|
||||
|
||||
ColumnCheckpointPtr ColumnObject::getCheckpoint() const
|
||||
{
|
||||
auto get_checkpoints = [](const auto & columns)
|
||||
{
|
||||
std::unordered_map<String, ColumnCheckpointPtr> checkpoints;
|
||||
for (const auto & [name, column] : columns)
|
||||
checkpoints[name] = column->getCheckpoint();
|
||||
|
||||
return checkpoints;
|
||||
};
|
||||
|
||||
return std::make_shared<ColumnObjectCheckpoint>(size(), get_checkpoints(typed_paths), get_checkpoints(dynamic_paths_ptrs), shared_data->getCheckpoint());
|
||||
}
|
||||
|
||||
void ColumnObject::rollback(const ColumnCheckpoint & checkpoint)
|
||||
{
|
||||
const auto & object_checkpoint = assert_cast<const ColumnObjectCheckpoint &>(checkpoint);
|
||||
|
||||
for (auto & [name, column] : typed_paths)
|
||||
{
|
||||
const auto & nested_checkpoint = object_checkpoint.typed_paths.at(name);
|
||||
chassert(nested_checkpoint);
|
||||
column->rollback(*nested_checkpoint);
|
||||
}
|
||||
|
||||
for (auto & [name, column] : dynamic_paths_ptrs)
|
||||
{
|
||||
const auto & nested_checkpoint = object_checkpoint.dynamic_paths.at(name);
|
||||
chassert(nested_checkpoint);
|
||||
column->rollback(*nested_checkpoint);
|
||||
}
|
||||
|
||||
shared_data->rollback(*object_checkpoint.shared_data);
|
||||
}
|
||||
|
||||
StringRef ColumnObject::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
|
||||
{
|
||||
StringRef res(begin, 0);
|
||||
|
@ -159,6 +159,8 @@ public:
|
||||
size_t byteSizeAt(size_t n) const override;
|
||||
size_t allocatedBytes() const override;
|
||||
void protect() override;
|
||||
ColumnCheckpointPtr getCheckpoint() const override;
|
||||
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||
|
||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||
|
||||
|
@ -308,6 +308,22 @@ void ColumnSparse::popBack(size_t n)
|
||||
_size = new_size;
|
||||
}
|
||||
|
||||
ColumnCheckpointPtr ColumnSparse::getCheckpoint() const
|
||||
{
|
||||
return std::make_shared<ColumnCheckpointWithNested>(size(), values->getCheckpoint());
|
||||
}
|
||||
|
||||
void ColumnSparse::rollback(const ColumnCheckpoint & checkpoint)
|
||||
{
|
||||
_size = checkpoint.size;
|
||||
|
||||
const auto & nested = *assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested;
|
||||
chassert(nested.size > 0);
|
||||
|
||||
values->rollback(nested);
|
||||
getOffsetsData().resize_assume_reserved(nested.size - 1);
|
||||
}
|
||||
|
||||
ColumnPtr ColumnSparse::filter(const Filter & filt, ssize_t) const
|
||||
{
|
||||
if (_size != filt.size())
|
||||
|
@ -149,6 +149,9 @@ public:
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
|
||||
ColumnCheckpointPtr getCheckpoint() const override;
|
||||
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||
|
||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||
|
||||
|
@ -254,6 +254,27 @@ void ColumnTuple::popBack(size_t n)
|
||||
column->popBack(n);
|
||||
}
|
||||
|
||||
ColumnCheckpointPtr ColumnTuple::getCheckpoint() const
|
||||
{
|
||||
ColumnCheckpoints checkpoints;
|
||||
checkpoints.reserve(columns.size());
|
||||
|
||||
for (const auto & column : columns)
|
||||
checkpoints.push_back(column->getCheckpoint());
|
||||
|
||||
return std::make_shared<ColumnCheckpointWithNestedTuple>(size(), std::move(checkpoints));
|
||||
}
|
||||
|
||||
void ColumnTuple::rollback(const ColumnCheckpoint & checkpoint)
|
||||
{
|
||||
column_length = checkpoint.size;
|
||||
const auto & checkpoints = assert_cast<const ColumnCheckpointWithNestedTuple &>(checkpoint).nested;
|
||||
|
||||
chassert(columns.size() == checkpoints.size());
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
columns[i]->rollback(*checkpoints[i]);
|
||||
}
|
||||
|
||||
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
|
||||
{
|
||||
if (columns.empty())
|
||||
|
@ -118,6 +118,8 @@ public:
|
||||
size_t byteSizeAt(size_t n) const override;
|
||||
size_t allocatedBytes() const override;
|
||||
void protect() override;
|
||||
ColumnCheckpointPtr getCheckpoint() const override;
|
||||
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||
bool structureEquals(const IColumn & rhs) const override;
|
||||
|
@ -739,6 +739,29 @@ void ColumnVariant::popBack(size_t n)
|
||||
offsets->popBack(n);
|
||||
}
|
||||
|
||||
ColumnCheckpointPtr ColumnVariant::getCheckpoint() const
|
||||
{
|
||||
ColumnCheckpoints checkpoints;
|
||||
checkpoints.reserve(variants.size());
|
||||
|
||||
for (const auto & column : variants)
|
||||
checkpoints.push_back(column->getCheckpoint());
|
||||
|
||||
return std::make_shared<ColumnCheckpointWithNestedTuple>(size(), std::move(checkpoints));
|
||||
}
|
||||
|
||||
void ColumnVariant::rollback(const ColumnCheckpoint & checkpoint)
|
||||
{
|
||||
getOffsets().resize_assume_reserved(checkpoint.size);
|
||||
getLocalDiscriminators().resize_assume_reserved(checkpoint.size);
|
||||
|
||||
const auto & checkpoints = assert_cast<const ColumnCheckpointWithNestedTuple &>(checkpoint).nested;
|
||||
chassert(variants.size() == checkpoints.size());
|
||||
|
||||
for (size_t i = 0; i < variants.size(); ++i)
|
||||
variants[i]->rollback(*checkpoints[i]);
|
||||
}
|
||||
|
||||
StringRef ColumnVariant::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
|
||||
{
|
||||
/// During any serialization/deserialization we should always use global discriminators.
|
||||
|
@ -248,6 +248,8 @@ public:
|
||||
size_t byteSizeAt(size_t n) const override;
|
||||
size_t allocatedBytes() const override;
|
||||
void protect() override;
|
||||
ColumnCheckpointPtr getCheckpoint() const override;
|
||||
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||
bool structureEquals(const IColumn & rhs) const override;
|
||||
|
@ -49,6 +49,40 @@ struct EqualRange
|
||||
|
||||
using EqualRanges = std::vector<EqualRange>;
|
||||
|
||||
/// A checkpoint that contains size of column and all its subcolumns.
|
||||
/// It can be used to rollback column to the previous state, for example
|
||||
/// after failed parsing when column may be in inconsistent state.
|
||||
struct ColumnCheckpoint
|
||||
{
|
||||
explicit ColumnCheckpoint(size_t size_) : size(size_) {}
|
||||
size_t size = 0;
|
||||
};
|
||||
|
||||
using ColumnCheckpointPtr = std::shared_ptr<const ColumnCheckpoint>;
|
||||
using ColumnCheckpoints = std::vector<ColumnCheckpointPtr>;
|
||||
|
||||
struct ColumnCheckpointWithNested : public ColumnCheckpoint
|
||||
{
|
||||
ColumnCheckpointWithNested(size_t size_, ColumnCheckpointPtr nested_)
|
||||
: ColumnCheckpoint(size_)
|
||||
, nested(std::move(nested_))
|
||||
{
|
||||
}
|
||||
|
||||
ColumnCheckpointPtr nested;
|
||||
};
|
||||
|
||||
struct ColumnCheckpointWithNestedTuple : public ColumnCheckpoint
|
||||
{
|
||||
ColumnCheckpointWithNestedTuple(size_t size_, ColumnCheckpoints nested_)
|
||||
: ColumnCheckpoint(size_)
|
||||
, nested(std::move(nested_))
|
||||
{
|
||||
}
|
||||
|
||||
ColumnCheckpoints nested;
|
||||
};
|
||||
|
||||
/// Declares interface to store columns in memory.
|
||||
class IColumn : public COW<IColumn>
|
||||
{
|
||||
@ -509,6 +543,13 @@ public:
|
||||
/// The operation is slow and performed only for debug builds.
|
||||
virtual void protect() {}
|
||||
|
||||
/// Returns checkpoint of current state of column.
|
||||
virtual ColumnCheckpointPtr getCheckpoint() const { return std::make_shared<ColumnCheckpoint>(size()); }
|
||||
|
||||
/// Rollbacks column to the checkpoint.
|
||||
/// Unlike 'popBack' this method should work correctly even if column has invalid state.
|
||||
virtual void rollback(const ColumnCheckpoint & checkpoint) { popBack(size() - checkpoint.size); }
|
||||
|
||||
/// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them.
|
||||
/// Shallow: doesn't do recursive calls; don't do call for itself.
|
||||
|
||||
|
@ -971,15 +971,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
|
||||
adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(header, columns, *format, insert_context);
|
||||
}
|
||||
|
||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
||||
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||
{
|
||||
current_exception = e.displayText();
|
||||
LOG_ERROR(logger, "Failed parsing for query '{}' with query id {}. {}",
|
||||
key.query_str, current_entry->query_id, current_exception);
|
||||
|
||||
for (const auto & column : result_columns)
|
||||
if (column->size() > total_rows)
|
||||
column->popBack(column->size() - total_rows);
|
||||
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||
result_columns[i]->rollback(*checkpoints[i]);
|
||||
|
||||
current_entry->finish(std::current_exception());
|
||||
return 0;
|
||||
|
@ -21,6 +21,7 @@ StreamingFormatExecutor::StreamingFormatExecutor(
|
||||
, adding_defaults_transform(std::move(adding_defaults_transform_))
|
||||
, port(format->getPort().getHeader(), format.get())
|
||||
, result_columns(header.cloneEmptyColumns())
|
||||
, checkpoints(result_columns.size())
|
||||
{
|
||||
connect(format->getPort(), port);
|
||||
}
|
||||
@ -45,6 +46,8 @@ size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
|
||||
|
||||
size_t StreamingFormatExecutor::execute()
|
||||
{
|
||||
setCheckpoints();
|
||||
|
||||
try
|
||||
{
|
||||
size_t new_rows = 0;
|
||||
@ -77,19 +80,19 @@ size_t StreamingFormatExecutor::execute()
|
||||
catch (Exception & e)
|
||||
{
|
||||
format->resetParser();
|
||||
return on_error(result_columns, e);
|
||||
return on_error(result_columns, checkpoints, e);
|
||||
}
|
||||
catch (std::exception & e)
|
||||
{
|
||||
format->resetParser();
|
||||
auto exception = Exception(Exception::CreateFromSTDTag{}, e);
|
||||
return on_error(result_columns, exception);
|
||||
return on_error(result_columns, checkpoints, exception);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
format->resetParser();
|
||||
auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknowk exception while executing StreamingFormatExecutor with format {}", format->getName());
|
||||
return on_error(result_columns, exception);
|
||||
auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception while executing StreamingFormatExecutor with format {}", format->getName());
|
||||
return on_error(result_columns, checkpoints, exception);
|
||||
}
|
||||
}
|
||||
|
||||
@ -106,4 +109,11 @@ size_t StreamingFormatExecutor::insertChunk(Chunk chunk)
|
||||
return chunk_rows;
|
||||
}
|
||||
|
||||
void StreamingFormatExecutor::setCheckpoints()
|
||||
{
|
||||
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||
checkpoints[i] = result_columns[i]->getCheckpoint();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -19,12 +19,12 @@ public:
|
||||
/// and exception to rethrow it or add context to it.
|
||||
/// Should return number of new rows, which are added in callback
|
||||
/// to result columns in comparison to previous call of `execute`.
|
||||
using ErrorCallback = std::function<size_t(const MutableColumns &, Exception &)>;
|
||||
using ErrorCallback = std::function<size_t(const MutableColumns &, const ColumnCheckpoints &, Exception &)>;
|
||||
|
||||
StreamingFormatExecutor(
|
||||
const Block & header_,
|
||||
InputFormatPtr format_,
|
||||
ErrorCallback on_error_ = [](const MutableColumns &, Exception & e) -> size_t { throw std::move(e); },
|
||||
ErrorCallback on_error_ = [](const MutableColumns &, const ColumnCheckpoints, Exception & e) -> size_t { throw std::move(e); },
|
||||
SimpleTransformPtr adding_defaults_transform_ = nullptr);
|
||||
|
||||
/// Returns numbers of new read rows.
|
||||
@ -40,6 +40,8 @@ public:
|
||||
MutableColumns getResultColumns();
|
||||
|
||||
private:
|
||||
void setCheckpoints();
|
||||
|
||||
const Block header;
|
||||
const InputFormatPtr format;
|
||||
const ErrorCallback on_error;
|
||||
@ -47,6 +49,7 @@ private:
|
||||
|
||||
InputPort port;
|
||||
MutableColumns result_columns;
|
||||
ColumnCheckpoints checkpoints;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -86,21 +86,18 @@ Chunk FileLogSource::generate()
|
||||
std::optional<String> exception_message;
|
||||
size_t total_rows = 0;
|
||||
|
||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
||||
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||
{
|
||||
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
|
||||
{
|
||||
exception_message = e.message();
|
||||
for (const auto & column : result_columns)
|
||||
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||
{
|
||||
// We could already push some rows to result_columns
|
||||
// before exception, we need to fix it.
|
||||
auto cur_rows = column->size();
|
||||
if (cur_rows > total_rows)
|
||||
column->popBack(cur_rows - total_rows);
|
||||
// We could already push some rows to result_columns before exception, we need to fix it.
|
||||
result_columns[i]->rollback(*checkpoints[i]);
|
||||
|
||||
// All data columns will get default value in case of error.
|
||||
column->insertDefault();
|
||||
result_columns[i]->insertDefault();
|
||||
}
|
||||
|
||||
return 1;
|
||||
|
@ -108,23 +108,20 @@ Chunk KafkaSource::generateImpl()
|
||||
size_t total_rows = 0;
|
||||
size_t failed_poll_attempts = 0;
|
||||
|
||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
||||
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
|
||||
|
||||
if (put_error_to_stream)
|
||||
{
|
||||
exception_message = e.message();
|
||||
for (const auto & column : result_columns)
|
||||
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||
{
|
||||
// read_kafka_message could already push some rows to result_columns
|
||||
// before exception, we need to fix it.
|
||||
auto cur_rows = column->size();
|
||||
if (cur_rows > total_rows)
|
||||
column->popBack(cur_rows - total_rows);
|
||||
// We could already push some rows to result_columns before exception, we need to fix it.
|
||||
result_columns[i]->rollback(*checkpoints[i]);
|
||||
|
||||
// all data columns will get default value in case of error
|
||||
column->insertDefault();
|
||||
result_columns[i]->insertDefault();
|
||||
}
|
||||
|
||||
return 1;
|
||||
|
@ -817,23 +817,20 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
|
||||
size_t total_rows = 0;
|
||||
size_t failed_poll_attempts = 0;
|
||||
|
||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
||||
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
|
||||
|
||||
if (put_error_to_stream)
|
||||
{
|
||||
exception_message = e.message();
|
||||
for (const auto & column : result_columns)
|
||||
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||
{
|
||||
// read_kafka_message could already push some rows to result_columns
|
||||
// before exception, we need to fix it.
|
||||
auto cur_rows = column->size();
|
||||
if (cur_rows > total_rows)
|
||||
column->popBack(cur_rows - total_rows);
|
||||
// We could already push some rows to result_columns before exception, we need to fix it.
|
||||
result_columns[i]->rollback(*checkpoints[i]);
|
||||
|
||||
// all data columns will get default value in case of error
|
||||
column->insertDefault();
|
||||
result_columns[i]->insertDefault();
|
||||
}
|
||||
|
||||
return 1;
|
||||
|
@ -102,21 +102,18 @@ Chunk NATSSource::generate()
|
||||
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
|
||||
std::optional<String> exception_message;
|
||||
size_t total_rows = 0;
|
||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
||||
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||
{
|
||||
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
|
||||
{
|
||||
exception_message = e.message();
|
||||
for (const auto & column : result_columns)
|
||||
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||
{
|
||||
// We could already push some rows to result_columns
|
||||
// before exception, we need to fix it.
|
||||
auto cur_rows = column->size();
|
||||
if (cur_rows > total_rows)
|
||||
column->popBack(cur_rows - total_rows);
|
||||
// We could already push some rows to result_columns before exception, we need to fix it.
|
||||
result_columns[i]->rollback(*checkpoints[i]);
|
||||
|
||||
// All data columns will get default value in case of error.
|
||||
column->insertDefault();
|
||||
result_columns[i]->insertDefault();
|
||||
}
|
||||
|
||||
return 1;
|
||||
|
@ -161,21 +161,18 @@ Chunk RabbitMQSource::generateImpl()
|
||||
std::optional<String> exception_message;
|
||||
size_t total_rows = 0;
|
||||
|
||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
||||
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||
{
|
||||
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
|
||||
{
|
||||
exception_message = e.message();
|
||||
for (const auto & column : result_columns)
|
||||
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||
{
|
||||
// We could already push some rows to result_columns
|
||||
// before exception, we need to fix it.
|
||||
auto cur_rows = column->size();
|
||||
if (cur_rows > total_rows)
|
||||
column->popBack(cur_rows - total_rows);
|
||||
// We could already push some rows to result_columns before exception, we need to fix it.
|
||||
result_columns[i]->rollback(*checkpoints[i]);
|
||||
|
||||
// All data columns will get default value in case of error.
|
||||
column->insertDefault();
|
||||
result_columns[i]->insertDefault();
|
||||
}
|
||||
|
||||
return 1;
|
||||
|
23
tests/queries/0_stateless/03230_async_insert_native.sh
Executable file
23
tests/queries/0_stateless/03230_async_insert_native.sh
Executable file
@ -0,0 +1,23 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
${CLICKHOUSE_CLIENT} -q "
|
||||
DROP TABLE IF EXISTS async_inserts_native;
|
||||
CREATE TABLE async_inserts_native (m Map(UInt64, UInt64), v UInt64 MATERIALIZED m[4]) ENGINE = Memory;
|
||||
"
|
||||
|
||||
url="${CLICKHOUSE_URL}&async_insert=1&async_insert_busy_timeout_max_ms=1000&async_insert_busy_timeout_min_ms=1000&wait_for_async_insert=1"
|
||||
|
||||
# This test runs inserts with memory_tracker_fault_probability > 0 to trigger memory limit during insertion.
|
||||
# If rollback of columns is wrong in that case it may produce LOGICAL_ERROR and it will caught by termintation of server in debug mode.
|
||||
for _ in {1..10}; do
|
||||
${CLICKHOUSE_CLIENT} -q "SELECT (range(number), range(number))::Map(UInt64, UInt64) AS m FROM numbers(1000) FORMAT Native" | \
|
||||
${CLICKHOUSE_CURL} -sS -X POST "${url}&max_block_size=100&memory_tracker_fault_probability=0.01&query=INSERT+INTO+async_inserts_native+FORMAT+Native" --data-binary @- >/dev/null 2>&1 &
|
||||
done
|
||||
|
||||
wait
|
||||
|
||||
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts_native;"
|
Loading…
Reference in New Issue
Block a user