Set output_size properly before calling the decoder so it doesn't return nonsense...
[ffmpeg:tocheskeys-ffmpeg-mt.git] / libavcodec / pthread.c
1 /*
2  * Copyright (c) 2004 Roman Shaposhnik.
3  * Copyright (c) 2008 Alexander Strange (astrange@ithinksw.com)
4  *
5  * Many thanks to Steven M. Schultz for providing clever ideas and
6  * to Michael Niedermayer <michaelni@gmx.at> for writing initial
7  * implementation.
8  *
9  * This file is part of FFmpeg.
10  *
11  * FFmpeg is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Lesser General Public
13  * License as published by the Free Software Foundation; either
14  * version 2.1 of the License, or (at your option) any later version.
15  *
16  * FFmpeg is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Lesser General Public License for more details.
20  *
21  * You should have received a copy of the GNU Lesser General Public
22  * License along with FFmpeg; if not, write to the Free Software
23  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24  */
25 #include <pthread.h>
26
27 #include "avcodec.h"
28 #include "thread.h"
29
30 #define MAX_DELAYED_RELEASED_BUFFERS 16
31
32 typedef int (action_t)(AVCodecContext *c, void *arg);
33
34 typedef struct ThreadContext {
35     pthread_t *workers;
36     action_t *func;
37     void **args;
38     int *rets;
39     int rets_count;
40     int job_count;
41
42     pthread_cond_t last_job_cond;
43     pthread_cond_t current_job_cond;
44     pthread_mutex_t current_job_lock;
45     int current_job;
46     int done;
47 } ThreadContext;
48
49 typedef struct PerThreadContext {
50     pthread_t thread;
51     pthread_cond_t input_cond, progress_cond, output_cond;
52     pthread_mutex_t mutex, progress_mutex, buffer_mutex;
53
54     AVCodecContext *context;
55
56     uint8_t *input;
57     int input_size;
58
59     AVFrame result;
60     int output_size, output_res;
61
62     struct FrameThreadContext *parent;
63
64     int decode_progress;
65
66     enum {
67         STATE_INPUT_READY,
68         STATE_PREDECODING,
69         STATE_DECODING
70     } state;
71
72     AVFrame released_buffers[MAX_DELAYED_RELEASED_BUFFERS];
73     int nb_released_buffers;
74 } PerThreadContext;
75
76 typedef struct FrameThreadContext {
77     PerThreadContext *threads;
78     PerThreadContext *prev_thread;
79
80     int next_available, next_ready;
81     int delaying;
82
83     int die;
84 } FrameThreadContext;
85
86 static void* attribute_align_arg worker(void *v)
87 {
88     AVCodecContext *avctx = v;
89     ThreadContext *c = avctx->thread_opaque;
90     int our_job = c->job_count;
91     int thread_count = avctx->thread_count;
92     int self_id;
93
94     pthread_mutex_lock(&c->current_job_lock);
95     self_id = c->current_job++;
96     for (;;){
97         while (our_job >= c->job_count) {
98             if (c->current_job == thread_count + c->job_count)
99                 pthread_cond_signal(&c->last_job_cond);
100
101             pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
102             our_job = self_id;
103
104             if (c->done) {
105                 pthread_mutex_unlock(&c->current_job_lock);
106                 return NULL;
107             }
108         }
109         pthread_mutex_unlock(&c->current_job_lock);
110
111         c->rets[our_job%c->rets_count] = c->func(avctx, c->args[our_job]);
112
113         pthread_mutex_lock(&c->current_job_lock);
114         our_job = c->current_job++;
115     }
116 }
117
118 static av_always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count)
119 {
120     pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
121     pthread_mutex_unlock(&c->current_job_lock);
122 }
123
124 static void ff_thread_free(AVCodecContext *avctx)
125 {
126     ThreadContext *c = avctx->thread_opaque;
127     int i;
128
129     pthread_mutex_lock(&c->current_job_lock);
130     c->done = 1;
131     pthread_cond_broadcast(&c->current_job_cond);
132     pthread_mutex_unlock(&c->current_job_lock);
133
134     for (i=0; i<avctx->thread_count; i++)
135          pthread_join(c->workers[i], NULL);
136
137     pthread_mutex_destroy(&c->current_job_lock);
138     pthread_cond_destroy(&c->current_job_cond);
139     pthread_cond_destroy(&c->last_job_cond);
140     av_free(c->workers);
141     av_freep(&avctx->thread_opaque);
142 }
143
144 int avcodec_thread_execute(AVCodecContext *avctx, action_t* func, void **arg, int *ret, int job_count)
145 {
146     ThreadContext *c= avctx->thread_opaque;
147     int dummy_ret;
148
149     if (!USE_AVCODEC_EXECUTE(avctx) || avctx->thread_count <= 1)
150         return avcodec_default_execute(avctx, func, arg, ret, job_count);
151
152     if (job_count <= 0)
153         return 0;
154
155     pthread_mutex_lock(&c->current_job_lock);
156
157     c->current_job = avctx->thread_count;
158     c->job_count = job_count;
159     c->args = arg;
160     c->func = func;
161     if (ret) {
162         c->rets = ret;
163         c->rets_count = job_count;
164     } else {
165         c->rets = &dummy_ret;
166         c->rets_count = 1;
167     }
168     pthread_cond_broadcast(&c->current_job_cond);
169
170     avcodec_thread_park_workers(c, avctx->thread_count);
171
172     return 0;
173 }
174
175 static int ff_thread_init(AVCodecContext *avctx, int thread_count)
176 {
177     int i;
178     ThreadContext *c;
179
180     c = av_mallocz(sizeof(ThreadContext));
181     if (!c)
182         return -1;
183
184     c->workers = av_mallocz(sizeof(pthread_t)*thread_count);
185     if (!c->workers) {
186         av_free(c);
187         return -1;
188     }
189
190     avctx->thread_opaque = c;
191     c->current_job = 0;
192     c->job_count = 0;
193     c->done = 0;
194     pthread_cond_init(&c->current_job_cond, NULL);
195     pthread_cond_init(&c->last_job_cond, NULL);
196     pthread_mutex_init(&c->current_job_lock, NULL);
197     pthread_mutex_lock(&c->current_job_lock);
198     for (i=0; i<thread_count; i++) {
199         if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
200            avctx->thread_count = i;
201            pthread_mutex_unlock(&c->current_job_lock);
202            avcodec_thread_free(avctx);
203            return -1;
204         }
205     }
206
207     avcodec_thread_park_workers(c, thread_count);
208
209     avctx->execute = avcodec_thread_execute;
210     return 0;
211 }
212
213 static attribute_align_arg void *decode_frame_thread(void *arg)
214 {
215     PerThreadContext * volatile p = arg;
216     AVCodecContext *avctx = p->context;
217     FrameThreadContext * volatile fctx = p->parent;
218     AVCodec *codec = avctx->codec;
219
220     while (1) {
221         pthread_mutex_lock(&p->mutex);
222         while (p->state == STATE_INPUT_READY && !fctx->die) {pthread_cond_wait(&p->input_cond, &p->mutex);}
223         pthread_mutex_unlock(&p->mutex);
224
225         if (fctx->die) break;
226
227         if (!codec->update_context) ff_report_predecode_done(avctx);
228
229         pthread_mutex_lock(&p->mutex);
230         avcodec_get_frame_defaults(&p->result);
231         p->output_size = 0;
232         p->output_res = codec->decode(avctx, &p->result, &p->output_size, p->input, p->input_size);
233
234         if (p->state == STATE_PREDECODING) ff_report_predecode_done(avctx); //duplication
235
236         // prevent hang if there's a decode error
237         if (p->output_size && p->result.data[0]) ff_report_decode_progress(&p->result, INT_MAX);
238
239         p->input_size = 0;
240         p->state = STATE_INPUT_READY;
241
242         pthread_mutex_lock(&p->progress_mutex);
243         pthread_cond_signal(&p->output_cond);
244         pthread_mutex_unlock(&p->progress_mutex);
245         pthread_mutex_unlock(&p->mutex);
246     };
247
248     return NULL;
249 }
250
251 static int ff_frame_thread_init(AVCodecContext *avctx)
252 {
253     FrameThreadContext *fctx;
254     int i, thread_count = avctx->thread_count;
255
256     fctx = av_mallocz(sizeof(FrameThreadContext));
257     fctx->delaying = 1;
258
259     fctx->threads = av_mallocz(sizeof(PerThreadContext) * thread_count);
260
261     for (i = 0; i < thread_count; i++) {
262         PerThreadContext *p = &fctx->threads[i];
263
264         pthread_mutex_init(&p->mutex, NULL);
265         pthread_mutex_init(&p->progress_mutex, NULL);
266         pthread_mutex_init(&p->buffer_mutex, NULL);
267         pthread_cond_init(&p->input_cond, NULL);
268         pthread_cond_init(&p->progress_cond, NULL);
269         pthread_cond_init(&p->output_cond, NULL);
270
271         p->parent = fctx;
272
273         if (!i) {
274             p->context = avctx;
275             avctx->thread_opaque = p;
276         } else {
277             AVCodecContext *copy = av_malloc(sizeof(AVCodecContext));
278
279             *copy = *avctx;
280             copy->is_copy = 1;
281             copy->priv_data = av_malloc(avctx->codec->priv_data_size);
282             memcpy(copy->priv_data, avctx->priv_data, copy->codec->priv_data_size);
283             if (copy->codec->init_copy) copy->codec->init_copy(copy);
284
285             p->context = copy;
286             copy->thread_opaque = p;
287         }
288
289         pthread_create(&p->thread, NULL, decode_frame_thread, p);
290     }
291
292     return 0;
293 }
294
295 static void submit_frame(PerThreadContext * volatile p, const uint8_t *buf, int buf_size)
296 {
297     AVCodec *codec = p->context->codec;
298     PerThreadContext *prev_thread = p->parent->prev_thread;
299     int i;
300
301     if (!buf_size && !(p->context->codec->capabilities & CODEC_CAP_DELAY)) return;
302
303     pthread_mutex_lock(&p->mutex);
304
305     if (codec->update_context && prev_thread) {
306         codec->update_context(p->context, prev_thread->context);
307     }
308
309     p->input = av_fast_realloc(p->input, &p->input_size, buf_size + FF_INPUT_BUFFER_PADDING_SIZE);
310     memcpy(p->input, buf, buf_size);
311     p->input_size = buf_size;
312     p->decode_progress = -1;
313
314     for (i = 0; i < p->nb_released_buffers; i++)
315         ff_release_buffer(&p->released_buffers[i]);
316
317     p->nb_released_buffers = 0;
318
319     p->state = STATE_PREDECODING;
320     pthread_cond_signal(&p->input_cond);
321     pthread_mutex_unlock(&p->mutex);
322
323     pthread_mutex_lock(&p->progress_mutex);
324     while (p->state == STATE_PREDECODING) pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
325     pthread_mutex_unlock(&p->progress_mutex);
326
327     p->parent->prev_thread = p;
328 }
329
330 int ff_decode_frame_threaded(AVCodecContext *avctx,
331                              void *data, int *data_size,
332                              const uint8_t *buf, int buf_size)
333 {
334     FrameThreadContext *fctx;
335     PerThreadContext * volatile p;
336     int thread_count = avctx->thread_count;
337
338     // We can't call init until now, because avcodec_thread_init
339     // is sometimes called before avcodec_open, and we need codec info
340     if (!avctx->thread_opaque) ff_frame_thread_init(avctx);
341
342     fctx = ((PerThreadContext*)avctx->thread_opaque)->parent;
343
344     p = &fctx->threads[fctx->next_available++];
345     submit_frame(p, buf, buf_size);
346
347     if (fctx->delaying) {
348         if (fctx->next_available >= (thread_count-1)) fctx->delaying = 0;
349
350         *data_size=0;
351         return 0;
352     }
353
354     p = &fctx->threads[fctx->next_ready++];
355
356     pthread_mutex_lock(&p->progress_mutex);
357     while (p->state != STATE_INPUT_READY) {pthread_cond_wait(&p->output_cond, &p->progress_mutex);}
358     pthread_mutex_unlock(&p->progress_mutex);
359
360     *(AVFrame*)data = p->result;
361     *data_size = p->output_size;
362
363     if (fctx->next_available >= thread_count) fctx->next_available = 0;
364     if (fctx->next_ready >= thread_count) fctx->next_ready = 0;
365
366     return p->output_res;
367 }
368
369 void ff_report_decode_progress(AVFrame *f, int n)
370 {
371     PerThreadContext *p = f->avctx->thread_opaque;
372     int *progress = f->thread_opaque;
373
374     if (*progress >= n) return;
375
376     pthread_mutex_lock(&p->progress_mutex);
377     *progress = n;
378     pthread_cond_broadcast(&p->progress_cond);
379     pthread_mutex_unlock(&p->progress_mutex);
380 }
381
382 void ff_await_decode_progress(AVFrame *f, int n)
383 {
384     PerThreadContext *p = f->avctx->thread_opaque;
385     int * volatile progress = f->thread_opaque;
386
387     if (*progress >= n) return;
388
389     pthread_mutex_lock(&p->progress_mutex);
390     while (*progress < n)
391         pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
392     pthread_mutex_unlock(&p->progress_mutex);
393 }
394
395 void ff_report_predecode_done(AVCodecContext *avctx) {
396     PerThreadContext *p = avctx->thread_opaque;
397
398     if (!USE_FRAME_THREADING(avctx)) return;
399
400     pthread_mutex_lock(&p->progress_mutex);
401     p->state = STATE_DECODING;
402     pthread_cond_broadcast(&p->progress_cond);
403     pthread_mutex_unlock(&p->progress_mutex);
404 }
405
406 static void park_frame_decode_threads(FrameThreadContext *fctx, int thread_count)
407 {
408     int i;
409
410     for (i = 0; i < thread_count; i++) {
411         PerThreadContext *p = &fctx->threads[i];
412
413         pthread_mutex_lock(&p->progress_mutex);
414         while (p->state != STATE_INPUT_READY) pthread_cond_wait(&p->output_cond, &p->progress_mutex);
415         pthread_mutex_unlock(&p->progress_mutex);
416     }
417 }
418
419 static void ff_frame_thread_free(AVCodecContext *avctx)
420 {
421     FrameThreadContext *fctx;
422     AVCodec *codec;
423     int i;
424
425     if (!avctx->thread_opaque) return;
426
427     fctx = ((PerThreadContext*)avctx->thread_opaque)->parent;
428     codec = fctx->prev_thread->context->codec;
429
430     park_frame_decode_threads(fctx, avctx->thread_count);
431
432     if (fctx->prev_thread != &fctx->threads[0] && codec->update_context)
433         codec->update_context(fctx->threads[0].context, fctx->prev_thread->context);
434
435     fctx->die = 1;
436
437     for (i = 0; i < avctx->thread_count; i++) {
438         PerThreadContext *p = &fctx->threads[i];
439
440         pthread_mutex_lock(&p->mutex);
441         pthread_cond_signal(&p->input_cond);
442         pthread_mutex_unlock(&p->mutex);
443
444         pthread_join(p->thread, NULL);
445
446         pthread_mutex_destroy(&p->mutex);
447         pthread_mutex_destroy(&p->progress_mutex);
448         pthread_mutex_destroy(&p->buffer_mutex);
449         pthread_cond_destroy(&p->input_cond);
450         pthread_cond_destroy(&p->progress_cond);
451         pthread_cond_destroy(&p->output_cond);
452         av_freep(&p->input);
453
454         if (i) {
455             p->context->codec->close(p->context);
456             avcodec_default_free_buffers(p->context);
457
458             av_freep(&p->context->priv_data);
459             av_freep(&p->context);
460         }
461     }
462
463     av_freep(&fctx->threads);
464     av_free(fctx);
465
466     avctx->thread_opaque = NULL;
467     avctx->thread_algorithm = 0;
468 }
469
470 void ff_frame_thread_flush(AVCodecContext *avctx)
471 {
472     FrameThreadContext *fctx = ((PerThreadContext*)avctx->thread_opaque)->parent;
473     int i;
474
475     park_frame_decode_threads(fctx, avctx->thread_count);
476
477     if (fctx->prev_thread != fctx->threads)
478         avctx->codec->update_context(fctx->threads->context, fctx->prev_thread->context);
479
480     for (i = 0; i < avctx->thread_count; i++) {
481         PerThreadContext *p = &fctx->threads[i];
482         int j;
483
484         for (j = 0; j < p->nb_released_buffers; j++)
485             ff_release_buffer(&p->released_buffers[j]);
486
487         p->nb_released_buffers = 0;
488     }
489
490     if (avctx->codec->flush)
491         avctx->codec->flush(fctx->threads->context);
492
493     fctx->next_available = fctx->next_ready = 0;
494     fctx->delaying = 1;
495     fctx->prev_thread = NULL;
496 }
497
498 int ff_get_buffer(AVCodecContext *avctx, AVFrame *f)
499 {
500     int ret, *progress;
501     PerThreadContext *p = avctx->thread_opaque;
502
503     f->avctx = avctx;
504     f->thread_opaque = progress = av_malloc(sizeof(int));
505
506     if (!USE_FRAME_THREADING(avctx)) {
507         *progress = INT_MAX;
508         return avctx->get_buffer(avctx, f);
509     }
510
511     *progress = -1;
512
513     pthread_mutex_lock(&p->buffer_mutex);
514     ret = avctx->get_buffer(avctx, f);
515     pthread_mutex_unlock(&p->buffer_mutex);
516
517     return ret;
518 }
519
520 void ff_release_buffer(AVFrame *f)
521 {
522     PerThreadContext *p = f->avctx->thread_opaque;
523
524     av_freep(&f->thread_opaque);
525
526     if (!USE_FRAME_THREADING(f->avctx)) {
527         f->avctx->release_buffer(f->avctx, f);
528         return;
529     }
530
531     pthread_mutex_lock(&p->buffer_mutex);
532     f->avctx->release_buffer(f->avctx, f);
533     pthread_mutex_unlock(&p->buffer_mutex);
534 }
535
536 void ff_delayed_release_buffer(AVCodecContext *avctx, AVFrame *f)
537 {
538     PerThreadContext *p = avctx->thread_opaque;
539
540     if (!USE_FRAME_THREADING(avctx)) {
541         f->avctx->release_buffer(f->avctx, f);
542         return;
543     }
544
545     if (p->nb_released_buffers >= MAX_DELAYED_RELEASED_BUFFERS) {
546         av_log(p->context, AV_LOG_ERROR, "too many delayed release_buffer calls!\n");
547         return;
548     }
549
550     if(avctx->debug&FF_DEBUG_BUFFERS)
551         av_log(avctx, AV_LOG_DEBUG, "delayed_release_buffer called on pic %p, %d buffers used\n", f, f->avctx->internal_buffer_count);
552
553     p->released_buffers[p->nb_released_buffers++] = *f;
554     memset(f->data, 0, sizeof(f->data));
555 }
556
557 /// Set the threading algorithm used, or none if an algorithm was set but no thread count.
558 static void ff_validate_thread_parameters(AVCodecContext *avctx)
559 {
560     if (avctx->thread_count <= 1)
561         avctx->thread_algorithm = 0;
562     else if (avctx->thread_algorithm == FF_THREAD_AUTO)
563         avctx->thread_algorithm = (avctx->codec->capabilities & CODEC_CAP_FRAME_THREADS) ? FF_THREAD_MULTIFRAME : FF_THREAD_MULTISLICE;
564 }
565
566 int avcodec_thread_init(AVCodecContext *avctx, int thread_count)
567 {
568     avctx->thread_count = thread_count;
569
570     if (avctx->codec && !avctx->thread_opaque) {
571         ff_validate_thread_parameters(avctx);
572
573         if (USE_AVCODEC_EXECUTE(avctx))
574             return ff_thread_init(avctx, thread_count);
575     }
576
577     return 0;
578 }
579
580 void avcodec_thread_free(AVCodecContext *avctx)
581 {
582     if (USE_FRAME_THREADING(avctx))
583         ff_frame_thread_free(avctx);
584     else
585         ff_thread_free(avctx);
586 }