400多行Python代码实现了一个FTP服务器
Python版本
实现了比之前的xxftp更多更完善的功能
1、继续支持多用户
2、继续支持虚拟目录
3、增加支持用户根目录以及映射虚拟目录的权限设置
4、增加支持限制用户根目录或者虚拟目录的空间大小
xxftp的特点
1、开源、跨平台
2、简单、易用
3、不需要数据库
4、可扩展性超强
5、你可以免费使用xxftp假设自己的私人FTP服务器
测试地址
ftp://xiaoxia.org
匿名帐号可以使用!
匿名根目录只读,映射了一个虚拟目录,可以上传文件但不允许更改!
使用方法
跟之前用C语言写的xxftp使用方法一样:
- Create a root directory to hold the user directories.
Configure it in config.xml. - Create user directories under the root directory.
If you want to specify a password, create a directory named “.xxftp”,
under which create a text file named “password” containing the MD5
code of the password. - If you want to specify the welcome and goodbye message, write it in
xxftp.welcome and xxftp.goodbye under the root directory. - Configure config.xml.
The structure of your FTP server root may be like:
-/root
-xxftp.welcome
-xxftp.goodbye
-user1
-.xxftp
-password
-…
-user2
-.xxftp
-password
-…
-anonymous源代码
复制代码 代码如下:
import socket, threading, os, sys, time
import hashlib, platform, stat
listen_ip = “localhost”
listen_port = 21
conn_list = []
root_dir = “./home”
max_connections = 500
conn_timeout = 120
class FtpConnection(threading.Thread):
def init(self, fd):
threading.Thread.init(self)
self.fd = fd
self.running = True
self.setDaemon(True)
self.alive_time = time.time()
self.option_utf8 = False
self.identified = False
self.option_pasv = True
self.username = “”
def process(self, cmd, arg):
cmd = cmd.upper();
if self.option_utf8:
arg = unicode(arg, “utf8”).encode(sys.getfilesystemencoding())
print “<<”, cmd, arg, self.fd
Ftp Command
if cmd == “BYE” or cmd == “QUIT”:
if os.path.exists(root_dir + “/xxftp.goodbye”):
self.message(221, open(root_dir + “/xxftp.goodbye”).read())
else:
self.message(221, “Bye!”)
self.running = False
return
elif cmd == “USER”:
Set Anonymous User
if arg == “”: arg = “anonymous”
for c in arg:
if not c.isalpha() and not c.isdigit() and c!="_":
self.message(530, “Incorrect username.”)
return
self.username = arg
self.home_dir = root_dir + “/” + self.username
self.curr_dir = “/”
self.curr_dir, self.full_path, permission, self.vdir_list, \
limit_size, is_virtual = self.parse_path("/")
if not os.path.isdir(self.home_dir):
self.message(530, “User " + self.username + " not exists.”)
return
self.pass_path = self.home_dir + “/.xxftp/password”
if os.path.isfile(self.pass_path):
self.message(331, "Password required for " + self.username)
else:
self.message(230, “Identified!”)
self.identified = True
return
elif cmd == “PASS”:
if open(self.pass_path).read() == hashlib.md5(arg).hexdigest():
self.message(230, “Identified!”)
self.identified = True
else:
self.message(530, “Not identified!”)
self.identified = False
return
elif not self.identified:
self.message(530, “Please login with USER and PASS.”)
return
self.alive_time = time.time()
finish = True
if cmd == “NOOP”:
self.message(200, “ok”)
elif cmd == “TYPE”:
self.message(200, “ok”)
elif cmd == “SYST”:
self.message(200, “UNIX”)
elif cmd == “EPSV” or cmd == “PASV”:
self.option_pasv = True
try:
self.data_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.data_fd.bind((listen_ip, 0))
self.data_fd.listen(1)
ip, port = self.data_fd.getsockname()
if cmd == “EPSV”:
self.message(229, “Entering Extended Passive Mode (|||” + str(port) + “|)”)
else:
ipnum = socket.inet_aton(ip)
self.message(227, “Entering Passive Mode (%s,%u,%u).” %
(",".join(ip.split(".")), (port>>8&0xff), (port&0xff)))
except:
self.message(500, “failed to create data socket.”)
elif cmd == “EPRT”:
self.message(500, “implement EPRT later…”)
elif cmd == “PORT”:
self.option_pasv = False
self.data_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s = arg.split(",")
self.data_ip = “.”.join(s[:4])
self.data_port = int(s[4])*256 + int(s[5])
self.message(200, “ok”)
elif cmd == “PWD” or cmd == “XPWD”:
if self.curr_dir == “”: self.curr_dir = “/”
self.message(257, ‘"’ + self.curr_dir + ‘"’)
elif cmd == “LIST” or cmd == “NLST”:
if arg != “” and arg[0] == “-”: arg = “” # omit parameters
remote, local, perm, vdir_list, limit_size, is_virtual = self.parse_path(arg)
if not os.path.exists(local):
self.message(550, “failed.”)
return
if not self.establish(): return
self.message(150, “ok”)
for v in vdir_list:
f = v[0]
if self.option_utf8:
f = unicode(f, sys.getfilesystemencoding()).encode(“utf8”)
if cmd == “NLST”:
info = f + “\r\n”
else:
info = “d%s%s------- %04u %8s %8s %8lu %s %s\r\n” % (
“r” if “read” in perm else “-”,
“w” if “write” in perm else “-”,
1, “0”, “0”, 0,
time.strftime("%b %d %Y", time.localtime(time.time())),
f)
self.data_fd.send(info)
for f in os.listdir(local):
if f[0] == “.”: continue
path = local + “/” + f
if self.option_utf8:
f = unicode(f, sys.getfilesystemencoding()).encode(“utf8”)
if cmd == “NLST”:
info = f + “\r\n”
else:
st = os.stat(path)
info = “%s%s%s------- %04u %8s %8s %8lu %s %s\r\n” % (
“-” if os.path.isfile(path) else “d”,
“r” if “read” in perm else “-”,
“w” if “write” in perm else “-”,
1, “0”, “0”, st[stat.ST_SIZE],
time.strftime("%b %d %Y", time.localtime(st[stat.ST_MTIME])),
f)
self.data_fd.send(info)
self.message(226, "Limit size: " + str(limit_size))
self.data_fd.close()
self.data_fd = 0
elif cmd == “REST”:
self.file_pos = int(arg)
self.message(250, “ok”)
elif cmd == “FEAT”:
features = “211-Features:\r\nSITES\r\nEPRT\r\nEPSV\r\nMDTM\r\nPASV\r\n”\
“REST STREAM\r\nSIZE\r\nUTF8\r\n211 End\r\n”
self.fd.send(features)
elif cmd == “OPTS”:
arg = arg.upper()
if arg == “UTF8 ON”:
self.option_utf8 = True
self.message(200, “ok”)
elif arg == “UTF8 OFF”:
self.option_utf8 = False
self.message(200, “ok”)
else:
self.message(500, “unrecognized option”)
elif cmd == “CDUP”:
finish = False
arg = “…”
else:
finish = False
if finish: return
Parse argument ( It’s a path )
if arg == “”:
self.message(500, “where’s my argument?”)
return
remote, local, permission, vdir_list, limit_size, is_virtual = \
self.parse_path(arg)
can not do anything to virtual directory
if is_virtual: permission = “none”
can_read, can_write, can_modify = “read” in permission, “write” in permission,
“modify” in permission
newpath = local
try:
if cmd == “CWD”:
if(os.path.isdir(newpath)):
self.curr_dir = remote
self.full_path = newpath
self.message(250, ‘"’ + remote + ‘"’)
else:
self.message(550, “failed”)
elif cmd == “MDTM”:
if os.path.exists(newpath):
self.message(213, time.strftime("%Y%m%d%I%M%S", time.localtime(
os.path.getmtime(newpath))))
else:
self.message(550, “failed”)
elif cmd == “SIZE”:
self.message(231, os.path.getsize(newpath))
elif cmd == “XMKD” or cmd == “MKD”:
if not can_modify:
self.message(550, “permission denied.”)
return
os.mkdir(newpath)
self.message(250, “ok”)
elif cmd == “RNFR”:
if not can_modify:
self.message(550, “permission denied.”)
return
self.temp_path = newpath
self.message(350, "rename from " + remote)
elif cmd == “RNTO”:
os.rename(self.temp_path, newpath)
self.message(250, "RNTO to " + remote)
elif cmd == “XRMD” or cmd == “RMD”:
if not can_modify:
self.message(550, “permission denied.”)
return
os.rmdir(newpath)
self.message(250, “ok”)
elif cmd == “DELE”:
if not can_modify:
self.message(550, “permission denied.”)
return
os.remove(newpath)
self.message(250, “ok”)
elif cmd == “RETR”:
if not os.path.isfile(newpath):
self.message(550, “failed”)
return
if not can_read:
self.message(550, “permission denied.”)
return
if not self.establish(): return
self.message(150, “ok”)
f = open(newpath, “rb”)
while self.running:
self.alive_time = time.time()
data = f.read(8192)
if len(data) == 0: break
self.data_fd.send(data)
f.close()
self.data_fd.close()
self.data_fd = 0
self.message(226, “ok”)
elif cmd == “STOR” or cmd == “APPE”:
if not can_write:
self.message(550, “permission denied.”)
return
if os.path.exists(newpath) and not can_modify:
self.message(550, “permission denied.”)
return
Check space size remained!
used_size = 0
if limit_size > 0:
used_size = self.get_dir_size(os.path.dirname(newpath))
if not self.establish(): return
self.message(150, “ok”)
f = open(newpath, (“ab” if cmd == “APPE” else “wb”) )
while self.running:
self.alive_time = time.time()
data = self.data_fd.recv(8192)
if len(data) == 0: break
if limit_size > 0:
used_size = used_size + len(data)
if used_size > limit_size: break
f.write(data)
f.close()
self.data_fd.close()
self.data_fd = 0
if limit_size > 0 and used_size > limit_size:
self.message(550, “Exceeding user space limit: " + str(limit_size) + " bytes”)
else:
self.message(226, “ok”)
else:
self.message(500, cmd + " not implemented")
except:
self.message(550, “failed.”)
def establish(self):
if self.data_fd == 0:
self.message(500, “no data connection”)
return False
if self.option_pasv:
fd = self.data_fd.accept()[0]
self.data_fd.close()
self.data_fd = fd
else:
try:
self.data_fd.connect((self.data_ip, self.data_port))
except:
self.message(500, “failed to establish data connection”)
return False
return True
def read_virtual(self, path):
vdir_list = []
path = path + “/.xxftp/virtual”
if os.path.isfile(path):
for v in open(path, “r”).readlines():
items = v.split()
items[1] = items[1].replace("$root", root_dir)
vdir_list.append(items)
return vdir_list
def get_dir_size(self, folder):
size = 0
for path, dirs, files in os.walk(folder):
for f in files:
size += os.path.getsize(os.path.join(path, f))
return size
def read_size(self, path):
size = 0
path = path + “/.xxftp/size”
if os.path.isfile(path):
size = int(open(path, “r”).readline())
return size
def read_permission(self, path):
permission = “read,write,modify”
path = path + “/.xxftp/permission”
if os.path.isfile(path):
permission = open(path, “r”).readline()
return permission
def parse_path(self, path):
if path == “”: path = “.”
if path[0] != “/”:
path = self.curr_dir + “/” + path
s = os.path.normpath(path).replace("\", “/”).split("/")
local = self.home_dir
reset directory permission
vdir_list = self.read_virtual(local)
limit_size = self.read_size(local)
permission = self.read_permission(local)
remote = “”
is_virtual = False
for name in s:
name = name.lstrip(".")
if name == “”: continue
remote = remote + “/” + name
is_virtual = False
for v in vdir_list:
if v[0] == name:
permission = v[2]
local = v[1]
limit_size = self.read_size(local)
is_virtual = True
if not is_virtual: local = local + “/” + name
vdir_list = self.read_virtual(local)
return (remote, local, permission, vdir_list, limit_size, is_virtual)
def run(self):
‘’’ Connection Process ‘’’
try:
if len(conn_list) > max_connections:
self.message(500, “too many connections!”)
self.fd.close()
self.running = False
return
Welcome Message
if os.path.exists(root_dir + “/xxftp.welcome”):
self.message(220, open(root_dir + “/xxftp.welcome”).read())
else:
self.message(220, “xxftp(Python) www.xiaoxia.org”)
Command Loop
line = “”
while self.running:
data = self.fd.recv(4096)
if len(data) == 0: break
line += data
if line[-2:] != “\r\n”: continue
line = line[:-2]
space = line.find(" ")
if space == -1:
self.process(line, “”)
else:
self.process(line[:space], line[space+1:])
line = “”
except:
print “error”, sys.exc_info()
self.running = False
self.fd.close()
print “connection end”, self.fd, “user”, self.username
def message(self, code, s):
‘’’ Send Ftp Message ‘’’
s = str(s).replace("\r", “”)
ss = s.split("\n")
if len(ss) > 1:
r = (str(code) + “-”) + ("\r\n" + str(code) + “-”).join(ss[:-1])
r += “\r\n” + str(code) + " " + ss[-1] + “\r\n”
else:
r = str(code) + " " + ss[0] + “\r\n”
if self.option_utf8:
r = unicode(r, sys.getfilesystemencoding()).encode(“utf8”)
self.fd.send®
def server_listen():
global conn_list
listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_fd.bind((listen_ip, listen_port))
listen_fd.listen(1024)
conn_lock = threading.Lock()
print "ftpd is listening on ", listen_ip + “:” + str(listen_port)
while True:
conn_fd, remote_addr = listen_fd.accept()
print "connection from ", remote_addr, “conn_list”, len(conn_list)
conn = FtpConnection(conn_fd)
conn.start()
conn_lock.acquire()
conn_list.append(conn)
check timeout
try:
curr_time = time.time()
for conn in conn_list:
if int(curr_time - conn.alive_time) > conn_timeout:
if conn.running == True:
conn.fd.shutdown(socket.SHUT_RDWR)
conn.running = False
conn_list = [conn for conn in conn_list if conn.running]
except:
print sys.exc_info()
conn_lock.release()
def main():
server_listen()
if name == “main”:
main()
400多行Python代码实现了一个FTP服务器相关推荐
- python写一个游戏多少代码-使用50行Python代码从零开始实现一个AI平衡小游戏
集智导读: 本文会为大家展示机器学习专家 Mike Shi 如何用 50 行 Python 代码创建一个 AI,使用增强学习技术,玩耍一个保持杆子平衡的小游戏.所用环境为标准的 OpenAI Gym, ...
- python换脸完整程序_小 200 行 Python 代码做了一个换脸程序
原标题:小 200 行 Python 代码做了一个换脸程序 简介 在这篇文章中我将介绍如何写一个简短(200行)的 Python 脚本,来自动地将一幅图片的脸替换为另一幅图片的脸. 这个过程分四步: ...
- 50行python游戏代码_使用50行Python代码从零开始实现一个AI平衡小游戏
使用50行Python代码从零开始实现一个AI平衡小游戏 发布时间:2020-10-23 09:26:14 来源:脚本之家 阅读:74 集智导读: 本文会为大家展示机器学习专家 Mike Shi 如何 ...
- python程序30行_30行Python代码,打造一个简单的微信群聊助手,简单方便
大家都知道,最近代码君迷上了Python,一直在研究这门语言,还是那句话,人生苦短,我学Python,今天代码君要教大家一个黑科技,30行代码实现自己定制的微信群聊助手,这个助手有什么用呐,就是用来活 ...
- python50行小游戏_使用50行Python代码从零开始实现一个AI平衡小游戏
集智导读: 本文会为大家展示机器学习专家 Mike Shi 如何用 50 行 Python 代码创建一个 AI,使用增强学习技术,玩耍一个保持杆子平衡的小游戏.所用环境为标准的 OpenAI Gym, ...
- 46 行 Python 代码,做一个会旋转的三维甜甜圈!
摘要:在三维渲染技术中,符号距离函数很难理解,而本文作者仅用 46 行 Python 代码就展示了这项技术. 链接:https://vgel.me/posts/donut/ 声明:本文为 CSDN ...
- python社区微信群_30行Python代码,打造一个简单的微信群聊助手,简单方便
大家都知道,最近代码君迷上了Python,一直在研究这门语言,还是那句话,人生苦短,我学Python,今天代码君要教大家一个黑科技,30行代码实现自己定制的微信群聊助手,这个助手有什么用呐,就是用来活 ...
- 小 200 行 Python 代码做了一个换脸程序
本文转自"伯乐在线",英文出处:change faces . 简介 在这篇文章中我将介绍如何写一个简短(200行)的 Python 脚本,来自动地将一幅图片的脸替换为另一幅图片的脸 ...
- 30行Python代码,打造一个微信群聊助手~
大家都知道,最近代码君迷上了Python,一直在研究这门语言,还是那句话,人生苦短,我学Python,今天代码君要教大家一个黑科技,30行代码实现自己定制的微信群聊助手,这个助手有什么用呐,就是用来活 ...
最新文章
- 经典:盘点80后男人找老婆的20条标准
- 统计学习笔记(1) 监督学习概论(1)
- Oracle收集用户的权限
- [转]SQL Collation冲突解决 临时表
- 【Axure高保真原型】移动端地图模板
- 基于自适应决策算子的鲸鱼优化算法-附代码
- UI设计需要使用哪些软件?推荐这5款
- gcc编译优化-O0 -O1 -O2 -O3 -OS解析
- 软件工程师必须掌握的知识结构
- 意念控制头环:用脑电波来操控智能家居
- 吉他大横按的学习技巧总结
- 华三c语言笔试,H3C笔试及答案解析
- 抖音最疯狂的辞职代码,你下载了吗?
- MCU 上电复位功能的使用注意点
- PowerPoint文档“大减肥”(downmoon)
- 声纹识别概述(1)初识
- 工信部就垃圾短信问题约谈7家虚拟运营商
- html写文章发布,写文章.html
- Python热点舆情数据挖掘
- 阿里云code和git管理项目