首页 > 电脑常识 > 云计算

Spark大数据技术与应用

admin 云计算 2021-04-26 15:57:07 spark   大数据  
后台-系统设置-扩展变量-手机广告位-内容正文底部

第一章

1.Spark是什么

概念

Spark是一个大规模数据处理的统一分析引擎。

特点

迅速、通用、易用、支持多种资源管理器

迅速

Spark用十分之一的计算资源,获得了比Hadoop快3倍的速度。

通用

可以用Spark进行sql查询、流式计算、机器学习、图计算。

易用

支持多种编程语言API,包括Java、Scala、Python、R

支持多种支援管理器

Spark可以使用单机集群模式来运行,也可以在Hadoop YARN、Apache Mesos、Kubernates上运行,或者在“云”里运行。

Spark可以访问HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive等上百种数据源。

Spark与Hadoop

区别与联系

解决问题的方式不一样

Hadoop是分布式数据设施。

Spark只是一个专门的工具,不会进行分布式数据的存储。

两者可合可分

Hadoop可用自身的MapReduce来代替Spark

Spark可不依赖Hadoop,而选择其他基于云的数据系统平台。

Spark相对于MapReduce的优势

中间结果输出

Hadoop:两步计算、磁盘存储

Spark:多步计算、内存存储

数据格式和内存布局

Hadoop:使用HDFS

Spark:使用RDD

误区!!!

1.Spark是基于内存的技术

大多数的人会认为Spark都是基于内存的计算的,但是基于如下两个情况,Spark会落地于磁盘

  1. Spark避免不了shuffle

  2. 如果数据过大(比服务器的内存还大)也会落地于磁盘

参考链接

2.Spark要比Hadoop快 10x-100x

在比较短的作业确实能快上100倍,但是在真实的生产环境下,一般只会快 2.5x ~ 3x!

3.Spark的存在将代替Hadoop

目前备受追捧的Spark还有很多缺陷,比如:

  1. 稳定性方面,由于代码质量问题,Spark长时间运行会经常出错,在架构方面,由于大量数据被缓存在RAM中,Java回收垃圾缓慢的情况严重,导致Spark性能不稳定,在复杂场景中SQL的性能甚至不如现有的Map/Reduce。

  2. 不能处理大数据,单独机器处理数据过大,或者由于数据出现问题导致中间结果超过RAM的大小时,常常出现RAM空间不足或无法得出结果。然而,Map/Reduce运算框架可以处理大数据,在这方面,Spark不如Map/Reduce运算框架有效。

  3. 不能支持复杂的SQL统计;目前Spark支持的SQL语法完整程度还不能应用在复杂数据分析中。在可管理性方面,SparkYARN的结合不完善,这就为使用过程中埋下隐忧,容易出现各种难题。

参考链接

用途

推荐系统

实时日志系统

快速查询系统

定制广告系统

用户图计算系统

2.Spark的生态系统

生态系统

在这里插入图片描述

Spark Core

Spark Core提供Spark SQL、Spark Streaming、MLlib、GraphX四大模块,进行离线计算,产生RDD弹性分布式数据集。

Spark SQL && DataFrame

Spark SQL是一种结构化的数据处理模块。

DataFrame是Spark SQL提供的一个编程抽象,相当于一个列数据的分布式的采集组织,在关系数据库或R/Python中的概念相当于一个表。

Spark Streaming

Spark Streaming处理实时数据流并容错。

MLIib

MLlib是Spark提供的可扩展的机器学习库

MLlib提供的API主要分为以下两类:

  • spark.mllib包提供主要API
  • spark.ml包提供构建机器学习工作流的高层次API

GraphX

GraphX是Spark面向图计算提供的框架与算法库

3.Spark的架构与原理

常见术语

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

架构设计

在这里插入图片描述

作业运行流程

在这里插入图片描述在这里插入图片描述

核心原理

4.Spark 2.X新特性

2.x对比1.x

2.x基本上是基于1.x进行了更多的功能和模块的扩展以及性能的提升:

  1. 引入很多优秀特性,性能上有较大提升,API更易用
  2. 实现了离线计算和流计算API的统一
  3. 实现了Spark SQL和 Hive SQL操作API的统一

新特性

1.精简的API

  1. 统一DataFrame和Dataset接口为datasets
  2. 新增SparkSession入口,统一旧的SQLContext与HiveContext
  3. 支持SQL 2003标准,支持子查询,Spark SQL性能有2-10倍的提升

2.搭载了第二代引擎

主要思想:在运行时使用优化后的字节码,将整体查询合成为单个函数,不再使用虚拟函数调用,而是利用CPU来注册中间数据。

3.智能化程度

  1. Structured Streaming引入了低延迟的连续处理
  2. 通过改善Pandas UDFs的性能来提升PySpark
  3. 支持第四种调度引擎 Kubernetes Clusters
  4. 支持 Stream-to-stream Joins

第二章

1.Spark环境搭建

2.Spark集群启动与关闭

Spark运行模式

  1. 在mesos或者yarn集群管理器上部署运行
  2. 在standalone和local的模式下部署运行

启动

start-all.sh(已设置好环境变量)

关闭

stop-all.sh(已设置好环境变量)

3.Spark应用提交到集群

spark-submit                 //提交任务命令
--master spark://master:7077 //提交集群的地址
--deploy-mode client         //部署模式为client模式
--executor-memory 512M       //设置每个执行单元使用512Mb的内存空间
--total-executor-cores 4     //每个执行单元为4个核
demo.py                      //实际提交的应用程序,具体以实际为准

第三章

1.Python编程语言

不用多说。。。

2.Pyspark启动与日志设置

PySpark启动

local、standalone、yarn、mesos

以local模式启动

pyspark --master local[4]

以Yarn模式启动

pyspark --master yarn-client

以Standalone模式启动

pyspark --master spark://Spark:7077

以Mesos模式启动

pyspark --master mesos://Mesos:7077

日志设置

日志级别包括:ALL,DEBUG,ERROR,FATAL,INFO,OFF,TRACE,WARN

控制日志输出内容的方式有两种:

  1. 修改log4j.properties,默认控制台输出INFO及以上级别信息

    log4j.rooCategory=INFO,console

  2. 代码中使用setLogLevel(logLevel)控制日志输出

from pyspark import SparkContext
sc = SparkContext("local", "First App")
sc.setLogLevel("WARN")

3.PySpark开发

就是安装环境,编译器可以用Anaconda,Jupyter notebook,pycharm,pyspark是一个python的第三方库,可以通过pip安装,但是如果安装了Spark包,bin目录里会包含pyspark

第四章

1.RDD简介

几个问题

RDD是什么?

  1. 弹性分布式数据集
  2. 只读的、分区记录的集合
  3. 只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建

什么是弹性?

  1. RDD可以在内存和磁盘之间手动或自动切换
  2. RDD可以通过转换成为其他的RDD
  3. RDD可以存储任意类型的数据

存储的内容?

初代RDD:真实数据的分区信息,单个分区的读取方法

子代RDD:初代RDD产生子代RDD的原因(动作),初代RDD的引用

数据读取发生在什么时候?

task在executor上运行的时候

五个主要属性

分区信息(Partition)数据集的基本组成单位Compute函数对于给定的数据集,需要做哪些计算Partitioner函数对于计算出来的数据结果如何分发优先位置列表对于data partition的位置偏好依赖关系描述了RDD之间的Lineage

创建RDD

下面代码都是Python API,使用pyspark

基于外部数据源创建

distFile = sc.textFile("file:///FILE_TO_PATH")
#textFile支持从多种源创建RDD,如hdfs://,s3n://
distFile.count()
#计算文本的行数

基于数据集合创建

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data) #通过并行化创建RDD
#parallelize可以传入分片个数参数,否则采用defaultParallelism
distData.count() #返回RDD中元素的个数

RDD操作

两种算子(Operation)

转换(transformation)在一个已存在的RDD上创建一个新的RDD,但实际的计算并没有执行,仅仅记录操作过程行动(action)执行RDD记录的所有运行transformations操作,并计算结果,结果可返回到driver程序

如何区分?

  1. transformation算子一定会返回一个rdd
  2. Action有的没有返回值,也可能有返回值,但是一定不是rdd

2.RDD算子

Transformation算子-Value型

map(f, preservesPartitioning=False)

通过对这个RDD的每个元素应用一个函数来返回一个新的RDD。

>>> rdd = sc.parallelize(['b', 'a', 'c'])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a',1), ('b',1), ('c',1)]

flatMap(f, preservesPartition=False)

将函数应用于该RDD的所有元素,然后将结果平坦化(压扁),从而返回新的RDD。

>>> rdd = sc.parallelize([2, 3, 4])
>>> rdd2 = rdd.map(lambda x: range(1, x))
>>> rdd2.collect()
[[1], [1, 2], [1, 2, 3]]
>>> rdd1 = rdd.flatMap(lambda x: range(1, x))
>>> rdd1.collect()
[1, 1, 2, 1, 2, 3]
flatMap与map的区别:
map映射flatMap先映射,后扁平化––map对每一次(func)都产生一个对象,分别产生一个列表flatMap多一步,最后会将所有对象合并为一个列表返回

mapPartitions(f, preservesPartitioning=False)

它的输入函数应用于每个分区,也就是把每个分区中的内容作为整体来处理的

>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
# 上面第二个参数是分区数,所以分成了[1, 2]和[3, 4]。
# 不管分区数为多少,都是取下界。比如上面假如分区数为3,则界限分别在4/3和8/3,取下界则分成[1], [2], [3, 4]。
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]

mapPartitionsWithIndex(f, preservesPartitioning=False)

与mapPartitions的区别在于mapPartitionsWithIndex中传入的函数要求接收两个参数
第一个参数为分区编号
第二个为对应分区的元素组成的迭代器

>>> rdd = sc.parallelize([1, 2, 3, 4], 4) # [1] [2] [3] [4]
>>> def f(splitIndex, iterator): yield splitIndex
>>> rdd.mapPartitionsWithIndex(f).sum()
6 # 0+1+2+3

filter(f)

对每个元素应用f函数,返回值为true的元素在RDD中保留,返回值为false的元素将被过滤掉。

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]

distinct(numPartitions=None)

将RDD中的元素进行去重操作

>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.distinct().collect()
[1, 2, 3]

union(other)

合并两个RDD,结果中包含两个RDD中的所有元素

>>> rdd1 = sc.parallelize([1, 2, 3, 4])
>>> rdd2 = sc.parallelize([5, 6, 7, 8])
>>> rdd1.union(rdd2).collect()
[1, 2, 3, 4, 5, 6, 7, 8]

intersection(other)

返回这个RDD和另一个RDD的交集,输出将不包含任何重复的元素

>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.intersectioni(rdd2).collect()
[1, 2, 3]

substract(other)

返回RDD1中出现,但是不在RDD2中出现的元素,不去重

>>> rdd1 = sc.parallelize([('a', 1), ('b', 4), ('b', 5), ('a', 3)])
>>> rdd2 = sc.parallelize([('a', 3), ('c', None)])
>>> rdd1.subtract(rdd2).collect()
[('a', 1), ('b', 4), ('b', 5)]

sortBy(K, ascending=True, numPartitions=None)

根据指定的Key进行排序

>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

Transformation算子-Key-Value型

mapValues(f)

针对(Key, Value)型数据中的Value进行Map操作,而不对Key进行处理。

>>> rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])
>>> rdd.mapValues(lambda value: value + 2).glom().collect() # glom()将同一分区的元素合并到一个列表里
[[('a', 3), ('b', 4), ('c', 5)]]

flatMapValues(f)

完成mapValues处理后,再对结果进行扁平化处理。

>>> rdd = sc.parallelize([('a', ['x', 'y']), ('b', ['p', 'r'])])
>>> rdd.flatMapValues(lambda x: x).collect()
[('a', 'x'), ('a', 'y'), ('b', 'p'), ('b', 'r')]

reduceByKey(func, numPartitions=None, partitionFunc=portable_hash)

相同Key值的value值进行对应函数运算,类似于hdp得combiner操作。

>>> from operator import add
>>> rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])
>>> rdd.reduceByKey(add).collect()
[('a', 4), ('b', 2)]

groupByKey(numPartitions=None, partitionFunc=portable_hash)

将Pair RDD中相同Key的值放在一个序列中

>>> rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
>>> rdd.groupByKey().mapValues(len).collect()
[('a', 2), ('b', 1)]
>>> rdd.groupByKey().mapValues(list).collect()
[('a', [1 1]), ('b', [1])]

sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x:x)

根据key值进行排序,默认升序

>>> tmp = [('a', 1), ('B', 2), ('1', 3), ('d', 4)]
>>> sc.parallelize(tmp).sortByKey()
[('1', 3), ('B', 2), ('a', 1), ('d', 4)]
>>> sc.parallelize(tmp).sortByKey(True, None, keyfunc=lambda k: k.lower()).collect()
[('1', 3), ('a', 1), ('B', 2), ('d', 4)]

keys()

返回一个仅包含键的RDD

>>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
>>> m.collect()
[1, 3]

values()

返回一个仅包含值的RDD

>>> m = sc.parallelize([(1, 2), (3, 4)]).values()
>>> m.collect()
[2, 4]

joins(rdd)

可以将两个RDD按照相同的Key值join起来

>>> x = sc.parallelize([('a', 1), ('b', 4)])
>>> y = sc.parallelize([('a', 2), ('a', 3)])
>>> x.join(y).collect()
[('a', (1, 2)), ('a', (1, 3))]

leftOuterJoin(rdd)

左外连接,与SQL中的左外连接一致

>>> x = sc.parallelize([('a', 1), ('b', 4)])
>>> y = sc.parallelize([('a', 2)])
>>> x.leftOuterJoin(y).collect()
[('a', (1, 2)), ('b', (4, None))]

rightOuterJoin(rdd)

右外连接,与SQL中的右外连接一致

>>> x = sc.parallelize([('a', 1), ('b', 4)])
>>> y = sc.parallelize([('a', 2)])
>>> x.rightOuterJoin(y).collect()
[('a', (1, 2))]

Action算子

collect()

返回RDD中的所有元素。

>>> sc.parallelize([1, 2]).collect()
[1, 2]

count()

返回RDD中的所有元素的个数。

>>> sc.parallelize([1, 2]).count()
2

reduce(f)

通过指定的聚合方法来对RDD中元素进行聚合。

>>> from operator import add
>>> sc.parallelize([1, 2, 3 ,4 ,5]).reduce(add)
15
>>> sc.parallelize([]).reduce(add)
Traceback (most recent call last):
ValueError: Can not reduce() empty RDD

take(num)

从RDD中返回前num个元素的列表

>>> sc.parallelize([4, 6, 8, 2, 9]).take(2)
[4, 6]
>>> sc.parallelize([4, 6, 8, 2, 9]).take(10)
[4, 6, 8, 2, 9]

takeOrdered(num)

从RDD中返回前num个最小的元素的列表,结果默认升序排列

>>> sc.parallelize([4,6,8,2,9]).takeOrdered(2)
[2, 4]
>>> sc.parallelize([4,6,8,2,9]).takeOrdered(10)
[2, 4, 6, 8, 9]

first()

从RDD中返回第一个元素

>>> sc.parallelize([2,3,4,5,6]).first()
2

top(num, key=None)

从RDD中返回最大的前num个元素列表,结果默认降序排列
如果Key参数有值,则先对各元素进行对应处理
注:会把所有数据都加载到内存,所以该方法只有在数据很小时使用

>>> sc.parallelize([10,4,2,12,3]).top(1)
[12]
>>> sc.parallelize([2,3,4,5,6],2).top(2)
[6, 5)
>>> sc.parallelize([10, 4, 2, 12, 3]).top(4, key=str)
[4, 3, 2, 12]

foreach(f)

遍历RDD的每个元素,并执行f函数操作,无返回值

>>> def f(x): print(x)
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
1
2
3
4
5

foreachPartition(f)

对每个分区执行f函数操作,无返回值

>>> def f(iterator):
...		s = sum(iterator)
...		print(s)
>>> sc.parallelize([1,2,3,4,5],3).foreachPartition(f) # 1 2+3+4 5
1
9
5	

saveAsTextFile(path, compressionCodecClass=None)

将RDD中的元素以字符串的格式存储在文件系统中。

>>> rdd = sc.parallelize(['foo', 'bar'], 2)
>>> rdd.saveAsTextFile('/home/...')
>>> rdd.saveAsTextFile('hdfs://host:8020/...')

collectAsMap()

以字典形式,返回PairRDD中的键值对。如果key重复,则后面的value覆盖前面的。

>>> rdd = sc.parallelize([(1, 2), (3, 4)])
>>> rdd.collectAsMap()
{1: 2, 3: 4}
>>> rdd = sc.parallelize([(1, 2), (3, 4), (1, 4)])
{1: 4, 3: 4}

countByKey()

以字典形式,返回PairRDD中key值出现的次数

>>> rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
>>> rdd.countByKey()
[('a', 2), ('b', 1)]

3.共享变量

累加器

accumulator:一个全局共享变量, 可以完成对信息进行聚合操作。

counter = sc.accumulator(0)
rdd = sc.parallelize(range(10))
def increment(x):
	global counter
	counter += x
rdd.foreach(increment)
print("Counter value: ", counter.value)

# Counter value: 45

注意事项!!!

  1. 累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。
  2. 累加器不是一个调优的操作,因为如果不这样做,结果是错的。

广播变量

Spark1.x:HttpBroadcast、TorrentBroadcast
Spark2.x:TorrentBroadcast、TorrentBroadcast:点到点的传输,有效避免单点故障,提高网络利用率,减少节点压力。
Broadcast:

  • 一个全局共享变量,可以广播只读变量。
  • 一般用于处理共享配置文件,通用的数据子,常用的数据结构等等;不适合存放太大的数据
  • 不会内存溢出,因为其数据的保存的 Storage Level 是 MEMORY_AND_DISK 的方式
>>> b = sc.broadcast([1,2,3,4,5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()
# 空

注意事项!!!

  1. 能不能将一个RDD使用广播变量广播出去?
    不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。
  2. 广播变量只能在Driver端定义,不能再Executor端定义。
  3. 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
  4. 如果Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。
  5. 如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

4.RDD依赖关系

  • RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。
  • 能从其他RDD通过确定操作创建新的RDD的原因是RDD含有从其他RDD衍生(计算)出本RDD的相关信息(即血统,Lineage)
  • Dependency代表了RDD之间的依赖关系,分为窄依赖和宽依赖

窄依赖

------------------------------------------------------未完待续-----------------------------------------------------------

文章来源:https://blog.csdn.net/Hackeryuan/article/details/116099307

后台-系统设置-扩展变量-手机广告位-内容正文底部
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。
本文地址:https://jcdi.cn/diannaochangshi/yunjisuan/695.html

留言与评论(共有 0 条评论)
   
验证码:
后台-系统设置-扩展变量-手机广告位-评论底部广告位

教程弟

https://www.jcdi.cn/

统计代码 | 京ICP1234567-2号

Powered By 教程弟 教程弟

使用手机软件扫描微信二维码