Extracting MySQL Data with Spark

1.0 Use Scala to extract specified MySQL data into a specified partitioned table in the Hive ODS layer


import org.apache.spark.sql.SparkSession

object demo_extractjob {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkBuilder = SparkSession.builder()
    if ((args.length > 0 && args(0).equals("local")) || args.length == 0) {
      sparkBuilder.master("local[*]")
    }
    val spark = sparkBuilder.appName("demo_extractjob")
      .enableHiveSupport()
      .getOrCreate()

    /**
      * Connect to MySQL
      **/
    spark.read.format("jdbc")
      .option("url", "jdbc:mysql://slave2:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false")  //mysql url
      .option("driver", "com.mysql.jdbc.Driver") //mysql driver
      .option("user","root") //mysql user
      .option("password", "123456") //mysql password
      .option("dbtable", "students").load().createTempView("mysql_table1")

    spark.sql("select * from mysql_table1").show()


    spark.sql("show databases").show()
    // Insert into the Hive table
    spark.sql(
      s"""
         |insert overwrite table test.students_01 partition (city='henan')
         |select * from mysql_table1
      """.stripMargin)

    spark.stop()
  }
}

2.0 Data cleaning: deduplication

  1. Copy the table structure

    CREATE TABLE <new_table> LIKE <old_table>;
    
  2. Insert the deduplicated data

    insert overwrite table <new_table>
    select t.name, t.score, t.type
    from (
      select
        name, score, type,
        row_number() over (distribute by name sort by score) as rn
      from <old_table>
    ) t
    where t.rn = 1;
    
  3. General pattern

    insert overwrite table <new_table>
    select <columns>
    from (
      select <columns>,
        row_number() over (distribute by <column with duplicates> sort by <ordering column>) as rn
      from <old_table>
    ) t
    where t.rn = 1;
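To see what the row_number-based deduplication actually does, here is a plain-Python sketch of the same logic; the sample rows and column names are invented for illustration and are not from the tables above:

```python
from itertools import groupby

def dedup_rows(rows, key, order):
    """Emulate: row_number() over (distribute by key sort by order), keeping rn = 1."""
    ordered = sorted(rows, key=lambda r: (r[key], r[order]))
    # groupby collapses consecutive rows sharing the key; next() takes the rn = 1 row
    return [next(group) for _, group in groupby(ordered, key=lambda r: r[key])]

rows = [
    {"name": "li",   "score": 80, "type": "a"},
    {"name": "li",   "score": 90, "type": "a"},   # duplicate name: dropped (rn = 2)
    {"name": "wang", "score": 70, "type": "b"},
]
print(dedup_rows(rows, key="name", order="score"))
# keeps exactly one row per name
```

Note that, as in the Hive query, which duplicate survives depends on the sort column: here `sort by score` ascending keeps the lowest-scoring row of each name.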
    

2.1 Data cleaning: filling missing fields

   select t1.name, t1.rk
   from (
     select *, NVL(comment, -1) as rk from region
   ) t1
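NVL(comment, -1) simply substitutes a default whenever the column is NULL. A minimal Python sketch of that behaviour (the sample rows are invented for the example):

```python
def nvl(value, default):
    """Emulate Hive's NVL(col, default): return default when the value is NULL (None)."""
    return default if value is None else value

regions = [
    {"name": "east", "comment": 5},
    {"name": "west", "comment": None},  # missing field to be filled
]
filled = [{"name": r["name"], "rk": nvl(r["comment"], -1)} for r in regions]
print(filled)
# the None comment is replaced by -1; present values pass through unchanged
```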

3.0 Offline statistics

3.1 Compute the Top-N records per group (e.g. Top 5 by sales)

select *,
  row_number() over (partition by subject order by score desc),  -- sequential ranking, no ties
  rank() over (partition by subject order by score desc),        -- ties share a rank, gaps follow
  dense_rank() over (partition by subject order by score desc)   -- ties share a rank, no gaps
from score;

select * from (
  select *, row_number() over (partition by subject order by score desc) rmp from score
) t where t.rmp <= 3;
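The difference between the three ranking functions is easy to check with a small Python emulation of one partition; the scores are invented for the example:

```python
def rankings(scores):
    """Emulate row_number / rank / dense_rank over one partition, highest score first."""
    ordered = sorted(scores, reverse=True)
    out, prev, rank, dense = [], None, 0, 0
    for row_number, s in enumerate(ordered, start=1):
        if s != prev:
            # new distinct value: rank jumps to the row number, dense_rank steps by 1
            rank, dense, prev = row_number, dense + 1, s
        out.append((s, row_number, rank, dense))  # (score, row_number, rank, dense_rank)
    return out

print(rankings([90, 90, 80]))
# row_number: 1,2,3   rank: 1,1,3 (gap after the tie)   dense_rank: 1,1,2 (no gap)
```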

3.2 Total sales for a given month

select *, sum(cost) over () as total from business where substr(orderdate,1,7) = '2017-04';

3.3 Sales totals per customer per month

select *, sum(cost) over (partition by name, substr(orderdate,1,7)) as total_amount from business;
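sum(cost) over (partition by name, month) attaches each customer's monthly total to every one of that customer's rows, without collapsing them the way group by would. A plain-Python sketch of that behaviour (the order data is invented for the example):

```python
from collections import defaultdict

def monthly_totals(orders):
    """Emulate: sum(cost) over (partition by name, substr(orderdate, 1, 7))."""
    totals = defaultdict(int)
    for o in orders:
        totals[(o["name"], o["orderdate"][:7])] += o["cost"]
    # like a window function, the partition total is attached to every input row
    return [dict(o, total_amount=totals[(o["name"], o["orderdate"][:7])]) for o in orders]

orders = [
    {"name": "jack", "orderdate": "2017-04-06", "cost": 42},
    {"name": "jack", "orderdate": "2017-04-13", "cost": 8},
    {"name": "mart", "orderdate": "2017-05-01", "cost": 10},
]
print(monthly_totals(orders))
# both of jack's April rows carry total_amount = 50; mart's row carries 10
```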

3.4 Saving results to MySQL

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("data_from_ods_to_mysql")
val spark = SparkSession.builder().config(sparkConf)
  .config("hive.metastore.uris", "thrift://192.168.1.101:9083") // Hive metastore service address
  .config("spark.sql.warehouse.dir", "hdfs://192.168.1.100:50070/usr/hive_remote/warehouse") // Hive warehouse directory on HDFS
  .enableHiveSupport()
  .getOrCreate()
spark.sql("use jsj")

val data = spark.sql("select * from hive_test")

data.show()
data.write
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.102:3306/student?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "123456")
  .option("dbtable", "student2")
  .mode(SaveMode.Append)
  .save()
println("ok")
