為 Mahout 增加聚類評(píng)估功能

字號(hào):


    聚類算法及聚類評(píng)估 Silhouette 簡(jiǎn)介
    聚類算法簡(jiǎn)介
    聚類(clustering)是屬于無監(jiān)督學(xué)習(xí)(Unsupervised learning)的一種,用來把一組數(shù)據(jù)劃分為幾類,每類中的數(shù)據(jù)盡可能的相似,而不同類之間盡可能的差異最大化。通過聚類,可以為樣本選取提供參考,或進(jìn)行根源分析,或作為其它算法的預(yù)處理步驟。
    聚類算法中,最經(jīng)典的要屬于 Kmeans 算法,它的基本思想是:假設(shè)我們要把一組數(shù)據(jù)聚成 N 類,那就:
    把數(shù)據(jù)中的每個(gè)樣本作為一個(gè)向量,記作Ā
    首先隨機(jī)選取 n 個(gè)樣本,把這 n 個(gè)樣本作為 N 類的中心點(diǎn), 稱為 centroid
    針對(duì)數(shù)據(jù)中的所有樣本,計(jì)算到 n 個(gè) centroid 的距離,距離哪個(gè)中心點(diǎn)最近,就屬于哪一類
    在每一類中,重新選取 centroid,假設(shè)該類有 k 個(gè)樣本,則 centroid 為
    重復(fù) 2,3 直到 centroid 的變化小于預(yù)設(shè)的值。
    Mahout 是一個(gè)開源的機(jī)器學(xué)習(xí)軟件,提供了應(yīng)用推薦、聚類、分類、Logistic 回歸分析等算法。特別是由于結(jié)合了 Hadoop 的大數(shù)據(jù)處理能力,每個(gè)算法都可以作為獨(dú)立的 job 方便的部署在 Hadoop 平臺(tái)上,因此得到了越來越廣的應(yīng)用。在聚類領(lǐng)域,Mahout 提供了 Kmeans,LDA, Canopy 等多種算法。
    聚類評(píng)估算法 Silhouette 簡(jiǎn)介
    在 Kmeans 中,我們會(huì)注意到需要我們預(yù)先設(shè)置聚合成幾類。實(shí)際上,在聚類的過程中我們也不可能預(yù)先知道,那只能分成 2 類,3 類,……n 類這樣進(jìn)行嘗試,并評(píng)估每次的聚類效果。
    實(shí)際上,由于聚類的無監(jiān)督學(xué)習(xí)特性,無論什么算法都需要評(píng)估效果。在聚類的評(píng)估中,有基于外部數(shù)據(jù)的評(píng)價(jià),也有單純的基于聚類本身的評(píng)價(jià),其基本思想就是:在同一類中,各個(gè)數(shù)據(jù)點(diǎn)越近越好,并且和類外的數(shù)據(jù)點(diǎn)越遠(yuǎn)越好;前者稱為內(nèi)聚因子(cohension),后者稱為離散因子(separation)。
    把這兩者結(jié)合起來,就形成了評(píng)價(jià)聚類效果的 Silhouette 因子:
    首先看如何評(píng)價(jià)一個(gè)點(diǎn)的聚類效果:
    a = 一個(gè)點(diǎn)到同一聚類內(nèi)其它點(diǎn)的平均距離
    b=min(一個(gè)點(diǎn)到其他聚類內(nèi)的點(diǎn)的平均距離)
    Silhouette 因子s = 1 – a/b (a<b) 或b/a -1 (a>=b)
    衡量整體聚類的效果,則是所有點(diǎn)的 Silhouette 因子的平均值。范圍應(yīng)該在 (-1,1), 值越大則說明聚類效果越好。
    圖 1.Silhouette 中內(nèi)聚、離散因子示意
    以圖 1 為例。圖 1 顯示的是一個(gè)具有 9 個(gè)點(diǎn)的聚類,三個(gè)圓形表示聚成了三類,其中的黃點(diǎn)表示質(zhì)心(centroid)。為了評(píng)估圖 1 中深藍(lán)色點(diǎn)的聚類效果,其內(nèi)聚因子a就是該點(diǎn)到所在圓中其它三個(gè)點(diǎn)的平均距離。離散因子b的計(jì)算相對(duì)復(fù)雜:我們需要先求出到該點(diǎn)到右上角圓中的三個(gè)點(diǎn)的平均距離,記為 b1;然后求出該點(diǎn)到右下角圓中兩個(gè)點(diǎn)的平均距離,記為 b2;b1 和 b2 的較小值則為b。
    在 IBM 的 SPSS Clementine 中,也有 Silhouett 評(píng)估算法的實(shí)現(xiàn),不過 IBM 提供的是一個(gè)簡(jiǎn)化版本,把一個(gè)點(diǎn)到一個(gè)類內(nèi)的距離的平均值,簡(jiǎn)化為到該類質(zhì)心(centroid)的距離,具體來說,就是:
    圖 2.IBM 關(guān)于內(nèi)聚、離散因子的簡(jiǎn)化實(shí)現(xiàn)
    還是以上面描述的 9 個(gè)點(diǎn)聚成 3 類的例子來說明。IBM 的實(shí)現(xiàn)把a(bǔ)的實(shí)現(xiàn)簡(jiǎn)化為到深藍(lán)色的點(diǎn)所在的質(zhì)心的距離。計(jì)算b時(shí)候,還是要先計(jì)算 b1 和 b2,然后求最小值。但 b1 簡(jiǎn)化為到右上角圓質(zhì)心的距離;b2 簡(jiǎn)化為到右下角圓質(zhì)心的距離。
    在下面的內(nèi)容中,我們嘗試?yán)?IBM 簡(jiǎn)化后的公式為 Mahout 增加聚類評(píng)估功能。
    Mahout 聚類過程分析
    Mahout 運(yùn)行環(huán)境簡(jiǎn)介
    前面說過,Mahout 是依賴 Hadoop 環(huán)境,每一個(gè)算法或輔助功能都是作為 Hadoop 的一個(gè)單獨(dú)的 job 來運(yùn)行,所以必須準(zhǔn)備好一個(gè)可運(yùn)行的 Hadoop 環(huán)境,(至少本文寫作時(shí)候使用的 Mahout0.9 還在依賴 Hadoop),如何安裝配置一個(gè)可運(yùn)行的 Hadoop 環(huán)境不在這篇文章的介紹范圍內(nèi)。請(qǐng)自行參考 Hadoop 網(wǎng)站。需要說明的是,本文采用的 Hadoop 為 2.2.0。
    安裝完 Hadoop 后,下載 mahout-distribution-0.9,解壓縮后的重要內(nèi)容如下:
    bin/: 目錄下有 Mahout 可執(zhí)行腳本
    mahout-examples-0.9-job.jar,各種算法的實(shí)現(xiàn)類
    example/ 各種實(shí)現(xiàn)算法的源碼
    conf/ 存放各實(shí)現(xiàn)類的配置文件,其中重要的為 driver.classes.default.props,如果增加實(shí)現(xiàn)算法類,可以在該文件中增加配置項(xiàng),從而可以被 Mahout 啟動(dòng)腳本調(diào)用。
    單獨(dú)執(zhí)行 Mahout,是一個(gè)實(shí)現(xiàn)的各種功能的簡(jiǎn)介,如下例:
    執(zhí)行 /data01/shanlei/src/mahout-distribution-0.9/bin/mahout
    輸出:
    MAHOUT_LOCAL is not set; adding
    HADOOP_CONF_DIR to classpath.
    Running on hadoop, using /data01/shanlei/hadoop-2.2.0/bin/hadoop and
    HADOOP_CONF_DIR=/data01/shanlei/hadoop-2.2.0/conf
    p1 is org.apache.mahout.driver.MahoutDriver
    MAHOUT-JOB:
    /data01/shanlei/src/mahout-distribution-0.9/examples/target/mahout-examples-0.9-job.jar
    An example program must be given as the first argument.
    Valid program names are:
    arff.vector: : Generate Vectors from an ARFF file or directory
    assesser: : assesse cluster result using silhoueter algorithm
    baumwelch: : Baum-Welch algorithm for unsupervised HMM training
    canopy: : Canopy clustering
    cat: : Print a file or resource as the logistic regression models would see it
    cleansvd: : Cleanup and verification of SVD output
    clusterdump: : Dump cluster output to text
    clusterpp: : Groups Clustering Output In Clusters
    cmdump: : Dump confusion matrix in HTML or text formats
    concatmatrices: : Concatenates 2 matrices of same cardinality into a single matrix
    ……
    如果要執(zhí)行某種算法,如上面結(jié)果中顯示的 canopy,就需要執(zhí)行 mahout canopy 加上該算法需要的其它參數(shù)。
    另外,Mahout 算法的輸入輸出,都是在 Hadoop HDFS 上,因此需要通過 hdfs 命令上傳到 hdfs 文件系統(tǒng);輸出大多為 Mahout 特有的二進(jìn)制格式,需要通過 mahout seqdumper 等命令來導(dǎo)出并轉(zhuǎn)換為可讀文本。
    準(zhǔn)備輸入
    Mahout 算法使用的 input 需要特定格式的 Vector 文件,不能夠直接使用一般的文本文件,因此需要把文本轉(zhuǎn)換為 Vector 文件,好在 Mahout 自身提供了這樣的類:
    org.apache.mahout.clustering.conversion.InputDriver。
    在 Mahout 的 conf 目錄中的 driver.classes.default.props 增加如下行:
    org.apache.mahout.clustering.conversion.InputDriver = input2Seq : create sequence file from blank separated files,然后就可以為 Mahout 增加一個(gè)功能,把空格分隔的文本文件轉(zhuǎn)換為 Mahout 聚類可以使用的向量。
    如下面的數(shù)據(jù)所示,該數(shù)據(jù)每行為一個(gè)包含 6 個(gè)屬性的向量:
    1 4 3 11 4 3
    2 2 5 2 10 3
    1 1 2 2 10 1
    1 4 2 11 5 4
    1 1 3 2 10 1
    2 4 5 9 5 2
    2 6 5 3 8 1
    執(zhí)行 ./mahout input2Seq -i /shanlei/userEnum -o /shanlei/vectors 則產(chǎn)生聚類需要的向量文件。
    聚類
    以 Kmeans 聚類為例:
    ./mahout kmeans --input /shanlei/vectors --output /shanlei/kmeans -c /shanlei/k --maxIter 5 -k 8 –cl
    -k 8 指明產(chǎn)生 8 類,執(zhí)行完成后,在/shanlei/kmeans/下會(huì)產(chǎn)生: clusters-0,clusters-1,… …,clusters-n-final 目錄,每個(gè)目錄都是一次迭代產(chǎn)生的 centroids, 目錄數(shù)會(huì)受 --maxIter 控制;最后的結(jié)果會(huì)加上 final。
    利用 Mahout 的 clusterdump 功能我們可以查看聚類的結(jié)果:
    ./mahout clusterdump -i /shanlei/kmeans/clusters-2-final -o ./centroids.txt
    more centroids.txt:
    VL-869{n=49 c=[1.163, 5.082, 4.000,
    4.000, 4.592, 2.429] r=[0.370, 0.965, 1.030, 1.245, 1.244, 1.161]}
    VL-949{n=201 c=[1.229, 4.458, 4.403,
    10.040, 6.134, 1.458] r=[0.420, 1.079, 0.836, 1.196, 1.392, 0.852]}
    … …
    VL-980{n=146 c=[1.281, 2.000, 4.178,
    2.158, 9.911, 1.918] r=[0.449, 0.712, 1.203, 0.570, 0.437, 1.208]}
    VL-869 中的 869 為該類的 id,c=[1.163, 5.082, 4.000, 4.000, 4.592, 2.429] 為 centroid 的坐標(biāo),n=49 表示該類中數(shù)據(jù)點(diǎn)的個(gè)數(shù)。
    如果使用-cl 參數(shù),則在/shanlei/kmeans/下會(huì)產(chǎn)生 clusteredPoints,利用 Mahout 的 seqdumper 可以看其內(nèi)容:
    Input Path:
    hdfs://rac122:18020/shanlei/kmeans/clusteredPoints/part-m-00000
    Key class: class
    org.apache.hadoop.io.IntWritable Value Class: class
    org.apache.mahout.clustering.classify.WeightedPropertyVectorWri
    table
    Key: 301: Value: wt: 1.0 distance:
    6.6834472852629006 vec: 1 = [1.000, 4.000, 3.000, 11.000, 4.000, 3.000]
    Key: 980: Value: wt: 1.0 distance:
    2.3966504034528384 vec: 2 = [2.000, 2.000, 5.000, 2.000, 10.000, 3.000]
    Key 對(duì)應(yīng)的則是相關(guān)聚類的 id,distance 為到 centroid 的距離。vec 則是原始的向量。
    從 Mahout 的聚類輸出結(jié)果來看,能夠很容易的實(shí)現(xiàn) IBM 簡(jiǎn)化后的 Silhouette 算法,內(nèi)聚因子 (a) 可以簡(jiǎn)單的獲取到,而離散因子 (b) 也能夠簡(jiǎn)單的計(jì)算實(shí)現(xiàn)。下面我們就來設(shè)計(jì) Mahout 中的實(shí)現(xiàn)。
    Mahout 中 Silhouette 實(shí)現(xiàn)
    算法設(shè)計(jì):
    遵循 Hadoop 上 MR 程序的設(shè)計(jì)原則,算法設(shè)計(jì)考慮了 mapper,reducer 及 combiner 類。
    Mapper 設(shè)計(jì):
    輸入目錄:聚類的最終結(jié)果目錄 clusteredPoints(通過命令行參數(shù)-i 設(shè)置),
    輸入:
    Key:IntWritable,Value:WeightedPropertyVectorWritable
    輸出:
    Key:IntWritable(無意義,常量 1),Value:Text(單個(gè)點(diǎn)的 Silhouette 值,格式為“cnt,Silhouette 值”)
    Setup 過程:
    因?yàn)樾枰?jì)算 separation 時(shí)候要訪問其它的 centroids,所以在 setup 中讀?。ㄍㄟ^命令行參數(shù)-c 設(shè)置)并緩存。
    Map 過程:
    由于輸入的 Value 為 WeightedPropertyVectorWritable,可以通過訪問字段 distance獲得參數(shù) a,并遍歷緩存的 centroids,針對(duì)其 id 不等于 Key 的,逐一計(jì)算距離,其最小的就是參數(shù) b。
    Map 的結(jié)果 Key 使用常量 1,Value 為形如“1,0.23”這樣的“cnt,Silhouette 值”格式。
    Reducer 設(shè)計(jì):
    輸入:
    Key:IntWritable(常量 1),Value: Text (combine 后的中間 Silhouette 值,格式為“cnt,Silhouette 值”)。
    輸出:
    Key:IntWritable(常量 1),Value:整個(gè)聚類的 Silhouette 值,格式為“cnt,Silhouette 值”。
    輸出目錄:最終文件的產(chǎn)生目錄,通過命令行參數(shù)-o 設(shè)置。
    Reduce 過程:
    根據(jù)“,”把每個(gè) Value,分解為 cnt,和 Silhouette,最后進(jìn)行加權(quán)平均。
    Combiner 設(shè)計(jì):
    為減少數(shù)據(jù)的 copy,采用 combiner,其實(shí)現(xiàn)即為 reducer 的實(shí)現(xiàn)。
    實(shí)現(xiàn)代碼:
    Mapper 類:
    public class AssesserMapper extends Mapper<IntWritable,
    WeightedPropertyVectorWritable, IntWritable, Text> {
    private List<Cluster> clusterModels;
    private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);
    protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();
    String clustersIn = conf.get(ClusterClassificationConfigKeys.CLUSTERS_IN);
    clusterModels = Lists.newArrayList();
    if (clustersIn != null && !clustersIn.isEmpty()) {
    Path clustersInPath = new Path(clustersIn);
    clusterModels = populateClusterModels(clustersInPath, conf);
    }
    }
    private static List<Cluster> populateClusterModels(Path clustersIn,
    Configuration conf) throws IOException {
    List<Cluster> clusterModels = Lists.newArrayList();
    Path finalClustersPath = finalClustersPath(conf, clustersIn);
    Iterator<?> it = new SequenceFileDirValueIterator<Writable>(finalClustersPath, PathType.LIST,
    PathFilters.partFilter(), null, false, conf);
    while (it.hasNext()) {
    ClusterWritable next = (ClusterWritable) it.next();
    Cluster cluster = next.getValue();
    cluster.configure(conf);
    clusterModels.add(cluster);
    }
    return clusterModels;
    }
    private static Path finalClustersPath(Configuration conf,
    Path clusterOutputPath) throws IOException {
    FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath,
    PathFilters.finalPartFilter());
    log.info("files: {}", clusterOutputPath.toString());
    return clusterFiles[0].getPath();
    }
    protected void map(IntWritable key, WeightedPropertyVectorWritable vw, Context context)
    throws IOException, InterruptedException {
    int clusterId=key.get();
    double cohension,separation=-1,silhouete;
    Map<Text,Text> props=vw.getProperties();
    cohension=Float.valueOf(props.get(new Text("distance")).toString());
    Vector vector = vw.getVector();
    for ( Cluster centroid : clusterModels) {
    if (centroid.getId()!=clusterId) {
    DistanceMeasureCluster distanceMeasureCluster = (DistanceMeasureCluster) centroid;
    DistanceMeasure distanceMeasure = distanceMeasureCluster.getMeasure();
    double f = distanceMeasure.distance(centroid.getCenter(), vector);
    if (f<separation || separation<-0.5) separation=f;
    }
    }
    Text value=new Text(Long.toString(1)+","+Double.toString(silhouete));
    IntWritable okey=new IntWritable();
    okey.set(1);
    context.write(okey, value);
    }
    }
    Reducer 類:
    public class AssesserReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);
    protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    log.info("reducer");
    }
    private static final Pattern SEPARATOR = Pattern.compile("[t,]");
    public void reduce(IntWritable key, Iterable<Text> values,
    Context context)
    throws IOException, InterruptedException {
    long cnt=0;
    double total=0;
    for (Text value : values) {
    String[] p=SEPARATOR.split(value.toString());
    Long itemCnt=Long.parseLong(p[0]);
    double v=Double.parseDouble(p[1]);
    total=total+ itemCnt*v;
    cnt=cnt+itemCnt;
    }
    }
    }
    Job 類:
    public class ClusterAssesser extends AbstractJob {
    private ClusterAssesser() {
    }
    public int run(String[] args) throws Exception {
    addInputOption();
    addOutputOption();
    //addOption(DefaultOptionCreator.methodOption().create());
    addOption(DefaultOptionCreator.clustersInOption()
    .withDescription("The input centroids").create());
    if (parseArguments(args) == null) {
    return -1;
    }
    Path input = getInputPath();
    Path output = getOutputPath();
    Path clustersIn = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
    if (getConf() == null) {
    setConf(new Configuration());
    }
    run(getConf(), input, clustersIn, output);
    return 0;
    }
    private void run(Configuration conf, Path input, Path clustersIn,
    Path output)throws IOException, InterruptedException,
    ClassNotFoundException {
    conf.set(ClusterClassificationConfigKeys.CLUSTERS_IN, clustersIn.toUri().toString());
    Job job = new Job(conf, "Cluster Assesser using silhouete over input: " + input);
    job.setJarByClass(ClusterAssesser.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(AssesserMapper.class);
    job.setCombinerClass(AssesserReducer.class);
    job.setReducerClass(AssesserReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);
    if (!job.waitForCompletion(true)) {
    throw new InterruptedException("Cluster Assesser Job failed processing " + input);
    }
    }
    private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);
    public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new ClusterAssesser(), args);
    }
    }
    編譯運(yùn)行:
    編譯環(huán)境準(zhǔn)備:
    在從 Mahout 網(wǎng)站下載的包中,同時(shí)包含了源碼以及可以導(dǎo)入到 eclipse 的工程,導(dǎo)入后,會(huì)產(chǎn)生 mahout-core,mahout-distribution,mahout-example 等不同的 projects,我們首先編譯一遍,保證沒有錯(cuò)誤,然后再考慮如何增加自己的代碼。
    當(dāng)然,Mahout 在頂層目錄也提供了一個(gè)編譯腳本:compile.sh, 可以在命令行完成編譯。
    代碼編譯:
    把自己的代碼放到 example/src/main/java/目錄下,自動(dòng)編譯就可以了。輸出產(chǎn)生的類:com.ai.cluster.assesser.ClusterAssesser,然后就被打包到了 examples/target/mahout-examples-0.9-job.jar 中。
    配置:
    把 examples/target/mahout-examples-0.9-job.jar 覆蓋頂層的 mahout-examples-0.9-job.jar
    通過在 conf/driver.classes.default.props 文件添加如下行,把我們的實(shí)現(xiàn)類加入到 Mahout 的配置中,從而可以通過 Mahout 腳本執(zhí)行:
    com.ai.cluster.assesser.ClusterAssesser = assesser : assesse cluster result using silhoueter algorithm
    運(yùn)行
    利用前面我們做聚類過程分析產(chǎn)生的聚類結(jié)果:
    bin/mahout assesser -i /shanlei/kmeans/clusteredPoints -o /shanlei/silhouete -c
    /shanlei/kmeans --tempDir /shanlei/temp
    其中的-c 為輸入聚類的中心點(diǎn),-i 為聚類的點(diǎn) –o 為最終的輸出。
    查看結(jié)果:
    bin/mahout seqdumper -i /shanlei/silhouete -o ./a.txt
    more a.txt:
    Input Path: hdfs://rac122:18020/shanlei/silhouete/part-r-00000
    Key class: class org.apache.hadoop.io.IntWritable Value Class: class org.apache.hadoop.io.Text
    Key: 1: Value: 1000,0.5217678842906524
    Count: 1
    1000 表示共 1000 個(gè)點(diǎn),0.52176 為聚類的 Silhouette 值。大于 0.5,看起來效果還行。
    結(jié)束語:
    不同于其它的套件,Mahout 從發(fā)布起就是為處理海量數(shù)據(jù)、為生產(chǎn)而準(zhǔn)備的。直到現(xiàn)在,Mahout 的重心還是在優(yōu)化各種算法上面,對(duì)易用性考慮不多,而且學(xué)習(xí)成本也很高。但 Mahout 不僅僅提供某些特定的算法,而且還把前期準(zhǔn)備中的數(shù)據(jù)清洗,轉(zhuǎn)換,以及后續(xù)的效果評(píng)估、圖形化展現(xiàn)都集成在一塊,方便用戶。這不僅是一種發(fā)展趨勢(shì),也是爭(zhēng)取用戶的一個(gè)關(guān)鍵因素。希望大家都能夠加入進(jìn)來,提供各種各樣的輔助功能,讓 Mahout 變得易用起來。