抑郁症健康,内容丰富有趣,生活中的好帮手!
抑郁症健康 > rabbitmq取消自动重连_rabbitmq客户端自动重连

rabbitmq取消自动重连_rabbitmq客户端自动重连

时间:2024-04-12 00:09:07

相关推荐

编程rookie, 如有错误请指出 ☞:

253065903@

RabbitMQ

Node.js 客户端( AMQP 0-9-1 V0.5.2

)自动重连

重启策略

开始找解决方案:

通过查看AMQP的源码,发现没有reconnect的选项

然后上GitHub上看有没有人提出类似的问题 github repo

,通过输入 reconnect

搜索issue区找到题为 Support Auto-reconnection

的 issue

,发现这个问题是早在

年就提出来的,可是现在还是 open

的状态!

在这个Issue区发现有一个解决方案还是可以实践一下的:

function connectRMQ() {

amqp.connect(config.rabbitmq.URI).then(function(conn) {

conn.on('close', function() {

console.error('Lost connection to RMQ. Reconnecting in 60 seconds...');

return setTimeout(connectRMQ, 60 * 1000);

});

return conn.createChannel().then(function(ch) {

var ok = ch.assertQueue(config.rabbitmq.queue, {durable: true});

ok = ok.then(function() {

ch.prefetch(1);

ch.consume(config.rabbitmq.queue, doWork, {noAck: false});

});

return ok.then(function() {

console.log(" [*] Waiting in %s.", config.rabbitmq.queue);

});

function doWork(msg) {

var body = msg.content.toString();

console.log(" [x] Received %s", body);

setTimeout(function() {

ch.ack(msg);

}, config.rabbitmq.timeout);

}

});

}).then(null, function() {

setTimeout(connectRMQ, 10 * 1000);

return console.log('connection failed');

});

}

connectRMQ();

上述的解决方案是在建立连接之后对连接添加 close

的监听事件,当 close

事件触发,

或者连接出错以及之后的步骤出错都在10s之后重新调用方法本身实现重连

还有一种简单粗暴的方法,监听 close

、 error

事件, 有错误就抛出来,然后依靠外部的守护程序将此客户端重启

实践

按照Issue区发现的解决方案进行实践,修改之前的代码

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')

const REDIS_CONFIG = require('./conf/redis')

const Utils = require('./lib/Utils')

const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)

var amqp = require('amqplib')

var ex = MQ_CONFIG.ex

var patten = MQ_CONFIG.routing_key

var exType = MQ_CONFIG.ex_type

var q = MQ_CONFIG.q || 'signals'

var cnt = 0

function connect() {

amqp

.connect(MQ_CONFIG.url)

.then(conn => {

conn.on('close', e => {

reconnect(e)

})

return conn

})

.then(conn => {

cnt = 0

log('connect RMQ OK')

console.log(' [*] Waiting for signals. To exit press CTRL+C')

return conn.createChannel()

})

.then(ch => {

return ch.assertQueue(q, {

durable: true

})

.then(() => {

return ch.assertExchange(ex, exType, {

durable: true

})

})

.then(() => {

return ch.assertQueue(q, {

durable: true

})

})

.then(() => {

return ch.bindQueue(q, ex, patten)

})

.then(() => {

return ch.consume(q, (msg) => {

pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {

if (err) {

log(err)

ch.nack(msg)

} else {

if (reply !== 0) {

ch.ack(msg)

} else {

ch.ack(msg)

saveUnSubscribeMsg(msg.content.toString())

}

}

})

})

})

})

.catch(e => {

reconnect(e)

})

}

function reconnect(e) {

log(e.message)

log('lost RMQ connection')

cnt++

log(`正在第${cnt}次重新连接RMQ...`)

setTimeout(() => {

connect()

}, 10 * 1000)

}

connect()

/**

* if signals didn't be subscribed, they would be saved to ./data dir

* @param {string} msg

*/

function saveUnSubscribeMsg(msg) {

let date = new Date().toLocaleDateString()

const fs = require('fs')

const dir = './data'

if (!fs.existsSync(dir)) {

fs.mkdirSync(dir)

}

let path = `${dir}/${date}.txt`

let isExist = fs.existsSync(path)

if (isExist) {

fs.appendFileSync(path, msg)

} else {

fs.writeFileSync(path, msg)

}

}

function log(...args) {

console.log(...args, new Date().toLocaleString())

}

然后进行测试:

通过对MQserver的重启,均正常,然后将MQserver的机器的网断掉测试,发现了close事件并没有监听到,而是报了heartbeat超时的错误,从而程序直接退出了,于是又在代码中加入对error事件的监听:

amqp

.connect(MQ_CONFIG.url)

.then(conn => {

conn.on('error', e => {

reconnect(e)

})

conn.on('close', e => {

reconnect(e)

})

return conn

})

这下应该不会导致程序退出了吧,然而又引入了新的问题,当重启MQserver时,报了 ECONNECTRET

的错误,两个监听事件都监听到了,所以程序重连了两次,导致一个项目在MQserver上建立了两个连接,当再一次重启MQserver时,建立了四个连接!

这是很严重的错误,然而并不是所有时候两个监听事件都能监听到,比如 heartbeat

超时就只报 error

的错误,所有需要想出一个策略,让程序始终与MQserver之间只有一个连接。

采用声明一个变量,记录是不是正在连接

var isConnecting = false

如果已经在连接了,其他的重连都不做处理

function reconnect(e) {

if (!isConnecting) {

isConnecting = true

log(e.message)

log('lost RMQ connection')

cnt++

log(`正在第${cnt}次重新连接RMQ...`)

setTimeout(() => {

connect()

}, 10 * 1000)

}

}

连接上时将重连的标志设为 false

.then(conn => {

cnt = 0

log('connect RMQ OK')

isConnecting = false

于是乎,完整代码如下:

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')

const REDIS_CONFIG = require('./conf/redis')

const Utils = require('./lib/Utils')

const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)

var amqp = require('amqplib')

var ex = MQ_CONFIG.ex

var patten = MQ_CONFIG.routing_key

var exType = MQ_CONFIG.ex_type

var q = MQ_CONFIG.q || 'signals'

var cnt = 0

var isConnecting = false

function connect() {

amqp

.connect(MQ_CONFIG.url)

.then(conn => {

conn.on('error', (e) => {

reconnect(e)

})

conn.on('close', e => {

reconnect(e)

})

return conn

})

.then(conn => {

cnt = 0

log('connect RMQ OK')

isConnecting = false

console.log(' [*] Waiting for signals. To exit press CTRL+C')

return conn.createChannel()

})

.then(ch => {

return ch.assertQueue(q, {

durable: true

})

.then(() => {

return ch.assertExchange(ex, exType, {

durable: true

})

})

.then(() => {

return ch.assertQueue(q, {

durable: true

})

})

.then(() => {

return ch.bindQueue(q, ex, patten)

})

.then(() => {

return ch.consume(q, (msg) => {

pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {

if (err) {

log(err)

ch.nack(msg)

} else {

if (reply !== 0) {

ch.ack(msg)

} else {

ch.ack(msg)

saveUnSubscribeMsg(msg.content.toString())

}

}

})

})

})

})

.catch(e => {

isConnecting = false

reconnect(e)

})

}

function reconnect(e) {

if (!isConnecting) {

isConnecting = true

log(e.message)

log('lost RMQ connection')

cnt++

log(`正在第${cnt}次重新连接RMQ...`)

setTimeout(() => {

connect()

}, 10 * 1000)

}

}

connect()

/**

* if signals didn't be subscribed, they would be saved to ./data dir

* @param {string} msg

*/

function saveUnSubscribeMsg(msg) {

let date = new Date().toLocaleDateString()

const fs = require('fs')

const dir = './data'

if (!fs.existsSync(dir)) {

fs.mkdirSync(dir)

}

let path = `${dir}/${date}.txt`

let isExist = fs.existsSync(path)

if (isExist) {

fs.appendFileSync(path, msg)

} else {

fs.writeFileSync(path, msg)

}

}

function log(...args) {

console.log(...args, new Date().toLocaleString())

}

如果觉得《rabbitmq取消自动重连_rabbitmq客户端自动重连》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。