zoukankan      html  css  js  c++  java
  • Amazon SQS 消息队列服务

    Amazon SQS 是亚马逊提供的托管消息队列服务,可以实现应用程序之间的解耦,并提供可靠性保证。SQS 提供两种消息队列:标准队列和先进先出(FIFO)队列。两者的区别在于:FIFO 队列严格有序,即消息被接收的顺序与发送的顺序一致;而标准队列只是尽最大努力保持顺序,并不保证一定有序。此外,FIFO 队列还提供消息去重:在去重时间窗口(5 分钟)内,即使重复发送了同一条消息,它也不会被再次放入队列。

    队列操作

    创建队列
    // Create the queue named QUEUE_NAME with the DelaySeconds attribute set to 60
    // and the MessageRetentionPeriod attribute set to 86400.
    AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
    CreateQueueRequest createRequest = new CreateQueueRequest(QUEUE_NAME)
            .addAttributesEntry("DelaySeconds", "60")
            .addAttributesEntry("MessageRetentionPeriod", "86400");

    try {
        sqs.createQueue(createRequest);
    } catch (AmazonSQSException e) {
        // Only the "queue already exists" error is tolerated; everything else is rethrown.
        // The comparison starts from the constant so that a null error code cannot raise
        // a NullPointerException that would hide the original service exception.
        if (!"QueueAlreadyExists".equals(e.getErrorCode())) {
            throw e;
        }
    }
    
    列出队列
    // Fetch the queue listing with the default client, then print a header line
    // followed by one queue URL per line.
    AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
    ListQueuesResult queueListing = sqs.listQueues();
    System.out.println("Your SQS Queue URLs:");
    for (String listedUrl : queueListing.getQueueUrls()) {
        System.out.println(listedUrl);
    }
    
    获取队列Url
    // Look up the URL of the queue named QUEUE_NAME using the default client
    // and keep it in queue_url.
    AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
    String queue_url = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl();
    
    删除队列
    // Delete the queue identified by queue_url.
    // NOTE(review): queue_url is not declared in this snippet; it is presumably the
    // value obtained from getQueueUrl(...) beforehand - confirm before reuse.
    AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
    sqs.deleteQueue(queue_url);
    

    消息操作

    发送消息
    // Send a single "hello world" message to the queue at queueUrl with
    // DelaySeconds set to 5 on the request.
    SendMessageRequest sendRequest = new SendMessageRequest();
    sendRequest.setQueueUrl(queueUrl);
    sendRequest.setMessageBody("hello world");
    sendRequest.setDelaySeconds(5);
    sqs.sendMessage(sendRequest);
    
    批量发送消息
    // Send two messages in one batch call. Each entry carries a batch-local id
    // ("msg_1", "msg_2"); only the second entry sets DelaySeconds (10).
    // The result of sendMessageBatch is not inspected here.
    SendMessageBatchRequestEntry firstEntry =
            new SendMessageBatchRequestEntry("msg_1", "Hello from message 1");
    SendMessageBatchRequestEntry delayedEntry =
            new SendMessageBatchRequestEntry("msg_2", "Hello from message 2")
                    .withDelaySeconds(10);
    SendMessageBatchRequest batchRequest = new SendMessageBatchRequest()
            .withQueueUrl(queueUrl)
            .withEntries(firstEntry, delayedEntry);
    sqs.sendMessageBatch(batchRequest);
    
    获取消息
    // Issue one receive call against the queue at queueUrl and keep the returned messages.
    List<Message> messages = sqs.receiveMessage(queueUrl).getMessages();
    
    删除消息
    // Delete each received message from the queue using its receipt handle.
    for (Message received : messages) {
        String receiptHandle = received.getReceiptHandle();
        sqs.deleteMessage(queueUrl, receiptHandle);
    }
    

    使用JMS方法

    发送消息
    /**
     * Command-line example that reads lines from standard input and sends each
     * one as a JMS text message to the configured SQS queue.
     */
    public class TextMessageSender {
        /**
         * Sets up the SQS-backed JMS connection, makes sure the queue exists and
         * sends the messages typed on standard input.
         *
         * @param args command-line arguments, handed to {@code ExampleConfiguration.parseConfig}
         * @throws JMSException if creating or closing the connection, session or producer fails
         */
        public static void main(String[] args) throws JMSException {
            ExampleConfiguration config = ExampleConfiguration.parseConfig("TextMessageSender", args);

            ExampleCommon.setupLogging();

            // Create the connection factory based on the config
            SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
                    new ProviderConfiguration(),
                    AmazonSQSClientBuilder.standard()
                            .withRegion(config.getRegion().getName())
                            .withCredentials(config.getCredentialsProvider())
                    );

            // Create the connection
            SQSConnection connection = connectionFactory.createConnection();
            try {
                // Create the queue if needed
                ExampleCommon.ensureQueueExists(connection, config.getQueueName());

                // Create the session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = session.createProducer( session.createQueue( config.getQueueName() ) );

                sendMessages(session, producer);
            } finally {
                // Close the connection. This closes the session automatically.
                // Done in finally so the connection is not leaked when queue, session
                // or producer setup throws.
                connection.close();
                System.out.println( "Connection closed" );
            }
        }

        /**
         * Prompts for lines on standard input and sends each as a text message
         * until an empty line or end of input is read.
         *
         * <p>I/O and JMS failures are reported on standard error and end the loop;
         * they are not propagated to the caller.
         *
         * @param session  session used to create the text messages
         * @param producer producer used to send them
         */
        private static void sendMessages( Session session, MessageProducer producer ) {
            BufferedReader inputReader = new BufferedReader(
                new InputStreamReader( System.in, Charset.defaultCharset() ) );

            try {
                String input;
                while( true ) {
                    System.out.print( "Enter message to send (leave empty to exit): " );
                    input = inputReader.readLine();
                    // readLine() returns null at end of input; an empty line is the exit request
                    if( input == null || input.isEmpty() ) break;

                    TextMessage message = session.createTextMessage(input);
                    producer.send(message);
                    System.out.println( "Send message " + message.getJMSMessageID() );
                }
            } catch (EOFException e) {
                // Just return on EOF
            } catch (IOException e) {
                System.err.println( "Failed reading input: " + e.getMessage() );
            } catch (JMSException e) {
                System.err.println( "Failed sending message: " + e.getMessage() );
                e.printStackTrace();
            }
        }
    }
    
    同步接收消息
    /**
     * Command-line example that synchronously polls the configured SQS queue
     * through JMS and acknowledges each message after handling it.
     */
    public class SyncMessageReceiver {
        /**
         * Sets up the SQS-backed JMS connection, makes sure the queue exists and
         * receives messages until none arrives within the wait timeout.
         *
         * @param args command-line arguments, handed to {@code ExampleConfiguration.parseConfig}
         * @throws JMSException if creating, starting or closing the connection, session or consumer fails
         */
        public static void main(String[] args) throws JMSException {
            ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiver", args);

            ExampleCommon.setupLogging();

            // Create the connection factory based on the config
            SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
                    new ProviderConfiguration(),
                    AmazonSQSClientBuilder.standard()
                            .withRegion(config.getRegion().getName())
                            .withCredentials(config.getCredentialsProvider())
                    );

            // Create the connection
            SQSConnection connection = connectionFactory.createConnection();
            try {
                // Create the queue if needed
                ExampleCommon.ensureQueueExists(connection, config.getQueueName());

                // Create the session; CLIENT_ACKNOWLEDGE is paired with the explicit
                // message.acknowledge() call in receiveMessages
                Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );

                connection.start();

                receiveMessages(session, consumer);
            } finally {
                // Close the connection. This closes the session automatically.
                // Done in finally so the connection is not leaked when setup or start throws.
                connection.close();
                System.out.println( "Connection closed" );
            }
        }

        /**
         * Receives messages one at a time, handling and acknowledging each, and
         * returns once a receive call times out after one minute without a message.
         *
         * <p>JMS failures are reported on standard error and end the loop; they are
         * not propagated to the caller.
         *
         * @param session  session the consumer belongs to (not used directly here)
         * @param consumer consumer to receive messages from
         */
        private static void receiveMessages( Session session, MessageConsumer consumer ) {
            try {
                while( true ) {
                    System.out.println( "Waiting for messages");
                    // Wait 1 minute for a message
                    Message message = consumer.receive(TimeUnit.MINUTES.toMillis(1));
                    if( message == null ) {
                        System.out.println( "Shutting down after 1 minute of silence" );
                        break;
                    }
                    ExampleCommon.handleMessage(message);
                    message.acknowledge();
                    System.out.println( "Acknowledged message " + message.getJMSMessageID() );
                }
            } catch (JMSException e) {
                System.err.println( "Error receiving from SQS: " + e.getMessage() );
                e.printStackTrace();
            }
        }
    }
    
    异步接收消息
    /**
     * Command-line example that receives messages from the configured SQS queue
     * through a JMS {@code MessageListener} and exits after one minute without
     * a successfully processed message.
     */
    public class AsyncMessageReceiver {
        /**
         * Sets up the SQS-backed JMS connection, registers the listener, starts
         * delivery and blocks until one minute passes without a processed message.
         *
         * @param args command-line arguments, handed to {@code ExampleConfiguration.parseConfig}
         * @throws JMSException if creating, starting or closing the connection, session or consumer fails
         * @throws InterruptedException if the thread is interrupted while waiting for silence
         */
        public static void main(String[] args) throws JMSException, InterruptedException {
            ExampleConfiguration config = ExampleConfiguration.parseConfig("AsyncMessageReceiver", args);

            ExampleCommon.setupLogging();

            // Create the connection factory based on the config
            SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
                    new ProviderConfiguration(),
                    AmazonSQSClientBuilder.standard()
                            .withRegion(config.getRegion().getName())
                            .withCredentials(config.getCredentialsProvider())
                    );

            // Create the connection
            SQSConnection connection = connectionFactory.createConnection();
            try {
                // Create the queue if needed
                ExampleCommon.ensureQueueExists(connection, config.getQueueName());

                // Create the session; CLIENT_ACKNOWLEDGE is paired with the explicit
                // message.acknowledge() call in the listener
                Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );

                ReceiverCallback callback = new ReceiverCallback();
                consumer.setMessageListener( callback );

                // No messages are processed until this is called
                connection.start();

                callback.waitForOneMinuteOfSilence();
                System.out.println( "Returning after one minute of silence" );
            } finally {
                // Close the connection. This closes the session automatically.
                // Done in finally so the connection is not leaked when setup fails or
                // the wait is interrupted.
                connection.close();
                System.out.println( "Connection closed" );
            }
        }


        /**
         * Listener that handles and acknowledges each delivered message and records
         * when the last one was processed.
         */
        private static class ReceiverCallback implements MessageListener {
            // Used to listen for message silence.
            // NOTE(review): volatile presumably because onMessage runs on a different
            // thread than waitForOneMinuteOfSilence - confirm against the JMS provider.
            private volatile long timeOfLastMessage = System.nanoTime();

            /**
             * Blocks until more than one minute has elapsed since the last recorded
             * message time, re-checking after each sleep because a message may have
             * arrived in the meantime.
             *
             * @throws InterruptedException if the thread is interrupted while sleeping
             */
            public void waitForOneMinuteOfSilence() throws InterruptedException {
                for(;;) {
                    long timeSinceLastMessage = System.nanoTime() - timeOfLastMessage;
                    long remainingTillOneMinuteOfSilence =
                        TimeUnit.MINUTES.toNanos(1) - timeSinceLastMessage;
                    if( remainingTillOneMinuteOfSilence < 0 ) {
                        break;
                    }
                    TimeUnit.NANOSECONDS.sleep(remainingTillOneMinuteOfSilence);
                }
            }


            /**
             * Handles and acknowledges the message, then records the current time.
             * The time is only updated when handling and acknowledgement succeed;
             * JMS failures are reported on standard error.
             *
             * @param message the delivered message
             */
            @Override
            public void onMessage(Message message) {
                try {
                    ExampleCommon.handleMessage(message);
                    message.acknowledge();
                    System.out.println( "Acknowledged message " + message.getJMSMessageID() );
                    timeOfLastMessage = System.nanoTime();
                } catch (JMSException e) {
                    System.err.println( "Error processing message: " + e.getMessage() );
                    e.printStackTrace();
                }
            }
        }
    }
    

    https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs-messages.html
    https://docs.amazonaws.cn/en_us/AWSSimpleQueueService/latest/SQSDeveloperGuide/code-examples.html

  • 相关阅读:
    算法
    算法
    算法
    算法
    算法
    【PAT】B1064 朋友数(20 分)
    【PAT】B1065 单身狗(25 分)
    【PAT】B1066 图像过滤(15 分)
    【PAT】B1067 试密码(20 分)
    【PAT】B1068 万绿丛中一点红(20 分)
  • 原文地址:https://www.cnblogs.com/helloz/p/9314915.html
Copyright © 2011-2022 走看看