日常生活中常会遇到一些小任务,如果人工处理会很麻烦。

用python做些小脚本处理,能够提高不少效率。或者可以把python当工具使用,辅助提高一下办公效率。(比如我常拿python当计算器,计算和字符转换用)

以下总结下个人用到的一些python小脚本留作备忘。

打印16进制字符串

用途:通信报文中的hex数据不好看,可以打印为16进制的字符串显示出来。

#coding=utf-8
#name: myutil.py
def print_hex1(s,prev='0x'):for c in s:print '%s%02x' %(prev,ord(c)),print
def print_hex(s):for c in s:print '%02x' %(ord(c)),print
print 'myutil'def print_hex3(s,prev='0x'):i = 0for c in s:print '%s%s,' %(prev,s[i:i+2]),i += 2print

文件合并

之前搞单片机时生成的hex应用程序文件不能直接刷到单片机里,还需要把iap程序合并成一个文件才能烧写到单片机。每次打包麻烦,做个脚本处理:

#path='C:\\Users\\test\\IAP_CZ_v204w.hex'
#file=open(path,'r')
#for ll in file.readlines()
#    print ll
#coding=gb18030
import time
import os
def prr():print 'file combination begin..'path0=os.getcwd()
print path0
path=path0
#path1=path0
path2=path0
path+='\\IAP_CZ_v204w.hex'
#path1+='\\NC_armStaSystem.hex'
path2+='\\'
print path
s=raw_input('enter file path:')
path1=s
#path1+='\\NC_armStaSystem.hex'
print path1
s=raw_input('enter file name:')
path2+=s
path2+=time.strftime('_%y%m%d%H%M%S')
path2+='.hex'
print path2
prr()
try:f1=open(path,'r')count=0for l in f1.readlines():#    print lcount+=1    #print countf1.close()f1=open(path,'r')f2=open(path1,'r')f3=open(path2,'w')while(count>1):l=f1.readline()#   print lf3.write(l)count-=1#   print countf3.flush()for l in f2.readlines():f3.write(l)f3.flush()f3.close()print 'combination success!'
except Exception,ex:print 'excettion occured!'print exs=raw_input('press any key to continue...')
finally:f1.close()f2.close()s=raw_input('press any key to continue...')  

多线程下载图集

网上好看的动漫图集,如果手工下载太费时了。简单分析下网页地址规律,写个多线程脚本搞定。

#!/usr/bin/python
# -*- coding: utf-8 -*-
# filename: paxel.py'''It is a multi-thread downloading toolIt was developed follow axel.Author: volansE-mail: volansw [at] gmail.com
'''import sys
import os
import time
import urllib
from threading import Threadlocal_proxies = {'http': 'http://131.139.58.200:8080'}class AxelPython(Thread, urllib.FancyURLopener):'''Multi-thread downloading class.run() is a vitural method of Thread.'''def __init__(self, threadname, url, filename, ranges=0, proxies={}):Thread.__init__(self, name=threadname)urllib.FancyURLopener.__init__(self, proxies)self.name = threadnameself.url = urlself.filename = filenameself.ranges = rangesself.downloaded = 0def run(self):'''vertual function in Thread'''try:self.downloaded = os.path.getsize( self.filename )except OSError:#print 'never downloaded'self.downloaded = 0# rebuild start poindself.startpoint = self.ranges[0] + self.downloaded# This part is completedif self.startpoint >= self.ranges[1]:print 'Part %s has been downloaded over.' % self.filenamereturnself.oneTimeSize = 16384 #16kByte/timeprint 'task %s will download from %d to %d' % (self.name, self.startpoint, self.ranges[1])self.addheader("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1]))self.urlhandle = self.open( self.url )data = self.urlhandle.read( self.oneTimeSize )while data:filehandle = open( self.filename, 'ab+' )filehandle.write( data )filehandle.close()self.downloaded += len( data )#print "%s" % (self.name)#progress = u'\r...'data = self.urlhandle.read( self.oneTimeSize )def GetUrlFileSize(url, proxies={}):urlHandler = urllib.urlopen( url, proxies=proxies )headers = urlHandler.info().headerslength = 0for header in headers:if header.find('Length') != -1:length = header.split(':')[-1].strip()length = int(length)return lengthdef SpliteBlocks(totalsize, blocknumber):blocksize = totalsize/blocknumberranges = []for i in range(0, blocknumber-1):ranges.append((i*blocksize, i*blocksize +blocksize - 1))ranges.append(( blocksize*(blocknumber-1), totalsize -1 ))return ranges
def islive(tasks):for task in tasks:if task.isAlive():return Truereturn Falsedef paxel(url, output, blocks=6, proxies=local_proxies):''' paxel'''size = GetUrlFileSize( url, proxies )ranges = SpliteBlocks( size, blocks )threadname = [ "thread_%d" % i for i in range(0, blocks) ]filename = [ "tmpfile_%d" % i for i in range(0, blocks) ]tasks = []for i in range(0,blocks):task = AxelPython( threadname[i], url, filename[i], ranges[i] )task.setDaemon( True )task.start()tasks.append( task )time.sleep( 2 )while islive(tasks):downloaded = sum( [task.downloaded for task in tasks] )process = downloaded/float(size)*100show = u'\rFilesize:%d Downloaded:%d Completed:%.2f%%' % (size, downloaded, process)sys.stdout.write(show)sys.stdout.flush()time.sleep( 0.5 )filehandle = open( output, 'wb+' )for i in filename:f = open( i, 'rb' )filehandle.write( f.read() )f.close()try:os.remove(i)passexcept:passfilehandle.close()if __name__ == '__main__':url = "http://xz1.mm667.com/xz84/images/001.jpg"output = '001.jpg'paxel( url, output, blocks=4, proxies={} )

多线程下载图片

多线程下载图片并存储到指定目录中,若目录不存在则自动创建。

# -*- coding: UTF-8 -*-
'''
import re
import urllib
urls='http://xz5.mm667.com/xz82/images/01.jpg'
def getHtml(url):page = urllib.urlopen(url)html = page.read()return htmldef getImg(html):reg = r'src="(.+?\.jpg)" pic_ext'imgre = re.compile(reg)imglist = imgre.findall(html)x = 0for imgurl in imglist:urllib.urlretrieve(imgurl,'%s.jpg' % x)x = x + 1html = getHtml("http://tieba.baidu.com/p/2460150866")
getImg(html)
'''
import re
import urllib
import threading
import time
import socket
socket.setdefaulttimeout(30)
urls=[]
j=0
for i in xrange(1,81):if (i-1)%4 == 0:j += 1if ((j-1)%5) == 0 :j=1site='http://xz%d.mm667.com/xz%02d/images/' %(j,i)urls.append(site)print urls[i-1]
#print urls
'''
urls.append('http://xz1.mm667.com/xz01/images/')
urls.append('http://xz1.mm667.com/xz02/images/')
urls.append('http://xz1.mm667.com/xz03/images/')
urls.append('http://xz1.mm667.com/xz04/images/')urls.append('http://xz1.mm667.com/xz84/images/')
urls.append('http://xz2.mm667.com/xz85/images/')
urls.append('http://xz3.mm667.com/xz86/images/')
urls.append('http://xz1.mm667.com/s/')
urls.append('http://xz1.mm667.com/p/')
'''
def mkdir(path):# 引入模块import os# 去除首位空格path=path.strip()# 去除尾部 \ 符号path=path.rstrip("\\")# 判断路径是否存在# 存在     True# 不存在   FalseisExists=os.path.exists(path)# 判断结果if not isExists:# 如果不存在则创建目录print path+u' 创建成功'# 创建目录操作函数os.makedirs(path)return Trueelse:# 如果目录存在则不创建,并提示目录已存在print path+u' 目录已存在'return Falsedef cbk(a,b,c):'''''回调函数@a: 已经下载的数据块@b: 数据块的大小@c: 远程文件的大小'''per = 100.0 * a * b / cif per > 100:per = 100print '%.2f%%' % per#url = 'http://www.sina.com.cn'
local = 'd:\\mysite\\pic1\\'
d=0
mutex = threading.Lock()
# mutex1 = threading.Lock()
class MyThread(threading.Thread):def __init__(self, url, name):threading.Thread.__init__(self)self.url=urlself.name=namedef run(self):mutex.acquire()printprint 'down from %s' % self.urltime.sleep(1)mutex.release()try:urllib.urlretrieve(self.url, self.name)except Exception,e:print etime.sleep(1)urllib.urlretrieve(self.url, self.name)
threads=[] for u in urls[84:]:d += 1local = 'd:\\mysite\\pic1\\%d\\' %dmkdir(local)print 'download begin...'for i in xrange(40):lcal = localurl=uurl += '%03d.jpg' %ilcal += '%03d.jpg' %ith = MyThread(url,lcal)threads.append(th)th.start()
# for t in threads:
#     t.join()
print 'over! download finished'

爬虫抓取信息

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Python爬虫,抓取一卡通相关企业信息
Anthor: yangyongzhen
Version: 0.0.2
Date: 2014-12-14
Language: Python2.7.5
Editor: Sublime Text2
"""import urllib2, re, string
import threading, Queue, time
import sys
import os
from bs4 import BeautifulSoup
#from pprint import pprintreload(sys)
sys.setdefaultencoding('utf8')
_DATA = []
FILE_LOCK = threading.Lock()
SHARE_Q = Queue.Queue()  #构造一个不限制大小的的队列
_WORKER_THREAD_NUM = 3  #设置线程的个数_Num = 0 #总条数
class MyThread(threading.Thread) :def __init__(self, func,num) :super(MyThread, self).__init__()  #调用父类的构造函数self.func = func  #传入线程函数逻辑self.thread_num = num  def run(self) :self.func()#print u'线程ID:',self.thread_numdef worker() :global SHARE_Qwhile not SHARE_Q.empty():url = SHARE_Q.get() #获得任务my_page = get_page(url)find_data(my_page)  #获得当前页面的数据#write_into_file(temp_data)time.sleep(1)SHARE_Q.task_done()def get_page(url) :"""根据所给的url爬取网页HTMLArgs: url: 表示当前要爬取页面的urlReturns:返回抓取到整个页面的HTML(unicode编码)Raises:URLError:url引发的异常"""try :html = urllib2.urlopen(url).read()my_page = html.decode("gbk",'ignore')#my_page = unicode(html,'utf-8','ignore').encode('utf-8','ignore')#my_page = urllib2.urlopen(url).read().decode("utf8")except urllib2.URLError, e :if hasattr(e, "code"):print "The server couldn't fulfill the request."print "Error code: %s" % e.codeelif hasattr(e, "reason"):print "We failed to reach a server. Please check your url and read the Reason"print "Reason: %s" % e.reasonreturn my_pagedef find_data(my_page) :"""通过返回的整个网页HTML, 正则匹配名称Args:my_page: 传入页面的HTML文本用于正则匹配"""global _Numtemp_data = []items = BeautifulSoup(my_page).find_all("div", style="width:96%;margin:10px;border-bottom:1px #CCC dashed;padding-bottom:10px;")for index, item in enumerate(items) :#print item#print item.h1#print h.group()#temp_data.append(item)#print item.find(re.compile("^a"))href = item.find(re.compile("^a"))#soup = BeautifulSoup(item)#公司名称if item.a:data = item.a.string.encode("gbk","ignore")print datatemp_data.append(data)goods = item.find_all("div", style="font-size:12px;")#经营产品与联系方式for i in goods:data = i.get_text().encode("gbk","ignore")temp_data.append(data)print data#b = item.find_all("b")#print b#链接地址pat = re.compile(r'href="([^"]*)"')h = pat.search(str(item))if h:#print h.group(0)href = h.group(1)print hreftemp_data.append(h.group(1))_Num += 1#b = item.find_all(text=re.compile("Dormouse"))#pprint(goods)#print href#pat = re.compile(r'title="([^"]*)"')#h = pat.search(str(href))#if h:#print h.group(1)#temp_data.append(h.group(1))_DATA.append(temp_data)#headers = {'User-Agent':"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}##浏览器请求头(大部分网站没有这个请求头会报错、请务必加上哦)
#all_url = 'http://www.mzitu.com/all'  ##开始的URL地址
#start_html = requests.get(all_url,  headers=headers)  ##使用requests中的get方法来获取all_url(就是:http://www.mzitu.com/all这个地址)的内容 headers为上面设置的请求头、请务必参考requests官方文档解释
#print(start_html.text) ##打印出start_html (请注意,concent是二进制的数据,一般用于下载图片、视频、音频、等多媒体内容是才使用concent, 对于打印网页内容请使用text)def main() :global SHARE_Qthreads = []start = time.clock()douban_url = "http://company.yktworld.com/comapny_search.asp?page={page}"#向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务for index in xrange(20) :   SHARE_Q.put(douban_url.format(page = index * 1))for i in xrange(_WORKER_THREAD_NUM) :thread = MyThread(worker,i)thread.start()  #线程开始处理任务threads.append(thread)for thread in threads :thread.join()SHARE_Q.join()i = 0with open("down.txt", "w+") as my_file :for page in _DATA :i += 1for name in page:my_file.write(name + "\n")print "Spider Successful!!!"end = time.clock()print u'抓取完成!'print u'总页数:',iprint u'总条数:',_Numprint u'一共用时:',end-start,u'秒'if __name__ == '__main__':main()

爬虫多线程下载电影名称

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Python爬虫
Anthor: yangyongzhen
Version: 0.0.2
Date: 2014-12-14
Language: Python2.7.8
Editor: Sublime Text2
"""import urllib2, re, string
import threading, Queue, time
import sys
import os
from bs4 import BeautifulSoupreload(sys)
sys.setdefaultencoding('utf8')
_DATA = []
FILE_LOCK = threading.Lock()
SHARE_Q = Queue.Queue()  #构造一个不限制大小的的队列
_WORKER_THREAD_NUM = 3  #设置线程的个数rootpath = os.getcwd()+u'/抓取的内容/'def makedir(path):if not os.path.isdir(path):os.makedirs(path)#创建抓取的根目录
#makedir(rootpath)
#显示下载进度
def Schedule(a,b,c):'''''a:已经下载的数据块b:数据块的大小c:远程文件的大小'''per = 100.0 * a * b / cif per > 100 :per = 100print '%.2f%%' % per
class MyThread(threading.Thread) :def __init__(self, func) :super(MyThread, self).__init__()  #调用父类的构造函数self.func = func  #传入线程函数逻辑def run(self) :self.func()def worker() :print 'work thread start...\n'global SHARE_Qwhile not SHARE_Q.empty():url = SHARE_Q.get() #获得任务my_page = get_page(url)find_title(my_page)  #获得当前页面的电影名#write_into_file(temp_data)time.sleep(1)SHARE_Q.task_done()def get_page(url) :"""根据所给的url爬取网页HTMLArgs: url: 表示当前要爬取页面的urlReturns:返回抓取到整个页面的HTML(unicode编码)Raises:URLError:url引发的异常"""try :html = urllib2.urlopen(url).read()my_page = html.decode("utf8")#my_page = unicode(html,'utf-8','ignore').encode('utf-8','ignore')#my_page = urllib2.urlopen(url).read().decode("utf8")except urllib2.URLError, e :if hasattr(e, "code"):print "The server couldn't fulfill the request."print "Error code: %s" % e.codeelif hasattr(e, "reason"):print "We failed to reach a server. Please check your url and read the Reason"print "Reason: %s" % e.reasonreturn my_pagedef find_title(my_page) :"""通过返回的整个网页HTML, 正则匹配前100的电影名称Args:my_page: 传入页面的HTML文本用于正则匹配"""temp_data = []movie_items = BeautifulSoup(my_page).findAll('h1')for index, item in enumerate(movie_items) :#print item#print item.h1pat = re.compile(r'href="([^"]*)"')h = pat.search(str(item))if h:#print h.group(0)href = h.group(1)print hreftemp_data.append(h.group(1))#print h.group()#temp_data.append(item)#print item.find(re.compile("^a"))href = item.find(re.compile("^a"))#soup = BeautifulSoup(item)if item.a:#print item.a.stringtemp_data.append(item.a.string)#print href#pat = re.compile(r'title="([^"]*)"')#h = pat.search(str(href))#if h:#print h.group(1)#temp_data.append(h.group(1))_DATA.append(temp_data)def main() :global SHARE_Qthreads = []start = time.clock()douban_url = "http://movie.misszm.com/page/{page}"#向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务for index in xrange(5) :   SHARE_Q.put(douban_url.format(page = index * 1))for i in xrange(_WORKER_THREAD_NUM) :thread = MyThread(worker)thread.start()  #线程开始处理任务threads.append(thread)for thread in threads :thread.join()SHARE_Q.join()with open("movie.txt", "w+") as my_file :for page in _DATA :for movie_name in page:my_file.write(movie_name + "\n")print "Spider Successful!!!"end = time.clock()print u'抓取完成!'print u'一共用时:',end-start,u'秒'if __name__ == '__main__':main()

串口转tcp工具

#coding=utf-8
#author:yangyongzhen
#QQ:534117529
#'CardTest TcpServer  - Simple Test Card Tool 1.00' import sys,threading,time;
import serial;
import binascii,encodings;
import re;
import os;
from socket import *
from struct import *;
#from myutil import *;
#name: myutil.pymylock = threading.RLock() Server_IP = ''
Srever_Port = ''def print_hex1(s,prev='0x'):for c in s:print '%s%02x' %(prev,ord(c)),print
def print_hex(s):for c in s:print '%02x' %(ord(c)),printdef hexto_str(s):r =''for c in s:r += '%02x' %(ord(c))return r
def strto_hex(s):r = s.decode('hex')return r
#''代表服务器为localhost#在一个非保留端口号上进行监听class ComThread:def __init__(self, Port=0):self.l_serial = None;self.alive = False;self.waitEnd = None;self.port = Port;#TCP部分#self.sockobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.connection = None#数据self.snddata = ''self.rcvdata = ''def waiting(self):if not self.waitEnd is None:self.waitEnd.wait();def SetStopEvent(self):if not self.waitEnd is None:self.waitEnd.set();self.alive = False;self.stop();def start(self):self.l_serial = serial.Serial();self.l_serial.port = self.port;self.l_serial.baudrate = 115200;self.l_serial.timeout = 2;  #秒self.l_serial.open();if self.l_serial.isOpen():self.waitEnd = threading.Event();self.alive = True;print 'open serial port %d ok!\n' %(self.port+1)print 'baudrate:115200 \n'self.thread_read = None;self.thread_read = threading.Thread(target=self.FirstReader);self.thread_read.setDaemon(1);self.thread_read.start();self.thread_write = None;self.thread_write = threading.Thread(target=self.FirstWriter);self.thread_write.setDaemon(1);self.thread_write.start();#TCP部分self.thread_TcpClient = None;self.thread_TcpClient = threading.Thread(target=self.TcpClient);self.thread_TcpClient.setDaemon(1);self.thread_TcpClient.start();self.thread_TcpSend = None;self.thread_TcpSend = threading.Thread(target=self.TcpSend);self.thread_TcpSend.setDaemon(1);self.thread_TcpSend.start();return True;else:return False;def FirstReader(self):while self.alive:# 接收间隔time.sleep(0.1);try:data = '';n = self.l_serial.inWaiting();if n:data = data+self.l_serial.read(n);#for l in xrange(len(data)):#print '%02X' % ord(data[l]),# 发送数据print u'->请求:'print data;mylock.acquire() self.snddata = datamylock.release()#print_hex(data);# 判断结束except Exception, ex:print str(ex);self.waitEnd.set();self.alive = False;def FirstWriter(self):while self.alive:# 接收间隔time.sleep(0.1);try:#snddata = raw_input('\nenter data send:\n')if self.rcvdata!='':self.l_serial.write(self.rcvdata); print u'-<应答:'print self.rcvdata;mylock.acquire() self.rcvdata = '';mylock.release()#print_hex(snddata);except Exception, ex:print str(ex);self.waitEnd.set();self.alive = False;def TcpClient(self):while True:# 接收间隔time.sleep(0.1);self.connection = socket(AF_INET, SOCK_STREAM);self.connection.connect((Server_IP, int(Server_Port)));print 'Connect to Server OK!';self.snddata = ''self.rcvdata = ''while True:#读取客户端套接字的下一行data = self.connection.recv(1024)#如果没有数量的话,那么跳出循环if not data: break#发送一个回复至客户端mylock.acquire() self.snddata = ''self.rcvdata = datamylock.release()#connection.send('Echo=>' + data)self.connection.close()self.waitEnd.set();self.alive = False;def TcpSend(self):while True:# 接收间隔time.sleep(0.1);while True:time.sleep(0.1);try:if not self.connection is None:if self.snddata != '':self.connection.send(self.snddata)mylock.acquire() self.rcvdata = ''self.snddata = ''mylock.release()except Exception, ex:pass          def stop(self):self.alive = False;self.thread_read.join();if self.l_serial.isOpen():self.l_serial.close();#测试用部分
if __name__ == '__main__':print 'Serial to Tcp Tool 1.00\n' print 'Author:yangyongzhen\n'print 'QQ:534117529\n'print 'Copyright (c) **cap 2015-2016.\n'Server_IP = raw_input('please enter ServerIP:')print 'Server_IP: %s' %(Server_IP)Server_Port = raw_input('please enter ServerPort:')print 'Server_Port: %s' %(Server_Port)com =raw_input('please enter com port(1-9):')rt = ComThread(int(com)-1);try:if rt.start():rt.waiting();rt.stop();else:pass;            except Exception,se:print str(se);if rt.alive:rt.stop();os.system("pause")print '';print 'End OK .';del rt;

远程读卡器server端

很早之前做过一个远程读卡器工具,原理就是在现场客服电脑上装个python做的tcpserver服务端,操控现场的读卡器。在公司内部做个客户端连接过去,这样实现在公司调试现场的卡片业务。

这个就是服务端工具的实现:

#coding=utf-8
#author:yangyongzhen
#QQ:534117529
#'CardTest TcpServer  - Simple Test Card Tool 1.00' import sys,threading,time;
import serial;
import binascii,encodings;
import re;
import os;
from socket import *
from struct import *;
#from myutil import *;
#name: myutil.pymylock = threading.RLock() def print_hex1(s,prev='0x'):for c in s:print '%s%02x' %(prev,ord(c)),print
def print_hex(s):for c in s:print '%02x' %(ord(c)),printdef hexto_str(s):r =''for c in s:r += '%02x' %(ord(c))return r
def strto_hex(s):r = s.decode('hex')return r
#''代表服务器为localhost#在一个非保留端口号上进行监听class ComThread:def __init__(self, Port=0):self.l_serial = None;self.alive = False;self.waitEnd = None;self.port = Port;#TCP部分self.myHost = ''self.myPort = 5050self.sockobj = socket(AF_INET, SOCK_STREAM)self.connection = None#数据self.snddata = ''self.rcvdata = ''def waiting(self):if not self.waitEnd is None:self.waitEnd.wait();def SetStopEvent(self):if not self.waitEnd is None:self.waitEnd.set();self.alive = False;self.stop();def start(self):self.l_serial = serial.Serial();self.l_serial.port = self.port;self.l_serial.baudrate = 115200;self.l_serial.timeout = 2;  #秒self.l_serial.open();if self.l_serial.isOpen():self.waitEnd = threading.Event();self.alive = True;print 'open serial port %d ok!\n' %(self.port+1)print 'baudrate:115200 \n'self.thread_read = None;self.thread_read = threading.Thread(target=self.FirstReader);self.thread_read.setDaemon(1);self.thread_read.start();self.thread_write = None;self.thread_write = threading.Thread(target=self.FirstWriter);self.thread_write.setDaemon(1);self.thread_write.start();#TCP部分self.thread_TcpServer = None;self.thread_TcpServer = threading.Thread(target=self.TcpServer);self.thread_TcpServer.setDaemon(1);self.thread_TcpServer.start();self.thread_TcpSend = None;self.thread_TcpSend = threading.Thread(target=self.TcpSend);self.thread_TcpSend.setDaemon(1);self.thread_TcpSend.start();return True;else:return False;def FirstReader(self):while self.alive:# 接收间隔time.sleep(0.1);try:data = '';n = self.l_serial.inWaiting();if n:data = data+self.l_serial.read(n);#for l in xrange(len(data)):#print '%02X' % ord(data[l]),# 发送数据print 'serial recv:'print data;mylock.acquire() self.snddata = datamylock.release()#print_hex(data);# 判断结束except Exception, ex:print str(ex);self.waitEnd.set();self.alive = False;def FirstWriter(self):while self.alive:# 接收间隔time.sleep(0.1);try:#snddata = raw_input('\nenter data send:\n')if self.rcvdata!='':self.l_serial.write(self.rcvdata); print 'serial send:'print self.rcvdata;mylock.acquire() self.rcvdata = '';mylock.release()#print_hex(snddata);except Exception, ex:print str(ex);self.waitEnd.set();self.alive = False;def TcpServer(self):self.sockobj.bind((self.myHost, self.myPort))self.sockobj.listen(10)print 'TcpServer listen at 5050 oK!\n'print 'Waiting for connect...\n'while True:# 接收间隔time.sleep(0.1);self.connection, address = self.sockobj.accept()print 'Server connected by', addressself.snddata = ''self.rcvdata = ''try:while True:#读取客户端套接字的下一行data = self.connection.recv(1024)#如果没有数量的话,那么跳出循环if not data: break#发送一个回复至客户端mylock.acquire() self.snddata = ''self.rcvdata = datamylock.release()#connection.send('Echo=>' + data)self.connection.close()except Exception, ex:self.connection.close()self.waitEnd.set();self.alive = False;def TcpSend(self):while True:# 接收间隔time.sleep(0.1);while True:time.sleep(0.1);try:if not self.connection is None:if self.snddata != '':self.connection.send(self.snddata)mylock.acquire() self.rcvdata = ''self.snddata = ''mylock.release()except Exception, ex:pass          def stop(self):self.alive = False;self.thread_read.join();if self.l_serial.isOpen():self.l_serial.close();#测试用部分
if __name__ == '__main__':print 'CardTest TcpServer  - Simple Test Card Tool 1.00\n' print 'Author:yangyongzhen\n'print 'QQ:534117529\n'print 'Copyright (c) **** 2015-2016.\n'com =raw_input('please enter com port(1-9):')rt = ComThread(int(com)-1);try:if rt.start():rt.waiting();rt.stop();else:pass;            except Exception,se:print str(se);if rt.alive:rt.stop();os.system("pause")print '';print 'End OK .';del rt;

黑客rtcp反向链接

# -*- coding: utf-8 -*-'''
filename:rtcp.py
@desc:
利用python的socket端口转发,用于远程维护
如果连接不到远程,会sleep 36s,最多尝试200(即两小时)@usage:
./rtcp.py stream1 stream2
stream为:l:port或c:host:port
l:port表示监听指定的本地端口
c:host:port表示监听远程指定的端口@author: watercloud, zd, knownsec team
@web: www.knownsec.com, blog.knownsec.com
@date: 2009-7
'''import socket
import sys
import threading
import timestreams = [None, None]  # 存放需要进行数据转发的两个数据流(都是SocketObj对象)
debug = 1  # 调试状态 0 or 1def print_hex(s):for c in s:print '%02x' %(ord(c)),print
def _usage():print 'Usage: ./rtcp.py stream1 stream2\nstream : L:port  or C:host:port'def _get_another_stream(num):'''从streams获取另外一个流对象,如果当前为空,则等待'''if num == 0:num = 1elif num == 1:num = 0else:raise "ERROR"while True:if streams[num] == 'quit':print("can't connect to the target, quit now!")sys.exit(1)if streams[num] != None:return streams[num]else:time.sleep(1)def _xstream(num, s1, s2):'''交换两个流的数据num为当前流编号,主要用于调试目的,区分两个回路状态用。'''try:while True:#注意,recv函数会阻塞,直到对端完全关闭(close后还需要一定时间才能关闭,最快关闭方法是shutdow)buff = s1.recv(1024)if debug > 0:print num,"recv"if len(buff) == 0: #对端关闭连接,读不到数据print num,"one closed"breaks2.sendall(buff)if debug > 0:print num,"sendall"print_hex(buff)except :print num,"one connect closed."try:s1.shutdown(socket.SHUT_RDWR)s1.close()except:passtry:s2.shutdown(socket.SHUT_RDWR)s2.close()except:passstreams[0] = Nonestreams[1] = Noneprint num, "CLOSED"def _server(port, num):'''处理服务情况,num为流编号(第0号还是第1号)'''srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)srv.bind(('0.0.0.0', port))srv.listen(1)#print 'local listening at port %d' (%(port))while True:conn, addr = srv.accept()print "connected from:", addrstreams[num] = conn  # 放入本端流对象s2 = _get_another_stream(num)  # 获取另一端流对象_xstream(num, conn, s2)def _connect(host, port, num):'''  处理连接,num为流编号(第0号还是第1号)@note: 如果连接不到远程,会sleep 36s,最多尝试200(即两小时)'''not_connet_time = 0wait_time = 36try_cnt = 199while True:if not_connet_time > try_cnt:streams[num] = 'quit'print('not connected')return Noneconn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)try:conn.connect((host, port))except Exception, e:print ('can not connect %s:%s!' % (host, port))not_connet_time += 1time.sleep(wait_time)continueprint "connected to %s:%i" % (host, port)streams[num] = conn  #放入本端流对象s2 = _get_another_stream(num) #获取另一端流对象_xstream(num, conn, s2)if __name__ == '__main__':print 'Tcp to Tcp Tool 1.00\n' print 'Author:yangyongzhen\n'print 'QQ:534117529\n'print 'Copyright (c) Newcapec 2015-2016.\n'Server_IP = raw_input('please enter Server IP:')print 'Server_IP: %s' %(Server_IP)Server_Port = raw_input('please enter Server Port:')print 'Server_Port: %s' %(Server_Port)com =raw_input('please enter Local Port:')tlist = []  # 线程列表,最终存放两个线程对象#targv = [sys.argv[1], sys.argv[2] ]t = threading.Thread(target=_server, args=(int(com), 0))tlist.append(t)t = threading.Thread(target=_connect, args=(Server_IP, int(Server_Port), 1))tlist.append(t)for t in tlist:t.start()for t in tlist:t.join()sys.exit(0)

调用c的动态库示例

# -*- coding:utf8 -*-
from ctypes import *
from binascii import unhexlify as unhex
import os
dll = cdll.LoadLibrary('mydll.dll');print 'begin load mydll..'
#key
#str1='\x9B\xED\x98\x89\x15\x80\xC3\xB2'
str1=unhex('0000556677222238')
#data
str2=unhex('002d2000000100015566772222383CD881604D0D286A556677222238000020141214181427')
#output
str3='\x12\x34\x56\x78\x12\x34\x56\x78'
pstr1=c_char_p()
pstr2=c_char_p()
pstr3=c_char_p()
pstr1.value=str1
pstr2.value=str2
pstr3.value=str3
dll.CurCalc_DES_MAC64(805306481,pstr1,0,pstr2,13,pstr3)
print pstr1
print pstr2
print pstr3
stro= pstr3.value
print stro
strtemp=''
for c in stro:print "%02x" % (ord(c))strtemp+="{0:02x}".format(ord(c))
print strtemp
os.execlp("E:\\RSA.exe",'')
s=raw_input('press any key to continue...')

tcp的socket连接报文测试工具

# -*- coding: utf-8 -*-
import socket
from myutil import *
from binascii import unhexlify as unhex
from ctypes import *
dll = cdll.LoadLibrary('mydll.dll')
print 'begin load mydll..'
HOST, PORT = "192.168.51.28", 5800
sd ="1234567812345678"
# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:# Connect to server and send datasock.connect((HOST, int(PORT))print "Sent1 OK:"print sd# Receive data from the server and shut downreceived = sock.recv(1024)print "Received:"print_hex(received)print  'received len is 0x%02x' %(len(received))print  'received data analysis...'re1=received[0:4]print_hex(re1)re1=received[4:6]print_hex(re1)re1=received[6:10]print_hex(re1)re1=received[10:16]print_hex(re1)#pack2 sendsock.send(sd2.decode('hex'))print "Sent2 OK:"print sd2# Receive data from the server and shut downreceived1 = sock.recv(1024)print "Received1:"print_hex(received1)print  'received1 len is 0x%02x' %(len(received1))finally:sock.close()s=raw_input('press any key to continue...')

报文拼接与加解密测试

# -*- coding: gb2312 -*-
import socket
from myutil import *
from binascii import unhexlify as unhex
from ctypes import *
dll = cdll.LoadLibrary('mydll.dll')
print 'begin load mydll..'
#key
key='\xF1\xE2\xD3\xC4\xF1\xE2\xD3\xC4'
#output MAC
mac='\x00'*8
data='\x00'*8
pkey=c_char_p()
pdata=c_char_p()
pmac=c_char_p()
pkey.value=key
pdata.value=data
pmac.value=mac
#pack1
class pack:pass
pk=pack()
pk.len='00000032'
pk.ID='0001'
pk.slnum='00000004'
pk.poscode='123456781234'
pk.rand='1122334455667788'
pk.psam='313233343536'
pk.kind='0000'
pk.ver='000001'
pk.time='20140805135601'
pk.mac='06cc571e6d96e12d'data=unhex(pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time)
#print_hex(data)
pdata.value=data
#cacl MAC
dll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,42,pmac)
stro= pmac.value
strtemp=''
for c in stro:strtemp+="{0:02x}".format(ord(c))
#print strtemp
pk.mac=strtemp
#data to send
sd=pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time+pk.mac
print  'send1 len is 0x%02x' %(len(sd)/2)
print sd
#pack2
class pack2:pass
pk2=pack2()
pk2.len='0000006E'
pk2.ID='0012'
pk2.slnum='00000005'
pk2.fatCode='00'
pk2.cardASN='0000000000000000'
pk2.cardType='00'
pk2.userNO= '0000000000000000'pk2.fileName1='00000000000000000000000000000015'
pk2.dataLen1='00'
pk2.dataArea1='00000000000000319999990800FB2014080620240806FFFFFFFFFFFFFFFFFFFF'
pk2.fileName2='00000000000000000000000000000016'
pk2.dataLen2='00'
pk2.dataArea2='000003E800FFFF16'
pk2.mac='06cc571e6d96e12d'data2=unhex(pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2)pdata.value=data2
#cacl MAC
dll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,102,pmac)
stro= pmac.value
strtemp=''
for c in stro:strtemp+="{0:02x}".format(ord(c))
#print strtemp
pk2.mac=strtemp
#data to send
sd2=pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2+pk2.macprint  'send2 len is 0x%02x' %(len(sd2)/2)
print sd2#PORT="192.168.60.37"
#PORT="localhost"
HOST, PORT = "192.168.51.28", 5800
# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:# Connect to server and send datasock.connect((HOST, int(PORT))#data= "123456789"#s = struct.pack('bbb',1,2,3)sock.send(sd.decode('hex'))print "Sent1 OK:"print sd# Receive data from the server and shut downreceived = sock.recv(1024)print "Received:"print_hex(received)print  'received len is 0x%02x' %(len(received))print  'received data analysis...'re1=received[0:4]print_hex(re1)re1=received[4:6]print_hex(re1)re1=received[6:10]print_hex(re1)re1=received[10:16]print_hex(re1)#pack2 sendsock.send(sd2.decode('hex'))print "Sent2 OK:"print sd2# Receive data from the server and shut downreceived1 = sock.recv(1024)print "Received1:"print_hex(received1)print  'received1 len is 0x%02x' %(len(received1))finally:sock.close()s=raw_input('press any key to continue...')

二进制文件解析工具

# -*- coding: utf-8 -*-
from myutil import *
from binascii import unhexlify as unhex
import os
path=os.getcwd()path+='\\rec04.bin'
#print  path
print "begin ans......"f1=open(path,'rb')
for i in range(1,35):s=f1.read(280)print "data:",iprint_hex(s)print 'read data is:'
print_hex(s)recstatadd = 187
print "终端编号:"
print_hex(s[recstatadd:recstatadd+10])
print "卡号长度:"
print_hex(s[10])
print "卡号:    "
print_hex(s[11:11+10])
print "持卡序号1+所属地城市代码2+交易地城市代码2"
print_hex(s[recstatadd+22:recstatadd+22+5])
print "应用交易计数器"
print_hex(s[92:92+2])
print "交易前余额4,交易金额3"
print_hex(s[recstatadd+29:recstatadd+29+7])
print "交易日期:"
print_hex(s[99:99+3])
print "交易时间:"
print_hex(s[44:44+3])
print "终端编号"
print_hex(s[21:21+8])
print "商户编号"
print_hex(s[21+8:21+8+15])
print "批次号"
print_hex(s[5:5+3])
print "应用密文"
print_hex(s[47:47+8])
print "授权金额"
print_hex(s[103:103+6])
print "其他金额"
print_hex(s[115:115+6])
print "终端验证结果"
print_hex(s[94:5+94])
print "应用交易计数器"
print_hex(s[92:92+4])
print "卡片验证结果"
print_hex(s[56:56+32])
print "卡片序列号:"
print_hex(s[131])
f1.close()

抓取动漫图片

# -*- coding:utf8 -*-
# 2013.12.36 19:41
# 抓取dbmei.com的图片。from bs4 import BeautifulSoup
import os, sys, urllib2,time,random# 创建文件夹
path = os.getcwd()                      # 获取此脚本所在目录
new_path = os.path.join(path,u'暴走漫画')
if not os.path.isdir(new_path):os.mkdir(new_path)def page_loop(page=1):url = 'http://baozoumanhua.com/all/hot/page/%s?sv=1389537379' % pagecontent = urllib2.urlopen(url)soup = BeautifulSoup(content)my_girl = soup.find_all('div',class_='img-wrap')for girl in my_girl:jokes = girl.find('img')link = jokes.get('src')flink = linkprint flinkcontent2 = urllib2.urlopen(flink).read()#with open(u'暴走漫画'+'/'+time.strftime('%H-%M-%S')+random.choice('qwertyuiopasdfghjklzxcvbnm')+flink[-5:],'wb') as code:          #在OSC上现学的with open(u'暴走漫画'+'/'+flink[-11:],'wb') as code:code.write(content2)page = int(page) + 1print u'开始抓取下一页'print 'the %s page' % pagepage_loop(page)page_loop()

抓取网站模板

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# by yangyongzhen
# 2016-12-06from bs4 import BeautifulSoup
import urllib,urllib2,os,time
import rerootpath = os.getcwd()+u'/抓取的模板/'def makedir(path):if not os.path.isdir(path):os.makedirs(path)#创建抓取的根目录
makedir(rootpath)
#显示下载进度
def Schedule(a,b,c):'''''a:已经下载的数据块b:数据块的大小c:远程文件的大小'''per = 100.0 * a * b / cif per > 100 :per = 100print '%.2f%%' % perdef grabHref(url,listhref,localfile):html = urllib2.urlopen(url).read()html = unicode(html,'gb2312','ignore').encode('utf-8','ignore')content = BeautifulSoup(html).findAll('link')myfile = open(localfile,'w')pat = re.compile(r'href="([^"]*)"')pat2 = re.compile(r'http')for item in content:h = pat.search(str(item))href = h.group(1)if pat2.search(href):ans = hrefelse:ans = url+hreflisthref.append(ans)    myfile.write(ans)myfile.write('\r\n')print anscontent = BeautifulSoup(html).findAll('script')pat = re.compile(r'src="([^"]*)"')pat2 = re.compile(r'http')for item in content:h = pat.search(str(item))if h:href = h.group(1)if pat2.search(href):ans = hrefelse:ans = url+hreflisthref.append(ans)     myfile.write(ans)myfile.write('\r\n')print anscontent = BeautifulSoup(html).findAll('a')pat = re.compile(r'href="([^"]*)"')pat2 = re.compile(r'http')for item in content:h = pat.search(str(item))if h:href = h.group(1)if pat2.search(href):ans = hrefelse:ans = url+hreflisthref.append(ans) myfile.write(ans)myfile.write('\r\n')print ansmyfile.close()def main():url = "http://192.168.72.140/qdkj/"   #采集网页的地址listhref =[]    #链接地址localfile = 'ahref.txt'  #保存链接地址为本地文件,文件名grabHref(url,listhref,localfile)listhref = list(set(listhref)) #去除链接中的重复地址curpath = rootpathstart = time.clock()for item in listhref:curpath = rootpathname = item.split('/')[-1]fdir = item.split('/')[3:-1] for i in fdir:curpath += icurpath += '/'print curpathmakedir(curpath)local = curpath+nameurllib.urlretrieve(item, local,Schedule) # 远程保存函数end = time.clock()print u'模板抓取完成!'print u'一共用时:',end-start,u'秒'if __name__=="__main__":main()

python常用小脚本总结相关推荐

  1. 工具|Python常用小脚本

    作者: Beard林 免责声明:本文仅供学习研究,严禁从事非法活动,任何后果由使用者本人负责 0x00 前言 大多数时候,python都不适合编写大型的图形化工具. 但是对于渗透中会用到的小工具来说, ...

  2. pythonencoding etf-8_etf iopv python 代码30个Python常用小技巧

    1.原地交换两个数字x, y =10, 20 print(x, y) y, x = x, y print(x, y) 10 20 20 10 2.链状比较操作符n = 10 print(1 print ...

  3. Python常用小技巧(五)——批量读取json文件

    Python常用小技巧(五)--批量读取json文件 前言:其实Python能够批量读取很多文件,这里,本人以json文件为例(json是标注图片时生成的文件,记录有标注的坐标和标签,友情推荐标注图片 ...

  4. python常用小技巧(四)——批量图片改名

    python常用小技巧(四)--批量图片改名 前言:在日常使用中我们需要批量修改图片名字,使用Python的话就可以很快地完成这个目标 一.材料准备 - os 二.程序编写 # -*- coding: ...

  5. Python常用小技巧(二)——打开图片

    Python常用小技巧(二)--打开图片 前言:对于大量图片的文件夹,你很难手工去检查每张图片是否损坏,这时候就要用程序去检查每张图片是否能打开了 一.材料准备 - os - PIL 二.程序编写 i ...

  6. python常用小技巧(一)——百度图片批量爬取

    python常用小技巧(一)--百度图片无限制批量爬取 前言:我们在日常使用(搜壁纸,搜美女--)或者科研项目(图像识别)中经常要批量获取某种类型的图片,然而很多时候我们都需要一个个点击下载,有什么办 ...

  7. python爬虫脚本ie=utf-8_分享一个Python爬虫小脚本

    此Python小脚本为抓取此页面:http://tieba.baidu.com/p/2108681777 下的所有jpg图像 ''' Created on 2013-4-2 @author: Admi ...

  8. Python常用小技巧,提高刷题效率(适用于蓝桥杯python组)

    1. 掌握python标准库及小技巧 python课程学习到面向对象,就可以刷题参加算法比赛了 对于蓝桥杯不支持第三方库,但学会python标准库,将事半功倍: 2. 常用的列表函数 list1.ap ...

  9. 最值得你收藏的30个Python常用小技巧

    1.原地交换两个数字 x, y =10, 20 print(x, y) y, x = x, y print(x, y) 10 20 20 10 2.链状比较操作符 n = 10 print(1 < ...

最新文章

  1. linux 更改wp版本号,代码实现移除 WordPress 版本号
  2. python入门代码示例-Python入门简单的静态网页爬虫3.0 (爬虫的示例代码)
  3. shell脚本删除linux中的文件
  4. python 实现倒排索引,建立简单的搜索引擎
  5. openresty完全开发指南_FDA拟修订群体药代动力学指南:医药商需要了解些什么?...
  6. python中的sorted是什么意思_python中sort与sorted区别
  7. STM32F0使用LL库实现PWM输出
  8. ORM框架之Mybatis(五)mybatis生成器配置文件说明以及生成的类的说明
  9. bootstrap table导出功能无效报错Uncaught INVALID_CHARACTER_ERR: DOM Exception 5和导出中文乱码问题...
  10. Haproxy均衡负载部署和配置文件详解
  11. 如何提升Wi-Fi速度 学会更改无线信道
  12. 字符编码——简体中文编码中区位码、国标码、内码、外码、字形码的区别及关系
  13. 人工智能导论期末复习题
  14. R语言中三线表是什么?使用table1包绘制(生成)三线表实战
  15. linux启动SSH及开机自动启动
  16. 3dmax2022兼容疯狂模渲大师最新版|疯狂模渲大师3.6.0.4下载安装步骤教程怎么激活素材库和装机3dmax超一流辅助客户端的?
  17. [坑]微信支付首次支付成功,第二次调用失败
  18. 【XBEE手册】ZigBee网络
  19. 【计算机考研】复试常见问题
  20. Linux vmstat 命令详解

热门文章

  1. 小米8 twrp recovery_橙狐Recovery-一款另类功能丰富的第三方刷机工具-支持MIUI OTA
  2. 计算机网络基础 网络互联设备和多层交换
  3. PP-ShiTu 库管理工具使用教程
  4. window下Anaconda环境的创建、删除、激活、退出
  5. PageHelper这种情况下有坑
  6. 移民 萨大 计算机本科 移民家园,移民家园
  7. [C]你的n元一次常系数线性方程组解答小助手
  8. Jolla 宣布 Sailfish 系统浏览器开源
  9. 如何在UnrealEngine虚幻引擎中进行版本管理
  10. 【自学Docker 】Docker ps命令