在编写连接到数据库的服务器应用程序时,您经常需要处理连接池,如果忽略这个问题太久,会导致麻烦的后果。因此,让我们深入研究这个问题,并探讨连接池如何解决它。
服务器应用程序有一个共同的要求:它们必须响应来自多个客户端的独立请求。一个使用 Redis (或任何其他数据库) 的简单服务器应用程序会为每个请求打开一个新的 Redis 连接。不幸的是,这种方法无法扩展,因为您同时只能打开有限数量的连接,然后一切都会崩溃。打开和关闭连接也很昂贵:对应用程序和对数据库来说都是如此。
好消息是,这个问题完全可以通过更改应用程序的代码来解决。坏消息是,自己这样做并不总是那么容易,而这就是连接池等模式可以发挥作用的地方。
无服务器函数是云产品中相对较新的补充,但在许多方面,它们类似于旧的 CGI 脚本:一段小型代码,每次请求都会被调用。如果每次调用都独立于其他调用,这是否意味着不可能共享连接?不完全是。
通常,当应用程序调用无服务器函数时,该进程会在关闭之前保持活动一段时间,以防有更多请求进来。只要进程保持活动状态,它就可以维护与 Redis 的长期连接,但这取决于您是否正确实现它。
一般来说,创建持久连接很容易,您只需在主函数体之外实例化它。这是一个使用 JavaScript 的 AWS Lambda 的示例:
// Connection info, a URI with this shape:
// [redis[s]:]//[[user][:password@]][host][:port][/db-number][?db=db-number[&password=bar[&option=value]]]
const REDIS_URI = process.env.REDIS_URI;
// Redis client library
const redis = require('redis');
// Client instance
const r = redis.createClient(REDIS_URI);
// Promisify get and set
const { promisify } = require('util');
const setAsync = promisify(r.set).bind(r);
const getAsync = promisify(r.get).bind(r);
// This is the lambda function:
exports.handler = async function(event, context) {
console.log("EVENT: \n" + JSON.stringify(event, null, 2))
// Set a key
await setAsync("mykey", "hello world");
// Return the value to the caller
return getAsync("mykey");
}
请注意,您需要将此脚本上传到一个 Zip 存档中,该存档包含 node_redis,这是一个用于 Node.js 的 Redis 客户端 (AWS 文档 中有更详细的解释)。
相同的概念可以应用于其他语言和函数即服务 (FaaS) 云产品 (Google Cloud Functions, Microsoft Azure Functions, 等等)。
对于 JavaScript,由于 Node.js 的单线程特性,客户端不提供连接池。在上面的示例中,我们尝试在多个请求中重复使用相同的连接,但如果您在 Go 或其他具有多线程并发的语言中使用函数,则客户端也需要一个连接锁定方案,例如连接池。
基本原理很简单:实现连接池的客户端打开 n 个与数据库的连接,然后使用一种机制将连接标记为“可用”或“正在使用”,并且只使用空闲的连接。许多连接池可以替代单个连接,因此调用 .connect() 会从池中提取一个连接 (即将其标记为“正在使用”并返回真实连接给调用者),而 .close() 只会将它放回去 (而不实际关闭它)。
如果您正在使用像 Go 这样的多线程语言,请确保选择支持连接池的客户端。 从这个角度来看,Go-redis 是一个不错的选择,您可以在 文档中阅读。
某些客户端还允许您直接发送命令,而无需先从池中提取连接。虽然方便,但在以这种方式使用池时,需要记住一些事项(更多内容见下文)。
使用无服务器函数,整个应用程序架构非常简单:它只是一个函数。但是,在处理“有服务器”服务时,当涉及到并发时,共享连接变得更加麻烦。
一个简单的套接字连接不能被多个线程直接使用,因为需要一定程度的协调,以避免同时发送多个请求的片段,这将导致接收者无法理解的混合。
在这种情况下,连接池是一种很好的方法,可以使每个子组件看起来像是唯一使用连接的组件,但即使是连接池也不能完全抽象出连接管理的每个细节。
当从池中提取连接时,您的代码必须确保连接最终被放回。连接池对任何时候可以打开的连接数量都设置了上限(请记住,限制连接总数是目标的一部分),因此泄漏连接最终会导致您的服务死锁,因为最后一个 .connect() 会永远挂起,拒绝打开新连接并徒劳地等待现有连接返回到池中。
有时,未设计为长时间运行的代码会合并到更大的项目中并开始泄漏连接。要防止泄漏,您只需确保在不再需要连接时.close() 连接,但在实践中并不总是那么容易实现,尤其是在大型、混乱的项目中。
让我们看看使用连接池并确保在 Python 中进行正确清理的好方法。
为了向您展示一些示例代码,我将使用 aio-redis,这是一个用于 Python 的 Redis 客户端,它支持 asyncio。使用 aio-redis,可以直接使用连接池而无需先提取连接,如下所示:
import asyncio, aioredis
async def main():
pool = await aioredis.create_redis_pool('localhost',
db=0, password=None, ssl=False, minsize=4, maxsize=10, encoding='utf8')
# No need to pluck a single connection
# aioredis will schedule your command to
# a random connection inside the pool.
await pool.set("key", "hello world")
await pool.incrby("counter", 3)
if __name__ == '__main__':
loop = asyncio.main_loop()
loop.run_until_complete(main())
如前所述,这对于简单的用法来说效果很好,但在某些情况下,显式地从池中提取连接是更可取的,尤其是在操作需要很长时间才能完成的情况下,例如在 Streams、Lists、Sorted Sets 或 WAIT 上的阻塞操作中。
虽然 Redis 命令通常非常快,但有些命令被设计为阻塞的,这意味着它们在满足某些条件之前不会返回答案。例如,在 Streams 上进行阻塞读取 (XREAD) 将等待新条目进入流,当与 BLOCK 选项一起使用时(如果没有它,XREAD 会立即返回一个空结果集)。请记住,这些操作会阻塞客户端,而不是服务器。Redis 仍然能够响应通过其他连接发送的命令。
这些类型的命令对于我们之前展示的用法模式来说是一个问题,因为 aio-redis 不知道给定命令将运行多长时间,并且可能决定将新命令排队到忙于阻塞命令的连接。这意味着在之前的示例中,如果还有另一个异步函数使用池来执行阻塞 XREAD,那么我们的 SET 和 INCRBY 命令可能需要非常长的时间才能完成,或者甚至可能超时。
在这些情况下,您需要从池中显式地提取一个连接,并且还要确保在完成后将其返回。Python 使用一种称为上下文管理器的语言特性来帮助完成最后一部分,您可以使用 with 块来访问它。上下文管理器块是围绕必须始终清理的资源(连接、文件描述符)创建的。在块的末尾,无论我们是成功退出还是抛出异常,上下文管理器都会触发适当的清理过程,在我们的例子中,这包括将连接返回到池中,如下所示
import asyncio, aioredis
async def main():
pool = await aioredis.create_redis_pool('localhost',
db=0, password=None, ssl=False, minsize=4, maxsize=10, encoding='utf8')
# Assuming we're not the only ones using the pool
start_other_coroutines_that_use_redis(pool)
# We reserve a connection for a blocking operation
with await pool as conn:
items = await conn.xread(['stream'], latest_ids=['$'], timeout=0)
await process_items(items)
if __name__ == '__main__':
loop = asyncio.main_loop()
loop.run_until_complete(main())
(如果您熟悉上下文管理器和 asyncio,您可能会注意到 with await pool … 部分有点奇怪,因为这通常表示为 async with pool …。这是 aio-redis 实现的一个小怪癖,所以一切仍然按预期工作。您可以在 此处找到有关此问题的更多信息。)
这是另一个特殊情况:MULTI/EXEC 事务。不要误解,事务不是客户端阻塞操作,但它们确实对连接进行了特殊的使用。
当您发送 MULTI 命令时,连接状态会发生变化,Redis 开始将所有命令排队,而不是立即执行它们。当一个命令成功入队(即,它不包含明显的格式错误),Redis 会回复 OK。这意味着连接并非真正被阻塞,但它实际上不能用于多路复用来自程序多个子部分(这些子部分不知道正在发生事务)的命令。
一旦您调用 EXEC 或 DISCARD,整个事务将分别成功或失败,并且连接将返回到正常状态。
因此,许多客户端都有专用对象来表示事务。通常,事务对象甚至不会发送命令,直到您结束事务。这提高了性能,而不会改变语义,因为正如前面提到的,命令只会被 Redis 排队,直到事务最终完成:
import asyncio, aioredis
async def main():
pool = await aioredis.create_redis_pool('localhost',
db=0, password=None, ssl=False, minsize=4, maxsize=10, encoding='utf8')
# Assuming we're not the only ones using the pool
start_other_coroutines_that_use_redis(pool)
# This time we do a transaction
tr = pool.multi_exec()
tr.set("key", "hello")
# `execute` must be awaited, and be careful,
# awaiting the set command would deadlock the program
# as you would never be able to commit the transaction!
await tr.execute()
if __name__ == '__main__':
loop = asyncio.main_loop()
loop.run_until_complete(main())
连接管理是任何服务器端应用程序的重要组成部分,因为它通常是一个敏感的路径,鉴于服务器和客户端之间的一对多关系。当您没有正确管理连接时,无服务器函数中无限可扩展性的承诺可能会导致问题,但幸运的是,该解决方案很容易实现。
对于更复杂的架构,连接池允许您仅在本地(子组件)级别考虑连接管理,但您不能完全放弃连接管理,尤其是在执行特殊使用连接的操作时,例如事务或阻塞操作。