comparison src/twjit_in.c @ 5:1bb26347e253

twjit: split into separate base/in/out modules
author Mychaela Falconia <falcon@freecalypso.org>
date Fri, 05 Jul 2024 21:24:56 +0000
parents src/twjit.c@d10ea5dc61b3
children 95f6c8ce33b0
comparison
equal deleted inserted replaced
4:be04d84f5468 5:1bb26347e253
1 /*
2 * Themyscira Wireless jitter buffer implementation: RTP input processing.
3 */
4
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <string.h>
8 #include <arpa/inet.h> /* for network byte order functions */
9
10 #include <osmocom/core/linuxlist.h>
11 #include <osmocom/core/msgb.h>
12 #include <osmocom/core/timer.h>
13 #include <osmocom/core/utils.h>
14
15 /* FIXME: this libosmo-netif dependency needs to be removed */
16 #include <osmocom/netif/rtp.h>
17
18 #include <themwi/rtp/twjit.h>
19
20 static void get_current_time(struct timeval *tp)
21 {
22 struct timespec now;
23
24 osmo_clock_gettime(CLOCK_MONOTONIC, &now);
25 tp->tv_sec = now.tv_sec;
26 tp->tv_usec = now.tv_nsec / 1000;
27 }
28
29 static void
30 init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg,
31 uint32_t rx_ssrc, uint32_t rx_ts,
32 const struct timeval *new_time)
33 {
34 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
35
36 OSMO_ASSERT(llist_empty(&sb->queue));
37 OSMO_ASSERT(sb->depth == 0);
38 /* all good, proceed */
39 sb->ssrc = rx_ssrc;
40 sb->head_ts = rx_ts;
41 msgb_enqueue(&sb->queue, msg);
42 sb->depth = 1;
43 memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval));
44 memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config));
45 sb->drop_int_count = 0;
46 }
47
/* Verdict of check_input_for_subbuf() on a newly received packet */
enum input_decision {
	/* packet is acceptable for the current write sub-buffer */
	INPUT_CONTINUE = 0,
	/* packet timestamp lies before the sub-buffer's head timestamp */
	INPUT_TOO_OLD = 1,
	/* packet does not fit the current sub-buffer; start over with it */
	INPUT_RESET = 2,
};
53
54 static enum input_decision
55 check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting,
56 uint32_t rx_ssrc, uint32_t rx_ts,
57 const struct timeval *new_time)
58 {
59 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
60 struct timeval time_delta;
61 int32_t ts_delta;
62
63 if (rx_ssrc != sb->ssrc)
64 return INPUT_RESET;
65 if (starting) {
66 timersub(new_time, &sb->last_arrival, &time_delta);
67 sb->delta_ms = time_delta.tv_sec * 1000 +
68 time_delta.tv_usec / 1000;
69 memcpy(&sb->last_arrival, new_time, sizeof(struct timeval));
70 }
71 ts_delta = (int32_t)(rx_ts - sb->head_ts);
72 if (ts_delta < 0)
73 return INPUT_TOO_OLD;
74 if (ts_delta % twjit->ts_quantum)
75 return INPUT_RESET;
76 if (starting) {
77 if (sb->conf.start_max_delta &&
78 sb->delta_ms > sb->conf.start_max_delta)
79 return INPUT_RESET;
80 } else {
81 uint32_t fwd = ts_delta / twjit->ts_quantum;
82
83 if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec)
84 return INPUT_RESET;
85 }
86 return INPUT_CONTINUE;
87 }
88
89 static void toss_write_queue(struct twrtp_jibuf_inst *twjit)
90 {
91 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
92
93 msgb_queue_free(&sb->queue);
94 sb->depth = 0;
95 }
96
97 static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit,
98 struct msgb *new_msg, uint32_t rx_ts)
99 {
100 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
101 uint32_t ts_delta = rx_ts - sb->head_ts;
102 uint32_t ins_depth = ts_delta / twjit->ts_quantum;
103 struct msgb *old_msg;
104 uint32_t old_ts_delta;
105
106 /* are we increasing total depth, and can we do simple tail append? */
107 if (ins_depth >= sb->depth) {
108 msgb_enqueue(&sb->queue, new_msg);
109 sb->depth = ins_depth + 1;
110 return;
111 }
112 /* nope - do it the hard way */
113 llist_for_each_entry(old_msg, &sb->queue, list) {
114 old_ts_delta = old_msg->cb[0] - sb->head_ts;
115 if (old_ts_delta == ts_delta) {
116 /* two packets with the same timestamp! */
117 twjit->stats.duplicate_ts++;
118 msgb_free(new_msg);
119 return;
120 }
121 if (old_ts_delta > ts_delta)
122 break;
123 }
124 llist_add_tail(&new_msg->list, &old_msg->list);
125 }
126
127 static void trim_starting_sb(struct twrtp_jibuf_inst *twjit)
128 {
129 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
130 struct msgb *msg;
131 uint32_t msg_ts, ts_adv, quantum_adv;
132
133 while (sb->depth > sb->conf.bd_start) {
134 msg = msgb_dequeue(&sb->queue);
135 OSMO_ASSERT(msg);
136 msgb_free(msg);
137 OSMO_ASSERT(!llist_empty(&sb->queue));
138 msg = llist_entry(sb->queue.next, struct msgb, list);
139 msg_ts = msg->cb[0];
140 ts_adv = msg_ts - sb->head_ts;
141 quantum_adv = ts_adv / twjit->ts_quantum;
142 OSMO_ASSERT(sb->depth > quantum_adv);
143 sb->head_ts = msg_ts;
144 sb->depth -= quantum_adv;
145 }
146 }
147
148 static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc,
149 uint16_t rx_seq, uint32_t rx_ts)
150 {
151 int16_t seq_delta;
152 int32_t ts_delta;
153
154 if (twjit->last_ssrc != rx_ssrc) {
155 twjit->stats.ssrc_changes++;
156 return;
157 }
158 seq_delta = (int16_t)(rx_seq - twjit->last_seq);
159 if (seq_delta < 0)
160 twjit->stats.seq_backwards++;
161 else if (seq_delta == 0)
162 twjit->stats.seq_repeats++;
163 else if (seq_delta == 1) {
164 ts_delta = (int32_t)(rx_ts - twjit->last_ts);
165 if (ts_delta != twjit->ts_quantum) {
166 if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0)
167 twjit->stats.intentional_gaps++;
168 else
169 twjit->stats.ts_resets++;
170 }
171 } else
172 twjit->stats.seq_skips++;
173 }
174
175 void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg)
176 {
177 struct rtp_hdr *rtph;
178 uint32_t rx_ssrc, rx_ts;
179 uint16_t rx_seq;
180 struct timeval new_time;
181 enum input_decision id;
182
183 rtph = osmo_rtp_get_hdr(msg);
184 if (!rtph) {
185 /* invalid packet, couldn't even get header from it */
186 twjit->stats.bad_packets++;
187 msgb_free(msg);
188 return;
189 }
190 rx_ssrc = ntohl(rtph->ssrc);
191 rx_ts = ntohl(rtph->timestamp);
192 rx_seq = ntohs(rtph->sequence);
193 get_current_time(&new_time);
194 if (twjit->got_first_packet)
195 rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts);
196 twjit->last_ssrc = rx_ssrc;
197 twjit->last_seq = rx_seq;
198 twjit->last_ts = rx_ts;
199 twjit->got_first_packet = true;
200 msg->cb[0] = rx_ts;
201
202 switch (twjit->state) {
203 case TWJIT_STATE_EMPTY:
204 /* first packet into totally empty buffer */
205 twjit->state = TWJIT_STATE_HUNT;
206 twjit->write_sb = 0;
207 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time);
208 return;
209 case TWJIT_STATE_HUNT:
210 case TWJIT_STATE_HANDOVER:
211 id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts,
212 &new_time);
213 if (id == INPUT_TOO_OLD) {
214 msgb_free(msg);
215 return;
216 }
217 if (id == INPUT_RESET) {
218 toss_write_queue(twjit);
219 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
220 &new_time);
221 return;
222 }
223 insert_pkt_write_sb(twjit, msg, rx_ts);
224 trim_starting_sb(twjit);
225 return;
226 case TWJIT_STATE_FLOWING:
227 id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts,
228 &new_time);
229 if (id == INPUT_TOO_OLD) {
230 twjit->stats.too_old++;
231 msgb_free(msg);
232 return;
233 }
234 if (id == INPUT_RESET) {
235 twjit->state = TWJIT_STATE_HANDOVER;
236 twjit->write_sb = !twjit->write_sb;
237 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
238 &new_time);
239 twjit->stats.handovers++;
240 return;
241 }
242 insert_pkt_write_sb(twjit, msg, rx_ts);
243 return;
244 default:
245 OSMO_ASSERT(0);
246 }
247 }