DataFrame创建的DSL与SQL方法对比操作指南
发布时间:2024-07-06 11:31:38 所属栏目:MsSql教程 来源:DaWei
导读: 很多朋友都对“创建DataFrame的DSL和SQL方式分别如何操作的”的内容比较感兴趣,对此小编整理了相关的知识分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获,那么感兴趣
很多朋友都对“创建DataFrame的DSL和SQL方式分别如何操作的”的内容比较感兴趣,对此小编整理了相关的知识分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获,那么感兴趣的朋友就继续往下看吧! 创建DataFrame 方式一:DSL方式操作 实例化SparkContext和SparkSession对象 利用StructType类型构建schema,用于定义数据的结构信息 通过SparkContext对象读取文件,生成RDD 将RDD[String]转换成RDD[Row] 通过SparkSession对象创建dataframe 完整代码如下: package com.scala.demo.sql import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType} object Demo01 { def main(args: Array[String]): Unit = { // 1.创建SparkContext和SparkSession对象 val sc = new SparkContext(new SparkConf().setAppName("Demo01").setMaster("local[2]")) val sparkSession = SparkSession.builder().getOrCreate() // 2. 使用StructType来定义Schema val mySchema = StructType(List( StructField("empno", DataTypes.IntegerType, false), StructField("ename", DataTypes.StringType, false), StructField("job", DataTypes.StringType, false), StructField("mgr", DataTypes.StringType, false), StructField("hiredate", DataTypes.StringType, false), StructField("sal", DataTypes.IntegerType, false), StructField("comm", DataTypes.StringType, false), StructField("deptno", DataTypes.IntegerType, false) )) // 3. 读取数据 val empRDD = sc.textFile("file:///D:\\TestDatas\\emp.csv") // 4. 将其映射成ROW对象 val rowRDD = empRDD.map(line => { val strings = line.split(",") Row(strings(0).toInt, strings(1), strings(2), strings(3), strings(4), strings(5).toInt,strings(6), strings(7).toInt) }) // 5. 创建DataFrame val dataFrame = sparkSession.createDataFrame(rowRDD, mySchema) // 6. 展示内容 DSL dataFrame.groupBy("deptno").sum("sal").as("result").sort("sum(sal)").show() } } 方式二:SQL方式操作 实例化SparkContext和SparkSession对象 创建case class Emp样例类,用于定义数据的结构信息 通过SparkContext对象读取文件,生成RDD[String] 将RDD[String]转换成RDD[Emp] 引入spark隐式转换函数(必须引入) 将RDD[Emp]转换成DataFrame 将DataFrame注册成一张视图或者临时表 通过调用SparkSession对象的sql函数,编写sql语句 停止资源 具体代码如下: package com.scala.demo.sql import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType} // 0. 数据分析 // 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30 // 1. 定义Emp样例类 case class Emp(empNo:Int,empName:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptNo:Int) 原创图片与内容无关,仅为配文美观 object Demo02 {def main(args: Array[String]): Unit = { // 2. 读取数据将其映射成Row对象 val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo02")) val mapRdd = sc.textFile("file:///D:\\TestDatas\\emp.csv") .map(_.split(",")) val rowRDD:RDD[Emp] = mapRdd.map(line => Emp(line(0).toInt, line(1), line(2), line(3), line(4), line(5).toInt, line(6), line(7).toInt)) // 3。创建dataframe val spark = SparkSession.builder().getOrCreate() // 引入spark隐式转换函数 import spark.implicits._ // 将RDD转成Dataframe val dataFrame = rowRDD.toDF // 4.2 sql语句操作 // 1、将dataframe注册成一张临时表 dataFrame.createOrReplaceTempView("emp") // 2. 编写sql语句进行操作 spark.sql("select deptNo,sum(sal) as total from emp group by deptNo order by total desc").show() // 关闭资源 spark.stop() sc.stop() } } 上述内容具有一定的借鉴价值,感兴趣的朋友可以参考,希望能对大家有帮助。 (编辑:拼字网 - 核心网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
推荐文章
站长推荐