Python 网络爬虫:自动化数据采集的艺术

P2P网络信贷,由于其优秀的集资功能和对闲散资金的高度利用,使得普惠金融成为可能,但它也比传统信贷有着更高的风险,其风险主要来自于借贷双方的信息不对称性。我们使用python获取散标详情中借贷人信息数据,以及贷款的相关数据,以备用于数据分析。

一、环境

  • 操作系统:ubuntu14.04
  • python:3.6

二、实现思路

爬取的散标详情页面是静态页面,数据格式是json,我们这里使用jsonpath进行解析提取数据,因为需要登录,我们使用cookie进行登录,然后使用多线程+协程进行数据爬取,考虑反爬,因此使用ip代理池、随机User-Agent等,最后数据写入到mongodb。

三、具体实现

3.1 筛选代理ip

获取代理ip,可以从免费的网站,比如西刺网,也可以花钱买一些高质量的ip。这里我是买了一些ip,24小时有效,虽然是买的,有http,也要https的,但是质量也不是很高,但是也免费的质量要好些,因此还需要通过测试,从一批ip中筛选出一些高质量的ip。

# !/usr/bin/env python3
# -*- encoding: utf-8 -*-

import os
import re
import telnetlib
import time

class GenerateProxies(object):

    def __init__(self):
        # 生成临时文件与代理ip的文件
        self.temp_file_path = './proxies_set.txt'
        self.file_path = './proxies_rm_duplicated.txt'
        # 代理ip文件夹
        self.ip_folder = './proxies_file'
        # 经过测试的ip
        self.test_ip_file = './test_proxies.txt'

    def save_ip_tmp_file(self):
        """
        合并所有ip文件,保存代理ip到临时文件
        :return:
        """
        # 如何临时文件存在,则删除
        if os.path.exists(self.temp_file_path):
            os.remove(self.temp_file_path)

        # 批量读取ip到文件中
        with open(self.temp_file_path, 'a+') as f:
            # 读取文件列表
            file_list = os.listdir(self.ip_folder)
            for file in file_list:
                file_full_path = os.path.join(self.ip_folder, file)
                with open(file_full_path, 'r') as ff:
                    f.write(ff.read())
                    f.writelines('\n')

    def ip_duplicate(self):
        """
        ip去重
        :return:
        """
        with open(self.temp_file_path, 'r') as f:
            ip_list = f.readlines()
            ip_set = set(ip_list)
            # 去重完再写回到新的文件中
            print('ip池中一共有%s个不同的代理ip...' % len(ip_set))
            with open(self.file_path, 'w') as ff:
                for i in ip_set:
                    ff.writelines(i)
        # 删除临时文件
        os.remove(self.temp_file_path)

    def test_ip(self):
        """
        测试ip是否为效
        :return:
        """
        # 判断是否存在经测试过的ip文件
        if os.path.exists(self.test_ip_file):
            os.remove(self.test_ip_file)
        # 读取ip
        with open(self.file_path, 'r') as f:
            ip_list = f.readlines()
            for i in ip_list:
                try:
                    # 匹配ip和端口
                    ip = re.match('(.*):(.*)', i)
                    # 测试ip是否可用
                    telnetlib.Telnet(ip.group(1), port=ip.group(2), timeout=1)
                    print(i)
                    # 如果没有问题,则添加到文件中
                    with open(self.test_ip_file, 'a+') as ff:
                        ff.writelines(i)
                except:
                    print('ip 无效!')
        # 删除去重后的文件
        os.remove(self.file_path)

    def read_ip(self):
        """
        查看多少个有用的代理
        :return:
        """
        with open(self.test_ip_file, 'r') as f:
            ip_list = f.readlines()
            print('当前一共有%s个有用的ip代理...' % len(ip_list))


if __name__ == '__main__':
    print('程序开始...\n当前时间为:%s' % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    start_time = time.time()
    # 创建生成代理ip对象
    gen_proxies = GenerateProxies()
    # 合并所有的ip到临时文件中
    gen_proxies.save_ip_tmp_file()
    # ip去重
    gen_proxies.ip_duplicate()
    # 筛选出测试过有用的ip
    gen_proxies.test_ip()
    # 查看有多少个有用的代理
    gen_proxies.read_ip()
    end_time = time.time()
    print('程序结束...\n当前时间为:%s' % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    print('一共耗时:%s 秒' % (end_time - start_time))

3.2 定义User-Agent列表

各个版本的浏览器头,都准备几个

# 浏览器头列表
USER_AGENTS = [
    "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
    "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
    "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
    "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
    "Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 1.0.3705; .NET CLR 1.1.4322)",
    "Mozilla/4.0 (compatible; MSIE 7.0b; Windows NT 5.2; .NET CLR 1.1.4322; .NET CLR 2.0.50727; InfoPath.2; .NET CLR 3.0.04506.30)",
    "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN) AppleWebKit/523.15 (KHTML, like Gecko, Safari/419.3) Arora/0.3 (Change: 287 c9dfb30)",
    "Mozilla/5.0 (X11; U; Linux; en-US) AppleWebKit/527+ (KHTML, like Gecko, Safari/419.3) Arora/0.6",
    "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.2pre) Gecko/20070215 K-Ninja/2.1.1",
    "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9) Gecko/20080705 Firefox/3.0 Kapiko/3.0",
    "Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5",
    "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.8) Gecko Fedora/1.9.0.8-1.fc10 Kazehakase/0.5.6",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
    "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; fr) Presto/2.9.168 Version/11.52",
]
3.3 爬虫代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json, time, re, requests
from jsonpath import jsonpath
from datetime import datetime
import pymongo
from queue import Queue  # 导入关于线程的队列
from threading import current_thread
from retrying import retry
from random import choice
from UserAgentSet import USER_AGENTS  # 导入浏览器头
from ProxiesSet import PROXIES  # 导入ip池列表
# from multiprocessing.dummy import Pool  # 导入线程池对象
from gevent.pool import Pool  # 导入协程池对象
import gevent.monkey  # 协程池
gevent.monkey.patch_all()  # 打补丁


class RenRenCrawl(object):

    def __init__(self):
        # 列表信息url
        self.__info_detail = 'https://www.renrendai.com/loan-{}.html'

        # 创建url队列, 用于存放url
        self.__url_list_queue = Queue()
        # 创建线程池对象
        self.__thread_pool = Pool()

        # 打开mongodb连接
        self.__mongo_client = pymongo.MongoClient('127.0.0.1', 27017)
        # 创建db_renren数据库
        self.db = self.__mongo_client.db_renren

    def __del__(self):
        """
        程序结束前关闭mongodb连接
        :return:
        """
        self.__mongo_client.close()

    def get_url_list(self):
        """生产url"""
        for uid in range(1, 2785540):
            url = self.__info_detail.format(uid)
            # 将生成的url存放到队列中
            self.__url_list_queue.put(url)

    @retry(stop_max_attempt_number=5)
    def _parse_url_retry(self, url):
        """
        超时重试。
        消费url,获取html源码。
        :param url:
        :return:
        """
        # 自定义请求头
        headers = {
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'User-Agent': choice(USER_AGENTS),  # 随机选择一个浏览器头
            'Origin': 'https://www.renrendai.com/',
            'Referer': 'https://www.renrendai.com/loan.html',
        }
        # 从ip代理池中随机选择一个ip
        proxies = {
            "https": "https://" + choice(PROXIES['https']),
        }
        # 超时会报错
        response = requests.get(url, headers=headers, proxies=proxies, timeout=2)
        return response

    def parse_url(self, url):
        """
        消费url,获取html源码
        :param url:
        :return:
        """

        try:
            response = self._parse_url_retry(url)
            print('线程号为:%s, url为:%s, 状态码为:%s' % (current_thread().getName(), url, response.status_code))
            content = response.content.decode('utf-8')
            # 返回内容及状态码
            return content, response.status_code
        except Exception as e:
            content = ''
            status_code = 408
            print('%s Timeout... 状态码为:%s' % (url, status_code))
            return content, status_code

    @staticmethod
    def parse_html(content, url, status_code):
        """
        消费html,生产item
        :return:
        """

        # 如果页面不存在,捕获异常,并保存该链接
        try:
            # 通过正则匹配获取用户信息
            user_info = re.search(r"var info = \'(.*)\'", content).group(1)
            # 将unicode编码进行替换回utf-8编码
            user_info_clean = user_info.replace('\\u0022', '"').replace('\\u005C', '\\').replace('\\u002D', '-')
            # 将json数据转换为dict
            res_dict = json.loads(user_info_clean)
        except Exception as e:
            return {
                'url': url,
                'http_status_code': status_code
            }

        # 将获取的item存放在字典中
        info = {
            'loan': {},
            'borrower': {},
            'userLoanRecord': {},
            'describe': {}
        }
        # ------------------ 借款信息 -------------------- #
        try:
            info['loan']['amount'] = str(jsonpath(res_dict, '$.loan.amount')[0])  # 标的总额
            info['loan']['interest'] = '%.2f' % jsonpath(res_dict, '$.loan.interest')[0] + '%'  # 年利率
            info['loan']['months'] = str(jsonpath(res_dict, '$.loan.months')[0]) + '个月'  # 还款期限
            # 起息日
            interest_date_timestamps = jsonpath(res_dict, '$.interestDate')[0]
            info['loan']['interest_date'] = time.strftime('%Y-%m-%d', time.localtime(
                int(interest_date_timestamps) / 1000)) if interest_date_timestamps else '放款日当日'
            # 提前还款费率
            monthly_min_interest = jsonpath(res_dict, '$.loan.monthlyMinInterest')[0]
            info['loan']['inrepay_penal_fee'] = '%.2f' % int(
                re.search('"inRepayPenalFee":"(.*?)"', monthly_min_interest).group(1)) + '%'
            info['loan']['credit_level'] = jsonpath(res_dict, '$.borrower.creditLevel')[0]  # 风险等级
            info['loan']['repay_type'] = '按月还款/等额本息' if jsonpath(res_dict, '$.loan.repayType')[
                                                            0] == 0 else ''  # 还款方式
            info['loan']['repay_source'] = jsonpath(res_dict, '$.repaySource')[0]  # 还款来源

            # ------------------ 借贷人信息 -------------------- #
            info['borrower']['nick_name'] = jsonpath(res_dict, '$.borrower.nickName')[0]  # 昵称
            info['borrower']['real_name'] = jsonpath(res_dict, '$.borrower.realName')[0]  # 姓名
            info['borrower']['id_no'] = jsonpath(res_dict, '$.borrower.idNo')[0]  # 身份证号
            info['borrower']['gender'] = jsonpath(res_dict, '$.borrower.gender')[0]  # 性别
            info['borrower']['age'] = str(datetime.now().year - int(jsonpath(res_dict, '$.borrower.birthDay')[0][:4]))  # 年龄=当前时间-出生年月
            info['borrower']['graduation'] = jsonpath(res_dict, '$.borrower.graduation')[0]  # 学历
            info['borrower']['marriage'] = '已婚' if jsonpath(res_dict, '$.borrower.marriage')[0] == 'MARRIED' else '未婚'  # 婚姻
            info['borrower']['salary'] = jsonpath(res_dict, '$.borrower.salary')[0]  # 收入
            info['borrower']['has_hose'] = '有房产' if jsonpath(res_dict, '$.borrower.hasHouse')[0] else '无房产'  # 房产
            info['borrower']['house_loan'] = '有房贷' if jsonpath(res_dict, '$.borrower.houseLoan')[0] else '无房贷'  # 房贷
            info['borrower']['has_car'] = '有车产' if jsonpath(res_dict, '$.borrower.hasCar')[0] else '无车产'  # 车产
            info['borrower']['car_loan'] = '有车贷' if jsonpath(res_dict, '$.borrower.carLoan')[0] else '无车贷'  # 车贷
            info['borrower']['office_domain'] = jsonpath(res_dict, '$.borrower.officeDomain')[0]  # 公司行业
            info['borrower']['office_scale'] = jsonpath(res_dict, '$.borrower.officeScale')[0]  # 公司规模
            info['borrower']['position'] = jsonpath(res_dict, '$.borrower.position')[0]  # 岗位职位
            info['borrower']['province'] = jsonpath(res_dict, '$.borrower.province')[0]  # 工作职位
            info['borrower']['work_years'] = jsonpath(res_dict, '$.borrower.workYears')[0]  # 工作时间
            info['borrower']['car_loan'] = jsonpath(res_dict, '$.hasOthDebt')[0] if jsonpath(res_dict, '$.hasOthDebt')[0] else '无'  # 其他负债

            # ------------------ 信用信息 -------------------- #
            info['userLoanRecord']['total_count'] = str(jsonpath(res_dict, '$.userLoanRecord.totalCount')[0]) + '笔'  # 申请借款
            info['userLoanRecord']['available_credits'] = str(jsonpath(res_dict, '$.borrower.availableCredits')[0]) + '元'  # 信用额度
            info['userLoanRecord']['overdue_total_amount'] = str(jsonpath(res_dict, '$.userLoanRecord.overdueTotalAmount')[0]) + '元'  # 逾期金额
            info['userLoanRecord']['success_count'] = str(jsonpath(res_dict, '$.userLoanRecord.successCount')[0]) + '笔'  # 成功借款
            info['userLoanRecord']['borrow_mount'] = str(jsonpath(res_dict, '$.userLoanRecord.borrowAmount')[0]) + '元'  # 借款总额
            info['userLoanRecord']['overdue_count'] = str(jsonpath(res_dict, '$.userLoanRecord.overdueCount')[0]) + '次'  # 逾期次数
            info['userLoanRecord']['already_pay_count'] = str(jsonpath(res_dict, '$.userLoanRecord.alreadyPayCount')[0]) + '笔'  # 还清笔数
            info['userLoanRecord']['notpay_total_amount'] = str(jsonpath(res_dict, '$.userLoanRecord.notPayTotalAmount')[0]) + '元'  # 待还本息
            info['userLoanRecord']['failed_count'] = str(jsonpath(res_dict, '$.userLoanRecord.failedCount')[0]) + '笔'  # 严重逾期

            # ------------------ 贷款描述 -------------------- #
            info['describe']['description'] = jsonpath(res_dict, '$.loan.description')[0]  # 贷款描述

            # ------------------ 其他相关信息 -------------------- #

            return info
        except Exception as e:
            # 错误则返回空, 并设置content键内容为0
            info.setdefault('content', 0)
            return info

    def save_to_mongodb(self, info):
        """
        保存数据
        :return:
        """
        # 创建多个集合保存数据(info_set1, info_set2, ...)
        # 保存一条记录使用insert_one, 多条则insert_many
        self.db.info_set.insert_one(info)

    def exec_task(self):
        """
        执行任务方法
        :return:
        """
        # 从队列中获取url
        url_ = self.__url_list_queue.get()

        # 消费url,获取响应html源码
        content, status_code = self.parse_url(url_)

        # 消费html,生产item
        info = self.parse_html(content, url_, status_code)

        # 保存结果到数据库
        self.save_to_mongodb(info)

        # 通知系统当前任务已完成
        self.__url_list_queue.task_done()

    def exec_task_finished(self, result):
        """
        执行任务完成后的回调方法
        :param result:
            注意,必须要有一个参数接收, 否则会报错。
        :return:
        """
        self.__thread_pool.apply_async(self.exec_task, callback=self.exec_task_finished)

    def run(self):
        # 调用方法,生成url到队列中
        self.get_url_list()

        # 分配任务执行
        for _ in range(500):
            # 执行任务, 执行完之后回调
            self.__thread_pool.apply_async(self.exec_task, callback=self.exec_task_finished)

        # 监控url队列, 直到队列为空, 主线程结束
        self.__url_list_queue.join()


if __name__ == '__main__':
    print('程序开始...\n当前时间为:%s' % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    start_time = time.time()
    spider = RenRenCrawl()
    spider.run()
    end_time = time.time()
    print('程序结束...\n当前时间为:%s' % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    print('一共耗时:%s 秒,%s 分钟' % (end_time-start_time, (end_time-start_time)//60))

3.4 爬取结果

其中一个借贷人的数据如下:

{
	"detail_info" : {
		"loan" : {
			"repay_source" : null,
			"months" : "10个月",
			"inrepay_penal_fee" : "1.00%",
			"repay_type" : "按月还款/等额本息",
			"interest_date" : "放款日当日",
			"amount" : "12000",
			"credit_level" : "A",
			"interest" : "9.00%"
		},
		"userLoanRecord" : {
			"overdue_total_amount" : "0元",
			"already_pay_count" : "1笔",
			"available_credits" : "0元",
			"success_count" : "1笔",
			"notpay_total_amount" : "0元",
			"overdue_count" : "0次",
			"total_count" : "2笔",
			"borrow_mount" : "12000元",
			"failed_count" : "0笔"
		},
		"describe" : {
			"description" : "深圳市中安信业创业投资有限公司是一家专门为个体工商户、小企业主和低收入家庭提供快速简便、无抵押无担保小额个人贷款服务的企业。公司自2004年开始探索无抵押无担保贷款, 至今累计放款全国最多,小额贷款服务的客户最多。在广东省(深圳市、佛山市),北京市,天津市,上海市,河北省,福建省,山东省,江苏省,湖南省,广西, 四川省,浙江省,河南省,湖北省,安徽省与辽宁省等五十多家便利的网点,逾千名员工专门从事小额贷款业务。中安信业是国内探索无抵押无担保商业化 可持续小额贷款最早的、累计放款量和贷款余额最多的、全国网点最多的、信贷质量最好的、运作最为规范的专业小额贷款机构。"
		},
		"borrower" : {
			"id_no" : "610************311",
			"office_scale" : "10-100人",
			"nick_name" : "752002141009001",
			"province" : "广东省",
			"position" : "电工",
			"gender" : "男",
			"age" : "35",
			"office_domain" : "IT",
			"has_hose" : "有房产",
			"car_loan" : "无",
			"work_years" : "1-3年(含)",
			"marriage" : "已婚",
			"has_car" : "有车产",
			"salary" : "20000-50000元",
			"house_loan" : "无房贷",
			"graduation" : "大专",
			"real_name" : "田**"
		}
	}
}