FreeCalypso > hg > themwi-rtp-lib
comparison src/twjit.c @ 3:d10ea5dc61b3
twjit: initial import from previous work repository
The initial development of twjit was done in a private branch of
osmo-bts git repository; the intent was to prototype twjit in osmo-bts.
However, as I am getting a better feel for the full scope of the problem
(producing a replacement for libortp), I changed course to the present
themwi-rtp-lib (libtwrtp) approach.
author | Mychaela Falconia <falcon@freecalypso.org> |
---|---|
date | Fri, 05 Jul 2024 18:50:48 +0000 |
parents | |
children | 1bb26347e253 |
comparison
equal
deleted
inserted
replaced
2:e3ab549d6a0f | 3:d10ea5dc61b3 |
---|---|
1 /* | |
2 * Themyscira Wireless jitter buffer implementation: code body. | |
3 */ | |
4 | |
5 #include <stdint.h> | |
6 #include <stdbool.h> | |
7 #include <string.h> | |
8 #include <arpa/inet.h> /* for network byte order functions */ | |
9 | |
10 #include <osmocom/core/linuxlist.h> | |
11 #include <osmocom/core/msgb.h> | |
12 #include <osmocom/core/talloc.h> | |
13 #include <osmocom/core/timer.h> | |
14 #include <osmocom/core/utils.h> | |
15 | |
16 /* FIXME: this libosmo-netif dependency needs to be removed */ | |
17 #include <osmocom/netif/rtp.h> | |
18 | |
19 #include <themwi/rtp/twjit.h> | |
20 | |
21 void twrtp_jibuf_init_defaults(struct twrtp_jibuf_config *config) | |
22 { | |
23 memset(config, 0, sizeof(struct twrtp_jibuf_config)); | |
24 config->bd_start = 2; /* smallest allowed */ | |
25 config->bd_hiwat = 3; /* Nstart+1 is practically-useful minimum */ | |
26 config->thinning_int = 17; /* prime number, usually 340 ms */ | |
27 config->max_future_sec = 10; /* 10 s is a long time for voice */ | |
28 } | |
29 | |
30 /* create and destroy functions */ | |
31 | |
32 struct twrtp_jibuf_inst * | |
33 twrtp_jibuf_create(void *ctx, uint16_t quantum_ms, uint32_t quantum_ts_inc, | |
34 struct twrtp_jibuf_config *config) | |
35 { | |
36 struct twrtp_jibuf_inst *twjit; | |
37 | |
38 twjit = talloc_zero(ctx, struct twrtp_jibuf_inst); | |
39 if (!twjit) | |
40 return NULL; | |
41 | |
42 twjit->ext_config = config; | |
43 twjit->ts_quantum = quantum_ts_inc; | |
44 twjit->quanta_per_sec = 1000 / quantum_ms; | |
45 twjit->state = TWJIT_STATE_EMPTY; | |
46 INIT_LLIST_HEAD(&twjit->sb[0].queue); | |
47 INIT_LLIST_HEAD(&twjit->sb[1].queue); | |
48 | |
49 return twjit; | |
50 } | |
51 | |
52 void twrtp_jibuf_destroy(struct twrtp_jibuf_inst *twjit) | |
53 { | |
54 msgb_queue_free(&twjit->sb[0].queue); | |
55 msgb_queue_free(&twjit->sb[1].queue); | |
56 talloc_free(twjit); | |
57 } | |
58 | |
59 /* RTP input to twjit */ | |
60 | |
61 static void get_current_time(struct timeval *tp) | |
62 { | |
63 struct timespec now; | |
64 | |
65 osmo_clock_gettime(CLOCK_MONOTONIC, &now); | |
66 tp->tv_sec = now.tv_sec; | |
67 tp->tv_usec = now.tv_nsec / 1000; | |
68 } | |
69 | |
70 static void | |
71 init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, | |
72 uint32_t rx_ssrc, uint32_t rx_ts, | |
73 const struct timeval *new_time) | |
74 { | |
75 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
76 | |
77 OSMO_ASSERT(llist_empty(&sb->queue)); | |
78 OSMO_ASSERT(sb->depth == 0); | |
79 /* all good, proceed */ | |
80 sb->ssrc = rx_ssrc; | |
81 sb->head_ts = rx_ts; | |
82 msgb_enqueue(&sb->queue, msg); | |
83 sb->depth = 1; | |
84 memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval)); | |
85 memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); | |
86 sb->drop_int_count = 0; | |
87 } | |
88 | |
89 enum input_decision { | |
90 INPUT_CONTINUE, | |
91 INPUT_TOO_OLD, | |
92 INPUT_RESET, | |
93 }; | |
94 | |
95 static enum input_decision | |
96 check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, | |
97 uint32_t rx_ssrc, uint32_t rx_ts, | |
98 const struct timeval *new_time) | |
99 { | |
100 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
101 struct timeval time_delta; | |
102 int32_t ts_delta; | |
103 | |
104 if (rx_ssrc != sb->ssrc) | |
105 return INPUT_RESET; | |
106 if (starting) { | |
107 timersub(new_time, &sb->last_arrival, &time_delta); | |
108 sb->delta_ms = time_delta.tv_sec * 1000 + | |
109 time_delta.tv_usec / 1000; | |
110 memcpy(&sb->last_arrival, new_time, sizeof(struct timeval)); | |
111 } | |
112 ts_delta = (int32_t)(rx_ts - sb->head_ts); | |
113 if (ts_delta < 0) | |
114 return INPUT_TOO_OLD; | |
115 if (ts_delta % twjit->ts_quantum) | |
116 return INPUT_RESET; | |
117 if (starting) { | |
118 if (sb->conf.start_max_delta && | |
119 sb->delta_ms > sb->conf.start_max_delta) | |
120 return INPUT_RESET; | |
121 } else { | |
122 uint32_t fwd = ts_delta / twjit->ts_quantum; | |
123 | |
124 if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec) | |
125 return INPUT_RESET; | |
126 } | |
127 return INPUT_CONTINUE; | |
128 } | |
129 | |
130 static void toss_write_queue(struct twrtp_jibuf_inst *twjit) | |
131 { | |
132 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
133 | |
134 msgb_queue_free(&sb->queue); | |
135 sb->depth = 0; | |
136 } | |
137 | |
138 static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit, | |
139 struct msgb *new_msg, uint32_t rx_ts) | |
140 { | |
141 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
142 uint32_t ts_delta = rx_ts - sb->head_ts; | |
143 uint32_t ins_depth = ts_delta / twjit->ts_quantum; | |
144 struct msgb *old_msg; | |
145 uint32_t old_ts_delta; | |
146 | |
147 /* are we increasing total depth, and can we do simple tail append? */ | |
148 if (ins_depth >= sb->depth) { | |
149 msgb_enqueue(&sb->queue, new_msg); | |
150 sb->depth = ins_depth + 1; | |
151 return; | |
152 } | |
153 /* nope - do it the hard way */ | |
154 llist_for_each_entry(old_msg, &sb->queue, list) { | |
155 old_ts_delta = old_msg->cb[0] - sb->head_ts; | |
156 if (old_ts_delta == ts_delta) { | |
157 /* two packets with the same timestamp! */ | |
158 twjit->stats.duplicate_ts++; | |
159 msgb_free(new_msg); | |
160 return; | |
161 } | |
162 if (old_ts_delta > ts_delta) | |
163 break; | |
164 } | |
165 llist_add_tail(&new_msg->list, &old_msg->list); | |
166 } | |
167 | |
168 static void trim_starting_sb(struct twrtp_jibuf_inst *twjit) | |
169 { | |
170 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
171 struct msgb *msg; | |
172 uint32_t msg_ts, ts_adv, quantum_adv; | |
173 | |
174 while (sb->depth > sb->conf.bd_start) { | |
175 msg = msgb_dequeue(&sb->queue); | |
176 OSMO_ASSERT(msg); | |
177 msgb_free(msg); | |
178 OSMO_ASSERT(!llist_empty(&sb->queue)); | |
179 msg = llist_entry(sb->queue.next, struct msgb, list); | |
180 msg_ts = msg->cb[0]; | |
181 ts_adv = msg_ts - sb->head_ts; | |
182 quantum_adv = ts_adv / twjit->ts_quantum; | |
183 OSMO_ASSERT(sb->depth > quantum_adv); | |
184 sb->head_ts = msg_ts; | |
185 sb->depth -= quantum_adv; | |
186 } | |
187 } | |
188 | |
189 static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc, | |
190 uint16_t rx_seq, uint32_t rx_ts) | |
191 { | |
192 int16_t seq_delta; | |
193 int32_t ts_delta; | |
194 | |
195 if (twjit->last_ssrc != rx_ssrc) { | |
196 twjit->stats.ssrc_changes++; | |
197 return; | |
198 } | |
199 seq_delta = (int16_t)(rx_seq - twjit->last_seq); | |
200 if (seq_delta < 0) | |
201 twjit->stats.seq_backwards++; | |
202 else if (seq_delta == 0) | |
203 twjit->stats.seq_repeats++; | |
204 else if (seq_delta == 1) { | |
205 ts_delta = (int32_t)(rx_ts - twjit->last_ts); | |
206 if (ts_delta != twjit->ts_quantum) { | |
207 if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) | |
208 twjit->stats.intentional_gaps++; | |
209 else | |
210 twjit->stats.ts_resets++; | |
211 } | |
212 } else | |
213 twjit->stats.seq_skips++; | |
214 } | |
215 | |
216 void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) | |
217 { | |
218 struct rtp_hdr *rtph; | |
219 uint32_t rx_ssrc, rx_ts; | |
220 uint16_t rx_seq; | |
221 struct timeval new_time; | |
222 enum input_decision id; | |
223 | |
224 rtph = osmo_rtp_get_hdr(msg); | |
225 if (!rtph) { | |
226 /* invalid packet, couldn't even get header from it */ | |
227 twjit->stats.bad_packets++; | |
228 msgb_free(msg); | |
229 return; | |
230 } | |
231 rx_ssrc = ntohl(rtph->ssrc); | |
232 rx_ts = ntohl(rtph->timestamp); | |
233 rx_seq = ntohs(rtph->sequence); | |
234 get_current_time(&new_time); | |
235 if (twjit->got_first_packet) | |
236 rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); | |
237 twjit->last_ssrc = rx_ssrc; | |
238 twjit->last_seq = rx_seq; | |
239 twjit->last_ts = rx_ts; | |
240 twjit->got_first_packet = true; | |
241 msg->cb[0] = rx_ts; | |
242 | |
243 switch (twjit->state) { | |
244 case TWJIT_STATE_EMPTY: | |
245 /* first packet into totally empty buffer */ | |
246 twjit->state = TWJIT_STATE_HUNT; | |
247 twjit->write_sb = 0; | |
248 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); | |
249 return; | |
250 case TWJIT_STATE_HUNT: | |
251 case TWJIT_STATE_HANDOVER: | |
252 id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, | |
253 &new_time); | |
254 if (id == INPUT_TOO_OLD) { | |
255 msgb_free(msg); | |
256 return; | |
257 } | |
258 if (id == INPUT_RESET) { | |
259 toss_write_queue(twjit); | |
260 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, | |
261 &new_time); | |
262 return; | |
263 } | |
264 insert_pkt_write_sb(twjit, msg, rx_ts); | |
265 trim_starting_sb(twjit); | |
266 return; | |
267 case TWJIT_STATE_FLOWING: | |
268 id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, | |
269 &new_time); | |
270 if (id == INPUT_TOO_OLD) { | |
271 twjit->stats.too_old++; | |
272 msgb_free(msg); | |
273 return; | |
274 } | |
275 if (id == INPUT_RESET) { | |
276 twjit->state = TWJIT_STATE_HANDOVER; | |
277 twjit->write_sb = !twjit->write_sb; | |
278 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, | |
279 &new_time); | |
280 twjit->stats.handovers++; | |
281 return; | |
282 } | |
283 insert_pkt_write_sb(twjit, msg, rx_ts); | |
284 return; | |
285 default: | |
286 OSMO_ASSERT(0); | |
287 } | |
288 } | |
289 | |
290 /* Output from twjit to the fixed timing system */ | |
291 | |
292 static bool starting_sb_is_ready(struct twrtp_jibuf_inst *twjit) | |
293 { | |
294 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
295 | |
296 if (sb->depth < sb->conf.bd_start) | |
297 return false; | |
298 if (sb->delta_ms < sb->conf.start_min_delta) | |
299 return false; | |
300 return true; | |
301 } | |
302 | |
303 static bool read_sb_is_empty(struct twrtp_jibuf_inst *twjit) | |
304 { | |
305 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; | |
306 | |
307 return sb->depth == 0; | |
308 } | |
309 | |
310 static struct msgb *pull_from_read_sb(struct twrtp_jibuf_inst *twjit) | |
311 { | |
312 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; | |
313 struct msgb *msg; | |
314 | |
315 OSMO_ASSERT(!llist_empty(&sb->queue)); | |
316 OSMO_ASSERT(sb->depth > 0); | |
317 msg = llist_entry(sb->queue.next, struct msgb, list); | |
318 if (msg->cb[0] == sb->head_ts) { | |
319 llist_del(&msg->list); | |
320 twjit->stats.delivered_pkt++; | |
321 twjit->stats.delivered_bytes += msg->len; | |
322 } else { | |
323 msg = NULL; | |
324 twjit->stats.output_gaps++; | |
325 } | |
326 sb->head_ts += twjit->ts_quantum; | |
327 sb->depth--; | |
328 return msg; | |
329 } | |
330 | |
331 static void read_sb_thinning(struct twrtp_jibuf_inst *twjit) | |
332 { | |
333 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; | |
334 struct msgb *msg; | |
335 | |
336 if (sb->drop_int_count) { | |
337 sb->drop_int_count--; | |
338 return; | |
339 } | |
340 if (sb->depth <= sb->conf.bd_hiwat) | |
341 return; | |
342 twjit->stats.thinning_drops++; | |
343 msg = pull_from_read_sb(twjit); | |
344 if (msg) | |
345 msgb_free(msg); | |
346 sb->drop_int_count = sb->conf.thinning_int - 2; | |
347 } | |
348 | |
349 static void toss_read_queue(struct twrtp_jibuf_inst *twjit) | |
350 { | |
351 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; | |
352 | |
353 msgb_queue_free(&sb->queue); | |
354 sb->depth = 0; | |
355 } | |
356 | |
357 struct msgb *twrtp_jibuf_output(struct twrtp_jibuf_inst *twjit) | |
358 { | |
359 switch (twjit->state) { | |
360 case TWJIT_STATE_EMPTY: | |
361 return NULL; | |
362 case TWJIT_STATE_HUNT: | |
363 if (!starting_sb_is_ready(twjit)) | |
364 return NULL; | |
365 twjit->state = TWJIT_STATE_FLOWING; | |
366 twjit->read_sb = twjit->write_sb; | |
367 return pull_from_read_sb(twjit); | |
368 case TWJIT_STATE_FLOWING: | |
369 if (read_sb_is_empty(twjit)) { | |
370 twjit->state = TWJIT_STATE_EMPTY; | |
371 twjit->stats.underruns++; | |
372 return NULL; | |
373 } | |
374 read_sb_thinning(twjit); | |
375 return pull_from_read_sb(twjit); | |
376 case TWJIT_STATE_HANDOVER: | |
377 if (starting_sb_is_ready(twjit)) { | |
378 toss_read_queue(twjit); | |
379 twjit->state = TWJIT_STATE_FLOWING; | |
380 twjit->read_sb = twjit->write_sb; | |
381 return pull_from_read_sb(twjit); | |
382 } | |
383 if (read_sb_is_empty(twjit)) { | |
384 twjit->state = TWJIT_STATE_HUNT; | |
385 twjit->stats.underruns++; | |
386 return NULL; | |
387 } | |
388 read_sb_thinning(twjit); | |
389 return pull_from_read_sb(twjit); | |
390 default: | |
391 OSMO_ASSERT(0); | |
392 } | |
393 } |