
Fixing an Exception When Initializing a Job in Spark Shell Mode

The Spark shell is a powerful interactive data-analysis tool and an easy way to learn the API. It can be used with Scala (a good way to run existing Java libraries on the JVM) or Python. You may well try running the following test code in the Spark shell:


scala> import org.apache.hadoop.mapreduce.{Job, MRJobConfig, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.{Job, MRJobConfig, TaskAttemptID, TaskType}           

scala> val job = Job.getInstance(sc.hadoopConfiguration)
java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
	at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
	at org.apache.hadoop.mapreduce.Job.toString(Job.java:452)
	at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
	at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
	at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
	at .<init>(<console>:10)
	at .<clinit>(<console>)
	at $print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
	at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
	at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
	at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
	at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
	at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
	at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
	at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
	at org.apache.spark.repl.Main$.main(Main.scala:31)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Initializing the Job instance throws java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING. The cause is that in Spark shell mode, the REPL prints the result of every line it evaluates, so as soon as the Job is created, its toString method is invoked to render the object. Job.toString, however, first checks the job's state and requires it to be JobState.RUNNING; at this point the job is still in JobState.DEFINE, so the check throws. This article presents two ways to work around the problem.
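The mechanism is easy to reproduce without Hadoop. Below is a minimal sketch: FakeJob is a hypothetical stand-in for org.apache.hadoop.mapreduce.Job whose toString performs the same state check, and scala.runtime.ScalaRunTime.stringOf is the very helper the REPL uses to render results (it appears in the stack trace above):

```scala
object ReplToStringDemo {
  sealed trait JobState
  case object DEFINE  extends JobState
  case object RUNNING extends JobState

  // Hypothetical simplification of org.apache.hadoop.mapreduce.Job:
  // its toString refuses to render a job that is not RUNNING.
  class FakeJob {
    private var state: JobState = DEFINE  // new jobs start in DEFINE

    private def ensureState(required: JobState): Unit =
      if (state != required)
        throw new IllegalStateException(s"Job in state $state instead of $required")

    override def toString: String = { ensureState(RUNNING); s"FakeJob($state)" }
  }

  def main(args: Array[String]): Unit = {
    val job = new FakeJob
    // The REPL renders every result via ScalaRunTime.stringOf, which ends
    // up calling toString -- exactly the path shown in the stack trace.
    try {
      scala.runtime.ScalaRunTime.stringOf(job)
    } catch {
      case e: IllegalStateException => println(e.getMessage)
    }
  }
}
```

Running main prints "Job in state DEFINE instead of RUNNING": the exception is raised not by creating the job but by printing it, which is why both workarounds below simply prevent the REPL from calling Job.toString.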

Wrap the Job in a class

One approach is to wrap the Job object inside a class. The REPL then prints the wrapper (whose default toString does not touch the inner Job), so Job.toString is never called and the exception is avoided:

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.Path

scala> import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

scala> class JobWrapper(sc: SparkContext) { val job = Job.getInstance(sc.hadoopConfiguration) }
defined class JobWrapper

scala> val paths = new Path("/iteblog/data.csv")
paths: org.apache.hadoop.fs.Path = /iteblog/data.csv

scala> val jobWrapper = new JobWrapper(sc)
jobWrapper: JobWrapper = $iwC$$iwC$JobWrapper@58e4210d

scala> val format = new TextInputFormat
format: org.apache.hadoop.mapreduce.lib.input.TextInputFormat = org.apache.hadoop.mapreduce.lib.input.TextInputFormat@...

scala> FileInputFormat.setInputPaths(jobWrapper.job, paths)

scala> val splits = format.getSplits(jobWrapper.job)
splits: java.util.List[org.apache.hadoop.mapreduce.InputSplit] = [hdfs://iteblogcluster/iteblog/data.csv:0+3145728, hdfs://iteblogcluster/iteblog/data.csv:3145728+3145728, hdfs://iteblogcluster/iteblog/data.csv:6291456+3428026]

As you can see, this snippet now runs without the exception.

Use lazy when initializing the Job object

Alternatively, declare the Job with lazy, so the object is only initialized when it is first used; the REPL prints the placeholder <lazy> instead of evaluating it:

scala> lazy val job = Job.getInstance(sc.hadoopConfiguration)
job: org.apache.hadoop.mapreduce.Job = <lazy>

scala> FileInputFormat.setInputPaths(job, paths)

scala> val splits = format.getSplits(job)
splits: java.util.List[org.apache.hadoop.mapreduce.InputSplit] = [hdfs://iteblogcluster/iteblog/data.csv:0+3145728, hdfs://iteblogcluster/iteblog/data.csv:3145728+3145728, hdfs://iteblogcluster/iteblog/data.csv:6291456+3428026]

This approach is much simpler than the previous one.
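A third option, not specific to Job, is to switch off result printing altogether. This sketch assumes the standard Scala REPL :silent command, which the Spark shell inherits; with printing disabled, nothing calls toString on the freshly created job:

```
scala> :silent

scala> val job = Job.getInstance(sc.hadoopConfiguration)
```

The downside is that :silent suppresses the echo for every subsequent result, so the two workarounds above are usually more convenient for interactive exploration.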

All posts on this blog are original unless otherwise stated!
When reprinting, please credit: reprinted from 过往记忆 (https://www.iteblog.com/)
Permalink: Fixing an Exception When Initializing a Job in Spark Shell Mode (https://www.iteblog.com/archives/2142.html)