Fixed code review issues

This commit is contained in:
Maksim Kita 2021-07-03 16:29:32 +03:00
parent b3e3a3cde0
commit 55220d49f9
5 changed files with 73 additions and 71 deletions

View File

@ -12,15 +12,15 @@ template <typename R, typename C, typename ...Args>
class FunctorToStaticMethodAdaptor<R (C::*)(Args...) const>
{
public:
static R call(C * ptr, Args... arguments)
static R call(C * ptr, Args &&... arguments)
{
return std::invoke(&C::operator(), ptr, arguments...);
return std::invoke(&C::operator(), ptr, std::forward<Args>(arguments)...);
}
static R unsafeCall(char * ptr, Args... arguments)
static R unsafeCall(char * ptr, Args &&... arguments)
{
C * ptr_typed = reinterpret_cast<C*>(ptr);
return std::invoke(&C::operator(), ptr_typed, arguments...);
return std::invoke(&C::operator(), ptr_typed, std::forward<Args>(arguments)...);
}
};
@ -28,14 +28,14 @@ template <typename R, typename C, typename ...Args>
class FunctorToStaticMethodAdaptor<R (C::*)(Args...)>
{
public:
static R call(C * ptr, Args... arguments)
static R call(C * ptr, Args &&... arguments)
{
return std::invoke(&C::operator(), ptr, arguments...);
return std::invoke(&C::operator(), ptr, std::forward<Args>(arguments)...);
}
static R unsafeCall(char * ptr, Args... arguments)
static R unsafeCall(char * ptr, Args &&... arguments)
{
C * ptr_typed = static_cast<C*>(ptr);
return std::invoke(&C::operator(), ptr_typed, arguments...);
return std::invoke(&C::operator(), ptr_typed, std::forward<Args>(arguments)...);
}
};

View File

@ -10,4 +10,44 @@ DataTypePtr IAggregateFunction::getStateType() const
return std::make_shared<DataTypeAggregateFunction>(shared_from_this(), argument_types, parameters);
}
String IAggregateFunction::getDescription() const
{
String description;
description += getName();
description += '(';
for (const auto & parameter : parameters)
{
description += parameter.dump();
description += ", ";
}
if (!parameters.empty())
{
description.pop_back();
description.pop_back();
}
description += ')';
description += '(';
for (const auto & argument_type : argument_types)
{
description += argument_type->getName();
description += ", ";
}
if (!argument_types.empty())
{
description.pop_back();
description.pop_back();
}
description += ')';
return description;
}
}

View File

@ -218,9 +218,10 @@ public:
const IColumn ** columns,
Arena * arena) const = 0;
/** Insert result of aggregate function into places with batch size.
* Also all places must be destroyed if there was exception during insert.
* If destroy_place is true. Then client must destroy aggregate places if insert throws exception.
/** Insert result of aggregate function into result column with batch size.
* If destroy_place_after_insert is true. Then implementation of this method
* must destroy aggregate place if insert state into result column was successful.
* All places that were not inserted must be destroyed if there was exception during insert into result column.
*/
virtual void insertResultIntoBatch(
size_t batch_size,
@ -228,7 +229,7 @@ public:
size_t place_offset,
IColumn & to,
Arena * arena,
bool destroy_place) const = 0;
bool destroy_place_after_insert) const = 0;
/** Destroy batch of aggregate places.
*/
@ -270,46 +271,8 @@ public:
// of true window functions, so this hack-ish interface suffices.
virtual bool isOnlyWindowFunction() const { return false; }
/// Description of AggregateFunction in form of name(argument_types)(parameters).
virtual String getDescription() const
{
String description;
description += getName();
description += '(';
for (const auto & argument_type : argument_types)
{
description += argument_type->getName();
description += ", ";
}
if (!argument_types.empty())
{
description.pop_back();
description.pop_back();
}
description += ')';
description += '(';
for (const auto & parameter : parameters)
{
description += parameter.dump();
description += ", ";
}
if (!parameters.empty())
{
description.pop_back();
description.pop_back();
}
description += ')';
return description;
}
/// Description of AggregateFunction in form of name(parameters)(argument_types).
String getDescription() const;
#if USE_EMBEDDED_COMPILER
@ -319,25 +282,25 @@ public:
/// compileCreate should generate code for initialization of aggregate function state in aggregate_data_ptr
virtual void compileCreate(llvm::IRBuilderBase & /*builder*/, llvm::Value * /*aggregate_data_ptr*/) const
{
throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
}
/// compileAdd should generate code for updating aggregate function state stored in aggregate_data_ptr
virtual void compileAdd(llvm::IRBuilderBase & /*builder*/, llvm::Value * /*aggregate_data_ptr*/, const DataTypes & /*arguments_types*/, const std::vector<llvm::Value *> & /*arguments_values*/) const
{
throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
}
/// compileMerge should generate code for merging aggregate function states stored in aggregate_data_dst_ptr and aggregate_data_src_ptr
virtual void compileMerge(llvm::IRBuilderBase & /*builder*/, llvm::Value * /*aggregate_data_dst_ptr*/, llvm::Value * /*aggregate_data_src_ptr*/) const
{
throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
}
/// compileGetResult should generate code for getting result value from aggregate function state stored in aggregate_data_ptr
virtual llvm::Value * compileGetResult(llvm::IRBuilderBase & /*builder*/, llvm::Value * /*aggregate_data_ptr*/) const
{
throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName());
}
#endif
@ -517,7 +480,7 @@ public:
}
}
void insertResultIntoBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, IColumn & to, Arena * arena, bool destroy_place) const override
void insertResultIntoBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, IColumn & to, Arena * arena, bool destroy_place_after_insert) const override
{
size_t batch_index = 0;
@ -527,15 +490,14 @@ public:
{
static_cast<const Derived *>(this)->insertResultInto(places[batch_index] + place_offset, to, arena);
if (destroy_place)
if (destroy_place_after_insert)
static_cast<const Derived *>(this)->destroy(places[batch_index] + place_offset);
}
}
catch (...)
{
for (size_t destroy_index = batch_index; destroy_index < batch_size; ++destroy_index)
if (destroy_place)
static_cast<const Derived *>(this)->destroy(places[destroy_index] + place_offset);
static_cast<const Derived *>(this)->destroy(places[destroy_index] + place_offset);
throw;
}

View File

@ -341,7 +341,6 @@ void Aggregator::compileAggregateFunctions()
is_aggregate_function_compiled[i] = function->isCompilable();
}
/// TODO: Probably better to compile more than 2 functions
if (functions_to_compile.empty())
return;
@ -636,7 +635,7 @@ void NO_INLINE Aggregator::executeImplBatch(
return;
}
/// Optimization for special case when aggregating by 8bit key.`
/// Optimization for special case when aggregating by 8bit key.
if constexpr (!no_more_keys && std::is_same_v<Method, typename decltype(AggregatedDataVariants::key8)::element_type>)
{
/// We use another method if there are aggregate functions with -Array combinator.
@ -702,6 +701,8 @@ void NO_INLINE Aggregator::executeImplBatch(
}
#if defined(MEMORY_SANITIZER)
/// We compile only functions that do not allocate some data in Arena. Only store necessary state in AggregateData place.
for (size_t aggregate_function_index = 0; aggregate_function_index < aggregate_functions.size(); ++aggregate_function_index)
{
if (!is_aggregate_function_compiled[aggregate_function_index])
@ -1384,17 +1385,18 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
auto & final_aggregate_column = final_aggregate_columns[aggregate_functions_destroy_index];
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
/** We increase aggregate_functions_destroy_index because by function contract if insertResultIntoAndDestroyBatch
/** We increase aggregate_functions_destroy_index because by function contract if insertResultIntoBatch
* throws exception, it also must destroy all necessary states.
* Then code need to continue to destroy other aggregate function states with next function index.
*/
size_t destroy_index = aggregate_functions_destroy_index;
++aggregate_functions_destroy_index;
/// For State AggregateFunction ownership of aggregate place is passed to result column after insert
bool is_state = aggregate_functions[destroy_index]->isState();
bool destroy_place = !is_state;
bool destroy_place_after_insert = !is_state;
aggregate_functions[destroy_index]->insertResultIntoBatch(places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place);
aggregate_functions[destroy_index]->insertResultIntoBatch(places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
}
}
catch (...)
@ -1414,10 +1416,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
}
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
bool is_state = aggregate_functions[aggregate_functions_destroy_index]->isState();
if (!is_state)
aggregate_functions[aggregate_functions_destroy_index]->destroyBatch(places.size(), places.data(), offset);
aggregate_functions[aggregate_functions_destroy_index]->destroyBatch(places.size(), places.data(), offset);
}
if (exception)
@ -2015,7 +2014,6 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
}
else if (res->without_key)
{
/// TODO: Use compile function
mergeDataNoMoreKeysImpl<Method>(
getDataVariant<Method>(*res).data,
res->without_key,
@ -2024,7 +2022,6 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
}
else
{
/// TODO: Use compile function
mergeDataOnlyExistingKeysImpl<Method>(
getDataVariant<Method>(*res).data,
getDataVariant<Method>(current).data,

View File

@ -64,7 +64,10 @@ struct CompiledAggregateFunctions
JITMergeAggregateStatesFunction merge_aggregate_states_function;
JITInsertAggregateStatesIntoColumnsFunction insert_aggregates_into_columns_function;
/// Count of functions that were compiled
size_t functions_count;
/// Compiled module. It is client responsibility to destroy it after functions are no longer required.
CHJIT::CompiledModule compiled_module;
};