dot Redis 8 来了——而且是开源的

了解更多

6.6.2 发送文件

返回主页

6.6.2 发送文件

为了将日志数据发送到我们的日志处理器,我们将有两个不同的组件对数据进行操作。 首先是一个脚本,它会将日志文件放入 Redis 中以命名键,使用我们第 6.5.2 节中的群聊方法将键的名称发布到聊天频道,并等待完成通知(以便不使用比我们的 Redis 机器更多的内存)。 它将等待一个通知,表明 Redis 中存储的与文件名称相似的键的值等于 10,这是我们的聚合进程数。 下面的列表显示了复制日志并在完成后清理自身的功能。

列表 6.30 copy_logs_to_redis() 函数
def copy_logs_to_redis(conn, path, channel, count=10,
                     limit=2**30, quit_when_done=True):
   bytes_in_redis = 0
   waiting = deque()
 
   create_chat(conn, 'source', map(str, range(count)), '', channel)

创建将用于向客户端发送消息的聊天。

   count = str(count)
 
   for logfile in sorted(os.listdir(path)):

迭代所有日志文件。

      full_path = os.path.join(path, logfile)

 
      fsize = os.stat(full_path).st_size
 
      while bytes_in_redis + fsize > limit:
         cleaned = _clean(conn, channel, waiting, count)
         if cleaned:
            bytes_in_redis -= cleaned
      else:
            time.sleep(.25)

如果我们需要更多空间,则清理已完成的文件。

   with open(full_path, 'rb') as inp:
      block = ' '
      while block:
         block = inp.read(2**17)
         conn.append(channel+logfile, block)

将文件上传到 Redis。

   send_message(conn, channel, 'source', logfile)

通知侦听器文件已准备就绪。

   bytes_in_redis += fsize
   waiting.append((logfile, fsize))

更新有关 Redis 内存使用情况的本地信息。

if quit_when_done:
   send_message(conn, channel, 'source', ':done')

我们已经没有文件了,因此发出完成的信号。

while waiting:
   cleaned = _clean(conn, channel, waiting, count)
   if cleaned:
      bytes_in_redis -= cleaned
   else:
      time.sleep(.25)

如果我们需要更多空间,则清理已完成的文件。

def _clean(conn, channel, waiting, count):
   if not waiting:
      return 0
   w0 = waiting[0][0]
   if conn.get(channel + w0 + ':done') == count:
      conn.delete(channel + w0, channel + w0 + ':done')
      return waiting.popleft()[1]
   return 0

我们实际上如何从 Redis 执行清理。

 

将日志复制到 Redis 需要许多详细的步骤,主要包括小心不要一次将太多数据放入 Redis 中,并在所有客户端读取文件后正确清理自身。 通知日志处理器有新文件准备就绪的实际方面很容易,但设置、发送和清理非常详细。