Merge branch 'master' of github.com:yandex/ClickHouse

commit f8bd3a3c33
Author: Alexey Milovidov
Date: 2016-10-03 16:58:23 +03:00
76 changed files with 1114 additions and 366 deletions

View File

@ -83,7 +83,7 @@ public:
return nested_func->alignOfData();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const IColumn * nested[num_agruments];
@ -97,12 +97,12 @@ public:
size_t end = offsets[row_num];
for (size_t i = begin; i < end; ++i)
nested_func->add(place, nested, i);
nested_func->add(place, nested, i, nullptr);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
nested_func->merge(place, rhs);
nested_func->merge(place, rhs, arena);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
@ -110,9 +110,9 @@ public:
nested_func->serialize(place, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
nested_func->deserialize(place, buf);
nested_func->deserialize(place, buf, arena);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
@ -120,9 +120,9 @@ public:
nested_func->insertResultInto(place, to);
}
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num)
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
{
return static_cast<const AggregateFunctionArray &>(*that).add(place, columns, row_num);
static_cast<const AggregateFunctionArray &>(*that).add(place, columns, row_num, arena);
}
IAggregateFunction::AddFunc getAddressOfAddFunction() const override final { return &addFree; }

View File

@ -42,13 +42,13 @@ public:
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).sum += static_cast<const ColumnVector<T> &>(column).getData()[row_num];
++this->data(place).count;
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).sum += this->data(rhs).sum;
this->data(place).count += this->data(rhs).count;
@ -60,7 +60,7 @@ public:
writeVarUInt(this->data(place).count, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
readBinary(this->data(place).sum, buf);
readVarUInt(this->data(place).count, buf);

View File

@ -36,7 +36,7 @@ public:
++data(place).count;
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
data(place).count += data(rhs).count;
}
@ -46,7 +46,7 @@ public:
writeVarUInt(data(place).count, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
readVarUInt(data(place).count, buf);
}

View File

@ -51,12 +51,12 @@ public:
{
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).value.push_back(static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).value.insert(this->data(rhs).value.begin(), this->data(rhs).value.end());
}
@ -69,7 +69,7 @@ public:
buf.write(reinterpret_cast<const char *>(&value[0]), size * sizeof(value[0]));
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
size_t size = 0;
readVarUInt(size, buf);
@ -128,13 +128,13 @@ public:
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
data(place).value.push_back(Array::value_type());
column.get(row_num, data(place).value.back());
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
data(place).value.insert(data(place).value.end(), data(rhs).value.begin(), data(rhs).value.end());
}
@ -148,7 +148,7 @@ public:
type->serializeBinary(value[i], buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
size_t size = 0;
readVarUInt(size, buf);

View File

@ -5,6 +5,7 @@
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/Columns/ColumnArray.h>
@ -23,12 +24,12 @@ template <typename T>
struct AggregateFunctionGroupUniqArrayData
{
/// On creation, the hash table should be small.
typedef HashSet<
using Set = HashSet<
T,
DefaultHash<T>,
HashTableGrower<4>,
HashTableAllocatorWithStackMemory<sizeof(T) * (1 << 4)>
> Set;
>;
Set value;
};
@ -55,26 +56,26 @@ public:
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).value.insert(static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).value.merge(this->data(rhs).value);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
const typename State::Set & set = this->data(place).value;
auto & set = this->data(place).value;
size_t size = set.size();
writeVarUInt(size, buf);
for (typename State::Set::const_iterator it = set.begin(); it != set.end(); ++it)
writeIntBinary(*it, buf);
for (auto & elem : set)
writeIntBinary(elem, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).value.read(buf);
}
@ -94,11 +95,159 @@ public:
data_to.resize(old_size + size);
size_t i = 0;
for (typename State::Set::const_iterator it = set.begin(); it != set.end(); ++it, ++i)
for (auto it = set.begin(); it != set.end(); ++it, ++i)
data_to[old_size + i] = *it;
}
};
/// Generic implementation; it uses the serialized representation as the object descriptor.
struct AggreagteFunctionGroupUniqArrayGenericData
{
static constexpr size_t INIT_ELEMS = 2; /// adjustable
static constexpr size_t ELEM_SIZE = sizeof(HashSetCellWithSavedHash<StringRef, StringRefHash>);
using Set = HashSetWithSavedHash<StringRef, StringRefHash, HashTableGrower<INIT_ELEMS>, HashTableAllocatorWithStackMemory<INIT_ELEMS * ELEM_SIZE>>;
Set value;
};
/** The template parameter with value true should be used for columns that store their elements in memory contiguously.
* For such columns groupUniqArray() can be implemented more efficiently (especially for small numeric arrays).
*/
template <bool is_plain_column = false>
class AggreagteFunctionGroupUniqArrayGeneric : public IUnaryAggregateFunction<AggreagteFunctionGroupUniqArrayGenericData, AggreagteFunctionGroupUniqArrayGeneric<is_plain_column>>
{
DataTypePtr input_data_type;
using State = AggreagteFunctionGroupUniqArrayGenericData;
static StringRef getSerialization(const IColumn & column, size_t row_num, Arena & arena);
static void deserializeAndInsert(StringRef str, IColumn & data_to);
public:
String getName() const override { return "groupUniqArray"; }
void setArgument(const DataTypePtr & argument)
{
input_data_type = argument;
}
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeArray>(input_data_type->clone());
}
bool allocatesMemoryInArena() const override
{
return true;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
auto & set = this->data(place).value;
writeVarUInt(set.size(), buf);
for (const auto & elem : set)
{
writeStringBinary(elem, buf);
}
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
auto & set = this->data(place).value;
size_t size;
readVarUInt(size, buf);
//TODO: set.reserve(size);
for (size_t i = 0; i < size; i++)
{
set.insert(readStringBinaryInto(*arena, buf));
}
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena * arena) const
{
auto & set = this->data(place).value;
bool inserted;
State::Set::iterator it;
StringRef str_serialized = getSerialization(column, row_num, *arena);
set.emplace(str_serialized, it, inserted);
if (!is_plain_column)
{
if (!inserted)
arena->rollback(str_serialized.size);
}
else
{
if (inserted)
it->data = arena->insert(str_serialized.data, str_serialized.size);
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & cur_set = this->data(place).value;
auto & rhs_set = this->data(rhs).value;
bool inserted;
State::Set::iterator it;
for (auto & rhs_elem : rhs_set)
{
cur_set.emplace(rhs_elem, it, inserted);
if (inserted)
it->data = arena->insert(it->data, it->size);
}
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();
IColumn & data_to = arr_to.getData();
auto & set = this->data(place).value;
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + set.size());
for (auto & elem : set)
{
deserializeAndInsert(elem, data_to);
}
}
};
template <>
inline StringRef AggreagteFunctionGroupUniqArrayGeneric<false>::getSerialization(const IColumn & column, size_t row_num, Arena & arena)
{
const char * begin = nullptr;
return column.serializeValueIntoArena(row_num, arena, begin);
}
template <>
inline StringRef AggreagteFunctionGroupUniqArrayGeneric<true>::getSerialization(const IColumn & column, size_t row_num, Arena &)
{
return column.getDataAt(row_num);
}
template <>
inline void AggreagteFunctionGroupUniqArrayGeneric<false>::deserializeAndInsert(StringRef str, IColumn & data_to)
{
data_to.deserializeAndInsertFromArena(str.data);
}
template <>
inline void AggreagteFunctionGroupUniqArrayGeneric<true>::deserializeAndInsert(StringRef str, IColumn & data_to)
{
data_to.insertData(str.data, str.size);
}
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE
}
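A note on the addImpl() insertion order above, since it is easy to miss: for non-plain columns the value must be serialized into the arena before the set can be probed, so a duplicate costs a serialization that is immediately rolled back; for plain columns the set is probed with the in-column bytes and the copy into the arena happens only for newly inserted values. Below is a minimal standalone sketch of the two strategies (illustrative names, not ClickHouse code), with a toy fixed-capacity bump allocator standing in for Arena and std::set<std::string_view> standing in for the saved-hash set:

#include <cassert>
#include <cstring>
#include <iostream>
#include <set>
#include <string_view>

struct ToyArena
{
    char buf[1 << 16];       /// fixed capacity, so pointers stay stable
    std::size_t used = 0;

    const char * insert(const char * data, std::size_t size)
    {
        assert(used + size <= sizeof(buf));
        std::memcpy(buf + used, data, size);
        const char * res = buf + used;
        used += size;
        return res;
    }

    void rollback(std::size_t size) { used -= size; }  /// undo the latest insert
};

int main()
{
    ToyArena arena;
    std::set<std::string_view> set;

    /// Non-plain path: serialize into the arena first, roll back on duplicates.
    auto add_serialized = [&](std::string_view value)
    {
        const char * data = arena.insert(value.data(), value.size());
        bool inserted = set.emplace(data, value.size()).second;
        if (!inserted)
            arena.rollback(value.size());
    };

    /// Plain path: probe with the in-column bytes, copy only when inserted.
    auto add_plain = [&](std::string_view value)
    {
        auto [it, inserted] = set.insert(value);
        if (inserted)
        {
            const char * data = arena.insert(value.data(), value.size());
            set.erase(it);                     /// std::set keys are immutable,
            set.emplace(data, value.size());   /// so re-point via erase + emplace
        }
    };

    add_serialized("foo");
    add_serialized("foo");   /// duplicate: the freshly written bytes are rolled back
    add_plain("bar");
    add_plain("bar");        /// duplicate: nothing is written to the arena at all

    std::cout << set.size() << " unique values, " << arena.used << " arena bytes\n";
    /// Prints: 2 unique values, 6 arena bytes
}

The rollback matters because arena memory is only reclaimed wholesale, so without it every duplicate row would permanently grow the arena.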

View File

@ -77,15 +77,15 @@ public:
return nested_func->alignOfData();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
if (static_cast<const ColumnUInt8 &>(*columns[num_agruments - 1]).getData()[row_num])
nested_func->add(place, columns, row_num);
nested_func->add(place, columns, row_num, nullptr);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
nested_func->merge(place, rhs);
nested_func->merge(place, rhs, arena);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
@ -93,9 +93,9 @@ public:
nested_func->serialize(place, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
nested_func->deserialize(place, buf);
nested_func->deserialize(place, buf, arena);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
@ -103,9 +103,9 @@ public:
nested_func->insertResultInto(place, to);
}
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num)
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
{
return static_cast<const AggregateFunctionIf &>(*that).add(place, columns, row_num);
static_cast<const AggregateFunctionIf &>(*that).add(place, columns, row_num, arena);
}
IAggregateFunction::AddFunc getAddressOfAddFunction() const override final { return &addFree; }

View File

@ -79,14 +79,14 @@ public:
return nested_func->alignOfData();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
nested_func->merge(place, static_cast<const ColumnAggregateFunction &>(*columns[0]).getData()[row_num]);
nested_func->merge(place, static_cast<const ColumnAggregateFunction &>(*columns[0]).getData()[row_num], arena);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
nested_func->merge(place, rhs);
nested_func->merge(place, rhs, arena);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
@ -94,9 +94,9 @@ public:
nested_func->serialize(place, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
nested_func->deserialize(place, buf);
nested_func->deserialize(place, buf, arena);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
@ -104,9 +104,9 @@ public:
nested_func->insertResultInto(place, to);
}
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num)
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
{
return static_cast<const AggregateFunctionMerge &>(*that).add(place, columns, row_num);
static_cast<const AggregateFunctionMerge &>(*that).add(place, columns, row_num, arena);
}
IAggregateFunction::AddFunc getAddressOfAddFunction() const override final { return &addFree; }

View File

@ -68,12 +68,12 @@ public:
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).sample.insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).sample.merge(this->data(rhs).sample);
}
@ -83,7 +83,7 @@ public:
this->data(place).sample.write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).sample.read(buf);
}
@ -145,12 +145,12 @@ public:
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).sample.insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).sample.merge(this->data(rhs).sample);
}
@ -160,7 +160,7 @@ public:
this->data(place).sample.write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).sample.read(buf);
}

View File

@ -74,13 +74,13 @@ public:
}
void addImpl(AggregateDataPtr place, const IColumn & column, const IColumn & determinator, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, const IColumn & determinator, size_t row_num, Arena *) const
{
this->data(place).sample.insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num],
determinator.get64(row_num));
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).sample.merge(this->data(rhs).sample);
}
@ -90,7 +90,7 @@ public:
this->data(place).sample.write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).sample.read(buf);
}
@ -158,13 +158,13 @@ public:
}
void addImpl(AggregateDataPtr place, const IColumn & column, const IColumn & determinator, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, const IColumn & determinator, size_t row_num, Arena *) const
{
this->data(place).sample.insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num],
determinator.get64(row_num));
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).sample.merge(this->data(rhs).sample);
}
@ -174,7 +174,7 @@ public:
this->data(place).sample.write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).sample.read(buf);
}

View File

@ -68,12 +68,12 @@ public:
level = apply_visitor(FieldVisitorConvertToNumber<Float64>(), params[0]);
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).array.push_back(static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).array.insert(this->data(rhs).array.begin(), this->data(rhs).array.end());
}
@ -87,7 +87,7 @@ public:
buf.write(reinterpret_cast<const char *>(&array[0]), size * sizeof(array[0]));
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
auto & array = this->data(place).array;
@ -150,12 +150,12 @@ public:
levels.set(params);
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).array.push_back(static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).array.insert(this->data(rhs).array.begin(), this->data(rhs).array.end());
}
@ -169,7 +169,7 @@ public:
buf.write(reinterpret_cast<const char *>(&array[0]), size * sizeof(array[0]));
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
auto & array = this->data(place).array;

View File

@ -74,14 +74,14 @@ public:
level = apply_visitor(FieldVisitorConvertToNumber<Float64>(), params[0]);
}
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num, Arena *) const
{
this->data(place)
.map[static_cast<const ColumnVector<ValueType> &>(column_value).getData()[row_num]]
+= static_cast<const ColumnVector<WeightType> &>(column_weight).getData()[row_num];
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & map = this->data(place).map;
const auto & rhs_map = this->data(rhs).map;
@ -95,7 +95,7 @@ public:
this->data(place).map.write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
typename AggregateFunctionQuantileExactWeightedData<ValueType>::Map::Reader reader(buf);
@ -189,14 +189,14 @@ public:
levels.set(params);
}
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num, Arena *) const
{
this->data(place)
.map[static_cast<const ColumnVector<ValueType> &>(column_value).getData()[row_num]]
+= static_cast<const ColumnVector<WeightType> &>(column_weight).getData()[row_num];
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & map = this->data(place).map;
const auto & rhs_map = this->data(rhs).map;
@ -210,7 +210,7 @@ public:
this->data(place).map.write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
typename AggregateFunctionQuantileExactWeightedData<ValueType>::Map::Reader reader(buf);

View File

@ -382,12 +382,12 @@ public:
level = apply_visitor(FieldVisitorConvertToNumber<Float32>(), params[0]);
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).digest.add(params, static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).digest.merge(params, this->data(rhs).digest);
}
@ -397,7 +397,7 @@ public:
this->data(const_cast<AggregateDataPtr>(place)).digest.write(params, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).digest.read(params, buf);
}
@ -449,14 +449,14 @@ public:
level = apply_visitor(FieldVisitorConvertToNumber<Float32>(), params[0]);
}
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num, Arena *) const
{
this->data(place).digest.add(params,
static_cast<const ColumnVector<T> &>(column_value).getData()[row_num],
static_cast<const ColumnVector<Weight> &>(column_weight).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).digest.merge(params, this->data(rhs).digest);
}
@ -466,7 +466,7 @@ public:
this->data(const_cast<AggregateDataPtr>(place)).digest.write(params, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).digest.read(params, buf);
}
@ -513,12 +513,12 @@ public:
levels.set(params);
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).digest.add(params, static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).digest.merge(params, this->data(rhs).digest);
}
@ -528,7 +528,7 @@ public:
this->data(const_cast<AggregateDataPtr>(place)).digest.write(params, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).digest.read(params, buf);
}
@ -593,14 +593,14 @@ public:
levels.set(params);
}
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num, Arena *) const
{
this->data(place).digest.add(params,
static_cast<const ColumnVector<T> &>(column_value).getData()[row_num],
static_cast<const ColumnVector<Weight> &>(column_weight).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).digest.merge(params, this->data(rhs).digest);
}
@ -610,7 +610,7 @@ public:
this->data(const_cast<AggregateDataPtr>(place)).digest.write(params, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).digest.read(params, buf);
}

View File

@ -815,12 +815,12 @@ public:
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs));
}
@ -830,7 +830,7 @@ public:
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).deserialize(buf);
}
@ -873,14 +873,14 @@ public:
level = apply_visitor(FieldVisitorConvertToNumber<Float64>(), params[0]);
}
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num, Arena *) const
{
this->data(place).insertWeighted(
static_cast<const ColumnVector<ArgumentFieldType> &>(column_value).getData()[row_num],
static_cast<const ColumnVector<WeightFieldType> &>(column_weight).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs));
}
@ -890,7 +890,7 @@ public:
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).deserialize(buf);
}
@ -930,12 +930,12 @@ public:
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs));
}
@ -945,7 +945,7 @@ public:
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).deserialize(buf);
}
@ -991,14 +991,14 @@ public:
levels.set(params);
}
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num, Arena *) const
{
this->data(place).insertWeighted(
static_cast<const ColumnVector<ArgumentFieldType> &>(column_value).getData()[row_num],
static_cast<const ColumnVector<WeightFieldType> &>(column_weight).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs));
}
@ -1008,7 +1008,7 @@ public:
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).deserialize(buf);
}

View File

@ -192,7 +192,7 @@ public:
parsePattern();
}
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num) const override
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override
{
const auto timestamp = static_cast<const ColumnUInt32 *>(columns[0])->getData()[row_num];
@ -206,7 +206,7 @@ public:
data(place).add(timestamp, events);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
data(place).merge(data(rhs));
}
@ -216,7 +216,7 @@ public:
data(place).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
data(place).deserialize(buf);
}
@ -234,9 +234,9 @@ public:
static_cast<ColumnUInt8 &>(to).getData().push_back(match(events_it, events_end));
}
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num)
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
{
return static_cast<const AggregateFunctionSequenceMatch &>(*that).add(place, columns, row_num);
static_cast<const AggregateFunctionSequenceMatch &>(*that).add(place, columns, row_num, arena);
}
IAggregateFunction::AddFunc getAddressOfAddFunction() const override final { return &addFree; }

View File

@ -72,14 +72,14 @@ public:
return nested_func->alignOfData();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
nested_func->add(place, columns, row_num);
nested_func->add(place, columns, row_num, arena);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
nested_func->merge(place, rhs);
nested_func->merge(place, rhs, arena);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
@ -87,9 +87,9 @@ public:
nested_func->serialize(place, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
nested_func->deserialize(place, buf);
nested_func->deserialize(place, buf, arena);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
@ -102,9 +102,9 @@ public:
AggregateFunctionPtr getNestedFunction() const { return nested_func_owner; }
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num)
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
{
return static_cast<const AggregateFunctionState &>(*that).add(place, columns, row_num);
static_cast<const AggregateFunctionState &>(*that).add(place, columns, row_num, arena);
}
IAggregateFunction::AddFunc getAddressOfAddFunction() const override final { return &addFree; }

View File

@ -40,12 +40,12 @@ public:
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).sum += static_cast<const ColumnVector<T> &>(column).getData()[row_num];
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).sum += this->data(rhs).sum;
}
@ -55,7 +55,7 @@ public:
writeBinary(this->data(place).sum, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
readBinary(this->data(place).sum, buf);
}

View File

@ -340,12 +340,12 @@ public:
{
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
detail::OneAdder<T, Data>::addImpl(this->data(place), column, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).set.merge(this->data(rhs).set);
}
@ -355,7 +355,7 @@ public:
this->data(place).set.write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).set.read(buf);
}
@ -395,12 +395,12 @@ public:
num_args = arguments.size();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
this->data(place).set.insert(UniqVariadicHash<is_exact, argument_is_tuple>::apply(num_args, columns, row_num));
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).set.merge(this->data(rhs).set);
}
@ -410,7 +410,7 @@ public:
this->data(place).set.write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).set.read(buf);
}
@ -420,9 +420,9 @@ public:
static_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num)
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
{
return static_cast<const AggregateFunctionUniqVariadic &>(*that).add(place, columns, row_num);
static_cast<const AggregateFunctionUniqVariadic &>(*that).add(place, columns, row_num, arena);
}
IAggregateFunction::AddFunc getAddressOfAddFunction() const override final { return &addFree; }

View File

@ -151,12 +151,12 @@ public:
threshold = threshold_param;
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).addImpl(column, row_num, threshold);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs), threshold);
}
@ -166,7 +166,7 @@ public:
this->data(place).write(buf, threshold);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).read(buf, threshold);
}
@ -224,12 +224,12 @@ public:
threshold = threshold_param;
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
this->data(place).insert(UniqVariadicHash<false, argument_is_tuple>::apply(num_args, columns, row_num), threshold);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs), threshold);
}
@ -239,7 +239,7 @@ public:
this->data(place).write(buf, threshold);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).read(buf, threshold);
}
@ -249,9 +249,9 @@ public:
static_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num)
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
{
return static_cast<const AggregateFunctionUniqUpToVariadic &>(*that).add(place, columns, row_num);
static_cast<const AggregateFunctionUniqUpToVariadic &>(*that).add(place, columns, row_num, arena);
}
IAggregateFunction::AddFunc getAddressOfAddFunction() const override final { return &addFree; }

View File

@ -41,13 +41,13 @@ public:
type_val = arguments[1];
}
void addImpl(AggregateDataPtr place, const IColumn & column_arg, const IColumn & column_max, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column_arg, const IColumn & column_max, size_t row_num, Arena *) const
{
if (this->data(place).value.changeIfBetter(column_max, row_num))
this->data(place).result.change(column_arg, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
if (this->data(place).value.changeIfBetter(this->data(rhs).value))
this->data(place).result.change(this->data(rhs).result);
@ -59,7 +59,7 @@ public:
this->data(place).value.write(buf, *type_val.get());
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).result.read(buf, *type_res.get());
this->data(place).value.read(buf, *type_val.get());

View File

@ -701,12 +701,12 @@ public:
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).changeIfBetter(column, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).changeIfBetter(this->data(rhs));
}
@ -716,7 +716,7 @@ public:
this->data(place).write(buf, *type.get());
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).read(buf, *type.get());
}

View File

@ -129,12 +129,12 @@ public:
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
this->data(place).update(column, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).mergeWith(this->data(rhs));
}
@ -144,7 +144,7 @@ public:
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).deserialize(buf);
}
@ -397,12 +397,12 @@ public:
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
void addImpl(AggregateDataPtr place, const IColumn & column_left, const IColumn & column_right, size_t row_num) const
void addImpl(AggregateDataPtr place, const IColumn & column_left, const IColumn & column_right, size_t row_num, Arena *) const
{
this->data(place).update(column_left, column_right, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).mergeWith(this->data(rhs));
}
@ -412,7 +412,7 @@ public:
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).deserialize(buf);
}

View File

@ -5,6 +5,7 @@
#include <DB/Core/Row.h>
#include <DB/DataTypes/IDataType.h>
#include <DB/Common/typeid_cast.h>
#include <DB/Common/Arena.h>
namespace DB
@ -76,19 +77,29 @@ public:
/// How the structure with the data should be aligned. NOTE: Currently unused (aggregation state structures are laid out without alignment).
virtual size_t alignOfData() const = 0;
/// Add a value. columns are the columns containing the arguments; row_num is the row number within those columns.
virtual void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const = 0;
/** Adds a value to the aggregation data that place points to.
* columns points to the columns containing the arguments of the aggregate function.
* row_num is the number of the row to add.
* The additional parameter arena should be used instead of the standard memory allocator if the addition requires a memory allocation.
*/
virtual void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const = 0;
/// Merge the state with another state.
virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const = 0;
/// Merges the state that place points to with another state of the current aggregate function.
virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;
/// Serialize the state (for example, to send it over the network).
/// Serializes state (to transmit it over the network, for example).
virtual void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const = 0;
/// Deserialize the state. Called for an empty (just created) state.
virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf) const = 0;
/// Deserializes state. This function is called only for empty (just created) states.
virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0;
/// Insert the result into a column.
/// Returns true if the function requires an Arena to handle its own states (see add(), merge(), deserialize()).
virtual bool allocatesMemoryInArena() const
{
return false;
}
/// Inserts the result into a column.
virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0;
/** Returns true for aggregate functions of the -State kind.
@ -103,7 +114,7 @@ public:
* This costs about 12% of performance on simple queries.
* Once better compilers become available, this code can be removed.
*/
using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t);
using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
virtual AddFunc getAddressOfAddFunction() const = 0;
};
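For orientation, a hypothetical call-site sketch of how the new parameter is meant to be threaded through (illustrative only, not the real Aggregator code; create() and destroy() are part of the interface but fall outside this excerpt):

/// Hypothetical caller (a sketch under the assumptions above): when the
/// function allocates in the arena, the arena owns that memory, so cleanup
/// happens wholesale when the arena is destroyed rather than per-state.
void aggregateColumn(const IAggregateFunction & func, const IColumn ** args, size_t rows, IColumn & result)
{
    Arena arena;
    std::vector<char> buf(func.sizeOfData());
    AggregateDataPtr place = buf.data();

    func.create(place);
    /// Passing nullptr is valid only for functions that do not allocate in
    /// the arena; the adapters in this commit pass nullptr on that basis.
    Arena * arena_ptr = func.allocatesMemoryInArena() ? &arena : nullptr;

    for (size_t row = 0; row < rows; ++row)
        func.add(place, args, row, arena_ptr);

    func.insertResultInto(place, result);
    func.destroy(place);
}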

View File

@ -25,14 +25,14 @@ public:
getDerived().setArgumentsImpl(arguments);
}
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num) const override final
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena * arena) const override final
{
getDerived().addImpl(place, *columns[0], *columns[1], row_num);
getDerived().addImpl(place, *columns[0], *columns[1], row_num, arena);
}
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num)
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
{
return static_cast<const Derived &>(*that).addImpl(place, *columns[0], *columns[1], row_num);
static_cast<const Derived &>(*that).addImpl(place, *columns[0], *columns[1], row_num, arena);
}
IAggregateFunction::AddFunc getAddressOfAddFunction() const override final { return &addFree; }

View File

@ -26,12 +26,12 @@ public:
}
/// Add a value.
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override final
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override final
{
getDerived().addImpl(place);
}
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num)
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *)
{
return static_cast<const Derived &>(*that).addImpl(place);
}

View File

@ -27,14 +27,14 @@ public:
}
/// Add a value.
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const override final
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override final
{
getDerived().addImpl(place, *columns[0], row_num);
getDerived().addImpl(place, *columns[0], row_num, arena);
}
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num)
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
{
return static_cast<const Derived &>(*that).addImpl(place, *columns[0], row_num);
static_cast<const Derived &>(*that).addImpl(place, *columns[0], row_num, arena);
}
IAggregateFunction::AddFunc getAddressOfAddFunction() const override { return &addFree; }
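The addFree() trampolines above exist so the inner aggregation loop can call through a plain function pointer obtained once via getAddressOfAddFunction(), instead of paying a virtual dispatch per row (see the ~12% remark in IAggregateFunction.h). A standalone sketch of the pattern, with illustrative names:

#include <cstddef>
#include <iostream>

struct IFunc
{
    using AddFunc = void (*)(const IFunc *, double &, double);
    virtual AddFunc getAddressOfAddFunction() const = 0;
    virtual ~IFunc() = default;
};

template <typename Derived>
struct FuncHelper : IFunc
{
    /// Static trampoline: casts back to the concrete type so that the
    /// compiler can inline Derived::addImpl into the caller's loop.
    static void addFree(const IFunc * that, double & state, double value)
    {
        static_cast<const Derived &>(*that).addImpl(state, value);
    }

    AddFunc getAddressOfAddFunction() const override { return &addFree; }
};

struct SumFunc : FuncHelper<SumFunc>
{
    void addImpl(double & state, double value) const { state += value; }
};

int main()
{
    SumFunc func;
    double state = 0;
    const double values[] = {1, 2, 3};

    IFunc::AddFunc add = func.getAddressOfAddFunction();  /// fetched once
    for (double v : values)
        add(&func, state, v);  /// plain indirect call, no virtual dispatch per row

    std::cout << state << '\n';  /// prints 6
}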

View File

@ -159,7 +159,8 @@ public:
/// Merge the state in the last row with the given one
void insertMergeFrom(const IColumn & src, size_t n)
{
func->merge(getData().back(), static_cast<const ColumnAggregateFunction &>(src).getData()[n]);
Arena & arena = createOrGetArena();
func->merge(getData().back(), static_cast<const ColumnAggregateFunction &>(src).getData()[n], &arena);
}
Arena & createOrGetArena()
@ -178,7 +179,7 @@ public:
getData().push_back(arena.alloc(function->sizeOfData()));
function->create(getData().back());
ReadBufferFromString read_buffer(x.get<const String &>());
function->deserialize(getData().back(), read_buffer);
function->deserialize(getData().back(), read_buffer, &arena);
}
void insertDefault() override

View File

@ -14,7 +14,7 @@ namespace DB
class JSONCompactRowOutputStream : public JSONRowOutputStream
{
public:
JSONCompactRowOutputStream(WriteBuffer & ostr_, const Block & sample_, bool write_statistics_);
JSONCompactRowOutputStream(WriteBuffer & ostr_, const Block & sample_, bool write_statistics_, bool force_quoting_64bit_integers_);
void writeField(const IColumn & column, const IDataType & type, size_t row_num) override;
void writeFieldDelimiter() override;

View File

@ -14,7 +14,7 @@ namespace DB
class JSONEachRowRowOutputStream : public IRowOutputStream
{
public:
JSONEachRowRowOutputStream(WriteBuffer & ostr_, const Block & sample);
JSONEachRowRowOutputStream(WriteBuffer & ostr_, const Block & sample, bool force_quoting_64bit_integers_);
void writeField(const IColumn & column, const IDataType & type, size_t row_num) override;
void writeFieldDelimiter() override;
@ -30,6 +30,7 @@ private:
WriteBuffer & ostr;
size_t field_number = 0;
Names fields;
bool force_quoting_64bit_integers;
};
}

View File

@ -16,7 +16,7 @@ class JSONRowOutputStream : public IRowOutputStream
{
public:
JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_,
bool write_statistics_);
bool write_statistics_, bool force_quoting_64bit_integers_);
void writeField(const IColumn & column, const IDataType & type, size_t row_num) override;
void writeFieldDelimiter() override;
@ -68,6 +68,7 @@ protected:
Progress progress;
Stopwatch watch;
bool write_statistics;
bool force_quoting_64bit_integers;
};
}

View File

@ -53,7 +53,7 @@ public:
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr) const override;
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;

View File

@ -46,7 +46,7 @@ public:
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;

View File

@ -58,7 +58,7 @@ public:
static_cast<ColumnType &>(column).getData().push_back(x); /// It is important to do this last, for exception safety.
}
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const override
{
writeChar('"', ostr);
serializeText(column, row_num, ostr);

View File

@ -58,7 +58,7 @@ public:
static_cast<ColumnType &>(column).getData().push_back(x); /// It is important to do this last, for exception safety.
}
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const override
{
writeChar('"', ostr);
serializeText(column, row_num, ostr);

View File

@ -81,7 +81,7 @@ public:
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr) const override;
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;

View File

@ -52,7 +52,7 @@ public:
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;

View File

@ -39,7 +39,7 @@ public:
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;

View File

@ -31,7 +31,7 @@ public:
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr) const override;
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;

View File

@ -87,9 +87,10 @@ public:
*/
virtual void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr) const = 0;
/** Text serialization as a literal for use in the JSON format.
/** Text serialization intended for use in the JSON format.
* The force_quoting_64bit_integers parameter forces UInt64 and Int64 values to be wrapped in quotes.
*/
virtual void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const = 0;
virtual void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool force_quoting_64bit_integers) const = 0;
virtual void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const = 0;
/** Text serialization for embedding into the XML format.

View File

@ -38,7 +38,7 @@ public:
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override { throwNoSerialization(); }
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr) const override { throwNoSerialization(); }
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override { throwNoSerialization(); }
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const override { throwNoSerialization(); }
void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const override { throwNoSerialization(); }
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override { throwNoSerialization(); }

View File

@ -56,7 +56,7 @@ public:
deserializeText(column, istr);
}
inline void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
inline void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const override
{
@ -121,7 +121,7 @@ public:
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr) const override {}
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override {}
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr) const override {}
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override {}
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const override {}
void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const override {}
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override {}
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const char delimiter) const override {}
@ -129,26 +129,30 @@ public:
Field getDefault() const override { return {}; }
};
template <typename FType> inline void IDataTypeNumber<FType>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
template <typename FType> inline void IDataTypeNumber<FType>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const
{
serializeText(column, row_num, ostr);
}
template <> inline void IDataTypeNumber<Int64>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
template <> inline void IDataTypeNumber<Int64>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool force_quoting_64bit_integers) const
{
writeChar('"', ostr);
if (force_quoting_64bit_integers)
writeChar('"', ostr);
serializeText(column, row_num, ostr);
writeChar('"', ostr);
if (force_quoting_64bit_integers)
writeChar('"', ostr);
}
template <> inline void IDataTypeNumber<UInt64>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
template <> inline void IDataTypeNumber<UInt64>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool force_quoting_64bit_integers) const
{
writeChar('"', ostr);
if (force_quoting_64bit_integers)
writeChar('"', ostr);
serializeText(column, row_num, ostr);
writeChar('"', ostr);
if (force_quoting_64bit_integers)
writeChar('"', ostr);
}
template <> inline void IDataTypeNumber<Float32>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
template <> inline void IDataTypeNumber<Float32>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const
{
auto x = static_cast<const ColumnType &>(column).getData()[row_num];
if (likely(std::isfinite(x)))
@ -157,7 +161,7 @@ template <> inline void IDataTypeNumber<Float32>::serializeTextJSON(const IColum
writeCString("null", ostr);
}
template <> inline void IDataTypeNumber<Float64>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
template <> inline void IDataTypeNumber<Float64>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const
{
auto x = static_cast<const ColumnType &>(column).getData()[row_num];
if (likely(std::isfinite(x)))

View File

@ -2714,7 +2714,7 @@ public:
try
{
for (size_t j = current_offset; j < next_offset; ++j)
agg_func.add(place, aggregate_arguments, j);
agg_func.add(place, aggregate_arguments, j, nullptr);
agg_func.insertResultInto(place, res_col);
}

View File

@ -170,7 +170,19 @@ struct ToStartOfMinuteImpl
{
static inline UInt32 execute(UInt32 t, const DateLUTImpl & remote_date_lut, const DateLUTImpl & local_date_lut)
{
return remote_date_lut.toStartOfMinuteInaccurate(t);
if (&remote_date_lut == &local_date_lut)
return local_date_lut.toStartOfMinuteInaccurate(t);
else
{
time_t remote_ts = remote_date_lut.toTimeInaccurate(t) + 86400;
remote_ts = remote_date_lut.toStartOfMinuteInaccurate(remote_ts);
const auto & values = remote_date_lut.getValues(t);
return local_date_lut.makeDateTime(values.year, values.month, values.day_of_month,
remote_date_lut.toHourInaccurate(remote_ts),
remote_date_lut.toMinuteInaccurate(remote_ts),
remote_date_lut.toSecondInaccurate(remote_ts));
}
}
static inline UInt32 execute(UInt16 d, const DateLUTImpl & remote_date_lut, const DateLUTImpl & local_date_lut)
{
@ -184,7 +196,19 @@ struct ToStartOfFiveMinuteImpl
{
static inline UInt32 execute(UInt32 t, const DateLUTImpl & remote_date_lut, const DateLUTImpl & local_date_lut)
{
return remote_date_lut.toStartOfFiveMinuteInaccurate(t);
if (&remote_date_lut == &local_date_lut)
return local_date_lut.toStartOfFiveMinuteInaccurate(t);
else
{
time_t remote_ts = remote_date_lut.toTimeInaccurate(t) + 86400;
remote_ts = remote_date_lut.toStartOfFiveMinuteInaccurate(remote_ts);
const auto & values = remote_date_lut.getValues(t);
return local_date_lut.makeDateTime(values.year, values.month, values.day_of_month,
remote_date_lut.toHourInaccurate(remote_ts),
remote_date_lut.toMinuteInaccurate(remote_ts),
remote_date_lut.toSecondInaccurate(remote_ts));
}
}
static inline UInt32 execute(UInt16 d, const DateLUTImpl & remote_date_lut, const DateLUTImpl & local_date_lut)
{
@ -198,7 +222,19 @@ struct ToStartOfHourImpl
{
static inline UInt32 execute(UInt32 t, const DateLUTImpl & remote_date_lut, const DateLUTImpl & local_date_lut)
{
return remote_date_lut.toStartOfHourInaccurate(t);
if (&remote_date_lut == &local_date_lut)
return local_date_lut.toStartOfHourInaccurate(t);
else
{
time_t remote_ts = remote_date_lut.toTimeInaccurate(t) + 86400;
remote_ts = remote_date_lut.toStartOfHourInaccurate(remote_ts);
const auto & values = remote_date_lut.getValues(t);
return local_date_lut.makeDateTime(values.year, values.month, values.day_of_month,
remote_date_lut.toHourInaccurate(remote_ts),
remote_date_lut.toMinuteInaccurate(remote_ts),
remote_date_lut.toSecondInaccurate(remote_ts));
}
}
static inline UInt32 execute(UInt16 d, const DateLUTImpl & remote_date_lut, const DateLUTImpl & local_date_lut)
{

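Each branch above reads the timestamp's wall-clock fields in the remote zone, truncates them, and re-encodes them through the local LUT. A rough standalone equivalent using the POSIX time API (illustrative only: DateLUT does not switch zones via the environment):

#include <cstdlib>
#include <ctime>

// Truncate `t` to the start of the minute as seen in `remote_tz`, then
// re-encode the resulting wall-clock fields as an epoch value in `local_tz`.
time_t toStartOfMinuteSketch(time_t t, const char * remote_tz, const char * local_tz)
{
    setenv("TZ", remote_tz, 1); tzset();
    std::tm wall {};
    localtime_r(&t, &wall);   // wall-clock fields in the remote zone
    wall.tm_sec = 0;          // truncate to the minute boundary

    setenv("TZ", local_tz, 1); tzset();
    return mktime(&wall);     // interpret the same fields in the local zone
}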
View File

@ -25,6 +25,7 @@
#include <DB/Common/UnicodeBar.h>
#include <DB/Functions/IFunction.h>
#include <DB/Functions/NumberTraits.h>
#include <DB/Functions/ObjectPool.h>
#include <DB/Interpreters/ExpressionActions.h>
#include <ext/range.hpp>
@ -1242,13 +1243,22 @@ public:
IColumn & result_column = *result_column_ptr;
result_column.reserve(column_with_states->size());
auto arena = (agg_func.allocatesMemoryInArena()) ?
arenas_pool.get(0, []{ return new Arena(); }) :
nullptr;
const auto & states = column_with_states->getData();
for (const auto & state_to_add : states)
{
agg_func.merge(place.get(), state_to_add);
/// Pass a null arena if agg_func does not allocate memory in an arena
agg_func.merge(place.get(), state_to_add, arena.get());
agg_func.insertResultInto(place.get(), result_column);
}
}
private:
ObjectPool<Arena, int> arenas_pool; /// Used only for complex functions
};

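arenas_pool relies on ObjectPool semantics along these lines: get(key, factory) hands out an object for the key, building one with the factory when none is free, and the returned smart pointer gives it back on destruction. A simplified sketch under those assumptions (not the real DB::ObjectPool):

#include <map>
#include <memory>
#include <mutex>
#include <stack>

template <typename T, typename Key>
class SimpleObjectPool
{
    std::map<Key, std::stack<std::unique_ptr<T>>> free_objects;
    std::mutex mutex;

public:
    struct Returner
    {
        SimpleObjectPool * pool;
        Key key;

        void operator()(T * object) const
        {
            std::lock_guard<std::mutex> lock(pool->mutex);
            pool->free_objects[key].emplace(object);   // put the object back
        }
    };

    using Pointer = std::unique_ptr<T, Returner>;

    /// Return a pooled object for `key`, constructing one with `factory` if none is free.
    template <typename Factory>
    Pointer get(const Key & key, Factory && factory)
    {
        std::lock_guard<std::mutex> lock(mutex);
        auto & objects = free_objects[key];
        T * object;
        if (objects.empty())
            object = factory();
        else
        {
            object = objects.top().release();
            objects.pop();
        }
        return Pointer(object, Returner{this, key});
    }
};

// Usage matching the diff: pool.get(0, []{ return new Arena(); });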
View File

@ -1,3 +1,4 @@
#pragma once
#include <map>
#include <memory>
#include <stack>

View File

@ -17,6 +17,7 @@
#include <DB/Core/StringRef.h>
#include <DB/Common/Exception.h>
#include <DB/Common/StringUtils.h>
#include <DB/Common/Arena.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/VarInt.h>
@ -127,6 +128,18 @@ inline void readStringBinary(std::string & s, ReadBuffer & buf, size_t MAX_STRIN
}
inline StringRef readStringBinaryInto(Arena & arena, ReadBuffer & buf)
{
size_t size = 0;
readVarUInt(size, buf);
char * data = arena.alloc(size);
buf.readStrict(data, size);
return StringRef(data, size);
}
template <typename T>
void readVectorBinary(std::vector<T> & v, ReadBuffer & buf, size_t MAX_VECTOR_SIZE = DEFAULT_MAX_STRING_SIZE)
{

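A hypothetical caller of readStringBinaryInto, to show the intent: the bytes land in the arena, so the returned StringRef shares the arena's lifetime instead of owning a std::string copy (the input buffer here is assumed):

Arena arena;
ReadBufferFromString buf(serialized_payload);        // `serialized_payload` is assumed input
StringRef value = readStringBinaryInto(arena, buf);
// `value.data` points into `arena`; no separate allocation or copy-out is made.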
View File

@ -129,8 +129,8 @@ struct AggregationMethodOneNumber
};
/// Get an AggregateDataPtr from a value in the hash table.
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
/** Place additional data, if necessary, when a new key is inserted into the hash table.
*/
@ -199,8 +199,8 @@ struct AggregationMethodString
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
{
@ -260,8 +260,8 @@ struct AggregationMethodFixedString
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
{
@ -314,8 +314,8 @@ struct AggregationMethodKeysFixed
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
{
@ -373,8 +373,8 @@ struct AggregationMethodConcat
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
{
@ -451,8 +451,8 @@ struct AggregationMethodSerialized
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
{
@ -510,8 +510,8 @@ struct AggregationMethodHashed
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return value.second; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value.second; }
static AggregateDataPtr & getAggregateData(Mapped & value) { return value.second; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value.second; }
static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
{
@ -569,23 +569,23 @@ struct AggregatedDataVariants : private boost::noncopyable
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>> key32;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>> key64;
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKey>> key_string;
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKey>> key_fixed_string;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128>> keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256>> keys256;
std::unique_ptr<AggregationMethodHashed<AggregatedDataHashed>> hashed;
std::unique_ptr<AggregationMethodConcat<AggregatedDataWithStringKey>> concat;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKey>> serialized;
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKey>> key_string;
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKey>> key_fixed_string;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128>> keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256>> keys256;
std::unique_ptr<AggregationMethodHashed<AggregatedDataHashed>> hashed;
std::unique_ptr<AggregationMethodConcat<AggregatedDataWithStringKey>> concat;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKey>> serialized;
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>> key32_two_level;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>> key64_two_level;
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKeyTwoLevel>> key_string_two_level;
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKeyTwoLevel>> key_fixed_string_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>> keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>> keys256_two_level;
std::unique_ptr<AggregationMethodHashed<AggregatedDataHashedTwoLevel>> hashed_two_level;
std::unique_ptr<AggregationMethodConcat<AggregatedDataWithStringKeyTwoLevel>> concat_two_level;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>> serialized_two_level;
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKeyTwoLevel>> key_fixed_string_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>> keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>> keys256_two_level;
std::unique_ptr<AggregationMethodHashed<AggregatedDataHashedTwoLevel>> hashed_two_level;
std::unique_ptr<AggregationMethodConcat<AggregatedDataWithStringKeyTwoLevel>> concat_two_level;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>> serialized_two_level;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyHash64>> key64_hash64;
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKeyHash64>> key_string_hash64;
@ -1059,7 +1059,8 @@ protected:
void executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const;
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const;
template <typename Method>
void writeToTemporaryFileImpl(
@ -1099,27 +1100,31 @@ public:
void executeSpecializedWithoutKey(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateColumns & aggregate_columns) const;
AggregateColumns & aggregate_columns,
Arena * arena) const;
protected:
/// Merge data from hash table src into dst.
template <typename Method, typename Table>
void mergeDataImpl(
Table & table_dst,
Table & table_src) const;
Table & table_src,
Arena * arena) const;
/// Merge data from hash table src into dst, but only for keys that already exist in dst. Otherwise, merge the data into overflows.
template <typename Method, typename Table>
void mergeDataNoMoreKeysImpl(
Table & table_dst,
AggregatedDataWithoutKey & overflows,
Table & table_src) const;
Table & table_src,
Arena * arena) const;
/// The same, but ignores the remaining keys.
template <typename Method, typename Table>
void mergeDataOnlyExistingKeysImpl(
Table & table_dst,
Table & table_src) const;
Table & table_src,
Arena * arena) const;
void mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
@ -1204,7 +1209,7 @@ protected:
template <typename Method>
void mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket) const;
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const;
template <typename Method>
void convertBlockToTwoLevelImpl(

View File

@ -225,7 +225,10 @@ struct Settings
M(SettingBool, add_http_cors_header, false) \
\
/** Skip columns with unknown names from input data (it works for JSONEachRow and TSKV formats). */ \
M(SettingBool, input_format_skip_unknown_fields, false)
M(SettingBool, input_format_skip_unknown_fields, false) \
\
/** Controls quoting of 64-bit integers in JSON output format. */ \
M(SettingBool, output_format_json_quote_64bit_integers, true)
/// All possible limits on query execution.
Limits limits;
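Each M(...) entry declares one member of Settings together with its by-name get/set plumbing; stripped of that plumbing, the two entries above expand to roughly this sketch:

struct SettingsSketch
{
    SettingBool input_format_skip_unknown_fields {false};
    SettingBool output_format_json_quote_64bit_integers {true};
};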

View File

@ -74,11 +74,12 @@ struct AggregateFunctionsUpdater
const Sizes & offsets_of_aggregate_states_,
Aggregator::AggregateColumns & aggregate_columns_,
AggregateDataPtr & value_,
size_t row_num_)
size_t row_num_,
Arena * arena_)
: aggregate_functions(aggregate_functions_),
offsets_of_aggregate_states(offsets_of_aggregate_states_),
aggregate_columns(aggregate_columns_),
value(value_), row_num(row_num_)
value(value_), row_num(row_num_), arena(arena_)
{
}
@ -90,6 +91,7 @@ struct AggregateFunctionsUpdater
Aggregator::AggregateColumns & aggregate_columns;
AggregateDataPtr & value;
size_t row_num;
Arena * arena;
};
template <typename AggregateFunction, size_t column_num>
@ -98,7 +100,7 @@ void AggregateFunctionsUpdater::operator()()
static_cast<AggregateFunction *>(aggregate_functions[column_num])->add(
value + offsets_of_aggregate_states[column_num],
&aggregate_columns[column_num][0],
row_num);
row_num, arena);
}
struct AggregateFunctionsCreator
@ -205,7 +207,7 @@ void NO_INLINE Aggregator::executeSpecializedCase(
/// Add values to the aggregate functions.
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, value, i));
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, value, i, aggregates_pool));
method.onExistingKey(key, keys, *aggregates_pool);
continue;
@ -254,7 +256,7 @@ void NO_INLINE Aggregator::executeSpecializedCase(
/// Add values to the aggregate functions.
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, value, i));
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, value, i, aggregates_pool));
}
}
@ -264,7 +266,8 @@ template <typename AggregateFunctionsList>
void NO_INLINE Aggregator::executeSpecializedWithoutKey(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateColumns & aggregate_columns) const
AggregateColumns & aggregate_columns,
Arena * arena) const
{
/// Optimization for the case when the only aggregate function is count.
AggregateFunctionCount * agg_count = params.aggregates_size == 1
@ -278,7 +281,7 @@ void NO_INLINE Aggregator::executeSpecializedWithoutKey(
for (size_t i = 0; i < rows; ++i)
{
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, res, i));
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, res, i, arena));
}
}
}

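AggregateFunctionsList::forEach instantiates the updater's templated operator() for every function in the list, so each add() is resolved at compile time rather than through a virtual call. A sketch of the mechanism, simplified to a functor templated on the type alone (the real updater is also indexed by column number):

#include <utility>

template <typename... Types>
struct TypeListSketch;

template <>
struct TypeListSketch<>
{
    template <typename F>
    static void forEach(F &&) {}    // end of recursion
};

template <typename Head, typename... Tail>
struct TypeListSketch<Head, Tail...>
{
    template <typename F>
    static void forEach(F && f)
    {
        f.template operator()<Head>();                    // statically dispatched call
        TypeListSketch<Tail...>::forEach(std::forward<F>(f));
    }
};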
View File

@ -54,7 +54,7 @@ public:
{
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
}
@ -63,7 +63,7 @@ public:
writeBinary(UInt8(0), buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf) const override
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
UInt8 tmp;
readBinary(tmp, buf);

View File

@ -8,6 +8,28 @@ namespace DB
namespace
{
static IAggregateFunction * createWithExtraTypes(const IDataType & argument_type)
{
if (typeid_cast<const DataTypeDateTime *>(&argument_type)) return new AggregateFunctionGroupUniqArray<DataTypeDateTime::FieldType>;
else if (typeid_cast<const DataTypeDate *>(&argument_type)) return new AggregateFunctionGroupUniqArray<DataTypeDate::FieldType>;
else
{
/// Check whether we can use the plain version of AggreagteFunctionGroupUniqArrayGeneric
if (typeid_cast<const DataTypeString*>(&argument_type) || typeid_cast<const DataTypeFixedString*>(&argument_type))
return new AggreagteFunctionGroupUniqArrayGeneric<true>;
auto * array_type = typeid_cast<const DataTypeArray *>(&argument_type);
if (array_type)
{
auto nested_type = array_type->getNestedType();
if (nested_type->isNumeric() || typeid_cast<DataTypeFixedString *>(nested_type.get()))
return new AggreagteFunctionGroupUniqArrayGeneric<true>;
}
return new AggreagteFunctionGroupUniqArrayGeneric<false>;
}
}
AggregateFunctionPtr createAggregateFunctionGroupUniqArray(const std::string & name, const DataTypes & argument_types)
{
if (argument_types.size() != 1)
@ -16,6 +38,9 @@ AggregateFunctionPtr createAggregateFunctionGroupUniqArray(const std::string & n
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionGroupUniqArray>(*argument_types[0]));
if (!res)
res = AggregateFunctionPtr(createWithExtraTypes(*argument_types[0]));
if (!res)
throw Exception("Illegal type " + argument_types[0]->getName() +
" of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

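The <true>/<false> split chooses between comparing serialized bytes directly and full generic serialization, but in both cases the state holds variable-length bytes, and merging must copy the right-hand side's bytes into memory owned by the destination. That is what the Arena parameter threaded through merge() provides. A simplified sketch of the idea (an assumed state, not the real AggreagteFunctionGroupUniqArrayGeneric):

#include <cstring>
#include <set>

// Assumed simplified state: a set of variable-length values, each stored as
// arena-owned bytes referenced by a StringRef.
struct UniqArrayGenericStateSketch
{
    std::set<StringRef> values;

    void merge(const UniqArrayGenericStateSketch & rhs, Arena * arena)
    {
        for (const StringRef & s : rhs.values)
        {
            if (values.count(s))
                continue;                            // already present, nothing to copy
            char * data = arena->alloc(s.size);      // bytes must outlive table_src
            std::memcpy(data, s.data, s.size);
            values.insert(StringRef(data, s.size));
        }
    }
};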
View File

@ -26,8 +26,10 @@
#include <DB/IO/ReadBufferFromFileDescriptor.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/Operators.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
@ -38,8 +40,8 @@
#include "InterruptListener.h"
/** A tool for measuring ClickHouse performance
* when executing queries with a fixed number of simultaneous queries.
/** A tool for evaluating ClickHouse performance.
* The tool emulates a case with fixed amount of simultaneously executing queries.
*/
namespace DB
@ -58,12 +60,13 @@ public:
Benchmark(unsigned concurrency_, double delay_,
const String & host_, UInt16 port_, const String & default_database_,
const String & user_, const String & password_, const String & stage,
bool randomize_,
const Settings & settings_)
: concurrency(concurrency_), delay(delay_), queue(concurrency),
bool randomize_, size_t max_iterations_, double max_time_,
const String & json_path_, const Settings & settings_)
:
concurrency(concurrency_), delay(delay_), queue(concurrency),
connections(concurrency, host_, port_, default_database_, user_, password_),
randomize(randomize_),
settings(settings_), pool(concurrency)
randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
json_path(json_path_), settings(settings_), pool(concurrency)
{
std::cerr << std::fixed << std::setprecision(3);
@ -76,6 +79,11 @@ public:
else
throw Exception("Unknown query processing stage: " + stage, ErrorCodes::BAD_ARGUMENTS);
if (!json_path.empty() && Poco::File(json_path).exists()) /// Clear file with previous results
{
Poco::File(json_path).remove();
}
readQueries();
run();
}
@ -94,6 +102,9 @@ private:
ConnectionPool connections;
bool randomize;
size_t max_iterations;
double max_time;
String json_path;
Settings settings;
QueryProcessingStage::Enum query_processing_stage;
@ -183,18 +194,13 @@ private:
Stopwatch watch;
/// In a loop, put all the queries into the queue.
for (size_t i = 0; !interrupt_listener.check(); ++i)
for (size_t i = 0; !max_iterations || i < max_iterations; ++i)
{
if (i >= queries.size())
i = 0;
size_t query_index = randomize
? distribution(generator)
: i;
size_t query_index = randomize ? distribution(generator) : i % queries.size();
queue.push(queries[query_index]);
if (watch.elapsedSeconds() > delay)
if (delay > 0 && watch.elapsedSeconds() > delay)
{
auto total_queries = 0;
{
@ -206,6 +212,18 @@ private:
report(info_per_interval);
watch.restart();
}
if (max_time > 0 && info_total.watch.elapsedSeconds() >= max_time)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
break;
}
if (interrupt_listener.check())
{
std::cout << "Stopping launch of queries. SIGINT recieved.\n";
break;
}
}
/// Ask the threads to finish.
@ -214,6 +232,9 @@ private:
pool.wait();
info_total.watch.stop();
if (!json_path.empty())
reportJSON(info_total, json_path);
printNumberOfQueriesExecuted(info_total.queries);
report(info_total);
}
@ -320,17 +341,67 @@ private:
<< "result MiB/s: " << (info.result_bytes / seconds / 1048576) << "."
<< "\n";
for (size_t percent = 0; percent <= 90; percent += 10)
auto print_percentile = [&](double percent)
{
std::cerr << percent << "%\t" << info.sampler.quantileInterpolated(percent / 100.0) << " sec." << std::endl;
};
std::cerr << "95%\t" << info.sampler.quantileInterpolated(0.95) << " sec.\n";
std::cerr << "99%\t" << info.sampler.quantileInterpolated(0.99) << " sec.\n";
std::cerr << "99.9%\t" << info.sampler.quantileInterpolated(0.999) << " sec.\n";
std::cerr << "99.99%\t" << info.sampler.quantileInterpolated(0.9999) << " sec.\n";
std::cerr << "100%\t" << info.sampler.quantileInterpolated(1) << " sec.\n";
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(percent);
print_percentile(95);
print_percentile(99);
print_percentile(99.9);
print_percentile(99.99);
info.clear();
}
void reportJSON(Stats & info, const std::string & filename)
{
WriteBufferFromFile json_out(filename);
std::lock_guard<std::mutex> lock(mutex);
auto print_key_value = [&](auto key, auto value, bool with_comma = true)
{
json_out << double_quote << key << ": " << value << (with_comma ? ",\n" : "\n");
};
auto print_percentile = [&](auto percent, bool with_comma = true)
{
json_out << "\"" << percent << "\"" << ": " << info.sampler.quantileInterpolated(percent / 100.0) << (with_comma ? ",\n" : "\n");
};
json_out << "{\n";
json_out << double_quote << "statistics" << ": {\n";
double seconds = info.watch.elapsedSeconds();
print_key_value("QPS", info.queries / seconds);
print_key_value("RPS", info.queries / seconds);
print_key_value("MiBPS", info.queries / seconds);
print_key_value("RPS_result", info.queries / seconds);
print_key_value("MiBPS_result", info.queries / seconds);
print_key_value("num_queries", info.queries / seconds, false);
json_out << "},\n";
json_out << double_quote << "query_time_percentiles" << ": {\n";
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(percent);
print_percentile(95);
print_percentile(99);
print_percentile(99.9);
print_percentile(99.99, false);
json_out << "}\n";
json_out << "}\n";
}
};
}
@ -342,18 +413,24 @@ int main(int argc, char ** argv)
try
{
using boost::program_options::value;
boost::program_options::options_description desc("Allowed options");
desc.add_options()
("help", "produce help message")
("concurrency,c", boost::program_options::value<unsigned>()->default_value(1), "number of parallel queries")
("delay,d", boost::program_options::value<double>()->default_value(1), "delay between reports in seconds")
("host,h", boost::program_options::value<std::string>()->default_value("localhost"), "")
("port", boost::program_options::value<UInt16>()->default_value(9000), "")
("user", boost::program_options::value<std::string>()->default_value("default"), "")
("password", boost::program_options::value<std::string>()->default_value(""), "")
("database", boost::program_options::value<std::string>()->default_value("default"), "")
("stage", boost::program_options::value<std::string>()->default_value("complete"), "request query processing up to specified stage")
("randomize,r", boost::program_options::value<bool>()->default_value(false), "randomize order of execution")
("help", "produce help message")
("concurrency,c", value<unsigned>()->default_value(1), "number of parallel queries")
("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
("stage", value<std::string>()->default_value("complete"), "request query processing up to specified stage")
("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed")
("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
("randomize,r", value<bool>()->default_value(false), "randomize order of execution")
("json", value<std::string>()->default_value(""), "write final report to specified file in JSON format")
("host,h", value<std::string>()->default_value("localhost"), "")
("port", value<UInt16>()->default_value(9000), "")
("user", value<std::string>()->default_value("default"), "")
("password", value<std::string>()->default_value(""), "")
("database", value<std::string>()->default_value("default"), "")
#define DECLARE_SETTING(TYPE, NAME, DEFAULT) (#NAME, boost::program_options::value<std::string> (), "Settings.h")
#define DECLARE_LIMIT(TYPE, NAME, DEFAULT) (#NAME, boost::program_options::value<std::string> (), "Limits.h")
APPLY_FOR_SETTINGS(DECLARE_SETTING)
@ -392,6 +469,9 @@ int main(int argc, char ** argv)
options["password"].as<std::string>(),
options["stage"].as<std::string>(),
options["randomize"].as<bool>(),
options["iterations"].as<size_t>(),
options["timelimit"].as<double>(),
options["json"].as<std::string>(),
settings);
}
catch (const Exception & e)

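The per-metric values written by reportJSON assume the same Stats counters that the plain-text report uses; roughly this shape (a sketch of the assumed fields, not the full struct):

struct StatsSketch
{
    size_t queries = 0;                           /// queries completed so far
    size_t read_rows = 0, read_bytes = 0;         /// consumed by the server
    size_t result_rows = 0, result_bytes = 0;     /// returned to the client
    Stopwatch watch;                              /// total elapsed time
    /// plus the `sampler` of per-query latencies behind quantileInterpolated()
};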
View File

@ -135,12 +135,13 @@ static BlockOutputStreamPtr getOutputImpl(const String & name, WriteBuffer & buf
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<ValuesRowOutputStream>(buf));
else if (name == "JSON")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONRowOutputStream>(buf, sample,
context.getSettingsRef().output_format_write_statistics));
context.getSettingsRef().output_format_write_statistics, context.getSettingsRef().output_format_json_quote_64bit_integers));
else if (name == "JSONCompact")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONCompactRowOutputStream>(buf, sample,
context.getSettingsRef().output_format_write_statistics));
context.getSettingsRef().output_format_write_statistics, context.getSettingsRef().output_format_json_quote_64bit_integers));
else if (name == "JSONEachRow")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONEachRowRowOutputStream>(buf, sample));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONEachRowRowOutputStream>(buf, sample,
context.getSettingsRef().output_format_json_quote_64bit_integers));
else if (name == "XML")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<XMLRowOutputStream>(buf, sample,
context.getSettingsRef().output_format_write_statistics));

View File

@ -223,7 +223,7 @@ template <class TSortCursor>
void GraphiteRollupSortedBlockInputStream::accumulateRow(TSortCursor & cursor)
{
if (current_pattern)
current_pattern->function->add(place_for_aggregate_state.data(), &cursor->all_columns[value_column_num], cursor->pos);
current_pattern->function->add(place_for_aggregate_state.data(), &cursor->all_columns[value_column_num], cursor->pos, nullptr);
}
}

View File

@ -6,15 +6,15 @@
namespace DB
{
JSONCompactRowOutputStream::JSONCompactRowOutputStream(WriteBuffer & ostr_, const Block & sample_, bool write_statistics_)
: JSONRowOutputStream(ostr_, sample_, write_statistics_)
JSONCompactRowOutputStream::JSONCompactRowOutputStream(WriteBuffer & ostr_, const Block & sample_, bool write_statistics_, bool force_quoting_64bit_integers_)
: JSONRowOutputStream(ostr_, sample_, write_statistics_, force_quoting_64bit_integers_)
{
}
void JSONCompactRowOutputStream::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{
type.serializeTextJSON(column, row_num, *ostr);
type.serializeTextJSON(column, row_num, *ostr, force_quoting_64bit_integers);
++field_number;
}
@ -56,7 +56,7 @@ void JSONCompactRowOutputStream::writeTotals()
writeChar(',', *ostr);
const ColumnWithTypeAndName & column = totals.getByPosition(i);
column.type->serializeTextJSON(*column.column.get(), 0, *ostr);
column.type->serializeTextJSON(*column.column.get(), 0, *ostr, force_quoting_64bit_integers);
}
writeChar(']', *ostr);
@ -64,7 +64,7 @@ void JSONCompactRowOutputStream::writeTotals()
}
static void writeExtremesElement(const char * title, const Block & extremes, size_t row_num, WriteBuffer & ostr)
static void writeExtremesElement(const char * title, const Block & extremes, size_t row_num, WriteBuffer & ostr, bool force_quoting_64bit_integers)
{
writeCString("\t\t\"", ostr);
writeCString(title, ostr);
@ -77,7 +77,7 @@ static void writeExtremesElement(const char * title, const Block & extremes, siz
writeChar(',', ostr);
const ColumnWithTypeAndName & column = extremes.getByPosition(i);
column.type->serializeTextJSON(*column.column.get(), row_num, ostr);
column.type->serializeTextJSON(*column.column.get(), row_num, ostr, force_quoting_64bit_integers);
}
writeChar(']', ostr);
@ -92,9 +92,9 @@ void JSONCompactRowOutputStream::writeExtremes()
writeCString("\t\"extremes\":\n", *ostr);
writeCString("\t{\n", *ostr);
writeExtremesElement("min", extremes, 0, *ostr);
writeExtremesElement("min", extremes, 0, *ostr, force_quoting_64bit_integers);
writeCString(",\n", *ostr);
writeExtremesElement("max", extremes, 1, *ostr);
writeExtremesElement("max", extremes, 1, *ostr, force_quoting_64bit_integers);
writeChar('\n', *ostr);
writeCString("\t}", *ostr);

View File

@ -7,8 +7,8 @@ namespace DB
{
JSONEachRowRowOutputStream::JSONEachRowRowOutputStream(WriteBuffer & ostr_, const Block & sample)
: ostr(ostr_)
JSONEachRowRowOutputStream::JSONEachRowRowOutputStream(WriteBuffer & ostr_, const Block & sample, bool force_quoting_)
: ostr(ostr_), force_quoting(force_quoting_)
{
size_t columns = sample.columns();
fields.resize(columns);
@ -25,7 +25,7 @@ void JSONEachRowRowOutputStream::writeField(const IColumn & column, const IDataT
{
writeString(fields[field_number], ostr);
writeChar(':', ostr);
type.serializeTextJSON(column, row_num, ostr);
type.serializeTextJSON(column, row_num, ostr, force_quoting);
++field_number;
}

View File

@ -7,8 +7,8 @@ namespace DB
{
JSONRowOutputStream::JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_, bool write_statistics_)
: dst_ostr(ostr_), write_statistics(write_statistics_)
JSONRowOutputStream::JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_, bool write_statistics_, bool force_quoting_)
: dst_ostr(ostr_), write_statistics(write_statistics_), force_quoting(force_quoting_)
{
NamesAndTypesList columns(sample_.getColumnsList());
fields.assign(columns.begin(), columns.end());
@ -72,7 +72,7 @@ void JSONRowOutputStream::writeField(const IColumn & column, const IDataType & t
writeCString("\t\t\t", *ostr);
writeString(fields[field_number].name, *ostr);
writeCString(": ", *ostr);
type.serializeTextJSON(column, row_num, *ostr);
type.serializeTextJSON(column, row_num, *ostr, force_quoting);
++field_number;
}
@ -152,7 +152,7 @@ void JSONRowOutputStream::writeTotals()
writeCString("\t\t", *ostr);
writeJSONString(column.name, *ostr);
writeCString(": ", *ostr);
column.type->serializeTextJSON(*column.column.get(), 0, *ostr);
column.type->serializeTextJSON(*column.column.get(), 0, *ostr, force_quoting);
}
writeChar('\n', *ostr);
@ -161,7 +161,7 @@ void JSONRowOutputStream::writeTotals()
}
static void writeExtremesElement(const char * title, const Block & extremes, size_t row_num, WriteBuffer & ostr)
static void writeExtremesElement(const char * title, const Block & extremes, size_t row_num, WriteBuffer & ostr, bool force_quoting)
{
writeCString("\t\t\"", ostr);
writeCString(title, ostr);
@ -179,7 +179,7 @@ static void writeExtremesElement(const char * title, const Block & extremes, siz
writeCString("\t\t\t", ostr);
writeJSONString(column.name, ostr);
writeCString(": ", ostr);
column.type->serializeTextJSON(*column.column.get(), row_num, ostr);
column.type->serializeTextJSON(*column.column.get(), row_num, ostr, force_quoting);
}
writeChar('\n', ostr);
@ -195,9 +195,9 @@ void JSONRowOutputStream::writeExtremes()
writeCString("\t\"extremes\":\n", *ostr);
writeCString("\t{\n", *ostr);
writeExtremesElement("min", extremes, 0, *ostr);
writeExtremesElement("min", extremes, 0, *ostr, force_quoting);
writeCString(",\n", *ostr);
writeExtremesElement("max", extremes, 1, *ostr);
writeExtremesElement("max", extremes, 1, *ostr, force_quoting);
writeChar('\n', *ostr);
writeCString("\t}", *ostr);

View File

@ -208,12 +208,12 @@ void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, co
{
for (size_t j = 0; j < size; ++j)
if ((*filter)[j])
function->merge(data, vec[j]);
function->merge(data, vec[j], arena.get());
}
else
{
for (size_t j = 0; j < size; ++j)
function->merge(data, vec[j]);
function->merge(data, vec[j], arena.get());
}
}
}

View File

@ -62,13 +62,14 @@ void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer &
{
ColumnAggregateFunction & column_concrete = static_cast<ColumnAggregateFunction &>(column);
Arena & arena = column_concrete.createOrGetArena();
size_t size_of_state = function->sizeOfData();
AggregateDataPtr place = column_concrete.createOrGetArena().alloc(size_of_state);
AggregateDataPtr place = arena.alloc(size_of_state);
function->create(place);
try
{
function->deserialize(place, istr);
function->deserialize(place, istr, &arena);
}
catch (...)
{
@ -116,7 +117,7 @@ void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer &
try
{
function->deserialize(place, istr);
function->deserialize(place, istr, &arena);
}
catch (...)
{
@ -140,15 +141,16 @@ static void deserializeFromString(const AggregateFunctionPtr & function, IColumn
{
ColumnAggregateFunction & column_concrete = static_cast<ColumnAggregateFunction &>(column);
Arena & arena = column_concrete.createOrGetArena();
size_t size_of_state = function->sizeOfData();
AggregateDataPtr place = column_concrete.createOrGetArena().alloc(size_of_state);
AggregateDataPtr place = arena.alloc(size_of_state);
function->create(place);
try
{
ReadBufferFromString istr(s);
function->deserialize(place, istr);
function->deserialize(place, istr, &arena);
}
catch (...)
{
@ -193,7 +195,7 @@ void DataTypeAggregateFunction::deserializeTextQuoted(IColumn & column, ReadBuff
}
void DataTypeAggregateFunction::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
void DataTypeAggregateFunction::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const
{
writeJSONString(serializeToString(function, column, row_num), ostr);
}

View File

@ -288,7 +288,7 @@ void DataTypeArray::deserializeTextQuoted(IColumn & column, ReadBuffer & istr) c
}
void DataTypeArray::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
void DataTypeArray::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool force_quoting_64bit_integers) const
{
const ColumnArray & column_array = static_cast<const ColumnArray &>(column);
const ColumnArray::Offsets_t & offsets = column_array.getOffsets();
@ -303,7 +303,7 @@ void DataTypeArray::serializeTextJSON(const IColumn & column, size_t row_num, Wr
{
if (i != offset)
writeChar(',', ostr);
nested->serializeTextJSON(nested_column, i, ostr);
nested->serializeTextJSON(nested_column, i, ostr, force_quoting_64bit_integers);
}
writeChar(']', ostr);
}

View File

@ -170,7 +170,7 @@ void DataTypeEnum<Type>::deserializeTextQuoted(IColumn & column, ReadBuffer & is
}
template <typename Type>
void DataTypeEnum<Type>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
void DataTypeEnum<Type>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const
{
writeJSONString(getNameForValue(static_cast<const ColumnType &>(column).getData()[row_num]), ostr);
}

View File

@ -158,7 +158,7 @@ void DataTypeFixedString::deserializeTextQuoted(IColumn & column, ReadBuffer & i
}
void DataTypeFixedString::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
void DataTypeFixedString::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const
{
const char * pos = reinterpret_cast<const char *>(&static_cast<const ColumnFixedString &>(column).getChars()[n * row_num]);
writeJSONString(pos, pos + n, ostr);

View File

@ -266,7 +266,7 @@ void DataTypeString::deserializeTextQuoted(IColumn & column, ReadBuffer & istr)
}
void DataTypeString::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
void DataTypeString::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool) const
{
writeJSONString(static_cast<const ColumnString &>(column).getDataAt(row_num), ostr);
}

View File

@ -144,14 +144,14 @@ void DataTypeTuple::deserializeTextQuoted(IColumn & column, ReadBuffer & istr) c
deserializeText(column, istr);
}
void DataTypeTuple::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
void DataTypeTuple::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, bool force_quoting_64bit_integers) const
{
writeChar('[', ostr);
for (const auto i : ext::range(0, ext::size(elems)))
{
if (i != 0)
writeChar(',', ostr);
elems[i]->serializeTextJSON(extractElementColumn(column, i), row_num, ostr);
elems[i]->serializeTextJSON(extractElementColumn(column, i), row_num, ostr, force_quoting_64bit_integers);
}
writeChar(']', ostr);
}

View File

@ -284,17 +284,18 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
code <<
"template void Aggregator::executeSpecializedWithoutKey<\n"
"\t" << "TypeList<" << aggregate_functions_typenames << ">>(\n"
"\tAggregatedDataWithoutKey &, size_t, AggregateColumns &) const;\n"
"\tAggregatedDataWithoutKey &, size_t, AggregateColumns &, Arena *) const;\n"
"\n"
"static void wrapper(\n"
"\tconst Aggregator & aggregator,\n"
"\tAggregatedDataWithoutKey & method,\n"
"\tsize_t rows,\n"
"\tAggregator::AggregateColumns & aggregate_columns)\n"
"\tAggregator::AggregateColumns & aggregate_columns,\n"
"\tArena * arena)\n"
"{\n"
"\taggregator.executeSpecializedWithoutKey<\n"
"\t\tTypeList<" << aggregate_functions_typenames << ">>(\n"
"\t\tmethod, rows, aggregate_columns);\n"
"\t\tmethod, rows, aggregate_columns, arena);\n"
"}\n"
"\n"
"void * getPtr() __attribute__((__visibility__(\"default\")));\n"
@ -537,7 +538,7 @@ void NO_INLINE Aggregator::executeImplCase(
/// Add values to the aggregate functions.
AggregateDataPtr value = Method::getAggregateData(it->second);
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
(*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i);
(*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
method.onExistingKey(key, keys, *aggregates_pool);
continue;
@ -585,7 +586,7 @@ void NO_INLINE Aggregator::executeImplCase(
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
(*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i);
(*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
}
}
@ -596,7 +597,8 @@ void NO_INLINE Aggregator::executeImplCase(
void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const
{
/// Optimization for the case when the only aggregate function is count.
AggregateFunctionCount * agg_count = params.aggregates_size == 1
@ -611,7 +613,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
{
/// Add the values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
(*inst->func)(inst->that, res + inst->state_offset, inst->arguments, i);
(*inst->func)(inst->that, res + inst->state_offset, inst->arguments, i, arena);
}
}
}
@ -708,11 +710,11 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
if (compiled_data->compiled_method_ptr)
{
reinterpret_cast<
void (*)(const Aggregator &, AggregatedDataWithoutKey &, size_t, AggregateColumns &)>
(compiled_data->compiled_method_ptr)(*this, result.without_key, rows, aggregate_columns);
void (*)(const Aggregator &, AggregatedDataWithoutKey &, size_t, AggregateColumns &, Arena *)>
(compiled_data->compiled_method_ptr)(*this, result.without_key, rows, aggregate_columns, result.aggregates_pool);
}
else
executeWithoutKeyImpl(result.without_key, rows, &aggregate_functions_instructions[0]);
executeWithoutKeyImpl(result.without_key, rows, &aggregate_functions_instructions[0], result.aggregates_pool);
}
else
{
@ -1042,6 +1044,7 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
AggregateColumnsData & aggregate_columns,
const Sizes & key_sizes) const
{
for (auto & value : data)
{
method.insertKeyIntoColumns(value, key_columns, params.keys_size, key_sizes);
@ -1060,7 +1063,7 @@ Block Aggregator::prepareBlockAndFill(
AggregatedDataVariants & data_variants,
bool final,
size_t rows,
Filler && filler) const
Filler && filler) const
{
Block res = sample.cloneEmpty();
@ -1337,7 +1340,8 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataImpl(
Table & table_dst,
Table & table_src) const
Table & table_src,
Arena * arena) const
{
for (auto it = table_src.begin(); it != table_src.end(); ++it)
{
@ -1350,7 +1354,8 @@ void NO_INLINE Aggregator::mergeDataImpl(
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
Method::getAggregateData(res_it->second) + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
arena);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(
@ -1372,7 +1377,8 @@ template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl(
Table & table_dst,
AggregatedDataWithoutKey & overflows,
Table & table_src) const
Table & table_src,
Arena * arena) const
{
for (auto it = table_src.begin(); it != table_src.end(); ++it)
{
@ -1385,7 +1391,8 @@ void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl(
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
arena);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(
@ -1400,7 +1407,8 @@ void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl(
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
Table & table_dst,
Table & table_src) const
Table & table_src,
Arena * arena) const
{
for (auto it = table_src.begin(); it != table_src.end(); ++it)
{
@ -1414,7 +1422,8 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
arena);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(
@ -1439,7 +1448,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
AggregatedDataWithoutKey & current_data = non_empty_data[i]->without_key;
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(res_data + offsets_of_aggregate_states[i], current_data + offsets_of_aggregate_states[i]);
aggregate_functions[i]->merge(res_data + offsets_of_aggregate_states[i], current_data + offsets_of_aggregate_states[i], res->aggregates_pool);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(current_data + offsets_of_aggregate_states[i]);
@ -1467,16 +1476,19 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
if (!no_more_keys)
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data,
getDataVariant<Method>(current).data);
getDataVariant<Method>(current).data,
res->aggregates_pool);
else if (res->without_key)
mergeDataNoMoreKeysImpl<Method>(
getDataVariant<Method>(*res).data,
res->without_key,
getDataVariant<Method>(current).data);
getDataVariant<Method>(current).data,
res->aggregates_pool);
else
mergeDataOnlyExistingKeysImpl<Method>(
getDataVariant<Method>(*res).data,
getDataVariant<Method>(current).data);
getDataVariant<Method>(current).data,
res->aggregates_pool);
/// current will not destroy the aggregate function states in its destructor
current.aggregator = nullptr;
@ -1486,7 +1498,7 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
template <typename Method>
void NO_INLINE Aggregator::mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket) const
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const
{
/// Merge all aggregation results into the first one.
AggregatedDataVariantsPtr & res = data[0];
@ -1496,7 +1508,8 @@ void NO_INLINE Aggregator::mergeBucketImpl(
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket]);
getDataVariant<Method>(current).data.impls[bucket],
arena);
}
}
@ -1513,7 +1526,16 @@ public:
* all of which are either single-level or two-level.
*/
MergingAndConvertingBlockInputStream(const Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_)
: aggregator(aggregator_), data(data_), final(final_), threads(threads_) {}
: aggregator(aggregator_), data(data_), final(final_), threads(threads_)
{
/// We need at least one arena per thread in the first data item
if (!data.empty() && threads > data[0]->aggregates_pools.size())
{
Arenas & first_pool = data[0]->aggregates_pools;
for (size_t j = first_pool.size(); j < threads; j++)
first_pool.emplace_back(std::make_shared<Arena>());
}
}
String getName() const override { return "MergingAndConverting"; }
@ -1655,17 +1677,21 @@ private:
try
{
/// TODO Perhaps support no_more_keys
/// TODO: possibly add no_more_keys support
auto & merged_data = *data[0];
auto method = merged_data.type;
Block block;
/// Select Arena to avoid race conditions
size_t thread_number = static_cast<size_t>(bucket_num) % threads;
Arena * arena = merged_data.aggregates_pools.at(thread_number).get();
if (false) {}
#define M(NAME) \
else if (method == AggregatedDataVariants::Type::NAME) \
{ \
aggregator.mergeBucketImpl<decltype(merged_data.NAME)::element_type>(data, bucket_num); \
aggregator.mergeBucketImpl<decltype(merged_data.NAME)::element_type>(data, bucket_num, arena); \
block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \
}
@ -1825,7 +1851,8 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
for (size_t j = 0; j < params.aggregates_size; ++j)
aggregate_functions[j]->merge(
value + offsets_of_aggregate_states[j],
(*aggregate_columns[j])[i]);
(*aggregate_columns[j])[i],
aggregates_pool);
}
/// Release memory early.
@ -1869,7 +1896,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
/// Add the values
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[0]);
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[0], result.aggregates_pool);
/// Release memory early.
block.clear();

View File

@ -59,15 +59,15 @@
0
0
2014-09-30 23:50:00
2014-09-30 23:50:00
2014-09-30 23:50:00
2014-09-30 23:50:00
2014-09-30 23:50:00
2014-09-30 23:00:00
2014-09-30 23:00:00
2014-09-30 23:00:00
2014-09-30 23:00:00
2014-09-30 21:50:00
2014-09-30 20:50:00
2014-10-01 04:50:00
2014-09-30 11:20:00
2014-09-30 23:00:00
2014-09-30 21:00:00
2014-09-30 20:00:00
2014-10-01 04:00:00
2014-09-30 11:00:00
2014
2014
2014
@ -123,16 +123,16 @@
2015 1 1 4
2014 9 30 2
2015 3 15 7
19 30 0 2015-07-15 13:30:00
21 0 0 2014-12-29 00:00:00
19 30 0 2015-07-15 19:30:00
21 0 0 2014-12-28 21:00:00
12 0 0 2015-01-01 12:00:00
21 50 0 2014-09-30 23:50:00
2 30 0 2015-03-15 13:30:00
2015-07-15 13:00:00 2015 24187 2375
2014-12-29 00:00:00 2014 24180 2346
21 50 0 2014-09-30 21:50:00
2 30 0 2015-03-15 02:00:00
2015-07-15 19:00:00 2015 24187 2375
2014-12-28 21:00:00 2014 24180 2346
2015-01-01 12:00:00 2015 24181 2347
2014-09-30 23:00:00 2014 24177 2334
2015-03-15 13:00:00 2015 24183 2357
2014-09-30 21:00:00 2014 24177 2334
2015-03-15 02:00:00 2015 24183 2357
16631 399154 23949270 1436956200
16432 394389 23663340 1419800400
16436 394473 23668380 1420102800
@ -153,16 +153,16 @@
2015 1 1 4
2014 9 1 2
2015 3 15 7
12 30 0 2015-07-15 13:30:00
22 0 0 2014-12-29 00:00:00
10 0 0 2015-01-01 12:00:00
21 50 0 2014-09-30 23:50:00
11 30 0 2015-03-15 13:30:00
2015-07-15 13:00:00 2015 24187 2375
2014-12-29 00:00:00 2014 24180 2346
2015-01-01 12:00:00 2015 24181 2347
2014-09-30 23:00:00 2014 24178 2334
2015-03-15 13:00:00 2015 24183 2357
12 30 0 2015-07-15 02:00:00
22 0 0 2014-12-28 13:30:00
10 0 0 2015-01-01 01:30:00
21 50 0 2014-09-30 11:20:00
11 30 0 2015-03-15 02:00:00
2015-07-15 12:00:00 2015 24187 2375
2014-12-28 22:00:00 2014 24180 2346
2015-01-01 10:00:00 2015 24181 2347
2014-09-30 21:00:00 2014 24178 2334
2015-03-15 11:00:00 2015 24183 2357
16631 399154 23949270 1436956200
16432 394389 23663340 1419800400
16436 394473 23668380 1420102800
@ -183,16 +183,16 @@
2015 3 15 7
2015 3 15 7
2015 3 15 7
19 30 0 2015-03-15 13:30:00
10 30 0 2015-03-15 13:30:00
19 30 0 2015-03-15 19:30:00
10 30 0 2015-03-15 10:30:00
13 30 0 2015-03-15 13:30:00
11 30 0 2015-03-15 13:30:00
2 30 0 2015-03-15 13:30:00
2015-03-15 13:00:00 2015 24183 2357
2015-03-15 13:00:00 2015 24183 2357
2015-03-15 13:00:00 2015 24183 2357
2015-03-15 13:00:00 2015 24183 2357
11 30 0 2015-03-15 11:30:00
2 30 0 2015-03-15 02:00:00
2015-03-15 19:00:00 2015 24183 2357
2015-03-15 10:00:00 2015 24183 2357
2015-03-15 13:00:00 2015 24183 2357
2015-03-15 11:00:00 2015 24183 2357
2015-03-15 02:00:00 2015 24183 2357
16509 396226 23773590 1426415400
16509 396226 23773590 1426415400
16509 396226 23773590 1426415400

View File

@ -1,4 +1,5 @@
clickhouse-client -n --query "DROP TABLE IF EXISTS test.json_noisy; CREATE TABLE test.json_noisy (d1 UInt8, d2 String) ENGINE = Memory"
#!/bin/bash
clickhouse-client -n --query "DROP TABLE IF EXISTS test.json_noisy; CREATE TABLE test.json_noisy (d1 UInt8, d2 String) ENGINE = Memory"
echo '{"d1" : 1, "d2" : "ok"}
{ }
@ -7,4 +8,4 @@ echo '{"d1" : 1, "d2" : "ok"}
{"d2":"ok","t1":[[[]],true, null, false, "1","2", 0.03, 1], "d1":"1", "t2":["1","2"]}' \
| clickhouse-client -n --query "SET input_format_skip_unknown_fields = 1; INSERT INTO test.json_noisy FORMAT JSONEachRow"
clickhouse-client -n --query "SELECT * FROM test.json_noisy; DROP TABLE IF EXISTS test.json_noisy;"
clickhouse-client -n --query "SELECT * FROM test.json_noisy; DROP TABLE IF EXISTS test.json_noisy;"

View File

@ -0,0 +1,20 @@
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000

View File

@ -0,0 +1,10 @@
DROP TABLE IF EXISTS test.group_uniq_str;
CREATE TABLE test.group_uniq_str ENGINE = Memory AS SELECT number % 10 as id, toString(intDiv((number%10000), 10)) as v FROM system.numbers LIMIT 10000000;
INSERT INTO test.group_uniq_str SELECT 2 as id, toString(number % 100) as v FROM system.numbers LIMIT 1000000;
INSERT INTO test.group_uniq_str SELECT 5 as id, toString(number % 100) as v FROM system.numbers LIMIT 10000000;
SELECT length(groupUniqArray(v)) FROM test.group_uniq_str GROUP BY id ORDER BY id;
SELECT length(groupUniqArray(v)) FROM remote('127.0.0.{1,2,3,4}', 'test', 'group_uniq_str') GROUP BY id ORDER BY id;
DROP TABLE IF EXISTS test.group_uniq_str;

View File

@ -0,0 +1,20 @@
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS test.group_uniq_arr_int;
CREATE TABLE test.group_uniq_arr_int ENGINE = Memory AS
SELECT g as id, if(c == 0, [v], if(c == 1, emptyArrayInt64(), [v, v])) as v FROM
(SELECT intDiv(number%1000000, 100) as v, intDiv(number%100, 10) as g, number%10 as c FROM system.numbers WHERE c < 3 LIMIT 10000000);
SELECT length(groupUniqArray(v)) FROM test.group_uniq_arr_int GROUP BY id ORDER BY id;
SELECT length(groupUniqArray(v)) FROM remote('127.0.0.{1,2,3,4}', 'test', 'group_uniq_arr_int') GROUP BY id ORDER BY id;
DROP TABLE IF EXISTS test.group_uniq_arr_int;

View File

@ -0,0 +1,20 @@
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS test.group_uniq_arr_str;
CREATE TABLE test.group_uniq_arr_str ENGINE = Memory AS
SELECT hex(intHash32(g)) as id, if(c == 0, [hex(v)], if(c == 1, emptyArrayString(), [hex(v), hex(v)])) as v FROM
(SELECT intDiv(number%1000000, 100) as v, intDiv(number%100, 10) as g, number%10 as c FROM system.numbers WHERE c < 3 LIMIT 10000000);
SELECT length(groupUniqArray(v)) FROM test.group_uniq_arr_str GROUP BY id ORDER BY id;
SELECT length(groupUniqArray(v)) FROM remote('127.0.0.{1,2,3,4}', 'test', 'group_uniq_arr_str') GROUP BY id ORDER BY id;
DROP TABLE IF EXISTS test.group_uniq_arr_str;

View File

@ -0,0 +1,264 @@
{
"meta":
[
{
"name": "i0",
"type": "Int64"
},
{
"name": "u0",
"type": "UInt64"
},
{
"name": "ip",
"type": "Int64"
},
{
"name": "in",
"type": "Int64"
},
{
"name": "up",
"type": "UInt64"
},
{
"name": "arr",
"type": "Array(Int64)"
},
{
"name": "tuple",
"type": "Tuple(UInt64, UInt64)"
}
],
"data":
[
{
"i0": "0",
"u0": "0",
"ip": "9223372036854775807",
"in": "-9223372036854775808",
"up": "18446744073709551615",
"arr": ["0"],
"tuple": ["0","0"]
}
],
"totals":
{
"i0": "0",
"u0": "0",
"ip": "0",
"in": "0",
"up": "0",
"arr": [],
"tuple": ["0","0"]
},
"extremes":
{
"min":
{
"i0": "0",
"u0": "0",
"ip": "9223372036854775807",
"in": "-9223372036854775808",
"up": "18446744073709551615",
"arr": [],
"tuple": ["0","0"]
},
"max":
{
"i0": "0",
"u0": "0",
"ip": "9223372036854775807",
"in": "-9223372036854775808",
"up": "18446744073709551615",
"arr": [],
"tuple": ["0","0"]
}
},
"rows": 1
}
{
"meta":
[
{
"name": "i0",
"type": "Int64"
},
{
"name": "u0",
"type": "UInt64"
},
{
"name": "ip",
"type": "Int64"
},
{
"name": "in",
"type": "Int64"
},
{
"name": "up",
"type": "UInt64"
},
{
"name": "arr",
"type": "Array(Int64)"
},
{
"name": "tuple",
"type": "Tuple(UInt64, UInt64)"
}
],
"data":
[
["0", "0", "9223372036854775807", "-9223372036854775808", "18446744073709551615", ["0"], ["0","0"]]
],
"totals": ["0","0","0","0","0",[],["0","0"]],
"extremes":
{
"min": ["0","0","9223372036854775807","-9223372036854775808","18446744073709551615",[],["0","0"]],
"max": ["0","0","9223372036854775807","-9223372036854775808","18446744073709551615",[],["0","0"]]
},
"rows": 1
}
{"i0":"0","u0":"0","ip":"9223372036854775807","in":"-9223372036854775808","up":"18446744073709551615","arr":["0"],"tuple":["0","0"]}
{
"meta":
[
{
"name": "i0",
"type": "Int64"
},
{
"name": "u0",
"type": "UInt64"
},
{
"name": "ip",
"type": "Int64"
},
{
"name": "in",
"type": "Int64"
},
{
"name": "up",
"type": "UInt64"
},
{
"name": "arr",
"type": "Array(Int64)"
},
{
"name": "tuple",
"type": "Tuple(UInt64, UInt64)"
}
],
"data":
[
{
"i0": 0,
"u0": 0,
"ip": 9223372036854775807,
"in": -9223372036854775808,
"up": 18446744073709551615,
"arr": [0],
"tuple": [0,0]
}
],
"totals":
{
"i0": 0,
"u0": 0,
"ip": 0,
"in": 0,
"up": 0,
"arr": [],
"tuple": [0,0]
},
"extremes":
{
"min":
{
"i0": 0,
"u0": 0,
"ip": 9223372036854775807,
"in": -9223372036854775808,
"up": 18446744073709551615,
"arr": [],
"tuple": [0,0]
},
"max":
{
"i0": 0,
"u0": 0,
"ip": 9223372036854775807,
"in": -9223372036854775808,
"up": 18446744073709551615,
"arr": [],
"tuple": [0,0]
}
},
"rows": 1
}
{
"meta":
[
{
"name": "i0",
"type": "Int64"
},
{
"name": "u0",
"type": "UInt64"
},
{
"name": "ip",
"type": "Int64"
},
{
"name": "in",
"type": "Int64"
},
{
"name": "up",
"type": "UInt64"
},
{
"name": "arr",
"type": "Array(Int64)"
},
{
"name": "tuple",
"type": "Tuple(UInt64, UInt64)"
}
],
"data":
[
[0, 0, 9223372036854775807, -9223372036854775808, 18446744073709551615, [0], [0,0]]
],
"totals": [0,0,0,0,0,[],[0,0]],
"extremes":
{
"min": [0,0,9223372036854775807,-9223372036854775808,18446744073709551615,[],[0,0]],
"max": [0,0,9223372036854775807,-9223372036854775808,18446744073709551615,[],[0,0]]
},
"rows": 1
}
{"i0":0,"u0":0,"ip":9223372036854775807,"in":-9223372036854775808,"up":18446744073709551615,"arr":[0],"tuple":[0,0]}

View File

@ -0,0 +1,12 @@
SET output_format_write_statistics = 0;
SET extremes = 1;
SET output_format_json_quote_64bit_integers = 1;
SELECT toInt64(0) as i0, toUInt64(0) as u0, toInt64(9223372036854775807) as ip, toInt64(-9223372036854775808) as in, toUInt64(18446744073709551615) as up, [toInt64(0)] as arr, (toUInt64(0), toUInt64(0)) as tuple WITH TOTALS FORMAT JSON;
SELECT toInt64(0) as i0, toUInt64(0) as u0, toInt64(9223372036854775807) as ip, toInt64(-9223372036854775808) as in, toUInt64(18446744073709551615) as up, [toInt64(0)] as arr, (toUInt64(0), toUInt64(0)) as tuple WITH TOTALS FORMAT JSONCompact;
SELECT toInt64(0) as i0, toUInt64(0) as u0, toInt64(9223372036854775807) as ip, toInt64(-9223372036854775808) as in, toUInt64(18446744073709551615) as up, [toInt64(0)] as arr, (toUInt64(0), toUInt64(0)) as tuple WITH TOTALS FORMAT JSONEachRow;
SET output_format_json_quote_64bit_integers = 0;
SELECT toInt64(0) as i0, toUInt64(0) as u0, toInt64(9223372036854775807) as ip, toInt64(-9223372036854775808) as in, toUInt64(18446744073709551615) as up, [toInt64(0)] as arr, (toUInt64(0), toUInt64(0)) as tuple WITH TOTALS FORMAT JSON;
SELECT toInt64(0) as i0, toUInt64(0) as u0, toInt64(9223372036854775807) as ip, toInt64(-9223372036854775808) as in, toUInt64(18446744073709551615) as up, [toInt64(0)] as arr, (toUInt64(0), toUInt64(0)) as tuple WITH TOTALS FORMAT JSONCompact;
SELECT toInt64(0) as i0, toUInt64(0) as u0, toInt64(9223372036854775807) as ip, toInt64(-9223372036854775808) as in, toUInt64(18446744073709551615) as up, [toInt64(0)] as arr, (toUInt64(0), toUInt64(0)) as tuple WITH TOTALS FORMAT JSONEachRow;

View File

@ -6616,12 +6616,17 @@ Allows setting a default sampling coefficient for all SELECT queries.
(For tables that don't support sampling, an exception will be thrown.)
If set to 1, default sampling is not performed.
====input_format_skip_unknown_fields==
==input_format_skip_unknown_fields==
If the parameter is true, the INSERT operation will skip columns with unknown names in the input.
Otherwise, an exception will be generated.
Otherwise, an exception will be generated (this is the default behavior).
The parameter works only for JSONEachRow and TSKV input formats.
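For example, a minimal sketch mirroring the test above (the table %%test.t%% with a single column %%a String%% is assumed for illustration); with the setting enabled, the unknown field %%b%% is silently dropped instead of triggering an exception:
%%
$ echo '{"a":"hello", "b":123}' | clickhouse-client -n --query "SET input_format_skip_unknown_fields = 1; INSERT INTO test.t FORMAT JSONEachRow"
%%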
==output_format_json_quote_64bit_integers==
If the parameter is true (the default), UInt64 and Int64 numbers are printed as quoted strings in all JSON output formats.
Such behavior is compatible with most JavaScript interpreters, which store all numbers as double-precision floating-point values.
Otherwise, they are printed as regular numbers.
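A minimal sketch of the effect; the expected outputs follow the pattern of the JSONEachRow reference lines above:
%%
$ clickhouse-client -n --query "SET output_format_json_quote_64bit_integers = 1; SELECT toUInt64(0) AS u FORMAT JSONEachRow"
{"u":"0"}
$ clickhouse-client -n --query "SET output_format_json_quote_64bit_integers = 0; SELECT toUInt64(0) AS u FORMAT JSONEachRow"
{"u":0}
%%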
==Restrictions on query complexity==

View File

@ -3881,7 +3881,7 @@ Extremes:
}
%%
JSON is compatible with JavaScript. To ensure this, some characters are additionally escaped: the forward slash %%/%% is escaped as %%\/%%; the alternative line breaks %%U+2028%% and %%U+2029%%, which break some browsers, are escaped as <span class="inline-example">\u<i>XXXX</i></span> sequences. ASCII control characters are escaped: backspace, form feed, line feed, carriage return, and horizontal tab as %%\b%%, %%\f%%, %%\n%%, %%\r%%, and %%\t%% respectively, as well as the remaining bytes in the 00-1F range using <span class="inline-example">\u<i>XXXX</i></span> sequences. Invalid UTF-8 sequences are replaced with the replacement character %%�%%, so the output text consists of valid UTF-8 sequences. For compatibility with JavaScript, UInt64 and Int64 numbers are printed in double quotes.
JSON is compatible with JavaScript. To ensure this, some characters are additionally escaped: the forward slash %%/%% is escaped as %%\/%%; the alternative line breaks %%U+2028%% and %%U+2029%%, which break some browsers, are escaped as <span class="inline-example">\u<i>XXXX</i></span> sequences. ASCII control characters are escaped: backspace, form feed, line feed, carriage return, and horizontal tab as %%\b%%, %%\f%%, %%\n%%, %%\r%%, and %%\t%% respectively, as well as the remaining bytes in the 00-1F range using <span class="inline-example">\u<i>XXXX</i></span> sequences. Invalid UTF-8 sequences are replaced with the replacement character %%�%%, so the output text consists of valid UTF-8 sequences. For compatibility with JavaScript, UInt64 and Int64 numbers are printed in double quotes by default; to print them without quotes, set the configuration parameter output_format_json_quote_64bit_integers to 0.
%%rows%% - the total number of rows output.
%%rows_before_limit_at_least%% - the minimal number of rows there would have been without LIMIT. Output only if the query contains LIMIT.
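As a small illustration of the escaping rules above (a sketch; it assumes JSONEachRow applies the same string escaping as JSON, and the output bytes are inferred from the stated rules rather than taken from a real session):
%%
$ clickhouse-client --query "SELECT 'a/b\tc' AS s FORMAT JSONEachRow"
{"s":"a\/b\tc"}
%%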
@ -6775,6 +6775,10 @@ regions_names_*.txt: TabSeparated (no header), columns:
If the value is true, columns with unknown names in the input data are skipped (ignored) when performing an INSERT; otherwise, an exception is generated in this situation.
Works for the JSONEachRow and TSKV formats.
==output_format_json_quote_64bit_integers==
If the value is true, UInt64 and Int64 numbers are printed in quotes when using JSON* formats (for compatibility with most JavaScript implementations); otherwise, they are printed without quotes.
==Restrictions on query complexity==