3. PySpark Data Audit Functions

3.1. Basic Functions

3.1.1. mkdir

PySparkAudit.PySparkAudit.mkdir(path)[source]

Make a new directory. If it already exists, keep the old files.

Parameters:path – the directory path
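
For illustration, a minimal call sketch using the fully qualified module path from the signature above (the path is hypothetical):

>>> from PySparkAudit.PySparkAudit import mkdir
>>> mkdir('/tmp/audit_output')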

3.1.2. mkdir_clean

PySparkAudit.PySparkAudit.mkdir_clean(path)[source]

Make a new directory. If it already exists, remove the old files.

Parameters:path – the directory path
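
A matching sketch for mkdir_clean (again with a hypothetical path); unlike mkdir, any existing files under the path are removed:

>>> from PySparkAudit.PySparkAudit import mkdir_clean
>>> mkdir_clean('/tmp/audit_output')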

3.1.3. df_merge

PySparkAudit.PySparkAudit.df_merge(dfs, key, how='left')[source]

Merge multiple pandas data frames on the same key.

Parameters:
  • dfs – the list of data frames to merge
  • key – the key to join on
  • how – the join method, the default value is 'left'
Returns:

merged data frame
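
For illustration, a minimal sketch with two small, hypothetical pandas data frames (assuming df_merge performs a plain pandas merge on the key, as the description suggests; output omitted):

>>> import pandas as pd
>>> from PySparkAudit.PySparkAudit import df_merge
>>> d1 = pd.DataFrame({'feature': ['Age', 'Chol'], 'min': [37, 229.2]})
>>> d2 = pd.DataFrame({'feature': ['Age', 'Chol'], 'max': [67, 286.1]})
>>> merged = df_merge([d1, d2], key='feature')   # expected columns: feature, min, max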

3.1.4. data_types

PySparkAudit.PySparkAudit.data_types(df_in, tracking=False)[source]

Generate the data types of the rdd data frame.

Parameters:
  • df_in – the input rdd data frame
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

data types pandas data frame

>>> test = spark.createDataFrame([
                    ('Joe', 67, 'F', 7000, 'asymptomatic', 286.1, '2019-6-28'),
                    ('Henry', 67, 'M', 8000, 'asymptomatic', 229.2, '2019-6-29'),
                    ('Sam', 37,  'F', 6000, 'nonanginal', 250.3, '2019-6-30'),
                    ('Max', 56, 'M', 9000, 'nontypical', 236.4, '2019-5-28'),
                    ('Mat', 56, 'F', 9000, 'asymptomatic', 254.5, '2019-4-28')],
                    ['Name', 'Age', 'Sex', 'Salary', 'ChestPain', 'Chol', 'CreatDate']
                   )
>>> test = test.withColumn('CreatDate', F.col('CreatDate').cast('timestamp'))
>>> from PySparkAudit import data_types
>>> data_types(test)
     feature     dtypes
0       Name     string
1        Age     bigint
2        Sex     string
3     Salary     bigint
4  ChestPain     string
5       Chol     double
6  CreatDate  timestamp

3.1.5. dtypes_class

PySparkAudit.PySparkAudit.dtypes_class(df_in)[source]

Generate the data type categories: numerical, categorical, date and unsupported category.

Parameters:df_in – the input rdd data frame
Returns:data type categories
>>> test = spark.createDataFrame([
                    ('Joe', 67, 'F', 7000, 'asymptomatic', 286.1, '2019-6-28'),
                    ('Henry', 67, 'M', 8000, 'asymptomatic', 229.2, '2019-6-29'),
                    ('Sam', 37,  'F', 6000, 'nonanginal', 250.3, '2019-6-30'),
                    ('Max', 56, 'M', 9000, 'nontypical', 236.4, '2019-5-28'),
                    ('Mat', 56, 'F', 9000, 'asymptomatic', 254.5, '2019-4-28')],
                    ['Name', 'Age', 'Sex', 'Salary', 'ChestPain', 'Chol', 'CreatDate']
                   )
>>> test = test.withColumn('CreatDate', F.col('CreatDate').cast('timestamp'))
>>> from PySparkAudit import dtypes_class
>>> dtypes_class(test)
(     feature       DataType
0       Name     StringType
1        Age       LongType
2        Sex     StringType
3     Salary       LongType
4  ChestPain     StringType
5       Chol     DoubleType
6  CreatDate  TimestampType,
['Age', 'Salary', 'Chol'],
['Name', 'Sex', 'ChestPain'],
['CreatDate'], [])

3.1.6. counts

PySparkAudit.PySparkAudit.counts(df_in, tracking=False)[source]

Generate the row count, not-null count and distinct count for each feature.

Parameters:
  • df_in – the input rdd data frame
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

the counts in pandas data frame

>>> test = spark.createDataFrame([
                    ('Joe', None, 'F', 70000, 'asymptomatic', 286.1, '2019-6-28'),
                    ('Henry', 67, 'M', 80000, 'asymptomatic', 229.2, '2019-6-29'),
                    ('Sam', 37,  'F', 60000, 'nonanginal', 250.3, '2019-6-30'),
                    ('Max', 56, '  ', 90000, None, 236.4, '2019-5-28'),
                    ('Mat', 56, 'F', None, 'asymptomatic', 254.5, '2019-4-28')],
                    ['Name', 'Age', 'Sex', 'Salary', 'ChestPain', 'Chol', 'CreatDate']
                   )
>>> test = test.withColumn('CreatDate', F.col('CreatDate').cast('timestamp'))
>>> from PySparkAudit import counts
>>> counts(test)
     feature  row_count  notnull_count  distinct_count
0       Name          5              5               5
1        Age          5              4               3
2        Sex          5              5               3
3     Salary          5              4               4
4  ChestPain          5              4               2
5       Chol          5              5               5
6  CreatDate          5              5               5

3.1.7. describe

PySparkAudit.PySparkAudit.describe(df_in, columns=None, tracking=False)[source]

Generate a simple data frame description using the .describe() function in PySpark.

Parameters:
  • df_in – the input rdd data frame
  • columns – the specific feature columns, the default value is None
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

the description in pandas data frame

>>> test = spark.createDataFrame([
                ('Joe', 67, 'F', 70000, 'asymptomatic', 286.1, '2019-6-28'),
                ('Henry', 67, 'M', 80000, 'asymptomatic', 229.2, '2019-6-29'),
                ('Sam', 37,  'F', 60000, 'nonanginal', 250.3, '2019-6-30'),
                ('Max', 56, 'M', 90000, 'nontypical', 236.4, '2019-5-28'),
                ('Mat', 56, 'F', 90000, 'asymptomatic', 254.5, '2019-4-28')],
                ['Name', 'Age', 'Sex', 'Salary', 'ChestPain', 'Chol', 'CreatDate']
               )
>>> test = test.withColumn('CreatDate', F.col('CreatDate').cast('timestamp'))
>>> from PySparkAudit import describe
>>> describe(test)
summary   count     mean     ...               min         max
feature                      ...
Name          5     None     ...             Henry         Sam
Age           5     56.6     ...                37          67
Sex           5     None     ...                 F           M
Salary        5  78000.0     ...             60000       90000
ChestPain     5     None     ...      asymptomatic  nontypical
Chol          5    251.3     ...             229.2       286.1
CreatDate     5     None     ...         2019-4-28   2019-6-30

[7 rows x 5 columns]

3.1.8. percentiles

PySparkAudit.PySparkAudit.percentiles(df_in, deciles=False, tracking=False)[source]

Generate the percentiles for the rdd data frame.

Parameters:
  • df_in – the input rdd data frame
  • deciles – the flag for generating the deciles, the default value is False
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

percentiles in pandas data frame

>>> test = spark.createDataFrame([
                    ('Joe', 67, 'F', 70000, 'asymptomatic', 286.1, '2019-6-28'),
                    ('Henry', 67, 'M', 80000, 'asymptomatic', 229.2, '2019-6-29'),
                    ('Sam', 37,  'F', 60000, 'nonanginal', 250.3, '2019-6-30'),
                    ('Max', 56, 'M', 90000, 'nontypical', 236.4, '2019-5-28'),
                    ('Mat', 56, 'F', 90000, 'asymptomatic', 254.5, '2019-4-28')],
                    ['Name', 'Age', 'Sex', 'Salary', 'ChestPain', 'Chol', 'CreatDate']
                   )
>>> from PySparkAudit import percentiles
>>> percentiles(test)
   feature       Q1      Med       Q3
0      Age     56.0     67.0     67.0
1   Salary  80000.0  90000.0  90000.0
2     Chol    250.3    254.5    286.1

3.1.9. feature_len

PySparkAudit.PySparkAudit.feature_len(df_in, tracking=False)[source]

Generate the feature length statistics (minimum, average and maximum length) for each feature in the rdd data frame.

Parameters:
  • df_in – the input rdd data frame
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

the feature length statistical results in pandas data frame

>>> test = spark.createDataFrame([
                    ('Joe', 67, 'F', 70000, 'asymptomatic', 286.1, '2019-6-28'),
                    ('Henry', 67, 'M', 80000, 'asymptomatic', 229.2, '2019-6-29'),
                    ('Sam', 37,  'F', 60000, 'nonanginal', 250.3, '2019-6-30'),
                    ('Max', 56, 'M', 90000, 'nontypical', 236.4, '2019-5-28'),
                    ('Mat', 56, 'F', 90000, 'asymptomatic', 254.5, '2019-4-28')],
                    ['Name', 'Age', 'Sex', 'Salary', 'ChestPain', 'Chol', 'CreatDate']
                   )
>>> from PySparkAudit import feature_len
>>> feature_len(test)
     feature  min_length  avg_length  max_length
0       Name         3.0         3.4         5.0
1        Age         2.0         2.0         2.0
2        Sex         1.0         1.0         1.0
3     Salary         5.0         5.0         5.0
4  ChestPain        10.0        11.2        12.0
5       Chol         5.0         5.0         5.0
6  CreatDate         9.0         9.0         9.0

3.1.10. freq_items

PySparkAudit.PySparkAudit.freq_items(df_in, top_n=5, tracking=False)[source]

Generate the top_n most frequent items for each feature in the rdd data frame.

Parameters:
  • df_in – the input rdd data frame
  • top_n – the number of most frequent items to return, the default value is 5
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

the top_n frequent items in pandas data frame

>>> test = spark.createDataFrame([
                    ('Joe', 67, 'F', 70000, 'asymptomatic', 286.1, '2019-6-28'),
                    ('Henry', 67, 'M', 80000, 'asymptomatic', 229.2, '2019-6-29'),
                    ('Sam', 37,  'F', 60000, 'nonanginal', 250.3, '2019-6-30'),
                    ('Max', 56, 'M', 90000, 'nontypical', 236.4, '2019-5-28'),
                    ('Mat', 56, 'F', 90000, 'asymptomatic', 254.5, '2019-4-28')],
                    ['Name', 'Age', 'Sex', 'Salary', 'ChestPain', 'Chol', 'CreatDate']
                   )
>>> from PySparkAudit import freq_items
>>> freq_items(test)
     feature                            freq_items[value, freq]
0       Name  [[Joe, 1], [Mat, 1], [Henry, 1], [Sam, 1], [Ma...
1        Age                        [[67, 2], [56, 2], [37, 1]]
2        Sex                                   [[F, 3], [M, 2]]
3    Salary   [[90000, 2], [70000, 1], [80000, 1], [60000, 1]]
4  ChestPain  [[asymptomatic, 3], [nontypical, 1], [nonangin...
5       Chol  [[286.1, 1], [250.3, 1], [229.2, 1], [236.4, 1...
6  CreatDate  [[2019-6-30, 1], [2019-5-28, 1], [2019-4-28, 1...

3.1.11. rates

PySparkAudit.PySparkAudit.rates(df_in, columns=None, numeric=True, tracking=False)[source]

Generate the null, empty, negative, zero and positive value rates and feature variance for each feature in the rdd data frame.

Parameters:
  • df_in – the input rdd data frame
  • columns – the specific feature columns, the default value is None
  • numeric – the flag for numerical rdd data frame, the default value is True
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

the null, empty, negative, zero and positive value rates and feature variance in pandas data frame

>>> test = spark.createDataFrame([
                    ('Joe', 67, 'F', 7000, 'asymptomatic', 286.1, '2019-6-28'),
                    ('Henry', 67, 'M', 8000, 'asymptomatic', 229.2, '2019-6-29'),
                    ('Sam', 37,  'F', 6000, 'nonanginal', 250.3, '2019-6-30'),
                    ('Max', 56, 'M', 9000, 'nontypical', 236.4, '2019-5-28'),
                    ('Mat', 56, 'F', 9000, 'asymptomatic', 254.5, '2019-4-28')],
                    ['Name', 'Age', 'Sex', 'Salary', 'ChestPain', 'Chol', 'CreatDate']
                   )
>>> from PySparkAudit import rates
>>> rates(test)
     feature  feature_variance    ...     rate_zero  rate_pos
0        Age               0.6    ...           0.0       1.0
1     Salary               0.8    ...           0.0       1.0
2       Chol               1.0    ...           0.0       1.0
3       Name               1.0    ...           0.0       0.0
4        Sex               0.4    ...           0.0       0.0
5  ChestPain               0.6    ...           0.0       0.0
6  CreatDate               1.0    ...           0.0       0.0

[7 rows x 7 columns]

3.1.12. corr_matrix

PySparkAudit.PySparkAudit.corr_matrix(df_in, method='pearson', output_dir=None, rotation=True, display=False, tracking=False)[source]

Generate the correlation matrix and heat map plot for rdd data frame.

Parameters:
  • df_in – the input rdd data frame
  • method – the method applied to calculate the correlation matrix: 'pearson' or 'spearman', the default value is 'pearson'
  • output_dir – the output directory, the default value is the current working directory
  • rotation – the flag for rotating the xticks in the plot, the default value is True
  • display – the flag for displaying the figures, the default value is False
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

the correlation matrix in pandas data frame

>>> test = spark.createDataFrame([
                    ('Joe', 67, 'F', 7000, 'asymptomatic', 286.1, '2019-6-28'),
                    ('Henry', 67, 'M', 8000, 'asymptomatic', 229.2, '2019-6-29'),
                    ('Sam', 37,  'F', 6000, 'nonanginal', 250.3, '2019-6-30'),
                    ('Max', 56, 'M', 9000, 'nontypical', 236.4, '2019-5-28'),
                    ('Mat', 56, 'F', 9000, 'asymptomatic', 254.5, '2019-4-28')],
                    ['Name', 'Age', 'Sex', 'Salary', 'ChestPain', 'Chol', 'CreatDate']
                   )
>>> from PySparkAudit import corr_matrix
>>> corr_matrix(test)
================================================================
The correlation matrix plot Corr.png was located at:
/home/feng/Audited
              Age   Salary      Chol
Age      1.000000  0.431663  0.147226
Salary   0.431663  1.000000 -0.388171
Chol     0.147226 -0.388171  1.000000

3.2. Plot Functions

3.2.1. hist_plot

PySparkAudit.PySparkAudit.hist_plot(df_in, bins=50, output_dir=None, sample_size=None, display=False, tracking=False)[source]

Histogram plot for the numerical features in the rdd data frame. This step is very time- and memory-consuming. If the data size is larger than 10,000, the histograms will be saved in .pdf format. Otherwise, the histograms will be saved in .png format in the hist folder.

If time and memory are limited, you can use sample_size to generate a subset of the data frame for the histograms.

Parameters:
  • df_in – the input rdd data frame
  • bins – the number of bins for generating the histograms, the default value is 50
  • output_dir – the output directory, the default value is the current working directory
  • sample_size – the sample size for generating a subset of the rdd data frame, the default value is None
  • display – the flag for displaying the figures, the default value is False
  • tracking – the flag for displaying CPU time, the default value is False
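
For illustration, a call sketch reusing the test data frame from the earlier examples (the output directory and sample size are hypothetical; the figures are saved to disk rather than returned):

>>> from PySparkAudit import hist_plot
>>> hist_plot(test, bins=20, output_dir='/tmp/audit_output', sample_size=1000)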

3.2.2. bar_plot

PySparkAudit.PySparkAudit.bar_plot(df_in, top_n=20, rotation=True, output_dir=None, display=False, tracking=False)[source]

Bar plot for the categorical features in the rdd data frame.

Parameters:
  • df_in – the input rdd data frame
  • top_n – the number of most frequent items to show in the bar plot, the default value is 20
  • rotation – the flag for rotating the xticks in the plot, the default value is True
  • output_dir – the output directory, the default value is the current working directory
  • display – the flag for displaying the figures, the default value is False
  • tracking – the flag for displaying CPU time, the default value is False
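
A matching sketch for the categorical features, with a hypothetical output directory:

>>> from PySparkAudit import bar_plot
>>> bar_plot(test, top_n=10, output_dir='/tmp/audit_output')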

3.2.3. trend_plot

PySparkAudit.PySparkAudit.trend_plot(df_in, types='day', d_time=None, rotation=True, output_dir=None, display=False, tracking=False)[source]

Trend plot for the aggregated time series data, provided the rdd data frame contains both date features and numerical features.

Parameters:
  • df_in – the input rdd data frame
  • types – the aggregation level for the time feature: 'day', 'month' or 'year', the default value is 'day'
  • d_time – the name of the date feature to aggregate on, the default value is the first date feature in the rdd data frame
  • rotation – the flag for rotating the xticks in the plot, the default value is True
  • output_dir – the output directory, the default value is the current working directory
  • display – the flag for displaying the figures, the default value is False
  • tracking – the flag for displaying CPU time, the default value is False
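
For illustration, a call sketch reusing the test data frame with CreatDate cast to a timestamp, as in the earlier examples (the output directory is hypothetical):

>>> from PySparkAudit import trend_plot
>>> test = test.withColumn('CreatDate', F.col('CreatDate').cast('timestamp'))
>>> trend_plot(test, types='day', d_time='CreatDate', output_dir='/tmp/audit_output')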

3.3. Summary Functions

3.3.1. dataset_summary

PySparkAudit.PySparkAudit.dataset_summary(df_in, tracking=False)[source]

The basic summary of the data set.

Parameters:
  • df_in – the input rdd data frame
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

data set summary in pandas data frame
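
For illustration, a call sketch over the test data frame from the earlier examples; the returned pandas data frame is omitted here:

>>> from PySparkAudit import dataset_summary
>>> dataset_summary(test, tracking=True)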

3.3.2. numeric_summary

PySparkAudit.PySparkAudit.numeric_summary(df_in, columns=None, deciles=False, top_n=5, tracking=False)[source]

The auditing function for the numerical features in the rdd data frame.

Parameters:
  • df_in – the input rdd data frame
  • columns – the specific feature columns, the default value is None
  • deciles – the flag for generating the deciles, the default value is False
  • top_n – the number of most frequent items to return, the default value is 5
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

the audited results for the numerical features in pandas data frame
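
For illustration, a call sketch restricted to the numerical columns of the test data frame (the column list is illustrative; output omitted):

>>> from PySparkAudit import numeric_summary
>>> numeric_summary(test, columns=['Age', 'Salary', 'Chol'], deciles=True)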

3.3.3. category_summary

PySparkAudit.PySparkAudit.category_summary(df_in, columns=None, top_n=5, tracking=False)[source]

The auditing function for the categorical features in the rdd data frame.

Parameters:
  • df_in – the input rdd data frame
  • columns – the specific feature columns, the default value is None
  • top_n – the number of most frequent items to return, the default value is 5
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

the audited results for the categorical features in pandas data frame
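
A matching sketch for the categorical columns (again illustrative; output omitted):

>>> from PySparkAudit import category_summary
>>> category_summary(test, columns=['Sex', 'ChestPain'], top_n=3)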

3.4. Auditing Function

3.4.1. auditing

PySparkAudit.PySparkAudit.auditing(df_in, writer=None, columns=None, deciles=False, top_freq_item=5, bins=50, top_cat_item=20, method='pearson', output_dir=None, types='day', d_time=None, rotation=True, sample_size=None, display=False, tracking=False)[source]

The wrapper that runs all of the auditing functions.

Parameters:
  • df_in – the input rdd data frame
  • writer – the writer for the Excel output, the default value is None
  • columns – the specific feature columns, the default value is None
  • deciles – the flag for generating the deciles, the default value is False
  • top_freq_item – the number of most frequent items to return, the default value is 5
  • bins – the number of bins for generating the histograms, the default value is 50
  • top_cat_item – the number of most frequent items to show in the bar plot, the default value is 20
  • method – the method applied to calculate the correlation matrix: 'pearson' or 'spearman', the default value is 'pearson'
  • output_dir – the output directory, the default value is the current working directory
  • types – the aggregation level for the time feature: 'day', 'month' or 'year', the default value is 'day'
  • d_time – the name of the date feature to aggregate on, the default value is the first date feature in the rdd data frame
  • rotation – the flag for rotating the xticks in the plot, the default value is True
  • sample_size – the sample size for generating a subset of the rdd data frame, the default value is None
  • display – the flag for displaying the figures, the default value is False
  • tracking – the flag for displaying CPU time, the default value is False
Returns:

all the audited results in pandas data frames
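
For illustration, a one-call sketch over the test data frame from the earlier examples (the output directory is hypothetical; output omitted):

>>> from PySparkAudit import auditing
>>> auditing(test, output_dir='/tmp/audit_output', tracking=True)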

3.5. Plotting Function

3.5.1. fig_plots

PySparkAudit.PySparkAudit.fig_plots(df_in, output_dir=None, bins=50, top_n=20, types='day', d_time=None, rotation=True, sample_size=None, display=False, tracking=False)[source]

The wrapper for the plot functions.

Parameters:
  • df_in – the input rdd data frame
  • output_dir – the output directory, the default value is the current working directory
  • bins – the number of bins for generating the histograms, the default value is 50
  • top_n – the number of most frequent items to show in the bar plot, the default value is 20
  • types – the aggregation level for the time feature: 'day', 'month' or 'year', the default value is 'day'
  • d_time – the name of the date feature to aggregate on, the default value is the first date feature in the rdd data frame
  • rotation – the flag for rotating the xticks in the plot, the default value is True
  • sample_size – the sample size for generating a subset of the rdd data frame, the default value is None
  • display – the flag for displaying the figures, the default value is False
  • tracking – the flag for displaying CPU time, the default value is False
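
For illustration, a call sketch that produces all of the plots for the test data frame in one pass (the output directory is hypothetical):

>>> from PySparkAudit import fig_plots
>>> fig_plots(test, output_dir='/tmp/audit_output', bins=20, top_n=10)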