Writing Scrapy Crawl Results to MongoDB
Data scraped from web pages usually has many fields, and those fields are updated and changed often, so a non-relational database is much more comfortable to work with. Below is example code for writing Scrapy results to MongoDB, followed by the equivalent MySQL version for comparison.
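The pipeline reads its connection settings from the crawler, so they need to be defined in settings.py and the pipeline has to be registered in ITEM_PIPELINES. A minimal sketch (the MONGO_URI/MONGO_DB names match what from_crawler reads below; the douban.pipelines module path is a placeholder):

# settings.py (sketch; replace 'douban.pipelines' with your project's module path)
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DB = 'douban_movie'

ITEM_PIPELINES = {
    'douban.pipelines.DoubanMovieMongoPipeline': 300,  # lower numbers run earlier
}

With those settings in place, the pipeline itself looks like this: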
# pipelines.py
import logging

import pymongo

logger = logging.getLogger(__name__)


class DoubanMovieMongoPipeline:
    '''Update a scraped movie if it already exists, otherwise insert it.'''
    collection = 'movie_detail'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DB'),
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        table = self.db[self.collection]
        subject_id = item.get('subject_id')  # unique key, used for lookups
        db_item = table.find_one({'subject_id': subject_id})
        if db_item:
            logger.warning('Movie already exists')
            for k, v in item.items():
                db_v = db_item.get(k)
                # check every field except subject_id for changes
                if k != 'subject_id' and v and v != db_v:
                    logger.warning('Updating field {}: old value {}, new value {}'.format(k, db_v, v))
                    table.update_one({'subject_id': subject_id}, {'$set': {k: v}})
        else:
            logger.warning('Movie not found, inserting into the database')
            table.insert_one(dict(item))
        return item
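For this exists-then-update-else-insert pattern, pymongo can also do everything in a single call. A minimal alternative sketch (not the code above) using update_one with upsert=True:

# Alternative process_item: one round trip instead of find_one + per-field updates.
def process_item(self, item, spider):
    self.db[self.collection].update_one(
        {'subject_id': item.get('subject_id')},  # match on the unique key
        {'$set': dict(item)},                    # write all scraped fields at once
        upsert=True,                             # insert when nothing matches
    )
    return item

Note the semantic difference: this version overwrites every field, including ones whose scraped value is empty, so it only fits when all of the item's fields are trustworthy.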
With MySQL, not only is managing the table schema and its columns a hassle, the code itself is also considerably more verbose, as the following example shows:
# pipelines.py
import logging
import traceback

import MySQLdb

logger = logging.getLogger(__name__)


class DoubanMovieMySQLPipeline:
    '''Update the record if it exists, otherwise insert it.'''

    def __init__(self, mysql_config):
        self.mysql_config = mysql_config

    @classmethod
    def from_crawler(cls, crawler):
        mysql_config = {
            'db': crawler.settings.get('MYSQL_DB'),
            'user': crawler.settings.get('MYSQL_USER'),
            'host': crawler.settings.get('MYSQL_HOST'),
            'passwd': crawler.settings.get('MYSQL_PASSWD'),
            'port': crawler.settings.get('MYSQL_PORT'),
            'charset': crawler.settings.get('MYSQL_CHARSET'),
        }
        return cls(mysql_config=mysql_config)

    def process_item(self, item, spider):
        con = MySQLdb.connect(**self.mysql_config)
        cur = con.cursor()
        logger.warning('Connection opened')
        name = item['name']
        if name:
            logger.warning('Processing movie: {}'.format(name))
            # look up the movie by name (parameterized to avoid SQL injection)
            select_sql = "SELECT * FROM movie_detail WHERE name=%s"
            try:
                cur.execute(select_sql, (name,))
                results = cur.fetchall()
                if results:  # the movie already exists, update it
                    logger.warning('Movie already exists')
                    result = results[0]
                    res_dict = {  # fields that may change; the order must match the table columns
                        'full_name': result[2],
                        'score': result[3],
                        'count': result[4],
                        'is_top250': result[5],
                        'top_num': result[6],
                        'detail_url': result[8],
                        'movie_intro': result[9],
                        'imdb': result[10],
                        'have_seen_count': result[11],
                        'want_see_count': result[12],
                        'short_comment_count': result[13],
                        'comment_count': result[14],
                    }
                    for k in res_dict:  # check each key, update it if the value changed
                        if item[k] and res_dict[k] != item[k]:
                            logger.warning('Field {} updated: {}'.format(k, item[k]))
                            # column names cannot be parameterized, but the values can
                            update_sql = "UPDATE movie_detail SET {}=%s WHERE name=%s".format(k)
                            cur.execute(update_sql, (item[k], name))
                else:  # the movie does not exist, insert it
                    logger.warning('Movie not found, inserting into the database')
                    sql = """INSERT INTO movie_detail (
                                 name, full_name, score, count, is_top250,
                                 top_num, detail_url, movie_intro, imdb, have_seen_count,
                                 want_see_count, short_comment_count, comment_count
                             )
                             VALUES (
                                 %s, %s, %s, %s, %s,
                                 %s, %s, %s, %s, %s,
                                 %s, %s, %s
                             )"""
                    args = (
                        item['name'], item['full_name'], item['score'], item['count'], item['is_top250'],
                        item['top_num'], item['detail_url'], item['movie_intro'], item['imdb'], item['have_seen_count'],
                        item['want_see_count'], item['short_comment_count'], item['comment_count']
                    )
                    cur.execute(sql, args)
            except Exception:
                logger.error(traceback.format_exc())
                con.rollback()
            else:
                con.commit()
            finally:
                cur.close()
                con.close()
                logger.warning('Connection closed')
        else:  # no movie name, skip the item
            cur.close()
            con.close()
            logger.error('Item has no movie name, skipping')
            logger.warning('Connection closed')
        return item
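The select/update/insert dance on the MySQL side can also be collapsed into a single statement, assuming movie_detail has a UNIQUE index on name. A sketch using INSERT ... ON DUPLICATE KEY UPDATE (column list shortened for illustration):

# Sketch: single-statement upsert; assumes a UNIQUE index on movie_detail.name.
def process_item(self, item, spider):
    sql = (
        "INSERT INTO movie_detail (name, full_name, score) "
        "VALUES (%s, %s, %s) "
        "ON DUPLICATE KEY UPDATE full_name=VALUES(full_name), score=VALUES(score)"
    )
    con = MySQLdb.connect(**self.mysql_config)
    cur = con.cursor()
    try:
        cur.execute(sql, (item['name'], item['full_name'], item['score']))
        con.commit()
    finally:
        cur.close()
        con.close()
    return item

Even then, every schema change still means an ALTER TABLE, which is exactly the friction MongoDB avoids.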
Once you have read through both pipelines, the difference in how the two handle the same update-or-insert logic is easy to see.