4. Machine Learning
4.1. Local Spark setup
[ ]:
# Local Spark session shared by every example in this chapter.
from pathlib import Path
from pyspark.sql import SparkSession

# Scratch directory for anything the examples write to disk.
DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR.joinpath("_spark_output", "machine-learning")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Small local configuration: 4 shuffle partitions / default parallelism and
# adaptive query execution switched off.
spark_conf = {
    "spark.driver.host": "127.0.0.1",
    "spark.sql.shuffle.partitions": "4",
    "spark.default.parallelism": "4",
    "spark.sql.adaptive.enabled": "false",
}
builder = SparkSession.builder.master("local[*]").appName("spark-intro-machine-learning")
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)

spark = builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
# `sqlContext` is simply another name for the same SparkSession object.
sqlContext = spark
4.2. Vectors
4.2.1. Dense vector
[1]:
from pyspark.ml.linalg import Vectors

# A dense vector stores every element explicitly.
dense_values = [1.0, 2.0, 3.0]
Vectors.dense(dense_values)
[1]:
DenseVector([1.0, 2.0, 3.0])
4.2.2. Sparse vector
[2]:
# Length-5 sparse vector; only indices 0, 2 and 4 hold non-zero values.
Vectors.sparse(5, {0: 1.0, 2: 3.8, 4: 8.8})
[2]:
SparseVector(5, {0: 1.0, 2: 3.8, 4: 8.8})
4.2.3. Matrix and DataFrame
[3]:
from sklearn.datasets import make_classification

# 2,000 samples with 4 informative features and 2 classes, fixed seed.
X, y = make_classification(
    n_samples=2000,
    n_features=4,
    n_informative=4,
    n_redundant=0,
    n_repeated=0,
    n_classes=2,
    n_clusters_per_class=2,
    random_state=37,
)

# One row per sample; each row carries a single dense feature vector.
feature_rows = [(Vectors.dense(sample.tolist()),) for sample in X]
df = spark.createDataFrame(feature_rows, ['features'])
4.3. Basic statistics
4.3.1. Pearson correlation
[4]:
from pyspark.ml.stat import Correlation

# No method argument -> Pearson; the first value of the first row is the matrix.
pearson_matrix = Correlation.corr(df, 'features').head()[0]
print(pearson_matrix)
DenseMatrix([[ 1. , 0.19671305, -0.07915219, -0.3243779 ],
[ 0.19671305, 1. , 0.26944672, 0.00364392],
[-0.07915219, 0.26944672, 1. , -0.28759495],
[-0.3243779 , 0.00364392, -0.28759495, 1. ]])
4.3.2. Spearman correlation
[5]:
spearman_matrix = Correlation.corr(df, 'features', method='spearman').head()[0]
print(spearman_matrix)
DenseMatrix([[ 1. , 0.21295424, -0.11234501, -0.32188153],
[ 0.21295424, 1. , 0.27054041, -0.07225098],
[-0.11234501, 0.27054041, 1. , -0.28120594],
[-0.32188153, -0.07225098, -0.28120594, 1. ]])
4.3.3. Chi-square test
[6]:
# A smaller labelled dataset (100 samples, 2 features) for the statistics below.
X, y = make_classification(
    n_samples=100,
    n_features=2,
    n_informative=2,
    n_redundant=0,
    n_repeated=0,
    n_classes=2,
    n_clusters_per_class=2,
    random_state=37,
)
df = spark.createDataFrame(
    [(float(label), Vectors.dense(sample.tolist())) for label, sample in zip(y, X)],
    ['label', 'features'])
[7]:
from pyspark.ml.stat import ChiSquareTest

# ChiSquareTest treats every distinct feature value as its own category.
# On the raw continuous features each row is its own category
# (dof = n - 1 = 99 for 100 rows), so the test says nothing.
# Round each feature to the nearest integer first, which gives a handful of
# categories per feature, then test independence from the label.
binned_df = spark.createDataFrame(
    [(float(label), Vectors.dense([float(round(v)) for v in sample.tolist()]))
     for label, sample in zip(y, X)],
    ['label', 'features'])

r = ChiSquareTest.test(binned_df, 'features', 'label').head()
print(f'{r.pValues} : p-values')
print(f'{r.degreesOfFreedom} : dof')
print(f'{r.statistics} : statistics')
[0.4529585113209542,0.4529585113209542] : p-values
[99, 99] : dof
[100.0,100.0] : statistics
4.3.4. Mean and variance
[8]:
from pyspark.ml.stat import Summarizer

# Per-feature mean and variance over all rows of `df`.
mean_variance = Summarizer.metrics('mean', 'variance')
df.select(mean_variance.summary(df['features'])).show(truncate=False)
+-------------------------------------------------------------------------------------+
|aggregate_metrics(features, 1.0) |
+-------------------------------------------------------------------------------------+
|[[-0.0648518109322975,0.027164400904008706], [1.7191301187548882,1.6490793943014903]]|
+-------------------------------------------------------------------------------------+
4.3.5. Min and max
[9]:
# Per-feature minimum and maximum.
min_max = Summarizer.metrics('min', 'max')
df.select(min_max.summary(df['features'])).show(truncate=False)
+-------------------------------------------------------------------------------+
|aggregate_metrics(features, 1.0) |
+-------------------------------------------------------------------------------+
|[[-3.229583117714798,-2.70686498114644], [2.653016794529104,2.679714287856398]]|
+-------------------------------------------------------------------------------+
4.3.6. Count
[10]:
# Number of rows that went into the summary.
row_count = Summarizer.metrics('count')
df.select(row_count.summary(df['features'])).show(truncate=False)
+--------------------------------+
|aggregate_metrics(features, 1.0)|
+--------------------------------+
|[100] |
+--------------------------------+
4.4. Pipelines
[11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler

# Scale every feature to unit standard deviation (no centering), then fit a
# logistic regression on the *scaled* column.
# The classifier has to be pointed at the scaler's output. With the default
# featuresCol='features' it would train on the raw column, and the scaling
# stage would be computed but never used.
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures',
                        withStd=True, withMean=False)
lr = LogisticRegression(featuresCol='scaledFeatures', maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[scaler, lr])
model = pipeline.fit(df)
y_preds = model.transform(df)
y_preds.select('probability', 'label', 'prediction').show(truncate=False)
+------------------------------------------+-----+----------+
|probability |label|prediction|
+------------------------------------------+-----+----------+
|[0.8680409101265395,0.13195908987346056] |0.0 |0.0 |
|[0.08030517436073896,0.919694825639261] |0.0 |1.0 |
|[0.0385764052721843,0.9614235947278156] |1.0 |1.0 |
|[0.23941846958850424,0.7605815304114959] |1.0 |1.0 |
|[0.9486667828637316,0.0513332171362683] |0.0 |0.0 |
|[0.24662939567538006,0.7533706043246199] |0.0 |1.0 |
|[0.10000853755198902,0.8999914624480111] |1.0 |1.0 |
|[0.05211571635853627,0.9478842836414636] |1.0 |1.0 |
|[0.8505912777705403,0.14940872222945964] |0.0 |0.0 |
|[0.21719054428407597,0.7828094557159241] |1.0 |1.0 |
|[0.05311183455823478,0.9468881654417654] |1.0 |1.0 |
|[0.9482695764129414,0.05173042358705855] |0.0 |0.0 |
|[0.5078594474186815,0.4921405525813185] |1.0 |0.0 |
|[0.030506033909821213,0.9694939660901789] |1.0 |1.0 |
|[0.18788723612617628,0.8121127638738237] |1.0 |1.0 |
|[0.2723894435175389,0.7276105564824611] |1.0 |1.0 |
|[0.0028093683234259397,0.9971906316765742]|1.0 |1.0 |
|[0.8290838868182774,0.17091611318172253] |0.0 |0.0 |
|[0.8706755894225549,0.12932441057744518] |0.0 |0.0 |
|[0.08386681998270544,0.9161331800172946] |1.0 |1.0 |
+------------------------------------------+-----+----------+
only showing top 20 rows
4.5. Feature extractors
4.5.1. TF-IDF
[12]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# Book titles labelled by language: 0.0 = Java, 1.0 = Python.
labelled_titles = [
    (0.0, 'How to program in Java'),
    (0.0, 'Java recipes'),
    (0.0, 'Learn Java in 24 hours'),
    (1.0, 'How to program in Python'),
    (1.0, 'Python recipes'),
    (1.0, 'Learn Python in 24 hours'),
]
raw_df = spark.createDataFrame(labelled_titles, ['label', 'title'])

# title -> words -> term counts hashed into 10 buckets -> IDF-weighted features.
# With only 10 buckets, different words can land on the same index.
tokenizer = Tokenizer(inputCol='title', outputCol='words')
hashing = HashingTF(inputCol='words', outputCol='raw_features', numFeatures=10)
idf = IDF(inputCol='raw_features', outputCol='features')
pipeline = Pipeline(stages=[tokenizer, hashing, idf])

rescale_df = pipeline.fit(raw_df).transform(raw_df)
model = pipeline.fit(raw_df) if False else None
+-----+--------------------------------------------------------------------------------------------------+
|label|features |
+-----+--------------------------------------------------------------------------------------------------+
|0.0 |(10,[4,5,7,8],[0.8472978603872037,0.0,0.6729444732424258,0.8472978603872037]) |
|0.0 |(10,[5,7],[0.0,0.3364722366212129]) |
|0.0 |(10,[1,5,7,9],[0.8472978603872037,0.0,0.3364722366212129,0.3364722366212129]) |
|1.0 |(10,[4,5,7,8,9],[0.8472978603872037,0.0,0.3364722366212129,0.8472978603872037,0.3364722366212129])|
|1.0 |(10,[5,9],[0.0,0.3364722366212129]) |
|1.0 |(10,[1,5,9],[0.8472978603872037,0.0,0.6729444732424258]) |
+-----+--------------------------------------------------------------------------------------------------+
4.5.2. Word2Vec
[13]:
from pyspark.ml.feature import Word2Vec

# Each row is one title, pre-split on single spaces.
sentences = [
    'How to program in Java',
    'Java recipes',
    'Learn Java in 24 hours',
    'How to program in Python',
    'Python recipes',
    'Learn Python in 24 hours',
]
raw_df = spark.createDataFrame([(s.split(' '),) for s in sentences], ['text'])

# Fix the seed, as the other examples do (random_state=37, seed=37), so the
# learned 3-dimensional vectors are reproducible across runs.
word2Vec = Word2Vec(vectorSize=3, minCount=0, seed=37, inputCol='text', outputCol='result')
model = word2Vec.fit(raw_df)
result = model.transform(raw_df)
for text, vector in result.collect():
    print(f'{text} => {vector}')
['How', 'to', 'program', 'in', 'Java'] => [0.016671489179134368,-0.07890784069895745,-0.061020128056406976]
['Java', 'recipes'] => [-0.00206630676984787,-0.133316308259964,0.0072393231093883514]
['Learn', 'Java', 'in', '24', 'hours'] => [-0.0064692735671997076,-0.0450890451669693,-0.044838495552539825]
['How', 'to', 'program', 'in', 'Python'] => [0.006694729626178742,-0.02815251871943474,-0.06902075484395027]
['Python', 'recipes'] => [-0.02700820565223694,-0.0064280033111572266,-0.01276224385946989]
['Learn', 'Python', 'in', '24', 'hours'] => [-0.016446033120155336,0.005666276812553406,-0.052839122340083124]
4.5.3. Count vectorizer
[14]:
from pyspark.ml.feature import CountVectorizer

# Two tiny documents built from the same three words.
documents = [(0, 'at bat cat'), (1, 'at bat bat cat at')]
raw_df = spark.createDataFrame(
    [(doc_id, text.split(' ')) for doc_id, text in documents],
    ['id', 'words'])

# Vocabulary capped at 3 terms; a term must occur in at least 2 documents.
cv = CountVectorizer(inputCol='words', outputCol='features', vocabSize=3, minDF=2.0)
model = cv.fit(raw_df)
result = model.transform(raw_df)
result.show(truncate=False)
+---+-----------------------+-------------------------+
|id |words |features |
+---+-----------------------+-------------------------+
|0 |[at, bat, cat] |(3,[0,1,2],[1.0,1.0,1.0])|
|1 |[at, bat, bat, cat, at]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+-----------------------+-------------------------+
4.5.4. Feature hasher
[15]:
from pyspark.ml.feature import FeatureHasher

# Mixed column types: a float, a boolean and two string columns.
hashed_columns = ['x1', 'x2', 'x3', 'x4']
raw_df = spark.createDataFrame(
    [
        (2.2, True, 'a', 'cat'),
        (4.5, False, 'b', 'dog'),
        (4.4, False, 'c', 'dog'),
        (2.3, True, 'd', 'cat'),
    ],
    hashed_columns)

hasher = FeatureHasher(inputCols=list(hashed_columns), outputCol='features')
featurized = hasher.transform(raw_df)
featurized.show(truncate=False)
+---+-----+---+---+--------------------------------------------------------+
|x1 |x2 |x3 |x4 |features |
+---+-----+---+---+--------------------------------------------------------+
|2.2|true |a |cat|(262144,[35046,56751,184035,244783],[1.0,1.0,2.2,1.0]) |
|4.5|false|b |dog|(262144,[162446,179156,184035,223707],[1.0,1.0,4.5,1.0])|
|4.4|false|c |dog|(262144,[121161,162446,179156,184035],[1.0,1.0,1.0,4.4])|
|2.3|true |d |cat|(262144,[5506,35046,56751,184035],[1.0,1.0,1.0,2.3]) |
+---+-----+---+---+--------------------------------------------------------+
4.6. Feature transformers
4.6.1. Tokenizer
[16]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, size, udf
from pyspark.sql.types import IntegerType

raw_df = spark.createDataFrame([
    (1, 'How to program in Java'),
    (2, 'Java recipes'),
    (3, 'Learn Java in 24 hours'),
    (4, 'How to program in Python'),
    (5, 'Python recipes'),
    (6, 'Learn Python in 24 hours')
], ['id', 'title'])

# Tokenizer splits on whitespace (its output is lower-cased, as shown below).
# RegexTokenizer splits on the given pattern, here any non-word character.
tokenizer1 = Tokenizer(inputCol='title', outputCol='words')
tokenizer2 = RegexTokenizer(inputCol='title', outputCol='words', pattern="\\W")
tokenized1 = tokenizer1.transform(raw_df)
tokenized2 = tokenizer2.transform(raw_df)

# Show BOTH tokenizers' results (previously tokenized1 was shown twice).
# The token count uses the built-in `size`, which avoids a Python UDF round trip.
for tokenized in (tokenized1, tokenized2):
    (tokenized
     .select('title', 'words')
     .withColumn('tokens', size(col('words')))
     .show(truncate=False))
+------------------------+------------------------------+------+
|title |words |tokens|
+------------------------+------------------------------+------+
|How to program in Java |[how, to, program, in, java] |5 |
|Java recipes |[java, recipes] |2 |
|Learn Java in 24 hours |[learn, java, in, 24, hours] |5 |
|How to program in Python|[how, to, program, in, python]|5 |
|Python recipes |[python, recipes] |2 |
|Learn Python in 24 hours|[learn, python, in, 24, hours]|5 |
+------------------------+------------------------------+------+
+------------------------+------------------------------+------+
|title |words |tokens|
+------------------------+------------------------------+------+
|How to program in Java |[how, to, program, in, java] |5 |
|Java recipes |[java, recipes] |2 |
|Learn Java in 24 hours |[learn, java, in, 24, hours] |5 |
|How to program in Python|[how, to, program, in, python]|5 |
|Python recipes |[python, recipes] |2 |
|Learn Python in 24 hours|[learn, python, in, 24, hours]|5 |
+------------------------+------------------------------+------+
4.6.2. Stop words remover
[17]:
from pyspark.ml.feature import StopWordsRemover

titles = [
    'How to program in Java',
    'Java recipes',
    'Learn Java in 24 hours',
    'How to program in Python',
    'Python recipes',
    'Learn Python in 24 hours',
]
# Ids start at 1; each title is pre-split on single spaces.
raw_df = spark.createDataFrame(
    [(row_id, title.split(' ')) for row_id, title in enumerate(titles, start=1)],
    ['id', 'title'])

remover = StopWordsRemover(inputCol='title', outputCol='filtered')
remover.transform(raw_df).show(truncate=False)
+---+------------------------------+--------------------------+
|id |title |filtered |
+---+------------------------------+--------------------------+
|1 |[How, to, program, in, Java] |[program, Java] |
|2 |[Java, recipes] |[Java, recipes] |
|3 |[Learn, Java, in, 24, hours] |[Learn, Java, 24, hours] |
|4 |[How, to, program, in, Python]|[program, Python] |
|5 |[Python, recipes] |[Python, recipes] |
|6 |[Learn, Python, in, 24, hours]|[Learn, Python, 24, hours]|
+---+------------------------------+--------------------------+
4.6.3. n-gram
[18]:
from pyspark.ml.feature import NGram

titles = [
    'How to program in Java',
    'Java recipes',
    'Learn Java in 24 hours',
    'How to program in Python',
    'Python recipes',
    'Learn Python in 24 hours',
]
raw_df = spark.createDataFrame(
    [(row_id, title.split(' ')) for row_id, title in enumerate(titles, start=1)],
    ['id', 'title'])

# Bigrams: each output element joins two consecutive tokens with a space.
ngram = NGram(n=2, inputCol='title', outputCol='ngrams')
ngramDataFrame = ngram.transform(raw_df)
ngramDataFrame.select('ngrams').show(truncate=False)
+-------------------------------------------+
|ngrams |
+-------------------------------------------+
|[How to, to program, program in, in Java] |
|[Java recipes] |
|[Learn Java, Java in, in 24, 24 hours] |
|[How to, to program, program in, in Python]|
|[Python recipes] |
|[Learn Python, Python in, in 24, 24 hours] |
+-------------------------------------------+
4.6.4. Binarizer
[19]:
from pyspark.ml.feature import Binarizer

raw_df = spark.createDataFrame([(0, 0.1), (1, 0.8), (2, 0.2)], ['id', 'feature'])

# Values above the 0.5 threshold become 1.0; the smaller ones become 0.0.
binarizer = Binarizer(threshold=0.5, inputCol='feature', outputCol='binarized_feature')
bin_df = binarizer.transform(raw_df)
threshold = binarizer.getThreshold()
print(f'Binarizer output with Threshold = {threshold}')
bin_df.show()
Binarizer output with Threshold = 0.5
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
| 0| 0.1| 0.0|
| 1| 0.8| 1.0|
| 2| 0.2| 0.0|
+---+-------+-----------------+
4.6.5. Principal component analysis
[20]:
from pyspark.ml.feature import PCA

# 10 labelled samples with 3 features, 2 of them informative.
X, y = make_classification(
    n_samples=10,
    n_features=3,
    n_informative=2,
    n_redundant=0,
    n_repeated=0,
    n_classes=2,
    n_clusters_per_class=2,
    random_state=37,
)
raw_df = spark.createDataFrame(
    [(float(label), Vectors.dense(sample.tolist())) for label, sample in zip(y, X)],
    ['label', 'features'])

# Project the 3-D feature vectors onto the top 2 principal components.
pca = PCA(k=2, inputCol='features', outputCol='pca_features')
model = pca.fit(raw_df)
result = model.transform(raw_df)
result.show(truncate=False)
+-----+--------------------------------------------------------------+-----------------------------------------+
|label|features |pca_features |
+-----+--------------------------------------------------------------+-----------------------------------------+
|1.0 |[-0.1398124360852646,1.9404337498133493,0.4441642252501767] |[-1.7146272018673947,0.14595662137262444]|
|0.0 |[-3.1210741978178023,-2.376881942973619,-0.009668122604746449]|[2.977397753291397,-2.234391686043509] |
|0.0 |[-1.4675982457620498,-1.1426554535428304,-0.3958504784847716] |[1.3403558685391892,-1.3153421310337556] |
|1.0 |[-0.6816497596706191,1.2805160706325531,-0.1380411277034303] |[-1.0867726484463247,-0.6289265701421938]|
|0.0 |[0.6329591387069822,-0.5435350586379027,-0.2260626440162484] |[0.3209687438850598,0.32628910847052517] |
|1.0 |[-1.1306477069338503,2.506180828180563,-0.7930780368681248] |[-2.2856106973303487,-1.4360947516967975]|
|1.0 |[-1.0858904463756514,1.449819729192644,-2.0470674393378254] |[-1.5606498256322983,-2.2249862380766023]|
|0.0 |[-0.31513865899791305,-0.9013733970213405,1.2737757744041067] |[1.2000632834593374,0.6567660856334425] |
|0.0 |[1.0463616288162525,-1.0554542219563587,1.6279535580599434] |[1.1061809462255545,1.9010543059395633] |
|1.0 |[0.7147386521308495,1.2019792718852083,1.75902821212546] |[-0.9315862509771586,1.6852330903616992] |
+-----+--------------------------------------------------------------+-----------------------------------------+
4.6.6. String indexer
[21]:
from pyspark.ml.feature import StringIndexer

categories = ['rat', 'bat', 'cat', 'rat', 'rat', 'cat']
raw_df = spark.createDataFrame(list(enumerate(categories)), ['id', 'category'])

# As the output shows, the most frequent value ('rat', 3 times) gets index 0.0.
indexer_model = StringIndexer(inputCol='category', outputCol='category_index').fit(raw_df)
indexer_model.transform(raw_df).show()
+---+--------+--------------+
| id|category|category_index|
+---+--------+--------------+
| 0| rat| 0.0|
| 1| bat| 2.0|
| 2| cat| 1.0|
| 3| rat| 0.0|
| 4| rat| 0.0|
| 5| cat| 1.0|
+---+--------+--------------+
4.6.7. One-hot encoding
[ ]:
from pyspark.ml.feature import OneHotEncoder

raw_df = spark.createDataFrame(
    [(0.0, 1.0), (0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (2.0, 2.0)],
    ['is_male', 'is_adult'])

# One encoder handles both columns; each input column gets its own output vector column.
encoder = OneHotEncoder(
    inputCols=['is_male', 'is_adult'],
    outputCols=['is_male_vec', 'is_adult_vec'],
)
model = encoder.fit(raw_df)
encoded = model.transform(raw_df)
encoded.show()
4.6.8. SQL transformer
[23]:
from pyspark.ml.feature import SQLTransformer

raw_df = spark.createDataFrame([(0, 0.98, 0.88), (1, 0.88, 0.77)], ['id', 'homework', 'exam'])

# `__THIS__` stands for the DataFrame handed to transform().
grade_statement = 'SELECT *, (0.6 * homework + 0.4 * exam) as grade FROM __THIS__'
transformer = SQLTransformer(statement=grade_statement)
transformer.transform(raw_df).show()
+---+--------+----+------------------+
| id|homework|exam| grade|
+---+--------+----+------------------+
| 0| 0.98|0.88| 0.94|
| 1| 0.88|0.77|0.8360000000000001|
+---+--------+----+------------------+
4.6.9. Imputer
[24]:
from pyspark.ml.feature import Imputer

missing = float('nan')
raw_df = spark.createDataFrame(
    [(1.0, missing), (2.0, missing), (missing, 3.0), (4.0, 4.0), (5.0, 5.0)],
    ['x1', 'x2'])

# NaNs are filled per column; the output shows each column's mean (3.0 and 4.0).
imputer = Imputer(inputCols=['x1', 'x2'], outputCols=['x1_imputed', 'x2_imputed'])
imputer.fit(raw_df).transform(raw_df).show()
+---+---+----------+----------+
| x1| x2|x1_imputed|x2_imputed|
+---+---+----------+----------+
|1.0|NaN| 1.0| 4.0|
|2.0|NaN| 2.0| 4.0|
|NaN|3.0| 3.0| 3.0|
|4.0|4.0| 4.0| 4.0|
|5.0|5.0| 5.0| 5.0|
+---+---+----------+----------+
4.6.10. Vector slicer
[ ]:
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

raw_df = spark.createDataFrame([
    Row(user_features=Vectors.dense(values))
    for values in ([-1.0, 1.3, 3.0], [-2.0, 2.3, 0.0])
])

# Keep only the element at index 1 of each user_features vector.
slicer = VectorSlicer(inputCol='user_features', outputCol='features', indices=[1])
slicer.transform(raw_df).select('user_features', 'features').show()
4.7. Regression
[26]:
from sklearn.datasets import make_regression
from pyspark.ml.regression import (
    DecisionTreeRegressor,
    GBTRegressor,
    GeneralizedLinearRegression,
    LinearRegression,
    RandomForestRegressor,
)

# 100 samples, 4 informative features, one target, bias term 5.3.
X, y = make_regression(
    n_samples=100,
    n_features=4,
    n_informative=4,
    n_targets=1,
    bias=5.3,
    random_state=37,
)
df = spark.createDataFrame(
    [(float(target), Vectors.dense(sample.tolist())) for target, sample in zip(y, X)],
    ['label', 'features'])

columns = dict(labelCol='label', featuresCol='features')

# Fit and apply each regressor in turn; their predictions are not displayed.
for regressor in [
    LinearRegression(**columns, maxIter=10, regParam=0.3, elasticNetParam=0.8),
    GeneralizedLinearRegression(**columns, family='gaussian', link='identity',
                                maxIter=10, regParam=0.3),
    DecisionTreeRegressor(**columns),
    RandomForestRegressor(**columns),
]:
    regressor.fit(df).transform(df)

# Only the gradient-boosted trees' predictions are shown.
GBTRegressor(**columns, maxIter=10)\
    .fit(df).transform(df).select('label', 'prediction').show(n=10)
+-------------------+-------------------+
| label| prediction|
+-------------------+-------------------+
|-156.97053337870528|-159.70341330046185|
| 28.805518697488676| 30.151952001502938|
| 53.26548360120433| 50.65135193096635|
| 76.47476360452336| 77.0946010330639|
|-33.355650442108725|-36.325758608140816|
|-3.6605327490473867| -9.28390361398866|
| -74.72068617488857| -72.259527862171|
| 22.540384098077414| 19.046555779007523|
| 6.261118649116621| 12.180035472252564|
| 31.669985186286436| 29.840281061212405|
+-------------------+-------------------+
only showing top 10 rows
4.8. Classification
[27]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

X, y = make_classification(**{
    'n_samples': 100,
    'n_features': 2,
    'n_informative': 2,
    'n_redundant': 0,
    'n_repeated': 0,
    'n_classes': 2,
    'n_clusters_per_class': 2,
    'random_state': 37
})
df = spark.createDataFrame(
    [(float(y[r]), Vectors.dense(X[r,:].tolist())) for r in range(X.shape[0])],
    ['label', 'features'])

LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)\
    .fit(df)\
    .transform(df)


def indexed_tree_pipeline(data, classifier):
    """Wrap a tree classifier with label/feature indexing and label decoding.

    `classifier` must be built with labelCol='indexed_label' and
    featuresCol='indexed_features' so it actually consumes the indexers'
    outputs. A final IndexToString stage maps the numeric 'prediction' back
    to the original label values in 'predicted_label'.
    """
    label_indexer = StringIndexer(inputCol='label', outputCol='indexed_label').fit(data)
    feature_indexer = VectorIndexer(inputCol='features', outputCol='indexed_features',
                                    maxCategories=4).fit(data)
    label_converter = IndexToString(inputCol='prediction', outputCol='predicted_label',
                                    labels=label_indexer.labels)
    return Pipeline(stages=[label_indexer, feature_indexer, classifier, label_converter])


# Previously these classifiers trained on the raw 'label'/'features' columns,
# which left the StringIndexer/VectorIndexer stages unused.
for tree_classifier in [
    DecisionTreeClassifier(labelCol='indexed_label', featuresCol='indexed_features'),
    RandomForestClassifier(labelCol='indexed_label', featuresCol='indexed_features',
                           numTrees=10, seed=37),
    GBTClassifier(labelCol='indexed_label', featuresCol='indexed_features', maxIter=10),
]:
    indexed_tree_pipeline(df, tree_classifier).fit(df).transform(df)

MultilayerPerceptronClassifier(labelCol='label', featuresCol='features',
                               maxIter=100, layers=[2, 5, 4, 2],
                               blockSize=128, seed=37)\
    .fit(df).transform(df)
LinearSVC(labelCol='label', featuresCol='features', maxIter=10, regParam=0.1)\
    .fit(df).transform(df).select('label', 'prediction').show(n=10)
+-----+----------+
|label|prediction|
+-----+----------+
| 0.0| 0.0|
| 0.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 0.0| 0.0|
| 0.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 0.0| 0.0|
| 1.0| 1.0|
+-----+----------+
only showing top 10 rows
4.9. Clustering
[28]:
from sklearn.datasets import make_blobs
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import LDA

# 3,000 points in 15 dimensions around 2 centres.
X, y = make_blobs(
    n_samples=3000,
    n_features=15,
    centers=2,
    cluster_std=1.0,
    center_box=(-10.0, 10.0),
    random_state=37,
)
df = spark.createDataFrame(
    [(float(label), Vectors.dense(sample.tolist())) for label, sample in zip(y, X)],
    ['label', 'features'])

# Cluster ids are arbitrary (label 1.0 maps to cluster 0 in the output),
# so compare the pairing rather than the raw values.
kmeans = KMeans(featuresCol='features', k=2, seed=37)
kmeans.fit(df).transform(df).select('label', 'prediction').show(n=10)
+-----+----------+
|label|prediction|
+-----+----------+
| 1.0| 0|
| 0.0| 1|
| 0.0| 1|
| 1.0| 0|
| 0.0| 1|
| 1.0| 0|
| 0.0| 1|
| 1.0| 0|
| 0.0| 1|
| 0.0| 1|
+-----+----------+
only showing top 10 rows
4.10. Frequent pattern mining
[29]:
from pyspark.ml.fpm import FPGrowth

baskets = [[1, 2, 5], [1, 2, 3, 5], [1, 2]]
df = spark.createDataFrame(list(enumerate(baskets)), ['id', 'items'])

# Itemsets must appear in at least half the baskets; rules need 0.6 confidence.
fpg = FPGrowth(itemsCol='items', minSupport=0.5, minConfidence=0.6)
model = fpg.fit(df)
model.freqItemsets.show()
model.associationRules.show()
model.transform(df).show()
+---------+----+
| items|freq|
+---------+----+
| [1]| 3|
| [2]| 3|
| [2, 1]| 3|
| [5]| 2|
| [5, 2]| 2|
|[5, 2, 1]| 2|
| [5, 1]| 2|
+---------+----+
+----------+----------+------------------+----+
|antecedent|consequent| confidence|lift|
+----------+----------+------------------+----+
| [5, 2]| [1]| 1.0| 1.0|
| [2]| [1]| 1.0| 1.0|
| [2]| [5]|0.6666666666666666| 1.0|
| [2, 1]| [5]|0.6666666666666666| 1.0|
| [5]| [2]| 1.0| 1.0|
| [5]| [1]| 1.0| 1.0|
| [5, 1]| [2]| 1.0| 1.0|
| [1]| [2]| 1.0| 1.0|
| [1]| [5]|0.6666666666666666| 1.0|
+----------+----------+------------------+----+
+---+------------+----------+
| id| items|prediction|
+---+------------+----------+
| 0| [1, 2, 5]| []|
| 1|[1, 2, 3, 5]| []|
| 2| [1, 2]| [5]|
+---+------------+----------+
4.11. Model selection
[ ]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Java titles are labelled 1.0 and Python titles 0.0. The explicit `fold`
# column alternates 0/1, so each of the two folds holds both classes.
raw_df = spark.createDataFrame(
    [
        (1, 'How to program in Java', 1.0, 0),
        (2, 'Java recipes', 1.0, 1),
        (3, 'Learn Java in 24 hours', 1.0, 0),
        (4, 'Java cookbook basics', 1.0, 1),
        (5, 'How to program in Python', 0.0, 0),
        (6, 'Python recipes', 0.0, 1),
        (7, 'Learn Python in 24 hours', 0.0, 0),
        (8, 'Python cookbook basics', 0.0, 1),
    ],
    ['id', 'title', 'label', 'fold'])

tokenizer = Tokenizer(inputCol='title', outputCol='words')
hasher = HashingTF(inputCol='words', outputCol='features')
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hasher, lr])

# 3 x 2 = 6 candidate settings, each scored by the evaluator on the held-out fold.
param_grid = (
    ParamGridBuilder()
    .addGrid(hasher.numFeatures, [10, 100, 1000])
    .addGrid(lr.regParam, [0.1, 0.01])
    .build()
)
cross_val = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=2,
    foldCol='fold',
    parallelism=1,
)
cv_model = cross_val.fit(raw_df)
cv_model.transform(raw_df).select('label', 'prediction').show(n=10)