2014-11-21 12:29:33 +00:00
# pragma once
# include <limits>
# include <algorithm>
# include <climits>
# include <sstream>
2015-11-15 06:11:58 +00:00
# include <DB/AggregateFunctions/ReservoirSampler.h>
2015-09-29 19:19:54 +00:00
# include <common/Common.h>
2015-09-24 12:40:36 +00:00
# include <DB/Common/HashTable/Hash.h>
2014-11-21 12:29:33 +00:00
# include <DB/IO/ReadBuffer.h>
# include <DB/IO/ReadHelpers.h>
# include <DB/IO/WriteHelpers.h>
2015-11-15 05:52:41 +00:00
# include <DB/Common/PODArray.h>
2014-11-21 12:29:33 +00:00
# include <Poco/Exception.h>
# include <boost/random.hpp>
2017-03-09 00:56:38 +00:00
/// Implementation of Reservoir Sampling algorithm. Incrementally selects from the added objects a random subset of the `sample_count` size.
/// Can approximately get quantiles.
/// A `quantile` call takes O(sample_count log sample_count) if at least one insert happened since the previous `quantile` call; otherwise it takes O(1).
/// That is, it makes sense to first add, then get quantiles without adding.
2014-11-21 12:29:33 +00:00
2016-01-12 02:21:15 +00:00
namespace DB
{
    namespace ErrorCodes
    {
        /// Declared here only; the definition lives in another translation unit.
        extern const int MEMORY_LIMIT_EXCEEDED;
    }
}
2014-11-21 12:29:33 +00:00
namespace detail
{
const size_t DEFAULT_SAMPLE_COUNT = 8192 ;
2014-11-21 14:07:25 +00:00
const auto MAX_SKIP_DEGREE = sizeof ( UInt32 ) * 8 ;
2014-11-21 12:29:33 +00:00
}
2017-03-09 00:56:38 +00:00
/// What if there is not a single value - throw an exception, or return 0 or NaN in the case of double?
2014-11-21 12:29:33 +00:00
enum class ReservoirSamplerDeterministicOnEmpty
{
    THROW = 0,               /// Throw an exception.
    RETURN_NAN_OR_ZERO = 1,  /// Return NaN for floating-point results, zero otherwise.
};
2015-09-11 08:22:58 +00:00
template < typename T ,
ReservoirSamplerDeterministicOnEmpty OnEmpty = ReservoirSamplerDeterministicOnEmpty : : THROW >
2014-11-21 12:29:33 +00:00
class ReservoirSamplerDeterministic
{
bool good ( const UInt32 hash )
{
return hash = = ( ( hash > > skip_degree ) < < skip_degree ) ;
}
public :
ReservoirSamplerDeterministic ( const size_t sample_count = DEFAULT_SAMPLE_COUNT )
: sample_count { sample_count }
{
}
void clear ( )
{
samples . clear ( ) ;
sorted = false ;
total_values = 0 ;
}
void insert ( const T & v , const UInt64 determinator )
{
const UInt32 hash = intHash64 ( determinator ) ;
if ( ! good ( hash ) )
return ;
insertImpl ( v , hash ) ;
sorted = false ;
+ + total_values ;
}
size_t size ( ) const
{
return total_values ;
}
T quantileNearest ( double level )
{
if ( samples . empty ( ) )
return onEmpty < T > ( ) ;
sortIfNeeded ( ) ;
double index = level * ( samples . size ( ) - 1 ) ;
size_t int_index = static_cast < size_t > ( index + 0.5 ) ;
int_index = std : : max ( 0LU , std : : min ( samples . size ( ) - 1 , int_index ) ) ;
return samples [ int_index ] . first ;
}
2017-03-09 04:26:17 +00:00
/** If T is not a numeric type, using this method causes a compilation error,
* but use of error class does not cause . SFINAE .
* Not SFINAE . Functions members of type templates are simply not checked until they are used .
2014-11-21 12:29:33 +00:00
*/
double quantileInterpolated ( double level )
{
if ( samples . empty ( ) )
return onEmpty < double > ( ) ;
sortIfNeeded ( ) ;
const double index = std : : max ( 0. , std : : min ( samples . size ( ) - 1. , level * ( samples . size ( ) - 1 ) ) ) ;
2017-03-09 04:26:17 +00:00
/// To get a value from a fractional index, we linearly interpolate between adjacent values.
2014-11-21 12:29:33 +00:00
size_t left_index = static_cast < size_t > ( index ) ;
size_t right_index = left_index + 1 ;
if ( right_index = = samples . size ( ) )
return samples [ left_index ] . first ;
const double left_coef = right_index - index ;
const double right_coef = index - left_index ;
return samples [ left_index ] . first * left_coef + samples [ right_index ] . first * right_coef ;
}
void merge ( const ReservoirSamplerDeterministic & b )
{
if ( sample_count ! = b . sample_count )
throw Poco : : Exception ( " Cannot merge ReservoirSamplerDeterministic's with different sample_count " ) ;
sorted = false ;
if ( b . skip_degree > skip_degree )
{
skip_degree = b . skip_degree ;
thinOut ( ) ;
}
2015-09-10 15:07:29 +00:00
for ( const auto & sample : b . samples )
if ( good ( sample . second ) )
insertImpl ( sample . first , sample . second ) ;
2014-11-21 12:29:33 +00:00
total_values + = b . total_values ;
}
void read ( DB : : ReadBuffer & buf )
{
DB : : readIntBinary < size_t > ( sample_count , buf ) ;
DB : : readIntBinary < size_t > ( total_values , buf ) ;
samples . resize ( std : : min ( total_values , sample_count ) ) ;
for ( size_t i = 0 ; i < samples . size ( ) ; + + i )
2015-09-10 15:07:29 +00:00
DB : : readPODBinary ( samples [ i ] , buf ) ;
2014-11-21 12:29:33 +00:00
sorted = false ;
}
void write ( DB : : WriteBuffer & buf ) const
{
DB : : writeIntBinary < size_t > ( sample_count , buf ) ;
DB : : writeIntBinary < size_t > ( total_values , buf ) ;
for ( size_t i = 0 ; i < std : : min ( sample_count , total_values ) ; + + i )
2015-09-10 15:07:29 +00:00
DB : : writePODBinary ( samples [ i ] , buf ) ;
2014-11-21 12:29:33 +00:00
}
private :
2017-03-09 04:26:17 +00:00
/// We allocate some memory on the stack to avoid allocations when there are many objects with a small number of elements.
2015-11-15 05:52:41 +00:00
static constexpr size_t bytes_on_stack = 64 ;
using Element = std : : pair < T , UInt32 > ;
using Array = DB : : PODArray < Element , bytes_on_stack / sizeof ( Element ) , AllocatorWithStackMemory < Allocator < false > , bytes_on_stack > > ;
2014-11-21 12:29:33 +00:00
size_t sample_count ;
size_t total_values { } ;
bool sorted { } ;
2015-11-15 05:52:41 +00:00
Array samples ;
2014-11-21 12:29:33 +00:00
UInt8 skip_degree { } ;
2015-09-11 08:22:58 +00:00
void insertImpl ( const T & v , const UInt32 hash )
{
/// @todo why + 1? I don't quite recall
while ( samples . size ( ) + 1 > = sample_count )
{
if ( + + skip_degree > detail : : MAX_SKIP_DEGREE )
throw DB : : Exception { " skip_degree exceeds maximum value " , DB : : ErrorCodes : : MEMORY_LIMIT_EXCEEDED } ;
thinOut ( ) ;
}
samples . emplace_back ( v , hash ) ;
}
void thinOut ( )
{
auto size = samples . size ( ) ;
for ( size_t i = 0 ; i < size ; )
{
if ( ! good ( samples [ i ] . second ) )
{
/// swap current element with the last one
std : : swap ( samples [ size - 1 ] , samples [ i ] ) ;
- - size ;
}
else
+ + i ;
}
if ( size ! = samples . size ( ) )
{
samples . resize ( size ) ;
sorted = false ;
}
}
2014-11-21 12:29:33 +00:00
void sortIfNeeded ( )
{
if ( sorted )
return ;
sorted = true ;
std : : sort ( samples . begin ( ) , samples . end ( ) , [ ] ( const std : : pair < T , UInt32 > & lhs , const std : : pair < T , UInt32 > & rhs ) {
return lhs . first < rhs . first ;
} ) ;
}
template < typename ResultType >
ResultType onEmpty ( ) const
{
if ( OnEmpty = = ReservoirSamplerDeterministicOnEmpty : : THROW )
throw Poco : : Exception ( " Quantile of empty ReservoirSamplerDeterministic " ) ;
else
return NanLikeValueConstructor < ResultType , std : : is_floating_point < ResultType > : : value > : : getValue ( ) ;
}
} ;