#include "AggregateFunctionMLMethod.h" #include #include #include #include #include #include #include #include #include "AggregateFunctionFactory.h" #include "FactoryHelpers.h" #include "Helpers.h" namespace DB { namespace { using FuncLinearRegression = AggregateFunctionMLMethod; using FuncLogisticRegression = AggregateFunctionMLMethod; template AggregateFunctionPtr createAggregateFunctionMLMethod(const std::string & name, const DataTypes & argument_types, const Array & parameters) { if (parameters.size() > 4) throw Exception( "Aggregate function " + name + " requires at most four parameters: learning_rate, l2_regularization_coef, mini-batch size and weights_updater " "method", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); if (argument_types.size() < 2) throw Exception( "Aggregate function " + name + " requires at least two arguments: target and model's parameters", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); for (size_t i = 0; i < argument_types.size(); ++i) { if (!isNativeNumber(argument_types[i])) throw Exception( "Argument " + std::to_string(i) + " of type " + argument_types[i]->getName() + " must be numeric for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); } /// Such default parameters were picked because they did good on some tests, /// though it still requires to fit parameters to achieve better result auto learning_rate = Float64(1.0); auto l2_reg_coef = Float64(0.5); UInt64 batch_size = 15; std::string weights_updater_name = "Adam"; std::unique_ptr gradient_computer; if (!parameters.empty()) { learning_rate = applyVisitor(FieldVisitorConvertToNumber(), parameters[0]); } if (parameters.size() > 1) { l2_reg_coef = applyVisitor(FieldVisitorConvertToNumber(), parameters[1]); } if (parameters.size() > 2) { batch_size = applyVisitor(FieldVisitorConvertToNumber(), parameters[2]); } if (parameters.size() > 3) { weights_updater_name = parameters[3].safeGet(); if (weights_updater_name != "SGD" && weights_updater_name != "Momentum" && weights_updater_name != "Nesterov" && weights_updater_name != "Adam") throw Exception("Invalid parameter for weights updater. The only supported are 'SGD', 'Momentum' and 'Nesterov'", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); } if (std::is_same::value) { gradient_computer = std::make_unique(); } else if (std::is_same::value) { gradient_computer = std::make_unique(); } else { throw Exception("Such gradient computer is not implemented yet", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); } return std::make_shared( argument_types.size() - 1, std::move(gradient_computer), weights_updater_name, learning_rate, l2_reg_coef, batch_size, argument_types, parameters); } } void registerAggregateFunctionMLMethod(AggregateFunctionFactory & factory) { factory.registerFunction("stochasticLinearRegression", createAggregateFunctionMLMethod); factory.registerFunction("stochasticLogisticRegression", createAggregateFunctionMLMethod); } LinearModelData::LinearModelData( Float64 learning_rate_, Float64 l2_reg_coef_, UInt64 param_num_, UInt64 batch_capacity_, std::shared_ptr gradient_computer_, std::shared_ptr weights_updater_) : learning_rate(learning_rate_) , l2_reg_coef(l2_reg_coef_) , batch_capacity(batch_capacity_) , batch_size(0) , gradient_computer(std::move(gradient_computer_)) , weights_updater(std::move(weights_updater_)) { weights.resize(param_num_, Float64{0.0}); gradient_batch.resize(param_num_ + 1, Float64{0.0}); } void LinearModelData::update_state() { if (batch_size == 0) return; weights_updater->update(batch_size, weights, bias, learning_rate, gradient_batch); batch_size = 0; ++iter_num; gradient_batch.assign(gradient_batch.size(), Float64{0.0}); } void LinearModelData::predict( ColumnVector::Container & container, Block & block, size_t offset, size_t limit, const ColumnNumbers & arguments, const Context & context) const { gradient_computer->predict(container, block, offset, limit, arguments, weights, bias, context); } void LinearModelData::returnWeights(IColumn & to) const { size_t size = weights.size() + 1; ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); size_t old_size = offsets_to.back(); offsets_to.push_back(old_size + size); typename ColumnFloat64::Container & val_to = assert_cast(arr_to.getData()).getData(); val_to.reserve(old_size + size); for (size_t i = 0; i + 1 < size; ++i) val_to.push_back(weights[i]); val_to.push_back(bias); } void LinearModelData::read(ReadBuffer & buf) { readBinary(bias, buf); readBinary(weights, buf); readBinary(iter_num, buf); readBinary(gradient_batch, buf); readBinary(batch_size, buf); weights_updater->read(buf); } void LinearModelData::write(WriteBuffer & buf) const { writeBinary(bias, buf); writeBinary(weights, buf); writeBinary(iter_num, buf); writeBinary(gradient_batch, buf); writeBinary(batch_size, buf); weights_updater->write(buf); } void LinearModelData::merge(const DB::LinearModelData & rhs) { if (iter_num == 0 && rhs.iter_num == 0) return; update_state(); /// can't update rhs state because it's constant /// squared mean is more stable (in sence of quality of prediction) when two states with quietly different number of learning steps are merged Float64 frac = (static_cast(iter_num) * iter_num) / (iter_num * iter_num + rhs.iter_num * rhs.iter_num); for (size_t i = 0; i < weights.size(); ++i) { weights[i] = weights[i] * frac + rhs.weights[i] * (1 - frac); } bias = bias * frac + rhs.bias * (1 - frac); iter_num += rhs.iter_num; weights_updater->merge(*rhs.weights_updater, frac, 1 - frac); } void LinearModelData::add(const IColumn ** columns, size_t row_num) { /// first column stores target; features start from (columns + 1) Float64 target = (*columns[0]).getFloat64(row_num); /// Here we have columns + 1 as first column corresponds to target value, and others - to features weights_updater->add_to_batch( gradient_batch, *gradient_computer, weights, bias, l2_reg_coef, target, columns + 1, row_num); ++batch_size; if (batch_size == batch_capacity) { update_state(); } } /// Weights updaters void Adam::write(WriteBuffer & buf) const { writeBinary(average_gradient, buf); writeBinary(average_squared_gradient, buf); } void Adam::read(ReadBuffer & buf) { readBinary(average_gradient, buf); readBinary(average_squared_gradient, buf); } void Adam::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) { auto & adam_rhs = static_cast(rhs); if (adam_rhs.average_gradient.empty()) return; if (average_gradient.empty()) { if (!average_squared_gradient.empty() || adam_rhs.average_gradient.size() != adam_rhs.average_squared_gradient.size()) throw Exception("Average_gradient and average_squared_gradient must have same size", ErrorCodes::LOGICAL_ERROR); average_gradient.resize(adam_rhs.average_gradient.size(), Float64{0.0}); average_squared_gradient.resize(adam_rhs.average_squared_gradient.size(), Float64{0.0}); } for (size_t i = 0; i < average_gradient.size(); ++i) { average_gradient[i] = average_gradient[i] * frac + adam_rhs.average_gradient[i] * rhs_frac; average_squared_gradient[i] = average_squared_gradient[i] * frac + adam_rhs.average_squared_gradient[i] * rhs_frac; } beta1_powered_ *= adam_rhs.beta1_powered_; beta2_powered_ *= adam_rhs.beta2_powered_; } void Adam::update(UInt64 batch_size, std::vector & weights, Float64 & bias, Float64 learning_rate, const std::vector & batch_gradient) { if (average_gradient.empty()) { if (!average_squared_gradient.empty()) throw Exception("Average_gradient and average_squared_gradient must have same size", ErrorCodes::LOGICAL_ERROR); average_gradient.resize(batch_gradient.size(), Float64{0.0}); average_squared_gradient.resize(batch_gradient.size(), Float64{0.0}); } for (size_t i = 0; i != average_gradient.size(); ++i) { Float64 normed_gradient = batch_gradient[i] / batch_size; average_gradient[i] = beta1_ * average_gradient[i] + (1 - beta1_) * normed_gradient; average_squared_gradient[i] = beta2_ * average_squared_gradient[i] + (1 - beta2_) * normed_gradient * normed_gradient; } for (size_t i = 0; i < weights.size(); ++i) { weights[i] += (learning_rate * average_gradient[i]) / ((1 - beta1_powered_) * (sqrt(average_squared_gradient[i] / (1 - beta2_powered_)) + eps_)); } bias += (learning_rate * average_gradient[weights.size()]) / ((1 - beta1_powered_) * (sqrt(average_squared_gradient[weights.size()] / (1 - beta2_powered_)) + eps_)); beta1_powered_ *= beta1_; beta2_powered_ *= beta2_; } void Adam::add_to_batch( std::vector & batch_gradient, IGradientComputer & gradient_computer, const std::vector & weights, Float64 bias, Float64 l2_reg_coef, Float64 target, const IColumn ** columns, size_t row_num) { if (average_gradient.empty()) { average_gradient.resize(batch_gradient.size(), Float64{0.0}); average_squared_gradient.resize(batch_gradient.size(), Float64{0.0}); } gradient_computer.compute(batch_gradient, weights, bias, l2_reg_coef, target, columns, row_num); } void Nesterov::read(ReadBuffer & buf) { readBinary(accumulated_gradient, buf); } void Nesterov::write(WriteBuffer & buf) const { writeBinary(accumulated_gradient, buf); } void Nesterov::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) { auto & nesterov_rhs = static_cast(rhs); if (accumulated_gradient.empty()) accumulated_gradient.resize(nesterov_rhs.accumulated_gradient.size(), Float64{0.0}); for (size_t i = 0; i < accumulated_gradient.size(); ++i) { accumulated_gradient[i] = accumulated_gradient[i] * frac + nesterov_rhs.accumulated_gradient[i] * rhs_frac; } } void Nesterov::update(UInt64 batch_size, std::vector & weights, Float64 & bias, Float64 learning_rate, const std::vector & batch_gradient) { if (accumulated_gradient.empty()) { accumulated_gradient.resize(batch_gradient.size(), Float64{0.0}); } for (size_t i = 0; i < batch_gradient.size(); ++i) { accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + (learning_rate * batch_gradient[i]) / batch_size; } for (size_t i = 0; i < weights.size(); ++i) { weights[i] += accumulated_gradient[i]; } bias += accumulated_gradient[weights.size()]; } void Nesterov::add_to_batch( std::vector & batch_gradient, IGradientComputer & gradient_computer, const std::vector & weights, Float64 bias, Float64 l2_reg_coef, Float64 target, const IColumn ** columns, size_t row_num) { if (accumulated_gradient.empty()) { accumulated_gradient.resize(batch_gradient.size(), Float64{0.0}); } std::vector shifted_weights(weights.size()); for (size_t i = 0; i != shifted_weights.size(); ++i) { shifted_weights[i] = weights[i] + accumulated_gradient[i] * alpha_; } auto shifted_bias = bias + accumulated_gradient[weights.size()] * alpha_; gradient_computer.compute(batch_gradient, shifted_weights, shifted_bias, l2_reg_coef, target, columns, row_num); } void Momentum::read(ReadBuffer & buf) { readBinary(accumulated_gradient, buf); } void Momentum::write(WriteBuffer & buf) const { writeBinary(accumulated_gradient, buf); } void Momentum::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) { auto & momentum_rhs = static_cast(rhs); for (size_t i = 0; i < accumulated_gradient.size(); ++i) { accumulated_gradient[i] = accumulated_gradient[i] * frac + momentum_rhs.accumulated_gradient[i] * rhs_frac; } } void Momentum::update(UInt64 batch_size, std::vector & weights, Float64 & bias, Float64 learning_rate, const std::vector & batch_gradient) { /// batch_size is already checked to be greater than 0 if (accumulated_gradient.empty()) { accumulated_gradient.resize(batch_gradient.size(), Float64{0.0}); } for (size_t i = 0; i < batch_gradient.size(); ++i) { accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + (learning_rate * batch_gradient[i]) / batch_size; } for (size_t i = 0; i < weights.size(); ++i) { weights[i] += accumulated_gradient[i]; } bias += accumulated_gradient[weights.size()]; } void StochasticGradientDescent::update( UInt64 batch_size, std::vector & weights, Float64 & bias, Float64 learning_rate, const std::vector & batch_gradient) { /// batch_size is already checked to be greater than 0 for (size_t i = 0; i < weights.size(); ++i) { weights[i] += (learning_rate * batch_gradient[i]) / batch_size; } bias += (learning_rate * batch_gradient[weights.size()]) / batch_size; } void IWeightsUpdater::add_to_batch( std::vector & batch_gradient, IGradientComputer & gradient_computer, const std::vector & weights, Float64 bias, Float64 l2_reg_coef, Float64 target, const IColumn ** columns, size_t row_num) { gradient_computer.compute(batch_gradient, weights, bias, l2_reg_coef, target, columns, row_num); } /// Gradient computers void LogisticRegression::predict( ColumnVector::Container & container, Block & block, size_t offset, size_t limit, const ColumnNumbers & arguments, const std::vector & weights, Float64 bias, const Context & /*context*/) const { size_t rows_num = block.rows(); if (offset > rows_num || offset + limit > rows_num) throw Exception("Invalid offset and limit for LogisticRegression::predict. " "Block has " + toString(rows_num) + " rows, but offset is " + toString(offset) + " and limit is " + toString(limit), ErrorCodes::LOGICAL_ERROR); std::vector results(limit, bias); for (size_t i = 1; i < arguments.size(); ++i) { const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]); if (!isNativeNumber(cur_col.type)) throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS); auto & features_column = cur_col.column; for (size_t row_num = 0; row_num < limit; ++row_num) results[row_num] += weights[i - 1] * features_column->getFloat64(offset + row_num); } container.reserve(container.size() + limit); for (size_t row_num = 0; row_num < limit; ++row_num) container.emplace_back(1 / (1 + exp(-results[row_num]))); } void LogisticRegression::compute( std::vector & batch_gradient, const std::vector & weights, Float64 bias, Float64 l2_reg_coef, Float64 target, const IColumn ** columns, size_t row_num) { Float64 derivative = bias; for (size_t i = 0; i < weights.size(); ++i) { auto value = (*columns[i]).getFloat64(row_num); derivative += weights[i] * value; } derivative *= target; derivative = exp(derivative); batch_gradient[weights.size()] += target / (derivative + 1); for (size_t i = 0; i < weights.size(); ++i) { auto value = (*columns[i]).getFloat64(row_num); batch_gradient[i] += target * value / (derivative + 1) - 2 * l2_reg_coef * weights[i]; } } void LinearRegression::predict( ColumnVector::Container & container, Block & block, size_t offset, size_t limit, const ColumnNumbers & arguments, const std::vector & weights, Float64 bias, const Context & /*context*/) const { if (weights.size() + 1 != arguments.size()) { throw Exception("In predict function number of arguments differs from the size of weights vector", ErrorCodes::LOGICAL_ERROR); } size_t rows_num = block.rows(); if (offset > rows_num || offset + limit > rows_num) throw Exception("Invalid offset and limit for LogisticRegression::predict. " "Block has " + toString(rows_num) + " rows, but offset is " + toString(offset) + " and limit is " + toString(limit), ErrorCodes::LOGICAL_ERROR); std::vector results(limit, bias); for (size_t i = 1; i < arguments.size(); ++i) { const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]); if (!isNativeNumber(cur_col.type)) throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS); auto features_column = cur_col.column; if (!features_column) throw Exception("Unexpectedly cannot dynamically cast features column " + std::to_string(i), ErrorCodes::LOGICAL_ERROR); for (size_t row_num = 0; row_num < limit; ++row_num) results[row_num] += weights[i - 1] * features_column->getFloat64(row_num + offset); } container.reserve(container.size() + limit); for (size_t row_num = 0; row_num < limit; ++row_num) container.emplace_back(results[row_num]); } void LinearRegression::compute( std::vector & batch_gradient, const std::vector & weights, Float64 bias, Float64 l2_reg_coef, Float64 target, const IColumn ** columns, size_t row_num) { Float64 derivative = (target - bias); for (size_t i = 0; i < weights.size(); ++i) { auto value = (*columns[i]).getFloat64(row_num); derivative -= weights[i] * value; } derivative *= 2; batch_gradient[weights.size()] += derivative; for (size_t i = 0; i < weights.size(); ++i) { auto value = (*columns[i]).getFloat64(row_num); batch_gradient[i] += derivative * value - 2 * l2_reg_coef * weights[i]; } } }