1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
/// Reason code carried by a PUBREC packet.
///
/// The enum is `#[repr(u8)]` and every discriminant is pinned to the value
/// that goes on the wire, so `reason as u8` agrees with the `code()` /
/// `reason()` conversion functions in this module instead of silently
/// yielding the declaration index (0, 1, 2, ...).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PubRecReason {
    /// Wire value `0x00`.
    Success = 0,
    /// Wire value `0x10`.
    NoMatchingSubscribers = 16,
    /// Wire value `0x80`.
    UnspecifiedError = 128,
    /// Wire value `0x83`.
    ImplementationSpecificError = 131,
    /// Wire value `0x87`.
    NotAuthorized = 135,
    /// Wire value `0x90`.
    TopicNameInvalid = 144,
    /// Wire value `0x91`.
    PacketIdentifierInUse = 145,
    /// Wire value `0x97`.
    QuotaExceeded = 151,
    /// Wire value `0x99`.
    PayloadFormatInvalid = 153,
}
18
19#[derive(Debug, Clone, PartialEq, Eq)]
21pub struct PubRec {
22 pub pkid: u16,
23 pub reason: PubRecReason,
24 pub properties: Option<PubRecProperties>,
25}
26
27impl PubRec {
28 pub fn new(pkid: u16, properties: Option<PubRecProperties>) -> Self {
29 Self {
30 pkid,
31 reason: PubRecReason::Success,
32 properties,
33 }
34 }
35
36 pub fn size(&self) -> usize {
37 let len = self.len();
38 let remaining_len_size = len_len(len);
39
40 1 + remaining_len_size + len
41 }
42
43 fn len(&self) -> usize {
44 let mut len = 2 + 1; if self.reason == PubRecReason::Success && self.properties.is_none() {
50 return 2;
51 }
52
53 if let Some(p) = &self.properties {
54 let properties_len = p.len();
55 let properties_len_len = len_len(properties_len);
56 len += properties_len_len + properties_len;
57 } else {
58 len += 1
59 }
60
61 len
62 }
63
64 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<PubRec, Error> {
65 let variable_header_index = fixed_header.fixed_header_len;
66 bytes.advance(variable_header_index);
67 let pkid = read_u16(&mut bytes)?;
68 if fixed_header.remaining_len == 2 {
69 return Ok(PubRec {
70 pkid,
71 reason: PubRecReason::Success,
72 properties: None,
73 });
74 }
75
76 let ack_reason = read_u8(&mut bytes)?;
77 if fixed_header.remaining_len < 4 {
78 return Ok(PubRec {
79 pkid,
80 reason: reason(ack_reason)?,
81 properties: None,
82 });
83 }
84
85 let properties = PubRecProperties::read(&mut bytes)?;
86 let puback = PubRec {
87 pkid,
88 reason: reason(ack_reason)?,
89 properties,
90 };
91 Ok(puback)
92 }
93
94 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
95 let len = self.len();
96 buffer.put_u8(0x50);
97 let count = write_remaining_length(buffer, len)?;
98 buffer.put_u16(self.pkid);
99
100 if self.reason == PubRecReason::Success && self.properties.is_none() {
102 return Ok(4);
103 }
104
105 buffer.put_u8(code(self.reason));
106
107 if let Some(p) = &self.properties {
108 p.write(buffer)?;
109 } else {
110 write_remaining_length(buffer, 0)?;
111 }
112
113 Ok(1 + count + len)
114 }
115}
116
117#[derive(Debug, Clone, PartialEq, Eq)]
118pub struct PubRecProperties {
119 pub reason_string: Option<String>,
120 pub user_properties: Vec<(String, String)>,
121}
122
123impl PubRecProperties {
124 fn len(&self) -> usize {
125 let mut len = 0;
126
127 if let Some(reason) = &self.reason_string {
128 len += 1 + 2 + reason.len();
129 }
130
131 for (key, value) in self.user_properties.iter() {
132 len += 1 + 2 + key.len() + 2 + value.len();
133 }
134
135 len
136 }
137
138 pub fn read(bytes: &mut Bytes) -> Result<Option<PubRecProperties>, Error> {
139 let mut reason_string = None;
140 let mut user_properties = Vec::new();
141
142 let (properties_len_len, properties_len) = length(bytes.iter())?;
143 bytes.advance(properties_len_len);
144 if properties_len == 0 {
145 return Ok(None);
146 }
147
148 let mut cursor = 0;
149 while cursor < properties_len {
151 let prop = read_u8(bytes)?;
152 cursor += 1;
153
154 match property(prop)? {
155 PropertyType::ReasonString => {
156 let reason = read_mqtt_string(bytes)?;
157 cursor += 2 + reason.len();
158 reason_string = Some(reason);
159 }
160 PropertyType::UserProperty => {
161 let key = read_mqtt_string(bytes)?;
162 let value = read_mqtt_string(bytes)?;
163 cursor += 2 + key.len() + 2 + value.len();
164 user_properties.push((key, value));
165 }
166 _ => return Err(Error::InvalidPropertyType(prop)),
167 }
168 }
169
170 Ok(Some(PubRecProperties {
171 reason_string,
172 user_properties,
173 }))
174 }
175
176 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
177 let len = self.len();
178 write_remaining_length(buffer, len)?;
179
180 if let Some(reason) = &self.reason_string {
181 buffer.put_u8(PropertyType::ReasonString as u8);
182 write_mqtt_string(buffer, reason);
183 }
184
185 for (key, value) in self.user_properties.iter() {
186 buffer.put_u8(PropertyType::UserProperty as u8);
187 write_mqtt_string(buffer, key);
188 write_mqtt_string(buffer, value);
189 }
190
191 Ok(())
192 }
193}
194
195fn reason(num: u8) -> Result<PubRecReason, Error> {
197 let code = match num {
198 0 => PubRecReason::Success,
199 16 => PubRecReason::NoMatchingSubscribers,
200 128 => PubRecReason::UnspecifiedError,
201 131 => PubRecReason::ImplementationSpecificError,
202 135 => PubRecReason::NotAuthorized,
203 144 => PubRecReason::TopicNameInvalid,
204 145 => PubRecReason::PacketIdentifierInUse,
205 151 => PubRecReason::QuotaExceeded,
206 153 => PubRecReason::PayloadFormatInvalid,
207 num => return Err(Error::InvalidConnectReturnCode(num)),
208 };
209
210 Ok(code)
211}
212
213fn code(reason: PubRecReason) -> u8 {
214 match reason {
215 PubRecReason::Success => 0,
216 PubRecReason::NoMatchingSubscribers => 16,
217 PubRecReason::UnspecifiedError => 128,
218 PubRecReason::ImplementationSpecificError => 131,
219 PubRecReason::NotAuthorized => 135,
220 PubRecReason::TopicNameInvalid => 144,
221 PubRecReason::PacketIdentifierInUse => 145,
222 PubRecReason::QuotaExceeded => 151,
223 PubRecReason::PayloadFormatInvalid => 153,
224 }
225}
226
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()`, the count returned by `write()` and the number of bytes that
    /// actually land in the buffer must all agree.
    #[test]
    fn length_calculation() {
        let mut wire = BytesMut::new();

        let user_property: (String, String) = (USER_PROP_KEY.into(), USER_PROP_VAL.into());
        let packet = PubRec::new(
            1,
            Some(PubRecProperties {
                reason_string: None,
                user_properties: vec![user_property],
            }),
        );

        let predicted = packet.size();
        let reported = packet.write(&mut wire).unwrap();
        let written = wire.len();

        assert_eq!(reported, written);
        assert_eq!(predicted, written);
    }
}