如何使用 Redis 实现快速数据摄取
以下是一些用 Java 编写的代码片段。它们都使用 Jedis 库。首先,按照 Jedis 的入门页面上的说明下载最新版本的 Jedis。
1. 使用 Redis Streams 进行快速数据摄取
A. 将消息发布到流数据结构。此程序使用 XADD 将新项目添加到流中。文件名:StreamPublish.java。
import java.util.HashMap;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
public class StreamPublish {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {
try {
Map & lt;
String, String > kv = new HashMap & lt;
String, String > ();
kv.put(“a”, “100”); // key -> a; value -> 100
jedis.xadd(“MyStream”, StreamEntryID.NEW_ENTRY, kv);
} finally {
jedis.close();
}
}
}
B. 异步从流中消费数据。如果流为空,请等待消息。此程序使用 XREAD 命令。文件名:StreamConsumeAsync.java。
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
public class StreamConsumeAsync {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {
// Start from 0. For subsequent queries, read from last id + 1
String lastStreamDataId = “0-0”;
int count = 1000;
long waitTimeInMillis = 5000;
try {
// Read new data asynchronously in a loop
while(true) {
List next = getNext(“MyStream”, lastStreamDataId,
count, waitTimeInMillis);
if (next != null) {
List<StreamEntry> stList = getStreamEntries(next);
if (stList != null) {
// Process data here
for(int j=0; j<stList.size(); j++) {
StreamEntry streamData = (StreamEntry)stList.get(j); // Read the fields (key-value pairs) of data stream
Map<String, String> fields = streamData.getFields(); // Read subsequent data from last id + 1
lastStreamDataId = streamData.getID().getTime()
+”-”
+(streamData.getID().getSequence()+1); System.out.println(stList.get(j));
System.out.println(lastStreamDataId);
}
} else {
System.out.println(“No new data in the stream”);
}
}
}
} finally {
jedis.close();
}
}
// Read the next set of data from the stream
private static List getNext(String streamId, String lastId, int count, long waitTimeInMillis) throws Exception {
HashMap<String, StreamEntryID> map = new HashMap();
String readFrom = lastId;
map.put(streamId, new StreamEntryID(readFrom));
List list = jedis.xread(count, waitTimeInMillis,
(Entry<String, StreamEntryID>)
map.entrySet().toArray()[0]);
return list;
}
// Read stream entries
// Assumes streamList has only one stream
private static List<StreamEntry> getStreamEntries(List streamList) throws Exception {
if (streamList.size()>0) {
SimpleEntry stEntry = (SimpleEntry)streamList.get(0);
return (List<StreamEntry>) stEntry.getValue();
}
return null;
}
}
C. 使用 XRANGE 命令查询流。文件名:StreamQuery.java
import java.util.List;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
public class StreamQuery {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {
String streamID = “MyStream”;
StreamEntryID start = new StreamEntryID(0, 0);
StreamEntryID end = null; // null -> until the last item in the stream
int count = 2;
try {
List & lt;
StreamEntry > stList = jedis.xrange(streamID, start, end, count);
if (stList != null) {
// Process data here
for (int j = 0; j & lt; stList.size(); j++) {
StreamEntry streamData = (StreamEntry) stList.get(j);
System.out.println(streamData); // Read the fields (key-value pairs) of data stream
Map & lt;
String, String > fields = streamData.getFields(); // Read subsequent data from last id + 1
StreamEntryID nextStart =
new StreamEntryID(streamData.getID().getTime(),
(streamData.getID().getSequence() + 1));
}
} else {
System.out.println(“No new data in the stream”);
}
} finally {
jedis.close();
}
}
}
2. 使用发布/订阅进行快速数据摄取
A. 发布到通道。文件名:PubSubPublish.java
import redis.clients.jedis.Jedis;
public class PubSubPublish {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {
try {
String channel = “MyChannel”;
String message = “Hello there!”;
jedis.publish(channel, message);
} finally {
jedis.close();
}
}
}
B. 订阅通道。文件名:PubSubPublish.java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class PubSubSubscribe extends JedisPubSub {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {
try {
PubSubSubscribe mySubscriber = new PubSubSubscribe();
String channel = “MyChannel”;
jedis.subscribe(mySubscriber, channel);
} finally {
jedis.close();
}
} // Receive messages
@Override
public void onMessage(String channel, String message) {
System.out.println(message);
}
}
3. 使用列表进行快速数据摄取
A. 将数据推送到列表。文件名:ListPush.java
import redis.clients.jedis.Jedis;
public class ListPush {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {
try {
String list = “MyList”;
String message = “Hello there!”;
jedis.lpush(list, message);
} finally {
jedis.close();
}
}
}
B. 从列表中弹出数据。文件名:ListPop.java
import redis.clients.jedis.Jedis;
public class ListPop {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {
try {
String list = “MyList”;
String message = jedis.rpop(list);
System.out.println(message);
} finally {
jedis.close();
}
}
}