XML (eXtensible Markup Language) is a markup language and a de facto industry-standard data-interchange format, well suited to storing and exchanging data between systems (the configuration files of Hadoop, Hive, and similar projects are XML). This article shows how to read XML files with MapReduce. Hadoop cannot parse XML files out of the box, and the XML format has no synchronization markers, so processing a single XML file in parallel is tricky. Below we walk through how to process XML files with MapReduce, split them, and process the splits in parallel.
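Hadoop's own configuration files are a good illustration: every property lives in its own <property> element, so the <property> and </property> tags make natural record boundaries. A typical core-site.xml fragment looks like this (the property value shown here is only an example):

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>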
Files are normally read in MapReduce through an InputFormat class; for example, text files are read with TextInputFormat (for a source-level analysis of InputFormat see 《MapReduce数据输入中InputFormat类源码解析》). To parse XML files in MapReduce we can likewise use a dedicated XmlInputFormat. Readers familiar with Mahout will know that it ships such a class, XmlInputFormat (https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java). We specify the start and end tags that delimit records in the XML file, and can then parse the XML file much like a plain text file:
Configuration conf = new Configuration();
conf.set("xmlinput.start", "<property>");
conf.set("xmlinput.end", "</property>");
Job job = new Job(conf);
 job.setInputFormatClass(XmlInputFormat.class);
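Setting these two properties tells XmlInputFormat's record reader which byte sequences delimit a record: it scans the input for the start tag, buffers everything up to and including the end tag, and emits that span as one record (roughly, the current file position becomes the LongWritable key and the buffered bytes become the Text value). The following is only a minimal, self-contained sketch of that scanning idea, with a made-up class name; it is not Mahout's actual code, which additionally has to respect input-split boundaries:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch only: how a record reader can scan a raw byte stream
// for start/end markers such as "<property>" and "</property>".
public class XmlMarkerScanDemo {

  // Read from 'in' until the byte sequence 'match' has been consumed.
  // If keepContents is true, every byte read is appended to 'buffer'.
  // Returns false if the stream ends before the marker is found.
  static boolean readUntilMatch(InputStream in, byte[] match,
                                ByteArrayOutputStream buffer,
                                boolean keepContents) throws IOException {
    int i = 0;
    int b;
    while ((b = in.read()) != -1) {
      if (keepContents) {
        buffer.write(b);
      }
      if (b == match[i]) {
        i++;
        if (i >= match.length) {
          return true;   // the whole marker has been matched
        }
      } else {
        i = 0;           // mismatch: restart matching from the first byte
      }
    }
    return false;        // end of stream reached without finding the marker
  }

  public static void main(String[] args) throws IOException {
    String xml = "<configuration><property><name>fs.defaultFS</name>"
        + "<value>hdfs://localhost:9000</value></property></configuration>";
    InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
    ByteArrayOutputStream record = new ByteArrayOutputStream();
    // Skip ahead to the start tag, then collect everything up to the end tag.
    if (readUntilMatch(in, "<property>".getBytes("UTF-8"), record, false)
        && readUntilMatch(in, "</property>".getBytes("UTF-8"), record, true)) {
      System.out.println("<property>" + record.toString("UTF-8"));
    }
  }
}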
With xmlinput.start and xmlinput.end set, the record reader searches the XML file for those start and end tags and hands the data between them to the map function as individual records. The complete map program is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.slf4j.*;
import javax.xml.stream.*;
import java.io.*;
import static javax.xml.stream.XMLStreamConstants.*;
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
    private static final Logger log = LoggerFactory.getLogger(Map.class);
    @Override
    protected void map(LongWritable key, Text value,
                       Context context)
        throws   IOException, InterruptedException {
      String document = value.toString();
      System.out.println("'" + document + "'");
      try {
        XMLStreamReader reader =
            XMLInputFactory.newInstance().createXMLStreamReader(new
                ByteArrayInputStream(document.getBytes()));
        String propertyName = "";
        String propertyValue = "";
        String currentElement = "";
        while (reader.hasNext()) {
          int code = reader.next();
          switch (code) {
            case START_ELEMENT:
              currentElement = reader.getLocalName();
              break;
            case CHARACTERS:
              if (currentElement.equalsIgnoreCase("name")) {
                propertyName += reader.getText();
              } else if (currentElement.equalsIgnoreCase("value")) {
                propertyValue += reader.getText();
              }
              break;
          }
        }
        reader.close();
        context.write(new Text(propertyName.trim()), new Text(propertyValue.trim()));
      } catch (Exception e) {
        log.error("Error processing '" + document + "'", e);
      }
    }
}
The map receives a Text value containing the data between the start and end tags. We then use Java's built-in StAX (XML streaming API) parser to extract each property's name and value, and finally emit them as the output key and value. The complete code is as follows:
package com.iteblog.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.slf4j.*;
import javax.xml.stream.*;
import java.io.*;
import static javax.xml.stream.XMLStreamConstants.*;
public final class XML {
  private static final Logger log = LoggerFactory.getLogger(XML.class);
  public static class Map extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value,
                       Context context)
        throws IOException, InterruptedException {
      String document = value.toString();
      System.out.println("'" + document + "'");
      try {
        XMLStreamReader reader =
            XMLInputFactory.newInstance().createXMLStreamReader(new
                ByteArrayInputStream(document.getBytes()));
        String propertyName = "";
        String propertyValue = "";
        String currentElement = "";
        while (reader.hasNext()) {
          int code = reader.next();
          switch (code) {
            case START_ELEMENT:
              currentElement = reader.getLocalName();
              break;
            case CHARACTERS:
              if (currentElement.equalsIgnoreCase("name")) {
                propertyName += reader.getText();
              } else if (currentElement.equalsIgnoreCase("value")) {
                propertyValue += reader.getText();
              }
              break;
          }
        }
        reader.close();
        // the output key/value types are declared as Text, so wrap the parsed strings
        context.write(new Text(propertyName.trim()), new Text(propertyValue.trim()));
      } catch (Exception e) {
        log.error("Error processing '" + document + "'", e);
      }
    }
  }
  public static void main(String... args) throws Exception {
    runJob(args[0], args[1]);
  }
  public static void runJob(String input,
                            String output)
      throws Exception {
    Configuration conf = new Configuration();
    conf.set("key.value.separator.in.input.line", " ");
    conf.set("xmlinput.start", "<property>");
    conf.set("xmlinput.end", "</property>");
    Job job = new Job(conf);
    job.setJarByClass(XML.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(Map.class);
    job.setInputFormatClass(XmlInputFormat.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(input));
    Path outPath = new Path(output);
    FileOutputFormat.setOutputPath(job, outPath);
    outPath.getFileSystem(conf).delete(outPath, true);
    job.waitForCompletion(true);
  }
}
We can use Hadoop's own core-site.xml file as a test, as follows:
$ bin/hadoop jar xml.jar com.iteblog.hadoop.XML input/core-site.xml output/
After the program above finishes, several part-* files appear under the output directory, which shows that the XML-parsing job worked.
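Each line in those part files is a tab-separated key/value pair produced by TextOutputFormat. Assuming the core-site.xml that was read contains a property such as the fs.defaultFS example shown at the start of this article, a line would look like:

fs.defaultFS	hdfs://localhost:9000

The files can be inspected with bin/hadoop fs -cat output/part* (or a plain cat if the job ran against the local filesystem).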