Skip to content

sotowang/Spark_SQL

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

28 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Spark SQL基础

[北风网 Spark 2.0从入门到精通] (278讲)

  • Spark SQL特点
1. 支持多种数据源: Hive,RDD,Parquet,JSON,JDBC
2. 多种性能优化技术: in-memory columnar storage, byte-code generation, cost model动态评估等
3. 组件扩展性: 对于SQL的讲法解析器,分析器以及优化器,用户都可以自己开发,并且动态扩展

Spark SQL 开发步骤

  • 创建SQLContext /HiveContext(官方推荐)对象
 SparkConf sparkConf = new SparkConf()
                .setAppName("...")
                .setMaster("local");

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        SQLContext sqlContext = new SQLContext(jsc);

使用Json文件创建DataFrame DataFrameCreate.java

SparkConf sparkConf = new SparkConf()
                .setAppName("DataFrameCreate")
                .setMaster("local");

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        SQLContext sqlContext = new SQLContext(jsc);


        DataFrame df =  sqlContext.read().json("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/students.json");

        df.show();

DataFrame 常用操作

打印DataFrame中所有的数据

df.show();

打印DataFrame的元数据 (Schema)

df.printSchema();

查询某列所有数据

df.select("name").show();

查询某几列所有数据,并对列进行计算

df.select(df.col("name"),df.col("age").plus(1)).show();

根据某一列的值进行过滤

df.filter(df.col("age").gt(18)).show();

根据某一列进行分组,然后进行聚合

df.groupBy(df.col("age")).count().show();

RDD转DataFrame

为什么要将RDD转DataFrame?

因为这样的话,我们可以直接针对HDFS等任何可以构建为RDD的数据,使用Spark SQL进行SQL查询,这个功能很强大

Spark SQL 支持2种方式来将RDD转为DataFrame

1. 使用反射来推断包含了特定数据类型的RDD元数据.这种基于反射的方式,代码比较简洁,当你已经知道你的RDD的元素时,是一种不错的方式

2. 通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,然后将其应用到已经存在的RDD上,代码比较冗长,
    但如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,只能通过动态构建元数据的方式

使用反射的方式将RDD转换为Dataframe RDD2DataFrameReflection.java

JavaRDD<String> lines = jsc.textFile("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/students.json");

        JavaRDD<Student> students = lines.map(new Function<String, Student>() {
            public Student call(String line) throws Exception {
                String[] lineSplited = line.split(",");
                Student stu = new Student();
                stu.setId(Integer.valueOf(lineSplited[0]));
                stu.setName(lineSplited[1]);
                stu.setAge(Integer.valueOf(lineSplited[2]));
                return stu;
            }
        });

        //使用反射方式将RDD转换为DataFrame
        //将Student.Class传入进去,其实就是用反射的方式来创建DataFrame
        //因为Student.class本身就是反射的一个应用
        //然后底层还得通过对Student Class进行反射,来获取其中的field
        DataFrame studentDF = sqlContext.createDataFrame(students, Student.class);

        //拿到DataFrame后,将其注册为一个临时表,然后针对其中的数据进行SQL语句
        studentDF.registerTempTable("students");


        //针对students临时表执行语句,查询年龄小于等于18岁的学生,就是teenager

        DataFrame teenagerDF = sqlContext.sql("select * from students where age <= 18");

        //将查询出的DataFrame,再次转换为RDD
        JavaRDD<Row> teenagerRDD = teenagerDF.javaRDD();

        //将RDD中的数据,进行映射,映射为student
        JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, Student>() {
            public Student call(Row row) throws Exception {
                Student student = new Student();
                student.setAge(row.getInt(0));
                student.setName(row.getString(2));
                student.setId(row.getInt(1));
                return student;
            }
        });


        //将数据collect,打印出来
        List<Student> studentList = teenagerStudentRDD.collect();

        for (Student student : studentList) {
            System.out.println(student.toString());
        }

出现的问题:

1.Java Bean: Student.java 的序列化问题(不序列化会报错),public class Student implements Serializable

2.将RDD中的数据,进行映射,映射为student时,其RDD中student的属性顺序会乱(与文件中顺序不一致)

以编程方式动态指定元数据,将RDD转换为DataFrame

 //第一步,创建一个普通的RDD,但是,必须将其转换为RDD<Row>的这种格式
        final JavaRDD<String> lines = jsc.textFile("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/students.json");

        JavaRDD<Row> studentRDD = lines.map(new Function<String, Row>() {
            public Row call(String line) throws Exception {
                String[] lineSplited = line.split(",");

                return RowFactory.create(Integer.valueOf(lineSplited[0]), lineSplited[1], Integer.valueOf(lineSplited[2]));

            }
        });

        //第二步:动态构造元数据
        //比如说,id,name等,field的名称和类型,可能都是在程序运行过程中,动态从mysql,db里
        //或者是配置文件中加载的,不固定
        //所以特别适合用这种编程方式,来构造元数据
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));

        StructType structType = DataTypes.createStructType(structFields);

        //第三步,使用动态构造的元数据,将RDD转为DataFrame
        DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);

        studentDF.registerTempTable("students");
        DataFrame teenagerDF = sqlContext.sql("select * from students where age <= 18");
        List<Row> rows = teenagerDF.javaRDD().collect();

        for (Row row : rows) {
            System.out.println(row);
        }

出现的问题:

1.报错:不能直接从String转换为Integer的一个类型转换错误,说明有个数据,给定义成了String类型,结果使用的时候 要用Integer类型来使用,错误报在sql相关的代码中.在sql中,用到age<=18语法,所以强行将age转换为Integer来使用, 但之前有些步骤将age定义了String


通用的load和save操作 GenericLoadSave.java

DataFrame usersDF = sqlContext.read().load("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/users.parquet");

usersDF.printSchema();

usersDF.show();

usersDF.select("name","favorite_color").write().save("/home/sotowang/Desktop/nameAndColors.parquet");

手动指定数据源类型 ManuallySpecifyOptions.java

默认为 parquet

DataFrame usersDF = sqlContext.read()
                .format("parquet")
                .load("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/users.parquet");


usersDF.select("name","favorite_color")
        .write()
        .format("json")
        .save("/home/sotowang/Desktop/nameAndColors");

saveMode SaveModeTest.java

SaveMode.ErrorIfExists 文件存在会报错

usersDF.save("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/users.parquet", SaveMode.ErrorIfExists);

SaveMode.Append 存在追加数据

usersDF.save("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/users.parquet", SaveMode.Append);

SaveMode.Overwrite 覆盖

SaveMode.ignore 忽略


数据源Parquet之使用编程方式加载数据 ParquetLoadData.java

  • Parquet是面向分析型业务的列式存储格式

列式存储与行式存储有哪些优势?

1. 可以路过不符合条件的数据,只需要读取需要的数据,降低IO数据量
2. 压缩编码可以降低磁盘存储空间,由于同一列的数据类型是一样的,可以使用更高效的压缩编码(如Run Length Encoding和Delta Encoding) 进一步节约存储空间
3. 只读取需要的列,支持向量运算,能够获取更好的扫描性能
DataFrame usersDF = sqlContext.read().parquet("/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/users.parquet");

//将DataFrame注册为临时表,使用SQL查询需要的数据
usersDF.registerTempTable("users");

DataFrame userNameDf = sqlContext.sql("select name from users ");

//对查询出的DataFrame进行transformation操作,处理数据,然后打印
List<String> userNames = userNameDf.javaRDD().map(new Function<Row, String>() {
    public String call(Row row) throws Exception {
        return "Name: " + row.getString(0);
    }
}).collect();

for (String userName : userNames) {
    System.out.println(userName);
}

数据源Parquet之自动分区推断

用户也许不希望Spark SQL自动推断分区列的数据类型。此时只要设置一个配置即可,

spark.sql.sources.partitionColumnTypeInference.enabled

,默认为true,即自动推断分区列的类型,设置为false,即不会自动推断类型。 禁止自动推断分区列的类型时,所有分区列的类型,就统一默认都是String。

数据源Parquet之合并元数据(默认关闭) ParquetMergeSchema.java

案例:合并学生的基本信息和成绩信息的源数据

//创建一个DataFrame,作为学生的基本信息,并写入一个parquet文件中

List<Tuple2<String, Integer>> studentsWithNameAge = Arrays.asList(new Tuple2<String, Integer>("leo", 23), new Tuple2<String, Integer>("Jack", 25));

JavaRDD<Row> studentWithNameAgeRDD = jsc.parallelize(studentsWithNameAge).map(new Function<Tuple2<String, Integer>, Row>() {
    public Row call(Tuple2<String, Integer> student) throws Exception {
        return RowFactory.create(student._1, student._2);
    }
});

List<StructField> structFields_age = new ArrayList<StructField>();

structFields_age.add(DataTypes.createStructField("name", DataTypes.StringType, true));
structFields_age.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));

StructType structType_age = DataTypes.createStructType(structFields_age);

//创建一个DataFrame,作为学生的基本信息,并写入一个parquet文件
DataFrame studentsWithNameAgeDF = sqlContext.createDataFrame(studentWithNameAgeRDD, structType_age);
studentsWithNameAgeDF.save("/home/sotowang/Desktop/students", SaveMode.Append);



//创建第二个DataFrame,作为学生的成绩信息,并写入一个parquet文件中
List<Tuple2<String, String>> studentsWithNameGrade = Arrays.asList(new Tuple2<String, String>("marry", "A"), new Tuple2<String, String>("tom", "B"));
JavaRDD<Row> studentsWithNameGradeRDD = jsc.parallelize(studentsWithNameGrade).map(new Function<Tuple2<String, String>, Row>() {
    public Row call(Tuple2<String, String> student) throws Exception {
        return RowFactory.create(student._1, student._2);
    }
});
List<StructField> structFields_grade = new ArrayList<StructField>();
structFields_grade.add(DataTypes.createStructField("name", DataTypes.StringType, true));
structFields_grade.add(DataTypes.createStructField("grade", DataTypes.StringType, true));
StructType structType_grade = DataTypes.createStructType(structFields_grade);

//创建一个DataFrame,作为学生的基本信息,并写入一个parquet文件
DataFrame studentsWithNameGradeDF = sqlContext.createDataFrame(studentsWithNameGradeRDD, structType_grade);
studentsWithNameGradeDF.save("/home/sotowang/Desktop/students", SaveMode.Append);


//第一个DataFrame和第二个DataFrame的元数据不一样,一个是包含了name和age两列,一个是包含了name和grade两列
//这里期望读出来的表数据,自动合并含两个文件的元数据,出现三个列,name age grade

//有mergeSchema的方式读取students表中的数据,进行元数据的合并
DataFrame studentsDF = sqlContext.read().option("mergeSchema", "true").parquet("/home/sotowang/Desktop/students");

studentsDF.printSchema();
studentsDF.show();

结果

+-----+----+-----+
| name| age|grade|
+-----+----+-----+
|  leo|  23| null|
| Jack|  25| null|
|marry|null|    A|
|  tom|null|    B|
+-----+----+-----+

Json数据源 JSONDataSource.java

Spark SQL可以自动推断JSON文件的元数据,并且加载其数据,创建一个DataFrame,可以使用SQLContext.read.json()方法,针对一个元素类型为String 的RDD,或者是一个JSON文件

注意:这里使用的JSON文件与传统意义上的JSON文件是不一样的,每行都必须也只能包含一个单独的,包含的有效的JSON对象,不能让一个JSON对象分散在钓竿,否则会报错

案例: 查询成绩为80分以上的学生的基本信息与成绩信息

注:sqlContext.read().json(studentInfoJSONsRDD) ==> 该API可以接受一个JavaRDD直接转为DataFrame,与前面所讲的反射不一样

注: 默认DataFrame中将数字类型转为Long而不是Int,要将Long型转为Integer型需要Integer.valueOf(String.valueOf(row.getLong(1)))

Hive数据源 (企业常用) HiveDataSource.java

操作Hive使用HiveContext而为是SQLContext.HiveContext继承自SQLContext,但是增加了在Hive元数据库中查找表,以及用HiveQL语法编写SQL功能, 除了sql()方法,HiveContext还提供了hql()方法,从而Hive语法来编译. Hive中查询出来的数据是一个Row数组

将hive-site.xml拷贝到spark/conf目录下,将mysql connector 拷贝到spark/lib 目录下

  • Spark SQL允许将数据保存到Hive表中,调用DataFrame的saveAsTable命令,即可将DaraFrame中的数据保存到Hive表中.与registerTempTable不同, saveAsTable是会将DataFrame中的数据物化到Hive表中的,而且还会在Hive元数据库中创建表的元数据

  • 默认情况下,saveAsTable会创建一张Hive Managed Table,也就是说,数据的位置都是由元数据中的信息控制的.当Managed Table被删除时,表中的数据也传动一并被 物理删除

  • regiserTempTable只是注册一个临时的表,只要Spark Application重启或停止了,表就没了,而saveAsTable是物化的表,表会一直存在

  • 调用HiveContext.table() 方法,还可以直接针对Hive中的表,创建一个DataFrame

案例: 查询分数大于80分的学生信息

  • 创建HiveContext,注意:它接收的是sparkContext为参数而不是JavaSparkContext

HiveContext hiveContext = new HiveContext(jsc.sc());

  • 第一个功能:使用HiveContext的sql()/hql() 方法,可以执行Hive中能执行的HiveQL语句
                //第一个功能:使用HiveContext的sql()/hql() 方法,可以执行Hive中能执行的HiveQL语句
        
                //判断是否存在student_infos,若存在则删除
                hiveContext.sql("DROP TABLE IF EXISTS student_infos");
                //如果不存在,则创建该表
                hiveContext.sql("CREATE TABLE IF NOT EXISTS  student_infos(name STRING, age INT ) row format delimited fields terminated by ','");
                //将学生基本信息数据导入student_infos表
                hiveContext.sql("LOAD DATA " +
                        " LOCAL INPATH '/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/student_infos.txt' " +
                        " INTO TABLE student_infos ");
        
                //用同样的方式给student_scores导入数据
                hiveContext.sql("DROP TABLE IF EXISTS student_scores ");
                hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores(name STRING, score INT ) row format delimited fields terminated by ','");
                hiveContext.sql("LOAD DATA " +
                        " LOCAL INPATH '/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/student_scores.txt' " +
                        " INTO TABLE student_scores ");
  • 第二个功能,执行sql还可以返回DataFrame,用于查询
      //第二个功能,执行sql还可以返回DataFrame,用于查询
      
              //执行sql查询,关联两张表,查询成绩大于80分的学生
              DataFrame goodStudentDF = hiveContext.sql(" SELECT si.name name, si.age age, ss.score score " +
                      " FROM student_infos si " +
                      " JOIN student_scores ss ON si.name =ss.name " +
                      " WHERE ss.score >= 80 ");
  • 第三个功能,可以将DataFrame中的数据,理论上来说,DataFrame对应的RDD元素是Row即可将DataFrame中的数据保存到hive表中
//第三个功能,可以将DataFrame中的数据,理论上来说,DataFrame对应的RDD元素是Row即可将DataFrame中的数据保存到hive表中

        //将DataFrame中的数据保存到good_student_infos
        hiveContext.sql("DROP TABLE IF EXISTS good_student_infos ");
        goodStudentDF.saveAsTable("good_student_infos");
  • 第四个功能:可以用table() 方法针对hive表,直接创建,DataFrame
       //第四个功能:可以用table() 方法针对hive表,直接创建DataFrame
               //然后针对good_student_infos表直接创建DataFrame
               Row[] goodStudentsRows = hiveContext.table("good_student_infos").collect();
       
               for (Row row : goodStudentsRows) {
                   System.out.println(row);
               }

注1:实际运行过程中出现报错如下:

MetaException(message:file:/user/hive/warehouse/src is not a directory or unable to create one)

解决方法:将hive-site.xml 放至resources目录下

注2:导入HDFS后,表内没数据,因为文件分隔符没有指定为空格

hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos(name STRING, age INT ) row format delimited fields terminated by ','");

JDBC数据源 JDBCDataSource.java

案例:查询分数大于80分的学生信息

  1. mysql创建数据库

mysql> create database testdb;

mysql> create table student_infos(name varchar(20),age integer);

mysql> create table student_scores(name varchar(20),score integer);

mysql> create table good_student_infos(name varchar(20),age integer,score integer);

mysql> insert into student_infos values('leo',18),('marry',17),('jack',19);

mysql> insert into student_scores values('leo',88),('marry',99),('jack',60);

  1. 读取mysql表
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://sotowang-pc:3306/testdb");
options.put("user", "root");
options.put("password", "123456");

options.put("dbtable", "student_infos");
DataFrame studentInfoDF = sqlContext.read().format("jdbc").options(options).load();
  1. 将DataFrame中的数据保存到Mysql数据表中
        //将DataFrame中的数据保存到Mysql数据表中
        studentDF.javaRDD().foreach(new VoidFunction<Row>() {
            public void call(Row row) throws Exception {

                String sql = " insert into good_student_infos values('"
                        + row.getString(0) + "'," +
                        Integer.valueOf(String.valueOf(row.get(1))) + "," +
                        Integer.valueOf(String.valueOf(row.get(2))) + ")";

                Class.forName("com.mysql.jdbc.Driver");
                Connection conn = null;
                Statement statement = null;
                try {
                    conn = DriverManager.getConnection(
                            "jdbc:mysql://sotowang-pc:3306/testdb", "root", "123456"
                    );
                    statement = conn.createStatement();
                    statement.executeUpdate(sql);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    if (statement != null) {
                        statement.close();
                    }
                    if (conn != null) {
                        conn.close();
                    }
                }
            }
        });

Spark SQL高级内置函数

案例:根据每天的用户访问的购买日志统计每日的uv和销售额 (uv指:对用户进行去重以后的访问总数)

内置函数:countDistinct() DailyUV1.java

聚合函数用法

首先对DataFrame调用groupBy()方法,对某一列进行分组, 然后调用agg()方法,对数为内置函数,对见其源码

  • 注:能过阅读Spark agg方法源码,得知需要手动导入api:__ 才能使用agg(countDistinct("userid"))方法
import static org.apache.spark.sql.functions.countDistinct;
JavaRDD<Row> userAccessLogDistinctedRowRDD = userAccessLogRowDF.groupBy("date")
                .agg(countDistinct("userid"))  //(date,count(userid))
                .javaRDD();

内置函数:sum() DailySale.java

开窗函数以及top3销售额统计案例

最常用的函数: row_number() 实现分组取topN的逻辑 RowNumberWindowFunction.java

  • 创建销售额表,sales表
hiveContext.sql("drop table  if exists sales");
hiveContext.sql("create table if not exists sales ( " +
        " product STRING, " +
        " category STRING, " +
        "revenue BIGINT ) ");
hiveContext.sql("load data " +
        " local inpath '/home/sotowang/user/aur/ide/idea/idea-IU-182.3684.101/workspace/SparkSQLProject/src/resources/sales.txt' " +
        " into table sales ");


```* 使用row_number()开窗函数row_number()
作用就是给你一份每个分组的数据 按照其排序打上一个分组内的行号
比如:有一个分组date=20181001,里面有3条数据,1122,1121,1124,
那么对这个分组的每一行使用row_num()开窗函数以后,三等依次会获得一个组内行号行号从1开始递增,
比如1122 1,1121 2,1124 3

```java
DataFrame top3SalesDF = hiveContext.sql("" +
        "select product,category,revenue " +
        " from ( " +
        " select " +
        " product," +
        "category," +
        "revenue," +
        //row_number() 语法说明:
        //首先,在select查询时,使用row_number()函数
        //其次,row_number()后面跟上over关键字
        //然后,括号中,是partition by ,根据哪个字段进行分组
        //其次,order by 起行组内排序
        //然后,row_number()就可以给组内行一个行号
        "row_number() over (partition by category order by revenue desc ) rank " +
        "from sales " +
        " ) tmp_sales " +
        "where rank <=3 ");

//将每组前三的数据保存到一个表中
hiveContext.sql("drop table if exists top3_sales ");
top3SalesDF.saveAsTable("top3_sales");

Spark SQL 与 Hive On Spark

Spark SQL:

1. Spark 自己研发出来的,针对各种数据源:Hive,Parquet,JDBC,RDD等都可以执行查询,一套基于Spark计算引擎的查询引擎

2. 它是一个Spark项目,只不过是提供了针对Hive执行查询的功能而已

3. 适用于Spark技术栈的大数据应用类系统,舆情分析系统,风控系统,用户行为分析系统...

Hive On Spark:

1. 是一个Hive项目,指不通过MapReduce作为唯一的查询引擎,而是将Spark作为底层的查询引擎

2. Hive On Spark,只适用于Spark,在可预见的未来,很有可能Hive默认的底层引擎就从MapReduce切换为Spark了

3. 适合将原有的Hive数据仓库以及数据统计分析替换为Spark引擎,作为全公司通用的大数据分析引擎

Spark SQL与Spark Core合并 DailyTop3KeyWord.java

案例: 每日top3 热点搜索词统计案例实战

数据格式:

日期 用户 搜索词 城市 平台 版本

需求:

1. 筛选出符合查询条件的数据
2. 统计出每天搜索uv排名前3的搜索词
3. 按照每天的top3搜索词搜索总次数,侄是序排序
4. 将数据保存到hive表中

思路分析:

1. 针对原始数据(HDFS文件),获取输入的RDD
2. 使用filter牌子,去针对输入RDD中的数据,进行数据过滤,过滤出符合查询条件的数据
    2.1 普通的做法:直接在filter牌子函数中,使用外部的查询条件(Map),但这样做的话,查询条件Map会发送到每一个task一个副本
    2.2 优化后的做法:将查询条件封装为Broadcast广播变量,在filter算子中,使用Broadcast广播变量
3. 将数据转换为"(日期_搜索词, 用户) "的格式然后转换进行分级,然后再次进行映射,对每天每个搜索词的用户进行去重操作,并统计去重后的数量,即为每天每个搜索词的uv,最后获得"(日期_搜索词,uv)"
4. 将得到的每天每个搜索词的uv,RDD,映射为元素类型为Row的RDD,将该RDD转换为DataFrame
5. 将DataFrame注册为临时表,使用Spark SQL的开窗函数,来统计每天的uv数量排名前3的搜索词,以及它的搜索uv,最后获取,是一个DataFrame
6. 将DataFrame转换为RDD,继续操作按照每天日期来进行分级,并进行映射,计算出每天的top3搜索词的搜索uv的总数,然后uv总数为key,将每天top3搜索词以及搜索次数拼接为一个字符串
7. 按照每天的top3 搜索总uv,再次映射回来,变成"日期_搜索词_uv"的格式
8. 再次映射为DataFra,e,并将数据保存到Hive中
  • 遇到的问题:
  1. 在生成hive表时显示,hdfs文件已存在
Exception in thread "main" org.apache.spark.sql.AnalysisException: path hdfs://sotowang-pc:9000/user/hive/warehouse/daily_top3_keyword_uv already exists.;

解决方法:

hadoop fs -rm -r /user/hive/warehouse/daily_top3_keyword_uv

About

Spark SQL基础

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published