8. Data Manipulation: Features¶
Chinese proverb
All things are difficult before they are easy!
Feature building is a critical modeling step that largely determines whether your model succeeds or fails: garbage in, garbage out! The techniques are covered in detail in later chapters; what follows is a brief summary. The official Spark website has very good tutorial documentation, and this chapter is based on its guide Extracting, transforming and selecting features.
8.1. Feature Extraction¶
8.1.1. TF-IDF¶
Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus. More details can be found at: https://spark.apache.org/docs/latest/ml-features#feature-extractors
Stackoverflow TF: Both HashingTF and CountVectorizer can be used to generate the term frequency vectors. A few important differences:
partially reversible (CountVectorizer) vs irreversible (HashingTF) - since hashing is not reversible you cannot restore the original input from a hash vector. On the other hand, a count vector with its model (index) can be used to restore the unordered input. As a consequence, models created using hashed input can be much harder to interpret and monitor.
memory and computational overhead - HashingTF requires only a single data scan and no additional memory beyond the original input and vector. CountVectorizer requires an additional scan over the data to build a model and additional memory to store the vocabulary (index). In the case of a unigram language model this is usually not a problem, but in the case of higher n-grams it can be prohibitively expensive or not feasible.
hashing depends on the size of the vector, the hashing function and the document. Counting depends on the size of the vector, the training corpus and the document.
a source of information loss - in the case of HashingTF it is dimensionality reduction with possible collisions. CountVectorizer discards infrequent tokens. How it affects downstream models depends on the particular use case and data.
HashingTF and CountVectorizer are the two popular algorithms used to generate term frequency vectors. They convert documents into a numerical representation which can be fed directly, or after further processing, into other algorithms such as LDA, MinHash for Jaccard distance, or cosine distance.
$t$: term
$d$: document
$D$: corpus
$|D|$: the number of documents in the corpus
$TF(t,d)$: Term Frequency, the number of times that term $t$ appears in document $d$
$DF(t,D)$: Document Frequency, the number of documents that contain term $t$
$IDF(t,D)$: Inverse Document Frequency, a numerical measure of how much information a term provides: $IDF(t,D) = \log \frac{|D|+1}{DF(t,D)+1}$
$TFIDF(t,d,D)$: the product of TF and IDF: $TFIDF(t,d,D) = TF(t,d) \cdot IDF(t,D)$
Let’s look at the example:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0, "Python python Spark Spark"),
(1, "Python SQL")],
["document", "sentence"])
sentenceData.show(truncate=False)
+--------+-------------------------+
|document|sentence |
+--------+-------------------------+
|0 |Python python Spark Spark|
|1 |Python SQL |
+--------+-------------------------+
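Then, after tokenizing and lower-casing, with $|D| = 2$ and writing $d_0$, $d_1$ for documents 0 and 1:
$TF(\text{python}, d_0) = 2$, $TF(\text{spark}, d_0) = 2$, $TF(\text{python}, d_1) = 1$, $TF(\text{sql}, d_1) = 1$
$DF(\text{python}, D) = 2$, $DF(\text{spark}, D) = 1$, $DF(\text{sql}, D) = 1$
IDF, using Spark's smoothed form $IDF(t,D) = \log \frac{|D|+1}{DF(t,D)+1}$:
$IDF(\text{python}, D) = \log \frac{2+1}{2+1} = 0$
$IDF(\text{spark}, D) = IDF(\text{sql}, D) = \log \frac{2+1}{1+1} \approx 0.4055$
TFIDF:
$TFIDF(\text{python}, d_0, D) = 2 \cdot 0 = 0$, $TFIDF(\text{spark}, d_0, D) = 2 \cdot 0.4055 \approx 0.8109$
$TFIDF(\text{python}, d_1, D) = 1 \cdot 0 = 0$, $TFIDF(\text{sql}, d_1, D) = 1 \cdot 0.4055 \approx 0.4055$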
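The TF, DF and IDF definitions above can be checked by hand on the two example sentences. The following is a minimal plain-Python sketch (not Spark code): it assumes whitespace tokenization with lower-casing and the smoothed IDF, log((|D| + 1) / (DF + 1)), described in the Spark documentation. The helper names `tf`, `df`, `idf` and `tfidf` are made up for this illustration.

```python
import math
from collections import Counter

# Toy corpus from the example, lower-cased and split on whitespace
# (an approximation of what Tokenizer does; this is not Spark code).
docs = {
    0: "Python python Spark Spark".lower().split(),
    1: "Python SQL".lower().split(),
}

def tf(term, doc_tokens):
    """Term frequency: raw count of `term` in one document."""
    return Counter(doc_tokens)[term]

def df(term, corpus):
    """Document frequency: number of documents that contain `term`."""
    return sum(1 for tokens in corpus.values() if term in tokens)

def idf(term, corpus):
    """Smoothed inverse document frequency: log((|D| + 1) / (DF + 1))."""
    return math.log((len(corpus) + 1) / (df(term, corpus) + 1))

def tfidf(term, doc_id, corpus):
    """TF-IDF: product of term frequency and inverse document frequency."""
    return tf(term, corpus[doc_id]) * idf(term, corpus)

for term, doc_id in [("python", 0), ("spark", 0), ("sql", 1)]:
    print(term, doc_id, tfidf(term, doc_id, docs))
```

Since python occurs in both documents, its IDF (and hence its TF-IDF) is zero, while spark and sql get positive weights. These values should agree with the features column printed by the Spark pipelines in the next subsections.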
8.1.1.1. CountVectorizer¶
Stackoverflow TF: CountVectorizer and CountVectorizerModel aim to help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary, and generates a CountVectorizerModel. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0, "Python python Spark Spark"),
(1, "Python SQL")],
["document", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
vectorizer = CountVectorizer(inputCol="words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
pipeline = Pipeline(stages=[tokenizer, vectorizer, idf])
model = pipeline.fit(sentenceData)
import numpy as np
total_counts = model.transform(sentenceData)\
.select('rawFeatures').rdd\
.map(lambda row: row['rawFeatures'].toArray())\
.reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])
vocabList = model.stages[1].vocabulary
d = {'vocabList':vocabList,'counts':total_counts}
spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show()
+---------+------+
|vocabList|counts|
+---------+------+
| python| 3.0|
| spark| 2.0|
| sql| 1.0|
+---------+------+
counts = model.transform(sentenceData).select('rawFeatures').collect()
counts
[Row(rawFeatures=SparseVector(3, {0: 2.0, 1: 2.0})),
Row(rawFeatures=SparseVector(3, {0: 1.0, 2: 1.0}))]
model.transform(sentenceData).show(truncate=False)
+--------+-------------------------+------------------------------+-------------------+----------------------------------+
|document|sentence |words |rawFeatures |features |
+--------+-------------------------+------------------------------+-------------------+----------------------------------+
|0 |Python python Spark Spark|[python, python, spark, spark]|(3,[0,1],[2.0,2.0])|(3,[0,1],[0.0,0.8109302162163288])|
|1 |Python SQL |[python, sql] |(3,[0,2],[1.0,1.0])|(3,[0,2],[0.0,0.4054651081081644])|
+--------+-------------------------+------------------------------+-------------------+----------------------------------+
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
def termsIdx2Term(vocabulary):
    # Build a UDF that maps vocabulary indices back to their terms.
    def termsIdx2Term(termIndices):
        return [vocabulary[int(index)] for index in termIndices]
    return udf(termsIdx2Term, ArrayType(StringType()))
vectorizerModel = model.stages[1]
vocabList = vectorizerModel.vocabulary
vocabList
['python', 'spark', 'sql']
rawFeatures = model.transform(sentenceData).select('rawFeatures')
rawFeatures.show()
+-------------------+
| rawFeatures|
+-------------------+
|(3,[0,1],[2.0,2.0])|
|(3,[0,2],[1.0,1.0])|
+-------------------+
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DoubleType, IntegerType
indices_udf = udf(lambda vector: vector.indices.tolist(), ArrayType(IntegerType()))
values_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(DoubleType()))
rawFeatures.withColumn('indices', indices_udf(F.col('rawFeatures')))\
.withColumn('values', values_udf(F.col('rawFeatures')))\
.withColumn("Terms", termsIdx2Term(vocabList)("indices")).show()
+-------------------+-------+---------------+---------------+
| rawFeatures|indices| values| Terms|
+-------------------+-------+---------------+---------------+
|(3,[0,1],[2.0,2.0])| [0, 1]|[2.0, 2.0, 0.0]|[python, spark]|
|(3,[0,2],[1.0,1.0])| [0, 2]|[1.0, 0.0, 1.0]| [python, sql]|
+-------------------+-------+---------------+---------------+
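To make the vocabulary step concrete, here is a small plain-Python sketch of what a count vectorizer does: build a term index ordered by corpus frequency, then count terms per document. It is an illustration only, not Spark's implementation; the alphabetical tie-breaking and the names `count_vector` and `indices_to_terms` are assumptions made for this example.

```python
from collections import Counter

# Tokenized documents from the example in this subsection.
docs = [["python", "python", "spark", "spark"], ["python", "sql"]]

# Vocabulary: terms ordered by total count in the corpus, most frequent first.
# Ties are broken alphabetically here (an assumption for this sketch).
corpus_counts = Counter(token for doc in docs for token in doc)
vocabulary = sorted(corpus_counts, key=lambda term: (-corpus_counts[term], term))
term_to_index = {term: i for i, term in enumerate(vocabulary)}

def count_vector(tokens):
    """Return a sparse {index: count} mapping for one document."""
    counts = Counter(term_to_index[t] for t in tokens)
    return {i: float(counts[i]) for i in sorted(counts)}

def indices_to_terms(sparse_vector):
    """Because the vocabulary is stored, indices can be mapped back to terms."""
    return [vocabulary[i] for i in sparse_vector]

vectors = [count_vector(doc) for doc in docs]
print(vocabulary)
print(vectors)
print([indices_to_terms(v) for v in vectors])
```

The last line is the "partially reversible" property mentioned earlier: the unordered set of terms can be recovered from the vector, which is not possible with a hashed representation.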
8.1.1.2. HashingTF¶
Stackoverflow TF: HashingTF is a Transformer which takes sets of terms and converts those sets into fixed-length feature vectors. In text processing, a “set of terms” might be a bag of words. HashingTF utilizes the hashing trick. A raw feature is mapped into an index (term) by applying a hash function. The hash function used here is MurmurHash 3. Then term frequencies are calculated based on the mapped indices. This approach avoids the need to compute a global term-to-index map, which can be expensive for a large corpus, but it suffers from potential hash collisions, where different raw features may become the same term after hashing.
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0, "Python python Spark Spark"),
(1, "Python SQL")],
["document", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
vectorizer = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=5)
idf = IDF(inputCol="rawFeatures", outputCol="features")
pipeline = Pipeline(stages=[tokenizer, vectorizer, idf])
model = pipeline.fit(sentenceData)
model.transform(sentenceData).show(truncate=False)
+--------+-------------------------+------------------------------+-------------------+----------------------------------+
|document|sentence |words |rawFeatures |features |
+--------+-------------------------+------------------------------+-------------------+----------------------------------+
|0 |Python python Spark Spark|[python, python, spark, spark]|(5,[0,4],[2.0,2.0])|(5,[0,4],[0.8109302162163288,0.0])|
|1 |Python SQL |[python, sql] |(5,[1,4],[1.0,1.0])|(5,[1,4],[0.4054651081081644,0.0])|
+--------+-------------------------+------------------------------+-------------------+----------------------------------+
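The hashing trick itself fits in a few lines. The sketch below is plain Python and uses `zlib.crc32` as a stand-in hash function, so the bucket indices will not match Spark's MurmurHash 3 output; the function name `hashed_tf` is hypothetical.

```python
import zlib

def hashed_tf(tokens, num_features):
    """Map each token to a bucket via a hash function and count per bucket."""
    vector = [0.0] * num_features
    for token in tokens:
        # CRC32 is a stand-in here; Spark's HashingTF uses MurmurHash 3.
        index = zlib.crc32(token.encode("utf-8")) % num_features
        vector[index] += 1.0
    return vector

# No vocabulary is built: each document is vectorized independently.
print(hashed_tf(["python", "python", "spark", "spark"], num_features=5))

# With fewer buckets than distinct terms, collisions are unavoidable:
# three distinct terms cannot occupy more than two buckets.
print(hashed_tf(["python", "spark", "sql"], num_features=2))
```

Note that nothing in the output tells you which terms landed in a bucket, which is why hashed features are harder to interpret than count vectors.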
8.1.2. Word2Vec¶
8.1.2.1. Word Embeddings¶
Word2Vec is one of the popular methods for implementing word embeddings. The following text and images are from the tutorial by Chris Bail, PhD, Duke University (the best tutorial I have read), so the copyright belongs to him. Word embeddings gained fame in the world of automated text analysis when it was demonstrated that they could be used to identify analogies. Figure 1 illustrates the output of a word embedding model where individual words are plotted in three-dimensional space generated by the model. By examining the adjacency of words in this space, word embedding models can complete analogies such as “Man is to woman as king is to queen.” If you’d like to explore what the output of a large word embedding model looks like in more detail, check out this fantastic visualization of most words in the English language that was produced using a word embedding model called GloVe.
8.1.2.2. The Context Window¶
Word embeddings are created by identifying the words that occur within something called a “Context Window.” The Figure below illustrates context windows of varied length for a single sentence. The context window is defined by a string of words before and after a focal or “center” word that will be used to train a word embedding model. Each center word and context words can be represented as a vector of numbers that describe the presence or absence of unique words within a dataset, which is perhaps why word embedding models are often described as “word vector” models, or “word2vec” models.
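As a rough illustration of the context window idea, the following plain-Python sketch lists the context words around every center word of one sentence. The function name `context_windows` and the example sentence are chosen for this illustration only.

```python
def context_windows(tokens, window_size):
    """Yield (center, context_words) for every position in `tokens`."""
    for i, center in enumerate(tokens):
        left = tokens[max(0, i - window_size):i]      # words before the center
        right = tokens[i + 1:i + 1 + window_size]     # words after the center
        yield center, left + right

sentence = "i think ml is awesome".split()
windows = list(context_windows(sentence, window_size=2))
for center, context in windows:
    print(center, context)
```

Words near the start or end of the sentence simply get a shorter context, since there is nothing on one side of them.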
8.1.2.3. Two Types of Embedding Models¶
Word embeddings are usually performed in one of two ways: “Continuous Bag of Words” (CBOW) or a “Skip-Gram Model.” The figure below illustrates the differences between the two models. The CBOW model reads in the context window words and tries to predict the most likely center word. The Skip-Gram Model predicts the context words given the center word. The examples above were created using the Skip-Gram model, which is perhaps most useful for people who want to identify patterns within texts to represent them in multidimensional space, whereas the CBOW model is more useful in practical applications such as predictive web search.
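The difference between the two models is easiest to see in the training examples they consume. The sketch below only builds those (input, target) examples in plain Python; it does not train anything, and the function names are hypothetical.

```python
def skipgram_pairs(tokens, window_size):
    """Skip-gram: one (input=center, target=context word) pair per context word."""
    pairs = []
    for i, center in enumerate(tokens):
        lo, hi = max(0, i - window_size), min(len(tokens), i + window_size + 1)
        pairs.extend((center, tokens[j]) for j in range(lo, hi) if j != i)
    return pairs

def cbow_examples(tokens, window_size):
    """CBOW: one (input=all context words, target=center) example per position."""
    examples = []
    for i, center in enumerate(tokens):
        lo, hi = max(0, i - window_size), min(len(tokens), i + window_size + 1)
        context = [tokens[j] for j in range(lo, hi) if j != i]
        examples.append((context, center))
    return examples

tokens = "i love spark".split()
print(skipgram_pairs(tokens, window_size=1))
print(cbow_examples(tokens, window_size=1))
```

Skip-gram produces more, smaller examples (one per center/context pair), while CBOW produces one example per position with the whole context as input.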
8.1.2.4. Word Embedding Models in PySpark¶
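from pyspark.ml.feature import Tokenizer, Word2Vec
from pyspark.ml import Pipeline
# Sentences matching the output below (the same data as in the NGram example).
sentenceData = spark.createDataFrame([
(0.0, "I love Spark"),
(0.0, "I love python"),
(1.0, "I think ML is awesome")],
["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")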
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="feature")
pipeline = Pipeline(stages=[tokenizer, word2Vec])
model = pipeline.fit(sentenceData)
result = model.transform(sentenceData)
result.show()
+-----+--------------------+--------------------+--------------------+
|label| sentence| words| feature|
+-----+--------------------+--------------------+--------------------+
| 0.0| I love Spark| [i, love, spark]|[0.05594437588782...|
| 0.0| I love python| [i, love, python]|[-0.0350368790871...|
| 1.0|I think ML is awe...|[i, think, ml, is...|[0.01242086507845...|
+-----+--------------------+--------------------+--------------------+
w2v = model.stages[1]
w2v.getVectors().show()
+-------+-----------------------------------------------------------------+
|word |vector |
+-------+-----------------------------------------------------------------+
|is |[0.13657838106155396,0.060924094170331955,-0.03379475697875023] |
|awesome|[0.037024181336164474,-0.023855900391936302,0.0760037824511528] |
|i |[-0.0014482572441920638,0.049365971237421036,0.12016955763101578]|
|ml |[-0.14006119966506958,0.01626444421708584,0.042281970381736755] |
|spark |[0.1589149385690689,-0.10970081388950348,-0.10547549277544022] |
|think |[0.030011219903826714,-0.08994936943054199,0.16471518576145172] |
|love |[0.01036644633859396,-0.017782460898160934,0.08870164304971695] |
|python |[-0.11402882635593414,0.045119188725948334,-0.029877422377467155]|
+-------+-----------------------------------------------------------------+
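from pyspark.sql.functions import format_number as fmt
# Note: "could" is not in the vocabulary printed above, so this query and its output
# appear to come from a model fitted on other sentences (e.g. the Tokenizer example
# in 8.2.1); findSynonyms is expected to fail for out-of-vocabulary words.
w2v.findSynonyms("could", 2).select("word", fmt("similarity", 5).alias("similarity")).show()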
+-------+----------+
| word|similarity|
+-------+----------+
|classes| 0.90232|
| i| 0.75424|
+-------+----------+
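The similarity column reported by findSynonyms is a cosine similarity between word vectors. As a minimal sketch of that measure, the plain-Python code below ranks two words against spark, using vectors copied (and rounded to four decimals) from the getVectors() output above. The `cosine_similarity` helper is written for this example and is not part of the PySpark API.

```python
import math

def cosine_similarity(u, v):
    """Cosine of the angle between two vectors: dot(u, v) / (|u| * |v|)."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

# Rounded copies of three rows from the getVectors() table.
vectors = {
    "spark":  [0.1589, -0.1097, -0.1055],
    "python": [-0.1140, 0.0451, -0.0299],
    "love":   [0.0104, -0.0178, 0.0887],
}

# Rank the other words by similarity to "spark", most similar first.
query = "spark"
ranked = sorted(
    ((word, cosine_similarity(vectors[query], vec))
     for word, vec in vectors.items() if word != query),
    key=lambda pair: pair[1],
    reverse=True,
)
print(ranked)
```

With only three tiny sentences and three dimensions, the resulting similarities carry little meaning; the point is only how the ranking is computed.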
8.1.3. FeatureHasher¶
from pyspark.ml.feature import FeatureHasher
dataset = spark.createDataFrame([
(2.2, True, "1", "foo"),
(3.3, False, "2", "bar"),
(4.4, False, "3", "baz"),
(5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])
hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
outputCol="features")
featurized = hasher.transform(dataset)
featurized.show(truncate=False)
+----+-----+---------+------+--------------------------------------------------------+
|real|bool |stringNum|string|features |
+----+-----+---------+------+--------------------------------------------------------+
|2.2 |true |1 |foo |(262144,[174475,247670,257907,262126],[2.2,1.0,1.0,1.0])|
|3.3 |false|2 |bar |(262144,[70644,89673,173866,174475],[1.0,1.0,1.0,3.3]) |
|4.4 |false|3 |baz |(262144,[22406,70644,174475,187923],[1.0,1.0,4.4,1.0]) |
|5.5 |false|4 |foo |(262144,[70644,101499,174475,257907],[1.0,1.0,5.5,1.0]) |
+----+-----+---------+------+--------------------------------------------------------+
8.1.4. RFormula¶
from pyspark.ml.feature import RFormula
dataset = spark.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "CA", 15, 0.0)],
["id", "country", "hour", "clicked"])
formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
+----------+-----+
| features|label|
+----------+-----+
|[0.0,18.0]| 1.0|
|[1.0,12.0]| 0.0|
|[1.0,15.0]| 0.0|
+----------+-----+
8.2. Feature Transform¶
8.2.1. Tokenizer¶
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps=False
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
+-----------------------------------+------------------------------------------+------+
|sentence |words |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark |[hi, i, heard, about, spark] |5 |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7 |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat] |1 |
+-----------------------------------+------------------------------------------+------+
+-----------------------------------+------------------------------------------+------+
|sentence |words |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark |[hi, i, heard, about, spark] |5 |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7 |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5 |
+-----------------------------------+------------------------------------------+------+
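The two tables differ only in the last row, and the reason is the split rule. The following plain-Python sketch mimics both behaviours with the standard `re` module; it assumes lower-casing plus whitespace splitting for Tokenizer and splitting on the `\W` pattern (dropping empty tokens) for RegexTokenizer. The function names are hypothetical.

```python
import re

def simple_tokenize(sentence):
    """Lower-case and split on whitespace (roughly what Tokenizer does)."""
    return sentence.lower().split()

def regex_tokenize(sentence, pattern=r"\W"):
    """Lower-case and split on every match of `pattern`, dropping empty tokens."""
    return [tok for tok in re.split(pattern, sentence.lower()) if tok]

s = "Logistic,regression,models,are,neat"
print(simple_tokenize(s), len(simple_tokenize(s)))
print(regex_tokenize(s), len(regex_tokenize(s)))
```

Because the sentence contains commas but no spaces, whitespace splitting returns it as a single token, while splitting on non-word characters yields five.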
8.2.2. StopWordsRemover¶
from pyspark.ml.feature import StopWordsRemover
sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="removed")
remover.transform(sentenceData).show(truncate=False)
+---+----------------------------+--------------------+
|id |raw |removed |
+---+----------------------------+--------------------+
|0 |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1 |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+
8.2.3. NGram¶
from pyspark.ml import Pipeline
from pyspark.ml.feature import NGram, Tokenizer
sentenceData = spark.createDataFrame([
(0.0, "I love Spark"),
(0.0, "I love python"),
(1.0, "I think ML is awesome")],
["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
pipeline = Pipeline(stages=[tokenizer, ngram])
model = pipeline.fit(sentenceData)
model.transform(sentenceData).show(truncate=False)
+-----+---------------------+---------------------------+--------------------------------------+
|label|sentence |words |ngrams |
+-----+---------------------+---------------------------+--------------------------------------+
|0.0 |I love Spark |[i, love, spark] |[i love, love spark] |
|0.0 |I love python |[i, love, python] |[i love, love python] |
|1.0 |I think ML is awesome|[i, think, ml, is, awesome]|[i think, think ml, ml is, is awesome]|
+-----+---------------------+---------------------------+--------------------------------------+
8.2.4. Binarizer¶
from pyspark.ml.feature import Binarizer
continuousDataFrame = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2),
(3,0.5)
], ["id", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
Binarizer output with Threshold = 0.500000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
| 0| 0.1| 0.0|
| 1| 0.8| 1.0|
| 2| 0.2| 0.0|
| 3| 0.5| 0.0|
+---+-------+-----------------+
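Note the last row of the table: a value equal to the threshold maps to 0.0. The rule can be sketched in one line of plain Python (the `binarize` helper is written for this illustration, not taken from Spark):

```python
def binarize(values, threshold):
    """Values strictly greater than `threshold` become 1.0, all others 0.0."""
    return [1.0 if v > threshold else 0.0 for v in values]

features = [0.1, 0.8, 0.2, 0.5]
print(binarize(features, threshold=0.5))
```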
8.2.5. Bucketizer¶
[Bucketizer](https://spark.apache.org/docs/latest/ml-features.html#bucketizer) transforms a column of continuous features to a column of feature buckets, where the buckets are specified by users.
from pyspark.ml.feature import QuantileDiscretizer, Bucketizer
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.0)]
df = spark.createDataFrame(data, ["id", "age"])
df.show()
splits = [-float("inf"), 3, 10, float("inf")]
result_bucketizer = Bucketizer(splits=splits, inputCol="age", outputCol="result").transform(df)
result_bucketizer.show()
+---+----+
| id| age|
+---+----+
| 0|18.0|
| 1|19.0|
| 2| 8.0|
| 3| 5.0|
| 4| 2.0|
+---+----+
+---+----+------+
| id| age|result|
+---+----+------+
| 0|18.0| 2.0|
| 1|19.0| 2.0|
| 2| 8.0| 1.0|
| 3| 5.0| 1.0|
| 4| 2.0| 0.0|
+---+----+------+
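The bucket assignment is a lookup in the sorted list of splits. Here is a plain-Python sketch using `bisect`, assuming the convention from the Spark documentation that each bucket covers [x, y) and only the last bucket also includes its upper bound. The `bucketize` function is a hypothetical helper, not the Spark implementation.

```python
from bisect import bisect_right

def bucketize(value, splits):
    """Return the index of the bucket [splits[i], splits[i+1]) containing `value`."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError("value outside the range covered by splits")
    if value == splits[-1]:
        # The last bucket is closed on the right.
        return float(len(splits) - 2)
    return float(bisect_right(splits, value) - 1)

splits = [-float("inf"), 3, 10, float("inf")]
ages = [18.0, 19.0, 8.0, 5.0, 2.0]
print([bucketize(age, splits) for age in ages])
```

A value sitting exactly on a split point, such as 3, falls into the bucket that starts there (bucket 1 in this example).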
8.2.6. QuantileDiscretizer¶
QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The number of bins is set by the numBuckets parameter. It is possible that the number of buckets used will be smaller than this value, for example, if there are too few distinct values of the input to create enough distinct quantiles.
from pyspark.ml.feature import QuantileDiscretizer, Bucketizer
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.0)]
df = spark.createDataFrame(data, ["id", "age"])
df.show()
qds = QuantileDiscretizer(numBuckets=5, inputCol="age", outputCol="buckets",
    relativeError=0.01, handleInvalid="error")
bucketizer = qds.fit(df)
bucketizer.transform(df).show()
bucketizer.setHandleInvalid("skip").transform(df).show()
+---+----+
| id| age|
+---+----+
| 0|18.0|
| 1|19.0|
| 2| 8.0|
| 3| 5.0|
| 4| 2.0|
+---+----+
+---+----+-------+
| id| age|buckets|
+---+----+-------+
| 0|18.0| 3.0|
| 1|19.0| 3.0|
| 2| 8.0| 2.0|
| 3| 5.0| 2.0|
| 4| 2.0| 1.0|
+---+----+-------+
+---+----+-------+
| id| age|buckets|
+---+----+-------+
| 0|18.0| 3.0|
| 1|19.0| 3.0|
| 2| 8.0| 2.0|
| 3| 5.0| 2.0|
| 4| 2.0| 1.0|
+---+----+-------+
If the data has NULL values, then you will get the following results:
from pyspark.ml.feature import QuantileDiscretizer, Bucketizer
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, None)]
df = spark.createDataFrame(data, ["id", "age"])
df.show()
splits = [-float("inf"), 3, 10, float("inf")]
result_bucketizer = Bucketizer(splits=splits,
    inputCol="age", outputCol="result").transform(df)
result_bucketizer.show()
qds = QuantileDiscretizer(numBuckets=5, inputCol="age", outputCol="buckets",
    relativeError=0.01, handleInvalid="error")
bucketizer = qds.fit(df)
bucketizer.transform(df).show()
bucketizer.setHandleInvalid("skip").transform(df).show()
+---+----+
| id| age|
+---+----+
| 0|18.0|
| 1|19.0|
| 2| 8.0|
| 3| 5.0|
| 4|null|
+---+----+
+---+----+------+
| id| age|result|
+---+----+------+
| 0|18.0| 2.0|
| 1|19.0| 2.0|
| 2| 8.0| 1.0|
| 3| 5.0| 1.0|
| 4|null| null|
+---+----+------+
+---+----+-------+
| id| age|buckets|
+---+----+-------+
| 0|18.0| 3.0|
| 1|19.0| 4.0|
| 2| 8.0| 2.0|
| 3| 5.0| 1.0|
| 4|null| null|
+---+----+-------+
+---+----+-------+
| id| age|buckets|
+---+----+-------+
| 0|18.0| 3.0|
| 1|19.0| 4.0|
| 2| 8.0| 2.0|
| 3| 5.0| 1.0|
+---+----+-------+
8.2.7. StringIndexer¶
from pyspark.ml.feature import StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
| 0| a| 0.0|
| 1| b| 2.0|
| 2| c| 1.0|
| 3| a| 0.0|
| 4| a| 0.0|
| 5| c| 1.0|
+---+--------+-------------+
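In the table above, a gets index 0.0 because it is the most frequent label, followed by c and then b. A minimal plain-Python sketch of this frequency-descending ordering is shown below; the alphabetical tie-breaking and the function name `fit_string_indexer` are assumptions made for this illustration.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index: most frequent label first.

    Ties are broken alphabetically (an assumption for this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

categories = ["a", "b", "c", "a", "a", "c"]
mapping = fit_string_indexer(categories)
print(mapping)
print([mapping[c] for c in categories])

# The inverse lookup (index back to label) only needs the ordered label list.
labels = sorted(mapping, key=mapping.get)
print(labels)
```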
8.2.8. labelConverter¶
from pyspark.ml.feature import IndexToString, StringIndexer
df = spark.createDataFrame(
[(0, "Yes"), (1, "Yes"), (2, "Yes"), (3, "No"), (4, "No"), (5, "No")],
["id", "label"])
indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
model = indexer.fit(df)
indexed = model.transform(df)
print("Transformed string column '%s' to indexed column '%s'"
% (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()
print("StringIndexer will store labels in output column metadata\n")
converter = IndexToString(inputCol="labelIndex", outputCol="originalLabel")
converted = converter.transform(indexed)
print("Transformed indexed column '%s' back to original string column '%s' using "
"labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "labelIndex", "originalLabel").show()
Transformed string column 'label' to indexed column 'labelIndex'
+---+-----+----------+
| id|label|labelIndex|
+---+-----+----------+
| 0| Yes| 1.0|
| 1| Yes| 1.0|
| 2| Yes| 1.0|
| 3| No| 0.0|
| 4| No| 0.0|
| 5| No| 0.0|
+---+-----+----------+
StringIndexer will store labels in output column metadata
Transformed indexed column 'labelIndex' back to original string column 'originalLabel' using labels in metadata
+---+----------+-------------+
| id|labelIndex|originalLabel|
+---+----------+-------------+
| 0| 1.0| Yes|
| 1| 1.0| Yes|
| 2| 1.0| Yes|
| 3| 0.0| No|
| 4| 0.0| No|
| 5| 0.0| No|
+---+----------+-------------+
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer
df = spark.createDataFrame(
[(0, "Yes"), (1, "Yes"), (2, "Yes"), (3, "No"), (4, "No"), (5, "No")],
["id", "label"])
indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
converter = IndexToString(inputCol="labelIndex", outputCol="originalLabel")
pipeline = Pipeline(stages=[indexer, converter])
model = pipeline.fit(df)
result = model.transform(df)
result.show()
+---+-----+----------+-------------+
| id|label|labelIndex|originalLabel|
+---+-----+----------+-------------+
| 0| Yes| 1.0| Yes|
| 1| Yes| 1.0| Yes|
| 2| Yes| 1.0| Yes|
| 3| No| 0.0| No|
| 4| No| 0.0| No|
| 5| No| 0.0| No|
+---+-----+----------+-------------+
8.2.9. VectorIndexer¶
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import RFormula
df = spark.createDataFrame([
(0, 2.2, True, "1", "foo", 'CA'),
(1, 3.3, False, "2", "bar", 'US'),
(0, 4.4, False, "3", "baz", 'CHN'),
(1, 5.5, False, "4", "foo", 'AUS')
], ['label',"real", "bool", "stringNum", "string","country"])
formula = RFormula(
formula="label ~ real + bool + stringNum + string + country",
featuresCol="features",
labelCol="label")
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 2 distinct values
# are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", \
outputCol="indexedFeatures",\
maxCategories=2)
pipeline = Pipeline(stages=[formula, featureIndexer])
model = pipeline.fit(df)
result = model.transform(df)
result.show()
+-----+----+-----+---------+------+-------+--------------------+--------------------+
|label|real| bool|stringNum|string|country| features| indexedFeatures|
+-----+----+-----+---------+------+-------+--------------------+--------------------+
| 0| 2.2| true| 1| foo| CA|(10,[0,1,5,7],[2....|(10,[0,1,5,7],[2....|
| 1| 3.3|false| 2| bar| US|(10,[0,3,8],[3.3,...|(10,[0,3,8],[3.3,...|
| 0| 4.4|false| 3| baz| CHN|(10,[0,4,6,9],[4....|(10,[0,4,6,9],[4....|
| 1| 5.5|false| 4| foo| AUS|(10,[0,2,5],[5.5,...|(10,[0,2,5],[5.5,...|
+-----+----+-----+---------+------+-------+--------------------+--------------------+
8.2.10. VectorAssembler¶
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0 |
+-----------------------+-------+
8.2.11. OneHotEncoder¶
This is a note I wrote for one of my readers to explain the OneHotEncoder. I would like to share it here:
8.2.11.1. Import and creating SparkSession¶
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])
df.show()
+---+--------+
| id|category|
+---+--------+
| 0| a|
| 1| b|
| 2| c|
| 3| a|
| 4| a|
| 5| c|
+---+--------+
8.2.11.2. OneHotEncoder¶
8.2.11.2.1. Encoder¶
from pyspark.ml.feature import OneHotEncoder, StringIndexer
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
# default setting: dropLast=True
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec",dropLast=False)
encoded = encoder.transform(indexed)
encoded.show()
+---+--------+-------------+-------------+
| id|category|categoryIndex| categoryVec|
+---+--------+-------------+-------------+
| 0| a| 0.0|(3,[0],[1.0])|
| 1| b| 2.0|(3,[2],[1.0])|
| 2| c| 1.0|(3,[1],[1.0])|
| 3| a| 0.0|(3,[0],[1.0])|
| 4| a| 0.0|(3,[0],[1.0])|
| 5| c| 1.0|(3,[1],[1.0])|
+---+--------+-------------+-------------+
Note
The default setting of OneHotEncoder is: dropLast=True
# default setting: dropLast=True
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()
+---+--------+-------------+-------------+
| id|category|categoryIndex| categoryVec|
+---+--------+-------------+-------------+
| 0| a| 0.0|(2,[0],[1.0])|
| 1| b| 2.0| (2,[],[])|
| 2| c| 1.0|(2,[1],[1.0])|
| 3| a| 0.0|(2,[0],[1.0])|
| 4| a| 0.0|(2,[0],[1.0])|
| 5| c| 1.0|(2,[1],[1.0])|
+---+--------+-------------+-------------+
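The effect of dropLast is easy to reproduce by hand. The sketch below is plain Python with dense lists instead of Spark's sparse vectors, and the `one_hot` helper is hypothetical: with dropLast the vector has one slot fewer, and the last category is encoded as all zeros.

```python
def one_hot(index, num_categories, drop_last=True):
    """One-hot encode a category index; optionally drop the last category."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0   # the dropped (last) category stays all zeros
    return vector

for index in (0, 1, 2):
    print(index, one_hot(index, 3, drop_last=False), one_hot(index, 3, drop_last=True))
```

Dropping the last category avoids a redundant column (the encoded columns would otherwise always sum to one), which matters for models such as linear regression.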
8.2.11.2.2. Vector Assembler¶
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
categoricalCols = ['category']
indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
for c in categoricalCols ]
# default setting: dropLast=True
encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),
outputCol="{0}_encoded".format(indexer.getOutputCol()),dropLast=False)
for indexer in indexers ]
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
, outputCol="features")
pipeline = Pipeline(stages=indexers + encoders + [assembler])
model=pipeline.fit(df)
data = model.transform(df)
data.show()
+---+--------+----------------+------------------------+-------------+
| id|category|category_indexed|category_indexed_encoded| features|
+---+--------+----------------+------------------------+-------------+
| 0| a| 0.0| (3,[0],[1.0])|[1.0,0.0,0.0]|
| 1| b| 2.0| (3,[2],[1.0])|[0.0,0.0,1.0]|
| 2| c| 1.0| (3,[1],[1.0])|[0.0,1.0,0.0]|
| 3| a| 0.0| (3,[0],[1.0])|[1.0,0.0,0.0]|
| 4| a| 0.0| (3,[0],[1.0])|[1.0,0.0,0.0]|
| 5| c| 1.0| (3,[1],[1.0])|[0.0,1.0,0.0]|
+---+--------+----------------+------------------------+-------------+
8.2.11.3. Application: Get Dummy Variable¶
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol,dropLast=False):
    '''
    Get dummy variables and concat with continuous variables for ml modeling.
    :param df: the dataframe
    :param categoricalCols: the name list of the categorical data
    :param continuousCols: the name list of the numerical data
    :param labelCol: the name of label column
    :param dropLast: the flag of drop last column
    :return: feature matrix
    :author: Wenqiang Feng
    :email: von198@gmail.com
    >>> df = spark.createDataFrame([
                  (0, "a"),
                  (1, "b"),
                  (2, "c"),
                  (3, "a"),
                  (4, "a"),
                  (5, "c")
              ], ["id", "category"])
    >>> indexCol = 'id'
    >>> categoricalCols = ['category']
    >>> continuousCols = []
    >>> labelCol = []
    >>> mat = get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol)
    >>> mat.show()
    >>>
        +---+-------------+
        | id| features|
        +---+-------------+
        | 0|[1.0,0.0,0.0]|
        | 1|[0.0,0.0,1.0]|
        | 2|[0.0,1.0,0.0]|
        | 3|[1.0,0.0,0.0]|
        | 4|[1.0,0.0,0.0]|
        | 5|[0.0,1.0,0.0]|
        +---+-------------+
    '''
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col
    indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                 for c in categoricalCols ]
    # default setting: dropLast=True
    encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),
                 outputCol="{0}_encoded".format(indexer.getOutputCol()),dropLast=dropLast)
                 for indexer in indexers ]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    if indexCol and labelCol:
        # for supervised learning
        data = data.withColumn('label',col(labelCol))
        return data.select(indexCol,'features','label')
    elif not indexCol and labelCol:
        # for supervised learning
        data = data.withColumn('label',col(labelCol))
        return data.select('features','label')
    elif indexCol and not labelCol:
        # for unsupervised learning
        return data.select(indexCol,'features')
    elif not indexCol and not labelCol:
        # for unsupervised learning
        return data.select('features')
8.2.11.3.1. Unsupervised scenario¶
df = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])
df.show()
indexCol = 'id'
categoricalCols = ['category']
continuousCols = []
labelCol = []
mat = get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol)
mat.show()
+---+-------------+
| id| features|
+---+-------------+
| 0|[1.0,0.0,0.0]|
| 1|[0.0,0.0,1.0]|
| 2|[0.0,1.0,0.0]|
| 3|[1.0,0.0,0.0]|
| 4|[1.0,0.0,0.0]|
| 5|[0.0,1.0,0.0]|
+---+-------------+
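As a rough cross-check of the table above, the following plain-Python sketch rebuilds the same dummy vectors without Spark. It assumes the default StringIndexer ordering (the most frequent label gets index 0); the helper name one_hot_by_frequency and the alphabetical tie-break are illustrative assumptions, not part of the original code.

```python
from collections import Counter

def one_hot_by_frequency(values):
    """Index labels by descending frequency, then one-hot encode each value."""
    counts = Counter(values)
    # Most frequent label first; ties are broken alphabetically (an assumption).
    labels = sorted(counts, key=lambda v: (-counts[v], v))
    index = {label: i for i, label in enumerate(labels)}
    size = len(labels)
    return [[1.0 if index[v] == i else 0.0 for i in range(size)] for v in values]

rows = one_hot_by_frequency(["a", "b", "c", "a", "a", "c"])
for row_id, row in enumerate(rows):
    print(row_id, row)
```

Here "a" appears three times, "c" twice and "b" once, so they are indexed 0, 1 and 2, which matches the position of the 1.0 in each row of the Spark output.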
8.2.11.3.2. Supervised scenario¶
df = spark.read.csv(path='bank.csv',
sep=',',encoding='UTF-8',comment=None,
header=True,inferSchema=True)
indexCol = []
catCols = ['job','marital','education','default',
'housing','loan','contact','poutcome']
contCols = ['balance', 'duration','campaign','pdays','previous']
labelCol = 'y'
data = get_dummy(df,indexCol,catCols,contCols,labelCol,dropLast=False)
data.show(5)
+--------------------+-----+
| features|label|
+--------------------+-----+
|(37,[8,12,17,19,2...| no|
|(37,[4,12,15,19,2...| no|
|(37,[0,13,16,19,2...| no|
|(37,[0,12,16,19,2...| no|
|(37,[1,12,15,19,2...| no|
+--------------------+-----+
only showing top 5 rows
The Jupyter Notebook can be found on Colab: OneHotEncoder.
8.2.12. Scaler¶
from pyspark.ml.feature import Normalizer, StandardScaler, MinMaxScaler, MaxAbsScaler
scaler_type = 'Normal'
if scaler_type=='Normal':
    scaler = Normalizer(inputCol="features", outputCol="scaledFeatures", p=1.0)
elif scaler_type=='Standard':
    scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                            withStd=True, withMean=False)
elif scaler_type=='MinMaxScaler':
    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
elif scaler_type=='MaxAbsScaler':
    scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
df.show()
pipeline = Pipeline(stages=[scaler])
model =pipeline.fit(df)
data = model.transform(df)
data.show()
+---+--------------+
| id| features|
+---+--------------+
| 0|[1.0,0.5,-1.0]|
| 1| [2.0,1.0,1.0]|
| 2|[4.0,10.0,2.0]|
+---+--------------+
+---+--------------+------------------+
| id| features| scaledFeatures|
+---+--------------+------------------+
| 0|[1.0,0.5,-1.0]| [0.4,0.2,-0.4]|
| 1| [2.0,1.0,1.0]| [0.5,0.25,0.25]|
| 2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+
8.2.12.1. Normalizer¶
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
Normalized using L^1 norm
+---+--------------+------------------+
| id| features| normFeatures|
+---+--------------+------------------+
| 0|[1.0,0.5,-1.0]| [0.4,0.2,-0.4]|
| 1| [2.0,1.0,1.0]| [0.5,0.25,0.25]|
| 2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+
Normalized using L^inf norm
+---+--------------+--------------+
| id| features| normFeatures|
+---+--------------+--------------+
| 0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
| 1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
| 2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+
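To see where these numbers come from, here is a minimal plain-Python sketch of p-norm normalization applied to the same three rows. The function name normalize is hypothetical, and leaving an all-zero vector unchanged is an assumption of this sketch.

```python
def normalize(vec, p=1.0):
    """Divide a vector by its p-norm; p may be float('inf')."""
    if p == float("inf"):
        norm = max(abs(x) for x in vec)
    else:
        norm = sum(abs(x) ** p for x in vec) ** (1.0 / p)
    # Leave an all-zero vector unchanged to avoid dividing by zero.
    return list(vec) if norm == 0 else [x / norm for x in vec]

rows = [[1.0, 0.5, -1.0], [2.0, 1.0, 1.0], [4.0, 10.0, 2.0]]
l1 = [normalize(r, 1.0) for r in rows]
linf = [normalize(r, float("inf")) for r in rows]
for a, b in zip(l1, linf):
    print(a, b)
```

For the first row the L^1 norm is 1.0 + 0.5 + 1.0 = 2.5, which gives [0.4, 0.2, -0.4]; its L^inf norm is 1.0, so the row is unchanged.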
8.2.12.2. StandardScaler¶
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)
scaledData = scaler.fit(dataFrame).transform(dataFrame)
scaledData.show(truncate=False)
+---+--------------+------------------------------------------------------------+
|id |features |scaledFeatures |
+---+--------------+------------------------------------------------------------+
|0 |[1.0,0.5,-1.0]|[0.6546536707079772,0.09352195295828244,-0.6546536707079771]|
|1 |[2.0,1.0,1.0] |[1.3093073414159544,0.1870439059165649,0.6546536707079771] |
|2 |[4.0,10.0,2.0]|[2.618614682831909,1.870439059165649,1.3093073414159542] |
+---+--------------+------------------------------------------------------------+
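The scaled values above can be reproduced by hand. This sketch assumes that, with withStd=True and withMean=False, each column is divided by its sample standard deviation (n - 1 in the denominator) without centering; that assumption is consistent with the numbers in the table.

```python
from statistics import stdev

rows = [[1.0, 0.5, -1.0], [2.0, 1.0, 1.0], [4.0, 10.0, 2.0]]
columns = list(zip(*rows))
# Sample standard deviation (n - 1 denominator) of each column.
stds = [stdev(col) for col in columns]
# Scale to unit standard deviation; no mean is subtracted.
scaled = [[x / s for x, s in zip(row, stds)] for row in rows]
for row in scaled:
    print(row)
```

For the first column, the standard deviation of [1.0, 2.0, 4.0] is about 1.5275, so 1.0 becomes about 0.6547.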
8.2.12.3. MinMaxScaler¶
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scaledData = scaler.fit(dataFrame).transform(dataFrame)
scaledData.show(truncate=False)
+---+--------------+-----------------------------------------------------------+
|id |features |scaledFeatures |
+---+--------------+-----------------------------------------------------------+
|0 |[1.0,0.5,-1.0]|[0.0,0.0,0.0] |
|1 |[2.0,1.0,1.0] |[0.3333333333333333,0.05263157894736842,0.6666666666666666]|
|2 |[4.0,10.0,2.0]|[1.0,1.0,1.0] |
+---+--------------+-----------------------------------------------------------+
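A small plain-Python sketch of the same rescaling, column by column, may help to read the table. The function name min_max_scale and its lo/hi arguments are hypothetical stand-ins for the scaler's target range (0 to 1 by default); mapping a constant column to the midpoint of the range follows the convention described in the Spark documentation.

```python
def min_max_scale(rows, lo=0.0, hi=1.0):
    """Rescale each column linearly so its minimum maps to lo and its maximum to hi."""
    columns = list(zip(*rows))
    mins = [min(c) for c in columns]
    maxs = [max(c) for c in columns]
    scaled = []
    for row in rows:
        out = []
        for x, mn, mx in zip(row, mins, maxs):
            if mx == mn:
                # A constant column carries no spread; use the midpoint of the range.
                out.append(0.5 * (lo + hi))
            else:
                out.append((x - mn) / (mx - mn) * (hi - lo) + lo)
        scaled.append(out)
    return scaled

rows = [[1.0, 0.5, -1.0], [2.0, 1.0, 1.0], [4.0, 10.0, 2.0]]
scaled = min_max_scale(rows)
for row in scaled:
    print(row)
```

For example, the middle value of the second column is (1.0 - 0.5) / (10.0 - 0.5), which is about 0.0526.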
8.2.12.4. MaxAbsScaler¶
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
scaledData = scaler.fit(dataFrame).transform(dataFrame)
scaledData.show(truncate=False)
+---+--------------+----------------+
|id |features |scaledFeatures |
+---+--------------+----------------+
|0 |[1.0,0.5,-1.0]|[0.25,0.05,-0.5]|
|1 |[2.0,1.0,1.0] |[0.5,0.1,0.5] |
|2 |[4.0,10.0,2.0]|[1.0,1.0,1.0] |
+---+--------------+----------------+
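The same result can be checked with a few lines of plain Python: each column is divided by its largest absolute value, so the values land in [-1, 1] and signs are preserved. Leaving an all-zero column untouched is an assumption of this sketch.

```python
rows = [[1.0, 0.5, -1.0], [2.0, 1.0, 1.0], [4.0, 10.0, 2.0]]
# Largest absolute value in each column.
max_abs = [max(abs(x) for x in col) for col in zip(*rows)]
# Divide by the column maximum; an all-zero column is left as it is.
scaled = [[x / m if m else x for x, m in zip(row, max_abs)] for row in rows]
for row in scaled:
    print(row)
```

The column maxima are 4.0, 10.0 and 2.0, which explains the first output row [0.25, 0.05, -0.5].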
8.2.13. PCA¶
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
+-----------------------------------------------------------+
|pcaFeatures |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+
8.2.14. DCT¶
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
(Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
(Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])
dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")
dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(truncate=False)
+----------------------------------------------------------------+
|featuresDCT |
+----------------------------------------------------------------+
|[1.0,-1.1480502970952693,2.0000000000000004,-2.7716385975338604]|
|[-1.0,3.378492794482933,-7.000000000000001,2.9301512653149677] |
|[4.0,9.304453421915744,11.000000000000002,1.5579302036357163] |
+----------------------------------------------------------------+
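The output above is consistent with an orthonormally scaled DCT-II. The following sketch computes that transform directly from its definition; the function name dct2_orthonormal is hypothetical, and the scaling is an assumption that matches the printed values.

```python
import math

def dct2_orthonormal(x):
    """DCT-II of a sequence, scaled so that the transform matrix is orthonormal."""
    n = len(x)
    out = []
    for k in range(n):
        s = sum(x[i] * math.cos(math.pi * (i + 0.5) * k / n) for i in range(n))
        # The k = 0 term uses a different scale factor than the others.
        scale = math.sqrt(1.0 / n) if k == 0 else math.sqrt(2.0 / n)
        out.append(scale * s)
    return out

vectors = [[0.0, 1.0, -2.0, 3.0], [-1.0, 2.0, 4.0, -7.0], [14.0, -2.0, -5.0, 1.0]]
transformed = [dct2_orthonormal(v) for v in vectors]
for row in transformed:
    print(row)
```

The first coefficient is simply the sum of the entries divided by sqrt(n): for the first vector that is 2.0 / 2.0 = 1.0.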
8.3. Feature Selection¶
8.3.1. LASSO¶
LASSO is used for variable selection and for the removal of correlated variables. The Ridge method shrinks the coefficients of correlated variables, while the LASSO method tends to pick one of them and discard the others. The elastic net penalty is a mixture of these two: if variables are correlated in groups, it tends to select or drop each group as a whole. The mixing parameter α controls the blend. If α is close to 1, the elastic net performs much like the LASSO method and removes any degeneracies and wild behavior caused by extreme correlations.
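A short sketch of the penalty itself may make the role of α concrete. It assumes the common parameterization λ(α‖w‖₁ + (1 − α)/2 ‖w‖₂²); the function name elastic_net_penalty and its arguments are hypothetical.

```python
def elastic_net_penalty(weights, reg_param, alpha):
    """Elastic net penalty: alpha = 1 gives the LASSO (L1) term, alpha = 0 the Ridge (L2) term."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (alpha * l1 + (1.0 - alpha) * 0.5 * l2_squared)

weights = [1.0, -2.0, 0.0]
lasso_like = elastic_net_penalty(weights, reg_param=0.1, alpha=1.0)
ridge_like = elastic_net_penalty(weights, reg_param=0.1, alpha=0.0)
mixed = elastic_net_penalty(weights, reg_param=0.1, alpha=0.5)
print(lasso_like, ridge_like, mixed)
```

Because the L1 term grows linearly even for small coefficients, it can push some of them exactly to zero, which is what makes the LASSO end of the range useful for feature selection.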
8.3.2. RandomForest¶
AutoFeatures library based on RandomForest is coming soon.
8.4. Unbalanced data: Undersampling¶
Since we use PySpark to deal with big data, undersampling is a useful way to handle unbalanced classification data. Undersampling is a popular technique for unbalanced datasets to reduce the skew in class distributions. However, it is well known that undersampling one class modifies the priors of the training set and consequently biases the posterior probabilities of a classifier. After you apply undersampling, you need to recalibrate the predicted probabilities; see Calibrating Probability with Undersampling for Unbalanced Classification.
df = spark.createDataFrame([
(0, "Yes"),
(1, "Yes"),
(2, "Yes"),
(3, "Yes"),
(4, "No"),
(5, "No")
], ["id", "label"])
df.show()
+---+-----+
| id|label|
+---+-----+
| 0| Yes|
| 1| Yes|
| 2| Yes|
| 3| Yes|
| 4| No|
| 5| No|
+---+-----+
8.4.1. Calculate undersampling Ratio¶
import math
def round_up(n, decimals=0):
    multiplier = 10 ** decimals
    return math.ceil(n * multiplier) / multiplier
# drop missing value rows
df = df.dropna()
# under-sampling majority set
label_Y = df.filter(df.label=='Yes')
label_N = df.filter(df.label=='No')
sampleRatio = round_up(label_N.count() / df.count(),2)
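To make the arithmetic concrete, the sketch below repeats the ratio calculation in plain Python with the class counts of the toy DataFrame (4 "Yes", 2 "No") hard-coded. The variable names n_yes, n_no and expected_yes_kept are illustrative only.

```python
import math

def round_up(n, decimals=0):
    """Round n up to the given number of decimal places."""
    multiplier = 10 ** decimals
    return math.ceil(n * multiplier) / multiplier

n_yes, n_no = 4, 2  # class counts from the toy DataFrame above
# Same formula as above: minority count divided by the total count, rounded up.
sample_ratio = round_up(n_no / (n_yes + n_no), 2)
# Average number of majority rows that survive sampling with this ratio.
expected_yes_kept = n_yes * sample_ratio
print(sample_ratio, expected_yes_kept)
```

With this ratio about 1.36 of the 4 "Yes" rows are kept on average. If the goal were an expected 1:1 balance, one option would be n_no / n_yes instead; that is an alternative, not what the code above does.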
8.4.2. Undersampling¶
label_Y_sample = label_Y.sample(False, sampleRatio)
# union the minority set with the under-sampled majority set
data = label_N.unionAll(label_Y_sample)
data.show()
+---+-----+
| id|label|
+---+-----+
| 4| No|
| 5| No|
| 1| Yes|
| 2| Yes|
+---+-----+
8.4.3. Recalibrating Probability¶
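Because undersampling one class modifies the priors of the training set, the probabilities predicted by a classifier trained on the undersampled data are biased and should be adjusted; see Calibrating Probability with Undersampling for Unbalanced Classification. The code below applies the adjustment β·p_s / ((β − 1)·p_s + 1), where p_s is the probability predicted on the undersampled data and β is the sampleRatio computed above. It assumes that prediction is the DataFrame of model predictions and that its probability column is numeric; if it is a vector column, extract the class probability first.
from pyspark.sql import functions as F
prediction = prediction.withColumn('adj_probability',sampleRatio*F.col('probability')/((sampleRatio-1)*F.col('probability')+1))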
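The adjustment is easy to try out on plain numbers. This sketch applies the same formula to a few probabilities; the function name recalibrate and the sample values are hypothetical.

```python
def recalibrate(p_s, beta):
    """Adjust a probability predicted on undersampled data; beta is the sampling ratio."""
    return beta * p_s / ((beta - 1.0) * p_s + 1.0)

beta = 0.34  # the sampleRatio of the toy example above
for p_s in (0.0, 0.25, 0.5, 0.75, 1.0):
    print(p_s, round(recalibrate(p_s, beta), 4))
```

The endpoints 0 and 1 are left unchanged, values in between are pulled down when beta is below 1, and beta = 1 (no undersampling) leaves every probability as it is.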