I am looking for some example code of a SQLite pipeline in Scrapy. I know there is no built-in support for it, but I'm sure it has been done. Only actual code can help me, as I only know enough Python and Scrapy to complete my very limited task, and need the code as a starting point.
I did something like this:
#
# Author: Jay Vaughan
#
# Pipelines for processing items returned from a scrape.
# Don't forget to add the pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/topics/item-pipeline.html
#
from scrapy import log
from pysqlite2 import dbapi2 as sqlite

# This pipeline takes the Item and stuffs it into scrapedata.db
class scrapeDatasqLitePipeline(object):
    def __init__(self):
        # Possibly we should be doing this in spider_opened instead, but okay
        self.connection = sqlite.connect('./scrapedata.db')
        self.cursor = self.connection.cursor()
        self.cursor.execute('CREATE TABLE IF NOT EXISTS myscrapedata '
                            '(id INTEGER PRIMARY KEY, url VARCHAR(80), desc VARCHAR(80))')

    # Take the item and put it in the database - do not allow duplicates
    def process_item(self, item, spider):
        self.cursor.execute("select * from myscrapedata where url=?",
                            (item['url'][0],))
        result = self.cursor.fetchone()
        if result:
            log.msg("Item already in database: %s" % item, level=log.DEBUG)
        else:
            self.cursor.execute(
                "insert into myscrapedata (url, desc) values (?, ?)",
                (item['url'][0], item['desc'][0]))
            self.connection.commit()
            log.msg("Item stored: %s" % item, level=log.DEBUG)
        return item

    def handle_error(self, e):
        log.err(e)
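The comment block above mentions registering the pipeline in the ITEM_PIPELINES setting but does not show it; a minimal sketch, assuming the class lives in a project package called myproject (the dotted path and the priority value are assumptions, adjust them to your project). Recent Scrapy versions use a dict mapping the pipeline path to a priority; very old versions used a plain list of paths.

# settings.py -- dotted path and priority are assumptions, adjust to your project
ITEM_PIPELINES = {
    'myproject.pipelines.scrapeDatasqLitePipeline': 300,
}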
Here is a SQLite pipeline with SQLAlchemy. With SQLAlchemy you can easily switch to a different database later if needed.
In settings.py, add the database configuration:
# settings.py
# ...
DATABASE = {
    'drivername': 'sqlite',
    # 'host': 'localhost',
    # 'port': '5432',
    # 'username': 'YOUR_USERNAME',
    # 'password': 'YOUR_PASSWORD',
    'database': 'books.sqlite'
}
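For example, moving the same project to PostgreSQL would in principle only require changing this dict (plus installing the matching driver); the values below are placeholders, not tested settings:

# settings.py -- hypothetical PostgreSQL variant of the same DATABASE dict
DATABASE = {
    'drivername': 'postgresql',
    'host': 'localhost',
    'port': '5432',
    'username': 'YOUR_USERNAME',
    'password': 'YOUR_PASSWORD',
    'database': 'books',
}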
Then in pipelines.py, add the following:
# pipelines.py
import logging

from scrapy import signals
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

logger = logging.getLogger(__name__)

DeclarativeBase = declarative_base()


class Book(DeclarativeBase):
    __tablename__ = "books"

    id = Column(Integer, primary_key=True)
    title = Column('title', String)
    author = Column('author', String)
    publisher = Column('publisher', String)
    url = Column('url', String)
    scrape_date = Column('scrape_date', DateTime)

    def __repr__(self):
        return "<Book({})>".format(self.url)


class SqlitePipeline(object):
    def __init__(self, settings):
        self.database = settings.get('DATABASE')
        self.sessions = {}

    @classmethod
    def from_crawler(cls, crawler):
        pipeline = cls(crawler.settings)
        crawler.signals.connect(pipeline.spider_opened, signals.spider_opened)
        crawler.signals.connect(pipeline.spider_closed, signals.spider_closed)
        return pipeline

    def create_engine(self):
        # Note: a connect_args={'charset': 'utf8'} argument is a MySQL option and is
        # rejected by sqlite3, so it is omitted here.
        # On SQLAlchemy 1.4+ use URL.create(**self.database) instead of URL(**self.database).
        engine = create_engine(URL(**self.database), poolclass=NullPool)
        return engine

    def create_tables(self, engine):
        DeclarativeBase.metadata.create_all(engine, checkfirst=True)

    def create_session(self, engine):
        session = sessionmaker(bind=engine)()
        return session

    def spider_opened(self, spider):
        engine = self.create_engine()
        self.create_tables(engine)
        session = self.create_session(engine)
        self.sessions[spider] = session

    def spider_closed(self, spider):
        session = self.sessions.pop(spider)
        session.close()

    def process_item(self, item, spider):
        session = self.sessions[spider]
        book = Book(**item)
        link_exists = session.query(Book).filter_by(url=item['url']).first() is not None

        if link_exists:
            logger.info('Item {} is in db'.format(book))
            return item

        try:
            session.add(book)
            session.commit()
            logger.info('Item {} stored in db'.format(book))
        except Exception:
            logger.info('Failed to add {} to db'.format(book))
            session.rollback()
            raise

        return item
and items.py should look like this:
# items.py
import scrapy


class BookItem(scrapy.Item):
    title = scrapy.Field()
    author = scrapy.Field()
    publisher = scrapy.Field()
    url = scrapy.Field()  # the pipeline filters on item['url'], so the item needs it
    scrape_date = scrapy.Field()
You may also consider moving the Book class into items.py, as sketched below.
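A minimal sketch of that layout, assuming a project package named bookbot (the package name is hypothetical); the ORM model and its declarative base live next to the Item, and pipelines.py just imports them:

# items.py -- hypothetical layout with the ORM model alongside the Item
import scrapy
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.ext.declarative import declarative_base

DeclarativeBase = declarative_base()


class Book(DeclarativeBase):
    __tablename__ = "books"

    id = Column(Integer, primary_key=True)
    title = Column(String)
    author = Column(String)
    publisher = Column(String)
    url = Column(String)
    scrape_date = Column(DateTime)


class BookItem(scrapy.Item):
    title = scrapy.Field()
    author = scrapy.Field()
    publisher = scrapy.Field()
    url = scrapy.Field()
    scrape_date = scrapy.Field()


# pipelines.py then imports the model instead of defining it:
# from bookbot.items import Book, DeclarativeBase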
If you feel comfortable with Twisted's adbapi, you can take this MySQL pipeline as a starting point: http://github.com/darkrho/scrapy-googledir-mysql/blob/master/googledir/pipelines.py
and use this line in __init__:
self.dbpool = adbapi.ConnectionPool("sqlite3", database="/path/sqlite.db")
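A minimal sketch of what the rest of such a pipeline might look like with SQLite, loosely following the linked MySQL example. The table and column names are placeholders, and check_same_thread=False is needed because the connection pool runs queries in worker threads:

# pipelines.py -- hypothetical adbapi-based SQLite pipeline sketch
from twisted.enterprise import adbapi


class SqliteAdbapiPipeline(object):
    def __init__(self):
        # sqlite3 connections are thread-confined by default; the pool uses
        # worker threads, hence check_same_thread=False
        self.dbpool = adbapi.ConnectionPool(
            "sqlite3", database="/path/sqlite.db", check_same_thread=False)

    def process_item(self, item, spider):
        # run the insert in a pool thread; failures are reported via the errback
        d = self.dbpool.runInteraction(self._insert_item, item)
        d.addErrback(self._handle_error)
        return item

    def _insert_item(self, transaction, item):
        # placeholder table/columns -- adapt to your own schema
        transaction.execute(
            "INSERT INTO scraped (url, desc) VALUES (?, ?)",
            (item.get('url'), item.get('desc')))

    def _handle_error(self, failure):
        print(failure)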
The following should be compatible with:
# Python 3.6.4
# Scrapy 1.5.1
# SQLite 3.21.0+
# APSW 3.9.2.post1
If you want to use APSW instead, just swap the sqlite3 lines for the apsw ones as noted in the code comments.
If your items.py looks like this:
...
from scrapy.item import Item, Field

class NewAdsItem(Item):
    AdId  = Field()
    DateR = Field()
    DateA = Field()
    DateN = Field()
    DateE = Field()
    URL   = Field()  # AdURL
In settings.py:
ITEM_PIPELINES = {
    'adbot.pipelines.DbPipeline': 100,
}
SQLITE_FILE  = 'mad.db'
SQLITE_TABLE = 'ads'
In pipelines.py:
import os
import sqlite3  # part of the Python standard library
#import apsw    # pip install apsw
from scrapy import signals
from scrapy.conf import settings
from adbot.items import NewAdsItem  # Get items from "items.py"

con = None
ikeys = None

class DbPipeline(object):
    dbfile  = settings.get('SQLITE_FILE')   # './test.db'
    dbtable = settings.get('SQLITE_TABLE')

    def __init__(self):
        self.setupDBCon()
        self.createTables()

    def setupDBCon(self):
        #self.con = apsw.Connection(self.dbfile)    # apsw
        self.con = sqlite3.connect(self.dbfile)     # sqlite3
        self.cur = self.con.cursor()

    def createTables(self):
        self.dropDbTable()
        self.createDbTable()

    def dropDbTable(self):
        print("Dropping old table: %s" % self.dbtable)
        self.cur.execute("DROP TABLE IF EXISTS %s" % self.dbtable)

    def closeDB(self):
        self.con.close()

    def __del__(self):
        self.closeDB()

    def createDbTable(self):
        print("Creating new table: %s" % self.dbtable)
        #------------------------------
        # Construct the item strings:
        #------------------------------
        # NOTE: This does not work, because the items.py class re-orders the items!
        #self.ikeys = NewAdsItem.fields.keys()
        #print("Keys in createDbTable: \t%s" % ",".join(self.ikeys))
        #cols = " TEXT, ".join(self.ikeys) + " TEXT"
        #print("cols: \t%s:" % cols, flush=True)
        #------------------------------
        cols = "AdId TEXT, DateR TEXT, DateA TEXT, DateN TEXT, DateE TEXT, URL TEXT"
        # NOTE: Use "INSERT OR IGNORE", if you also use: "AdId TEXT NOT NULL UNIQUE"
        sql = "CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY NOT NULL, %s)" % (self.dbtable, cols)
        #print(sql)
        self.cur.execute(sql)

    def process_item(self, item, spider):
        self.storeInDb(item)
        return item

    def storeInDb(self, item):
        # NOTE: Use "INSERT OR IGNORE", if you also use: "AdId TEXT NOT NULL UNIQUE"
        # "INSERT INTO ads ( AdId, DateR, AdURL ) VALUES (?, ?, ?)"
        sql = "INSERT INTO {0} ({1}) VALUES ({2})".format(self.dbtable, ','.join(item.keys()), ','.join(['?'] * len(item.keys())))
        # (item.get('AdId',''), item.get('DateR',''), item.get('AdURL',''), ...)
        itkeys = ','.join(item.keys())      # item keys joined as a string (debugging only)
        itvals = ','.join(item.values())    # item values joined as a string (debugging only)
        ivals  = tuple(item.values())       # item values as a tuple
        #print(sql)
        #print(" itkeys: \t%s" % itkeys, flush=True)
        #print(" itvals: \t%s" % itvals, flush=True)
        self.cur.execute(sql, ivals)    # WARNING: Does NOT handle '[]'s ==> use: '' in spider
        self.con.commit()               # used in sqlite3 ONLY! (Remove for APSW)
You can then check your DB from the command line with:
echo "select * from ads;" | sqlite3 -csv -header mad.db
WARNING:
Because of a difference in how the items.py item keys are ordered when obtained via item.keys() versus by importing your item class directly via self.ikeys = NewAdsItem.fields.keys(), you will find that the first case is sorted according to the order of appearance (in the file), whereas in the second case it is alphabetically ordered. This is very sad, since it creates trouble when you're trying to create the DB tables dynamically, before process_item() has been executed.
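One way to sidestep the ordering problem is to drive both the CREATE TABLE and the INSERT from a single explicit list of column names instead of relying on item.keys() or fields.keys(). A minimal sketch (the hand-maintained FIELDS list is an assumption, not part of the original code):

# Hypothetical helpers: one hand-maintained list fixes the column order
# so it never depends on item.keys() / fields.keys() ordering.
FIELDS = ["AdId", "DateR", "DateA", "DateN", "DateE", "URL"]

def create_table_sql(table):
    cols = ", ".join("%s TEXT" % f for f in FIELDS)
    return "CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY NOT NULL, %s)" % (table, cols)

def insert_sql_and_values(table, item):
    sql = "INSERT INTO {0} ({1}) VALUES ({2})".format(
        table, ",".join(FIELDS), ",".join(["?"] * len(FIELDS)))
    values = tuple(item.get(f, '') for f in FIELDS)  # missing keys become ''
    return sql, values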
For anyone trying to solve a similar problem, I just came across a nice SQLite Item Exporter: https://github.com/RockyZ/Scrapy-sqlite-item-exporter.
After adding it to your project settings, you can use it with:
scrapy crawl <spider name> -o sqlite.db -t sqlite
It could also be adapted to be used as an Item Pipeline instead of an Item Exporter.
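For reference, custom exporters are hooked into Scrapy through the FEED_EXPORTERS setting; a minimal sketch, assuming you copied the exporter class into your project as myproject/exporters.py (the dotted path and class name are assumptions, check the repository for the actual ones):

# settings.py -- hypothetical registration of the exporter under the "sqlite" feed format
FEED_EXPORTERS = {
    'sqlite': 'myproject.exporters.SqliteItemExporter',
}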