博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
DataSet数据集在使用sql()时,无法使用map,flatMap等转换算子的解决办法
阅读量:6999 次
发布时间:2019-06-27

本文共 2868 字,大约阅读时间需要 9 分钟。

hot3.png

摘要

我们在使用spark的一个流程是:利用spark.sql()函数把数据读入到内存形成DataSet[Row](DataFrame)由于Row是新的spark数据集中无法实现自动的编码,需要对这个数据集进行编码,才能利用这些算子进行相关的操作,如何编码是一个问题,在这里就把这几个问题进行总结一下。报的错误:error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

报这个错误一般就是我们在使用算子时其返回值的数据类型往往不是spark通过自身的反射能完成的自动编码部分,比如通过map算子,我们在map算子的函数的返回值类型是Map类型的,就会出现上面的问题,因为Map集合类不在:基本的类型和String,case class和元组的范围之内,spark内部不能通过反射完成自动编码。

 

出现这个问题的原因

spark2.0以后的版本采用的是新的分布式数据集DataSet,其中DataFrame是DataSet[Row]的别名形式。而新的数据集采用了很多的优化,其中一个就是利用了Tungsten execution engine的计算引擎,这个计算引擎采用了很多的优化。其中一个就是自己维护了一个内存管理器,从而使计算从java jvm解脱出来了,使得内存的优化得到了很大的提升。同时新的计算引擎,把数据存储在内存中是以二进制的形式存储的,大部分所有的计算都是在二进制数据流上进行的,不需要把二进制数据流反序列化成java对象,然后再把计算的结果序列化成二进制数据流,而是直接在二进制流上进行操作,这样的情况就需要我们存在一种机制就是java对象到二进制数据流的映射关系,不然我们不知道二进制流对应的数据对象是几个字节,spark这个过程是通过Encoders来完成的,spark自身通过反射完成了一部分的自动编码过程:基本的类型和String,case class和元组,对于其他的集合类型或者我们自定义的类,他是无法完成这样的编码的。需要我们自己定义这样的编码也就是让其拥有一个schema。

解决这个问题方式

方法一:

这样就是把其转化为RDD,利用RDD进行操作,但是不建议用这个,相对于RDD,DataSet进行了很多的底层优化,拥有很不错性能

val orderInfo1 = spark.sql( """   |SELECT   |o.id,   |o.user_id   |FROM default.api_order o   |limit 100 """.stripMargin).rdd.map(myfunction)

方法二:

让其自动把DataSet[Row]转化为DataSet[P],如果Row里面有复杂的类型出现的话。

case class Orders(id: String, user_id: String)//这个case class要定义在我们的单例对象的外面object a {def main(args: Array[String]): Unit ={import spark.implicits._val orderInfo1 = spark.sql( """   |SELECT   |o.id,   |o.user_id   |FROM default.api_order o   |limit 100 """.stripMargin).as[Orders].map(myfunction)}}

方式三:

自定义一个schema,然后利用RowEncoder进行编码。这只是一个例子,里面的类型其实都可以通过spark的反射自动完成编码过程。

import spark.implicits._val schema = StructType(StructType(Seq(StructField("id",StringType,true),StructField("user_id",StringType,true))))val encoders = RowEncoder(schema)val orderInfo1 = spark.sql( """   |SELECT   |o.id,   |o.user_id   |FROM default.api_order o   |limit 100 """.stripMargin).map(row => row)(encoders)

方法四:

直接利用scala的模式匹配的策略case Row来进行是可以通过的,原因是case Row()scala模式匹配的知识,这样可以知道集合Row里面拥有多少个基本的类型,则可以通过scala就可以完成对Row的自动编码,然后可以进行相应的处理。

import spark.implicits._val orderInfo1 = spark.sql( """   |SELECT   |o.id,   |o.user_id   |FROM default.api_order o   |limit 100 """.stripMargin).map{case Row(id: String, user_id: String) => (id,user_id)}这个得到的schema为:orderInfo1: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]如果换成这样:val orderInfo1 = spark.sql( """   |SELECT   |o.id,   |o.user_id   |FROM default.api_order o   |limit 100 """.stripMargin).map{case Row(id: String, user_id: String) => List(id,user_id)}得到的schema为:orderInfo1: org.apache.spark.sql.Dataset[List[String]] = [value: array
]可以看出:spark是把元祖看成case class一种特殊形式拥有,schame的字段名称为_1,_2这样的特殊case clase

 

转载于:https://my.oschina.net/u/2000675/blog/3022158

你可能感兴趣的文章
方程式组织的漏洞程序稍作修改 就可以攻击最新的思科防火墙
查看>>
HackerOne第二名白帽专访:业余挖洞,两年赚 40 万美金
查看>>
揭秘云梦如何借力阿里云云市场创富千万
查看>>
实时计算:通往物联网的网关?
查看>>
中国人工智能学会通讯——构建强健的人工智能:原因及方式 1. 针对不确定性的决策...
查看>>
IBM Power业务掌门人:认知时代是计算系统演进的拐点
查看>>
有了邮件防火墙你就安全了吗?想法太单纯了……
查看>>
云领未来 OpenStack要从行业抓起
查看>>
《VMware Virtual SAN权威指南》一2.3.5 VSAN网络流量
查看>>
医疗服务机器人在智慧养老领域的应用
查看>>
POS机数据泄露已蔓延至云端
查看>>
教你如何使用分组密码对shellcode中的windows api字符串进行加密
查看>>
蓝牙协议实现爆严重安全漏洞,可在旧版本设备上构建勒索僵尸网络
查看>>
AI 黑客会大规模进军网络安全领域吗?为时尚早,因为太贵了
查看>>
Linux发行版们应该禁用 IPv4 映射的 IPv6 地址吗?
查看>>
iTunes无法验证服务器?来试试这个解决办法
查看>>
又一个“心脏滴血”? OpenSSL将发布安全补丁
查看>>
NAND闪存的现状与未来
查看>>
阿里云E-MapReduce产品简介
查看>>
Go 将统治下一个十年
查看>>