ClickHouse/src/Compression/CompressionCodecDeflateQpl.h

117 lines
3.9 KiB
C++
Raw Normal View History

#pragma once
#include <Compression/ICompressionCodec.h>
#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
#include <random>
#include <qpl/qpl.h>
2022-07-07 14:04:17 +00:00
/// Forward declaration only: this header refers to Poco::Logger solely through a pointer,
/// so the full Poco logging header is not required here.
namespace Poco { class Logger; }
namespace DB
{
2022-07-19 21:48:41 +00:00
/// DeflateQplJobHWPool is resource pool to provide the job objects.
/// Job object is used for storing context information during offloading compression job to HW Accelerator.
2022-07-09 18:42:01 +00:00
class DeflateQplJobHWPool
{
public:
2022-07-09 18:42:01 +00:00
DeflateQplJobHWPool();
~DeflateQplJobHWPool();
static DeflateQplJobHWPool & instance();
2022-07-09 18:57:38 +00:00
qpl_job * acquireJob(UInt32 & job_id);
void releaseJob(UInt32 job_id);
const bool & isJobPoolReady() { return job_pool_ready; }
private:
bool tryLockJob(UInt32 index);
void unLockJob(UInt32 index);
/// size of each job objects
UInt32 per_job_size;
2022-07-15 20:12:26 +00:00
/// Maximum jobs running in parallel supported by IAA hardware
2023-04-26 15:23:07 +00:00
UInt32 max_hw_jobs;
2022-07-11 16:08:35 +00:00
/// Entire buffer for storing all job objects
std::unique_ptr<uint8_t[]> hw_jobs_buffer;
2022-07-11 16:08:35 +00:00
/// Locks for accessing each job object pointers
std::unique_ptr<std::atomic_bool[]> hw_job_ptr_locks;
bool job_pool_ready;
2022-07-09 16:03:25 +00:00
std::mt19937 random_engine;
std::uniform_int_distribution<int> distribution;
};
2022-07-09 18:42:01 +00:00
class SoftwareCodecDeflateQpl
{
public:
2022-07-09 18:42:01 +00:00
~SoftwareCodecDeflateQpl();
2022-07-15 19:32:10 +00:00
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest, UInt32 dest_size);
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size);
private:
2022-07-09 14:09:23 +00:00
qpl_job * sw_job = nullptr;
2022-07-13 20:12:15 +00:00
std::unique_ptr<uint8_t[]> sw_buffer;
qpl_job * getJobCodecPtr();
};
2022-07-09 18:42:01 +00:00
class HardwareCodecDeflateQpl
{
public:
/// RET_ERROR stands for hardware codec fail, needs fallback to software codec.
2022-07-15 19:32:10 +00:00
static constexpr Int32 RET_ERROR = -1;
2022-07-09 15:12:41 +00:00
2022-07-09 18:42:01 +00:00
HardwareCodecDeflateQpl();
~HardwareCodecDeflateQpl();
2022-07-15 19:32:10 +00:00
Int32 doCompressData(const char * source, UInt32 source_size, char * dest, UInt32 dest_size) const;
/// Submit job request to the IAA hardware and then busy waiting till it complete.
Int32 doDecompressDataSynchronous(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size);
/// Submit job request to the IAA hardware and return immediately. IAA hardware will process decompression jobs automatically.
2022-07-15 19:32:10 +00:00
Int32 doDecompressDataAsynchronous(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size);
/// Flush result for all previous requests which means busy waiting till all the jobs in "decomp_async_job_map" are finished.
/// Must be called subsequently after several calls of doDecompressDataReq.
void flushAsynchronousDecompressRequests();
private:
2022-07-09 14:09:23 +00:00
/// Asynchronous job map for decompression: job ID - job object.
/// For each submission, push job ID && job object into this map;
/// For flush, pop out job ID && job object from this map. Use job ID to release job lock and use job object to check job status till complete.
2022-07-15 19:32:10 +00:00
std::map<UInt32, qpl_job *> decomp_async_job_map;
Poco::Logger * log;
};
2022-07-11 16:08:35 +00:00
2023-02-02 15:47:00 +00:00
class CompressionCodecDeflateQpl final : public ICompressionCodec
{
public:
2022-07-09 18:42:01 +00:00
CompressionCodecDeflateQpl();
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
2023-06-09 20:50:17 +00:00
bool isDeflateQpl() const override { return true; }
2022-07-15 19:32:10 +00:00
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
/// Flush result for previous asynchronous decompression requests on asynchronous mode.
void flushAsynchronousDecompressRequests() override;
private:
2022-07-15 19:32:10 +00:00
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
2022-07-09 18:42:01 +00:00
std::unique_ptr<HardwareCodecDeflateQpl> hw_codec;
std::unique_ptr<SoftwareCodecDeflateQpl> sw_codec;
};
}