rumqttc/v5/mqttbytes/v5/
pubcomp.rs

1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
4/// Return code in PubComp
5#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6#[repr(u8)]
7pub enum PubCompReason {
8    Success,
9    PacketIdentifierNotFound,
10}
11
12/// QoS2 Assured publish complete, in response to PUBREL packet
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct PubComp {
15    pub pkid: u16,
16    pub reason: PubCompReason,
17    pub properties: Option<PubCompProperties>,
18}
19
20impl PubComp {
21    pub fn new(pkid: u16, properties: Option<PubCompProperties>) -> Self {
22        Self {
23            pkid,
24            reason: PubCompReason::Success,
25            properties,
26        }
27    }
28
29    pub fn size(&self) -> usize {
30        if self.reason == PubCompReason::Success && self.properties.is_none() {
31            return 4;
32        }
33        let len = self.len();
34        let remaining_len_size = len_len(len);
35
36        1 + remaining_len_size + len
37    }
38
39    fn len(&self) -> usize {
40        let mut len = 2 + 1; // pkid + reason
41
42        // The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success)
43        // and there are no Properties. In this case the PUBCOMP has a Remaining Length of 2.
44        // <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901154>
45        if self.reason == PubCompReason::Success && self.properties.is_none() {
46            return 2;
47        }
48
49        if let Some(p) = &self.properties {
50            let properties_len = p.len();
51            let properties_len_len = len_len(properties_len);
52            len += properties_len_len + properties_len;
53        } else {
54            len += 1;
55        }
56
57        len
58    }
59
60    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<PubComp, Error> {
61        let variable_header_index = fixed_header.fixed_header_len;
62        bytes.advance(variable_header_index);
63        let pkid = read_u16(&mut bytes)?;
64
65        if fixed_header.remaining_len == 2 {
66            return Ok(PubComp {
67                pkid,
68                reason: PubCompReason::Success,
69                properties: None,
70            });
71        }
72
73        let ack_reason = read_u8(&mut bytes)?;
74        if fixed_header.remaining_len < 4 {
75            return Ok(PubComp {
76                pkid,
77                reason: reason(ack_reason)?,
78                properties: None,
79            });
80        }
81
82        let properties = PubCompProperties::read(&mut bytes)?;
83        let puback = PubComp {
84            pkid,
85            reason: reason(ack_reason)?,
86            properties,
87        };
88
89        Ok(puback)
90    }
91
92    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
93        let len = self.len();
94        buffer.put_u8(0x70);
95        let count = write_remaining_length(buffer, len)?;
96        buffer.put_u16(self.pkid);
97
98        // If there are no properties during success, sending reason code is optional
99        if self.reason == PubCompReason::Success && self.properties.is_none() {
100            return Ok(4);
101        }
102
103        buffer.put_u8(code(self.reason));
104
105        if let Some(p) = &self.properties {
106            p.write(buffer)?;
107        } else {
108            write_remaining_length(buffer, 0)?;
109        }
110
111        Ok(1 + count + len)
112    }
113}
114
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct PubCompProperties {
117    pub reason_string: Option<String>,
118    pub user_properties: Vec<(String, String)>,
119}
120
121impl PubCompProperties {
122    fn len(&self) -> usize {
123        let mut len = 0;
124
125        if let Some(reason) = &self.reason_string {
126            len += 1 + 2 + reason.len();
127        }
128
129        for (key, value) in self.user_properties.iter() {
130            len += 1 + 2 + key.len() + 2 + value.len();
131        }
132
133        len
134    }
135
136    pub fn read(bytes: &mut Bytes) -> Result<Option<PubCompProperties>, Error> {
137        let mut reason_string = None;
138        let mut user_properties = Vec::new();
139
140        let (properties_len_len, properties_len) = length(bytes.iter())?;
141        bytes.advance(properties_len_len);
142        if properties_len == 0 {
143            return Ok(None);
144        }
145
146        let mut cursor = 0;
147        // read until cursor reaches property length. properties_len = 0 will skip this loop
148        while cursor < properties_len {
149            let prop = read_u8(bytes)?;
150            cursor += 1;
151
152            match property(prop)? {
153                PropertyType::ReasonString => {
154                    let reason = read_mqtt_string(bytes)?;
155                    cursor += 2 + reason.len();
156                    reason_string = Some(reason);
157                }
158                PropertyType::UserProperty => {
159                    let key = read_mqtt_string(bytes)?;
160                    let value = read_mqtt_string(bytes)?;
161                    cursor += 2 + key.len() + 2 + value.len();
162                    user_properties.push((key, value));
163                }
164                _ => return Err(Error::InvalidPropertyType(prop)),
165            }
166        }
167
168        Ok(Some(PubCompProperties {
169            reason_string,
170            user_properties,
171        }))
172    }
173
174    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
175        let len = self.len();
176        write_remaining_length(buffer, len)?;
177
178        if let Some(reason) = &self.reason_string {
179            buffer.put_u8(PropertyType::ReasonString as u8);
180            write_mqtt_string(buffer, reason);
181        }
182
183        for (key, value) in self.user_properties.iter() {
184            buffer.put_u8(PropertyType::UserProperty as u8);
185            write_mqtt_string(buffer, key);
186            write_mqtt_string(buffer, value);
187        }
188
189        Ok(())
190    }
191}
192
193/// Connection return code type
194fn reason(num: u8) -> Result<PubCompReason, Error> {
195    let code = match num {
196        0 => PubCompReason::Success,
197        146 => PubCompReason::PacketIdentifierNotFound,
198        num => return Err(Error::InvalidConnectReturnCode(num)),
199    };
200
201    Ok(code)
202}
203
204fn code(reason: PubCompReason) -> u8 {
205    match reason {
206        PubCompReason::Success => 0,
207        PubCompReason::PacketIdentifierNotFound => 146,
208    }
209}
210
211#[cfg(test)]
212mod test {
213    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
214    use super::*;
215    use bytes::BytesMut;
216    use pretty_assertions::assert_eq;
217
218    #[test]
219    fn length_calculation() {
220        let mut dummy_bytes = BytesMut::new();
221        // Use user_properties to pad the size to exceed ~128 bytes to make the
222        // remaining_length field in the packet be 2 bytes long.
223        let pubcomp_props = PubCompProperties {
224            reason_string: None,
225            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
226        };
227
228        let pubcomp_pkt = PubComp::new(1, Some(pubcomp_props));
229
230        let size_from_size = pubcomp_pkt.size();
231        let size_from_write = pubcomp_pkt.write(&mut dummy_bytes).unwrap();
232        let size_from_bytes = dummy_bytes.len();
233
234        assert_eq!(size_from_write, size_from_bytes);
235        assert_eq!(size_from_size, size_from_bytes);
236    }
237}