本节书摘来自异步社区《微软云计算Windows Azure开发与部署权威指南》一书中的第6章,第6.9节,作者: 尹成 , 郝庭毅 , 张俊强 , 孙奉刚 , 寇睿明 更多章节内容可以访问云栖社区“异步社区”公众号查看。
6.9 AppFabric消息缓冲区:云端上的信息暂存区
6.9.1 认识消息缓冲区
消息缓冲区(Message Buffers)是一些小型、临时的缓存区,用于将消息存放一段时间直到其被在此检索。在Web编程模型场景中,如果Windows Azure服务总线绑定不能使用,那消息缓冲区就会很有用,比如,消息接收和处理控件运行在非Windows平台的计算机上或者该程序不是由Java编写实现的时候。应用程序可以使用HTTP协议访问消息缓冲区,并不一定要安装Windows Azure SDK,所以消息缓冲区让Web开发者、移动设备编程者和其他开发人员都可以通过创建使用HTTP请求获取消息的消息接收和处理控件,来实现自己的应用程序和服务总线的整合。
消息缓冲区技术使用HTTP REST协议来公开对消息缓冲区的各种操作,比如创建消息缓冲区、消息缓冲区发送消息、从消息缓冲区查找消息等。
1.消息缓冲区协议
消息缓冲区协议是一个HTTP REST协议,遵循REST准则,简单并易于理解。这样设计的目的就是确保开发者可以在任何客户端使用该协议,不需要任何库文件或SDK。
该协议依赖于访问控制服务的HTTP认证模型来加强对消息缓冲区的访问控制。这就意味着它使用了SWT(Simple Web Token)机制,用户可以通过该机制使用HTTP获得令牌,并将令牌嵌入HTTP请求中作为一个头部,这个令牌中包含了决定一个操作是否合法的声明。
消息缓冲协议要求每个消息缓冲区在服务总线的命名空间中有唯一的URI标识。这个URI就是代表消息缓冲区实例的一系列资源的根路径。每个资源类型都有唯一的URI和一组相关的HTTP动作集用来实现与该资源的交互。
用于与消息缓冲区交互的动作仿照标准的HTTP命令,这些动作的列表如下。
- POST/PUT:用于创建资源。POST用于创建一个消息资源的新实例。PUT用来创建或更新一个消息缓冲区资源。当使用POST时,新资源的地址会通过response返回。
- PUT:用于更新一个已存在的资源。
- DELETE:用于删除一个已存在的资源。
- GET:用来检索资源的一个实例。在此情况下,GET可以认为是表示资源的一个快照,适当的时候可以将其缓存。
2.消息缓冲区策略
消息缓冲区策略是一个XML文档,该文档定义了一个消息缓冲区所需的语义。当需要创建一个新的消息缓冲区示例时,必须在创建请求中包含一个消息缓冲区策略文档。对于一个已存在的消息缓冲区,可以检索其消息缓冲区策略文档来明确其语义。也可以更新一个存在的消息缓冲区策略文档来对超期的消息缓冲区进行续时,进行此操作时要保证其他的属性值不变。
3.消息缓冲区配额
消息缓冲区默认可以容纳10条消息。可以通过MessageBufferPolicy类的MaxMessageCount属性值来修改这个限制。消息缓冲区最多可以容纳50条消息。
6.9.2 消息缓冲区应用程序开发
上一小节提到了消息缓冲区协议,我们可以使用REST协议来进行消息缓冲区应用程序的开发。同时,如果安装了Windows Azure SDK,也可以使用Windows Azure SDK进行消息缓冲区应用程序的开发。本小节将简单给出两种方式的示例代码。
1.使用REST协议
下面的代码示例使用REST协议创建消息缓冲区,向消息缓冲区发送消息,从消息缓冲区检索消息,最后将消息缓冲区删除。本示例通过System.Net.WebClient来发送和接收HTTP请求。完整的运行示例可参考ServiceBusExploringFeaturesMessageBuffer中Windows Azure SDK samples文件夹下的PlainHttp样例。
// Prompt the user for the service namespace and issuer key.
Console.Write("Please enter your Service Namespace: ");
string serviceNamespace = Console.ReadLine();
Console.Write("Please enter the key for the 'owner' issuer: ");
string ownerKey = Console.ReadLine();
// Create a GUID for the buffer name.
string bufferName = Guid.NewGuid().ToString("N");
// Construct the message buffer URI.
string messageBufferLocation = string.Format("http://{0}.servicebus.windows.net/{1}", serviceNamespace, bufferName);
// Get the AC token
WebClient client = new WebClient();
client.BaseAddress = string.Format("https://{0}-sb.accesscontrol.windows.net/", serviceNamespace);
NameValueCollection values = new NameValueCollection();
values.Add("wrap_name", "owner");
values.Add("wrap_password", ownerKey);
values.Add("wrap_scope", messageBufferLocation);
byte[] responseBytes = client.UploadValues("WRAPv0.9", "POST", values);
string response = Encoding.UTF8.GetString(responseBytes);
string token = Uri.UnescapeDataString(response.Split('&').Single(value => value.StartsWith("wrap_access_token=", StringComparison.OrdinalIgnoreCase)).Split('=')[1]);
// Create the auth header from the token
string authHeaderValue = string.Format("WRAP access_token=\"{0}\"", token);
// Create the message buffer policy.
string policy =
"<entry xmlns=\"http://www.w3.org/2005/Atom\">" +
"<content type=\"text/xml\">" +
"<MessageBufferPolicy xmlns=\"http://schemas.microsoft.com/netservices/2009/05/servicebus/connect\"/>" +
"</content>" +
"</entry>";
// Create a message buffer.
client.BaseAddress = string.Format("https://{0}.servicebus.windows.net/{1}/", serviceNamespace, bufferName);
client.Headers[HttpRequestHeader.ContentType] = "application/atom+xml;type=entry; charset=utf-8";
client.Headers[HttpRequestHeader.Authorization] = authHeaderValue;
client.UploadData(String.Empty, "PUT", Encoding.UTF8.GetBytes(policy));
Console.WriteLine("Message buffer was created at '{0}'.", messageBufferLocation);
// Send a message to the message buffer.
client.Headers[HttpRequestHeader.ContentType] = "text/xml";
client.Headers[HttpRequestHeader.Authorization] = authHeaderValue;
client.UploadData("messages?timeout=20", "POST", Encoding.UTF8.GetBytes("<msg1>This is message #1</msg1>"));
Console.WriteLine("Message sent.");
// Retrieve message.
client.Headers[HttpRequestHeader.Authorization] = authHeaderValue;
string payload = Encoding.UTF8.GetString(client.UploadData("messages/head?timeout=20", "DELETE", newbyte[0]));
Console.WriteLine("Retrieved the message '{0}'.", payload);
// Delete the message buffer.
client.Headers[HttpRequestHeader.Authorization] = authHeaderValue;
client.UploadData(String.Empty, "DELETE", newbyte[0]);
Console.WriteLine("Message buffer at '{0}' was deleted.", messageBufferLocation);
2.使用Windows Azure SDK提供的API
使用Windows Azure SDK的API操作消息缓冲区主要通过两个类——MessageBufferClient和MessageBufferPolicy。
使用MessageBufferClient类可以创建新的消息缓冲区或者检索一个类,通过它与已存在的消息缓冲区交互。该类提供直接针对消息缓冲区操作的方法,比如将消息在消息缓冲区中排队,从消息缓冲区中检索消息等。
使用MessageBufferPolicy可以对消息缓冲区进行包括安全性、生命周期和容量等方面的定制。
下面的代码创建并定制了一个消息缓冲区,然后完成了向该消息缓冲区发送消息和从消息缓冲区检索消息。
string serviceNamespace = "...";
MessageVersion messageVersion = MessageVersion.Soap12WSAddressing10;
string messageAction = "urn:Message";
// Configure credentials.
TransportClientEndpointBehavior behavior = new TransportClientEndpointBehavior();
behavior.CredentialType = TransportClientCredentialType.SharedSecret;
behavior.Credentials.SharedSecret.IssuerName = "...";
behavior.Credentials.SharedSecret.IssuerSecret = "...";
// Configure buffer policy.
MessageBufferPolicy policy = new MessageBufferPolicy
{
ExpiresAfter = TimeSpan.FromMinutes(2.0d),
MaxMessageCount = 100
};
// Create message buffer.
string bufferName = "MyBuffer";
Uri bufferLocation = new Uri("https://" + serviceNamespace + ".servicebus.windows.net/ services/" + bufferName);
MessageBufferClient client = MessageBufferClient.CreateMessageBuffer(behavior, bufferLocation, policy, messageVersion);
// Send 10 messages.
for (int i = 0; i < 10; ++i)
{
client.Send(Message.CreateMessage(messageVersion, messageAction, "Message #" + i));
}
Message message;
string content;
// Retrieve a message (destructive read).
message = client.Retrieve();
content = message.GetBody<string>();
message.Close();
Console.WriteLine("Retrieve message content: {0}", content);
// Retrieve a message (peek/lock).
message = client.PeekLock();
content = message.GetBody<string>();
Console.WriteLine("PeekLock message content: {0}", content);
// Delete previously locked message.
client.DeleteLockedMessage(message);
message.Close();
// If no more messages are retrieved within the ExpiresAfter time span,
// the buffer will automatically be deleted...