前面讲过 python 爬虫的常用技巧,今天补上一篇实战指南,爬取知识星球里某个星球的所有数据,对,你没有听错,是所有数据,包括了内容、问答、评论、图片、文件、甚至是谁点了赞!心动了吧,赶快行动吧。

当然,本篇文章需要你有一点 python 基础,如果没有的话,建议你先收藏,去找一些教程学习一下这门工具人语言。

好了,废话不多说,马上开始。

首先,导入所需要的包:

import queue
import time
import threading
import requests
import pymongo
import logging
import os# 配置用于日志打印的 logger,纯属个人爱好,你可以用 print 代替
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

准备数据库

把获取的数据存入 MongoDB 中,为什么选择 MongoDB?因为非关系型数据库比较简单,我们用到的数据结构也不复杂,开发起来比较快。

if __NAME__ == '__MAIN__':try:# 打开数据库连接logger.info('Connecting to MongoDB...')client = pymongo.MongoClient(MONGODB_URI)logger.info('Successfully connected!')# 在此进行爬虫逻辑# 关闭数据库连接logger.info('Closing MongoDB...')client.close()logger.info('Successfully closed!')except Exception as e:logger.error(e)

分析知识星球的网络请求数据

用 Chrome 浏览器的开发者工具对知识星球 PC 端的网络请求进行观察,发现获取星球话题的请求只有一个,我们把它赋值给 BASE_URL。同时发现登录的 token 就在 cookie 里面: zsxq_access_token,啧啧,太明显了。

GROUP = '15281148525182' # 星球id
BASE_URL = 'https://api.zsxq.com/v1.10/groups/{}/topics'.format(GROUP)# 构造全局请求头
headers = {'cookie': '换成你的 Cookie','user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36'
}

分析话题数据,可以归纳总结出以下结论:

  1. 话题类型有两种:talk 是普通话题,只有 1 条内容,q&a 是问答,问答含有提问和回答 2 条内容。
  2. 所有内容均可能包含图片或者文件(不太确定问答的内容是否会包含文件,因此当作可能包含文件处理)。
  3. 当请求返回的话题数量为 0 时,说明已经爬取完毕。

我的 CPU 有 4 个核心,考虑到文本、图片、文件出现的频次和下载时间,多线程设计如下:

  1. 设计 3 个队列:topic_qimages_qfiles_q,分别存取 end_time、图片信息、文件信息,分别用于获取话题信息、下载图片、下载文件。
  2. 设计 4 个线程,1 个获取话题信息,2 个下载图片,1个下载文件。
  3. 当所有队列结束时,程序结束。

流程图

为了能让你更好地理解,我画了一副流程图,可以配合流程图来理解代码,事半功倍。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kJfE4AsK-1605666950402)(flow.jpg)]

多线程并行

根据上面的分析,创建 3 个队列,4 个线程,并把下面的代码放到连接、关闭数据库代码的中间:

# 任务队列
topic_q = queue.Queue()
image_q = queue.Queue()
file_q = queue.Queue()# 开启获取 topics 的线程
t = threading.Thread(target=get_topics_thread)
t.setDaemon(True)
t.start()# 开启获取 images 的线程
t = threading.Thread(target=get_images_thread)
t.setDaemon(True)
t.start()
# 再开启一个获取 images 的线程
t = threading.Thread(target=get_images_thread)
t.setDaemon(True)
t.start()# 开启获取 files 的线程
t = threading.Thread(target=get_files_thread)
t.setDaemon(True)
t.start()# 把第一个任务添加进队列
topic_q.put(None)# 等待任务队列结束
topic_q.join()
image_q.join()
file_q.join()

下面是各个线程函数,作用是不断的从对应任务队列中取出参数并执行处理方法,fetch_topicsfetch_imagesfetch_files 分别是下载对应内容的方法。

# 话题线程
def get_topics_thread():while True:job = topic_q.get()fetch_topics(job)# time.sleep(1)topic_q.task_done()# 图片线程
def get_images_thread():while True:job = image_q.get()fetch_images(job)# time.sleep(1)image_q.task_done()# 文件线程
def get_files_thread():while True:job = file_q.get()fetch_files(job)# time.sleep(1)file_q.task_done()

下载话题数据

创建 fetch_topics 方法,用来发送获取星球话题的请求,上面已经设置好了 BASE_URL,这里设置请求参数即可。

观察发现,API 的参数有 3 个,分别是:

  1. scope:话题范围,例如:精华话题还是图片话题。all 代表全部话题。
  2. count:返回的话题数量,网站里默认 20 个,但经测试,30 个也能正常返回,40个以上报错。
  3. end_time:关键参数,知识星球通过它来分页,不填则返回最新的 count 个话题,比如 20,如果你想得到第 21 - 40 个话题,那么就需要设置 end_time 为第 20 条话题的创建时间,并且要把创建时间的毫秒数减 1。
# 调用一次该方法,就请求一次 API,根据 end_time 参数的值来控制返回的话题
def fetch_topics(end_time=None):# 设置参数为全部话题,返回话题数量为 30 个params = {'scope': 'all','count': '30',}if end_time != None:params['end_time'] = end_time# 发送请求r = requests.get(BASE_URL, headers=headers, params=params, allow_redirects=False)# 打印请求地址,用来 debugprint(r.url)d = r.json()# 异常处理,如果服务器返回错误,则等候 15 秒,把 end_time 压入话题队列if d['succeeded'] == False:logger.error('get topics error, url: {}, params: {}'.format(BASE_URL, params))time.sleep(15)topic_q.put(end_time)return# 返回的话题数量为 0,说明已经爬取完毕,直接结束方法if len(d['resp_data']['topics']) == 0:logger.info('Fetch topics done!')return 'done'# 到这里说明一切正常,把得到的话题数据全部存入 MongoDBtry:db = client['zsxq']collection = db['topics_{}'.format(GROUP)]insertItems = [{ 'raw_data': topic, 'topic_id': topic['topic_id'] } for topic in d['resp_data']['topics']]insertResult = collection.insert_many(insertItems, ordered=True)logger.info(str(len(insertResult.inserted_ids)) + ' documents were inserted')except Exception as e:logger.error('Insert to mongodb error, related page {}'.format(r.url))logger.error(e)# 循环处理每一条话题数据,get_images 和 get_files 为把图片和文件的信息分别压入图片队列和文件队列for topic in d['resp_data']['topics']:# 类型为 talkif topic['type'] == 'talk':if 'talk' in topic:get_images(topic['talk'])get_files(topic['talk'])# 类型为 q&aelif topic['type'] == 'q&a':if 'question' in topic:get_images(topic['question'])get_files(topic['question'])if 'answer' in topic:get_images(topic['answer'])get_files(topic['answer'])else:# debug 专用,因为不确定是否含有除 talk 和 q&a 以外的话题,如果有,则打印出来,方便处理print(topic)# 到这里,说明得到的话题都处理过了,下面就要处理 end_time,然后把 end_time 压入话题队列end_time = d['resp_data']['topics'][len(d['resp_data']['topics']) - 1]['create_time']tmp = str(int(end_time[20:23]) - 1)while len(tmp) < 3:tmp = '0' + tmpend_time = end_time.replace('.' + end_time[20:23] + '+', '.' + tmp + '+')topic_q.put(end_time)

下载图片

图片可能包含三种类型:thumbnail 缩略图、large 大图、original 原图,不一定全都有,因此在下载前要判断。

def fetch_images(img_info):# 下载图片函数def download(url, image_id, type, subfix):# 设置目标文件位置target_dir = './images/{}/{}.{}'.format(image_id, type, subfix)# 文件夹不存在的话,则创建文件夹if not os.path.exists(os.path.dirname(target_dir)):try:os.makedirs(os.path.dirname(target_dir))except Exception as e:logger.error(e)# 下载with open(target_dir, "wb+") as file:response = requests.get(url)file.write(response.content)# 下面把图片保存的位置存在 MongoDB 中,和原文的 id 和类型对应。try:db = client['zsxq']collection = db['images_{}'.format(GROUP)]insertItem = {'symbol': '{}_{}'.format(image_id, type),'image_id': image_id,'type': type,'url': 'url','target_dir': target_dir}result = collection.insert_one(insertItem)logger.info('1 document was inserted into images_{} collection with the _id: {}'.format(GROUP, result.inserted_id))except Exception as e:logger.error('download image failed, image_id: {}, type: {}'.format(image_id, type))logger.error(e)# 下面处理不同类型的图片,并调用上面的下载方法if 'thumbnail' in img_info:download(img_info['thumbnail']['url'], img_info['image_id'], 'thumbnail', img_info['type'])if 'large' in img_info:download(img_info['thumbnail']['url'], img_info['image_id'], 'large', img_info['type'])if 'original' in img_info:download(img_info['thumbnail']['url'], img_info['image_id'], 'original', img_info['type'])# 由于图片下载比较慢,每下载一组打印一次剩余图片数量,让自己知道当前进度print('Remain: {}'.format(image_q.qsize()))pass

下载文件

知识星球 PC 端是无法下载文件的,我用手机抓包后才得到了下载地址:

def fetch_files(file_info):# 下载文件函数def download(url, filename):# 文件夹不存在的话,则创建文件夹if not os.path.exists(os.path.dirname(filename)):try:os.makedirs(os.path.dirname(filename))except Exception as e:logger.error(e)# 下载with open(filename, "wb+") as file:response = requests.get(url)file.write(response.content)# 下面把文件保存的位置存在 MongoDB 中,和原文的 id 对应。try:db = client['zsxq']collection = db['files_{}'.format(GROUP)]insertItem = {'file_id': file_info['file_id'],'name': file_info['name'],'target_dir': filename}result = collection.insert_one(insertItem)logger.info('1 document was inserted into files_{} collection with the _id: {}'.format(GROUP, result.inserted_id))except Exception as e:logger.error('download file failed, file_id: {}, file_name: {}'.format(file_info['file_id'], file_info['name']))logger.error(e)# 这里就是获取下载地址的 API,在手机上抓包得到的url = 'https://api.zsxq.com/v1.10/files/{}/download_url'.format(file_info['file_id'])r = requests.get(url, headers=headers)d = r.json()# 异常处理,打印错误,然后直接结束方法if d['succeeded'] != True:logger.error('fetch file download information failed, target: {}'.format(file_info))return# 得到下载地址后,执行下载download(d['resp_data']['download_url'], './files/{}/{}'.format(file_info['file_id'], file_info['name']))pass

以上就是今天的实战指南。最后是你们最关心的哪里下载源码?老实说,我能给你的最好建议其实是按照上面的例子自己敲一遍,真的很管用,学编程就是要动手。

注:所有代码均基于 python 3.6.5 版本,使用其他版本可能无法运行。

如何下载源码以及更多的编程资源?只需简单 2 步:

  1. 关注微信公众号:湾区码农
  2. 回复关键词zsxq即可获得

付费的知识星球要过期了,python 教你怎么办相关推荐

  1. Jerry Wang诚邀广大SAP同仁免费加入我的知识星球,共同探讨SAP技术问题

    大家知道Jerry Wang有一个微信公众号"汪子熙",2017年12月27日,Jerry的这个公众号发布了第一篇文章.到今天2018年10月底为止,正好十个月. 在这10个月的时 ...

  2. 内卷?躺平?先看看这6个高质量知识星球

    信息爆炸的互联网时代我们如何高效利用碎片化时间并筛选出有用的信息呢? 信息来源有公域.私域.两微一抖一快,如此信息量巨大的来源,一来二往,产生了知识付费. 知识星球作为一个私密的付费社群工具,在这些公 ...

  3. python爬知识星球付费数据_python抓取知识星球精选帖,制作为pdf文件

    版权声明:本文为xing_star原创文章,转载请注明出处! 背景: 这两年知识付费越来越热,我也加入了不少知识星球,总觉得信息有些过载了.一天不看,就有500+的内容显示未读,弄的自己格外的焦虑.感 ...

  4. python爬知识星球付费数据_用python爬取知识星球

    去年我们做过一个叫「学长问答」的社群活动,里面沉淀了大量有价值的互动信息,后来因为各种原因终止了.今天和涂腾聊起来,觉得这些信息就这么沉寂了太浪费.所以就试着用python爬取了知识星球的内容. 这个 ...

  5. python爬知识星球付费数据_python 知识星球文件下载

    python 知识星球文件下载 #!/usr/bin/python3 # -*- coding: UTF-8 -*- import requests import json from urllib.p ...

  6. python爬知识星球付费数据_以知识星球为例,教你如何运营一个付费社群|社群运营成功案例...

    现在知乎.微信等很多自媒体达人,都会邀请粉丝加入自己的知识星球.知识星球,以信息流付费社区的形式,成为知识付费市场的重要一员.从前身小密圈一夜爆红.崛起.负面新闻,到改制改名,目前知识星球已经非常成熟 ...

  7. python项目总结与展望_我做知识星球一周年总结与未来展望

    从去年今天创建知识星球到今天,刚好一周年,我做知识星球还是有一些心得的,在这一年也有巨大成长,并且也帮助了一大批人成长. 1. 说说星球过去的一周年 先上个图 看看我知识星球的现状: 创建365天,主 ...

  8. Python 抓取知识星球内容生成词云并生成 PDF

    知识星球是什么? 知识星球是创作者连接铁杆粉丝,做出高品质社群,实现知识变现的工具.创作者可以用知识星球连接铁杆粉丝,做出高品质社群,实现知识变现. 以上来自知识星球官网的介绍 https://hel ...

  9. 【Python进阶】Python进阶专栏、编程与开源框架知识星球上线,等你来follow

    大家好,今天我将在有三AI开设新专栏<Python进阶>.在这个专栏中,我们会讲述Python的各种进阶操作,包括Python对文件.数据的处理,Python各种好用的库如NumPy.Sc ...

最新文章

  1. Java实现批量修改文件名,重命名
  2. 面试必会系列 - 1.4 类加载机制
  3. android顶部导航高度,Android特效——————底部/顶部导航条(Fragment+ViewPaper+XTabLayout)...
  4. Spring核心组件的理解
  5. WebService:JAX-WS实现WebService
  6. 前沿重器[21] | 聊聊对话系统:概述
  7. 计算机方面的英语学术期刊,近几年计算机专业英文参考文献 计算机专业英文核心期刊参考文献有哪些...
  8. java登录验证_java实现登录验证码
  9. 从EXCEL导入CAD后如何设置表格文字大小?
  10. OKR 如何转变你的绩效管理策略
  11. Qt连接MySql驱动加载失败问题解决方法
  12. excel删除无尽空白行_从0到1快速入门Excel透视表,看这一篇就够了
  13. 谷歌浏览器翻译插件推荐——Google Chrome 插件推荐
  14. 怎样测试企业级SSD
  15. 用fiddler+chrome搞定在线学习网站
  16. 盘点2020年北京市小升初考试关于信息学竞赛的那些事儿!
  17. c语言素数筛法与分解素因数,质因数分解及代码:
  18. USB_HID协议基础
  19. 【大数据架构】浅谈数据中台
  20. OpenCV实例140+ (1 图像处理基础知识)

热门文章

  1. 【微机原理】EU和BIU
  2. Java算法:计算π(圆周率)
  3. 解决远程调用服务超时---IPV(idea)
  4. 那些有趣/有用的 Python 库
  5. python死循环用什么好弄_是什么神药,治好了我的Python循环语句恐惧症?!
  6. 微信小程序项目实例——别踩白块
  7. 基于pygame开发的飞机打陨石小游戏
  8. 小偷偷银行卡破密码_我如何阻止信用卡小偷被盗3,537个人-并在此过程中挽救了我们的非营利组织...
  9. 民主的模式-36个国家的政府形式和政府绩效
  10. 现代政治经济学(引言)