Fast Data Processing with Spark 2(Third Edition)

Loading a simple text file

Let's download a dataset and do some experimentation. One of the best books for machine learning (if not the best) is The Elements of Statistical Learning by Trevor Hastie, Robert Tibshirani, and Jerome H. Friedman (Springer). The book's site hosts an interesting set of datasets. Let's grab the spam dataset using the following command:

wget http://www-stat.stanford.edu/~tibs/ElemStatLearn/datasets/spam.data

Alternatively, you can find the spam dataset in the GitHub repository at https://github.com/xsankar/fdps-v3.

Note

All the examples assume that you have downloaded the repository into the fdps-v3 directory in your home folder, that is, ~/fdps-v3/. Please adjust the paths if you have downloaded it somewhere else.

Now, load it as a text file into Spark with the following commands inside your Spark shell:

scala> val inFile = sc.textFile("data/spam.data")
scala> inFile.count()

This loads the spam.data file into Spark, with each line becoming a separate entry in a Resilient Distributed Dataset (RDD). You will learn more about RDDs in later chapters; in brief, an RDD is the basic data structure that Spark relies on. RDDs are very versatile in terms of scaling, computation capabilities, and transformations.

Tip

Spark uses the paradigm of lazy evaluation. The sc.textFile operation is lazy, so it doesn't load anything until an action is invoked on the RDD; count() is such an action. This means sc.textFile will succeed even if the user enters a bogus file path, and the RDD will still be created. However, when you invoke an action on it, the command will fail.

The sc value available in the shell is the Spark context. While an application has to create a Spark context explicitly, the Spark shell creates one for you, binds it to the name sc, and this is what we normally use.
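To make the distinction concrete, here is a minimal sketch of what a standalone application would do itself; the object name, application name, and local[*] master URL are only illustrative placeholders:

import org.apache.spark.{SparkConf, SparkContext}

object SpamApp {
  def main(args: Array[String]): Unit = {
    // A standalone application builds its own context; the Spark shell
    // does this for you and exposes it as sc.
    val conf = new SparkConf().setAppName("SpamApp").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val inFile = sc.textFile("data/spam.data")
    println(inFile.count())

    sc.stop()
  }
}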

The count() function returns the number of lines in the file.
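Assuming you downloaded the full spam.data file shown earlier, the output should look roughly like the following (the exact res index depends on how many commands you have already run in your session):

res0: Long = 4601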

Now, let's look at the first line. Type in the following command:

scala> inFile.first()
You will see the first line of the file printed as a string (the same output appears again later in this section, when we compare it with the parsed version). Excellent, you have written your first Scala code!

Note that if you're connected to a Spark master, Spark may attempt to load the file on any of the machines in the cluster, so make sure that the file can be accessed by all the worker nodes. In general, you will want to put your data in HDFS, S3, or a similar distributed file system to avoid this problem. In local mode, you can just load the file directly (for example, sc.textFile([filepath])). You can also use the addFile function on the Spark context to make a file available across all of the machines, like this:

scala> import org.apache.spark.SparkFiles
scala> sc.addFile("data/spam.data")
scala> val inFile = sc.textFile(SparkFiles.get("spam.data"))
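The sc.addFile call ships the file to every node in the cluster, and SparkFiles.get resolves the local path where that copy lives on each worker, so the same code works whether you run locally or against a cluster.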

Tip

Like most shells, the Spark shell has a command history; you can press the up arrow key to get to the previous commands. Are you getting tired of typing or not being sure about what method you want to call on an object? Press Tab and the Spark shell will autocomplete the line of code in the best way it can.

For this example, the RDD with each line as an individual string isn't very useful, as our input data is actually space-separated numerical values. We can use the map() operation to iterate over the elements of the RDD and quickly convert them to a usable format: split each line on the space character, and then convert each of the resulting number strings to a Double. Note that _.toDouble is Scala syntactic sugar for x => x.toDouble. The code is shown next:

scala> val nums = inFile.map(line => line.split(' ').map(_.toDouble))

Verify that this is what we want by inspecting some elements in the nums RDD and comparing them against the original string RDD. Take a look at the first element of each of these by calling .first() on the RDDs:

Tip

Most of the output that follows these commands will be extraneous INFO messages. It is no doubt informative to see what Spark does under the covers; however, if you want to keep the detailed messages out, you can copy conf/log4j.properties.template to conf/log4j.properties and set log4j.logger.org.apache.spark.repl.Main=WARN instead of INFO. Once this is done, these messages will no longer appear, and you can concentrate on the commands and their output.

From your Spark installation directory, the command is as follows:

cp conf/log4j.properties.template conf/log4j.properties

Then, edit the conf/log4j.properties file and change log4j.logger.org.apache.spark.repl.Main to WARN, as shown next:
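The relevant line in conf/log4j.properties should end up looking like this:

log4j.logger.org.apache.spark.repl.Main=WARN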

We inspect the first line with the first() method:

scala> inFile.first()
res1: String = 0 0.64 0.64 0 0.32 0 0 0 0 0 0 0.64 0 0 0 0.32 0 1.29 1.93 0 0.96 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0.778 0 0 3.756 61 278 1
scala> nums.first()
res2: Array[Double] = Array(0.0, 0.64, 0.64, 0.0, 0.32, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.64, 0.0, 0.0, 0.0, 0.32, 0.0, 1.29, 1.93, 0.0, 0.96, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.778, 0.0, 0.0, 3.756, 61.0, 278.0, 1.0)

When you run a command and do not specify the left-hand side of the assignment (that is, leaving out the val x part of val x = y), the Spark shell assigns a default name of the form res[number] to the value.
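These auto-generated names can be used like any other value. For example, the array returned by nums.first() above was bound to res2, so (assuming the same session and numbering) you can ask for its length, that is, the number of columns in the first row, directly:

scala> res2.length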

Tip

Operations in Spark are divided into transformations and actions. Transformations are evaluated lazily: when you call a transformation, such as map, Spark just records it in the RDD's lineage graph. No actual work is done until an action is invoked on the RDD. Creating the RDD and applying the map function are transformations; the first() function is an action that forces execution.

So, when we created inFile, nothing really happened beyond creating a variable and setting up the pointers. Only when we call an action, such as first(), does Spark evaluate the transformations. As a result, even if we point inFile at a nonexistent path, Spark will accept it; however, when we call inFile.first(), it will throw an Input path does not exist error.
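A quick way to see this behavior for yourself is with a deliberately wrong path (the file name here is just a made-up example):

scala> val bogus = sc.textFile("data/no-such-file.data") // succeeds: only the lineage is recorded
scala> bogus.count() // fails here with an Input path does not exist error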

As you can see, the Spark shell is quite powerful. Much of this power comes from the fact that it is based on the Scala REPL (the Scala interactive shell), so it inherits all of the capabilities of the Scala REPL. That said, most of the time you will probably prefer to work with more traditionally compiled code rather than in the REPL.