rumqttc/v5/mqttbytes/v5/
unsuback.rs

1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
4/// Acknowledgement to unsubscribe
5#[derive(Debug, Clone, PartialEq, Eq)]
6pub struct UnsubAck {
7    pub pkid: u16,
8    pub reasons: Vec<UnsubAckReason>,
9    pub properties: Option<UnsubAckProperties>,
10}
11
12impl UnsubAck {
13    fn len(&self) -> usize {
14        let mut len = 2 + self.reasons.len();
15
16        if let Some(p) = &self.properties {
17            let properties_len = p.len();
18            let properties_len_len = len_len(properties_len);
19            len += properties_len_len + properties_len;
20        } else {
21            // just 1 byte representing 0 len
22            len += 1;
23        }
24
25        len
26    }
27
28    pub fn size(&self) -> usize {
29        let len = self.len();
30        let remaining_len_size = len_len(len);
31
32        1 + remaining_len_size + len
33    }
34
35    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<UnsubAck, Error> {
36        let variable_header_index = fixed_header.fixed_header_len;
37        bytes.advance(variable_header_index);
38
39        let pkid = read_u16(&mut bytes)?;
40        let properties = UnsubAckProperties::read(&mut bytes)?;
41
42        if !bytes.has_remaining() {
43            return Err(Error::MalformedPacket);
44        }
45
46        let mut reasons = Vec::new();
47        while bytes.has_remaining() {
48            let r = read_u8(&mut bytes)?;
49            reasons.push(reason(r)?);
50        }
51
52        let unsuback = UnsubAck {
53            pkid,
54            reasons,
55            properties,
56        };
57
58        Ok(unsuback)
59    }
60
61    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
62        buffer.put_u8(0xB0);
63        let remaining_len = self.len();
64        let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
65
66        buffer.put_u16(self.pkid);
67
68        if let Some(p) = &self.properties {
69            p.write(buffer)?;
70        } else {
71            write_remaining_length(buffer, 0)?;
72        }
73
74        let p: Vec<u8> = self.reasons.iter().map(|&c| code(c)).collect();
75        buffer.extend_from_slice(&p);
76        Ok(1 + remaining_len_bytes + remaining_len)
77    }
78}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81#[repr(u8)]
82pub enum UnsubAckReason {
83    Success,
84    NoSubscriptionExisted,
85    UnspecifiedError,
86    ImplementationSpecificError,
87    NotAuthorized,
88    TopicFilterInvalid,
89    PacketIdentifierInUse,
90}
91
92#[derive(Debug, Clone, PartialEq, Eq)]
93pub struct UnsubAckProperties {
94    pub reason_string: Option<String>,
95    pub user_properties: Vec<(String, String)>,
96}
97
98impl UnsubAckProperties {
99    fn len(&self) -> usize {
100        let mut len = 0;
101
102        if let Some(reason) = &self.reason_string {
103            len += 1 + 2 + reason.len();
104        }
105
106        for (key, value) in self.user_properties.iter() {
107            len += 1 + 2 + key.len() + 2 + value.len();
108        }
109
110        len
111    }
112
113    pub fn read(bytes: &mut Bytes) -> Result<Option<UnsubAckProperties>, Error> {
114        let mut reason_string = None;
115        let mut user_properties = Vec::new();
116
117        let (properties_len_len, properties_len) = length(bytes.iter())?;
118        bytes.advance(properties_len_len);
119        if properties_len == 0 {
120            return Ok(None);
121        }
122
123        let mut cursor = 0;
124        // read until cursor reaches property length. properties_len = 0 will skip this loop
125        while cursor < properties_len {
126            let prop = read_u8(bytes)?;
127            cursor += 1;
128
129            match property(prop)? {
130                PropertyType::ReasonString => {
131                    let reason = read_mqtt_string(bytes)?;
132                    cursor += 2 + reason.len();
133                    reason_string = Some(reason);
134                }
135                PropertyType::UserProperty => {
136                    let key = read_mqtt_string(bytes)?;
137                    let value = read_mqtt_string(bytes)?;
138                    cursor += 2 + key.len() + 2 + value.len();
139                    user_properties.push((key, value));
140                }
141                _ => return Err(Error::InvalidPropertyType(prop)),
142            }
143        }
144
145        Ok(Some(UnsubAckProperties {
146            reason_string,
147            user_properties,
148        }))
149    }
150
151    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
152        let len = self.len();
153        write_remaining_length(buffer, len)?;
154
155        if let Some(reason) = &self.reason_string {
156            buffer.put_u8(PropertyType::ReasonString as u8);
157            write_mqtt_string(buffer, reason);
158        }
159
160        for (key, value) in self.user_properties.iter() {
161            buffer.put_u8(PropertyType::UserProperty as u8);
162            write_mqtt_string(buffer, key);
163            write_mqtt_string(buffer, value);
164        }
165
166        Ok(())
167    }
168}
169
170/// Connection return code type
171fn reason(num: u8) -> Result<UnsubAckReason, Error> {
172    let code = match num {
173        0x00 => UnsubAckReason::Success,
174        0x11 => UnsubAckReason::NoSubscriptionExisted,
175        0x80 => UnsubAckReason::UnspecifiedError,
176        0x83 => UnsubAckReason::ImplementationSpecificError,
177        0x87 => UnsubAckReason::NotAuthorized,
178        0x8F => UnsubAckReason::TopicFilterInvalid,
179        0x91 => UnsubAckReason::PacketIdentifierInUse,
180        num => return Err(Error::InvalidSubscribeReasonCode(num)),
181    };
182
183    Ok(code)
184}
185
186fn code(reason: UnsubAckReason) -> u8 {
187    match reason {
188        UnsubAckReason::Success => 0x00,
189        UnsubAckReason::NoSubscriptionExisted => 0x11,
190        UnsubAckReason::UnspecifiedError => 0x80,
191        UnsubAckReason::ImplementationSpecificError => 0x83,
192        UnsubAckReason::NotAuthorized => 0x87,
193        UnsubAckReason::TopicFilterInvalid => 0x8F,
194        UnsubAckReason::PacketIdentifierInUse => 0x91,
195    }
196}
197
198#[cfg(test)]
199mod test {
200    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
201    use super::*;
202    use bytes::BytesMut;
203    use pretty_assertions::assert_eq;
204
205    #[test]
206    fn length_calculation() {
207        let mut dummy_bytes = BytesMut::new();
208        // Use user_properties to pad the size to exceed ~128 bytes to make the
209        // remaining_length field in the packet be 2 bytes long.
210        let unsuback_props = UnsubAckProperties {
211            reason_string: None,
212            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
213        };
214
215        let unsuback_pkt = UnsubAck {
216            pkid: 1,
217            reasons: vec![UnsubAckReason::Success],
218            properties: Some(unsuback_props),
219        };
220
221        let size_from_size = unsuback_pkt.size();
222        let size_from_write = unsuback_pkt.write(&mut dummy_bytes).unwrap();
223        let size_from_bytes = dummy_bytes.len();
224
225        assert_eq!(size_from_write, size_from_bytes);
226        assert_eq!(size_from_size, size_from_bytes);
227    }
228}