爬虫进阶:Scrapy抓取科技平台Zealer

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 开篇  这次的目标网站也是本人一直以来有在关注的科技平台:Zealer,爬取的信息包括全部的科技资讯以及相应的评论。默认配置下运行,大概跑了半个多小时,最终抓取了5000+的资讯以及10几万的评论。

开篇

  这次的目标网站也是本人一直以来有在关注的科技平台:Zealer,爬取的信息包括全部的科技资讯以及相应的评论。默认配置下运行,大概跑了半个多小时,最终抓取了5000+的资讯以及10几万的评论。

Zealer Media
Zealer Media

说明及准备

  开发环境:Scrapy、Redis、PostgreSQL

  数据库表:tb_zealer_series、tb_zealer_media、tb_zealer_comment

  下面对上述每张表进行简要说明:

  • tb_zealer_series,用于存放不同科技频道信息:
tb_zealer_series
tb_zealer_series
-- ----------------------------
-- Table structure for tb_zealer_series
-- ----------------------------
DROP TABLE IF EXISTS "public"."tb_zealer_series";
CREATE TABLE "public"."tb_zealer_series" (
  "id" serial2,
  "name" varchar(25) COLLATE "pg_catalog"."default" NOT NULL,
  "cp" int2 NOT NULL,
  "platform" varchar(20) COLLATE "pg_catalog"."default" NOT NULL,
  "enabled" bool NOT NULL
)
;
COMMENT ON COLUMN "public"."tb_zealer_series"."id" IS '主键id';
COMMENT ON COLUMN "public"."tb_zealer_series"."name" IS '系列名称';
COMMENT ON COLUMN "public"."tb_zealer_series"."cp" IS '标志id';
COMMENT ON COLUMN "public"."tb_zealer_series"."platform" IS '平台类型';
COMMENT ON COLUMN "public"."tb_zealer_series"."enabled" IS '开启标志';

-- ----------------------------
-- Indexes structure for table tb_zealer_series
-- ----------------------------
CREATE UNIQUE INDEX "uni_cp" ON "public"."tb_zealer_series" USING btree (
  "cp" "pg_catalog"."int2_ops" ASC NULLS LAST
);
  • tb_zealer_media,用于保存科技资讯的表:
tb_zealer_media
tb_zealer_media
-- ----------------------------
-- Table structure for tb_zealer_media
-- ----------------------------
DROP TABLE IF EXISTS "public"."tb_zealer_media";
CREATE TABLE "public"."tb_zealer_media" (
  "id" serial4,
  "series_id" int2 NOT NULL,
  "post_id" int4 NOT NULL,
  "title" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
  "desc" varchar(1500) COLLATE "pg_catalog"."default",
  "label" varchar(100) COLLATE "pg_catalog"."default",
  "cover_picture" varchar(150) COLLATE "pg_catalog"."default",
  "media_info" json,
  "comment_num" int4 NOT NULL,
  "detail_url" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
  "live_time" varchar(10) COLLATE "pg_catalog"."default",
  "create_time" timestamp(6) NOT NULL
)
;
COMMENT ON COLUMN "public"."tb_zealer_media"."id" IS '主键id';
COMMENT ON COLUMN "public"."tb_zealer_media"."series_id" IS '系列id';
COMMENT ON COLUMN "public"."tb_zealer_media"."post_id" IS '唯一标志';
COMMENT ON COLUMN "public"."tb_zealer_media"."title" IS '标题';
COMMENT ON COLUMN "public"."tb_zealer_media"."desc" IS '描述';
COMMENT ON COLUMN "public"."tb_zealer_media"."label" IS '标签';
COMMENT ON COLUMN "public"."tb_zealer_media"."cover_picture" IS '封面图片';
COMMENT ON COLUMN "public"."tb_zealer_media"."media_info" IS '素材信息';
COMMENT ON COLUMN "public"."tb_zealer_media"."comment_num" IS '评论数量';
COMMENT ON COLUMN "public"."tb_zealer_media"."detail_url" IS '详细地址';
COMMENT ON COLUMN "public"."tb_zealer_media"."live_time" IS '视频时长';
COMMENT ON COLUMN "public"."tb_zealer_media"."create_time" IS '入库时间';

-- ----------------------------
-- Indexes structure for table tb_zealer_media
-- ----------------------------
CREATE UNIQUE INDEX "uni_post_id" ON "public"."tb_zealer_media" USING btree (
  "post_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
  • tb_zealer_comment,保存每条资讯相应的评论信息:
tb_zealer_comment
tb_zealer_comment
-- ----------------------------
-- Table structure for tb_zealer_comment
-- ----------------------------
DROP TABLE IF EXISTS "public"."tb_zealer_comment";
CREATE TABLE "public"."tb_zealer_comment" (
  "id" serial4,
  "post_id" int4 NOT NULL,
  "username" varchar(50) COLLATE "pg_catalog"."default" NOT NULL,
  "avatar" varchar(150) COLLATE "pg_catalog"."default" NOT NULL,
  "content" varchar(1500) COLLATE "pg_catalog"."default" NOT NULL,
  "comment_time" timestamp(6) NOT NULL,
  "create_time" timestamp(6) NOT NULL,
  "user_id" int4 NOT NULL
)
;
COMMENT ON COLUMN "public"."tb_zealer_comment"."id" IS '主键id';
COMMENT ON COLUMN "public"."tb_zealer_comment"."post_id" IS '唯一标志';
COMMENT ON COLUMN "public"."tb_zealer_comment"."username" IS '用户名';
COMMENT ON COLUMN "public"."tb_zealer_comment"."avatar" IS '用户头像';
COMMENT ON COLUMN "public"."tb_zealer_comment"."content" IS '评论内容';
COMMENT ON COLUMN "public"."tb_zealer_comment"."comment_time" IS '评论时间';
COMMENT ON COLUMN "public"."tb_zealer_comment"."create_time" IS '入库时间';
COMMENT ON COLUMN "public"."tb_zealer_comment"."user_id" IS '用户id';

-- ----------------------------
-- Indexes structure for table tb_zealer_comment
-- ----------------------------
CREATE UNIQUE INDEX "uni_uid_pid_ctime" ON "public"."tb_zealer_comment" USING btree (
  "user_id" "pg_catalog"."int4_ops" ASC NULLS LAST,
  "post_id" "pg_catalog"."int4_ops" ASC NULLS LAST,
  "create_time" "pg_catalog"."timestamp_ops" ASC NULLS LAST
);

抓取"科技频道"信息

  考虑到这块的信息比较少且固定(如下图红框所示),所以用Request+BeautifulSoup提前获取。

Zealer - Media
Zealer - Media
Zealer - X
Zealer - X
import app
import requests
from bs4 import BeautifulSoup
from zealer.service import sql

# BeautifulSoup+Request获取所有系列

postgres = app.postgres()
index = 'http://www.zealer.com/list?platform={}'
platforms = ['media', 'x']  # 对应Zealer官方(MEDIA)和达人专区(X)
for platform in platforms:
    resp = requests.get(index.format(platform))
    bs = BeautifulSoup(resp.text, 'html.parser')
    nav_list = bs.find('p', class_='nav_inner')
    for nav in nav_list.find_all('a', class_=''):
        name, nav_url = nav.text, nav.get('href')
        cp = int(nav_url.split('cp=')[1])
        postgres.handler(sql.save_series(), (name, cp, platform, True))

环境搭建

  新建项目:scrapy startproject zealer

  新建爬虫:scrapy genspider tech zealer.com

Item定义

# -*- coding: utf-8 -*-

# Define here the models for your scraped items
#
# See documentation in:
# https://doc.scrapy.org/en/latest/topics/items.html

from scrapy import Item, Field


class MediaItem(Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    postId = Field()
    seriesId = Field()
    title = Field()
    desc = Field()
    label = Field()
    coverPicture = Field()
    mediaInfo = Field()
    commentNum = Field()
    detailUrl = Field()
    liveTime = Field()
    pass


class CommentItem(Item):
    userId = Field()
    postId = Field()
    username = Field()
    avatar = Field()
    content = Field()
    commentTime = Field()
    pass

编写爬虫

  先分析下页面数据的渲染形式,通过"开发者工具" -> "Network"查看,相应的资讯以及评论数据都是请求接口获得json后再进行展示的,因此直接请求这两个接口就可以了,参考资讯接口示例 && 评论接口示例,其中资讯接口中的cid表示不同的科技频道,上面已经获取到了保存在tb_zealer_series这个表中,page分页从1开始,评论接口的id参数通过资讯接口获得。

  这里注释掉默认给出的start_urls = ['http://zealer.com/'],然后重写start_requests方法来定义起始爬取逻辑。由于上述两个接口中并没有返回任何终止的条件,所以这里用比较曲折的方法来自行加判断解决:

# -*- coding: utf-8 -*-
import sys
import json
import math
import scrapy
from utils import mytime
from scrapy import Request
from bs4 import BeautifulSoup
from zealer.service import app, sql
from scrapy.loader import ItemLoader
from scrapy.loader.processors import TakeFirst
from zealer.items import MediaItem, CommentItem


class TechSpider(scrapy.Spider):
    name = 'tech'
    allowed_domains = ['zealer.com']

    # start_urls = ['http://zealer.com/']

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.postgres = app.postgres()
        self.series_list = self.postgres.fetch_all(sql.get_series())
        self.series_stop = set()  # 用于判断Media抓取终止
        self.max_page = sys.maxsize
        self.post = 'http://www.zealer.com/post/{}'
        self.sift = 'http://www.zealer.com/x/sift?cid={}&page={}&order=created_at'
        self.comment = 'http://www.zealer.com/Post/comment?id={}&page={}'

    def start_requests(self):
        for series in self.series_list:
            series_id, cp = series[0], series[1]
            for page in range(1, self.max_page):
                if series_id in self.series_stop:
                    self.logger.warning('Stop Media: {}'.format(series_id))
                    self.series_stop.discard(series_id)
                    break
                else:
                    sift = self.sift.format(cp, page)
                    yield Request(sift, callback=self.parse, meta={'series_id': series_id})

    def parse(self, response):
        """解析请求资讯接口返回的JSON数据"""

        data = json.loads(response.body_as_unicode())
        status, messages = data.get('status'), data.get('message')
        self.logger.info('Media URL: {} , status: {} , messages: {}'.format(response.url, status, len(messages)))

        series_id = response.meta['series_id']
        if messages:
            # 解析数据
            for message in messages:
                loader = ItemLoader(item=MediaItem())
                loader.default_output_processor = TakeFirst()
                post_id = message.get('id')
                loader.add_value('postId', int(post_id))
                loader.add_value('seriesId', series_id)
                loader.add_value('title', message.get('title'))
                loader.add_value('coverPicture', message.get('cover'))
                comment_total = int(message.get('comment_total'))
                loader.add_value('commentNum', comment_total)
                loader.add_value('liveTime', message.get('live_time'))
                detail_url = self.post.format(post_id)
                loader.add_value('detailUrl', detail_url)

                yield Request(detail_url, callback=self.parse_detail, meta={'loader': loader})
        else:
            # 终止条件
            self.logger.warning('Judge Stop Media: {}'.format(series_id))
            self.series_stop.add(series_id)

    def parse_detail(self, response):
        """获取资讯详情页的数据"""

        loader = response.meta['loader']
        desc = response.xpath('//p[@class="des_content"]/text()').extract_first()
        loader.add_value('desc', desc)
        tag_list = response.xpath('//div[@class="right_tag"]/a/text()').extract()
        loader.add_value('label', '; '.join(map(str.strip, tag_list)))
        media_info = response.xpath('//script[@type="text/javascript"]/text()[contains(.,"option")]').extract_first()
        media_info = media_info.split('=')[1].split(';')[0].replace(' ', '')
        loader.add_value('mediaInfo', media_info)

        item = loader.load_item()
        comment_num = item.get('commentNum')
        if comment_num:
            """抓取评论数据"""
            post_id = item.get('postId')
            comment_max_page = int(math.ceil(comment_num / 20))
            for page in range(1, comment_max_page):
                yield Request(self.comment.format(post_id, page),
                              callback=self.parse_comment, meta={'post_id': post_id})

        yield item

    def parse_comment(self, response):
        """解析获取评论数据"""

        data = json.loads(response.body_as_unicode())
        status, count = data.get('status'), int(data.get('count'))
        self.logger.info('Comment URL: {} , status: {} , count: {}'.format(response.url, status, count))

        if count:
            content = data.get('content')
            post_id = response.meta['post_id']
            bs = BeautifulSoup(content, 'html.parser')
            comment_list = bs.find_all('li')
            for comment in comment_list:
                item = CommentItem()
                item['postId'] = post_id
                item['userId'] = comment.find('div', class_='list_card')['card']
                item['username'] = comment.find('span', class_='mb_name').text
                item['avatar'] = comment.find('img')['src']
                comment_text = comment.find('p') or comment.find('dd')
                item['content'] = comment_text.text
                comment_time = comment.find('span', class_='commentTime').text.strip()
                item['commentTime'] = self.handleCommentTime(comment_time)

                yield item

    @staticmethod
    def handleCommentTime(comment_time):
        """处理日期问题, 当年的评论返回格式为: x月x日 hh:mm"""

        if comment_time.find('年') == -1:
            comment_time = '{}年{}'.format(mytime.now_year(), comment_time)

        return mytime.str_to_date_with_format(comment_time, '%Y年%m月%d日 %H:%M')

数据入库

  在pipelines.py中操作数据入库,别忘了还要在settings.py中配置pipelines开启:

# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
from utils import mytime
from zealer import items
from zealer.service import app, sql


class ZealerPipeline(object):
    def __init__(self) -> None:
        self.redis = app.redis()
        self.postgres = app.postgres()

    def process_item(self, item, spider):

        if isinstance(item, items.MediaItem):
            series_id, post_id = item.get('seriesId'), item.get('postId')
            key = "zealer:seriesId:{}".format(series_id)

            if not self.redis.sismember(key, post_id):

                item_field = ['title', 'desc', 'label', 'coverPicture',
                              'mediaInfo', 'commentNum', 'detailUrl', 'liveTime']
                data = [item.get(field) for field in item_field]
                data.insert(0, series_id), data.insert(1, post_id), data.append(mytime.now_date())

                effect_count = self.postgres.handler(sql.save_media(), tuple(data))
                if effect_count:
                    self.redis.sadd(key, post_id)

        elif isinstance(item, items.CommentItem):
            post_id, user_id = item.get('postId'), item.get('userId')
            key = "zealer:postId:{}".format(post_id)

            if not self.redis.sismember(key, user_id):
                item_field = ['username', 'avatar', 'content', 'commentTime']
                data = [item.get(field) for field in item_field]
                data.insert(0, post_id), data.append(user_id), data.append(mytime.now_date())

                effect_count = self.postgres.handler(sql.save_comment(), tuple(data))
                if effect_count:
                    self.redis.sadd(key, user_id)

        return item

效果展示

数据展示
数据展示

示例代码 - GitHub

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
数据采集 Web App开发 JavaScript
JavaScript爬虫进阶攻略:从网页采集到数据可视化
JavaScript爬虫进阶攻略:从网页采集到数据可视化
|
1月前
|
数据采集 存储 XML
给你一个具体的网站,你会如何设计爬虫来抓取数据?
【2月更文挑战第23天】【2月更文挑战第75篇】给你一个具体的网站,你会如何设计爬虫来抓取数据?
|
2月前
|
数据采集 数据可视化 数据挖掘
Python爬虫实战:抓取网站数据并生成报表
本文将介绍如何使用Python编写简单而高效的网络爬虫,从指定的网站上抓取数据,并利用数据分析库生成可视化报表。通过学习本文内容,读者将能够掌握基本的爬虫技术和数据处理方法,为日后开发更复杂的数据采集与分析工具打下坚实基础。
|
3月前
|
数据采集 大数据 调度
利用aiohttp异步爬虫实现网站数据高效抓取
利用aiohttp异步爬虫实现网站数据高效抓取
|
3月前
|
数据采集 存储 数据处理
Scrapy:Python网络爬虫框架的利器
在当今信息时代,网络数据已成为企业和个人获取信息的重要途径。而Python网络爬虫框架Scrapy则成为了网络爬虫工程师的必备工具。本文将介绍Scrapy的概念与实践,以及其在数据采集和处理过程中的应用。
23 1
|
2月前
|
数据采集 JSON JavaScript
Python爬虫案例:抓取猫眼电影排行榜
python爬取猫眼电影排行榜数据分析,实战。(正则表达式,xpath,beautifulsoup)【2月更文挑战第11天】
69 2
Python爬虫案例:抓取猫眼电影排行榜
|
4月前
|
数据采集 数据库
爬虫增量抓取
爬虫增量抓取
111 3
|
4月前
|
数据采集 调度 Python
Scrapy爬虫中合理使用time.sleep和Request
Scrapy爬虫中合理使用time.sleep和Request
|
24天前
|
数据采集 存储 Rust
Rust高级爬虫:如何利用Rust抓取精美图片
Rust高级爬虫:如何利用Rust抓取精美图片
|
28天前
|
数据采集 Web App开发 搜索推荐
项目配置之道:优化Scrapy参数提升爬虫效率
项目配置之道:优化Scrapy参数提升爬虫效率