《机器学习实战》笔记 (第三和第四章) (附Python3版代码)(Machine Learning in Action)
前言:
《机器学习实战》(Machine Learning in Action)是一本常见的机器学习入门书,书中代码由Python2写成。由于现时Python2已逐渐退出舞台,所以这篇文章将该书的所有代码部分用Python3重写。
代码上传GitHub: https://github.com/kungbob/Machine_Learning_In_Action
原版Python2代码:https://www.manning.com/books/machine-learning-in-action
第三章:决策树
优点: 计算复杂度不高,输出结果易于理解,对中间值的确实不敏感,可以处理不相关特征数据。
缺点:可能会产生过度匹配问题。
适用数据类型:数据型和标称型。
决策树算法原理:
- 寻找当前数据集中的最好特征
- 使用该特征划分数据集
- 为数据集创建分支节点
- 如果数据子集的数据属于同一类型,分类完成。相反,则需要为子集重复整个算法流程。
决策树的一般流程:
- 收集数据:可以使用任何方法
- 准备数据:树构造算法只适用于标称型数据,因此数值型数据必须离散化。
- 分析数据:可以使用任何方法,构造书完成后,应该检查图形是否符合预期。
- 训练算法:构造树的数据结构。
- 测试算法:使用经验树计算错误率。
- 使用算法:此步骤可以适用于任何监督学习算法,而使用决策树可以更好地理解数据的内在含义。
注: 书中使用的是ID3算法来划分数据,并非二分法。
示例:使用决策树预测隐形眼镜类型
- 收集数据:提供的文本文件。
- 准备数据:解析tab键分隔的数据行。
- 分析数据:快速检查数据,确保正确地解析数据内容,使用createPlot()函数绘制最终的树形图。
- 训练算法:使用3.1节的createTree()函数。
- 测试算法:编写测试函数验证决策树可以正确分类给定的数据实例。
- 使用算法:存储树的数据结构,以便下次使用时无需重新构造书。
trees.py完整代码(章节中Python命令行代码在GitHub):
from math import log
import operator
# Entropy calculation, program 3_1
def calcShannonEnt(dataSet):
numEntries = len(dataSet)
labelCounts = {}
# Create dictionary for all possible classes
for featVec in dataSet:
currentLabel = featVec[-1]
if currentLabel not in labelCounts.keys():
labelCounts[currentLabel] = 0
labelCounts[currentLabel] += 1
shannonEnt = 0.0
# Choose 2 as log number
for key in labelCounts:
prob = float(labelCounts[key]) / numEntries
shannonEnt -= prob * log(prob, 2)
return shannonEnt
def createDataSet():
dataSet = [[1, 1, 'yes'],
[1, 1, 'yes'],
[1, 0, 'no'],
[0, 1, 'no'],
[0, 1, 'no']]
labels = ['no surfacing', 'flippers']
return dataSet, labels
# Spliting classes for particular feature, program 3_2
def splitDataSet(dataSet, axis, value):
# Create a new list object
retDataSet = []
for featVec in dataSet:
if featVec[axis] == value:
# Extraction of data
reducedFeatVec = featVec[:axis]
reducedFeatVec.extend(featVec[axis+1:])
retDataSet.append(reducedFeatVec)
return retDataSet
# Finding the best feature to split, program 3_3
def chooseBestFeatureToSplit(dataSet):
numFeatures = len(dataSet[0]) - 1
baseEntopy = calcShannonEnt(dataSet)
bestInfoGain = 0.0
bestFeature = -1
for i in range(numFeatures):
# Create the only one classifier label
featList = [example[i] for example in dataSet]
uniqueVals = set(featList)
newEntropy = 0.0
# Calculate the information entropy for each classification
for value in uniqueVals:
subDataSet = splitDataSet(dataSet, i, value)
prob = len(subDataSet) / float(len(dataSet))
newEntropy += prob * calcShannonEnt(subDataSet)
infoGain = baseEntopy - newEntropy
# Choose the only best entropy
if(infoGain > bestInfoGain):
bestInfoGain = infoGain
bestFeature = i
return bestFeature
# Find the majority prediction of classes
def majorityCnt(classList):
classCount = {}
for vote in classList:
if vote not in classCount.keys():
classCount[vote] = 0
classCount[vote] += 1
sortedClassCount = sorted(classCount.iteritems(), \
key = operator.itergetter(1), reverse = True)
return sortedClassCount[0][0]
# Create Trees, program 3_4
def createTree(dataSet, labels):
classList = [example[-1] for example in dataSet]
# If the class are the same, stop classifying
if classList.count(classList[0]) == len(classList):
return classList[0]
# Return the highest frequency class after iterating all features
if len(dataSet[0]) == 1:
return majorityCnt(classList)
bestFeat = chooseBestFeatureToSplit(dataSet)
bestFeatLabel = labels[bestFeat]
myTree = {bestFeatLabel: {}}
del(labels[bestFeat])
# Get the list of all features value
featValues = [example[bestFeat] for example in dataSet]
uniqueVals = set(featValues)
for value in uniqueVals:
subLabels = labels[:]
myTree[bestFeatLabel][value] = createTree(splitDataSet \
(dataSet, bestFeat, value), subLabels)
return myTree
# classification function of decision tree, program 3_8
def classify(inputTree, featLabels, testVec):
firstStr = list(inputTree.keys())[0]
secondDict = inputTree[firstStr]
featIndex = featLabels.index(firstStr)
# Change the labeled string into index
for key in secondDict.keys():
if testVec[featIndex] == key:
if type(secondDict[key]).__name__ == 'dict' :
classLabel = classify(secondDict[key], featLabels, testVec)
else:
classLabel = secondDict[key]
return classLabel
# Use pickle to store decision tree, program 3_9
def storeTree(inputTree, filename):
fw = open(filename, 'w')
pickle.dump(inputTree, fw)
fw.close()
def grabTree(filename):
fr = open(filename)
return pickle.load(fr)
treePlotter.py
import matplotlib.pyplot as plt
import pickle
# Define the figure size and arrow format, program 3_5
decisionNode = dict(boxstyle="sawtooth", fc = "0.8")
leafNode = dict(boxstyle="round4", fc = "0.8")
arrow_args = dict(arrowstyle = "<-")
# Draw comment with arrows
def plotNode(nodeTxt, centerPt, parentPt, nodeType):
createPlot.ax1.annotate(nodeTxt, xy = parentPt, \
xycoords = 'axes fraction', xytext = centerPt, \
textcoords = 'axes fraction', \
va = "center", ha = "center", bbox = nodeType, \
arrowprops = arrow_args)
def createPlot():
fig = plt.figure(1, facecolor = 'white')
fig.clf()
createPlot.ax1 = plt.subplot(111, frameon = False)
plotNode('Decision Node', (0.5, 0.1), (0.1, 0.5), decisionNode)
plotNode('Branch Node', (0.8, 0.1), (0.3, 0.8), leafNode)
plt.show()
# Number of leafs of tree, program 3_6
def getNumLeafs(myTree):
numLeafs = 0
firstStr = list(myTree.keys())[0]
secondDict = myTree[firstStr]
# Testing whether the data type in node is dict
for key in secondDict.keys():
if type(secondDict[key]).__name__ == 'dict':
numLeafs += getNumLeafs(secondDict[key])
else:
numLeafs += 1
return numLeafs
def getTreeDepth(myTree):
maxDepth = 0
firstStr = list(myTree.keys())[0]
secondDict = myTree[firstStr]
for key in secondDict.keys():
if type(secondDict[key]).__name__ == 'dict':
thisDepth = 1 + getTreeDepth(secondDict[key])
else:
thisDepth = 1
if thisDepth > maxDepth:
maxDepth = thisDepth
return maxDepth
def retrieveTree(i):
listOfTrees = [{'no surfacing': {0: 'no', 1: {'flippers': \
{0: 'no', 1: 'yes'}}}},
{'no surfacing': {0: 'no', 1: {'flippers': \
{0: {'head': {0: 'no', 1: 'yes'}}, 1: 'no'}}}}
]
return listOfTrees[i]
# Fill text message in father nodes, program 3_7
def plotMidText(cntrPt, parentPt, txtString):
xMid = (parentPt[0] - cntrPt[0]) / 2.0 + cntrPt[0]
yMid = (parentPt[1] - cntrPt[1]) / 2.0 + cntrPt[1]
createPlot.ax1.text(xMid, yMid, txtString)
# Calculate the height and width
def plotTree(myTree, parentPt, nodeTxt):
numLeafs = getNumLeafs(myTree)
depth = getTreeDepth(myTree)
firstStr = list(myTree.keys())[0]
cntrPt = (plotTree.xOff + (1.0 + float(numLeafs)) / 2.0 / plotTree.totalW, \
plotTree.yOff)
# Mark the child node text
plotMidText(cntrPt, parentPt, nodeTxt)
plotNode(firstStr, cntrPt, parentPt, decisionNode)
secondDict = myTree[firstStr]
# Reduce the shift value of y axis
plotTree.yOff = plotTree.yOff - 1.0/plotTree.totalD
for key in secondDict.keys():
if type(secondDict[key]).__name__== 'dict':
plotTree(secondDict[key], cntrPt, str(key))
else:
plotTree.xOff = plotTree.xOff + 1.0 / plotTree.totalW
plotNode(secondDict[key], (plotTree.xOff, plotTree.yOff), \
cntrPt, leafNode)
plotMidText((plotTree.xOff, plotTree.yOff), cntrPt, str(key))
plotTree.yOff = plotTree.yOff + 1.0 / plotTree.totalD
def createPlot(inTree):
fig = plt.figure(1, facecolor = 'white')
fig.clf()
axprops = dict(xticks = [], yticks = [])
createPlot.ax1 = plt.subplot(111, frameon = False, **axprops)
plotTree.totalW = float(getNumLeafs(inTree))
plotTree.totalD = float(getTreeDepth(inTree))
plotTree.xOff = -0.5 / plotTree.totalW
plotTree.yOff = 1.0
plotTree(inTree, (0.5, 1.0), '')
plt.show()
第四章:基于概率论的分类方法:朴素贝叶斯
优点: 在数据较少的情况下仍然有效,可以处理多类别问题。
缺点: 对于输入数据的准备方式较为敏感。
适用数据类型: 标称型数据。
朴素贝叶斯的一般过程:
- 收集数据: 可以使用任何方法。本章使用RSS源。
- 准备数据: 需要数值型或者布尔型数据。
- 分析数据: 有大量特征时,绘制特征作用不大,此时使用时直方图效果更好。
- 训练算法: 计算不同的独立特征的条件概率。
- 测试算法: 计算错误率。
- 使用算法: 一个常见的朴素贝叶斯应用是文档分类。可以在任意的分类场景中使用朴素贝叶斯分类器,不一定非要是文本。
示例:使用朴素贝叶斯对电子邮件进行分类
- 收集数据: 提供文本文件。
- 准备数据: 将文本文件解析成词条向量。
- 分析数据: 检查词条确保解析的正确性。
- 训练算法: 使用之前建立的trainNB0()函数。
- 测试算法: 使用classifyNB(),并且构建一个新的测试函数来计算文档集的错误率。
- 使用算法: 构建一个完整的程序对一组文档进行分类,将错分的文档输出到屏幕上。
bayes.py完整代码(章节中Python命令行代码在GitHub):
from numpy import *
import re, operator
# Convert wordlist to vector, program 4_1
def loadDataSet():
postingList=[['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],
['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'],
['my', 'dalmation', 'is', 'so', 'cute', 'I', 'love', 'him'],
['stop', 'posting', 'stupid', 'worthless', 'garbage'],
['mr', 'licks', 'ate', 'my', 'steak', 'how', 'to', 'stop', 'him'],
['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']]
classVec = [0,1,0,1,0,1] #1 is abusive, 0 not
return postingList,classVec
def createVocabList(dataSet):
# Create an empty set
vocabSet = set([])
for document in dataSet:
# Create a union set
vocabSet = vocabSet | set(document)
return list(vocabSet)
def setOfWords2Vec(vocabList, inputSet):
# Create a zero vector
returnVec = [0] * len(vocabList)
for word in inputSet:
if word in vocabList:
returnVec[vocabList.index(word)] = 1
else:
print("the word: %s is not in mu vocabulary!" % word)
return returnVec
# Naive Bayes classification training function, program 4_2
def trainNB0(trainMatrix, trainCategory):
numTrainDocs = len(trainMatrix)
numWords = len(trainMatrix[0])
pAbusive = sum(trainCategory)/float(numTrainDocs)
# Initialise the probability as zero in program 4_2
# Change in chapter 4_5_3
p0Num = ones(numWords)
p1Num = ones(numWords)
p0Denom = 2.0
p1Denom = 2.0
for i in range(numTrainDocs):
if trainCategory[i] == 1:
# Addition of vectors
p1Num += trainMatrix[i]
p1Denom += sum(trainMatrix[i])
else:
p0Num += trainMatrix[i]
p0Denom += sum(trainMatrix[i])
# Division to each element only in program 4_2
# Add log function in chapter 4_5_3
p1Vect = log(p1Num/p1Denom) # change to log()
p0Vect = log(p0Num/p0Denom) # change to log()
return p0Vect, p1Vect, pAbusive
# Naive Bayes classification function
# Program 4_3
def classifyNB(vec2Classify, p0Vec, p1Vec, pClass1):
p1 = sum(vec2Classify * p1Vec) + log(pClass1)
p0 = sum(vec2Classify * p0Vec) + log(1.0 - pClass1)
if p1 > p0:
return 1
else:
return 0
def testingNB():
listOPosts, listClasses = loadDataSet()
myVocabList = createVocabList(listOPosts)
trainMat = []
for postinDoc in listOPosts:
trainMat.append(setOfWords2Vec(myVocabList, postinDoc))
p0V, p1V, pAb = trainNB0(array(trainMat), array(listClasses))
testEntry = ['love', 'my', 'dalmation']
thisDoc = array(setOfWords2Vec(myVocabList, testEntry))
print(testEntry, 'classified as: ', classifyNB(thisDoc, p0V, p1V, pAb))
testEntry = ['stupid', 'garbage']
thisDoc = array(setOfWords2Vec(myVocabList, testEntry))
print(testEntry, 'classified as: ', classifyNB(thisDoc, p0V, p1V, pAb))
# Naive Bayes Word Bag Model, program 4_4
def bagOfWords2VecMN(vocabList, inputSet):
returnVec = [0] * len(vocabList)
for word in inputSet:
if word in vocabList:
returnVec[vocabList.index(word)] += 1
return returnVec
# File analysis and complte email test function, program 4_5
def textParse(bigString):
listOfTokens = re.split(r'\W*', bigString)
return [tok.lower() for tok in listOfTokens if len(tok) > 2]
def spamTest():
docList = [];
classList = [];
fullText = [];
for i in range(1, 26):
# Load and parse the email
wordList = textParse(open('email/spam/%d.txt' %i).read())
docList.append(wordList)
fullText.extend(wordList)
classList.append(1)
wordList = textParse(open('email/ham/%d.txt' %i).read())
docList.append(wordList)
fullText.extend(wordList)
classList.append(0)
vocabList = createVocabList(docList)
trainingSet = list(range(50))
testSet = []
for i in range(10):
# Construct training dataset using random number
randIndex = int(random.uniform(0, len(trainingSet)))
testSet.append(trainingSet[randIndex])
del(trainingSet[randIndex])
trainMat = []
trainClasses = []
for docIndex in trainingSet:
trainMat.append(setOfWords2Vec(vocabList, docList[docIndex]))
trainClasses.append(classList[docIndex])
p0V, p1V, pSpam = trainNB0(array(trainMat), array(trainClasses))
errorCount = 0
for docIndex in testSet:
# Classify the training dataset
wordVector = setOfWords2Vec(vocabList, docList[docIndex])
if classifyNB(array(wordVector), p0V, p1V, pSpam) != classList[docIndex]:
errorCount += 1
print('the error rate is: ', float(errorCount)/len(testSet))
# RSS classifier and high drequency word deletion, program 4_6
def calcMostFreq(vocabList, fullText):
# Calculate the frequency of word
freqDict = {}
for token in vocabList:
freqDict[token] = fullText.count(token)
sortedFreq = sorted(freqDict.items(), key=operator.itemgetter(1), \
reverse = True)
return sortedFreq[:30]
def localWords(feed1, feed0):
docList = []
classList = []
fullText = []
minLen = min(len(feed1['entries']), len(feed0['entries']))
for i in range(minLen):
# Visit one RSS source each time
wordList = textParse(feed1['entries'][i]['summary'])
docList.append(wordList)
fullText.extend(wordList)
classList.append(1)
wordList = textParse(feed0['entries'][i]['summary'])
docList.append(wordList)
fullText.extend(wordList)
classList.append(0)
vocabList = createVocabList(docList)
# Remove the top 30 words from vocabList
top30Words = calcMostFreq(vocabList, fullText)
for pairW in top30Words:
if pairW[0] in vocabList:
vocabList.remove(pairW[0])
trainingSet = list(range(2*minLen))
testSet = []
for i in range(20):
randIndex = int(random.uniform(0, len(trainingSet)))
testSet.append(trainingSet[randIndex])
del(trainingSet[randIndex])
trainMat = []
trainClasses = []
for docIndex in trainingSet:
trainMat.append(bagOfWords2VecMN(vocabList, docList[docIndex]))
trainClasses.append(classList[docIndex])
p0V, p1V, pSpam = trainNB0(array(trainMat), array(trainClasses))
errorCount = 0
for docIndex in testSet:
wordVector = bagOfWords2VecMN(vocabList, docList[docIndex])
if classifyNB(array(wordVector), p0V, p1V, pSpam) != classList[docIndex]:
errorCount += 1
print ('the error rate is: ', float(errorCount) / len(testSet))
return vocabList, p0V, p1V
# Most frequency word showing function
def getTopWords(ny, sf):
vocabList, p0V, p1V = localWords(ny, sf)
topNY = []
topSF = []
for i in range(len(p0V)):
if p0V[i] > -6.0:
topSF.append((vocabList[i], p0V[i]))
if p1V[i] > -6.0:
topNY.append((vocabList[i], p1V[i]))
sortedSF = sorted(topSF, key=lambda pair: pair[1], reverse = True)
print("SF**SF**SF**SF**SF**SF**SF**SF**SF**SF**SF**SF**SF**")
for item in sortedSF:
print(item[0])
sortedNY = sorted(topNY, key=lambda pair: pair[1], reverse = True)
print("NY**NY**NY**NY**NY**NY**NY**NY**NY**NY**NY**NY**NY**")
for item in sortedNY:
print(item[0])
示例: 使用朴素贝叶斯分类器从个人广告中获取区域倾向
注意: 现时书中的RSS网址已经不可用,暂时这部分代码并不能运行。
- 收集数据: 从RSS源收集内容,这里需要对RSS源构建一个接口。
- 准备数据: 将文本文件解构成词条向量。
- 分析数据: 检查词条确保解析的正确性。
- 训练算法: 使用之前建立的trainNB0()函数。
- 测试算法: 观察错误率,确保分类器可用。可以修改切分程序,以降低错误率,提高分类结果。
- 使用算法: 构建一个完整的程序,封装所有内容。给定两个RSS源,该程序会显示最常用的公共词。
该部分的代码已经整合于bayes.py文件中。