《机器学习实战》笔记 (第八章和第九章) (附Python3版代码)(Machine Learning in Action)
前言:
《机器学习实战》(Machine Learning in Action)是一本常见的机器学习入门书,书中代码由Python2写成。由于现时Python2已逐渐退出舞台,所以这篇文章将该书的所有代码部分用Python3重写。
代码上传GitHub: https://github.com/kungbob/Machine_Learning_In_Action
原版Python2代码:https://www.manning.com/books/machine-learning-in-action
注: 第八章和第九章主要内容是用不同的回归方法来预测数据,可视为第五章逻辑回归的扩展。
第八章: 预测数值型数据:回归
- 线性回归
优点: 结果易于理解,计算上不复杂。
缺点: 对非线性的数据拟合不好。
适用数据类型: 数值型和标称型数据。
一般方法
- 收集数据: 采用任意方法收集数据。
- 准备数据: 回归需要数值型数据,标称型数据将被转成二值型数据(0或1)。
- 分析数据: 绘出数据的可视化二维图将有助于对数据做出理解和分析,采用缩减法求得新回归系数后,可以将拟合线绘在图上作对比。
- 训练算法: 找到回归系数。
- 测试算法: 使用\(R^2\)或者预测值和数据的拟合度,分析模型的效果。
- 使用算法: 使用回归可以在给定输入的时候预测出一个数值。
示例:用回归法预测乐高套装的价格
注: 书中的Google API并不可用。
- 收集数据: 用Google Shopping的API收集数据。
- 准备数据: 从返回的JSON数据中抽取价格。
- 分析数据: 可视化并观察数据。
- 训练算法: 构建不同的模型,采用逐步线性回归和直接的线性回归模型。
- 测试算法: 使用交叉验证来测试不同的模型,分析哪个效果最好。
- 使用算法: 生成数据模型。
regression.py完整代码(章节中Python命令行代码在GitHub):
from numpy import *
from time import sleep
import json, urllib.request
# Standard regression data import function, program 8_1
def loadDataSet(fileName):
    """Load a tab-delimited data file for regression.

    Every column but the last is treated as a feature; the last column
    is the target value.

    Args:
        fileName: path to a tab-separated text file of floats.

    Returns:
        (dataMat, labelMat): list of feature rows, list of target floats.
    """
    dataMat = []
    labelMat = []
    # Open the file once and close it deterministically (the original
    # opened two handles -- one just to count columns -- and leaked both).
    with open(fileName) as fr:
        lines = fr.readlines()
    if not lines:
        return dataMat, labelMat
    numFeat = len(lines[0].split('\t')) - 1
    for line in lines:
        curLine = line.strip().split('\t')
        dataMat.append([float(curLine[i]) for i in range(numFeat)])
        labelMat.append(float(curLine[-1]))
    return dataMat, labelMat
def standRegres(xArr, yArr):
    """Ordinary least squares: solve w = (X^T X)^-1 X^T y.

    Returns the weights as a column matrix, or None when X^T X is
    singular and cannot be inverted.
    """
    X = mat(xArr)
    y = mat(yArr).T
    xTx = X.T * X
    # A zero determinant means the normal equations have no unique solution.
    if linalg.det(xTx) == 0.0:
        print("This matrix is singular, cannot do inverse")
        return
    return xTx.I * (X.T * y)
# Locally weighted linear regression, program 8_2
def lwlr(testPoint, xArr, yArr, k = 1.0):
    """Predict the value at testPoint via locally weighted regression.

    Each training sample receives a Gaussian-kernel weight that decays
    with its distance from testPoint; k controls the kernel width.
    Returns None when the weighted normal matrix is singular.
    """
    X = mat(xArr)
    y = mat(yArr).T
    numSamples = shape(X)[0]
    # Diagonal weight matrix, one entry per training sample.
    weights = mat(eye((numSamples)))
    for idx in range(numSamples):
        # Weight decays exponentially with squared distance from testPoint.
        delta = testPoint - X[idx, :]
        weights[idx, idx] = exp(delta * delta.T / (-2.0 * k ** 2))
    xTx = X.T * (weights * X)
    if linalg.det(xTx) == 0.0:
        print("this matrix is singular, cannot do inverse")
        return
    ws = xTx.I * (X.T * (weights * y))
    return testPoint * ws
def lwlrTest(testArr, xArr, yArr, k = 1.0):
    """Run lwlr() on every row of testArr and collect the estimates."""
    numTest = shape(testArr)[0]
    yHat = zeros(numTest)
    for idx in range(numTest):
        yHat[idx] = lwlr(testArr[idx], xArr, yArr, k)
    return yHat
# Deducing abalone's age, chapter 8_3
def rssError(yArr, yHatArr):
    """Residual sum of squares between targets and estimates."""
    residual = yArr - yHatArr
    return (residual ** 2).sum()
# Ridge regression, program 8_3
def ridgeRegres(xMat, yMat, lam = 0.2):
    """Solve ridge regression: w = (X^T X + lam*I)^-1 X^T y.

    Returns None when the penalized matrix is still singular (possible
    only when lam is 0).
    """
    xTx = xMat.T * xMat
    # The ridge penalty lam*I keeps the matrix invertible for lam > 0.
    denom = xTx + eye(shape(xMat)[1]) * lam
    if linalg.det(denom) == 0.0:
        print("This matrix is singular, cannot do inverse")
        return
    return denom.I * (xMat.T * yMat)
def ridgeTest(xArr, yArr):
    """Train ridge regression over 30 log-spaced lambda values.

    Inputs are standardized (targets centered; features centered and
    divided by their variance) before fitting.  Returns a 30 x n matrix
    whose row i holds the weights for lambda = exp(i - 10).
    """
    X = mat(xArr)
    y = mat(yArr).T
    # Center the targets.
    y = y - mean(y, 0)
    # Standardize features: subtract the column mean, divide by variance.
    X = (X - mean(X, 0)) / var(X, 0)
    numTestPts = 30
    wMat = zeros((numTestPts, shape(X)[1]))
    for i in range(numTestPts):
        ws = ridgeRegres(X, y, exp(i - 10))
        wMat[i, :] = ws.T
    return wMat
# Not shown in the book; provided in the author's source code.
# Standardize a matrix by columns.
def regularize(xMat):
    """Return a copy of xMat with every column mean-centered and
    divided by its variance (the book's notion of regularization)."""
    inMat = xMat.copy()
    colMeans = mean(inMat, 0)
    colVars = var(inMat, 0)
    return (inMat - colMeans) / colVars
# Forward stagewise regression, program 8_4
def stageWise(xArr, yArr, eps = 0.01, numIt = 100):
    """Greedy forward-stagewise linear regression.

    At every iteration each weight is nudged by +/-eps and the change
    that most reduces the residual sum of squares is kept.  Prints the
    current weights each iteration and returns a numIt x n history of
    the weight vectors.
    """
    X = mat(xArr)
    y = mat(yArr).T
    y = y - mean(y, 0)          # center the targets
    X = regularize(X)           # standardize the features
    m, n = shape(X)
    history = zeros((numIt, n))
    ws = zeros((n, 1))
    wsMax = ws.copy()
    for it in range(numIt):
        print(ws.T)
        lowestError = inf
        for featIdx in range(n):
            for sign in (-1, 1):
                candidate = ws.copy()
                candidate[featIdx] += eps * sign
                rssE = rssError(y.A, (X * candidate).A)
                if rssE < lowestError:
                    lowestError = rssE
                    wsMax = candidate
        ws = wsMax.copy()
        history[it, :] = ws.T
    return history
# Retrieving shopping data, program 8_5
# The Google Shopping API is long dead; this now returns a 404 error.
def searchForSet(retX, retY, setNum, yr, numPce, origPrc):
    """Query the (defunct) Google Shopping API for a Lego set and append
    usable price samples to retX / retY in place.

    Args:
        retX: list receiving feature rows [year, pieces, newFlag, origPrc].
        retY: list receiving observed selling prices.
        setNum: Lego set number to search for.
        yr: release year of the set.
        numPce: number of pieces in the set.
        origPrc: original retail price, used to filter incomplete sets.
    """
    # Throttle requests so the API does not rate-limit us.
    sleep(10)
    myAPIstr = 'get from code.google.com'
    # Replace myAPIstr with your own API key.
    searchURL = 'https://www.googleapis.com/shopping/search/v1/public/products?key=%s&country=US&q=lego+%d&alt=json' % (myAPIstr, setNum)
    print(searchURL)
    pg = urllib.request.urlopen(searchURL)
    retDict = json.loads(pg.read())
    for i in range(len(retDict['items'])):
        # Catch only the expected lookup failures; the original bare
        # `except:` also swallowed KeyboardInterrupt and SystemExit.
        try:
            currItem = retDict['items'][i]
            if currItem['product']['condition'] == 'new':
                newFlag = 1
            else:
                newFlag = 0
            listOfInv = currItem['product']['inventories']
            for item in listOfInv:
                sellingPrice = item['price']
                # Filter out incomplete sets sold well below retail.
                if sellingPrice > origPrc * 0.5:
                    print("%d\t%d\t%d\t%f\t%f" %\
                          (yr, numPce, newFlag, origPrc, sellingPrice))
                    retX.append([yr, numPce, newFlag, origPrc])
                    retY.append(sellingPrice)
        except (KeyError, TypeError):
            print("problem with item %d" % i)
def setDataCollect(retX, retY):
    """Collect price data for six Lego sets into retX / retY."""
    # (set number, release year, piece count, original price)
    legoSets = [
        (8288, 2006, 800, 49.99),
        (10030, 2002, 3096, 269.99),
        (10179, 2007, 5195, 499.99),
        (10181, 2007, 3428, 199.99),
        (10189, 2008, 5922, 299.99),
        (10196, 2009, 3263, 249.99),
    ]
    for setNum, yr, numPce, origPrc in legoSets:
        searchForSet(retX, retY, setNum, yr, numPce, origPrc)
# Cross-validation of ridge regression, program 8_6
def crossValidation(xArr, yArr, numVal = 10):
    """Pick the best ridge-regression lambda by cross-validation.

    Runs numVal rounds of a random 90/10 train/test split, evaluates
    the 30 lambda values produced by ridgeTest on each round, then
    prints the best weights rescaled back to the units of the raw
    (unstandardized) data.

    Args:
        xArr: feature rows.
        yArr: target values.
        numVal: number of cross-validation rounds.
    """
    m = len(yArr)
    # Python 3 bug fix: range objects are immutable, random.shuffle
    # needs a mutable list.
    indexList = list(range(m))
    errorMat = zeros((numVal, 30))
    for i in range(numVal):
        # Build a random 90/10 train/test split.
        trainX = []
        trainY = []
        testX = []
        testY = []
        random.shuffle(indexList)
        for j in range(m):
            if j < m * 0.9:
                trainX.append(xArr[indexList[j]])
                trainY.append(yArr[indexList[j]])
            else:
                testX.append(xArr[indexList[j]])
                testY.append(yArr[indexList[j]])
        wMat = ridgeTest(trainX, trainY)
        for k in range(30):
            # Standardize the test data using the training statistics.
            matTestX = mat(testX)
            matTrainX = mat(trainX)
            meanTrain = mean(matTrainX, 0)
            varTrain = var(matTrainX, 0)
            matTestX = (matTestX - meanTrain) / varTrain
            yEst = matTestX * mat(wMat[k, :]).T + mean(trainY)
            errorMat[i, k] = rssError(yEst.T.A, array(testY))
    meanErrors = mean(errorMat, 0)
    minMean = float(min(meanErrors))
    # NOTE(review): wMat here is from the LAST round only, as in the
    # book's original code.
    bestWeights = wMat[nonzero(meanErrors == minMean)]
    xMat = mat(xArr)
    # Bug fix: the original called the undefined name `amt`.
    yMat = mat(yArr).T
    meanX = mean(xMat, 0)
    varX = var(xMat, 0)
    # Undo the standardization so the weights apply to raw data.
    unReg = bestWeights / varX
    print("the best model from Ridge Regression is:\n", unReg)
    print("with constant term: ", \
          -1 * sum(multiply(meanX, unReg)) + mean(yMat))
第九章: 树回归
优点: 可以对复杂和非线性的数据建模。
缺点: 结果不易理解。
适用数据类型: 数值型和标称型数据。
树回归的一般方法
- 收集数据: 采用任意方法收集数据。
- 准备数据: 需要数值型的数据,标称型数据应该映射成二值型数据。
- 分析数据: 绘出数据的二维可视化显示结果,以字典方式生成树。
- 训练算法: 大部分时间都花费在叶节点树模型的构建上。
- 测试算法: 使用测试数据上的\(R^2\)值来分析模型的结果。
- 使用算法: 使用训练出的树做预测,预测结果还可以用来做很多事情。
示例: 利用GUI对回归树调优
- 收集数据: 提供的文本文件。
- 准备数据: 用Python解析上述文件,得到数值型数据。
- 分析数据: 用Tkinter构建一个GUI来展示模型和数据。
- 训练算法: 训练一棵回归树和一棵模型树,并与数据集一起展示出来。
- 测试算法: 不需要。
- 使用算法: 透过GUI测试不同参数的影响,帮助选择模型的类型。
regTrees.py完整代码(章节中Python命令行代码在GitHub):
from numpy import *
class treeNode():
    """Tree node holding a split feature/value and two branches."""
    def __init__(self, feat, val, right, left):
        # Bug fix: the original assigned to local variables, so the
        # constructor arguments were silently discarded.
        self.featureToSplitOn = feat
        self.valueOfSplit = val
        self.rightBranch = right
        self.leftBranch = left
# Implementation of the CART algorithm, program 9_1
def loadDataSet(fileName):
    """Load a tab-delimited file of floats into a list of row lists.

    Unlike the chapter-8 loader, every column (including the last)
    stays in the row; there is no separate label list.
    """
    dataMat = []
    # Close the file deterministically (the original leaked the handle).
    with open(fileName) as fr:
        for line in fr.readlines():
            curLine = line.strip().split('\t')
            # Convert every field on the line to float.
            fltLine = list(map(float, curLine))
            dataMat.append(fltLine)
    return dataMat
def binSplitDataSet(dataSet, feature, value):
    """Binary-split dataSet on one feature: rows with feature > value
    go to the first matrix, the remaining rows to the second."""
    greaterRows = nonzero(dataSet[:, feature] > value)[0]
    lesserRows = nonzero(dataSet[:, feature] <= value)[0]
    return dataSet[greaterRows, :], dataSet[lesserRows, :]
# Leaf and error functions of the regression tree, program 9_2
def regLeaf(dataSet):
    """Leaf value: the mean of the target (last) column."""
    targets = dataSet[:, -1]
    return mean(targets)
def regErr(dataSet):
    """Total squared error of the target column (variance times count)."""
    numRows = shape(dataSet)[0]
    return var(dataSet[:, -1]) * numRows
def createTree(dataSet, leafType = regLeaf, errType = regErr, ops = (1, 4)):
    """Recursively grow a CART tree as a dict of split/branch entries.

    Returns a leaf value directly when no worthwhile split exists.
    """
    feat, val = chooseBestSplit(dataSet, leafType, errType, ops)
    # No feature to split on: the stopping criteria were met.
    if feat == None:
        return val
    lSet, rSet = binSplitDataSet(dataSet, feat, val)
    return {
        'spInd': feat,
        'spVal': val,
        'left': createTree(lSet, leafType, errType, ops),
        'right': createTree(rSet, leafType, errType, ops),
    }
def chooseBestSplit(dataSet, leafType = regLeaf, errType = regErr, ops= (1, 4)):
    """Find the best (feature, value) binary split of dataSet.

    Returns (None, leafValue) when splitting is not worthwhile: all
    targets equal, error reduction below ops[0], or a resulting branch
    smaller than ops[1] rows.
    """
    tolS, tolN = ops
    # All target values identical: nothing to split.
    if len(set(dataSet[:, -1].T.tolist()[0])) == 1:
        return None, leafType(dataSet)
    m, n = shape(dataSet)
    S = errType(dataSet)
    bestS, bestIndex, bestValue = inf, 0, 0
    for featIndex in range(n - 1):
        for splitVal in set(dataSet[:, featIndex].T.tolist()[0]):
            mat0, mat1 = binSplitDataSet(dataSet, featIndex, splitVal)
            # Skip splits that leave either branch with too few rows.
            if (shape(mat0)[0] < tolN) or (shape(mat1)[0] < tolN):
                continue
            newS = errType(mat0) + errType(mat1)
            if newS < bestS:
                bestIndex, bestValue, bestS = featIndex, splitVal, newS
    # Error reduction too small: make a leaf instead.
    if (S - bestS) < tolS:
        return None, leafType(dataSet)
    mat0, mat1 = binSplitDataSet(dataSet, bestIndex, bestValue)
    # Best split still produces an undersized branch: make a leaf.
    if (shape(mat0)[0] < tolN) or (shape(mat1)[0] < tolN):
        return None, leafType(dataSet)
    return bestIndex, bestValue
def isTree(obj):
    """A subtree is represented as a dict; a leaf is a plain value."""
    return type(obj) is dict
def getMean(tree):
    """Collapse a subtree to a single value: the recursive average of
    its left and right branches (mutates the tree in place)."""
    rightVal = getMean(tree['right']) if isTree(tree['right']) else tree['right']
    leftVal = getMean(tree['left']) if isTree(tree['left']) else tree['left']
    tree['right'] = rightVal
    tree['left'] = leftVal
    return (leftVal + rightVal) / 2.0
def prune(tree, testData):
    """Post-prune a regression tree against held-out testData.

    Merges sibling leaves whenever merging does not increase the test
    error, and collapses any subtree that receives no test data.

    Args:
        tree: a tree dict from createTree (or a leaf value).
        testData: matrix of held-out samples.

    Returns:
        The pruned tree, or a single leaf value if fully collapsed.
    """
    # No test data reaches this subtree: collapse it to its mean.
    if shape(testData)[0] == 0:
        return getMean(tree)
    if (isTree(tree['right']) or isTree(tree['left'])):
        lSet, rSet = binSplitDataSet(testData, tree['spInd'], tree['spVal'])
    if isTree(tree['left']):
        tree['left'] = prune(tree['left'], lSet)
    if isTree(tree['right']):
        tree['right'] = prune(tree['right'], rSet)
    # Both children are leaves: test whether merging them helps.
    if not isTree(tree['left']) and not isTree(tree['right']):
        lSet, rSet = binSplitDataSet(testData, tree['spInd'], tree['spVal'])
        errorNoMerge = sum(power(lSet[:, -1] - tree['left'], 2)) + \
            sum(power(rSet[:, -1] - tree['right'], 2))
        treeMean = (tree['left'] + tree['right']) / 2.0
        errorMerge = sum(power(testData[:, -1] - treeMean, 2))
        if errorMerge < errorNoMerge:
            print("merging")
            return treeMean
        else:
            # Bug fix: the original returned treeMean here as well,
            # which unconditionally merged every pair of leaves.
            return tree
    else:
        return tree
# Model tree leaf-node generation, program 9_4
def linearSolve(dataSet):
    """Fit a linear model to dataSet by ordinary least squares.

    The last column is the target; the remaining columns become
    features with a prepended bias column of ones.

    Returns (ws, X, Y); raises NameError when X^T X is singular.
    """
    m, n = shape(dataSet)
    # Assemble X with a leading column of ones for the intercept.
    X = mat(ones((m, n)))
    X[:, 1:n] = dataSet[:, 0: n-1]
    Y = dataSet[:, -1]
    xTx = X.T * X
    if linalg.det(xTx) == 0.0:
        raise NameError('This matrix is singular, cannot do inverse, \n'
                        'try increasing the second value of ops')
    ws = xTx.I * (X.T * Y)
    return ws, X, Y
def modelLeaf(dataSet):
    """Leaf value for a model tree: the fitted linear weights.

    Bug fix: the original returned the squared error (a duplicate of
    modelErr), so model-tree leaves held error values instead of
    models and modelTreeEval could not evaluate them.
    """
    ws, X, Y = linearSolve(dataSet)
    return ws
def modelErr(dataSet):
    """Squared error of a linear model fitted to dataSet."""
    ws, X, Y = linearSolve(dataSet)
    residual = Y - X * ws
    return sum(power(residual, 2))
# Prediction with tree regression, program 9_5
def regTreeEval(model, inDat):
    """Evaluate a regression-tree leaf: the leaf itself is the value.

    inDat is ignored; it exists so regression and model trees share
    the same evaluation interface.
    """
    value = float(model)
    return value
def modelTreeEval(model, inDat):
    """Evaluate a model-tree leaf: dot the input row (with a prepended
    bias of 1) against the leaf's linear weight vector."""
    numCols = inDat.shape[1]
    X = mat(ones((1, numCols)))
    # Keep the leading 1 for the intercept; copy the feature columns.
    X[:, 1: numCols] = inDat[:, :-1]
    return float(X * model)
def treeForeCast(tree, inData, modelEval= regTreeEval):
    """Predict the value for one input row by descending the tree."""
    # A bare leaf: evaluate it directly.
    if not isTree(tree):
        return modelEval(tree, inData)
    # Descend left for values above the split threshold, else right.
    branch = 'left' if inData[tree['spInd']] > tree['spVal'] else 'right'
    subtree = tree[branch]
    if isTree(subtree):
        return treeForeCast(subtree, inData, modelEval)
    return modelEval(subtree, inData)
def createForeCast(tree, testData, modelEval= regTreeEval):
    """Predict every row of testData; returns an m x 1 column matrix."""
    numRows = len(testData)
    yHat = mat(zeros((numRows, 1)))
    for row in range(numRows):
        yHat[row, 0] = treeForeCast(tree, mat(testData[row]), modelEval)
    return yHat
treeExplore.py完整代码(章节中Python命令行代码在GitHub):
from numpy import *
from tkinter import *
import regTrees
# Combine Matplotlib and tkinter, program 9_7
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
# Combine Matplotlib and tkinter, program 9_7
def reDraw(tolS, tolN):
    """Rebuild the tree with the given (tolS, tolN) and redraw the plot.

    Reads reDraw.rawDat / reDraw.testDat and the chkBtnVar checkbox to
    decide between a model tree and a plain regression tree.
    """
    reDraw.f.clf()
    reDraw.a = reDraw.f.add_subplot(111)
    # Checkbox selected -> model tree, otherwise regression tree.
    if chkBtnVar.get():
        if tolN < 2:
            tolN = 2
        myTree = regTrees.createTree(reDraw.rawDat, regTrees.modelLeaf, \
                                     regTrees.modelErr, (tolS, tolN))
        yHat = regTrees.createForeCast(myTree, reDraw.testDat, \
                                       regTrees.modelTreeEval)
    else:
        myTree = regTrees.createTree(reDraw.rawDat, ops = (tolS, tolN))
        yHat = regTrees.createForeCast(myTree, reDraw.testDat)
    reDraw.a.scatter(reDraw.rawDat[:, 0].A, reDraw.rawDat[:, 1].A, s = 5)
    reDraw.a.plot(reDraw.testDat, yHat, linewidth = 2.0)
    # Bug fix: FigureCanvasTkAgg.show() was removed in Matplotlib 2.2;
    # draw() is the supported replacement.
    reDraw.canvas.draw()
def getInputs():
    """Read tolN / tolS from the entry widgets.

    Invalid input is replaced by the defaults (10 and 1.0) and the
    corresponding entry widget is reset to show the default.

    Returns:
        (tolN, tolS) as (int, float).
    """
    try:
        tolN = int(tolNentry.get())
    except ValueError:
        tolN = 10
        print("enter Integer for tolN")
        # Clear the bad input and restore the default value.
        tolNentry.delete(0, END)
        tolNentry.insert(0, '10')
    try:
        tolS = float(tolSentry.get())
    except ValueError:
        tolS = 1.0
        print("enter Float for tolS")
        # Bug fix: the original used the undefined name `end` and the
        # nonexistent `TOLsENTRY.INSERT`, so it crashed on bad input.
        tolSentry.delete(0, END)
        tolSentry.insert(0, '1.0')
    return tolN, tolS
def drawNewTree():
    """Callback for the ReDraw button: re-read the inputs and redraw."""
    newTolN, newTolS = getInputs()
    reDraw(newTolS, newTolN)
# GUI wiring for the tree explorer, chapter 9_7.
root = Tk()

# Abandoned in chapter 9_7_2:
# Label(root, text= "Plot Place Holder").grid(row= 0, columnspan= 3)

# Added in chapter 9_7_2: embed a Matplotlib figure in the Tk window.
reDraw.f = Figure(figsize = (5, 1), dpi = 100)
reDraw.canvas = FigureCanvasTkAgg(reDraw.f, master = root)
# Bug fix: FigureCanvasTkAgg.show() was removed in Matplotlib 2.2;
# draw() is the supported replacement.
reDraw.canvas.draw()
reDraw.canvas.get_tk_widget().grid(row = 0, columnspan = 3)

# Entry widgets for the two tree-building tolerances.
Label(root, text= "tolN").grid(row = 1, column = 0)
tolNentry = Entry(root)
tolNentry.grid(row = 1, column = 1)
tolNentry.insert(0, '10')
Label(root, text = "tolS").grid(row = 2, column = 0)
tolSentry = Entry(root)
tolSentry.grid(row = 2, column = 1)
tolSentry.insert(0, '1.0')
Button(root, text = 'ReDraw', command = drawNewTree).grid(row = 1, column = 2, \
                                                          rowspan = 3)
# Checkbox toggling between regression tree and model tree.
chkBtnVar = IntVar()
chkBtn = Checkbutton(root, text = "Model Tree", variable = chkBtnVar)
chkBtn.grid(row = 3, column = 0, columnspan = 2)

# Load the sample data and draw the initial plot.
reDraw.rawDat = mat(regTrees.loadDataSet('sine.txt'))
reDraw.testDat = arange(min(reDraw.rawDat[:, 0]), \
                        max(reDraw.rawDat[:, 0]), 0.01)
reDraw(1.0, 10)
Button(root, text = 'Quit', fg = 'black', command = root.quit).grid(row = 1,
                                                                    column = 2)
root.mainloop()