dot Redis 8 来了——而且是开源的

了解更多

无服务器函数和后端服务的连接池

在编写连接到数据库的服务器应用程序时,您经常需要处理连接池,如果忽略这个问题太久,会导致麻烦的后果。因此,让我们深入研究这个问题,并探讨连接池如何解决它。

连接过多

服务器应用程序有一个共同的要求:它们必须响应来自多个客户端的独立请求。一个使用 Redis (或任何其他数据库) 的简单服务器应用程序会为每个请求打开一个新的 Redis 连接。不幸的是,这种方法无法扩展,因为您同时只能打开有限数量的连接,然后一切都会崩溃。打开和关闭连接也很昂贵:对应用程序和对数据库来说都是如此。

好消息是,这个问题完全可以通过更改应用程序的代码来解决。坏消息是,自己这样做并不总是那么容易,而这就是连接池等模式可以发挥作用的地方。

无服务器函数

无服务器函数是云产品中相对较新的补充,但在许多方面,它们类似于旧的 CGI 脚本:一段小型代码,每次请求都会被调用。如果每次调用都独立于其他调用,这是否意味着不可能共享连接?不完全是。

通常,当应用程序调用无服务器函数时,该进程会在关闭之前保持活动一段时间,以防有更多请求进来。只要进程保持活动状态,它就可以维护与 Redis 的长期连接,但这取决于您是否正确实现它。

一般来说,创建持久连接很容易,您只需在主函数体之外实例化它。这是一个使用 JavaScript 的 AWS Lambda 的示例:

// Connection info, a URI with this shape:
// [redis[s]:]//[[user][:password@]][host][:port][/db-number][?db=db-number[&password=bar[&option=value]]]
const REDIS_URI = process.env.REDIS_URI;

// Redis client library
const redis = require('redis');

// Client instance
const r = redis.createClient(REDIS_URI);

// Promisify get and set
const { promisify } = require('util');
const setAsync = promisify(r.set).bind(r);
const getAsync = promisify(r.get).bind(r);

// This is the lambda function:
exports.handler =  async function(event, context) {
  console.log("EVENT: \n" + JSON.stringify(event, null, 2))
  
  // Set a key
  await setAsync("mykey", "hello world");
  
  // Return the value to the caller
  return getAsync("mykey");
}

请注意,您需要将此脚本上传到一个 Zip 存档中,该存档包含 node_redis,这是一个用于 Node.js 的 Redis 客户端 (AWS 文档 中有更详细的解释)。

相同的概念可以应用于其他语言和函数即服务 (FaaS) 云产品 (Google Cloud Functions, Microsoft Azure Functions, 等等)。

对于 JavaScript,由于 Node.js 的单线程特性,客户端不提供连接池。在上面的示例中,我们尝试在多个请求中重复使用相同的连接,但如果您在 Go 或其他具有多线程并发的语言中使用函数,则客户端也需要一个连接锁定方案,例如连接池。

连接池如何工作?

基本原理很简单:实现连接池的客户端打开 n 个与数据库的连接,然后使用一种机制将连接标记为“可用”或“正在使用”,并且只使用空闲的连接。许多连接池可以替代单个连接,因此调用 .connect() 会从池中提取一个连接 (即将其标记为“正在使用”并返回真实连接给调用者),而 .close() 只会将它放回去 (而不实际关闭它)。

如果您正在使用像 Go 这样的多线程语言,请确保选择支持连接池的客户端。 从这个角度来看,Go-redis 是一个不错的选择,您可以在 文档中阅读

某些客户端还允许您直接发送命令,而无需先从池中提取连接。虽然方便,但在以这种方式使用池时,需要记住一些事项(更多内容见下文)。

服务

使用无服务器函数,整个应用程序架构非常简单:它只是一个函数。但是,在处理“有服务器”服务时,当涉及到并发时,共享连接变得更加麻烦。

一个简单的套接字连接不能被多个线程直接使用,因为需要一定程度的协调,以避免同时发送多个请求的片段,这将导致接收者无法理解的混合。

在这种情况下,连接池是一种很好的方法,可以使每个子组件看起来像是唯一使用连接的组件,但即使是连接池也不能完全抽象出连接管理的每个细节。

防止连接泄漏

当从池中提取连接时,您的代码必须确保连接最终被放回。连接池对任何时候可以打开的连接数量都设置了上限(请记住,限制连接总数是目标的一部分),因此泄漏连接最终会导致您的服务死锁,因为最后一个 .connect() 会永远挂起,拒绝打开新连接并徒劳地等待现有连接返回到池中。

有时,未设计为长时间运行的代码会合并到更大的项目中并开始泄漏连接。要防止泄漏,您只需确保在不再需要连接时.close() 连接,但在实践中并不总是那么容易实现,尤其是在大型、混乱的项目中。

让我们看看使用连接池并确保在 Python 中进行正确清理的好方法。

一个 Python 示例:aio-redis

为了向您展示一些示例代码,我将使用 aio-redis,这是一个用于 Python 的 Redis 客户端,它支持 asyncio。使用 aio-redis,可以直接使用连接池而无需先提取连接,如下所示:

import asyncio, aioredis

async def main():
   pool = await aioredis.create_redis_pool('localhost',
      db=0, password=None, ssl=False, minsize=4, maxsize=10, encoding='utf8')

   # No need to pluck a single connection
   # aioredis will schedule your command to
   # a random connection inside the pool.
   await pool.set("key", "hello world")
   await pool.incrby("counter", 3)
 
if __name__ == '__main__':
    loop = asyncio.main_loop()
    loop.run_until_complete(main())

如前所述,这对于简单的用法来说效果很好,但在某些情况下,显式地从池中提取连接是更可取的,尤其是在操作需要很长时间才能完成的情况下,例如在 Streams、Lists、Sorted Sets 或 WAIT 上的阻塞操作中。

阻塞操作

虽然 Redis 命令通常非常快,但有些命令被设计为阻塞的,这意味着它们在满足某些条件之前不会返回答案。例如,在 Streams 上进行阻塞读取 (XREAD) 将等待新条目进入流,当与 BLOCK 选项一起使用时(如果没有它,XREAD 会立即返回一个空结果集)。请记住,这些操作会阻塞客户端,而不是服务器。Redis 仍然能够响应通过其他连接发送的命令。

这些类型的命令对于我们之前展示的用法模式来说是一个问题,因为 aio-redis 不知道给定命令将运行多长时间,并且可能决定将新命令排队到忙于阻塞命令的连接。这意味着在之前的示例中,如果还有另一个异步函数使用池来执行阻塞 XREAD,那么我们的 SETINCRBY 命令可能需要非常长的时间才能完成,或者甚至可能超时。

在这些情况下,您需要从池中显式地提取一个连接,并且还要确保在完成后将其返回。Python 使用一种称为上下文管理器的语言特性来帮助完成最后一部分,您可以使用 with 块来访问它。上下文管理器块是围绕必须始终清理的资源(连接、文件描述符)创建的。在块的末尾,无论我们是成功退出还是抛出异常,上下文管理器都会触发适当的清理过程,在我们的例子中,这包括将连接返回到池中,如下所示

import asyncio, aioredis

async def main():
   pool = await aioredis.create_redis_pool('localhost',
      db=0, password=None, ssl=False, minsize=4, maxsize=10, encoding='utf8')
    
   # Assuming we're not the only ones using the pool
   start_other_coroutines_that_use_redis(pool)
   
   # We reserve a connection for a blocking operation
   with await pool as conn:
      items = await conn.xread(['stream'], latest_ids=['$'], timeout=0)
      await process_items(items)
 
if __name__ == '__main__':
    loop = asyncio.main_loop()
    loop.run_until_complete(main())

(如果您熟悉上下文管理器和 asyncio,您可能会注意到 with await pool … 部分有点奇怪,因为这通常表示为 async with pool …。这是 aio-redis 实现的一个小怪癖,所以一切仍然按预期工作。您可以在 此处找到有关此问题的更多信息。)

MULTI/EXEC 事务

这是另一个特殊情况:MULTI/EXEC 事务。不要误解,事务不是客户端阻塞操作,但它们确实对连接进行了特殊的使用。

当您发送 MULTI 命令时,连接状态会发生变化,Redis 开始将所有命令排队,而不是立即执行它们。当一个命令成功入队(即,它不包含明显的格式错误),Redis 会回复 OK。这意味着连接并非真正被阻塞,但它实际上不能用于多路复用来自程序多个子部分(这些子部分不知道正在发生事务)的命令。

一旦您调用 EXECDISCARD,整个事务将分别成功或失败,并且连接将返回到正常状态。

因此,许多客户端都有专用对象来表示事务。通常,事务对象甚至不会发送命令,直到您结束事务。这提高了性能,而不会改变语义,因为正如前面提到的,命令只会被 Redis 排队,直到事务最终完成:

import asyncio, aioredis

async def main():
   pool = await aioredis.create_redis_pool('localhost',
      db=0, password=None, ssl=False, minsize=4, maxsize=10, encoding='utf8')
    
   # Assuming we're not the only ones using the pool
   start_other_coroutines_that_use_redis(pool)
   
   # This time we do a transaction 
   tr = pool.multi_exec()
   tr.set("key", "hello")
   
   # `execute` must be awaited, and be careful, 
   # awaiting the set command would deadlock the program
   # as you would never be able to commit the transaction!
   await tr.execute()
 
if __name__ == '__main__':
    loop = asyncio.main_loop()
    loop.run_until_complete(main())

连接管理不可忽略

连接管理是任何服务器端应用程序的重要组成部分,因为它通常是一个敏感的路径,鉴于服务器和客户端之间的一对多关系。当您没有正确管理连接时,无服务器函数中无限可扩展性的承诺可能会导致问题,但幸运的是,该解决方案很容易实现。

对于更复杂的架构,连接池允许您仅在本地(子组件)级别考虑连接管理,但您不能完全放弃连接管理,尤其是在执行特殊使用连接的操作时,例如事务或阻塞操作。