Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-22 07:31:57 +00:00)
Fix build (move code from AggregateFunctionMLMethod.h) (#5266)
This commit is contained in: parent 2cbbbba7c8, commit 7cb8d46338
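
Editorial note: the change follows the usual pattern for this kind of build fix, keeping only declarations in the header and moving the definitions into the .cpp translation unit so they are compiled exactly once. A minimal sketch of that pattern, using hypothetical Foo names rather than the actual ClickHouse classes:

// Foo.h -- hypothetical illustration of the pattern, not part of this commit
#pragma once

class Foo
{
public:
    void update(int batch_size);   /// declaration only; the definition lives in Foo.cpp
};

// Foo.cpp
#include "Foo.h"

void Foo::update(int batch_size)
{
    /// implementation compiled in a single translation unit
}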
AggregateFunctionMLMethod.cpp
@@ -1,92 +1,110 @@
#include "AggregateFunctionMLMethod.h"

#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/castColumn.h>
#include <Common/FieldVisitors.h>
#include <Common/typeid_cast.h>
#include "AggregateFunctionFactory.h"
#include "FactoryHelpers.h"
#include "Helpers.h"


namespace DB
{

namespace
{

using FuncLinearRegression = AggregateFunctionMLMethod<LinearModelData, NameLinearRegression>;
using FuncLogisticRegression = AggregateFunctionMLMethod<LinearModelData, NameLogisticRegression>;

template <class Method>
AggregateFunctionPtr
createAggregateFunctionMLMethod(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
    if (parameters.size() > 4)
        throw Exception(
            "Aggregate function " + name
                + " requires at most four parameters: learning_rate, l2_regularization_coef, mini-batch size and weights_updater "
                  "method",
            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

    if (argument_types.size() < 2)
        throw Exception(
            "Aggregate function " + name + " requires at least two arguments: target and model's parameters",
            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

    for (size_t i = 0; i < argument_types.size(); ++i)
    {
        if (!isNumber(argument_types[i]))
            throw Exception(
                "Argument " + std::to_string(i) + " of type " + argument_types[i]->getName()
                    + " must be numeric for aggregate function " + name,
                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
    }

    /// Such default parameters were picked because they did good on some tests,
    /// though it still requires to fit parameters to achieve better result
    auto learning_rate = Float64(0.01);
    auto l2_reg_coef = Float64(0.01);
    UInt32 batch_size = 1;

    std::shared_ptr<IWeightsUpdater> weights_updater = std::make_shared<StochasticGradientDescent>();
    std::shared_ptr<IGradientComputer> gradient_computer;

    if (!parameters.empty())
    {
        learning_rate = applyVisitor(FieldVisitorConvertToNumber<Float64>(), parameters[0]);
    }
    if (parameters.size() > 1)
    {
        l2_reg_coef = applyVisitor(FieldVisitorConvertToNumber<Float64>(), parameters[1]);
    }
    if (parameters.size() > 2)
    {
        batch_size = applyVisitor(FieldVisitorConvertToNumber<UInt32>(), parameters[2]);
    }
    if (parameters.size() > 3)
    {
        if (applyVisitor(FieldVisitorToString(), parameters[3]) == "\'SGD\'")
        {
            weights_updater = std::make_shared<StochasticGradientDescent>();
        }
        else if (applyVisitor(FieldVisitorToString(), parameters[3]) == "\'Momentum\'")
        {
            weights_updater = std::make_shared<Momentum>();
        }
        else if (applyVisitor(FieldVisitorToString(), parameters[3]) == "\'Nesterov\'")
        {
            weights_updater = std::make_shared<Nesterov>();
        }
        else
        {
            throw Exception("Invalid parameter for weights updater", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
        }
    }

    if (std::is_same<Method, FuncLinearRegression>::value)
    {
        gradient_computer = std::make_shared<LinearRegression>();
    }
    else if (std::is_same<Method, FuncLogisticRegression>::value)
    {
        gradient_computer = std::make_shared<LogisticRegression>();
    }
    else
    {
        throw Exception("Such gradient computer is not implemented yet", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
    }

    return std::make_shared<Method>(
        argument_types.size() - 1,
        gradient_computer,
        weights_updater,
        learning_rate,
        l2_reg_coef,
        batch_size,
        argument_types,
        parameters);
}

}
@@ -96,4 +114,361 @@ void registerAggregateFunctionMLMethod(AggregateFunctionFactory & factory)
    factory.registerFunction("LogisticRegression", createAggregateFunctionMLMethod<FuncLogisticRegression>);
}

LinearModelData::LinearModelData(
    Float64 learning_rate,
    Float64 l2_reg_coef,
    UInt32 param_num,
    UInt32 batch_capacity,
    std::shared_ptr<DB::IGradientComputer> gradient_computer,
    std::shared_ptr<DB::IWeightsUpdater> weights_updater)
    : learning_rate(learning_rate)
    , l2_reg_coef(l2_reg_coef)
    , batch_capacity(batch_capacity)
    , batch_size(0)
    , gradient_computer(std::move(gradient_computer))
    , weights_updater(std::move(weights_updater))
{
    weights.resize(param_num, Float64{0.0});
    gradient_batch.resize(param_num + 1, Float64{0.0});
}

void LinearModelData::update_state()
{
    if (batch_size == 0)
        return;

    weights_updater->update(batch_size, weights, bias, gradient_batch);
    batch_size = 0;
    ++iter_num;
    gradient_batch.assign(gradient_batch.size(), Float64{0.0});
}

void LinearModelData::predict(
    ColumnVector<Float64>::Container & container, Block & block, const ColumnNumbers & arguments, const Context & context) const
{
    gradient_computer->predict(container, block, arguments, weights, bias, context);
}

void LinearModelData::read(ReadBuffer & buf)
{
    readBinary(bias, buf);
    readBinary(weights, buf);
    readBinary(iter_num, buf);
    readBinary(gradient_batch, buf);
    readBinary(batch_size, buf);
    weights_updater->read(buf);
}

void LinearModelData::write(WriteBuffer & buf) const
{
    writeBinary(bias, buf);
    writeBinary(weights, buf);
    writeBinary(iter_num, buf);
    writeBinary(gradient_batch, buf);
    writeBinary(batch_size, buf);
    weights_updater->write(buf);
}

void LinearModelData::merge(const DB::LinearModelData & rhs)
{
    if (iter_num == 0 && rhs.iter_num == 0)
        return;

    update_state();
    /// can't update rhs state because it's constant

    Float64 frac = (static_cast<Float64>(iter_num) * iter_num) / (iter_num * iter_num + rhs.iter_num * rhs.iter_num);

    for (size_t i = 0; i < weights.size(); ++i)
    {
        weights[i] = weights[i] * frac + rhs.weights[i] * (1 - frac);
    }
    bias = bias * frac + rhs.bias * (1 - frac);

    iter_num += rhs.iter_num;
    weights_updater->merge(*rhs.weights_updater, frac, 1 - frac);
}
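
Editorial note on the merge above (a reading of the code, not text from the commit): when two partial aggregation states are combined, each state is weighted by the square of the number of mini-batch updates it has performed,

    \mathrm{frac} = \frac{n^2}{n^2 + m^2}, \qquad w_i \leftarrow \mathrm{frac}\,w_i + (1 - \mathrm{frac})\,w_i^{\mathrm{rhs}}

where n is this state's iter_num and m is rhs.iter_num; the bias and the updater's internal state are blended with the same factor.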

void LinearModelData::add(const IColumn ** columns, size_t row_num)
{
    /// first column stores target; features start from (columns + 1)
    const auto target = (*columns[0])[row_num].get<Float64>();
    /// Here we have columns + 1 as first column corresponds to target value, and others - to features
    weights_updater->add_to_batch(
        gradient_batch, *gradient_computer, weights, bias, learning_rate, l2_reg_coef, target, columns + 1, row_num);

    ++batch_size;
    if (batch_size == batch_capacity)
    {
        update_state();
    }
}


void Nesterov::read(ReadBuffer & buf)
{
    readBinary(accumulated_gradient, buf);
}

void Nesterov::write(WriteBuffer & buf) const
{
    writeBinary(accumulated_gradient, buf);
}

void Nesterov::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
{
    auto & nesterov_rhs = static_cast<const Nesterov &>(rhs);
    for (size_t i = 0; i < accumulated_gradient.size(); ++i)
    {
        accumulated_gradient[i] = accumulated_gradient[i] * frac + nesterov_rhs.accumulated_gradient[i] * rhs_frac;
    }
}

void Nesterov::update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
{
    if (accumulated_gradient.empty())
    {
        accumulated_gradient.resize(batch_gradient.size(), Float64{0.0});
    }

    for (size_t i = 0; i < batch_gradient.size(); ++i)
    {
        accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + batch_gradient[i] / batch_size;
    }
    for (size_t i = 0; i < weights.size(); ++i)
    {
        weights[i] += accumulated_gradient[i];
    }
    bias += accumulated_gradient[weights.size()];
}

void Nesterov::add_to_batch(
    std::vector<Float64> & batch_gradient,
    IGradientComputer & gradient_computer,
    const std::vector<Float64> & weights,
    Float64 bias,
    Float64 learning_rate,
    Float64 l2_reg_coef,
    Float64 target,
    const IColumn ** columns,
    size_t row_num)
{
    if (accumulated_gradient.empty())
    {
        accumulated_gradient.resize(batch_gradient.size(), Float64{0.0});
    }

    std::vector<Float64> shifted_weights(weights.size());
    for (size_t i = 0; i != shifted_weights.size(); ++i)
    {
        shifted_weights[i] = weights[i] + accumulated_gradient[i] * alpha_;
    }
    auto shifted_bias = bias + accumulated_gradient[weights.size()] * alpha_;

    gradient_computer.compute(batch_gradient, shifted_weights, shifted_bias, learning_rate, l2_reg_coef, target, columns, row_num);
}
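
Editorial note on Nesterov::add_to_batch (based only on the code above): the gradient is evaluated at a look-ahead point shifted by the accumulated velocity, which is what distinguishes Nesterov momentum from plain momentum,

    \tilde{w} = w + \alpha v, \qquad g \mathrel{+}= \nabla L(\tilde{w})

where v is accumulated_gradient and \alpha is alpha_.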

void Momentum::read(ReadBuffer & buf)
{
    readBinary(accumulated_gradient, buf);
}

void Momentum::write(WriteBuffer & buf) const
{
    writeBinary(accumulated_gradient, buf);
}

void Momentum::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
{
    auto & momentum_rhs = static_cast<const Momentum &>(rhs);
    for (size_t i = 0; i < accumulated_gradient.size(); ++i)
    {
        accumulated_gradient[i] = accumulated_gradient[i] * frac + momentum_rhs.accumulated_gradient[i] * rhs_frac;
    }
}

void Momentum::update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
{
    /// batch_size is already checked to be greater than 0
    if (accumulated_gradient.empty())
    {
        accumulated_gradient.resize(batch_gradient.size(), Float64{0.0});
    }

    for (size_t i = 0; i < batch_gradient.size(); ++i)
    {
        accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + batch_gradient[i] / batch_size;
    }
    for (size_t i = 0; i < weights.size(); ++i)
    {
        weights[i] += accumulated_gradient[i];
    }
    bias += accumulated_gradient[weights.size()];
}

void StochasticGradientDescent::update(
    UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
{
    /// batch_size is already checked to be greater than 0
    for (size_t i = 0; i < weights.size(); ++i)
    {
        weights[i] += batch_gradient[i] / batch_size;
    }
    bias += batch_gradient[weights.size()] / batch_size;
}

void IWeightsUpdater::add_to_batch(
    std::vector<Float64> & batch_gradient,
    IGradientComputer & gradient_computer,
    const std::vector<Float64> & weights,
    Float64 bias,
    Float64 learning_rate,
    Float64 l2_reg_coef,
    Float64 target,
    const IColumn ** columns,
    size_t row_num)
{
    gradient_computer.compute(batch_gradient, weights, bias, learning_rate, l2_reg_coef, target, columns, row_num);
}
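
Editorial summary of the three updaters above, with g the batch gradient averaged over batch_size and \alpha the momentum coefficient:

    \text{SGD:} \quad w \leftarrow w + g
    \text{Momentum / Nesterov:} \quad v \leftarrow \alpha v + g, \qquad w \leftarrow w + v

Note the sign convention: the gradient computers already fold the learning rate and the descent direction into batch_gradient, so the updaters simply add it.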

void LogisticRegression::predict(
    ColumnVector<Float64>::Container & container,
    Block & block,
    const ColumnNumbers & arguments,
    const std::vector<Float64> & weights,
    Float64 bias,
    const Context & context) const
{
    size_t rows_num = block.rows();
    std::vector<Float64> results(rows_num, bias);

    for (size_t i = 1; i < arguments.size(); ++i)
    {
        const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]);
        if (!isNumber(cur_col.type))
        {
            throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS);
        }

        /// If column type is already Float64 then castColumn simply returns it
        auto features_col_ptr = castColumn(cur_col, std::make_shared<DataTypeFloat64>(), context);
        auto features_column = typeid_cast<const ColumnFloat64 *>(features_col_ptr.get());

        if (!features_column)
        {
            throw Exception("Unexpectedly cannot dynamically cast features column " + std::to_string(i), ErrorCodes::LOGICAL_ERROR);
        }

        for (size_t row_num = 0; row_num != rows_num; ++row_num)
        {
            results[row_num] += weights[i - 1] * features_column->getElement(row_num);
        }
    }

    container.reserve(rows_num);
    for (size_t row_num = 0; row_num != rows_num; ++row_num)
    {
        container.emplace_back(1 / (1 + exp(-results[row_num])));
    }
}

void LogisticRegression::compute(
    std::vector<Float64> & batch_gradient,
    const std::vector<Float64> & weights,
    Float64 bias,
    Float64 learning_rate,
    Float64 l2_reg_coef,
    Float64 target,
    const IColumn ** columns,
    size_t row_num)
{
    Float64 derivative = bias;
    for (size_t i = 0; i < weights.size(); ++i)
    {
        auto value = (*columns[i])[row_num].get<Float64>();
        derivative += weights[i] * value;
    }
    derivative *= target;
    derivative = exp(derivative);

    batch_gradient[weights.size()] += learning_rate * target / (derivative + 1);
    for (size_t i = 0; i < weights.size(); ++i)
    {
        auto value = (*columns[i])[row_num].get<Float64>();
        batch_gradient[i] += learning_rate * target * value / (derivative + 1) - 2 * l2_reg_coef * weights[i];
    }
}
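
Editorial reading of LogisticRegression::compute: for targets encoded as \pm 1, with z = y\,(w \cdot x + b), the contribution accumulated into the batch gradient is

    \Delta w_i \mathrel{+}= \frac{\eta\, y\, x_i}{e^{z} + 1} - 2\lambda w_i, \qquad \Delta b \mathrel{+}= \frac{\eta\, y}{e^{z} + 1}

i.e. the (negated) derivative of \log(1 + e^{-z}) scaled by the learning rate \eta, plus an L2 penalty with coefficient \lambda. predict() then applies the sigmoid 1 / (1 + e^{-(w \cdot x + b)}).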

void LinearRegression::predict(
    ColumnVector<Float64>::Container & container,
    Block & block,
    const ColumnNumbers & arguments,
    const std::vector<Float64> & weights,
    Float64 bias,
    const Context & context) const
{
    if (weights.size() + 1 != arguments.size())
    {
        throw Exception("In predict function number of arguments differs from the size of weights vector", ErrorCodes::LOGICAL_ERROR);
    }

    size_t rows_num = block.rows();
    std::vector<Float64> results(rows_num, bias);

    for (size_t i = 1; i < arguments.size(); ++i)
    {
        const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]);
        if (!isNumber(cur_col.type))
        {
            throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS);
        }

        /// If column type is already Float64 then castColumn simply returns it
        auto features_col_ptr = castColumn(cur_col, std::make_shared<DataTypeFloat64>(), context);
        auto features_column = typeid_cast<const ColumnFloat64 *>(features_col_ptr.get());

        if (!features_column)
        {
            throw Exception("Unexpectedly cannot dynamically cast features column " + std::to_string(i), ErrorCodes::LOGICAL_ERROR);
        }

        for (size_t row_num = 0; row_num != rows_num; ++row_num)
        {
            results[row_num] += weights[i - 1] * features_column->getElement(row_num);
        }
    }

    container.reserve(rows_num);
    for (size_t row_num = 0; row_num != rows_num; ++row_num)
    {
        container.emplace_back(results[row_num]);
    }
}

void LinearRegression::compute(
    std::vector<Float64> & batch_gradient,
    const std::vector<Float64> & weights,
    Float64 bias,
    Float64 learning_rate,
    Float64 l2_reg_coef,
    Float64 target,
    const IColumn ** columns,
    size_t row_num)
{
    Float64 derivative = (target - bias);
    for (size_t i = 0; i < weights.size(); ++i)
    {
        auto value = (*columns[i])[row_num].get<Float64>();
        derivative -= weights[i] * value;
    }
    derivative *= (2 * learning_rate);

    batch_gradient[weights.size()] += derivative;
    for (size_t i = 0; i < weights.size(); ++i)
    {
        auto value = (*columns[i])[row_num].get<Float64>();
        batch_gradient[i] += derivative * value - 2 * l2_reg_coef * weights[i];
    }
}

}
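
LinearRegression::compute mirrors this for squared error (again an editorial summary): the residual r = y - (w \cdot x + b) is scaled by 2\eta and accumulated as

    \Delta b \mathrel{+}= 2\eta r, \qquad \Delta w_i \mathrel{+}= 2\eta r\, x_i - 2\lambda w_i.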
AggregateFunctionMLMethod.h
@@ -1,27 +1,13 @@
#pragma once

#include <Columns/ColumnVector.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnsNumber.h>
#include <Common/FieldVisitors.h>
#include <Interpreters/castColumn.h>
#include <cmath>
#include <exception>
#include <DataTypes/DataTypesNumber.h>
#include "IAggregateFunction.h"

namespace DB
{

namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
@@ -34,156 +20,79 @@ GradientComputer class computes gradient according to its loss function
class IGradientComputer
{
public:
    IGradientComputer() {}

    virtual ~IGradientComputer() = default;

    /// Adds computed gradient in new point (weights, bias) to batch_gradient
    virtual void compute(
        std::vector<Float64> & batch_gradient,
        const std::vector<Float64> & weights,
        Float64 bias,
        Float64 learning_rate,
        Float64 l2_reg_coef,
        Float64 target,
        const IColumn ** columns,
        size_t row_num)
        = 0;

    virtual void predict(
        ColumnVector<Float64>::Container & container,
        Block & block,
        const ColumnNumbers & arguments,
        const std::vector<Float64> & weights,
        Float64 bias,
        const Context & context) const = 0;
};


class LinearRegression : public IGradientComputer
{
public:
    LinearRegression() {}

    void compute(
        std::vector<Float64> & batch_gradient,
        const std::vector<Float64> & weights,
        Float64 bias,
        Float64 learning_rate,
        Float64 l2_reg_coef,
        Float64 target,
        const IColumn ** columns,
        size_t row_num) override;

    void predict(
        ColumnVector<Float64>::Container & container,
        Block & block,
        const ColumnNumbers & arguments,
        const std::vector<Float64> & weights,
        Float64 bias,
        const Context & context) const override;
};


class LogisticRegression : public IGradientComputer
{
public:
    LogisticRegression() {}

    void compute(
        std::vector<Float64> & batch_gradient,
        const std::vector<Float64> & weights,
        Float64 bias,
        Float64 learning_rate,
        Float64 l2_reg_coef,
        Float64 target,
        const IColumn ** columns,
        size_t row_num) override;

    void predict(
        ColumnVector<Float64>::Container & container,
        Block & block,
        const ColumnNumbers & arguments,
        const std::vector<Float64> & weights,
        Float64 bias,
        const Context & context) const override;
};
@@ -197,98 +106,52 @@ public:
    virtual ~IWeightsUpdater() = default;

    /// Calls GradientComputer to update current mini-batch
    virtual void add_to_batch(
        std::vector<Float64> & batch_gradient,
        IGradientComputer & gradient_computer,
        const std::vector<Float64> & weights,
        Float64 bias,
        Float64 learning_rate,
        Float64 l2_reg_coef,
        Float64 target,
        const IColumn ** columns,
        size_t row_num);

    /// Updates current weights according to the gradient from the last mini-batch
    virtual void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & gradient) = 0;

    /// Used during the merge of two states
    virtual void merge(const IWeightsUpdater &, Float64, Float64) {}

    /// Used for serialization when necessary
    virtual void write(WriteBuffer &) const {}

    /// Used for serialization when necessary
    virtual void read(ReadBuffer &) {}
};


class StochasticGradientDescent : public IWeightsUpdater
{
public:
    void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;
};


class Momentum : public IWeightsUpdater
{
public:
    Momentum() {}

    Momentum(Float64 alpha) : alpha_(alpha) {}

    void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;

    virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;

    void write(WriteBuffer & buf) const override;

    void read(ReadBuffer & buf) override;

private:
    Float64 alpha_{0.1};
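
Editorial note: IWeightsUpdater is the extension point for new optimisers; add_to_batch accumulates per-row gradients, update applies them once per mini-batch, and merge/write/read handle distributed and serialized states. A minimal hypothetical updater (illustration only, not part of this commit) could look like:

/// Hypothetical example: an SGD variant that rescales the averaged batch gradient.
class ScaledSGD : public IWeightsUpdater
{
public:
    explicit ScaledSGD(Float64 scale) : scale_(scale) {}

    void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override
    {
        for (size_t i = 0; i < weights.size(); ++i)
            weights[i] += scale_ * batch_gradient[i] / batch_size;
        bias += scale_ * batch_gradient[weights.size()] / batch_size;
    }

private:
    Float64 scale_ = 1.0;
};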
@@ -299,70 +162,28 @@ private:
class Nesterov : public IWeightsUpdater
{
public:
    Nesterov() {}

    Nesterov(Float64 alpha) : alpha_(alpha) {}

    void add_to_batch(
        std::vector<Float64> & batch_gradient,
        IGradientComputer & gradient_computer,
        const std::vector<Float64> & weights,
        Float64 bias,
        Float64 learning_rate,
        Float64 l2_reg_coef,
        Float64 target,
        const IColumn ** columns,
        size_t row_num) override;

    void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;

    virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;

    void write(WriteBuffer & buf) const override;

    void read(ReadBuffer & buf) override;

private:
    Float64 alpha_{0.1};
@@ -376,85 +197,26 @@ private:
class LinearModelData
{
public:
    LinearModelData() {}

    LinearModelData(
        Float64 learning_rate,
        Float64 l2_reg_coef,
        UInt32 param_num,
        UInt32 batch_capacity,
        std::shared_ptr<IGradientComputer> gradient_computer,
        std::shared_ptr<IWeightsUpdater> weights_updater);

    void add(const IColumn ** columns, size_t row_num);

    void merge(const LinearModelData & rhs);

    void write(WriteBuffer & buf) const;

    void read(ReadBuffer & buf);

    void
    predict(ColumnVector<Float64>::Container & container, Block & block, const ColumnNumbers & arguments, const Context & context) const;

private:
    std::vector<Float64> weights;
@@ -474,16 +236,7 @@ private:
    /**
     * The function is called when we want to flush current batch and update our weights
     */
    void update_state();
};
@@ -491,35 +244,33 @@ template <
    /// Implemented Machine Learning method
    typename Data,
    /// Name of the method
    typename Name>
class AggregateFunctionMLMethod final : public IAggregateFunctionDataHelper<Data, AggregateFunctionMLMethod<Data, Name>>
{
public:
    String getName() const override { return Name::name; }

    explicit AggregateFunctionMLMethod(
        UInt32 param_num,
        std::shared_ptr<IGradientComputer> gradient_computer,
        std::shared_ptr<IWeightsUpdater> weights_updater,
        Float64 learning_rate,
        Float64 l2_reg_coef,
        UInt32 batch_size,
        const DataTypes & arguments_types,
        const Array & params)
        : IAggregateFunctionDataHelper<Data, AggregateFunctionMLMethod<Data, Name>>(arguments_types, params)
        , param_num(param_num)
        , learning_rate(learning_rate)
        , l2_reg_coef(l2_reg_coef)
        , batch_size(batch_size)
        , gradient_computer(std::move(gradient_computer))
        , weights_updater(std::move(weights_updater))
    {
    }

    DataTypePtr getReturnType() const override { return std::make_shared<DataTypeNumber<Float64>>(); }

    void create(AggregateDataPtr place) const override
    {
        new (place) Data(learning_rate, l2_reg_coef, param_num, batch_size, gradient_computer, weights_updater);
@@ -530,29 +281,22 @@ public:
        this->data(place).add(columns, row_num);
    }

    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).merge(this->data(rhs)); }

    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { this->data(place).write(buf); }

    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).read(buf); }

    void predictValues(
        ConstAggregateDataPtr place, IColumn & to, Block & block, const ColumnNumbers & arguments, const Context & context) const override
    {
        if (arguments.size() != param_num + 1)
            throw Exception(
                "Predict got incorrect number of arguments. Got: " + std::to_string(arguments.size())
                    + ". Required: " + std::to_string(param_num + 1),
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        auto & column = dynamic_cast<ColumnVector<Float64> &>(to);

        this->data(place).predict(column.getData(), block, arguments, context);
    }
@@ -575,6 +319,12 @@ private:
    std::shared_ptr<IWeightsUpdater> weights_updater;
};

struct NameLinearRegression
{
    static constexpr auto name = "LinearRegression";
};
struct NameLogisticRegression
{
    static constexpr auto name = "LogisticRegression";
};
}