Create labeledPoints from Spark DataFrame in Python


If you already have numerical features which require no additional transformations, you can use VectorAssembler to combine the columns containing the independent variables:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["your", "independent", "variables"],
    outputCol="features")

transformed = assembler.transform(parsedData)
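
For example, assuming a SparkSession named spark and a small, purely illustrative DataFrame with two numeric predictors and an outcome column (none of these names come from the question):

toy_df = spark.createDataFrame(
    [(1.0, 2.0, 0.0), (3.0, 4.0, 1.0)],
    ["x1", "x2", "outcome_column"])

toy_assembler = VectorAssembler(inputCols=["x1", "x2"], outputCol="features")
toy_assembler.transform(toy_df).show()
# 'features' is now a single vector column, e.g. [1.0, 2.0] for the first row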

Next you can simply map:

from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

(transformed.select(col("outcome_column").alias("label"), col("features"))
  .rdd
  .map(lambda row: LabeledPoint(row.label, row.features)))
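
On Spark 1.x, where ml transformers still emit mllib vectors, the resulting RDD of LabeledPoints can be passed straight to an mllib estimator. A rough sketch:

from pyspark.mllib.classification import LogisticRegressionWithLBFGS

labeled = (transformed
  .select(col("outcome_column").alias("label"), col("features"))
  .rdd
  .map(lambda row: LabeledPoint(row.label, row.features)))

model = LogisticRegressionWithLBFGS.train(labeled)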

As of Spark 2.0, the ml and mllib APIs are no longer compatible, and the latter is headed toward deprecation and removal. If you still need this, you'll have to convert ml.Vectors to mllib.Vectors.

from pyspark.mllib import linalg as mllib_linalg
from pyspark.ml import linalg as ml_linalg

def as_old(v):
    if isinstance(v, ml_linalg.SparseVector):
        return mllib_linalg.SparseVector(v.size, v.indices, v.values)
    if isinstance(v, ml_linalg.DenseVector):
        return mllib_linalg.DenseVector(v.values)
    raise ValueError("Unsupported type {0}".format(type(v)))

and map:

.map(lambda row: LabeledPoint(row.label, as_old(row.features)))
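
Putting the pieces together for Spark 2.0+, a minimal end-to-end sketch (the column names are placeholders, as above) would look roughly like this:

from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

assembler = VectorAssembler(
    inputCols=["your", "independent", "variables"],
    outputCol="features")

labeled = (assembler.transform(parsedData)
  .select(col("outcome_column").alias("label"), col("features"))
  .rdd
  .map(lambda row: LabeledPoint(row.label, as_old(row.features))))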

Comments

  • user1518003 over 1 year

    What .map() function in Python do I use to create a set of LabeledPoints from a Spark DataFrame? What is the notation if the label/outcome is not the first column, but I can refer to it by its column name, 'status'?

    I create the Pandas DataFrame with this .map() function:

    import pandas as pd

    def parsePoint(line):
        # split the tab-delimited line and one-hot encode the tokens,
        # collapsing them into a single-row DataFrame of dummy counts
        listmp = list(line.split('\t'))
        dataframe = pd.DataFrame(pd.get_dummies(listmp[1:]).sum()).transpose()
        # use the 'accepted' dummy as the outcome column 'status',
        # then drop the columns that should not become features
        dataframe.insert(0, 'status', dataframe['accepted'])
        if 'NULL' in dataframe.columns:
            dataframe = dataframe.drop('NULL', axis=1)
        if '' in dataframe.columns:
            dataframe = dataframe.drop('', axis=1)
        if 'rejected' in dataframe.columns:
            dataframe = dataframe.drop('rejected', axis=1)
        if 'accepted' in dataframe.columns:
            dataframe = dataframe.drop('accepted', axis=1)
        return dataframe
    

    I convert it to a Spark dataframe after the reduce function has recombined all the Pandas dataframes.

    parsedData = sqlContext.createDataFrame(parsedData)
    

    But now how do I create LabeledPoints from this in Python? I assume it may be another .map() function?
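
Applying the approach from the answer above to this case, a rough sketch with 'status' as the label column might look like the following (it assumes every other column holds a numeric, dummy-encoded feature; on Spark 2.0+ wrap the features with as_old as shown earlier):

from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

# assumption: all columns other than 'status' are dummy-encoded features
feature_cols = [c for c in parsedData.columns if c != "status"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

labeledPoints = (assembler.transform(parsedData)
  .select(col("status").alias("label"), col("features"))
  .rdd
  .map(lambda row: LabeledPoint(row.label, row.features)))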