Issue
Disclaimer: this question is part of a thread that includes these two SO questions (q1, q2).
The data resemble the movie ratings from the ratings.csv file (~891 MB) of the ml-latest dataset.
I read the CSV file with the polars library like:
movie_ratings = pl.read_csv(os.path.join(application_path + data_directory, "ratings.csv"))
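For reference, ratings.csv from ml-latest has the columns userId, movieId, rating and timestamp, so the loaded frame should look roughly like this (a quick sanity check, not part of the original code):
print(movie_ratings.schema)
# Expected (approximately): {'userId': Int64, 'movieId': Int64, 'rating': Float64, 'timestamp': Int64}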
Let's assume we want to compute the similarity between the movies seen by user 1 (for example, 62 movies) and the rest of the movies in the dataset. FYI, the dataset has ~83,000 movies, so for each other_movie (82,938 of them) we compute a similarity with each movie seen by user 1 (62 movies). The complexity is 62 x 82,938 iterations.
For this example, the benchmarks reported are only for 400 of the 82,938 other_movies.
To do so, I create two polars dataframes: one dataframe with the other_movies (~82,938 rows) and a second dataframe with only the movies seen by the user (62 rows).
user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1 (data related to user 1)
user_rated_movies = list(user_ratings.select(pl.col("movieId")).to_numpy().ravel()) #movies seen by user1
potential_movies_to_recommend = list(
movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)
items_metadata = (
movie_ratings.filter(
~pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
)
.group_by("movieId").agg(
users_seen_movie=pl.col("userId").unique(),
user_ratings=pl.col("rating")
)
)
target_items_metadata = (
movie_ratings.filter(
pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
).group_by("movieId").agg(
users_seen_movie=pl.col("userId").unique(),
user_ratings=pl.col("rating")
)
)
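For each movieId, the aggregation produces one list of userIds and one list of ratings, so a row of items_metadata should look roughly like this (illustrative values, not taken from the actual data):
print(items_metadata.head(1))
# movieId | users_seen_movie | user_ratings
# 2       | [5, 13, 27, ...] | [4.0, 3.5, 5.0, ...]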
The result is two polars dataframes whose rows are movies and whose columns are the users who have seen each movie and the ratings from each of those users.
The first dataframe contains only the other_movies that we can potentially recommend to user 1, since he/she has not seen them.
The second dataframe contains only the movies seen by the user.
Next, my approach is to iterate over each row of the first dataframe and apply a UDF.
item_metadata_similarity = (
items_metadata.with_columns(
similarity_score=pl.struct(pl.all()).map_elements(
lambda row: item_compute_similarity_scoring_V2(row, target_items_metadata),
return_dtype=pl.List(pl.List(pl.Float64)),
strategy="threading"
)
)
)
where item_compute_similarity_scoring_V2 is defined as:
def item_compute_similarity_scoring_V2(
    row,
    target_movies_metadata: pl.DataFrame
):
    users_item1 = np.asarray(row["users_seen_movie"])
    ratings_item1 = np.asarray(row["user_ratings"])
    computed_similarity: list = []
    for row2 in target_movies_metadata.iter_rows(named=True):  # iterate over each row of the second dataframe (movies seen by the user)
        users_item2 = np.asarray(row2["users_seen_movie"])
        ratings_item2 = np.asarray(row2["user_ratings"])
        r1, r2 = item_ratings(users_item1, ratings_item1, users_item2, ratings_item2)
        if r1.shape[0] != 0 and r2.shape[0] != 0:
            similarity_score = compute_similarity_score(r1, r2)
            if similarity_score > 0.0:  # filter out negative or zero similarity scores
                computed_similarity.append((row2["movieId"], similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)
    return most_similar_pairs
with item_ratings and compute_similarity_score defined as:
def item_ratings(u1: np.ndarray, r1: np.ndarray, u2: np.ndarray, r2: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    common_elements, indices1, indices2 = np.intersect1d(u1, u2, return_indices=True)
    sr1 = r1[indices1]
    sr2 = r2[indices2]
    assert len(sr1) == len(sr2), "ratings don't have same lengths"
    return sr1, sr2
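To make the alignment on common users concrete, here is a tiny illustrative example with made-up values, before moving on to the scoring function:
u1 = np.array([1, 3, 5, 9]); r1 = np.array([4.0, 3.0, 5.0, 2.0])
u2 = np.array([3, 4, 9]); r2 = np.array([3.5, 1.0, 4.5])
# users 3 and 9 are common to both arrays, so this returns (array([3., 2.]), array([3.5, 4.5]))
print(item_ratings(u1, r1, u2, r2))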
@jit(nopython=True, parallel=True)
def compute_similarity_score(array1: np.ndarray, array2: np.ndarray) -> float:
    assert array1.shape[0] == array2.shape[0]
    a1a2 = 0
    a1a1 = 0
    a2a2 = 0
    for i in range(array1.shape[0]):
        a1a2 += array1[i] * array2[i]
        a1a1 += array1[i] * array1[i]
        a2a2 += array2[i] * array2[i]
    cos_theta = 1.0
    if a1a1 != 0 and a2a2 != 0:
        cos_theta = float(a1a2 / np.sqrt(a1a1 * a2a2))
    return cos_theta
The function basically iterates over each row of the second dataframe and, for each row, computes the similarity between the other_movie and the movie seen by the user. Thus, for 400 movies we do 400 * 62 iterations, generating 62 similarity scores per other_movie.
The result of each computation is an array with schema [[1, 0.20], [110, 0.34]], ... (62 pairs per other_movie).
Benchmarks for 400 movies
- INFO - Item-Item: Computed similarity scores for 400 movies in: 0:05:49.887032
- ~6 minutes.
- ~5 GB of RAM used.
I would like to identify how I can improve the computations by using native polars commands or by exploiting the numba framework for parallelism.
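For the numba route, the documented way to make a loop explicitly parallel is numba.prange; a minimal, illustrative sketch of the cosine kernel above (the function name compute_similarity_score_parallel is just an illustration, not benchmarked here) could look like:
from numba import njit, prange
import numpy as np

@njit(parallel=True)
def compute_similarity_score_parallel(array1: np.ndarray, array2: np.ndarray) -> float:
    # same cosine similarity as compute_similarity_score, but with an explicitly parallel reduction loop
    a1a2 = 0.0
    a1a1 = 0.0
    a2a2 = 0.0
    for i in prange(array1.shape[0]):
        a1a2 += array1[i] * array2[i]
        a1a1 += array1[i] * array1[i]
        a2a2 += array2[i] * array2[i]
    if a1a1 != 0 and a2a2 != 0:
        return float(a1a2 / np.sqrt(a1a1 * a2a2))
    return 1.0
Note that the rating vectors here are short (at most the number of common users), so thread-level parallelism inside this kernel is unlikely to help much; the bigger wins are likely at the dataframe level.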
Update - 2nd approach using to_numpy() operations without iter_rows() and map_elements()
user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1
user_rated_movies = user_ratings.select(pl.col("movieId")).to_numpy().ravel()
potential_movies_to_recommend = list(
movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)
items_metadata = (
movie_ratings.filter(
~pl.col("movieId").is_in(user_rated_movies)
)
)
# print(items_metadata.head(5))
target_items_metadata = (
movie_ratings.filter(
pl.col("movieId").is_in(user_rated_movies)
)
)
# print(target_items_metadata.head(5))
With this second approach, items_metadata and target_items_metadata are two large polars tables. My next step is to convert both tables into numpy.ndarrays with the to_numpy() method.
items_metadata_array = items_metadata.to_numpy()
target_items_metadata_array = target_items_metadata.to_numpy()
computed_similarity_scores:dict = {}
for i, other_movie in enumerate(potential_movies_to_recommend[:400]):  # take the first 400 movies unseen by user 1
    mask = items_metadata_array[:, 1] == other_movie
    other_movies_chunk = items_metadata_array[mask]
    u1 = other_movies_chunk[:, 0].astype(np.int32)
    r1 = other_movies_chunk[:, 2].astype(np.float32)
    computed_similarity: list = []
    for j, user_movie in enumerate(user_rated_movies):
        print(user_movie)
        mask = target_items_metadata_array[:, 1] == user_movie
        target_movie_chunk = target_items_metadata_array[mask]
        u2 = target_movie_chunk[:, 0].astype(np.int32)
        r2 = target_movie_chunk[:, 2].astype(np.float32)
        common_r1, common_r2 = item_ratings(u1, r1, u2, r2)
        if common_r1.shape[0] != 0 and common_r2.shape[0] != 0:
            similarity_score = compute_similarity_score(common_r1, common_r2)
            if similarity_score > 0.0:
                computed_similarity.append((user_movie, similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]
    computed_similarity_scores[str(other_movie)] = most_similar_pairs
Benchmarks of the second approach (8.5 minutes, slower than the ~6 minutes of the first approach)
- Item-Item: Computed similarity scores for 400 movies in: 0:08:50.537102
Update - 3rd approach using iter_rows() operations
In my third approach, I get better results than with the previous two methods, computing the scores in approximately 2 minutes for user 1 and 400 movies.
items_metadata = (
movie_ratings.filter(
~pl.col("movieId").is_in(user_rated_movies)
)
.group_by("movieId").agg(
users_seen_movie=pl.col("userId").unique(),
user_ratings=pl.col("rating")
)
)
target_items_metadata = (
movie_ratings.filter(
pl.col("movieId").is_in(user_rated_movies)
).group_by("movieId").agg(
users_seen_movie=pl.col("userId").unique(),
user_ratings=pl.col("rating")
)
)
items_metadata is the metadata of the other_movies not seen by user 1, and target_items_metadata is the metadata of the movies rated by user 1. By metadata I refer to the two aggregated .agg() columns, users_seen_movie and user_ratings.
Finally, I create two for loops using the iter_rows() method from polars:
def cosine_similarity_score(array1: np.ndarray, array2: np.ndarray) -> float:
    assert array1.shape[0] == array2.shape[0]
    a1a2 = 0
    a1a1 = 0
    a2a2 = 0
    for i in range(array1.shape[0]):
        a1a2 += array1[i] * array2[i]
        a1a1 += array1[i] * array1[i]
        a2a2 += array2[i] * array2[i]
    # cos_theta = 1.0
    cos_theta = 0.0
    if a1a1 != 0 and a2a2 != 0:
        cos_theta = float(a1a2 / np.sqrt(a1a1 * a2a2))
    return max(0.0, cos_theta)
for row1 in items_metadata.iter_rows():
    computed_similarity: list = []
    for row2 in target_items_metadata.iter_rows():
        r1, r2 = item_ratings(np.asarray(row1[1]), np.asarray(row1[2]), np.asarray(row2[1]), np.asarray(row2[2]))
        if r1.shape[0] != 0 and r2.shape[0] != 0:
            similarity_score = cosine_similarity_score(r1, r2)
            computed_similarity.append((row2[0], similarity_score if similarity_score > 0 else 0))
    computed_similarity_scores[str(row1[0])] = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]
Benchmarks for 400 movies
- INFO - Item-Item: Computed similarity scores for 400 movies in: 0:01:50
- ~2 minutes.
- ~4.5 GB of RAM used.
Solution
I'm not too familiar with numba, so before trying to compare timings, the first thing I would try to do is create a "fully native" Polars approach:
This is a direct translation of the current approach (i.e. it still contains the "double for loop") so it just serves as a baseline attempt.
Because it uses the Lazy API, nothing in the loops is computed. That is all done when .collect() is called (which allows Polars to parallelize the work).
The > 0.0 filtering of the similarity_score would be done after the results are collected.
input_id = 1
is_user_rating = pl.col("userId") == input_id
can_recommend = (
pl.col("movieId").is_in(pl.col("movieId").filter(is_user_rating)).not_()
)
cosine_similarity = (
pl.col('rating').dot('rating_right') /
( pl.col('rating').pow(2).sum().sqrt() *
pl.col('rating_right').pow(2).sum().sqrt() )
)
user_rated_movies = movie_ratings.filter(is_user_rating).select("movieId").to_series()
potential_movies_to_recommend = (
movie_ratings.filter(can_recommend).select(pl.col("movieId").unique().sort())
)
# use the Lazy API so we can compute in parallel
df = movie_ratings.lazy()
computed_similarity_scores = []
for other_movie in potential_movies_to_recommend.head(1).to_series():  # .head(N) potential movies
    for user_movie in user_rated_movies:
        score = (
            df.filter(pl.col("movieId") == user_movie)
            .join(
                df.filter(pl.col("movieId") == other_movie),
                on="userId"
            )
            .select(cosine=cosine_similarity)
            .select(user_movie=user_movie, other_movie=other_movie, similarity_score="cosine")
        )
        computed_similarity_scores.append(score)
# All scores are computed in parallel
computed_similarity_scores_polars = pl.concat(computed_similarity_scores).collect()
shape: (62, 3)
┌────────────┬─────────────┬──────────────────┐
│ user_movie ┆ other_movie ┆ similarity_score │
│ --- ┆ --- ┆ --- │
│ i32 ┆ i32 ┆ f64 │
╞════════════╪═════════════╪══════════════════╡
│ 1 ┆ 2 ┆ 0.95669 │
│ 110 ┆ 2 ┆ 0.950086 │
│ 158 ┆ 2 ┆ 0.957631 │
│ 260 ┆ 2 ┆ 0.945542 │
│ … ┆ … ┆ … │
│ 49647 ┆ 2 ┆ 0.9411 │
│ 52458 ┆ 2 ┆ 0.955353 │
│ 53996 ┆ 2 ┆ 0.930388 │
│ 54259 ┆ 2 ┆ 0.95469 │
└────────────┴─────────────┴──────────────────┘
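As mentioned above, the > 0.0 filter can then be applied to the collected frame, e.g. (a small sketch using the columns shown above; recommendable is just an illustrative name):
recommendable = computed_similarity_scores_polars.filter(pl.col("similarity_score") > 0.0)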
Testing .head(100), I get a 58s runtime compared to a 111s runtime for your example; memory consumption is the same.
duckdb
As a comparison, duckdb with .head(400) runs in 5s:
import duckdb
df = duckdb.sql("""
with
df as (from 'imdb.parquet'),
user as (from df where movieId in (from df select movieId where userId = 1)),
movies as (from df where movieId not in (from df select movieId where userId = 1)),
other as (from df where movieId in (from movies select distinct movieId order by movieId limit 400))
from
user join other using (userId)
select
user.movieId user_movie,
other.movieId other_movie,
list_cosine_similarity(
list(user.rating), list(other.rating)
) similarity_score
group by
user_movie, other_movie
order by
user_movie, other_movie
""").pl()
shape: (24_764, 3)
┌────────────┬─────────────┬──────────────────┐
│ user_movie ┆ other_movie ┆ similarity_score │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ f64 │
╞════════════╪═════════════╪══════════════════╡
│ 1 ┆ 2 ┆ 0.95669 │
│ 1 ┆ 3 ┆ 0.941348 │
│ 1 ┆ 4 ┆ 0.92169 │
│ 1 ┆ 5 ┆ 0.943999 │
│ … ┆ … ┆ … │
│ 54259 ┆ 407 ┆ 0.941241 │
│ 54259 ┆ 408 ┆ 0.934745 │
│ 54259 ┆ 409 ┆ 0.937361 │
│ 54259 ┆ 410 ┆ 0.94937 │
└────────────┴─────────────┴──────────────────┘
Elapsed time: 5.02638 seconds
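Note that the duckdb query reads from a Parquet file rather than the original CSV; assuming the same movie_ratings frame as above, one way to produce it is (the file name imdb.parquet simply matches the one used in the query):
movie_ratings.write_parquet("imdb.parquet")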
Answered By - jqurious