root/umpa/branches/link-layer-integration/tests/a_unit/test_sniffing/test_sniffing_init.py @ 5692

Revision 5692, 6.5 kB (checked in by kosma, 3 years ago)

refactor out libpcap support: split tests into libpcap and sniffing

Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4# Copyright (C) 2009 Adriano Monteiro Marques.
5#
6# Author: Bartosz SKOWRON <getxsick at gmail dot com>
7#
8# This library is free software; you can redistribute it and/or modify
9# it under the terms of the GNU Lesser General Public License as published
10# by the Free Software Foundation; either version 2.1 of the License, or
11# (at your option) any later version.
12#
13# This library is distributed in the hope that it will be useful, but
14# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
16# License for more details.
17#
18# You should have received a copy of the GNU Lesser General Public License
19# along with this library; if not, write to the Free Software Foundation,
20# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
22import tempfile
23
24import umit.umpa
25import umit.umpa.sniffing
26from umit.umpa.protocols import IP, TCP
27from umit.umpa.protocols._decoder import decode
28from umit.umpa.utils.exceptions import UMPASniffingException
29from tests.utils import SendPacket
30
31import py.test
32
class TestSniffing(object):
    """Tests for the capture helpers exposed by umit.umpa.sniffing.

    Most tests build a SendPacket helper thread (from tests.utils) around a
    crafted IP/TCP packet, start it, run one of the sniffing functions and
    then inspect what came back.  SendPacket presumably transmits the packet
    the given number of times -- see tests.utils to confirm.

    Every helper thread is joined in a ``finally`` clause so that a failing
    capture call cannot leave a sender running into the next test.
    """

    def test_get_available_devices(self):
        """get_available_devices() must agree with pypcap.findalldevs()."""
        if umit.umpa.config['libpcap'] != 'pypcap':
            # Report a skip rather than passing without checking anything.
            py.test.skip("only the pypcap backend is covered by this test")

        from umit.umpa.utils.libpcap.wrappers import pypcap
        expected = pypcap.findalldevs()
        assert umit.umpa.sniffing.get_available_devices() == expected

    def test_get_default_device(self):
        """get_default_device() must agree with pypcap.lookupdev()."""
        if umit.umpa.config['libpcap'] != 'pypcap':
            # Report a skip rather than passing without checking anything.
            py.test.skip("only the pypcap backend is covered by this test")

        from umit.umpa.utils.libpcap.wrappers import pypcap
        expected = pypcap.lookupdev()
        assert umit.umpa.sniffing.get_default_device() == expected

    def test_sniff(self):
        """sniff(1, ...) returns a single packet matching the filter."""
        sender = SendPacket(umit.umpa.Packet(IP(src="1.2.3.4", dst="8.8.8.8"),
                                             TCP(srcport=99)))
        sender.start()
        try:
            result = umit.umpa.sniffing.sniff(1, filter="src port 99")
        finally:
            sender.join()

        assert len(result) == 1
        assert result[0].ip.src == '1.2.3.4'
        assert result[0].tcp.srcport == 99

    def test_sniff_next(self):
        """sniff_next() returns one decoded packet per call."""
        sender = SendPacket(umit.umpa.Packet(IP(src="1.2.3.4", dst="8.8.8.8"),
                                             TCP(srcport=99)))
        sender.start()
        try:
            result = umit.umpa.sniffing.sniff_next(filter="src port 99")
        finally:
            sender.join()

        assert result.ip.src == '1.2.3.4'
        assert result.tcp.srcport == 99

        # send more, sniff one
        sender = SendPacket(umit.umpa.Packet(IP(src="1.2.3.4", dst="8.8.8.8"),
                                             TCP(srcport=99)), 5)
        sender.start()
        try:
            result = umit.umpa.sniffing.sniff_next(filter="src port 99")
        finally:
            sender.join()

        assert result.ip.src == '1.2.3.4'
        assert result.tcp.srcport == 99

    def test_sniff_loop(self):
        """sniff_loop() feeds packets to the callback and propagates errors.

        The callback receives a timestamp, the packet and the two extra
        values from ``callback_args``; it raises UMPASniffingException when
        the first extra value is greater than the second.
        """
        def cbk(ts, pkt, *args):
            assert pkt.ip.src == "1.2.3.6"
            assert pkt.tcp.srcport == 99
            assert ts > 0
            assert len(args) == 2
            if args[0] > args[1]:
                raise UMPASniffingException("test")

        # callback_args=[1, 2]: the callback does not raise
        sender = SendPacket(umit.umpa.Packet(IP(src="1.2.3.6", dst="8.8.8.8"),
                                             TCP(srcport=99)), 2)
        sender.start()
        try:
            umit.umpa.sniffing.sniff_loop(1, filter="src 1.2.3.6",
                                          callback=cbk, callback_args=[1, 2])
        finally:
            sender.join()

        # callback_args=[2, 1]: the callback raises and the error must
        # surface from sniff_loop()
        sender = SendPacket(umit.umpa.Packet(IP(src="1.2.3.6", dst="8.8.8.8"),
                                             TCP(srcport=99)))
        sender.start()
        try:
            py.test.raises(UMPASniffingException,
                           umit.umpa.sniffing.sniff_loop, 1,
                           filter="src 1.2.3.6", callback=cbk,
                           callback_args=[2, 1])
        finally:
            sender.join()

        # called with only a count (no filter, no callback) it must raise
        py.test.raises(UMPASniffingException, umit.umpa.sniffing.sniff_loop, 1)

    def test_from_file(self):
        """from_file() reads back packets that sniff() dumped to a file."""
        dump_file = tempfile.NamedTemporaryFile(mode="w")
        try:
            sender = SendPacket(
                umit.umpa.Packet(IP(src="1.2.3.6", dst="8.8.8.8"),
                                 TCP(srcport=99)), 3)
            sender.start()
            try:
                umit.umpa.sniffing.sniff(
                    3, dump=dump_file.name,
                    filter="src host 1.2.3.6 and src port 99")
            finally:
                sender.join()

            # no limit: every dumped packet comes back
            result = umit.umpa.sniffing.from_file(dump_file.name)
            assert len(result) == 3
            for packet in result:
                assert packet.ip.src == "1.2.3.6"
                assert packet.tcp.srcport == 99

            # limited to two packets
            result = umit.umpa.sniffing.from_file(dump_file.name, 2)
            assert len(result) == 2
            for packet in result:
                assert packet.ip.src == "1.2.3.6"
                assert packet.tcp.srcport == 99
        finally:
            # closing the temporary file also removes it
            dump_file.close()

    def test_from_file_loop(self):
        """from_file_loop() invokes the callback once per packet read."""
        # Timestamps collected by the callback; a list is used so the nested
        # function can record calls without relying on a module-level global.
        seen = []

        def cbk(ts, pkt, *args):
            assert pkt.ip.src == "1.2.3.6"
            assert pkt.tcp.srcport == 99
            assert ts > 0
            assert len(args) == 1
            seen.append(ts)

        dump_file = tempfile.NamedTemporaryFile(mode="w")
        try:
            sender = SendPacket(
                umit.umpa.Packet(IP(src="1.2.3.6", dst="8.8.8.8"),
                                 TCP(srcport=99)), 3)
            sender.start()
            try:
                umit.umpa.sniffing.sniff(
                    3, dump=dump_file.name,
                    filter="src host 1.2.3.6 and src port 99")
            finally:
                sender.join()

            # no limit: the callback runs for all three dumped packets
            amount = 3
            umit.umpa.sniffing.from_file_loop(dump_file.name, callback=cbk,
                                              callback_args=[amount,])
            assert len(seen) == amount

            # limited to two packets
            del seen[:]
            amount = 2
            umit.umpa.sniffing.from_file_loop(dump_file.name, 2, callback=cbk,
                                              callback_args=[amount,])
            assert len(seen) == amount
        finally:
            # closing the temporary file also removes it
            dump_file.close()

    def test_to_file(self):
        """to_file() dumps packets that from_file() can read back.

        The test is skipped when to_file() raises UMPASniffingException.
        """
        dump_file = tempfile.NamedTemporaryFile(mode="w")
        try:
            amount = 5

            sender = SendPacket(
                umit.umpa.Packet(IP(src="1.2.3.4", dst="8.8.8.8"),
                                 TCP(srcport=99)), amount)
            sender.start()
            try:
                umit.umpa.sniffing.to_file(dump_file.name, amount,
                        "src host 1.2.3.4 and src port 99")
            except UMPASniffingException:
                py.test.skip("no suitable devices for sniffing found. "
                            "propably not sufficent priviliges.")
            finally:
                sender.join()

            # no limit: every dumped packet comes back
            result = umit.umpa.sniffing.from_file(dump_file.name)
            assert len(result) == amount
            for packet in result:
                assert packet.ip.src == "1.2.3.4"
                assert packet.tcp.srcport == 99

            # limited to two packets
            result = umit.umpa.sniffing.from_file(dump_file.name, 2)
            assert len(result) == 2
            for packet in result:
                assert packet.ip.src == "1.2.3.4"
                assert packet.tcp.srcport == 99
        finally:
            # closing the temporary file also removes it
            dump_file.close()
Note: See TracBrowser for help on using the browser.