Apache Spark 是最流行的创建分布式数据处理管道的框架之一,在本博客中,我们将介绍如何使用 Spark 并将 Redis 作为计算的数据存储库。Spark 的主要特点是,一个管道(Java、Scala、Python 或 R 脚本)既可以在本地(用于开发)运行,也可以在集群上运行,而无需更改任何源代码。
Spark 通过巧妙地使用延迟计算(在某些上下文中称为“惰性”)来实现这种灵活性。一切都始于 RDD、DataFrame 以及更新的 Dataset 类,它们各自是数据的分布式惰性表示。它们使用分布式文件系统、数据库或其他类似服务作为实际存储后端。它们的操作——例如 map/select、filter/where 和 reduce/groupBy——并不会真正执行计算。相反,每个操作都会向执行计划添加一个步骤,该计划最终在需要实际结果时(例如,尝试将其打印到屏幕上时)运行。
在本地启动脚本时,所有计算都在您的机器上进行。或者,当在分布式集群上启动时,您的数据会被分区到不同的节点;相同的操作(大部分)在 Spark 集群内并行发生。
随着时间的推移,Spark 开发了三种不同的 API 来处理分布式数据集。虽然每一次新增都比前一个增加了更多功能,但没有任何一个 API 能完全取代之前的 API。按照创建时间顺序(从旧到新),以下是概述:
更多详情,请参阅 Jules Damji 的文章“A Tale of Three Apache Spark APIs”。
spark-redis 是一个开源连接器,允许您使用 Redis 存储数据。
使用 Redis 作为后端的三个主要原因:
在本文中,我们将重点介绍 Python 入门以及如何使用 DataFrame API。撰写本文时,Scala(可视为 Spark 的“原生”语言)可以使用集成的一些更高级功能,例如 Redis RDD 和 Streams。由于 Scala 是一种 JVM 语言,因此 Java 也可以使用这些功能。对于 Python,我们需要坚持使用 DataFrames。
我们的第一步是使用 pip 安装 pyspark。您还需要在机器上安装 Java 8。
$ pip install pyspark
接下来,我们需要 Maven 来构建 spark-redis。您可以从官方网站获取它,或使用包管理器(例如 macOS 上的 homebrew)。
从 GitHub 下载 spark-redis(可以通过 git clone 或下载 zip),然后使用 Maven 构建它。
$ cd spark-redis
$ mvn clean package -DskipTests
在 target/ 子目录中,您将找到编译好的 jar 文件。
如果您还没有运行中的 Redis 服务器,您需要一个来连接。您可以通过多种方式下载它:从官方网站、包管理器(apt-get 或 brew install redis)或Docker Hub(悄悄地说,这可能是尝试 Redis Enterprise 的好时机)。
启动并运行 Redis 服务器后,您可以启动 pyspark。请注意,您需要更改 VERSION 以反映您从 GitHub 下载的版本。
$ pyspark –jars target/spark-redis-VERSION-jar-with-dependencies.jar
如果您的 Redis 服务器在容器中或启用了身份验证,请在之前的调用中添加这些开关(并根据您的情况更改值)。
–conf “spark.redis.host=localhost” –conf “spark.redis.port=6379” –conf “spark.redis.auth=passwd”
现在我们有了一个可以在 Redis 上存储数据的可工作的 pyspark shell,让我们来玩玩这个名人数据集。
下载 TSV 文件后,让我们将其加载为 Spark DataFrame。
>>> full_df = spark.read.csv("pantheon.tsv", sep="\t", quote="", header=True, inferSchema=True)
>>> full_df.dtypes
[('en_curid', 'int'), ('name', 'string'), ('numlangs', 'int'), ('birthcity', 'string'), ('birthstate', 'string'), ('countryName', 'string'), ('countryCode', 'string'), ('countryCode3', 'string'), ('LAT', 'double'), ('LON', 'double'), ('continentName', 'string'), ('birthyear', 'string'), ('gender', 'string'), ('occupation', 'string'), ('industry', 'string'), ('domain', 'string'), ('TotalPageViews', 'int'), ('L_star', 'double'), ('StdDevPageViews', 'double'), ('PageViewsEnglish', 'int'), ('PageViewsNonEnglish', 'int'), ('AverageViews', 'double'), ('HPI', 'double')]
现在,调用 .dtypes 会显示数据集中的所有列(及相关类型)的列表。这个数据集有很多有趣的东西可以探究,但出于本例的目的,我们重点关注找出每个国家名人中最常见的职业。
让我们首先只保留与我们的目标相关的列。
>>> data = full_df.select("en_curid", "countryCode", "occupation")
>>> data.show(2)
+--------+-----------+-----------+
|en_curid|countryCode| occupation|
+--------+-----------+-----------+
| 307| US| POLITICIAN|
| 308| GR|PHILOSOPHER|
+--------+-----------+-----------+
only showing top 2 rows
这将创建一个原始 DataFrame 的副本,该副本只包含三列:每个人的唯一 ID、他们的国家和他们的职业。
我们首先下载了一个小数据集用于这篇博客文章,但在现实生活中,如果您使用 Spark,数据集可能会大得多且托管在远程。因此,下一步我们将尝试通过将数据加载到 Redis 来使情况更真实一些
>>> data.write.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").save()
此命令会将我们的数据集加载到 Redis 中。我们指定的两个选项有助于定义数据在 Redis 中的布局,我们现在就来看看。
让我们 잠깐 进入 redis-cli,看看我们的 DataFrame 在 Redis 上是如何存储的
$ redis-cli
> SCAN 0 MATCH people:* COUNT 3
1) "2048"
2) 1) "people:2113653"
2) "people:44849"
3) "people:399280"
4) "people:101393"
SCAN 显示了我们加载到 Redis 中的一些键。您可以立即看到我们之前给出的选项是如何用来定义键名的
让我们看看一个随机键的内容
> HGETALL people:2113653
1) "countryCode"
2) "DE"
3) "occupation"
4) "SOCCER PLAYER"
如您所见,我们的 DataFrame 的每一行都变成了一个Redis Hash,其中包含 countryCode 和 occupation。如前所述,en_curid 用作主键,因此它成为键名的一部分。
现在我们已经看到了数据在 Redis 上的存储方式,让我们回到 pyspark,看看如何实际编写一个管道来获取每个国家名人中最常见的职业。
即使数据应该还加载在内存中,我们还是从 Redis 加载它,以便编写更接近您在实际生活中会做的事情的代码。
>>> df = spark.read.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").load()
>>> df.show(2)
+--------+-----------+----------+
|en_curid|countryCode|occupation|
+--------+-----------+----------+
| 915950| ZW| SWIMMER|
| 726159| UY|POLITICIAN|
+--------+-----------+----------+
only showing top 2 rows
这就是您的 Spark 管道的起点,所以让我们最终进行计算吧!
>>> counts = df.groupby("countryCode", "occupation").agg({"en_curid": "count"})
>>> counts.show(2)
+-----------+-------------+---------------+
|countryCode| occupation|count(en_curid)|
+-----------+-------------+---------------+
| FR|MATHEMATICIAN| 34|
| IT|SOCCER PLAYER| 81|
+-----------+-------------+---------------+
only showing top 2 rows
现在每一行都表示所有存在的(国家,职业)组合的计数。对于下一步,我们需要为每个国家只选择计数最高的职业。
让我们首先导入一些我们需要的新模块,然后使用窗口函数定义选择最常见职业的代码
>>> from pyspark.sql.window import Window
>>> from pyspark.sql.functions import count, col, row_number
>>> w = Window().partitionBy("countryCode").orderBy(col("count(en_curid)").desc())
>>> result = counts.withColumn("rn", row_number().over(w)).where(col("rn") == 1).select("countryCode", "occupation")
>>> result.show(5)
+-----------+-------------+
|countryCode| occupation|
+-----------+-------------+
| DZ| POLITICIAN|
| LT| POLITICIAN|
| MM| POLITICIAN|
| CI|SOCCER PLAYER|
| AZ| POLITICIAN|
+-----------+-------------+
only showing top 5 rows
这段代码按 countryCode 对原始行进行了分组,按 count(en_curid) 降序排列了每组的内容,并只取了第一个元素。正如您所见,在这个小样本中,政治家似乎是一个非常常见的职业。
让我们看看这对多少个国家是真实的
>>> result.where(col("occupation") == "POLITICIAN").count()
150
哇,考虑到目前世界上有 195 个国家,这数量真不少。现在,我们只需将剩余的国家保存在 Redis 中
>>> no_pol = result.where(col("occupation") != "POLITICIAN")
>>> no_pol.write.format("org.apache.spark.sql.redis").option("table", "occupation").option("key.column", "countryCode").save()
如果您现在进入 redis-cli,您将能够看到新数据
$ redis-cli
> HGETALL occupation:IT
1) "occupation"
2) "RELIGIOUS FIGURE"
> HGETALL occupation:US
1) "occupation"
2) "ACTOR"
如果您想进一步练习,可以检查原始数据集,看看是否能找到其他引起您兴趣的细节。
一个非常重要的、值得重申的点是,对 RDD 或 DataFrame/set 对象的每个操作都将在多个节点上分布式执行。如果我们的例子不仅仅是关于名人,我们一开始就会有数千万行数据。在这种情况下,Spark 会扩展计算。但如果您只有一个 Redis 实例到位,您将有 N 个节点同时请求它,很可能会瓶颈您的网络带宽。
为了充分利用 Redis,您需要使用 Redis Cluster API 进行适当的扩展。这将确保您的所有计算节点在读取时不会因饥饿而停滞,在写入时不会因阻塞而减速。
在本文中,我们探讨了如何下载、编译和部署 spark-redis,以便将 Redis 用作 Spark DataFrames 的后端。Redis 完全支持 DataFrame API,因此移植现有脚本并开始享受 Redis 带来的速度提升应该非常容易。如果您想了解更多信息,请查看 GitHub 上 spark-redis 的文档。