13. RFM Analysis

_images/rfm_business.png

The above figure source: Blast Analytics Marketing

RFM is a method used for analyzing customer value. It is commonly used in database marketing and direct marketing and has received particular attention in retail and professional services industries. More details can be found at Wikipedia RFM_wikipedia.

RFM stands for the three dimensions:

  • Recency – How recently did the customer purchase? i.e. Duration since last purchase

  • Frequency – How often do they purchase? i.e. Total number of purchases

  • Monetary Value – How much do they spend? i.e. Total money this customer spent

13.1. RFM Analysis Methodology

RFM Analysis contains three main steps:

13.1.1. 1. Build the RFM features matrix for each customer

+----------+-------+---------+---------+
|CustomerID|Recency|Frequency| Monetary|
+----------+-------+---------+---------+
|     14911|      1|      248|132572.62|
|     12748|      0|      224|  29072.1|
|     17841|      1|      169| 40340.78|
|     14606|      1|      128| 11713.85|
|     15311|      0|      118| 59419.34|
+----------+-------+---------+---------+
only showing top 5 rows

13.1.2. 2. Determine cutting points for each feature

+----------+-------+---------+--------+-----+-----+-----+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|
+----------+-------+---------+--------+-----+-----+-----+
|     17420|     50|        3|  598.83|    2|    3|    2|
|     16861|     59|        3|  151.65|    3|    3|    1|
|     16503|    106|        5| 1421.43|    3|    2|    3|
|     15727|     16|        7| 5178.96|    1|    1|    4|
|     17389|      0|       43|31300.08|    1|    1|    4|
+----------+-------+---------+--------+-----+-----+-----+
only showing top 5 rows

13.1.3. 3. Determine the RFM scores and summarize the corresponding business value

+----------+-------+---------+--------+-----+-----+-----+--------+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|RFMScore|
+----------+-------+---------+--------+-----+-----+-----+--------+
|     17988|     11|        8|  191.17|    1|    1|    1|     111|
|     16892|      1|        7|  496.84|    1|    1|    2|     112|
|     16668|     15|        6|  306.72|    1|    1|    2|     112|
|     16554|      3|        7|  641.55|    1|    1|    2|     112|
|     16500|      4|        6|  400.86|    1|    1|    2|     112|
+----------+-------+---------+--------+-----+-----+-----+--------+
only showing top 5 rows

The corresponding business description and marketing value:

_images/rfm_business.png

Source: Blast Analytics Marketing

13.2. Demo

13.2.1. Load and clean data

  1. Set up spark context and SparkSession

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark RFM example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
  1. Load dataset

df_raw = spark.read.format('com.databricks.spark.csv').\
                       options(header='true', \
                       inferschema='true').\
            load("Online Retail.csv",header=True);

check the data set

df_raw.show(5)
df_raw.printSchema()

Then you will get

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
only showing top 5 rows

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
  1. Data clean and data manipulation

  • check and remove the null values

from pyspark.sql.functions import count

def my_count(df_in):
    df_in.agg( *[ count(c).alias(c) for c in df_in.columns ] ).show()
my_count(df_raw)
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   541909|   541909|     540455|  541909|     541909|   541909|    406829| 541909|
+---------+---------+-----------+--------+-----------+---------+----------+-------+

Since the count results are not the same, we have some null value in the CustomerID column. We can drop these records from the dataset.

df = df_raw.dropna(how='any')
my_count(df)
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   406829|   406829|     406829|  406829|     406829|   406829|    406829| 406829|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
  • Dealwith the InvoiceDate

from pyspark.sql.functions import to_utc_timestamp, unix_timestamp, lit, datediff, col

timeFmt = "MM/dd/yy HH:mm"

df = df.withColumn('NewInvoiceDate'
                 , to_utc_timestamp(unix_timestamp(col('InvoiceDate'),timeFmt).cast('timestamp')
                 , 'UTC'))
df.show(5)

+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|      NewInvoiceDate|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:...|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:...|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:...|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:...|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:...|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------------------+
only showing top 5 rows

Warning

The spark is pretty sensitive to the date format!

  • calculate total price

from pyspark.sql.functions import round

df = df.withColumn('TotalPrice', round( df.Quantity * df.UnitPrice, 2 ) )
  • calculate the time difference

from pyspark.sql.functions import mean, min, max, sum, datediff, to_date

date_max = df.select(max('NewInvoiceDate')).toPandas()
current = to_utc_timestamp( unix_timestamp(lit(str(date_max.iloc[0][0])), \
                              'yy-MM-dd HH:mm').cast('timestamp'), 'UTC' )

# Calculatre Duration
df = df.withColumn('Duration', datediff(lit(current), 'NewInvoiceDate'))
  • build the Recency, Frequency and Monetary

recency = df.groupBy('CustomerID').agg(min('Duration').alias('Recency'))
frequency = df.groupBy('CustomerID', 'InvoiceNo').count()\
                        .groupBy('CustomerID')\
                        .agg(count("*").alias("Frequency"))
monetary = df.groupBy('CustomerID').agg(round(sum('TotalPrice'), 2).alias('Monetary'))
rfm = recency.join(frequency,'CustomerID', how = 'inner')\
             .join(monetary,'CustomerID', how = 'inner')
rfm.show(5)

+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|     17420|     50|        3|  598.83|
|     16861|     59|        3|  151.65|
|     16503|    106|        5| 1421.43|
|     15727|     16|        7| 5178.96|
|     17389|      0|       43|31300.08|
+----------+-------+---------+--------+
only showing top 5 rows

13.2.2. RFM Segmentation

  1. Determine cutting points

In this section, you can use the techniques (statistical results and visualizations) in Data Exploration section to help you determine the cutting points for each attribute. In my opinion, the cutting points are mainly depend on the business sense. You’s better talk to your makrting people and get feedback and suggestion from them. I will use the quantile as the cutting points in this demo.

cols = ['Recency','Frequency','Monetary']
describe_pd(rfm,cols,1)
+-------+-----------------+-----------------+------------------+
|summary|          Recency|        Frequency|          Monetary|
+-------+-----------------+-----------------+------------------+
|  count|           4372.0|           4372.0|            4372.0|
|   mean|91.58119853613907| 5.07548032936871|1898.4597003659655|
| stddev|100.7721393138483|9.338754163574727| 8219.345141139722|
|    min|              0.0|              1.0|          -4287.63|
|    max|            373.0|            248.0|         279489.02|
|    25%|             16.0|              1.0|293.36249999999995|
|    50%|             50.0|              3.0|           648.075|
|    75%|            143.0|              5.0|          1611.725|
+-------+-----------------+-----------------+------------------+

The user defined function by using the cutting points:

def RScore(x):
    if  x <= 16:
        return 1
    elif x<= 50:
        return 2
    elif x<= 143:
        return 3
    else:
        return 4

def FScore(x):
    if  x <= 1:
        return 4
    elif x <= 3:
        return 3
    elif x <= 5:
        return 2
    else:
        return 1

def MScore(x):
    if  x <= 293:
        return 4
    elif x <= 648:
        return 3
    elif x <= 1611:
        return 2
    else:
        return 1

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

R_udf = udf(lambda x: RScore(x), StringType())
F_udf = udf(lambda x: FScore(x), StringType())
M_udf = udf(lambda x: MScore(x), StringType())
  1. RFM Segmentation

rfm_seg = rfm.withColumn("r_seg", R_udf("Recency"))
rfm_seg = rfm_seg.withColumn("f_seg", F_udf("Frequency"))
rfm_seg = rfm_seg.withColumn("m_seg", M_udf("Monetary"))
rfm_seg.show(5)
+----------+-------+---------+--------+-----+-----+-----+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|
+----------+-------+---------+--------+-----+-----+-----+
|     17420|     50|        3|  598.83|    2|    3|    2|
|     16861|     59|        3|  151.65|    3|    3|    1|
|     16503|    106|        5| 1421.43|    3|    2|    3|
|     15727|     16|        7| 5178.96|    1|    1|    4|
|     17389|      0|       43|31300.08|    1|    1|    4|
+----------+-------+---------+--------+-----+-----+-----+
only showing top 5 rows
rfm_seg = rfm_seg.withColumn('RFMScore',
                             F.concat(F.col('r_seg'),F.col('f_seg'), F.col('m_seg')))
rfm_seg.sort(F.col('RFMScore')).show(5)
+----------+-------+---------+--------+-----+-----+-----+--------+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|RFMScore|
+----------+-------+---------+--------+-----+-----+-----+--------+
|     17988|     11|        8|  191.17|    1|    1|    1|     111|
|     16892|      1|        7|  496.84|    1|    1|    2|     112|
|     16668|     15|        6|  306.72|    1|    1|    2|     112|
|     16554|      3|        7|  641.55|    1|    1|    2|     112|
|     16500|      4|        6|  400.86|    1|    1|    2|     112|
+----------+-------+---------+--------+-----+-----+-----+--------+
only showing top 5 rows

13.2.3. Statistical Summary

  1. Statistical Summary

  • simple summary

rfm_seg.groupBy('RFMScore')\
       .agg({'Recency':'mean',
             'Frequency': 'mean',
             'Monetary': 'mean'} )\
        .sort(F.col('RFMScore')).show(5)
+--------+-----------------+------------------+------------------+
|RFMScore|     avg(Recency)|     avg(Monetary)|    avg(Frequency)|
+--------+-----------------+------------------+------------------+
|     111|             11.0|            191.17|               8.0|
|     112|              8.0|          505.9775|               7.5|
|     113|7.237113402061856|1223.3604123711339| 7.752577319587629|
|     114|6.035123966942149| 8828.888595041324|18.882231404958677|
|     121|              9.6|            207.24|               4.4|
+--------+-----------------+------------------+------------------+
only showing top 5 rows
  • complex summary

grp = 'RFMScore'
num_cols = ['Recency','Frequency','Monetary']
df_input = rfm_seg

quantile_grouped = quantile_agg(df_input,grp,num_cols)
quantile_grouped.toPandas().to_csv(output_dir+'quantile_grouped.csv')

deciles_grouped = deciles_agg(df_input,grp,num_cols)
deciles_grouped.toPandas().to_csv(output_dir+'deciles_grouped.csv')

13.3. Extension

You can also apply the K-means clustering in Clustering section to do the segmentation.

13.3.1. Build feature matrix

  1. build dense feature matrix

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# method 1 (good for small feature):
#def transData(row):
#    return Row(label=row["Sales"],
#               features=Vectors.dense([row["TV"],
#                                       row["Radio"],
#                                       row["Newspaper"]]))

# Method 2 (good for large features):
def transData(data):
    return data.rdd.map(lambda r: [r[0],Vectors.dense(r[1:])]).toDF(['CustomerID','rfm'])
transformed= transData(rfm)
transformed.show(5)
+----------+-------------------+
|CustomerID|                rfm|
+----------+-------------------+
|     17420|  [50.0,3.0,598.83]|
|     16861|  [59.0,3.0,151.65]|
|     16503|[106.0,5.0,1421.43]|
|     15727| [16.0,7.0,5178.96]|
|     17389|[0.0,43.0,31300.08]|
+----------+-------------------+
only showing top 5 rows
  1. Scaler the feature matrix

from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(inputCol="rfm",\
         outputCol="features")
scalerModel =  scaler.fit(transformed)
scaledData = scalerModel.transform(transformed)
scaledData.show(5,False)
+----------+-------------------+--------------------------------------------------------------+
|CustomerID|rfm                |features                                                      |
+----------+-------------------+--------------------------------------------------------------+
|17420     |[50.0,3.0,598.83]  |[0.13404825737265416,0.008097165991902834,0.01721938714830836]|
|16861     |[59.0,3.0,151.65]  |[0.1581769436997319,0.008097165991902834,0.01564357039241953] |
|16503     |[106.0,5.0,1421.43]|[0.28418230563002683,0.016194331983805668,0.02011814573186342]|
|15727     |[16.0,7.0,5178.96] |[0.04289544235924933,0.024291497975708502,0.03335929858922501]|
|17389     |[0.0,43.0,31300.08]|[0.0,0.1700404858299595,0.12540746393334334]                  |
+----------+-------------------+--------------------------------------------------------------+
only showing top 5 rows

13.3.2. K-means clustering

  1. Find optimal number of cluster

I will present two popular ways to determine the optimal number of the cluster.

  • elbow analysis

#PySpark libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, percent_rank, lit
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StructType
from functools import reduce  # For Python 3.x

from pyspark.ml.clustering import KMeans
#from pyspark.ml.evaluation import ClusteringEvaluator  # requires Spark 2.4 or later

import numpy as np
cost = np.zeros(20)
for k in range(2,20):
    kmeans = KMeans()\
            .setK(k)\
            .setSeed(1) \
            .setFeaturesCol("features")\
            .setPredictionCol("cluster")

    model = kmeans.fit(scaledData)
    cost[k] = model.computeCost(scaledData) # requires Spark 2.0 or later
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import seaborn as sbs
from matplotlib.ticker import MaxNLocator

fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,20),cost[2:20], marker = "o")
ax.set_xlabel('k')
ax.set_ylabel('cost')
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.show()
_images/elbow_rfm.png

Cost v.s. the number of the clusters

In my opinion, sometimes it’s hard to choose the number of the clusters. As shown in Figure Cost v.s. the number of the clusters, you can choose 3, 5 or even 8. I will choose 3 in this demo.

  • Silhouette analysis

#PySpark libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, percent_rank, lit
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StructType
from functools import reduce  # For Python 3.x

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

def optimal_k(df_in,index_col,k_min, k_max,num_runs):
    '''
    Determine optimal number of clusters by using Silhoutte Score Analysis.
    :param df_in: the input dataframe
    :param index_col: the name of the index column
    :param k_min: the train dataset
    :param k_min: the minmum number of the clusters
    :param k_max: the maxmum number of the clusters
    :param num_runs: the number of runs for each fixed clusters

    :return k: optimal number of the clusters
    :return silh_lst: Silhouette score
    :return r_table: the running results table

    :author: Wenqiang Feng
    :email:  von198@gmail.com.com
    '''

    start = time.time()
    silh_lst = []
    k_lst = np.arange(k_min, k_max+1)

    r_table = df_in.select(index_col).toPandas()
    r_table = r_table.set_index(index_col)
    centers = pd.DataFrame()

    for k in k_lst:
        silh_val = []
        for run in np.arange(1, num_runs+1):

            # Trains a k-means model.
            kmeans = KMeans()\
                    .setK(k)\
                    .setSeed(int(np.random.randint(100, size=1)))
            model = kmeans.fit(df_in)

            # Make predictions
            predictions = model.transform(df_in)
            r_table['cluster_{k}_{run}'.format(k=k, run=run)]= predictions.select('prediction').toPandas()

            # Evaluate clustering by computing Silhouette score
            evaluator = ClusteringEvaluator()
            silhouette = evaluator.evaluate(predictions)
            silh_val.append(silhouette)

        silh_array=np.asanyarray(silh_val)
        silh_lst.append(silh_array.mean())

    elapsed =  time.time() - start

    silhouette = pd.DataFrame(list(zip(k_lst,silh_lst)),columns = ['k', 'silhouette'])

    print('+------------------------------------------------------------+')
    print("|         The finding optimal k phase took %8.0f s.       |" %(elapsed))
    print('+------------------------------------------------------------+')


    return k_lst[np.argmax(silh_lst, axis=0)], silhouette , r_table
k, silh_lst, r_table = optimal_k(scaledData,index_col,k_min, k_max,num_runs)

+------------------------------------------------------------+
|         The finding optimal k phase took     1783 s.       |
+------------------------------------------------------------+
spark.createDataFrame(silh_lst).show()

+---+------------------+
|  k|        silhouette|
+---+------------------+
|  3|0.8045154385557953|
|  4|0.6993528775512052|
|  5|0.6689286654221447|
|  6|0.6356184024841809|
|  7|0.7174102265711756|
|  8|0.6720861758298997|
|  9| 0.601771359881241|
| 10|0.6292447334578428|
+---+------------------+

From the silhouette list, we can choose 3 as the optimal number of the clusters.

Warning

ClusteringEvaluator in pyspark.ml.evaluation requires Spark 2.4 or later!!

  1. K-means clustering

k = 3
kmeans = KMeans().setK(k).setSeed(1)
model = kmeans.fit(scaledData)
# Make predictions
predictions = model.transform(scaledData)
predictions.show(5,False)
+----------+-------------------+--------------------+----------+
|CustomerID|                rfm|            features|prediction|
+----------+-------------------+--------------------+----------+
|     17420|  [50.0,3.0,598.83]|[0.13404825737265...|         0|
|     16861|  [59.0,3.0,151.65]|[0.15817694369973...|         0|
|     16503|[106.0,5.0,1421.43]|[0.28418230563002...|         2|
|     15727| [16.0,7.0,5178.96]|[0.04289544235924...|         0|
|     17389|[0.0,43.0,31300.08]|[0.0,0.1700404858...|         0|
+----------+-------------------+--------------------+----------+
only showing top 5 rows

13.3.3. Statistical summary

  1. statistical summary

results = rfm.join(predictions.select('CustomerID','prediction'),'CustomerID',how='left')
results.show(5)
+----------+-------+---------+--------+----------+
|CustomerID|Recency|Frequency|Monetary|prediction|
+----------+-------+---------+--------+----------+
|     13098|      1|       41|28658.88|         0|
|     13248|    124|        2|  465.68|         2|
|     13452|    259|        2|   590.0|         1|
|     13460|     29|        2|  183.44|         0|
|     13518|     85|        1|  659.44|         0|
+----------+-------+---------+--------+----------+
only showing top 5 rows
  • simple summary

results.groupBy('prediction')\
       .agg({'Recency':'mean',
             'Frequency': 'mean',
             'Monetary': 'mean'} )\
        .sort(F.col('prediction')).show(5)
+----------+------------------+------------------+------------------+
|prediction|      avg(Recency)|     avg(Monetary)|    avg(Frequency)|
+----------+------------------+------------------+------------------+
|         0|30.966337980278816|2543.0355321319284| 6.514450867052023|
|         1|296.02403846153845|407.16831730769206|1.5592948717948718|
|         2|154.40148698884758| 702.5096406443623| 2.550185873605948|
+----------+------------------+------------------+------------------+
  • complex summary

grp = 'RFMScore'
num_cols = ['Recency','Frequency','Monetary']
df_input = results

quantile_grouped = quantile_agg(df_input,grp,num_cols)
quantile_grouped.toPandas().to_csv(output_dir+'quantile_grouped.csv')

deciles_grouped = deciles_agg(df_input,grp,num_cols)
deciles_grouped.toPandas().to_csv(output_dir+'deciles_grouped.csv')