# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
import copy
import scrapy
import random
from twisted.enterprise import adbapi
from twisted.internet import reactor
import pymysql
# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
import json
from scrapy.utils.project import get_project_settings
import pymongo
from DBUtils.PooledDB import PooledDB
from demo1.Util import Asyninser
from demo1.db_utils import MysqlUtil
import logging


# Maps the crawled category text (item['leixing']) to the code written to
# t_policy.type. Any text not listed here is stored as '0'.
_LEIXING_CODES = {
    '申报通知': '1',
    '公示公告': '2',
    '政策动态': '3',
    '申报指南': '4',
    '政策问答': '5',
    '高企政策': '11',
    '查策网新闻': '9',
}


def _random_title_image():
    """Return the TITLE_IMAGE setting followed by a random digit 0-9 and '.png'."""
    return get_project_settings().get('TITLE_IMAGE') + str(random.randint(0, 9)) + '.png'


def _get_or_create_area_id(cursor, laiyuan, match_area_name=False):
    """Return the t_area id for the source name, inserting the row if missing.

    :param cursor: DB-API cursor that returns rows as dicts.
    :param laiyuan: source/department name used for area_short (and area_name).
    :param match_area_name: when True, a row whose area_name equals ``laiyuan``
        also counts as a match.
    :return: the ``id`` of the first matching row.
    """
    if match_area_name:
        select_sql = ('select id from t_area where '
                      '(area_short is not null and area_short = %s) or (area_name = %s)')
        select_args = (laiyuan, laiyuan)
    else:
        select_sql = 'select id from t_area where area_short is not null and area_short = %s'
        select_args = (laiyuan,)

    cursor.execute(select_sql, select_args)
    rows = cursor.fetchall()
    if not rows:
        cursor.execute(
            'insert into t_area(area_name,area_short,area_status,area_parent_id,area_type) '
            'values(%s,%s,%s,%s,%s)',
            (laiyuan, laiyuan, '1', '1000000', '1'),
        )
        # Re-run the lookup to pick up the id of the row just inserted.
        cursor.execute(select_sql, select_args)
        rows = cursor.fetchall()
    return rows[0].get('id')


def _insert_policy(cursor, item, area_id, with_type):
    """Insert one t_policy row for ``item`` and return its id.

    :param cursor: DB-API cursor that returns rows as dicts.
    :param item: mapping with biaoti, lianjie, shijian, xiangqing, jianjie
        (and leixing when ``with_type`` is True).
    :param area_id: value stored in publish_depart_id.
    :param with_type: whether the ``type`` column is written from item['leixing'].
    :return: ``id`` of the last row whose title_url equals item['lianjie'].
    """
    columns = ['title', 'title_url', 'img_url', 'publish_depart_id', 'publish_time']
    values = [
        str(item['biaoti']),
        str(item['lianjie']),
        _random_title_image(),
        str(area_id),
        str(item['shijian']),
    ]
    if with_type:
        columns.append('type')
        values.append(str(item['leixing']))
    columns.extend(['content', 'intro'])
    values.extend([str(item['xiangqing']), str(item['jianjie'])])

    # Placeholders let the driver escape the values; the scraped HTML in
    # `content` routinely contains quotes that broke the string-built SQL.
    insert_sql = 'insert into t_policy({}) values({})'.format(
        ','.join(columns), ','.join(['%s'] * len(columns)))
    cursor.execute(insert_sql, values)

    # NOTE(review): the lookup has no ORDER BY, so "last row" relies on the
    # server's default row order when title_url is not unique - confirm.
    cursor.execute('select id from t_policy where title_url=%s', (str(item['lianjie']),))
    return cursor.fetchall()[-1].get('id')


def _insert_labels(cursor, policy_id, labels):
    """Insert one t_policy_label row per entry of ``labels``."""
    for label in labels:
        cursor.execute(
            'insert into t_policy_label(policy_id,label_name) values(%s,%s)',
            (str(policy_id), str(label)),
        )


def _insert_files(cursor, policy_id, file_rows):
    """Insert (file_name, file_url, file_location) rows into t_policy_file_crawl.

    Does nothing when ``file_rows`` is empty; the original code emitted an
    ``insert ... values`` statement with no tuples in that case.
    """
    params = [
        (str(policy_id), str(name), str(url), str(location))
        for name, url, location in file_rows
    ]
    if not params:
        return
    cursor.executemany(
        'insert into t_policy_file_crawl(policy_id,file_name,file_url,file_location) '
        'values(%s,%s,%s,%s)',
        params,
    )


def _insert_untyped_policy(cursor, item, match_area_name=False):
    """Shared insert flow for the pipelines that store no category or intro.

    Overwrites item['jianjie'] with '_' and item['biaoqian'] with ['_'] before
    writing, then stores the attachments listed in item['wenjian'] (a list of
    dicts with file_name, file_url and new_file keys) when that key is present.
    """
    area_id = _get_or_create_area_id(cursor, str(item['laiyuan']), match_area_name)
    item['jianjie'] = '_'
    policy_id = _insert_policy(cursor, item, area_id, with_type=False)
    item['biaoqian'] = ['_']
    _insert_labels(cursor, policy_id, item['biaoqian'])

    attachments = item.get('wenjian')
    if attachments is not None:
        _insert_files(
            cursor,
            policy_id,
            [(entry['file_name'], entry['file_url'], entry['new_file'])
             for entry in attachments],
        )


class Demo1Pipeline:
    """Pass-through pipeline that returns every item unchanged."""

    def process_item(self, item, spider):
        return item


class kejitingPipeline:
    """Write every item into ``items.json`` as one JSON array."""

    def __init__(self):
        self.file = open('items.json', 'wb')
        print('我是写入科技厅管道')
        self.file.write('['.encode('utf-8'))
        # Tracks whether a separator is needed before the next item.
        self._first_item = True

    def process_item(self, item, spider):
        """Append ``item`` to the array and return it unchanged."""
        # The separator goes before each item except the first, so the array
        # never ends with a trailing comma (which is invalid JSON).
        separator = '' if self._first_item else ',\n'
        self._first_item = False
        line = separator + json.dumps(dict(item))
        self.file.write(line.encode('utf-8'))
        return item

    def close_spider(self, spider):
        """Close the JSON array and the file."""
        self.file.write(']'.encode('utf-8'))
        self.file.close()


class MongoDBPipeline(object):
    """Store every item as a document in the configured MongoDB collection."""

    def __init__(self):
        settings = get_project_settings()
        # Connect to the database server.
        self.client = pymongo.MongoClient(host=settings['MONGO_HOST'], port=settings['MONGO_PORT'])
        # If the server requires a user name and password, authenticate here.
        self.db = self.client[settings['MONGO_DB']]  # database handle
        self.coll = self.db[settings['MONGO_COLL']]  # collection handle

    def process_item(self, item, spider):
        """Insert ``item`` as one document and return the original item."""
        post_item = dict(item)  # convert the item to a plain dict
        # insert_one replaces Collection.insert, which PyMongo 4 removed.
        self.coll.insert_one(post_item)
        return item

    def close_spider(self, spider):
        """Release the MongoDB connection when the spider finishes."""
        self.client.close()


class SpiderSuningBookPipeline(object):
    """Open a MySQL connection for the spider's lifetime; items pass through."""

    def process_item(self, item, spider):
        # No statement is executed yet; the item is returned unchanged.
        return item

    def open_spider(self, spider):
        """Connect to the database and create an autocommit cursor."""
        # NOTE(review): connection parameters are hard-coded here rather than
        # read from the project settings like the other pipelines.
        self.connect = pymysql.connect(
            host='127.0.0.1',
            port=3306,
            db='study',
            user='root',
            passwd='123456',
            charset='utf8',
            use_unicode=True)
        # The cursor is used to run insert/delete/select/update statements.
        self.cursor = self.connect.cursor()
        self.connect.autocommit(True)

    def close_spider(self, spider):
        """Close the cursor and the connection."""
        self.cursor.close()
        self.connect.close()


class MysqlPipline(object):
    """Pipeline backed by the project's ``MysqlUtil`` helper."""

    pool = None

    # Runs when the spider is opened.
    def open_spider(self, spider):
        self.pool = MysqlUtil()

    def process_item(self, item, spider):
        """Print the item, end the pool transaction, and return the item.

        The transaction is ended with "commit", or with "rollback" if an
        exception was raised.
        """
        try:
            print(item)
            # TODO: the insert itself is not implemented yet. The unfinished
            # draft looked up t_area.id by area_short before inserting; an
            # older commented-out draft skipped rows whose torrent_url
            # already existed in torrent_ye and inserted the rest.
            self.pool.end("commit")
        except Exception as e:
            logging.error('发生插入异常:[%s]', e)
            self.pool.end("rollback")
        # Scrapy passes the return value to the next pipeline, so the item
        # must be returned even after a failure.
        return item

    def close_spider(self, spider):
        pass


class ProcessMysqlPipeline(object):
    """Insert items into jobbole_artitle through a Twisted connection pool."""

    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        """Build the pipeline from the MYSQL_* values in the project settings."""
        dbparms = dict(
            host=settings['MYSQL_HOST'],
            db=settings['MYSQL_DBNAME'],
            user=settings['MYSQL_USER'],
            passwd=settings['MYSQL_PASSWORD'],
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor,
            use_unicode=True,
        )
        # NOTE(review): the pool loads the "MySQLdb" module but is given a
        # pymysql cursor class - confirm this pairing is intended.
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        """Schedule the insert on the pool and return the item."""
        query = self.dbpool.runInteraction(self.do_insert, item)
        # addErrback (not addCallback) so the handler sees failures only.
        query.addErrback(self.handle_error)
        return item

    def handle_error(self, failure):
        """Log a failure raised by the asynchronous insert."""
        logging.error(failure)

    def do_insert(self, cursor, item):
        """Run the insert statement for one item."""
        insert_sql = """
            insert into jobbole_artitle(name, base_url, date, comment)
            VALUES (%s, %s, %s, %s)
        """
        # NOTE(review): the key is spelled 'coment' while the column is
        # 'comment' - confirm against the item definition.
        cursor.execute(insert_sql, (item['name'], item['base_url'], item['date'], item['coment'],))


# Pipeline for the chacewang site.
class MysqlYiBUPipeline(object):
    """Asynchronously store policy items with their labels and attachments."""

    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        """
        Create the connection pool; Scrapy calls this hook by its fixed name.
        :param settings: project settings providing the MYSQL_* values
        :return: pipeline instance bound to the new pool
        """
        adbparams = dict(
            host=settings['MYSQL_HOST'],
            db=settings['MYSQL_DATABASE'],
            user=settings['MYSQL_USER'],
            password=settings['MYSQL_PASSWORD'],
            cursorclass=pymysql.cursors.DictCursor  # rows come back as dicts
        )
        dbpool = adbapi.ConnectionPool('pymysql', **adbparams)
        return cls(dbpool)

    def get_date(self, items):
        """Debug helper: count the t_policy rows and pass the result to col1."""
        print(items)
        # The former print of `self.dbpool.con` was dropped: ConnectionPool
        # has no such attribute, so the call raised AttributeError.
        res = self.dbpool.runQuery('select count(*) from t_policy').addCallback(self.col1, items)
        print(res)
        print('-----')

    def col1(self, cursor, items):
        """Callback for get_date; receives the query result as ``cursor``."""
        print('*****')
        print(items)

    def process_item(self, item, spider):
        """
        Schedule the insert of a deep copy of the item on the pool's thread
        and return the original item immediately.
        """
        asynItem = copy.deepcopy(item)
        query = self.dbpool.runInteraction(self.do_insert, asynItem)
        query.addErrback(self.handle_error)
        return item

    def do_insert(self, cursor, item):
        """Write one item to t_area, t_policy, t_policy_label and t_policy_file_crawl.

        No explicit commit is issued; runInteraction commits when this returns.
        The item provides biaoti, lianjie, shijian, leixing, laiyuan,
        xiangqing, jianjie, a biaoqian list and optionally wenjian, a flat
        list read three entries at a time as the file columns.
        """
        item['leixing'] = _LEIXING_CODES.get(item['leixing'], '0')

        area_id = _get_or_create_area_id(cursor, str(item['laiyuan']))
        policy_id = _insert_policy(cursor, item, area_id, with_type=True)
        _insert_labels(cursor, policy_id, item['biaoqian'])

        flat_files = item.get('wenjian')
        if flat_files is not None:
            # Only complete triples are stored; leftover entries are ignored.
            triple_count = len(flat_files) // 3
            _insert_files(
                cursor,
                policy_id,
                [tuple(flat_files[index * 3:index * 3 + 3]) for index in range(triple_count)],
            )
        logging.info('插入完成')

    def handle_error(self, failure):
        """Log a failure raised by the asynchronous insert."""
        if failure:
            logging.error('----------数据库插入异常信息--------')
            logging.error(failure)
            logging.error('---------异常信息结束--------')

    def close_spider(self, spider):
        logging.info('爬虫运行完毕了')


# Pipeline for the Ministry of Science and Technology.
class kexujishubuPipeline(object):
    """Asynchronously store policy items that carry no category or intro."""

    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        """
        Create the connection pool; Scrapy calls this hook by its fixed name.
        :param settings: project settings providing the MYSQL_* values
        :return: pipeline instance bound to the new pool
        """
        adbparams = dict(
            host=settings['MYSQL_HOST'],
            db=settings['MYSQL_DATABASE'],
            user=settings['MYSQL_USER'],
            password=settings['MYSQL_PASSWORD'],
            cursorclass=pymysql.cursors.DictCursor  # rows come back as dicts
        )
        dbpool = adbapi.ConnectionPool('pymysql', **adbparams)
        return cls(dbpool)

    def process_item(self, item, spider):
        """
        Schedule the insert of a deep copy of the item on the pool's thread
        and return the original item immediately.
        """
        asynItem = copy.deepcopy(item)
        query = self.dbpool.runInteraction(self.do_insert, asynItem)
        query.addErrback(self.handle_error)
        return item

    def do_insert(self, cursor, item):
        """Write one item; runInteraction commits when this returns."""
        _insert_untyped_policy(cursor, item)

    def handle_error(self, failure):
        """Log a failure raised by the asynchronous insert."""
        if failure:
            logging.error('----------数据库插入异常信息--------')
            logging.error(failure)
            logging.error('---------异常信息结束--------')

    def close_spider(self, spider):
        logging.info('爬虫运行完毕了')


# Pipeline for the Ministry of Industry and Information Technology.
class gongyehexinxihuabuPipline(Asyninser):
    """Insert step for the project's Asyninser base, matching areas by area_short."""

    def __init__(self, dbpool):
        self.dbpool = dbpool

    def do_insert(self, cursor, item):
        """Write one item to the area, policy, label and file tables."""
        _insert_untyped_policy(cursor, item)


# Pipeline for the National Natural Science Foundation and the Torch Center.
class ziranweiyuanhuiPipline(Asyninser):
    """Insert step for Asyninser, matching areas by area_short or area_name."""

    def __init__(self, dbpool):
        self.dbpool = dbpool

    def do_insert(self, cursor, item):
        """Write one item to the area, policy, label and file tables."""
        _insert_untyped_policy(cursor, item, match_area_name=True)