Apache Zeppelin: behind spark interpreter

Here is an overview, what is hidden behind spark interpreter in Apache Zeppelin.

Source code for Apache Zeppelin is available here: source code.

First of all we should look inside of Interpreter launcher: source of launchers. For spark there is a special launcher SparkInterpreterLauncher.java, which extends standard launcher: StandardInterpreterLauncher.java. Here we can see configuration for spark command line, like –conf <key>=<value> and usage of proxy users, plus, check of running mode, like yarn client or cluster.

Depending on selected isolation options in interpreter, spark is started as RemoteInterpreterRunningProcess or RemoteInterpreterManagedProcess. This stays in launch function in StandardInterpreterLauncher. Important is that spark starts as a new process. For example, if you use spark option –proxy-user, then zeppelin will create separate spark processes per user. Communication between current process and interpreter process is done using RPC in case of RemoteInterpreterManagedProcess.

Behind the scene interpreter.sh is started by RemoteInterpreterManagedProcess. And inside of interpreter.sh script, spark-submit from active spark distribution, with RemoteInterpreterServer as a spark class is called. To communicate with RemoteInterpreterServer from zeppelin UI Apache Thrift RPC is used.

This makes a trick, spark started per spark submit in separate JVM together with Thrift RPC server, to enable communication with spark process.

Spark interpreter (spark-interpreter*.jar) is in class path. RemoteInterpreterServer can initialize then spark inside of its process.

Then we need to look, what happened inside of spark interpreter, which is a classical spark application with SparkContext for spark1 and SparkSession for spark2.

We start with SparkInterpreter, which is driver program for spark. You can see that spark context or spark session is initialized using reflection. It looks like this is done to support both spark versions, and resolve needed version during run time, when it becomes clear, which version of spark to start. Zeppelin uses different scala classes depending on scala version:

SparkScala210Interpreter.scala
SparkScala211Interpreter.scala
SparkScala212Interpreter.scala

These classes extends BaseSparkScalaInterpreter, where you can see also, binding of variables and what are shown during %spark paragraph execution in zeppelin:

  bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient"""))
  bind("sqlContext", sqlContext.getClass.getCanonicalName, sqlContext, List("""@transient"""))

  scalaInterpret("import org.apache.spark.SparkContext._")
  scalaInterpret("import sqlContext.implicits._")
  scalaInterpret("import sqlContext.sql")
  scalaInterpret("import org.apache.spark.sql.functions._")

But what is bind() and scalaInterpret()? Here we need to look into SparkILoop class, which is actually responsible for interactive mode of spark driver. In other case, it will be not possible to show results in zeppelin after spark finishes execution. SparkILoop is a part of spark distribution.

...
sparkILoop = new SparkILoop(None, replOut)
sparkILoop.settings = settings
sparkILoop.createInterpreter()
val in0 = getDeclareField(sparkILoop, "in0").asInstanceOf[Option[BufferedReader]]
val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true))

sparkILoop.in = reader
sparkILoop.initializeSynchronous()
sparkILoop.in.postInit()
...
sparkILoop.bind(name, tpe, value, modifier) //bind(...) in code above
... 
sparkILoop.interpret(code)//scalaInterpret() in code above    
...
sparkILoop.closeInterpreter()

[Total: 0   Average: 0/5]