rumqttc/v5/mqttbytes/v5/
suback.rs

1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
/// Acknowledgement to subscribe
///
/// Carries the packet identifier, an optional property section and the
/// list of reason codes that make up the payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubAck {
    /// Packet identifier, encoded as a 16-bit value directly after the fixed header.
    pub pkid: u16,
    /// Reason codes carried in the payload, one byte each on the wire.
    pub return_codes: Vec<SubscribeReasonCode>,
    /// Optional properties. `None` is written as a zero-length property
    /// section, and a zero-length section is read back as `None`.
    pub properties: Option<SubAckProperties>,
}
11
12impl SubAck {
13    fn len(&self) -> usize {
14        let mut len = 2 + self.return_codes.len();
15
16        if let Some(p) = &self.properties {
17            let properties_len = p.len();
18            let properties_len_len = len_len(properties_len);
19            len += properties_len_len + properties_len;
20        } else {
21            // just 1 byte representing 0 len
22            len += 1;
23        }
24
25        len
26    }
27
28    pub fn size(&self) -> usize {
29        let len = self.len();
30        let remaining_len_size = len_len(len);
31
32        1 + remaining_len_size + len
33    }
34
35    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<SubAck, Error> {
36        let variable_header_index = fixed_header.fixed_header_len;
37        bytes.advance(variable_header_index);
38
39        let pkid = read_u16(&mut bytes)?;
40        let properties = SubAckProperties::read(&mut bytes)?;
41
42        if !bytes.has_remaining() {
43            return Err(Error::MalformedPacket);
44        }
45
46        let mut return_codes = Vec::new();
47        while bytes.has_remaining() {
48            let return_code = read_u8(&mut bytes)?;
49            return_codes.push(reason(return_code)?);
50        }
51
52        let suback = SubAck {
53            pkid,
54            return_codes,
55            properties,
56        };
57
58        Ok(suback)
59    }
60
61    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
62        buffer.put_u8(0x90);
63        let remaining_len = self.len();
64        let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
65
66        buffer.put_u16(self.pkid);
67
68        if let Some(p) = &self.properties {
69            p.write(buffer)?;
70        } else {
71            write_remaining_length(buffer, 0)?;
72        }
73
74        let p: Vec<u8> = self.return_codes.iter().map(|&c| code(c)).collect();
75
76        buffer.extend_from_slice(&p);
77        Ok(1 + remaining_len_bytes + remaining_len)
78    }
79}
80
/// Per-subscription result carried in the payload of a [`SubAck`].
///
/// The byte values listed below are the ones used by the `reason` and
/// `code` helpers in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeReasonCode {
    /// Encoded as the numeric value of the contained QoS; decoded from bytes 0, 1 and 2.
    Success(QoS),
    /// Encoded as `0x80`, the same byte as [`SubscribeReasonCode::Unspecified`].
    /// Decoding never yields this variant: byte 128 is read back as `Unspecified`.
    Failure,
    /// Wire value 128.
    Unspecified,
    /// Wire value 131.
    ImplementationSpecific,
    /// Wire value 135.
    NotAuthorized,
    /// Wire value 143.
    TopicFilterInvalid,
    /// Wire value 145.
    PkidInUse,
    /// Wire value 151.
    QuotaExceeded,
    /// Wire value 158.
    SharedSubscriptionsNotSupported,
    /// Wire value 161.
    SubscriptionIdNotSupported,
    /// Wire value 162.
    WildcardSubscriptionsNotSupported,
}
95
/// Optional property section of a [`SubAck`] packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubAckProperties {
    /// Reason string property, if present. When a packet carries it more
    /// than once, the last occurrence read wins.
    pub reason_string: Option<String>,
    /// User properties as `(key, value)` pairs, kept in the order they are read/written.
    pub user_properties: Vec<(String, String)>,
}
101
102impl SubAckProperties {
103    fn len(&self) -> usize {
104        let mut len = 0;
105
106        if let Some(reason) = &self.reason_string {
107            len += 1 + 2 + reason.len();
108        }
109
110        for (key, value) in self.user_properties.iter() {
111            len += 1 + 2 + key.len() + 2 + value.len();
112        }
113
114        len
115    }
116
117    pub fn read(bytes: &mut Bytes) -> Result<Option<SubAckProperties>, Error> {
118        let mut reason_string = None;
119        let mut user_properties = Vec::new();
120
121        let (properties_len_len, properties_len) = length(bytes.iter())?;
122        bytes.advance(properties_len_len);
123        if properties_len == 0 {
124            return Ok(None);
125        }
126
127        let mut cursor = 0;
128        // read until cursor reaches property length. properties_len = 0 will skip this loop
129        while cursor < properties_len {
130            let prop = read_u8(bytes)?;
131            cursor += 1;
132
133            match property(prop)? {
134                PropertyType::ReasonString => {
135                    let reason = read_mqtt_string(bytes)?;
136                    cursor += 2 + reason.len();
137                    reason_string = Some(reason);
138                }
139                PropertyType::UserProperty => {
140                    let key = read_mqtt_string(bytes)?;
141                    let value = read_mqtt_string(bytes)?;
142                    cursor += 2 + key.len() + 2 + value.len();
143                    user_properties.push((key, value));
144                }
145                _ => return Err(Error::InvalidPropertyType(prop)),
146            }
147        }
148
149        Ok(Some(SubAckProperties {
150            reason_string,
151            user_properties,
152        }))
153    }
154
155    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
156        let len = self.len();
157        write_remaining_length(buffer, len)?;
158
159        if let Some(reason) = &self.reason_string {
160            buffer.put_u8(PropertyType::ReasonString as u8);
161            write_mqtt_string(buffer, reason);
162        }
163
164        for (key, value) in self.user_properties.iter() {
165            buffer.put_u8(PropertyType::UserProperty as u8);
166            write_mqtt_string(buffer, key);
167            write_mqtt_string(buffer, value);
168        }
169
170        Ok(())
171    }
172}
173
174fn reason(code: u8) -> Result<SubscribeReasonCode, Error> {
175    let v = match code {
176        0 => SubscribeReasonCode::Success(QoS::AtMostOnce),
177        1 => SubscribeReasonCode::Success(QoS::AtLeastOnce),
178        2 => SubscribeReasonCode::Success(QoS::ExactlyOnce),
179        128 => SubscribeReasonCode::Unspecified,
180        131 => SubscribeReasonCode::ImplementationSpecific,
181        135 => SubscribeReasonCode::NotAuthorized,
182        143 => SubscribeReasonCode::TopicFilterInvalid,
183        145 => SubscribeReasonCode::PkidInUse,
184        151 => SubscribeReasonCode::QuotaExceeded,
185        158 => SubscribeReasonCode::SharedSubscriptionsNotSupported,
186        161 => SubscribeReasonCode::SubscriptionIdNotSupported,
187        162 => SubscribeReasonCode::WildcardSubscriptionsNotSupported,
188        v => return Err(Error::InvalidSubscribeReasonCode(v)),
189    };
190
191    Ok(v)
192}
193
194fn code(value: SubscribeReasonCode) -> u8 {
195    match value {
196        SubscribeReasonCode::Success(qos) => qos as u8,
197        SubscribeReasonCode::Failure => 0x80,
198        SubscribeReasonCode::Unspecified => 128,
199        SubscribeReasonCode::ImplementationSpecific => 131,
200        SubscribeReasonCode::NotAuthorized => 135,
201        SubscribeReasonCode::TopicFilterInvalid => 143,
202        SubscribeReasonCode::PkidInUse => 145,
203        SubscribeReasonCode::QuotaExceeded => 151,
204        SubscribeReasonCode::SharedSubscriptionsNotSupported => 158,
205        SubscribeReasonCode::SubscriptionIdNotSupported => 161,
206        SubscribeReasonCode::WildcardSubscriptionsNotSupported => 162,
207    }
208}
209
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// The size predicted by `size()`, the size reported by `write()` and
    /// the number of bytes actually placed in the buffer must all agree.
    #[test]
    fn length_calculation() {
        // The user property pads the packet; the intent is to push it past
        // ~128 bytes so that the remaining-length field needs 2 bytes.
        let packet = SubAck {
            pkid: 1,
            return_codes: vec![SubscribeReasonCode::Success(QoS::ExactlyOnce)],
            properties: Some(SubAckProperties {
                reason_string: None,
                user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            }),
        };

        let mut out = BytesMut::new();
        let predicted = packet.size();
        let reported = packet.write(&mut out).unwrap();
        let written = out.len();

        assert_eq!(reported, written);
        assert_eq!(predicted, written);
    }
}