FFmpegKit Linux API 6.0
Loading...
Searching...
No Matches
fftools_thread_queue.c
Go to the documentation of this file.
1/*
2 * This file is part of FFmpeg.
3 * Copyright (c) 2023 ARTHENICA LTD
4 *
5 * FFmpeg is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * FFmpeg is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with FFmpeg; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20/*
21 * This file is the modified version of thread_queue.c file living in ffmpeg source code under the fftools folder. We
22 * manually update it each time we depend on a new ffmpeg version. Below you can see the list of changes applied
23 * by us to develop ffmpeg-kit library.
24 *
25 * ffmpeg-kit changes by ARTHENICA LTD
26 *
27 * 07.2023
28 * --------------------------------------------------------
29 * - FFmpeg 6.0 changes migrated
30 * - fftools header names updated
31 */
32
#include <stdint.h>
#include <string.h>

#include "libavutil/avassert.h"
#include "libavutil/error.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
#include "libavutil/mem.h"
#include "libavutil/thread.h"

#include "fftools_objpool.h"
#include "fftools_thread_queue.h"
45
/*
 * Per-stream state bits stored in ThreadQueue.finished[].
 *
 * FINISHED_SEND - no more data will be queued for the stream; set by
 *                 tq_send_finish(), or by tq_send() once it observes that
 *                 the receiving side has finished the stream.
 * FINISHED_RECV - the receiving side is done with the stream; set by
 *                 tq_receive_finish(), or when EOF for the stream has been
 *                 handed to the consumer.
 */
enum {
    FINISHED_SEND = (1 << 0),
    FINISHED_RECV = (1 << 1),
};
50
51typedef struct FifoElem {
52 void *obj;
53 unsigned int stream_idx;
55
58 unsigned int nb_streams;
59
60 AVFifo *fifo;
61
63 void (*obj_move)(void *dst, void *src);
64
65 pthread_mutex_t lock;
66 pthread_cond_t cond;
67};
68
70{
71 ThreadQueue *tq = *ptq;
72
73 if (!tq)
74 return;
75
76 if (tq->fifo) {
77 FifoElem elem;
78 while (av_fifo_read(tq->fifo, &elem, 1) >= 0)
79 objpool_release(tq->obj_pool, &elem.obj);
80 }
81 av_fifo_freep2(&tq->fifo);
82
84
85 av_freep(&tq->finished);
86
87 pthread_cond_destroy(&tq->cond);
88 pthread_mutex_destroy(&tq->lock);
89
90 av_freep(ptq);
91}
92
93ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size,
94 ObjPool *obj_pool, void (*obj_move)(void *dst, void *src))
95{
96 ThreadQueue *tq;
97 int ret;
98
99 tq = av_mallocz(sizeof(*tq));
100 if (!tq)
101 return NULL;
102
103 ret = pthread_cond_init(&tq->cond, NULL);
104 if (ret) {
105 av_freep(&tq);
106 return NULL;
107 }
108
109 ret = pthread_mutex_init(&tq->lock, NULL);
110 if (ret) {
111 pthread_cond_destroy(&tq->cond);
112 av_freep(&tq);
113 return NULL;
114 }
115
116 tq->finished = av_calloc(nb_streams, sizeof(*tq->finished));
117 if (!tq->finished)
118 goto fail;
120
121 tq->fifo = av_fifo_alloc2(queue_size, sizeof(FifoElem), 0);
122 if (!tq->fifo)
123 goto fail;
124
125 tq->obj_pool = obj_pool;
126 tq->obj_move = obj_move;
127
128 return tq;
129fail:
130 tq_free(&tq);
131 return NULL;
132}
133
134int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
135{
136 int *finished;
137 int ret;
138
139 av_assert0(stream_idx < tq->nb_streams);
140 finished = &tq->finished[stream_idx];
141
143
144 if (*finished & FINISHED_SEND) {
145 ret = AVERROR(EINVAL);
146 goto finish;
147 }
148
149 while (!(*finished & FINISHED_RECV) && !av_fifo_can_write(tq->fifo))
150 pthread_cond_wait(&tq->cond, &tq->lock);
151
152 if (*finished & FINISHED_RECV) {
153 ret = AVERROR_EOF;
154 *finished |= FINISHED_SEND;
155 } else {
156 FifoElem elem = { .stream_idx = stream_idx };
157
158 ret = objpool_get(tq->obj_pool, &elem.obj);
159 if (ret < 0)
160 goto finish;
161
162 tq->obj_move(elem.obj, data);
163
164 ret = av_fifo_write(tq->fifo, &elem, 1);
165 av_assert0(ret >= 0);
166 pthread_cond_broadcast(&tq->cond);
167 }
168
169finish:
171
172 return ret;
173}
174
175static int receive_locked(ThreadQueue *tq, int *stream_idx,
176 void *data)
177{
178 FifoElem elem;
179 unsigned int nb_finished = 0;
180
181 if (av_fifo_read(tq->fifo, &elem, 1) >= 0) {
182 tq->obj_move(data, elem.obj);
183 objpool_release(tq->obj_pool, &elem.obj);
184 *stream_idx = elem.stream_idx;
185 return 0;
186 }
187
188 for (unsigned int i = 0; i < tq->nb_streams; i++) {
189 if (!(tq->finished[i] & FINISHED_SEND))
190 continue;
191
192 /* return EOF to the consumer at most once for each stream */
193 if (!(tq->finished[i] & FINISHED_RECV)) {
194 tq->finished[i] |= FINISHED_RECV;
195 *stream_idx = i;
196 return AVERROR_EOF;
197 }
198
199 nb_finished++;
200 }
201
202 return nb_finished == tq->nb_streams ? AVERROR_EOF : AVERROR(EAGAIN);
203}
204
205int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
206{
207 int ret;
208
209 *stream_idx = -1;
210
212
213 while (1) {
214 ret = receive_locked(tq, stream_idx, data);
215 if (ret == AVERROR(EAGAIN)) {
216 pthread_cond_wait(&tq->cond, &tq->lock);
217 continue;
218 }
219
220 break;
221 }
222
223 if (ret == 0)
224 pthread_cond_broadcast(&tq->cond);
225
227
228 return ret;
229}
230
231void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
232{
233 av_assert0(stream_idx < tq->nb_streams);
234
236
237 /* mark the stream as send-finished;
238 * next time the consumer thread tries to read this stream it will get
239 * an EOF and recv-finished flag will be set */
240 tq->finished[stream_idx] |= FINISHED_SEND;
241 pthread_cond_broadcast(&tq->cond);
242
244}
245
246void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
247{
248 av_assert0(stream_idx < tq->nb_streams);
249
251
252 /* mark the stream as recv-finished;
253 * next time the producer thread tries to send for this stream, it will
254 * get an EOF and send-finished flag will be set */
255 tq->finished[stream_idx] |= FINISHED_RECV;
256 pthread_cond_broadcast(&tq->cond);
257
259}
#define pthread_mutex_lock(a)
#define pthread_mutex_unlock(a)
__thread int nb_streams
void objpool_release(ObjPool *op, void **obj)
void objpool_free(ObjPool **pop)
int objpool_get(ObjPool *op, void **obj)
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
@ FINISHED_SEND
@ FINISHED_RECV
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, ObjPool *obj_pool, void(*obj_move)(void *dst, void *src))
static int receive_locked(ThreadQueue *tq, int *stream_idx, void *data)
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
void tq_free(ThreadQueue **ptq)
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
unsigned int stream_idx
pthread_mutex_t lock
unsigned int nb_streams
pthread_cond_t cond
void(* obj_move)(void *dst, void *src)