Python 网络爬虫:自动化数据采集的艺术
P2P网络信贷,由于其优秀的集资功能和对闲散资金的高度利用,使得普惠金融成为可能,但它也比传统信贷有着更高的风险,其风险主要来自于借贷双方的信息不对称性。我们使用python获取散标详情中借贷人信息数据,以及贷款的相关数据,以备用于数据分析。
一、环境
- 操作系统:ubuntu14.04
- python:3.6
二、实现思路
爬取的散标详情页面是静态页面,数据格式是json,我们这里使用jsonpath进行解析并提取数据。因为需要登录,我们使用cookie进行登录,然后使用多线程+协程进行数据爬取。考虑到反爬,我们使用ip代理池、随机User-Agent等手段,最后将数据写入到mongodb。
三、具体实现
3.1 筛选代理ip
获取代理ip,可以从免费的网站,比如西刺网,也可以花钱买一些高质量的ip。这里我是买了一些ip,24小时有效,有http的,也有https的。虽然是买的,质量也不是很高,但是比免费的质量要好些,因此还需要通过测试,从一批ip中筛选出一些高质量的ip。
# !/usr/bin/env python3
# -*- encoding: utf-8 -*-
import os
import re
import socket
import telnetlib
import time
class GenerateProxies(object):
    """Merge, de-duplicate and connectivity-test a pool of proxy addresses.

    Raw proxy lists are read from every regular file in ``ip_folder``, one
    ``host:port`` entry per line. The intended pipeline is
    ``save_ip_tmp_file`` -> ``ip_duplicate`` -> ``test_ip`` -> ``read_ip``;
    each step consumes the file written by the previous one.
    """

    def __init__(self):
        # Scratch file holding the merged (not yet de-duplicated) entries.
        self.temp_file_path = './proxies_set.txt'
        # De-duplicated entries; consumed and removed by test_ip().
        self.file_path = './proxies_rm_duplicated.txt'
        # Folder containing the raw proxy list files.
        self.ip_folder = './proxies_file'
        # Final output: proxies that accepted a TCP connection.
        self.test_ip_file = './test_proxies.txt'

    def save_ip_tmp_file(self):
        """Concatenate every file in ``ip_folder`` into the temporary file.

        Any temporary file left over from a previous run is overwritten.
        Sub-directories inside ``ip_folder`` are skipped.

        :return: None
        """
        # Mode 'w' truncates leftovers, replacing the remove-then-append dance.
        with open(self.temp_file_path, 'w', encoding='utf-8') as merged:
            # Sorted so the merged order does not depend on the file system.
            for name in sorted(os.listdir(self.ip_folder)):
                source_path = os.path.join(self.ip_folder, name)
                if not os.path.isfile(source_path):
                    continue
                with open(source_path, 'r', encoding='utf-8') as source:
                    merged.write(source.read())
                # Keeps the last entry of one file from fusing with the first
                # entry of the next when the file lacks a trailing newline.
                merged.write('\n')

    def ip_duplicate(self):
        """Write the unique, non-blank entries of the temporary file.

        Entries are stripped of surrounding whitespace, blank lines are
        dropped, and first-seen order is preserved. The temporary file is
        deleted afterwards.

        :return: None
        """
        with open(self.temp_file_path, 'r', encoding='utf-8') as merged:
            stripped = (line.strip() for line in merged)
            # dict.fromkeys de-duplicates while keeping insertion order.
            unique_entries = list(dict.fromkeys(e for e in stripped if e))
        print('ip池中一共有%s个不同的代理ip...' % len(unique_entries))
        with open(self.file_path, 'w', encoding='utf-8') as deduped:
            deduped.writelines(entry + '\n' for entry in unique_entries)
        # The merged scratch file is no longer needed.
        os.remove(self.temp_file_path)

    @staticmethod
    def _is_reachable(entry, timeout):
        """Return True if ``entry`` (``host:port``) accepts a TCP connection."""
        host, sep, port = entry.rpartition(':')
        if not (sep and host and port.isdigit()):
            return False
        try:
            # The context manager closes the probe socket immediately.
            with socket.create_connection((host, int(port)), timeout=timeout):
                return True
        except (OSError, ValueError, OverflowError):
            # OSError covers refused/timeout/DNS failures; the others cover
            # malformed host names and out-of-range port numbers.
            return False

    def test_ip(self, timeout=1):
        """Keep only the proxies that accept a TCP connection.

        Reads the de-duplicated file, probes every entry and writes the
        reachable ones to ``test_ip_file``. The result file is always
        (re)created, even when no proxy passes, so ``read_ip`` can run
        afterwards. The de-duplicated file is deleted at the end.

        :param timeout: connection timeout in seconds for each probe
        :return: None
        """
        with open(self.file_path, 'r', encoding='utf-8') as deduped:
            candidates = [line.strip() for line in deduped if line.strip()]
        # Mode 'w' discards results from an earlier run and guarantees the
        # file exists afterwards.
        with open(self.test_ip_file, 'w', encoding='utf-8') as usable:
            for entry in candidates:
                if self._is_reachable(entry, timeout):
                    print(entry)
                    usable.write(entry + '\n')
                else:
                    print('ip 无效!')
        # The de-duplicated intermediate file is no longer needed.
        os.remove(self.file_path)

    def read_ip(self):
        """Report how many usable proxies are in ``test_ip_file``.

        :return: the number of non-blank entries in the result file
        """
        with open(self.test_ip_file, 'r', encoding='utf-8') as usable:
            usable_count = sum(1 for line in usable if line.strip())
        print('当前一共有%s个有用的ip代理...' % usable_count)
        return usable_count
if __name__ == '__main__':
    # Shared timestamp layout for the start/end banners.
    timestamp_format = '%Y-%m-%d %H:%M:%S'

    print('程序开始...\n当前时间为:%s' % time.strftime(timestamp_format, time.localtime()))
    started_at = time.time()

    proxy_builder = GenerateProxies()
    # Pipeline order matters: merge -> de-duplicate -> probe -> report.
    pipeline = (
        proxy_builder.save_ip_tmp_file,
        proxy_builder.ip_duplicate,
        proxy_builder.test_ip,
        proxy_builder.read_ip,
    )
    for step in pipeline:
        step()

    finished_at = time.time()
    print('程序结束...\n当前时间为:%s' % time.strftime(timestamp_format, time.localtime()))
    print('一共耗时:%s 秒' % (finished_at - started_at))
3.2 定义User-Agent列表
各个版本的浏览器头,都准备几个
# List of browser User-Agent header strings (several browser families and versions).
USER_AGENTS = [
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
"Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
"Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
"Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 1.0.3705; .NET CLR 1.1.4322)",
"Mozilla/4.0 (compatible; MSIE 7.0b; Windows NT 5.2; .NET CLR 1.1.4322; .NET CLR 2.0.50727; InfoPath.2; .NET CLR 3.0.04506.30)",
"Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN) AppleWebKit/523.15 (KHTML, like Gecko, Safari/419.3) Arora/0.3 (Change: 287 c9dfb30)",
"Mozilla/5.0 (X11; U; Linux; en-US) AppleWebKit/527+ (KHTML, like Gecko, Safari/419.3) Arora/0.6",
"Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.2pre) Gecko/20070215 K-Ninja/2.1.1",
"Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9) Gecko/20080705 Firefox/3.0 Kapiko/3.0",
"Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5",
"Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.8) Gecko Fedora/1.9.0.8-1.fc10 Kazehakase/0.5.6",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
"Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; fr) Presto/2.9.168 Version/11.52",
]
3.3 爬虫代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json, time, re, requests
from jsonpath import jsonpath
from datetime import datetime
import pymongo
from queue import Queue # 导入关于线程的队列
from threading import current_thread
from retrying import retry
from random import choice
from UserAgentSet import USER_AGENTS # 导入浏览器头
from ProxiesSet import PROXIES # 导入ip池列表
# from multiprocessing.dummy import Pool # 导入线程池对象
from gevent.pool import Pool # 导入协程池对象
import gevent.monkey # 协程池
gevent.monkey.patch_all() # 打补丁
class RenRenCrawl(object):
    """Crawl renrendai.com loan detail pages and store the parsed records.

    URLs are generated into a queue, fetched through a randomly chosen proxy
    and User-Agent, parsed into a nested dict and inserted into the
    ``info_set`` collection of the ``db_renren`` MongoDB database. Tasks are
    scheduled on a gevent pool; each finished task schedules the next one.
    """

    def __init__(self):
        # URL template of a loan detail page; ``{}`` is the numeric loan id.
        self.__info_detail = 'https://www.renrendai.com/loan-{}.html'
        # Queue of URLs waiting to be crawled.
        self.__url_list_queue = Queue()
        # gevent pool on which crawl tasks are scheduled.
        self.__thread_pool = Pool()
        # MongoDB connection and the database that receives the records.
        self.__mongo_client = pymongo.MongoClient('127.0.0.1', 27017)
        self.db = self.__mongo_client.db_renren

    def __del__(self):
        """Close the MongoDB connection when the crawler is destroyed.

        :return: None
        """
        # __init__ may have failed before the client attribute was created,
        # so look it up defensively (name-mangled private attribute).
        client = getattr(self, '_RenRenCrawl__mongo_client', None)
        if client is not None:
            client.close()

    def get_url_list(self, start=1, stop=2785540):
        """Fill the URL queue with detail-page URLs for ids in [start, stop).

        :param start: first loan id (inclusive)
        :param stop: last loan id (exclusive)
        :return: None
        """
        for uid in range(start, stop):
            self.__url_list_queue.put(self.__info_detail.format(uid))

    @retry(stop_max_attempt_number=5)
    def _parse_url_retry(self, url):
        """Fetch ``url`` once; the ``retry`` decorator repeats it on failure.

        A fresh User-Agent and proxy are chosen on every attempt because the
        random choices happen inside this method.

        :param url: page URL to request
        :return: the ``requests`` response object
        """
        headers = {
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'User-Agent': choice(USER_AGENTS),  # random browser header
            'Origin': 'https://www.renrendai.com/',
            'Referer': 'https://www.renrendai.com/loan.html',
        }
        # Pick a random proxy from the https pool.
        proxies = {
            "https": "https://" + choice(PROXIES['https']),
        }
        # Raises on timeout, which triggers the retry decorator.
        response = requests.get(url, headers=headers, proxies=proxies, timeout=2)
        return response

    def parse_url(self, url):
        """Fetch ``url`` and return its body text and HTTP status code.

        :param url: page URL to request
        :return: ``(content, status_code)``; ``('', 408)`` when every retry
            attempt failed
        """
        try:
            response = self._parse_url_retry(url)
        except Exception:
            # All attempts failed (timeout, proxy error, ...): record the
            # URL as timed out instead of aborting the crawl.
            status_code = 408
            print('%s Timeout... 状态码为:%s' % (url, status_code))
            return '', status_code
        print('线程号为:%s, url为:%s, 状态码为:%s' % (current_thread().name, url, response.status_code))
        # errors='replace' keeps a stray invalid byte from discarding a page
        # that was fetched successfully.
        content = response.content.decode('utf-8', errors='replace')
        return content, response.status_code

    @staticmethod
    def parse_html(content, url, status_code):
        """Extract the loan record embedded in a detail page.

        :param content: page source text
        :param url: URL the page was fetched from
        :param status_code: HTTP status code of the fetch
        :return: ``{'url', 'http_status_code'}`` when the page carries no
            parsable ``var info`` payload; otherwise a dict with ``loan``,
            ``borrower``, ``userLoanRecord`` and ``describe`` sections. If a
            field cannot be extracted, the partially filled dict is returned
            with an extra ``content`` key set to 0.
        """
        # Pages without the embedded payload (e.g. missing loans) are
        # recorded by URL and status only. Best-effort on purpose.
        try:
            user_info = re.search(r"var info = \'(.*)\'", content).group(1)
            # Undo the \uXXXX escaping of quotes, backslashes and hyphens so
            # the payload becomes valid JSON text.
            user_info_clean = user_info.replace('\\u0022', '"').replace('\\u005C', '\\').replace('\\u002D', '-')
            res_dict = json.loads(user_info_clean)
        except Exception:
            return {
                'url': url,
                'http_status_code': status_code
            }

        def first(path):
            # First match for a JSONPath expression. A path with no match is
            # expected to raise here (presumably jsonpath returns False then;
            # verify against the installed package) and is handled by the
            # enclosing try block.
            return jsonpath(res_dict, path)[0]

        info = {
            'loan': {},
            'borrower': {},
            'userLoanRecord': {},
            'describe': {}
        }
        loan = info['loan']
        borrower = info['borrower']
        record = info['userLoanRecord']
        try:
            # ------------------ loan information -------------------- #
            loan['amount'] = str(first('$.loan.amount'))  # total amount
            loan['interest'] = '%.2f' % first('$.loan.interest') + '%'  # annual rate
            loan['months'] = str(first('$.loan.months')) + '个月'  # repayment term
            # Interest start date: millisecond timestamp, or a fixed label
            # when the value is empty.
            interest_date_timestamps = first('$.interestDate')
            if interest_date_timestamps:
                loan['interest_date'] = time.strftime(
                    '%Y-%m-%d', time.localtime(int(interest_date_timestamps) / 1000))
            else:
                loan['interest_date'] = '放款日当日'
            # Early-repayment penalty rate, embedded in a JSON-like string.
            # float() rather than int() so fractional rates do not fail.
            monthly_min_interest = first('$.loan.monthlyMinInterest')
            penal_fee = re.search('"inRepayPenalFee":"(.*?)"', monthly_min_interest).group(1)
            loan['inrepay_penal_fee'] = '%.2f' % float(penal_fee) + '%'
            loan['credit_level'] = first('$.borrower.creditLevel')  # risk level
            loan['repay_type'] = '按月还款/等额本息' if first('$.loan.repayType') == 0 else ''  # repayment type
            loan['repay_source'] = first('$.repaySource')  # repayment source
            # ------------------ borrower information -------------------- #
            borrower['nick_name'] = first('$.borrower.nickName')
            borrower['real_name'] = first('$.borrower.realName')
            borrower['id_no'] = first('$.borrower.idNo')
            borrower['gender'] = first('$.borrower.gender')
            # Age = current year - birth year (first four characters of birthDay).
            borrower['age'] = str(datetime.now().year - int(first('$.borrower.birthDay')[:4]))
            borrower['graduation'] = first('$.borrower.graduation')  # education
            borrower['marriage'] = '已婚' if first('$.borrower.marriage') == 'MARRIED' else '未婚'
            borrower['salary'] = first('$.borrower.salary')
            # NOTE: the misspelt key 'has_hose' is kept so records stay
            # compatible with data already stored.
            borrower['has_hose'] = '有房产' if first('$.borrower.hasHouse') else '无房产'
            borrower['house_loan'] = '有房贷' if first('$.borrower.houseLoan') else '无房贷'
            borrower['has_car'] = '有车产' if first('$.borrower.hasCar') else '无车产'
            borrower['car_loan'] = '有车贷' if first('$.borrower.carLoan') else '无车贷'
            borrower['office_domain'] = first('$.borrower.officeDomain')  # industry
            borrower['office_scale'] = first('$.borrower.officeScale')  # company size
            borrower['position'] = first('$.borrower.position')  # job position
            borrower['province'] = first('$.borrower.province')  # province
            borrower['work_years'] = first('$.borrower.workYears')  # years of work
            # Other debts get their own key; previously this value was
            # written to 'car_loan' and overwrote the car-loan field.
            borrower['other_debt'] = first('$.hasOthDebt') or '无'
            # ------------------ credit record -------------------- #
            record['total_count'] = str(first('$.userLoanRecord.totalCount')) + '笔'  # loan applications
            record['available_credits'] = str(first('$.borrower.availableCredits')) + '元'  # credit line
            record['overdue_total_amount'] = str(first('$.userLoanRecord.overdueTotalAmount')) + '元'  # overdue amount
            record['success_count'] = str(first('$.userLoanRecord.successCount')) + '笔'  # successful loans
            record['borrow_mount'] = str(first('$.userLoanRecord.borrowAmount')) + '元'  # total borrowed
            record['overdue_count'] = str(first('$.userLoanRecord.overdueCount')) + '次'  # overdue count
            record['already_pay_count'] = str(first('$.userLoanRecord.alreadyPayCount')) + '笔'  # loans paid off
            record['notpay_total_amount'] = str(first('$.userLoanRecord.notPayTotalAmount')) + '元'  # outstanding
            record['failed_count'] = str(first('$.userLoanRecord.failedCount')) + '笔'  # serious overdue
            # ------------------ loan description -------------------- #
            info['describe']['description'] = first('$.loan.description')
            return info
        except Exception:
            # Best-effort: keep whatever was extracted and flag the record
            # with content=0 so incomplete rows can be found later.
            info.setdefault('content', 0)
            return info

    def save_to_mongodb(self, info):
        """Insert one parsed record into the ``info_set`` collection.

        :param info: record dict produced by ``parse_html``
        :return: None
        """
        self.db.info_set.insert_one(info)

    def exec_task(self):
        """Process one URL from the queue: fetch, parse and store.

        The queue entry is always marked done, even when a step fails, so
        ``Queue.join()`` in ``run`` cannot hang on a failed task.

        :return: None
        """
        url_ = self.__url_list_queue.get()
        try:
            content, status_code = self.parse_url(url_)
            info = self.parse_html(content, url_, status_code)
            self.save_to_mongodb(info)
        except Exception as exc:
            # Report and carry on: an escaping exception would skip the
            # completion callback and end this worker's task chain.
            print('%s 处理失败:%r' % (url_, exc))
        finally:
            self.__url_list_queue.task_done()

    def exec_task_finished(self, result):
        """Completion callback: schedule the next task while URLs remain.

        :param result: return value of the finished task (unused, but the
            callback must accept one argument)
        :return: None
        """
        # Without this guard every finished task would spawn another one
        # that blocks forever on the drained queue.
        if not self.__url_list_queue.empty():
            self.__thread_pool.apply_async(self.exec_task, callback=self.exec_task_finished)

    def run(self, workers=500):
        """Generate the URLs and crawl them with ``workers`` concurrent chains.

        :param workers: number of task chains started initially
        :return: None
        """
        self.get_url_list()
        for _ in range(workers):
            # Each task re-schedules itself through the callback.
            self.__thread_pool.apply_async(self.exec_task, callback=self.exec_task_finished)
        # Block until every queued URL has been marked done.
        self.__url_list_queue.join()
if __name__ == '__main__':
    # Shared timestamp layout for the start/end banners.
    timestamp_format = '%Y-%m-%d %H:%M:%S'

    print('程序开始...\n当前时间为:%s' % time.strftime(timestamp_format, time.localtime()))
    started_at = time.time()

    # Build the crawler and run it to completion.
    RenRenCrawl().run()

    elapsed_seconds = time.time() - started_at
    print('程序结束...\n当前时间为:%s' % time.strftime(timestamp_format, time.localtime()))
    print('一共耗时:%s 秒,%s 分钟' % (elapsed_seconds, elapsed_seconds // 60))
3.4 爬取结果
其中一个借贷人的数据如下:
{
"detail_info" : {
"loan" : {
"repay_source" : null,
"months" : "10个月",
"inrepay_penal_fee" : "1.00%",
"repay_type" : "按月还款/等额本息",
"interest_date" : "放款日当日",
"amount" : "12000",
"credit_level" : "A",
"interest" : "9.00%"
},
"userLoanRecord" : {
"overdue_total_amount" : "0元",
"already_pay_count" : "1笔",
"available_credits" : "0元",
"success_count" : "1笔",
"notpay_total_amount" : "0元",
"overdue_count" : "0次",
"total_count" : "2笔",
"borrow_mount" : "12000元",
"failed_count" : "0笔"
},
"describe" : {
"description" : "深圳市中安信业创业投资有限公司是一家专门为个体工商户、小企业主和低收入家庭提供快速简便、无抵押无担保小额个人贷款服务的企业。公司自2004年开始探索无抵押无担保贷款, 至今累计放款全国最多,小额贷款服务的客户最多。在广东省(深圳市、佛山市),北京市,天津市,上海市,河北省,福建省,山东省,江苏省,湖南省,广西, 四川省,浙江省,河南省,湖北省,安徽省与辽宁省等五十多家便利的网点,逾千名员工专门从事小额贷款业务。中安信业是国内探索无抵押无担保商业化 可持续小额贷款最早的、累计放款量和贷款余额最多的、全国网点最多的、信贷质量最好的、运作最为规范的专业小额贷款机构。"
},
"borrower" : {
"id_no" : "610************311",
"office_scale" : "10-100人",
"nick_name" : "752002141009001",
"province" : "广东省",
"position" : "电工",
"gender" : "男",
"age" : "35",
"office_domain" : "IT",
"has_hose" : "有房产",
"car_loan" : "无",
"work_years" : "1-3年(含)",
"marriage" : "已婚",
"has_car" : "有车产",
"salary" : "20000-50000元",
"house_loan" : "无房贷",
"graduation" : "大专",
"real_name" : "田**"
}
}
}