dot Redis 8 来了——而且是开源的

了解更多

连接 Spark 和 Redis:详细了解

Connecting Spark and Redis

github 上的 spark-redis 包 是我们在 Spark-Redis 之旅中的第一步1。 Spark 已经抓住了公众对于大数据实时可能性的想象,我们1 希望能够为实现这一可能性做出贡献。

spark-redis 包是 Apache Spark 的 Redis 连接器,它提供对 Redis 所有核心数据结构 (RcDS) 的读写访问,作为 RDD(弹性分布式数据集,不要与 RDB 混淆)。

我认为对新的连接器进行测试运行并展示它的一些功能会很有趣。 以下是我的旅程日志。 首先要做的事情……

设置

在实际使用 spark-redis 之前,您需要一些先决条件,即:Apache SparkScalaJedisRedis。 虽然该软件包明确说明了每个组件的版本要求,但我实际上使用了更高版本,没有明显的副作用(分别为 v1.5.2、v2.11.7、v2.8 和不稳定版)。

我尝试让所有这些工作花费了很多时间。 在我完成所有设置之后,我的朋友和 Redis 同行 Tim Spann @PaaSDev“设置独立的 Apache Spark 集群” 上发布了一步一步的教程,在 @DZone 上。 如果您像我一样使用 Ubuntu,这应该可以帮助您解决最棘手的部分。

满足所有要求后,您可以直接 git clone https://github.com/RedisLabs/spark-redis,通过运行 sbt 构建它(哦,是的,也安装它),或者直接使用来自 spark-packages.org () 的软件包,您应该可以开始了……但到底要去哪里呢?

任务说明

由于这是一个教育练习,我需要一个可以用高级有向无环图引擎和最快的 NoSQL 数据结构存储解决的问题。 经过 广泛研究 后,我设法确定了当代数据科学中可能存在的最大挑战 - 单词计数。 由于单词计数挑战是 Spark 核心中事实上的“Hello, World!” 等价物,因此我选择使用它作为初步探索的基础,并了解如何将其调整为与 Redis 一起使用。

读取数据

当然,计数单词时首先需要的是单词。 作为一心一意的人,我决定计算 Redis 源代码文件中的单词(这个特定的提交),也希望在此过程中揭示一些有趣的数据科学事实。 一切准备就绪后,我开始进入 spark-shell

itamar@ubuntu:~/src$ ./spark-1.5.2/bin/spark-shell --jars spark-redis/target/spark-redis-0.5.1.jar,jedis/target/jedis-2.8.0.jar
Spark context available as sc.
SQL context available as sqlContext.
Welcome to
____              __
/ __/__  ___ _____/ /__
_ / _ / _ `/ __/  '_/
/___/ .__/_,_/_/ /_/_   version 1.5.2
/_/
Using Scala version 2.11.7 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
scala>

闪烁的光标意味着世界已经为我的第一行 Scala 做好了准备,所以我输入了

scala> val wtext = sc.wholeTextFiles("redis/src/*.[ch]")
wtext: org.apache.spark.rdd.RDD[(String, String)] = redis/src/*.[ch]
WholeTextFileRDD[0] at wholeTextFiles at :24
scala> wtext.count
res0: Long = 100

太棒了! 我几乎才刚开始,数据科学就证明是有用的:正好有 100 个 Redis 源文件! 当然,从 shell 提示符执行

ls -1 redis.src/*.[ch] | wc -l
可能会告诉我同样的事情,但通过这种方式,我实际上可以看到我的独立 Spark 集群在 WholeTextFileRDD 上完成的作业的各个阶段。 真的很酷。

在我的成功鼓励下,我赶紧将文件的内容转换为单词(稍后可以计数)。 与使用 TextFileRDD 的常见示例不同,WholeTextFilesRDD 由文件 URL 及其内容组成,因此事实证明以下代码段完成了拆分和清理数据所需的工作(对 cache() 方法的调用是严格可选的,但我尝试遵循最佳实践,并希望稍后再次使用该 RDD)。

val fwds = wtext.
flatMap{ case (filename, contents) =>
val fname = filename.substring(filename.lastIndexOf("/") + 1)
contents.
split("W+").
filter(!_.isEmpty).
map( word => (fname, word))
}
fwds.cache()

关于变量名:我喜欢它们有意义且简短,因此自然地,wtf 表示 WholeTextFiles,fwds 表示 FileWords,依此类推。

一旦 fwds RDD 有了干净的文件名并且所有单词都被整齐地拆分,我就开始进行一些严肃的计数了。 首先,我重新创建了无处不在的单词计数示例

val wcnts = fwds.
map{ case (fname, word) => (word, 1) }.
reduceByKey(_ + _).
map{ case (word, count) => (word, count.toString) }

将以上内容粘贴到 spark-shell 中,然后 take 确认成功

wcnts: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[5] at map at :31

 

scala> wcnts.take(10)
res1: Array[(String, String)] = Array((requirepass,15), (mixdigest,2), (propagte,1), (used_cpu_sys,1), (rioFdsetRead,2), (0x3e13,1), (preventing,1), (been,12), (modifies,1), (geoArrayCreate,3))
scala> wcnts.count()
res2: Long = 12657

 

关于结果的说明:take 不应该是确定性的,但考虑到“requirepass”最近不断浮出水面,这可能是致命的。 此外,12657 必须有一些意义,但我还没有找到。

将 RDD 写入 Redis

现在是最有趣的部分,即 Redis。 我想确保结果存储在安全的地方(就像我的非持久化、未绑定、无密码的 Redis 服务器;))这样我就可以在以后的计算中使用它们。 Redis 的排序集合与单词计数对完美匹配,并且还可以让我像我习惯的那样查询数据。 只需一行 Scala 代码即可完成此操作(实际上是三行,但前两行不算在内)

import com.redis.provider.redis._
val redisDB = ("127.0.0.1", 6379)
sc.toRedisZSET(wcnts, "all:words", redisDB)

在我可以方便地检查它的数据就绪后,我启动了 cli 并做了一些快速读取

itamar@ubuntu:~/src$ ./redis/src/redis-cli
127.0.0.1:6379> DBSIZE
(integer) 1
127.0.0.1:6379> ZCARD all:words
(integer) 12657
127.0.0.1:6379> ZSCORE all:words requirepass
"15"
127.0.0.1:6379> ZREVRANGE all:words 0 4 WITHSCORES
1) "the"
2) "8164"
3) "if"
4) "6657"
5) "0"
6) "5396"
7) "c"
8) "4524"
9) "1"
10) "4293"
127.0.0.1:6379> ZRANGE all:words 6378 6379
1) "mbl"
2) "mblen"

不错! 我还能在 Redis 中保存什么? 当然,一切。 文件名本身是完美的候选对象,因此我制作了另一个 RDD 并将其存储在常规集合中

val fnames = fwds.
map{ case (fname, word) => fname }.distinct()
sc.toRedisSET(fnames, "all:files", redisDB)

尽管 contents fnames Set 对于科学目的非常有用,但它非常平凡,我想要更多的东西……那么将每个文件的单词计数存储在其自己的排序集合中怎么样? 通过一些转换/操作/RDD,我能够做到这一点

fwds.
groupByKey.
collect.
foreach{ case(fname, contents) =>
val zsetcontents = contents.
groupBy( word => word ).
map{ case(word, list) => (word, list.size.toString) }.
toArray
sc.toRedisZSET(sc.parallelize(zsetcontents), "file:" + fname, redisDB)
}

返回 redis-cli

127.0.0.1:6379> dbsize
(integer) 102
127.0.0.1:6379> ZREVRANGE file:scripting.c 0 4 WITHSCORES
1) "lua"
2) "366"
3) "the"
4) "341"
5) "if"
6) "227"
7) "1"
8) "217"
9) "0"
10) "197"

从 Redis 读取 RDD

我可以整晚跳舞(存储单词计数数据),但如果您只写而不读,那么最好使用 spark-/dev/null 连接器。 因此,为了实际使用 Redis 中的数据,我运行了以下代码,该代码获取每个文件的单词计数并将其简化为与经典 WC 挑战基本相同的输出

val rwcnts = sc.fromRedisKeyPattern(redisDB, "file:*").
getZSet().
map{ case (member, count) => (member, count.toFloat.toInt) }.
reduceByKey(_ + _)

然后返回 spark-shell 来测试此代码并获得所有单词的总数

scala> rwcnts.count()
res8: Long = 12657
scala> val total = rwcnts.aggregate(0)(
| (acc, value) => acc + value._2,
| (acc1, acc2) => acc1 + acc2)
total: Int = 27265

为了总结,我不能仅仅认为 Spark 的结果是理所当然的,所以我使用 Lua 脚本进行了仔细检查

local tot1, tot2, cursor = 0, 0, 0
repeat
local rep1 = redis.call('SCAN', cursor, 'MATCH', 'file:*')
cursor = tonumber(rep1[1])
for _, ssk in pairs(rep1[2]) do
local rep2 = redis.call('ZRANGE', ssk, 0, -1, 'WITHSCORES')
for i = 2, #rep2, 2 do
tot1 = tot1 + tonumber(rep2[i])
end
end
until cursor == 0
local rep = redis.call('ZRANGE', 'all:words', 0, -1, 'WITHSCORES')
for i = 2, #rep, 2 do
tot2 = tot2 + tonumber(rep[i])
end
return { tot1, tot2 } itamar@ubuntu:~/src$ ./redis/src/redis-cli --eval /tmp/wordcount.lua 1) (integer) 272655 2) (integer) 272655

结束语

Good dags. D'ya like dags?

早些时候,当数据量很小时,您可以使用简单的 wc -w 来计数单词。 随着数据增长,我们找到了抽象解决方案的新方法,并反过来获得了灵活性和可扩展性。 虽然我还不是数据科学家(尚未),但 Spark 是一种令人兴奋的工具 - 我一直喜欢 DAG - 并且其核心非常有用。 即使没有将其与 Hadoop 生态系统的集成以及 SQL、流处理、图形处理和机器学习的扩展……仍然非常有用。

Redis 满足了 Spark 对数据的渴望。 spark-redis 允许您仅使用一行 Scala 代码即可将 RDD 和 RcDS 结合起来。 第一个版本已经提供了直接的 RDD 并行读/写访问所有核心数据结构,以及一种礼貌的(即基于 SCAN 的)方式来获取键名。 此外,该连接器具有相当大的隐藏能力,因为它实际上是(Redis)集群感知的,并将 RDD 分区映射到哈希槽以减少引擎间的混洗。 这只是连接器的第一个版本,很快就会发布另一个版本(可能很快会发布),这可能会破坏和更改一些内容,因此请考虑到这一点。 当然,由于该软件包是 开源的,因此非常欢迎您使用/扩展/修复/抱怨它 🙂

1. ‘我们’作为 Redis 以及 Redis 社区才华横溢的 Sun He @sunheehnus