博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark的左外连接解决方案
阅读量:2490 次
发布时间:2019-05-11

本文共 2489 字,大约阅读时间需要 8 分钟。

1、Spark的左外连接解决方案之不使用letfOutJoin()

import org.apache.spark.{SparkConf, SparkContext}object LeftOutJoinTest {  def main(args: Array[String]): Unit = {    //连接SparkMaster    val conf = new SparkConf().setAppName("Chenjie's first spark App").setMaster("local")    val sc = new SparkContext(conf)    //从HDFS中读取输入文件并创建RDD    val users = sc.textFile("hdfs://pc1:9000/input/users.txt")    val user_map = users.map(line=>Tuple2(line.split("\t")(0),Tuple2("L",line.split("\t")(1))))    val transactions = sc.textFile("hdfs://pc1:9000/input/transactions.txt")    val transaction_map = transactions.map(line=>Tuple2(line.split("\t")(2),Tuple2("P",line.split("\t")(1))))    val all = transaction_map.union(user_map)    val groupedRDD = all.groupByKey()    val productLocationsRDD = groupedRDD.flatMap{tuple=>      val pairs = tuple._2      var location = "UNKOWN"      val products = new scala.collection.mutable.ArrayBuffer[String]()      pairs.foreach{t2=>        if(t2._1.equals("L")){          location = t2._2        }        else{          products.+=(t2._2)        }      }      val kvlist = new scala.collection.mutable.ArrayBuffer[Tuple2[String,String]]()      for(product <- products){        kvlist.+=((new Tuple2(product,location)))      }      kvlist    }    productLocationsRDD.distinct().groupByKey().map{pair=>      val key = pair._1      val locations = pair._2      val length = locations.size      Tuple2(key,Tuple2(locations,length))    }.saveAsTextFile("hdfs://pc1:9000/output/leftoutjoin_1")  }}

运行结果:

2、Spark的左外连接解决方案之使用letfOutJoin():避免标志位等麻烦

import org.apache.spark.{SparkConf, SparkContext}object LeftOutJoinTest {  def main(args: Array[String]): Unit = {    //连接SparkMaster    val conf = new SparkConf().setAppName("Chenjie's first spark App").setMaster("local")    val sc = new SparkContext(conf)    //从HDFS中读取输入文件并创建RDD    val users = sc.textFile("hdfs://pc1:9000/input/users.txt")    val user_map = users.map(line=>Tuple2(line.split("\t")(0),line.split("\t")(1)))    val transactions = sc.textFile("hdfs://pc1:9000/input/transactions.txt")    val transaction_map = transactions.map(line=>Tuple2(line.split("\t")(2),line.split("\t")(1)))    val joined = transaction_map.leftOuterJoin(user_map)    joined.map(line=>Tuple2(line._2._1,line._2._2.get)).distinct().groupByKey().map{pair=>      val key = pair._1      val locations = pair._2      val length = locations.size      Tuple2(key,Tuple2(locations,length))    }.saveAsTextFile("hdfs://pc1:9000/output/leftoutjoin_2")  }}

你可能感兴趣的文章
使用 CSS 用户选择控制选择
查看>>
简单的交叉报表处理示例.sql
查看>>
MySQL Connector/ODBC 5.2.2 发布
查看>>
oracle 存储过程 stored procedure 查询一条记录或多条记录
查看>>
30个WordPress Retina(iPad)自适应主题
查看>>
python实现文件加密
查看>>
使用JXL组件操作Excel和导出文件
查看>>
【liferay】6、关于liferay中使用requestMapping映射地址提交表单
查看>>
gcd(欧几里得算法)
查看>>
Java执行存储过程
查看>>
mysql 如何提高批量导入的速度
查看>>
VMwareWorkstation设置U盘启动(或U盘使用)
查看>>
java中 set,list,array(集合与数组)相互转换
查看>>
Android客户端打包方案分享
查看>>
012.Adding a New Field --【添加一个新字段】
查看>>
2016年终总结
查看>>
项目Alpha冲刺Day4
查看>>
初始Linux
查看>>
【板+并查集判断连通性】并查集判断连通性
查看>>
C++的ORM工具比较
查看>>