本文共 7974 字,大约阅读时间需要 26 分钟。
部分参考《hadoop大数据实战手册-精英版》-有很多命令查找语句
详细: https://blog.csdn.net/luanpeng825485697/article/details/80319552 hadoop分为几大部分:yarn负责资源和任务管理、hdfs负责分布式存储、map-reduce负责分布式计算 YARN总体上仍然是master/slave(主从)结构HDFS 部分由NameNode、SecondaryNameNode和DataNode组成。
HDFS 文件的大小可以大于网络中任意一个磁盘的容量,文件的所有块并不需要存 储在一个磁盘上,因此可以利用集群上任意一个磁盘进行存储,由于具备这种分布式存 储的逻辑,所以可以存储超大的文件,通常 G、T、P 级别 2) 一次写入,多次读取 一个文件经过创建、写入和关闭之后就不需要改变,这个假设简化了数据一致性的问题,同时提高数据访问的吞吐量 HDFS 是一个主/从(Master/Slave)体系架构,由于分布式存储的性质,集群拥有两 类节点 NameNode 和 DataNode NameNode(名字节点):系统中通常只有一个,中心服务器的角色,管理存储和检索 多个 DataNode 的实际数据所需的所有元数据。 SecondaryNameNode是它的从节点,以防挂掉 。 DataNode(数据节点):系统中通常有多个*,是文件系统中真正存储数据的地方*,在 NameNode 统一调度下进行数据块的创建、删除和复制。运行slave节点。2在 HDFS 中,文件数据是被复制多份的,所以计算将会选 择拥有此数据的最空闲的节点。
MapReduce 计算框架中负责真正计算任务的 TaskTracker对应到HDFS 的DataNode 的角色,一个负责计算,一个负责管理存储数据。 Map 阶段和 Reduce 阶段,也称为映射和缩减阶段,这两个独立的阶段实际上是两个独 立的过程,即 Map 过程和 Reduce 过程,在 Map 中进行数据的读取和预处理,之后将预 处理的结果发送到 Reduce 中进行合并map 分割统计:首先将数据输入到HDFS,再交给map,
reduce 针对 map 阶段的数组进行汇总处理,map 到 reduce 过程中默认存在 shuffle partition 分组机制,保证同一个 word 的记录,会连续传输到 reduce 中,所以在 reduce 阶段只需要对连续相同的 word 后面的技术进行累加求和即可。reduce再将完成的数据输出回HDFS 首先,将数据输入到HDFS,再交给map,map对每个单词进行统计汇总处理 在map之后reduce之前进行排序shuffle ,分组1,读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
2,写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。 3,对输出的key、value进行分区 4、对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。 5、(可选)分组后的数据进行归约。1,对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2,对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。 3、把reduce的输出保存到文件中。Shuffle 过程是指 Mapper 产生的直接输出结果,经过一系列的处理,成为最终的
Reducer 直接输入数据为止的整个过程。这是 mapreduce 的核心过程。该过程可以分为两 个阶段 1、Mapper 端的 Shuffle:由 Mapper 产生的结果并不会直接写入到磁盘中,而是先存 储在内存中,当内存中的数据量达到设定的阀值时,一次性写入到本地磁盘中。并同时进行 sort(排序)、combine(合并)、partition(分片)等操作。其中,sort 是把 Mapper 产 生的结果按照 key 值进行排序;combine 是把 key 值相同的记录进行合并;partition 是把 数据均衡的分配给 Reducer。 R 2、reducer 端的 Shuffle:由于 Mapper 和 Reducer 往往不在同一个节点上运行,所以 Reducer 需要从多个节点上下载 Mapper 的结果数据,并对这些数据进行处理,然后才能 被 Reducer 处理。Zookeeper 是一种分布式的,开源的,应用于分布式应用的协作服务。它提供了一些 简单的操作,使得分布式应用可以基于这些接口实现诸如同步、配置维护和分集群或者命名 的服务
是一个nosql数据库,和mongodb类似
相比hbase,HDFS 适合批处理场景,但不支持数据随机查找,不适合增量数据处理,不支持数 据更新 HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式 存储系统,利用 HBase 技术可在廉价 PC Server 上搭建起大规模结构化存储集群。 HBase 以表的形式存储数据。表由行和列族组成。列划分为若干个列族一个数据仓库工具,可以将结构化的数据文件映射为一张数据 库表,并提供完整的 sql 查询功能,Hive 定义了简单的类 SQL 查询语言,称为 HQL。
允许熟悉 SQL 的用户查询数据可以将 sql 语句转换为 MapReduce 任务进行运行,不必 开发专门的 MapReduce。毕竟会写 SQL 的人比写 JAVA 的人多,这样可以让一大批运营 人员直接获取海量数据。在数据仓库建设中,HIVE 灵活易用且易于维护,十分适合数据仓 库的统计分析。使用Hive,就不用去写MapReduce,而是写sql语句就行了。
Hadoop 是 Apache 的一个项目,是一个能够对大量数据进行分布式处理的软件框架。 Storm 是 Apache 基金会的孵化项目,是应用于流式数据实时处理领域的分布式计算 系统。
在 Hadoop 生态圈中,针对大数据进行批量计算时,通常需要一个或者多个 MapReduce 作业来完成,但这种批量计算方式是满足不了对实时性要求高的场景。 Storm 是一个开源分布式实时计算系统,它可以实时可靠地处理流数据。,会自动 地在集群机器上并发地处理流式计算,让你专注于实时处理的业务逻辑。高效(比MapReduce快10~100倍)
1)内存计算引擎,提供Cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取IO开销 2)DAG引擎,减少多次计算之间中间结果写到HDFS的开销 3)使用多线程池模型来减少task启动开稍,shuffle过程中避免不必要的sort操作以及减少磁盘IO操作 易用 1)提供了丰富的API,支持Java,Scala,Python和R四种语言 2)代码量比MapReduce少2~5倍 与Hadoop集成 读写HDFS/Hbase 与YARN集成 ———————————————— 原文链接:https://blog.csdn.net/luanpeng825485697/article/details/80319552主要有Spark SQL、Spark Streaming、MLlib、GraphX。
Driver-master-slave(worker)-执行Executor和task负责并行计算
1.一般来讲我们编写的Spark程序就是在Driver上由Driver进程执行Driver进程启动以后就会做一些初始化操作,在这个过程中,就会发送请求到Master上进行Spark应用程序的注册,其实就是告诉Master,有一个新的Spark程序要跑起来, 2.一般来讲学习的集群为三个,其中**第一个成为Master,对,就是*第一个节点Master其实就是调度资源和分盘,*还有就是集群的监控,**还有一些其他的操作! Master在接到Spark程序申请以后会发送请求给从节点也就是slave在这里用worker称呼,进行资源的分配,3.第一个worker主要职责是使用自己的内存,存储RDD某些partition,
第二个就是启动其他进程和线程对RDD上面的partition进行处理和计算, worker在接收到Master的请求后会启动Executor进程 4.Executor进程里面包含Task线程,这两货主要就是负责并行计算的,比如申请的RDD,partition,还有就是一些算子,比如:map,flatmap,reduce,executor接受到请求以后就会调用多个Task节点进行执行。task就会对RDDpartition数据执行指定的算子操作,形成新的RDD分区 5.Executor进程启动以后会向Driver进行反注册,这样Driver就知道哪些Executor是为它服务的了. 6.Driver注册了Executor进程以后就会开始执行我们提交的Spark应用程序了,第一步就是创建RDD,然后就是读取数据源, 7.hdfs的内容可能会被读取到多个worker节点上面,形成内存中 的分布式数据库,也就是RDD。Local[N]:本地模式,使用 N 个线程。
Local Cluster[Worker,core,Memory]:伪分布式模式,可以配置所需要启动的虚拟工作节点的数量,以及每个工作节点所管理的 CPU 数量和内存尺寸。 Spark://hostname:port:Standalone 模式,需要部署 Spark 到相关节点,URL 为 Spark Master 主机地址和端口。 Mesos://hostname:port:Mesos 模式,需要部署 Spark 和 Mesos 到相关节点,URL 为 Mesos 主机地址和端口。 YARN standalone/Yarn cluster:YARN 模式一,主程序逻辑和任务都运行在 YARN 集群中。 YARN client:YARN 模式二,主程序逻辑运行在本地,具体任务运行在 YARN 集群sc 代表着 Spark 的上下文,通过该变量可以执行 Spark 的一些操作,而 sqlCtx 代表着 HiveContext 的上下文。
def main(args: Array[String]): Unit = { //1、创建SparkConf对象 val sparkConf: SparkConf = new SparkConf().setAppName("Iplocation").setMaster("local[2]") //2、构建SparkContext对象 val sc = new SparkContext(sparkConf) sc.setLogLevel("warn") //3、加载城市ip信息数据,获取 (ip开始数字、ip结束数字、经度、 纬度) val city_ip_rdd: RDD[(String, String, String, String)] = sc.textFile("./data/ip.txt").map(x=>x.split("\\|")).map(x= >(x(2),x(3),x(x.length-2),x(x.length-1))) //使用spark的广播变量把共同的数据广播到参与计算的worker节点 broadcast val cityIpBroadcast: Broadcast[Array[(String, String, String, String)]] = sc.broadcast(city_ip_rdd.collect()) //4、读取运营商日志数据 val userIpsRDD: RDD[String] = sc.textFile("./data/394251.http.format").ma p(x=>x.split("\\|")(1)) //5、遍历userIpsRDD 获取每一个ip地址,然后转换Long类型数字, 去广播变量值中去比较 val result: RDD[((String, String), Int)] = userIpsRDD.mapPartitions(iter => { val city_ip_array: Array[(String, String, String, String)] = cityIpBroadcast.value //获取广播变量的值 iter.map(ip => { //获取每一个ip地址 val ipNum: Long = ip2Long(ip) // 把ip地址转换成Long类型数字 val index: Int = binarySearch(ipNum, city_ip_array) //拿到ipNum数字去广播变量值中进行匹配 val result: (String, String, String, String) = city_ip_array(index) ((result._3, result._4), 1) //封装结果数据 进行返回 ((经度,纬度),1) }) }) //6、相同经纬度出现的1累加 val finalResult: RDD[((String, String), Int)] = result.reduceByKey(_+_) finalResult.foreach(println) sc.stop() } }
//,统计在README.md中含有Spark的行数有多少,sc.textFile("README.md").filter(_.contains("Spark")).count
2
提交任务
$ mkdir -p /usr/lib/spark/examples/python$ tar zxvf /usr/lib/spark/lib/python.tar.gz -C /usr/lib/spark/examples/python $ ./bin/spark-submit examples/python/pi.py 10
https://www.jianshu.com/p/4cd22eda363f
1、transformation(转换)
它可以实现把一个rdd转换生成一个新的rdd,它是延迟加载,不会立即触
发任务的真正运行
比如 flflatMap/map/reduceByKey
2、action(动作)
它会触发任务的真正运行
比如 collect/saveAsTextFile
https://blog.csdn.net/daerzei/article/details/81512412
Spark中RDD的高效与DAG图有着莫大的关系,在DAG调度中需要对计算过程划分Stage。划分的依据就是就是RDD之间的依赖关系,宽依赖是划分stage的依据
窄依赖指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据规模无关)比如 map/filter/flatMap等等 ,不会产生shuffle
**宽依赖是指父RDD的每个分区都可能被多个子RDD分区所使用,**子RDD分区通常对应所有的父RDD分区(O(n),与数据规模有关)比如 reduceByKey / groupByKey /sortByKey /groupBy 等等 会产生shuffle
优缺点:
窄依赖允许在一个集群节点上以流水线的方式计算所有父分区,而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。
第二窄依赖能够更有效地进行失效节点的恢复,即只需要重新计算丢失分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。
可以把rdd的数据缓存在内存或者是磁盘中,后续需要用到这份数据,就可以直接
从缓存中获取得到,避免了数据的重复计算。可以调用rdd中的cache和persist2个方法
全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过,如
val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000")val spark = SparkSession.builder().config(sparkconf).getOrCreate()val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1))) rdd.map{line=> println("运行") line._1}//map中的println("运行")并不会运行
RDD:
1、RDD一般和spark mlib同时使用
2、RDD不支持sparksql操作
DataFrame:
1、与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,如
2、DataFrame与Dataset一般与spark ml同时使用( ml是新的API,ml包里面的模型是基于dataframe操作的 )
DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,如
https://www.cnblogs.com/starwater/p/6841807.html
sparkcontext是spark集合入口