0%

阿里云消息服务MNS-JAVA SDK

准备

  1. 下载最新版 Java SDK,解压到 aliyun-sdk-mns-samples 文件夹。

  2. 用 Eclipse 导入 Maven 工程,选中 aliyun-sdk-mns-samples 文件夹。

    3.在用户目录中创建.aliyun-mns.properties 文件,并填写服务地址、AccessKeyId 和 AccessKeySecret

1
2
3
mns.accountendpoint=http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com
mns.accesskeyid=<yourAccessKeyId>
mns.accesskeysecret=<yourAccessKeySecret>

4.Maven 依赖

1
2
3
4
5
<dependency>
<groupId>com.aliyun.mns</groupId>
<artifactId>aliyun-sdk-mns</artifactId>
<version>1.1.9.1</version>
</dependency>

主题使用手册

创建主题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CreateTopicDemo {
public static void main(String[] args) {
CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");
MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

String topicName = "TestTopic";
TopicMeta meta = new TopicMeta();
meta.setTopicName(topicName);

try {
CloudTopic topic = client.createTopic(meta);
} catch (Exception e) {
e.printStackTrace();
System.out.println("create topic error, " + e.getMessage());
}

client.close();
}
}

创建队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class CreateQueueDemo {

public static void main(String[] args) {
CloudAccount account = new CloudAccount(
ServiceSettings.getMNSAccessKeyId(),
ServiceSettings.getMNSAccessKeySecret(),
ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();

try
{
QueueMeta qMeta = new QueueMeta();
qMeta.setQueueName("queue-demo");
qMeta.setPollingWaitSeconds(30);
CloudQueue cQueue = client.createQueue(qMeta);
System.out.println("Create queue successfully. URL: " + cQueue.getQueueURL());
} catch (ClientException ce)
{
System.out.println("Something wrong with the network connection between client and MNS service."
+ "Please check your network and DNS availablity.");
ce.printStackTrace();
} catch (ServiceException se)
{
if (se.getErrorCode().equals("QueueNotExist"))
{
System.out.println("Queue is not exist.Please create before use");
} else if (se.getErrorCode().equals("TimeExpired"))
{
System.out.println("The request is time expired. Please check your local machine timeclock");
}
se.printStackTrace();
} catch (Exception e)
{
System.out.println("Unknown exception happened!");
e.printStackTrace();
}

client.close();
}

}

创建订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SubscribeDemo {
public static void main(String[] args) {
String region = "";
String accountId = "";
String queueName = "TestQueue";
CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");
MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

CloudTopic topic = client.getTopicRef("TestTopic");
try {
SubscriptionMeta subMeta = new SubscriptionMeta();
subMeta.setSubscriptionName("QueueEndpoint2");
subMeta.setEndpoint(String.format("acs:mns:%s:%s:queues/%s", region, accountId, queueName));
subMeta.setNotifyContentFormat(SubscriptionMeta.NotifyContentFormat.XML);
String subUrl = topic.subscribe(subMeta);
System.out.println("subscription url: " + subUrl);
} catch (Exception e) {
e.printStackTrace();
System.out.println("subscribe/unsubribe error");
}

client.close();
}
}

发布消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class PublishMessageDemo {
public static void main(String[] args) {
CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");
MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

CloudTopic topic = client.getTopicRef("TestTopic");
try {
TopicMessage msg = new Base64TopicMessage(); //可以使用TopicMessage结构,选择不进行Base64加密。
msg.setMessageBody("hello world!");
//msg.setMessageTag("filterTag"); //设置该条发布消息的filterTag。
msg = topic.publishMessage(msg);
System.out.println(msg.getMessageId());
System.out.println(msg.getMessageBodyMD5());
} catch (Exception e) {
e.printStackTrace();
System.out.println("subscribe error");
}

client.close();
}
}

从队列接收和删除消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ComsumerDemo {

public static void main(String[] args) {
CloudAccount account = new CloudAccount(
ServiceSettings.getMNSAccessKeyId(),
ServiceSettings.getMNSAccessKeySecret(),
ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();

try{
CloudQueue queue = client.getQueueRef("queue-demo");
for (int i = 0; i < 10; i++)
{
Message popMsg = queue.popMessage();
if (popMsg != null){
System.out.println("message handle: " + popMsg.getReceiptHandle());
System.out.println("message body: " + popMsg.getMessageBodyAsString());
System.out.println("message id: " + popMsg.getMessageId());
System.out.println("message dequeue count:" + popMsg.getDequeueCount());

queue.deleteMessage(popMsg.getReceiptHandle());
System.out.println("delete message successfully.\n");
}
}
} catch (ClientException ce)
{
System.out.println("Something wrong with the network connection between client and MNS service."
+ "Please check your network and DNS availablity.");
ce.printStackTrace();
} catch (ServiceException se)
{
if (se.getErrorCode().equals("QueueNotExist"))
{
System.out.println("Queue is not exist.Please create queue before use");
} else if (se.getErrorCode().equals("TimeExpired"))
{
System.out.println("The request is time expired. Please check your local machine timeclock");
}

se.printStackTrace();
} catch (Exception e)
{
System.out.println("Unknown exception happened!");
e.printStackTrace();
}

client.close();
}
}

删除主题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DeleteTopicDemo {
public static void main(String[] args) {
CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");
MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

CloudTopic topic = client.getTopicRef("TestTopic");
try {
topic.delete();
} catch (Exception e) {
e.printStackTrace();
System.out.println("delete topic error");
}

client.close();
}
}

删除队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DeleteTopicDemo {
public static void main(String[] args) {
CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");
MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

CloudTopic topic = client.getTopicRef("TestTopic");
try {
topic.delete();
} catch (Exception e) {
e.printStackTrace();
System.out.println("delete topic error");
}

client.close();
}
}

FitletTag 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.aliyun.mns.samples;


import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.*;

public class TopicSample {

public static void main(String[] args) {
CloudAccount account = new CloudAccount(
ServiceSettings.getMNSAccessKeyId(),
ServiceSettings.getMNSAccessKeySecret(),
ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();

// 1.创建队列。
QueueMeta queueMeta = new QueueMeta();
queueMeta.setQueueName("TestSubForQueue");
CloudQueue queue = client.createQueue(queueMeta);
// 2.创建主题。
TopicMeta topicMeta = new TopicMeta();
topicMeta.setTopicName("TestTopic");
CloudTopic topic = client.createTopic(topicMeta);
// 3.创建订阅。
SubscriptionMeta subMeta = new SubscriptionMeta();
subMeta.setSubscriptionName("TestForQueueSub");
subMeta.setNotifyContentFormat(SubscriptionMeta.NotifyContentFormat.SIMPLIFIED);
subMeta.setEndpoint(topic.generateQueueEndpoint("TestSubForQueue"));
subMeta.setFilterTag("filterTag");
topic.subscribe(subMeta);
// 4.发布消息。
TopicMessage msg = new Base64TopicMessage();
msg.setMessageBody("hello world");
msg.setMessageTag("filterTag");
msg = topic.publishMessage(msg);
// 5.从订阅的队列中获取消息。
Message msgReceive = queue.popMessage(30);
System.out.println("ReceiveMessage From TestSubForQueue:");
System.out.println(msgReceive.getMessageBody());
System.exit(0);
}
}