diff --git a/dbms/src/Columns/ColumnAggregateFunction.cpp b/dbms/src/Columns/ColumnAggregateFunction.cpp index d863b7116d5..b50bb81f884 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.cpp +++ b/dbms/src/Columns/ColumnAggregateFunction.cpp @@ -33,8 +33,6 @@ void ColumnAggregateFunction::addArena(ArenaPtr arena_) MutableColumnPtr ColumnAggregateFunction::convertToValues() const { - const IAggregateFunction * function = func.get(); - /** If the aggregate function returns an unfinalized/unfinished state, * then you just need to copy pointers to it and also shared ownership of data. * @@ -65,33 +63,73 @@ MutableColumnPtr ColumnAggregateFunction::convertToValues() const * AggregateFunction(quantileTiming(0.5), UInt64) * into UInt16 - already finished result of `quantileTiming`. */ - if (const AggregateFunctionState * function_state = typeid_cast(function)) + if (const AggregateFunctionState * function_state = typeid_cast(func.get())) { auto res = createView(); res->set(function_state->getNestedFunction()); - res->getData().assign(getData().begin(), getData().end()); + res->data.assign(data.begin(), data.end()); return res; } - MutableColumnPtr res = function->getReturnType()->createColumn(); - res->reserve(getData().size()); + MutableColumnPtr res = func->getReturnType()->createColumn(); + res->reserve(data.size()); - for (auto val : getData()) - function->insertResultInto(val, *res); + for (auto val : data) + func->insertResultInto(val, *res); return res; } +void ColumnAggregateFunction::ensureOwnership() +{ + if (src) + { + /// We must copy all data from src and take ownership. + size_t size = data.size(); + + Arena & arena = createOrGetArena(); + size_t size_of_state = func->sizeOfData(); + size_t align_of_state = func->alignOfData(); + + size_t rollback_pos = 0; + try + { + for (size_t i = 0; i < size; ++i) + { + ConstAggregateDataPtr old_place = data[i]; + data[i] = arena.alignedAlloc(size_of_state, align_of_state); + func->create(data[i]); + ++rollback_pos; + func->merge(data[i], old_place, &arena); + } + } + catch (...) + { + /// If we failed to take ownership, destroy all temporary data. + + if (!func->hasTrivialDestructor()) + for (size_t i = 0; i < rollback_pos; ++i) + func->destroy(data[i]); + + throw; + } + + /// Now we own all data. + src.reset(); + } +} + + void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start, size_t length) { const ColumnAggregateFunction & from_concrete = static_cast(from); - if (start + length > from_concrete.getData().size()) + if (start + length > from_concrete.data.size()) throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnAggregateFunction::insertRangeFrom method" " (data.size() = " - + toString(from_concrete.getData().size()) + + toString(from_concrete.data.size()) + ").", ErrorCodes::PARAMETER_OUT_OF_BOUND); @@ -112,14 +150,14 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start size_t old_size = data.size(); data.resize(old_size + length); - memcpy(&data[old_size], &from_concrete.getData()[start], length * sizeof(data[0])); + memcpy(&data[old_size], &from_concrete.data[start], length * sizeof(data[0])); } } ColumnPtr ColumnAggregateFunction::filter(const Filter & filter, ssize_t result_size_hint) const { - size_t size = getData().size(); + size_t size = data.size(); if (size != filter.size()) throw Exception("Size of filter doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); @@ -127,14 +165,14 @@ ColumnPtr ColumnAggregateFunction::filter(const Filter & filter, ssize_t result_ return cloneEmpty(); auto res = createView(); - auto & res_data = res->getData(); + auto & res_data = res->data; if (result_size_hint) res_data.reserve(result_size_hint > 0 ? result_size_hint : size); for (size_t i = 0; i < size; ++i) if (filter[i]) - res_data.push_back(getData()[i]); + res_data.push_back(data[i]); /// To save RAM in case of too strong filtering. if (res_data.size() * 2 < res_data.capacity()) @@ -146,7 +184,7 @@ ColumnPtr ColumnAggregateFunction::filter(const Filter & filter, ssize_t result_ ColumnPtr ColumnAggregateFunction::permute(const Permutation & perm, size_t limit) const { - size_t size = getData().size(); + size_t size = data.size(); if (limit == 0) limit = size; @@ -158,9 +196,9 @@ ColumnPtr ColumnAggregateFunction::permute(const Permutation & perm, size_t limi auto res = createView(); - res->getData().resize(limit); + res->data.resize(limit); for (size_t i = 0; i < limit; ++i) - res->getData()[i] = getData()[perm[i]]; + res->data[i] = data[perm[i]]; return res; } @@ -175,9 +213,9 @@ ColumnPtr ColumnAggregateFunction::indexImpl(const PaddedPODArray & indexe { auto res = createView(); - res->getData().resize(limit); + res->data.resize(limit); for (size_t i = 0; i < limit; ++i) - res->getData()[i] = getData()[indexes[i]]; + res->data[i] = data[indexes[i]]; return res; } @@ -188,14 +226,14 @@ INSTANTIATE_INDEX_IMPL(ColumnAggregateFunction) void ColumnAggregateFunction::updateHashWithValue(size_t n, SipHash & hash) const { WriteBufferFromOwnString wbuf; - func->serialize(getData()[n], wbuf); + func->serialize(data[n], wbuf); hash.update(wbuf.str().c_str(), wbuf.str().size()); } /// NOTE: Highly overestimates size of a column if it was produced in AggregatingBlockInputStream (it contains size of other columns) size_t ColumnAggregateFunction::byteSize() const { - size_t res = getData().size() * sizeof(getData()[0]); + size_t res = data.size() * sizeof(data[0]); for (const auto & arena : arenas) res += arena->size(); @@ -207,7 +245,7 @@ size_t ColumnAggregateFunction::byteSize() const /// Like byteSize(), highly overestimates size size_t ColumnAggregateFunction::allocatedBytes() const { - size_t res = getData().allocated_bytes(); + size_t res = data.allocated_bytes(); for (const auto & arena : arenas) res += arena->size(); @@ -225,7 +263,7 @@ Field ColumnAggregateFunction::operator[](size_t n) const Field field = String(); { WriteBufferFromString buffer(field.get()); - func->serialize(getData()[n], buffer); + func->serialize(data[n], buffer); } return field; } @@ -235,18 +273,19 @@ void ColumnAggregateFunction::get(size_t n, Field & res) const res = String(); { WriteBufferFromString buffer(res.get()); - func->serialize(getData()[n], buffer); + func->serialize(data[n], buffer); } } StringRef ColumnAggregateFunction::getDataAt(size_t n) const { - return StringRef(reinterpret_cast(&getData()[n]), sizeof(getData()[n])); + return StringRef(reinterpret_cast(&data[n]), sizeof(data[n])); } void ColumnAggregateFunction::insertData(const char * pos, size_t /*length*/) { - getData().push_back(*reinterpret_cast(pos)); + ensureOwnership(); + data.push_back(*reinterpret_cast(pos)); } void ColumnAggregateFunction::insertFrom(const IColumn & from, size_t n) @@ -254,24 +293,26 @@ void ColumnAggregateFunction::insertFrom(const IColumn & from, size_t n) /// Must create new state of aggregate function and take ownership of it, /// because ownership of states of aggregate function cannot be shared for individual rows, /// (only as a whole, see comment above). + ensureOwnership(); insertDefault(); insertMergeFrom(from, n); } void ColumnAggregateFunction::insertFrom(ConstAggregateDataPtr place) { + ensureOwnership(); insertDefault(); insertMergeFrom(place); } void ColumnAggregateFunction::insertMergeFrom(ConstAggregateDataPtr place) { - func->merge(getData().back(), place, &createOrGetArena()); + func->merge(data.back(), place, &createOrGetArena()); } void ColumnAggregateFunction::insertMergeFrom(const IColumn & from, size_t n) { - insertMergeFrom(static_cast(from).getData()[n]); + insertMergeFrom(static_cast(from).data[n]); } Arena & ColumnAggregateFunction::createOrGetArena() @@ -281,47 +322,54 @@ Arena & ColumnAggregateFunction::createOrGetArena() return *arenas.back().get(); } + +static void pushBackAndCreateState(ColumnAggregateFunction::Container & data, Arena & arena, IAggregateFunction * func) +{ + data.push_back(arena.alignedAlloc(func->sizeOfData(), func->alignOfData())); + try + { + func->create(data.back()); + } + catch (...) + { + data.pop_back(); + throw; + } +} + + void ColumnAggregateFunction::insert(const Field & x) { - IAggregateFunction * function = func.get(); - + ensureOwnership(); Arena & arena = createOrGetArena(); - - getData().push_back(arena.alignedAlloc(function->sizeOfData(), function->alignOfData())); - function->create(getData().back()); + pushBackAndCreateState(data, arena, func.get()); ReadBufferFromString read_buffer(x.get()); - function->deserialize(getData().back(), read_buffer, &arena); + func->deserialize(data.back(), read_buffer, &arena); } void ColumnAggregateFunction::insertDefault() { - IAggregateFunction * function = func.get(); - + ensureOwnership(); Arena & arena = createOrGetArena(); - - getData().push_back(arena.alignedAlloc(function->sizeOfData(), function->alignOfData())); - function->create(getData().back()); + pushBackAndCreateState(data, arena, func.get()); } StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & dst, const char *& begin) const { - IAggregateFunction * function = func.get(); WriteBufferFromArena out(dst, begin); - function->serialize(getData()[n], out); + func->serialize(data[n], out); return out.finish(); } const char * ColumnAggregateFunction::deserializeAndInsertFromArena(const char * src_arena) { - IAggregateFunction * function = func.get(); + ensureOwnership(); /** Parameter "src_arena" points to Arena, from which we will deserialize the state. * And "dst_arena" is another Arena, that aggregate function state will use to store its data. */ Arena & dst_arena = createOrGetArena(); - - getData().push_back(dst_arena.alignedAlloc(function->sizeOfData(), function->alignOfData())); - function->create(getData().back()); + pushBackAndCreateState(data, dst_arena, func.get()); /** We will read from src_arena. * There is no limit for reading - it is assumed, that we can read all that we need after src_arena pointer. @@ -331,7 +379,7 @@ const char * ColumnAggregateFunction::deserializeAndInsertFromArena(const char * * Probably this will not work under UBSan. */ ReadBufferFromMemory read_buffer(src_arena, std::numeric_limits::max() - src_arena); - function->deserialize(getData().back(), read_buffer, &dst_arena); + func->deserialize(data.back(), read_buffer, &dst_arena); return read_buffer.position(); } @@ -358,7 +406,7 @@ ColumnPtr ColumnAggregateFunction::replicate(const IColumn::Offsets & offsets) c return cloneEmpty(); auto res = createView(); - auto & res_data = res->getData(); + auto & res_data = res->data; res_data.reserve(offsets.back()); IColumn::Offset prev_offset = 0; @@ -399,7 +447,7 @@ MutableColumns ColumnAggregateFunction::scatter(IColumn::ColumnIndex num_columns void ColumnAggregateFunction::getPermutation(bool /*reverse*/, size_t /*limit*/, int /*nan_direction_hint*/, IColumn::Permutation & res) const { - size_t s = getData().size(); + size_t s = data.size(); res.resize(s); for (size_t i = 0; i < s; ++i) res[i] = i; diff --git a/dbms/src/Columns/ColumnAggregateFunction.h b/dbms/src/Columns/ColumnAggregateFunction.h index 674d98f8892..cd352007095 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.h +++ b/dbms/src/Columns/ColumnAggregateFunction.h @@ -74,6 +74,11 @@ private: return res; } + /// If we have another column as a source (owner of data), copy all data to ourself and reset source. + /// This is needed before inserting new elements, because we must own these elements (to destroy them in destructor), + /// but ownership of different elements cannot be mixed by different columns. + void ensureOwnership(); + ColumnAggregateFunction(const AggregateFunctionPtr & func_) : func(func_) {