使用scrapy-redis组件写分布式爬虫实战

  • baagee 发布于 2017-09-05 23:29:37
  • 分类:Python
  • 6223 人围观
  • 20 人喜欢

今天摸索了一下使用scrapy-redis组件做一个分布式爬虫,虽然历经坎坷,但是最后终于成功了,也学到了相应的知识。

完整项目地址,更多爬虫练手代码请点此进入gitoschina查看

scrapy-redis 官方github地址

1, scrapy-redis的简单理解

Scrapy 是一个通用的爬虫框架,但是不支持分布式,Scrapy-redis是为了更方便地实现Scrapy分布式爬取,而提供了一些以redis为基础的组件(仅有组件)。

安装:pip install scrapy-redis

Scrapy-redis提供了下面四种组件(components):(四种组件意味着这四个模块都要做相应的修改)

  1. Scheduler
  2. Duplication Filter
  3. Item Pipeline
  4. Base Spider

Scheduler:

Scrapy改造了python本来的collection.deque(双向队列)形成了自己的Scrapy queue(https://github.com/scrapy/queuelib/blob/master/queuelib/queue.py)),但是Scrapy多个spider不能共享待爬取队列Scrapy queue, 即Scrapy本身不支持爬虫分布式,scrapy-redis 的解决是把这个Scrapy queue换成redis数据库(也是指redis队列),从同一个redis-server存放要爬取的request,便能让多个spider去同一个数据库里读取。

Scrapy中跟“待爬队列”直接相关的就是调度器Scheduler,它负责对新的request进行入列操作(加入Scrapy queue),取出下一个要爬取的request(从Scrapy queue中取出)等操作。它把待爬队列按照优先级建立了一个字典结构,比如:

    {

        优先级0 : 队列0

        优先级1 : 队列1

        优先级2 : 队列2

    }

然后根据request中的优先级,来决定该入哪个队列,出列时则按优先级较小的优先出列。为了管理这个比较高级的队列字典,Scheduler需要提供一系列的方法。但是原来的Scheduler已经无法使用,所以使用Scrapy-redis的scheduler组件。

Duplication Filter:

Scrapy中用集合实现这个request去重功能,Scrapy中把已经发送的request指纹放入到一个集合中,把下一个request的指纹拿到集合中比对,如果该指纹存在于集合中,说明这个request发送过了,如果没有则继续操作。

在scrapy-redis中去重是由Duplication Filter组件来实现的,它通过redis的set 不重复的特性,巧妙的实现了Duplication Filter去重。scrapy-redis调度器从引擎接受request,将request的指纹存⼊redis的set检查是否重复,并将不重复的request push写⼊redis的 request queue。

引擎请求request(Spider发出的)时,调度器从redis的request queue队列⾥里根据优先级pop 出一个request 返回给引擎,引擎将此request发给spider处理。

Item Pipeline:

引擎将(Spider返回的)爬取到的Item给Item Pipeline,scrapy-redis 的Item Pipeline将爬取到的 Item 存⼊redis的 items queue。

修改过Item Pipeline可以很方便的根据 key 从 items queue 提取item,从⽽实现 items processes集群。

Base Spider

不在使用scrapy原有的Spider类,重写的RedisSpider继承了Spider和RedisMixin这两个类,RedisMixin是用来从redis读取url的类。

当我们生成一个Spider继承RedisSpider时,调用setup_redis函数,这个函数会去连接redis数据库,然后会设置signals(信号):

一个是当spider空闲时候的signal,会调用spider_idle函数,这个函数调用schedule_next_request函数,保证spider是一直活着的状态,并且抛出DontCloseSpider异常。

一个是当抓到一个item时的signal,会调用item_scraped函数,这个函数会调用schedule_next_request函数,获取下一个request。

Scrapy-Redis分布式策略:

假设有四台电脑:Windows 10、Mac OS X、Ubuntu 16.04、CentOS 7.2,任意一台电脑都可以作为 Master端 或 Slaver端,比如:

Master端(核心服务器) :使用 Windows 10,搭建一个Redis数据库,不负责爬取,只负责url指纹判重、Request的分配,以及数据的存储

Slaver端(爬虫程序执行端) :使用 Mac OS X 、Ubuntu 16.04、CentOS 7.2,负责执行爬虫程序,运行过程中提交新的Request给Master

首先Slaver端从Master端拿任务(Request、url)进行数据抓取,Slaver抓取数据的同时,产生新任务的Request便提交给 Master 处理;

Master端只有一个Redis数据库,负责将未处理的Request去重和任务分配,将处理后的Request加入待爬队列,并且存储爬取的数据。


Scrapy-Redis默认使用的就是这种策略,我们实现起来很简单,因为任务调度等工作Scrapy-Redis都已经帮我们做好了,我们只需要继承RedisSpider、指定redis_key就行了。

缺点是,Scrapy-Redis调度的任务是Request对象,里面信息量比较大(不仅包含url,还有callback函数、headers等信息),可能导致的结果就是会降低爬虫速度、而且会占用Redis大量的存储空间,所以如果要保证效率,那么就需要一定硬件水平。

scrapy-redis架构:


2,电影网站实战

要爬的是一个我经常去找电影的一个网站【点此进入,里面的资源也比较新。感兴趣的可以保存书签,就当做爬了人家网站,顺便帮人家打个广告弥补一下吧。哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈。。。。

步骤就是爬电影列表,通过列表进详情页获取电影信息,包括这个电影的各种资源下载链接,包括百度云,磁力链接,电驴链接等。。

经过分析,我发现在电影详情页中,下载链接是异步(ajax)的方式获取的,所以通过信息详情页是获取不到的,必须在次请求(下载地址url)获取下载链接。


思路有了,接下来写代码就很轻松了;

scrapy startproject xunying_redis 命令创建项目,最后项目完成后结构如下:

baagee@baagee-virtual-machine:~/scrapy_study$ tree xunying_redis/
xunying_redis/
├── scrapy.cfg
└── xunying_redis
    ├── __init__.py
    ├── items.py
    ├── middlewares.py
    ├── pipelines.py
    ├── settings.py
    └── spiders
        ├── __init__.py
        └── movieSpider.py

首先编辑settings文件:

# -*- coding: utf-8 -*-

BOT_NAME = 'xunying_redis'

SPIDER_MODULES = ['xunying_redis.spiders']
NEWSPIDER_MODULE = 'xunying_redis.spiders'

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

# Disable cookies (enabled by default)
COOKIES_ENABLED = False

# Override the default request headers:
DEFAULT_REQUEST_HEADERS = {
    'User-Agent':'Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/60.0.3112.113 Chrome/60.0.3112.113 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
}

# Enable or disable downloader middlewares
# See http://scrapy.readthedocs.org/en/latest/topics/downloader-middleware.html
DOWNLOADER_MIDDLEWARES = {
   'xunying_redis.middlewares.RandomUserAgent': 543,
}

#使用scrapy-redis里面的去重组件.
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# 使用scrapy-redis里面的调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 允许暂停后,能保存进度
SCHEDULER_PERSIST = True

# 指定排序爬取地址时使用的队列,
# 默认的 按优先级排序(Scrapy默认),由sorted set实现的一种非FIFO、LIFO方式。
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'
# 可选的 按先进先出排序(FIFO)
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderQueue'
# 可选的 按后进先出排序(LIFO)
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderStack'

# Configure item pipelines
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    'xunying_redis.pipelines.XunyingPipeline': 300,
    # 新增加scrapy_redis pipeline管道
    'scrapy_redis.pipelines.RedisPipeline':400
}

# Mysql数据库的配置信息'
MYSQL_HOST = '127.0.0.1'
MYSQL_DBNAME = 'spider_data'
MYSQL_USER = 'root'
MYSQL_PASSWD = 'root'
MYSQL_PORT = 3306

# 指定redis主机
REDIS_HOST='192.168.117.136'
REDIS_PORT=6379

USER_AGENTS = [
    "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
    "Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 1.0.3705; .NET CLR 1.1.4322)",
    "Mozilla/4.0 (compatible; MSIE 7.0b; Windows NT 5.2; .NET CLR 1.1.4322; .NET CLR 2.0.50727; InfoPath.2; .NET CLR 3.0.04506.30)",
    "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN) AppleWebKit/523.15 (KHTML, like Gecko, Safari/419.3) Arora/0.3 (Change: 287 c9dfb30)",
    "Mozilla/5.0 (X11; U; Linux; en-US) AppleWebKit/527+ (KHTML, like Gecko, Safari/419.3) Arora/0.6",
    "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.2pre) Gecko/20070215 K-Ninja/2.1.1",
    "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9) Gecko/20080705 Firefox/3.0 Kapiko/3.0",
    "Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5",
    'Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/60.0.3112.113 Chrome/60.0.3112.113 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.2372.400 QQBrowser/9.5.11096.400',
    'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:55.0) Gecko/20100101 Firefox/55.0'
]

里面设置说明的很清楚了,我就不说明了。

然后编写下载中间件(middlewares),设置随机请求user-agent头:

# -*- coding: utf-8 -*-
import random
from .settings import USER_AGENTS


# 随机的User-Agent
class RandomUserAgent(object):
    def process_request(self, request, spider):
        userAgent = random.choice(USER_AGENTS)
        request.headers.setdefault("User-Agent", userAgent)

编写items(items主要写包保存的字段名):

# -*- coding: utf-8 -*-
import scrapy


class XunyingItem(scrapy.Item):
    # 名字
    name = scrapy.Field()
    # 又名
    rename = scrapy.Field()
    # 编剧
    screenwriter = scrapy.Field()
    # 导演
    director = scrapy.Field()
    # 主演
    star = scrapy.Field()
    # 类型
    type = scrapy.Field()
    # 地区
    address = scrapy.Field()
    # 语言
    language = scrapy.Field()
    # 时长
    long = scrapy.Field()
    # 豆瓣评分
    douban_score = scrapy.Field()
    IMDB_score = scrapy.Field()
    # 临时存评分
    score = scrapy.Field()
    # 上映时间
    time = scrapy.Field()
    # 介绍
    introduce = scrapy.Field()
    # 标签
    tags = scrapy.Field()
    # 资源
    source = scrapy.Field()

编写movieSpider,爬虫主体

# -*- coding: utf-8 -*-
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import Rule
from scrapy_redis.spiders import RedisCrawlSpider
from xunying_redis.items import XunyingItem
import re


class MoviespiderSpider(RedisCrawlSpider):
    name = 'movieSpider'
    allowed_domains = ['www.xunyingwang.com']

    redis_key = 'movieSpider:start_urls'

    rules = (
        Rule(LinkExtractor(allow='movie\/\?page=\d+'), follow=True),
        Rule(LinkExtractor(allow='movie\/\d+.html'), callback='parse_item', follow=True),
    )

    def parse_item(self, response):
        item = XunyingItem()
        # 电影名
        item['name'] = response.xpath('/html/body/div[1]/div/div/div[1]/h1/text()').extract()[0].strip()
        introduces = response.xpath('/html/body/div[1]/div/div/div[1]/div[2]/div[2]/p/text()').extract()
        # 介绍
        item['introduce'] = ''.join(introduces).strip()
        tags = response.xpath('/html/body/div[1]/div/div/div[1]/div[3]/div[2]/a/text()').extract()
        item['tags'] = ''.join(tags)
        # 其他信息
        info = response.xpath('/html/body/div[1]/div/div/div[1]/div[1]/div[2]/table/tbody/tr')
        kMap = {
            '评分': 'score',
            '编剧': 'screenwriter',
            '主演': 'star',
            '地区': 'address',
            '上映时间': 'time',
            '片长': 'long',
            '类型': 'type',
            '又名': 'rename',
            '导演': 'director',
            '语言': 'language'
        }
        for i in info:
            k = i.xpath('./td[1]/span/text()').extract()[0]
            v = ''.join(i.xpath('./td[2]/a/text() | ./td[2]/text()').extract())
            item[kMap.get(k)] = v.strip().replace('/  显示全部', '')
        # yield item
        pattern = re.compile(r'\d+')
        # 电影id
        id = pattern.search(response.url).group()
        baseUrl = 'http://www.xunyingwang.com/videos/resList/'
        yield scrapy.Request(baseUrl + id, meta={'item': item}, callback=self.getDownload)

    # 获取电影下载链接
    def getDownload(self, response):
        item = response.meta['item']
        trs1 = response.xpath('//*[@id="normalDown"]/div/table/tbody/tr')
        trs2 = response.xpath('//*[@id="sourceDown"]/div/table/tbody/tr')
        source = {}
        if trs1:
            source_1 = []
            for tr in trs1:
                tmp = {}
                _name = tr.xpath('./td[1]/span/text()').extract()[0]
                _href = tr.xpath('./td[2]/div/a/@href').extract()[0]
                tmp['name'] = _name
                tmp['source'] = _href
                if _name == '网盘':
                    _pass = tr.xpath('./td[2]/div/strong/text()').extract()[0]
                    tmp['pass'] = _pass
                else:
                    _title = tr.xpath('./td[2]/div/a/text()').extract()[0]
                    tmp['title'] = _title
                    source_1.append(tmp)
                source['normalDown'] = source_1

        if trs2:
            source_1 = []
            for tr in trs2:
                tmp = {}
                _name = tr.xpath('./td[1]/span/text()').extract()[0]
                _href = tr.xpath('./td[2]/div/a/@href').extract()[0]
                tmp['name'] = _name
                tmp['source'] = _href
                if _name == '网盘':
                    _pass = tr.xpath('./td[2]/div/strong/text()').extract()[0]
                    tmp['pass'] = _pass
                else:
                    _title = tr.xpath('./td[2]/div/a/text()').extract()[0]
                    tmp['title'] = _title
                    source_1.append(tmp)
                source['sourceDown'] = source_1

        item['source'] = str(source)
        yield item

这里和继承CrawlSpider写法略有不同,继承RedisCrawlSpider类,去掉了start_urls加入了redis_key字段,redis_key值是任意字符串,不过为了规范一般用“类名:start_urls”

注意:

RedisSpider类 不需要写start_urls

必须指定redis_key,即启动爬虫的命令,参考格式:redis_key = '类名:start_urls'

根据指定的格式,start_urls将在 Master端的 redis-cli 里 lpush 到 Redis数据库里,RedisSpider 将在数据库里获取start_urls。

pipelines代码:

# -*- coding: utf-8 -*-

import pymysql
from scrapy.utils.project import get_project_settings
import re

class XunyingPipeline(object):
    # def __init__(self):
    #     # 连接数据库
    #     config = {
    #         'host': get_project_settings().get("MYSQL_HOST"),
    #         'port': get_project_settings().get("MYSQL_PORT"),
    #         'user': get_project_settings().get("MYSQL_USER"),
    #         'password': get_project_settings().get("MYSQL_PASSWD"),
    #         'db': get_project_settings().get("MYSQL_DBNAME"),
    #         'charset': 'utf8',
    #         'cursorclass': pymysql.cursors.DictCursor
    #     }
    #     self.conn = pymysql.connect(**config)
    #     self.cur = self.conn.cursor()

    def process_item(self, item, spider):
        i = dict(item)
        if i.get('long',''):
            com = re.compile(r'\d+')
            long = com.findall(i['long'])[0]
            item['long'] = int(long)
            # i['long'] = int(long)
        if i.get('score', ''):
            sMap = {
                '豆瓣': 'douban_score',
                'IMDB': 'IMDB_score'
            }
            tmp=i['score'].split(' / ')#['豆瓣 6.2','IMDB 6.6']
            if len(tmp):
                for t in tmp:
                    tt=t.split(' ')
                    if tt[1]!='N/A':
                        item[sMap[tt[0]]]=float(tt[1])
                        # i[sMap[tt[0]]]=float(tt[1])
        del item['score']
        # del i['score']
        # if not self.check_item_exists(i.get('name')):
        #     # 如果不存在就插入
        #     self.insert_into_table(i)
        # else:
        #     print('此条数据已存在,不插入')
        return item

    # def insert_into_table(self, i):
    #     sql = "INSERT INTO `movies` (`name`, `rename`, `screenwriter`, `director`, `star`, `type`, `address`, `language`, `long`, `douban_score`, `IMDB_score`, `introduce`, `time`, `tags`,`source`) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    #     insertData = [
    #         i.get('name', ''),
    #         i.get('rename', ''),
    #         i.get('screenwriter', ''),
    #         i.get('director', ''),
    #         i.get('star', ''),
    #         i.get('type', ''),
    #         i.get('address', ''),
    #         i.get('language', ''),
    #         i.get('long', ''),
    #         i.get('douban_score', ''),
    #         i.get('IMDB_score', ''),
    #         i.get('introduce', ''),
    #         i.get('time', ''),
    #         i.get('tags', ''),
    #         i.get('source', '')
    #     ]
    #     res = self.cur.execute(sql, insertData)
    #     if res == 1:
    #         self.conn.commit()
    #         print('成功插入 %d 条数据' % self.cur.rowcount)
    #     else:
    #         print('>>>>>>>>>>>>>>>>>数据没插入<<<<<<<<<<<<<<<<<')
    #
    # # 检查数据是否存在
    # def check_item_exists(self, name):
    #     sql = "SELECT id FROM `movies` WHERE `name` = %s"
    #     self.cur.execute(sql, [name])
    #     count = self.cur.rowcount
    #     print('共查找出 %d 条数据' % count)
    #     if count > 0:
    #         return True
    #     else:
    #         return False
    #
    # # 关闭数据库链接
    # def close_spider(self, spider):
    #     self.cur.close()
    #     self.conn.close()

这个pipeline中是scrapy中自带的,在settings中我把它设置成优先级比redispipeline高,数据会先经过这个pipeline管道处理,然后再进入redispipeline中处理。在这个管道里你可以对数据做一些处理(处理评分部分),然后保存到mysql或者其他数据库中。我把他们全注释了。

然后数据就进入redispipeline中了,保存到redis数据库中。

运行代码:

最后就是将写好的代码上传到各个服务器中。这里以我的vmware虚拟机中的ubuntu16系统为master端,vagrant中的centos7和ubuntu14位slave端,master端不爬取数据,只是运行redis,保存数据用的,二两个slave端则爬取数据保存到master端的redis数据库,为了能吧数据保存到master,所以两个slave端代码settings中设置REDIS_HOST和REDIS_PORT要设置成master端的ip和端口。

执行方式:

通过runspider方法执行爬虫的py文件,爬虫(们)将处于等待准备状态:

scrapy runspider xunying_spider.py

在Master端的redis-cli输入push指令,参考格式:

lpush movieSpider:start_urls http://www.xunyingwang.com/movie/?page=1

爬虫获取url,开始执行。

注意:push指令中movieSpider:start_urls这个就是movieSpider中设置的redis_key,必须一致,不然爬虫不能运行。

截图示例:

左边的两个slave服务器处于等待状态,当右边的主服务push一个url入口,左边的两个便有一个获取到这个链接的request,然后返回更多的request到请求队列中,然后两个slave服务器便开始从队列获取请求开始爬了。


redis数据库保存的数据如图所示:


保存到redis里面的数据你可以写个脚本把他们导出到mysql或者mongodb中,这是很简单的,我就不上代码了,因为这篇文章篇幅很长了,就到此为止...


评论

点击图片切换
还没有评论,快来抢沙发吧!