This commit is contained in:
Evgeniy Gatov 2015-11-13 19:08:57 +03:00
commit ecfcad2a58
44 changed files with 1235 additions and 381 deletions

View File

@ -24,17 +24,17 @@ private:
public:
AggregateFunctionArray(AggregateFunctionPtr nested_) : nested_func_owner(nested_), nested_func(nested_func_owner.get()) {}
String getName() const
String getName() const override
{
return nested_func->getName() + "Array";
}
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return nested_func->getReturnType();
}
void setArguments(const DataTypes & arguments)
void setArguments(const DataTypes & arguments) override
{
num_agruments = arguments.size();
@ -49,37 +49,37 @@ public:
nested_func->setArguments(nested_arguments);
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
nested_func->setParameters(params);
}
void create(AggregateDataPtr place) const
void create(AggregateDataPtr place) const override
{
nested_func->create(place);
}
void destroy(AggregateDataPtr place) const noexcept
void destroy(AggregateDataPtr place) const noexcept override
{
nested_func->destroy(place);
}
bool hasTrivialDestructor() const
bool hasTrivialDestructor() const override
{
return nested_func->hasTrivialDestructor();
}
size_t sizeOfData() const
size_t sizeOfData() const override
{
return nested_func->sizeOfData();
}
size_t alignOfData() const
size_t alignOfData() const override
{
return nested_func->alignOfData();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
{
const IColumn * nested[num_agruments];
@ -96,22 +96,22 @@ public:
nested_func->add(place, nested, i);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
nested_func->merge(place, rhs);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
nested_func->serialize(place, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
nested_func->deserializeMerge(place, buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
nested_func->insertResultInto(place, to);
}

View File

@ -27,14 +27,14 @@ template <typename T>
class AggregateFunctionAvg final : public IUnaryAggregateFunction<AggregateFunctionAvgData<typename NearestFieldType<T>::Type>, AggregateFunctionAvg<T> >
{
public:
String getName() const { return "avg"; }
String getName() const override { return "avg"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeFloat64;
}
void setArgument(const DataTypePtr & argument)
void setArgument(const DataTypePtr & argument) override
{
if (!argument->isNumeric())
throw Exception("Illegal type " + argument->getName() + " of argument for aggregate function " + getName(),
@ -48,19 +48,19 @@ public:
++this->data(place).count;
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).sum += this->data(rhs).sum;
this->data(place).count += this->data(rhs).count;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
writeBinary(this->data(place).sum, buf);
writeVarUInt(this->data(place).count, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
typename NearestFieldType<T>::Type tmp_sum = 0;
UInt64 tmp_count = 0;
@ -70,7 +70,7 @@ public:
this->data(place).count += tmp_count;
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnFloat64 &>(to).getData().push_back(
static_cast<Float64>(this->data(place).sum) / this->data(place).count);

View File

@ -23,9 +23,9 @@ struct AggregateFunctionCountData
class AggregateFunctionCount final : public INullaryAggregateFunction<AggregateFunctionCountData, AggregateFunctionCount>
{
public:
String getName() const { return "count"; }
String getName() const override { return "count"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeUInt64;
}
@ -36,24 +36,24 @@ public:
++data(place).count;
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
data(place).count += data(rhs).count;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
writeVarUInt(data(place).count, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
UInt64 tmp;
readVarUInt(tmp, buf);
data(place).count += tmp;
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}

View File

@ -26,14 +26,14 @@ private:
DataTypePtr type;
public:
String getName() const { return "groupArray"; }
String getName() const override { return "groupArray"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeArray(type);
}
void setArgument(const DataTypePtr & argument)
void setArgument(const DataTypePtr & argument) override
{
type = argument;
}
@ -45,12 +45,12 @@ public:
column.get(row_num, data(place).value.back());
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
data(place).value.insert(data(place).value.end(), data(rhs).value.begin(), data(rhs).value.end());
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
const Array & value = data(place).value;
size_t size = value.size();
@ -59,7 +59,7 @@ public:
type->serializeBinary(value[i], buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
size_t size = 0;
readVarUInt(size, buf);
@ -75,7 +75,7 @@ public:
type->deserializeBinary(value[old_size + i], buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
to.insert(data(place).value);
}

View File

@ -42,14 +42,14 @@ private:
typedef AggregateFunctionGroupUniqArrayData<T> State;
public:
String getName() const { return "groupUniqArray"; }
String getName() const override { return "groupUniqArray"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeArray(new typename DataTypeFromFieldType<T>::Type);
}
void setArgument(const DataTypePtr & argument)
void setArgument(const DataTypePtr & argument) override
{
}
@ -59,12 +59,12 @@ public:
this->data(place).value.insert(static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).value.merge(this->data(rhs).value);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
const typename State::Set & set = this->data(place).value;
size_t size = set.size();
@ -73,12 +73,12 @@ public:
writeIntBinary(*it, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).value.readAndMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();

View File

@ -23,17 +23,17 @@ private:
public:
AggregateFunctionIf(AggregateFunctionPtr nested_) : nested_func_owner(nested_), nested_func(nested_func_owner.get()) {}
String getName() const
String getName() const override
{
return nested_func->getName() + "If";
}
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return nested_func->getReturnType();
}
void setArguments(const DataTypes & arguments)
void setArguments(const DataTypes & arguments) override
{
num_agruments = arguments.size();
@ -47,58 +47,58 @@ public:
nested_func->setArguments(nested_arguments);
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
nested_func->setParameters(params);
}
void create(AggregateDataPtr place) const
void create(AggregateDataPtr place) const override
{
nested_func->create(place);
}
void destroy(AggregateDataPtr place) const noexcept
void destroy(AggregateDataPtr place) const noexcept override
{
nested_func->destroy(place);
}
bool hasTrivialDestructor() const
bool hasTrivialDestructor() const override
{
return nested_func->hasTrivialDestructor();
}
size_t sizeOfData() const
size_t sizeOfData() const override
{
return nested_func->sizeOfData();
}
size_t alignOfData() const
size_t alignOfData() const override
{
return nested_func->alignOfData();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
{
if (static_cast<const ColumnUInt8 &>(*columns[num_agruments - 1]).getData()[row_num])
nested_func->add(place, columns, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
nested_func->merge(place, rhs);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
nested_func->serialize(place, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
nested_func->deserializeMerge(place, buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
nested_func->insertResultInto(place, to);
}

View File

@ -24,17 +24,17 @@ private:
public:
AggregateFunctionMerge(AggregateFunctionPtr nested_) : nested_func_owner(nested_), nested_func(nested_func_owner.get()) {}
String getName() const
String getName() const override
{
return nested_func->getName() + "Merge";
}
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return nested_func->getReturnType();
}
void setArguments(const DataTypes & arguments)
void setArguments(const DataTypes & arguments) override
{
if (arguments.size() != 1)
throw Exception("Passed " + toString(arguments.size()) + " arguments to unary aggregate function " + this->getName(),
@ -49,57 +49,57 @@ public:
nested_func->setArguments(data_type->getArgumentsDataTypes());
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
nested_func->setParameters(params);
}
void create(AggregateDataPtr place) const
void create(AggregateDataPtr place) const override
{
nested_func->create(place);
}
void destroy(AggregateDataPtr place) const noexcept
void destroy(AggregateDataPtr place) const noexcept override
{
nested_func->destroy(place);
}
bool hasTrivialDestructor() const
bool hasTrivialDestructor() const override
{
return nested_func->hasTrivialDestructor();
}
size_t sizeOfData() const
size_t sizeOfData() const override
{
return nested_func->sizeOfData();
}
size_t alignOfData() const
size_t alignOfData() const override
{
return nested_func->alignOfData();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
{
nested_func->merge(place, static_cast<const ColumnAggregateFunction &>(*columns[0]).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
nested_func->merge(place, rhs);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
nested_func->serialize(place, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
nested_func->deserializeMerge(place, buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
nested_func->insertResultInto(place, to);
}

View File

@ -32,7 +32,8 @@ struct AggregateFunctionQuantileData
* Для дат и дат-с-временем returns_float следует задавать равным false.
*/
template <typename ArgumentFieldType, bool returns_float = true>
class AggregateFunctionQuantile final : public IUnaryAggregateFunction<AggregateFunctionQuantileData<ArgumentFieldType>, AggregateFunctionQuantile<ArgumentFieldType, returns_float> >
class AggregateFunctionQuantile final
: public IUnaryAggregateFunction<AggregateFunctionQuantileData<ArgumentFieldType>, AggregateFunctionQuantile<ArgumentFieldType, returns_float> >
{
private:
using Sample = typename AggregateFunctionQuantileData<ArgumentFieldType>::Sample;
@ -43,14 +44,14 @@ private:
public:
AggregateFunctionQuantile(double level_ = 0.5) : level(level_) {}
String getName() const { return "quantile"; }
String getName() const override { return "quantile"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return type;
}
void setArgument(const DataTypePtr & argument)
void setArgument(const DataTypePtr & argument) override
{
if (returns_float)
type = new DataTypeFloat64;
@ -58,7 +59,7 @@ public:
type = argument;
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -72,24 +73,24 @@ public:
this->data(place).sample.insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).sample.merge(this->data(rhs).sample);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).sample.write(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
Sample tmp_sample;
tmp_sample.read(buf);
this->data(place).sample.merge(tmp_sample);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
/// Sample может отсортироваться при получении квантиля, но в этом контексте можно не считать это нарушением константности.
Sample & sample = const_cast<Sample &>(this->data(place).sample);
@ -107,7 +108,8 @@ public:
* Возвращает массив результатов.
*/
template <typename ArgumentFieldType, bool returns_float = true>
class AggregateFunctionQuantiles final : public IUnaryAggregateFunction<AggregateFunctionQuantileData<ArgumentFieldType>, AggregateFunctionQuantiles<ArgumentFieldType, returns_float> >
class AggregateFunctionQuantiles final
: public IUnaryAggregateFunction<AggregateFunctionQuantileData<ArgumentFieldType>, AggregateFunctionQuantiles<ArgumentFieldType, returns_float> >
{
private:
using Sample = typename AggregateFunctionQuantileData<ArgumentFieldType>::Sample;
@ -117,14 +119,14 @@ private:
DataTypePtr type;
public:
String getName() const { return "quantiles"; }
String getName() const override { return "quantiles"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeArray(type);
}
void setArgument(const DataTypePtr & argument)
void setArgument(const DataTypePtr & argument) override
{
if (returns_float)
type = new DataTypeFloat64;
@ -132,7 +134,7 @@ public:
type = argument;
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
if (params.empty())
throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -150,24 +152,24 @@ public:
this->data(place).sample.insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).sample.merge(this->data(rhs).sample);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).sample.write(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
Sample tmp_sample;
tmp_sample.read(buf);
this->data(place).sample.merge(tmp_sample);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
/// Sample может отсортироваться при получении квантиля, но в этом контексте можно не считать это нарушением константности.
Sample & sample = const_cast<Sample &>(this->data(place).sample);

View File

@ -46,9 +46,9 @@ private:
public:
AggregateFunctionQuantileDeterministic(double level_ = 0.5) : level(level_) {}
String getName() const { return "quantileDeterministic"; }
String getName() const override { return "quantileDeterministic"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return type;
}
@ -65,7 +65,7 @@ public:
};
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -74,30 +74,30 @@ public:
}
void addOne(AggregateDataPtr place, const IColumn & column, const IColumn & determinator, size_t row_num) const
void addTwo(AggregateDataPtr place, const IColumn & column, const IColumn & determinator, size_t row_num) const
{
this->data(place).sample.insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num],
determinator.get64(row_num));
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).sample.merge(this->data(rhs).sample);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).sample.write(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
Sample tmp_sample;
tmp_sample.read(buf);
this->data(place).sample.merge(tmp_sample);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
/// Sample может отсортироваться при получении квантиля, но в этом контексте можно не считать это нарушением константности.
Sample & sample = const_cast<Sample &>(this->data(place).sample);
@ -128,9 +128,9 @@ private:
DataTypePtr type;
public:
String getName() const { return "quantilesDeterministic"; }
String getName() const override { return "quantilesDeterministic"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeArray(type);
}
@ -147,7 +147,7 @@ public:
};
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
if (params.empty())
throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -160,30 +160,30 @@ public:
}
void addOne(AggregateDataPtr place, const IColumn & column, const IColumn & determinator, size_t row_num) const
void addTwo(AggregateDataPtr place, const IColumn & column, const IColumn & determinator, size_t row_num) const
{
this->data(place).sample.insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num],
determinator.get64(row_num));
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).sample.merge(this->data(rhs).sample);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).sample.write(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
Sample tmp_sample;
tmp_sample.read(buf);
this->data(place).sample.merge(tmp_sample);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
/// Sample может отсортироваться при получении квантиля, но в этом контексте можно не считать это нарушением константности.
Sample & sample = const_cast<Sample &>(this->data(place).sample);

View File

@ -549,18 +549,18 @@ private:
public:
AggregateFunctionQuantileTiming(double level_ = 0.5) : level(level_) {}
String getName() const { return "quantileTiming"; }
String getName() const override { return "quantileTiming"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeFloat32;
}
void setArgument(const DataTypePtr & argument)
void setArgument(const DataTypePtr & argument) override
{
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -574,22 +574,22 @@ public:
this->data(place).insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).deserializeMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnFloat32 &>(to).getData().push_back(this->data(place).getFloat(level));
}
@ -607,18 +607,18 @@ private:
public:
AggregateFunctionQuantileTimingWeighted(double level_ = 0.5) : level(level_) {}
String getName() const { return "quantileTimingWeighted"; }
String getName() const override { return "quantileTimingWeighted"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeFloat32;
}
void setArguments(const DataTypes & arguments)
void setArguments(const DataTypes & arguments) override
{
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -627,29 +627,29 @@ public:
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
{
this->data(place).insertWeighted(
static_cast<const ColumnVector<ArgumentFieldType> &>(*columns[0]).getData()[row_num],
static_cast<const ColumnVector<WeightFieldType> &>(*columns[1]).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).deserializeMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnFloat32 &>(to).getData().push_back(this->data(place).getFloat(level));
}
@ -668,18 +668,18 @@ private:
Levels levels;
public:
String getName() const { return "quantilesTiming"; }
String getName() const override { return "quantilesTiming"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeArray(new DataTypeFloat32);
}
void setArgument(const DataTypePtr & argument)
void setArgument(const DataTypePtr & argument) override
{
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
if (params.empty())
throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -697,22 +697,22 @@ public:
this->data(place).insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).deserializeMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();
@ -737,18 +737,18 @@ private:
Levels levels;
public:
String getName() const { return "quantilesTimingWeighted"; }
String getName() const override { return "quantilesTimingWeighted"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeArray(new DataTypeFloat32);
}
void setArguments(const DataTypes & arguments)
void setArguments(const DataTypes & arguments) override
{
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
if (params.empty())
throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -760,29 +760,29 @@ public:
levels[i] = apply_visitor(FieldVisitorConvertToNumber<Float64>(), params[i]);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
{
this->data(place).insertWeighted(
static_cast<const ColumnVector<ArgumentFieldType> &>(*columns[0]).getData()[row_num],
static_cast<const ColumnVector<WeightFieldType> &>(*columns[1]).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).deserializeMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();

View File

@ -25,81 +25,82 @@ private:
public:
AggregateFunctionState(AggregateFunctionPtr nested_) : nested_func_owner(nested_), nested_func(nested_func_owner.get()) {}
String getName() const
String getName() const override
{
return nested_func->getName() + "State";
}
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeAggregateFunction(nested_func_owner, arguments, params);
}
void setArguments(const DataTypes & arguments_)
void setArguments(const DataTypes & arguments_) override
{
arguments = arguments_;
nested_func->setArguments(arguments);
}
void setParameters(const Array & params_)
void setParameters(const Array & params_) override
{
params = params_;
nested_func->setParameters(params);
}
void create(AggregateDataPtr place) const
void create(AggregateDataPtr place) const override
{
nested_func->create(place);
}
void destroy(AggregateDataPtr place) const noexcept
void destroy(AggregateDataPtr place) const noexcept override
{
nested_func->destroy(place);
}
bool hasTrivialDestructor() const
bool hasTrivialDestructor() const override
{
return nested_func->hasTrivialDestructor();
}
size_t sizeOfData() const
size_t sizeOfData() const override
{
return nested_func->sizeOfData();
}
size_t alignOfData() const
size_t alignOfData() const override
{
return nested_func->alignOfData();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
{
nested_func->add(place, columns, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
nested_func->merge(place, rhs);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
nested_func->serialize(place, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
nested_func->deserializeMerge(place, buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnAggregateFunction &>(to).getData().push_back(const_cast<AggregateDataPtr>(place));
}
/// Аггрегатная функция или состояние аггрегатной функции.
bool isState() const { return true; }
bool isState() const override { return true; }
AggregateFunctionPtr getNestedFunction() const { return nested_func_owner; }
};
}

View File

@ -25,14 +25,14 @@ template <typename T>
class AggregateFunctionSum final : public IUnaryAggregateFunction<AggregateFunctionSumData<typename NearestFieldType<T>::Type>, AggregateFunctionSum<T> >
{
public:
String getName() const { return "sum"; }
String getName() const override { return "sum"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new typename DataTypeFromFieldType<typename NearestFieldType<T>::Type>::Type;
}
void setArgument(const DataTypePtr & argument)
void setArgument(const DataTypePtr & argument) override
{
if (!argument->isNumeric())
throw Exception("Illegal type " + argument->getName() + " of argument for aggregate function " + getName(),
@ -45,24 +45,24 @@ public:
this->data(place).sum += static_cast<const ColumnVector<T> &>(column).getData()[row_num];
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).sum += this->data(rhs).sum;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
writeBinary(this->data(place).sum, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
typename NearestFieldType<T>::Type tmp;
readBinary(tmp, buf);
this->data(place).sum += tmp;
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnVector<typename NearestFieldType<T>::Type> &>(to).getData().push_back(this->data(place).sum);
}

View File

@ -330,14 +330,14 @@ template <typename T, typename Data>
class AggregateFunctionUniq final : public IUnaryAggregateFunction<Data, AggregateFunctionUniq<T, Data> >
{
public:
String getName() const { return Data::getName(); }
String getName() const override { return Data::getName(); }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeUInt64;
}
void setArgument(const DataTypePtr & argument)
void setArgument(const DataTypePtr & argument) override
{
}
@ -346,22 +346,22 @@ public:
detail::OneAdder<T, Data>::addOne(this->data(place), column, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).set.merge(this->data(rhs).set);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).set.write(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).set.readAndMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
@ -381,14 +381,14 @@ private:
size_t num_args = 0;
public:
String getName() const { return Data::getName(); }
String getName() const override { return Data::getName(); }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeUInt64;
}
void setArguments(const DataTypes & arguments)
void setArguments(const DataTypes & arguments) override
{
if (argument_is_tuple)
num_args = typeid_cast<const DataTypeTuple &>(*arguments[0]).getElements().size();
@ -396,27 +396,27 @@ public:
num_args = arguments.size();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
{
this->data(place).set.insert(UniqVariadicHash<is_exact, argument_is_tuple>::apply(num_args, columns, row_num));
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).set.merge(this->data(rhs).set);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).set.write(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).set.readAndMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}

View File

@ -133,23 +133,23 @@ private:
UInt8 threshold = 5; /// Значение по-умолчанию, если параметр не указан.
public:
size_t sizeOfData() const
size_t sizeOfData() const override
{
return sizeof(AggregateFunctionUniqUpToData<T>) + sizeof(T) * threshold;
}
String getName() const { return "uniqUpTo"; }
String getName() const override { return "uniqUpTo"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeUInt64;
}
void setArgument(const DataTypePtr & argument)
void setArgument(const DataTypePtr & argument) override
{
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -168,22 +168,22 @@ public:
this->data(place).addOne(column, row_num, threshold);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).merge(this->data(rhs), threshold);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf, threshold);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).readAndMerge(buf, threshold);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}
@ -202,19 +202,19 @@ private:
UInt8 threshold = 5; /// Значение по-умолчанию, если параметр не указан.
public:
size_t sizeOfData() const
size_t sizeOfData() const override
{
return sizeof(AggregateFunctionUniqUpToData<UInt64>) + sizeof(UInt64) * threshold;
}
String getName() const { return "uniqUpTo"; }
String getName() const override { return "uniqUpTo"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return new DataTypeUInt64;
}
void setArguments(const DataTypes & arguments)
void setArguments(const DataTypes & arguments) override
{
if (argument_is_tuple)
num_args = typeid_cast<const DataTypeTuple &>(*arguments[0]).getElements().size();
@ -222,7 +222,7 @@ public:
num_args = arguments.size();
}
void setParameters(const Array & params)
void setParameters(const Array & params) override
{
if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -236,27 +236,27 @@ public:
threshold = threshold_param;
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
{
this->data(place).insert(UniqVariadicHash<false, argument_is_tuple>::apply(num_args, columns, row_num), threshold);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).merge(this->data(rhs), threshold);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf, threshold);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).readAndMerge(buf, threshold);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}

View File

@ -27,14 +27,14 @@ private:
DataTypePtr type_val;
public:
String getName() const { return (0 == strcmp(Data::ValueData_t::name(), "min")) ? "argMin" : "argMax"; }
String getName() const override { return (0 == strcmp(Data::ValueData_t::name(), "min")) ? "argMin" : "argMax"; }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return type_res;
}
void setArguments(const DataTypes & arguments)
void setArguments(const DataTypes & arguments) override
{
if (arguments.size() != 2)
throw Exception("Aggregate function " + getName() + " requires exactly two arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -43,25 +43,25 @@ public:
type_val = arguments[1];
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
{
if (this->data(place).value.changeIfBetter(*columns[1], row_num))
this->data(place).result.change(*columns[0], row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
if (this->data(place).value.changeIfBetter(this->data(rhs).value))
this->data(place).result.change(this->data(rhs).result);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).result.write(buf, *type_res.get());
this->data(place).value.write(buf, *type_val.get());
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
Data rhs; /// Для строчек не очень оптимально, так как может делаться одна лишняя аллокация.
@ -72,7 +72,7 @@ public:
this->data(place).result.change(rhs.result);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
this->data(place).result.insertResultInto(to);
}

View File

@ -5,6 +5,7 @@
#include <DB/Columns/ColumnVector.h>
#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/AggregateFunctions/IUnaryAggregateFunction.h>
@ -531,16 +532,19 @@ private:
DataTypePtr type;
public:
String getName() const { return Data::name(); }
String getName() const override { return Data::name(); }
DataTypePtr getReturnType() const
DataTypePtr getReturnType() const override
{
return type;
}
void setArgument(const DataTypePtr & argument)
void setArgument(const DataTypePtr & argument) override
{
type = argument;
if (typeid_cast<const DataTypeAggregateFunction *>(type.get()))
throw Exception("Illegal type " + type->getName() + " of argument of aggregate function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
@ -549,17 +553,17 @@ public:
this->data(place).changeIfBetter(column, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).changeIfBetter(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf, *type.get());
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
Data rhs; /// Для строчек не очень оптимально, так как может делаться одна лишняя аллокация.
rhs.read(buf, *type.get());
@ -567,7 +571,7 @@ public:
this->data(place).changeIfBetter(rhs);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
this->data(place).insertResultInto(to);
}

View File

@ -400,7 +400,7 @@ public:
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
void addOne(AggregateDataPtr place, const IColumn & column_left, const IColumn & column_right, size_t row_num) const
void addTwo(AggregateDataPtr place, const IColumn & column_left, const IColumn & column_right, size_t row_num) const
{
this->data(place).update(column_left, column_right, row_num);
}

View File

@ -104,28 +104,28 @@ protected:
static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast<const Data*>(place); }
public:
void create(AggregateDataPtr place) const
void create(AggregateDataPtr place) const override
{
new (place) Data;
}
void destroy(AggregateDataPtr place) const noexcept
void destroy(AggregateDataPtr place) const noexcept override
{
data(place).~Data();
}
bool hasTrivialDestructor() const
bool hasTrivialDestructor() const override
{
return __has_trivial_destructor(Data);
}
size_t sizeOfData() const
size_t sizeOfData() const override
{
return sizeof(Data);
}
/// NOTE: Сейчас не используется (структуры с состоянием агрегации кладутся без выравнивания).
size_t alignOfData() const
size_t alignOfData() const override
{
return __alignof__(Data);
}

View File

@ -12,7 +12,7 @@ class IBinaryAggregateFunction : public IAggregateFunctionHelper<T>
const Derived & getDerived() const { return static_cast<const Derived &>(*this); }
public:
void setArguments(const DataTypes & arguments)
void setArguments(const DataTypes & arguments) override
{
if (arguments.size() != 2)
throw Exception{
@ -23,9 +23,9 @@ public:
getDerived().setArgumentsImpl(arguments);
}
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num) const override
{
getDerived().addOne(place, *columns[0], *columns[1], row_num);
getDerived().addTwo(place, *columns[0], *columns[1], row_num);
}
};

View File

@ -13,7 +13,7 @@ class INullaryAggregateFunction : public IAggregateFunctionHelper<T>
{
public:
/// Получить тип результата по типам аргументов. Если функция неприменима для данных аргументов - кинуть исключение.
void setArguments(const DataTypes & arguments)
void setArguments(const DataTypes & arguments) override
{
if (arguments.size() != 0)
throw Exception("Passed " + toString(arguments.size()) + " arguments to nullary aggregate function " + this->getName(),
@ -21,7 +21,7 @@ public:
}
/// Добавить значение.
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
{
static_cast<const Derived &>(*this).addZero(place);
}

View File

@ -12,7 +12,7 @@ template <typename T, typename Derived>
class IUnaryAggregateFunction : public IAggregateFunctionHelper<T>
{
public:
void setArguments(const DataTypes & arguments)
void setArguments(const DataTypes & arguments) override
{
if (arguments.size() != 1)
throw Exception("Passed " + toString(arguments.size()) + " arguments to unary aggregate function " + this->getName(),
@ -23,7 +23,7 @@ public:
virtual void setArgument(const DataTypePtr & argument) = 0;
/// Добавить значение.
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
{
static_cast<const Derived &>(*this).addOne(place, *columns[0], row_num);
}

View File

@ -103,30 +103,9 @@ public:
arenas.push_back(arena_);
}
ColumnPtr convertToValues() const
{
const IAggregateFunction * function = holder->func;
ColumnPtr res = function->getReturnType()->createColumn();
/** Если агрегатная функция возвращает нефинализированное состояние,
* то надо просто скопировать указатели на него а также разделяемое владение аренами.
*/
if (typeid_cast<const ColumnAggregateFunction *>(res.get()))
{
ColumnAggregateFunction * res_ = new ColumnAggregateFunction(*this);
res = res_;
res_->getData().assign(getData().begin(), getData().end());
return res;
}
IColumn & column = *res;
res->reserve(getData().size());
for (auto val : getData())
function->insertResultInto(val, column);
return res;
}
/** Преобразовать столбец состояний агрегатной функции в столбец с готовыми значениями результатов.
*/
ColumnPtr convertToValues() const;
std::string getName() const override { return "ColumnAggregateFunction"; }
@ -186,6 +165,9 @@ public:
{
IAggregateFunction * function = holder.get()->func;
if (unlikely(arenas.empty()))
arenas.emplace_back(new Arena);
getData().push_back(arenas.back().get()->alloc(function->sizeOfData()));
function->create(getData().back());
ReadBufferFromString read_buffer(x.get<const String &>());

View File

@ -21,7 +21,6 @@ private:
/** См. комментарий в HashTableAllocator.h
*/
static constexpr size_t MMAP_THRESHOLD = 64 * (1 << 20);
static constexpr size_t HUGE_PAGE_SIZE = 2 * (1 << 20);
static constexpr size_t MMAP_MIN_ALIGNMENT = 4096;
static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
@ -42,10 +41,6 @@ public:
buf = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mmap.", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
/// См. комментарий в HashTableAllocator.h
if (size >= HUGE_PAGE_SIZE && 0 != madvise(buf, size, MADV_HUGEPAGE))
DB::throwFromErrno("HashTableAllocator: Cannot madvise with MADV_HUGEPAGE.", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
else
{

View File

@ -36,7 +36,6 @@ private:
* PS. Также это требуется, потому что tcmalloc не может выделить кусок памяти больше 16 GB.
*/
static constexpr size_t MMAP_THRESHOLD = 64 * (1 << 20);
static constexpr size_t HUGE_PAGE_SIZE = 2 * (1 << 20);
public:
/// Выделить кусок памяти и заполнить его нулями.
@ -53,14 +52,6 @@ public:
if (MAP_FAILED == buf)
DB::throwFromErrno("HashTableAllocator: Cannot mmap.", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
/** Использование huge pages позволяет увеличить производительность более чем в три раза
* в запросе SELECT number % 1000000 AS k, count() FROM system.numbers GROUP BY k,
* (хэш-таблица на 1 000 000 элементов)
* и примерно на 15% в случае хэш-таблицы на 100 000 000 элементов.
*/
if (size >= HUGE_PAGE_SIZE && 0 != madvise(buf, size, MADV_HUGEPAGE))
DB::throwFromErrno("HashTableAllocator: Cannot madvise with MADV_HUGEPAGE.", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
/// Заполнение нулями не нужно - mmap сам это делает.
}
else
@ -117,10 +108,6 @@ public:
if (MAP_FAILED == buf)
DB::throwFromErrno("HashTableAllocator: Cannot mremap.", DB::ErrorCodes::CANNOT_MREMAP);
/** Здесь не получается сделать madvise с MADV_HUGEPAGE.
* Похоже, что при mremap, huge pages сами расширяются на новую область.
*/
/// Заполнение нулями не нужно.
}
else

View File

@ -34,7 +34,7 @@ public:
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context_ = getDefaultContext());
/// Принимает пулы - один для каждого шарда -, из которых нужно будет достать одно или несколько соединений.
/// Принимает пулы - один для каждого шарда, из которых нужно будет достать одно или несколько соединений.
RemoteBlockInputStream(ConnectionPoolsPtr & pools_, const String & query_, const Settings * settings_,
ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,

View File

@ -95,35 +95,57 @@ private:
WriteBufferFromString out{query};
writeString("SELECT ", out);
if (!dict_struct.id.expression.empty())
if (dict_struct.id)
{
writeParenthesisedString(dict_struct.id.expression, out);
writeString(" AS ", out);
if (!dict_struct.id->expression.empty())
{
writeParenthesisedString(dict_struct.id->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.id->name, out);
if (dict_struct.range_min && dict_struct.range_max)
{
writeString(", ", out);
if (!dict_struct.range_min->expression.empty())
{
writeParenthesisedString(dict_struct.range_min->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.range_min->name, out);
writeString(", ", out);
if (!dict_struct.range_max->expression.empty())
{
writeParenthesisedString(dict_struct.range_max->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.range_max->name, out);
}
}
writeProbablyBackQuotedString(dict_struct.id.name, out);
if (dict_struct.range_min && dict_struct.range_max)
else if (dict_struct.key)
{
writeString(", ", out);
if (!dict_struct.range_min->expression.empty())
auto first = true;
for (const auto & key : *dict_struct.key)
{
writeParenthesisedString(dict_struct.range_min->expression, out);
writeString(" AS ", out);
if (!first)
writeString(", ", out);
first = false;
if (!key.expression.empty())
{
writeParenthesisedString(key.expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(key.name, out);
}
writeProbablyBackQuotedString(dict_struct.range_min->name, out);
writeString(", ", out);
if (!dict_struct.range_max->expression.empty())
{
writeParenthesisedString(dict_struct.range_max->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.range_max->name, out);
}
for (const auto & attr : dict_struct.attributes)
@ -161,19 +183,22 @@ private:
std::string composeLoadIdsQuery(const std::vector<std::uint64_t> ids)
{
if (dict_struct.key)
throw Exception{"Complex key not supported", ErrorCodes::UNSUPPORTED_METHOD};
std::string query;
{
WriteBufferFromString out{query};
writeString("SELECT ", out);
if (!dict_struct.id.expression.empty())
if (!dict_struct.id->expression.empty())
{
writeParenthesisedString(dict_struct.id.expression, out);
writeParenthesisedString(dict_struct.id->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.id.name, out);
writeProbablyBackQuotedString(dict_struct.id->name, out);
for (const auto & attr : dict_struct.attributes)
{
@ -204,7 +229,7 @@ private:
writeString(" AND ", out);
}
writeProbablyBackQuotedString(dict_struct.id.name, out);
writeProbablyBackQuotedString(dict_struct.id->name, out);
writeString(" IN (", out);
auto first = true;

View File

@ -0,0 +1,515 @@
#pragma once
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Core/StringRef.h>
#include <DB/Common/HashTable/HashMap.h>
#include <DB/Columns/ColumnString.h>
#include <ext/range.hpp>
#include <atomic>
#include <memory>
#include <tuple>
namespace DB
{
class ComplexKeyHashedDictionary final : public IDictionaryBase
{
public:
ComplexKeyHashedDictionary(
const std::string & name, const DictionaryStructure & dict_struct, DictionarySourcePtr source_ptr,
const DictionaryLifetime dict_lifetime, bool require_nonempty)
: name{name}, dict_struct(dict_struct), source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
require_nonempty(require_nonempty), key_description{createKeyDescription(dict_struct)}
{
createAttributes();
try
{
loadData();
calculateBytesAllocated();
}
catch (...)
{
creation_exception = std::current_exception();
}
creation_time = std::chrono::system_clock::now();
}
ComplexKeyHashedDictionary(const ComplexKeyHashedDictionary & other)
: ComplexKeyHashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty}
{}
std::string getKeyDescription() const { return key_description; };
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return name; }
std::string getTypeName() const override { return "ComplexKeyHashed"; }
std::size_t getBytesAllocated() const override { return bytes_allocated; }
std::size_t getQueryCount() const override { return query_count.load(std::memory_order_relaxed); }
double getHitRate() const override { return 1.0; }
std::size_t getElementCount() const override { return element_count; }
double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
bool isCached() const override { return false; }
DictionaryPtr clone() const override { return std::make_unique<ComplexKeyHashedDictionary>(*this); }
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
const DictionaryStructure & getStructure() const override { return dict_struct; }
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override
{
return creation_time;
}
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
}
#define DECLARE_MULTIPLE_GETTER(TYPE)\
void get##TYPE(\
const std::string & attribute_name, const ConstColumnPlainPtrs & key_columns, const DataTypes & key_types,\
PODArray<TYPE> & out) const\
{\
validateKeyColumns(key_types);\
\
const auto & attribute = getAttribute(attribute_name);\
if (attribute.type != AttributeUnderlyingType::TYPE)\
throw Exception{\
name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\
ErrorCodes::TYPE_MISMATCH\
};\
\
const auto null_value = std::get<TYPE>(attribute.null_values);\
\
getItems<TYPE>(attribute, key_columns,\
[&] (const std::size_t row, const auto value) { out[row] = value; },\
[&] (const std::size_t) { return null_value; });\
}
DECLARE_MULTIPLE_GETTER(UInt8)
DECLARE_MULTIPLE_GETTER(UInt16)
DECLARE_MULTIPLE_GETTER(UInt32)
DECLARE_MULTIPLE_GETTER(UInt64)
DECLARE_MULTIPLE_GETTER(Int8)
DECLARE_MULTIPLE_GETTER(Int16)
DECLARE_MULTIPLE_GETTER(Int32)
DECLARE_MULTIPLE_GETTER(Int64)
DECLARE_MULTIPLE_GETTER(Float32)
DECLARE_MULTIPLE_GETTER(Float64)
#undef DECLARE_MULTIPLE_GETTER
void getString(
const std::string & attribute_name, const ConstColumnPlainPtrs & key_columns, const DataTypes & key_types,
ColumnString * out) const
{
validateKeyColumns(key_types);
const auto & attribute = getAttribute(attribute_name);
if (attribute.type != AttributeUnderlyingType::String)
throw Exception{
name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),
ErrorCodes::TYPE_MISMATCH
};
const auto & null_value = StringRef{std::get<String>(attribute.null_values)};
getItems<StringRef>(attribute, key_columns,
[&] (const std::size_t row, const StringRef value) { out->insertData(value.data, value.size); },
[&] (const std::size_t) { return null_value; });
}
#define DECLARE_MULTIPLE_GETTER_WITH_DEFAULT(TYPE)\
void get##TYPE(\
const std::string & attribute_name, const ConstColumnPlainPtrs & key_columns, const DataTypes & key_types,\
const PODArray<TYPE> & def, PODArray<TYPE> & out) const\
{\
validateKeyColumns(key_types);\
\
const auto & attribute = getAttribute(attribute_name);\
if (attribute.type != AttributeUnderlyingType::TYPE)\
throw Exception{\
name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\
ErrorCodes::TYPE_MISMATCH\
};\
\
getItems<TYPE>(attribute, key_columns,\
[&] (const std::size_t row, const auto value) { out[row] = value; },\
[&] (const std::size_t row) { return def[row]; });\
}
DECLARE_MULTIPLE_GETTER_WITH_DEFAULT(UInt8)
DECLARE_MULTIPLE_GETTER_WITH_DEFAULT(UInt16)
DECLARE_MULTIPLE_GETTER_WITH_DEFAULT(UInt32)
DECLARE_MULTIPLE_GETTER_WITH_DEFAULT(UInt64)
DECLARE_MULTIPLE_GETTER_WITH_DEFAULT(Int8)
DECLARE_MULTIPLE_GETTER_WITH_DEFAULT(Int16)
DECLARE_MULTIPLE_GETTER_WITH_DEFAULT(Int32)
DECLARE_MULTIPLE_GETTER_WITH_DEFAULT(Int64)
DECLARE_MULTIPLE_GETTER_WITH_DEFAULT(Float32)
DECLARE_MULTIPLE_GETTER_WITH_DEFAULT(Float64)
#undef DECLARE_MULTIPLE_GETTER_WITH_DEFAULT
void getString(
const std::string & attribute_name, const ConstColumnPlainPtrs & key_columns, const DataTypes & key_types,
const ColumnString * const def, ColumnString * const out) const
{
validateKeyColumns(key_types);
const auto & attribute = getAttribute(attribute_name);
if (attribute.type != AttributeUnderlyingType::String)
throw Exception{
name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),
ErrorCodes::TYPE_MISMATCH
};
getItems<StringRef>(attribute, key_columns,
[&] (const std::size_t row, const StringRef value) { out->insertData(value.data, value.size); },
[&] (const std::size_t row) { return def->getDataAt(row); });
}
private:
template <typename Value> using MapType = HashMapWithSavedHash<StringRef, Value, StringRefHash>;
template <typename Value> using MapPointerType = std::unique_ptr<MapType<Value>>;
struct attribute_t final
{
AttributeUnderlyingType type;
std::tuple<
UInt8, UInt16, UInt32, UInt64,
Int8, Int16, Int32, Int64,
Float32, Float64,
String> null_values;
std::tuple<
MapPointerType<UInt8>, MapPointerType<UInt16>, MapPointerType<UInt32>, MapPointerType<UInt64>,
MapPointerType<Int8>, MapPointerType<Int16>, MapPointerType<Int32>, MapPointerType<Int64>,
MapPointerType<Float32>, MapPointerType<Float64>,
MapPointerType<StringRef>> maps;
std::unique_ptr<Arena> string_arena;
};
void createAttributes()
{
const auto size = dict_struct.attributes.size();
attributes.reserve(size);
for (const auto & attribute : dict_struct.attributes)
{
attribute_index_by_name.emplace(attribute.name, attributes.size());
attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value));
if (attribute.hierarchical)
throw Exception{
name + ": hierarchical attributes not supported for dictionary of type " + getTypeName(),
ErrorCodes::TYPE_MISMATCH
};
}
}
void loadData()
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
/// created upfront to avoid excess allocations
const auto keys_size = dict_struct.key->size();
StringRefs keys(keys_size);
const auto attributes_size = attributes.size();
while (const auto block = stream->read())
{
const auto rows = block.rowsInFirstColumn();
element_count += rows;
const auto key_column_ptrs = ext::map<ConstColumnPlainPtrs>(ext::range(0, keys_size),
[&] (const std::size_t attribute_idx) {
return block.getByPosition(attribute_idx).column.get();
});
const auto attribute_column_ptrs = ext::map<ConstColumnPlainPtrs>(ext::range(0, attributes_size),
[&] (const std::size_t attribute_idx) {
return block.getByPosition(keys_size + attribute_idx).column.get();
});
for (const auto row_idx : ext::range(0, rows))
{
/// calculate key once per row
const auto key = placeKeysInPool(row_idx, key_column_ptrs, keys, keys_pool);
auto should_rollback = false;
for (const auto attribute_idx : ext::range(0, attributes_size))
{
const auto & attribute_column = *attribute_column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
const auto inserted = setAttributeValue(attribute, key, attribute_column[row_idx]);
if (!inserted)
should_rollback = true;
}
/// @note on multiple equal keys the mapped value for the first one is stored
if (should_rollback)
keys_pool.rollback(key.size);
}
}
stream->readSuffix();
if (require_nonempty && 0 == element_count)
throw Exception{
name + ": dictionary source is empty and 'require_nonempty' property is set.",
ErrorCodes::DICTIONARY_IS_EMPTY
};
}
template <typename T>
void addAttributeSize(const attribute_t & attribute)
{
const auto & map_ref = std::get<MapPointerType<T>>(attribute.maps);
bytes_allocated += sizeof(MapType<T>) + map_ref->getBufferSizeInBytes();
bucket_count = map_ref->getBufferSizeInCells();
}
void calculateBytesAllocated()
{
bytes_allocated += attributes.size() * sizeof(attributes.front());
for (const auto & attribute : attributes)
{
switch (attribute.type)
{
case AttributeUnderlyingType::UInt8: addAttributeSize<UInt8>(attribute); break;
case AttributeUnderlyingType::UInt16: addAttributeSize<UInt16>(attribute); break;
case AttributeUnderlyingType::UInt32: addAttributeSize<UInt32>(attribute); break;
case AttributeUnderlyingType::UInt64: addAttributeSize<UInt64>(attribute); break;
case AttributeUnderlyingType::Int8: addAttributeSize<Int8>(attribute); break;
case AttributeUnderlyingType::Int16: addAttributeSize<Int16>(attribute); break;
case AttributeUnderlyingType::Int32: addAttributeSize<Int32>(attribute); break;
case AttributeUnderlyingType::Int64: addAttributeSize<Int64>(attribute); break;
case AttributeUnderlyingType::Float32: addAttributeSize<Float32>(attribute); break;
case AttributeUnderlyingType::Float64: addAttributeSize<Float64>(attribute); break;
case AttributeUnderlyingType::String:
{
addAttributeSize<StringRef>(attribute);
bytes_allocated += sizeof(Arena) + attribute.string_arena->size();
break;
}
}
}
bytes_allocated += keys_pool.size();
}
template <typename T>
void createAttributeImpl(attribute_t & attribute, const Field & null_value)
{
std::get<T>(attribute.null_values) = null_value.get<typename NearestFieldType<T>::Type>();
std::get<MapPointerType<T>>(attribute.maps) = std::make_unique<MapType<T>>();
}
attribute_t createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
{
attribute_t attr{type};
switch (type)
{
case AttributeUnderlyingType::UInt8: createAttributeImpl<UInt8>(attr, null_value); break;
case AttributeUnderlyingType::UInt16: createAttributeImpl<UInt16>(attr, null_value); break;
case AttributeUnderlyingType::UInt32: createAttributeImpl<UInt32>(attr, null_value); break;
case AttributeUnderlyingType::UInt64: createAttributeImpl<UInt64>(attr, null_value); break;
case AttributeUnderlyingType::Int8: createAttributeImpl<Int8>(attr, null_value); break;
case AttributeUnderlyingType::Int16: createAttributeImpl<Int16>(attr, null_value); break;
case AttributeUnderlyingType::Int32: createAttributeImpl<Int32>(attr, null_value); break;
case AttributeUnderlyingType::Int64: createAttributeImpl<Int64>(attr, null_value); break;
case AttributeUnderlyingType::Float32: createAttributeImpl<Float32>(attr, null_value); break;
case AttributeUnderlyingType::Float64: createAttributeImpl<Float64>(attr, null_value); break;
case AttributeUnderlyingType::String:
{
std::get<String>(attr.null_values) = null_value.get<String>();
std::get<MapPointerType<StringRef>>(attr.maps) = std::make_unique<MapType<StringRef>>();
attr.string_arena = std::make_unique<Arena>();
break;
}
}
return attr;
}
static std::string createKeyDescription(const DictionaryStructure & dict_struct)
{
std::ostringstream out;
out << '(';
auto first = true;
for (const auto & key : *dict_struct.key)
{
if (!first)
out << ", ";
first = false;
out << key.type->getName();
}
out << ')';
return out.str();
}
void validateKeyColumns(const DataTypes & key_types) const
{
if (key_types.size() != dict_struct.key->size())
throw Exception{
"Key structure does not match, expected " + key_description,
ErrorCodes::TYPE_MISMATCH
};
for (const auto i : ext::range(0, key_types.size()))
{
const auto & expected_type = (*dict_struct.key)[i].type->getName();
const auto & actual_type = key_types[i]->getName();
if (expected_type != actual_type)
throw Exception{
"Key type at position " + std::to_string(i) + " does not match, expected " + expected_type +
", found " + actual_type,
ErrorCodes::TYPE_MISMATCH
};
}
}
template <typename T, typename ValueSetter, typename DefaultGetter>
void getItems(
const attribute_t & attribute, const ConstColumnPlainPtrs & key_columns, ValueSetter && set_value,
DefaultGetter && get_default) const
{
const auto & attr = *std::get<MapPointerType<T>>(attribute.maps);
const auto keys_size = key_columns.size();
StringRefs keys(keys_size);
Arena temporary_keys_pool;
const auto rows = key_columns.front()->size();
for (const auto i : ext::range(0, rows))
{
/// copy key data to arena so it is contiguous and return StringRef to it
const auto key = placeKeysInPool(i, key_columns, keys, temporary_keys_pool);
const auto it = attr.find(key);
set_value(i, it != attr.end() ? it->second : get_default(i));
/// free memory allocated for key
temporary_keys_pool.rollback(key.size);
}
query_count.fetch_add(rows, std::memory_order_relaxed);
}
template <typename T>
bool setAttributeValueImpl(attribute_t & attribute, const StringRef key, const T value)
{
auto & map = *std::get<MapPointerType<T>>(attribute.maps);
const auto pair = map.insert({ key, value });
return pair.second;
}
bool setAttributeValue(attribute_t & attribute, const StringRef key, const Field & value)
{
switch (attribute.type)
{
case AttributeUnderlyingType::UInt8: return setAttributeValueImpl<UInt8>(attribute, key, value.get<UInt64>());
case AttributeUnderlyingType::UInt16: return setAttributeValueImpl<UInt16>(attribute, key, value.get<UInt64>());
case AttributeUnderlyingType::UInt32: return setAttributeValueImpl<UInt32>(attribute, key, value.get<UInt64>());
case AttributeUnderlyingType::UInt64: return setAttributeValueImpl<UInt64>(attribute, key, value.get<UInt64>());
case AttributeUnderlyingType::Int8: return setAttributeValueImpl<Int8>(attribute, key, value.get<Int64>());
case AttributeUnderlyingType::Int16: return setAttributeValueImpl<Int16>(attribute, key, value.get<Int64>());
case AttributeUnderlyingType::Int32: return setAttributeValueImpl<Int32>(attribute, key, value.get<Int64>());
case AttributeUnderlyingType::Int64: return setAttributeValueImpl<Int64>(attribute, key, value.get<Int64>());
case AttributeUnderlyingType::Float32: return setAttributeValueImpl<Float32>(attribute, key, value.get<Float64>());
case AttributeUnderlyingType::Float64: return setAttributeValueImpl<Float64>(attribute, key, value.get<Float64>());
case AttributeUnderlyingType::String:
{
auto & map = *std::get<MapPointerType<StringRef>>(attribute.maps);
const auto & string = value.get<String>();
const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size());
const auto pair = map.insert({ key, StringRef{string_in_arena, string.size()} });
return pair.second;
}
}
return {};
}
const attribute_t & getAttribute(const std::string & attribute_name) const
{
const auto it = attribute_index_by_name.find(attribute_name);
if (it == std::end(attribute_index_by_name))
throw Exception{
name + ": no such attribute '" + attribute_name + "'",
ErrorCodes::BAD_ARGUMENTS
};
return attributes[it->second];
}
static StringRef placeKeysInPool(
const std::size_t row, const ConstColumnPlainPtrs & key_columns, StringRefs & keys, Arena & pool)
{
const auto keys_size = key_columns.size();
size_t sum_keys_size{};
for (const auto i : ext::range(0, keys_size))
{
keys[i] = key_columns[i]->getDataAtWithTerminatingZero(row);
sum_keys_size += keys[i].size;
}
const auto res = pool.alloc(sum_keys_size);
auto place = res;
for (size_t j = 0; j < keys_size; ++j)
{
memcpy(place, keys[j].data, keys[j].size);
place += keys[j].size;
}
return { res, sum_keys_size };
}
const std::string name;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
const bool require_nonempty;
const std::string key_description;
std::map<std::string, std::size_t> attribute_index_by_name;
std::vector<attribute_t> attributes;
Arena keys_pool;
std::size_t bytes_allocated = 0;
std::size_t element_count = 0;
std::size_t bucket_count = 0;
mutable std::atomic<std::size_t> query_count{0};
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
};
}

View File

@ -20,14 +20,29 @@ namespace
Block createSampleBlock(const DictionaryStructure & dict_struct)
{
Block block{
ColumnWithTypeAndName{new ColumnUInt64{1}, new DataTypeUInt64, dict_struct.id.name}
};
Block block;
if (dict_struct.id)
block.insert(ColumnWithTypeAndName{
new ColumnUInt64{1}, new DataTypeUInt64, dict_struct.id->name
});
if (dict_struct.key)
{
for (const auto & attribute : *dict_struct.key)
{
auto column = attribute.type->createColumn();
column->insertDefault();
block.insert(ColumnWithTypeAndName{column, attribute.type, attribute.name});
}
}
if (dict_struct.range_min)
for (const auto & attribute : { dict_struct.range_min, dict_struct.range_max })
block.insert(
ColumnWithTypeAndName{new ColumnUInt16{1}, new DataTypeDate, attribute->name});
block.insert(ColumnWithTypeAndName{
new ColumnUInt16{1}, new DataTypeDate, attribute->name
});
for (const auto & attribute : dict_struct.attributes)
{

View File

@ -136,35 +136,65 @@ struct DictionarySpecialAttribute final
/// Name of identifier plus list of attributes
struct DictionaryStructure final
{
DictionarySpecialAttribute id;
std::experimental::optional<DictionarySpecialAttribute> id;
std::experimental::optional<std::vector<DictionaryAttribute>> key;
std::vector<DictionaryAttribute> attributes;
std::experimental::optional<DictionarySpecialAttribute> range_min;
std::experimental::optional<DictionarySpecialAttribute> range_max;
bool has_expressions = false;
DictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
: id{config, config_prefix + ".id"}
{
if (id.name.empty())
throw Exception{
"No 'id' specified for dictionary",
ErrorCodes::BAD_ARGUMENTS
};
const auto has_id = config.has(config_prefix + ".id");
const auto has_key = config.has(config_prefix + ".key");
if (config.has(config_prefix + ".range_min"))
range_min.emplace(config, config_prefix + ".range_min");
if (has_key && has_id)
throw Exception{"Only one of 'id' and 'key' should be specified", ErrorCodes::BAD_ARGUMENTS};
if (config.has(config_prefix + ".range_max"))
range_max.emplace(config, config_prefix + ".range_max");
if (has_id)
id.emplace(config, config_prefix + ".id");
else if (has_key)
{
key.emplace(getAttributes(config, config_prefix + ".key", false, false));
if (key->empty())
throw Exception{"Empty 'key' supplied", ErrorCodes::BAD_ARGUMENTS};
}
else
throw Exception{"Dictionary structure should specify either 'id' or 'key'", ErrorCodes::BAD_ARGUMENTS};
if (!id.expression.empty() ||
(range_min && !range_min->expression.empty()) || (range_max && !range_max->expression.empty()))
has_expressions = true;
if (id)
{
if (id->name.empty())
throw Exception{"'id' cannot be empty", ErrorCodes::BAD_ARGUMENTS};
if (config.has(config_prefix + ".range_min"))
range_min.emplace(config, config_prefix + ".range_min");
if (config.has(config_prefix + ".range_max"))
range_max.emplace(config, config_prefix + ".range_max");
if (!id->expression.empty() ||
(range_min && !range_min->expression.empty()) ||
(range_max && !range_max->expression.empty()))
has_expressions = true;
}
attributes = getAttributes(config, config_prefix);
if (attributes.empty())
throw Exception{"Dictionary has no attributes defined", ErrorCodes::BAD_ARGUMENTS};
}
private:
std::vector<DictionaryAttribute> getAttributes(
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const bool hierarchy_allowed = true, const bool allow_null_values = true)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
auto has_hierarchy = false;
std::vector<DictionaryAttribute> attributes;
for (const auto & key : keys)
{
if (0 != strncmp(key.data(), "attribute", strlen("attribute")))
@ -181,19 +211,22 @@ struct DictionaryStructure final
if (!expression.empty())
has_expressions = true;
const auto null_value_string = config.getString(prefix + "null_value");
Field null_value;
try
if (allow_null_values)
{
ReadBufferFromString null_value_buffer{null_value_string};
type->deserializeText(null_value, null_value_buffer);
}
catch (const std::exception & e)
{
throw Exception{
std::string{"Error parsing null_value: "} + e.what(),
ErrorCodes::BAD_ARGUMENTS
};
const auto null_value_string = config.getString(prefix + "null_value");
try
{
ReadBufferFromString null_value_buffer{null_value_string};
type->deserializeText(null_value, null_value_buffer);
}
catch (const std::exception & e)
{
throw Exception{
std::string{"Error parsing null_value: "} + e.what(),
ErrorCodes::BAD_ARGUMENTS
};
}
}
const auto hierarchical = config.getBool(prefix + "hierarchical", false);
@ -204,6 +237,12 @@ struct DictionaryStructure final
ErrorCodes::BAD_ARGUMENTS
};
if (has_hierarchy && !hierarchy_allowed)
throw Exception{
"Hierarchy not allowed in '" + prefix,
ErrorCodes::BAD_ARGUMENTS
};
if (has_hierarchy && hierarchical)
throw Exception{
"Only one hierarchical attribute supported",
@ -217,11 +256,7 @@ struct DictionaryStructure final
});
}
if (attributes.empty())
throw Exception{
"Dictionary has no attributes defined",
ErrorCodes::BAD_ARGUMENTS
};
return attributes;
}
};

View File

@ -354,6 +354,7 @@ private:
for (const auto i : ext::range(0, ids.size()))
{
const auto id = ids[i];
/// @todo only works for out-of-bounds values, other defaults are explicitly stored.
out[i] = id < attr.size() ? attr[id] : def[i];
}

View File

@ -4,6 +4,7 @@
#include <DB/Dictionaries/MongoDBBlockInputStream.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <mongo/client/dbclient.h>
#include <ext/collection_cast.hpp>
namespace DB
@ -53,8 +54,8 @@ class MongoDBDictionarySource final : public IDictionarySource
if (!mongo_init_status.isOK())
throw DB::Exception{
"mongo::client::initialize() failed: " + mongo_init_status.toString(),
ErrorCodes::MONGODB_INIT_FAILED
"mongo::client::initialize() failed: " + mongo_init_status.toString(),
ErrorCodes::MONGODB_INIT_FAILED
};
LOG_TRACE(&Logger::get("MongoDBDictionarySource"), "mongo::client::initialize() ok");
@ -97,9 +98,12 @@ public:
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> & ids) override
{
if (dict_struct.key)
throw Exception{"Complex key not supported", ErrorCodes::UNSUPPORTED_METHOD};
/// mongo::BSONObj has shitty design and does not use fixed width integral types
const std::vector<long long int> iids{std::begin(ids), std::end(ids)};
const auto ids_enumeration = BSON(dict_struct.id.name << BSON("$in" << iids));
const auto ids_enumeration = BSON(
dict_struct.id->name << BSON("$in" << ext::collection_cast<std::vector<long long int>>(ids)));
return new MongoDBBlockInputStream{
connection.query(db + '.' + collection, ids_enumeration, 0, 0, &fields_to_query),

View File

@ -156,35 +156,57 @@ private:
WriteBufferFromString out{query};
writeString("SELECT ", out);
if (!dict_struct.id.expression.empty())
if (dict_struct.id)
{
writeParenthesisedString(dict_struct.id.expression, out);
writeString(" AS ", out);
if (!dict_struct.id->expression.empty())
{
writeParenthesisedString(dict_struct.id->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.id->name, out);
if (dict_struct.range_min && dict_struct.range_max)
{
writeString(", ", out);
if (!dict_struct.range_min->expression.empty())
{
writeParenthesisedString(dict_struct.range_min->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.range_min->name, out);
writeString(", ", out);
if (!dict_struct.range_max->expression.empty())
{
writeParenthesisedString(dict_struct.range_max->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.range_max->name, out);
}
}
writeProbablyBackQuotedString(dict_struct.id.name, out);
if (dict_struct.range_min && dict_struct.range_max)
else if (dict_struct.key)
{
writeString(", ", out);
if (!dict_struct.range_min->expression.empty())
auto first = true;
for (const auto & key : *dict_struct.key)
{
writeParenthesisedString(dict_struct.range_min->expression, out);
writeString(" AS ", out);
if (!first)
writeString(", ", out);
first = false;
if (!key.expression.empty())
{
writeParenthesisedString(key.expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(key.name, out);
}
writeProbablyBackQuotedString(dict_struct.range_min->name, out);
writeString(", ", out);
if (!dict_struct.range_max->expression.empty())
{
writeParenthesisedString(dict_struct.range_max->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.range_max->name, out);
}
for (const auto & attr : dict_struct.attributes)
@ -222,19 +244,22 @@ private:
std::string composeLoadIdsQuery(const std::vector<std::uint64_t> & ids)
{
if (dict_struct.key)
throw Exception{"Complex key not supported", ErrorCodes::UNSUPPORTED_METHOD};
std::string query;
{
WriteBufferFromString out{query};
writeString("SELECT ", out);
if (!dict_struct.id.expression.empty())
if (!dict_struct.id->expression.empty())
{
writeParenthesisedString(dict_struct.id.expression, out);
writeParenthesisedString(dict_struct.id->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.id.name, out);
writeProbablyBackQuotedString(dict_struct.id->name, out);
for (const auto & attr : dict_struct.attributes)
{
@ -265,7 +290,7 @@ private:
writeString(" AND ", out);
}
writeProbablyBackQuotedString(dict_struct.id.name, out);
writeProbablyBackQuotedString(dict_struct.id->name, out);
writeString(" IN (", out);
auto first = true;

View File

@ -5,6 +5,7 @@
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataTypes/DataTypeTuple.h>
#include <DB/Columns/ColumnVector.h>
#include <DB/Columns/ColumnArray.h>
@ -18,6 +19,7 @@
#include <DB/Dictionaries/FlatDictionary.h>
#include <DB/Dictionaries/HashedDictionary.h>
#include <DB/Dictionaries/CacheDictionary.h>
#include <DB/Dictionaries/ComplexKeyHashedDictionary.h>
#include <DB/Dictionaries/RangeHashedDictionary.h>
#include <ext/range.hpp>
@ -781,11 +783,12 @@ private:
};
}
if (!typeid_cast<const DataTypeUInt64 *>(arguments[2].get()))
if (!typeid_cast<const DataTypeUInt64 *>(arguments[2].get()) &&
!typeid_cast<const DataTypeTuple *>(arguments[2].get()))
{
throw Exception{
"Illegal type " + arguments[2]->getName() + " of third argument of function " + getName()
+ ", must be UInt64.",
+ ", must be UInt64 or tuple(...).",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -794,7 +797,7 @@ private:
{
throw Exception{
"Illegal type " + arguments[3]->getName() + " of fourth argument of function " + getName()
+ ", must be Date.",
+ ", must be Date.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -817,6 +820,7 @@ private:
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex(block, arguments, result, dict_ptr) &&
!executeDispatchRange<RangeHashedDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
@ -876,6 +880,52 @@ private:
return true;
}
bool executeDispatchComplex(
Block & block, const ColumnNumbers & arguments, const size_t result, const IDictionaryBase * const dictionary)
{
const auto dict = typeid_cast<const ComplexKeyHashedDictionary *>(dictionary);
if (!dict)
return false;
if (arguments.size() != 3)
throw Exception{
"Function " + getName() + " for dictionary of type " + dict->getTypeName() +
" requires exactly 3 arguments",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
};
const auto attr_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[1]).column.get());
if (!attr_name_col)
throw Exception{
"Second argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
const auto & attr_name = attr_name_col->getData();
const auto key_col_with_type = block.getByPosition(arguments[2]);
if (const auto key_col = typeid_cast<const ColumnTuple *>(key_col_with_type.column.get()))
{
const auto key_columns = ext::map<ConstColumnPlainPtrs>(key_col->getColumns(), [] (const ColumnPtr & ptr) {
return ptr.get();
});
const auto & key_types = static_cast<const DataTypeTuple &>(*key_col_with_type.type).getElements();
const auto out = new ColumnString;
block.getByPosition(result).column = out;
dict->getString(attr_name, key_columns, key_types, out);
}
else
throw Exception{
"Third argument of function " + getName() + " must be " + dict->getKeyDescription(),
ErrorCodes::TYPE_MISMATCH
};
return true;
}
template <typename DictionaryType>
bool executeDispatchRange(
Block & block, const ColumnNumbers & arguments, const size_t result, const IDictionaryBase * const dictionary)
@ -1186,6 +1236,13 @@ template <> struct DictGetTraits<DATA_TYPE>\
dict->get##TYPE(name, ids, out);\
}\
template <typename DictionaryType>\
static void get(\
const DictionaryType * const dict, const std::string & name, const ConstColumnPlainPtrs & key_columns,\
const DataTypes & key_types, PODArray<TYPE> & out)\
{\
dict->get##TYPE(name, key_columns, key_types, out);\
}\
template <typename DictionaryType>\
static void get(\
const DictionaryType * const dict, const std::string & name, const PODArray<UInt64> & ids,\
const PODArray<UInt16> & dates, PODArray<TYPE> & out)\
@ -1199,6 +1256,13 @@ template <> struct DictGetTraits<DATA_TYPE>\
{\
dict->get##TYPE(name, ids, def, out);\
}\
template <typename DictionaryType>\
static void getOrDefault(\
const DictionaryType * const dict, const std::string & name, const ConstColumnPlainPtrs & key_columns,\
const DataTypes & key_types, const PODArray<TYPE> & def, PODArray<TYPE> & out)\
{\
dict->get##TYPE(name, key_columns, key_types, def, out);\
}\
};
DECLARE_DICT_GET_TRAITS(UInt8, DataTypeUInt8)
DECLARE_DICT_GET_TRAITS(UInt16, DataTypeUInt16)
@ -1259,11 +1323,12 @@ private:
};
}
if (!typeid_cast<const DataTypeUInt64 *>(arguments[2].get()))
if (!typeid_cast<const DataTypeUInt64 *>(arguments[2].get()) &&
!typeid_cast<const DataTypeTuple *>(arguments[2].get()))
{
throw Exception{
"Illegal type " + arguments[2]->getName() + " of third argument of function " + getName()
+ ", must be UInt64.",
+ ", must be UInt64 or tuple(...).",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -1295,6 +1360,7 @@ private:
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex(block, arguments, result, dict_ptr) &&
!executeDispatchRange<RangeHashedDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
@ -1329,13 +1395,11 @@ private:
const auto id_col_untyped = block.getByPosition(arguments[2]).column.get();
if (const auto id_col = typeid_cast<const ColumnVector<UInt64> *>(id_col_untyped))
{
const auto out = new ColumnVector<Type>;
const auto out = new ColumnVector<Type>(id_col->size());
block.getByPosition(result).column = out;
const auto & ids = id_col->getData();
auto & data = out->getData();
const auto size = ids.size();
data.resize(size);
DictGetTraits<DataType>::get(dict, attr_name, ids, data);
}
@ -1358,6 +1422,54 @@ private:
return true;
}
bool executeDispatchComplex(
Block & block, const ColumnNumbers & arguments, const size_t result, const IDictionaryBase * const dictionary)
{
const auto dict = typeid_cast<const ComplexKeyHashedDictionary *>(dictionary);
if (!dict)
return false;
if (arguments.size() != 3)
throw Exception{
"Function " + getName() + " for dictionary of type " + dict->getTypeName() +
" requires exactly 3 arguments",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
};
const auto attr_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[1]).column.get());
if (!attr_name_col)
throw Exception{
"Second argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
const auto & attr_name = attr_name_col->getData();
const auto key_col_with_type = block.getByPosition(arguments[2]);
if (const auto key_col = typeid_cast<const ColumnTuple *>(key_col_with_type.column.get()))
{
const auto key_columns = ext::map<ConstColumnPlainPtrs>(key_col->getColumns(), [] (const ColumnPtr & ptr) {
return ptr.get();
});
const auto & key_types = static_cast<const DataTypeTuple &>(*key_col_with_type.type).getElements();
const auto out = new ColumnVector<Type>(key_columns.front()->size());
block.getByPosition(result).column = out;
auto & data = out->getData();
DictGetTraits<DataType>::get(dict, attr_name, key_columns, key_types, data);
}
else
throw Exception{
"Third argument of function " + getName() + " must be " + dict->getKeyDescription(),
ErrorCodes::TYPE_MISMATCH
};
return true;
}
template <typename DictionaryType>
bool executeDispatchRange(
Block & block, const ColumnNumbers & arguments, const size_t result, const IDictionaryBase * const dictionary)
@ -1538,11 +1650,12 @@ private:
};
}
if (!typeid_cast<const DataTypeUInt64 *>(arguments[2].get()))
if (!typeid_cast<const DataTypeUInt64 *>(arguments[2].get()) &&
!typeid_cast<const DataTypeTuple *>(arguments[2].get()))
{
throw Exception{
"Illegal type " + arguments[2]->getName() + " of third argument of function " + getName()
+ ", must be UInt64.",
+ ", must be UInt64 or tuple(...).",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}

View File

@ -0,0 +1,62 @@
#include <DB/AggregateFunctions/AggregateFunctionState.h>
#include <DB/Columns/ColumnAggregateFunction.h>
namespace DB
{
ColumnPtr ColumnAggregateFunction::convertToValues() const
{
const IAggregateFunction * function = holder->func;
ColumnPtr res = function->getReturnType()->createColumn();
/** Если агрегатная функция возвращает нефинализированное состояние,
* то надо просто скопировать указатели на него а также разделяемое владение данными.
*
* Также заменяем агрегатную функцию на вложенную.
* То есть, если этот столбец - состояния агрегатной функции aggState,
* то мы возвращаем такой же столбец, но с состояниями агрегатной функции agg.
* Это одни и те же состояния, меняем только функцию, которой они соответствуют.
*
* Это довольно сложно для понимания.
* Пример, когда такое происходит:
*
* SELECT k, finalizeAggregation(quantileTimingState(0.5)(x)) FROM ... GROUP BY k WITH TOTALS
*
* Здесь вычисляется агрегатная функция quantileTimingState.
* Её тип возвращаемого значения:
* AggregateFunction(quantileTiming(0.5), UInt64).
* Из-за наличия WITH TOTALS, при агрегации будут сохранены состояния этой агрегатной функции
* в столбце ColumnAggregateFunction, имеющего тип
* AggregateFunction(quantileTimingState(0.5), UInt64).
* Затем в TotalsHavingBlockInputStream у него будет вызван метод convertToValues,
* чтобы получить "готовые" значения.
* Но он всего лишь преобразует столбец типа
* AggregateFunction(quantileTimingState(0.5), UInt64)
* в AggregateFunction(quantileTiming(0.5), UInt64)
* - в такие же состояния.
*
* Затем будет вычислена функция finalizeAggregation, которая позовёт convertToValues уже от результата.
* И это преобразует столбец типа
* AggregateFunction(quantileTiming(0.5), UInt64)
* в UInt16 - уже готовый результат работы quantileTiming.
*/
if (const AggregateFunctionState * function_state = typeid_cast<const AggregateFunctionState *>(function))
{
ColumnAggregateFunction * res_ = new ColumnAggregateFunction(*this);
res = res_;
res_->set(function_state->getNestedFunction());
res_->getData().assign(getData().begin(), getData().end());
return res;
}
IColumn & column = *res;
res->reserve(getData().size());
for (auto val : getData())
function->insertResultInto(val, column);
return res;
}
}

View File

@ -4,6 +4,7 @@
#include <DB/Dictionaries/HashedDictionary.h>
#include <DB/Dictionaries/CacheDictionary.h>
#include <DB/Dictionaries/RangeHashedDictionary.h>
#include <DB/Dictionaries/ComplexKeyHashedDictionary.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <memory>
@ -37,7 +38,13 @@ DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::Ab
if ("range_hashed" == layout_type)
{
if (!dict_struct.range_min || !dict_struct.range_min)
if (dict_struct.key)
throw Exception{
"'key' is not supported for dictionary of layout 'range_hashed'",
ErrorCodes::UNSUPPORTED_METHOD
};
if (!dict_struct.range_min || !dict_struct.range_max)
throw Exception{
name + ": dictionary of layout 'range_hashed' requires .structure.range_min and .structure.range_max",
ErrorCodes::BAD_ARGUMENTS
@ -45,9 +52,25 @@ DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::Ab
return std::make_unique<RangeHashedDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty);
}
else if ("complex_key_hashed" == layout_type)
{
if (!dict_struct.key)
throw Exception{
"'key' is required for dictionary of layout 'complex_key_hashed'",
ErrorCodes::BAD_ARGUMENTS
};
return std::make_unique<ComplexKeyHashedDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty);
}
else
{
if (dict_struct.range_min || dict_struct.range_min)
if (dict_struct.key)
throw Exception{
"'key' is not supported for dictionary of layout '" + layout_type + "'",
ErrorCodes::UNSUPPORTED_METHOD
};
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{
name + ": elements .structure.range_min and .structure.range_max should be defined only "
"for a dictionary of layout 'range_hashed'",

View File

@ -28,7 +28,6 @@
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterSetQuery.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Storages/StorageView.h>
#include <DB/TableFunctions/ITableFunction.h>
#include <DB/TableFunctions/TableFunctionFactory.h>

View File

@ -297,7 +297,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
catch (...)
{
onExceptionBeforeStart(query, context, current_time);
if (!internal)
onExceptionBeforeStart(query, context, current_time);
throw;
}

View File

@ -595,15 +595,39 @@ bool ParserExpressionElement::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos &
bool ParserWithOptionalAlias::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
ParserWhiteSpaceOrComments ws;
ParserAlias alias_p(allow_alias_without_as_keyword);
if (!elem_parser->parse(pos, end, node, max_parsed_pos, expected))
return false;
/** Маленький хак.
*
* В секции SELECT мы разрешаем парсить алиасы без указания ключевого слова AS.
* Эти алиасы не могут совпадать с ключевыми словами запроса.
* А само выражение может быть идентификатором, совпадающем с ключевым словом.
* Например, столбец может называться where. И в запросе может быть написано SELECT where AS x FROM table или даже SELECT where x FROM table.
* Даже может быть написано SELECT where AS from FROM table, но не может быть написано SELECT where from FROM table.
* Смотрите подробнее в реализации ParserAlias.
*
* Но возникает небольшая проблема - неудобное сообщение об ошибке, если в секции SELECT в конце есть лишняя запятая.
* Хотя такая ошибка очень распространена. Пример: SELECT x, y, z, FROM tbl
* Если ничего не предпринять, то это парсится как выбор столбца с именем FROM и алиасом tbl.
* Чтобы избежать такой ситуации, мы не разрешаем парсить алиас без ключевого слова AS для идентификатора с именем FROM.
*
* Замечание: это также фильтрует случай, когда идентификатор квотирован.
* Пример: SELECT x, y, z, `FROM` tbl. Но такой случай можно было бы разрешить.
*
* В дальнейшем было бы проще запретить неквотированные идентификаторы, совпадающие с ключевыми словами.
*/
bool allow_alias_without_as_keyword_now = allow_alias_without_as_keyword;
if (allow_alias_without_as_keyword)
if (const ASTIdentifier * id = typeid_cast<const ASTIdentifier *>(node.get()))
if (0 == strcasecmp(id->name.data(), "FROM"))
allow_alias_without_as_keyword_now = false;
ws.ignore(pos, end);
ASTPtr alias_node;
if (alias_p.parse(pos, end, alias_node, max_parsed_pos, expected))
if (ParserAlias(allow_alias_without_as_keyword_now).parse(pos, end, alias_node, max_parsed_pos, expected))
{
String alias_name = typeid_cast<ASTIdentifier &>(*alias_node).name;

View File

@ -199,7 +199,13 @@ BlockInputStreams StorageDistributed::read(
/// Отключаем мультиплексирование шардов, если есть ORDER BY без GROUP BY.
const ASTSelectQuery & ast = *(static_cast<const ASTSelectQuery *>(modified_query_ast.get()));
bool enable_shard_multiplexing = !(ast.order_expression_list && !ast.group_expression_list);
/** Функциональность shard_multiplexing не доделана - выключаем её.
* (Потому что установка соединений с разными шардами в рамках одного потока выполняется не параллельно.)
* Подробнее смотрите в https://███████████.yandex-team.ru/METR-18300
*/
//bool enable_shard_multiplexing = !(ast.order_expression_list && !ast.group_expression_list);
bool enable_shard_multiplexing = false;
size_t thread_count;
@ -213,7 +219,7 @@ BlockInputStreams StorageDistributed::read(
thread_count = std::min(remote_count, static_cast<size_t>(settings.max_distributed_processing_threads));
size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0;
size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0;
size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0;
ConnectionPoolsPtr pools;
bool do_init = true;
@ -341,12 +347,19 @@ BlockInputStreams StorageDistributed::describe(const Context & context, const Se
size_t thread_count;
if (remote_count == 0)
/** Функциональность shard_multiplexing не доделана - выключаем её.
* (Потому что установка соединений с разными шардами в рамках одного потока выполняется не параллельно.)
* Подробнее смотрите в https://███████████.yandex-team.ru/METR-18300
*/
/* if (remote_count == 0)
thread_count = 0;
else if (settings.max_distributed_processing_threads == 0)
thread_count = 1;
else
thread_count = std::min(remote_count, static_cast<size_t>(settings.max_distributed_processing_threads));
*/
thread_count = remote_count;
size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0;
size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0;

View File

@ -80,6 +80,8 @@ BlockInputStreams StorageView::read(
const size_t max_block_size,
const unsigned threads)
{
processed_stage = QueryProcessingStage::FetchColumns;
ASTPtr inner_query_clone = getInnerQuery();
ASTSelectQuery & inner_select = static_cast<ASTSelectQuery &>(*inner_query_clone);
const ASTSelectQuery & outer_select = typeid_cast<const ASTSelectQuery &>(*query);

View File

@ -0,0 +1,5 @@
Hello, world!
Hello, world!
Hello, world!
Hello, world!
Hello, world!

View File

@ -0,0 +1,13 @@
DROP TABLE IF EXISTS test.view1;
DROP TABLE IF EXISTS test.view2;
DROP TABLE IF EXISTS test.merge_view;
CREATE VIEW test.view1 AS SELECT number FROM system.numbers LIMIT 10;
CREATE VIEW test.view2 AS SELECT number FROM system.numbers LIMIT 10;
CREATE TABLE test.merge_view (number UInt64) ENGINE = Merge(test, '^view');
SELECT 'Hello, world!' FROM test.merge_view LIMIT 5;
DROP TABLE test.view1;
DROP TABLE test.view2;
DROP TABLE test.merge_view;

View File

@ -0,0 +1,6 @@
0 [15008]
1 [15008]
2 [15008]
3 [5001]
0 [13341]

View File

@ -0,0 +1 @@
SELECT k, finalizeAggregation(quantilesTimingState(0.5)(x)) FROM (SELECT intDiv(number, 30000 AS d) AS k, number % d AS x FROM system.numbers LIMIT 100000) GROUP BY k WITH TOTALS ORDER BY k;