dot 快速的未来将来到您所在城市的活动中。

加入我们的 Redis 发布会

深入了解 Redis、Apache Spark 和 Python

Apache Spark 是用于创建分布式数据处理流水线的最受欢迎的框架之一,并且在本博文中,我们将说明如何使用 Spark 及 Redis 作为计算的数据存储库。Spark 的主要功能是,一条流水线(Java、Scala、Python 或 R 脚本)可以同时在本地(用于开发)和群集上运行,而无需更改任何源代码。

通过巧妙地使用延迟计算,或者正如在某些情况下所说的那样,使用惰性,Spark 可实现这种灵活性。一切从 RDD、DataFrame 和更新的数据集类开始,它们都是数据的分布式惰性表示。它们使用分布式文件系统、数据库或其他类似服务作为实际的存储后端。它们的操作(例如映射/选择、筛选/其中和缩减/分组)并不会真正让计算发生。相反,每个操作都会向执行计划添加一个步骤,该计划最终会在需要实际结果时运行(例如,在尝试将其打印到屏幕时)。

在本地启动脚本时,所有计算都将在你的机器上进行。交替地,在分布式群集上启动时,你的数据会分区到不同的节点;相同的操作(大部分)会并行发生在 Spark 群集内。

关于 RDD、DataFrame 和数据集

随着时间的推移,Spark 开发出三个不同的 API 来处理分布式数据集。虽然每个新添加都增加了比前一个更多功能,但没有一个 API 可以完全替代前面一个。按照创建顺序(从最早到最新),概述如下

  • RDD 提供了将编译时类型安全操作应用于数据的低级别方法。使用 RDD,你可以在代码中表达“如何”实现某些事情,而不是更具声明性的方法。
  • DataFrame 引入了一种 SQL 类似方法来表达计算(它甚至支持实际的 SQL 查询)。它的声明式语法允许 Spark 构建优化的查询计划,从而与 RDD 相比产生更快的代码。
  • 数据集 是针对 Java 虚拟机 (JVM) 语言对 DataFrame 的改进。它引入了 DataFrame 缺少的编译时类型安全,以及对行进行了优化的表示形式,极大地减少了内存使用量。它并没有真正为动态语言(Python、R)做任何事情,因为它们动态的特性,所以从这些语言你仍将使用 DataFrame(与此同时,它在内部重新实现为一个数据集)。

有关更多详细信息,请查看 Jules Damji 的“三个 Apache Spark API 的故事”。

关于 spark-redis

spark-redis 是一款开源连接器,允许你使用 Redis 来存储数据。

使用 Redis 作为后端的三个主要原因是

  • DataFrame/set 和 Redis 专用 RDD:spark-redis 不仅实现了更多通用用途的接口,还实现了 RDD,它公开了 Redis 以其著称的 数据结构 。这意味着你可以非常容易地在 Redis 上部署现有脚本,并在需要完全控制时使用 Redis 特定功能。
  • Redis 集群:此连接器遵循 Redis 集群 API,充分利用分片数据库,包括重新分片和故障转移。你将数据保存在 Redis 集群中,将会显著提升性能,因为你的管道将为你启动多方消费者以获取数据。
  • Redis 流: Spark 流 与新的 Redis 流 数据结构完美匹配。Redis 流还使用消费者群组,让你可以优雅地调整并行度级别。

在本文中,我们侧重于 Python 入门和 DataFrame API 使用方式。在撰写本文时,可以将 Scala 视为 Spark 的“原生”语言,它可以使用该集成的某些更高级功能,如 Redis RDD 和流。由于 Scala 是 JVM 语言,Java 可以扩展使用这些功能。使用 Python 时,我们需要坚持使用 DataFrame。

设置

我们的第一步是使用 pip 安装 pyspark。你还需要在计算机上安装 Java 8

$ pip install pyspark

接下来,我们需要 Maven 来构建 spark-redis。你可以从 官方网站 或使用包管理器获取它(例如 macOS 上的 homebrew)。

从 GitHub 下载 spark-redis(git clone 或下载为 zip 文件),然后使用 Maven 构建 spark-redis。

$ cd spark-redis
$ mvn clean package -DskipTests

你所在 target/ 子目录中,你将找到已编译的 jar 文件。

如果没有,你需要一台正在运行的 Redis 服务器连接。你可以通过多种方式下载 Redis:从 官方网站,包管理器 (apt-get 或  brew install redis),或 Docker Hub(嘘,现在可能是尝试 Redis Enterprise 的好时机)。

启动并运行之后,您可以启动 pyspark。请注意,您需要更改 VERSION,以反映从 GitHub 下载的版本。

$ pyspark –jars target/spark-redis-VERSION-jar-with-dependencies.jar

如果 Redis 服务器在容器中或已启用身份验证,请将这些开关添加到前面的调用(并更改这些值以适合您的情况)。

–conf “spark.redis.host=localhost” –conf “spark.redis.port=6379” –conf “spark.redis.auth=passwd”

使用示例数据集进行练习

现在,我们拥有能够将数据存储在 Redis 中、工作正常的 pyspark shell,让我们使用 这个著名人物数据集 进行练习。

入门

下载 TSV 文件之后,让我们将其加载为 Spark DataFrame。

>>> full_df = spark.read.csv("pantheon.tsv", sep="\t", quote="", header=True, inferSchema=True)
>>> full_df.dtypes
[('en_curid', 'int'), ('name', 'string'), ('numlangs', 'int'), ('birthcity', 'string'), ('birthstate', 'string'), ('countryName', 'string'), ('countryCode', 'string'), ('countryCode3', 'string'), ('LAT', 'double'), ('LON', 'double'), ('continentName', 'string'), ('birthyear', 'string'), ('gender', 'string'), ('occupation', 'string'), ('industry', 'string'), ('domain', 'string'), ('TotalPageViews', 'int'), ('L_star', 'double'), ('StdDevPageViews', 'double'), ('PageViewsEnglish', 'int'), ('PageViewsNonEnglish', 'int'), ('AverageViews', 'double'), ('HPI', 'double')]

现在,调用 .dtypes 将显示数据集中所有列(及相关类型)的列表。此数据集中有许多可能非常有趣的内容可以调查,但在此示例中,我们重点关注查找每个国家的著名人物中最常见的职业

首先,我们只保留与目标相关的列。

>>> data = full_df.select("en_curid", "countryCode", "occupation")
>>> data.show(2)
+--------+-----------+-----------+
|en_curid|countryCode| occupation|
+--------+-----------+-----------+
|     307|         US| POLITICIAN|
|     308|         GR|PHILOSOPHER|
+--------+-----------+-----------+
only showing top 2 rows

这将创建原始 DataFrame 的副本,该副本仅包含三列:每个人的唯一 ID、国家和职业。

我们首先为本文下载了一个小数据集,但实际上,如果您使用 Spark,数据集可能更大并托管在远程位置。为此,我们尝试在下一步中通过将数据加载到 Redis 使情况更具真实性。

>>> data.write.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").save()

此命令会将我们的数据集 加载到 Redis 中。我们指定的两个选项有助于定义 Redis 中的数据布局,如下所示。

Redis 中的 DataFrame

让我们在 redis-cli 中跳转片刻,看看 DataFrame 在 Redis 中是如何存储的

$ redis-cli
> SCAN 0 MATCH people:* COUNT 3
1) "2048"
2) 1) "people:2113653"
   2) "people:44849"
   3) "people:399280"
   4) "people:101393"

SCAN 向我们展示了加载到 Redis 的一些键。您可以立即看到我们之前给出的选项是如何用于定义键名称的

  • “table”“people” 定义表示此 DataFrame 的键的公共前缀,并且
  • “key.column”“en_curid” 定义 DataFrame 的主键。

让我们看看随机键的内容

> HGETALL people:2113653
1) "countryCode"
2) "DE"
3) "occupation"
4) "SOCCER PLAYER"

如您所见,我们 DataFrame 的每一行都成为一个 Redis 哈希,其中包含 countryCodeoccupation。如前所述,en_curid 用作主键,因此它成为键名称的一部分。

现在我们已经看到数据是如何存储在 Redis 上的,让我们回到 pyspark 并看看我们如何编写一条管道,来获取每个国籍的名人的最常见职业。

从 Redis DataFrame 中执行计算

尽管我们应当仍在内存中加载数据,但让我们从 Redis 中加载数据,以便编写与你在现实生活中会采用的更为相似的代码。

>>> df = spark.read.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").load()
>>> df.show(2)
+--------+-----------+----------+
|en_curid|countryCode|occupation|
+--------+-----------+----------+
|  915950|         ZW|   SWIMMER|
|  726159|         UY|POLITICIAN|
+--------+-----------+----------+
only showing top 2 rows

你的 Spark 管道将从此开始,让我们最终执行计算!

>>> counts = df.groupby("countryCode", "occupation").agg({"en_curid": "count"})
>>> counts.show(2)
+-----------+-------------+---------------+
|countryCode|   occupation|count(en_curid)|
+-----------+-------------+---------------+
|         FR|MATHEMATICIAN|             34|
|         IT|SOCCER PLAYER|             81|
+-----------+-------------+---------------+
only showing top 2 rows

现在每行表示所有现有的(国家,职业)组合的计数。在下一步中,我们需要为每个国家只选择计数最高的职业。

让我们从导入几个我们需要的模块开始,然后再使用 窗口 来定义选择出现频率最高的职业的代码

>>> from pyspark.sql.window import Window
>>> from pyspark.sql.functions import count, col, row_number
>>> w = Window().partitionBy("countryCode").orderBy(col("count(en_curid)").desc())
>>> result = counts.withColumn("rn", row_number().over(w)).where(col("rn") == 1).select("countryCode", "occupation")
>>> result.show(5)
+-----------+-------------+
|countryCode|   occupation|
+-----------+-------------+
|         DZ|   POLITICIAN|
|         LT|   POLITICIAN|
|         MM|   POLITICIAN|
|         CI|SOCCER PLAYER|
|         AZ|   POLITICIAN|
+-----------+-------------+
only showing top 5 rows

此代码按 countryCode 将原始行进行分组,按 count(en_curid)in 从高到低对每个组的内容进行排序,并只取第一个元素。如你所见,在此小型样本内,政治家似乎是一个很常见的职业。

让我们看看有多少个国家是这样

>>> result.where(col("occupation") == "POLITICIAN").count()
150

哇,这是一个很大的数字,考虑到现在世界上有 195 个国家。现在,我们只将剩余国家存储在 Redis 中

>>> no_pol = result.where(col("occupation") != "POLITICIAN")
>>> no_pol.write.format("org.apache.spark.sql.redis").option("table", "occupation").option("key.column", "countryCode").save()

如果你现在进入 redis-cli 中,你将能够看到新数据

$ redis-cli
> HGETALL occupation:IT
1) "occupation"
2) "RELIGIOUS FIGURE"
> HGETALL occupation:US
1) "occupation"
2) "ACTOR"

如果你想了解更多实践,请检查原始数据集,看看你是否能发现其他让你感兴趣的详情。

后记:Spark 数据类型以及 Redis 集群 API

值得重申的一个非常重要的点是,对 RDD 或 DataFrame/set 对象的每个操作都会在多台节点上进行分布。如果我们的示例不仅仅是关于名人,那么开始时我们将拥有数千万行。在那种情况下,Spark 将扩展计算规模。但是,如果你只有一个 Redis 实例,你将有N 个节点不停地攻击它,很可能会使你的网络带宽成为瓶颈。

为了充分利用 Redis,你需要使用 Redis 集群 API,对其进行适当的扩展。这将确保在读取时所有计算节点都不会陷入饥饿,在写入时不会发生阻塞。

结论

在这篇文章中,我们探讨了如何下载、编译和部署 spark-redis,以便使用 Redis 作为 Spark DataFrames 的后端。Redis 完全支持 DataFrame API,因此非常容易移植现有的脚本并开始享受 Redis 提供的更多速度。如果你想了解更多信息,请查看 在 GitHub 上关于 spark-redis 的文档