Changeset 5591
- Timestamp:
- 06/04/10 22:38:07 (3 years ago)
- Location:
- UMPA/branch/link-layer-integration
- Files:
-
- 4 modified
-
tests/a_unit/test_sockets.py (modified) (4 diffs)
-
tests/utils.py (modified) (1 diff)
-
umit/umpa/__init__.py (modified) (1 diff)
-
umit/umpa/_sockets.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
UMPA/branch/link-layer-integration/tests/a_unit/test_sockets.py
r5170 r5591 2 2 # -*- coding: utf-8 -*- 3 3 4 # Copyright (C) 2009 Adriano Monteiro Marques.4 # Copyright (C) 2009-2010 Adriano Monteiro Marques. 5 5 # 6 # Author: Bartosz SKOWRON <getxsick at gmail dot com> 6 # Authors: Bartosz SKOWRON <getxsick at gmail dot com> 7 # Kosma Moczek <kosma at kosma dot pl> 7 8 # 8 9 # This library is free software; you can redistribute it and/or modify … … 23 24 import py.test 24 25 25 from umit.umpa import Socket, Packet 26 from umit.umpa.protocols import IP, TCP 26 from umit.umpa import Socket, SocketL2, Packet 27 import umit.umpa.sniffing 28 from umit.umpa.protocols import Ethernet, IP, TCP, UDP, Payload 27 29 from umit.umpa.utils.exceptions import UMPAException, UMPANotPermittedException 28 30 from tests.utils import SendPacket, SendPacketL2 29 31 30 32 class TestUMPASockets(object): … … 35 37 def test_sent_size(self): 36 38 if os.name == 'posix' and os.geteuid() != 0: 37 py.test.skip('root-privil iges are needed')39 py.test.skip('root-privileges are needed') 38 40 39 41 p1 = Packet(IP(), TCP()) … … 42 44 s = Socket() 43 45 size = s.send(p1, p2) 46 assert size == [40, 40] 44 47 45 assert size == [40, 40] 48 def test_send_size_L2(self): 49 if os.name == 'posix' and os.geteuid() != 0: 50 py.test.skip('root-privileges are needed') 51 52 p1 = Packet(Ethernet(src='00:11:22:33:44:55', dst='00:11:22:33:44:55'), 53 IP(src="127.0.0.1", dst="127.0.0.1"), 54 TCP(srcport=1234, dstport=4321), 55 Payload('xyz')) 56 p2 = Packet(Ethernet(src='00:11:22:33:44:55', dst='00:11:22:33:44:55'), 57 IP(src="127.0.0.1", dst="127.0.0.1"), 58 TCP(srcport=1234, dstport=4321), 59 Payload('xyz')) 60 61 s = SocketL2(iface='lo') 62 size = s.send(p1, p2) 63 64 assert size == [57, 57] 65 66 def test_send_sniff_tcp(self): 67 if os.name == 'posix' and os.geteuid() != 0: 68 py.test.skip('root-privileges are needed') 69 70 p = Packet(Ethernet(src='00:11:22:33:44:55', dst='00:11:22:33:44:55'), 71 IP(src="127.0.0.1", dst="127.0.0.1"), 72 UDP(srcport=1234, dstport=4321), 73 Payload('xyz')) 74 75 th = SendPacketL2(p, iface='lo') 76 th.start() 77 result = umit.umpa.sniffing.sniff(1, device='lo', filter="src port 1234") 78 th.join() 79 80 assert len(result) == 1 81 assert result[0].ethernet.src == '00:11:22:33:44:55' 82 assert result[0].ip.src == '127.0.0.1' 83 assert result[0].udp.srcport == 1234 84 assert result[0].udp.dstport == 4321 85 86 def test_send_sniff_tcp_L2(self): 87 if os.name == 'posix' and os.geteuid() != 0: 88 py.test.skip('root-privileges are needed') 89 90 p = Packet(Ethernet(src='00:11:22:33:44:55', dst='00:11:22:33:44:55'), 91 IP(src="127.0.0.1", dst="127.0.0.1"), 92 TCP(srcport=1234, dstport=4321), 93 Payload('xyz')) 94 95 th = SendPacketL2(p, iface='lo') 96 th.start() 97 result = umit.umpa.sniffing.sniff(1, device='lo', filter="src port 1234") 98 th.join() 99 100 assert len(result) == 1 101 assert result[0].ethernet.src == '00:11:22:33:44:55' 102 assert result[0].ip.src == '127.0.0.1' 103 assert result[0].tcp.srcport == 1234 104 assert result[0].tcp.dstport == 4321 -
UMPA/branch/link-layer-integration/tests/utils.py
r5170 r5591 35 35 time.sleep(2) 36 36 s.send(self._packet) 37 38 class SendPacketL2(threading.Thread): 39 def __init__(self, packet, amount=1, iface=None): 40 super(SendPacketL2, self).__init__() 41 self._packet = packet 42 self._amount = amount 43 self._iface = iface 44 def run(self): 45 s = umit.umpa.SocketL2(iface=self._iface) 46 for i in xrange(self._amount): 47 time.sleep(2) 48 s.send(self._packet) -
UMPA/branch/link-layer-integration/umit/umpa/__init__.py
r5170 r5591 41 41 from umit.umpa._config import config 42 42 from umit.umpa._packets import Packet 43 from umit.umpa._sockets import Socket 43 from umit.umpa._sockets import Socket, SocketL2 44 44 45 45 # UMPA handles with the local directory $HOME/.umpa -
UMPA/branch/link-layer-integration/umit/umpa/_sockets.py
r5413 r5591 2 2 # -*- coding: utf-8 -*- 3 3 4 # Copyright (C) 2008-2009 Adriano Monteiro Marques. 5 # 6 # Author: Bartosz SKOWRON <getxsick at gmail dot com> 4 # Copyright (C) 2008-2010 Adriano Monteiro Marques. 5 # 6 # Authors: Bartosz SKOWRON <getxsick at gmail dot com> 7 # Kosma Moczek <kosma at kosma dot pl> 7 8 # 8 9 # This library is free software; you can redistribute it and/or modify … … 21 22 22 23 """ 23 Socket connection management. 24 25 It contains Socket class which should be used instead of 26 socket.socket() directly from the standard library. 27 28 But it's correctly to use socket module directly if needed. 24 Raw sockets support. 25 26 Contains Socket classes which can be used to send raw packets in 27 a platform-independent manner. The standard Python socket module can be used 28 instead if advanced functionalities are needed. 29 29 """ 30 30 31 31 import socket 32 import sys 33 import re 34 from fcntl import ioctl 32 35 33 36 from umit.umpa.utils.exceptions import UMPAException, UMPANotPermittedException 37 38 # constants from various header files not available under Python 39 ETH_P_ALL = 3 # from linux/if_ether.h 40 BIOCSETIF = 2149597804 # from net/bpf.h 41 42 # Detect socket programming model. This greatly simplifies socket code. 43 if re.match('linux', sys.platform): 44 _l2model = 'AF_PACKET' 45 _l3model = 'AF_INET' 46 _l3quirk = None 47 elif re.match('freebsd|netbsd|darwin', sys.platform): 48 _l2model = 'bpf' 49 _l3model = 'AF_INET' 50 _l3quirk = 'ntohs' 51 elif re.match('openbsd', sys.platform): 52 _l2model = 'bpf' 53 _l3model = 'AF_INET', 54 _l3quirk = None 55 elif re.match('win', sys.platform): 56 _l2model = 'NDIS' 57 _l3model = 'AF_INET' 58 _l3quirk = 'windows' 59 else: 60 _l2model = None 61 _l3model = None 62 _l3quirk = None 63 64 def send(*packets, **kwargs): 65 """ 66 Send arbitrary packets. 67 68 The function creates sockets of proper level as needed. The 'iface' 69 named argument must be supplied for L2 (link-layer) sockets. 70 71 @type packets: C{Packet} 72 @param packets: list of umit.umpa.Packet objects to send. 73 74 @returns: List of return values (byte counts) from the send() function. 75 """ 76 77 sent_bytes = [] 78 for packet in packets: 79 # create appropriate socket based on packet's lowermost layer 80 if packet.protos[0].layer == 2: 81 sock = SocketL2(iface=kwargs.get('iface')) 82 else: 83 sock = SocketL3() 84 sent_bytes.extend(sock.send(packet)) 85 return sent_bytes 86 87 class SocketL2(object): 88 """ 89 Level 2 (link-layer) socket class. 90 91 Supported platforms: Linux (AF_PACKET), BSD (bpf). 92 """ 93 94 def __init__(self, iface=None): 95 """ 96 Create a new SocketL2. 97 98 @type iface: C{str} 99 @param iface: Interface to use for sending the packets. 100 """ 101 if iface is None: 102 # TODO: port interface detection from the link-layer branch 103 raise NotImplementedError("You need to specify iface") 104 105 if _l2model == 'AF_PACKET': 106 try: 107 self._sock = socket.socket(socket.AF_PACKET, 108 socket.SOCK_RAW, 109 ETH_P_ALL) 110 except socket.error, msg: 111 raise UMPANotPermittedException(msg) 112 113 self._sock.bind((iface, ETH_P_ALL)) 114 self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2**20) 115 elif _l2model == 'bpf': 116 try: 117 self._sock = open("/dev/bpf", 'w') 118 except IOError, msg: 119 raise UMPAException('cannot open /dev/bpf: ' + msg) 120 121 ioctl(self._sock, BIOCSETIF, iface) 122 else: 123 raise NotImplementedError("L2 sockets unsupported on your platform") 124 125 def send(self, *packets): 126 """ 127 Send packets through the socket. 128 129 @type packets: C{Packet} 130 @param packets: List of umit.umpa.Packet objects to send. 131 132 @returns: List of return values (byte counts) from the send() function. 133 """ 134 135 sent_bytes = [] 136 for packet in packets: 137 if _l2model == 'AF_PACKET': 138 sent_bytes.append(self._sock.send(packet.get_raw())) 139 elif _l2model == 'bpf': 140 sent_bytes.append(self._sock.write(packet.get_raw())) 141 else: 142 raise NotImplementedError("L2 send unsupported on your platform") 143 return sent_bytes 144 145 # TODO: legacy L3 code below, not integrated yet 34 146 35 147 class Socket(object):
