为了将日志数据发送到我们的日志处理器,我们将有两个不同的组件对数据进行操作。 首先是一个脚本,它会将日志文件放入 Redis 中以命名键,使用我们第 6.5.2 节中的群聊方法将键的名称发布到聊天频道,并等待完成通知(以便不使用比我们的 Redis 机器更多的内存)。 它将等待一个通知,表明 Redis 中存储的与文件名称相似的键的值等于 10,这是我们的聚合进程数。 下面的列表显示了复制日志并在完成后清理自身的功能。
def copy_logs_to_redis(conn, path, channel, count=10, limit=2**30, quit_when_done=True): bytes_in_redis = 0 waiting = deque()
create_chat(conn, 'source', map(str, range(count)), '', channel)
创建将用于向客户端发送消息的聊天。
count = str(count)
for logfile in sorted(os.listdir(path)):
迭代所有日志文件。
full_path = os.path.join(path, logfile)
fsize = os.stat(full_path).st_size
while bytes_in_redis + fsize > limit: cleaned = _clean(conn, channel, waiting, count) if cleaned: bytes_in_redis -= cleaned else: time.sleep(.25)
如果我们需要更多空间,则清理已完成的文件。
with open(full_path, 'rb') as inp: block = ' ' while block: block = inp.read(2**17) conn.append(channel+logfile, block)
将文件上传到 Redis。
send_message(conn, channel, 'source', logfile)
通知侦听器文件已准备就绪。
bytes_in_redis += fsize waiting.append((logfile, fsize))
更新有关 Redis 内存使用情况的本地信息。
if quit_when_done: send_message(conn, channel, 'source', ':done')
我们已经没有文件了,因此发出完成的信号。
while waiting: cleaned = _clean(conn, channel, waiting, count) if cleaned: bytes_in_redis -= cleaned else: time.sleep(.25)
如果我们需要更多空间,则清理已完成的文件。
def _clean(conn, channel, waiting, count): if not waiting: return 0 w0 = waiting[0][0] if conn.get(channel + w0 + ':done') == count: conn.delete(channel + w0, channel + w0 + ':done') return waiting.popleft()[1] return 0
我们实际上如何从 Redis 执行清理。
将日志复制到 Redis 需要许多详细的步骤,主要包括小心不要一次将太多数据放入 Redis 中,并在所有客户端读取文件后正确清理自身。 通知日志处理器有新文件准备就绪的实际方面很容易,但设置、发送和清理非常详细。