        /**
         * Declare a queue
         * @see com.rabbitmq.client.AMQP.Queue.Declare
         * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
         * @param queue the name of the queue
         * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
         * @param exclusive true if we are declaring an exclusive queue (restricted to this connection,
         *                  so consumers on other connections cannot use it); false if the queue may be
         *                  shared with consumers on other connections
         * @param autoDelete true if we are declaring an autodelete queue (the server will delete it
         *                   once the last consumer has disconnected and it is no longer in use)
         * @param arguments other properties (construction arguments) for the queue
         * @return a declaration-confirm method to indicate the queue was successfully declared
         * @throws java.io.IOException if an error is encountered
         *
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         */
        channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);

        /**
         * Publish a message.
         *
         * Publishing to a non-existent exchange will result in a channel-level
         * protocol exception, which closes the channel.
         *
         * Invocations of <code>Channel#basicPublish</code> will eventually block if a
         * <a href="https://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
         *
         * @see com.rabbitmq.client.AMQP.Basic.Publish
         * @see <a href="https://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
         * @param exchange the exchange to publish the message to
         * @param routingKey the routing key
         * @param props other properties for the message - routing headers etc
         * @param body the message body
         * @throws java.io.IOException if an error is encountered
         */
        // The empty exchange name selects the default exchange, which routes
        // the message to the queue whose name equals the routing key.
        channel.basicPublish("", SIMPLE_QUEUE_NAME, null, "hello,world!".getBytes(StandardCharsets.UTF_8));
        Channel channel = connection.createChannel();

        /**
         * Start a non-nolocal, non-exclusive consumer, with
         * a server-generated consumerTag.
         * Provide access only to <code>basic.deliver</code> and
         * <code>basic.cancel</code> AMQP methods (which is sufficient
         * for most cases). See methods with a {@link Consumer} argument
         * to have access to all the application callbacks.
         * @param queue the name of the queue
         * @param autoAck automatic acknowledgement: true if the server should consider messages
         * acknowledged once delivered; false if the server should expect
         * explicit acknowledgements
         * @param deliverCallback callback when a message is delivered
         * @param cancelCallback callback when the consumer is cancelled
         * @return the consumerTag generated by the server
         * String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
         */
        channel.basicConsume(SIMPLE_QUEUE_NAME, true,
                // Delivery callback: invoked when a message is received
                (consumerTag, message) -> {
                    byte[] msgBytes = message.getBody();
                    // Decode with the same charset the producer used to encode the body
                    System.out.println("Received message: "
                            + new String(msgBytes, java.nio.charset.StandardCharsets.UTF_8));
                },
                // Cancel callback: invoked when the consumer is cancelled
                consumerTag -> {
                    System.out.println("Message consumption was interrupted");
                });
    }
}