2022-04-26 18:14:09 +00:00
# pragma once
# include <Compression/ICompressionCodec.h>
# include <qpl/qpl.h>
2022-07-09 16:03:25 +00:00
# include <random>
2022-07-07 14:04:17 +00:00
2022-04-26 18:14:09 +00:00
/// Forward declaration only: this header refers to Poco::Logger solely through
/// pointers (the `log` members of the classes below), so the full definition is not needed here.
namespace Poco
{
class Logger ;
}
namespace DB
{
2022-07-07 18:44:42 +00:00
/// DeflateJobHWPool is resource pool for provide the job objects which is required to save context infomation during offload asynchronous compression to IAA.
2022-04-26 18:14:09 +00:00
class DeflateJobHWPool
{
public :
DeflateJobHWPool ( ) ;
~ DeflateJobHWPool ( ) ;
static DeflateJobHWPool & instance ( ) ;
2022-07-07 16:10:06 +00:00
static constexpr auto JOB_POOL_SIZE = 1024 ;
2022-04-26 18:14:09 +00:00
static constexpr qpl_path_t PATH = qpl_path_hardware ;
2022-07-08 16:20:24 +00:00
static qpl_job * hw_job_pool [ JOB_POOL_SIZE ] ;
static std : : atomic_bool hw_job_locks [ JOB_POOL_SIZE ] ;
2022-07-07 21:21:51 +00:00
static bool job_pool_ready ;
2022-04-26 18:14:09 +00:00
2022-07-07 21:13:20 +00:00
bool & jobPoolReady ( ) { return job_pool_ready ; }
2022-07-07 18:14:47 +00:00
2022-07-07 14:26:57 +00:00
qpl_job * acquireJob ( uint32_t * job_id )
2022-04-26 18:14:09 +00:00
{
2022-07-07 18:14:47 +00:00
if ( jobPoolReady ( ) )
2022-04-26 18:14:09 +00:00
{
2022-06-08 13:28:35 +00:00
uint32_t retry = 0 ;
2022-07-09 16:03:25 +00:00
auto index = distribution ( random_engine ) ;
2022-06-08 13:28:35 +00:00
while ( tryLockJob ( index ) = = false )
2022-04-26 18:14:09 +00:00
{
2022-07-09 16:03:25 +00:00
index = distribution ( random_engine ) ;
2022-06-08 13:28:35 +00:00
retry + + ;
2022-07-07 16:10:06 +00:00
if ( retry > JOB_POOL_SIZE )
2022-06-08 13:28:35 +00:00
{
return nullptr ;
}
2022-04-26 18:14:09 +00:00
}
2022-07-07 16:10:06 +00:00
* job_id = JOB_POOL_SIZE - index ;
2022-07-08 16:20:24 +00:00
return hw_job_pool [ index ] ;
2022-06-08 13:28:35 +00:00
}
else
{
return nullptr ;
2022-04-26 18:14:09 +00:00
}
}
2022-07-07 18:14:47 +00:00
2022-07-07 14:26:57 +00:00
qpl_job * releaseJob ( uint32_t job_id )
2022-04-26 18:14:09 +00:00
{
2022-07-07 18:14:47 +00:00
if ( jobPoolReady ( ) )
2022-06-08 13:28:35 +00:00
{
2022-07-07 16:10:06 +00:00
uint32_t index = JOB_POOL_SIZE - job_id ;
2022-06-08 13:28:35 +00:00
ReleaseJobObjectGuard _ ( index ) ;
2022-07-08 16:20:24 +00:00
return hw_job_pool [ index ] ;
2022-06-08 13:28:35 +00:00
}
else
{
return nullptr ;
}
2022-04-26 18:14:09 +00:00
}
2022-07-07 18:14:47 +00:00
2022-04-26 18:14:09 +00:00
private :
2022-07-07 14:26:57 +00:00
bool tryLockJob ( size_t index )
2022-04-26 18:14:09 +00:00
{
bool expected = false ;
2022-07-07 21:13:20 +00:00
assert ( index < JOB_POOL_SIZE ) ;
2022-07-08 16:20:24 +00:00
return hw_job_locks [ index ] . compare_exchange_strong ( expected , true ) ;
2022-04-26 18:14:09 +00:00
}
2022-07-08 16:20:24 +00:00
void unLockJob ( uint32_t index ) { hw_job_locks [ index ] . store ( false ) ; }
2022-07-08 16:09:58 +00:00
2022-07-07 21:13:20 +00:00
class ReleaseJobObjectGuard
2022-04-26 18:14:09 +00:00
{
uint32_t index ;
ReleaseJobObjectGuard ( ) = delete ;
public :
2022-07-07 21:13:20 +00:00
ReleaseJobObjectGuard ( const uint32_t index_ ) : index ( index_ )
2022-04-26 18:14:09 +00:00
{
}
2022-07-07 18:14:47 +00:00
2022-07-07 14:26:57 +00:00
~ ReleaseJobObjectGuard ( )
2022-04-26 18:14:09 +00:00
{
2022-07-08 16:20:24 +00:00
hw_job_locks [ index ] . store ( false ) ;
2022-04-26 18:14:09 +00:00
}
} ;
2022-07-08 16:20:24 +00:00
std : : unique_ptr < uint8_t [ ] > hw_job_pool_buffer ;
2022-06-08 15:47:44 +00:00
Poco : : Logger * log ;
2022-07-09 16:03:25 +00:00
std : : mt19937 random_engine ;
std : : uniform_int_distribution < int > distribution ;
2022-06-08 15:47:44 +00:00
} ;
2022-07-07 18:14:47 +00:00
2022-06-08 15:47:44 +00:00
class SoftwareCodecDeflate
{
public :
~ SoftwareCodecDeflate ( ) ;
uint32_t doCompressData ( const char * source , uint32_t source_size , char * dest , uint32_t dest_size ) ;
void doDecompressData ( const char * source , uint32_t source_size , char * dest , uint32_t uncompressed_size ) ;
private :
2022-07-09 14:09:23 +00:00
qpl_job * sw_job = nullptr ;
2022-06-08 15:47:44 +00:00
qpl_job * getJobCodecPtr ( ) ;
2022-04-26 18:14:09 +00:00
} ;
2022-06-08 15:47:44 +00:00
class HardwareCodecDeflate
{
public :
2022-07-09 15:12:41 +00:00
/// RET_ERROR stands for hardware codec fail,need fallback to software codec.
static constexpr int32_t RET_ERROR = - 1 ;
2022-06-08 15:47:44 +00:00
HardwareCodecDeflate ( ) ;
~ HardwareCodecDeflate ( ) ;
2022-07-09 15:12:41 +00:00
int32_t doCompressData ( const char * source , uint32_t source_size , char * dest , uint32_t dest_size ) const ;
int32_t doDecompressData ( const char * source , uint32_t source_size , char * dest , uint32_t uncompressed_size ) const ;
int32_t doDecompressDataReq ( const char * source , uint32_t source_size , char * dest , uint32_t uncompressed_size ) ;
2022-07-09 17:54:23 +00:00
/// Flush result for previous asynchronous decompression requests.Must be used following with several calls of doDecompressDataReq.
2022-07-07 18:14:47 +00:00
void flushAsynchronousDecompressRequests ( ) ;
2022-06-08 15:47:44 +00:00
private :
2022-07-09 14:09:23 +00:00
/// Asynchronous job map for decompression: job ID - job object.
/// For each submission, push job ID && job object into this map;
/// For flush, pop out job ID && job object from this map. Use job ID to release job lock and use job object to check job status till complete.
2022-07-08 16:20:24 +00:00
std : : map < uint32_t , qpl_job * > decomp_async_job_map ;
2022-06-08 15:47:44 +00:00
Poco : : Logger * log ;
} ;
2022-04-26 18:14:09 +00:00
class CompressionCodecDeflate : public ICompressionCodec
{
public :
CompressionCodecDeflate ( ) ;
uint8_t getMethodByte ( ) const override ;
void updateHash ( SipHash & hash ) const override ;
protected :
bool isCompression ( ) const override
{
return true ;
}
2022-07-07 18:14:47 +00:00
2022-04-26 18:14:09 +00:00
bool isGenericCompression ( ) const override
{
return true ;
}
2022-07-07 18:14:47 +00:00
2022-04-26 18:14:09 +00:00
uint32_t doCompressData ( const char * source , uint32_t source_size , char * dest ) const override ;
void doDecompressData ( const char * source , uint32_t source_size , char * dest , uint32_t uncompressed_size ) const override ;
2022-07-09 17:54:23 +00:00
///Flush result for previous asynchronous decompression requests on asynchronous mode.
2022-07-07 18:14:47 +00:00
void flushAsynchronousDecompressRequests ( ) override ;
2022-04-26 18:14:09 +00:00
private :
uint32_t getMaxCompressedDataSize ( uint32_t uncompressed_size ) const override ;
2022-07-07 18:14:47 +00:00
std : : unique_ptr < HardwareCodecDeflate > hw_codec ;
std : : unique_ptr < SoftwareCodecDeflate > sw_codec ;
2022-04-26 18:14:09 +00:00
} ;
}