温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Spark Shell怎么使用

发布时间:2021-12-16 14:48:23 来源:亿速云 阅读:176 作者:iii 栏目:云计算

这篇文章主要讲解了“Spark Shell怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Spark Shell怎么使用”吧!

##使用Spark Shell进行交互分析##

###基础## Spark的shell提供了一个学习API的简单的方式,和一个强大的交互式分析数据的工具。他在Scala(它运行在Java JVM上并且这是一个很好的使用已存在的Java库的方式)和Python中都是可用的。通过在Spark目录运行下面的(脚本)开始:

./bin/spark-shell

Spark的最主要的抽象是一个分布式项目的集合,叫做弹性分布式数据集(RDD)。RDDs可以通过Hadoop输入格式(例如HDFS文件)创建,或者通过转换其他RDDs得到。让我们通过Spark源码目录下的文件README的文本创建一个新的RDD:

scala> val textFile = sc.textFile("README.md")

textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDDs拥有actions,他们返回值,并且转换,返回一个新的RDDs的指针。让我们以一些actions开始:

scala> textFile.count() // Number of items in this RDD

res0: Long = 126

scala> textFile.first() // First item in this RDD

res1: String = # Apache Spark

现在,让我们使用一个转换,我们将使用一个filter转换来返回一个拥有这个文件的项目的子集的新的RDD:

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))

linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

我们可以链式的调用转换盒动作:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?

res3: Long = 15

###更多的RDD操作### RDD动作和转换可以用于更加复杂的计算,让我们说我们想找出包含字数最多的的行:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

res4: Long = 15

This first maps a line to an integer value, creating a new RDD. reduce is called on that RDD to find the largest line count. The arguments to map and reduce are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We’ll use Math.max() function to make this code easier to understand:

scala> import java.lang.Math import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15

一个常见的数据流模型是MapReduce,就像大家熟知的Hadoop.Spark可以非常容易的实现MapReduce。

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)

wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

现在,我们联合flatMap,map和reduceByKey转换来计算文件中每一个单词出现的次数作为一个RDD(String,Int)对。要在我们的shell中统计单词数,我们可以使用collect动作:

scala> wordCounts.collect()

res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

###缓存### Spark同样支持将数据集放到集群范围的内存中缓存。当数据需要重复访问时这非常有用。例如,当查询一个小的”热点“数据集或者当运行一个迭代算法就像PageRank。作为一个简单的例子,让我们将我们的lineWithSpark数据集进行缓存。

scala> linesWithSpark.cache() res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count() res8: Long = 15

scala> linesWithSpark.count() res9: Long = 15

##独立应用程序## 现在,假设我们想使用Spark API开发一个独立应用程序。我们将通过一个简单的使用Scala(使用SBT),Java(使用Maven)和Python应用程序:

这个示例将使用Maven来编译一个应用程序Jar,但是其他任何相似的构建工具同样可以工作。

我们将创建一个非常简单的Spark应用程序,SimpleApp.java。

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
 
   public static void main(String agrs[]){

          String logFile="YOUR_SPARK_HOME/README.md";
          SparkConf conf=new SparkConf().setAppName("Simple Application");
          JavaSparkContext sc=new JavaSparkContext(conf);
          JavaDDD<String> logData=sc.textFile(logFile).cache();

          long numAs=logData.filter(new Function<String,Boolean>(){
				public Boolean call(String s){
 					return s.contains("a");
				}
		   }).count();

		
		  long numBs=logData.filter(new Function<String,Boolean>(){
				public Boolean call(String s){
 					return s.contains("b");
				}
		   }).count();

           
 		   System.out.println("Lines with a:"+numAs+" ,lines with b:"+numBs);

   }

}

这个程序只是计算一个文本文件中包含”a"的行数和包含“b”的行数。注意,你必须使用Spark安装的位置替换掉“YOUR_SPARK_HOME".在Scala的例子中,我们初始化一个SparkContext,尽管这里我们使用一个特殊的JavaSparkContext类来获得一个Java友好的(SparkContext)。同时我们创建RDDs(使用JavaRDD表示)并且在他们上面运行转换。最后,我们通过创建一个实现spark.api.java.function.Function的类来传递函数给Spark。Spark编程指南更加详细的描述了他们的不同点。

要构建这个项目,我们编写一个Maven的pox.xml文件,它将Spark作为一个依赖列出。注意,Spark artifacts被标记为一个Scala版本。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>Akka repository</id>
      <url>http://repo.akka.io/releases</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.0.2</version>
    </dependency>
  </dependencies>
</project>

我们按照规定的Maven目录架构创建了这些文件:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在,我们可以使用Maven打包这个应用,并且使用./bin/spark-submit执行它。

# Package a jar containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

##Where to Go from Here## 恭喜你运行了第一个Spark应用程序。

  • For an in-depth overview of the API, start with the Spark programming guide, or see “Programming Guides” menu for other components.

  • For running applications on a cluster, head to the deployment overview.

  • Finally, Spark includes several samples in the examples directory (Scala, Java, Python). You can run them as follows:

    For Scala and Java, use run-example: ./bin/run-example SparkPi

    For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py

感谢各位的阅读,以上就是“Spark Shell怎么使用”的内容了,经过本文的学习后,相信大家对Spark Shell怎么使用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI