https://github.com/ClickHouse/ClickHouse.git
Cleanups #4246

commit c603d270f4 (parent 75c087bcf5)
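Every hunk in this commit applies the same mechanical substitution: parameters and members that carry a row limit change from size_t to UInt64. The commit message does not state the motivation, so treat this reading as an assumption: size_t is 32 bits wide on 32-bit platforms while UInt64 is 64 bits everywhere, so the change gives the limit a single platform-independent width across the IColumn interface, the block input streams, and the data-type bulk serialization methods. A minimal standalone sketch of the pattern follows; ExampleColumn and head are hypothetical names, not part of the commit.

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    using UInt64 = uint64_t; // fixed-width alias in the style of ClickHouse's common type names

    struct ExampleColumn
    {
        std::vector<int> data;

        // Before: std::vector<int> head(size_t limit) const;
        // After:  the limit is UInt64, and 0 keeps the "no limit" convention
        //         used by permute()/index()/getPermutation() in the hunks below.
        std::vector<int> head(UInt64 limit) const
        {
            // Clamp to the actual column size without truncating the 64-bit
            // limit through a narrower size_t.
            size_t n = data.size();
            if (limit != 0 && limit < n)
                n = static_cast<size_t>(limit);
            return {data.begin(), data.begin() + static_cast<std::ptrdiff_t>(n)};
        }
    };

The clamping mirrors what the diff itself does, for example "limit ? std::min(s, limit) : s" in IColumnDummy and "size_t size = limit ? std::min(data.size(), limit) : data.size();" in ColumnDecimal::permute.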
@@ -182,7 +182,7 @@ ColumnPtr ColumnAggregateFunction::filter(const Filter & filter, ssize_t result_
 }


-ColumnPtr ColumnAggregateFunction::permute(const Permutation & perm, size_t limit) const
+ColumnPtr ColumnAggregateFunction::permute(const Permutation & perm, UInt64 limit) const
 {
     size_t size = data.size();

@@ -203,13 +203,13 @@ ColumnPtr ColumnAggregateFunction::permute(const Permutation & perm, size_t limi
     return res;
 }

-ColumnPtr ColumnAggregateFunction::index(const IColumn & indexes, size_t limit) const
+ColumnPtr ColumnAggregateFunction::index(const IColumn & indexes, UInt64 limit) const
 {
     return selectIndexImpl(*this, indexes, limit);
 }

 template <typename Type>
-ColumnPtr ColumnAggregateFunction::indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const
+ColumnPtr ColumnAggregateFunction::indexImpl(const PaddedPODArray<Type> & indexes, UInt64 limit) const
 {
     auto res = createView();

@@ -161,12 +161,12 @@ public:

     ColumnPtr filter(const Filter & filter, ssize_t result_size_hint) const override;

-    ColumnPtr permute(const Permutation & perm, size_t limit) const override;
+    ColumnPtr permute(const Permutation & perm, UInt64 limit) const override;

-    ColumnPtr index(const IColumn & indexes, size_t limit) const override;
+    ColumnPtr index(const IColumn & indexes, UInt64 limit) const override;

     template <typename Type>
-    ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
+    ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, UInt64 limit) const;

     ColumnPtr replicate(const Offsets & offsets) const override;

@@ -179,7 +179,7 @@ public:
         return 0;
     }

-    void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
+    void getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, Permutation & res) const override;

     /** More efficient manipulation methods */
     Container & getData()
@@ -589,7 +589,7 @@ ColumnPtr ColumnArray::filterTuple(const Filter & filt, ssize_t result_size_hint
 }


-ColumnPtr ColumnArray::permute(const Permutation & perm, size_t limit) const
+ColumnPtr ColumnArray::permute(const Permutation & perm, UInt64 limit) const
 {
     size_t size = getOffsets().size();

@@ -626,13 +626,13 @@ ColumnPtr ColumnArray::permute(const Permutation & perm, size_t limit) const
     return res;
 }

-ColumnPtr ColumnArray::index(const IColumn & indexes, size_t limit) const
+ColumnPtr ColumnArray::index(const IColumn & indexes, UInt64 limit) const
 {
     return selectIndexImpl(*this, indexes, limit);
 }

 template <typename T>
-ColumnPtr ColumnArray::indexImpl(const PaddedPODArray<T> & indexes, size_t limit) const
+ColumnPtr ColumnArray::indexImpl(const PaddedPODArray<T> & indexes, UInt64 limit) const
 {
     if (limit == 0)
         return ColumnArray::create(data);
@@ -664,7 +664,7 @@ ColumnPtr ColumnArray::indexImpl(const PaddedPODArray<T> & indexes, size_t limit

 INSTANTIATE_INDEX_IMPL(ColumnArray)

-void ColumnArray::getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const
+void ColumnArray::getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, Permutation & res) const
 {
     size_t s = size();
     if (limit >= s)
@@ -70,11 +70,11 @@ public:
     void insertDefault() override;
     void popBack(size_t n) override;
     ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
-    ColumnPtr permute(const Permutation & perm, size_t limit) const override;
-    ColumnPtr index(const IColumn & indexes, size_t limit) const override;
-    template <typename Type> ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
+    ColumnPtr permute(const Permutation & perm, UInt64 limit) const override;
+    ColumnPtr index(const IColumn & indexes, UInt64 limit) const override;
+    template <typename Type> ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, UInt64 limit) const;
     int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;
-    void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
+    void getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, Permutation & res) const override;
     void reserve(size_t n) override;
     size_t byteSize() const override;
     size_t allocatedBytes() const override;
@@ -54,7 +54,7 @@ ColumnPtr ColumnConst::replicate(const Offsets & offsets) const
     return ColumnConst::create(data, replicated_size);
 }

-ColumnPtr ColumnConst::permute(const Permutation & perm, size_t limit) const
+ColumnPtr ColumnConst::permute(const Permutation & perm, UInt64 limit) const
 {
     if (limit == 0)
         limit = s;
@@ -68,7 +68,7 @@ ColumnPtr ColumnConst::permute(const Permutation & perm, size_t limit) const
     return ColumnConst::create(data, limit);
 }

-ColumnPtr ColumnConst::index(const IColumn & indexes, size_t limit) const
+ColumnPtr ColumnConst::index(const IColumn & indexes, UInt64 limit) const
 {
     if (limit == 0)
         limit = indexes.size();
@@ -154,9 +154,9 @@ public:

     ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
     ColumnPtr replicate(const Offsets & offsets) const override;
-    ColumnPtr permute(const Permutation & perm, size_t limit) const override;
-    ColumnPtr index(const IColumn & indexes, size_t limit) const override;
-    void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
+    ColumnPtr permute(const Permutation & perm, UInt64 limit) const override;
+    ColumnPtr index(const IColumn & indexes, UInt64 limit) const override;
+    void getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, Permutation & res) const override;

     size_t byteSize() const override
     {
@@ -63,7 +63,7 @@ void ColumnDecimal<T>::updateHashWithValue(size_t n, SipHash & hash) const
 }

 template <typename T>
-void ColumnDecimal<T>::getPermutation(bool reverse, size_t limit, int , IColumn::Permutation & res) const
+void ColumnDecimal<T>::getPermutation(bool reverse, UInt64 limit, int , IColumn::Permutation & res) const
 {
 #if 1 /// TODO: perf test
     if (data.size() <= std::numeric_limits<UInt32>::max())
@@ -82,7 +82,7 @@ void ColumnDecimal<T>::getPermutation(bool reverse, size_t limit, int , IColumn:
 }

 template <typename T>
-ColumnPtr ColumnDecimal<T>::permute(const IColumn::Permutation & perm, size_t limit) const
+ColumnPtr ColumnDecimal<T>::permute(const IColumn::Permutation & perm, UInt64 limit) const
 {
     size_t size = limit ? std::min(data.size(), limit) : data.size();
     if (perm.size() < size)
@@ -173,7 +173,7 @@ ColumnPtr ColumnDecimal<T>::filter(const IColumn::Filter & filt, ssize_t result_
 }

 template <typename T>
-ColumnPtr ColumnDecimal<T>::index(const IColumn & indexes, size_t limit) const
+ColumnPtr ColumnDecimal<T>::index(const IColumn & indexes, UInt64 limit) const
 {
     return selectIndexImpl(*this, indexes, limit);
 }
@@ -101,7 +101,7 @@ public:
     const char * deserializeAndInsertFromArena(const char * pos) override;
     void updateHashWithValue(size_t n, SipHash & hash) const override;
     int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;
-    void getPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res) const override;
+    void getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, IColumn::Permutation & res) const override;

     MutableColumnPtr cloneResized(size_t size) const override;

@@ -116,11 +116,11 @@ public:
     bool isDefaultAt(size_t n) const override { return data[n] == 0; }

     ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override;
-    ColumnPtr permute(const IColumn::Permutation & perm, size_t limit) const override;
-    ColumnPtr index(const IColumn & indexes, size_t limit) const override;
+    ColumnPtr permute(const IColumn::Permutation & perm, UInt64 limit) const override;
+    ColumnPtr index(const IColumn & indexes, UInt64 limit) const override;

     template <typename Type>
-    ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
+    ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, UInt64 limit) const;

     ColumnPtr replicate(const IColumn::Offsets & offsets) const override;
     void getExtremes(Field & min, Field & max) const override;
@@ -144,7 +144,7 @@ protected:
     UInt32 scale;

     template <typename U>
-    void permutation(bool reverse, size_t limit, PaddedPODArray<U> & res) const
+    void permutation(bool reverse, UInt64 limit, PaddedPODArray<U> & res) const
     {
         size_t s = data.size();
         res.resize(s);
@@ -164,7 +164,7 @@ protected:

 template <typename T>
 template <typename Type>
-ColumnPtr ColumnDecimal<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const
+ColumnPtr ColumnDecimal<T>::indexImpl(const PaddedPODArray<Type> & indexes, UInt64 limit) const
 {
     size_t size = indexes.size();

@@ -112,7 +112,7 @@ struct ColumnFixedString::less
     }
 };

-void ColumnFixedString::getPermutation(bool reverse, size_t limit, int /*nan_direction_hint*/, Permutation & res) const
+void ColumnFixedString::getPermutation(bool reverse, UInt64 limit, int /*nan_direction_hint*/, Permutation & res) const
 {
     size_t s = size();
     res.resize(s);
@@ -231,7 +231,7 @@ ColumnPtr ColumnFixedString::filter(const IColumn::Filter & filt, ssize_t result
     return res;
 }

-ColumnPtr ColumnFixedString::permute(const Permutation & perm, size_t limit) const
+ColumnPtr ColumnFixedString::permute(const Permutation & perm, UInt64 limit) const
 {
     size_t col_size = size();

@@ -260,14 +260,14 @@ ColumnPtr ColumnFixedString::permute(const Permutation & perm, size_t limit) con
 }


-ColumnPtr ColumnFixedString::index(const IColumn & indexes, size_t limit) const
+ColumnPtr ColumnFixedString::index(const IColumn & indexes, UInt64 limit) const
 {
     return selectIndexImpl(*this, indexes, limit);
 }


 template <typename Type>
-ColumnPtr ColumnFixedString::indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const
+ColumnPtr ColumnFixedString::indexImpl(const PaddedPODArray<Type> & indexes, UInt64 limit) const
 {
     if (limit == 0)
         return ColumnFixedString::create(n);
@@ -101,18 +101,18 @@ public:
         return memcmp(&chars[p1 * n], &rhs.chars[p2 * n], n);
     }

-    void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
+    void getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, Permutation & res) const override;

     void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;

     ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override;

-    ColumnPtr permute(const Permutation & perm, size_t limit) const override;
+    ColumnPtr permute(const Permutation & perm, UInt64 limit) const override;

-    ColumnPtr index(const IColumn & indexes, size_t limit) const override;
+    ColumnPtr index(const IColumn & indexes, UInt64 limit) const override;

     template <typename Type>
-    ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
+    ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, UInt64 limit) const;

     ColumnPtr replicate(const Offsets & offsets) const override;

@@ -70,7 +70,7 @@ ColumnPtr ColumnFunction::filter(const Filter & filt, ssize_t result_size_hint)
     return ColumnFunction::create(filtered_size, function, capture);
 }

-ColumnPtr ColumnFunction::permute(const Permutation & perm, size_t limit) const
+ColumnPtr ColumnFunction::permute(const Permutation & perm, UInt64 limit) const
 {
     if (limit == 0)
         limit = size_;
@@ -88,7 +88,7 @@ ColumnPtr ColumnFunction::permute(const Permutation & perm, size_t limit) const
     return ColumnFunction::create(limit, function, capture);
 }

-ColumnPtr ColumnFunction::index(const IColumn & indexes, size_t limit) const
+ColumnPtr ColumnFunction::index(const IColumn & indexes, UInt64 limit) const
 {
     ColumnsWithTypeAndName capture = captured_columns;
     for (auto & column : capture)
@@ -32,8 +32,8 @@ public:
     ColumnPtr cut(size_t start, size_t length) const override;
     ColumnPtr replicate(const Offsets & offsets) const override;
     ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
-    ColumnPtr permute(const Permutation & perm, size_t limit) const override;
-    ColumnPtr index(const IColumn & indexes, size_t limit) const override;
+    ColumnPtr permute(const Permutation & perm, UInt64 limit) const override;
+    ColumnPtr index(const IColumn & indexes, UInt64 limit) const override;
     void insertDefault() override;
     void popBack(size_t n) override;
     std::vector<MutableColumnPtr> scatter(IColumn::ColumnIndex num_columns,
@@ -250,7 +250,7 @@ int ColumnLowCardinality::compareAt(size_t n, size_t m, const IColumn & rhs, int
     return getDictionary().compareAt(n_index, m_index, low_cardinality_column.getDictionary(), nan_direction_hint);
 }

-void ColumnLowCardinality::getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const
+void ColumnLowCardinality::getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, Permutation & res) const
 {
     if (limit == 0)
         limit = size();
@@ -343,7 +343,7 @@ void ColumnLowCardinality::compactIfSharedDictionary()


 ColumnLowCardinality::DictionaryEncodedColumn
-ColumnLowCardinality::getMinimalDictionaryEncodedColumn(size_t offset, size_t limit) const
+ColumnLowCardinality::getMinimalDictionaryEncodedColumn(size_t offset, UInt64 limit) const
 {
     MutableColumnPtr sub_indexes = (*std::move(idx.getPositions()->cut(offset, limit))).mutate();
     auto indexes_map = mapUniqueIndex(*sub_indexes);
@@ -527,7 +527,7 @@ void ColumnLowCardinality::Index::insertPosition(UInt64 position)
     checkSizeOfType();
 }

-void ColumnLowCardinality::Index::insertPositionsRange(const IColumn & column, size_t offset, size_t limit)
+void ColumnLowCardinality::Index::insertPositionsRange(const IColumn & column, size_t offset, UInt64 limit)
 {
     auto insertForType = [&](auto type)
     {
@@ -90,19 +90,19 @@ public:
         return ColumnLowCardinality::create(dictionary.getColumnUniquePtr(), getIndexes().filter(filt, result_size_hint));
     }

-    ColumnPtr permute(const Permutation & perm, size_t limit) const override
+    ColumnPtr permute(const Permutation & perm, UInt64 limit) const override
     {
         return ColumnLowCardinality::create(dictionary.getColumnUniquePtr(), getIndexes().permute(perm, limit));
     }

-    ColumnPtr index(const IColumn & indexes_, size_t limit) const override
+    ColumnPtr index(const IColumn & indexes_, UInt64 limit) const override
     {
         return ColumnLowCardinality::create(dictionary.getColumnUniquePtr(), getIndexes().index(indexes_, limit));
     }

     int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;

-    void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
+    void getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, Permutation & res) const override;

     ColumnPtr replicate(const Offsets & offsets) const override
     {
@@ -180,7 +180,7 @@ public:
         ColumnPtr indexes;
     };

-    DictionaryEncodedColumn getMinimalDictionaryEncodedColumn(size_t offset, size_t limit) const;
+    DictionaryEncodedColumn getMinimalDictionaryEncodedColumn(size_t offset, UInt64 limit) const;

     ColumnPtr countKeys() const;

@@ -196,7 +196,7 @@ public:
     ColumnPtr & getPositionsPtr() { return positions; }
     size_t getPositionAt(size_t row) const;
     void insertPosition(UInt64 position);
-    void insertPositionsRange(const IColumn & column, size_t offset, size_t limit);
+    void insertPositionsRange(const IColumn & column, size_t offset, UInt64 limit);

     void popBack(size_t n) { positions->assumeMutableRef().popBack(n); }
     void reserve(size_t n) { positions->assumeMutableRef().reserve(n); }
@@ -158,14 +158,14 @@ ColumnPtr ColumnNullable::filter(const Filter & filt, ssize_t result_size_hint)
     return ColumnNullable::create(filtered_data, filtered_null_map);
 }

-ColumnPtr ColumnNullable::permute(const Permutation & perm, size_t limit) const
+ColumnPtr ColumnNullable::permute(const Permutation & perm, UInt64 limit) const
 {
     ColumnPtr permuted_data = getNestedColumn().permute(perm, limit);
     ColumnPtr permuted_null_map = getNullMapColumn().permute(perm, limit);
     return ColumnNullable::create(permuted_data, permuted_null_map);
 }

-ColumnPtr ColumnNullable::index(const IColumn & indexes, size_t limit) const
+ColumnPtr ColumnNullable::index(const IColumn & indexes, UInt64 limit) const
 {
     ColumnPtr indexed_data = getNestedColumn().index(indexes, limit);
     ColumnPtr indexed_null_map = getNullMapColumn().index(indexes, limit);
@@ -197,7 +197,7 @@ int ColumnNullable::compareAt(size_t n, size_t m, const IColumn & rhs_, int null
     return getNestedColumn().compareAt(n, m, nested_rhs, null_direction_hint);
 }

-void ColumnNullable::getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const
+void ColumnNullable::getPermutation(bool reverse, UInt64 limit, int null_direction_hint, Permutation & res) const
 {
     /// Cannot pass limit because of unknown amount of NULLs.
     getNestedColumn().getPermutation(reverse, 0, null_direction_hint, res);
@@ -64,10 +64,10 @@ public:

     void popBack(size_t n) override;
     ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
-    ColumnPtr permute(const Permutation & perm, size_t limit) const override;
-    ColumnPtr index(const IColumn & indexes, size_t limit) const override;
+    ColumnPtr permute(const Permutation & perm, UInt64 limit) const override;
+    ColumnPtr index(const IColumn & indexes, UInt64 limit) const override;
     int compareAt(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint) const override;
-    void getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override;
+    void getPermutation(bool reverse, UInt64 limit, int null_direction_hint, Permutation & res) const override;
     void reserve(size_t n) override;
     size_t byteSize() const override;
     size_t allocatedBytes() const override;
@@ -111,7 +111,7 @@ ColumnPtr ColumnString::filter(const Filter & filt, ssize_t result_size_hint) co
 }


-ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
+ColumnPtr ColumnString::permute(const Permutation & perm, UInt64 limit) const
 {
     size_t size = offsets.size();

@@ -191,13 +191,13 @@ const char * ColumnString::deserializeAndInsertFromArena(const char * pos)
 }


-ColumnPtr ColumnString::index(const IColumn & indexes, size_t limit) const
+ColumnPtr ColumnString::index(const IColumn & indexes, UInt64 limit) const
 {
     return selectIndexImpl(*this, indexes, limit);
 }

 template <typename Type>
-ColumnPtr ColumnString::indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const
+ColumnPtr ColumnString::indexImpl(const PaddedPODArray<Type> & indexes, UInt64 limit) const
 {
     if (limit == 0)
         return ColumnString::create();
@@ -251,7 +251,7 @@ struct ColumnString::less
     }
 };

-void ColumnString::getPermutation(bool reverse, size_t limit, int /*nan_direction_hint*/, Permutation & res) const
+void ColumnString::getPermutation(bool reverse, UInt64 limit, int /*nan_direction_hint*/, Permutation & res) const
 {
     size_t s = offsets.size();
     res.resize(s);
@@ -389,7 +389,7 @@ struct ColumnString::lessWithCollation
     }
 };

-void ColumnString::getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, Permutation & res) const
+void ColumnString::getPermutationWithCollation(const Collator & collator, bool reverse, UInt64 limit, Permutation & res) const
 {
     size_t s = offsets.size();
     res.resize(s);
@@ -194,12 +194,12 @@ public:

     ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;

-    ColumnPtr permute(const Permutation & perm, size_t limit) const override;
+    ColumnPtr permute(const Permutation & perm, UInt64 limit) const override;

-    ColumnPtr index(const IColumn & indexes, size_t limit) const override;
+    ColumnPtr index(const IColumn & indexes, UInt64 limit) const override;

     template <typename Type>
-    ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
+    ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, UInt64 limit) const;

     void insertDefault() override
     {
@@ -225,10 +225,10 @@ public:
     /// Variant of compareAt for string comparison with respect of collation.
     int compareAtWithCollation(size_t n, size_t m, const IColumn & rhs_, const Collator & collator) const;

-    void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
+    void getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, Permutation & res) const override;

     /// Sorting with respect of collation.
-    void getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, Permutation & res) const;
+    void getPermutationWithCollation(const Collator & collator, bool reverse, UInt64 limit, Permutation & res) const;

     ColumnPtr replicate(const Offsets & replicate_offsets) const override;

@@ -170,7 +170,7 @@ ColumnPtr ColumnTuple::filter(const Filter & filt, ssize_t result_size_hint) con
     return ColumnTuple::create(new_columns);
 }

-ColumnPtr ColumnTuple::permute(const Permutation & perm, size_t limit) const
+ColumnPtr ColumnTuple::permute(const Permutation & perm, UInt64 limit) const
 {
     const size_t tuple_size = columns.size();
     Columns new_columns(tuple_size);
@@ -181,7 +181,7 @@ ColumnPtr ColumnTuple::permute(const Permutation & perm, size_t limit) const
     return ColumnTuple::create(new_columns);
 }

-ColumnPtr ColumnTuple::index(const IColumn & indexes, size_t limit) const
+ColumnPtr ColumnTuple::index(const IColumn & indexes, UInt64 limit) const
 {
     const size_t tuple_size = columns.size();
     Columns new_columns(tuple_size);
@@ -261,7 +261,7 @@ struct ColumnTuple::Less
     }
 };

-void ColumnTuple::getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const
+void ColumnTuple::getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, Permutation & res) const
 {
     size_t rows = size();
     res.resize(rows);
@@ -60,14 +60,14 @@ public:
     void updateHashWithValue(size_t n, SipHash & hash) const override;
     void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
     ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
-    ColumnPtr permute(const Permutation & perm, size_t limit) const override;
-    ColumnPtr index(const IColumn & indexes, size_t limit) const override;
+    ColumnPtr permute(const Permutation & perm, UInt64 limit) const override;
+    ColumnPtr index(const IColumn & indexes, UInt64 limit) const override;
     ColumnPtr replicate(const Offsets & offsets) const override;
     MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
     void gather(ColumnGathererStream & gatherer_stream) override;
     int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
     void getExtremes(Field & min, Field & max) const override;
-    void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
+    void getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, Permutation & res) const override;
     void reserve(size_t n) override;
     size_t byteSize() const override;
     size_t allocatedBytes() const override;
@@ -71,7 +71,7 @@ struct ColumnVector<T>::greater
 };

 template <typename T>
-void ColumnVector<T>::getPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res) const
+void ColumnVector<T>::getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, IColumn::Permutation & res) const
 {
     size_t s = data.size();
     res.resize(s);
@@ -211,7 +211,7 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_s
 }

 template <typename T>
-ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation & perm, size_t limit) const
+ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation & perm, UInt64 limit) const
 {
     size_t size = data.size();

@@ -232,7 +232,7 @@ ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation & perm, size_t lim
 }

 template <typename T>
-ColumnPtr ColumnVector<T>::index(const IColumn & indexes, size_t limit) const
+ColumnPtr ColumnVector<T>::index(const IColumn & indexes, UInt64 limit) const
 {
     return selectIndexImpl(*this, indexes, limit);
 }
@@ -174,7 +174,7 @@ public:
         return CompareHelper<T>::compare(data[n], static_cast<const Self &>(rhs_).data[m], nan_direction_hint);
     }

-    void getPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res) const override;
+    void getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, IColumn::Permutation & res) const override;

     void reserve(size_t n) override
     {
@@ -221,12 +221,12 @@ public:

     ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override;

-    ColumnPtr permute(const IColumn::Permutation & perm, size_t limit) const override;
+    ColumnPtr permute(const IColumn::Permutation & perm, UInt64 limit) const override;

-    ColumnPtr index(const IColumn & indexes, size_t limit) const override;
+    ColumnPtr index(const IColumn & indexes, UInt64 limit) const override;

     template <typename Type>
-    ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
+    ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, UInt64 limit) const;

     ColumnPtr replicate(const IColumn::Offsets & offsets) const override;

@@ -273,7 +273,7 @@ protected:

 template <typename T>
 template <typename Type>
-ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const
+ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, UInt64 limit) const
 {
     size_t size = indexes.size();

@@ -44,9 +44,9 @@ namespace detail
     const PaddedPODArray<T> * getIndexesData(const IColumn & indexes);
 }

-/// Check limit <= indexes->size() and call column.indexImpl(const PaddedPodArray<Type> & indexes, size_t limit).
+/// Check limit <= indexes->size() and call column.indexImpl(const PaddedPodArray<Type> & indexes, UInt64 limit).
 template <typename Column>
-ColumnPtr selectIndexImpl(const Column & column, const IColumn & indexes, size_t limit)
+ColumnPtr selectIndexImpl(const Column & column, const IColumn & indexes, UInt64 limit)
 {
     if (limit == 0)
         limit = indexes.size();
@@ -68,8 +68,8 @@ ColumnPtr selectIndexImpl(const Column & column, const IColumn & indexes, size_t
 }

 #define INSTANTIATE_INDEX_IMPL(Column) \
-    template ColumnPtr Column::indexImpl<UInt8>(const PaddedPODArray<UInt8> & indexes, size_t limit) const; \
-    template ColumnPtr Column::indexImpl<UInt16>(const PaddedPODArray<UInt16> & indexes, size_t limit) const; \
-    template ColumnPtr Column::indexImpl<UInt32>(const PaddedPODArray<UInt32> & indexes, size_t limit) const; \
-    template ColumnPtr Column::indexImpl<UInt64>(const PaddedPODArray<UInt64> & indexes, size_t limit) const;
+    template ColumnPtr Column::indexImpl<UInt8>(const PaddedPODArray<UInt8> & indexes, UInt64 limit) const; \
+    template ColumnPtr Column::indexImpl<UInt16>(const PaddedPODArray<UInt16> & indexes, UInt64 limit) const; \
+    template ColumnPtr Column::indexImpl<UInt32>(const PaddedPODArray<UInt32> & indexes, UInt64 limit) const; \
+    template ColumnPtr Column::indexImpl<UInt64>(const PaddedPODArray<UInt64> & indexes, UInt64 limit) const;
 }
@@ -184,11 +184,11 @@ public:
     /// Permutes elements using specified permutation. Is used in sortings.
     /// limit - if it isn't 0, puts only first limit elements in the result.
     using Permutation = PaddedPODArray<size_t>;
-    virtual Ptr permute(const Permutation & perm, size_t limit) const = 0;
+    virtual Ptr permute(const Permutation & perm, UInt64 limit) const = 0;

     /// Creates new column with values column[indexes[:limit]]. If limit is 0, all indexes are used.
     /// Indexes must be one of the ColumnUInt. For default implementation, see selectIndexImpl from ColumnsCommon.h
-    virtual Ptr index(const IColumn & indexes, size_t limit) const = 0;
+    virtual Ptr index(const IColumn & indexes, UInt64 limit) const = 0;

     /** Compares (*this)[n] and rhs[m]. Column rhs should have the same type.
       * Returns negative number, 0, or positive number (*this)[n] is less, equal, greater than rhs[m] respectively.
@@ -209,7 +209,7 @@ public:
       * limit - if isn't 0, then only first limit elements of the result column could be sorted.
      * nan_direction_hint - see above.
      */
-    virtual void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const = 0;
+    virtual void getPermutation(bool reverse, UInt64 limit, int nan_direction_hint, Permutation & res) const = 0;

     /** Copies each element according offsets parameter.
      * (i-th element should be copied offsets[i] - offsets[i - 1] times.)
@@ -79,7 +79,7 @@ public:
         return cloneDummy(countBytesInFilter(filt));
     }

-    ColumnPtr permute(const Permutation & perm, size_t limit) const override
+    ColumnPtr permute(const Permutation & perm, UInt64 limit) const override
     {
         if (s != perm.size())
             throw Exception("Size of permutation doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
@@ -87,7 +87,7 @@ public:
         return cloneDummy(limit ? std::min(s, limit) : s);
     }

-    ColumnPtr index(const IColumn & indexes, size_t limit) const override
+    ColumnPtr index(const IColumn & indexes, UInt64 limit) const override
     {
         if (indexes.size() < limit)
             throw Exception("Size of indexes is less than required.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
@@ -108,7 +108,7 @@ public:
 private:
     size_t count = 0;
     const size_t max_speed = 0;
-    const size_t limit = 0; /// 0 - not limited.
+    const UInt64 limit = 0; /// 0 - not limited.
     const char * limit_exceeded_exception_message = nullptr;
     Stopwatch watch {CLOCK_MONOTONIC_COARSE};
     std::mutex mutex;
@@ -8,7 +8,7 @@ namespace ErrorCodes
     extern const int SET_SIZE_LIMIT_EXCEEDED;
 }

-DistinctBlockInputStream::DistinctBlockInputStream(const BlockInputStreamPtr & input, const SizeLimits & set_size_limits, size_t limit_hint_, const Names & columns)
+DistinctBlockInputStream::DistinctBlockInputStream(const BlockInputStreamPtr & input, const SizeLimits & set_size_limits, UInt64 limit_hint_, const Names & columns)
     : columns_names(columns)
     , limit_hint(limit_hint_)
     , set_size_limits(set_size_limits)
@@ -17,7 +17,7 @@ class DistinctBlockInputStream : public IBlockInputStream
 {
 public:
     /// Empty columns_ means all collumns.
-    DistinctBlockInputStream(const BlockInputStreamPtr & input, const SizeLimits & set_size_limits, size_t limit_hint_, const Names & columns);
+    DistinctBlockInputStream(const BlockInputStreamPtr & input, const SizeLimits & set_size_limits, UInt64 limit_hint_, const Names & columns);

     String getName() const override { return "Distinct"; }

@@ -41,7 +41,7 @@ private:
     Names columns_names;
     SetVariants data;
     Sizes key_sizes;
-    size_t limit_hint;
+    UInt64 limit_hint;

     bool no_more_rows = false;

@@ -9,7 +9,7 @@ namespace ErrorCodes
 }

 DistinctSortedBlockInputStream::DistinctSortedBlockInputStream(
-    const BlockInputStreamPtr & input, const SizeLimits & set_size_limits, size_t limit_hint_, const Names & columns)
+    const BlockInputStreamPtr & input, const SizeLimits & set_size_limits, UInt64 limit_hint_, const Names & columns)
     : description(input->getSortDescription())
     , columns_names(columns)
     , limit_hint(limit_hint_)
@@ -21,7 +21,7 @@ class DistinctSortedBlockInputStream : public IBlockInputStream
 {
 public:
     /// Empty columns_ means all collumns.
-    DistinctSortedBlockInputStream(const BlockInputStreamPtr & input, const SizeLimits & set_size_limits, size_t limit_hint_, const Names & columns);
+    DistinctSortedBlockInputStream(const BlockInputStreamPtr & input, const SizeLimits & set_size_limits, UInt64 limit_hint_, const Names & columns);

     String getName() const override { return "DistinctSorted"; }

@@ -59,7 +59,7 @@ private:
     Names columns_names;
     ClearableSetVariants data;
     Sizes key_sizes;
-    size_t limit_hint;
+    UInt64 limit_hint;

     /// Restrictions on the maximum size of the output data.
     SizeLimits set_size_limits;
@@ -26,7 +26,7 @@ static bool isPrefix(const SortDescription & pref_descr, const SortDescription &
 FinishSortingBlockInputStream::FinishSortingBlockInputStream(
     const BlockInputStreamPtr & input, const SortDescription & description_sorted_,
     const SortDescription & description_to_sort_,
-    size_t max_merged_block_size_, size_t limit_)
+    size_t max_merged_block_size_, UInt64 limit_)
     : description_sorted(description_sorted_), description_to_sort(description_to_sort_),
     max_merged_block_size(max_merged_block_size_), limit(limit_)
 {
@@ -17,7 +17,7 @@ public:
     /// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
     FinishSortingBlockInputStream(const BlockInputStreamPtr & input, const SortDescription & description_sorted_,
         const SortDescription & description_to_sort_,
-        size_t max_merged_block_size_, size_t limit_);
+        size_t max_merged_block_size_, UInt64 limit_);

     String getName() const override { return "FinishSorting"; }

@@ -33,7 +33,7 @@ private:
     SortDescription description_sorted;
     SortDescription description_to_sort;
     size_t max_merged_block_size;
-    size_t limit;
+    UInt64 limit;

     Block tail_block;
     Blocks blocks;
@@ -6,7 +6,7 @@
 namespace DB
 {

-LimitBlockInputStream::LimitBlockInputStream(const BlockInputStreamPtr & input, size_t limit_, size_t offset_, bool always_read_till_end_)
+LimitBlockInputStream::LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, size_t offset_, bool always_read_till_end_)
     : limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_)
 {
     children.push_back(input);
@@ -17,7 +17,7 @@ public:
      * If always_read_till_end = true - reads all the data to the end, but ignores them. This is necessary in rare cases:
      * when otherwise, due to the cancellation of the request, we would not have received the data for GROUP BY WITH TOTALS from the remote server.
      */
-    LimitBlockInputStream(const BlockInputStreamPtr & input, size_t limit_, size_t offset_, bool always_read_till_end_ = false);
+    LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, size_t offset_, bool always_read_till_end_ = false);

     String getName() const override { return "Limit"; }

@@ -27,7 +27,7 @@ protected:
     Block readImpl() override;

 private:
-    size_t limit;
+    UInt64 limit;
     size_t offset;
     size_t pos = 0;
     bool always_read_till_end;
@@ -20,7 +20,7 @@ namespace DB

 MergeSortingBlockInputStream::MergeSortingBlockInputStream(
     const BlockInputStreamPtr & input, SortDescription & description_,
-    size_t max_merged_block_size_, size_t limit_, size_t max_bytes_before_remerge_,
+    size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
     size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
     : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
     max_bytes_before_remerge(max_bytes_before_remerge_),
@@ -134,7 +134,7 @@ Block MergeSortingBlockInputStream::readImpl()


 MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
-    Blocks & blocks_, SortDescription & description_, size_t max_merged_block_size_, size_t limit_)
+    Blocks & blocks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
     : blocks(blocks_), header(blocks.at(0).cloneEmpty()), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
 {
     Blocks nonempty_blocks;
@@ -30,7 +30,7 @@ class MergeSortingBlocksBlockInputStream : public IBlockInputStream
 public:
     /// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
     MergeSortingBlocksBlockInputStream(Blocks & blocks_, SortDescription & description_,
-        size_t max_merged_block_size_, size_t limit_ = 0);
+        size_t max_merged_block_size_, UInt64 limit_ = 0);

     String getName() const override { return "MergeSortingBlocks"; }

@@ -47,7 +47,7 @@ private:
     Block header;
     SortDescription description;
     size_t max_merged_block_size;
-    size_t limit;
+    UInt64 limit;
     size_t total_merged_rows = 0;

     using CursorImpls = std::vector<SortCursorImpl>;
@@ -71,7 +71,7 @@ class MergeSortingBlockInputStream : public IBlockInputStream
 public:
     /// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
     MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
-        size_t max_merged_block_size_, size_t limit_,
+        size_t max_merged_block_size_, UInt64 limit_,
         size_t max_bytes_before_remerge_,
         size_t max_bytes_before_external_sort_, const std::string & tmp_path_);

@@ -88,7 +88,7 @@ protected:
 private:
     SortDescription description;
     size_t max_merged_block_size;
-    size_t limit;
+    UInt64 limit;

     size_t max_bytes_before_remerge;
     size_t max_bytes_before_external_sort;
@@ -17,7 +17,7 @@ namespace ErrorCodes

 MergingSortedBlockInputStream::MergingSortedBlockInputStream(
     const BlockInputStreams & inputs_, const SortDescription & description_,
-    size_t max_block_size_, size_t limit_, WriteBuffer * out_row_sources_buf_, bool quiet_)
+    size_t max_block_size_, UInt64 limit_, WriteBuffer * out_row_sources_buf_, bool quiet_)
     : description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
     , source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_)
 {
@@ -69,7 +69,7 @@ public:
      */
     MergingSortedBlockInputStream(
         const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
-        size_t limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false);
+        UInt64 limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false);

     String getName() const override { return "MergingSorted"; }

@@ -134,7 +134,7 @@ protected:

     const SortDescription description;
     const size_t max_block_size;
-    size_t limit;
+    UInt64 limit;
     size_t total_merged_rows = 0;

     bool first = true;
@@ -41,7 +41,7 @@ void NativeBlockOutputStream::flush()
 }


-void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, size_t limit)
+void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, UInt64 limit)
 {
     /** If there are columns-constants - then we materialize them.
       * (Since the data type does not know how to serialize / deserialize constants.)
@@ -30,7 +30,7 @@ public:
     void write(const Block & block) override;
     void flush() override;

-    static void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, size_t limit);
+    static void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, UInt64 limit);

     String getContentType() const override { return "application/octet-stream"; }

@@ -15,7 +15,7 @@ class PartialSortingBlockInputStream : public IBlockInputStream
 {
 public:
     /// limit - if not 0, then you can sort each block not completely, but only `limit` first rows by order.
-    PartialSortingBlockInputStream(const BlockInputStreamPtr & input_, SortDescription & description_, size_t limit_ = 0)
+    PartialSortingBlockInputStream(const BlockInputStreamPtr & input_, SortDescription & description_, UInt64 limit_ = 0)
         : description(description_), limit(limit_)
     {
         children.push_back(input_);
@@ -29,7 +29,7 @@ protected:

 private:
     SortDescription description;
-    size_t limit;
+    UInt64 limit;
 };

 }
@@ -100,7 +100,7 @@ void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer &
     column_concrete.getData().push_back(place);
 }

-void DataTypeAggregateFunction::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
+void DataTypeAggregateFunction::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const
 {
     const ColumnAggregateFunction & real_column = typeid_cast<const ColumnAggregateFunction &>(column);
     const ColumnAggregateFunction::Container & vec = real_column.getData();
@@ -115,7 +115,7 @@ void DataTypeAggregateFunction::serializeBinaryBulk(const IColumn & column, Writ
     function->serialize(*it, ostr);
 }

-void DataTypeAggregateFunction::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double /*avg_value_size_hint*/) const
+void DataTypeAggregateFunction::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double /*avg_value_size_hint*/) const
 {
     ColumnAggregateFunction & real_column = typeid_cast<ColumnAggregateFunction &>(column);
     ColumnAggregateFunction::Container & vec = real_column.getData();
@@ -44,8 +44,8 @@ public:

     void serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
     void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;
-    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
+    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const override;
+    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double avg_value_size_hint) const override;
     void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
     void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
     void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
@@ -100,7 +100,7 @@ void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr) const

 namespace
 {
-    void serializeArraySizesPositionIndependent(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit)
+    void serializeArraySizesPositionIndependent(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit)
     {
         const ColumnArray & column_array = typeid_cast<const ColumnArray &>(column);
         const ColumnArray::Offsets & offset_values = column_array.getOffsets();
@@ -122,7 +122,7 @@ namespace
         }
     }

-    void deserializeArraySizesPositionIndependent(IColumn & column, ReadBuffer & istr, size_t limit)
+    void deserializeArraySizesPositionIndependent(IColumn & column, ReadBuffer & istr, UInt64 limit)
     {
         ColumnArray & column_array = typeid_cast<ColumnArray &>(column);
         ColumnArray::Offsets & offset_values = column_array.getOffsets();
@@ -188,7 +188,7 @@ void DataTypeArray::deserializeBinaryBulkStatePrefix(
 void DataTypeArray::serializeBinaryBulkWithMultipleStreams(
     const IColumn & column,
     size_t offset,
-    size_t limit,
+    UInt64 limit,
     SerializeBinaryBulkSettings & settings,
     SerializeBinaryBulkStatePtr & state) const
 {
@@ -234,7 +234,7 @@ void DataTypeArray::serializeBinaryBulkWithMultipleStreams(

 void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
     IColumn & column,
-    size_t limit,
+    UInt64 limit,
     DeserializeBinaryBulkSettings & settings,
     DeserializeBinaryBulkStatePtr & state) const
 {
@@ -74,13 +74,13 @@ public:
     void serializeBinaryBulkWithMultipleStreams(
         const IColumn & column,
         size_t offset,
-        size_t limit,
+        UInt64 limit,
         SerializeBinaryBulkSettings & settings,
         SerializeBinaryBulkStatePtr & state) const override;

     void deserializeBinaryBulkWithMultipleStreams(
         IColumn & column,
-        size_t limit,
+        UInt64 limit,
         DeserializeBinaryBulkSettings & settings,
         DeserializeBinaryBulkStatePtr & state) const override;

@@ -201,7 +201,7 @@ void DataTypeEnum<Type>::deserializeTextCSV(IColumn & column, ReadBuffer & istr,

 template <typename Type>
 void DataTypeEnum<Type>::serializeBinaryBulk(
-    const IColumn & column, WriteBuffer & ostr, const size_t offset, size_t limit) const
+    const IColumn & column, WriteBuffer & ostr, const size_t offset, UInt64 limit) const
 {
     const auto & x = typeid_cast<const ColumnType &>(column).getData();
     const auto size = x.size();
@@ -214,7 +214,7 @@ void DataTypeEnum<Type>::serializeBinaryBulk(

 template <typename Type>
 void DataTypeEnum<Type>::deserializeBinaryBulk(
-    IColumn & column, ReadBuffer & istr, const size_t limit, const double /*avg_value_size_hint*/) const
+    IColumn & column, ReadBuffer & istr, const UInt64 limit, const double /*avg_value_size_hint*/) const
 {
     auto & x = typeid_cast<ColumnType &>(column).getData();
     const auto initial_size = x.size();
@@ -102,8 +102,8 @@ public:
     void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
     void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;

-    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, const size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, const size_t limit, const double avg_value_size_hint) const override;
+    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, const size_t offset, UInt64 limit) const override;
+    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, const UInt64 limit, const double avg_value_size_hint) const override;

     void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;

@@ -80,7 +80,7 @@ void DataTypeFixedString::deserializeBinary(IColumn & column, ReadBuffer & istr)
 }


-void DataTypeFixedString::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
+void DataTypeFixedString::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const
 {
     const ColumnFixedString::Chars & data = typeid_cast<const ColumnFixedString &>(column).getChars();

@@ -94,7 +94,7 @@ void DataTypeFixedString::serializeBinaryBulk(const IColumn & column, WriteBuffe
 }


-void DataTypeFixedString::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double /*avg_value_size_hint*/) const
+void DataTypeFixedString::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double /*avg_value_size_hint*/) const
 {
     ColumnFixedString::Chars & data = typeid_cast<ColumnFixedString &>(column).getChars();

@@ -45,8 +45,8 @@ public:
     void serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
     void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;

-    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
+    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const override;
+    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double avg_value_size_hint) const override;

     void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;

@@ -476,7 +476,7 @@ namespace
 void DataTypeLowCardinality::serializeBinaryBulkWithMultipleStreams(
     const IColumn & column,
     size_t offset,
-    size_t limit,
+    UInt64 limit,
     SerializeBinaryBulkSettings & settings,
     SerializeBinaryBulkStatePtr & state) const
 {
@@ -572,7 +572,7 @@ void DataTypeLowCardinality::serializeBinaryBulkWithMultipleStreams(

 void DataTypeLowCardinality::deserializeBinaryBulkWithMultipleStreams(
     IColumn & column,
-    size_t limit,
+    UInt64 limit,
     DeserializeBinaryBulkSettings & settings,
     DeserializeBinaryBulkStatePtr & state) const
 {
@@ -39,13 +39,13 @@ public:
     void serializeBinaryBulkWithMultipleStreams(
         const IColumn & column,
         size_t offset,
-        size_t limit,
+        UInt64 limit,
         SerializeBinaryBulkSettings & settings,
         SerializeBinaryBulkStatePtr & state) const override;

     void deserializeBinaryBulkWithMultipleStreams(
         IColumn & column,
-        size_t limit,
+        UInt64 limit,
         DeserializeBinaryBulkSettings & settings,
         DeserializeBinaryBulkStatePtr & state) const override;

@@ -14,7 +14,7 @@ MutableColumnPtr DataTypeNothing::createColumn() const
     return ColumnNothing::create(0);
 }

-void DataTypeNothing::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
+void DataTypeNothing::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const
 {
     size_t size = column.size();

@@ -25,7 +25,7 @@ void DataTypeNothing::serializeBinaryBulk(const IColumn & column, WriteBuffer &
         ostr.write('0');
 }

-void DataTypeNothing::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double /*avg_value_size_hint*/) const
+void DataTypeNothing::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double /*avg_value_size_hint*/) const
 {
     typeid_cast<ColumnNothing &>(column).addSize(istr.tryIgnore(limit));
 }
@@ -21,8 +21,8 @@ public:
     MutableColumnPtr createColumn() const override;

     /// These methods read and write zero bytes just to allow to figure out size of column.
-    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
+    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const override;
+    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double avg_value_size_hint) const override;

     bool equals(const IDataType & rhs) const override;

@@ -80,7 +80,7 @@ void DataTypeNullable::deserializeBinaryBulkStatePrefix(
 void DataTypeNullable::serializeBinaryBulkWithMultipleStreams(
     const IColumn & column,
     size_t offset,
-    size_t limit,
+    UInt64 limit,
     SerializeBinaryBulkSettings & settings,
     SerializeBinaryBulkStatePtr & state) const
 {
@@ -101,7 +101,7 @@ void DataTypeNullable::serializeBinaryBulkWithMultipleStreams(

 void DataTypeNullable::deserializeBinaryBulkWithMultipleStreams(
     IColumn & column,
-    size_t limit,
+    UInt64 limit,
     DeserializeBinaryBulkSettings & settings,
     DeserializeBinaryBulkStatePtr & state) const
 {
@@ -35,13 +35,13 @@ public:
     void serializeBinaryBulkWithMultipleStreams(
         const IColumn & column,
         size_t offset,
-        size_t limit,
+        UInt64 limit,
         SerializeBinaryBulkSettings & settings,
         SerializeBinaryBulkStatePtr & state) const override;

     void deserializeBinaryBulkWithMultipleStreams(
         IColumn & column,
-        size_t limit,
+        UInt64 limit,
         DeserializeBinaryBulkSettings & settings,
         DeserializeBinaryBulkStatePtr & state) const override;

@ -1,4 +1,4 @@
-#include <type_traits>
+#include <type_traits>
#include <DataTypes/DataTypeNumberBase.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnConst.h>
@ -179,7 +179,7 @@ void DataTypeNumberBase<T>::deserializeBinary(IColumn & column, ReadBuffer & ist
}

template <typename T>
-void DataTypeNumberBase<T>::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
+void DataTypeNumberBase<T>::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const
{
    const typename ColumnVector<T>::Container & x = typeid_cast<const ColumnVector<T> &>(column).getData();

@ -193,7 +193,7 @@ void DataTypeNumberBase<T>::serializeBinaryBulk(const IColumn & column, WriteBuf
}

template <typename T>
-void DataTypeNumberBase<T>::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double /*avg_value_size_hint*/) const
+void DataTypeNumberBase<T>::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double /*avg_value_size_hint*/) const
{
    typename ColumnVector<T>::Container & x = typeid_cast<ColumnVector<T> &>(column).getData();
    size_t initial_size = x.size();

@ -33,8 +33,8 @@ public:
    void deserializeBinary(Field & field, ReadBuffer & istr) const override;
    void serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
    void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;
-    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
+    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const override;
+    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double avg_value_size_hint) const override;

    void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;

@ -78,7 +78,7 @@ void DataTypeString::deserializeBinary(IColumn & column, ReadBuffer & istr) cons
}


-void DataTypeString::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
+void DataTypeString::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const
{
    const ColumnString & column_string = typeid_cast<const ColumnString &>(column);
    const ColumnString::Chars & data = column_string.getChars();
@ -111,7 +111,7 @@ void DataTypeString::serializeBinaryBulk(const IColumn & column, WriteBuffer & o


template <int UNROLL_TIMES>
-static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnString::Offsets & offsets, ReadBuffer & istr, size_t limit)
+static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnString::Offsets & offsets, ReadBuffer & istr, UInt64 limit)
{
    size_t offset = data.size();
    for (size_t i = 0; i < limit; ++i)
@ -171,7 +171,7 @@ static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnSt
}


-void DataTypeString::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const
+void DataTypeString::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double avg_value_size_hint) const
{
    ColumnString & column_string = typeid_cast<ColumnString &>(column);
    ColumnString::Chars & data = column_string.getChars();

@ -26,8 +26,8 @@ public:
    void serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
    void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;

-    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
+    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const override;
+    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double avg_value_size_hint) const override;

    void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;

@ -372,7 +372,7 @@ void DataTypeTuple::deserializeBinaryBulkStatePrefix(
void DataTypeTuple::serializeBinaryBulkWithMultipleStreams(
    const IColumn & column,
    size_t offset,
-    size_t limit,
+    UInt64 limit,
    SerializeBinaryBulkSettings & settings,
    SerializeBinaryBulkStatePtr & state) const
{
@ -390,7 +390,7 @@ void DataTypeTuple::serializeBinaryBulkWithMultipleStreams(

void DataTypeTuple::deserializeBinaryBulkWithMultipleStreams(
    IColumn & column,
-    size_t limit,
+    UInt64 limit,
    DeserializeBinaryBulkSettings & settings,
    DeserializeBinaryBulkStatePtr & state) const
{

@ -67,13 +67,13 @@ public:
    void serializeBinaryBulkWithMultipleStreams(
        const IColumn & column,
        size_t offset,
-        size_t limit,
+        UInt64 limit,
        SerializeBinaryBulkSettings & settings,
        SerializeBinaryBulkStatePtr & state) const override;

    void deserializeBinaryBulkWithMultipleStreams(
        IColumn & column,
-        size_t limit,
+        UInt64 limit,
        DeserializeBinaryBulkSettings & settings,
        DeserializeBinaryBulkStatePtr & state) const override;

@ -94,7 +94,7 @@ void DataTypeDecimal<T>::serializeBinary(const IColumn & column, size_t row_num,
}

template <typename T>
-void DataTypeDecimal<T>::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
+void DataTypeDecimal<T>::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const
{
    const typename ColumnType::Container & x = typeid_cast<const ColumnType &>(column).getData();

@ -124,7 +124,7 @@ void DataTypeDecimal<T>::deserializeBinary(IColumn & column, ReadBuffer & istr)
}

template <typename T>
-void DataTypeDecimal<T>::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double) const
+void DataTypeDecimal<T>::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double) const
{
    typename ColumnType::Container & x = typeid_cast<ColumnType &>(column).getData();
    size_t initial_size = x.size();

@ -94,11 +94,11 @@ public:

    void serializeBinary(const Field & field, WriteBuffer & ostr) const override;
    void serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
-    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
+    void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const override;

    void deserializeBinary(Field & field, ReadBuffer & istr) const override;
    void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
+    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double avg_value_size_hint) const override;

    void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;

@ -173,7 +173,7 @@ public:
    virtual void serializeBinaryBulkWithMultipleStreams(
        const IColumn & column,
        size_t offset,
-        size_t limit,
+        UInt64 limit,
        SerializeBinaryBulkSettings & settings,
        SerializeBinaryBulkStatePtr & /*state*/) const
    {
@ -184,7 +184,7 @@ public:
    /// Read no more than limit values and append them into column.
    virtual void deserializeBinaryBulkWithMultipleStreams(
        IColumn & column,
-        size_t limit,
+        UInt64 limit,
        DeserializeBinaryBulkSettings & settings,
        DeserializeBinaryBulkStatePtr & /*state*/) const
    {
@ -194,8 +194,8 @@ public:

    /** Override these methods for data types that require just single stream (most of data types).
      */
-    virtual void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const;
-    virtual void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const;
+    virtual void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, UInt64 limit) const;
+    virtual void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, UInt64 limit, double avg_value_size_hint) const;

    /** Serialization/deserialization of individual values.
      *
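
For orientation, the single-stream pair above is the whole contract that most simple types implement; a minimal round-trip sketch against this interface (the column contents, buffer choice, and function name are illustrative, not part of the diff):

    #include <DataTypes/DataTypesNumber.h>
    #include <IO/ReadBufferFromString.h>
    #include <IO/WriteBufferFromString.h>

    /// Hypothetical round-trip through the single-stream bulk interface.
    void bulkRoundTrip()
    {
        using namespace DB;

        DataTypeUInt64 type;
        auto column = type.createColumn();
        for (UInt64 i = 0; i < 10; ++i)
            column->insert(Field(i));

        /// Write all rows starting at offset 0; `limit` is the parameter this commit retypes to UInt64.
        WriteBufferFromOwnString ostr;
        type.serializeBinaryBulk(*column, ostr, 0, column->size());

        /// Read back at most 10 values (avg_value_size_hint = 0 means "no hint").
        ReadBufferFromString istr(ostr.str());
        auto restored = type.createColumn();
        type.deserializeBinaryBulk(*restored, istr, 10, 0);
    }
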
@ -36,7 +36,7 @@ bool LimitReadBuffer::nextImpl()
}


-LimitReadBuffer::LimitReadBuffer(ReadBuffer & in, size_t limit, bool throw_exception, std::string exception_message)
+LimitReadBuffer::LimitReadBuffer(ReadBuffer & in, UInt64 limit, bool throw_exception, std::string exception_message)
    : ReadBuffer(in.position(), 0), in(in), limit(limit), throw_exception(throw_exception), exception_message(std::move(exception_message))
{
    size_t remaining_bytes_in_buffer = in.buffer().end() - in.position();

@ -14,14 +14,14 @@ class LimitReadBuffer : public ReadBuffer
{
private:
    ReadBuffer & in;
-    size_t limit;
+    UInt64 limit;
    bool throw_exception;
    std::string exception_message;

    bool nextImpl() override;

public:
-    LimitReadBuffer(ReadBuffer & in, size_t limit, bool throw_exception, std::string exception_message = {});
+    LimitReadBuffer(ReadBuffer & in, UInt64 limit, bool throw_exception, std::string exception_message = {});
    ~LimitReadBuffer() override;
};

@ -17,7 +17,7 @@ int main(int argc, char ** argv)
        return 1;
    }

-    size_t limit = std::stol(argv[1]);
+    UInt64 limit = std::stol(argv[1]);

    ReadBufferFromFileDescriptor in(STDIN_FILENO);
    WriteBufferFromFileDescriptor out(STDOUT_FILENO);

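The test above drives LimitReadBuffer from stdin; a self-contained sketch of the same idea, assuming only the constructor shown in this diff (the string payload is illustrative):

    #include <unistd.h>
    #include <IO/LimitReadBuffer.h>
    #include <IO/ReadBufferFromString.h>
    #include <IO/WriteBufferFromFileDescriptor.h>
    #include <IO/copyData.h>

    int main()
    {
        DB::ReadBufferFromString in("some longer payload");
        /// Expose at most 4 bytes of `in`; false = do not throw when the limit is hit.
        DB::LimitReadBuffer limited(in, 4, false);
        DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO);
        DB::copyData(limited, out);   /// writes "some"
        out.next();
        return 0;
    }
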
@ -21,6 +21,7 @@
#include <DataStreams/RollupBlockInputStream.h>
#include <DataStreams/CubeBlockInputStream.h>
#include <DataStreams/ConvertColumnLowCardinalityToFullBlockInputStream.h>
+#include <DataStreams/ConvertingBlockInputStream.h>

#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
@ -34,10 +35,11 @@
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
+#include <Interpreters/convertFieldToType.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
-#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>

+#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -52,7 +54,6 @@
#include <Parsers/queryToString.h>
#include <ext/map.h>
#include <memory>
-#include <DataStreams/ConvertingBlockInputStream.h>


namespace DB
@ -721,26 +722,44 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
}


-void InterpreterSelectQuery::getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, size_t & offset)
+static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context)
{
-    length = 0;
-    offset = 0;
-    if (query.limit_length)
-    {
-        getLimitUIntValue(query.limit_length, length);
-        if (query.limit_offset)
-            getLimitUIntValue(query.limit_offset, offset);
-    }
+    const auto & [field, type] = evaluateConstantExpression(node, context);
+
+    if (!isNumber(type))
+        throw Exception("Illegal type " + type->getName() + " of LIMIT expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION);
+
+    Field converted = convertFieldToType(field, DataTypeUInt64());
+    if (converted.isNull())
+        throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of LIMIT expression is not representable as UInt64", ErrorCodes::INVALID_LIMIT_EXPRESSION);
+
+    return converted.safeGet<UInt64>();
}


-void InterpreterSelectQuery::getLimitUIntValue(const ASTPtr& ptr, size_t& result)
+static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery & query, const Context & context)
{
-    const auto& eval_result = evaluateConstantExpression(ptr, context);
-    if (!isNumber(eval_result.second)) {
-        throw Exception("Illegal limit expression", ErrorCodes::INVALID_LIMIT_EXPRESSION);
+    UInt64 length = 0;
+    UInt64 offset = 0;
+
+    if (query.limit_length)
+    {
+        length = getLimitUIntValue(query.limit_length, context);
+        if (query.limit_offset)
+            offset = getLimitUIntValue(query.limit_offset, context);
    }
-    result = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), eval_result.first);
+
+    return {length, offset};
}

+static UInt64 getLimitForSorting(ASTSelectQuery & query, const Context & context)
+{
+    /// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY.
+    if (!query.distinct && !query.limit_by_expression_list)
+    {
+        auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
+        return limit_length + limit_offset;
+    }
+    return 0;
+}

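In short, the rewrite evaluates LIMIT and OFFSET once, validates them, and returns them as UInt64 values instead of writing through out-parameters. A rough standalone analogue of that validate-and-convert flow (plain C++ with illustrative names, not the interpreter's actual API):

    #include <cstdint>
    #include <optional>
    #include <stdexcept>
    #include <string>
    #include <utility>

    /// Analogue of getLimitUIntValue: a LIMIT expression must evaluate to a
    /// number that is exactly representable as an unsigned 64-bit integer.
    uint64_t limitFromConstant(long double value)
    {
        if (value < 0 || static_cast<long double>(static_cast<uint64_t>(value)) != value)
            throw std::invalid_argument("LIMIT expression is not representable as UInt64");
        return static_cast<uint64_t>(value);
    }

    /// Analogue of getLimitLengthAndOffset: OFFSET is only consulted when LIMIT exists.
    std::pair<uint64_t, uint64_t> limitLengthAndOffset(std::optional<long double> limit,
                                                       std::optional<long double> offset)
    {
        uint64_t length = 0, off = 0;
        if (limit)
        {
            length = limitFromConstant(*limit);
            if (offset)
                off = limitFromConstant(*offset);
        }
        return {length, off};
    }
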
@ -891,10 +910,6 @@ void InterpreterSelectQuery::executeFetchColumns(
            + ", maximum: " + settings.max_columns_to_read.toString(),
            ErrorCodes::TOO_MANY_COLUMNS);

-    size_t limit_length = 0;
-    size_t limit_offset = 0;
-    getLimitLengthAndOffset(query, limit_length, limit_offset);
-
    /** With distributed query processing, almost no computations are done in the threads,
      *  but wait and receive data from remote servers.
      *  If we have 20 remote servers, and max_threads = 8, then it would not be very good
@ -914,6 +929,8 @@ void InterpreterSelectQuery::executeFetchColumns(
    if (!max_block_size)
        throw Exception("Setting 'max_block_size' cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND);

+    auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
+
    /** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY but LIMIT is specified, and limit + offset < max_block_size,
      *  then as the block size we will use limit + offset (not to read more from the table than requested),
      *  and also set the number of threads to 1.
@ -1244,26 +1261,11 @@ static SortDescription getSortDescription(ASTSelectQuery & query)
    return order_descr;
}

-size_t InterpreterSelectQuery::getLimitForSorting(ASTSelectQuery & query)
-{
-    /// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY.
-    size_t limit = 0;
-    if (!query.distinct && !query.limit_by_expression_list)
-    {
-        size_t limit_length = 0;
-        size_t limit_offset = 0;
-        getLimitLengthAndOffset(query, limit_length, limit_offset);
-        limit = limit_length + limit_offset;
-    }
-
-    return limit;
-}
-

void InterpreterSelectQuery::executeOrder(Pipeline & pipeline)
{
    SortDescription order_descr = getSortDescription(query);
-    size_t limit = getLimitForSorting(query);
+    UInt64 limit = getLimitForSorting(query, context);

    const Settings & settings = context.getSettingsRef();

@ -1294,7 +1296,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline)
void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
{
    SortDescription order_descr = getSortDescription(query);
-    size_t limit = getLimitForSorting(query);
+    UInt64 limit = getLimitForSorting(query, context);

    const Settings & settings = context.getSettingsRef();

@ -1333,11 +1335,8 @@ void InterpreterSelectQuery::executeDistinct(Pipeline & pipeline, bool before_or
{
    const Settings & settings = context.getSettingsRef();

-    size_t limit_length = 0;
-    size_t limit_offset = 0;
-    getLimitLengthAndOffset(query, limit_length, limit_offset);
-
-    size_t limit_for_distinct = 0;
+    auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
+    UInt64 limit_for_distinct = 0;

    /// If after this stage of DISTINCT ORDER BY is not executed, then you can get no more than limit_length + limit_offset of different rows.
    if (!query.order_expression_list || !before_order)
@ -1374,16 +1373,13 @@ void InterpreterSelectQuery::executeUnion(Pipeline & pipeline)
/// Preliminary LIMIT - is used in every source, if there are several sources, before they are combined.
void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
{
-    size_t limit_length = 0;
-    size_t limit_offset = 0;
-    getLimitLengthAndOffset(query, limit_length, limit_offset);
-
    /// If there is LIMIT
    if (query.limit_length)
    {
-        pipeline.transform([&](auto & stream)
+        auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
+        pipeline.transform([&, limit = limit_length + limit_offset](auto & stream)
        {
-            stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, 0, false);
+            stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, false);
        });
    }
}
@ -1432,10 +1428,6 @@ bool hasWithTotalsInAnySubqueryInFromClause(const ASTSelectQuery & query)

void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
{
-    size_t limit_length = 0;
-    size_t limit_offset = 0;
-    getLimitLengthAndOffset(query, limit_length, limit_offset);
-
    /// If there is LIMIT
    if (query.limit_length)
    {
@ -1456,6 +1448,10 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
        if (!query.group_by_with_totals && hasWithTotalsInAnySubqueryInFromClause(query))
            always_read_till_end = true;

+        UInt64 limit_length;
+        UInt64 limit_offset;
+        std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, context);
+
        pipeline.transform([&](auto & stream)
        {
            stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, always_read_till_end);
@ -171,11 +171,6 @@ private:
      */
    void getDatabaseAndTableNames(String & database_name, String & table_name);

-    size_t getLimitForSorting(ASTSelectQuery & query);
-
-    void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, size_t & offset);
-
-    void getLimitUIntValue(const ASTPtr& ptr, size_t& result);
-
    /// Different stages of query execution.

    /// dry_run - don't read from table, use empty header block instead.

@ -95,7 +95,7 @@ struct PartialSortingLessWithCollation
    }
};

-void sortBlock(Block & block, const SortDescription & description, size_t limit)
+void sortBlock(Block & block, const SortDescription & description, UInt64 limit)
{
    if (!block)
        return;

@ -8,7 +8,7 @@ namespace DB
{

/// Sort one block by `description`. If limit != 0, then the partial sort of the first `limit` rows is produced.
-void sortBlock(Block & block, const SortDescription & description, size_t limit = 0);
+void sortBlock(Block & block, const SortDescription & description, UInt64 limit = 0);


/** Used only in StorageMergeTree to sort the data with INSERT.
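
A quick illustration of the partial-sort contract described in that comment; a sketch assuming a block that contains a sortable column named `x` (the column name and limit value are illustrative):

    #include <Core/Block.h>
    #include <Core/SortDescription.h>
    #include <Interpreters/sortBlock.h>

    void partialSortExample(DB::Block & block)
    {
        /// Sort by column "x", ascending; second argument is direction, third is nulls direction.
        DB::SortDescription description;
        description.emplace_back("x", 1, 1);

        /// limit = 100: only the first 100 rows of the result are guaranteed to be sorted.
        DB::sortBlock(block, description, 100);
    }
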
@ -37,7 +37,7 @@ struct IMergeTreeIndexGranule
    /// Updates the stored info using rows of the specified block.
    /// Reads no more than `limit` rows.
    /// After finishing updating `pos` will store the position of the first row which was not read.
-    virtual void update(const Block & block, size_t * pos, size_t limit) = 0;
+    virtual void update(const Block & block, size_t * pos, UInt64 limit) = 0;
};

using MergeTreeIndexGranulePtr = std::shared_ptr<IMergeTreeIndexGranule>;
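
The contract in that comment implies the usual consume-in-chunks loop; a hedged sketch of how a writer might drive a granule (the header path and granularity value are assumptions, not taken from the diff):

    #include <Core/Block.h>
    #include <Storages/MergeTree/MergeTreeIndices.h>  /// header name is an assumption; it has moved across versions

    /// Feed a block into an index granule in chunks: per the contract above,
    /// each call reads at most `limit` rows and advances `pos` past them.
    void feedBlock(DB::MergeTreeIndexGranulePtr granule, const DB::Block & block, DB::UInt64 granularity)
    {
        size_t pos = 0;
        while (pos < block.rows())
            granule->update(block, &pos, granularity);
    }
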
@ -52,7 +52,7 @@ void MergeTreeMinMaxGranule::deserializeBinary(ReadBuffer & istr)
    }
}

-void MergeTreeMinMaxGranule::update(const Block & block, size_t * pos, size_t limit)
+void MergeTreeMinMaxGranule::update(const Block & block, size_t * pos, UInt64 limit)
{
    if (*pos >= block.rows())
        throw Exception(

@ -21,7 +21,7 @@ struct MergeTreeMinMaxGranule : public IMergeTreeIndexGranule
    void deserializeBinary(ReadBuffer & istr) override;

    bool empty() const override { return parallelogram.empty(); }
-    void update(const Block & block, size_t * pos, size_t limit) override;
+    void update(const Block & block, size_t * pos, UInt64 limit) override;

    ~MergeTreeMinMaxGranule() override = default;

@ -78,7 +78,7 @@ void MergeTreeSetIndexGranule::deserializeBinary(ReadBuffer & istr)
    set->insertFromBlock(block);
}

-void MergeTreeSetIndexGranule::update(const Block & new_block, size_t * pos, size_t limit)
+void MergeTreeSetIndexGranule::update(const Block & new_block, size_t * pos, UInt64 limit)
{
    if (*pos >= new_block.rows())
        throw Exception(

@ -24,7 +24,7 @@ struct MergeTreeSetIndexGranule : public IMergeTreeIndexGranule
    size_t size() const { return set->getTotalRowCount(); }
    bool empty() const override { return !size(); }

-    void update(const Block & block, size_t * pos, size_t limit) override;
+    void update(const Block & block, size_t * pos, UInt64 limit) override;
    Block getElementsBlock() const;

    ~MergeTreeSetIndexGranule() override = default;

@ -110,7 +110,7 @@ void IMergedBlockOutputStream::writeData(
    size_t prev_mark = 0;
    while (prev_mark < size)
    {
-        size_t limit = 0;
+        UInt64 limit = 0;

        /// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
        if (prev_mark == 0 && index_offset != 0)
@ -551,7 +551,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm

    while (prev_pos < rows)
    {
-        size_t limit = 0;
+        UInt64 limit = 0;
        if (prev_pos == 0 && index_offset != 0)
        {
            limit = index_offset;

@ -141,7 +141,7 @@ public:
        const size_t columns = block.columns();
        const size_t rows = block.rows();
        size_t offsets = 0;
-        size_t limits = max_batch_rows;
+        UInt64 limits = max_batch_rows;
        for (size_t idx = 0; idx < splited_block_size; ++idx)
        {
            /// For last batch, limits should be the remain size

@ -100,7 +100,7 @@ private:
    using DeserializeStates = std::map<String, DeserializeState>;
    DeserializeStates deserialize_states;

-    void readData(const String & name, const IDataType & type, IColumn & column, size_t limit);
+    void readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit);
};


@ -214,7 +214,7 @@ Block TinyLogBlockInputStream::readImpl()
}


-void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t limit)
+void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit)
{
    IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.
    settings.getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
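
The hunk is cut off right after the getter assignment; for context, a hedged sketch of how a readData like this typically continues (the stream-lookup helper is illustrative, only the deserializer call matches the signatures in this diff):

    /// Continuation sketch (not part of the diff): the getter maps each substream
    /// path to its ReadBuffer, then the type-driven bulk deserializer pulls at most
    /// `limit` values through those streams.
    settings.getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
    {
        return &getStreamFor(IDataType::getFileNameForStream(name, path));  /// getStreamFor is hypothetical
    };
    type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]);
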