dot Redis 8 已发布 — 并且它是开源的

了解更多

Redis、Apache Spark 和 Python 入门

Apache Spark 是最流行的创建分布式数据处理管道的框架之一,在本博客中,我们将介绍如何使用 Spark 并将 Redis 作为计算的数据存储库。Spark 的主要特点是,一个管道(Java、Scala、Python 或 R 脚本)既可以在本地(用于开发)运行,也可以在集群上运行,而无需更改任何源代码。

Spark 通过巧妙地使用延迟计算(在某些上下文中称为“惰性”)来实现这种灵活性。一切都始于 RDD、DataFrame 以及更新的 Dataset 类,它们各自是数据的分布式惰性表示。它们使用分布式文件系统、数据库或其他类似服务作为实际存储后端。它们的操作——例如 map/select、filter/where 和 reduce/groupBy——并不会真正执行计算。相反,每个操作都会向执行计划添加一个步骤,该计划最终在需要实际结果时(例如,尝试将其打印到屏幕上时)运行。

在本地启动脚本时,所有计算都在您的机器上进行。或者,当在分布式集群上启动时,您的数据会被分区到不同的节点;相同的操作(大部分)在 Spark 集群内并行发生。

关于 RDD、DataFrame 和 Dataset

随着时间的推移,Spark 开发了三种不同的 API 来处理分布式数据集。虽然每一次新增都比前一个增加了更多功能,但没有任何一个 API 能完全取代之前的 API。按照创建时间顺序(从旧到新),以下是概述:

  • RDD 提供了低级方式来对数据应用编译时类型安全的操作。使用 RDD,您在代码中表达的是“如何”让事情发生,而不是一种更具声明性的方法。
  • DataFrame 引入了一种类似于 SQL 的方法来表达计算(它甚至支持实际的 SQL 查询)。它的声明性语法允许 Spark 构建优化的查询计划,从而通常比 RDD 代码更快。
  • Dataset 是 DataFrame 在 Java 虚拟机 (JVM) 语言方面的改进。它引入了 DataFrame 所缺乏的编译时类型安全,以及大大减少内存使用的优化行表示。由于动态语言(Python、R)的动态特性,它对这些语言没有实际作用,因此对于它们,您仍然会使用 DataFrame(与此同时,它在内部已被重新实现为 Dataset)。

更多详情,请参阅 Jules Damji 的文章“A Tale of Three Apache Spark APIs”。

关于 spark-redis

spark-redis 是一个开源连接器,允许您使用 Redis 存储数据。

使用 Redis 作为后端的三个主要原因:

  • DataFrame/set 和 Redis 特定的 RDD: spark-redis 实现了更通用的接口,以及暴露 Redis 闻名的数据结构的 RDD。这意味着您可以非常轻松地将现有脚本部署到 Redis 上,并在需要完全控制时利用 Redis 特定的功能。
  • Redis 集群: 连接器支持Redis 集群 API,并充分利用分片数据库,包括重新分片和故障转移。将数据放在 Redis 集群中将大大提高性能,因为您的管道将为您的数据启动多个消费者。
  • Redis Streams: Spark Streaming 与新的Redis Streams 数据结构完美匹配。Redis Streams 还使用消费者组,让您可以优雅地调整并行度级别。

在本文中,我们将重点介绍 Python 入门以及如何使用 DataFrame API。撰写本文时,Scala(可视为 Spark 的“原生”语言)可以使用集成的一些更高级功能,例如 Redis RDD 和 Streams。由于 Scala 是一种 JVM 语言,因此 Java 也可以使用这些功能。对于 Python,我们需要坚持使用 DataFrames。

设置

我们的第一步是使用 pip 安装 pyspark。您还需要在机器上安装 Java 8

$ pip install pyspark

接下来,我们需要 Maven 来构建 spark-redis。您可以从官方网站获取它,或使用包管理器(例如 macOS 上的 homebrew)。

从 GitHub 下载 spark-redis(可以通过 git clone 或下载 zip),然后使用 Maven 构建它。

$ cd spark-redis
$ mvn clean package -DskipTests

target/ 子目录中,您将找到编译好的 jar 文件。

如果您还没有运行中的 Redis 服务器,您需要一个来连接。您可以通过多种方式下载它:从官方网站、包管理器(apt-get 或  brew install redis)或Docker Hub(悄悄地说,这可能是尝试 Redis Enterprise 的好时机)。

启动并运行 Redis 服务器后,您可以启动 pyspark。请注意,您需要更改 VERSION 以反映您从 GitHub 下载的版本。

$ pyspark –jars target/spark-redis-VERSION-jar-with-dependencies.jar

如果您的 Redis 服务器在容器中或启用了身份验证,请在之前的调用中添加这些开关(并根据您的情况更改值)。

–conf “spark.redis.host=localhost” –conf “spark.redis.port=6379” –conf “spark.redis.auth=passwd”

使用示例数据集

现在我们有了一个可以在 Redis 上存储数据的可工作的 pyspark shell,让我们来玩玩这个名人数据集

入门

下载 TSV 文件后,让我们将其加载为 Spark DataFrame。

>>> full_df = spark.read.csv("pantheon.tsv", sep="\t", quote="", header=True, inferSchema=True)
>>> full_df.dtypes
[('en_curid', 'int'), ('name', 'string'), ('numlangs', 'int'), ('birthcity', 'string'), ('birthstate', 'string'), ('countryName', 'string'), ('countryCode', 'string'), ('countryCode3', 'string'), ('LAT', 'double'), ('LON', 'double'), ('continentName', 'string'), ('birthyear', 'string'), ('gender', 'string'), ('occupation', 'string'), ('industry', 'string'), ('domain', 'string'), ('TotalPageViews', 'int'), ('L_star', 'double'), ('StdDevPageViews', 'double'), ('PageViewsEnglish', 'int'), ('PageViewsNonEnglish', 'int'), ('AverageViews', 'double'), ('HPI', 'double')]

现在,调用 .dtypes 会显示数据集中的所有列(及相关类型)的列表。这个数据集有很多有趣的东西可以探究,但出于本例的目的,我们重点关注找出每个国家名人中最常见的职业

让我们首先只保留与我们的目标相关的列。

>>> data = full_df.select("en_curid", "countryCode", "occupation")
>>> data.show(2)
+--------+-----------+-----------+
|en_curid|countryCode| occupation|
+--------+-----------+-----------+
|     307|         US| POLITICIAN|
|     308|         GR|PHILOSOPHER|
+--------+-----------+-----------+
only showing top 2 rows

这将创建一个原始 DataFrame 的副本,该副本只包含三列:每个人的唯一 ID、他们的国家和他们的职业。

我们首先下载了一个小数据集用于这篇博客文章,但在现实生活中,如果您使用 Spark,数据集可能会大得多且托管在远程。因此,下一步我们将尝试通过将数据加载到 Redis 来使情况更真实一些

>>> data.write.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").save()

此命令会将我们的数据集加载到 Redis 中。我们指定的两个选项有助于定义数据在 Redis 中的布局,我们现在就来看看。

Redis 上的 DataFrames

让我们 잠깐 进入 redis-cli,看看我们的 DataFrame 在 Redis 上是如何存储的

$ redis-cli
> SCAN 0 MATCH people:* COUNT 3
1) "2048"
2) 1) "people:2113653"
   2) "people:44849"
   3) "people:399280"
   4) "people:101393"

SCAN 显示了我们加载到 Redis 中的一些键。您可以立即看到我们之前给出的选项是如何用来定义键名的

  • “table”, “people” 定义了这个 DataFrame 键的通用前缀,以及
  • “key.column”, “en_curid” 定义了我们 DataFrame 的主键。

让我们看看一个随机键的内容

> HGETALL people:2113653
1) "countryCode"
2) "DE"
3) "occupation"
4) "SOCCER PLAYER"

如您所见,我们的 DataFrame 的每一行都变成了一个Redis Hash,其中包含 countryCodeoccupation。如前所述,en_curid 用作主键,因此它成为键名的一部分。

现在我们已经看到了数据在 Redis 上的存储方式,让我们回到 pyspark,看看如何实际编写一个管道来获取每个国家名人中最常见的职业。

从 Redis DataFrame 执行计算

即使数据应该还加载在内存中,我们还是从 Redis 加载它,以便编写更接近您在实际生活中会做的事情的代码。

>>> df = spark.read.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").load()
>>> df.show(2)
+--------+-----------+----------+
|en_curid|countryCode|occupation|
+--------+-----------+----------+
|  915950|         ZW|   SWIMMER|
|  726159|         UY|POLITICIAN|
+--------+-----------+----------+
only showing top 2 rows

这就是您的 Spark 管道的起点,所以让我们最终进行计算吧!

>>> counts = df.groupby("countryCode", "occupation").agg({"en_curid": "count"})
>>> counts.show(2)
+-----------+-------------+---------------+
|countryCode|   occupation|count(en_curid)|
+-----------+-------------+---------------+
|         FR|MATHEMATICIAN|             34|
|         IT|SOCCER PLAYER|             81|
+-----------+-------------+---------------+
only showing top 2 rows

现在每一行都表示所有存在的(国家,职业)组合的计数。对于下一步,我们需要为每个国家只选择计数最高的职业。

让我们首先导入一些我们需要的新模块,然后使用窗口函数定义选择最常见职业的代码

>>> from pyspark.sql.window import Window
>>> from pyspark.sql.functions import count, col, row_number
>>> w = Window().partitionBy("countryCode").orderBy(col("count(en_curid)").desc())
>>> result = counts.withColumn("rn", row_number().over(w)).where(col("rn") == 1).select("countryCode", "occupation")
>>> result.show(5)
+-----------+-------------+
|countryCode|   occupation|
+-----------+-------------+
|         DZ|   POLITICIAN|
|         LT|   POLITICIAN|
|         MM|   POLITICIAN|
|         CI|SOCCER PLAYER|
|         AZ|   POLITICIAN|
+-----------+-------------+
only showing top 5 rows

这段代码按 countryCode 对原始行进行了分组,按 count(en_curid) 降序排列了每组的内容,并只取了第一个元素。正如您所见,在这个小样本中,政治家似乎是一个非常常见的职业。

让我们看看这对多少个国家是真实的

>>> result.where(col("occupation") == "POLITICIAN").count()
150

哇,考虑到目前世界上有 195 个国家,这数量真不少。现在,我们只需将剩余的国家保存在 Redis 中

>>> no_pol = result.where(col("occupation") != "POLITICIAN")
>>> no_pol.write.format("org.apache.spark.sql.redis").option("table", "occupation").option("key.column", "countryCode").save()

如果您现在进入 redis-cli,您将能够看到新数据

$ redis-cli
> HGETALL occupation:IT
1) "occupation"
2) "RELIGIOUS FIGURE"
> HGETALL occupation:US
1) "occupation"
2) "ACTOR"

如果您想进一步练习,可以检查原始数据集,看看是否能找到其他引起您兴趣的细节。

关于 Spark 数据类型和 Redis 集群 API 的后记

一个非常重要的、值得重申的点是,对 RDD 或 DataFrame/set 对象的每个操作都将在多个节点上分布式执行。如果我们的例子不仅仅是关于名人,我们一开始就会有数千万行数据。在这种情况下,Spark 会扩展计算。但如果您只有一个 Redis 实例到位,您将有 N 个节点同时请求它,很可能会瓶颈您的网络带宽。

为了充分利用 Redis,您需要使用 Redis Cluster API 进行适当的扩展。这将确保您的所有计算节点在读取时不会因饥饿而停滞,在写入时不会因阻塞而减速。

结论

在本文中,我们探讨了如何下载、编译和部署 spark-redis,以便将 Redis 用作 Spark DataFrames 的后端。Redis 完全支持 DataFrame API,因此移植现有脚本并开始享受 Redis 带来的速度提升应该非常容易。如果您想了解更多信息,请查看 GitHub 上 spark-redis 的文档