FreeCalypso > hg > themwi-rtp-lib
comparison src/twjit_in.c @ 5:1bb26347e253
twjit: split into separate base/in/out modules
author | Mychaela Falconia <falcon@freecalypso.org> |
---|---|
date | Fri, 05 Jul 2024 21:24:56 +0000 |
parents | src/twjit.c@d10ea5dc61b3 |
children | 95f6c8ce33b0 |
comparison
equal
deleted
inserted
replaced
4:be04d84f5468 | 5:1bb26347e253 |
---|---|
1 /* | |
2 * Themyscira Wireless jitter buffer implementation: RTP input processing. | |
3 */ | |
4 | |
5 #include <stdint.h> | |
6 #include <stdbool.h> | |
7 #include <string.h> | |
8 #include <arpa/inet.h> /* for network byte order functions */ | |
9 | |
10 #include <osmocom/core/linuxlist.h> | |
11 #include <osmocom/core/msgb.h> | |
12 #include <osmocom/core/timer.h> | |
13 #include <osmocom/core/utils.h> | |
14 | |
15 /* FIXME: this libosmo-netif dependency needs to be removed */ | |
16 #include <osmocom/netif/rtp.h> | |
17 | |
18 #include <themwi/rtp/twjit.h> | |
19 | |
20 static void get_current_time(struct timeval *tp) | |
21 { | |
22 struct timespec now; | |
23 | |
24 osmo_clock_gettime(CLOCK_MONOTONIC, &now); | |
25 tp->tv_sec = now.tv_sec; | |
26 tp->tv_usec = now.tv_nsec / 1000; | |
27 } | |
28 | |
29 static void | |
30 init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, | |
31 uint32_t rx_ssrc, uint32_t rx_ts, | |
32 const struct timeval *new_time) | |
33 { | |
34 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
35 | |
36 OSMO_ASSERT(llist_empty(&sb->queue)); | |
37 OSMO_ASSERT(sb->depth == 0); | |
38 /* all good, proceed */ | |
39 sb->ssrc = rx_ssrc; | |
40 sb->head_ts = rx_ts; | |
41 msgb_enqueue(&sb->queue, msg); | |
42 sb->depth = 1; | |
43 memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval)); | |
44 memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); | |
45 sb->drop_int_count = 0; | |
46 } | |
47 | |
48 enum input_decision { | |
49 INPUT_CONTINUE, | |
50 INPUT_TOO_OLD, | |
51 INPUT_RESET, | |
52 }; | |
53 | |
54 static enum input_decision | |
55 check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, | |
56 uint32_t rx_ssrc, uint32_t rx_ts, | |
57 const struct timeval *new_time) | |
58 { | |
59 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
60 struct timeval time_delta; | |
61 int32_t ts_delta; | |
62 | |
63 if (rx_ssrc != sb->ssrc) | |
64 return INPUT_RESET; | |
65 if (starting) { | |
66 timersub(new_time, &sb->last_arrival, &time_delta); | |
67 sb->delta_ms = time_delta.tv_sec * 1000 + | |
68 time_delta.tv_usec / 1000; | |
69 memcpy(&sb->last_arrival, new_time, sizeof(struct timeval)); | |
70 } | |
71 ts_delta = (int32_t)(rx_ts - sb->head_ts); | |
72 if (ts_delta < 0) | |
73 return INPUT_TOO_OLD; | |
74 if (ts_delta % twjit->ts_quantum) | |
75 return INPUT_RESET; | |
76 if (starting) { | |
77 if (sb->conf.start_max_delta && | |
78 sb->delta_ms > sb->conf.start_max_delta) | |
79 return INPUT_RESET; | |
80 } else { | |
81 uint32_t fwd = ts_delta / twjit->ts_quantum; | |
82 | |
83 if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec) | |
84 return INPUT_RESET; | |
85 } | |
86 return INPUT_CONTINUE; | |
87 } | |
88 | |
89 static void toss_write_queue(struct twrtp_jibuf_inst *twjit) | |
90 { | |
91 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
92 | |
93 msgb_queue_free(&sb->queue); | |
94 sb->depth = 0; | |
95 } | |
96 | |
97 static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit, | |
98 struct msgb *new_msg, uint32_t rx_ts) | |
99 { | |
100 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
101 uint32_t ts_delta = rx_ts - sb->head_ts; | |
102 uint32_t ins_depth = ts_delta / twjit->ts_quantum; | |
103 struct msgb *old_msg; | |
104 uint32_t old_ts_delta; | |
105 | |
106 /* are we increasing total depth, and can we do simple tail append? */ | |
107 if (ins_depth >= sb->depth) { | |
108 msgb_enqueue(&sb->queue, new_msg); | |
109 sb->depth = ins_depth + 1; | |
110 return; | |
111 } | |
112 /* nope - do it the hard way */ | |
113 llist_for_each_entry(old_msg, &sb->queue, list) { | |
114 old_ts_delta = old_msg->cb[0] - sb->head_ts; | |
115 if (old_ts_delta == ts_delta) { | |
116 /* two packets with the same timestamp! */ | |
117 twjit->stats.duplicate_ts++; | |
118 msgb_free(new_msg); | |
119 return; | |
120 } | |
121 if (old_ts_delta > ts_delta) | |
122 break; | |
123 } | |
124 llist_add_tail(&new_msg->list, &old_msg->list); | |
125 } | |
126 | |
127 static void trim_starting_sb(struct twrtp_jibuf_inst *twjit) | |
128 { | |
129 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
130 struct msgb *msg; | |
131 uint32_t msg_ts, ts_adv, quantum_adv; | |
132 | |
133 while (sb->depth > sb->conf.bd_start) { | |
134 msg = msgb_dequeue(&sb->queue); | |
135 OSMO_ASSERT(msg); | |
136 msgb_free(msg); | |
137 OSMO_ASSERT(!llist_empty(&sb->queue)); | |
138 msg = llist_entry(sb->queue.next, struct msgb, list); | |
139 msg_ts = msg->cb[0]; | |
140 ts_adv = msg_ts - sb->head_ts; | |
141 quantum_adv = ts_adv / twjit->ts_quantum; | |
142 OSMO_ASSERT(sb->depth > quantum_adv); | |
143 sb->head_ts = msg_ts; | |
144 sb->depth -= quantum_adv; | |
145 } | |
146 } | |
147 | |
148 static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc, | |
149 uint16_t rx_seq, uint32_t rx_ts) | |
150 { | |
151 int16_t seq_delta; | |
152 int32_t ts_delta; | |
153 | |
154 if (twjit->last_ssrc != rx_ssrc) { | |
155 twjit->stats.ssrc_changes++; | |
156 return; | |
157 } | |
158 seq_delta = (int16_t)(rx_seq - twjit->last_seq); | |
159 if (seq_delta < 0) | |
160 twjit->stats.seq_backwards++; | |
161 else if (seq_delta == 0) | |
162 twjit->stats.seq_repeats++; | |
163 else if (seq_delta == 1) { | |
164 ts_delta = (int32_t)(rx_ts - twjit->last_ts); | |
165 if (ts_delta != twjit->ts_quantum) { | |
166 if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) | |
167 twjit->stats.intentional_gaps++; | |
168 else | |
169 twjit->stats.ts_resets++; | |
170 } | |
171 } else | |
172 twjit->stats.seq_skips++; | |
173 } | |
174 | |
175 void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) | |
176 { | |
177 struct rtp_hdr *rtph; | |
178 uint32_t rx_ssrc, rx_ts; | |
179 uint16_t rx_seq; | |
180 struct timeval new_time; | |
181 enum input_decision id; | |
182 | |
183 rtph = osmo_rtp_get_hdr(msg); | |
184 if (!rtph) { | |
185 /* invalid packet, couldn't even get header from it */ | |
186 twjit->stats.bad_packets++; | |
187 msgb_free(msg); | |
188 return; | |
189 } | |
190 rx_ssrc = ntohl(rtph->ssrc); | |
191 rx_ts = ntohl(rtph->timestamp); | |
192 rx_seq = ntohs(rtph->sequence); | |
193 get_current_time(&new_time); | |
194 if (twjit->got_first_packet) | |
195 rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); | |
196 twjit->last_ssrc = rx_ssrc; | |
197 twjit->last_seq = rx_seq; | |
198 twjit->last_ts = rx_ts; | |
199 twjit->got_first_packet = true; | |
200 msg->cb[0] = rx_ts; | |
201 | |
202 switch (twjit->state) { | |
203 case TWJIT_STATE_EMPTY: | |
204 /* first packet into totally empty buffer */ | |
205 twjit->state = TWJIT_STATE_HUNT; | |
206 twjit->write_sb = 0; | |
207 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); | |
208 return; | |
209 case TWJIT_STATE_HUNT: | |
210 case TWJIT_STATE_HANDOVER: | |
211 id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, | |
212 &new_time); | |
213 if (id == INPUT_TOO_OLD) { | |
214 msgb_free(msg); | |
215 return; | |
216 } | |
217 if (id == INPUT_RESET) { | |
218 toss_write_queue(twjit); | |
219 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, | |
220 &new_time); | |
221 return; | |
222 } | |
223 insert_pkt_write_sb(twjit, msg, rx_ts); | |
224 trim_starting_sb(twjit); | |
225 return; | |
226 case TWJIT_STATE_FLOWING: | |
227 id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, | |
228 &new_time); | |
229 if (id == INPUT_TOO_OLD) { | |
230 twjit->stats.too_old++; | |
231 msgb_free(msg); | |
232 return; | |
233 } | |
234 if (id == INPUT_RESET) { | |
235 twjit->state = TWJIT_STATE_HANDOVER; | |
236 twjit->write_sb = !twjit->write_sb; | |
237 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, | |
238 &new_time); | |
239 twjit->stats.handovers++; | |
240 return; | |
241 } | |
242 insert_pkt_write_sb(twjit, msg, rx_ts); | |
243 return; | |
244 default: | |
245 OSMO_ASSERT(0); | |
246 } | |
247 } |