Machine Learning in Action Notes (Chapters 3 and 4) (with Python 3 Code)

Machine Learning Mar 18, 2018

Preface:

Machine Learning in Action is a popular introductory machine learning book whose code is written in Python 2. Since Python 2 is now being retired, this article rewrites all of the book's code in Python 3.
Code on GitHub: https://github.com/kungbob/Machine_Learning_In_Action
Original Python 2 code: https://www.manning.com/books/machine-learning-in-action

Chapter 3: Decision Trees

Pros: low computational complexity; results are easy to interpret; insensitive to missing values; can handle irrelevant features.
Cons: prone to overfitting.
Works with: numeric and nominal values.

How the decision tree algorithm works:

  1. Find the best feature for splitting the current dataset
  2. Split the dataset on that feature
  3. Create a branch node for each split
  4. If all of the data in a subset belong to the same class, that branch is done; otherwise, repeat the whole procedure on the subset.

General approach to decision trees:

  1. Collect: any method.
  2. Prepare: the tree-building algorithm works only on nominal values, so numeric values must be discretized.
  3. Analyze: any method; after the tree is built, visually check that the plot matches expectations.
  4. Train: construct the tree data structure.
  5. Test: calculate the error rate with the learned tree.
  6. Use: this step applies to any supervised learning algorithm, but a decision tree also makes the underlying meaning of the data easier to understand.

Note: the book splits the data with the ID3 algorithm, not binary splits. A worked example of the entropy and information-gain computation behind ID3 follows.
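
To make the ID3 choice concrete, here is a minimal sketch using only the standard library; the numbers refer to the sample dataset returned by createDataSet() in trees.py below, which has 2 'yes' and 3 'no' rows:

from math import log

# Shannon entropy of the whole sample dataset (2 'yes', 3 'no')
baseEntropy = -(2/5) * log(2/5, 2) - (3/5) * log(3/5, 2)
print(baseEntropy)                      # about 0.9710

# Information gain of feature 0 ('no surfacing'):
# value 1 -> ['yes', 'yes', 'no'], value 0 -> ['no', 'no'] (entropy 0)
subEntropy = -(2/3) * log(2/3, 2) - (1/3) * log(1/3, 2)
newEntropy = (3/5) * subEntropy + (2/5) * 0.0
print(baseEntropy - newEntropy)         # about 0.4200

# Feature 0 has the larger gain (feature 1 gives about 0.1710),
# so ID3 splits on 'no surfacing' first.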

Example: predicting contact lens types with a decision tree (a minimal driver is sketched after the steps below)

  1. Collect: a text file provided with the book.
  2. Prepare: parse the tab-delimited data lines.
  3. Analyze: quickly review the data to make sure it was parsed correctly; plot the final tree with the createPlot() function.
  4. Train: use the createTree() function from section 3.1.
  5. Test: write a test function to verify the tree can correctly classify given data instances.
  6. Use: persist the tree data structure so it does not have to be rebuilt the next time it is used.
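
A minimal driver for the lenses example, assuming the book's lenses.txt sits next to trees.py and treePlotter.py (this mirrors the shell session in the book):

import trees
import treePlotter

fr = open('lenses.txt')
lenses = [inst.strip().split('\t') for inst in fr.readlines()]
lensesLabels = ['age', 'prescript', 'astigmatic', 'tearRate']
lensesTree = trees.createTree(lenses, lensesLabels)
print(lensesTree)
treePlotter.createPlot(lensesTree)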

Complete trees.py code (the chapter's Python shell sessions are on GitHub):

from math import log
import operator
import pickle

# Entropy calculation, program 3_1
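# Shannon entropy: H = -sum(p(class) * log2(p(class))) over all class labels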
def calcShannonEnt(dataSet):
    numEntries = len(dataSet)
    labelCounts = {}
    # Create dictionary for all possible classes
    for featVec in dataSet:
        currentLabel = featVec[-1]
        if currentLabel not in labelCounts.keys():
            labelCounts[currentLabel] = 0
        labelCounts[currentLabel] += 1
    shannonEnt = 0.0
    # Use base-2 logarithm
    for key in labelCounts:
        prob = float(labelCounts[key]) / numEntries
        shannonEnt -= prob * log(prob, 2)
    return shannonEnt

def createDataSet():
    dataSet = [[1, 1, 'yes'],
            [1, 1, 'yes'],
            [1, 0, 'no'],
            [0, 1, 'no'],
            [0, 1, 'no']]
    labels = ['no surfacing', 'flippers']
    return dataSet, labels

# Splitting the dataset on a given feature, program 3_2
def splitDataSet(dataSet, axis, value):
    # Create a new list object
    retDataSet = []
    for featVec in dataSet:
        if featVec[axis] == value:
            # Chop out the feature used for the split
            reducedFeatVec = featVec[:axis]
            reducedFeatVec.extend(featVec[axis+1:])
            retDataSet.append(reducedFeatVec)
    return retDataSet

# Finding the best feature to split, program 3_3
def chooseBestFeatureToSplit(dataSet):
    numFeatures = len(dataSet[0]) - 1
    baseEntropy = calcShannonEnt(dataSet)
    bestInfoGain = 0.0
    bestFeature = -1
    for i in range(numFeatures):
        # Collect all values of this feature, then reduce to unique values
        featList = [example[i] for example in dataSet]
        uniqueVals = set(featList)
        newEntropy = 0.0
        # Calculate the information entropy for each classification
        for value in uniqueVals:
            subDataSet = splitDataSet(dataSet, i, value)
            prob = len(subDataSet) / float(len(dataSet))
            newEntropy += prob * calcShannonEnt(subDataSet)
        infoGain = baseEntropy - newEntropy
        # Keep the feature with the largest information gain
        if infoGain > bestInfoGain:
            bestInfoGain = infoGain
            bestFeature = i
    return bestFeature

# Find the majority prediction of classes
def majorityCnt(classList):
    classCount = {}
    for vote in classList:
        if vote not in classCount.keys():
            classCount[vote] = 0
        classCount[vote] += 1
    sortedClassCount = sorted(classCount.items(), \
        key = operator.itemgetter(1), reverse = True)
    return sortedClassCount[0][0]

# Create Trees, program 3_4
def createTree(dataSet, labels):
    classList = [example[-1] for example in dataSet]
    # If all classes are identical, stop splitting
    if classList.count(classList[0]) == len(classList):
        return classList[0]
    # Return the highest frequency class after iterating all features
    if len(dataSet[0]) == 1:
        return majorityCnt(classList)
    bestFeat = chooseBestFeatureToSplit(dataSet)
    bestFeatLabel = labels[bestFeat]
    myTree = {bestFeatLabel: {}}
    del(labels[bestFeat])
    # Get the list of all features value
    featValues = [example[bestFeat] for example in dataSet]
    uniqueVals = set(featValues)
    for value in uniqueVals:
        subLabels = labels[:]
        myTree[bestFeatLabel][value] = createTree(splitDataSet \
                            (dataSet, bestFeat, value), subLabels)
    return myTree

# classification function of decision tree, program 3_8
def classify(inputTree, featLabels, testVec):
    firstStr = list(inputTree.keys())[0]
    secondDict = inputTree[firstStr]
    # Translate the label string into a feature index
    featIndex = featLabels.index(firstStr)
    for key in secondDict.keys():
        if testVec[featIndex] == key:
            if type(secondDict[key]).__name__ == 'dict':
                classLabel = classify(secondDict[key], featLabels, testVec)
            else:
                classLabel = secondDict[key]
    return classLabel

# Use pickle to store decision tree, program 3_9
def storeTree(inputTree, filename):
    # pickle reads and writes bytes in Python 3, so binary modes are needed
    fw = open(filename, 'wb')
    pickle.dump(inputTree, fw)
    fw.close()

def grabTree(filename):
    fr = open(filename, 'rb')
    return pickle.load(fr)
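
A short usage sketch for trees.py (the file name myTree.txt is just an illustration); note that createTree() deletes entries from the labels list it receives, so keep a copy for classify():

import trees

myDat, labels = trees.createDataSet()
featLabels = labels[:]     # createTree() mutates labels, so copy first
myTree = trees.createTree(myDat, labels)
print(myTree)
# {'no surfacing': {0: 'no', 1: {'flippers': {0: 'no', 1: 'yes'}}}}
print(trees.classify(myTree, featLabels, [1, 0]))   # 'no'
print(trees.classify(myTree, featLabels, [1, 1]))   # 'yes'
trees.storeTree(myTree, 'myTree.txt')
print(trees.grabTree('myTree.txt'))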

Complete treePlotter.py code:

import matplotlib.pyplot as plt

# Define the figure size and arrow format, program 3_5
decisionNode = dict(boxstyle="sawtooth", fc = "0.8")
leafNode = dict(boxstyle="round4", fc = "0.8")
arrow_args = dict(arrowstyle = "<-")

# Draw an annotation with an arrow
def plotNode(nodeTxt, centerPt, parentPt, nodeType):
    createPlot.ax1.annotate(nodeTxt, xy = parentPt, \
    xycoords = 'axes fraction', xytext = centerPt, \
    textcoords = 'axes fraction', \
    va = "center", ha = "center", bbox = nodeType, \
    arrowprops = arrow_args)

# Note: this first version is superseded by createPlot(inTree) defined later in this file
def createPlot():
    fig = plt.figure(1, facecolor = 'white')
    fig.clf()
    createPlot.ax1 = plt.subplot(111, frameon = False)
    plotNode('Decision Node', (0.5, 0.1), (0.1, 0.5), decisionNode)
    plotNode('Leaf Node', (0.8, 0.1), (0.3, 0.8), leafNode)
    plt.show()

# Count the leaf nodes of a tree, program 3_6
def getNumLeafs(myTree):
    numLeafs = 0
    firstStr = list(myTree.keys())[0]
    secondDict = myTree[firstStr]
    # Testing whether the data type in node is dict
    for key in secondDict.keys():
        if type(secondDict[key]).__name__ == 'dict':
            numLeafs += getNumLeafs(secondDict[key])
        else:
            numLeafs += 1
    return numLeafs

def getTreeDepth(myTree):
    maxDepth = 0
    firstStr = list(myTree.keys())[0]
    secondDict = myTree[firstStr]
    for key in secondDict.keys():
        if type(secondDict[key]).__name__ == 'dict':
            thisDepth = 1 + getTreeDepth(secondDict[key])
        else:
            thisDepth = 1
        if thisDepth > maxDepth:
            maxDepth = thisDepth
    return maxDepth

def retrieveTree(i):
    listOfTrees = [{'no surfacing': {0: 'no', 1: {'flippers': \
                    {0: 'no', 1: 'yes'}}}},
                   {'no surfacing': {0: 'no', 1: {'flippers': \
                    {0: {'head': {0: 'no', 1: 'yes'}}, 1: 'no'}}}}
                   ]
    return listOfTrees[i]

# Fill in text between child and parent nodes, program 3_7
def plotMidText(cntrPt, parentPt, txtString):
    xMid = (parentPt[0] - cntrPt[0]) / 2.0 + cntrPt[0]
    yMid = (parentPt[1] - cntrPt[1]) / 2.0 + cntrPt[1]
    createPlot.ax1.text(xMid, yMid, txtString)

# Plot the tree; compute its width and depth first
def plotTree(myTree, parentPt, nodeTxt):
    numLeafs = getNumLeafs(myTree)
    depth = getTreeDepth(myTree)
    firstStr = list(myTree.keys())[0]
    cntrPt = (plotTree.xOff + (1.0 + float(numLeafs)) / 2.0 / plotTree.totalW, \
                plotTree.yOff)
    # Mark the child node text
    plotMidText(cntrPt, parentPt, nodeTxt)
    plotNode(firstStr, cntrPt, parentPt, decisionNode)
    secondDict = myTree[firstStr]
    # Reduce the shift value of y axis
    plotTree.yOff = plotTree.yOff - 1.0/plotTree.totalD
    for key in secondDict.keys():
        if type(secondDict[key]).__name__== 'dict':
            plotTree(secondDict[key], cntrPt, str(key))
        else:
            plotTree.xOff = plotTree.xOff + 1.0 / plotTree.totalW
            plotNode(secondDict[key], (plotTree.xOff, plotTree.yOff), \
                cntrPt, leafNode)
            plotMidText((plotTree.xOff, plotTree.yOff), cntrPt, str(key))
    plotTree.yOff = plotTree.yOff + 1.0 / plotTree.totalD

def createPlot(inTree):
    fig = plt.figure(1, facecolor = 'white')
    fig.clf()
    axprops = dict(xticks = [], yticks = [])
    createPlot.ax1 = plt.subplot(111, frameon = False, **axprops)
    plotTree.totalW = float(getNumLeafs(inTree))
    plotTree.totalD = float(getTreeDepth(inTree))
    plotTree.xOff = -0.5 / plotTree.totalW
    plotTree.yOff = 1.0
    plotTree(inTree, (0.5, 1.0), '')
    plt.show()
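
A quick usage sketch for treePlotter.py, mirroring the book's shell session:

import treePlotter

myTree = treePlotter.retrieveTree(0)
print(treePlotter.getNumLeafs(myTree))    # 3
print(treePlotter.getTreeDepth(myTree))   # 2
treePlotter.createPlot(myTree)            # draws the annotated tree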

Chapter 4: Classifying with Probability Theory: Naive Bayes

Pros: still effective with small amounts of data; can handle multi-class problems.
Cons: sensitive to how the input data is prepared.
Works with: nominal values.

General approach to naive Bayes:

  1. Collect: any method; this chapter uses RSS feeds.
  2. Prepare: numeric or Boolean values are needed.
  3. Analyze: with many features, plotting them individually reveals little; histograms work better.
  4. Train: compute the conditional probabilities of the independent features.
  5. Test: calculate the error rate.
  6. Use: document classification is one common naive Bayes application; the classifier can be used in any classification setting, not just text (Bayes' rule itself is sketched after this list).
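
The rule behind all of this is Bayes' theorem: for a word vector w and class ci, p(ci|w) = p(w|ci) p(ci) / p(w), and the classifier picks the class with the larger value. The "naive" part is the assumption that words are conditionally independent, so p(w|ci) = p(w0|ci) p(w1|ci) ... p(wN|ci); trainNB0() below estimates exactly these per-word conditional probabilities along with the class prior p(ci).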

Example: classifying email with naive Bayes

  1. Collect: text files provided with the book.
  2. Prepare: parse the text files into token vectors.
  3. Analyze: inspect the tokens to make sure parsing was done correctly.
  4. Train: use the trainNB0() function built earlier.
  5. Test: use classifyNB(), and build a new test function that computes the error rate over a document set.
  6. Use: build a complete program that classifies a group of documents and prints the misclassified ones to the screen.

Complete bayes.py code (the chapter's Python shell sessions are on GitHub):

from numpy import *
import re, operator

# Convert wordlist to vector, program 4_1
def loadDataSet():
    postingList=[['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],
                 ['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'],
                 ['my', 'dalmation', 'is', 'so', 'cute', 'I', 'love', 'him'],
                 ['stop', 'posting', 'stupid', 'worthless', 'garbage'],
                 ['mr', 'licks', 'ate', 'my', 'steak', 'how', 'to', 'stop', 'him'],
                 ['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']]
    classVec = [0,1,0,1,0,1]    #1 is abusive, 0 not
    return postingList,classVec

def createVocabList(dataSet):
    # Create an empty set
    vocabSet = set([])
    for document in dataSet:
        # Create a union set
        vocabSet = vocabSet | set(document)
    return list(vocabSet)


def setOfWords2Vec(vocabList, inputSet):
    # Create a zero vector
    returnVec = [0] * len(vocabList)
    for word in inputSet:
        if word in vocabList:
            returnVec[vocabList.index(word)] = 1
        else:
            print("the word: %s is not in mu vocabulary!" % word)
    return returnVec

# Naive Bayes classification training function, program 4_2
def trainNB0(trainMatrix, trainCategory):
    numTrainDocs = len(trainMatrix)
    numWords = len(trainMatrix[0])
    pAbusive = sum(trainCategory)/float(numTrainDocs)
    # Program 4_2 initialised these counts to zeros();
    # ones() is used instead (section 4.5.3) so no conditional probability is zero
    p0Num = ones(numWords)
    p1Num = ones(numWords)
    p0Denom = 2.0
    p1Denom = 2.0
    for i in range(numTrainDocs):
        if trainCategory[i] == 1:
            # Addition of vectors
            p1Num += trainMatrix[i]
            p1Denom += sum(trainMatrix[i])
        else:
            p0Num += trainMatrix[i]
            p0Denom += sum(trainMatrix[i])
    # Program 4_2 divided directly; log() was added in section 4.5.3
    # to avoid underflow when many small probabilities are multiplied
    p1Vect = log(p1Num/p1Denom) # change to log()
    p0Vect = log(p0Num/p0Denom) # change to log()
    return p0Vect, p1Vect, pAbusive

# Naive Bayes classification function
# Program 4_3
def classifyNB(vec2Classify, p0Vec, p1Vec, pClass1):
    p1 = sum(vec2Classify * p1Vec) + log(pClass1)
    p0 = sum(vec2Classify * p0Vec) + log(1.0 - pClass1)
    if p1 > p0:
        return 1
    else:
        return 0

def testingNB():
    listOPosts, listClasses = loadDataSet()
    myVocabList = createVocabList(listOPosts)
    trainMat = []
    for postinDoc in listOPosts:
        trainMat.append(setOfWords2Vec(myVocabList, postinDoc))
    p0V, p1V, pAb = trainNB0(array(trainMat), array(listClasses))
    testEntry = ['love', 'my', 'dalmation']
    thisDoc = array(setOfWords2Vec(myVocabList, testEntry))
    print(testEntry, 'classified as: ', classifyNB(thisDoc, p0V, p1V, pAb))
    testEntry = ['stupid', 'garbage']
    thisDoc = array(setOfWords2Vec(myVocabList, testEntry))
    print(testEntry, 'classified as: ', classifyNB(thisDoc, p0V, p1V, pAb))

# Naive Bayes bag-of-words model, program 4_4
def bagOfWords2VecMN(vocabList, inputSet):
    returnVec = [0] * len(vocabList)
    for word in inputSet:
        if word in vocabList:
            returnVec[vocabList.index(word)] += 1
    return returnVec

# Text parsing and complete spam test functions, program 4_5
def textParse(bigString):
    # \W+ instead of \W*: patterns that can match the empty string break re.split in Python 3
    listOfTokens = re.split(r'\W+', bigString)
    return [tok.lower() for tok in listOfTokens if len(tok) > 2]

def spamTest():
    docList = []
    classList = []
    fullText = []
    for i in range(1, 26):
        # Load and parse the email; if a file fails to decode,
        # try passing encoding='ISO-8859-1' to open()
        wordList = textParse(open('email/spam/%d.txt' % i).read())
        docList.append(wordList)
        fullText.extend(wordList)
        classList.append(1)
        wordList = textParse(open('email/ham/%d.txt' % i).read())
        docList.append(wordList)
        fullText.extend(wordList)
        classList.append(0)
    vocabList = createVocabList(docList)
    trainingSet = list(range(50))
    testSet = []
    for i in range(10):
        # Randomly pick documents for the test set; the rest stay for training
        randIndex = int(random.uniform(0, len(trainingSet)))
        testSet.append(trainingSet[randIndex])
        del(trainingSet[randIndex])
    trainMat = []
    trainClasses = []
    for docIndex in trainingSet:
        trainMat.append(setOfWords2Vec(vocabList, docList[docIndex]))
        trainClasses.append(classList[docIndex])
    p0V, p1V, pSpam = trainNB0(array(trainMat), array(trainClasses))
    errorCount = 0
    for docIndex in testSet:
        # Classify the held-out test documents
        wordVector = setOfWords2Vec(vocabList, docList[docIndex])
        if classifyNB(array(wordVector), p0V, p1V, pSpam) != classList[docIndex]:
            errorCount += 1
    print('the error rate is: ', float(errorCount)/len(testSet))

# RSS classifier and high-frequency word removal, program 4_6
def calcMostFreq(vocabList, fullText):
    # Count how often each vocabulary word occurs
    freqDict = {}
    for token in vocabList:
        freqDict[token] = fullText.count(token)
    sortedFreq = sorted(freqDict.items(), key=operator.itemgetter(1), \
                    reverse = True)
    return sortedFreq[:30]

def localWords(feed1, feed0):
    docList = []
    classList = []
    fullText = []
    minLen = min(len(feed1['entries']), len(feed0['entries']))
    for i in range(minLen):
        # Visit one RSS source each time
        wordList = textParse(feed1['entries'][i]['summary'])
        docList.append(wordList)
        fullText.extend(wordList)
        classList.append(1)
        wordList = textParse(feed0['entries'][i]['summary'])
        docList.append(wordList)
        fullText.extend(wordList)
        classList.append(0)
    vocabList = createVocabList(docList)
    # Remove the top 30 words from vocabList
    top30Words = calcMostFreq(vocabList, fullText)
    for pairW in top30Words:
        if pairW[0] in vocabList:
            vocabList.remove(pairW[0])
    trainingSet = list(range(2*minLen))
    testSet = []
    for i in range(20):
        randIndex = int(random.uniform(0, len(trainingSet)))
        testSet.append(trainingSet[randIndex])
        del(trainingSet[randIndex])
    trainMat = []
    trainClasses = []
    for docIndex in trainingSet:
        trainMat.append(bagOfWords2VecMN(vocabList, docList[docIndex]))
        trainClasses.append(classList[docIndex])
    p0V, p1V, pSpam = trainNB0(array(trainMat), array(trainClasses))
    errorCount = 0
    for docIndex in testSet:
        wordVector = bagOfWords2VecMN(vocabList, docList[docIndex])
        if classifyNB(array(wordVector), p0V, p1V, pSpam) != classList[docIndex]:
            errorCount += 1
    print ('the error rate is: ', float(errorCount) / len(testSet))
    return vocabList, p0V, p1V

# Display the most characteristic words of each region
def getTopWords(ny, sf):
    vocabList, p0V, p1V = localWords(ny, sf)
    topNY = []
    topSF = []
    for i in range(len(p0V)):
        if p0V[i] > -6.0:
            topSF.append((vocabList[i], p0V[i]))
        if p1V[i] > -6.0:
            topNY.append((vocabList[i], p1V[i]))
    sortedSF = sorted(topSF, key=lambda pair: pair[1], reverse = True)
    print("SF**SF**SF**SF**SF**SF**SF**SF**SF**SF**SF**SF**SF**")
    for item in sortedSF:
        print(item[0])
    sortedNY = sorted(topNY, key=lambda pair: pair[1], reverse = True)
    print("NY**NY**NY**NY**NY**NY**NY**NY**NY**NY**NY**NY**NY**")
    for item in sortedNY:
        print(item[0])
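
A minimal usage sketch for bayes.py (spamTest() assumes the book's email/spam and email/ham folders are in the working directory):

import bayes

bayes.testingNB()
# ['love', 'my', 'dalmation'] classified as: 0
# ['stupid', 'garbage'] classified as: 1

bayes.spamTest()   # the error rate varies with the random test split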

Example: using a naive Bayes classifier to infer regional attitudes from personal ads

Note: the RSS URLs used in the book are no longer available, so for now this part of the code cannot be run.

  1. Collect: gather content from RSS feeds; an interface to the feeds needs to be built.
  2. Prepare: parse the text into token vectors.
  3. Analyze: inspect the tokens to make sure parsing was done correctly.
  4. Train: use the trainNB0() function built earlier.
  5. Test: look at the error rate to make sure the classifier is usable; the tokenizer can be modified to lower the error rate and improve the results.
  6. Use: build a complete program that wraps everything together; given two RSS feeds, it displays the most common words for each.

The code for this part is already integrated into bayes.py; a call sketch follows.
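
For reference, if working feeds were available, the calls would mirror the book's shell session (the Craigslist URLs below are the ones from the book and no longer resolve; substitute any two regional RSS feeds):

import feedparser
import bayes

ny = feedparser.parse('http://newyork.craigslist.org/stp/index.rss')
sf = feedparser.parse('http://sfbay.craigslist.org/stp/index.rss')
vocabList, pSF, pNY = bayes.localWords(ny, sf)
bayes.getTopWords(ny, sf)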