Spark系列之Spark启动与基础使用-程序员宅基地

技术标签: 数据计算  Spark  大数据  


title: Spark系列


第三章 Spark启动与基础使用

3.1 Spark Shell

3.1.1 Spark Shell启动

安装目录的bin目录下面,启动命令:

spark-shell
$SPARK_HOME/bin/spark-shell \
--master spark://hadoop10:7077 \
--executor-memory 512M \
--total-executor-cores 2

注意上面的 cores 参数,若是 0,那么以后这个 spark shell 中运行的代码是不能执行成功的。千万注意。必要要把 cpu cores 和 memory 设置合理。

1、executor memory不能超过虚拟机的内存
2、cpu cores不要超过spark集群能够提供的总cpu cores,否则会使用全部。最好不要使用全部。否则其他程序由于没有cpu core可用,就不能正常运行

参数说明:

--master spark://hadoop10:7077 指定Master的地址
--executor-memory 2G 指定每个worker可用内存为2G
--total-executor-cores 2 指定整个集群使用的cpu核数为2个

注意:

如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个Driver进程,没有与集群建立联系。

Spark-2.x开始/Spark3.x

Spark Shell 中已经默认将 SparkContext 类初始化为对象 sc
Spark Shell 中已经默认将 SparkSession 类初始化为对象 spark
用户代码如果需要用到,则直接应用 sc,spark 即可

Spark-1.x

Spark Shell 中已经默认将 SparkContext 类初始化为对象 sc
Spark Shell 中已经默认将 SQLContext 类初始化为对象 sqlContext
用户代码如果需要用到,则直接应用sc,sqlContext即可

3.1.2 编写WordCount

在提交WordCount程序之前,先在HDFS集群中的准备一个文件用于做单词统计:

words.txt内容如下:

hello huangbo
hello xuzheng
hello wangbaoqiang

把该文件上传到HDFS文件系统中:

hadoop fs -mkdir -p /spark/wc/input
hadoop fs -put words.txt /spark/wc/input

在 Spark Shell 中提交 WordCount 程序:

sc.textFile("hdfs://hadoop10/spark/wc/input/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hadoop10/spark/wc/output")

查询验证结果:

hadoop fs -ls hdfs://hadoop10/spark/wc/output
hadoop fs -cat hdfs://hadoop10/spark/wc/output/*

说明:

sc
.textFile("hdfs://hadoop10/spark/wc/input/words.txt")
.flatMap(_.split(""))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("hdfs://hadoop10/spark/wc/output")

1、sc
	是SparkContext对象,该对象时提交spark程序的入口
2、textFile("hdfs://hadoop10/spark/wc/input/words.txt")是从HDFS中读取数据
	底层是通过InputFormat去读取(因为数据在HDFS,从HDFS读取数据的规范就是InputFormat)
	RDD[String] (文件中的一行,就是RDD中的一条数据。)
3、flatMap(_.split(" "))
	先map,再压平,切割压平
	Array(Array("hello", "a"), Array("hello", "b")) =》 Array("hello", "a","hello", "b")
4、map((_,1))
	将单词和1构成元组(word,1)
5、reduceByKey(_+_)
	按照key进行reduce,并将value累加
6、saveAsTextFile("hdfs://hadoop10/spark/wc/output")
	将结果写入到HDFS对应输出目录中

3.2 Spark Submit

利用Spark自带的例子程序执行一个求PI(蒙特卡洛算法)的程序:

[root@hadoop10 bin]# ./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop10:7077 \
--executor-memory 512m \
--total-executor-cores 2 \
/software/spark/examples/jars/spark-examples_2.12-3.1.2.jar \
100

在这里插入图片描述

运行结果:

在这里插入图片描述

理解:

执行通过/software/spark/bin/spark-shell来运行。
如果不设置master那么默认运行在本机节点;
如果设置 --master spark://hadoop10:7077,那么运行在Spark Standalone模式集群。

参数说明:

--class 指定 jar 包的主类

--master 指定jar包提交的模式
详解如下:
1、local
	本地模式,本地运行,可以调试(local 1个线程、local[*]不限线程、local[N] N个线程,理想情况下,N设置为你机器的CPU核数)
2、spark
	提交到Spark Standalone集群,有Master和Worker进程
3、mesos
	将jar包提交到mesos集群,运行在mesos资源管理器框架之上,由mesos负责资源管理,Spark负责任务调度和计算
4、YARN
	将jar包提交到yarn集群,运行在yarn资源管理器框架之上,由yarn负责资源管理,Spark负责任务调度和计算
5、cloud
	比如AWS的EC2,使用这个模式能很方便的访问Amazon的S3,Spark支持多种分布式存储系统,比如HDFS和S3

--deploy-mode 指定jar的运行方式(默认是 client 模式)
详解如下:
1、client 模式
	在提交端生成的JVM会一直等待所有计算过程全部完成才退出,它有两个功能,一个是提交,一个是监控jar包运行(测试环境下使用)
2、cluster 模式
	在提交端生成的JVM在提交jar包后会退出,它只有一个提交功能,然后在某一个 worker上会生成一个Driver的JVM,该JVM执行监控jar包运行的功能,等待所有代码运行完毕退出(生产环境下使用 )

application.jar 指定你的jar包的地址
arguments       传递给main()方法的参数

3.2.1 Spark Submit多种运行模式

1、提交任务到本地运行

/software/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[4] \
--driver-memory 512m \
--executor-memory 512m \
--total-executor-cores 1 \
/software/spark/examples/jars/spark-examples_2.12-3.1.2.jar \
10

2、提交任务到Spark集群运行

/software/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop10:7077 \
--driver-memory 512m \
--executor-memory 512m \
--total-executor-cores 1 \
/software/spark/examples/jars/spark-examples_2.12-3.1.2.jar \
10

3、提交Yarn集群,使用Yarn-Client模式

/software/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--total-executor-cores 1 \
/software/spark/examples/jars/spark-examples_2.12-3.1.2.jar \
10

3.3 修改Spark日志级别

3.3.1 临时修改

val sparkContext:SparkContext = new SparkContext(conf)
sparkContext.setLogLevel("WARN")

3.3.2 永久修改

可以通过修改Spark配置文件来Spark日志级别

以下是详细步骤的:

第一步:先进入conf目录
[root@hadoop10 conf]$ cd /software/spark/conf

第二步:准备log4j.properties
[root@hadoop10 conf]$ cp log4j.properties.template log4j.properties

第三步:配置日志级别:
把INFO改成你想要的级别:主要有ERROR, WARN, INFO, DEBUG几种

在这里插入图片描述

3.4 Spark的WordCount案例

3.4.0 spark-shell中的WordCount

[root@hadoop10 bin]# cd /software/spark/bin/
[root@hadoop10 bin]# pwd
/software/spark/bin
[root@hadoop10 bin]# spark-shell 
2021-11-09 16:57:03,855 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop10:4040
Spark context available as 'sc' (master = local[*], app id = local-1636448230277).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
         
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sc.textFile("file:///home/data/wordcount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)
(hadoop,1)                                                          (0 + 2) / 2]
(hbase,1)
(hello,3)
(world,1)
                                                                                
scala> 

3.4.1 Java7版本WordCount

package com.aa.sparkjava.core.wordcount;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * @Author AA
 * @Date 2021/11/25 16:46
 * @Project bigdatapre
 * @Package com.aa.sparkjava.core.wordcount
 * WordCountJava java7版本的编写
 */
public class WordCountJava7 {
    
    public static void main(String[] args){
    

        //一、参数判断
        if(args.length!=2){
    
            System.out.println("Usage:JavaWordCount7: Please enter the necessary parameters:<input><output>");
            System.exit(1);
        }

        //二、编程入口
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName(WordCountJava7.class.getSimpleName());
        JavaSparkContext jsc = new JavaSparkContext(conf); 

        //三、读取数据,设置输入路径。
        JavaRDD<String> lineRDD = jsc.textFile(args[0]);

        //四、进行逻辑处理,切割压平
        JavaRDD<String> jrdd1 = lineRDD.flatMap(new FlatMapFunction<String, String>() {
    
            @Override
            public Iterator<String> call(String line) throws Exception {
    
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        //五、将四中的单词结果和1组合成元组
        JavaPairRDD<String, Integer> javaPairRDD = jrdd1.mapToPair(new PairFunction<String, String, Integer>() {
    
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
    
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        //六、 分组聚合  reduceByKey()  (a,b)=>a+b   前两个参数是输入参数类型,第三个参数:返回值的类型
        JavaPairRDD<String, Integer> result = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
    
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
    
                return v1 + v2;
            }
        });

        //七、保存结果或者打印输出
        //打印输出
        result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
    
            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
    
                System.out.println(tuple);
                //System.out.println(tuple._1 + " " + tuple._2);
            }
        });
        //保存结果
        //result.saveAsTextFile(args[1]);
        //八、释放资源
        jsc.close();
    }
}

3.4.2 Java8 Lambda表达式版本WordCount

package com.aa.sparkjava.core.wordcount;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * @Author AA
 * @Date 2021/11/26 11:53
 * @Project bigdatapre
 * @Package com.aa.sparkjava.core.wordcount
 * WordCountJava java8版本  使用lambda表达式
 */
public class WordCountJava8 {
    
    public static void main(String[] args){
    

        if(args.length != 2){
    
            System.out.println("Please enter the necessary parameters:Usage Java8 Lambda WordCount<input><output>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName(WordCountJava8.class.getSimpleName());
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //1、读取数据
        JavaRDD<String> jrdd = jsc.textFile(args[0]);
        //2、切割压平
        JavaRDD<String> jrdd2 = jrdd.flatMap(t -> Arrays.asList(t.split(" ")).iterator());
        //3、和1组合 
        JavaPairRDD<String, Integer> jprdd = jrdd2.mapToPair(t -> new Tuple2<String, Integer>(t, 1));
        //4、分组聚合
        JavaPairRDD<String, Integer> res = jprdd.reduceByKey((a, b) -> a + b);
        //5、保存输出
        res.saveAsTextFile(args[1]);
        //6、释放资源
        jsc.close();

    }
}

3.4.3 Scala版本WordCount

package com.aa.sparkscala.core.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{
    SparkConf, SparkContext}

/**
 * @Author AA
 * @Date 2021/11/26 15:08
 * @Project bigdatapre
 * @Package com.aa.sparkscala.core.wordcount
 * Spark WordCount  Scala版本  使用老的API SparkContext
 */
object WordCountScala {
    
  def main(args: Array[String]): Unit = {
    
    //1、创建一个SparkConf对象,并设置程序的名称
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")

    //2、创建一个SparkContext对象
    val sparkContext: SparkContext = new SparkContext(conf)

    //3、读取HDFS上的文件构建一个RDD
    val fileRDD: RDD[String] = sparkContext.textFile("D:\\input\\test1.txt")

    //4、构建一个单词RDD
    val wordAndOneRDD: RDD[(String, Int)] = fileRDD.flatMap(_.split(" ")).map((_, 1))

    //5、进行单词的聚合
    val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)

    //6、保存结果
    resultRDD.saveAsTextFile("D:\\output\\wordcountscala1")

    //7、关闭sc
    sparkContext.stop()
  }
}

3.4.4 补充API依赖

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>


声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/luoyepiaoxue2014/article/details/128076088

智能推荐

索引+事务_唯一键索引冲突是inster时产生的还是commit的时候-程序员宅基地

文章浏览阅读296次。这里写目录标题事务的ACID特性数据库的事务是什么?隔离性(Isolation)概念说明脏读可重复读不可重复读:一个事务A,不同时刻读同一数据可能不一样幻读:事务A,B同时Insert,B先提交,A没有察觉InnoDB支持的事务隔离级别MySQL 中执行事务1.读未提交:不加锁2.读提交:解决update/inerst引入的脏读,无法做到可重复读3.可重复读:解决update引入的不可重复读,不能解决Insert引入的幻读总结:MySQL 中是如何实现事务隔离的:可重复读可重复读——多版本——针对一条事务可_唯一键索引冲突是inster时产生的还是commit的时候

QT5.11 for mac使用教程_qt for mac生成的.app应用怎么变成非.app应用-程序员宅基地

文章浏览阅读2.8k次。qt介绍 qt是一套完整到跨平台解决方案古老但高效。主要是用c++语言 qt 有自己的sdk以及编辑器,就像vs一样。到官网下载最新到qt版本。安装打包一篇比较解决问题的打包文章 还有这个 1,将依赖库添加到导出到releaseapp中。也就是说releaseapp默认没有依赖库。 命令: ./Qt安装目录/clang_64/bin/macdeployqt re..._qt for mac生成的.app应用怎么变成非.app应用

[导入]《中国式青春》原始硬盘版[今何在]-程序员宅基地

文章浏览阅读76次。没有人会想到,地球的命运,会因为十二个小时而改变。当氪星球面临毁灭,他们把这个孩子装入太空舱,送向宇宙。这小小的太空舱在宇宙中漂流,穿越一个又一个星云,那在宇宙中飘动了万年的光流风暴和物质云把这个孩子他推向他的终点,太阳系——地球——北美洲——美国——堪萨斯州——莫维尔小镇。如果一切都没有偏差,如果每一颗恒星的引力都恰到好处,他会变成美国英雄、传奇的超人。但是,宇宙间一颗计算外的只有三万之分一立方..._中国式青春今何在下载

MFC3 基本对话框的使用(三) 滑块与进度条_setticfreq-程序员宅基地

文章浏览阅读1.5k次,点赞3次,收藏16次。要求:将滑块与编辑框、进度条相连接。调整滑块位置同时显示滑块当前对应数值,达到设定要求时改变进度条的进度。一、界面设计滑块是slider control,进度条是progress control对于三个滑块,修改属性:对于三个示例编辑框,修改属性:二、添加变量三、初始化滑块和进度条在Dlg.cpp中找到初始化函数BOOL COOPEx3Dlg::OnI..._setticfreq

240320俄罗斯方块java,JAVA游戏编程之三----j2me 手机游戏入门开发--俄罗斯方块_2-程序员宅基地

文章浏览阅读202次。packagecode;//importjava.awt.*;//importjava.awt.Canvas;//importjava.awt.event.*;//importjavax.swing.*;importjava.util.Random;importjavax.microedition.lcdui.*;//写界面所需要的包/***//***俄罗斯方块*高雷*2007年1..._240×320java游戏

在线电影院售票平台(源码+开题报告)-程序员宅基地

文章浏览阅读779次,点赞14次,收藏19次。然后,实现系统的数据管理和服务功能,包括用户的注册与登录、电影的分类与展示、电影信息的查询与推荐、座位的选择与预订、在线支付与电子票生成等。此外,随着在线视频平台的兴起,越来越多的人选择在线观看电影,这对传统电影院产生了巨大的冲击。研究意义: 开发在线电影院售票平台对于提升用户的观影体验、优化电影院的运营效率、促进电影产业的发展具有重要的意义。该系统旨在通过技术手段解决传统电影院售票中的问题,提供一个集成化的电影信息展示、座位选择、在线支付和用户评价平台,同时也为电影院和电影制作方提供有效的工具。

随便推点

Apache Lucene 8.0.0 发布,Java 全文搜索引擎-程序员宅基地

文章浏览阅读239次。开发四年只会写业务代码,分布式高并发都不会还做程序员? >>> Lucene PMC 宣布推出 Ap..._lucene 8.0.0

java趣事_【趣事】Java程序员最年轻,C++程序员最年老-程序员宅基地

文章浏览阅读87次。原标题:【趣事】Java程序员最年轻,C++程序员最年老说起我们对编程世界现有的刻板印象,你一定听说过类似于没有人喜欢用Java编码或者使用C ++都是老人家,等等这样的话。为了分析这些刻板印象背后的真相,Trestle Technology的数据工程师写了一个工具。不知道你有没有听说过微软的Project Oxford,它的Face API可以检测图像中的人脸,并检测这个人是否在笑,他/她的性别..._胡须程序

用什么软件测试内存条稳定,使用内存条检测工具监测内存稳定性,内存条检测工具有哪些...-程序员宅基地

文章浏览阅读1.2w次。内存从某种意义上来说对于电脑的运行会产生着基础性的影响,所以我们需要经常的检测一下我们电脑的稳定性,那么下载一款内存条监测工具对我们来说就非常的有必要了,现在的内存条检测工具有很多,那么我们应该如何选择一款合适的内存条监测工具呢,接下来就为大家介绍一下。【鲁大师】这是一款综合的电脑检测软件,不仅仅可以对我们电脑内存当中的内存进行检测,还可以对我们的电脑系统的方方面面都进行监测,比如说我们的内存占比..._怎么测试内存稳定性

Harmonyos 自定义下拉列表框(select)_harmonyos 下拉列表-程序员宅基地

文章浏览阅读7.8k次。自定义一个下拉列表框,当这个功能有效时,点击可弹出下拉框,选中某个选项后,在左边功能名称下面显示选项值,右边的箭头替换成自定义图标,例如手法功能;当功能无效时,置灰,如力度功能;具体示例如下:代码如下:index.hml<!--手法无效时--><div class="fun-grid-item" if="{{manualInvalid}}"> <div class="grid-item-parent-ver._harmonyos 下拉列表

VBA入门到进阶常用知识代码总结44_msofalse-程序员宅基地

文章浏览阅读1k次。第44集 图片与图形处理198、 Shape对象的类型和属性该对象代表工作表或图形工作表上的所有图形,它是sheets和chart的子对象(属性)。Sheet1.ShapesSub t2()On Error Resume NextDim ms As Shapek = 1For Each ms In Sheet1.Shapesk = k + 1Cells(k, 1) = ms.Na..._msofalse

公司个人年终工作总结【10篇】_csdn 公司 年终终结-程序员宅基地

文章浏览阅读1.2k次。公司个人年终工作总结1 20__年即将过去,在公司领导的悉心关怀下和同事们的帮助指导下,结合我自身的努力,在工作、学习等各方面都取得了长足的进步,尤其是在保险理赔专业知识和技能培养方面的成熟,使我成为一名合格的车险查勘定损员。随着工作岗位的调整,我已经成长为为一名能够独立工作、业务熟练的前台工作人员。现将一年来的工作情况向公司领导总结汇报如下: 一、加强理论学习,注重个人素质提高 加强自身业务学习,争做理赔标兵。在日常的工作学习中,我坚持学习更多的保险知识和业务技能,在老同志的“传帮带”下,不断加强个_csdn 公司 年终终结

推荐文章

热门文章

相关标签