mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
unified sync implemetation with async
This commit is contained in:
parent
bcba581191
commit
b699764bbe
@ -167,36 +167,6 @@ int32_t HardwareCodecDeflateQpl::doCompressData(const char * source, uint32_t so
|
|||||||
return compressed_size;
|
return compressed_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t HardwareCodecDeflateQpl::doDecompressDataSynchronous(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const
|
|
||||||
{
|
|
||||||
uint32_t job_id = 0;
|
|
||||||
qpl_job* job_ptr = nullptr;
|
|
||||||
if (!(job_ptr = DeflateQplJobHWPool::instance().acquireJob(&job_id)))
|
|
||||||
{
|
|
||||||
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataSynchronous->acquireJob fail, probably job pool exhausted)");
|
|
||||||
return RET_ERROR;
|
|
||||||
}
|
|
||||||
// Performing a decompression operation
|
|
||||||
job_ptr->op = qpl_op_decompress;
|
|
||||||
job_ptr->next_in_ptr = reinterpret_cast<uint8_t *>(const_cast<char *>(source));
|
|
||||||
job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
|
|
||||||
job_ptr->available_in = source_size;
|
|
||||||
job_ptr->available_out = uncompressed_size;
|
|
||||||
job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST;
|
|
||||||
|
|
||||||
if (auto status = qpl_execute_job(job_ptr); status == QPL_STS_OK)
|
|
||||||
{
|
|
||||||
DeflateQplJobHWPool::instance().releaseJob(job_id);
|
|
||||||
return job_ptr->total_out;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataSynchronous->qpl_execute_job with error code:{} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", status);
|
|
||||||
DeflateQplJobHWPool::instance().releaseJob(job_id);
|
|
||||||
return RET_ERROR;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t HardwareCodecDeflateQpl::doDecompressDataAsynchronous(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size)
|
int32_t HardwareCodecDeflateQpl::doDecompressDataAsynchronous(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size)
|
||||||
{
|
{
|
||||||
uint32_t job_id = 0;
|
uint32_t job_id = 0;
|
||||||
@ -362,8 +332,14 @@ void CompressionCodecDeflateQpl::doDecompressData(const char * source, uint32_t
|
|||||||
{
|
{
|
||||||
int32_t res = HardwareCodecDeflateQpl::RET_ERROR;
|
int32_t res = HardwareCodecDeflateQpl::RET_ERROR;
|
||||||
if (DeflateQplJobHWPool::instance().isJobPoolReady())
|
if (DeflateQplJobHWPool::instance().isJobPoolReady())
|
||||||
res = hw_codec->doDecompressDataSynchronous(source, source_size, dest, uncompressed_size);
|
{
|
||||||
if (res == HardwareCodecDeflateQpl::RET_ERROR)
|
res = hw_codec->doDecompressDataAsynchronous(source, source_size, dest, uncompressed_size);
|
||||||
|
if(res != HardwareCodecDeflateQpl::RET_ERROR)
|
||||||
|
hw_codec->flushAsynchronousDecompressRequests();
|
||||||
|
else
|
||||||
|
sw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
|
||||||
|
}
|
||||||
|
else
|
||||||
sw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
|
sw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -77,7 +77,6 @@ public:
|
|||||||
HardwareCodecDeflateQpl();
|
HardwareCodecDeflateQpl();
|
||||||
~HardwareCodecDeflateQpl();
|
~HardwareCodecDeflateQpl();
|
||||||
int32_t doCompressData(const char * source, uint32_t source_size, char * dest, uint32_t dest_size) const;
|
int32_t doCompressData(const char * source, uint32_t source_size, char * dest, uint32_t dest_size) const;
|
||||||
int32_t doDecompressDataSynchronous(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const;
|
|
||||||
int32_t doDecompressDataAsynchronous(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size);
|
int32_t doDecompressDataAsynchronous(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size);
|
||||||
/// Flush result for previous asynchronous decompression requests.Must be used following with several calls of doDecompressDataReq.
|
/// Flush result for previous asynchronous decompression requests.Must be used following with several calls of doDecompressDataReq.
|
||||||
void flushAsynchronousDecompressRequests();
|
void flushAsynchronousDecompressRequests();
|
||||||
|
Loading…
Reference in New Issue
Block a user