7. Machine Learning
7.1. Vectors
7.1.1. Dense vector
[1]:
from pyspark.ml.linalg import Vectors
Vectors.dense([1.0, 2.0, 3.0])
[1]:
DenseVector([1.0, 2.0, 3.0])
7.1.2. Sparse vector
[2]:
Vectors.sparse(5, [(0, 1.0), (2, 3.8), (4, 8.8)])
[2]:
SparseVector(5, {0: 1.0, 2: 3.8, 4: 8.8})
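The two representations are interchangeable; a minimal sketch that materializes the stored zeros of a sparse vector with toArray() and rebuilds a dense vector from the result:
sv = Vectors.sparse(5, [(0, 1.0), (2, 3.8), (4, 8.8)])
print(sv.toArray())                 # numpy array with the zeros filled in
print(Vectors.dense(sv.toArray()))  # back to a DenseVector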
7.1.3. Matrix and DataFrame
[3]:
from sklearn.datasets import make_classification
X, y = make_classification(**{
'n_samples': 2000,
'n_features': 4,
'n_informative': 4,
'n_redundant': 0,
'n_repeated': 0,
'n_classes': 2,
'n_clusters_per_class': 2,
'random_state': 37
})
df = spark.createDataFrame(
[(Vectors.dense(X[r,:].tolist()),) for r in range(X.shape[0])],
['features'])
7.2. Basic statistics
7.2.1. Pearson correlation
[4]:
from pyspark.ml.stat import Correlation
print(str(Correlation.corr(df, 'features').head()[0]))
DenseMatrix([[ 1. , 0.19671305, -0.07915219, -0.3243779 ],
[ 0.19671305, 1. , 0.26944672, 0.00364392],
[-0.07915219, 0.26944672, 1. , -0.28759495],
[-0.3243779 , 0.00364392, -0.28759495, 1. ]])
7.2.2. Spearman correlation
[5]:
print(str(Correlation.corr(df, 'features', 'spearman').head()[0]))
DenseMatrix([[ 1. , 0.21295424, -0.11234501, -0.32188153],
[ 0.21295424, 1. , 0.27054041, -0.07225098],
[-0.11234501, 0.27054041, 1. , -0.28120594],
[-0.32188153, -0.07225098, -0.28120594, 1. ]])
7.2.3. Chi-square test
[6]:
X, y = make_classification(**{
'n_samples': 100,
'n_features': 2,
'n_informative': 2,
'n_redundant': 0,
'n_repeated': 0,
'n_classes': 2,
'n_clusters_per_class': 2,
'random_state': 37
})
df = spark.createDataFrame(
[(float(y[r]), Vectors.dense(X[r,:].tolist())) for r in range(X.shape[0])],
['label', 'features'])
[7]:
from pyspark.ml.stat import ChiSquareTest
r = ChiSquareTest.test(df, 'features', 'label').head()
print(f'{r.pValues} : p-values')
print(f'{r.degreesOfFreedom} : dof')
print(f'{r.statistics} : statistics')
[0.4529585113209542,0.4529585113209542] : p-values
[99, 99] : dof
[100.0,100.0] : statistics
7.2.4. Mean and variance
[8]:
from pyspark.ml.stat import Summarizer
s = Summarizer.metrics('mean', 'variance')
df.select(s.summary(df.features)).show(truncate=False)
+-------------------------------------------------------------------------------------+
|aggregate_metrics(features, 1.0) |
+-------------------------------------------------------------------------------------+
|[[-0.0648518109322975,0.027164400904008706], [1.7191301187548882,1.6490793943014903]]|
+-------------------------------------------------------------------------------------+
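When only a single statistic is needed, Summarizer also exposes per-metric helpers; a minimal sketch against the same df:
df.select(Summarizer.mean(df.features), Summarizer.variance(df.features)).show(truncate=False)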
7.2.5. Min and max
[9]:
s = Summarizer.metrics('min', 'max')
df.select(s.summary(df.features)).show(truncate=False)
+-------------------------------------------------------------------------------+
|aggregate_metrics(features, 1.0) |
+-------------------------------------------------------------------------------+
|[[-3.229583117714798,-2.70686498114644], [2.653016794529104,2.679714287856398]]|
+-------------------------------------------------------------------------------+
7.2.6. Count
[10]:
s = Summarizer.metrics('count')
df.select(s.summary(df.features)).show(truncate=False)
+--------------------------------+
|aggregate_metrics(features, 1.0)|
+--------------------------------+
|[100] |
+--------------------------------+
7.3. Pipelines
[11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures',
withStd=True, withMean=False)
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[scaler, lr])
model = pipeline.fit(df)
y_preds = model.transform(df)
y_preds.select('probability', 'label', 'prediction').show(truncate=False)
+------------------------------------------+-----+----------+
|probability |label|prediction|
+------------------------------------------+-----+----------+
|[0.8680409101265395,0.13195908987346056] |0.0 |0.0 |
|[0.08030517436073896,0.919694825639261] |0.0 |1.0 |
|[0.0385764052721843,0.9614235947278156] |1.0 |1.0 |
|[0.23941846958850424,0.7605815304114959] |1.0 |1.0 |
|[0.9486667828637316,0.0513332171362683] |0.0 |0.0 |
|[0.24662939567538006,0.7533706043246199] |0.0 |1.0 |
|[0.10000853755198902,0.8999914624480111] |1.0 |1.0 |
|[0.05211571635853627,0.9478842836414636] |1.0 |1.0 |
|[0.8505912777705403,0.14940872222945964] |0.0 |0.0 |
|[0.21719054428407597,0.7828094557159241] |1.0 |1.0 |
|[0.05311183455823478,0.9468881654417654] |1.0 |1.0 |
|[0.9482695764129414,0.05173042358705855] |0.0 |0.0 |
|[0.5078594474186815,0.4921405525813185] |1.0 |0.0 |
|[0.030506033909821213,0.9694939660901789] |1.0 |1.0 |
|[0.18788723612617628,0.8121127638738237] |1.0 |1.0 |
|[0.2723894435175389,0.7276105564824611] |1.0 |1.0 |
|[0.0028093683234259397,0.9971906316765742]|1.0 |1.0 |
|[0.8290838868182774,0.17091611318172253] |0.0 |0.0 |
|[0.8706755894225549,0.12932441057744518] |0.0 |0.0 |
|[0.08386681998270544,0.9161331800172946] |1.0 |1.0 |
+------------------------------------------+-----+----------+
only showing top 20 rows
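The fitted pipeline can be scored like any other model; a minimal sketch that computes the accuracy of the y_preds DataFrame from the cell above:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
print(evaluator.evaluate(y_preds))  # fraction of rows where prediction matches label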
7.4. Feature extractors
7.4.1. TF-IDF
[12]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
raw_df = spark.createDataFrame([
(0.0, 'How to program in Java'),
(0.0, 'Java recipies'),
(0.0, 'Learn Java in 24 hours'),
(1.0, 'How to program in Python'),
(1.0, 'Python recipies'),
(1.0, 'Learn Python in 24 hours')
], ['label', 'title'])
tokenizer = Tokenizer(inputCol='title', outputCol='words')
hashing = HashingTF(inputCol='words', outputCol='raw_features', numFeatures=10)
idf = IDF(inputCol='raw_features', outputCol='features')
pipeline = Pipeline(stages=[tokenizer, hashing, idf])
model = pipeline.fit(raw_df)
rescale_df = model.transform(raw_df)
rescale_df.select('label', 'features').show(truncate=False)
+-----+--------------------------------------------------------------------------------------------------+
|label|features |
+-----+--------------------------------------------------------------------------------------------------+
|0.0 |(10,[4,5,7,8],[0.8472978603872037,0.0,0.6729444732424258,0.8472978603872037]) |
|0.0 |(10,[5,7],[0.0,0.3364722366212129]) |
|0.0 |(10,[1,5,7,9],[0.8472978603872037,0.0,0.3364722366212129,0.3364722366212129]) |
|1.0 |(10,[4,5,7,8,9],[0.8472978603872037,0.0,0.3364722366212129,0.8472978603872037,0.3364722366212129])|
|1.0 |(10,[5,9],[0.0,0.3364722366212129]) |
|1.0 |(10,[1,5,9],[0.8472978603872037,0.0,0.6729444732424258]) |
+-----+--------------------------------------------------------------------------------------------------+
7.4.2. Word2Vec
[13]:
from pyspark.ml.feature import Word2Vec
raw_df = spark.createDataFrame([
('How to program in Java'.split(' '),),
('Java recipies'.split(' '),),
('Learn Java in 24 hours'.split(' '),),
('How to program in Python'.split(' '),),
('Python recipies'.split(' '),),
('Learn Python in 24 hours'.split(' '),)
], ['text'])
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol='text', outputCol='result')
model = word2Vec.fit(raw_df)
result = model.transform(raw_df)
for text, vector in result.collect():
print(f'{text} => {vector}')
['How', 'to', 'program', 'in', 'Java'] => [0.016671489179134368,-0.07890784069895745,-0.061020128056406976]
['Java', 'recipies'] => [-0.00206630676984787,-0.133316308259964,0.0072393231093883514]
['Learn', 'Java', 'in', '24', 'hours'] => [-0.0064692735671997076,-0.0450890451669693,-0.044838495552539825]
['How', 'to', 'program', 'in', 'Python'] => [0.006694729626178742,-0.02815251871943474,-0.06902075484395027]
['Python', 'recipies'] => [-0.02700820565223694,-0.0064280033111572266,-0.01276224385946989]
['Learn', 'Python', 'in', '24', 'hours'] => [-0.016446033120155336,0.005666276812553406,-0.052839122340083124]
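The fitted Word2VecModel also exposes the learned embeddings and a nearest-neighbour lookup; a minimal sketch:
model.getVectors().show(truncate=False)             # one learned vector per vocabulary word
model.findSynonyms('Java', 2).show(truncate=False)  # two closest words by cosine similarity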
7.4.3. Count vectorizer
[14]:
from pyspark.ml.feature import CountVectorizer
raw_df = spark.createDataFrame([
(0, 'at bat cat'.split(' ')),
(1, 'at bat bat cat at'.split(' '))
], ['id', 'words'])
cv = CountVectorizer(inputCol='words', outputCol='features', vocabSize=3, minDF=2.0)
model = cv.fit(raw_df)
result = model.transform(raw_df)
result.show(truncate=False)
+---+-----------------------+-------------------------+
|id |words |features |
+---+-----------------------+-------------------------+
|0 |[at, bat, cat] |(3,[0,1,2],[1.0,1.0,1.0])|
|1 |[at, bat, bat, cat, at]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+-----------------------+-------------------------+
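The index-to-term mapping learned by the fitted model is available through its vocabulary attribute; a minimal sketch:
print(model.vocabulary)  # index i of the feature vector corresponds to vocabulary[i]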
7.4.4. Feature hasher
[15]:
from pyspark.ml.feature import FeatureHasher
raw_df = spark.createDataFrame([
(2.2, True, 'a', 'cat'),
(4.5, False, 'b', 'dog'),
(4.4, False, 'c', 'dog'),
(2.3, True, 'd', 'cat')
], ['x1', 'x2', 'x3', 'x4'])
hasher = FeatureHasher(inputCols=['x1', 'x2', 'x3', 'x4'], outputCol='features')
featurized = hasher.transform(raw_df)
featurized.show(truncate=False)
+---+-----+---+---+--------------------------------------------------------+
|x1 |x2 |x3 |x4 |features |
+---+-----+---+---+--------------------------------------------------------+
|2.2|true |a |cat|(262144,[35046,56751,184035,244783],[1.0,1.0,2.2,1.0]) |
|4.5|false|b |dog|(262144,[162446,179156,184035,223707],[1.0,1.0,4.5,1.0])|
|4.4|false|c |dog|(262144,[121161,162446,179156,184035],[1.0,1.0,1.0,4.4])|
|2.3|true |d |cat|(262144,[5506,35046,56751,184035],[1.0,1.0,1.0,2.3]) |
+---+-----+---+---+--------------------------------------------------------+
7.5. Feature transformers
7.5.1. Tokenizer
[16]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
raw_df = spark.createDataFrame([
(1, 'How to program in Java'),
(2, 'Java recipies'),
(3, 'Learn Java in 24 hours'),
(4, 'How to program in Python'),
(5, 'Python recipies'),
(6, 'Learn Python in 24 hours')
], ['id', 'title'])
tokenizer1 = Tokenizer(inputCol='title', outputCol='words')
tokenizer2 = RegexTokenizer(inputCol='title', outputCol='words', pattern='\\W')
tokenized1 = tokenizer1.transform(raw_df)
tokenized2 = tokenizer2.transform(raw_df)
counter = udf(lambda words: len(words), IntegerType())
tokenized1\
.select('title', 'words')\
.withColumn('tokens', counter(col('words')))\
.show(truncate=False)
tokenized2\
.select('title', 'words')\
.withColumn('tokens', counter(col('words')))\
.show(truncate=False)
+------------------------+------------------------------+------+
|title |words |tokens|
+------------------------+------------------------------+------+
|How to program in Java |[how, to, program, in, java] |5 |
|Java recipies |[java, recipies] |2 |
|Learn Java in 24 hours |[learn, java, in, 24, hours] |5 |
|How to program in Python|[how, to, program, in, python]|5 |
|Python recipies |[python, recipies] |2 |
|Learn Python in 24 hours|[learn, python, in, 24, hours]|5 |
+------------------------+------------------------------+------+
+------------------------+------------------------------+------+
|title |words |tokens|
+------------------------+------------------------------+------+
|How to program in Java |[how, to, program, in, java] |5 |
|Java recipies |[java, recipies] |2 |
|Learn Java in 24 hours |[learn, java, in, 24, hours] |5 |
|How to program in Python|[how, to, program, in, python]|5 |
|Python recipies |[python, recipies] |2 |
|Learn Python in 24 hours|[learn, python, in, 24, hours]|5 |
+------------------------+------------------------------+------+
7.5.2. Stop words remover
[17]:
from pyspark.ml.feature import StopWordsRemover
raw_df = spark.createDataFrame([
(1, 'How to program in Java'.split(' ')),
(2, 'Java recipies'.split(' ')),
(3, 'Learn Java in 24 hours'.split(' ')),
(4, 'How to program in Python'.split(' ')),
(5, 'Python recipies'.split(' ')),
(6, 'Learn Python in 24 hours'.split(' '))
], ['id', 'title'])
remover = StopWordsRemover(inputCol='title', outputCol='filtered')
remover.transform(raw_df).show(truncate=False)
+---+------------------------------+--------------------------+
|id |title |filtered |
+---+------------------------------+--------------------------+
|1 |[How, to, program, in, Java] |[program, Java] |
|2 |[Java, recipies] |[Java, recipies] |
|3 |[Learn, Java, in, 24, hours] |[Learn, Java, 24, hours] |
|4 |[How, to, program, in, Python]|[program, Python] |
|5 |[Python, recipies] |[Python, recipies] |
|6 |[Learn, Python, in, 24, hours]|[Learn, Python, 24, hours]|
+---+------------------------------+--------------------------+
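The stop-word list itself is configurable; a minimal sketch that extends the default English list with an extra term ('hours' here is only for illustration):
custom_stop_words = StopWordsRemover.loadDefaultStopWords('english') + ['hours']
StopWordsRemover(inputCol='title', outputCol='filtered', stopWords=custom_stop_words)\
.transform(raw_df)\
.show(truncate=False)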
7.5.3. n-gram
[18]:
from pyspark.ml.feature import NGram
raw_df = spark.createDataFrame([
(1, 'How to program in Java'.split(' ')),
(2, 'Java recipies'.split(' ')),
(3, 'Learn Java in 24 hours'.split(' ')),
(4, 'How to program in Python'.split(' ')),
(5, 'Python recipies'.split(' ')),
(6, 'Learn Python in 24 hours'.split(' '))
], ['id', 'title'])
ngram = NGram(n=2, inputCol='title', outputCol='ngrams')
ngramDataFrame = ngram.transform(raw_df)
ngramDataFrame.select('ngrams').show(truncate=False)
+-------------------------------------------+
|ngrams |
+-------------------------------------------+
|[How to, to program, program in, in Java] |
|[Java recipies] |
|[Learn Java, Java in, in 24, 24 hours] |
|[How to, to program, program in, in Python]|
|[Python recipies] |
|[Learn Python, Python in, in 24, 24 hours] |
+-------------------------------------------+
7.5.4. Binarizer
[19]:
from pyspark.ml.feature import Binarizer
raw_df = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ['id', 'feature'])
binarizer = Binarizer(threshold=0.5, inputCol='feature', outputCol='binarized_feature')
bin_df = binarizer.transform(raw_df)
print(f'Binarizer output with Threshold = {binarizer.getThreshold()}')
bin_df.show()
Binarizer output with Threshold = 0.5
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
| 0| 0.1| 0.0|
| 1| 0.8| 1.0|
| 2| 0.2| 0.0|
+---+-------+-----------------+
7.5.5. Principal component analysis
[20]:
from pyspark.ml.feature import PCA
X, y = make_classification(**{
'n_samples': 10,
'n_features': 3,
'n_informative': 2,
'n_redundant': 0,
'n_repeated': 0,
'n_classes': 2,
'n_clusters_per_class': 2,
'random_state': 37
})
raw_df = spark.createDataFrame(
[(float(y[r]), Vectors.dense(X[r,:].tolist())) for r in range(X.shape[0])],
['label', 'features'])
pca = PCA(k=2, inputCol='features', outputCol='pca_features')
model = pca.fit(raw_df)
result = model.transform(raw_df)
result.show(truncate=False)
+-----+--------------------------------------------------------------+-----------------------------------------+
|label|features |pca_features |
+-----+--------------------------------------------------------------+-----------------------------------------+
|1.0 |[-0.1398124360852646,1.9404337498133493,0.4441642252501767] |[-1.7146272018673947,0.14595662137262444]|
|0.0 |[-3.1210741978178023,-2.376881942973619,-0.009668122604746449]|[2.977397753291397,-2.234391686043509] |
|0.0 |[-1.4675982457620498,-1.1426554535428304,-0.3958504784847716] |[1.3403558685391892,-1.3153421310337556] |
|1.0 |[-0.6816497596706191,1.2805160706325531,-0.1380411277034303] |[-1.0867726484463247,-0.6289265701421938]|
|0.0 |[0.6329591387069822,-0.5435350586379027,-0.2260626440162484] |[0.3209687438850598,0.32628910847052517] |
|1.0 |[-1.1306477069338503,2.506180828180563,-0.7930780368681248] |[-2.2856106973303487,-1.4360947516967975]|
|1.0 |[-1.0858904463756514,1.449819729192644,-2.0470674393378254] |[-1.5606498256322983,-2.2249862380766023]|
|0.0 |[-0.31513865899791305,-0.9013733970213405,1.2737757744041067] |[1.2000632834593374,0.6567660856334425] |
|0.0 |[1.0463616288162525,-1.0554542219563587,1.6279535580599434] |[1.1061809462255545,1.9010543059395633] |
|1.0 |[0.7147386521308495,1.2019792718852083,1.75902821212546] |[-0.9315862509771586,1.6852330903616992] |
+-----+--------------------------------------------------------------+-----------------------------------------+
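The fitted PCAModel reports how much variance each of the k components captures, along with the projection matrix itself; a minimal sketch:
print(model.explainedVariance)  # proportion of variance captured by each component
print(model.pc)                 # the principal components, one per column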
7.5.6. String indexer
[21]:
from pyspark.ml.feature import StringIndexer
raw_df = spark.createDataFrame([
(0, 'rat'),
(1, 'bat'),
(2, 'cat'),
(3, 'rat'),
(4, 'rat'),
(5, 'cat')], ['id', 'category'])
StringIndexer(inputCol='category', outputCol='category_index')\
.fit(raw_df)\
.transform(raw_df)\
.show()
+---+--------+--------------+
| id|category|category_index|
+---+--------+--------------+
| 0| rat| 0.0|
| 1| bat| 2.0|
| 2| cat| 1.0|
| 3| rat| 0.0|
| 4| rat| 0.0|
| 5| cat| 1.0|
+---+--------+--------------+
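Indices are assigned by descending label frequency, and IndexToString reverses the mapping; a minimal sketch (indexer_model is just an illustrative variable name):
from pyspark.ml.feature import IndexToString
indexer_model = StringIndexer(inputCol='category', outputCol='category_index').fit(raw_df)
print(indexer_model.labels)  # ['rat', 'cat', 'bat']: the most frequent category gets index 0.0
IndexToString(inputCol='category_index', outputCol='category_orig', labels=indexer_model.labels)\
.transform(indexer_model.transform(raw_df))\
.show()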
7.5.7. One-hot encoding
[22]:
from pyspark.ml.feature import OneHotEncoderEstimator
raw_df = spark.createDataFrame([
(0.0, 1.0),
(0.0, 0.0),
(1.0, 1.0),
(1.0, 0.0),
(2.0, 2.0)
], ['is_male', 'is_adult'])
encoder = OneHotEncoderEstimator(inputCols=['is_male', 'is_adult'], outputCols=['is_male_vec', 'is_adult_vec'])
model = encoder.fit(raw_df)
encoded = model.transform(raw_df)
encoded.show()
+-------+--------+-------------+-------------+
|is_male|is_adult| is_male_vec| is_adult_vec|
+-------+--------+-------------+-------------+
| 0.0| 1.0|(2,[0],[1.0])|(2,[1],[1.0])|
| 0.0| 0.0|(2,[0],[1.0])|(2,[0],[1.0])|
| 1.0| 1.0|(2,[1],[1.0])|(2,[1],[1.0])|
| 1.0| 0.0|(2,[1],[1.0])|(2,[0],[1.0])|
| 2.0| 2.0| (2,[],[])| (2,[],[])|
+-------+--------+-------------+-------------+
7.5.8. SQL transformer
[23]:
from pyspark.ml.feature import SQLTransformer
raw_df = spark.createDataFrame([
(0, 0.98, 0.88),
(1, 0.88, 0.77)
], ['id', 'homework', 'exam'])
transformer = SQLTransformer(
statement='SELECT *, (0.6 * homework + 0.4 * exam) as grade FROM __THIS__')
transformer.transform(raw_df).show()
+---+--------+----+------------------+
| id|homework|exam| grade|
+---+--------+----+------------------+
| 0| 0.98|0.88| 0.94|
| 1| 0.88|0.77|0.8360000000000001|
+---+--------+----+------------------+
7.5.9. Imputer
[24]:
from pyspark.ml.feature import Imputer
raw_df = spark.createDataFrame([
(1.0, float('nan')),
(2.0, float('nan')),
(float('nan'), 3.0),
(4.0, 4.0),
(5.0, 5.0)
], ['x1', 'x2'])
Imputer(inputCols=['x1', 'x2'], outputCols=['x1_imputed', 'x2_imputed'])\
.fit(raw_df)\
.transform(raw_df)\
.show()
+---+---+----------+----------+
| x1| x2|x1_imputed|x2_imputed|
+---+---+----------+----------+
|1.0|NaN| 1.0| 4.0|
|2.0|NaN| 2.0| 4.0|
|NaN|3.0| 3.0| 3.0|
|4.0|4.0| 4.0| 4.0|
|5.0|5.0| 5.0| 5.0|
+---+---+----------+----------+
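The replacement value defaults to the column mean; the strategy parameter switches it to the median; a minimal sketch:
Imputer(strategy='median', inputCols=['x1', 'x2'], outputCols=['x1_imputed', 'x2_imputed'])\
.fit(raw_df)\
.transform(raw_df)\
.show()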
7.5.10. Vector slicer
[25]:
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
raw_df = spark.createDataFrame([
Row(user_features=Vectors.dense([-1.0, 1.3, 3.0])),
Row(user_features=Vectors.dense([-2.0, 2.3, 0.0]))])
VectorSlicer(inputCol='user_features', outputCol='features', indices=[1])\
.transform(raw_df)\
.select('user_features', 'features')\
.show()
+--------------+--------+
| user_features|features|
+--------------+--------+
|[-1.0,1.3,3.0]| [1.3]|
|[-2.0,2.3,0.0]| [2.3]|
+--------------+--------+
7.6. Regression
[26]:
from sklearn.datasets import make_regression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
X, y = make_regression(**{
'n_samples': 100,
'n_features': 4,
'n_informative': 4,
'n_targets': 1,
'bias': 5.3,
'random_state': 37
})
df = spark.createDataFrame(
[(float(y[r]), Vectors.dense(X[r,:].tolist())) for r in range(X.shape[0])],
['label', 'features'])
LinearRegression(labelCol='label', featuresCol='features',
maxIter=10, regParam=0.3, elasticNetParam=0.8)\
.fit(df).transform(df)
GeneralizedLinearRegression(labelCol='label', featuresCol='features',
family='gaussian', link='identity',
maxIter=10, regParam=0.3)\
.fit(df).transform(df)
DecisionTreeRegressor(labelCol='label', featuresCol='features')\
.fit(df).transform(df)
RandomForestRegressor(labelCol='label', featuresCol='features')\
.fit(df).transform(df)
GBTRegressor(labelCol='label', featuresCol='features', maxIter=10)\
.fit(df).transform(df).select('label', 'prediction').show(n=10)
+-------------------+-------------------+
| label| prediction|
+-------------------+-------------------+
|-156.97053337870528|-159.70341330046185|
| 28.805518697488676| 30.151952001502938|
| 53.26548360120433| 50.65135193096635|
| 76.47476360452336| 77.0946010330639|
|-33.355650442108725|-36.325758608140816|
|-3.6605327490473867| -9.28390361398866|
| -74.72068617488857| -72.259527862171|
| 22.540384098077414| 19.046555779007523|
| 6.261118649116621| 12.180035472252564|
| 31.669985186286436| 29.840281061212405|
+-------------------+-------------------+
only showing top 10 rows
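To compare the regressors quantitatively, RegressionEvaluator computes metrics such as RMSE on the transformed output; a minimal sketch for the GBT model (preds is just an illustrative variable name):
from pyspark.ml.evaluation import RegressionEvaluator
preds = GBTRegressor(labelCol='label', featuresCol='features', maxIter=10)\
.fit(df).transform(df)
print(RegressionEvaluator(metricName='rmse').evaluate(preds))  # root-mean-squared error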
7.7. Classification
[27]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
X, y = make_classification(**{
'n_samples': 100,
'n_features': 2,
'n_informative': 2,
'n_redundant': 0,
'n_repeated': 0,
'n_classes': 2,
'n_clusters_per_class': 2,
'random_state': 37
})
df = spark.createDataFrame(
[(float(y[r]), Vectors.dense(X[r,:].tolist())) for r in range(X.shape[0])],
['label', 'features'])
LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)\
.fit(df)\
.transform(df)
Pipeline(stages=[
StringIndexer(inputCol='label', outputCol='indexed_label'),
VectorIndexer(inputCol='features', outputCol='indexed_features', maxCategories=4),
DecisionTreeClassifier(labelCol='label', featuresCol='features')])\
.fit(df).transform(df)
Pipeline(stages=[
StringIndexer(inputCol='label', outputCol='indexed_label'),
VectorIndexer(inputCol='features', outputCol='indexed_features', maxCategories=4),
RandomForestClassifier(labelCol='label', featuresCol='features', numTrees=10)])\
.fit(df).transform(df)
Pipeline(stages=[
StringIndexer(inputCol='label', outputCol='indexed_label'),
VectorIndexer(inputCol='features', outputCol='indexed_features', maxCategories=4),
GBTClassifier(labelCol='label', featuresCol='features', maxIter=10)])\
.fit(df).transform(df)
MultilayerPerceptronClassifier(labelCol='label', featuresCol='features',
maxIter=100, layers=[2, 5, 4, 2],
blockSize=128, seed=37)\
.fit(df).transform(df)
LinearSVC(labelCol='label', featuresCol='features', maxIter=10, regParam=0.1)\
.fit(df).transform(df).select('label', 'prediction').show(n=10)
+-----+----------+
|label|prediction|
+-----+----------+
| 0.0| 0.0|
| 0.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 0.0| 0.0|
| 0.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 0.0| 0.0|
| 1.0| 1.0|
+-----+----------+
only showing top 10 rows
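The classifiers can be scored the same way; a minimal sketch that computes the area under the ROC curve for the linear SVM (preds is just an illustrative variable name):
from pyspark.ml.evaluation import BinaryClassificationEvaluator
preds = LinearSVC(labelCol='label', featuresCol='features', maxIter=10, regParam=0.1)\
.fit(df).transform(df)
print(BinaryClassificationEvaluator().evaluate(preds))  # areaUnderROC by default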
7.8. Clustering
[28]:
from sklearn.datasets import make_blobs
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import LDA
X, y = make_blobs(**{
'n_samples': 3000,
'n_features': 15,
'centers': 2,
'cluster_std': 1.0,
'center_box': (-10.0, 10.0),
'random_state': 37
})
df = spark.createDataFrame(
[(float(y[r]), Vectors.dense(X[r,:].tolist())) for r in range(X.shape[0])],
['label', 'features'])
KMeans(featuresCol='features').setK(2).setSeed(37).fit(df).transform(df)\
.select('label', 'prediction').show(n=10)
+-----+----------+
|label|prediction|
+-----+----------+
| 1.0| 0|
| 0.0| 1|
| 0.0| 1|
| 1.0| 0|
| 0.0| 1|
| 1.0| 0|
| 0.0| 1|
| 1.0| 0|
| 0.0| 1|
| 0.0| 1|
+-----+----------+
only showing top 10 rows
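Keeping the fitted KMeansModel in a variable gives access to the centroids, and ClusteringEvaluator scores the clustering without labels via the silhouette; a minimal sketch (kmeans_model and preds are just illustrative names):
from pyspark.ml.evaluation import ClusteringEvaluator
kmeans_model = KMeans(featuresCol='features').setK(2).setSeed(37).fit(df)
preds = kmeans_model.transform(df)
print(kmeans_model.clusterCenters())          # one centroid per cluster
print(ClusteringEvaluator().evaluate(preds))  # silhouette score; closer to 1 is better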
7.9. Frequent pattern mining
[29]:
from pyspark.ml.fpm import FPGrowth
df = spark.createDataFrame([
(0, [1, 2, 5]),
(1, [1, 2, 3, 5]),
(2, [1, 2])
], ['id', 'items'])
fpg = FPGrowth(itemsCol='items', minSupport=0.5, minConfidence=0.6)
model = fpg.fit(df)
model.freqItemsets.show()
model.associationRules.show()
model.transform(df).show()
+---------+----+
| items|freq|
+---------+----+
| [1]| 3|
| [2]| 3|
| [2, 1]| 3|
| [5]| 2|
| [5, 2]| 2|
|[5, 2, 1]| 2|
| [5, 1]| 2|
+---------+----+
+----------+----------+------------------+----+
|antecedent|consequent| confidence|lift|
+----------+----------+------------------+----+
| [5, 2]| [1]| 1.0| 1.0|
| [2]| [1]| 1.0| 1.0|
| [2]| [5]|0.6666666666666666| 1.0|
| [2, 1]| [5]|0.6666666666666666| 1.0|
| [5]| [2]| 1.0| 1.0|
| [5]| [1]| 1.0| 1.0|
| [5, 1]| [2]| 1.0| 1.0|
| [1]| [2]| 1.0| 1.0|
| [1]| [5]|0.6666666666666666| 1.0|
+----------+----------+------------------+----+
+---+------------+----------+
| id| items|prediction|
+---+------------+----------+
| 0| [1, 2, 5]| []|
| 1|[1, 2, 3, 5]| []|
| 2| [1, 2]| [5]|
+---+------------+----------+
7.10. Model selection
[30]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
raw_df = spark.createDataFrame([
(1, 'How to program in Java', 1.0),
(2, 'Java recipies', 1.0),
(3, 'Learn Java in 24 hours', 1.0),
(4, 'How to program in Python', 0.0),
(5, 'Python recipies', 0.0),
(6, 'Learn Python in 24 hours', 0.0)
], ['id', 'title', 'label'])
tokenizer = Tokenizer(inputCol='title', outputCol='words')
hasher = HashingTF(inputCol='words', outputCol='features')
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hasher, lr])
param_grid = ParamGridBuilder() \
.addGrid(hasher.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
cross_val = CrossValidator(estimator=pipeline,
estimatorParamMaps=param_grid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2)
cv_model = cross_val.fit(raw_df)
cv_model.transform(raw_df).select('label', 'prediction').show(n=10)
+-----+----------+
|label|prediction|
+-----+----------+
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
+-----+----------+
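The fitted CrossValidatorModel keeps the average metric for every parameter combination along with the best pipeline; a minimal sketch (best_hasher is just an illustrative name):
print(cv_model.avgMetrics)                  # one average score per entry in param_grid
best_hasher = cv_model.bestModel.stages[1]  # the HashingTF stage of the winning pipeline
print(best_hasher.getNumFeatures())         # numFeatures selected by cross-validation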