Issue
I'm scraping a large number of items, and my pipeline inserts them into the database one by one. It takes a long time.
So every time the pipeline receives an item, it inserts it into the database, which is not efficient. I'm looking for a way to buffer the pipeline items and, for example, execute a bulk insert once 1000 items have been received. How can I achieve that?
My current pipeline:
def __init__(self, **kwargs):
    self.cnx = self.mysql_connect()

def open_spider(self, spider):
    print("spider open")

def process_item(self, item, spider):
    print("Saving item into db ...")
    self.save(dict(item))
    return item

def close_spider(self, spider):
    self.mysql_close()

def mysql_connect(self):
    try:
        return mysql.connector.connect(**self.conf)
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(err)

def save(self, row):
    cursor = self.cnx.cursor()
    cursor.execute("SELECT DISTINCT product_id FROM products;")
    existing_ids = [row[0] for row in cursor.fetchall()]
    create_query = ("INSERT INTO " + self.table +
        "(rowid, date, listing_id, product_id, product_name, price, url) "
        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
    # Insert new row
    cursor.execute(create_query, row)
    lastRecordId = cursor.lastrowid
    # Make sure data is committed to the database
    self.cnx.commit()
    cursor.close()
    print("Item saved with ID: {}".format(lastRecordId))

    product_id = row['product_id']
    if product_id not in existing_ids:
        create_query = ("INSERT INTO " + self.table2 +
            "(product_rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        new_cursor = self.cnx.cursor()
        new_cursor.execute(create_query, row)
        lastRecordId = new_cursor.lastrowid
        self.cnx.commit()
        new_cursor.close()
        print("New Item saved with ID: {}".format(lastRecordId))

def mysql_close(self):
    self.cnx.close()
Solution
You can add a cache in the pipeline constructor and a method that stores items in it as they are processed, then runs a bulk save once the number of cached rows reaches a threshold. When the spider closes, the pipeline saves whatever is left in the cache that hasn't been saved yet.
The example below demonstrates this strategy. I didn't make any alterations to your SQL code, so the rows are still inserted into the database one by one, just all at the same time. There is room for further performance improvement with some changes to the SQL code as well.
import mysql.connector
from mysql.connector import errorcode


class Pipeline:

    def __init__(self, **kwargs):
        self._rows = []           # store rows temporarily
        self._cached_rows = 0     # number of cached rows
        self._cache_limit = 1000  # limit before saving to database
        self.cnx = self.mysql_connect()

    def open_spider(self, spider):
        print("spider open")

    def save_all(self):  # calls self.save for all cached rows
        if len(self._rows) > 0:
            list(map(self.save, self._rows))
            self._cached_rows = 0  # reset the count
            self._rows = []        # reset the cache

    def cache_result(self, item):  # adds a new row to the cache
        self._rows.append(dict(item))
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit:  # checks if the limit has been reached
            self.save_all()  # if it has, save all cached rows

    def process_item(self, item, spider):
        print("Saving item into db ...")
        self.cache_result(item)  # cache this item
        return item

    def close_spider(self, spider):
        self.save_all()  # saves the remaining rows once the spider closes
        self.cnx.close()

    def mysql_connect(self):
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)

    def save(self, row):
        cursor = self.cnx.cursor()
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        existing_ids = [row[0] for row in cursor.fetchall()]
        create_query = ("INSERT INTO " + self.table +
            "(rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        # Insert new row
        cursor.execute(create_query, row)
        lastRecordId = cursor.lastrowid
        # Make sure data is committed to the database
        self.cnx.commit()
        cursor.close()
        print("Item saved with ID: {}".format(lastRecordId))

        product_id = row['product_id']
        if product_id not in existing_ids:
            create_query = ("INSERT INTO " + self.table2 +
                "(product_rowid, date, listing_id, product_id, product_name, price, url) "
                "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
            new_cursor = self.cnx.cursor()
            new_cursor.execute(create_query, row)
            lastRecordId = new_cursor.lastrowid
            self.cnx.commit()
            new_cursor.close()
            print("New Item saved with ID: {}".format(lastRecordId))
Answered By - Alexander