【Python】ICMPを使ったネットワークプログラミング
PythonでRawSocketを使ってICMPパケットの送受信をしてみたいと思います。
tracerouteを実装します。
このツール今までc言語、Rust、Rubyで実装したことがあります。
c言語やRustと実装した時と比べてRubyで実装すると行数をそれなりに抑えることができました。
Pythonでも少ない行数で作ることができます。
ソースコード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# coding: utf-8 import socket import struct import select import sys class RouteScanner: def __init__(self): # icmp socketを作成 ipヘッダーを修正する self.soc = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) self.soc.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL,1) def checksum(self, data): data_len = len(data) // 2*2 sum = 0 for i in range(0,data_len,2): sum += (ord(data[i+1]) << 8) + ord(data[i]) #Python3 #sum += (ord(chr(data[i+1])) << 8) + ord(chr(data[i])) if len(data) % 2 != 0: sum += ord(data[-1]) #Python3 #sum += ord(chr(data[-1])) while sum >> 16: sum = (sum >> 16) + (sum & 0xffff) sum = sum >> 8 | (sum << 8 & 0xff00) return ~sum&0xffff # ipヘッダー作成 def make_ip(self, target_ip, ttl): #固定の値はハードコード ip_ver = 4 ip_hl = 5 ip_tos = 0 ip_len = 28 ip_id = 1 ip_off = 0 ip_ttl = ttl ip_proto = socket.IPPROTO_ICMP ip_check = 0 ip_src = socket.inet_aton("0.0.0.0") ip_dst = socket.inet_aton(target_ip) ip_ver_hl = (ip_ver << 4) + ip_hl ip_header = struct.pack("!BBHHHBBH4s4s",ip_ver_hl,ip_tos,ip_len,ip_id,ip_off,ip_ttl,ip_proto,ip_check,ip_src,ip_dst) ip_check = self.checksum(ip_header) ip_header = struct.pack("!BBHHHBBH4s4s",ip_ver_hl,ip_tos,ip_len,ip_id,ip_off,ip_ttl,ip_proto,ip_check,ip_src,ip_dst) return ip_header # icmpヘッダー作成 def make_icmp_echo_request(self): # 固定の値はハードコード # echo request type = 8 code = 0 check = 0 id = 1 seq = 1 check = self.checksum(struct.pack("!BBHHH", type, code, check, id, seq)) return struct.pack("!BBHHH", type, code, check, id, seq) # ipヘッダー取得 def extract_ip(self, packet): #packで配列に変換 第一引数はipヘッダーに合うように指定 return struct.unpack('!BBHHHBBH4s4s', packet) # スキャンします def scan(self, target_ip): print("Scanning to {} ...".format(target_ip)) # 到達したかどうか reach = 0 # 1~255まで確認 for i in range(1, 256): ip = self.make_ip(target_ip, i) icmp = self.make_icmp_echo_request() packet = ip+icmp # 3回繰り返す for _ in range(3): self.soc.sendto(packet, (target_ip, 0)) # socketがreadyになるのを待つ sel_res = select.select([self.soc],[],[],3) if len(sel_res[0]) > 0: packet, addr = sel_res[0][0].recvfrom(1024) ip = self.extract_ip(packet[0:20]) start = (ip[0]&0x0f)*4 res = self.analyze(packet[start:start+8]) if res == 1: #到達した print("- Reach time to live {} from {}".format(i, addr[0])) reach = 1 break elif res == 0: #途中ノードから返ってきた print("- time to live {} from {}".format(i, addr[0])) break # 到達したのでループを抜ける if reach == 1: break # パケットを確認 def analyze(self,packet): icmp = struct.unpack("!BBHHH", packet) if icmp[0] == 0: # エコー応答なら return 1 # 到達した elif icmp[0] == 11: # 時間超過なら return 0 # 到達していない return 2 #一応2を返しておく |
ソケット作成
icmpソケットを作成し、ipヘッダーを自分で作ることができるようsetsockoptで設定します。
1 2 3 4 5 |
def __init__(self): # icmp socketを作成 ipヘッダーを修正する self.soc = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) self.soc.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL,1) |
パケット作成
ipヘッダーをicmpヘッダーを作成します。
固定の部分はハードコードしてます。
ターゲットにエコー要求を送信してそれに対し返却されたパケットの内容に応じてその後の処理を決定します。
想定しているicmp受信パケットはエコー応答か時間超過です。
ルーターからは時間超過パケット、ターゲットからエコー応答が返却される想定です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# ipヘッダー作成 def make_ip(self, target_ip, ttl): #固定の値はハードコード ip_ver = 4 ip_hl = 5 ip_tos = 0 ip_len = 28 ip_id = 1 ip_off = 0 ip_ttl = ttl ip_proto = socket.IPPROTO_ICMP ip_check = 0 ip_src = socket.inet_aton("0.0.0.0") ip_dst = socket.inet_aton(target_ip) ip_ver_hl = (ip_ver << 4) + ip_hl ip_header = struct.pack("!BBHHHBBH4s4s",ip_ver_hl,ip_tos,ip_len,ip_id,ip_off,ip_ttl,ip_proto,ip_check,ip_src,ip_dst) ip_check = self.checksum(ip_header) ip_header = struct.pack("!BBHHHBBH4s4s",ip_ver_hl,ip_tos,ip_len,ip_id,ip_off,ip_ttl,ip_proto,ip_check,ip_src,ip_dst) return ip_header # icmpヘッダー作成 def make_icmp_echo_request(self): # 固定の値はハードコード # echo request type = 8 code = 0 check = 0 id = 1 seq = 1 check = self.checksum(struct.pack("!BBHHH", type, code, check, id, seq)) return struct.pack("!BBHHH", type, code, check, id, seq) |
スキャン
time to liveを1~255までインクリメントするループに確認のための3回ループをネストしています。
ひとつのtime to liveにつき3回まで送信処理を繰り返します。
パケットの送信後socketがレディー状態になるのを3秒間待ちます。
パケットを受信したらipヘッダーとicmpヘッダーを抽出して、内容を確認します。
エコー応答ならターゲットからのパケットのため送信処理を終了します。
時間超過なら途中ノードからのパケットと判断し、
3回ループを抜けてtime to liveをインクリメントして送信処理を続行します。
動作確認
以下のようにクラスを使用します。
1 2 3 |
target_ip = {ターゲットのipアドレス} scanner = RouteScanner() scanner.scan(target_ip) |