过程

如果不太明白SCOKS5具体通讯过程,请先阅读一篇很详细的文章SOCKS5代理原理探索
建立SOCKS5连接时获取访问的目标域名或者IP,如果在白名单中或者是你想要拦截的广告等可进行直连或者屏蔽,其他情况则连接设置的目标SOCKS5代理
话不多说,直接上源码

源码

import datetime
import json
import os
import platform
import re
import socket
import select
import threading
import struct
import time
import tracebackconf = None  # 全局配置def loadJsonFile(path):with open(path, 'r', encoding='utf-8') as f:data = json.load(f)return datadef add_log(s):filename = datetime.datetime.now().strftime('%Y-%m-%d')with open(filename + ".txt", 'a') as f:f.write(str(s) + "\n")class Conf(object):white_list = Noneblack_list = Noneproxy_ip = Noneproxy_port = Nonelocal_port = Nonedef __init__(self):json_conf = loadJsonFile('./conf.json')self.white_list = set(json_conf["white_list"])self.black_list = set(json_conf["black_list"])self.proxy_ip = json_conf["proxy_ip"]self.proxy_port = int(json_conf["proxy_port"])self.local_port = set(json_conf["local_port"])# 客户端开始发送具体的请求
class S5Req:def __init__(self, buf):self.ver, self.cmd, self.rsv, self.atyp = struct.unpack("BBBB", buf)self.dst_addr = Noneself.dst_port = Noneself.begin_buf = Nonedef parse_port(self, buf):  # 解析端口port = struct.unpack("H", buf[0:2])[0]self.dst_port = int(socket.ntohs(int(port)))def parse_ipv4(self, buf):  # 解析IPself.dst_addr = socket.inet_ntoa(buf[0:4])self.parse_port(buf[4:])def parse_domain_name(self, buf):  # 解析域名name_len = struct.unpack("B", buf[0:1])[0]self.dst_addr = bytes.decode(buf[1:name_len + 1])self.parse_port(buf[1 + name_len:])def parse_netloc(self, buf):  # 解析网络地址if self.atyp == 3:self.parse_domain_name(buf)if self.atyp == 1:self.parse_ipv4(buf)# 服务端收到请求后,处理后返回
class S5Resp:def __init__(self):self.ver = 5self.rep = 1self.rsv = 0self.atyp = 1self.end_buf = Nonedef pack(self):buf = struct.pack("BBBBIH", self.ver, self.rep, self.rsv, self.atyp, 0, 0)return bufclass Socks5Error(Exception):passclass Socks5Thread(threading.Thread):wait = 60.0buf_size = 1024 * 4buf1 = Nonebuf2 = Nonebuf3 = Noneis_white = Falsedef __init__(self, s, ip, port, socks5):self.s = sself.dst_s = Noneself.ip = ipself.port = portself.socks5 = socks5threading.Thread.__init__(self)def run(self):resp = S5Resp()try:global confself.socks5.a += 1self.buf1 = self.s.recv(255)if not self.buf1:raise socket.error# 协议版本号x05 s5 METHOD x00 无需认证self.s.send(b"\x05\x00")self.buf2 = self.s.recv(4)if not self.buf2 or len(self.buf2) != 4:raise socket.errorreq = S5Req(self.buf2)if req.ver != 5:  # 判断是否为客户端s5代理resp.rep = 1raise Socks5Errorcount = 255if req.atyp == 1:count = 6self.buf3 = self.s.recv(count)req.parse_netloc(self.buf3)if req.atyp == 3:try:addr = socket.gethostbyname(req.dst_addr)except socket.error:resp.rep = 4raise Socks5Errorelse:addr = req.dst_addrresp.rep = 0self.s.send(resp.pack())req.begin_buf = self.s.recv(256)if not req.begin_buf:raise socket.errorself.dst_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)try:if req.dst_addr in conf.black_list:raise socket.errorelif req.dst_addr in conf.white_list or conf.proxy_ip == "":self.is_white = Trueself.dst_s.connect((addr, req.dst_port))self.dst_s.send(req.begin_buf)self.forward_loop(req=req)else:self.dst_s.connect((conf.proxy_ip, conf.proxy_port))self.dst_s.send(self.buf1)self.dst_s.recv(255)self.dst_s.send(self.buf2 + self.buf3)self.dst_s.recv(255)self.dst_s.send(req.begin_buf)self.forward_loop(req=req)except socket.error:raise socket.errorexcept Socks5Error:# traceback.print_exc()try:self.s.send(resp.pack())except Socks5Error:passexcept socket.error:# traceback.print_exc()passexcept:traceback.print_exc()finally:if self.s:self.s.close()if self.dst_s:self.dst_s.close()def forward_loop(self, req):rs_time = 0send_size = 0recv_size = 0begin_time = time.time()try:while 1:r, w, x = select.select([self.s, self.dst_s], [], [], self.wait)if not r:returnbegin_rs = time.time()try:for s in r:if s is self.s:buf = self.s.recv(self.buf_size)if not buf:raise socket.errorsend_size += len(buf)self.dst_s.send(buf)if s is self.dst_s:buf = self.dst_s.recv(self.buf_size)if not buf:raise socket.errorrecv_size += len(buf)self.s.send(buf)finally:rs_time += (time.time() - begin_rs)finally:if req.begin_buf:send_size = send_size >> 10recv_size = recv_size >> 10info = "是否直连:" + str(self.is_white) + " " + req.dst_addr + " " + str(req.dst_port) + " 发送:" + str(send_size) + "KB 接收:" + str(recv_size) + "KB 连接时长:" + \str(round(time.time() - begin_time, 4)) + "秒 传输时长:" + str(round(rs_time, 4)) + "秒 首次内容:" + \str(req.begin_buf)print(info)add_log(info)class Socks5(threading.Thread):def __init__(self, ip="0.0.0.0", port=8080):self.ip = ipself.port = portself.s = Nonethreading.Thread.__init__(self)self.a = 1def run(self):try:self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.s.bind((self.ip, self.port))self.s.listen(5)except socket.error as msg:print(msg)if self.s:self.s.close()self.s = Nonereturn Falsewhile 1:try:conn, addr = self.s.accept()thread = Socks5Thread(conn, addr[0], addr[1], self)thread.start()except socket.error as msg:print(msg)self.s.close()self.s = Nonereturn Falsepassdef kill_process(port):ret = os.popen("netstat -nao|findstr " + str(port))# 注意解码方式和cmd要相同,即为"gbk",否则输出乱码str_list = ret.read()ret_list = re.split('\n', str_list)try:process_pid = list(ret_list[0].split())[-1]os.popen('taskkill /pid ' + str(process_pid) + ' /F')except:passdef begin():global confconf = Conf()print("目标代理地址为:", end="")if conf.proxy_ip == "":print("无代理,直连")else:print(conf.proxy_ip, conf.proxy_port)print("当前计算机类型", platform.system())print("当前局域网地址:", socket.gethostbyname(socket.gethostname()))print("开放的端口如下:")ip_addr = "0.0.0.0"socks_set = {}for port in conf.local_port:print(port)kill_process(port)s5 = Socks5(ip_addr, int(port))s5.start()socks_set[int(port)] = s5if __name__ == '__main__':begin()

配置文件conf.json

{"#white_list": "白名单,可以是IP,也可以是域名","white_list": ["example1.com","1.1.1.1"],"#black_list": "黑名单,可以是IP,也可以是域名","black_list": ["example2.com","2.2.2.2"],"#proxy_ip": "Socks5代理IP,当proxy_ip为空字符串时则无代理","#proxy_port": "Socks5代理端口,当proxy_ip为空字符串时则无代理","eg_proxy_ip": "1.1.1.1","proxy_ip": "","proxy_port": "8000","local_port": [10073,10074,10075]
}

如果对你有帮助,不妨点个赞再走,让更多人能看到

如果有什么疑问,欢迎在下方留言,开源的世界相互学习交流,才能进步的更快

Python SOCKS5 二级代理服务器 实现白名单与网络控制相关推荐

  1. 计算机二级报名学校白名单,干货丨2021机器人编程赛事+等级考试攻略之教育部白名单赛事篇...

    近两年,通过和众多家长聊天.接触,小贝老师明显发现家长们对于机器人编程赛事格外关注. 无论是帮助孩子搜集各项赛事信息.报名.备赛,还是跟着孩子一路南征北战,家长们都大力支持,表现出了足够的热情度. 正 ...

  2. 后端系统开发之白名单机制

    前言 后端系统中经常会听到"某某白名单"的名字,为什么要有白名单呢?使用白名单机制有什么好处? 在大型后端系统中,白名单机制是必不可少的 概念 白名单的概念与"黑名单&q ...

  3. cordova whitelist白名单

    介绍 这个插件在Cordova 4.0以后实现了一个白名单政策用于导航到应用程序的webview上 安装插件 自动安装 详情 导航白名单Navigation Whitelist 控制URL WebVi ...

  4. python白名单验证-python脚本简单检测ip合法性并添加到白名单文件

    一.功能说明 有时候项目需要通过ip地址来判定是否允许访问,通常通过一个白名单地址文件来存放这些允许放行的ip,但每次打开文件编辑比较麻烦,容易出错,也不知道是否添加过,故用python写了一个自动添 ...

  5. python白名单验证是什么意思_第10.5节 使用__all__定义Python模块导入白名单

    一.引言 <第10.4节 Python模块的弱封装机制>介绍了Python模块的的弱封装机制,除了使用弱封装机制来从一定程度上防止导入特定成员外,Python模块中还提供可另外一种类似白名 ...

  6. Mapreduce Wordcount白名单 Python实现

    Mapreduce Wordcount白名单 Python实现 1.Mapper部分的map.py代码: 其中读入文件The_Man_of_Property.txt需要上传到HDFS文件系统上:had ...

  7. python白名单验证是什么意思_luminati python+selenium使用方式(白名单和账号密码验证方式)...

    一,在Windows下使用 1.官网登录后下载对应的exe代理软件 2.下载完成后打开,打开完成后登录127.0.0.1:22999,然后输入用户名和密码进行登录.登录成功后进入到如下界面 3.将本机 ...

  8. python白名单验证-Python中XSS白名单过滤的实现

    在Web开发中很多地方需要用户输入富文本但又要确保输入的这些内容绝对安全不会引发XSS漏洞,那么最常用的技术就是白名单技术. 白名单的通常做法都是构建一个允许使用的标签及对应属性的列表,然后对用户输入 ...

  9. python白名单验证-JWT黑名单和白名单

    单点登录系统 单点登录系统保存了用户的登录名和密码,上网用户在单点登录系统中认证成功后,就可以直接登录各个业务系统. 1. 用户使用单点登录系统的登录界面,输入用户名和密码登录成功后, 单点登录系统为 ...

最新文章

  1. Bing搜索核心技术BitFunnel原理
  2. 会签 数据库表设计_关于数据库表设计和实体类设计的思考
  3. Java 源程序与编译型运行区别
  4. elinks文字浏览器
  5. stackoverflow favorites
  6. python123判断性别程序_听说胎心能够预测宝宝性别?这是真的吗?
  7. Pingf的stm32学习笔记之GPIO_Part2[0913]
  8. django 1.8 官方文档翻译: 4-2-4 人性化
  9. SQL Server 中的执行计划和SQL Server Profiler
  10. 读取 Excel 之 Epplus
  11. sql聚集索引和非聚集索引_SQL Server中非聚集索引概述
  12. Serverless会使 SaaS 商业模式过时,而开源将成为新的王者
  13. image 微信小程序flex_第三天学习微信小程序开发总结
  14. 2008年6月6日今天终于调回公司本部啦,记录历史的一天。
  15. 单区域——OSPF 讲解+配置命令(为了做双机热备实验)
  16. 计算机编程的经典书籍(强烈推荐)
  17. Visual Studio 2017 下创建ASP.NET网站程序详细步骤
  18. 人机交互技术的发展趋势是怎样的?
  19. java work stealing_工作窃取(work-stealing)算法
  20. Java算法——加一(LeetCode第66题)

热门文章

  1. 应用无法更新,新版本的的版本号(VersionCode)低于之前的版本
  2. 2022-2028年中国纺织机械行业投资分析及前景预测报告(全卷)
  3. 2017年区块链行业年度特别报告
  4. 【每日早报】2019/12/26
  5. Photoshop中怎么画虚线
  6. 快餐店运行模拟C++程序源码代写
  7. linux7.4重启udev命令,Linux 7.x 中 UDEV 生效的方法
  8. 代理死神的英雄之路--《境界》
  9. MySQL查询每天每周每月每季每年的数据等常用统计查询
  10. 4.基于scrapy的实时电影爬虫开发