Changeset 4858
- Timestamp:
- 05/31/09 01:36:46 (4 years ago)
- Location:
- branch/UMPA
- Files:
-
- 5 modified
-
tests/unit/test_sniffing/test_libpcap/test_pypcap.py (modified) (3 diffs)
-
tests/unit/test_sniffing/test_sniffing_init.py (modified) (1 diff)
-
umpa/sniffing/__init__.py (modified) (4 diffs)
-
umpa/sniffing/libpcap/_abstract.py (modified) (1 diff)
-
umpa/sniffing/libpcap/pypcap.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
branch/UMPA/tests/unit/test_sniffing/test_libpcap/test_pypcap.py
r4849 r4858 42 42 def test_openlive(self): 43 43 try: 44 obj = pypcap.open_ live()44 obj = pypcap.open_pcap() 45 45 assert obj.device == pcap.lookupdev() 46 obj = pypcap.open_ live(device="any") # XXX can we use 'any'?46 obj = pypcap.open_pcap(device="any") # XXX can we use 'any'? 47 47 assert obj.device == "any" 48 48 except UMPASniffingException: … … 63 63 64 64 try: 65 p = pypcap.open_ live("any", to_ms=100)65 p = pypcap.open_pcap("any", to_ms=100) 66 66 p.setfilter("src host 1.2.3.4 and src port 99") 67 67 th = SendPacket(umpa.Packet(IP(source_address="1.2.3.4"), … … 88 88 th.start() 89 89 try: 90 p = pypcap.open_ live("any", to_ms=100)90 p = pypcap.open_pcap("any", to_ms=100) 91 91 p.setfilter("src host 1.2.3.4 and src port 99") 92 92 for i in xrange(amount): -
branch/UMPA/tests/unit/test_sniffing/test_sniffing_init.py
r4851 r4858 90 90 callback_args=[2,1] ) 91 91 th.join() 92 93 def test_from_file(self): 94 py.test.skip("implement to_file first") -
branch/UMPA/umpa/sniffing/__init__.py
r4852 r4858 19 19 # along with this library; if not, write to the Free Software Foundation, 20 20 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 21 22 import os.path 21 23 22 24 import umpa … … 73 75 """ 74 76 75 session = lpcap.open_ live(device, snaplen, promisc, timeout)77 session = lpcap.open_pcap(device, snaplen, promisc, timeout) 76 78 if filter: 77 79 session.setfilter(filter) … … 134 136 """ 135 137 136 session = lpcap.open_ live(device, snaplen, promisc, timeout)138 session = lpcap.open_pcap(device, snaplen, promisc, timeout) 137 139 if filter: 138 140 session.setfilter(filter) … … 146 148 return sniff(1, device='any') 147 149 148 def from_file(): 149 pass 150 def from_file(filename, count=0, filter=None, callback=None, 151 callback_args=None): 152 """ 153 Load data from pcap file instead of sniffing online. 154 155 Call callback for each or return list of packets. 156 157 @type filename: C{str} 158 @param filename: path to a file in pcap format 159 160 @type count: C{int} 161 @param count: number of sniffing packets; 0 means infinity (default: I{0}) 162 163 @type filter: C{str} 164 @param filter: BPF filter 165 166 @type callback: C{func} 167 @param callback: function with (timestamp, pkt, *callback_args) prototype 168 169 @type callback_args: C{list} 170 @param callback_args: additional arguments for callback function 171 """ 172 173 if os.path.isfile(filename): 174 f = lpcap.open_pcap(filename) 175 else: 176 raise UMPASniffingException("can't open file: %s" % filename) 177 178 if filter: 179 f.setfilter(filter) 180 if callback is not None: 181 f.loop(count, callback, *callback_args) 182 elif count > 0: 183 packets = [] 184 for i in xrange(count): 185 packets.append(f.next()) 186 else: 187 packets = f.readpkts() 188 189 return packets 150 190 151 191 def to_file(): -
branch/UMPA/umpa/sniffing/libpcap/_abstract.py
r4811 r4858 48 48 "selected libpcap backend or abstract module") 49 49 50 def open_offline(fname): 51 """ 52 Open a file in tcpdump format for reading. 53 """ 54 55 raise NotImplementedError("not implemented method for the " 56 "selected libpcap backend or abstract module") 57 58 class open_live(object): 50 class open_pcap(object): 59 51 """ 60 52 Packet capture descriptor. 53 54 This is for online and offline capturing. 61 55 """ 62 56 -
branch/UMPA/umpa/sniffing/libpcap/pypcap.py
r4816 r4858 41 41 return result 42 42 43 class open_ live(open_live):43 class open_pcap(open_pcap): 44 44 def __init__(self, device=None, snaplen=1024, promisc=True, to_ms=0): 45 45 if device is None:
