Issue
I'm trying to create a manager for my spiders and record the stats from each crawl job to a SQLite db, but unfortunately I can't manage to run the crawlers with CrawlerProcess from a separate Python script. I've been looking for possible answers, but there is nothing similar/recent out there.
import datetime
import os
from spiders import MySpider # example
from sqlalchemy import create_engine, Column, Integer, Float, DateTime, String
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.inspection import inspect
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)
jobstore = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# Set up a thread pool executor
executor = {
    'default': ThreadPoolExecutor(10)
}
# Create the scheduler object with the jobstore and executor
scheduler = BlockingScheduler(jobstores=jobstore, executors=executor)
# Create the SQLAlchemy engine and session factory
engine = create_engine('sqlite:///stats.sqlite')
Session = sessionmaker(bind=engine)
# Define the database model for the spider stats
Base = declarative_base()
class SpiderStats(Base):
    __tablename__ = 'spider_stats'
    id = Column(Integer, primary_key=True)
    spider_name = Column(String)
    request_count = Column(Integer)
    response_count = Column(Integer)
    item_count = Column(Integer)
    elapsed_time_seconds = Column(Float)
    start_time = Column(DateTime)

# Check if the table already exists
inspector = inspect(engine)
if not inspector.has_table('spider_stats'):
    # Create the table if it doesn't exist
    Base.metadata.create_all(engine)
def extract_spider_stats(spider):
    # Extract the spider stats
    stats = spider.crawler.stats.get_stats()
    # Create a new database record with the stats data
    session = Session()
    spider_stats = SpiderStats(
        spider_name=spider.name,
        request_count=stats.get('downloader/request_count', 0),
        response_count=stats.get('downloader/response_count', 0),
        item_count=stats.get('item_scraped_count', 0),
        elapsed_time_seconds=stats.get('elapsed_time_seconds', 0),
        start_time=datetime.datetime.now()
    )
    session.add(spider_stats)
    session.commit()
    session.close()
    # Print the spider stats to the console
    print('SPIDER STATS: ', stats)
def run_scrapy_spider(spider_name):
    process = CrawlerProcess(get_project_settings())
    crawler = process.create_crawler(spider_name)
    process.crawl(crawler)
    process.start()
    process.join()
    stats_obj = crawler.stats
    stats_dict = stats_obj.get_stats()
    print(stats_dict)
scheduler.add_job(run_scrapy_spider, 'interval', seconds=30, args=[MySpider])
scheduler.start()
I'm currently not trying to integrate the db yet because I can't get the crawler to run with CrawlerProcess. I would just like the job to run and print the stats to start with. crawl_manager.py is located in the scrapy folder; the structure is:
scrapy/  <- directory I'm trying to run jobs from.
    scrapy/
        spiders/
            myspider.py
            ...
        ...
    crawl_manager.py
    ...
From the current directory I can run scrapy crawl myspider (class MySpider, name="myspider") and it runs without issues.
When trying to run the manager:
ValueError: signal only works in main thread of the main interpreter
I don't have anything other than the spider running concurrently and would prefer to use CrawlerProcess rather than CrawlerRunner.
I also tried process.start(install_signal_handlers=False), which then raises:
2023-03-22 15:35:27 [apscheduler.executors.default] ERROR: Job "run_scrapy_spider (trigger: interval[0:00:30], next run at: 2023-03-22 15:35:57 CST)" raised an exception
process.start(install_signal_handlers=False) # <- just add this parameter.
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/x/.pyenv/versions/x/lib/python3.11/site-packages/scrapy/crawler.py", line 383, in start
reactor.run(installSignalHandlers=False) # blocking call
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/x/.pyenv/versions/x/lib/python3.11/site-packages/twisted/internet/asyncioreactor.py", line 254, in run
self.startRunning(installSignalHandlers=installSignalHandlers)
File "/Users/x/.pyenv/versions/x/lib/python3.11/site-packages/twisted/internet/base.py", line 1299, in startRunning
ReactorBase.startRunning(cast(ReactorBase, self))
File "/Users/x/.pyenv/versions/x/lib/python3.11/site-packages/twisted/internet/base.py", line 843, in startRunning
raise error.ReactorNotRestartable()
twisted.internet.error.ReactorNotRestartable
Solution
It seems that since APScheduler runs the jobs in worker threads, the jobs don't run in the main thread of the manager, and this creates a problem with CrawlerProcess: Twisted's reactor can only install signal handlers from the main thread and cannot be restarted once stopped.
A working solution is to launch each crawl with a simple subprocess run using a PIPE, and then parse the captured output to get the stats. I've tested this with multiple spiders and it works without issues.
from datetime import datetime
from subprocess import run, PIPE

def run_scrapy_spider(spider_name):
    print(f'Job started for {spider_name} at {datetime.now()}')
    # Run the crawl as a separate process and capture its output
    p = run(['scrapy', 'crawl', spider_name], stdout=PIPE, stderr=PIPE)
    # str(p) includes the captured stdout/stderr, which contains Scrapy's stats dump
    output = str(p)
    ...
output is then a string from which I regexed the JSON object with the crawler stats, then used json.loads() to turn it into a Python dictionary before using it to fill the SQLite db.
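For the parsing and database step, here is a minimal sketch of how that could look. It reuses the Session factory and SpiderStats model from the script in the question, and it assumes the stats dump appears in output with entries like 'downloader/request_count': 10. The parse_stat helper and its regexes are my own illustration (pulling individual counters with regexes rather than json.loads(), since the log dump may not be strict JSON), so adjust them to whatever your captured output actually contains.

import re
import datetime
from subprocess import run, PIPE

def parse_stat(output, key, cast=int, default=0):
    # Pull a single "'key': value" pair out of the captured log text
    match = re.search(rf"'{re.escape(key)}':\s*([\d.]+)", output)
    return cast(match.group(1)) if match else default

def run_scrapy_spider(spider_name):
    # Run the crawl in its own process so the Twisted reactor never has to
    # start inside an APScheduler worker thread
    p = run(['scrapy', 'crawl', spider_name], stdout=PIPE, stderr=PIPE)
    output = str(p)
    # Store the parsed counters using the Session/SpiderStats defined above
    session = Session()
    session.add(SpiderStats(
        spider_name=spider_name,
        request_count=parse_stat(output, 'downloader/request_count'),
        response_count=parse_stat(output, 'downloader/response_count'),
        item_count=parse_stat(output, 'item_scraped_count'),
        elapsed_time_seconds=parse_stat(output, 'elapsed_time_seconds',
                                        cast=float, default=0.0),
        start_time=datetime.datetime.now(),
    ))
    session.commit()
    session.close()

The job can then be scheduled the same way as before, but passing the spider's name string instead of the class, e.g. scheduler.add_job(run_scrapy_spider, 'interval', seconds=30, args=['myspider']).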
Answered By - DrSocket