## Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#importarrayimportsysfromtypingimportAny,List,NamedTuple,Optional,Tuple,Type,UnionfrompysparkimportSparkContext,sincefrompyspark.core.rddimportRDDfrompyspark.mllib.commonimportJavaModelWrapper,callMLlibFunc,inherit_docfrompyspark.mllib.utilimportJavaLoader,JavaSaveablefrompyspark.sqlimportDataFrame__all__=["MatrixFactorizationModel","ALS","Rating"]
@since("0.9.0")
def predict(self, user: int, product: int) -> float:
    """
    Predict the rating that ``user`` would give to ``product``.

    Both identifiers are coerced with ``int()`` before they are passed to
    the wrapped Java model.
    """
    user_id = int(user)
    product_id = int(product)
    return self._java_model.predict(user_id, product_id)
@since("0.9.0")
def predictAll(self, user_product: RDD[Tuple[int, int]]) -> RDD[Rating]:
    """
    Predict ratings for every (user, product) pair in ``user_product``.

    Returns an RDD of :py:class:`Rating` produced by the model's
    ``predict`` call. Only the first record is inspected to check that the
    input holds pairs; every pair is then coerced to ``(int, int)``.
    """
    usage = "user_product should be RDD of (user, product)"
    assert isinstance(user_product, RDD), usage
    # Sample a single record to sanity-check the pair shape (this runs a job).
    sample = user_product.first()
    assert len(sample) == 2, usage
    int_pairs = user_product.map(lambda pair: (int(pair[0]), int(pair[1])))
    return self.call("predict", int_pairs)
@since("1.2.0")
def userFeatures(self) -> RDD[Tuple[int, array.array]]:
    """
    Return a paired RDD keyed by user whose values are that user's
    feature vector, each converted to a double-precision ``array.array``.
    """
    keyed_features = self.call("getUserFeatures")
    return keyed_features.mapValues(lambda vec: array.array("d", vec))
@since("1.2.0")
def productFeatures(self) -> RDD[Tuple[int, array.array]]:
    """
    Return a paired RDD keyed by product whose values are that product's
    feature vector, each converted to a double-precision ``array.array``.
    """
    keyed_features = self.call("getProductFeatures")
    return keyed_features.mapValues(lambda vec: array.array("d", vec))
@since("1.4.0")
def recommendUsers(self, product: int, num: int) -> List[Rating]:
    """
    Recommend the top ``num`` users for ``product``.

    The result is a list of Rating objects ordered by predicted rating,
    highest first.
    """
    recommendations = self.call("recommendUsers", product, num)
    return list(recommendations)
@since("1.4.0")
def recommendProducts(self, user: int, num: int) -> List[Rating]:
    """
    Recommend the top ``num`` products for ``user``.

    The result is a list of Rating objects ordered by predicted rating,
    highest first.
    """
    recommendations = self.call("recommendProducts", user, num)
    return list(recommendations)
def recommendProductsForUsers(self, num: int) -> RDD[Tuple[int, Tuple[Rating, ...]]]:
    """
    Recommend the top ``num`` products for every user.

    Some users may receive fewer than ``num`` recommendations.
    """
    java_method = "wrappedRecommendProductsForUsers"
    return self.call(java_method, num)
def recommendUsersForProducts(self, num: int) -> RDD[Tuple[int, Tuple[Rating, ...]]]:
    """
    Recommend the top ``num`` users for every product.

    Some products may receive fewer than ``num`` recommendations.
    """
    java_method = "wrappedRecommendUsersForProducts"
    return self.call(java_method, num)
@property
@since("1.4.0")
def rank(self) -> int:
    """Number of latent features (the rank) used by this model."""
    feature_rank = self.call("rank")
    return feature_rank
@classmethod
@since("1.3.1")
def load(cls, sc: SparkContext, path: str) -> "MatrixFactorizationModel":
    """
    Load a saved model from ``path``.

    The Java model loaded by ``_load_java`` is wrapped in the JVM-side
    ``MatrixFactorizationModelWrapper`` before being returned as a Python
    model.
    """
    java_model = cls._load_java(sc, path)
    jvm = sc._jvm
    assert jvm is not None
    wrapper_factory = jvm.org.apache.spark.mllib.api.python.MatrixFactorizationModelWrapper
    return MatrixFactorizationModel(wrapper_factory(java_model))
class ALS:
    """Alternating Least Squares matrix factorization

    .. versionadded:: 0.9.0
    """

    @classmethod
    def _prepare(cls, ratings: Any) -> RDD[Rating]:
        """
        Normalize ``ratings`` into an RDD of :py:class:`Rating`.

        A DataFrame is converted through its ``.rdd``. The first record is
        then inspected. If it is already a ``Rating``, the RDD is returned
        unchanged. If it is a tuple or list, every record is unpacked into
        a ``Rating``.

        Raises
        ------
        TypeError
            If ``ratings`` is neither an RDD nor a DataFrame, or if its first
            record is neither a ``Rating`` nor a tuple/list.
        """
        if isinstance(ratings, RDD):
            pass
        elif isinstance(ratings, DataFrame):
            ratings = ratings.rdd
        else:
            raise TypeError(
                "Ratings should be represented by either an RDD or a DataFrame, "
                "but got %s." % type(ratings)
            )
        # Only the first record is sampled to decide how to convert the whole
        # RDD. NOTE(review): presumably this raises if the RDD is empty — confirm.
        first = ratings.first()
        # The Rating check comes before the tuple/list check; Rating may itself
        # be a tuple subclass (TODO confirm against its definition).
        if isinstance(first, Rating):
            pass
        elif isinstance(first, (tuple, list)):
            ratings = ratings.map(lambda x: Rating(*x))
        else:
            raise TypeError("Expect a Rating or a tuple/list, but got %s." % type(first))
        return ratings

    @classmethod
    def train(
        cls,
        ratings: Union[RDD[Rating], RDD[Tuple[int, int, float]], DataFrame],
        rank: int,
        iterations: int = 5,
        lambda_: float = 0.01,
        blocks: int = -1,
        nonnegative: bool = False,
        seed: Optional[int] = None,
    ) -> MatrixFactorizationModel:
        """
        Train a matrix factorization model given an RDD of ratings by users
        for a subset of products. The ratings matrix is approximated as the
        product of two lower-rank matrices of a given rank (number of
        features). To solve for these features, ALS is run iteratively with
        a configurable level of parallelism.

        .. versionadded:: 0.9.0

        Parameters
        ----------
        ratings : :py:class:`pyspark.RDD` or :py:class:`pyspark.sql.DataFrame`
            RDD of `Rating` or (userID, productID, rating) tuple. A DataFrame
            is also accepted and is converted through its ``.rdd``.
        rank : int
            Number of features to use (also referred to as the number of latent factors).
        iterations : int, optional
            Number of iterations of ALS.
            (default: 5)
        lambda\\_ : float, optional
            Regularization parameter.
            (default: 0.01)
        blocks : int, optional
            Number of blocks used to parallelize the computation. A value
            of -1 will use an auto-configured number of blocks.
            (default: -1)
        nonnegative : bool, optional
            A value of True will solve least-squares with nonnegativity
            constraints.
            (default: False)
        seed : int, optional
            Random seed for initial matrix factorization model. A value
            of None will use system time as the seed.
            (default: None)

        Returns
        -------
        :py:class:`MatrixFactorizationModel`
            The trained model.
        """
        model = callMLlibFunc(
            "trainALSModel",
            cls._prepare(ratings),
            rank,
            iterations,
            lambda_,
            blocks,
            nonnegative,
            seed,
        )
        return MatrixFactorizationModel(model)

    @classmethod
    def trainImplicit(
        cls,
        ratings: Union[RDD[Rating], RDD[Tuple[int, int, float]], DataFrame],
        rank: int,
        iterations: int = 5,
        lambda_: float = 0.01,
        blocks: int = -1,
        alpha: float = 0.01,
        nonnegative: bool = False,
        seed: Optional[int] = None,
    ) -> MatrixFactorizationModel:
        """
        Train a matrix factorization model given an RDD of 'implicit
        preferences' of users for a subset of products. The ratings matrix
        is approximated as the product of two lower-rank matrices of a
        given rank (number of features). To solve for these features, ALS
        is run iteratively with a configurable level of parallelism.

        .. versionadded:: 0.9.0

        Parameters
        ----------
        ratings : :py:class:`pyspark.RDD` or :py:class:`pyspark.sql.DataFrame`
            RDD of `Rating` or (userID, productID, rating) tuple. A DataFrame
            is also accepted and is converted through its ``.rdd``.
        rank : int
            Number of features to use (also referred to as the number of latent factors).
        iterations : int, optional
            Number of iterations of ALS.
            (default: 5)
        lambda\\_ : float, optional
            Regularization parameter.
            (default: 0.01)
        blocks : int, optional
            Number of blocks used to parallelize the computation. A value
            of -1 will use an auto-configured number of blocks.
            (default: -1)
        alpha : float, optional
            A constant used in computing confidence.
            (default: 0.01)
        nonnegative : bool, optional
            A value of True will solve least-squares with nonnegativity
            constraints.
            (default: False)
        seed : int, optional
            Random seed for initial matrix factorization model. A value
            of None will use system time as the seed.
            (default: None)

        Returns
        -------
        :py:class:`MatrixFactorizationModel`
            The trained model.
        """
        model = callMLlibFunc(
            "trainImplicitALSModel",
            cls._prepare(ratings),
            rank,
            iterations,
            lambda_,
            blocks,
            alpha,
            nonnegative,
            seed,
        )
        return MatrixFactorizationModel(model)