// rumqttc/v5/mqttbytes/v5/unsubscribe.rs
use super::*;
use bytes::{Buf, Bytes};
3
4#[derive(Debug, Clone, PartialEq, Eq, Default)]
6pub struct Unsubscribe {
7 pub pkid: u16,
8 pub filters: Vec<String>,
9 pub properties: Option<UnsubscribeProperties>,
10}
11
12impl Unsubscribe {
13 pub fn new<S: Into<String>>(filter: S, properties: Option<UnsubscribeProperties>) -> Self {
14 Self {
15 filters: vec![filter.into()],
16 properties,
17 ..Default::default()
18 }
19 }
20
21 pub fn size(&self) -> usize {
22 let len = self.len();
23 let remaining_len_size = len_len(len);
24
25 1 + remaining_len_size + len
26 }
27
28 fn len(&self) -> usize {
29 let mut len = 2 + self.filters.iter().fold(0, |s, t| 2 + s + t.len());
32
33 if let Some(p) = &self.properties {
34 let properties_len = p.len();
35 let properties_len_len = len_len(properties_len);
36 len += properties_len_len + properties_len;
37 } else {
38 len += 1;
40 }
41
42 len
43 }
44
45 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Unsubscribe, Error> {
46 let variable_header_index = fixed_header.fixed_header_len;
47 bytes.advance(variable_header_index);
48
49 let pkid = read_u16(&mut bytes)?;
50 let properties = UnsubscribeProperties::read(&mut bytes)?;
51
52 let mut filters = Vec::with_capacity(1);
53 while bytes.has_remaining() {
54 let filter = read_mqtt_string(&mut bytes)?;
55 filters.push(filter);
56 }
57
58 let unsubscribe = Unsubscribe {
59 pkid,
60 filters,
61 properties,
62 };
63 Ok(unsubscribe)
64 }
65
66 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
67 buffer.put_u8(0xA2);
68
69 let remaining_len = self.len();
71 let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
72
73 buffer.put_u16(self.pkid);
75
76 if let Some(p) = &self.properties {
77 p.write(buffer)?;
78 } else {
79 write_remaining_length(buffer, 0)?;
80 }
81
82 for filter in self.filters.iter() {
84 write_mqtt_string(buffer, filter);
85 }
86
87 Ok(1 + remaining_len_bytes + remaining_len)
88 }
89}
90
/// Properties attached to an [`Unsubscribe`] packet.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UnsubscribeProperties {
    /// `(key, value)` string pairs, kept in the order they are read and written.
    pub user_properties: Vec<(String, String)>,
}
95
96impl UnsubscribeProperties {
97 fn len(&self) -> usize {
98 let mut len = 0;
99
100 for (key, value) in self.user_properties.iter() {
101 len += 1 + 2 + key.len() + 2 + value.len();
102 }
103
104 len
105 }
106
107 pub fn read(bytes: &mut Bytes) -> Result<Option<UnsubscribeProperties>, Error> {
108 let mut user_properties = Vec::new();
109
110 let (properties_len_len, properties_len) = length(bytes.iter())?;
111 bytes.advance(properties_len_len);
112
113 if properties_len == 0 {
114 return Ok(None);
115 }
116
117 let mut cursor = 0;
118 while cursor < properties_len {
120 let prop = read_u8(bytes)?;
121 cursor += 1;
122
123 match property(prop)? {
124 PropertyType::UserProperty => {
125 let key = read_mqtt_string(bytes)?;
126 let value = read_mqtt_string(bytes)?;
127 cursor += 2 + key.len() + 2 + value.len();
128 user_properties.push((key, value));
129 }
130 _ => return Err(Error::InvalidPropertyType(prop)),
131 }
132 }
133
134 Ok(Some(UnsubscribeProperties { user_properties }))
135 }
136
137 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
138 let len = self.len();
139 write_remaining_length(buffer, len)?;
140
141 for (key, value) in self.user_properties.iter() {
142 buffer.put_u8(PropertyType::UserProperty as u8);
143 write_mqtt_string(buffer, key);
144 write_mqtt_string(buffer, value);
145 }
146
147 Ok(())
148 }
149}
150
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()` and the value returned by `write()` must both match the number of
    /// bytes actually placed in the buffer.
    #[test]
    fn length_calculation() {
        let properties = UnsubscribeProperties {
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
        };
        let packet = Unsubscribe::new("hello/world", Some(properties));

        let mut encoded = BytesMut::new();
        let reported_by_size = packet.size();
        let reported_by_write = packet.write(&mut encoded).unwrap();
        let written = encoded.len();

        assert_eq!(reported_by_write, written);
        assert_eq!(reported_by_size, written);
    }
}