demo

统计不同电影类型的数量

import org.apache.spark.{SparkConf, SparkContext}

object rdd {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("yarn").setAppName("movies")
    val sc = new SparkContext(conf)

    val textRDD = sc.textFile("hdfs://hadoop201:8020/input/movies.csv")
    val header = textRDD.first()
    val moviesRDD = textRDD.filter(!_.equals(header)) //过滤头部
    moviesRDD.cache()
    val total_films_count = moviesRDD.count()
    println("电影总数:"+total_films_count)
    val movies_genre_count = moviesRDD.flatMap(line=>{
      line.substring(line.lastIndexOf(",")+1).split("\\|")
    }).map((_,1))
      .reduceByKey(_ + _).collect()

    movies_genre_count.foreach(println)

  }
}

统计评分最高的电影

package cn.edu.jit.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes, StructType}

object sparksqldemo {
  def main(args: Array[String]): Unit = {

    
    val spark = SparkSession
      .builder()
      .master("yarn")
      .appName("movies")
      .getOrCreate()

    val rating_schema = new StructType()
      .add("userId", DataTypes.IntegerType)
      .add("movieId",DataTypes.IntegerType)
      .add("rating",DataTypes.FloatType)
      .add("timestamp", DataTypes.LongType)

    val movie_schema = new StructType()
      .add("movieId",DataTypes.IntegerType)
      .add("title",DataTypes.StringType)
      .add("genre",DataTypes.StringType)

    val moviesDF = spark.read
      .option("header","true")
      .schema(movie_schema)
      .csv("hdfs://hadoop201:8020/input/movies.csv")

    val ratingsDF = spark.read
      .option("header","true")
      .schema(rating_schema)
      .csv("hdfs://hadoop201:8020/input/ratings.csv")

    // 找平均评分最高的10部电影 id 电影名 评分 要求评分人次大于1000一次
    ratingsDF.createOrReplaceTempView("ratings")
    moviesDF.createOrReplaceTempView("movies")

    val top10 = spark.sql(
      """
        |SELECT
        |r.movieId,m.title,avg(rating) avg_rating,count(*) rating_cnt
        |FROM ratings r JOIN movies m ON r.movieId = m.movieId
        |GROUP BY r.movieId, m.title
        |HAVING rating_cnt >= 1000
        |ORDER BY avg_rating DESC
        |limit 10
        |""".stripMargin)

   top10.show(truncate = false)
/*
这段代码是在使用Apache Spark SQL执行一条SQL查询语句,将结果存储在DataFrame对象top10中,并以表格形式打印出来。具体解释如下:
- `spark.sql`是SparkSession上的一个方法,用于执行给定的SQL查询。
- 该查询包含4个列:movieId、title、avg_rating和rating_cnt,其中,movieId和title分别来自movies表和ratings表,avg_rating表示每部电影的平均评分,rating_cnt表示评分次数。
- `JOIN`关键字用于将ratings表和movies表连接起来,其条件为它们的movieId相等。
- `GROUP BY`子句指示Spark对每个电影进行分组聚合操作,计算每部电影的平均评分和评分次数。
- `HAVING rating_cnt >= 1000`子句用于过滤掉评分次数少于1000次的电影。
- `ORDER BY avg_rating DESC`子句用于按照平均评分从高到低排序。
- `limit 10`子句限制结果集大小为10,只返回平均评分最高的前10部电影的信息。
- `top10.show(truncate = false)`方法用于以表格形式打印DataFrame top10中的结果,truncate参数设置为false表示不截取单元格中显示的内容,以便完全呈现所有结果。
*/

    val top_cnt =  spark.sql(
      """
        |SELECT
        |m.movieId,m.title,avg(r.rating) avg_rating,count(*) total_cnt
        |FROM ratings r JOIN movies m on r.movieId = m.movieId
        |GROUP BY m.movieId,m.title
        |ORDER BY total_cnt DESC
        |LIMIT 10
        |""".stripMargin)

    top_cnt.show(truncate = false)
/*
这段代码是在使用Apache Spark SQL执行一条SQL查询语句,将结果存储在DataFrame对象top_cnt中,然后以表格形式打印出来。具体解释如下:
- `spark.sql`是SparkSession上的一个方法,用于执行给定的SQL查询。
- SQL查询包含4个列:movieId、title、avg_rating和total_cnt。其中,movieId和title列来自movies表,avg_rating和total_cnt列来自ratings表。该查询的目的是计算每个电影的平均评分和评分次数(即记录总数),并仅返回评分次数最高的前10个电影。
- `JOIN`关键字用于将两个表连接起来,其条件为ratings表的movieId与movies表的movieId相等。
- GROUP BY子句指示Spark按照movieId和title进行聚合操作,并对每个聚合组计算其平均评分和评分次数。 
- `ORDER BY total_cnt DESC`子句按total_cnt列进行降序排序,以便将电影按照评分次数从高到低排列。
- `LIMIT 10`子句限制结果集的大小为10,只返回评分次数最高的10部电影的信息。
- `top_cnt.show(truncate = false)`方法用于打印DataFrame top_cnt中的结果,truncate参数设置为false表示不截取单元格中显示的内容,以便完全呈现所有结果。
*/

    spark.sql(
      """
        |SELECT
        |m.movieId,m.title,substring(substring(title,-6),2,4) movie_year,
        |avg(r.rating) avg_rating,count(*) total_cnt
        |FROM ratings r JOIN movies m ON r.movieId = m.movieId
        |GROUP BY m.movieId,m.title,movie_year
        |""".stripMargin).createOrReplaceTempView("t1")
/*
这段代码是在使用Apache Spark SQL执行一条SQL查询语句并将结果存储在一个Spark临时视图中,具体解释如下:

- `spark.sql`是SparkSession上的一个方法,用于执行给定的SQL查询。
- SQL查询包含5个列:movieId、title、movie_year、avg_rating和total_cnt。其中,movieId、title、avg_rating和total_cnt的含义与上一段代码中的相同,movie_year则是通过对title列进行处理得到的电影年份信息。
- `substring(substring(title,-6), 2, 4)`函数用于从title列中抽取出电影年份信息,具体来说,该函数连续调用了两次substring函数:
   - 第一次substring函数将title列的后6个字符(即电影名中括号内的4位年份数字和后面的")"字符)抽取出来。
   - 第二次substring函数则从第一步得到的字符串中,从第2个字符开始抽取4个字符,即抽取年份数字部分。
- `JOIN`、`GROUP BY`和`ORDER BY`的含义与上一段代码中的相同,只是在GROUP BY子句中增加了对movie_year列的聚合操作。
- `createOrReplaceTempView("t1")`将查询结果存储在一个名为"t1"的Spark临时视图中,以便后续对该结果进行进一步处理和分析。注意,该方法不返回任何数据,仅负责将查询结果与一个临时视图相关联。
*/


    spark.sql(
      """
        |SELECT
        |SUM(CASE movie_year WHEN 201 THEN 1 ELSE 0 END) YEAR10S,
        |SUM(CASE movie_year WHEN 200 THEN 1 ELSE 0 END) YEAR00S,
        |SUM(CASE movie_year WHEN 199 THEN 1 ELSE 0 END) YEAR90S
        |
        |FROM t1
        |""".stripMargin).show()
/*
这段代码是在使用Apache Spark SQL执行一条SQL查询语句,并将对临时视图"t1"中数据进行进一步聚合和分析。具体解释如下:
- `spark.sql`是SparkSession上的一个方法,用于执行给定的SQL查询。
- SQL查询包含3个列:YEAR10S、YEAR00S和YEAR90S,这些列是由三组`SUM(CASE ...)`语句计算出来的,表示电影年份为201x、200x和199x的电影总数。
  - `CASE ... WHEN ... THEN ... ELSE ... END`语句是一个条件表达式,用于根据movie_year列的值选择相应的路径,可以理解为IF-THEN-ELSE逻辑判断的简化版:
    - 当movie_year=201时,显式返回数字1。
    - 否则,返回数字0。
  - 将该条件表达式作为SUM函数的参数,即可实现对movie_year为某个特定值的电影数进行统计汇总的功能。
  - Sql查询不允许select语句¥¥现未在group by字句或聚合函数中的列, movie_year格式存在问题, 不符合算术运算, 所以需要对year号进行case操作.
- `FROM t1`指示Spark从名为"t1"的临时视图中获取数据,然后对其进行聚合操作。
- 最终输出结果通过`show()`函数打印。

注意,这段代码的执行依赖于之前定义的临时视图"t1",该视图中存储了关于电影的信息,其中包括每部电影的标题、平均评分和发行年份(已转换为3位数字)。如果在执行本段代码之前未建立该视图,则会导致运行时错误。*/
    spark.close()
  }
}
package jit

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object UserReviewsByGenre {
  def main(args: Array[String]): Unit = {

    System.load("D:/Libs/hadoop-3.2.4/bin/hadoop.dll")

    val spark = SparkSession.builder
      .appName("ML")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._


    val ratingsDF = spark.read
      .option("header",true)
      .option("inferSchema", true)
      .csv("file:///D:/Data/ml-latest/ratings.csv")



    val moviesDF = spark.read
      .option("header",true)
      .option("inferSchema", true)
      .csv("file:///D:/Data/ml-latest/movies.csv")

    ratingsDF.createOrReplaceTempView("ratings")
    moviesDF.createOrReplaceTempView("movies")

    println("总数:",ratingsDF.select("userId").distinct().count())

    val validate_users = spark.sql(
      """
        |SELECT userId, COUNT(*) as total_count
        |FROM ratings
        |WHERE rating IN (1, 5)
        |GROUP BY userId
        |HAVING COUNT(*) > SUM(CASE WHEN rating IN (1, 5) THEN 1 ELSE 0 END) AND COUNT(*) > 10
        |""".stripMargin)

    validate_users.createOrReplaceTempView("validate_users")
/*
这是一个Spark SQL查询语句,用于验证用户是否为“不良评分者”。
具体来说,它会在名为“ratings”的表中查找将评分设置为1或5的所有评分,并根据每个用户对这些评分的数量进行分组和统计。
然后,对于每个用户,该查询将比较他们所有的评分总数与只包含1或5评分的数量之和。如果前者大于后者且评分总数超过10,则该用户被认为是“不良评分者”。
最终,查询将返回所有符合条件的用户的ID和对应的评分数量,存储在名为“validate_users”的变量中。
*/
    val user_ratings_category_df = spark.sql(
      """
        |SELECT
        |   userId,
        |    SUM(CASE WHEN genres LIKE '%Action%' THEN 1 ELSE 0 END) AS Action,
        |    SUM(CASE WHEN genres LIKE '%Adventure%' THEN 1 ELSE 0 END) AS Adventure,
        |    SUM(CASE WHEN genres LIKE '%Animation%' THEN 1 ELSE 0 END) AS Animation,
        |    SUM(CASE WHEN genres LIKE '%Children%' THEN 1 ELSE 0 END) AS Children,
        |    SUM(CASE WHEN genres LIKE '%Comedy%' THEN 1 ELSE 0 END) AS Comedy,
        |    SUM(CASE WHEN genres LIKE '%Crime%' THEN 1 ELSE 0 END) AS Crime,
        |    SUM(CASE WHEN genres LIKE '%Documentary%' THEN 1 ELSE 0 END) AS Documentary,
        |    SUM(CASE WHEN genres LIKE '%Drama%' THEN 1 ELSE 0 END) AS Drama,
        |    SUM(CASE WHEN genres LIKE '%Fantasy%' THEN 1 ELSE 0 END) AS Fantasy,
        |    SUM(CASE WHEN genres LIKE '%Film-Noir%' THEN 1 ELSE 0 END) AS FilmNoir,
        |    SUM(CASE WHEN genres LIKE '%Horror%' THEN 1 ELSE 0 END) AS Horror,
        |    SUM(CASE WHEN genres LIKE '%Musical%' THEN 1 ELSE 0 END) AS Musical,
        |    SUM(CASE WHEN genres LIKE '%Mystery%' THEN 1 ELSE 0 END) AS Mystery,
        |    SUM(CASE WHEN genres LIKE '%Romance%' THEN 1 ELSE 0 END) AS Romance,
        |    SUM(CASE WHEN genres LIKE '%Sci-Fi%' THEN 1 ELSE 0 END) AS SciFi,
        |    SUM(CASE WHEN genres LIKE '%Thriller%' THEN 1 ELSE 0 END) AS Thriller,
        |    SUM(CASE WHEN genres LIKE '%War%' THEN 1 ELSE 0 END) AS War,
        |    SUM(CASE WHEN genres LIKE '%Western%' THEN 1 ELSE 0 END) AS Western
        |FROM (
        |  SELECT explode(split(genres, "\\|")) AS genre, *
        |  FROM ratings r JOIN movies m ON r.movieId = m.movieId
        |) t
        |GROUP BY userId
        |""".stripMargin)

    user_ratings_category_df.show()
/*
这是一段 Scala 代码,用于使用 Apache Spark 计算用户对电影不同类型的评分总数。以下是代码的主要步骤:
1. 从 ratings 和 movies 表中选择电影和其评分信息
2. 按 | 分隔 genres 列并展开,以将一个电影的不同 genre 序列化为多个行。
3. 对所有的展开后的行按 userId 进行分组,并计算每个人给每个 genre 类型的电影的评分总数。

具体来说,代码使用了 Spark SQL 中的 API,主要实现步骤如下:
1.``SELECT`` 部分:对 userId 进行聚合,并使用``CASE WHEN`` 函数计算每种电影 genre 的评分总数。
2. 子查询块 ``FROM``:在电影和其评分表格"ratings"和"movies"之间执行一个内部联接操作``, JOIN movies m ON r.movieId = m.movieId``,连接条件是电影 ID。
3. 使用``explode()``函数拆分``genres``栏,生成新的列``genre``,包含展开的单个分类。
4. 聚合结果:将同一用户和所有 genre 的评分汇总在一起``GROUP BY userId`` ,统计总节数量(通过求和聚合函数)

最终结果将是一个 DataFrame,它显示了每个用户对每种电影类型的评分总数。
*/ 
    
    spark.sql(
      """
        |SELECT m.title, AVG(r.rating) AS avg_rating, VARIANCE(r.rating) AS rating_variance
        |FROM ratings r JOIN movies m ON r.movieId = m.movieId
        |GROUP BY m.title
        |HAVING COUNT(r.userId) > 100
        |ORDER BY rating_variance DESC
        |LIMIT 10;
        |""".stripMargin).show(truncate = false)
/*
这是一个Spark SQL查询语句,用于计算电影评分的平均值和方差。以下是该查询语句的解释:

- 通过将评分r和电影m连接在一起,使用JOIN指令来联结ratings表和movies表。使用ON指令来指定电影ID匹配。
- 使用GROUP BY指令将结果按电影标题进行分组。
- 利用AVG聚合函数计算每个电影的平均评分,并利用VARIANCE聚合函数计算每个电影评分的方差。
- COUNT函数用于过滤只有小于等于100人评分的电影;HAVING子句筛选掉低于100的电影数量
- 对电影按照评分方差降序排列,并用LIMIT限制结果集为前10项。

通过使用.show(truncate = false),输出结果显示时不截断行。
*/
    spark.sql(
      """
        |SELECT m.title,
        |       COUNT(r.rating) AS num_ratings,
        |       SUM(CASE WHEN r.rating = 1 THEN 1 ELSE 0 END) AS num_1_ratings,
        |       SUM(CASE WHEN r.rating = 2 THEN 1 ELSE 0 END) AS num_2_ratings,
        |       SUM(CASE WHEN r.rating = 3 THEN 1 ELSE 0 END) AS num_3_ratings,
        |       SUM(CASE WHEN r.rating = 4 THEN 1 ELSE 0 END) AS num_4_ratings,
        |       SUM(CASE WHEN r.rating = 5 THEN 1 ELSE 0 END) AS num_5_ratings,
        |       AVG(r.rating) AS avg_rating,
        |       SQRT(SUM(POWER(r.rating - t.avg_rating, 2)) / COUNT(r.rating)) AS std_dev
        |FROM ratings r
        |JOIN movies m ON r.movieId = m.movieId
        |JOIN (SELECT movieId, AVG(rating) AS avg_rating
        |      FROM ratings
        |      GROUP BY movieId
        |      HAVING COUNT(rating) >= 100) t
        |ON r.movieId = t.movieId
        |GROUP BY m.title
        |ORDER BY std_dev DESC
        |LIMIT 10;
        |
        |""".stripMargin).show(truncate = false)
/*
这是一个Spark SQL查询语句。其目的是分析电影评分数据集中每部电影的平均评分,以及评分标准差和各个评分等级所占的比例。具体来说,这个查询将评分数据集ratings和电影数据集movies进行了JOIN操作,并对评分数据进行聚合分析,得到了每部电影的平均评分、评分标准差、不同评分等级对应的评分人数。最后,按照评分标准差倒序排序,并显示前10条记录。
具体解释如下:

首先选取电影名m.title作为输出结果的列,并定义除此之外还有8个计算列,分别是num_ratings、num_1_ratings、num_2_ratings、num_3_ratings、num_4_ratings、num_5_ratings、avg_rating和std_dev。其中,num_ratings表示某一电影的评分数量,num_i_ratings表示给该电影打了 i 分的评分数量,而avg_rating则是该电影的平均评分。
这段代码的核心在于通过计算标准差std_dev来度量电影的评分分布。具体来说,std_dev等于所有评分离平均值的偏差平方和的平均数,再开方得到的结果。也就是公式中的sqrt( Σ(rating - avg_rating)^2 / count(rating) )。这里采用了一个子查询t,用来计算每个电影的平均评分avg_rating。注意,这里只选择评分记录数大于等于100次的电影进行计算,即HAVING COUNT(rating) >= 100。最后,按照std_dev从大到小排序,输出前10个电影的结果,其中truncate=false表示不要截断输出结果。
*/
    spark.close()
  }
}