diff --git a/dbms/src/AggregateFunctions/AggregateFunctionGroupArray.cpp b/dbms/src/AggregateFunctions/AggregateFunctionGroupArray.cpp index b0c210ea43c..549ac309b6d 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionGroupArray.cpp +++ b/dbms/src/AggregateFunctions/AggregateFunctionGroupArray.cpp @@ -8,22 +8,7 @@ namespace DB namespace { -AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string & name, const DataTypes & argument_types, const Array & parameters) -{ - if (argument_types.size() != 1) - throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 2", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - AggregateFunctionPtr res(createWithNumericType(*argument_types[0])); - - if (!res) - res = std::make_shared(); - - return res; -} - - -AggregateFunctionPtr createAggregateFunctionGroupArray2(const std::string & name, const DataTypes & argument_types, const Array & parameters) +static AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string & name, const DataTypes & argument_types, const Array & parameters) { if (argument_types.size() != 1) throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 1", @@ -38,17 +23,16 @@ AggregateFunctionPtr createAggregateFunctionGroupArray2(const std::string & name } else if (parameters.size() == 1) { - if (parameters[0].getType() == Field::Types::Int64 || parameters[0].getType() == Field::Types::UInt64) - { - if ((parameters[0].getType() == Field::Types::Int64 && parameters[0].get() < 0) || - (parameters[0].getType() == Field::Types::UInt64 && parameters[0].get() == 0)) - throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS); + auto type = parameters[0].getType(); + if (type != Field::Types::Int64 && type != Field::Types::UInt64) + throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS); - limit_size = true; - max_elems = parameters[0].get(); - } - else + if ((type == Field::Types::Int64 && parameters[0].get() < 0) || + (type == Field::Types::UInt64 && parameters[0].get() == 0)) throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS); + + limit_size = true; + max_elems = parameters[0].get(); } else throw Exception("Incorrect number of parameters for aggregate function " + name + ", should be 0 or 1", @@ -56,49 +40,30 @@ AggregateFunctionPtr createAggregateFunctionGroupArray2(const std::string & name if (!limit_size) { - if (auto res = createWithNumericType(*argument_types[0])) + if (auto res = createWithNumericType(*argument_types[0])) return AggregateFunctionPtr(res); else if (typeid_cast(argument_types[0].get())) - return std::make_shared>(); + return std::make_shared>(); else - return std::make_shared>(); + return std::make_shared>(); } else { - if (auto res = createWithNumericType(*argument_types[0], max_elems)) - { + if (auto res = createWithNumericType(*argument_types[0], max_elems)) return AggregateFunctionPtr(res); - } else if (typeid_cast(argument_types[0].get())) - return std::make_shared>(max_elems); + return std::make_shared>(max_elems); else - return std::make_shared>(max_elems); + return std::make_shared>(max_elems); } } - -AggregateFunctionPtr createAggregateFunctionGroupArray4(const std::string & name, const DataTypes & argument_types, const Array & parameters) -{ - if (argument_types.size() != 1) - throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 2", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - -// if (auto res = createWithNumericType(*argument_types[0])) -// return AggregateFunctionPtr(res); - - if (typeid_cast(argument_types[0].get())) - return std::make_shared(); - else - return std::make_shared(); } -} void registerAggregateFunctionGroupArray(AggregateFunctionFactory & factory) { factory.registerFunction("groupArray", createAggregateFunctionGroupArray); - factory.registerFunction("groupArray2", createAggregateFunctionGroupArray2); - factory.registerFunction("groupArray4", createAggregateFunctionGroupArray4); } } diff --git a/dbms/src/AggregateFunctions/AggregateFunctionGroupArray.h b/dbms/src/AggregateFunctions/AggregateFunctionGroupArray.h index 3613a9f7b24..f766d1add0b 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionGroupArray.h +++ b/dbms/src/AggregateFunctions/AggregateFunctionGroupArray.h @@ -30,106 +30,32 @@ namespace ErrorCodes } -/// A particular case is an implementation for numeric types. -template -struct AggregateFunctionGroupArrayDataNumeric +namespace { - /// Memory is allocated to several elements immediately so that the state occupies 64 bytes. - static constexpr size_t bytes_in_arena = 64 - sizeof(PODArray); - - using Array = PODArray, bytes_in_arena>>; - Array value; -}; - - -template -class AggregateFunctionGroupArrayNumeric final - : public IUnaryAggregateFunction, AggregateFunctionGroupArrayNumeric> -{ -public: - String getName() const override { return "groupArray"; } - - DataTypePtr getReturnType() const override - { - return std::make_shared(std::make_shared>()); - } - - void setArgument(const DataTypePtr & argument) - { - } - - void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const - { - this->data(place).value.push_back(static_cast &>(column).getData()[row_num]); - } - - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override - { - this->data(place).value.insert(this->data(rhs).value.begin(), this->data(rhs).value.end()); - } - - void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override - { - const auto & value = this->data(place).value; - size_t size = value.size(); - writeVarUInt(size, buf); - buf.write(reinterpret_cast(&value[0]), size * sizeof(value[0])); - } - - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override - { - size_t size = 0; - readVarUInt(size, buf); - - if (unlikely(size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE)) - throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE); - - auto & value = this->data(place).value; - - value.resize(size); - buf.read(reinterpret_cast(&value[0]), size * sizeof(value[0])); - } - - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override - { - const auto & value = this->data(place).value; - size_t size = value.size(); - - ColumnArray & arr_to = static_cast(to); - ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets(); - - offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size); - - typename ColumnVector::Container_t & data_to = static_cast &>(arr_to.getData()).getData(); - data_to.insert(this->data(place).value.begin(), this->data(place).value.end()); - } -}; - /// A particular case is an implementation for numeric types. template -struct AggregateFunctionGroupArrayDataNumeric2 +struct GroupArrayNumericData { - /// Memory is allocated to several elements immediately so that the state occupies 64 bytes. - static constexpr size_t bytes_in_arena = 64 - sizeof(PODArrayArenaAllocator); + // Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena + using Allocator = MixedArenaAllocator<4096>; + using Array = PODArrayArenaAllocator; - //using Array = PODArrayArenaAllocator>; - using Array = PODArrayArenaAllocator>; Array value; }; template -class AggregateFunctionGroupArrayNumeric2 final - : public IUnaryAggregateFunction, AggregateFunctionGroupArrayNumeric2> +class GroupArrayNumericImpl final + : public IUnaryAggregateFunction, GroupArrayNumericImpl> { static constexpr bool limit_num_elems = Tlimit_num_elems::value; UInt64 max_elems; public: - AggregateFunctionGroupArrayNumeric2(UInt64 max_elems_ = std::numeric_limits::max()) : max_elems(max_elems_) {} + GroupArrayNumericImpl(UInt64 max_elems_ = std::numeric_limits::max()) : max_elems(max_elems_) {} - String getName() const override { return "groupArray2"; } + String getName() const override { return "groupArray"; } DataTypePtr getReturnType() const override { @@ -206,82 +132,18 @@ public: typename ColumnVector::Container_t & data_to = static_cast &>(arr_to.getData()).getData(); data_to.insert(this->data(place).value.begin(), this->data(place).value.end()); } -}; - -/// General case (inefficient). NOTE You can also implement a special case for strings. -struct AggregateFunctionGroupArrayDataGeneric -{ - Array value; /// TODO Add MemoryTracker -}; - - -/// Puts all values to an array, general case. Implemented inefficiently. -class AggregateFunctionGroupArrayGeneric final - : public IUnaryAggregateFunction -{ -private: - DataTypePtr type; - -public: - String getName() const override { return "groupArray"; } - - DataTypePtr getReturnType() const override + bool allocatesMemoryInArena() const override { - return std::make_shared(type); - } - - void setArgument(const DataTypePtr & argument) - { - type = argument; - } - - - void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const - { - data(place).value.push_back(Array::value_type()); - column.get(row_num, data(place).value.back()); - } - - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override - { - data(place).value.insert(data(place).value.end(), data(rhs).value.begin(), data(rhs).value.end()); - } - - void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override - { - const Array & value = data(place).value; - size_t size = value.size(); - writeVarUInt(size, buf); - for (size_t i = 0; i < size; ++i) - type->serializeBinary(value[i], buf); - } - - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override - { - size_t size = 0; - readVarUInt(size, buf); - - if (unlikely(size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE)) - throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE); - - Array & value = data(place).value; - - value.resize(size); - for (size_t i = 0; i < size; ++i) - type->deserializeBinary(value[i], buf); - } - - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override - { - to.insert(data(place).value); + return true; } }; -namespace -{ +/// General case + +/// Nodes used to implement linked list for stoarge of groupArray states struct NodeString; struct NodeGeneral; @@ -289,8 +151,9 @@ template struct NodeBase { Node * next; - UInt64 size; + UInt64 size; // size of payload + /// Returns pointer to actual payload char * data() { static_assert(sizeof(NodeBase) == sizeof(Node)); @@ -346,7 +209,6 @@ struct NodeString : public NodeBase } }; - struct NodeGeneral : public NodeBase { using Node = NodeGeneral; @@ -371,7 +233,7 @@ struct NodeGeneral : public NodeBase template -struct AggregateFunctionGroupArrayListImpl_Data +struct GroupArrayGeneralListData { UInt64 elems = 0; Node * first = nullptr; @@ -379,13 +241,13 @@ struct AggregateFunctionGroupArrayListImpl_Data }; -/// Implementation of groupArray(String or ComplexObject) via linked list +/// Implementation of groupArray for String or any ComplexObject via linked list /// It has poor performance in case of many small objects template -class AggregateFunctionGroupArrayStringListImpl final - : public IUnaryAggregateFunction, AggregateFunctionGroupArrayStringListImpl> +class GroupArrayGeneralListImpl final + : public IUnaryAggregateFunction, GroupArrayGeneralListImpl> { - using Data = AggregateFunctionGroupArrayListImpl_Data; + using Data = GroupArrayGeneralListData; static Data & data(AggregateDataPtr place) { return *reinterpret_cast(place); } static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast(place); } @@ -393,9 +255,9 @@ class AggregateFunctionGroupArrayStringListImpl final UInt64 max_elems; public: - AggregateFunctionGroupArrayStringListImpl(UInt64 max_elems_ = std::numeric_limits::max()) : max_elems(max_elems_) {} + GroupArrayGeneralListImpl(UInt64 max_elems_ = std::numeric_limits::max()) : max_elems(max_elems_) {} - String getName() const override { return "groupArray2"; } + String getName() const override { return "groupArray"; } DataTypePtr getReturnType() const override { return std::make_shared(data_type); } @@ -553,207 +415,6 @@ public: } -struct AggregateFunctionGroupArrayStringConcatImpl_Data -{ - static constexpr size_t target_size = 64; - static constexpr size_t free_space = target_size - sizeof(PODArrayArenaAllocator) - sizeof(PODArrayArenaAllocator); - - PODArrayArenaAllocator chars; - PODArrayArenaAllocator> offsets; -}; - - -class AggregateFunctionGroupArrayStringConcatImpl final - : public IUnaryAggregateFunction -{ -public: - - String getName() const override { return "groupArray4"; } - - DataTypePtr getReturnType() const override { return std::make_shared(std::make_shared()); } - - void setArgument(const DataTypePtr & argument) {} - - void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena * arena) const - { - StringRef string = static_cast(column).getDataAtWithTerminatingZero(row_num); - - data(place).chars.insert(string.data, string.data + string.size, arena); - data(place).offsets.push_back(string.size, arena); - } - - void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override - { - } - - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override - { - } - - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override - { - auto & cur_state = data(place); - auto & rhs_state = data(rhs); - - cur_state.chars.insert(rhs_state.chars.begin(), rhs_state.chars.end(), arena); - cur_state.offsets.insert(rhs_state.offsets.begin(), rhs_state.offsets.end(), arena); - } - - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override - { - auto & column_array = static_cast(to); - auto & column_string = static_cast(column_array.getData()); - auto & offsets = column_array.getOffsets(); - auto & cur_state = data(place); - - offsets.push_back((offsets.size() == 0 ? 0 : offsets.back()) + cur_state.offsets.size()); - - auto pos = column_string.getChars().size(); - column_string.getChars().insert(cur_state.chars.begin(), cur_state.chars.end()); - - column_string.getOffsets().reserve(column_string.getOffsets().size() + cur_state.offsets.size()); - for (UInt64 i = 0; i < cur_state.offsets.size(); ++i) - { - pos += cur_state.offsets[i]; - column_string.getOffsets().push_back(pos); - } - } -}; - - -struct AggregateFunctionGroupArrayGeneric_SerializedData -{ - PODArrayArenaAllocator values; -}; - -class AggregateFunctionGroupArrayGeneric_SerializedDataImpl final - : public IUnaryAggregateFunction -{ -private: - DataTypePtr type; - -public: - String getName() const override { return "groupArray"; } - - DataTypePtr getReturnType() const override - { - return std::make_shared(type); - } - - void setArgument(const DataTypePtr & argument) - { - type = argument; - } - - void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena * arena) const - { - const char * begin = nullptr; - data(place).values.push_back(column.serializeValueIntoArena(row_num, *arena, begin), arena); - } - - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override - { - for (const StringRef & elem : data(rhs).values) - data(place).values.push_back(StringRef(arena->insert(elem.data, elem.size), elem.size), arena); - } - - void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override - { - } - - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override - { - } - - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override - { - ColumnArray & column_array = static_cast(to); - auto & column_data = column_array.getData(); - auto & offsets = column_array.getOffsets(); - auto & cur_state = data(place); - - offsets.push_back((offsets.size() == 0 ? 0 : offsets.back()) + cur_state.values.size()); - - column_data.reserve(cur_state.values.size()); - - for (const StringRef & elem : cur_state.values) - column_data.deserializeAndInsertFromArena(elem.data); - } - - bool allocatesMemoryInArena() const override - { - return true; - } -}; - - - - -struct AggregateFunctionGroupArrayGeneric_ColumnPtrImpl_Data -{ - ColumnPtr container; -}; - -class AggregateFunctionGroupArrayGeneric_ColumnPtrImpl final - : public IUnaryAggregateFunction -{ -private: - DataTypePtr type; - -public: - String getName() const override { return "groupArray4"; } - - DataTypePtr getReturnType() const override - { - return std::make_shared(type); - } - - void setArgument(const DataTypePtr & argument) - { - type = argument; - } - - void create(AggregateDataPtr place) const override - { - new (place) Data; - data(place).container = type->createColumn(); - } - - void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const - { - data(place).container->insertFrom(column, row_num); - } - - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override - { - data(place).container->insertRangeFrom(*data(rhs).container, 0, data(rhs).container->size()); - } - - void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override - { - UInt64 s = data(place).container->size(); - writeVarUInt(s, buf); - type->serializeBinaryBulk(*data(place).container, buf, 0, s); - } - - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override - { - UInt64 s; - readVarUInt(s, buf); - type->deserializeBinaryBulk(*data(place).container, buf, s, 0); - } - - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override - { - ColumnArray & array_column = static_cast(to); - auto s = data(place).container->size(); - array_column.getOffsets().push_back(s); - array_column.getData().insertRangeFrom(*data(place).container, 0, s); - } -}; - - - #undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE } diff --git a/dbms/src/Common/PODArrayArena.h b/dbms/src/Common/PODArrayArena.h index b66d55815a8..ae7cfb3875e 100644 --- a/dbms/src/Common/PODArrayArena.h +++ b/dbms/src/Common/PODArrayArena.h @@ -1,4 +1,3 @@ -#include #include #include @@ -6,8 +5,7 @@ namespace DB { -/// Fake Allocator which proxies all allocations to Arena -/// Used in aggregate functions +/// Fake Allocator which proxies all allocations to Arena. Used in aggregate functions. struct ArenaAllocator { static void * alloc(size_t size, Arena * arena) @@ -32,10 +30,14 @@ struct ArenaAllocator } } - static void free(void * buf, size_t size) {} + static void free(void * buf, size_t size) + { + // Remains trash in arena + } }; +/// Switches to ordinary Allocator after REAL_ALLOCATION_TRESHOLD bytes to avoid fragmentation and trash in Arena. template , typename TArenaAllocator = ArenaAllocator> class MixedArenaAllocator : private TRealAllocator { @@ -102,6 +104,8 @@ public: /// Similar to PODArray, but allocates memory using ArenaAllocator +/// Uses additional Arena * arena parameter in all modification methods to not store it +/// TODO: avoid copypaste from PODArray template class PODArrayArenaAllocator : private TAllocator { diff --git a/dbms/tests/queries/0_stateless/00113_group_array.sql b/dbms/tests/queries/0_stateless/00113_group_array.sql index b7e5180c1e2..91a62e78872 100644 --- a/dbms/tests/queries/0_stateless/00113_group_array.sql +++ b/dbms/tests/queries/0_stateless/00113_group_array.sql @@ -1,17 +1,17 @@ -SELECT intDiv(number, 100) AS k, length(groupArray2(number)) FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10; +SELECT intDiv(number, 100) AS k, length(groupArray(number)) FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10; DROP TABLE IF EXISTS test.numbers_mt; CREATE TABLE test.numbers_mt (number UInt64) ENGINE = Log; INSERT INTO test.numbers_mt SELECT * FROM system.numbers LIMIT 1, 1000000; -SELECT count(), sum(ns), max(ns) FROM (SELECT intDiv(number, 100) AS k, groupArray2(number) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns; -SELECT count(), sum(toUInt64(ns)), max(toUInt64(ns)) FROM (SELECT intDiv(number, 100) AS k, groupArray2(toString(number)) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns; -SELECT count(), sum(toUInt64(ns[1])), max(toUInt64(ns[1])), sum(toUInt64(ns[2]))/10 FROM (SELECT intDiv(number, 100) AS k, groupArray2([toString(number), toString(number*10)]) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns; -SELECT count(), sum(ns[1]), max(ns[1]), sum(ns[2])/10 FROM (SELECT intDiv(number, 100) AS k, groupArray2([number, number*10]) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns; +SELECT count(), sum(ns), max(ns) FROM (SELECT intDiv(number, 100) AS k, groupArray(number) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns; +SELECT count(), sum(toUInt64(ns)), max(toUInt64(ns)) FROM (SELECT intDiv(number, 100) AS k, groupArray(toString(number)) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns; +SELECT count(), sum(toUInt64(ns[1])), max(toUInt64(ns[1])), sum(toUInt64(ns[2]))/10 FROM (SELECT intDiv(number, 100) AS k, groupArray([toString(number), toString(number*10)]) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns; +SELECT count(), sum(ns[1]), max(ns[1]), sum(ns[2])/10 FROM (SELECT intDiv(number, 100) AS k, groupArray([number, number*10]) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns; -SELECT count(), sum(ns), max(ns) FROM (SELECT intDiv(number, 100) AS k, groupArray2(number) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns; -SELECT count(), sum(toUInt64(ns)), max(toUInt64(ns)) FROM (SELECT intDiv(number, 100) AS k, groupArray2(toString(number)) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns; -SELECT count(), sum(toUInt64(ns[1])), max(toUInt64(ns[1])), sum(toUInt64(ns[2]))/10 FROM (SELECT intDiv(number, 100) AS k, groupArray2([toString(number), toString(number*10)]) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns; +SELECT count(), sum(ns), max(ns) FROM (SELECT intDiv(number, 100) AS k, groupArray(number) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns; +SELECT count(), sum(toUInt64(ns)), max(toUInt64(ns)) FROM (SELECT intDiv(number, 100) AS k, groupArray(toString(number)) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns; +SELECT count(), sum(toUInt64(ns[1])), max(toUInt64(ns[1])), sum(toUInt64(ns[2]))/10 FROM (SELECT intDiv(number, 100) AS k, groupArray([toString(number), toString(number*10)]) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns; DROP TABLE test.numbers_mt;