Apache Spark DataFrame Numeric Column (Again)

There is nothing like already having one way that works to make you look for more ways to do it.

I found a .cast() method on columns that lets me treat them as numeric values, which avoids using a UDF to do the conversion.

I now prefer this way… until I find another, simpler one.

package com.cinq.experience;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

import java.io.UnsupportedEncodingException;

public class Session {

	public static void main(String[] args) throws UnsupportedEncodingException {
		SparkConf conf = new SparkConf().setAppName("SparkExperience").setMaster("local");
		JavaSparkContext jsc = new JavaSparkContext(conf);
		SQLContext sqlContext = new SQLContext(jsc);

		DataFrame df = sqlContext.read()
				.format("com.databricks.spark.csv")
				.option("header", "true")
				.load("session.csv")
				.cache();

		// Cast the count column to a long and give it a clean alias so the
		// aggregation below can refer to it by name instead of the
		// auto-generated "CAST(...)" column name.
		DataFrame crazy = df.select(
				df.col("x-custom-a"),
				df.col("x-custom-count").cast(DataTypes.LongType).as("x-custom-count"));
		crazy.groupBy(crazy.col("x-custom-a")).avg("x-custom-count").show();
	}
}

More informative errors for Spark jobs

I am still having a hard time with an error on my Spark job, but the error I was getting from the spark-submit script was that it failed because the assembly jar was not available.

While looking into my issue I read an article that mentioned you can put the assembly jar in HDFS and point to it with an environment variable. That helps job submission, since it does not have to start by copying the jar to HDFS. The documentation on the Spark site describes setting an environment variable, but I simply added it to the spark-env.sh configuration file; it does the same thing.
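
For reference, the variable Spark on YARN reads for this is SPARK_JAR, and the line I mean looks roughly like the one below in $SPARK_HOME/conf/spark-env.sh (the HDFS path is only an example; adjust it to wherever you actually uploaded the assembly jar):

# Point Spark on YARN at an assembly jar already sitting in HDFS so
# spark-submit does not have to copy it up on every submission.
export SPARK_JAR=hdfs:///user/hadoop/share/spark-assembly.jar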

A side effect is that the error from the console is now more informative about the real issue with the app. I no longer get the odd error that the jar is not available, but instead:

Container exited with a non-zero exit code 1
.Failing this attempt.. Failing the application.
appMasterHost: N/A
appQueue: default
appMasterRpcPort: -1
appStartTime: 1421956977050
yarnAppState: FAILED
distributedFinalState: FAILED
appTrackingUrl: http://hostname:8088/cluster/app/application_1421950771320_0008
appUser: hadoop

Spark job not working

I have had a few adventures with Spark jobs not running on YARN because I forgot to make my classes serializable.
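
As a reminder of what that means, here is a minimal sketch (the class and field names are made up; the point is only the implements Serializable part): anything a Spark transformation captures gets serialized and shipped to the executors.

import java.io.Serializable;

// Any helper captured by a Spark closure is serialized and sent to the
// executors; without "implements Serializable" the job fails on YARN
// with a NotSerializableException.
public class CountFilter implements Serializable {

	private final long threshold;

	public CountFilter(long threshold) {
		this.threshold = threshold;
	}

	public boolean keep(long count) {
		return count >= threshold;
	}
}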

This one is a new issue that I have not faced before. It seems reflection is failing because something is trying to look up a class named int and cannot find it.

Here are some logs:

14/12/17 16:26:15 INFO DAGScheduler: Submitting Stage 2 (FilteredRDD[2] at filter at WriteReport.java:109), which has no missing parents
14/12/17 16:26:15 INFO DAGScheduler: Submitting 6 missing tasks from Stage 2 (FilteredRDD[2] at filter at WriteReport.java:109)
14/12/17 16:26:15 INFO YarnClusterScheduler: Adding task set 2.0 with 6 tasks
14/12/17 16:26:15 INFO TaskSetManager: Starting task 2.0:0 as TID 12 on executor 1: hostname (NODE_LOCAL)
14/12/17 16:26:15 INFO TaskSetManager: Serialized task 2.0:0 as 7089 bytes in 1 ms
14/12/17 16:26:15 INFO TaskSetManager: Starting task 2.0:1 as TID 13 on executor 1: hostname (NODE_LOCAL)
14/12/17 16:26:15 INFO TaskSetManager: Serialized task 2.0:1 as 7089 bytes in 1 ms
14/12/17 16:26:15 WARN TaskSetManager: Lost TID 12 (task 2.0:0)
14/12/17 16:26:15 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: int
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

Then more tasks fail for the same reason until the job is aborted:

14/12/17 16:26:15 INFO TaskSetManager: Starting task 2.0:0 as TID 14 on executor 1: ot1slhdp001v.mgmt.sl.hgn (NODE_LOCAL)
14/12/17 16:26:15 INFO TaskSetManager: Serialized task 2.0:0 as 7089 bytes in 0 ms
14/12/17 16:26:15 WARN TaskSetManager: Lost TID 13 (task 2.0:1)
14/12/17 16:26:15 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: int [duplicate 1]
14/12/17 16:26:15 INFO TaskSetManager: Starting task 2.0:1 as TID 15 on executor 1: ot1slhdp001v.mgmt.sl.hgn (NODE_LOCAL)
14/12/17 16:26:15 INFO TaskSetManager: Serialized task 2.0:1 as 7089 bytes in 0 ms
14/12/17 16:26:15 WARN TaskSetManager: Lost TID 14 (task 2.0:0)
14/12/17 16:26:15 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: int [duplicate 2]
14/12/17 16:26:15 INFO TaskSetManager: Starting task 2.0:0 as TID 16 on executor 1: ot1slhdp001v.mgmt.sl.hgn (NODE_LOCAL)
14/12/17 16:26:15 INFO TaskSetManager: Serialized task 2.0:0 as 7089 bytes in 0 ms
14/12/17 16:26:15 WARN TaskSetManager: Lost TID 15 (task 2.0:1)
14/12/17 16:26:15 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: int [duplicate 3]
14/12/17 16:26:15 INFO TaskSetManager: Starting task 2.0:1 as TID 17 on executor 1: ot1slhdp001v.mgmt.sl.hgn (NODE_LOCAL)
14/12/17 16:26:15 INFO TaskSetManager: Serialized task 2.0:1 as 7089 bytes in 0 ms
14/12/17 16:26:15 WARN TaskSetManager: Lost TID 16 (task 2.0:0)
14/12/17 16:26:15 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: int [duplicate 4]
14/12/17 16:26:15 INFO TaskSetManager: Starting task 2.0:0 as TID 18 on executor 1: ot1slhdp001v.mgmt.sl.hgn (NODE_LOCAL)
14/12/17 16:26:15 INFO TaskSetManager: Serialized task 2.0:0 as 7089 bytes in 1 ms
14/12/17 16:26:15 WARN TaskSetManager: Lost TID 17 (task 2.0:1)
14/12/17 16:26:15 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: int [duplicate 5]
14/12/17 16:26:15 INFO TaskSetManager: Starting task 2.0:1 as TID 19 on executor 1: ot1slhdp001v.mgmt.sl.hgn (NODE_LOCAL)
14/12/17 16:26:15 INFO TaskSetManager: Serialized task 2.0:1 as 7089 bytes in 0 ms
14/12/17 16:26:15 WARN TaskSetManager: Lost TID 18 (task 2.0:0)
14/12/17 16:26:15 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: int [duplicate 6]
14/12/17 16:26:15 ERROR TaskSetManager: Task 2.0:0 failed 4 times; aborting job
14/12/17 16:26:15 INFO YarnClusterScheduler: Cancelling stage 2
14/12/17 16:26:15 INFO YarnClusterScheduler: Stage 2 was cancelled

After many hours of research I found the issue in a class responsible for generating the report. I am passing it an ArrayList with 2 objects, and that causes this error. If my ArrayList has only 1 element, everything “works”.

Starting with Hadoop – 2

I created a page for my Hadoop notes and will keep it up to date with what I am experimenting with.

I will post short articles on what I have done and where I am facing challenges.

I think that using the OpenJDK is a mistake so I am testing with the Oracle JVM to see if it fixes some of the issues I am facing.

I have also upgraded to Fedora 20, which should not change much in how Hadoop works. The only thing I have noticed is an error because the temp directory is gone. I will have to investigate why that is preventing the namenode from starting; I might have to move the temp directory outside of /tmp to avoid this issue.
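
If I do move it, the change should just be pointing hadoop.tmp.dir at a directory that survives reboots, something like this in core-site.xml (the path is only an example):

<property>
  <name>hadoop.tmp.dir</name>
  <value>/var/hadoop/tmp</value>
</property>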

Starting with Hadoop

Trying to find simple and authoritative documentation for Hadoop is harder than I expected. With the many versions out there, it is easy to find documentation for the wrong version and hard to figure out what really needs to be done.

Versions:

  • Hadoop 2.2.0
  • OpenJDK 1.7.0_51
  • Fedora 19

I have set my environment variables in my .bash_profile:

export JAVA_HOME=/usr/lib/jvm/java
export HADOOP_HOME=/opt/hadoop-2.2.0
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin

Configuration file:
$HADOOP_HOME/etc/hadoop/core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/tmp/hadoop-${user.name}</value>
</property>
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:54310</value>
</property>
<property>
  <name>mapred.job.tracker</name>
  <value>hdfs://localhost:54311</value>
</property>
<property> 
  <name>dfs.replication</name>
  <value>8</value>
</property>
<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx512m</value>
</property>
</configuration>

First few Hadoop commands:

hadoop namenode -format   # format HDFS metadata (only needed before the very first start)
hadoop namenode           # run the namenode in the foreground

Things to resolve:

$HADOOP_HOME/sbin/start-all.sh – does not work at all; throws a lot of errors
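
Worth noting: start-all.sh is deprecated in Hadoop 2.x, so the per-daemon scripts are probably what I should be using instead:

$HADOOP_HOME/sbin/start-dfs.sh    # starts the namenode, secondary namenode and datanode(s)
$HADOOP_HOME/sbin/start-yarn.sh   # starts the resourcemanager and nodemanager(s)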