The Spark shell is a powerful interactive data-analysis tool and an easy way to learn the API. It can be used from Scala (which is also a great way to call existing Java libraries on the JVM) or Python. You may well find yourself running test code like the following in the Spark shell:
scala> import org.apache.hadoop.mapreduce.{Job, MRJobConfig, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.{Job, MRJobConfig, TaskAttemptID, TaskType}
scala> val job = Job.getInstance(sc.hadoopConfiguration)
java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
at org.apache.hadoop.mapreduce.Job.toString(Job.java:452)
at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
at .<init>(<console>:10)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
When the job instance is initialized, a java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING is thrown. The reason is that in Spark shell mode the result of every evaluated line is echoed back, so right after the Job is created the REPL calls its toString method to print the object. Job.toString, however, checks the job's state and requires it to be JobState.RUNNING, while at this point the job is still in JobState.DEFINE, hence the exception. This article presents two ways to work around the problem.
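A quick way to confirm that the failure comes from the REPL printing the Job, rather than from Job.getInstance itself, is to make the evaluated expression return Unit so that there is nothing for the REPL to render. This is only a diagnostic sketch (the job is local to the block, so it is not a workaround) and assumes the same sc and imports as above:
scala> { val job = Job.getInstance(sc.hadoopConfiguration); () }   // nothing is echoed, so Job.toString is never called and no exception is thrown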
Wrap the Job in a class
One approach is to wrap the Job object inside a class. The REPL then only prints the wrapper object instead of the Job, so Job.toString is never called and the exception is avoided:
scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
scala> class JobWrapper(sc: SparkContext) { val job = Job.getInstance(sc.hadoopConfiguration) }
defined class JobWrapper
scala> val paths = new Path("/iteblog/data.csv")
paths: org.apache.hadoop.fs.Path = /iteblog/data.csv
scala> val jobWrapper = new JobWrapper(sc)
jobWrapper: JobWrapper = $iwC$$iwC$JobWrapper@58e4210d
scala> FileInputFormat.setInputPaths(jobWrapper.job, paths)
scala> val splits = format.getSplits(jobWrapper.job)
splits: java.util.List[org.apache.hadoop.mapreduce.InputSplit] = [hdfs://iteblogcluster/iteblog/data.csv:0+3145728, hdfs://iteblogcluster/iteblog/data.csv:3145728+3145728, hdfs://iteblogcluster/iteblog/data.csv:6291456+3428026]
As you can see, the code above now runs without throwing the exception.
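For reference, the session above relies on a few definitions that this excerpt does not show: the imports for Path and FileInputFormat, and the format value. A plausible set that makes it self-contained could look like the lines below; TextInputFormat is only an assumption here, and any InputFormat from the new MapReduce API would work the same way:
scala> import org.apache.hadoop.fs.Path
scala> import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
scala> val format = new TextInputFormat   // assumed input format, not shown in the original excerpt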
Use lazy when initializing the Job object
Alternatively, we can declare the Job object as a lazy val, so that it is only initialized when it is actually used. The REPL does not force a lazy val when echoing it (it just prints <lazy>), so Job.toString is never called at definition time:
scala> lazy val job = Job.getInstance(sc.hadoopConfiguration)
job: org.apache.hadoop.mapreduce.Job = <lazy>
scala> FileInputFormat.setInputPaths(job, paths)
scala> val splits = format.getSplits(job)
splits: java.util.List[org.apache.hadoop.mapreduce.InputSplit] = [hdfs://iteblogcluster/iteblog/data.csv:0+3145728, hdfs://iteblogcluster/iteblog/data.csv:3145728+3145728, hdfs://iteblogcluster/iteblog/data.csv:6291456+3428026]
This approach is much simpler than the previous one.
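One caveat: lazy only postpones the printing problem. The REPL shows <lazy> instead of forcing the value, so nothing fails at definition time; but if you later evaluate job on a line by itself, the REPL will force it and try to print it, hitting the same state check. A small illustration, under the same assumptions as the session above:
scala> lazy val job = Job.getInstance(sc.hadoopConfiguration)
job: org.apache.hadoop.mapreduce.Job = <lazy>
scala> FileInputFormat.setInputPaths(job, paths)   // forces initialization; the line returns Unit, so nothing is printed
scala> // evaluating plain `job` here would make the REPL print it and raise the same IllegalStateException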