点 快速的未来正在来到您所在的城市中的某个活动。

在 Redis Released 中加入我们

无服务器函数和后端服务的连接池

在编写连接到数据库的服务器应用程序时,经常会遇到连接池,如果你长期忽视这个问题,它会导致一系列无法解决的后果。因此,让我们深入了解这个问题,并探讨连接池如何解决它。

太多的连接

服务器应用程序有一个共同的要求:它们必须响应来自多个客户端的独立请求。一个拙劣的服务器应用程序在使用 Redis(或任何其他数据库)时,会为每个请求打开一个新的 Redis 连接。遗憾的是,这种方法并不能扩展,因为你只能同时打开有限数量的连接,否则一切都将崩溃。打开和关闭连接也并不便宜:无论对应用程序还是数据库来说都是如此。 

好消息是,这个问题可以通过更改应用程序代码彻底解决。坏消息是你自己做起来并不总那么容易,这就是连接池等模式可以帮助解决的地方。

无服务器函数

无服务器函数是云服务相对较新的功能,但在很多方面它们类似于旧的 CGI 脚本:每请求调用一次的小代码段。如果每个调用都独立于所有其他调用,这是否意味着不可能共享连接?并不完全如此。

通常,当应用程序调用无服务器函数时,该进程在关闭之前会保持活动一段时间,以防收到更多请求。只要进程保持活动状态,它就可以与 Redis 保持长时间连接,但要正确实现,就靠你了。

通常,创建一个持久连接很容易,你只需要在主函数主体之外实例化它即可。这里有个使用 JavaScript 在 AWS Lambda 中的示例:

// Connection info, a URI with this shape:
// [redis[s]:]//[[user][:password@]][host][:port][/db-number][?db=db-number[&password=bar[&option=value]]]
const REDIS_URI = process.env.REDIS_URI;

// Redis client library
const redis = require('redis');

// Client instance
const r = redis.createClient(REDIS_URI);

// Promisify get and set
const { promisify } = require('util');
const setAsync = promisify(r.set).bind(r);
const getAsync = promisify(r.get).bind(r);

// This is the lambda function:
exports.handler =  async function(event, context) {
  console.log("EVENT: \n" + JSON.stringify(event, null, 2))
  
  // Set a key
  await setAsync("mykey", "hello world");
  
  // Return the value to the caller
  return getAsync("mykey");
}

请注意,你需要将此脚本上传到包含 node_redis 的 Zip 存档中,node_redis 是 Node.js 的 Redis 客户端(AWS 文档对此有更详细的解释)。

同样的概念可以应用于其他语言和功能即服务 (FaaS) 云服务(Google Cloud FunctionsMicrosoft Azure Functions,依此类推)。

对于 JavaScript,客户端未提供连接池,因为 Node.js 是单线程的。在上面的示例中,我们试图在多个请求之间重复使用同一个连接,但如果您打算在 Go 或具有多线程并发性的其他语言中使用函数,客户端还需要一个连接锁定方案,例如连接池。

连接池的工作原理是什么?

基本原理很简单:实现了连接池的客户端会打开n个连接到数据库的连接,然后会有一种机制将连接标记为“可用”或“正在使用”,并且仅使用空闲的连接。许多连接池可以作为单一实例连接的原位替换,所以调用 .connect() 会从池中挑选一个连接(即将其标记为“正在使用”,并对调用方返回实际的连接),而 .close() 会简单地将其放回去(实际上不会关闭它)。

如果您正在使用像 Go 这样的多线程语言,请务必选择支持连接池的客户端。 Go-redis从这个角度来看是一个不错的选择,您可以在文档中进行阅读。

有些客户端还允许您发送命令而不必先从池中挑选一个连接。虽然方便,但在使用这种方式的池时需要注意一些事项(下面会详细介绍)。

服务

使用无服务器函数时,应用程序的整个架构都非常简单:它只是一个函数。但是在处理有“服务器”的服务时,在涉及并发性时,共享连接会变得更加繁琐。

一个简单的套接字连接不能被多个线程直接使用,因为需要一定程度的协调,以避免同时发送多个请求的比特和块,这会导致接收方难以理解信息。

在这种情况中,连接池是一个很好的方式,可以让每个子组件看起来像是唯一正在使用连接,但即使是连接池也无法完全抽象出连接管理的每个细节。

防止连接泄漏

当从池中挑选一个连接时,您的代码必须确保该连接最终会被放回去。连接池对可以在任何时间打开的连接数量实施上限(请记住,限制总连接数量是目标的一部分),所以连接泄漏最终会使您的服务陷入僵局,这时最后一个 .connect() 会一直挂起,拒绝打开一个新连接,徒劳地等待现有的连接返回池中。

偶尔,本来并不设计的持久运行的代码会被合并到一个更大项目中,并开始泄漏连接。为了防止泄漏,您只需要确保在不再需要时.close() 连接,但在实际应用中,尤其是在大型、杂乱的项目中,这并不总是很容易实现。

让我们看看在 Python 中使用连接池并确保正确清理的好方法。

Python 示例:aio-redis

为了向您展示一些示例代码,我将使用 aio-redis,这是支持 asyncio 的 Python Redis 客户端

import asyncio, aioredis

async def main():
   pool = await aioredis.create_redis_pool('localhost',
      db=0, password=None, ssl=False, minsize=4, maxsize=10, encoding='utf8')

   # No need to pluck a single connection
   # aioredis will schedule your command to
   # a random connection inside the pool.
   await pool.set("key", "hello world")
   await pool.incrby("counter", 3)
 
if __name__ == '__main__':
    loop = asyncio.main_loop()
    loop.run_until_complete(main())

如前所述,这对简单用法来说很好,但在某些情况下最好明确从池中提取一个连接,尤其是在某个操作需要很长时间才能完成时,例如对流、列表、有序集合或 WAIT 的阻塞操作。

阻塞操作

尽管 Redis 命令往往非常快,但某些命令被设计为阻塞命令,这意味着在满足某些条件之前,它们不会返回答案。例如,在使用 BLOCK 选项时,流上的阻塞读取(XREAD)将等待新条目进入流(如果没有该选项,XREAD 将立即返回一个空结果集)。请记住,这些操作会阻塞客户端,而不是服务器。Redis 仍然能够响应通过其他连接发送的命令。

对于我们之前展示的用法模式来说,这些类型的命令存在问题,因为 aio-redis 无法知道某个给定命令将运行多长时间,并且可能会决定将一个新命令排入队列,而该队列正在使用一个繁忙的阻塞命令。这意味着,在前面的示例中,如果有另一个 async 函数使用该池来执行一个阻塞 XREAD,我们的 SETINCRBY 命令可能需要非常长的时间才能完成,甚至可能超时。

在这些情况下,您需要明确从池中提取一个连接,并在完成后将其放回。Python 通过一个称为上下文管理器的语言功能来帮助完成最后一部分,您可以使用 with 代码块来访问该功能。上下文管理器代码块围绕一个必须始终清理的资源(连接、文件描述符)创建。在代码块的末尾,无论我们是成功退出还是抛出异常,上下文管理器都会触发适当的清理程序,在我们的例子中,它包括将连接放回池中,如下所示

import asyncio, aioredis

async def main():
   pool = await aioredis.create_redis_pool('localhost',
      db=0, password=None, ssl=False, minsize=4, maxsize=10, encoding='utf8')
    
   # Assuming we're not the only ones using the pool
   start_other_coroutines_that_use_redis(pool)
   
   # We reserve a connection for a blocking operation
   with await pool as conn:
      items = await conn.xread(['stream'], latest_ids=['$'], timeout=0)
      await process_items(items)
 
if __name__ == '__main__':
    loop = asyncio.main_loop()
    loop.run_until_complete(main())

(如果你熟悉上下文管理器和异步,你可能会注意到,with await pool ... 部分有点奇怪,因为这通常表示为 async with pool ... 。这是 aio-redis 实现的一个小怪癖,所以一切都仍按预期工作。你可以在这里找到更多关于此问题的信息。)

MULTI/EXEC 事务

这里有另一个特例:MULTI/EXEC 事务。不要搞错,事务不是客户端阻止操作,但它们确实对连接进行了特殊使用。

当你发送 MULTI 时,连接状态改变,Redis 开始将所有命令排队,而不是立即执行它们。当一个命令成功加入队列中(即,它不包含明显的格式错误),Redis 以 OK 回应。这意味着连接没有被实际阻塞,但它不能真正用于多路复用来自不知道正在进行事务的程序多个子部分的命令。

一旦你调用 EXECDISCARD,整个事务将分别成功或失败,而且连接将恢复到正常状态。

出于这个原因,许多客户端都有表示事务的专用对象。通常,事务对象甚至在你结束事务之前都不会发送命令。这改善了性能,而没有改变语义,因为如前所述,直到事务最终完成之前 Redis 才将命令加入队列: 

import asyncio, aioredis

async def main():
   pool = await aioredis.create_redis_pool('localhost',
      db=0, password=None, ssl=False, minsize=4, maxsize=10, encoding='utf8')
    
   # Assuming we're not the only ones using the pool
   start_other_coroutines_that_use_redis(pool)
   
   # This time we do a transaction 
   tr = pool.multi_exec()
   tr.set("key", "hello")
   
   # `execute` must be awaited, and be careful, 
   # awaiting the set command would deadlock the program
   # as you would never be able to commit the transaction!
   await tr.execute()
 
if __name__ == '__main__':
    loop = asyncio.main_loop()
    loop.run_until_complete(main())

不能忽视连接管理

连接管理是任何服务器端应用程序的重要部分,因为它通常是一个敏感路径,因为服务器与客户端之间存在多对一的关系。当你不恰当管理连接时,无服务器函数中无限可扩展性的承诺可能会导致问题,但幸运的是,解决方案易于实现。

对于更复杂的架构,连接池允许你在本地(子组件)级别考虑连接管理,但是你不能完全放弃连接管理,特别是当执行对连接进行特殊使用的操作(例如事务或阻塞操作)时。