在 github 上的 spark-redis 包 是我们1在 Spark-Redis 之旅中的第一步。Spark 围绕大数据的实时可能性捕获了公众的想象力,我们1希望为将这种可能性变为现实做出贡献。
spark-redis 包是 Apache Spark 的 Redis 连接器,它提供对所有 Redis 核心数据结构 (RcDS) 的读写访问权限,作为 RDD(弹性分布式数据集,不要与 RDB 混淆)。
我认为对新的连接器进行测试并展示其一些功能会很有趣。以下是我的旅程日志。首先……
在实际使用 spark-redis 之前,您需要一些先决条件,即:Apache Spark、Scala、Jedis 和 Redis。虽然该包明确说明了每个部分的版本要求,但我实际上使用了更高版本的版本,没有明显的负面影响(分别是 v1.5.2、v2.11.7、v2.8 和不稳定版本)。
我尝试了很多方法来让它全部正常工作。就在我完成一切后,我的朋友兼 Redis 同行 Tim Spann @PaaSDev 在 @DZone 上发布了一篇关于 “设置独立的 Apache Spark 集群” 的分步指南。如果您像我一样使用 Ubuntu,这应该可以帮助您解决最棘手的部分。
满足所有要求后,您只需 git clone https://github.com/RedisLabs/spark-redis,通过运行 sbt(哦,对了,也安装它)进行构建,或者直接使用来自 spark-packages.org () 的包,您就可以准备好了……但要准备去哪里呢?
这是一个教育性练习,我需要一个可以用高级有向无环图引擎和最快的 NoSQL 数据结构存储解决的问题。经过 广泛的研究,我设法确定了当代数据科学中可能面临的最大挑战——词语计数。由于词语计数挑战是 Spark 核心的事实上的“Hello, World!” 等效项,我选择将其用作我的初步探索的基础,并查看如何将其改编以用于 Redis。
当然,在计数单词时,您首先需要单词。由于我是一个意志坚定的人,我决定计算 Redis 源代码文件中的单词(此提交 尤其),也希望在此过程中揭示一些有趣的数据科学事实。准备就绪后,我从进入 spark-shell 开始
itamar@ubuntu:~/src$ ./spark-1.5.2/bin/spark-shell --jars spark-redis/target/spark-redis-0.5.1.jar,jedis/target/jedis-2.8.0.jar Spark context available as sc. SQL context available as sqlContext. Welcome to ____ __ / __/__ ___ _____/ /__ _ / _ / _ `/ __/ '_/ /___/ .__/_,_/_/ /_/_ version 1.5.2 /_/ Using Scala version 2.11.7 (OpenJDK 64-Bit Server VM, Java 1.7.0_91) Type in expressions to have them evaluated. Type :help for more information. scala>
闪烁的光标意味着世界已经准备好迎接我的第一行 Scala 代码,所以我输入了
scala> val wtext = sc.wholeTextFiles("redis/src/*.[ch]") wtext: org.apache.spark.rdd.RDD[(String, String)] = redis/src/*.[ch] WholeTextFileRDD[0] at wholeTextFiles at :24 scala> wtext.count res0: Long = 100
太棒了!我刚开始就证明了数据科学很有用:正好有 100 个 Redis 源代码文件!当然,从 shell 提示符中执行
ls -1 redis.src/*.[ch] | wc -l会告诉我相同的事情,但通过这种方式,我实际上可以看到我的独立 Spark 集群在 WholeTextFileRDD 上完成作业的各个阶段。的确非常酷。
在成功的鼓励下,我迅速前进,打算将文件内容转换为单词(之后可以进行计数)。与使用 TextFileRDD 的常见示例不同,WholeTextFilesRDD 由文件 URL 及其内容组成,因此事实证明以下代码片段完成了对数据进行拆分和清理所需的工作(对 cache() 方法的调用是严格可选的,但我尝试遵循最佳实践,并预计以后会再次使用该 RDD)。
val fwds = wtext. flatMap{ case (filename, contents) => val fname = filename.substring(filename.lastIndexOf("/") + 1) contents. split("W+"). filter(!_.isEmpty). map( word => (fname, word)) } fwds.cache()
关于变量名的说明:我喜欢有意义且简短的变量名,因此自然地,wtf 表示 WholeTextFiles,fwds 表示 FileWords,等等。
一旦 fwds RDD 包含了干净的文件名,所有单词都整齐地拆分后,我便开始进行一些认真的计数。首先,我重新创建了无处不在的单词计数示例
val wcnts = fwds. map{ case (fname, word) => (word, 1) }. reduceByKey(_ + _). map{ case (word, count) => (word, count.toString) }
将以上内容粘贴到 spark-shell 中,然后跟随 take 确认成功
wcnts: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[5] at map at :31
scala> wcnts.take(10)
res1: Array[(String, String)] = Array((requirepass,15), (mixdigest,2), (propagte,1), (used_cpu_sys,1), (rioFdsetRead,2), (0x3e13,1), (preventing,1), (been,12), (modifies,1), (geoArrayCreate,3))
scala> wcnts.count() res2: Long = 12657
关于结果的说明:take 不应该具有确定性,但鉴于“requirepass”这些天一直在出现,它很可能具有宿命性。此外,12657 肯定有某种含义,但我还没有找到它。
现在到了真正有趣的部分,即 Redis。我想确保结果存储在安全的地方(比如我的非持久化、未绑定、无密码的 Redis 服务器;))以便我可以在以后的计算中使用它们。Redis 的排序集非常适合单词计数对,还可以让我像往常一样查询数据。只需一行 Scala 代码即可实现(实际上是三行,但前两行不算)
import com.redis.provider.redis._ val redisDB = ("127.0.0.1", 6379) sc.toRedisZSET(wcnts, "all:words", redisDB)
在将数据存储在我方便查看的位置后,我启动了 cli 并进行了几个快速读取
itamar@ubuntu:~/src$ ./redis/src/redis-cli
127.0.0.1:6379> DBSIZE
(integer) 1
127.0.0.1:6379> ZCARD all:words
(integer) 12657
127.0.0.1:6379> ZSCORE all:words requirepass
"15"
127.0.0.1:6379> ZREVRANGE all:words 0 4 WITHSCORES
1) "the"
2) "8164"
3) "if"
4) "6657"
5) "0"
6) "5396"
7) "c"
8) "4524"
9) "1"
10) "4293"
127.0.0.1:6379> ZRANGE all:words 6378 6379
1) "mbl"
2) "mblen"
不错!我还能在 Redis 中存储什么?当然是一切。文件名本身就是完美的候选者,所以我创建了另一个 RDD 并将其存储在一个常规集合中
val fnames = fwds.
map{ case (fname, word) => fname }.distinct()
sc.toRedisSET(fnames, "all:files", redisDB)
尽管对科学目的非常有用,但 fnames 集合的内容非常平淡,我想要一些更……的东西。那么,如何将每个文件的单词计数存储在其自己的排序集中呢?通过一些转换/操作/RDD,我能够做到这一点
fwds.
groupByKey.
collect.
foreach{ case(fname, contents) =>
val zsetcontents = contents.
groupBy( word => word ).
map{ case(word, list) => (word, list.size.toString) }.
toArray
sc.toRedisZSET(sc.parallelize(zsetcontents), "file:" + fname, redisDB)
}
回到 redis-cli
127.0.0.1:6379> dbsize
(integer) 102
127.0.0.1:6379> ZREVRANGE file:scripting.c 0 4 WITHSCORES
1) "lua"
2) "366"
3) "the"
4) "341"
5) "if"
6) "227"
7) "1"
8) "217"
9) "0"
10) "197"
我可以整夜跳舞(存储单词计数数据),但如果你只写不读,你最好使用 spark-/dev/null 连接器。因此,为了实际利用 Redis 中的数据,我运行了以下代码,该代码获取了每个文件的单词计数并将它们减少到基本上与经典 WC 挑战的相同输出
val rwcnts = sc.fromRedisKeyPattern(redisDB, "file:*").
getZSet().
map{ case (member, count) => (member, count.toFloat.toInt) }.
reduceByKey(_ + _)
然后回到 spark-shell 测试此代码并获得所有单词的总计
scala> rwcnts.count()
res8: Long = 12657
scala> val total = rwcnts.aggregate(0)(
| (acc, value) => acc + value._2,
| (acc1, acc2) => acc1 + acc2)
total: Int = 27265
为了总结,我不能只相信 Spark 的结果,所以我使用 Lua 脚本来仔细检查
local tot1, tot2, cursor = 0, 0, 0 repeat local rep1 = redis.call('SCAN', cursor, 'MATCH', 'file:*') cursor = tonumber(rep1[1]) for _, ssk in pairs(rep1[2]) do local rep2 = redis.call('ZRANGE', ssk, 0, -1, 'WITHSCORES') for i = 2, #rep2, 2 do tot1 = tot1 + tonumber(rep2[i]) end end until cursor == 0 local rep = redis.call('ZRANGE', 'all:words', 0, -1, 'WITHSCORES') for i = 2, #rep, 2 do tot2 = tot2 + tonumber(rep[i]) end return { tot1, tot2 } itamar@ubuntu:~/src$ ./redis/src/redis-cli --eval /tmp/wordcount.lua 1) (integer) 272655 2) (integer) 272655
在数据量很小的时候,您可以使用简单的 wc -w 来计数单词。随着数据量的增长,我们发现新的方法来抽象解决方案,从而获得灵活性可扩展性。虽然我不是数据科学家(目前),但 Spark 是一款令人兴奋的工具——我一直喜欢 DAG——其核心非常有用。而且这还没有涉及它与 Hadoop 生态系统的集成以及对 SQL、流式处理、图形处理和机器学习的扩展……美味。
Redis 解渴了 Spark 对数据的渴望。spark-redis 允许您仅用一行 Scala 代码将 RDD 和 RcDS 相结合。第一个版本已经提供了对所有核心数据结构的直接 RDD 并行读/写访问权限,以及一种礼貌的(即基于 SCAN 的)方法来获取键名。此外,连接器具有相当大的隐藏威力,因为它实际上(Redis)集群感知且将 RDD 分区映射到哈希槽以减少引擎间混洗。这只是连接器的第一个版本,即将发布另一个版本 Soon(tm),它可能会破坏并更改一些内容,因此请考虑这一点。当然,由于该包是 开源的,所以您可以随意使用/扩展/修复/抱怨它 🙂
1. ‘我们’指的是 Redis 和 Redis 社区的才华横溢的 Sun He @sunheehnus。