spark Study
demo
统计不同电影类型的数量
import org.apache.spark.{SparkConf, SparkContext}
object rdd {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("yarn").setAppName("movies")
val sc = new SparkContext(conf)
val textRDD = sc.textFile("hdfs://hadoop201:8020/input/movies.csv")
val header = textRDD.first()
val moviesRDD = textRDD.filter(!_.equals(header)) //过滤头部
moviesRDD.cache()
val total_films_count = moviesRDD.count()
println("电影总数:"+total_films_count)
val movies_genre_count = moviesRDD.flatMap(line=>{
line.substring(line.lastIndexOf(",")+1).split("\\|")
}).map((_,1))
.reduceByKey(_ + _).collect()
movies_genre_count.foreach(println)
}
}
统计评分最高的电影
package cn.edu.jit.spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes, StructType}
object sparksqldemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("yarn")
.appName("movies")
.getOrCreate()
val rating_schema = new StructType()
.add("userId", DataTypes.IntegerType)
.add("movieId",DataTypes.IntegerType)
.add("rating",DataTypes.FloatType)
.add("timestamp", DataTypes.LongType)
val movie_schema = new StructType()
.add("movieId",DataTypes.IntegerType)
.add("title",DataTypes.StringType)
.add("genre",DataTypes.StringType)
val moviesDF = spark.read
.option("header","true")
.schema(movie_schema)
.csv("hdfs://hadoop201:8020/input/movies.csv")
val ratingsDF = spark.read
.option("header","true")
.schema(rating_schema)
.csv("hdfs://hadoop201:8020/input/ratings.csv")
// 找平均评分最高的10部电影 id 电影名 评分 要求评分人次大于1000一次
ratingsDF.createOrReplaceTempView("ratings")
moviesDF.createOrReplaceTempView("movies")
val top10 = spark.sql(
"""
|SELECT
|r.movieId,m.title,avg(rating) avg_rating,count(*) rating_cnt
|FROM ratings r JOIN movies m ON r.movieId = m.movieId
|GROUP BY r.movieId, m.title
|HAVING rating_cnt >= 1000
|ORDER BY avg_rating DESC
|limit 10
|""".stripMargin)
top10.show(truncate = false)
/*
这段代码是在使用Apache Spark SQL执行一条SQL查询语句,将结果存储在DataFrame对象top10中,并以表格形式打印出来。具体解释如下:
- `spark.sql`是SparkSession上的一个方法,用于执行给定的SQL查询。
- 该查询包含4个列:movieId、title、avg_rating和rating_cnt,其中,movieId和title分别来自movies表和ratings表,avg_rating表示每部电影的平均评分,rating_cnt表示评分次数。
- `JOIN`关键字用于将ratings表和movies表连接起来,其条件为它们的movieId相等。
- `GROUP BY`子句指示Spark对每个电影进行分组聚合操作,计算每部电影的平均评分和评分次数。
- `HAVING rating_cnt >= 1000`子句用于过滤掉评分次数少于1000次的电影。
- `ORDER BY avg_rating DESC`子句用于按照平均评分从高到低排序。
- `limit 10`子句限制结果集大小为10,只返回平均评分最高的前10部电影的信息。
- `top10.show(truncate = false)`方法用于以表格形式打印DataFrame top10中的结果,truncate参数设置为false表示不截取单元格中显示的内容,以便完全呈现所有结果。
*/
val top_cnt = spark.sql(
"""
|SELECT
|m.movieId,m.title,avg(r.rating) avg_rating,count(*) total_cnt
|FROM ratings r JOIN movies m on r.movieId = m.movieId
|GROUP BY m.movieId,m.title
|ORDER BY total_cnt DESC
|LIMIT 10
|""".stripMargin)
top_cnt.show(truncate = false)
/*
这段代码是在使用Apache Spark SQL执行一条SQL查询语句,将结果存储在DataFrame对象top_cnt中,然后以表格形式打印出来。具体解释如下:
- `spark.sql`是SparkSession上的一个方法,用于执行给定的SQL查询。
- SQL查询包含4个列:movieId、title、avg_rating和total_cnt。其中,movieId和title列来自movies表,avg_rating和total_cnt列来自ratings表。该查询的目的是计算每个电影的平均评分和评分次数(即记录总数),并仅返回评分次数最高的前10个电影。
- `JOIN`关键字用于将两个表连接起来,其条件为ratings表的movieId与movies表的movieId相等。
- GROUP BY子句指示Spark按照movieId和title进行聚合操作,并对每个聚合组计算其平均评分和评分次数。
- `ORDER BY total_cnt DESC`子句按total_cnt列进行降序排序,以便将电影按照评分次数从高到低排列。
- `LIMIT 10`子句限制结果集的大小为10,只返回评分次数最高的10部电影的信息。
- `top_cnt.show(truncate = false)`方法用于打印DataFrame top_cnt中的结果,truncate参数设置为false表示不截取单元格中显示的内容,以便完全呈现所有结果。
*/
spark.sql(
"""
|SELECT
|m.movieId,m.title,substring(substring(title,-6),2,4) movie_year,
|avg(r.rating) avg_rating,count(*) total_cnt
|FROM ratings r JOIN movies m ON r.movieId = m.movieId
|GROUP BY m.movieId,m.title,movie_year
|""".stripMargin).createOrReplaceTempView("t1")
/*
这段代码是在使用Apache Spark SQL执行一条SQL查询语句并将结果存储在一个Spark临时视图中,具体解释如下:
- `spark.sql`是SparkSession上的一个方法,用于执行给定的SQL查询。
- SQL查询包含5个列:movieId、title、movie_year、avg_rating和total_cnt。其中,movieId、title、avg_rating和total_cnt的含义与上一段代码中的相同,movie_year则是通过对title列进行处理得到的电影年份信息。
- `substring(substring(title,-6), 2, 4)`函数用于从title列中抽取出电影年份信息,具体来说,该函数连续调用了两次substring函数:
- 第一次substring函数将title列的后6个字符(即电影名中括号内的4位年份数字和后面的")"字符)抽取出来。
- 第二次substring函数则从第一步得到的字符串中,从第2个字符开始抽取4个字符,即抽取年份数字部分。
- `JOIN`、`GROUP BY`和`ORDER BY`的含义与上一段代码中的相同,只是在GROUP BY子句中增加了对movie_year列的聚合操作。
- `createOrReplaceTempView("t1")`将查询结果存储在一个名为"t1"的Spark临时视图中,以便后续对该结果进行进一步处理和分析。注意,该方法不返回任何数据,仅负责将查询结果与一个临时视图相关联。
*/
spark.sql(
"""
|SELECT
|SUM(CASE movie_year WHEN 201 THEN 1 ELSE 0 END) YEAR10S,
|SUM(CASE movie_year WHEN 200 THEN 1 ELSE 0 END) YEAR00S,
|SUM(CASE movie_year WHEN 199 THEN 1 ELSE 0 END) YEAR90S
|
|FROM t1
|""".stripMargin).show()
/*
这段代码是在使用Apache Spark SQL执行一条SQL查询语句,并将对临时视图"t1"中数据进行进一步聚合和分析。具体解释如下:
- `spark.sql`是SparkSession上的一个方法,用于执行给定的SQL查询。
- SQL查询包含3个列:YEAR10S、YEAR00S和YEAR90S,这些列是由三组`SUM(CASE ...)`语句计算出来的,表示电影年份为201x、200x和199x的电影总数。
- `CASE ... WHEN ... THEN ... ELSE ... END`语句是一个条件表达式,用于根据movie_year列的值选择相应的路径,可以理解为IF-THEN-ELSE逻辑判断的简化版:
- 当movie_year=201时,显式返回数字1。
- 否则,返回数字0。
- 将该条件表达式作为SUM函数的参数,即可实现对movie_year为某个特定值的电影数进行统计汇总的功能。
- Sql查询不允许select语句¥¥现未在group by字句或聚合函数中的列, movie_year格式存在问题, 不符合算术运算, 所以需要对year号进行case操作.
- `FROM t1`指示Spark从名为"t1"的临时视图中获取数据,然后对其进行聚合操作。
- 最终输出结果通过`show()`函数打印。
注意,这段代码的执行依赖于之前定义的临时视图"t1",该视图中存储了关于电影的信息,其中包括每部电影的标题、平均评分和发行年份(已转换为3位数字)。如果在执行本段代码之前未建立该视图,则会导致运行时错误。*/
spark.close()
}
}
package jit
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
object UserReviewsByGenre {
def main(args: Array[String]): Unit = {
System.load("D:/Libs/hadoop-3.2.4/bin/hadoop.dll")
val spark = SparkSession.builder
.appName("ML")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val ratingsDF = spark.read
.option("header",true)
.option("inferSchema", true)
.csv("file:///D:/Data/ml-latest/ratings.csv")
val moviesDF = spark.read
.option("header",true)
.option("inferSchema", true)
.csv("file:///D:/Data/ml-latest/movies.csv")
ratingsDF.createOrReplaceTempView("ratings")
moviesDF.createOrReplaceTempView("movies")
println("总数:",ratingsDF.select("userId").distinct().count())
val validate_users = spark.sql(
"""
|SELECT userId, COUNT(*) as total_count
|FROM ratings
|WHERE rating IN (1, 5)
|GROUP BY userId
|HAVING COUNT(*) > SUM(CASE WHEN rating IN (1, 5) THEN 1 ELSE 0 END) AND COUNT(*) > 10
|""".stripMargin)
validate_users.createOrReplaceTempView("validate_users")
/*
这是一个Spark SQL查询语句,用于验证用户是否为“不良评分者”。
具体来说,它会在名为“ratings”的表中查找将评分设置为1或5的所有评分,并根据每个用户对这些评分的数量进行分组和统计。
然后,对于每个用户,该查询将比较他们所有的评分总数与只包含1或5评分的数量之和。如果前者大于后者且评分总数超过10,则该用户被认为是“不良评分者”。
最终,查询将返回所有符合条件的用户的ID和对应的评分数量,存储在名为“validate_users”的变量中。
*/
val user_ratings_category_df = spark.sql(
"""
|SELECT
| userId,
| SUM(CASE WHEN genres LIKE '%Action%' THEN 1 ELSE 0 END) AS Action,
| SUM(CASE WHEN genres LIKE '%Adventure%' THEN 1 ELSE 0 END) AS Adventure,
| SUM(CASE WHEN genres LIKE '%Animation%' THEN 1 ELSE 0 END) AS Animation,
| SUM(CASE WHEN genres LIKE '%Children%' THEN 1 ELSE 0 END) AS Children,
| SUM(CASE WHEN genres LIKE '%Comedy%' THEN 1 ELSE 0 END) AS Comedy,
| SUM(CASE WHEN genres LIKE '%Crime%' THEN 1 ELSE 0 END) AS Crime,
| SUM(CASE WHEN genres LIKE '%Documentary%' THEN 1 ELSE 0 END) AS Documentary,
| SUM(CASE WHEN genres LIKE '%Drama%' THEN 1 ELSE 0 END) AS Drama,
| SUM(CASE WHEN genres LIKE '%Fantasy%' THEN 1 ELSE 0 END) AS Fantasy,
| SUM(CASE WHEN genres LIKE '%Film-Noir%' THEN 1 ELSE 0 END) AS FilmNoir,
| SUM(CASE WHEN genres LIKE '%Horror%' THEN 1 ELSE 0 END) AS Horror,
| SUM(CASE WHEN genres LIKE '%Musical%' THEN 1 ELSE 0 END) AS Musical,
| SUM(CASE WHEN genres LIKE '%Mystery%' THEN 1 ELSE 0 END) AS Mystery,
| SUM(CASE WHEN genres LIKE '%Romance%' THEN 1 ELSE 0 END) AS Romance,
| SUM(CASE WHEN genres LIKE '%Sci-Fi%' THEN 1 ELSE 0 END) AS SciFi,
| SUM(CASE WHEN genres LIKE '%Thriller%' THEN 1 ELSE 0 END) AS Thriller,
| SUM(CASE WHEN genres LIKE '%War%' THEN 1 ELSE 0 END) AS War,
| SUM(CASE WHEN genres LIKE '%Western%' THEN 1 ELSE 0 END) AS Western
|FROM (
| SELECT explode(split(genres, "\\|")) AS genre, *
| FROM ratings r JOIN movies m ON r.movieId = m.movieId
|) t
|GROUP BY userId
|""".stripMargin)
user_ratings_category_df.show()
/*
这是一段 Scala 代码,用于使用 Apache Spark 计算用户对电影不同类型的评分总数。以下是代码的主要步骤:
1. 从 ratings 和 movies 表中选择电影和其评分信息
2. 按 | 分隔 genres 列并展开,以将一个电影的不同 genre 序列化为多个行。
3. 对所有的展开后的行按 userId 进行分组,并计算每个人给每个 genre 类型的电影的评分总数。
具体来说,代码使用了 Spark SQL 中的 API,主要实现步骤如下:
1.``SELECT`` 部分:对 userId 进行聚合,并使用``CASE WHEN`` 函数计算每种电影 genre 的评分总数。
2. 子查询块 ``FROM``:在电影和其评分表格"ratings"和"movies"之间执行一个内部联接操作``, JOIN movies m ON r.movieId = m.movieId``,连接条件是电影 ID。
3. 使用``explode()``函数拆分``genres``栏,生成新的列``genre``,包含展开的单个分类。
4. 聚合结果:将同一用户和所有 genre 的评分汇总在一起``GROUP BY userId`` ,统计总节数量(通过求和聚合函数)
最终结果将是一个 DataFrame,它显示了每个用户对每种电影类型的评分总数。
*/
spark.sql(
"""
|SELECT m.title, AVG(r.rating) AS avg_rating, VARIANCE(r.rating) AS rating_variance
|FROM ratings r JOIN movies m ON r.movieId = m.movieId
|GROUP BY m.title
|HAVING COUNT(r.userId) > 100
|ORDER BY rating_variance DESC
|LIMIT 10;
|""".stripMargin).show(truncate = false)
/*
这是一个Spark SQL查询语句,用于计算电影评分的平均值和方差。以下是该查询语句的解释:
- 通过将评分r和电影m连接在一起,使用JOIN指令来联结ratings表和movies表。使用ON指令来指定电影ID匹配。
- 使用GROUP BY指令将结果按电影标题进行分组。
- 利用AVG聚合函数计算每个电影的平均评分,并利用VARIANCE聚合函数计算每个电影评分的方差。
- COUNT函数用于过滤只有小于等于100人评分的电影;HAVING子句筛选掉低于100的电影数量
- 对电影按照评分方差降序排列,并用LIMIT限制结果集为前10项。
通过使用.show(truncate = false),输出结果显示时不截断行。
*/
spark.sql(
"""
|SELECT m.title,
| COUNT(r.rating) AS num_ratings,
| SUM(CASE WHEN r.rating = 1 THEN 1 ELSE 0 END) AS num_1_ratings,
| SUM(CASE WHEN r.rating = 2 THEN 1 ELSE 0 END) AS num_2_ratings,
| SUM(CASE WHEN r.rating = 3 THEN 1 ELSE 0 END) AS num_3_ratings,
| SUM(CASE WHEN r.rating = 4 THEN 1 ELSE 0 END) AS num_4_ratings,
| SUM(CASE WHEN r.rating = 5 THEN 1 ELSE 0 END) AS num_5_ratings,
| AVG(r.rating) AS avg_rating,
| SQRT(SUM(POWER(r.rating - t.avg_rating, 2)) / COUNT(r.rating)) AS std_dev
|FROM ratings r
|JOIN movies m ON r.movieId = m.movieId
|JOIN (SELECT movieId, AVG(rating) AS avg_rating
| FROM ratings
| GROUP BY movieId
| HAVING COUNT(rating) >= 100) t
|ON r.movieId = t.movieId
|GROUP BY m.title
|ORDER BY std_dev DESC
|LIMIT 10;
|
|""".stripMargin).show(truncate = false)
/*
这是一个Spark SQL查询语句。其目的是分析电影评分数据集中每部电影的平均评分,以及评分标准差和各个评分等级所占的比例。具体来说,这个查询将评分数据集ratings和电影数据集movies进行了JOIN操作,并对评分数据进行聚合分析,得到了每部电影的平均评分、评分标准差、不同评分等级对应的评分人数。最后,按照评分标准差倒序排序,并显示前10条记录。
具体解释如下:
首先选取电影名m.title作为输出结果的列,并定义除此之外还有8个计算列,分别是num_ratings、num_1_ratings、num_2_ratings、num_3_ratings、num_4_ratings、num_5_ratings、avg_rating和std_dev。其中,num_ratings表示某一电影的评分数量,num_i_ratings表示给该电影打了 i 分的评分数量,而avg_rating则是该电影的平均评分。
这段代码的核心在于通过计算标准差std_dev来度量电影的评分分布。具体来说,std_dev等于所有评分离平均值的偏差平方和的平均数,再开方得到的结果。也就是公式中的sqrt( Σ(rating - avg_rating)^2 / count(rating) )。这里采用了一个子查询t,用来计算每个电影的平均评分avg_rating。注意,这里只选择评分记录数大于等于100次的电影进行计算,即HAVING COUNT(rating) >= 100。最后,按照std_dev从大到小排序,输出前10个电影的结果,其中truncate=false表示不要截断输出结果。
*/
spark.close()
}
}
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.
Comment