rumqttc/v5/mqttbytes/v5/
puback.rs

1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
4/// Return code in puback
5#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6pub enum PubAckReason {
7    Success,
8    NoMatchingSubscribers,
9    UnspecifiedError,
10    ImplementationSpecificError,
11    NotAuthorized,
12    TopicNameInvalid,
13    PacketIdentifierInUse,
14    QuotaExceeded,
15    PayloadFormatInvalid,
16}
17
18/// Acknowledgement to QoS1 publish
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub struct PubAck {
21    pub pkid: u16,
22    pub reason: PubAckReason,
23    pub properties: Option<PubAckProperties>,
24}
25
26impl PubAck {
27    pub fn new(pkid: u16, properties: Option<PubAckProperties>) -> Self {
28        Self {
29            pkid,
30            reason: PubAckReason::Success,
31            properties,
32        }
33    }
34
35    pub fn size(&self) -> usize {
36        if self.reason == PubAckReason::Success && self.properties.is_none() {
37            return 4;
38        }
39        let len = self.len();
40        let remaining_len_size = len_len(len);
41
42        1 + remaining_len_size + len
43    }
44
45    fn len(&self) -> usize {
46        let mut len = 2 + 1; // pkid + reason
47
48        // If there are no properties, sending reason code is optional
49        if self.reason == PubAckReason::Success && self.properties.is_none() {
50            return 2;
51        }
52
53        if let Some(p) = &self.properties {
54            let properties_len = p.len();
55            let properties_len_len = len_len(properties_len);
56            len += properties_len_len + properties_len;
57        } else {
58            // just 1 byte representing 0 len properties
59            len += 1;
60        }
61
62        len
63    }
64
65    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<PubAck, Error> {
66        let variable_header_index = fixed_header.fixed_header_len;
67        bytes.advance(variable_header_index);
68        let pkid = read_u16(&mut bytes)?;
69
70        // No reason code or properties if remaining length == 2
71        if fixed_header.remaining_len == 2 {
72            return Ok(PubAck {
73                pkid,
74                reason: PubAckReason::Success,
75                properties: None,
76            });
77        }
78
79        // No properties len or properties if remaining len > 2 but < 4
80        let ack_reason = read_u8(&mut bytes)?;
81        if fixed_header.remaining_len < 4 {
82            return Ok(PubAck {
83                pkid,
84                reason: reason(ack_reason)?,
85                properties: None,
86            });
87        }
88
89        let properties = PubAckProperties::read(&mut bytes)?;
90        let puback = PubAck {
91            pkid,
92            reason: reason(ack_reason)?,
93            properties,
94        };
95
96        Ok(puback)
97    }
98
99    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
100        let len = self.len();
101        buffer.put_u8(0x40);
102
103        let count = write_remaining_length(buffer, len)?;
104        buffer.put_u16(self.pkid);
105
106        // Reason code is optional with success if there are no properties
107        if self.reason == PubAckReason::Success && self.properties.is_none() {
108            return Ok(4);
109        }
110
111        buffer.put_u8(code(self.reason));
112        if let Some(p) = &self.properties {
113            p.write(buffer)?;
114        } else {
115            write_remaining_length(buffer, 0)?;
116        }
117
118        Ok(1 + count + len)
119    }
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub struct PubAckProperties {
124    pub reason_string: Option<String>,
125    pub user_properties: Vec<(String, String)>,
126}
127
128impl PubAckProperties {
129    fn len(&self) -> usize {
130        let mut len = 0;
131
132        if let Some(reason) = &self.reason_string {
133            len += 1 + 2 + reason.len();
134        }
135
136        for (key, value) in self.user_properties.iter() {
137            len += 1 + 2 + key.len() + 2 + value.len();
138        }
139
140        len
141    }
142
143    pub fn read(bytes: &mut Bytes) -> Result<Option<PubAckProperties>, Error> {
144        let mut reason_string = None;
145        let mut user_properties = Vec::new();
146
147        let (properties_len_len, properties_len) = length(bytes.iter())?;
148        bytes.advance(properties_len_len);
149        if properties_len == 0 {
150            return Ok(None);
151        }
152
153        let mut cursor = 0;
154        // read until cursor reaches property length. properties_len = 0 will skip this loop
155        while cursor < properties_len {
156            let prop = read_u8(bytes)?;
157            cursor += 1;
158
159            match property(prop)? {
160                PropertyType::ReasonString => {
161                    let reason = read_mqtt_string(bytes)?;
162                    cursor += 2 + reason.len();
163                    reason_string = Some(reason);
164                }
165                PropertyType::UserProperty => {
166                    let key = read_mqtt_string(bytes)?;
167                    let value = read_mqtt_string(bytes)?;
168                    cursor += 2 + key.len() + 2 + value.len();
169                    user_properties.push((key, value));
170                }
171                _ => return Err(Error::InvalidPropertyType(prop)),
172            }
173        }
174
175        Ok(Some(PubAckProperties {
176            reason_string,
177            user_properties,
178        }))
179    }
180
181    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
182        let len = self.len();
183        write_remaining_length(buffer, len)?;
184
185        if let Some(reason) = &self.reason_string {
186            buffer.put_u8(PropertyType::ReasonString as u8);
187            write_mqtt_string(buffer, reason);
188        }
189
190        for (key, value) in self.user_properties.iter() {
191            buffer.put_u8(PropertyType::UserProperty as u8);
192            write_mqtt_string(buffer, key);
193            write_mqtt_string(buffer, value);
194        }
195
196        Ok(())
197    }
198}
199
200/// Connection return code type
201fn reason(num: u8) -> Result<PubAckReason, Error> {
202    let code = match num {
203        0 => PubAckReason::Success,
204        16 => PubAckReason::NoMatchingSubscribers,
205        128 => PubAckReason::UnspecifiedError,
206        131 => PubAckReason::ImplementationSpecificError,
207        135 => PubAckReason::NotAuthorized,
208        144 => PubAckReason::TopicNameInvalid,
209        145 => PubAckReason::PacketIdentifierInUse,
210        151 => PubAckReason::QuotaExceeded,
211        153 => PubAckReason::PayloadFormatInvalid,
212        num => return Err(Error::InvalidConnectReturnCode(num)),
213    };
214
215    Ok(code)
216}
217
218// TODO: Is typecasting significantly faster than functions?
219fn code(reason: PubAckReason) -> u8 {
220    match reason {
221        PubAckReason::Success => 0,
222        PubAckReason::NoMatchingSubscribers => 16,
223        PubAckReason::UnspecifiedError => 128,
224        PubAckReason::ImplementationSpecificError => 131,
225        PubAckReason::NotAuthorized => 135,
226        PubAckReason::TopicNameInvalid => 144,
227        PubAckReason::PacketIdentifierInUse => 145,
228        PubAckReason::QuotaExceeded => 151,
229        PubAckReason::PayloadFormatInvalid => 153,
230    }
231}
232
233#[cfg(test)]
234mod test {
235    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
236    use super::*;
237    use bytes::BytesMut;
238    use pretty_assertions::assert_eq;
239
240    #[test]
241    fn length_calculation() {
242        let mut dummy_bytes = BytesMut::new();
243        // Use user_properties to pad the size to exceed ~128 bytes to make the
244        // remaining_length field in the packet be 2 bytes long.
245        let puback_props = PubAckProperties {
246            reason_string: None,
247            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
248        };
249
250        let puback_pkt = PubAck::new(1, Some(puback_props));
251
252        let size_from_size = puback_pkt.size();
253        let size_from_write = puback_pkt.write(&mut dummy_bytes).unwrap();
254        let size_from_bytes = dummy_bytes.len();
255
256        assert_eq!(size_from_write, size_from_bytes);
257        assert_eq!(size_from_size, size_from_bytes);
258    }
259}