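Crawlers: at its core, a crawler simply fetches the source code of a web page. One step further, it can interact with the page through POST requests and collect the data the server returns in response. In short, a crawler is a tool that lets a computer talk to a server automatically and retrieve data. (Mind the site's robots.txt file! Do not let your crawler break the law, and do not let it harm the site.) Now let's see how to use a crawler to query the 12306 site.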

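1. First, open the 12306 remaining-ticket query page

https://kyfw.12306.cn/otn/lcxxcx/init

What we want is the schedule and remaining tickets of each train once a departure station, a destination station and a date have been entered. We could inspect the departure-station element to see how the page submits it, but it is simpler to enter any departure, destination and date and watch the traffic with a packet-capture tool (the browser works too, since we only need to read the requests, not modify them).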

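2. In the Network panel of Chrome we can see every request the browser sends after we click (if you are not familiar with HTTP, the book 《图解http》 is a good introduction).

After clicking Query, one request whose name starts with query stands out; it is clearly the query itself. Let's look at its URL

'https://kyfw.12306.cn/otn/lcxxcx/query?purpose_codes=ADULT&queryDate=2016-10-04&from_station=BJP&to_station=XKS'

Then look at its response

On closer inspection it turns out to be a JSON-formatted string (spotting this takes some experience...)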
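To make this concrete, here is a minimal sketch of turning such a response into Python data with the standard json module. The sample payload is hypothetical and heavily trimmed: its field names are taken from the script later in this article, and its values are invented for illustration.

```python
import json

# Hypothetical, trimmed response body. Field names follow the script in this
# article; the values are made up.
sample_response = '''
{"data": {"datas": [
  {"station_train_code": "G101", "from_station_name": "A", "to_station_name": "B",
   "start_time": "07:00", "arrive_time": "12:30", "lishi": "05:30", "ze_num": "12"}
]}}
'''


def list_trains(raw):
    """Return (train code, departure time, arrival time) tuples from a JSON string."""
    payload = json.loads(raw)
    # A response without "datas" is treated as "no trains returned".
    trains = payload.get("data", {}).get("datas", [])
    return [(t["station_train_code"], t["start_time"], t["arrive_time"]) for t in trains]


print(list_trains(sample_response))
```

Once the string is parsed, everything else is ordinary dictionary and list access.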

3. After these steps we roughly know what to do: change the values after queryDate, from_station and to_station in the URL, fetch the response with requests, and parse the JSON string.
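As a small sketch of this step, the query URL can be assembled from its three variable parts with the standard library. The function name build_query_url is my own; the parameter names and their order come from the captured URL above.

```python
from urllib.parse import urlencode

BASE_URL = "https://kyfw.12306.cn/otn/lcxxcx/query"


def build_query_url(date, from_code, to_code):
    """Build the query URL from a date (YYYY-MM-DD) and two station codes."""
    params = [
        ("purpose_codes", "ADULT"),
        ("queryDate", date),
        ("from_station", from_code),
        ("to_station", to_code),
    ]
    # urlencode keeps the order of the list and escapes the values if needed.
    return BASE_URL + "?" + urlencode(params)


print(build_query_url("2016-10-04", "BJP", "XKS"))
```

With requests, the same list could instead be passed as the params argument of requests.get, which performs the encoding itself.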

The date is easy, but where do the codes for from_station and to_station come from?

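4. There are two possibilities. Either this data lives on the server, and the page requests the station code from the server every time the station changes, or it has already been downloaded to the browser. How can we tell? Change the departure station and watch, with the capture tool (or the browser), whether the browser sends a request to 12306.

We changed Beijing to Shanghai, and the browser sent no request.

So we can be fairly sure the station codes are stored locally (already downloaded from the server).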

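5. Now we need to analyze the HTML to find where the codes are stored.

Inspect the HTML tags near the departure field: right-click "Popular" (热门) and choose Inspect, and we can see that this tag carries an onclick handler.

The onclick handler points to a JS file named 'Stationfor12306', so we can be fairly sure this JS file is the station data we are looking for.

6. Search this HTML (the 12306 remaining-ticket query page) for stationfor, and we immediately find it in a <script> tag that points to a URL.

Open that URL and the station information is right there (note that this is a relative URL; for the absolute URL, prepend https://kyfw.12306.cn/).

To get the three-character station code, either a regular expression or a plain string search will do. Since the code always has exactly three characters, I use a simple string search here.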
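For comparison, here is a sketch of the regular-expression route. It assumes each entry in the station file looks like @abbr|Chinese name|CODE|full pinyin|abbr|index; the sample string below is a hand-written excerpt in that layout, not a download, and the helper name parse_station_codes is my own.

```python
import re

# Hand-written sample in the assumed layout:
# @abbr|Chinese name|CODE|full pinyin|abbr|index
SAMPLE = ("var station_names ='@bjb|北京北|VAP|beijingbei|bjb|0"
          "@bjd|北京东|BOP|beijingdong|bjd|1"
          "@bji|北京|BJP|beijing|bj|2';")

# Capture the Chinese name, the three-letter code and the full pinyin.
ENTRY = re.compile(r"@[^|]*\|([^|]+)\|([A-Z]{3})\|([^|]+)\|")


def parse_station_codes(js_text):
    """Map both the pinyin and the Chinese name of each station to its code."""
    codes = {}
    for chinese, code, pinyin in ENTRY.findall(js_text):
        codes[pinyin] = code
        codes[chinese] = code
    return codes


codes = parse_station_codes(SAMPLE)
print(codes["beijing"], codes["beijingbei"])
```

A dictionary lookup also avoids a pitfall of plain substring search: 'beijing' is a prefix of 'beijingbei', so a bare find() may land on the wrong station.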

7. What remains is essentially the implementation. My code is below.

# coding=utf-8
import argparse
import datetime
import sys

import requests
from prettytable import PrettyTable

# Default travel date: tomorrow, formatted as YYYY-MM-DD
tomorrow = (datetime.datetime.now() + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
print(tomorrow)

argument = argparse.ArgumentParser()
argument.add_argument('--fromcity', '-f', default='hangzhoudong')
argument.add_argument('--tocity', '-t', default='xiamen')
argument.add_argument('--date', '-d', default=tomorrow)
args = argument.parse_args()

from_station = args.fromcity
to_station = args.tocity
Date = args.date

stationlist_url = 'https://kyfw.12306.cn/otn/resources/js/framework/station_name.js'
# verify=False skips certificate verification, as in the original script
r = requests.get(stationlist_url, verify=False)
stationlist = r.text


def station_code(name):
    # The three-character code sits right before '|<pinyin>|'; matching both
    # separators keeps 'xiamen' from matching a longer name that contains it
    place = stationlist.find('|' + name + '|')
    if place < 3:
        sys.exit('unknown station: ' + name)
    return stationlist[place - 3:place]


FromStation = station_code(from_station)
ToStation = station_code(to_station)

query_url = ('https://kyfw.12306.cn/otn/lcxxcx/query?purpose_codes=ADULT&queryDate=' + Date
             + '&from_station=' + FromStation + '&to_station=' + ToStation)
r = requests.get(query_url, verify=False)
result = r.json()

# Keep a copy of the raw response for debugging
with open('json.txt', 'w', encoding='utf-8') as fp:
    fp.write(str(result))

if 'datas' in result["data"]:
    rj = result["data"]["datas"]
    pt = PrettyTable()
    header = 'Train Stations Times Duration FirstClass SecondClass SoftSleeper HardSleeper HardSeat NoSeat'.split()
    pt.field_names = header
    for x in rj:
        ptrow = []
        ptrow.append(x["station_train_code"])
        ptrow.append('\n'.join([x["from_station_name"], x["to_station_name"]]))
        ptrow.append('\n'.join([x["start_time"], x["arrive_time"]]))
        ptrow.append(x["lishi"].replace(':', 'h') + 'm')
        ptrow.append(x['zy_num'])
        ptrow.append(x['ze_num'])
        ptrow.append(x['rw_num'])
        ptrow.append(x['yw_num'])
        ptrow.append(x['yz_num'])
        ptrow.append(x['wz_num'])
        pt.add_row(ptrow)
    print(pt)
else:
    print('There is no direct train between these two stations')
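One optional refinement, sketched here as a suggestion rather than as part of the script: the --date option accepts any string, so a typo only surfaces when the server rejects the query. argparse can validate the value up front through the type= hook. The helper names valid_date and build_parser are my own.

```python
import argparse
import datetime


def valid_date(text):
    """argparse type hook: accept only dates written as YYYY-MM-DD."""
    try:
        parsed = datetime.datetime.strptime(text, "%Y-%m-%d")
    except ValueError:
        raise argparse.ArgumentTypeError("expected YYYY-MM-DD, got %r" % text)
    # Return the normalized form, e.g. 2016-1-4 becomes 2016-01-04.
    return parsed.strftime("%Y-%m-%d")


def build_parser():
    tomorrow = (datetime.datetime.now() + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", "-d", type=valid_date, default=tomorrow)
    return parser


print(build_parser().parse_args(["-d", "2016-10-04"]).date)
```

With this in place, an impossible date such as 2016-13-40 makes the program exit with a usage message before any request is sent.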

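That is all for this small Python crawler example for 12306; I hope it helps. Below is the source code of a 12306 ticket-grabbing script:

Ticket-grabbing code:

Pay close attention to the packages that have to be imported, otherwise the script may fail at run time!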

import urllib.parse, sys, os, time, json
# import http.httplib
import http.client
import gzip
from io import StringIO
import traceback
import logging
import datetime
import cProfile
import subprocess

# Load your own configuration module
#from conf_frankie_test import *
#from conf_neil import *
#from conf_example import *
from shuapiao12306.conf_example import (g_passengers, g_max_auto_times, passwd, g_buy_list, g_ingnore_list,
                                        g_care_seat_types, g_query_data, g_query_sleep_time, user)

# Whether to clean up temporary files such as captcha images
g_clean_temp = False

##########################internal###############################
# Train type prefix -> description
g_str_train_types = {
    "G": u"high-speed",
    "L": u"temporary",
    "D": u"EMU",
    "Z": u"direct express",
    "T": u"express",
    "K": u"fast",
}

# Seat field name in the query result -> seat code used when ordering
g_seat_code_dict = {
    "yz_num": "1",
    "rz_num": "2",
    "yw_num": "3",
    "rw_num": "4",
    "gr_num": "6",
    "tz_num": "P",
    "wz_num": "WZ",
    "ze_num": "O",
    "zy_num": "M",
    "swz_num": "9",
}

logger = logging.getLogger('shuapiao')
g_conn = http.client.HTTPConnection('kyfw.12306.cn', timeout=100)

#restart conn
def restart_conn(conn):
    print("restart connection")
    # Reopen the same object so that the module-level g_conn stays usable
    conn.close()
    conn.connect()

# Decorator: call the wrapped function up to max_tries times
def retries(max_tries):
    def dec(func, conn=g_conn):
        def f2(*args, **kwargs):
            # Count down so that tries_remaining == 0 marks the last attempt
            for tries_remaining in reversed(range(max_tries)):
                try:
                    return func(*args, **kwargs)
                except http.client.HTTPException as e:
                    print("connection error")
                    restart_conn(conn)
                except Exception as e:
                    if tries_remaining > 0:
                        traceback.print_exc()
                        logger.error("error %d" % tries_remaining)
                        logger.error(traceback.format_exc())
                    else:
                        raise e
        return f2
    return dec

# Call the OCR program
def call_tesseract(in_file):
    tesseract_exe_name = 'tesseract'
    expect_len = 4
    out_file = "o"
    args = [tesseract_exe_name, in_file, out_file]
    proc = subprocess.Popen(args)
    ret = proc.wait()
    if ret != 0:
        print("call tesseract failed:%d" % ret)
        return ''
    out_full = out_file + '.txt'
    with open(out_full) as f:
        text = f.read()
    if g_clean_temp:
        os.remove(out_full)
    text = text.rstrip('\n')
    text = text.replace(" ", "")
    print("auto read rand_code:%s" % text)
    if len(text) != expect_len:
        print("auto read failed:%s, %d" % (text, len(text)))
        return ''
    return text

'''
HttpAuto
'''
class HttpAuto:
    def __init__(self):
        self.ext_header = {
            "Accept": "*/*",
            "X-Requested-With": "XMLHttpRequest",
            "Referer": "http://kyfw.12306.cn/otn/login/init#",
            "Accept-Language": "zh-cn",
            "Accept-Encoding": "gzip, deflate",
            "Connection": "Keep-Alive",
            "Cache-Control": "no-cache",
            "User-Agent": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)",
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        }
        self.proxy_ext_header = {
            "Accept": "*/*",
            "X-Requested-With": "XMLHttpRequest",
            "Referer": "http://kyfw.12306.cn/otn/login/init#",
            "Accept-Language": "zh-cn",
            "Accept-Encoding": "gzip, deflate",
            "Proxy-Connection": "Keep-Alive",
            "Pragma": "no-cache",
            "User-Agent": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)",
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        }
        #cookies
        self.sid = ''
        self.sip = ''
        #passenger info to be POST
        self.passengerTicketStr = ''
        self.oldPassengerStr = ''
        #used to POST
        self.globalRepeatSubmitToken = ''
        self.key_check_isChange = ''
        self.orderId = ''
        self.pass_code = 'abcd'
        self.rand_code = 'abcd'
        return

    def construct_passengerTicketStr(self):
        print("###construct_passengerTicketStr###")
        str1 = ''
        str2 = ''
        for p in g_passengers:
            str1 = str1 + '1,0,1,' + p['name'] + ',1,' + p['id'] + ',' + p['tel'] + ',N_'
            str2 = str2 + p['name'] + ',1,' + p['id'] + ',1_'
        str1 = str1[:-1]
        # Keep these as str; urlencode() encodes them as UTF-8 when posting
        self.passengerTicketStr = str1
        self.oldPassengerStr = str2
        print("new:%s" % self.passengerTicketStr)
        print("old:%s" % self.oldPassengerStr)

    def logout(self):
        url_logout = "http://kyfw.12306.cn/otn/login/loginOut"
        g_conn.request('GET', url_logout, headers=self.proxy_ext_header)
        return True

    def __del__(self):
        self.logout()
        print("close connection")
        g_conn.close()
        return

    def update_session_info(self, res):
        print("process header cookie")
        update = False
        for name, value in res.getheaders():
            # http.client keeps the server's header casing, so compare case-insensitively
            if name.lower() != "set-cookie":
                continue
            # Cookies may arrive comma-joined in one header or as separate headers
            for part in value.split(','):
                cookie = part.strip().split(';')[0].strip()
                if cookie.startswith('JSESSIONID'):
                    self.sid = cookie
                    update = True
                    print("Update sessionid " + self.sid)
                elif cookie.startswith('BIGipServerotn'):
                    self.sip = cookie
                    update = True
                    print("Update sip:" + self.sip)
        return update

    def check_pass_code_common(self, module, rand_method):
        ret = False
        auto_times = g_max_auto_times
        while 1:
            url_pass_code = "http://kyfw.12306.cn/otn/passcodeNew/getPassCodeNew?module=%s&rand=%s" % (module, rand_method)
            print("send getPassCodeNew:%s" % datetime.datetime.now())
            header = ''
            if module == 'login':
                header = self.ext_header
            else:
                header = self.proxy_ext_header
            g_conn.request('GET', url_pass_code, headers=header)
            res = g_conn.getresponse()
            print("recv getPassCodeNew=====>:%s" % datetime.datetime.now())
            if module == 'login':
                self.update_session_info(res)
                self.ext_header["Cookie"] = self.sid + ';' + self.sip
            #save file
            pic_type = res.getheader('Content-Type').split(';')[0].split('/')[1]
            data = res.read()
            file_name = "./pass_code.%s" % pic_type
            with open(file_name, 'wb') as f:
                f.write(data)
            #auto read or manual
            read_pass_code = ''
            # Use the remaining-attempts counter, not the configured maximum
            if auto_times > 0:
                auto_times = auto_times - 1
                read_pass_code = call_tesseract(file_name)
            if read_pass_code == '':
                read_pass_code = input("input passcode(%s):" % file_name)
                if read_pass_code == "no":
                    print("Get A new PassCode")
                    continue
                elif read_pass_code == "quit":
                    print("Quit")
                    break
                print("input:%s" % read_pass_code)
            else:
                print("auto:%s" % read_pass_code)
            if g_clean_temp:
                os.remove(file_name)
            data = []
            if module == 'passenger':
                self.proxy_ext_header["Referer"] = "http://kyfw.12306.cn/otn/confirmPassenger/initDc#nogo"
                self.rand_code = read_pass_code
                data = [
                    ("_json_att", ''),
                    ("rand", rand_method),
                    ("randCode", read_pass_code),
                    ("REPEAT_SUBMIT_TOKEN", self.globalRepeatSubmitToken),
                ]
            elif module == 'login':
                self.pass_code = read_pass_code
                data = [
                    ("randCode", read_pass_code),
                    ("rand", rand_method)
                ]
            else:
                pass
            post_data = urllib.parse.urlencode(data)
            print("send checkRandCodeAnsyn=====>:")  #% post_data
            url_check_rand = "http://kyfw.12306.cn/otn/passcodeNew/checkRandCodeAnsyn"
            g_conn.request('POST', url_check_rand, body=post_data, headers=header)
            res = g_conn.getresponse()
            data = res.read()
            print("recv checkRandCodeAnsyn")
            resp = json.loads(data)
            if resp['data'] != 'Y':
                print("status error:%s" % resp['data'])
                continue
            else:
                ret = True
                break
        return ret

    @retries(3)
    def check_pass_code(self):
        print("#############################Step1:Passcode#########")
        module = 'login'
        rand_method = 'sjrand'
        return self.check_pass_code_common(module, rand_method)

    @retries(3)
    def check_rand_code(self):
        print("#############################Step8:Randcode#########")
        ret = False
        module = 'passenger'
        rand_method = 'randp'
        return self.check_pass_code_common(module, rand_method)

    @retries(3)
    def loginAysnSuggest(self):
        if not self.check_pass_code():
            return False
        print("#############################Step2:Login#########")
        url_login = "http://kyfw.12306.cn/otn/login/loginAysnSuggest"
        data = [
            ("loginUserDTO.user_name", user),
            ("userDTO.password", passwd),
            ("randCode", self.pass_code)
        ]
        post_data = urllib.parse.urlencode(data)
        # post_data has the form
        # loginUserDTO.user_name=<user>&userDTO.password=<password>&randCode=<code>
        self.proxy_ext_header["Cookie"] = self.sid + ';' + self.sip
        print("send loginAysnSuggest=====>")  #% post_data
        g_conn.request('POST', url_login, body=post_data, headers=self.proxy_ext_header)
        res = g_conn.getresponse()
        print("recv loginAysnSuggest")
        data = res.read()
        res_json = json.loads(data)
        if res_json['status'] != True or 'loginCheck' not in res_json['data']:
            print(u"return error:%s" % ' '.join(res_json['messages']))
            return False
        if res_json['data']['loginCheck'] == 'Y':
            print(u"login success")
            return True
        else:
            print(u"login error %s" % res_json['data']['loginCheck'])
            return False

    def show_ticket(self, it):
        print(it['station_train_code'], it['from_station_name'], it['to_station_name'], it['start_time'], it['arrive_time'], it['lishi'],
              it['swz_num'], it['tz_num'], it['zy_num'], it['ze_num'], it['gr_num'], it['rw_num'], it['yw_num'], it['rz_num'], it['wz_num'], it['canWebBuy'])
        return

    ############
    #retcode: -2 for retry, -1 for error, 0 for success
    ############
    def do_ticket(self, json_data, result, want_special):
        ret = 0
        for item in json_data['data']:
            if item['queryLeftNewDTO']['canWebBuy'] == 'N':
                continue
            train_code = item['queryLeftNewDTO']['station_train_code']
            if want_special and not train_code in g_buy_list:
                continue
            if train_code in g_ingnore_list:
                continue
            has_ticket = False
            for care_type in g_care_seat_types:
                # "--" means not applicable and u"无" means sold out in the response
                if item['queryLeftNewDTO'][care_type] != "--" and item['queryLeftNewDTO'][care_type] != u"无":
                    has_ticket = True
                    break
            if has_ticket:
                result[train_code] = item
        #query return none, retry
        if not len(result):
            return -2
        #as the list priority
        if want_special:
            for train_code in g_buy_list:
                if train_code not in result:
                    continue
                ret = self.buy(result[train_code])
                if not ret:
                    print("Err during buy")
                    return -1
                else:
                    return 0
        #show all
        for train_code, item in result.items():
            self.show_ticket(item['queryLeftNewDTO'])
        #get prompt
        cmd = input("input cmd[r|q|K101]:")
        cmd = cmd.strip()
        print("input:%s" % cmd)
        if cmd == "r":
            print("retry")
            return -2
        elif cmd == "q":
            print("quit")
            return 0
        elif cmd not in result:
            # Unknown train code: query again instead of failing with KeyError
            print("unknown train:%s" % cmd)
            return -2
        else:
            print("buy ticket:%s" % cmd)
            ret = self.buy(result[cmd])
            if not ret:
                print("Err during buy")
                return -1
            else:
                return 0

    @retries(3)
    def query(self):
        print("#############################Step3:Query#########")
        self.proxy_ext_header["Referer"] = "http://kyfw.12306.cn/otn/leftTicket/init"
        url_query = "http://kyfw.12306.cn/otn/leftTicket/query?" + urllib.parse.urlencode(g_query_data)
        print("start query======>%s" % url_query)
        want_special = False
        if len(g_buy_list) != 0:
            want_special = True
            print("JUST For:%s" % (','.join(g_buy_list)))
        else:
            print(u"Train From->To Depart:Arrive Duration Business Premier FirstClass SecondClass DeluxeSoftSleeper SoftSleeper HardSleeper SoftSeat HardSeat NoSeat Notes")
        #"http://kyfw.12306.cn/otn/leftTicket/query?leftTicketDTO.train_date=2014-01-04&leftTicketDTO.from_station=SHH&leftTicketDTO.to_station=NJH&purpose_codes=ADULT"
        q_cnt = 0
        while 1:
            q_cnt = q_cnt + 1
            g_conn.request('GET', url_query, headers=self.proxy_ext_header)
            res = g_conn.getresponse()
            data = res.read()
            if res.getheader('Content-Encoding') == 'gzip':
                # The body is bytes, so decompress it directly
                data = gzip.decompress(data)
            res_json = json.loads(data)
            if res_json['status'] != True:
                print("parse json failed! data %s" % data)
                continue
            result = {}
            ret = self.do_ticket(res_json, result, want_special)
            if ret == 0:
                break
            elif ret == -2:
                print(u"no ticket, refresh %d times!" % q_cnt)
                time.sleep(g_query_sleep_time)
                continue
        return True

    @retries(3)
    def confirmPassenger_get_token(self):
        import ast  # local import, used only to parse the embedded object literal below
        print("#############################Step6:confirmPassenger_get_token #########")
        url_confirm_passenger = "http://kyfw.12306.cn/otn/confirmPassenger/initDc"
        g_conn.request('GET', url_confirm_passenger, headers=self.proxy_ext_header)
        res = g_conn.getresponse()
        data = res.read()
        if res.getheader('Content-Encoding') == 'gzip':
            data = gzip.decompress(data)
        key_find = False
        line_token = ''
        line_request_info = ''
        # Split the page into text lines whether or not it was compressed
        for raw_line in data.decode("utf8").splitlines():
            line = raw_line.strip()
            if line.startswith('var globalRepeatSubmitToken = '):
                line_token = line
                continue
            elif line.startswith('var ticketInfoForPassengerForm'):
                line_request_info = line
                key_find = True
                break
        if key_find:
            self.globalRepeatSubmitToken = line_token.split('=', 1)[1].strip()[1:-2]
            print("Update globalRepeatSubmitToken=%s" % self.globalRepeatSubmitToken)
            req_data = line_request_info.split('=', 1)[1].strip()[:-1]
            req_data = req_data.replace("null", "''")
            req_data = req_data.replace("true", "True")
            req_data = req_data.replace("false", "False")
            print("line_request_info")
            # literal_eval only accepts literals, unlike eval, which would run arbitrary code
            req_json = ast.literal_eval(req_data)
            self.key_check_isChange = req_json['key_check_isChange']
            self.leftTicketStr = req_json['leftTicketStr']
            print("Update key_check_isChange=%s" % self.key_check_isChange)
            return True
        else:
            print("globalRepeatSubmitToken not found")
            return False

    @retries(3)
    def getQueueCount(self, item):
        print("#############################Step:getQueueCount #########")
        url_queue_count = "http://kyfw.12306.cn/otn/confirmPassenger/getQueueCount"
        #buy_date = 'Sun Jan 5 00:00:00 UTC+0800 2014'
        tlist = time.ctime().split()
        tlist[3] = '00:00:00'
        tlist.insert(4, 'UTC+0800')
        buy_date = ' '.join(tlist)
        # Pick the first preferred seat type that still has tickets
        for t_type in g_care_seat_types:
            if item['queryLeftNewDTO'][t_type] != "--" and item['queryLeftNewDTO'][t_type] != u"无":
                break
        s_type = g_seat_code_dict[t_type]
        data = [
            ("train_date", buy_date),
            ("train_no", item['queryLeftNewDTO']['train_no']),
            ("stationTrainCode", item['queryLeftNewDTO']['station_train_code']),
            ("seatType", s_type),
            ("fromStationTelecode", item['queryLeftNewDTO']['from_station_telecode']),
            ("toStationTelecode", item['queryLeftNewDTO']['to_station_telecode']),
            ("leftTicket", item['queryLeftNewDTO']['yp_info']),
            ("purpose_codes", "00"),
            ("_json_att", ''),
            ("REPEAT_SUBMIT_TOKEN", self.globalRepeatSubmitToken),
        ]
        post_data = urllib.parse.urlencode(data)
        print("send getQueueCount=====>")  #% post_data
        g_conn.request('POST', url_queue_count, body=post_data, headers=self.proxy_ext_header)
        res = g_conn.getresponse()
        data = res.read()
        res_json = json.loads(data)
        print("recv getQueueCount:%s" % res_json)
        if res_json['status'] != True:
            print("getQueueCount error :%s" % res_json)
            return False
        return True

    @retries(3)
    def checkOrderInfo(self):
        print("#############################Step9:checkOrderInfo #########")
        url_check_order = "http://kyfw.12306.cn/otn/confirmPassenger/checkOrderInfo"
        data = [
            ("cancel_flag", "2"),
            ("bed_level_order_num", "000000000000000000000000000000"),
            ("passengerTicketStr", self.passengerTicketStr),
            ("oldPassengerStr", self.oldPassengerStr),
            ("tour_flag", "dc"),
            ("randCode", self.rand_code),
            ("_json_att", ''),
            ("REPEAT_SUBMIT_TOKEN", self.globalRepeatSubmitToken),
        ]
        post_data = urllib.parse.urlencode(data)
        print("send checkOrderInfo=====>")
        g_conn.request('POST', url_check_order, body=post_data, headers=self.proxy_ext_header)
        res = g_conn.getresponse()
        data = res.read()
        res_json = json.loads(data)
        print("recv checkOrderInfo:%s" % res_json)
        if res_json['status'] != True or res_json['data']['submitStatus'] != True:
            print("checkOrderInfo error :%s" % res_json['data']['errMsg'])
            return False
        return True

    @retries(3)
    def checkUser(self):
        print("#############################Step4:checkUser #########")
        url_check_info = "http://kyfw.12306.cn/otn/login/checkUser"
        data = [
            ('_json_att', ''),
        ]
        post_data = urllib.parse.urlencode(data)
        print(post_data)
        print("send checkUser=====>")  #% post_data
        g_conn.request('POST', url_check_info, body=post_data, headers=self.proxy_ext_header)
        res = g_conn.getresponse()
        data = res.read()
        res_json = json.loads(data)
        print("recv checkUser")
        if 'flag' not in res_json['data'] or res_json['data']['flag'] != True:
            print("check user failed, %s" % res_json)
            return False
        else:
            return True

    @retries(3)
    def submitOrderRequest(self, item):
        print("#############################Step5:submitOrderRequest #########")
        url_submit = "http://kyfw.12306.cn/otn/leftTicket/submitOrderRequest"
        # Parentheses let the expression continue across lines
        post_data = ("secretStr=" + item['secretStr'] + "&train_date="
                     + item['queryLeftNewDTO']['start_train_date']
                     + "&back_train_date=" + item['queryLeftNewDTO']['start_train_date']
                     + "&tour_flag=dc&purpose_codes=ADULT&query_from_station_name="
                     + item['queryLeftNewDTO']['from_station_name']
                     + "&query_to_station_name=" + item['queryLeftNewDTO']['to_station_name']
                     + "&undefined")
        print(post_data)
        print("send submitOrderRequest=====>")  #% post_data
        g_conn.request('POST', url_submit, body=post_data.encode("utf8"), headers=self.proxy_ext_header)
        res = g_conn.getresponse()
        data = res.read()
        res_json = json.loads(data)
        if res_json['status'] != True:
            print(u"submit order failed")
            print(data)
            print(''.join(res_json['messages']))
            return False
        else:
            return True

    @retries(3)
    def confirmSingleForQueue(self):
        print("#############################Step11:confirmSingleForQueue #########")
        url_check_info = "http://kyfw.12306.cn/otn/confirmPassenger/confirmSingleForQueue"
        data = [
            ('passengerTicketStr', self.passengerTicketStr),
            ("oldPassengerStr", self.oldPassengerStr),
            ('randCode', self.rand_code),
            ('purpose_codes', "00"),
            ('key_check_isChange', self.key_check_isChange),
            ('leftTicketStr', self.leftTicketStr),
            ('train_location', 'H2'),
            ('_json_att', ''),
            ("REPEAT_SUBMIT_TOKEN", self.globalRepeatSubmitToken),
        ]
        post_data = urllib.parse.urlencode(data)
        print("send confirmSingleForQueue=====>")  #% post_data
        g_conn.request('POST', url_check_info, body=post_data, headers=self.proxy_ext_header)
        res = g_conn.getresponse()
        data = res.read()
        res_json = json.loads(data)
        print("recv confirmSingleForQueue")
        if 'submitStatus' not in res_json['data'] or res_json['data']['submitStatus'] != True:
            print(u"confirmSingleForQueue failed, %s" % res_json)
            return False
        else:
            return True

    @retries(5)
    def queryOrderWaitTime(self):
        print("#############################Step12:queryOrderWaitTime #########")
        url_query_wait = "http://kyfw.12306.cn/otn/confirmPassenger/queryOrderWaitTime?"
        cnt = 0
        while 1:
            data = [
                ('random', int(time.time())),
                ("tourFlag", "dc"),
                ('_json_att', ''),
                ("REPEAT_SUBMIT_TOKEN", self.globalRepeatSubmitToken),
            ]
            # Build a fresh URL each round instead of appending to the previous one
            url = url_query_wait + urllib.parse.urlencode(data)
            print("send queryOrderWaitTime:%d=====>" % cnt)  #% url
            g_conn.request('GET', url, headers=self.proxy_ext_header)
            res = g_conn.getresponse()
            data = res.read()
            res_json = json.loads(data)
            print("recv queryOrderWaitTime:%s" % res_json)
            cnt = cnt + 1
            if 'data' not in res_json['data'] or res_json['data']['queryOrderWaitTimeStatus'] != True:
                print("queryOrderWaitTime error")
                print(res_json['messages'])
                break
            if res_json['data']['waitCount'] == 0:
                self.orderId = res_json['data']['orderId']
                print("Update orderId:%s" % self.orderId)
                break
            else:
                continue
        return True

    @retries(3)
    def resultOrderForDcQueue(self):
        print("#############################Step13:resultOrderForDcQueue #########")
        url_result = "http://kyfw.12306.cn/otn/confirmPassenger/resultOrderForDcQueue"
        data = [
            ('orderSequence_no', self.orderId),
            ('_json_att', ''),
            ("REPEAT_SUBMIT_TOKEN", self.globalRepeatSubmitToken),
        ]
        post_data = urllib.parse.urlencode(data)
        print("send resultOrderForDcQueue=====>")  #% url
        g_conn.request('POST', url_result, body=post_data, headers=self.proxy_ext_header)
        res = g_conn.getresponse()
        data = res.read()
        res_json = json.loads(data)
        print("recv resultOrderForDcQueue")
        if 'submitStatus' not in res_json['data'] or res_json['data']['submitStatus'] != True:
            print("submit error")
            print(data)
            return False
        else:
            print("#############################Success check ticket in webbrowser #########")
            return True

    @retries(3)
    def get_passenger_info(self):
        print("#############################Step7:getPassengerDTOs #########")
        url_get_passager_info = "http://kyfw.12306.cn/otn/confirmPassenger/getPassengerDTOs"
        data = [
            ('_json_att', ''),
            ('REPEAT_SUBMIT_TOKEN', self.globalRepeatSubmitToken)
        ]
        post_data = urllib.parse.urlencode(data)
        print("send getPassengerDTOs=====>")  #% post_data
        g_conn.request('POST', url_get_passager_info, body=post_data, headers=self.proxy_ext_header)
        res = g_conn.getresponse()
        data = res.read()
        res_json = json.loads(data)
        print("recv getPassengerDTOs")
        return True

    def buy(self, item):
        #Step4
        if not self.checkUser():
            return False
        #Step5
        if not self.submitOrderRequest(item):
            return False
        #Step6
        if not self.confirmPassenger_get_token():
            return False
        self.proxy_ext_header["Referer"] = "http://kyfw.12306.cn/otn/confirmPassenger/initDc#nogo"
        #Step7
        #self.get_passenger_info
        #Step8
        if not self.check_rand_code():
            return False
        #Step9
        if not self.checkOrderInfo():
            return False
        #Step10
        if not self.getQueueCount(item):
            return False
        #Step11
        if not self.confirmSingleForQueue():
            return False
        #Step12
        if not self.queryOrderWaitTime():
            return False
        #Step13
        if not self.resultOrderForDcQueue():
            return False
        return True

def clean_temp_files():
    print("clean_temp_files")
    pass

##############################################test#############################
@retries(3)
def test_retries():
    print("test")
    raise NameError  #httplib.HTTPException

def test_ocr():
    f_name = "pass_code.jpeg"
    text = call_tesseract(f_name)
    print("read:%s" % text)

@retries(3)
def test_reconnect():
    header = {
        "Accept": "*/*",
        "X-Requested-With": "XMLHttpRequest",
        "Accept-Language": "zh-cn",
        "Accept-Encoding": "gzip, deflate",
        "Connection": "Keep-Alive",
        "Cache-Control": "no-cache",
        "User-Agent": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
    }
    url = "http://www.baidu.com"
    for i in range(3):
        print("send")
        g_conn.request('GET', url, headers=header)
        res = g_conn.getresponse()
        data = res.read()
        print("send")
        restart_conn(g_conn)

def test_get_svr_ips():
    print("test_get_svr_ips")
    pass

##############################################test#############################
def show_conf():
    print("########show conf##############")
    print("Buy:%s" % (','.join(g_buy_list)))
    print("Ingnore:%s" % (','.join(g_ingnore_list)))
    print("Query data:", g_query_data)
    print("Passengers:", g_passengers)
    print("Sleep time:%f" % g_query_sleep_time)
    print("Auto OCR: %d" % g_max_auto_times)
    print()

def main():
    show_conf()
    #set log
    hdlr = logging.FileHandler('.log.txt')
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr)
    logger.setLevel(logging.WARNING)
    #test_retries()
    print("connecting......")
    g_conn.connect()
    ha = HttpAuto()
    ha.construct_passengerTicketStr()
    if not ha.loginAysnSuggest():
        return False
    while 1:
        try:
            ha.query()
        except Exception as e:
            traceback.print_exc()
    return True

if __name__ == '__main__':
    #test_ocr()
    #test_reconnect()
    main()
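The retry-on-failure idea behind the retries decorator in the listing can be seen more easily in isolation. The following is a simplified sketch of the pattern, not the script's own implementation: the name retry, its exceptions parameter and the flaky test function are all my own, and it only re-raises on the last attempt without reconnecting or logging.

```python
import functools


def retry(max_tries, exceptions=(Exception,)):
    """Call the wrapped function up to max_tries times before giving up."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    # Out of attempts: let the last error propagate.
                    if attempt == max_tries:
                        raise
        return wrapper
    return decorator


calls = []


@retry(3, exceptions=(ConnectionError,))
def flaky():
    """Fail twice, then succeed, to exercise the retry loop."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


print(flaky(), len(calls))
```

Limiting the caught exception types keeps programming errors such as a NameError from being silently retried.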
