Changeset 4870
- Timestamp:
- 06/02/09 11:15:41 (4 years ago)
- Location:
- branch/UMPA
- Files:
-
- 2 modified
-
tests/unit/test_sniffing/test_sniffing_init.py (modified) (3 diffs)
-
umpa/sniffing/__init__.py (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
-
branch/UMPA/tests/unit/test_sniffing/test_sniffing_init.py
r4858 r4870 75 75 if args[0] > args[1]: 76 76 raise UMPASniffingException("test") 77 78 def cbk2(ts, pkt, *args): 79 print pkt 80 81 th = SendPacket(umpa.Packet(IP(source_address="1.2.3.6"), 82 TCP(source_port=99))) 83 th.start() 84 umpa.sniffing.sniff_loop(1, filter="src 1.2.3.6", device='any', 85 callback=cbk, callback_args=[1,2]) 86 th.join() 77 87 78 88 th = SendPacket(umpa.Packet(IP(source_address="1.2.3.6"), … … 80 90 th.start() 81 91 umpa.sniffing.sniff_loop(1, filter="src 1.2.3.6", device='any', 82 callback=cbk, callback_args=[1,2])92 callback=cbk2) 83 93 th.join() 84 94 … … 93 103 def test_from_file(self): 94 104 py.test.skip("implement to_file first") 105 106 def test_from_file_loop(self): 107 py.test.skip("implement to_file first") -
branch/UMPA/umpa/sniffing/__init__.py
r4858 r4870 106 106 107 107 def sniff_loop(count=0, filter=None, device=None, timeout=0, snaplen=1024, 108 promisc=True, callback=None, callback_args=None):108 promisc=True, callback=None, callback_args=None): 109 109 """ 110 110 Sniff packets and call a callback function for each. … … 136 136 """ 137 137 138 if callback_args is None: 139 callback_args = [] 140 138 141 session = lpcap.open_pcap(device, snaplen, promisc, timeout) 139 142 if filter: … … 148 151 return sniff(1, device='any') 149 152 150 def from_file(filename, count=0, filter=None, callback=None, 151 callback_args=None): 153 def from_file(filename, count=0, filter=None): 152 154 """ 153 155 Load data from pcap file instead of sniffing online. … … 163 165 @type filter: C{str} 164 166 @param filter: BPF filter 165 166 @type callback: C{func}167 @param callback: function with (timestamp, pkt, *callback_args) prototype168 169 @type callback_args: C{list}170 @param callback_args: additional arguments for callback function171 167 """ 172 168 … … 178 174 if filter: 179 175 f.setfilter(filter) 180 if callback is not None: 181 f.loop(count, callback, *callback_args) 182 elif count > 0: 176 if count > 0: 183 177 packets = [] 184 178 for i in xrange(count): … … 189 183 return packets 190 184 185 def from_file_loop(filename, count=0, filter=None, callback=None, 186 callback_args=None): 187 """ 188 Load data from pcap file instead of sniffing online. 189 190 Call callback for each or return list of packets. 191 192 @type filename: C{str} 193 @param filename: path to a file in pcap format 194 195 @type count: C{int} 196 @param count: number of sniffing packets; 0 means infinity (default: I{0}) 197 198 @type filter: C{str} 199 @param filter: BPF filter 200 201 @type callback: C{func} 202 @param callback: function with (timestamp, pkt, *callback_args) prototype 203 204 @type callback_args: C{list} 205 @param callback_args: additional arguments for callback function 206 """ 207 208 if callback_args is None: 209 callback_args = [] 210 211 if os.path.isfile(filename): 212 f = lpcap.open_pcap(filename) 213 else: 214 raise UMPASniffingException("can't open file: %s" % filename) 215 216 if filter: 217 f.setfilter(filter) 218 f.loop(count, callback, *callback_args) 219 191 220 def to_file(): 192 221 pass
