rumqttc/v5/mqttbytes/v5/
pubcomp.rs1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
/// Reason code carried by an MQTT 5 PUBCOMP packet.
///
/// The discriminants are pinned to the on-the-wire reason codes, so an `as u8` cast on this
/// `#[repr(u8)]` enum yields the same byte that `code()` writes and `reason()` accepts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PubCompReason {
    /// Reason code `0x00`.
    Success = 0,
    /// Reason code `0x92` (146): the packet identifier is not known.
    PacketIdentifierNotFound = 146,
}
11
12#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct PubComp {
15 pub pkid: u16,
16 pub reason: PubCompReason,
17 pub properties: Option<PubCompProperties>,
18}
19
20impl PubComp {
21 pub fn new(pkid: u16, properties: Option<PubCompProperties>) -> Self {
22 Self {
23 pkid,
24 reason: PubCompReason::Success,
25 properties,
26 }
27 }
28
29 pub fn size(&self) -> usize {
30 if self.reason == PubCompReason::Success && self.properties.is_none() {
31 return 4;
32 }
33 let len = self.len();
34 let remaining_len_size = len_len(len);
35
36 1 + remaining_len_size + len
37 }
38
39 fn len(&self) -> usize {
40 let mut len = 2 + 1; if self.reason == PubCompReason::Success && self.properties.is_none() {
46 return 2;
47 }
48
49 if let Some(p) = &self.properties {
50 let properties_len = p.len();
51 let properties_len_len = len_len(properties_len);
52 len += properties_len_len + properties_len;
53 } else {
54 len += 1;
55 }
56
57 len
58 }
59
60 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<PubComp, Error> {
61 let variable_header_index = fixed_header.fixed_header_len;
62 bytes.advance(variable_header_index);
63 let pkid = read_u16(&mut bytes)?;
64
65 if fixed_header.remaining_len == 2 {
66 return Ok(PubComp {
67 pkid,
68 reason: PubCompReason::Success,
69 properties: None,
70 });
71 }
72
73 let ack_reason = read_u8(&mut bytes)?;
74 if fixed_header.remaining_len < 4 {
75 return Ok(PubComp {
76 pkid,
77 reason: reason(ack_reason)?,
78 properties: None,
79 });
80 }
81
82 let properties = PubCompProperties::read(&mut bytes)?;
83 let puback = PubComp {
84 pkid,
85 reason: reason(ack_reason)?,
86 properties,
87 };
88
89 Ok(puback)
90 }
91
92 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
93 let len = self.len();
94 buffer.put_u8(0x70);
95 let count = write_remaining_length(buffer, len)?;
96 buffer.put_u16(self.pkid);
97
98 if self.reason == PubCompReason::Success && self.properties.is_none() {
100 return Ok(4);
101 }
102
103 buffer.put_u8(code(self.reason));
104
105 if let Some(p) = &self.properties {
106 p.write(buffer)?;
107 } else {
108 write_remaining_length(buffer, 0)?;
109 }
110
111 Ok(1 + count + len)
112 }
113}
114
/// Properties that may accompany a PUBCOMP packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubCompProperties {
    /// Optional reason string.
    pub reason_string: Option<String>,
    /// User properties as `(key, value)` pairs, in the order they are read and written.
    pub user_properties: Vec<(String, String)>,
}
120
121impl PubCompProperties {
122 fn len(&self) -> usize {
123 let mut len = 0;
124
125 if let Some(reason) = &self.reason_string {
126 len += 1 + 2 + reason.len();
127 }
128
129 for (key, value) in self.user_properties.iter() {
130 len += 1 + 2 + key.len() + 2 + value.len();
131 }
132
133 len
134 }
135
136 pub fn read(bytes: &mut Bytes) -> Result<Option<PubCompProperties>, Error> {
137 let mut reason_string = None;
138 let mut user_properties = Vec::new();
139
140 let (properties_len_len, properties_len) = length(bytes.iter())?;
141 bytes.advance(properties_len_len);
142 if properties_len == 0 {
143 return Ok(None);
144 }
145
146 let mut cursor = 0;
147 while cursor < properties_len {
149 let prop = read_u8(bytes)?;
150 cursor += 1;
151
152 match property(prop)? {
153 PropertyType::ReasonString => {
154 let reason = read_mqtt_string(bytes)?;
155 cursor += 2 + reason.len();
156 reason_string = Some(reason);
157 }
158 PropertyType::UserProperty => {
159 let key = read_mqtt_string(bytes)?;
160 let value = read_mqtt_string(bytes)?;
161 cursor += 2 + key.len() + 2 + value.len();
162 user_properties.push((key, value));
163 }
164 _ => return Err(Error::InvalidPropertyType(prop)),
165 }
166 }
167
168 Ok(Some(PubCompProperties {
169 reason_string,
170 user_properties,
171 }))
172 }
173
174 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
175 let len = self.len();
176 write_remaining_length(buffer, len)?;
177
178 if let Some(reason) = &self.reason_string {
179 buffer.put_u8(PropertyType::ReasonString as u8);
180 write_mqtt_string(buffer, reason);
181 }
182
183 for (key, value) in self.user_properties.iter() {
184 buffer.put_u8(PropertyType::UserProperty as u8);
185 write_mqtt_string(buffer, key);
186 write_mqtt_string(buffer, value);
187 }
188
189 Ok(())
190 }
191}
192
193fn reason(num: u8) -> Result<PubCompReason, Error> {
195 let code = match num {
196 0 => PubCompReason::Success,
197 146 => PubCompReason::PacketIdentifierNotFound,
198 num => return Err(Error::InvalidConnectReturnCode(num)),
199 };
200
201 Ok(code)
202}
203
204fn code(reason: PubCompReason) -> u8 {
205 match reason {
206 PubCompReason::Success => 0,
207 PubCompReason::PacketIdentifierNotFound => 146,
208 }
209}
210
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()`, the count returned by `write()` and the number of bytes actually written
    /// must all agree for a packet that carries a user property.
    #[test]
    fn length_calculation() {
        let packet = PubComp::new(
            1,
            Some(PubCompProperties {
                reason_string: None,
                user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            }),
        );

        let mut buffer = BytesMut::new();
        let reported_by_write = packet.write(&mut buffer).unwrap();
        let reported_by_size = packet.size();
        let written = buffer.len();

        assert_eq!(reported_by_write, written);
        assert_eq!(reported_by_size, written);
    }
}