# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
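#
# For reference, these pipelines only run once they are enabled in settings.py via
# the ITEM_PIPELINES setting. A minimal sketch, assuming this module lives at
# demo1.pipelines (the priority numbers are placeholders):
#
# ITEM_PIPELINES = {
#     'demo1.pipelines.kejitingPipeline': 300,
#     'demo1.pipelines.MysqlYiBUPipeline': 400,
# }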
import copy
import json
import logging
import random

import pymongo
import pymysql
import scrapy
from twisted.enterprise import adbapi
from twisted.internet import reactor

# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
from scrapy.utils.project import get_project_settings

from DBUtils.PooledDB import PooledDB
from demo1.db_utils import MysqlUtil

class Demo1Pipeline:
    def process_item(self, item, spider):
        return item

class kejitingPipeline:
    def __init__(self):
        self.file = open('items.json', 'wb')
        print('kejiting (Science & Technology Department) pipeline: writing items to items.json')
        self.file.write('['.encode('utf-8'))

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + ",\n"
        self.file.write(line.encode('utf-8'))
        return item

    def close_spider(self, spider):
        self.file.write(']'.encode('utf-8'))
        self.file.close()

class MongoDBPipeline(object):
    def __init__(self):
        settings = get_project_settings()
        # connect to MongoDB
        self.client = pymongo.MongoClient(host=settings['MONGO_HOST'], port=settings['MONGO_PORT'])
        # if the database requires a username and password:
        # self.client.admin.authenticate(settings['MONGO_USER'], settings['MONGO_PSW'])
        self.db = self.client[settings['MONGO_DB']]    # handle to the database
        self.coll = self.db[settings['MONGO_COLL']]    # handle to the collection

    def process_item(self, item, spider):
        postItem = dict(item)           # convert the item into a plain dict
        self.coll.insert_one(postItem)  # insert one record into the collection
        return item                     # optional, but returning the item keeps it visible to later pipelines and the console
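# A minimal sketch of the MongoDB settings the pipeline above reads from
# settings.py; the concrete values here are assumptions, adjust them to your deployment:
#
# MONGO_HOST = '127.0.0.1'
# MONGO_PORT = 27017
# MONGO_DB = 'study'
# MONGO_COLL = 'policy'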

class SpiderSuningBookPipeline(object):
    def process_item(self, item, spider):
        # collection.insert(dict(item))
        # try:
        #     sql1_after = 'insert into t_policy set '
        #     self.cursor.execute()
        return item

    def open_spider(self, spider):
        # connect to the database
        self.connect = pymysql.connect(
            host='127.0.0.1',
            port=3306,
            db='study',
            user='root',
            passwd='123456',
            charset='utf8',
            use_unicode=True)
        # the cursor runs the insert/delete/select/update statements
        self.cursor = self.connect.cursor()
        self.connect.autocommit(True)

    def close_spider(self, spider):
        self.cursor.close()
        self.connect.close()

class MysqlPipline(object):
    pool = None

    # runs when the spider is opened
    def open_spider(self, spider):
        self.pool = MysqlUtil()

    def process_item(self, item, spider):
        try:
            # run SQL statements
            # sql = "select * from torrent_ye"
            # count = self.pool.get_all(sql, None)
            # print('row count: ' + str(count))

            # query first; if the record already exists, skip the insert
            # sql_select = """select count(1) from torrent_ye where torrent_url = %(torrent_url)s"""
            # params_select = {'torrent_url': item['torrent_url']}
            # flag = self.pool.get_count(sql_select, params_select)
            # if flag > 0:
            #     logging.info('record already exists: [%s][%s]', item['torrent_title'], item['torrent_url'])
            #     return
            #
            # sql_insert = """insert into torrent_ye(torrent_title, torrent_name, torrent_director,
            #     torrent_actor, torrent_language, torrent_type, torrent_region, torrent_update_time,
            #     torrent_status, torrent_show_time, torrent_introduction, torrent_url) values
            #     (%(torrent_title)s,%(torrent_name)s,%(torrent_director)s,%(torrent_actor)s,%(torrent_language)s,
            #     %(torrent_type)s,%(torrent_region)s,%(torrent_update_time)s,%(torrent_status)s,%(torrent_show_time)s,%(torrent_introduction)s,%(torrent_url)s)"""
            #
            # params = {'torrent_title': item['torrent_title'], 'torrent_name': item['torrent_name'],
            #           'torrent_director': item['torrent_director'], 'torrent_actor': item['torrent_actor'],
            #           'torrent_language': item['torrent_language'], 'torrent_type': item['torrent_type'],
            #           'torrent_region': item['torrent_region'], 'torrent_update_time': item['torrent_update_time'],
            #           'torrent_status': item['torrent_status'], 'torrent_show_time': item['torrent_show_time'],
            #           'torrent_introduction': item['torrent_introduction'], 'torrent_url': item['torrent_url']}
            print(item)
            copy_item = copy.deepcopy(item)
            # first query t_area by its short name to get the id field (the statements below are unfinished)
            sel_sql1 = 'select id from t_area where area_short like "%{}%" '
            insert_sql = 'insert into '
            self.pool.end("commit")
        except Exception as e:
            logging.error('insert exception occurred: [%s]', e)
            self.pool.end("rollback")

    def close_spider(self, spider):
        pass

class ProcessMysqlPipeline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        # the database parameters must be configured in settings.py
        dbparms = dict(
            host=settings['MYSQL_HOST'],
            db=settings['MYSQL_DBNAME'],
            user=settings['MYSQL_USER'],
            passwd=settings['MYSQL_PASSWORD'],
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor,
            use_unicode=True,
        )
        # create the ConnectionPool; use "pymysql" as the DB-API module, since the
        # cursorclass above is pymysql's DictCursor
        dbpool = adbapi.ConnectionPool("pymysql", **dbparms)  # ** unpacks the dict into keyword arguments
        return cls(dbpool)  # return the pipeline instance

    def process_item(self, item, spider):
        # use twisted to run the MySQL insert asynchronously
        query = self.dbpool.runInteraction(self.do_insert, item)
        # attach error handling
        query.addErrback(self.handle_error)
        return item

    def handle_error(self, failure):
        # handle exceptions raised by the asynchronous insert
        logging.info(failure)

    def do_insert(self, cursor, item):
        # run the actual insert
        insert_sql = """
            insert into jobbole_artitle(name, base_url, date, comment)
            VALUES (%s, %s, %s, %s)
        """
        cursor.execute(insert_sql, (item['name'], item['base_url'], item['date'], item['coment'],))


# from_settings is a classmethod, so it runs before __init__; every method in this
# class works with connections obtained asynchronously from the pool it creates.
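# ProcessMysqlPipeline above and MysqlYiBUPipeline below read their MySQL connection
# parameters from settings.py. A minimal sketch; the values are assumptions that mirror
# the hard-coded credentials used elsewhere in this file (note ProcessMysqlPipeline
# reads MYSQL_DBNAME while MysqlYiBUPipeline reads MYSQL_DATABASE):
#
# MYSQL_HOST = '127.0.0.1'
# MYSQL_DBNAME = 'study'
# MYSQL_DATABASE = 'study'
# MYSQL_USER = 'root'
# MYSQL_PASSWORD = '123456'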

class MysqlYiBUPipeline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):  # fixed method name: scrapy calls it and passes in the settings object
        """
        Build the database connection pool.
        :param settings: project settings
        :return: pipeline instance
        """
        adbparams = dict(
            host=settings['MYSQL_HOST'],
            db=settings['MYSQL_DATABASE'],
            user=settings['MYSQL_USER'],
            password=settings['MYSQL_PASSWORD'],
            cursorclass=pymysql.cursors.DictCursor  # use a dict cursor
        )
        # create the ConnectionPool, backed by pymysql (MySQLdb would also work)
        dbpool = adbapi.ConnectionPool('pymysql', **adbparams)
        # return the pipeline instance
        return cls(dbpool)

    def get_date(self, items):
        # debugging helper; not a pipeline hook that Scrapy calls
        print(items)
        # res = self.dbpool.runInteraction(self.col1, items)
        # res.addCallback(self.col1, items)
        # print(self.dbpool.con)  # left over from debugging; ConnectionPool has no "con" attribute
        res = self.dbpool.runQuery('select count(*) from t_policy').addCallback(self.col1, items)
        print(res)
        print('-----')

    def col1(self, cursor, items):
        print('*****')
        print(items)

    def process_item(self, item, spider):
        """
        Use twisted to run the MySQL insert asynchronously: the connection pool
        runs the actual SQL and returns a Deferred.
        """
        asynItem = copy.deepcopy(item)
        query = self.dbpool.runInteraction(self.do_insert, asynItem)  # schedule do_insert with a copy of the item
        # attach error handling
        query.addErrback(self.handle_error)  # handle exceptions
        return item
    def do_insert(self, cursor, item):
        # perform the insert; no explicit commit is needed, twisted commits automatically
        # the block below is sample item data, kept for reference
        # item = {'biaoqian': ['科研立项'],
        #         'biaoti': '2019年度专项项目川藏铁路重大基础科学问题项目指南',
        #         'jianjie': '国家自然科学基金委员会现启动专项项目,针对高原深部地球系统过程大断面,开展探索深部-地表内外动力耦合作用致灾机制与铁路工程灾害风险的基础科学问题研究。',
        #         'laiyuan': '国家自然科学基金委员会',
        #         'leixing': '申报指南',
        #         'lianjie': 'https://www.chacewang.com/news/NewsDetail/40341',
        #         'shijian': '2019-10-24',
        #         'xiangqing': '<div >\n',
        #         'wenjian': [
        #             'name1', 'name2', 'name3', 'name1', 'name2', 'name3', 'name1', 'name2', 'name3'
        #         ]
        #         }
        logging.info(item)
        # map the Chinese policy type label to the numeric code stored in t_policy.type
        if item['leixing'] == '申报通知':
            item['leixing'] = str(1)
        elif item['leixing'] == '公示公告':
            item['leixing'] = str(2)
        elif item['leixing'] == '政策动态':
            item['leixing'] = str(3)
        elif item['leixing'] == '申报指南':
            item['leixing'] = str(4)
        elif item['leixing'] == '政策问答':
            item['leixing'] = str(5)
        elif item['leixing'] == '高企政策':
            item['leixing'] = str(11)
        elif item['leixing'] == '查策网新闻':
            item['leixing'] = str(9)
        else:
            item['leixing'] = '0'
        # look up the publishing department in t_area; note that values are interpolated
        # directly into the SQL strings below (no parameter binding)
        sel_sql = '''
            select id from t_area where area_short is not null and area_short = '%s'
        ''' % item["laiyuan"]
        cursor.execute(sel_sql)
        result1 = cursor.fetchall()
        if len(result1) == 0:
            # the department is not in t_area yet: insert it, then select again to get its id
            insert_sql = '''
                insert into t_area(area_name,area_short,area_status,area_parent_id,area_type) values('%s','%s','%s','%s','%s')
            ''' % (str(item["laiyuan"]), str(item["laiyuan"]), str(1), str(1000000), str(1))
            cursor.execute(insert_sql)
            cursor.execute(sel_sql)
            result1 = cursor.fetchall()
        laiyuan_id = result1[0].get('id')
        # insert the policy record itself
        insert_sql2 = '''
            insert into t_policy(title,title_url,img_url,publish_depart_id,publish_time,type,content,intro)
            values('%s','%s','%s','%s','%s','%s','%s','%s')
        ''' % (str(item["biaoti"]), str(item['lianjie']), get_project_settings().get('TITLE_IMAGE') + str(random.randint(0, 9)) + '.png', str(laiyuan_id), item["shijian"], item["leixing"], item["xiangqing"], item["jianjie"])
        sel_sql2 = '''
            select id from t_policy where title_url='%s'
        ''' % (item["lianjie"])
        cursor.execute(insert_sql2)
        cursor.execute(sel_sql2)
        result2 = cursor.fetchall()
        xinwen_id = result2[-1].get('id')
        # insert one t_policy_label row per label
        for dange_biaoqian in item['biaoqian']:
            insert_sql3 = '''
                insert into t_policy_label(policy_id,label_name) values('%s','%s')
            ''' % (str(xinwen_id), str(dange_biaoqian))
            cursor.execute(insert_sql3)
        # attachment info arrives in groups of three (file name, url, location)
        if item.get('wenjian') is not None:
            down_list_num = int(len(item.get('wenjian')) / 3)
            insert_sql4 = 'insert into t_policy_file_crawl(policy_id,file_name,file_url,file_location) values' + \
                str((('("' + str(xinwen_id) + '","{}","{}","{}"),') * down_list_num).rstrip(',')).format(*item.get('wenjian'))
            cursor.execute(insert_sql4)
        logging.info('insert finished')
    def handle_error(self, failure):
        if failure:
            # log the error information (use lazy formatting; concatenating the
            # Failure object onto a str would raise a TypeError)
            logging.info('database insert failure--------: %s', failure)

    def close_spider(self, spider):
        logging.info('spider finished')