amqp实现go语言,go amqp

即时通信之 - RabbitMQ(基于socket)基础概念详细介绍

转自:

创新互联建站从2013年开始,是专业互联网技术服务公司,拥有项目网站制作、网站设计网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元济源做网站,已为上家服务,为济源各地企业和个人服务,联系电话:18980820575

h2引言/h2

p你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。br /

消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)。本文将要介绍的RabbitMQ就是当前最主流的消息a href="" title="中间件"中间件/a之一。/p

h2RabbitMQ简介/h2

pAMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的a href="" title="中间件"中间件/a设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。br /

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。br /

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。br /

下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。/p

h2ConnectionFactory、Connection、Channel/h2

pConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。br /

Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。/p

h2Queue/h2

pQueue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。br /

Ubuntu16.04搭建rabbitmq集群

1.1、什么是RabbitMQ?

RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。

1.2、什么是AMQP?

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。它从生产者接收消息并递送给消费者,在这个过程中,根据规则进行路由,缓存与持久化。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如: Python 、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

1.3、RabbitMQ的基础概念

1.4、RabbitMQ的特性

2.1、环境:两台Ubuntu16.04主机

10.27.0.53 rabbitmq-1

10.27.0.130 rabbitmq-2

必须保证各个主机名之间可以ping通

2.2、安装Erlang

2.3、安装rabbitmq

2.4、安装完成后,验证一下服务是否正常

搭建好的rabbitmq默认是没有配置文件的,需要我们来手动添加

Rabbitmq的一些运行脚本存放在 /usr/sbin 下面

2.5、开启web管理插件

2.5.1、创建一个用户nova,并设置密码为123456

2.5.2、查看现有用户表

这个时候nova用户是不能访问web管理插件的,需要配置用户角色,用户角色可分为五类,超级管理员, 监控者, 策略制定者, 普通管理者以及其他。

可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。

可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息。

仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。

无法登陆管理控制台,通常就是普通的生产者和消费者。

将nova添加到administrator用户组

此时的nova用户只能通过本地来登录其他的IP无法直接使用这个账号。所以需要对他进行授权,使用户nova /(可以访问虚拟主机) 中所有资源的配置、写、读权限以便管理其中的资源

查看用户授权

2.5.3、开启web管理插件并重启rabbitmq服务

以下为关闭插件命令

通过浏览器访问 []输入用户名nova 密码123456就可以看到后台了

3.1、从管理界面可以看到,此时只有一个节点rabbitmq-1,我们需要把rabbitmq-2加进来,rabbitmq-2按照步骤2进行一系列安装就可以。此处不再细说。

3.2、添加节点

两台主机上安装的 RabbitMQ 都保证都可以正常启动,才可以进行以下操作

3.2.1、设置不同节点间统一认证的Erlang Cookie

这里将 rabbitmq-1 的该文件复制到 rabbitmq-2由于这个文件权限是 400为方便传输,先修改权限,非必须操作,所以需要先修改rabbitmq-2中的该文件权限为 777

然后将rabbitmq-1中的该文件拷贝的rabbitmq-2中

最后将权限和所属用户/组修改回来

此时rabbitmq-2节点需要重启一下服务

注意事项

cookie在所有节点上必须完全一样,同步时一定要注意。

erlang是通过主机名来连接服务,必须保证各个主机名之间可以ping通。可以通过编辑/etc/hosts来手工添加主机名和IP对应关系。如果主机名ping不通,rabbitmq服务启动会失败。

3.2.2、通过rabbitmqctl cluster_status命令,可以查看和个节点的状态,节点的名称是rabbit@shorthostname,

rabbitmq-1

rabbitmq-2

3.2.3

将两个节点组成集群

因为rabbitmq-server启动时,会一起启动节点和应用,它预先设置RabbitMQ应用为standalone模式。要将一个节点加入到现有的集群中,你需要停止这个应用并将节点设置为原始状态,然后就为加入集群准备好了。使用rabbitmqctl stop_app仅仅关闭应用。

将2加入1中

启动节点2的应用

如果要使用内存节点,则可以使用以下命令:其中–ram指的是作为内存节点,要是想做为磁盘节点的话,就不用加–ram这个参数了

集群配置好后,可以在 RabbitMQ 任意节点上执行 rabbitmqctl cluster_status 来查看是否集群配置成功。

Rabbitmq-1

Rabbitmq-2

同时在Web管理工具中也可以看到效果

上面配置RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制,虽然该模式解决一部分节点压力,但队列节点宕机直接导致该队列无法使用,只能等待重启,所以要想在队列节点宕机或故障也能正常使用,就要复制队列内容到集群里的每个节点,需要创建镜像队列。

镜像队列概念:镜像队列可以同步queue和message,当主queue挂掉,从queue中会有一个变为主queue来接替工作。镜像队列是基于普通的集群模式的,所以你还是得先配置普通集群,然后才能设置镜像队列。镜像队列设置后,会分一个主节点和多个从节点,如果主节点宕机,从节点会有一个选为主节点,原先的主节点起来后会变为从节点。queue和message虽然会存在所有镜像队列中,但客户端读取时不论物理面连接的主节点还是从节点,都是从主节点读取数据,然后主节点再将queue和message的状态同步给从节点,因此多个客户端连接不同的镜像队列不会产生同一message被多次接受的情况。

设置镜像队列策略

在普通集群的中任意节点启用策略,策略会自动同步到集群节点

命令格式

在任意一个节点上执行

集群重启

集群重启时,最后一个挂掉的节点应该第一个重启,如果因特殊原因(比如同时断电),而不知道哪个节点最后一个挂掉。可用以下方法重启:

先在一个节点上执行

在其他节点上执行

查看cluster状态是否正常(要在所有节点上查询)。

添加用户

处于安全的考虑,guest这个默认的用户只能通过 来登录,其他的IP无法直接使用这个账号。所以我们需要添加一个其他用户。

命令格式

删除用户

命令格式

修改密码

命令格式

用户授权

命令格式

该命令使用户nova /(可以访问虚拟主机) 中所有资源的配置、写、读权限以便管理其中的资源

查看用户授权

命令格式

查看当前用户列表

可以看到添加用户成功了,但不是administrator角色

添加角色

这里我们也将nova用户设置为administrator角色

命令格式

再次查看权限

清除权限信息

命令格式

下一篇为测试文章

Spring AMQP杂记之Spring实现简述

上一篇主要介绍了AMQP的一些知识,接下来开始正式步入Spring AMQP。

Message:在AMQP中并没有定义消息的模型,Spring为了方便我们理解与使用,新增了Message接口,在构建消息的时候Spring提供了builde API,MessageBuilder.xx.xx的形式使用起来很方便。

Exchange:这个接口和AMQP中定义的exchange基本相同,就不说了

Queue:同上。

Binding:一般叫他绑定关系,AMQP也有对其的抽象模型,只不过我认为他只不过相当于是附加在队列与交换机上的属性,所以在上篇关于AMQP的介绍中并没有详细说明。呃,其实spring对其的定义就是代表了队列与交换机的绑定关系。。。

spring提供了ConnectionFactory接口,当我们使用的时候会使用它的实现类CachingConnectionFactory,看名字也知道就是基于缓存的连接池,默认的池大小为25。Spring也提供了对于多个connectionFactory的支持接口例如SimpleRoutingConnectionFactory等。

我们使用SpringBoot进行测试,最小化的配置如下

这里先给出一个简单的例子然后再具体讲解。

如图,我们提前声明了一个名为hello的队列,浏览器访问/send时,可以看到控制台打印了相应的时间信息,即被@RabbitListener注解的方法被调用了。如果我们打开RabbitMq的webUI,会发现名为hello的队列中消息数量由0变为1再变为0。注意,这里我们并没有声明Exchange,MQ会为我们将队列绑定到默认的Exchange。

接下来就详细的说一下这个例子。对于操作RabbitMQ,Spring提供了 RabbitTemplate(对于batch操作,相应的是BatchingRabbitTemplate,在1.6版本以后,spring提供了异步的Template--AsyncRabbitTemplate)。我们使用它来发送与接收消息。当发送完消息的时候如何知道本次操作的成功或者失败呢?默认情况下不能被路由的消息将会被丢弃,这会导致消息丢失,不能保证消息可靠性(消息可靠性请参照上一篇AMQP介绍中的推荐)。发布确认机制是保证消息可靠性的第一步,发布确认保证我们知道消息是否成功到达队列中,返回ack则代表成功,nack则代表失败。要使用这个特性,我们需要将RabbitTemplate的mandatory属性和ConnectionFactory的publisherConfirms属性都设为true。这时我们可以在RabbitTemplate上设置setReturnCallback监听来接收MQ服务器返回的状态信息了。对于消息的确认,我们只需要设置RabbitTemplate.ConfirmCallback的回调方法即可。

当我们每次发送请求时,都会打印相应的ack,其中correlationData是生产者在发送数据时可以携带的相关信息。这里有个问题需要注意一下,RabbitTemplate只允许设置一个callback方法,这时你可以将RabbitTemplate的bean设为单例然后设置回调。

这样的缺点是所有使用这个template的地方都会使用这个回调,那么当我们想要为不同的操作定制callBack该怎么做?如果直接在别的地方继续设置会报"Only one ConfirmCallback is supported by each RabbitTemplate"异常,这时候我们就需要将RabbitTemplate的作用域设为@Scope,这样每个bean都是一个新的。难道这样就可以了么?我们的service类一般都是单例的,这意味着当service类生成后,注入的RabbitTemplate就已经不变了,这个就是Single域的bean中注入Scope域bean的问题。一种解决方法是实现ApplicationAware接口注入ApplicationContext,每次使用RabbitTemplate时调用其getBean方法。一个更好的解决方案是使用spring提供的lookup方法。

spring会帮我们代理lookup注解的方法,每次调用都会返回一个全新的bean。但其实平常使用一般都会将发送方单独抽取出来实现回调接口,不会涉及上面的问题,一般都如下配置,注意将template配置成scope即可。

RabbitTemplate可以添加消息转换器,作用就类似于mvc中配置的@ResponseBody消息转换器。

具体如何发送与接收消息感觉不用咋说了。。。就send,receive(x,x,x)这个用IDE看一下方法doc就知道咋用了。receive为拉模式,很少使用,关于接收方法我们更常使用的是异步接收,即推模式,一般使用@RabbitListener 实现

当hello队列中有消息时,方法会自动调用。

像我们平常做web开发,前端想要接受来自后台的消息无非俩个方法,前台请求和后台推送,前台轮询一般就是ajax定时器,推送一般使用WebSocket实现,MQ同样有两种模式:轮询请求队列看是否有消息即拉模式,队列中有消息即对消费者进行通知即推模式。

对于拉模式,Spring提供了receive,receiveAndConvert,和receiveAndReply方法。接收并回复的方法很有用,比如订单系统,下单消息被MQ处理完后再返回消息给其他队列,告诉她这个订单已经完成,可以进行付费操作了。接收并回复调用template.receiveAndReply实现自己的接收回调。对于推模式,项目中基本上使用@RabbitListener注解完成,该注解结合@SendTo注解完成receiveAndReply功能,若没有sendto,这个方法是不允许有返回值的。对于异常情况,配置@RabbitListener的errorHandler和returnExceptions即可。关于@RabbitListener注解的具体使用其实也挺复杂的,推荐直接看文档。使用监听器的过程中消息是默认经过消息转换器的,可以手动为其设置消息转换器。关于RabbitMQ LIstener的配置可以使用Config方式或者SpringBoot的配置文件方式。

上面只是官方文档的一部分,其实除了Listener大部分Config方式的配置都可以用配置文件方式替代。

声明队列与交换机:分为xml方式和Java Config方式(懒得写了,这个基本官网就是复制粘贴)

配置Broker:Spring对其的抽象为RabbitAdmin,也是官网。。

延时队列实现:设置交换机延时属性为true,通过convertAndSend中的MessagePostProcessor实现发送延时消息,这个方法需要安装延时交换机这样的一个插件(也可以通过死信队列实现)

好了。今天就先写这么多,因为实在是写的太乱了,以后有时间整理一下。。。

组件分享之后端组件——基于Golang实现的高性能和弹性的流处理器benthos

近期正在探索前端、后端、系统端各类常用组件与工具,对其一些常见的组件进行再次整理一下,形成标准化组件专题,后续该专题将包含各类语言中的一些常用组件。欢迎大家进行持续关注。

本节我们分享的是基于Golang实现的高性能和弹性的流处理器 benthos ,它能够以各种代理模式连接各种 源 和 接收器,并对有效负载执行 水合、浓缩、转换和过滤 。

它带有 强大的映射语言 ,易于部署和监控,并且可以作为静态二进制文件、docker 映像或 无服务器函数 放入您的管道,使其成为云原生。

Benthos 是完全声明性的,流管道在单个配置文件中定义,允许您指定连接器和处理阶段列表:

Apache Pulsar, AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), Cassandra, Elasticsearch, File, GCP (Pub/Sub, Cloud storage), HDFS, HTTP (server and client, including websockets), Kafka, Memcached, MQTT, Nanomsg, NATS, NATS JetStream, NATS Streaming, NSQ, AMQP 0.91 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), MongoDB, SQL (MySQL, PostgreSQL, Clickhouse, MSSQL), Stdin/Stdout, TCP UDP, sockets and ZMQ4.

1、docker安装

具体使用方式可以参见该 文档

有关如何配置更高级的流处理概念(例如流连接、扩充工作流等)的指导,请查看 说明书部分。

有关在 Go 中构建您自己的自定义插件的指导,请查看 公共 API。


网页题目:amqp实现go语言,go amqp
URL链接:http://csdahua.cn/article/heseee.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流