FreeCalypso > hg > themwi-rtp-lib
comparison src/twjit_in.c @ 9:117fa99ff871
twjit: implement proper analytics
| author | Mychaela Falconia <falcon@freecalypso.org> |
|---|---|
| date | Fri, 05 Jul 2024 23:49:37 +0000 |
| parents | 95f6c8ce33b0 |
| children | e60df79cbe9f |
comparison
equal
deleted
inserted
replaced
| 8:95f6c8ce33b0 | 9:117fa99ff871 |
|---|---|
| 15 /* FIXME: this libosmo-netif dependency needs to be removed */ | 15 /* FIXME: this libosmo-netif dependency needs to be removed */ |
| 16 #include <osmocom/netif/rtp.h> | 16 #include <osmocom/netif/rtp.h> |
| 17 | 17 |
| 18 #include <themwi/rtp/twjit.h> | 18 #include <themwi/rtp/twjit.h> |
| 19 | 19 |
| 20 static void get_current_time(struct timeval *tp) | 20 /* raw analytics on the Rx packet stream */ |
| 21 { | 21 |
| 22 struct timespec now; | 22 static void analytics_init(struct twrtp_jibuf_inst *twjit, uint16_t rx_seq) |
| 23 | 23 { |
| 24 osmo_clock_gettime(CLOCK_MONOTONIC, &now); | 24 struct twrtp_jibuf_rr_info *rri = &twjit->rr_info; |
| 25 tp->tv_sec = now.tv_sec; | 25 |
| 26 tp->tv_usec = now.tv_nsec / 1000; | 26 rri->rx_packets = 1; |
| 27 } | 27 rri->base_seq = rx_seq; |
| | 28 rri->max_seq_ext = rx_seq; |
| | 29 rri->expected_pkt = 1; |
| | 30 rri->jitter_accum = 0; |
| | 31 } |
| | 32 |
| | 33 static void analytics_cont(struct twrtp_jibuf_inst *twjit, uint16_t rx_seq, |
| | 34 uint32_t rx_ts, struct timespec *now) |
| | 35 { |
| | 36 struct twrtp_jibuf_rr_info *rri = &twjit->rr_info; |
| | 37 uint16_t seq_ext_lo = rri->max_seq_ext; |
| | 38 uint16_t seq_ext_hi = rri->max_seq_ext >> 16; |
| | 39 int16_t seq_delta = (int16_t)(rx_seq - twjit->last_seq); |
| | 40 int16_t seq_delta2 = (int16_t)(rx_seq - seq_ext_lo); |
| | 41 int32_t ts_delta = (int32_t)(rx_ts - twjit->last_ts); |
| | 42 struct timespec time_delta; |
| | 43 uint32_t time_delta_tsu; |
| | 44 int32_t jitter_new, ts_delta_clamp; |
| | 45 |
| | 46 /* analytics for our own stats */ |
| | 47 if (seq_delta < 0) |
| | 48 twjit->stats.seq_backwards++; |
| | 49 else if (seq_delta == 0) |
| | 50 twjit->stats.seq_repeats++; |
| | 51 else if (seq_delta == 1) { |
| | 52 if (ts_delta != twjit->ts_quantum) { |
| | 53 if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) |
| | 54 twjit->stats.intentional_gaps++; |
| | 55 else |
| | 56 twjit->stats.ts_resets++; |
| | 57 } |
| | 58 } else |
| | 59 twjit->stats.seq_skips++; |
| | 60 |
| | 61 /* analytics for RTCP RR: packet counts */ |
| | 62 rri->rx_packets++; |
| | 63 if (seq_delta2 > 0) { |
| | 64 if (rx_seq < seq_ext_lo) |
| | 65 seq_ext_hi++; |
| | 66 seq_ext_lo = rx_seq; |
| | 67 rri->max_seq_ext = ((uint32_t) seq_ext_hi << 16) \| seq_ext_lo; |
| | 68 rri->expected_pkt = rri->max_seq_ext - rri->base_seq + 1; |
| | 69 } |
| | 70 |
| | 71 /* time-of-arrival analytics */ |
| | 72 time_delta.tv_sec = now->tv_sec - twjit->last_arrival.tv_sec; |
| | 73 time_delta.tv_nsec = now->tv_nsec - twjit->last_arrival.tv_nsec; |
| | 74 if (time_delta.tv_nsec < 0) { |
| | 75 time_delta.tv_sec--; |
| | 76 time_delta.tv_nsec += 1000000000; |
| | 77 } |
| | 78 /* to avoid overflows in downstream math, clamp to 1 hour */ |
| | 79 if (time_delta.tv_sec >= 3600) { |
| | 80 time_delta.tv_sec = 3600; |
| | 81 time_delta.tv_nsec = 0; |
| | 82 } |
| | 83 /* convert to RTP timestamp units */ |
| | 84 time_delta_tsu = time_delta.tv_sec * twjit->ts_units_per_sec + |
| | 85 time_delta.tv_nsec / twjit->ns_to_ts_units; |
| | 86 twjit->last_arrival_delta = time_delta_tsu; |
| | 87 /* jitter calculation for RTCP RR */ |
| | 88 ts_delta_clamp = twjit->ts_units_per_sec * 3600; |
| | 89 if (ts_delta > ts_delta_clamp) |
| | 90 ts_delta = ts_delta_clamp; |
| | 91 else if (ts_delta < -ts_delta_clamp) |
| | 92 ts_delta = -ts_delta_clamp; |
| | 93 jitter_new = time_delta_tsu - ts_delta; |
| | 94 if (jitter_new < 0) |
| | 95 jitter_new = -jitter_new; |
| | 96 rri->jitter_accum += jitter_new - ((rri->jitter_accum + 8) >> 4); |
| | 97 } |
| | 98 |
| | 99 /* actual twjit input logic */ |
| 28 | 100 |
| 29 static void | 101 static void |
| 30 init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, | 102 init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, |
| 31 uint32_t rx_ssrc, uint32_t rx_ts, | 103 uint32_t rx_ssrc, uint32_t rx_ts) |
| 32 const struct timeval *new_time) | |
| 33 { | 104 { |
| 34 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | 105 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; |
| 35 | 106 |
| 36 OSMO_ASSERT(llist_empty(&sb->queue)); | 107 OSMO_ASSERT(llist_empty(&sb->queue)); |
| 37 OSMO_ASSERT(sb->depth == 0); | 108 OSMO_ASSERT(sb->depth == 0); |
| 38 /* all good, proceed */ | 109 /* all good, proceed */ |
| 39 sb->ssrc = rx_ssrc; | 110 sb->ssrc = rx_ssrc; |
| 40 sb->head_ts = rx_ts; | 111 sb->head_ts = rx_ts; |
| 41 msgb_enqueue(&sb->queue, msg); | 112 msgb_enqueue(&sb->queue, msg); |
| 42 sb->depth = 1; | 113 sb->depth = 1; |
| 43 memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval)); | |
| 44 memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); | 114 memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); |
| 45 sb->drop_int_count = 0; | 115 sb->drop_int_count = 0; |
| 46 } | 116 } |
| 47 | 117 |
| 48 enum input_decision { | 118 enum input_decision { |
| 51 INPUT_RESET, | 121 INPUT_RESET, |
| 52 }; | 122 }; |
| 53 | 123 |
| 54 static enum input_decision | 124 static enum input_decision |
| 55 check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, | 125 check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, |
| 56 uint32_t rx_ssrc, uint32_t rx_ts, | 126 uint32_t rx_ssrc, uint32_t rx_ts) |
| 57 const struct timeval *new_time) | 127 { |
| 58 { | 128 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; |
| 59 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
| 60 struct timeval time_delta; | |
| 61 int32_t ts_delta; | 129 int32_t ts_delta; |
| 62 | 130 |
| 63 if (rx_ssrc != sb->ssrc) | 131 if (rx_ssrc != sb->ssrc) |
| 64 return INPUT_RESET; | 132 return INPUT_RESET; |
| 65 if (starting) { | 133 sb->delta_ms = twjit->last_arrival_delta / twjit->ts_units_per_ms; |
| 66 timersub(new_time, &sb->last_arrival, &time_delta); | |
| 67 sb->delta_ms = time_delta.tv_sec * 1000 + | |
| 68 time_delta.tv_usec / 1000; | |
| 69 memcpy(&sb->last_arrival, new_time, sizeof(struct timeval)); | |
| 70 } | |
| 71 ts_delta = (int32_t)(rx_ts - sb->head_ts); | 134 ts_delta = (int32_t)(rx_ts - sb->head_ts); |
| 72 if (ts_delta < 0) | 135 if (ts_delta < 0) |
| 73 return INPUT_TOO_OLD; | 136 return INPUT_TOO_OLD; |
| 74 if (ts_delta % twjit->ts_quantum) | 137 if (ts_delta % twjit->ts_quantum) |
| 75 return INPUT_RESET; | 138 return INPUT_RESET; |
| 144 sb->head_ts = msg_ts; | 207 sb->head_ts = msg_ts; |
| 145 sb->depth -= quantum_adv; | 208 sb->depth -= quantum_adv; |
| 146 } | 209 } |
| 147 } | 210 } |
| 148 | 211 |
| 149 static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc, | |
| 150 uint16_t rx_seq, uint32_t rx_ts) | |
| 151 { | |
| 152 int16_t seq_delta; | |
| 153 int32_t ts_delta; | |
| 154 | |
| 155 if (twjit->last_ssrc != rx_ssrc) { | |
| 156 twjit->stats.ssrc_changes++; | |
| 157 return; | |
| 158 } | |
| 159 seq_delta = (int16_t)(rx_seq - twjit->last_seq); | |
| 160 if (seq_delta < 0) | |
| 161 twjit->stats.seq_backwards++; | |
| 162 else if (seq_delta == 0) | |
| 163 twjit->stats.seq_repeats++; | |
| 164 else if (seq_delta == 1) { | |
| 165 ts_delta = (int32_t)(rx_ts - twjit->last_ts); | |
| 166 if (ts_delta != twjit->ts_quantum) { | |
| 167 if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) | |
| 168 twjit->stats.intentional_gaps++; | |
| 169 else | |
| 170 twjit->stats.ts_resets++; | |
| 171 } | |
| 172 } else | |
| 173 twjit->stats.seq_skips++; | |
| 174 } | |
| 175 | |
| 176 void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) | 212 void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) |
| 177 { | 213 { |
| 178 struct rtp_hdr *rtph; | 214 struct rtp_hdr *rtph; |
| 179 uint32_t rx_ssrc, rx_ts; | 215 uint32_t rx_ssrc, rx_ts; |
| 180 uint16_t rx_seq; | 216 uint16_t rx_seq; |
| 181 struct timeval new_time; | 217 struct timespec now; |
| 182 enum input_decision id; | 218 enum input_decision id; |
| 183 | 219 |
| 184 rtph = osmo_rtp_get_hdr(msg); | 220 rtph = osmo_rtp_get_hdr(msg); |
| 185 if (!rtph) { | 221 if (!rtph) { |
| 186 /* invalid packet, couldn't even get header from it */ | 222 /* invalid packet, couldn't even get header from it */ |
| 189 return; | 225 return; |
| 190 } | 226 } |
| 191 rx_ssrc = ntohl(rtph->ssrc); | 227 rx_ssrc = ntohl(rtph->ssrc); |
| 192 rx_ts = ntohl(rtph->timestamp); | 228 rx_ts = ntohl(rtph->timestamp); |
| 193 rx_seq = ntohs(rtph->sequence); | 229 rx_seq = ntohs(rtph->sequence); |
| 194 get_current_time(&new_time); | 230 osmo_clock_gettime(CLOCK_MONOTONIC, &now); |
| 195 if (twjit->got_first_packet) | 231 if (!twjit->got_first_packet) { |
| 196 rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); | 232 analytics_init(twjit, rx_seq); |
| | 233 twjit->got_first_packet = true; |
| | 234 } else if (rx_ssrc != twjit->last_ssrc) { |
| | 235 twjit->stats.ssrc_changes++; |
| | 236 analytics_init(twjit, rx_seq); |
| | 237 } else |
| | 238 analytics_cont(twjit, rx_seq, rx_ts, &now); |
| 197 twjit->last_ssrc = rx_ssrc; | 239 twjit->last_ssrc = rx_ssrc; |
| 198 twjit->last_seq = rx_seq; | 240 twjit->last_seq = rx_seq; |
| 199 twjit->last_ts = rx_ts; | 241 twjit->last_ts = rx_ts; |
| 200 twjit->got_first_packet = true; | 242 memcpy(&twjit->last_arrival, &now, sizeof(struct timespec)); |
| 201 msg->cb[0] = rx_ts; | 243 msg->cb[0] = rx_ts; |
| 202 | 244 |
| 203 switch (twjit->state) { | 245 switch (twjit->state) { |
| 204 case TWJIT_STATE_EMPTY: | 246 case TWJIT_STATE_EMPTY: |
| 205 /* first packet into totally empty buffer */ | 247 /* first packet into totally empty buffer */ |
| 206 twjit->state = TWJIT_STATE_HUNT; | 248 twjit->state = TWJIT_STATE_HUNT; |
| 207 twjit->write_sb = 0; | 249 twjit->write_sb = 0; |
| 208 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); | 250 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts); |
| 209 return; | 251 return; |
| 210 case TWJIT_STATE_HUNT: | 252 case TWJIT_STATE_HUNT: |
| 211 case TWJIT_STATE_HANDOVER: | 253 case TWJIT_STATE_HANDOVER: |
| 212 id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, | 254 id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts); |
| 213 &new_time); | |
| 214 if (id == INPUT_TOO_OLD) { | 255 if (id == INPUT_TOO_OLD) { |
| 215 msgb_free(msg); | 256 msgb_free(msg); |
| 216 return; | 257 return; |
| 217 } | 258 } |
| 218 if (id == INPUT_RESET) { | 259 if (id == INPUT_RESET) { |
| 219 toss_write_queue(twjit); | 260 toss_write_queue(twjit); |
| 220 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, | 261 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts); |
| 221 &new_time); | |
| 222 return; | 262 return; |
| 223 } | 263 } |
| 224 insert_pkt_write_sb(twjit, msg, rx_ts); | 264 insert_pkt_write_sb(twjit, msg, rx_ts); |
| 225 trim_starting_sb(twjit); | 265 trim_starting_sb(twjit); |
| 226 return; | 266 return; |
| 227 case TWJIT_STATE_FLOWING: | 267 case TWJIT_STATE_FLOWING: |
| 228 id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, | 268 id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts); |
| 229 &new_time); | |
| 230 if (id == INPUT_TOO_OLD) { | 269 if (id == INPUT_TOO_OLD) { |
| 231 twjit->stats.too_old++; | 270 twjit->stats.too_old++; |
| 232 msgb_free(msg); | 271 msgb_free(msg); |
| 233 return; | 272 return; |
| 234 } | 273 } |
| 235 if (id == INPUT_RESET) { | 274 if (id == INPUT_RESET) { |
| 236 twjit->state = TWJIT_STATE_HANDOVER; | 275 twjit->state = TWJIT_STATE_HANDOVER; |
| 237 twjit->write_sb = !twjit->write_sb; | 276 twjit->write_sb = !twjit->write_sb; |
| 238 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, | 277 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts); |
| 239 &new_time); | |
| 240 twjit->stats.handovers++; | 278 twjit->stats.handovers++; |
| 241 return; | 279 return; |
| 242 } | 280 } |
| 243 insert_pkt_write_sb(twjit, msg, rx_ts); | 281 insert_pkt_write_sb(twjit, msg, rx_ts); |
| 244 return; | 282 return; |
