Files
marketscrape-web/scraper/utils.py

247 lines
7.9 KiB
Python

from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer
from nltk.tokenize import RegexpTokenizer
from nltk.stem import WordNetLemmatizer
from bs4 import BeautifulSoup
from difflib import SequenceMatcher
import numpy as np
import statistics
import requests
import re
"""
Cleans a string of text by removing punctuation and extra whitespace.
Args:
text: The string of text to clean.
Returns:
The cleaned string of text.
"""
def clean_text(text: str) -> str:
tokenizer = RegexpTokenizer('\w+|\$[\d\.]+|http\S+')
tokenized = tokenizer.tokenize(text)
tokenized = [word.lower() for word in tokenized]
stop_words = stopwords.words('english')
filtered = [word for word in tokenized if word not in stop_words and word.isalpha()]
lemmatizer = WordNetLemmatizer()
lemmatized = [lemmatizer.lemmatize(word) for word in filtered]
return " ".join(lemmatized)
"""
Clean a listing title by removing punctuation and converting to lowercase.
Args:
title: The listing title to clean.
Returns:
The cleaned listing title.
"""
def clean_listing_title(title: str) -> str:
title = re.sub(r"#", "%2", title)
title = re.sub(r"&", "%26", title)
return title
"""
Remove non-ASCII characters from title and description fields.
Args:
title (str): Title of the item.
Returns:
str: Cleaned title.
"""
def clean_title_description(title: str) -> str:
cleaned = re.sub(r"[^A-Za-z0-9\s]+", " ", title)
cleaned = re.sub(r"\s+", " ", cleaned)
return cleaned
"""
Returns the product description in the soup.
Args:
soup: a BeautifulSoup object containing a product page
Returns:
The product description
"""
def get_product_description(soup: BeautifulSoup) -> str:
description = soup.find_all("div", {"class": "rgHvZc"})
return description
"""
Extracts the price of each product from the HTML.
Args:
soup: The HTML to extract the price from.
Returns:
The price of each product. The price is represented as a
NumPy array.
"""
def get_product_price(soup: BeautifulSoup) -> np.ndarray:
prices = soup.find_all("span", {"class": "HRLxBb"})
values = []
for price in prices:
values.append(price.text)
normalized = [re.sub("\$", "", price) for price in values]
normalized = [re.search(r"[0-9,.]*", price).group(0) for price in normalized]
normalized = [float(price.replace(",", "")) for price in normalized]
outlierless = reject_outliers(np.array(normalized))
return outlierless
"""
Returns the sentiment score of the text, with higher values indicating a more positive sentiment.
Args:
text (str): The text to analyze.
Returns:
float: The sentiment score, with higher values indicating a more positive sentiment.
"""
def sentiment_analysis(text: str) -> float:
sia = SentimentIntensityAnalyzer()
sentiment = sia.polarity_scores(text)
neg, neu, pos, compound = sentiment["neg"], sentiment["neu"], sentiment["pos"], sentiment["compound"]
if compound > 0.0:
rating = 5 * max(pos, compound)
elif compound < 0.0:
rating = 5 * min(neg, compound)
else:
rating = 5 * neu
return abs(rating)
"""
Create a BeautifulSoup object from a URL.
Args:
url (str): URL of the page to scrape
headers (dict): Dictionary of headers to use in the request
Returns:
BeautifulSoup: BeautifulSoup object of the URL's HTML content
"""
def create_soup(url: str, headers: dict) -> BeautifulSoup:
response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, 'html.parser')
return soup
"""
This function rejects outliers from the input list of data. The outliers are rejected using the
Tukey method, which defines the outliers as the data that are more than m times the interquartile
range outside the first and third quartiles. The default value of m is 1.5, but can be changed
by the user.
Args:
data: A list of float values.
m: A float value defining the number of interquartile ranges outside which the data are
considered outliers. The default value is 1.5.
Returns:
A list of float values, with the outliers removed.
"""
def reject_outliers(data: list[float], m: float = 1.5) -> list[float]:
distribution = np.abs(data - np.median(data))
m_deviation = np.median(distribution)
standard = distribution / (m_deviation if m_deviation else 1.)
return data[standard < m].tolist()
"""
The rating is based on the difference between the initial and final
price. The rating is 0 if the final price is greater than the initial
price, and 1 if the initial price is greater than the final price.
Otherwise, the rating is the ratio of the initial price to the final
price.
Args:
initial: The initial price.
final: The final price.
Returns:
The rating.
"""
def price_difference_rating(initial: float, final: float) -> float:
if initial <= final:
rating = 5.0
else:
difference = min(initial, final) / max(initial, final)
rating = (difference / 20) * 100
return rating
"""
Finds viable products based on the title of the Marketplace listing,
and utilizes the ramp down of the previous product in the sequence, to
find the minimum, maximum, and median of the prices of the product.
Args:
title: The title of the product.
ramp_down: The ramp down of the previous product in the
sequence.
Returns:
The minimum, maximum, and median of the prices of the product.
"""
def find_viable_product(title: str, ramp_down: float) -> tuple[float, float, float]:
cleaned_title = clean_listing_title(title)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582"
}
url = f"https://www.google.com/search?q={cleaned_title}&sa=X&biw=1920&bih=927&tbm=shop&sxsrf=ALiCzsbtwkWiDOQEcm_9X1UBlEG1iaqXtg%3A1663739640147&ei=-KYqY6CsCLez0PEP0Ias2AI&ved=0ahUKEwigiP-RmaX6AhW3GTQIHVADCysQ4dUDCAU&uact=5&oq=REPLACE&gs_lcp=Cgtwcm9kdWN0cy1jYxADMgUIABCABDIFCAAQgAQyBQgAEIAEMgsIABCABBCxAxCDATIECAAQAzIFCAAQgAQyBQgAEIAEMgUIABCABDIFCAAQgAQyBQgAEIAEOgsIABAeEA8QsAMQGDoNCAAQHhAPELADEAUQGDoGCAAQChADSgQIQRgBUM4MWO4TYJoVaAFwAHgAgAFDiAGNA5IBATeYAQCgAQHIAQPAAQE&sclient=products-cc"
soup = create_soup(url, headers)
similarity_threshold = 0.25
try:
filtered_prices_descriptions = listing_product_similarity(soup, cleaned_title, similarity_threshold)
prices = list(filtered_prices_descriptions.values())
assert len(prices) > 0
except AssertionError:
while len(prices) == 0:
ramp_down += 0.05
filtered_prices_descriptions = listing_product_similarity(soup, cleaned_title, similarity_threshold - ramp_down)
prices = list(filtered_prices_descriptions.values())
median = statistics.median_grouped(prices)
return min(prices), max(prices), median
"""
Returns a dictionary of all products listed on the page that are similar to the given title.
Args:
soup (BeautifulSoup): The parsed HTML of the page.
title (str): The title of the product to compare against.
similarity_threshold (float): The minimum similarity ratio to consider a product similar.
Returns:
dict: A dictionary mapping the product ID to the product title.
"""
def listing_product_similarity(soup: BeautifulSoup, title: str, similarity_threshold: float) -> dict:
normalized = get_product_price(soup)
description = get_product_description(soup)
price_description = {}
for key, value in zip(description, normalized):
google_shopping_title = clean_title_description(key.text.lower())
listing_title = clean_title_description(title.lower())
price_description[key.text] = [value, SequenceMatcher(None, google_shopping_title, listing_title).ratio()]
filtered_prices_descriptions = {}
for key, value in price_description.items():
if value[1] >= similarity_threshold:
filtered_prices_descriptions[key] = value[0]
return filtered_prices_descriptions