root/umpa/branches/link-layer-integration/tests/a_unit/test_utils/test_libpcap/test_libpcap_init.py @ 5691

Revision 5691, 6.9 kB (checked in by kosma, 3 years ago)

refactor out libpcap support: refactor out tests (stage 1)

Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4# Copyright (C) 2009 Adriano Monteiro Marques.
5#
6# Author: Bartosz SKOWRON <getxsick at gmail dot com>
7#
8# This library is free software; you can redistribute it and/or modify
9# it under the terms of the GNU Lesser General Public License as published
10# by the Free Software Foundation; either version 2.1 of the License, or
11# (at your option) any later version.
12#
13# This library is distributed in the hope that it will be useful, but
14# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
16# License for more details.
17#
18# You should have received a copy of the GNU Lesser General Public License
19# along with this library; if not, write to the Free Software Foundation,
20# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
22import tempfile
23
24import umit.umpa
25import umit.umpa.sniffing
26from umit.umpa.protocols import IP, TCP
27from umit.umpa.protocols._decoder import decode
28from umit.umpa.utils.exceptions import UMPASniffingException
29from tests.utils import SendPacket
30
31import py.test
32
class TestSniffing(object):
    """Tests for the capture helpers exposed by ``umit.umpa.sniffing``.

    Most tests start a ``SendPacket`` helper (used like a thread: it is
    started and joined) carrying a crafted IP/TCP packet, capture the
    traffic through the sniffing API and check the decoded fields.
    The optional second ``SendPacket`` argument is presumably the number
    of packets to send -- TODO confirm against ``tests.utils``.
    """

    def test_import_backend(self):
        """Configured backend is loaded; an unknown backend is rejected."""
        assert hasattr(umit.umpa.sniffing, 'lpcap')
        assert umit.umpa.sniffing.lpcap._backend == umit.umpa.config['libpcap']

        oldlpcap = umit.umpa.config['libpcap']
        umit.umpa.config['libpcap'] = "foobar"
        try:
            # reloading the module with a bogus backend name has to fail
            py.test.raises(UMPASniffingException, reload, umit.umpa.sniffing)
        finally:
            # always restore the real backend name, even if the check above
            # fails, so the remaining tests see the original configuration
            umit.umpa.config['libpcap'] = oldlpcap

    def test_get_available_devices(self):
        """Device list matches what pypcap itself reports."""
        if umit.umpa.config['libpcap'] != 'pypcap':
            # report a skip instead of a pass that verified nothing
            py.test.skip("device list is only verified for the pypcap backend")
        from umit.umpa.sniffing.libpcap import pypcap
        devices = umit.umpa.sniffing.get_available_devices()
        assert devices == pypcap.findalldevs()

    def test_get_default_device(self):
        """Default device matches what pypcap itself reports."""
        if umit.umpa.config['libpcap'] != 'pypcap':
            # report a skip instead of a pass that verified nothing
            py.test.skip("default device is only verified for the pypcap "
                         "backend")
        from umit.umpa.sniffing.libpcap import pypcap
        assert umit.umpa.sniffing.get_default_device() == pypcap.lookupdev()

    def test_sniff(self):
        """``sniff(1, ...)`` returns exactly one matching decoded packet."""
        th = SendPacket(umit.umpa.Packet(IP(src="1.2.3.4", dst="8.8.8.8"),
                                    TCP(srcport=99)))
        th.start()
        try:
            result = umit.umpa.sniffing.sniff(1, filter="src port 99")
        finally:
            # never leave the sender running if sniffing raised
            th.join()

        assert len(result) == 1
        assert result[0].ip.src == '1.2.3.4'
        assert result[0].tcp.srcport == 99

    def test_sniff_next(self):
        """``sniff_next`` returns a single packet, however many are sent."""
        th = SendPacket(umit.umpa.Packet(IP(src="1.2.3.4", dst="8.8.8.8"),
                                    TCP(srcport=99)))
        th.start()
        try:
            result = umit.umpa.sniffing.sniff_next(filter="src port 99")
        finally:
            th.join()

        assert result.ip.src == '1.2.3.4'
        assert result.tcp.srcport == 99

        # send more, sniff one
        th = SendPacket(umit.umpa.Packet(IP(src="1.2.3.4", dst="8.8.8.8"),
                                    TCP(srcport=99)), 5)
        th.start()
        try:
            result = umit.umpa.sniffing.sniff_next(filter="src port 99")
        finally:
            th.join()

        assert result.ip.src == '1.2.3.4'
        assert result.tcp.srcport == 99

    def test_sniff_loop(self):
        """``sniff_loop`` calls the callback and propagates its exception.

        Also checks that calling ``sniff_loop`` without a callback raises
        ``UMPASniffingException``.
        """
        def cbk(ts, pkt, *args):
            assert pkt.ip.src == "1.2.3.6"
            assert pkt.tcp.srcport == 99
            assert ts > 0
            assert len(args) == 2
            # the extra args decide whether this callback should fail
            if args[0] > args[1]:
                raise UMPASniffingException("test")

        # callback_args=[1,2]: the callback returns normally
        th = SendPacket(umit.umpa.Packet(IP(src="1.2.3.6", dst="8.8.8.8"),
                                    TCP(srcport=99)), 2)
        th.start()
        try:
            umit.umpa.sniffing.sniff_loop(1, filter="src 1.2.3.6",
                                          callback=cbk, callback_args=[1,2])
        finally:
            th.join()

        # callback_args=[2,1]: the callback raises and the error must surface
        th = SendPacket(umit.umpa.Packet(IP(src="1.2.3.6", dst="8.8.8.8"),
                                    TCP(srcport=99)))
        th.start()
        try:
            py.test.raises(UMPASniffingException,
                           umit.umpa.sniffing.sniff_loop, 1,
                           filter="src 1.2.3.6", callback=cbk,
                           callback_args=[2,1])
        finally:
            th.join()

        # no callback given at all
        py.test.raises(UMPASniffingException, umit.umpa.sniffing.sniff_loop, 1)

    def test_from_file(self):
        """Packets dumped by ``sniff(dump=...)`` can be read by ``from_file``."""
        dump_file = tempfile.NamedTemporaryFile(mode="w")
        try:
            th = SendPacket(umit.umpa.Packet(IP(src="1.2.3.6", dst="8.8.8.8"),
                                        TCP(srcport=99)), 3)
            th.start()
            try:
                umit.umpa.sniffing.sniff(3, dump=dump_file.name,
                                    filter="src host 1.2.3.6 and src port 99")
            finally:
                th.join()

            # whole dump
            result = umit.umpa.sniffing.from_file(dump_file.name)
            assert len(result) == 3
            for packet in result:
                assert packet.ip.src == "1.2.3.6"
                assert packet.tcp.srcport == 99

            # only the first two packets are requested
            result = umit.umpa.sniffing.from_file(dump_file.name, 2)
            assert len(result) == 2
            for packet in result:
                assert packet.ip.src == "1.2.3.6"
                assert packet.tcp.srcport == 99
        finally:
            # closing the handle releases the temporary file deterministically
            dump_file.close()

    def test_from_file_loop(self):
        """``from_file_loop`` calls the callback once per packet read."""
        # a one-element list serves as a mutable cell, so the callback can
        # count its calls without relying on a module-level global
        calls = [0]

        def cbk(ts, pkt, *args):
            assert pkt.ip.src == "1.2.3.6"
            assert pkt.tcp.srcport == 99
            assert ts > 0
            assert len(args) == 1
            calls[0] += 1

        dump_file = tempfile.NamedTemporaryFile(mode="w")
        try:
            th = SendPacket(umit.umpa.Packet(IP(src="1.2.3.6", dst="8.8.8.8"),
                                        TCP(srcport=99)), 3)
            th.start()
            try:
                umit.umpa.sniffing.sniff(3, dump=dump_file.name,
                                    filter="src host 1.2.3.6 and src port 99")
            finally:
                th.join()

            # no limit: every dumped packet reaches the callback
            calls[0] = 0
            amount = 3
            umit.umpa.sniffing.from_file_loop(dump_file.name, callback=cbk,
                                        callback_args=[amount,])
            assert calls[0] == amount

            # limit of two packets
            calls[0] = 0
            amount = 2
            umit.umpa.sniffing.from_file_loop(dump_file.name, 2, callback=cbk,
                                        callback_args=[amount,])
            assert calls[0] == amount
        finally:
            # closing the handle releases the temporary file deterministically
            dump_file.close()

    def test_to_file(self):
        """``to_file`` writes captured packets that ``from_file`` reads back.

        The test is skipped when ``to_file`` raises
        ``UMPASniffingException``.
        """
        dump_file = tempfile.NamedTemporaryFile(mode="w")
        try:
            amount = 5

            th = SendPacket(umit.umpa.Packet(IP(src="1.2.3.4", dst="8.8.8.8"),
                                        TCP(srcport=99)), amount)
            th.start()
            try:
                umit.umpa.sniffing.to_file(dump_file.name, amount,
                        "src host 1.2.3.4 and src port 99")
            except UMPASniffingException:
                py.test.skip("no suitable devices for sniffing found. "
                            "propably not sufficent priviliges.")
            finally:
                th.join()

            # whole dump
            result = umit.umpa.sniffing.from_file(dump_file.name)
            assert len(result) == amount
            for packet in result:
                assert packet.ip.src == "1.2.3.4"
                assert packet.tcp.srcport == 99

            # only the first two packets are requested
            result = umit.umpa.sniffing.from_file(dump_file.name, 2)
            assert len(result) == 2
            for packet in result:
                assert packet.ip.src == "1.2.3.4"
                assert packet.tcp.srcport == 99
        finally:
            # closing the handle releases the temporary file deterministically
            dump_file.close()
Note: See TracBrowser for help on using the browser.