rumqttc/v5/mqttbytes/v5/
pubrec.rs

1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
4/// Return code in PubRec
5#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6#[repr(u8)]
7pub enum PubRecReason {
8    Success,
9    NoMatchingSubscribers,
10    UnspecifiedError,
11    ImplementationSpecificError,
12    NotAuthorized,
13    TopicNameInvalid,
14    PacketIdentifierInUse,
15    QuotaExceeded,
16    PayloadFormatInvalid,
17}
18
19/// Acknowledgement to QoS1 publish
20#[derive(Debug, Clone, PartialEq, Eq)]
21pub struct PubRec {
22    pub pkid: u16,
23    pub reason: PubRecReason,
24    pub properties: Option<PubRecProperties>,
25}
26
27impl PubRec {
28    pub fn new(pkid: u16, properties: Option<PubRecProperties>) -> Self {
29        Self {
30            pkid,
31            reason: PubRecReason::Success,
32            properties,
33        }
34    }
35
36    pub fn size(&self) -> usize {
37        let len = self.len();
38        let remaining_len_size = len_len(len);
39
40        1 + remaining_len_size + len
41    }
42
43    fn len(&self) -> usize {
44        let mut len = 2 + 1; // pkid + reason
45
46        // The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success)
47        // and there are no Properties. In this case the PUBREC has a Remaining Length of 2.
48        // <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901134>
49        if self.reason == PubRecReason::Success && self.properties.is_none() {
50            return 2;
51        }
52
53        if let Some(p) = &self.properties {
54            let properties_len = p.len();
55            let properties_len_len = len_len(properties_len);
56            len += properties_len_len + properties_len;
57        } else {
58            len += 1
59        }
60
61        len
62    }
63
64    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<PubRec, Error> {
65        let variable_header_index = fixed_header.fixed_header_len;
66        bytes.advance(variable_header_index);
67        let pkid = read_u16(&mut bytes)?;
68        if fixed_header.remaining_len == 2 {
69            return Ok(PubRec {
70                pkid,
71                reason: PubRecReason::Success,
72                properties: None,
73            });
74        }
75
76        let ack_reason = read_u8(&mut bytes)?;
77        if fixed_header.remaining_len < 4 {
78            return Ok(PubRec {
79                pkid,
80                reason: reason(ack_reason)?,
81                properties: None,
82            });
83        }
84
85        let properties = PubRecProperties::read(&mut bytes)?;
86        let puback = PubRec {
87            pkid,
88            reason: reason(ack_reason)?,
89            properties,
90        };
91        Ok(puback)
92    }
93
94    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
95        let len = self.len();
96        buffer.put_u8(0x50);
97        let count = write_remaining_length(buffer, len)?;
98        buffer.put_u16(self.pkid);
99
100        // If there are no properties during success, sending reason code is optional
101        if self.reason == PubRecReason::Success && self.properties.is_none() {
102            return Ok(4);
103        }
104
105        buffer.put_u8(code(self.reason));
106
107        if let Some(p) = &self.properties {
108            p.write(buffer)?;
109        } else {
110            write_remaining_length(buffer, 0)?;
111        }
112
113        Ok(1 + count + len)
114    }
115}
116
117#[derive(Debug, Clone, PartialEq, Eq)]
118pub struct PubRecProperties {
119    pub reason_string: Option<String>,
120    pub user_properties: Vec<(String, String)>,
121}
122
123impl PubRecProperties {
124    fn len(&self) -> usize {
125        let mut len = 0;
126
127        if let Some(reason) = &self.reason_string {
128            len += 1 + 2 + reason.len();
129        }
130
131        for (key, value) in self.user_properties.iter() {
132            len += 1 + 2 + key.len() + 2 + value.len();
133        }
134
135        len
136    }
137
138    pub fn read(bytes: &mut Bytes) -> Result<Option<PubRecProperties>, Error> {
139        let mut reason_string = None;
140        let mut user_properties = Vec::new();
141
142        let (properties_len_len, properties_len) = length(bytes.iter())?;
143        bytes.advance(properties_len_len);
144        if properties_len == 0 {
145            return Ok(None);
146        }
147
148        let mut cursor = 0;
149        // read until cursor reaches property length. properties_len = 0 will skip this loop
150        while cursor < properties_len {
151            let prop = read_u8(bytes)?;
152            cursor += 1;
153
154            match property(prop)? {
155                PropertyType::ReasonString => {
156                    let reason = read_mqtt_string(bytes)?;
157                    cursor += 2 + reason.len();
158                    reason_string = Some(reason);
159                }
160                PropertyType::UserProperty => {
161                    let key = read_mqtt_string(bytes)?;
162                    let value = read_mqtt_string(bytes)?;
163                    cursor += 2 + key.len() + 2 + value.len();
164                    user_properties.push((key, value));
165                }
166                _ => return Err(Error::InvalidPropertyType(prop)),
167            }
168        }
169
170        Ok(Some(PubRecProperties {
171            reason_string,
172            user_properties,
173        }))
174    }
175
176    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
177        let len = self.len();
178        write_remaining_length(buffer, len)?;
179
180        if let Some(reason) = &self.reason_string {
181            buffer.put_u8(PropertyType::ReasonString as u8);
182            write_mqtt_string(buffer, reason);
183        }
184
185        for (key, value) in self.user_properties.iter() {
186            buffer.put_u8(PropertyType::UserProperty as u8);
187            write_mqtt_string(buffer, key);
188            write_mqtt_string(buffer, value);
189        }
190
191        Ok(())
192    }
193}
194
195/// Connection return code type
196fn reason(num: u8) -> Result<PubRecReason, Error> {
197    let code = match num {
198        0 => PubRecReason::Success,
199        16 => PubRecReason::NoMatchingSubscribers,
200        128 => PubRecReason::UnspecifiedError,
201        131 => PubRecReason::ImplementationSpecificError,
202        135 => PubRecReason::NotAuthorized,
203        144 => PubRecReason::TopicNameInvalid,
204        145 => PubRecReason::PacketIdentifierInUse,
205        151 => PubRecReason::QuotaExceeded,
206        153 => PubRecReason::PayloadFormatInvalid,
207        num => return Err(Error::InvalidConnectReturnCode(num)),
208    };
209
210    Ok(code)
211}
212
213fn code(reason: PubRecReason) -> u8 {
214    match reason {
215        PubRecReason::Success => 0,
216        PubRecReason::NoMatchingSubscribers => 16,
217        PubRecReason::UnspecifiedError => 128,
218        PubRecReason::ImplementationSpecificError => 131,
219        PubRecReason::NotAuthorized => 135,
220        PubRecReason::TopicNameInvalid => 144,
221        PubRecReason::PacketIdentifierInUse => 145,
222        PubRecReason::QuotaExceeded => 151,
223        PubRecReason::PayloadFormatInvalid => 153,
224    }
225}
226
227#[cfg(test)]
228mod test {
229    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
230    use super::*;
231    use bytes::BytesMut;
232    use pretty_assertions::assert_eq;
233
234    #[test]
235    fn length_calculation() {
236        let mut dummy_bytes = BytesMut::new();
237        // Use user_properties to pad the size to exceed ~128 bytes to make the
238        // remaining_length field in the packet be 2 bytes long.
239        let pubrec_props = PubRecProperties {
240            reason_string: None,
241            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
242        };
243
244        let pubrec_pkt = PubRec::new(1, Some(pubrec_props));
245
246        let size_from_size = pubrec_pkt.size();
247        let size_from_write = pubrec_pkt.write(&mut dummy_bytes).unwrap();
248        let size_from_bytes = dummy_bytes.len();
249
250        assert_eq!(size_from_write, size_from_bytes);
251        assert_eq!(size_from_size, size_from_bytes);
252    }
253}