##Interactive Analysis with the Spark Shell##
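###Basics###
Spark's shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively. It is available in either Scala (which runs on the Java VM and is thus a good way to use existing Java libraries) or Python. Start it by running the following in the Spark directory:
./bin/spark-shell
Spark's primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). The session below creates an RDD from the text of the README file in the Spark source directory and then runs a few operations on it: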
scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
scala> textFile.count() // Number of items in this RDD
res0: Long = 126
scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
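The session above applies one transformation (filter) and two actions (count, first). As a rough local analogy only, and not the Spark API, the same computations can be written over an in-memory list with `java.util.stream` (Java 8 or later). The class name `LocalLineStats` and the sample lines are hypothetical. Unlike an RDD, a Java stream runs on a single machine and can be consumed only once.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical local analogue of the shell session: plain Java streams, not Spark.
class LocalLineStats {
    // Like textFile.count(): the number of items (lines).
    static long count(List<String> lines) {
        return lines.size();
    }

    // Like textFile.first(): the first item, or empty if there are no lines.
    static Optional<String> first(List<String> lines) {
        return lines.stream().findFirst();
    }

    // Like textFile.filter(line => line.contains(word)): keep only matching lines.
    static List<String> linesContaining(List<String> lines, String word) {
        return lines.stream()
                .filter(line -> line.contains(word))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for README.md.
        List<String> lines = Arrays.asList("# Apache Spark", "Spark is fast", "Hadoop too");
        System.out.println(count(lines));                           // 3
        System.out.println(first(lines).orElse(""));                // # Apache Spark
        System.out.println(linesContaining(lines, "Spark").size()); // 2
    }
}
```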
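###More on RDD Operations###
RDD actions and transformations can be used for more complex computations. Let's say we want to find the line with the most words: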
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15
This first maps each line to an integer value (its number of words), creating a new RDD. reduce is then called on that RDD to find the largest word count. The arguments to map and reduce are Scala function literals (closures) and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use the Math.max() function to make this code easier to understand:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
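The map-then-reduce pattern above can be sketched locally in Java 8+ to show what each step computes. This is a hypothetical analogue (`MaxWordsPerLine` is an illustrative name), not Spark code. Here the reduce receives an identity value of 0 so that an empty list yields 0; Spark's `reduce` takes no identity value.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical local analogue of
// textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
class MaxWordsPerLine {
    static int maxWords(List<String> lines) {
        return lines.stream()
                .mapToInt(line -> line.split(" ").length) // map: line -> number of words
                .reduce(0, (a, b) -> Math.max(a, b));      // reduce: keep the larger value
    }

    public static void main(String[] args) {
        System.out.println(maxWords(Arrays.asList("a b c", "d e", "f"))); // 3
    }
}
```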
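One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
Here, the flatMap, map and reduceByKey transformations are combined to compute the per-word counts in the file as an RDD of (String, Int) pairs. To collect the word counts in the shell, call collect:
scala> wordCounts.collect()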
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
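To make the three word-count steps concrete, here is a minimal single-machine sketch in Java 8+. It is not Spark: `Collectors.toMap` with a merge function stands in for reduceByKey, and the class name `LocalWordCount` is hypothetical. Spark's collect() returns the pairs in no guaranteed order, so this sketch uses a `TreeMap` purely to make the output order predictable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical local analogue of
// textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
class LocalWordCount {
    static Map<String, Integer> wordCounts(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" "))) // flatMap: lines -> words
                .collect(Collectors.toMap(
                        word -> word,   // key: the word itself
                        word -> 1,      // value: like map(word => (word, 1))
                        Integer::sum,   // merge values of equal keys: like reduceByKey(_ + _)
                        TreeMap::new)); // sorted map, only for a predictable print order
    }

    public static void main(String[] args) {
        System.out.println(wordCounts(Arrays.asList("to be or", "not to be")));
        // {be=2, not=1, or=1, to=2}
    }
}
```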
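###Caching###
Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small "hot" dataset or when running an iterative algorithm like PageRank. As a simple example, let's mark our linesWithSpark dataset to be cached: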
scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
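cache() is lazy: it only marks the RDD, the data is computed and kept in memory by the first action (the first count() above), and later actions reuse that in-memory copy instead of recomputing the filter. The idea of "compute on first use, then reuse" can be illustrated with a small memoizing wrapper in Java 8+. This is a hypothetical illustration (`CachedDataset` is an invented name), not how Spark implements caching, and it is not thread-safe.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical illustration of lazy caching; NOT Spark's implementation of cache().
class CachedDataset {
    private final Supplier<List<String>> compute; // how to build the data
    private List<String> cached;                  // filled on first use
    private int computations = 0;                 // how many times compute has run

    CachedDataset(Supplier<List<String>> compute) {
        this.compute = compute; // nothing is computed yet, like cache() itself
    }

    // Like an action on a cached RDD: build the data once, then reuse it.
    long count() {
        if (cached == null) {
            cached = compute.get();
            computations++;
        }
        return cached.size();
    }

    int computations() {
        return computations;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("# Apache Spark", "Spark is fast", "Hadoop too");
        CachedDataset linesWithSpark = new CachedDataset(() -> lines.stream()
                .filter(line -> line.contains("Spark"))
                .collect(Collectors.toList()));
        System.out.println(linesWithSpark.count());        // 2 (runs the filter)
        System.out.println(linesWithSpark.count());        // 2 (reuses the cached list)
        System.out.println(linesWithSpark.computations()); // 1
    }
}
```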
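##Standalone Applications##
Now say we want to write a standalone application using the Spark API. Standalone Spark applications can be written in Scala (built with SBT), Java (built with Maven), and Python; here we walk through a simple Java application built with Maven. It counts the number of lines containing "a" and the number containing "b" in the Spark README, and is saved as src/main/java/SimpleApp.java: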
/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Read the file as an RDD of lines; cache it because it is scanned twice below
    JavaRDD<String> logData = sc.textFile(logFile).cache();
    // Count the lines that contain the letter "a"
    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();
    // Count the lines that contain the letter "b"
    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();
    System.out.println("Lines with a: " + numAs + ", Lines with b: " + numBs);
    sc.stop(); // Release the resources held by the SparkContext
  }
}
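To build the program, we write a Maven pom.xml file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version: spark-core_2.10 is Spark core built for Scala 2.10.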
<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>Akka repository</id>
      <url>http://repo.akka.io/releases</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.0.2</version>
    </dependency>
  </dependencies>
</project>
We lay out these files according to the canonical Maven directory structure:
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
Now we can package the application using Maven and run it with spark-submit. Here --master local[4] runs Spark locally with 4 worker threads:
# Package a jar containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
##Where to Go from Here##
Congratulations on running your first Spark application!
For an in-depth overview of the API, start with the Spark programming guide, or see the “Programming Guides” menu for other components.
For running applications on a cluster, head to the deployment overview.
Finally, Spark includes several samples in the examples directory (Scala, Java, Python). You can run them as follows:
For Scala and Java, use run-example: ./bin/run-example SparkPi
For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py
