美文网首页视觉艺术
python3端口扫描的编写

python3端口扫描的编写

作者: CSeroad | 来源:发表于2020-04-11 16:10 被阅读0次

前言

之前学习python2的时候写了端口扫描的一些脚本,写的不怎么样。抽空又重新捋了一遍,使用python3从简单的单线程单个IP到编写多线程、队列加线程。也算是以端口扫描为目标又重新学了一遍python3。

端口扫描

V0.1
python3写的单线程扫描自定义端口。也是最简单的探测端口开放状态的脚本。
注解:
1.s.settimeout(0.1) 设置了超时时间;
2.用最简单的connect_ex方法进行socket连接,成功返回0,失败返回error;
3.使用sys模块来接收参数;

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket
import sys

def scan(host,port):  
    s = socket.socket()
    s.settimeout(0.1)
    # 设置超时时间
    if s.connect_ex((host, port)) == 0:  
        # 与connect一样,成功返回0.失败返回error
        print(port,'open')
    s.close()
if __name__ == '__main__':
    if(len(sys.argv) == 2):
        print('User: please port_scan.py IP')
        host = sys.argv[1]
        port = [22,23,25,135,137,445,3306,1541,3389,1080,9200]
        for p in port:
            scan(host,p)
    else:
        print('User: please port_scan.py IP')

V0.2
python3写的单线程扫描端口段。适用于探测一些web资产,比如8000-10000之间的端口。
注解:
相比较V0.1脚本出现了一下变动:
1.使用内置的map函数,对每一个元素调用function函数。即每一个port调用scan函数;
2.返回的结果在python2位list类型,python3手动添加list;

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket
import sys

def scan(port):  
    s = socket.socket()
    s.settimeout(0.1)
    if s.connect_ex((sys.argv[1], port)) == 0:
        print(port,'open')
    s.close()
if __name__ == '__main__':
    if(len(sys.argv) == 4):
        print('User: please port_scan.py IP start_port end_port')
        start_port = int(sys.argv[2])
        end_port = int(sys.argv[3])
        list(map(scan,range(start_port,end_port)))
        # python2 可以直接map返回list,python3 需要手动list
    else:
        print('User: please port_scan.py IP start_port end_port')

V0.2.1
python3写的单线程扫描端口段。
注解:
1.相比较V0.2脚本,引入了optparse 模块用于处理命令行参数;

#!/usr/bin/env python3
#-*- coding:utf-8 -*-

import socket
import sys
import optparse
# optparse 模块用于处理命令行参数

def scan(host,port):  
    s = socket.socket()
    s.settimeout(0.1)
    if s.connect_ex((host, port)) == 0:
        print(port,'open')
    s.close()
if __name__ == '__main__':
    parser = optparse.OptionParser("port_scan.py -H host -s start_port -e end_port")
    parser.add_option("-H",action="store",type="string",dest="host",default="127.0.0.1",help="ip")
    parser.add_option("-s",action="store",type="int",dest="start_port",default=0,help="start_port")
    parser.add_option("-e",action="store",type="int",dest="end_port",default=512,help="end_port")
    (options,args) = parser.parse_args()
    #print(options)
    host = options.host
    start_port = options.start_port
    end_port   = options.end_port
    for p in range(start_port,end_port):
            scan(host,p)

V1.0
python3写的多线程扫描自定义端口。
适用于更快更高效的快速扫描一些端口。
注解:
1.首先使用了threading模块多线程,将port进行循环添加线程中,有多少了port就有多少线程数。在port_list列表的端口扫描够了;
2.没有加Lock锁;

# !/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import socket
import sys


def scan(host,port):
    s = socket.socket()
    s.settimeout(0.1)
    if s.connect_ex((host, port)) == 0:
        print(port,'open')
    s.close()

def thread():
    thread_list = []
    port_list = [21,22,23,25,80,135,137,139,445,1433,1521,3306,3389,8080,9015]
    for port in port_list:
        t = threading.Thread(target = scan,args = (host,port))
        thread_list.append(t)
    for t in thread_list:
        t.start() #启动线程
    for t in thread_list:
        t.join() #等待到线程终止

if __name__ == '__main__':
    if(len(sys.argv) == 2):
        print('User: please port_scan_thread.py IP')
        host = sys.argv[1]
        thread()
    else:
        print('User: please port_scan_thread.py IP')

V1.0.1
python3 写的多线程自定义端口扫描。这次使用继承threading类的方式来编写。
注解:
1.在对run方法进行重写的时候,python2 使用的apply()函数,python3使用星号标记。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import sys
import socket

class Mythread(threading.Thread):
    def __init__(self,fun,args):
        # 初始化
        threading.Thread.__init__(self)
        self.fun = fun
        self.args = args
    def run(self):
        # 重写run方法
        self.fun(*self.args)
        # python2 apply(self.fun,self.args) 
        
def scan(host,port):
    s = socket.socket()
    s.settimeout(0.1)
    if s.connect_ex((host, port)) == 0:
        print(port,'open')
    s.close()


def thread(host):
    port_list = [21,22,23,25,80,135,137,139,445,1433,1521,3306,3389,8080,9015]
    thread_list = []
    for port in port_list:
        t = Mythread(scan,(host,port))
        thread_list.append(t)
    for t in thread_list:
        t.start()
    for t in thread_list:
        t.join()

if __name__=='__main__':
    if(len(sys.argv) == 2):
        print('User: please port_scan_thread.py IP')
        host = sys.argv[1]
        thread(host)
    else:
        print('User: please port_scan_thread.py IP')

V1.1
python3写的多线程扫描指定端口范围。
适用于更快更高效的快速扫描端口,但是端口范围不易过大。
注解:
1.使用threading模块,将port进行循环添加线程中,有多少了port就有多少线程数。端口范围不易过大,否则意味着线程过高。虽然加了Lock锁,当线程过高的时候,程序依然会崩溃;
2.注意加锁的位置,在connect_ex之后;
3.也加了时间方便比较单线程;

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import socket
import sys
import time

lock = threading.Lock()
# 创建锁
def scan(host,port):
        s = socket.socket()
        s.settimeout(0.1)
        if s.connect_ex((host, port)) == 0:
                if lock.acquire(): #锁定
                        print(port,'open')
                        lock.release() #释放锁
        s.close()

def thread(port_list):
    thread_list = []
    for port in port_list:
        t = threading.Thread(target = scan,args = (host,port))
        thread_list.append(t)
    for t in thread_list:
        t.start()
    for t in thread_list:
        t.join()

if __name__ == '__main__':
    if(len(sys.argv) == 4):
        print('User: please port_scan_thread.py IP start_port end_port')
        host = sys.argv[1]
        start_port = int(sys.argv[2])
        end_port = int(sys.argv[3])
        port_list = list(range(start_port,end_port))
        t1 = time.time()
        thread(port_list)
        print('intime:',time.time()-t1)
    else:
        print('User: please port_scan_thread.py IP start_port end_port')

V1.2
python3 写的多线程扫描端口段。
注解
1.相比较V1.1,添加了队列解决线程过高导致程序崩溃的问题;
2.利用队列先入先出;
3.设置线程数为50;

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
import threading
import socket
import sys
import time
import queue


def scan(host,port):
        s = socket.socket()
        s.settimeout(0.1)
        if s.connect_ex((host, port)) == 0:
            print(port,'open')
        s.close()

def worker(host):
    while not q.empty():
        port = q.get() # 取出队列
        try:
            scan(host,port)
        finally:
            q.task_done()
def thread(host):
    thread_list = []
    for t in range(50):
        t = threading.Thread(target = worker,args = (host,))
        thread_list.append(t)
    for x in thread_list:
        x.start()
    for x in thread_list:
        x.join()

if __name__ == '__main__':
    if(len(sys.argv) == 4):
        print('User: please port_scan_thread.py IP start_port end_port')
        host = sys.argv[1]
        start_port = int(sys.argv[2])
        end_port = int(sys.argv[3])
        t1 = time.time()
        q = queue.Queue() # 创建队列的对象
        for port in range(start_port,end_port):
            q.put(port) #put()方法将值port放入队列
        thread(host)
        q.join()
        print("end time:",time.time()-t1)
    else:
        print('User: please port_scan_thread.py IP start_port end_port')

V1.2.1
python3 写的多线程扫描端口段。
注解
1.只是把thread线程设置为自定义参数。当然不建议自定义的线程过高;

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import socket
import sys
import time
import queue


def scan(host,port):
        s = socket.socket()
        s.settimeout(0.1)
        if s.connect_ex((host, port)) == 0:
            print(port,'open')
        s.close()

def worker(host):
    while not q.empty():
        port = q.get() # 取出队列
        try:
            scan(host,port)
        finally:
            q.task_done()
def thread(host,threads):
    thread_list = []
    for t in range(threads):
        t = threading.Thread(target = worker,args = (host,))
        thread_list.append(t)
    for x in thread_list:
        x.start()
    for x in thread_list:
        x.join()

if __name__ == '__main__':
    if(len(sys.argv) == 5):
        print('User: please port_scan_thread.py IP start_port end_port threads')
        host = sys.argv[1]
        start_port = int(sys.argv[2])
        end_port = int(sys.argv[3])
        threads = int(sys.argv[4])
        t1 = time.time()
        q = queue.Queue() # 创建队列的对象
        for port in range(start_port,end_port):
            q.put(port) #put()方法将值port放入队列
        thread(host,threads)
        q.join()
        print("end time:",time.time()-t1)
    else:
        print('User: please port_scan_thread.py IP start_port end_port threads')

V1.3
python3写的多线程全端口扫描。
注解
1.默认也是50个线程;
2.把start_port、end_port写死就行;

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import socket
import sys
import time
import queue


def scan(host,port):
        s = socket.socket()
        s.settimeout(0.1)
        if s.connect_ex((host, port)) == 0:
            print(port,'open')
        s.close()

def worker(host):
    while not q.empty():
        port = q.get() # 取出队列
        try:
            scan(host,port)
        finally:
            q.task_done() 
def thread(host):
    thread_list = []
    for t in range(50):
        t = threading.Thread(target = worker,args = (host,))
        thread_list.append(t)
    for x in thread_list:
        x.start()
    for x in thread_list:
        x.join()

if __name__ == '__main__':
    if(len(sys.argv) == 2):
        print('User: please port_scan_thread.py IP')
        host = sys.argv[1]
        t1 = time.time()
        q = queue.Queue() # 创建队列的对象
        for port in range(1,65535):
            q.put(port) #put()方法将值port放入队列
        thread(host)
        q.join()
        print("end time:",time.time()-t1)
    else:
        print('User: please port_scan_thread.py IP')

参考资料

https://github.com/windard/Port_Scan
https://blog.csdn.net/handsomekang/article/details/39826729

相关文章

网友评论

    本文标题:python3端口扫描的编写

    本文链接:https://www.haomeiwen.com/subject/ixycuhtx.html