通过《Colly源码解析——框架》分析,我们可以知道Colly执行的主要流程。本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现。(转载请指明出于breaksoftware的csdn博客)

递归深度

以下例子截取于Basic

 c := colly.NewCollector(// Visit only domains: hackerspaces.org, wiki.hackerspaces.orgcolly.AllowedDomains("hackerspaces.org", "wiki.hackerspaces.org"),)// On every a element which has href attribute call callbackc.OnHTML("a[href]", func(e *colly.HTMLElement) {link := e.Attr("href")// Print linkfmt.Printf("Link found: %q -> %s\n", e.Text, link)// Visit link found on page// Only those links are visited which are in AllowedDomainsc.Visit(e.Request.AbsoluteURL(link))})

c是Collector指针,它的Visit方法给scrape传递的“深度”值是1。

func (c *Collector) Visit(URL string) error {return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}

由于NewCollector构造的Collector.MaxDepth为0,而在scrape方法内部调用的requestCheck中,如果此值为0,则不会去做深度检测

// requestCheck methodif c.MaxDepth > 0 && c.MaxDepth < depth {return ErrMaxDepth}

如果希望通过MaxDepth控制深度,则可以参见Max depth例子

 c := colly.NewCollector(// MaxDepth is 1, so only the links on the scraped page// is visited, and no further links are followedcolly.MaxDepth(1),)// On every a element which has href attribute call callbackc.OnHTML("a[href]", func(e *colly.HTMLElement) {link := e.Attr("href")// Print linkfmt.Println(link)// Visit link found on pagee.Request.Visit(link)})

第4行将深度设置为1,这样理论上只能访问第一层的URL。

如果OnHTML中的代码和Basic例子一样,即使用Collector的Visit访问URL,则由于其depth一直传1,而导致requestCheck的深度检测一直不满足条件,从而会访问超过1层的URL。

所以第13行,调用的是HTMLElement的Visit方法

func (r *Request) Visit(URL string) error {return r.collector.scrape(r.AbsoluteURL(URL), "GET", r.Depth+1, nil, r.Ctx, nil, true)
}

相较于Collector的Visit,HTMLElement的Visit方法将Depth增加了1,并且传递了请求的上下文(ctx)。由于depth有变化,所以之后的深度检测会返回错误,从而只会访问1层URL。

规则

Collector的Limit方法用于设置各种规则。这些规则最终在Collector的httpBackend成员中执行。

一个Collector只有一个httpBackend结构体指针,而一个httpBackend结构体可以有一组规则

type httpBackend struct {LimitRules []*LimitRuleClient     *http.Clientlock       *sync.RWMutex
}

规则针对Domain来区分,我们可以通过设定不同的匹配规则,让每组URL执行相应的操作。这些操作包括:

  • 访问并行数
  • 访问间隔延迟

参见Parallel例子。只截取其中关键一段

 // Limit the maximum parallelism to 2// This is necessary if the goroutines are dynamically// created to control the limit of simultaneous requests.//// Parallelism can be controlled also by spawning fixed// number of go routines.c.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: 2})

Collector的Limit最终会调用到httpBackend的Limit,它将规则加入到规则组后初始化该规则。

// Init initializes the private members of LimitRule
func (r *LimitRule) Init() error {waitChanSize := 1if r.Parallelism > 1 {waitChanSize = r.Parallelism}r.waitChan = make(chan bool, waitChanSize)hasPattern := falseif r.DomainRegexp != "" {c, err := regexp.Compile(r.DomainRegexp)if err != nil {return err}r.compiledRegexp = chasPattern = true}if r.DomainGlob != "" {c, err := glob.Compile(r.DomainGlob)if err != nil {return err}r.compiledGlob = chasPattern = true}if !hasPattern {return ErrNoPattern}return nil
}

第7行创建了一个可以承载waitChanSize个元素的channel。可以看到,如果我们在规则中没有设置并行数,也会创建只有1个元素的channel。这个channel会被用于调节并行执行的任务数量。所以这也就意味着,一旦调用了Limit方法而没设置Parallelism值,该Collector中针对符合规则的请求就会变成串行的。

第10和18行分别针对不同规则初始化一个编译器。因为这个操作比较重,所以在初始化时执行,之后只是简单使用这些编译器即可。

当发起请求时,流程最终会走到httpBackend的Do方法

func (h *httpBackend) Do(request *http.Request, bodySize int) (*Response, error) {r := h.GetMatchingRule(request.URL.Host)if r != nil {r.waitChan <- truedefer func(r *LimitRule) {randomDelay := time.Duration(0)if r.RandomDelay != 0 {randomDelay = time.Duration(rand.Int63n(int64(r.RandomDelay)))}time.Sleep(r.Delay + randomDelay)<-r.waitChan}(r)}

第2行通过域名查找对应的规则,如果找到,则在第4行尝试往channel中加入元素。这个操作相当于上锁。如果channel此时是满的,则该流程会被挂起。否则就执行之后的流程。在Do函数结束,命中规则的会执行上面的匿名函数,它在休眠规则配置的时间后,尝试从channel中获取数据。这个操作相当于释放锁。

Colly就是通过channel的特性实现了并行控制。

并行

在“规则”一节,我们讲到可以通过Parallelism控制并行goroutine的数量。httpBackend的Do方法最终将被Collector的fetch方法调用,而该方法可以被异步执行,即是一个goroutine。这就意味着承载Do逻辑的goroutine执行完毕后就会退出。而一种类似线程的技术在Colly也被支持,它更像一个生产者消费者模型。消费者线程执行完一个任务后不会退出,而在生产者生产出的物料池中取出未处理的任务加以处理。

以下代码截取于Queue

 q, _ := queue.New(2, // Number of consumer threads&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage)……for i := 0; i < 5; i++ {// Add URLs to the queueq.AddURL(fmt.Sprintf("%s?n=%d", url, i))}// Consume URLsq.Run(c)

这次没有调用Collector的Visit等函数,而是调用了Queue的Run。

第2行创建了一个具有2个消费者(goroutine)的Queue。第10行预先给这个Queue加入5个需要访问的URL。

// AddURL adds a new URL to the queue
func (q *Queue) AddURL(URL string) error {u, err := url.Parse(URL)if err != nil {return err}r := &colly.Request{URL:    u,Method: "GET",}d, err := r.Marshal()if err != nil {return err}return q.storage.AddRequest(d)
}

AddUrl的第11行将请求序列化,在第15行将该序列化数据保存到“仓库”中。

在Run方法中,Colly将启动2个goroutine。注意它是使用for循环组织的,这意味着如果for内无break,它会一直循环执行下去——不退出。

func (q *Queue) Run(c *colly.Collector) error {wg := &sync.WaitGroup{}for i := 0; i < q.Threads; i++ {wg.Add(1)go func(c *colly.Collector, wg *sync.WaitGroup) {defer wg.Done()for {

如果队列中没有需要处理的request,则会尝试退出

             if q.IsEmpty() {if q.activeThreadCount == 0 {break}ch := make(chan bool)q.lock.Lock()q.threadChans = append(q.threadChans, ch)q.lock.Unlock()action := <-chif action == stop && q.IsEmpty() {break}}

activeThreadCount表示当前运行中的消费者goroutine数量。如果已经没有消费者了,则直接跳出for循环,整个goroutine结束。

如果还有消费者,则创建一个channel,并将其加入到q.threadChans的channel切片中。然后在第9行等待该channel被写入值。如果写入的是true并且此时没有需要处理的request,则退出goroutine。可以看到这段逻辑检测了两次是否有request,这个我们之后再讨论。

如果还有request要处理,则递增消费者数量(在finish中会递减以抵消)。然后从“仓库”中取出一个任务,在通过Request的Do方法发起请求,最后调用finish方法善后。

             q.lock.Lock()atomic.AddInt32(&q.activeThreadCount, 1)q.lock.Unlock()rb, err := q.storage.GetRequest()if err != nil || rb == nil {q.finish()continue}r, err := c.UnmarshalRequest(rb)if err != nil || r == nil {q.finish()continue}r.Do()q.finish()}}(c, wg)}wg.Wait()return nil
}

finish方法干了三件事:

  1. 递减消费者数量,以抵消Run方法中的递增。
  2. 将Queue的各个等待中的,其他goroutine创建的channel传入true值,即告知他们可以退出了。
  3. 给Queue创建一个空的channel切片
func (q *Queue) finish() {q.lock.Lock()q.activeThreadCount--for _, c := range q.threadChans {c <- stop}q.threadChans = make([]chan bool, 0, q.Threads)q.lock.Unlock()
}

我们再看下怎么在请求的过程中给Queue增加任务

// AddRequest adds a new Request to the queue
func (q *Queue) AddRequest(r *colly.Request) error {d, err := r.Marshal()if err != nil {return err}if err := q.storage.AddRequest(d); err != nil {return err}q.lock.Lock()for _, c := range q.threadChans {c <- !stop}q.threadChans = make([]chan bool, 0, q.Threads)q.lock.Unlock()return nil
}

第3~9行,会将请求序列化后保存到“仓库”中。

第10~15行,会将其他goroutine创建的channel传入false,告知它们不要退出。然后再创建一个空的channel切片。

finish和AddRequest都使用锁锁住了所有的逻辑,而且它们都会把其他goroutine创建的channel传入值,然后将Queue的channel切片清空。这样就保证这些channel只可能收到一种状态。由于它自己创建的channel是在finish调用完之后才有机会创建出来,所以不会造成死锁。

再回来看goroutine退出的逻辑

             if q.IsEmpty() {if q.activeThreadCount == 0 {break}ch := make(chan bool)q.lock.Lock()q.threadChans = append(q.threadChans, ch)q.lock.Unlock()action := <-chif action == stop && q.IsEmpty() {break}}

如果finish方法中递减的activeThreadCount为0,这说明这是最后一个goroutine了,而且当前也没request,所以退出。当然此时存在一种可能:在1行执行结束后,其他非消费者goroutine调用AddRequest新增了若干request。而执行第2行时,goroutine将退出,从而导致存在request没有处理的可能。

如果还存在其他goroutine,则本goroutine将在第5行创建一个channel,并将这个channel加入到Queue的channel切片中。供其他goroutine调用finish往channel中传入true,或者AddRequest传入false,调控是否需要退出本过程。在第9行等待channel传出数据前,可能存在如下几种情况:

  1. 执行了finish
  2. 执行了AddRequest
  3. 执行了finish后执行了AddRequest
  4. 执行了AddRequest后执行了finish

如果是第1和4种,action将是false。第2和3种,action是true。但是这个情况下不能单纯的通过action决定是否退出。因为第9和10行执行需要时间,这段时间其他goroutine可能还会执行AddRequest新增任务,或者GetRequest删除任务。所以还要在第10行检测下IsEmpty。

这段是我阅读Colly中思考的最多的代码,因为有goroutine和channel,导致整个逻辑比较复杂。也感慨下,虽然goroutine很方便,但是真的能把它写对也是不容易的。

分布式

在Queue例子中,我们看到“仓库”这个概念。回顾下Queue的例子,“仓库”是InMemoryQueueStorage。顾名思义,它是一个内存型的仓库,所以不存在分布式基础。

 // create a request queue with 2 consumer threadsq, _ := queue.New(2, // Number of consumer threads&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage)

一个分布式的例子是Redis backend,截取一段

 // create the redis storagestorage := &redisstorage.Storage{Address:  "127.0.0.1:6379",Password: "",DB:       0,Prefix:   "httpbin_test",}// add storage to the collectorerr := c.SetStorage(storage)if err != nil {panic(err)}// delete previous data from storageif err := storage.Clear(); err != nil {log.Fatal(err)}// close redis clientdefer storage.Client.Close()// create a new request queue with redis storage backendq, _ := queue.New(2, storage)

这儿创建了一个redis型的仓库。不仅Collector的Storage是它,Queue的Storage也是它。这样一个集群上的服务都往这个仓库里存入和取出数据,从而实现分布式架构。

redisstorage库引自github.com/gocolly/redisstorage。我们查看其源码,其实现了Collector的storage需要的接口

type Storage interface {// Init initializes the storageInit() error// Visited receives and stores a request ID that is visited by the CollectorVisited(requestID uint64) error// IsVisited returns true if the request was visited before IsVisited// is calledIsVisited(requestID uint64) (bool, error)// Cookies retrieves stored cookies for a given hostCookies(u *url.URL) string// SetCookies stores cookies for a given hostSetCookies(u *url.URL, cookies string)
}

以及Queue的storage需要的

// Storage is the interface of the queue's storage backend
type Storage interface {// Init initializes the storageInit() error// AddRequest adds a serialized request to the queueAddRequest([]byte) error// GetRequest pops the next request from the queue// or returns error if the queue is emptyGetRequest() ([]byte, error)// QueueSize returns with the size of the queueQueueSize() (int, error)
}

Colly源码解析——结合例子分析底层实现相关推荐

  1. Gin源码解析和例子——中间件(middleware)

    在<Gin源码解析和例子--路由>一文中,我们已经初识中间件.本文将继续探讨这个技术.(转载请指明出于breaksoftware的csdn博客) Gin的中间件,本质是一个匿名回调函数.这 ...

  2. Colly源码解析——框架

    Colly是一个使用golang实现的数据抓取框架,我们可以使用它快速搭建类似网络爬虫这样的应用.本文我们将剖析其源码,以探析其中奥秘.(转载请指明出于breaksoftware的csdn博客) Co ...

  3. Gin源码解析和例子——路由

    Gin是一个基于golang的net包实现的网络框架.从github上,我们可以看到它相对于其他框架而言,具有优越的性能.本系列将从应用的角度来解析其源码.(转载请指明出于breaksoftware的 ...

  4. JDK1.8 HashMap源码解析(不分析红黑树部分)

    一.HashMap数据结构 HashMap由 数组+链表+红黑树实现,桶中元素可能为链表,也可能为红黑树.为了提高综合(查询.添加.修改)效率,当桶中元素数量超过TREEIFY_THRESHOLD(默 ...

  5. Java SPI 源码解析及 demo 讲解

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:Java实现QQ登录和微博登录个人原创+1博客:点击前往,查看更多 作者:JlDang 来源:https://s ...

  6. datax源码解析-JobContainer的初始化阶段解析

    datax源码解析-JobContainer的初始化阶段解析 写在前面 此次源码分析的版本是3.0.因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大 ...

  7. java treeset原理_Java集合 --- TreeSet底层实现和原理(源码解析)

    概述 文章的内容基于JDK1.7进行分析,之所以选用这个版本,是因为1.8的有些类做了改动,增加了阅读的难度,虽然是1.7,但是对于1.8做了重大改动的内容,文章也会进行说明. TreeSet实现了S ...

  8. Spring源码解析 - AbstractBeanFactory 实现接口与父类分析

    2019独角兽企业重金招聘Python工程师标准>>> 我们先来看类图吧: 除了BeanFactory这一支的接口,AbstractBeanFactory主要实现了AliasRegi ...

  9. kcp 介绍与源代码分析_KCP-GO源码解析

    原标题:KCP-GO源码解析 原文作者:张伯雨 golang技术社区 概念 ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一. ...

最新文章

  1. 洛谷mNOIP模拟赛Day2-入阵曲
  2. redis特点单进程单线程高性能服务器,Redis为什么是单线程?Redis又为什么这么快!...
  3. Codeforces Round #604 (Div. 2) E. Beautiful Mirrors 期望dp
  4. 微服务应用容器化场景中常见问题总结
  5. linux ssd hdd 缓存,ArchLinux 部署 SSD 缓存
  6. CString 和 char * 的相互转换
  7. 机器学习—XGboost的原理、工程实现与优缺点
  8. DDC——Deep Domain Confusion Maximizing for Domain Invariance
  9. Nginx工作原理及基本使用
  10. 计算机无法安装VC2015,解决win10安装vc ++2015提示“一个或多个问题导致了安装失败”的方法...
  11. 2022 数学建模B题 高教社杯 含半成品论文 部分代码 全部数学模型 和全套思路
  12. OPNsense用户手册中文版
  13. Lipschitz常数、Lipschitz条件
  14. 程序员深爱的bilibili后台源码泄露,看哔哩哔哩官方回应才放心了
  15. JAVA格式代码出现两次_NullPointerException使用JMockit测试Mocked java.io.File两次
  16. 应急照明市电检测_应急照明接线方式,这几点你必须知道!
  17. rockchip研讨会_地下在线研讨会6
  18. 微搭医美美容小程序官方模板解析
  19. python的函数式编程实例_函数式编程例子
  20. Web前端框架学习—Bootstrap

热门文章

  1. Python OpenCV GrabCut进行前景分割和提取
  2. SIFT和SURF的替换算法——ORB (Oriented FAST and Rotated BRIEF 快速定向和旋转)
  3. deepspeech实时语音识别
  4. php ob_flush无效,php ob_flush,flush在ie中缓冲无效的解决方法
  5. dist包编译html_gulp4 多页面项目管理打包(html, es6,less编译压缩版本控制)
  6. LeetCode刷题记录8——605. Can Place Flowers(easy)
  7. Unity Pro builder创建模块化仓库建筑学习教程
  8. 读书:有趣 -- 酒鬼与圣徒
  9. Linux虚拟机连不上网
  10. ubuntu chm文档阅读器