comparison src/twjit.c @ 5:1bb26347e253

twjit: split into separate base/in/out modules
author Mychaela Falconia <falcon@freecalypso.org>
date Fri, 05 Jul 2024 21:24:56 +0000
parents d10ea5dc61b3
children 668b84c52094
comparison
equal deleted inserted replaced
4:be04d84f5468 5:1bb26347e253
1 /* 1 /*
2 * Themyscira Wireless jitter buffer implementation: code body. 2 * Themyscira Wireless jitter buffer implementation: main body.
3 */ 3 */
4 4
5 #include <stdint.h> 5 #include <stdint.h>
6 #include <stdbool.h> 6 #include <stdbool.h>
7 #include <string.h> 7 #include <string.h>
8 #include <arpa/inet.h> /* for network byte order functions */
9 8
10 #include <osmocom/core/linuxlist.h> 9 #include <osmocom/core/linuxlist.h>
11 #include <osmocom/core/msgb.h> 10 #include <osmocom/core/msgb.h>
12 #include <osmocom/core/talloc.h> 11 #include <osmocom/core/talloc.h>
13 #include <osmocom/core/timer.h>
14 #include <osmocom/core/utils.h> 12 #include <osmocom/core/utils.h>
15
16 /* FIXME: this libosmo-netif dependency needs to be removed */
17 #include <osmocom/netif/rtp.h>
18 13
19 #include <themwi/rtp/twjit.h> 14 #include <themwi/rtp/twjit.h>
20 15
21 void twrtp_jibuf_init_defaults(struct twrtp_jibuf_config *config) 16 void twrtp_jibuf_init_defaults(struct twrtp_jibuf_config *config)
22 { 17 {
53 { 48 {
54 msgb_queue_free(&twjit->sb[0].queue); 49 msgb_queue_free(&twjit->sb[0].queue);
55 msgb_queue_free(&twjit->sb[1].queue); 50 msgb_queue_free(&twjit->sb[1].queue);
56 talloc_free(twjit); 51 talloc_free(twjit);
57 } 52 }
58
59 /* RTP input to twjit */
60
61 static void get_current_time(struct timeval *tp)
62 {
63 struct timespec now;
64
65 osmo_clock_gettime(CLOCK_MONOTONIC, &now);
66 tp->tv_sec = now.tv_sec;
67 tp->tv_usec = now.tv_nsec / 1000;
68 }
69
70 static void
71 init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg,
72 uint32_t rx_ssrc, uint32_t rx_ts,
73 const struct timeval *new_time)
74 {
75 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
76
77 OSMO_ASSERT(llist_empty(&sb->queue));
78 OSMO_ASSERT(sb->depth == 0);
79 /* all good, proceed */
80 sb->ssrc = rx_ssrc;
81 sb->head_ts = rx_ts;
82 msgb_enqueue(&sb->queue, msg);
83 sb->depth = 1;
84 memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval));
85 memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config));
86 sb->drop_int_count = 0;
87 }
88
/* Verdict produced by check_input_for_subbuf() for a received packet */
enum input_decision {
	/* packet passed every check and may be inserted into the sub-buffer */
	INPUT_CONTINUE	= 0,
	/* packet timestamp lies before the sub-buffer head */
	INPUT_TOO_OLD	= 1,
	/* packet does not fit the current sub-buffer; start over with it */
	INPUT_RESET	= 2,
};
94
95 static enum input_decision
96 check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting,
97 uint32_t rx_ssrc, uint32_t rx_ts,
98 const struct timeval *new_time)
99 {
100 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
101 struct timeval time_delta;
102 int32_t ts_delta;
103
104 if (rx_ssrc != sb->ssrc)
105 return INPUT_RESET;
106 if (starting) {
107 timersub(new_time, &sb->last_arrival, &time_delta);
108 sb->delta_ms = time_delta.tv_sec * 1000 +
109 time_delta.tv_usec / 1000;
110 memcpy(&sb->last_arrival, new_time, sizeof(struct timeval));
111 }
112 ts_delta = (int32_t)(rx_ts - sb->head_ts);
113 if (ts_delta < 0)
114 return INPUT_TOO_OLD;
115 if (ts_delta % twjit->ts_quantum)
116 return INPUT_RESET;
117 if (starting) {
118 if (sb->conf.start_max_delta &&
119 sb->delta_ms > sb->conf.start_max_delta)
120 return INPUT_RESET;
121 } else {
122 uint32_t fwd = ts_delta / twjit->ts_quantum;
123
124 if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec)
125 return INPUT_RESET;
126 }
127 return INPUT_CONTINUE;
128 }
129
130 static void toss_write_queue(struct twrtp_jibuf_inst *twjit)
131 {
132 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
133
134 msgb_queue_free(&sb->queue);
135 sb->depth = 0;
136 }
137
138 static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit,
139 struct msgb *new_msg, uint32_t rx_ts)
140 {
141 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
142 uint32_t ts_delta = rx_ts - sb->head_ts;
143 uint32_t ins_depth = ts_delta / twjit->ts_quantum;
144 struct msgb *old_msg;
145 uint32_t old_ts_delta;
146
147 /* are we increasing total depth, and can we do simple tail append? */
148 if (ins_depth >= sb->depth) {
149 msgb_enqueue(&sb->queue, new_msg);
150 sb->depth = ins_depth + 1;
151 return;
152 }
153 /* nope - do it the hard way */
154 llist_for_each_entry(old_msg, &sb->queue, list) {
155 old_ts_delta = old_msg->cb[0] - sb->head_ts;
156 if (old_ts_delta == ts_delta) {
157 /* two packets with the same timestamp! */
158 twjit->stats.duplicate_ts++;
159 msgb_free(new_msg);
160 return;
161 }
162 if (old_ts_delta > ts_delta)
163 break;
164 }
165 llist_add_tail(&new_msg->list, &old_msg->list);
166 }
167
168 static void trim_starting_sb(struct twrtp_jibuf_inst *twjit)
169 {
170 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
171 struct msgb *msg;
172 uint32_t msg_ts, ts_adv, quantum_adv;
173
174 while (sb->depth > sb->conf.bd_start) {
175 msg = msgb_dequeue(&sb->queue);
176 OSMO_ASSERT(msg);
177 msgb_free(msg);
178 OSMO_ASSERT(!llist_empty(&sb->queue));
179 msg = llist_entry(sb->queue.next, struct msgb, list);
180 msg_ts = msg->cb[0];
181 ts_adv = msg_ts - sb->head_ts;
182 quantum_adv = ts_adv / twjit->ts_quantum;
183 OSMO_ASSERT(sb->depth > quantum_adv);
184 sb->head_ts = msg_ts;
185 sb->depth -= quantum_adv;
186 }
187 }
188
189 static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc,
190 uint16_t rx_seq, uint32_t rx_ts)
191 {
192 int16_t seq_delta;
193 int32_t ts_delta;
194
195 if (twjit->last_ssrc != rx_ssrc) {
196 twjit->stats.ssrc_changes++;
197 return;
198 }
199 seq_delta = (int16_t)(rx_seq - twjit->last_seq);
200 if (seq_delta < 0)
201 twjit->stats.seq_backwards++;
202 else if (seq_delta == 0)
203 twjit->stats.seq_repeats++;
204 else if (seq_delta == 1) {
205 ts_delta = (int32_t)(rx_ts - twjit->last_ts);
206 if (ts_delta != twjit->ts_quantum) {
207 if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0)
208 twjit->stats.intentional_gaps++;
209 else
210 twjit->stats.ts_resets++;
211 }
212 } else
213 twjit->stats.seq_skips++;
214 }
215
216 void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg)
217 {
218 struct rtp_hdr *rtph;
219 uint32_t rx_ssrc, rx_ts;
220 uint16_t rx_seq;
221 struct timeval new_time;
222 enum input_decision id;
223
224 rtph = osmo_rtp_get_hdr(msg);
225 if (!rtph) {
226 /* invalid packet, couldn't even get header from it */
227 twjit->stats.bad_packets++;
228 msgb_free(msg);
229 return;
230 }
231 rx_ssrc = ntohl(rtph->ssrc);
232 rx_ts = ntohl(rtph->timestamp);
233 rx_seq = ntohs(rtph->sequence);
234 get_current_time(&new_time);
235 if (twjit->got_first_packet)
236 rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts);
237 twjit->last_ssrc = rx_ssrc;
238 twjit->last_seq = rx_seq;
239 twjit->last_ts = rx_ts;
240 twjit->got_first_packet = true;
241 msg->cb[0] = rx_ts;
242
243 switch (twjit->state) {
244 case TWJIT_STATE_EMPTY:
245 /* first packet into totally empty buffer */
246 twjit->state = TWJIT_STATE_HUNT;
247 twjit->write_sb = 0;
248 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time);
249 return;
250 case TWJIT_STATE_HUNT:
251 case TWJIT_STATE_HANDOVER:
252 id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts,
253 &new_time);
254 if (id == INPUT_TOO_OLD) {
255 msgb_free(msg);
256 return;
257 }
258 if (id == INPUT_RESET) {
259 toss_write_queue(twjit);
260 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
261 &new_time);
262 return;
263 }
264 insert_pkt_write_sb(twjit, msg, rx_ts);
265 trim_starting_sb(twjit);
266 return;
267 case TWJIT_STATE_FLOWING:
268 id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts,
269 &new_time);
270 if (id == INPUT_TOO_OLD) {
271 twjit->stats.too_old++;
272 msgb_free(msg);
273 return;
274 }
275 if (id == INPUT_RESET) {
276 twjit->state = TWJIT_STATE_HANDOVER;
277 twjit->write_sb = !twjit->write_sb;
278 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
279 &new_time);
280 twjit->stats.handovers++;
281 return;
282 }
283 insert_pkt_write_sb(twjit, msg, rx_ts);
284 return;
285 default:
286 OSMO_ASSERT(0);
287 }
288 }
289
290 /* Output from twjit to the fixed timing system */
291
292 static bool starting_sb_is_ready(struct twrtp_jibuf_inst *twjit)
293 {
294 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
295
296 if (sb->depth < sb->conf.bd_start)
297 return false;
298 if (sb->delta_ms < sb->conf.start_min_delta)
299 return false;
300 return true;
301 }
302
303 static bool read_sb_is_empty(struct twrtp_jibuf_inst *twjit)
304 {
305 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
306
307 return sb->depth == 0;
308 }
309
310 static struct msgb *pull_from_read_sb(struct twrtp_jibuf_inst *twjit)
311 {
312 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
313 struct msgb *msg;
314
315 OSMO_ASSERT(!llist_empty(&sb->queue));
316 OSMO_ASSERT(sb->depth > 0);
317 msg = llist_entry(sb->queue.next, struct msgb, list);
318 if (msg->cb[0] == sb->head_ts) {
319 llist_del(&msg->list);
320 twjit->stats.delivered_pkt++;
321 twjit->stats.delivered_bytes += msg->len;
322 } else {
323 msg = NULL;
324 twjit->stats.output_gaps++;
325 }
326 sb->head_ts += twjit->ts_quantum;
327 sb->depth--;
328 return msg;
329 }
330
331 static void read_sb_thinning(struct twrtp_jibuf_inst *twjit)
332 {
333 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
334 struct msgb *msg;
335
336 if (sb->drop_int_count) {
337 sb->drop_int_count--;
338 return;
339 }
340 if (sb->depth <= sb->conf.bd_hiwat)
341 return;
342 twjit->stats.thinning_drops++;
343 msg = pull_from_read_sb(twjit);
344 if (msg)
345 msgb_free(msg);
346 sb->drop_int_count = sb->conf.thinning_int - 2;
347 }
348
349 static void toss_read_queue(struct twrtp_jibuf_inst *twjit)
350 {
351 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
352
353 msgb_queue_free(&sb->queue);
354 sb->depth = 0;
355 }
356
357 struct msgb *twrtp_jibuf_output(struct twrtp_jibuf_inst *twjit)
358 {
359 switch (twjit->state) {
360 case TWJIT_STATE_EMPTY:
361 return NULL;
362 case TWJIT_STATE_HUNT:
363 if (!starting_sb_is_ready(twjit))
364 return NULL;
365 twjit->state = TWJIT_STATE_FLOWING;
366 twjit->read_sb = twjit->write_sb;
367 return pull_from_read_sb(twjit);
368 case TWJIT_STATE_FLOWING:
369 if (read_sb_is_empty(twjit)) {
370 twjit->state = TWJIT_STATE_EMPTY;
371 twjit->stats.underruns++;
372 return NULL;
373 }
374 read_sb_thinning(twjit);
375 return pull_from_read_sb(twjit);
376 case TWJIT_STATE_HANDOVER:
377 if (starting_sb_is_ready(twjit)) {
378 toss_read_queue(twjit);
379 twjit->state = TWJIT_STATE_FLOWING;
380 twjit->read_sb = twjit->write_sb;
381 return pull_from_read_sb(twjit);
382 }
383 if (read_sb_is_empty(twjit)) {
384 twjit->state = TWJIT_STATE_HUNT;
385 twjit->stats.underruns++;
386 return NULL;
387 }
388 read_sb_thinning(twjit);
389 return pull_from_read_sb(twjit);
390 default:
391 OSMO_ASSERT(0);
392 }
393 }