fix aligned states review comments

This commit is contained in:
chenxing.xc 2018-08-05 18:12:06 +08:00
parent 202894e191
commit 4acc441179
5 changed files with 21 additions and 20 deletions

View File

@ -264,7 +264,7 @@ void ColumnAggregateFunction::insert(const Field & x)
Arena & arena = createOrGetArena();
getData().push_back(arena.alloc_align(function->sizeOfData(), function->alignOfData()));
getData().push_back(arena.alignedAlloc(function->sizeOfData(), function->alignOfData()));
function->create(getData().back());
ReadBufferFromString read_buffer(x.get<const String &>());
function->deserialize(getData().back(), read_buffer, &arena);
@ -276,7 +276,7 @@ void ColumnAggregateFunction::insertDefault()
Arena & arena = createOrGetArena();
getData().push_back(arena.alloc_align(function->sizeOfData(), function->alignOfData()));
getData().push_back(arena.alignedAlloc(function->sizeOfData(), function->alignOfData()));
function->create(getData().back());
}
@ -297,7 +297,7 @@ const char * ColumnAggregateFunction::deserializeAndInsertFromArena(const char *
*/
Arena & dst_arena = createOrGetArena();
getData().push_back(dst_arena.alloc_align(function->sizeOfData(), function->alignOfData()));
getData().push_back(dst_arena.alignedAlloc(function->sizeOfData(), function->alignOfData()));
function->create(getData().back());
/** We will read from src_arena.

View File

@ -125,24 +125,23 @@ public:
}
/// Get peice of memory with alignment
char * alloc_align(size_t size, size_t align)
char * alignedAlloc(size_t size, size_t align)
{
// Fast code path for non-alignment requirement
if (align <= 1) return alloc(size);
if (align <= 1)
return alloc(size);
size_t pos = (size_t)(head->pos);
uintptr_t pos = reinterpret_cast<uintptr_t>(head->pos);
// next pos match align requirement
pos = (pos & ~(align - 1)) + align;
pos = ((pos-1) & ~(align - 1)) + align;
if (unlikely(pos + size > (size_t)(head->end)))
if (unlikely(pos + size > reinterpret_cast<uintptr_t>(head->end)))
{
addChunk(size);
pos = (size_t)(head->pos);
pos = (pos & ~(align - 1)) + align;
pos = reinterpret_cast<uintptr_t>(head->pos);
pos = ((pos-1) & ~(align - 1)) + align;
}
char * res = (char *)pos;
head->pos = res + size;
return res;

View File

@ -82,7 +82,7 @@ void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer &
Arena & arena = column_concrete.createOrGetArena();
size_t size_of_state = function->sizeOfData();
AggregateDataPtr place = arena.alloc(size_of_state);
AggregateDataPtr place = arena.alignedAlloc(size_of_state, function->alignOfData());
function->create(place);
try
@ -123,13 +123,14 @@ void DataTypeAggregateFunction::deserializeBinaryBulk(IColumn & column, ReadBuff
vec.reserve(vec.size() + limit);
size_t size_of_state = function->sizeOfData();
size_t align_of_state = function->alignOfData();
for (size_t i = 0; i < limit; ++i)
{
if (istr.eof())
break;
AggregateDataPtr place = arena.alloc(size_of_state);
AggregateDataPtr place = arena.alignedAlloc(size_of_state, align_of_state);
function->create(place);
@ -160,7 +161,7 @@ static void deserializeFromString(const AggregateFunctionPtr & function, IColumn
Arena & arena = column_concrete.createOrGetArena();
size_t size_of_state = function->sizeOfData();
AggregateDataPtr place = arena.alloc(size_of_state);
AggregateDataPtr place = arena.alignedAlloc(size_of_state, function->alignOfData());
function->create(place);

View File

@ -635,7 +635,7 @@ void NO_INLINE Aggregator::executeImplCase(
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alloc_align(total_size_of_aggregate_states,
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states,
align_aggregate_states);
createAggregateStates(place);
aggregate_data = place;
@ -754,7 +754,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
{
AggregateDataPtr place = result.aggregates_pool->alloc_align(total_size_of_aggregate_states,
AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states,
align_aggregate_states);
createAggregateStates(place);
result.without_key = place;
@ -1923,7 +1923,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alloc_align(total_size_of_aggregate_states,
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states,
align_aggregate_states);
createAggregateStates(place);
aggregate_data = place;
@ -1975,7 +1975,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
AggregatedDataWithoutKey & res = result.without_key;
if (!res)
{
AggregateDataPtr place = result.aggregates_pool->alloc_align(total_size_of_aggregate_states,
AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states,
align_aggregate_states);
createAggregateStates(place);
res = place;

View File

@ -191,7 +191,8 @@ void NO_INLINE Aggregator::executeSpecializedCase(
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alloc(total_size_of_aggregate_states);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states,
align_aggregate_states);
AggregateFunctionsList::forEach(AggregateFunctionsCreator(
aggregate_functions, offsets_of_aggregate_states, place));