Make VP8 encoder thread-safe (fix video corruption issue when the configuration is updated while we're encoding a frame)

This commit is contained in:
bossiel 2015-05-26 23:47:58 +00:00
parent f7ca47efc6
commit 231fdb89ac
1 changed files with 335 additions and 326 deletions

View File

@ -1,7 +1,5 @@
/*
* Copyright (C) 2011 Doubango Telecom <http://www.doubango.org>
*
* Contact: Mamadou Diop <diopmamadou(at)doubango(dot)org>
* Copyright (C) 2011-2015 Doubango Telecom <http://www.doubango.org>
*
* This file is part of Open Source Doubango Framework.
*
@ -77,26 +75,28 @@ typedef struct tdav_codec_vp8_s
TMEDIA_DECLARE_CODEC_VIDEO;
// Encoder
struct{
struct {
vpx_codec_enc_cfg_t cfg;
tsk_bool_t initialized;
vpx_codec_pts_t pts;
vpx_codec_ctx_t context;
unsigned pic_id:15;
unsigned pic_id : 15;
uint64_t frame_count;
tsk_bool_t force_idr;
int rotation;
struct{
struct {
uint8_t* ptr;
tsk_size_t size;
} rtp;
tsk_mutex_handle_t* mutex;
} encoder;
// decoder
struct{
struct {
vpx_codec_dec_cfg_t cfg;
unsigned initialized:1;
unsigned initialized : 1;
vpx_codec_ctx_t context;
void* accumulator;
tsk_size_t accumulator_pos;
@ -154,7 +154,7 @@ static int tdav_codec_vp8_set(tmedia_codec_t* self, const tmedia_param_t* param)
}
}
}
else if(tsk_striequals(param->key, "bw_kbps")) { // both up and down (from the SDP)
else if (tsk_striequals(param->key, "bw_kbps")) { // both up and down (from the SDP)
int32_t max_bw_userdefine = tmedia_defaults_get_bandwidth_video_upload_max();
int32_t max_bw_new = *((int32_t*)param->value);
if (max_bw_userdefine > 0) {
@ -162,7 +162,7 @@ static int tdav_codec_vp8_set(tmedia_codec_t* self, const tmedia_param_t* param)
TMEDIA_CODEC(vp8)->bandwidth_max_upload = TSK_MIN(max_bw_new, max_bw_userdefine);
}
else {
TMEDIA_CODEC(vp8)->bandwidth_max_upload= max_bw_new;
TMEDIA_CODEC(vp8)->bandwidth_max_upload = max_bw_new;
}
reconf = tsk_true;
}
@ -186,9 +186,13 @@ static int tdav_codec_vp8_set(tmedia_codec_t* self, const tmedia_param_t* param)
if (reconf) {
if (vp8->encoder.initialized) {
// The encoder isn't thread safe. Without this lock (and the one in the encode() function) we may have corruptions in the video (issue report from GE).
// Google says the encoder is thread-safe but this is not the case. But it is *multi-instance* thread-safe.
tsk_mutex_lock(vp8->encoder.mutex);
if ((vpx_ret = vpx_codec_enc_config_set(&vp8->encoder.context, &vp8->encoder.cfg)) != VPX_CODEC_OK) {
TSK_DEBUG_ERROR("vpx_codec_enc_config_set failed with error =%s", vpx_codec_err_to_string(vpx_ret));
}
tsk_mutex_unlock(vp8->encoder.mutex);
}
return (vpx_ret == VPX_CODEC_OK) ? 0 : -2;
}
@ -201,7 +205,7 @@ static int tdav_codec_vp8_open(tmedia_codec_t* self)
tdav_codec_vp8_t* vp8 = (tdav_codec_vp8_t*)self;
int ret;
if(!vp8){
if (!vp8) {
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
@ -210,12 +214,12 @@ static int tdav_codec_vp8_open(tmedia_codec_t* self)
// Encoder
if((ret = tdav_codec_vp8_open_encoder(vp8))){
if ((ret = tdav_codec_vp8_open_encoder(vp8))) {
return ret;
}
// Decoder
if((ret = tdav_codec_vp8_open_decoder(vp8))){
if ((ret = tdav_codec_vp8_open_decoder(vp8))) {
return ret;
}
@ -226,7 +230,7 @@ static int tdav_codec_vp8_close(tmedia_codec_t* self)
{
tdav_codec_vp8_t* vp8 = (tdav_codec_vp8_t*)self;
if(!vp8){
if (!vp8) {
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
@ -244,41 +248,44 @@ static tsk_size_t tdav_codec_vp8_encode(tmedia_codec_t* self, const void* in_dat
vpx_codec_err_t vpx_ret = VPX_CODEC_OK;
const vpx_codec_cx_pkt_t *pkt;
vpx_codec_iter_t iter = tsk_null;
vpx_image_t image;
vpx_image_t image = {0};
if(!vp8 || !in_data || !in_size){
if (!vp8 || !in_data || !in_size) {
TSK_DEBUG_ERROR("Invalid parameter");
return 0;
}
if(in_size != (vp8->encoder.context.config.enc->g_w * vp8->encoder.context.config.enc->g_h * 3)>>1){
if (in_size != (vp8->encoder.context.config.enc->g_w * vp8->encoder.context.config.enc->g_h * 3) >> 1) {
TSK_DEBUG_ERROR("Invalid size");
return 0;
}
// wrap yuv420 buffer
if(!vpx_img_wrap(&image, VPX_IMG_FMT_I420, vp8->encoder.context.config.enc->g_w, vp8->encoder.context.config.enc->g_h, 1, (unsigned char*)in_data)){
if (!vpx_img_wrap(&image, VPX_IMG_FMT_I420, vp8->encoder.context.config.enc->g_w, vp8->encoder.context.config.enc->g_h, 1, (unsigned char*)in_data)) {
TSK_DEBUG_ERROR("vpx_img_wrap failed");
return 0;
}
// encode data
++vp8->encoder.pts;
if(vp8->encoder.force_idr){
if (vp8->encoder.force_idr) {
flags |= VPX_EFLAG_FORCE_KF;
vp8->encoder.force_idr = tsk_false;
}
if((vpx_ret = vpx_codec_encode(&vp8->encoder.context, &image, vp8->encoder.pts, 1, flags, VPX_DL_REALTIME)) != VPX_CODEC_OK){
tsk_mutex_lock(vp8->encoder.mutex); // must
vpx_ret = vpx_codec_encode(&vp8->encoder.context, &image, vp8->encoder.pts, 1, flags, VPX_DL_REALTIME);
tsk_mutex_unlock(vp8->encoder.mutex);
if (vpx_ret != VPX_CODEC_OK) {
TSK_DEBUG_ERROR("vpx_codec_encode failed with error =%s", vpx_codec_err_to_string(vpx_ret));
vpx_img_free(&image);
return 0;
goto bail;
}
++vp8->encoder.frame_count;
++vp8->encoder.pic_id;
while ((pkt = vpx_codec_get_cx_data(&vp8->encoder.context, &iter))) {
switch(pkt->kind){
switch (pkt->kind) {
case VPX_CODEC_CX_FRAME_PKT:
{
tdav_codec_vp8_encap(vp8, pkt);
@ -295,6 +302,7 @@ static tsk_size_t tdav_codec_vp8_encode(tmedia_codec_t* self, const void* in_dat
}
}
bail:
vpx_img_free(&image);
return 0;
}
@ -310,7 +318,7 @@ static tsk_size_t tdav_codec_vp8_decode(tmedia_codec_t* self, const void* in_dat
static const tsk_size_t xmax_size = (3840 * 2160 * 3) >> 3; // >>3 instead of >>1 (not an error)
uint8_t S, PartID;
if (!self || !in_data || in_size<1 || !out_data || !vp8->decoder.initialized) {
if (!self || !in_data || in_size < 1 || !out_data || !vp8->decoder.initialized) {
TSK_DEBUG_ERROR("Invalid parameter");
return 0;
}
@ -318,14 +326,14 @@ static tsk_size_t tdav_codec_vp8_decode(tmedia_codec_t* self, const void* in_dat
{ /* 4.2. VP8 Payload Descriptor */
uint8_t X, R, N, I, L, T, K;//FIXME: store
X = (*pdata & 0x80)>>7;
R = (*pdata & 0x40)>>6;
X = (*pdata & 0x80) >> 7;
R = (*pdata & 0x40) >> 6;
if (R) {
TSK_DEBUG_ERROR("R<>0");
return 0;
}
N = (*pdata & 0x20)>>5;
S = (*pdata & 0x10)>>4;
N = (*pdata & 0x20) >> 5;
S = (*pdata & 0x10) >> 4;
PartID = (*pdata & 0x0F);
// skip "REQUIRED" header
if (++pdata >= pdata_end) {
@ -378,7 +386,7 @@ static tsk_size_t tdav_codec_vp8_decode(tmedia_codec_t* self, const void* in_dat
vp8->decoder.last_seq = rtp_hdr->seq_num;
// New frame ?
if(vp8->decoder.last_timestamp != rtp_hdr->timestamp){
if (vp8->decoder.last_timestamp != rtp_hdr->timestamp) {
/* 4.3. VP8 Payload Header
Note that the header is present only in packets
which have the S bit equal to one and the PartID equal to zero in the
@ -483,7 +491,7 @@ static tsk_size_t tdav_codec_vp8_decode(tmedia_codec_t* self, const void* in_dat
// libvpx will crash very ofen when the frame is corrupted => for now we decided not to decode such frame
// according to the latest release there is a function to check if the frame
// is corrupted or not => To be checked
if(vp8->decoder.corrupted){
if(vp8->decoder.corrupted) {
vp8->decoder.corrupted = tsk_false;
goto bail;
}
@ -532,9 +540,9 @@ static tsk_size_t tdav_codec_vp8_decode(tmedia_codec_t* self, const void* in_dat
}
// layout picture
for (plane=0; plane < 3; plane++) {
unsigned char *buf =img->planes[plane];
for (y=0; y<img->d_h >> (plane ? 1 : 0); y++) {
for (plane = 0; plane < 3; plane++) {
unsigned char *buf = img->planes[plane];
for (y = 0; y < img->d_h >> (plane ? 1 : 0); y++) {
unsigned int w_count = img->d_w >> (plane ? 1 : 0);
if ((ret + w_count) > *out_max_size) {
TSK_DEBUG_ERROR("BufferOverflow");
@ -557,18 +565,18 @@ bail:
}
fatal_error = tsk_true;
// vp8->decoder.last_PartID = PartID;
// vp8->decoder.last_S = S;
// vp8->decoder.last_N = N;
// vp8->decoder.last_PartID = PartID;
// vp8->decoder.last_S = S;
// vp8->decoder.last_N = N;
return ret;
}
static tsk_bool_t tdav_codec_vp8_sdp_att_match(const tmedia_codec_t* codec, const char* att_name, const char* att_value)
{
#if 0
if(tsk_striequals(att_name, "fmtp")){
if(tsk_striequals(att_name, "fmtp")) {
unsigned width, height, fps;
if(tmedia_parse_video_fmtp(att_value, TMEDIA_CODEC_VIDEO(codec)->pref_size, &width, &height, &fps)){
if(tmedia_parse_video_fmtp(att_value, TMEDIA_CODEC_VIDEO(codec)->pref_size, &width, &height, &fps)) {
TSK_DEBUG_ERROR("Failed to match fmtp=%s", att_value);
return tsk_false;
}
@ -578,9 +586,9 @@ static tsk_bool_t tdav_codec_vp8_sdp_att_match(const tmedia_codec_t* codec, cons
}
else
#endif
if(tsk_striequals(att_name, "imageattr")){
if (tsk_striequals(att_name, "imageattr")) {
unsigned in_width, in_height, out_width, out_height;
if(tmedia_parse_video_imageattr(att_value, TMEDIA_CODEC_VIDEO(codec)->pref_size, &in_width, &in_height, &out_width, &out_height) != 0){
if (tmedia_parse_video_imageattr(att_value, TMEDIA_CODEC_VIDEO(codec)->pref_size, &in_width, &in_height, &out_width, &out_height) != 0) {
return tsk_false;
}
TMEDIA_CODEC_VIDEO(codec)->in.width = in_width;
@ -595,12 +603,12 @@ static tsk_bool_t tdav_codec_vp8_sdp_att_match(const tmedia_codec_t* codec, cons
static char* tdav_codec_vp8_sdp_att_get(const tmedia_codec_t* codec, const char* att_name)
{
#if 0
if(tsk_striequals(att_name, "fmtp")){
if(tsk_striequals(att_name, "fmtp")) {
return tmedia_get_video_fmtp(TMEDIA_CODEC_VIDEO(codec)->pref_size);
}
else
#endif
if(tsk_striequals(att_name, "imageattr")){
if (tsk_striequals(att_name, "imageattr")) {
return tmedia_get_video_imageattr(TMEDIA_CODEC_VIDEO(codec)->pref_size,
TMEDIA_CODEC_VIDEO(codec)->in.width, TMEDIA_CODEC_VIDEO(codec)->in.height, TMEDIA_CODEC_VIDEO(codec)->out.width, TMEDIA_CODEC_VIDEO(codec)->out.height);
}
@ -613,7 +621,7 @@ static char* tdav_codec_vp8_sdp_att_get(const tmedia_codec_t* codec, const char*
static tsk_object_t* tdav_codec_vp8_ctor(tsk_object_t * self, va_list * app)
{
tdav_codec_vp8_t *vp8 = self;
if(vp8){
if (vp8) {
/* init base: called by tmedia_codec_create() */
/* init self */
}
@ -624,26 +632,12 @@ static tsk_object_t* tdav_codec_vp8_dtor(tsk_object_t * self)
{
tdav_codec_vp8_t *vp8 = self;
TSK_DEBUG_INFO("*** tdav_codec_vp8_dtor destroyed ***");
if(vp8){
if (vp8) {
/* deinit base */
tmedia_codec_video_deinit(vp8);
/* deinit self */
if(vp8->encoder.rtp.ptr){
TSK_FREE(vp8->encoder.rtp.ptr);
vp8->encoder.rtp.size = 0;
}
if(vp8->encoder.initialized){
vpx_codec_destroy(&vp8->encoder.context);
vp8->encoder.initialized = tsk_false;
}
if(vp8->decoder.initialized){
vpx_codec_destroy(&vp8->decoder.context);
vp8->decoder.initialized = tsk_false;
}
if(vp8->decoder.accumulator){
TSK_FREE(vp8->decoder.accumulator);
vp8->decoder.accumulator_pos = 0;
}
tdav_codec_vp8_close_encoder(vp8);
tdav_codec_vp8_close_decoder(vp8);
}
return self;
@ -673,7 +667,7 @@ static const tmedia_codec_plugin_def_t tdav_codec_vp8_plugin_def_s =
{ 0 },
/* video (defaul width,height,fps) */
{176, 144, 0}, // fps is @deprecated
{ 176, 144, 0 }, // fps is @deprecated
tdav_codec_vp8_set,
tdav_codec_vp8_open,
@ -692,12 +686,12 @@ int tdav_codec_vp8_open_encoder(tdav_codec_vp8_t* self)
vpx_codec_err_t vpx_ret;
vpx_enc_frame_flags_t enc_flags = 0; // VPX_EFLAG_XXX
if(self->encoder.initialized){
if (self->encoder.initialized) {
TSK_DEBUG_ERROR("VP8 encoder already inialized");
return -1;
}
if((vpx_ret = vpx_codec_enc_config_default(vp8_interface_enc, &self->encoder.cfg, 0)) != VPX_CODEC_OK){
if ((vpx_ret = vpx_codec_enc_config_default(vp8_interface_enc, &self->encoder.cfg, 0)) != VPX_CODEC_OK) {
TSK_DEBUG_ERROR("vpx_codec_enc_config_default failed with error =%s", vpx_codec_err_to_string(vpx_ret));
return -2;
}
@ -745,12 +739,11 @@ int tdav_codec_vp8_open_encoder(tdav_codec_vp8_t* self)
self->encoder.cfg.rc_buf_sz = 1000;
#endif
if((vpx_ret = vpx_codec_enc_init(&self->encoder.context, vp8_interface_enc, &self->encoder.cfg, enc_flags)) != VPX_CODEC_OK){
if ((vpx_ret = vpx_codec_enc_init(&self->encoder.context, vp8_interface_enc, &self->encoder.cfg, enc_flags)) != VPX_CODEC_OK) {
TSK_DEBUG_ERROR("vpx_codec_enc_init failed with error =%s", vpx_codec_err_to_string(vpx_ret));
return -3;
}
self->encoder.pic_id = /*(rand() ^ rand()) % 0x7FFF*/0/*Use zero: why do you want to make your life harder?*/;
self->encoder.initialized = tsk_true;
/* vpx_codec_control(&self->encoder.context, VP8E_SET_STATIC_THRESHOLD, 800); */
#if !TDAV_UNDER_MOBILE /* must not remove: crash on Android for sure and probably on iOS also (all ARM devices ?) */
@ -777,6 +770,14 @@ int tdav_codec_vp8_open_encoder(tdav_codec_vp8_t* self)
}
#endif
// Create the mutex if not already done
if (!self->encoder.mutex && !(self->encoder.mutex = tsk_mutex_create())) {
vpx_codec_destroy(&self->encoder.context);
TSK_DEBUG_ERROR("Failed to create mutex");
return -4;
}
self->encoder.initialized = tsk_true;
TSK_DEBUG_INFO("[VP8] target_bitrate=%d kbps", self->encoder.cfg.rc_target_bitrate);
@ -789,10 +790,10 @@ int tdav_codec_vp8_open_decoder(tdav_codec_vp8_t* self)
vpx_codec_caps_t dec_caps;
vpx_codec_flags_t dec_flags = 0;
#if !TDAV_UNDER_MOBILE
static vp8_postproc_cfg_t __pp = { VP8_DEBLOCK | VP8_DEMACROBLOCK, 4, 0};
static vp8_postproc_cfg_t __pp = { VP8_DEBLOCK | VP8_DEMACROBLOCK, 4, 0 };
#endif
if(self->decoder.initialized){
if (self->decoder.initialized) {
TSK_DEBUG_ERROR("VP8 decoder already initialized");
return -1;
}
@ -809,22 +810,22 @@ int tdav_codec_vp8_open_decoder(tdav_codec_vp8_t* self)
dec_caps = vpx_codec_get_caps(&vpx_codec_vp8_dx_algo);
#if !TDAV_UNDER_MOBILE
if(dec_caps & VPX_CODEC_CAP_POSTPROC){
if (dec_caps & VPX_CODEC_CAP_POSTPROC) {
dec_flags |= VPX_CODEC_USE_POSTPROC;
}
#endif
#if defined(VPX_CODEC_CAP_ERROR_CONCEALMENT)
if(dec_caps & VPX_CODEC_CAP_ERROR_CONCEALMENT){
if (dec_caps & VPX_CODEC_CAP_ERROR_CONCEALMENT) {
dec_flags |= VPX_CODEC_USE_ERROR_CONCEALMENT;
}
#endif
if((vpx_ret = vpx_codec_dec_init(&self->decoder.context, vp8_interface_dec, &self->decoder.cfg, dec_flags)) != VPX_CODEC_OK){
if ((vpx_ret = vpx_codec_dec_init(&self->decoder.context, vp8_interface_dec, &self->decoder.cfg, dec_flags)) != VPX_CODEC_OK) {
TSK_DEBUG_ERROR("vpx_codec_dec_init failed with error =%s", vpx_codec_err_to_string(vpx_ret));
return -4;
}
#if !TDAV_UNDER_MOBILE
if((vpx_ret = vpx_codec_control(&self->decoder.context, VP8_SET_POSTPROC, &__pp))){
if ((vpx_ret = vpx_codec_control(&self->decoder.context, VP8_SET_POSTPROC, &__pp))) {
TSK_DEBUG_WARN("vpx_codec_dec_init failed with error =%s", vpx_codec_err_to_string(vpx_ret));
}
#endif
@ -840,6 +841,11 @@ int tdav_codec_vp8_close_encoder(tdav_codec_vp8_t* self)
vpx_codec_destroy(&self->encoder.context);
self->encoder.initialized = tsk_false;
}
if (self->encoder.mutex) {
tsk_mutex_destroy(&self->encoder.mutex);
}
TSK_FREE(self->encoder.rtp.ptr);
self->encoder.rtp.size = 0;
self->encoder.rotation = 0; // reset rotation
TSK_DEBUG_INFO("tdav_codec_vp8_close_encoder(end)");
return 0;
@ -848,10 +854,13 @@ int tdav_codec_vp8_close_encoder(tdav_codec_vp8_t* self)
int tdav_codec_vp8_close_decoder(tdav_codec_vp8_t* self)
{
TSK_DEBUG_INFO("tdav_codec_vp8_close_decoder(begin)");
if(self->decoder.initialized){
if (self->decoder.initialized) {
vpx_codec_destroy(&self->decoder.context);
self->decoder.initialized = tsk_false;
}
TSK_FREE(self->decoder.accumulator);
self->decoder.accumulator_size = 0;
self->decoder.accumulator_pos = 0;
TSK_DEBUG_INFO("tdav_codec_vp8_close_decoder(end)");
return 0;
@ -872,7 +881,7 @@ static void tdav_codec_vp8_encap(tdav_codec_vp8_t* self, const vpx_codec_cx_pkt_
}
index = 0;
frame_ptr = pkt->data.frame.buf ;
frame_ptr = pkt->data.frame.buf;
pkt_size = (uint32_t)pkt->data.frame.sz;
non_ref = (pkt->data.frame.flags & VPX_FRAME_IS_DROPPABLE);
is_keyframe = (pkt->data.frame.flags & VPX_FRAME_IS_KEY);
@ -981,17 +990,17 @@ static void tdav_codec_vp8_rtp_callback(tdav_codec_vp8_t *self, const void *data
Note that the header is present only in packets which have the S bit equal to one and the
PartID equal to zero in the payload descriptor.
*/
if((has_hdr = (part_start && partID == 0))){
if ((has_hdr = (part_start && partID == 0))) {
has_hdr = tsk_true;
paydesc_and_hdr_size += 0; // encoded data already contains payload header?
}
if(!data || !size){
if (!data || !size) {
TSK_DEBUG_ERROR("Invalid parameter");
return;
}
if(self->encoder.rtp.size < (size + paydesc_and_hdr_size)){
if(!(self->encoder.rtp.ptr = tsk_realloc(self->encoder.rtp.ptr, (size + paydesc_and_hdr_size)))){
if (self->encoder.rtp.size < (size + paydesc_and_hdr_size)) {
if (!(self->encoder.rtp.ptr = tsk_realloc(self->encoder.rtp.ptr, (size + paydesc_and_hdr_size)))) {
TSK_DEBUG_ERROR("Failed to allocate new buffer");
return;
}
@ -1021,15 +1030,15 @@ static void tdav_codec_vp8_rtp_callback(tdav_codec_vp8_t *self, const void *data
#endif
/* 4.2. VP8 Payload Header */
//if(has_hdr){
//if(has_hdr) {
// already part of the encoded stream
//}
// Send data over the network
if(TMEDIA_CODEC_VIDEO(self)->out.callback){
if (TMEDIA_CODEC_VIDEO(self)->out.callback) {
TMEDIA_CODEC_VIDEO(self)->out.result.buffer.ptr = self->encoder.rtp.ptr;
TMEDIA_CODEC_VIDEO(self)->out.result.buffer.size = (size + TDAV_VP8_PAY_DESC_SIZE);
TMEDIA_CODEC_VIDEO(self)->out.result.duration = (uint32_t) ((1./(double)TMEDIA_CODEC_VIDEO(self)->out.fps) * TMEDIA_CODEC(self)->plugin->rate);
TMEDIA_CODEC_VIDEO(self)->out.result.duration = (uint32_t)((1. / (double)TMEDIA_CODEC_VIDEO(self)->out.fps) * TMEDIA_CODEC(self)->plugin->rate);
TMEDIA_CODEC_VIDEO(self)->out.result.last_chunck = last;
TMEDIA_CODEC_VIDEO(self)->out.callback(&TMEDIA_CODEC_VIDEO(self)->out.result);
}