Back when I wrote the Baidu netdisk crawler and the Baidu image crawler, I promised readers that I would find the time to publish the source code of oksousou (ok搜搜). It is time to keep that promise. Below is the crawler's entire codebase, fully and completely public — you can use it whether or not you can program. Just install a Linux system, make sure the machine is reachable from the public internet, and run:

python startCrawler.py

One reminder: every database field appears in the code, so build the table yourself — it is simple enough, and a sketch follows below. I am also providing the source for download at: download link 1, download link 2
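For reference, here is a minimal sketch of the search_hash table the crawler reads and writes. The column list comes straight from the SELECT/UPDATE/INSERT statements in the code below; the column types and lengths are my own assumptions, so adjust them to taste:

# -*- coding: utf-8 -*-
# Minimal sketch of the `search_hash` table, derived from the columns the
# crawler's SQL touches. The types are assumptions -- tune them as needed.
import MySQLdb as mdb

DDL = """
CREATE TABLE IF NOT EXISTS search_hash (
    id          INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
    info_hash   CHAR(40)     NOT NULL UNIQUE,  -- hex-encoded 20-byte infohash
    category    VARCHAR(20),
    data_hash   CHAR(32),                      -- md5 of the torrent's `pieces`
    name        VARCHAR(255),
    extension   VARCHAR(20),
    classified  TINYINT(1)   DEFAULT 0,
    source_ip   VARCHAR(15),
    tagged      TINYINT(1)   DEFAULT 0,
    length      BIGINT,                        -- total size in bytes
    create_time DATETIME,
    last_seen   DATETIME,
    requests    INT          DEFAULT 1
) DEFAULT CHARSET=utf8;
"""

conn = mdb.connect('127.0.0.1', 'root', 'root', 'oksousou', charset='utf8')
cur = conn.cursor()
cur.execute(DDL)
conn.commit()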

The code is as follows:

#!/usr/bin/env python
# encoding: utf-8
"""
author:haoning
create time:2015.8.1
"""
import hashlib
import os
import time
import datetime
import traceback
import sys
import random
import json
import socket
import threading
from hashlib import sha1  # for hashing node IDs
from random import randint
from struct import unpack
from socket import inet_ntoa
from threading import Timer, Thread
from time import sleep
from collections import deque
from Queue import Queue
import MySQLdb as mdb  # MySQL connector
import metautils
import downloadTorrent
from bencode import bencode, bdecode
import pygeoip

DB_HOST = '127.0.0.1'
DB_USER = 'root'
DB_PASS = 'root'
BOOTSTRAP_NODES = (
    ("67.215.246.10", 6881),
    ("82.221.103.244", 6881),
    ("23.21.224.150", 6881),
)
RATE = 1  # throttle rate
TID_LENGTH = 2
RE_JOIN_DHT_INTERVAL = 3
TOKEN_LENGTH = 2
INFO_HASH_LEN = 500000  # 500k entries is small; caps the queue so memory use stays bounded
CACHE_LEN = 100  # size of the database write-back cache
WAIT_DOWNLOAD = 80

geoip = pygeoip.GeoIP('GeoIP.dat')

def is_ip_allowed(ip):
    country = geoip.country_code_by_addr(ip)
    if country in ('CN', 'TW', 'JP', 'HK', 'KR'):
        return True
    return False

def entropy(length):
    return "".join(chr(randint(0, 255)) for _ in xrange(length))

def random_id():
    h = sha1()
    h.update(entropy(20))
    return h.digest()

def decode_nodes(nodes):
    n = []
    length = len(nodes)
    if (length % 26) != 0:
        return n
    for i in range(0, length, 26):
        nid = nodes[i:i+20]
        ip = inet_ntoa(nodes[i+20:i+24])
        port = unpack("!H", nodes[i+24:i+26])[0]
        n.append((nid, ip, port))
    return n

def timer(t, f):
    Timer(t, f).start()

def get_neighbor(target, nid, end=10):
    return target[:end] + nid[end:]

class KNode(object):

    def __init__(self, nid, ip, port):
        self.nid = nid
        self.ip = ip
        self.port = port

class DHTClient(Thread):

    def __init__(self, max_node_qsize):
        Thread.__init__(self)
        self.setDaemon(True)
        self.max_node_qsize = max_node_qsize
        self.nid = random_id()
        self.nodes = deque(maxlen=max_node_qsize)

    def send_krpc(self, msg, address):
        try:
            self.ufd.sendto(bencode(msg), address)
        except Exception:
            pass

    def send_find_node(self, address, nid=None):
        nid = get_neighbor(nid, self.nid) if nid else self.nid
        tid = entropy(TID_LENGTH)
        msg = {
            "t": tid,
            "y": "q",
            "q": "find_node",
            "a": {"id": nid, "target": random_id()}
        }
        self.send_krpc(msg, address)

    def join_DHT(self):
        for address in BOOTSTRAP_NODES:
            self.send_find_node(address)

    def re_join_DHT(self):
        if len(self.nodes) == 0:
            self.join_DHT()
        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

    def auto_send_find_node(self):
        wait = 1.0 / self.max_node_qsize
        while True:
            try:
                node = self.nodes.popleft()
                self.send_find_node((node.ip, node.port), node.nid)
            except IndexError:
                pass
            try:
                sleep(wait)
            except KeyboardInterrupt:
                os._exit(0)

    def process_find_node_response(self, msg, address):
        nodes = decode_nodes(msg["r"]["nodes"])
        for node in nodes:
            (nid, ip, port) = node
            if len(nid) != 20:
                continue
            if ip == self.bind_ip:
                continue
            n = KNode(nid, ip, port)
            self.nodes.append(n)

class DHTServer(DHTClient):  # harvests info_hashes

    def __init__(self, master, bind_ip, bind_port, max_node_qsize):
        DHTClient.__init__(self, max_node_qsize)
        self.master = master
        self.bind_ip = bind_ip
        self.bind_port = bind_port
        self.speed = 0
        self.process_request_actions = {
            "get_peers": self.on_get_peers_request,
            "announce_peer": self.on_announce_peer_request,
        }
        self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.ufd.bind((self.bind_ip, self.bind_port))
        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

    def run(self):
        self.re_join_DHT()
        while True:
            try:
                (data, address) = self.ufd.recvfrom(65536)
                msg = bdecode(data)
                self.on_message(msg, address)
            except Exception:
                pass

    def on_message(self, msg, address):
        global RATE  # module-level throttle
        try:
            if msg["y"] == "r":
                if msg["r"].has_key("nodes"):
                    self.process_find_node_response(msg, address)  # node discovered
            elif msg["y"] == "q":
                try:
                    self.speed += 1
                    if self.speed % 10000 == 0:
                        RATE = random.randint(1, 3)
                        if RATE == 2:
                            RATE = 1
                        if RATE == 3:
                            RATE = 10
                    if self.speed > 100000:
                        self.speed = 0
                    if self.speed % RATE == 0:  # too much traffic eats the CPU, so throttle to 1-in-1 or 1-in-10
                        self.process_request_actions[msg["q"]](msg, address)  # handle peers' requests; this is where info_hashes arrive
                except KeyError:
                    self.play_dead(msg, address)
        except KeyError:
            pass

    def on_get_peers_request(self, msg, address):
        try:
            infohash = msg["a"]["info_hash"]
            tid = msg["t"]
            nid = msg["a"]["id"]
            token = infohash[:TOKEN_LENGTH]
            msg = {
                "t": tid,
                "y": "r",
                "r": {
                    "id": get_neighbor(infohash, self.nid),
                    "nodes": "",
                    "token": token
                }
            }
            self.master.log(infohash, address)
            self.send_krpc(msg, address)
        except KeyError:
            pass

    def on_announce_peer_request(self, msg, address):
        try:
            infohash = msg["a"]["info_hash"]
            token = msg["a"]["token"]
            nid = msg["a"]["id"]
            tid = msg["t"]
            if infohash[:TOKEN_LENGTH] == token:
                if msg["a"].has_key("implied_port") and msg["a"]["implied_port"] != 0:
                    port = address[1]
                else:
                    port = msg["a"]["port"]
                self.master.log_announce(infohash, (address[0], port))
        except Exception:
            print 'error'
        finally:
            self.ok(msg, address)

    def play_dead(self, msg, address):
        try:
            tid = msg["t"]
            msg = {"t": tid, "y": "e", "e": [202, "Server Error"]}
            self.send_krpc(msg, address)
        except KeyError:
            pass

    def ok(self, msg, address):
        try:
            tid = msg["t"]
            nid = msg["a"]["id"]
            msg = {"t": tid, "y": "r", "r": {"id": get_neighbor(nid, self.nid)}}
            self.send_krpc(msg, address)
        except KeyError:
            pass

class Master(Thread):  # downloads and parses the metadata behind each info_hash

    def __init__(self):
        Thread.__init__(self)
        self.setDaemon(True)
        self.queue = Queue()
        self.cache = Queue()
        self.count = 0
        self.mutex = threading.RLock()  # reentrant lock: a thread may re-acquire a lock it already holds
        self.waitDownload = Queue()
        self.metadata_queue = Queue()
        self.dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'oksousou', charset='utf8')
        self.dbconn.autocommit(False)
        self.dbcurr = self.dbconn.cursor()
        self.dbcurr.execute('SET NAMES utf8')
        self.visited = set()

    def lock(self):
        self.mutex.acquire()

    def unlock(self):
        self.mutex.release()

    def work(self, item):
        print "start thread", item
        while True:
            self.prepare_download_metadata()
            self.lock()
            self.download_metadata()
            self.unlock()
            self.lock()
            self.got_torrent()
            self.unlock()

    def start_work(self, max):
        for item in xrange(max):
            t = threading.Thread(target=self.work, args=(item,))
            t.setDaemon(True)
            t.start()

    # queueing infohashes is more efficient
    def log_announce(self, binhash, address=None):
        if self.queue.qsize() < INFO_HASH_LEN:  # beyond INFO_HASH_LEN, drop: downstream can't keep up
            if is_ip_allowed(address[0]):
                self.queue.put([address, binhash])  # got an info_hash

    def log(self, infohash, address=None):
        if self.queue.qsize() < INFO_HASH_LEN:
            if is_ip_allowed(address[0]):
                self.queue.put([address, infohash])

    def prepare_download_metadata(self):
        if self.queue.qsize() == 0:
            sleep(2)
        # pull an info_hash off the queue for downloading
        address, binhash = self.queue.get()
        if binhash in self.visited:
            return
        if len(self.visited) > 100000:  # past 100,000 entries, reset the visited set
            self.visited = set()
        self.visited.add(binhash)  # mark this info_hash as seen
        info_hash = binhash.encode('hex')
        utcnow = datetime.datetime.utcnow()
        self.cache.put((address, binhash, utcnow))  # push into the cache queue

    def download_metadata(self):
        if self.cache.qsize() > CACHE_LEN / 2:  # drain the cache and update the database
            while self.cache.qsize() > 0:
                address, binhash, utcnow = self.cache.get()
                info_hash = binhash.encode('hex')
                self.dbcurr.execute('SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
                y = self.dbcurr.fetchone()
                if y:
                    # already known: bump the last-seen time and the request count
                    self.dbcurr.execute('UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s', (utcnow, info_hash))
                else:
                    self.waitDownload.put((address, binhash))
            self.dbconn.commit()
        if self.waitDownload.qsize() > WAIT_DOWNLOAD:
            while self.waitDownload.qsize() > 0:
                address, binhash = self.waitDownload.get()
                t = threading.Thread(target=downloadTorrent.download_metadata, args=(address, binhash, self.metadata_queue))
                t.setDaemon(True)
                t.start()

    def decode(self, s):
        if type(s) is list:
            s = ';'.join(s)
        u = s
        for x in (self.encoding, 'utf8', 'gbk', 'big5'):
            try:
                u = s.decode(x)
                return u
            except:
                pass
        return s.decode(self.encoding, 'ignore')

    def decode_utf8(self, d, i):
        if i + '.utf-8' in d:
            return d[i + '.utf-8'].decode('utf8')
        return self.decode(d[i])

    def parse_metadata(self, data):  # parse the torrent metadata
        info = {}
        self.encoding = 'utf8'
        try:
            torrent = bdecode(data)  # decode, then inspect
            if not torrent.get('name'):
                return None
        except:
            return None
        detail = torrent
        info['name'] = self.decode_utf8(detail, 'name')
        if 'files' in detail:
            info['files'] = []
            for x in detail['files']:
                if 'path.utf-8' in x:
                    v = {'path': self.decode('/'.join(x['path.utf-8'])), 'length': x['length']}
                else:
                    v = {'path': self.decode('/'.join(x['path'])), 'length': x['length']}
                if 'filehash' in x:
                    v['filehash'] = x['filehash'].encode('hex')
                info['files'].append(v)
            info['length'] = sum([x['length'] for x in info['files']])
        else:
            info['length'] = detail['length']
        info['data_hash'] = hashlib.md5(detail['pieces']).hexdigest()
        return info

    def got_torrent(self):
        if self.metadata_queue.qsize() == 0:
            return
        binhash, address, data, start_time = self.metadata_queue.get()
        if not data:
            return
        try:
            info = self.parse_metadata(data)
            if not info:
                return
        except:
            traceback.print_exc()
            return
        temp = time.time()
        x = time.localtime(float(temp))
        utcnow = time.strftime("%Y-%m-%d %H:%M:%S", x)  # current time
        info_hash = binhash.encode('hex')  # the magnet hash
        info['info_hash'] = info_hash
        # tags still need to be built
        info['tagged'] = False
        info['classified'] = False
        info['requests'] = 1
        info['last_seen'] = utcnow
        info['create_time'] = utcnow
        info['source_ip'] = address[0]
        if info.get('files'):
            files = [z for z in info['files'] if not z['path'].startswith('_')]
            if not files:
                files = info['files']
        else:
            files = [{'path': info['name'], 'length': info['length']}]
        files.sort(key=lambda z: z['length'], reverse=True)
        bigfname = files[0]['path']
        info['extension'] = metautils.get_extension(bigfname).lower()
        info['category'] = metautils.get_category(info['extension'])
        try:
            try:
                print '\n', 'Saved', info['info_hash'], info['name'], (time.time() - start_time), 's', address[0]
            except:
                print '\n', 'Saved', info['info_hash']
            ret = self.dbcurr.execute(
                'INSERT INTO search_hash(info_hash,category,data_hash,name,extension,classified,source_ip,tagged,' +
                'length,create_time,last_seen,requests) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
                (info['info_hash'], info['category'], info['data_hash'], info['name'], info['extension'], info['classified'],
                 info['source_ip'], info['tagged'], info['length'], info['create_time'], info['last_seen'], info['requests']))
            self.count += 1  # count inserts so the batched commit below actually fires
            if self.count % 50 == 0:
                self.dbconn.commit()
            if self.count > 100000:
                self.count = 0
        except:
            print self.name, 'save error', self.name, info
            traceback.print_exc()
            return

if __name__ == "__main__":
    # start the metadata workers
    master = Master()
    master.start_work(150)
    # start the DHT server
    dht = DHTServer(master, "0.0.0.0", 6881, max_node_qsize=200)
    dht.start()
    dht.auto_send_find_node()
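If you are wondering what decode_nodes above is unpacking: a KRPC find_node response packs each node into 26 bytes — a 20-byte node ID, a 4-byte IPv4 address, and a 2-byte big-endian port (BEP 5). A quick sketch with a made-up node entry (the ID and address here are hypothetical):

from socket import inet_aton, inet_ntoa
from struct import pack, unpack

# Build one fake 26-byte compact node entry.
nid = 'a' * 20                                   # 20-byte node ID
entry = nid + inet_aton('1.2.3.4') + pack('!H', 6881)

# Decode it the same way decode_nodes does.
assert len(entry) == 26
node_id = entry[:20]
ip = inet_ntoa(entry[20:24])
port = unpack('!H', entry[24:26])[0]
print ip, port                                   # -> 1.2.3.4 6881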

Note that part of the code above has to download the actual torrent metadata, so the following module (downloadTorrent) is essential:

#!/usr/bin/env python
# encoding: utf-8
"""
author:haoning
create time:2015.8.1
"""
from hashlib import sha1
import math
from socket import inet_ntoa
import socket
from struct import pack, unpack
from threading import Timer, Thread
from time import sleep, time

from bencode import bencode, bdecode
from startCrawler import entropy

BT_PROTOCOL = "BitTorrent protocol"
BT_MSG_ID = 20
EXT_HANDSHAKE_ID = 0

def random_id():
    hash = sha1()
    hash.update(entropy(20))
    return hash.digest()

def send_packet(the_socket, msg):
    the_socket.send(msg)

def send_message(the_socket, msg):
    msg_len = pack(">I", len(msg))
    send_packet(the_socket, msg_len + msg)

def send_handshake(the_socket, infohash):
    bt_header = chr(len(BT_PROTOCOL)) + BT_PROTOCOL
    ext_bytes = "\x00\x00\x00\x00\x00\x10\x00\x00"
    peer_id = random_id()
    packet = bt_header + ext_bytes + infohash + peer_id
    send_packet(the_socket, packet)

def check_handshake(packet, self_infohash):
    try:
        bt_header_len, packet = ord(packet[:1]), packet[1:]
        if bt_header_len != len(BT_PROTOCOL):
            return False
    except TypeError:
        return False
    bt_header, packet = packet[:bt_header_len], packet[bt_header_len:]
    if bt_header != BT_PROTOCOL:
        return False
    packet = packet[8:]
    infohash = packet[:20]
    if infohash != self_infohash:
        return False
    return True

def send_ext_handshake(the_socket):
    msg = chr(BT_MSG_ID) + chr(EXT_HANDSHAKE_ID) + bencode({"m": {"ut_metadata": 1}})
    send_message(the_socket, msg)

def request_metadata(the_socket, ut_metadata, piece):
    """bep_0009"""
    msg = chr(BT_MSG_ID) + chr(ut_metadata) + bencode({"msg_type": 0, "piece": piece})
    send_message(the_socket, msg)

def get_ut_metadata(data):
    ut_metadata = "_metadata"
    index = data.index(ut_metadata) + len(ut_metadata) + 1
    return int(data[index])

def get_metadata_size(data):
    metadata_size = "metadata_size"
    start = data.index(metadata_size) + len(metadata_size) + 1
    data = data[start:]
    return int(data[:data.index("e")])

def recvall(the_socket, timeout=5):
    the_socket.setblocking(0)
    total_data = []
    data = ""
    begin = time()
    while True:
        sleep(0.05)
        if total_data and time() - begin > timeout:
            break
        elif time() - begin > timeout * 2:
            break
        try:
            data = the_socket.recv(1024)
            if data:
                total_data.append(data)
                begin = time()
        except Exception:
            pass
    return "".join(total_data)

def download_metadata(address, infohash, metadata_queue, timeout=5):
    metadata = None
    start_time = time()
    the_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        the_socket.settimeout(timeout)
        the_socket.connect(address)
        # handshake
        send_handshake(the_socket, infohash)
        packet = the_socket.recv(4096)
        # handshake error
        if not check_handshake(packet, infohash):
            return
        # ext handshake
        send_ext_handshake(the_socket)
        packet = the_socket.recv(4096)
        # get ut_metadata and metadata_size
        ut_metadata, metadata_size = get_ut_metadata(packet), get_metadata_size(packet)
        # request each piece of metadata; pieces are 16 KiB blocks
        metadata = []
        for piece in range(int(math.ceil(metadata_size / (16.0 * 1024)))):
            request_metadata(the_socket, ut_metadata, piece)
            packet = recvall(the_socket, timeout)  # the_socket.recv(1024*17)
            metadata.append(packet[packet.index("ee") + 2:])
        metadata = "".join(metadata)
    except socket.timeout:
        pass
    except Exception:
        pass
    finally:
        the_socket.close()  # make sure the socket is closed every time
        if metadata is not None:  # only enqueue non-empty metadata
            metadata_queue.put((infohash, address, metadata, start_time))
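To test this module on its own, you can drive download_metadata directly — a minimal sketch, assuming you already have a reachable peer address and a raw 20-byte infohash (both values below are hypothetical placeholders):

from Queue import Queue
import downloadTorrent

q = Queue()
# Hypothetical peer and infohash -- substitute ones your DHT server logged.
address = ('1.2.3.4', 6881)
binhash = 'c12fe1c06bba254a9dc9f519b335aa7c1367a88a'.decode('hex')  # 20 raw bytes

downloadTorrent.download_metadata(address, binhash, q, timeout=5)
if not q.empty():
    infohash, addr, metadata, started = q.get()
    print 'got metadata of length %d' % len(metadata)
else:
    print 'peer did not answer'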

There is actually a second way to download torrents: using libtorrent. But it burns far too much CPU, so I generally avoid it. Here it is anyway:

#coding: utf8
import threading
import traceback
import random
import time
import os
import socket

import libtorrent as lt

threading.stack_size(200 * 1024)
socket.setdefaulttimeout(30)

def fetch_torrent(session, ih, timeout):
    name = ih.upper()
    url = 'magnet:?xt=urn:btih:%s' % (name,)
    data = ''
    params = {
        'save_path': '/tmp/downloads/',
        'storage_mode': lt.storage_mode_t(2),
        'paused': False,
        'auto_managed': False,
        'duplicate_is_error': True}
    try:
        handle = lt.add_magnet_uri(session, url, params)
    except:
        return None
    status = session.status()
    handle.set_sequential_download(1)
    meta = None
    down_time = time.time()
    down_path = None
    for i in xrange(0, timeout):
        if handle.has_metadata():
            info = handle.get_torrent_info()
            down_path = '/tmp/downloads/%s' % info.name()
            #print 'status', 'p', status.num_peers, 'g', status.dht_global_nodes, 'ts', status.dht_torrents, 'u', status.total_upload, 'd', status.total_download
            meta = info.metadata()
            break
        time.sleep(1)
    if down_path and os.path.exists(down_path):
        os.system('rm -rf "%s"' % down_path)
    session.remove_torrent(handle)
    return meta

def download_metadata(address, binhash, metadata_queue, timeout=20):
    metadata = None
    start_time = time.time()
    try:
        session = lt.session()
        r = random.randrange(10000, 50000)
        session.listen_on(r, r + 10)
        session.add_dht_router('router.bittorrent.com', 6881)
        session.add_dht_router('router.utorrent.com', 6881)
        session.add_dht_router('dht.transmissionbt.com', 6881)
        session.add_dht_router('127.0.0.1', 6881)
        session.start_dht()
        metadata = fetch_torrent(session, binhash.encode('hex'), timeout)
        session = None
    except:
        traceback.print_exc()
    finally:
        metadata_queue.put((binhash, address, metadata, start_time))
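Both modules expose the same entry point — download_metadata(address, binhash, metadata_queue, timeout) — and push the same (binhash, address, metadata, start_time) tuple onto the queue, so switching backends is only a matter of which module startCrawler.py imports. A sketch, assuming the libtorrent variant above is saved as downloadTorrentLt.py (a hypothetical filename):

# In startCrawler.py -- the two backends are drop-in replacements:
import downloadTorrent                          # raw wire-protocol backend (light on CPU)
# import downloadTorrentLt as downloadTorrent   # libtorrent backend (heavier on CPU)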

This crawler cost me, and other experts around the web, a good deal of time. If you have read this far, please keep up the spirit of research, open source, and sharing, and feel free to exchange ideas. I have set up a QQ group as the official group of 去转盘网 (quzhuanpan); it is not large yet, but drop by if you are interested — every new member makes the place livelier. QQ group: 512245829

Reposted from: https://blog.51cto.com/5912119/1785284
