Source code for app

# TODO:
# * import list of asins from file
# * ability to change proxy sources
# * manage ssl errors!! 
# * manage timeouts
# * and manage wait time
# * manage quiet
# * manage destination
# * manage ignore_dups

from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from pprint import pprint
from math import *
import requests
import random
import logging
import argparse
import threading, Queue
import time
import json
import re
import ssl
import os, sys 

from constants import *

[docs]class AmazonScraper(object): """Hello, this is AmazonScraper!""" def __init__(self, **kwargs): default_attr = dict( asin = [], verbose=False, quiet=False, ignore_dups=False, no_reviews = False, no_questions = False, destination='./', save_pages=True ) allowed_attr = list(default_attr.keys()) default_attr.update(kwargs) for key in default_attr: if key in allowed_attr: self.__dict__[key] = kwargs.get(key) self.logger = AmazonScraper.get_logger(level=logging.DEBUG, verbose=default_attr.get('verbose')) # initialize a user agent generator self.ua = UserAgent() self.wait_time = 0.2
[docs] def scrape(self): """Manages the whole scraping process""" for asin in self.asin: try: self.logger.info("Examining product " + asin) # collect a list of proxies # this is updated for each product, # I think it would be better to update it each time # too many proxies get banned by amazon self.proxies = self.get_proxies() # collect the reviews url and the total amount # of pages to be scraped main_reviews_url, review_pages_number = self.retrieve_page(asin) if not self.no_reviews: # scrape the reviews reviews, failed_urls = self.retrieve_reviews(main_reviews_url, review_pages_number) reviews_list = list(reviews.queue) failed_urls_list = list(failed_urls.queue) # log the number of results found self.logger.info("found " + str(len(reviews_list)) + " reviews for product " + asin) self.logger.info("failed " + str(len(failed_urls_list)) + " requests for product " + asin) # save the results to file if not os.path.exists('./reviews'): os.makedirs('./reviews') results_file = open("./reviews/" + asin + ".json", 'w+') for i in reviews_list: json.dump(i, results_file) results_file.write('\n') # switch this if true with param about if not self.no_questions: questions, fails = self.retrieve_questions(asin) questions_list = list(questions.queue) fails_questions_list = list(fails.queue) self.logger.info("found " + str(len(questions_list)) + " questions for product " + asin) self.logger.info("failed " + str(len(fails_questions_list)) + " urls for product " + asin) # save questions to file if not os.path.exists('./questions'): os.makedirs('./questions') questions_file = open("./questions/" + asin + ".json", 'w+') for i in questions_list: json.dump(i, questions_file) questions_file.write('\n') except RuntimeError as err: self.logger.warning(err) except KeyboardInterrupt: self.logger.warning('Keyboard interrupt received') sys.exit()
[docs] def retrieve_questions(self, asin): """Scrapes the questions pages and returns a list of dicts containing questions and respecitive answers""" threads = [] results = Queue.Queue() fails = Queue.Queue() for page_num in range(1, 100): t = threading.Thread( target=self.scrape_page_questions, args=(asin, page_num, fails, results) ) t.start() threads.append(t) for thread in threads: thread.join() return results, fails
[docs] def scrape_page_questions(self, asin, page_num, fails, results): """threads to request and scrape a single questions page""" url = base_questions_url + asin + '/' + str(page_num) attempt = 0 while attempt < 10 : try: res = requests.get(url, timeout = 20, proxies = { 'http' : random.sample( self.proxies, 1 )}, headers = { 'User-Agent' : self.ua.random} ) if res.status_code != 200: raise RuntimeError("Server not responding: status code " + str(res.status_code) + " for url " + url) elif BeautifulSoup(res.content, 'html.parser').title.text == "Robot Check": raise RuntimeError("Robot Check not passed for url " + url) else: soup = BeautifulSoup(res.content, 'html.parser') question_boxes = soup.find_all("div", {"class":"a-fixed-left-grid-col a-col-right"}) for j, box in enumerate(question_boxes): q_a_dict = {} if j==0: continue for k, question in enumerate(box.find_all("div", {"class":"a-fixed-left-grid-col a-col-right"})): if k==0: q_a_dict['question'] = question.a.text.strip() for k, answer in enumerate(box.find_all("div", {"class":"a-fixed-left-grid-col a-col-right"})): if k!=0: ranswer = "" if answer.find("span", {"class":"askLongText"}): ranswer = answer.find("span", {"class":"askLongText"}).text ranswer = ranswer.strip()[:-8] else: ranswer = answer.span.text q_a_dict['answer'] = ranswer.strip() if 'answer' in q_a_dict and 'question' in q_a_dict: results.put(q_a_dict) return except requests.exceptions.Timeout: self.logger.debug("Connection timed out for url " + url) except requests.exceptions.RequestException as err: self.logger.debug(err) except RuntimeError as err: self.logger.debug(err) except KeyboardInterrupt: self.logger.warning('Keyboard interrupt received') sys.exit() attempt += 1 self.logger.debug("failed url " + url + " after several attempts") fails.put(url)
[docs] def retrieve_reviews(self, main_reviews_url, review_pages_number): """Collects the review given the url of the reviews page""" results = Queue.Queue() fails = Queue.Queue() threads = [] # craft the single review page url url_parts = (base_amazon_url + main_reviews_url[1:]).strip().split("/") params = url_parts[-1].split('&') for page_num in range(1, review_pages_number): chosen_params = ['ref=cm_cr_dp_d_show_all_btm_'+str(page_num)+'?ie=UTF8', 'pageNumber='+str(page_num)] final_url = "https:/" for i, item in enumerate(url_parts): if i < (len(url_parts)-1) and i>0: final_url += item + '/' for param in chosen_params: final_url += param + '&' final_url = final_url[:-1] t = threading.Thread( target=self.scrape_page_reviews, args=(final_url, fails, results) ) t.start() threads.append(t) for thread in threads: thread.join() return results, fails
[docs] def scrape_page_reviews(self, url, fails, results): """threads to request and scrape a single review page""" attempt = 0 while attempt < 10 : try: res = requests.get(url, timeout = 20, proxies = { 'http' : random.sample( self.proxies, 1 )}, headers = { 'User-Agent' : self.ua.random} ) if res.status_code != 200: raise RuntimeError("Server not responding: status code " + str(res.status_code) + " for url " + url) elif BeautifulSoup(res.content, 'html.parser').title.text == "Robot Check": raise RuntimeError("Robot Check not passed for url " + url) else: soup = BeautifulSoup(res.content, 'html.parser') review_boxes = soup.find_all("div", {"class":"review"}) for box in review_boxes: review = {} review['title'] = box.find("a", {"class":"review-title"}).text review['text'] = box.find("span", {"class":"review-text"}).text review['date'] = box.find("span", {"class":"review-date"}).text review['rating'] = box.find("i", {"class":"review-rating"}).span.text review['author'] = box.find("a", {"class":"author"}).text review['author_id'] = box.find("a", {"class":"author"}).get('href').split('/')[4] results.put(review) self.logger.info("finished scraping " + url) return except requests.exceptions.Timeout: self.logger.debug("Connection timed out for url " + url) except requests.exceptions.RequestException as err: self.logger.debug(err) except RuntimeError as err: self.logger.debug(err) except KeyboardInterrupt: self.logger.warning('Keyboard interrupt received') sys.exit() attempt += 1 self.logger.debug("failed url " + url + " after several attempts") fails.put(url)
[docs] def retrieve_page(self, asin): """Requests the main product page, saves it, and returns an url for the reviews""" attempt = 0 while attempt < 10: # craft a request res = requests.get(base_product_page_url + asin, proxies = { 'http' : random.sample( self.proxies, 1 )}, headers = { 'User-Agent' : self.ua.random} ) # if the asin does not exist, exit. if res.status_code == 404: self.logger.error("Asin " + asin + " does not exist") raise RuntimeError("Asin " + asin + " does not exist") # if the connection fails, try again elif res.status_code != 200: self.logger.error("Connection error on asin " + asin) # if everything goes well, scrape else: soup = BeautifulSoup(res.content, 'html.parser') # eventually amazon discovers us. # in this case, try again with another proxy if soup.title.text == "Robot Check": self.logger.warning("Robot Check received") else: # save the page if the user specified to if self.save_pages: if not os.path.exists('./pages'): os.makedirs('./pages') page_file = open("./pages/" + asin + '.html', 'w+') page_file.write(res.content) reviews_url = soup.find( "div", { "id" : "reviews-medley-footer" } ).find( "a", { "class" : "a-link-emphasis" } ).get("href") if soup.find("div", {"id":"reviews-medley-footer"}) != None: review_pages_number = int(ceil(float(soup.find("div", {"id":"reviews-medley-footer"}).a.text .split("See all ")[1] .split(" customer")[0].replace(",", ""))/10)) else: review_pages_number = 0 return reviews_url, review_pages_number raise RuntimeError("Fetching product " + asin + " failed after several attempts.")
[docs] def get_proxies(self): """Retrieves a list of proxies""" proxies = set() # eventually put this somewhere else proxy_sources = [ 'https://free-proxy-list.net/anonymous-proxy.html', 'https://www.us-proxy.org/', 'https://www.sslproxies.org/', 'https://www.socks-proxy.net/' ] # count times this is executed. stop at 10 attempts. attempt = 0 while not len(proxies) > 0: for source in proxy_sources: res = requests.get(source, headers={ 'User-Agent':self.ua.random }) if res.status_code != 200: self.logger.error("connection error " + str(res.status_code) \ + " source " + source) else: soup = BeautifulSoup(res.content, 'html.parser') tab = soup.find("table", {"id":"proxylisttable"}) for cell in tab.find_all('td'): if cell.string != None and re.match('\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', cell.string) != None: proxies.add(cell.string) self.logger.info("found " + str(len(proxies)) + " proxies") if not len(proxies) > 0: attempt += 1 if attempt >= 10: raise requests.ConnectionError("Failed to \ retrieve any proxy after several \ attempts, check your connection status") time.sleep(0.5) else: break return proxies
@staticmethod
[docs] def get_logger(level=logging.DEBUG, verbose=False): """Returns a logger""" logger = logging.getLogger(__name__) fh = logging.FileHandler('scrape.log', 'wa') fh.setFormatter( logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ) fh.setLevel(level) logger.addHandler(fh) sh = logging.StreamHandler(sys.stdout) sh.setFormatter( logging.Formatter('%(levelname)s: %(message)s') ) if verbose: sh.setLevel(logging.DEBUG) else: sh.setLevel(logging.ERROR) logger.addHandler(sh) logger.setLevel(level) return logger
@staticmethod
[docs] def parse_asins_from_file(path): """Reads a list of asins from a file""" asins = [] try: file_to_read = open(path, 'r') for line in file_to_read: asins.append(line.strip()) except IOError as err: raise ValueError("File not found " + err) return asins
@staticmethod def parse_args(args): parser = argparse.ArgumentParser( description = "amazon-scraper downloads questions and reviews from amazon products", formatter_class = argparse.RawDescriptionHelpFormatter, fromfile_prefix_chars='@' ) parser.add_argument('asin', help='Amazon asin(s) to be scraped', nargs='*') parser.add_argument('--file', '-f', help='Specify path to list of asins') parser.add_argument('--save-pages', '-p', action='store_true', default=True, help='Saves the main pages scraped') parser.add_argument('--verbose', '-v', action='store_true', default=False, help='Logging verbosity level') parser.add_argument('--no-reviews', action='store_true', default=False, help='Do not scrape reviews') parser.add_argument('--no-questions', action='store_true', default=False, help='Do not scrape questions') parser.add_argument('--destination', '-d', default='./', help="Set a destination folder") parser.add_argument('--ignore-dups', action='store_true', help="Do not consider previous operations") parser.add_argument('--quiet', '-q', default=False, action='store_true', help='Be quiet while scraping') args = parser.parse_args(args) if not args.asin and args.file is None: parser.print_help() raise ValueError('Please provide asin or filename.') elif args.asin and args.file: parser.print_help() raise ValueError('Please provide only one of the following: asin(s) or filename') if args.file: args.asin = AmazonScraper.parse_asins_from_file(args.file) return args
def main(): args = AmazonScraper.parse_args(sys.argv[1:]) scraper = AmazonScraper(**vars(args)) scraper.scrape() if __name__ == '__main__': main()