阿里云消息队列 MQ HTTP 接入:.NET Core 简单示例
2016-08-08 08:47
531 查看
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
namespace MQWebCore
{
/// <summary>
/// Small client for the Aliyun MQ (ONS) HTTP endpoint. It publishes a message to a topic
/// (<see cref="Pub"/>), polls the topic for messages (<see cref="Subscribe"/>) and deletes
/// every message it received. Each request carries an HMAC-SHA1 signature computed with
/// the secret key. The producer id is "PID_" + topic and the consumer id is "CID_" + topic.
/// </summary>
public class MQHelper
{
    // One HttpClient is shared by every request of every instance. Creating and disposing
    // a client per call (as this class is polled once per second and issues one delete per
    // message) leaves sockets in TIME_WAIT and can exhaust them. Per-request headers are
    // therefore set on HttpRequestMessage instead of DefaultRequestHeaders.
    static readonly HttpClient httpClient = new HttpClient();

    readonly string URL = "http://publictest-rest.ons.aliyun.com";
    readonly string topic, secretKey, accessKey;

    /// <summary>Creates a helper bound to one topic and one pair of credentials.</summary>
    /// <param name="topic">Topic name; also used to derive the producer and consumer ids.</param>
    /// <param name="secretKey">Key used for the HMAC-SHA1 request signature.</param>
    /// <param name="accessKey">Value sent in the "AccessKey" request header.</param>
    public MQHelper(string topic,string secretKey,string accessKey)
    {
        this.topic = topic;
        this.secretKey = secretKey;
        this.accessKey = accessKey;
    }

    /// <summary>
    /// Publishes one message to the topic.
    /// Original author's note (translated): the Key and Tag in the URL and the POST
    /// Content-Type are not restricted in any way; just make sure Key and Tag are
    /// consistent and unique.
    /// </summary>
    /// <param name="tag">Message tag, sent as the "tag" query parameter. Pass the raw value:
    /// it is URL-encoded here, so pre-encoded values would be encoded twice.</param>
    /// <param name="key">Message key, sent as the "key" query parameter (raw value, see above).</param>
    /// <param name="body">Message body, posted as UTF-8 text. Must not be null.</param>
    /// <returns>true when the server answers 201 Created; otherwise false.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="body"/> is null.</exception>
    public async Task<bool> Pub(string tag, string key, string body)
    {
        if (body == null)
        {
            throw new ArgumentNullException(nameof(body));
        }

        var time = UnixTimeMillisecondsText();
        // Signed string: topic, producer id, MD5 of the body, timestamp - one per line.
        var signString = Sign(topic + "\nPID_" + topic + "\n" + MD5Encrypt(body) + "\n" + time, secretKey);
        var url = URL + "/message/?topic=" + Escape(topic) + "&time=" + time
            + "&tag=" + Escape(tag) + "&key=" + Escape(key);

        using (var request = CreateRequest(HttpMethod.Post, url, signString, "ProducerID", "PID_"))
        {
            request.Headers.Connection.Add("keep-alive");
            request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/html"));
            request.Content = new StringContent(body, Encoding.UTF8);
            using (var res = await httpClient.SendAsync(request).ConfigureAwait(false))
            {
                return res.StatusCode == System.Net.HttpStatusCode.Created;
            }
        }
    }

    /// <summary>
    /// Fire-and-forget poll kept for existing callers; prefer <see cref="SubscribeAsync"/>.
    /// Because an exception escaping an async void method cannot be observed by the caller
    /// and would terminate the process, transport, timeout and JSON errors are written to
    /// standard error instead of being thrown.
    /// </summary>
    /// <param name="tag">Accepted for compatibility; it is not sent to the server.</param>
    public async void Subscribe(string tag = "*")
    {
        try
        {
            await SubscribeAsync(tag).ConfigureAwait(false);
        }
        catch (Exception ex) when (ex is HttpRequestException || ex is TaskCanceledException || ex is JsonException)
        {
            Console.Error.WriteLine("Subscribe failed: " + ex.Message);
        }
    }

    /// <summary>
    /// Polls the topic once (requesting up to 32 messages), prints the status code and the
    /// response text to the console, and deletes every message contained in the response.
    /// </summary>
    /// <param name="tag">Accepted for parity with <see cref="Subscribe"/>; it is not sent to the server.</param>
    /// <returns>A task that completes when the poll and all resulting deletes have finished.</returns>
    public async Task SubscribeAsync(string tag = "*")
    {
        var time = UnixTimeMillisecondsText();
        // Signed string: topic, consumer id, timestamp - one per line.
        var signString = Sign(topic + "\nCID_" + topic + "\n" + time, secretKey);
        var url = URL + "/message/?topic=" + Escape(topic) + "&time=" + time + "&num=32";

        string msg;
        using (var request = CreateRequest(HttpMethod.Get, url, signString, "ConsumerID", "CID_"))
        {
            request.Headers.Connection.Add("keep-alive");
            request.Headers.Add("Accept-Charset", "utf-8");
            using (var res = await httpClient.SendAsync(request).ConfigureAwait(false))
            {
                Console.WriteLine(res.StatusCode);
                if (res.StatusCode != System.Net.HttpStatusCode.OK)
                {
                    return;
                }
                msg = await res.Content.ReadAsStringAsync().ConfigureAwait(false);
            }
        }

        Console.WriteLine(msg);
        // Heuristic kept from the original: responses of 10 characters or fewer are
        // treated as "no messages" and are not parsed.
        if (msg == null || msg.Length <= 10)
        {
            return;
        }

        MQMessage[] mqMsgs = JsonConvert.DeserializeObject<MQMessage[]>(msg);
        if (mqMsgs == null)
        {
            return;
        }

        // Deletes run concurrently, as they effectively did before, but are now awaited so
        // failures surface to the caller instead of being lost.
        await Task.WhenAll(mqMsgs
            .Where(m => m != null && m.msgHandle != null)
            .Select(m => Delete(m.msgHandle))).ConfigureAwait(false);
    }

    /// <summary>
    /// Deletes one received message and prints the outcome to the console.
    /// </summary>
    /// <param name="msgHandle">Handle of the message, as returned by the poll request.</param>
    async Task Delete(string msgHandle)
    {
        var time = UnixTimeMillisecondsText();
        // Signed string: topic, consumer id, message handle, timestamp - one per line.
        var signString = Sign(topic + "\nCID_" + topic + "\n" + msgHandle + "\n" + time, secretKey);
        var url = URL + "/message/?topic=" + Escape(topic) + "&time=" + time + "&msgHandle=" + Escape(msgHandle);

        using (var request = CreateRequest(HttpMethod.Delete, url, signString, "ConsumerID", "CID_"))
        {
            request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/html"));
            using (var res = await httpClient.SendAsync(request).ConfigureAwait(false))
            {
                if (res.StatusCode == System.Net.HttpStatusCode.NoContent)
                {
                    Console.WriteLine("消息删除成功,无需返回内容");
                }
                else
                {
                    Console.WriteLine(res.StatusCode);
                }
            }
        }
    }

    /// <summary>Builds a request carrying the three authentication headers used by every call.</summary>
    HttpRequestMessage CreateRequest(HttpMethod method, string url, string signature, string idHeader, string idPrefix)
    {
        var request = new HttpRequestMessage(method, url);
        request.Headers.Add("AccessKey", accessKey);
        request.Headers.Add("Signature", signature);
        request.Headers.Add(idHeader, idPrefix + topic);
        return request;
    }

    /// <summary>Current time as milliseconds since 1970-01-01 UTC, formatted culture-independently.</summary>
    static string UnixTimeMillisecondsText()
    {
        return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
            .ToString(System.Globalization.CultureInfo.InvariantCulture);
    }

    /// <summary>URL-encodes a query-string value; null is treated as an empty value.</summary>
    static string Escape(string value)
    {
        return Uri.EscapeDataString(value ?? string.Empty);
    }

    /// <summary>
    /// Returns the MD5 digest of the UTF-8 bytes of <paramref name="strText"/> as 32 lowercase
    /// hex characters. Despite the name this is a hash, not an encryption.
    /// </summary>
    string MD5Encrypt(string strText)
    {
        using (var md5 = MD5.Create())
        {
            var result = md5.ComputeHash(Encoding.UTF8.GetBytes(strText));
            return BitConverter.ToString(result).Replace("-", "").ToLowerInvariant();
        }
    }

    /// <summary>
    /// Computes the HMAC-SHA1 of <paramref name="signatureString"/> keyed with
    /// <paramref name="secretKey"/> (both UTF-8 encoded).
    /// </summary>
    /// <param name="signatureString">Text to sign.</param>
    /// <param name="secretKey">Signing key.</param>
    /// <param name="isRaw">true: Base64 of the raw digest bytes; false: Base64 of the
    /// lowercase hex representation of the digest.</param>
    /// <returns>The Base64-encoded signature.</returns>
    string Sign(string signatureString, string secretKey, bool isRaw = true)
    {
        var enc = Encoding.UTF8;
        using (var hmac = new HMACSHA1(enc.GetBytes(secretKey)))
        {
            byte[] digest = hmac.ComputeHash(enc.GetBytes(signatureString));
            if (isRaw)
            {
                return Convert.ToBase64String(digest);
            }

            string hex = BitConverter.ToString(digest).Replace("-", "").ToLowerInvariant();
            return Convert.ToBase64String(enc.GetBytes(hex));
        }
    }
}
/// <summary>
/// One message entry of a poll response. MQHelper.Subscribe deserializes the response
/// text into an array of this type with Json.NET, so the field names are what gets
/// matched against the JSON property names.
/// </summary>
public class MQMessage
{
// Presumably the message payload as text - TODO confirm against the service docs.
public string body;
// Presumably the time the message was produced; kept as a string, format not visible here.
public string bornTime;
// Handle that MQHelper.Subscribe passes to Delete to remove the message after receiving it.
public string msgHandle;
// Presumably the server-assigned message identifier - TODO confirm.
public string msgId;
// Presumably how many times this message has been redelivered - TODO confirm.
public long reconsumeTimes;
// Presumably the tag the message was published with - TODO confirm.
public string tag;
}
使用:
using MQWebCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
/// <summary>
/// Console demo for MQHelper: publishes one timestamped test message, then polls the
/// topic once per second until the process is terminated.
/// </summary>
public class Program
{
    /// <summary>Entry point; never returns because the polling loop is endless.</summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void Main(string[] args)
    {
        Console.OutputEncoding = System.Text.Encoding.UTF8;
        //Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);

        // Arguments are (topic, secretKey, accessKey). The values below are placeholders
        // from the article; real credentials should come from configuration, not source.
        MQHelper mqHelper = new MQHelper("Test", "3412qsd's12", "3412341212");

        // Blocking on the task here is the synchronous Main's way of waiting for the publish.
        var res = mqHelper.Pub("testTag", "testKey", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "阿特斯地方").GetAwaiter().GetResult();

        // Fully qualified: this file does not import System.Diagnostics.
        System.Diagnostics.Debug.WriteLine(res);

        // Poll forever. The unreachable Console.Read() that followed this loop was removed.
        while (true)
        {
            mqHelper.Subscribe();
            Thread.Sleep(1000);
        }
    }
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
namespace MQWebCore
{
/// <summary>
/// Small client for the Aliyun MQ (ONS) HTTP endpoint. It publishes a message to a topic
/// (<see cref="Pub"/>), polls the topic for messages (<see cref="Subscribe"/>) and deletes
/// every message it received. Each request carries an HMAC-SHA1 signature computed with
/// the secret key. The producer id is "PID_" + topic and the consumer id is "CID_" + topic.
/// </summary>
public class MQHelper
{
    // One HttpClient is shared by every request of every instance. Creating and disposing
    // a client per call (as this class is polled once per second and issues one delete per
    // message) leaves sockets in TIME_WAIT and can exhaust them. Per-request headers are
    // therefore set on HttpRequestMessage instead of DefaultRequestHeaders.
    static readonly HttpClient httpClient = new HttpClient();

    readonly string URL = "http://publictest-rest.ons.aliyun.com";
    readonly string topic, secretKey, accessKey;

    /// <summary>Creates a helper bound to one topic and one pair of credentials.</summary>
    /// <param name="topic">Topic name; also used to derive the producer and consumer ids.</param>
    /// <param name="secretKey">Key used for the HMAC-SHA1 request signature.</param>
    /// <param name="accessKey">Value sent in the "AccessKey" request header.</param>
    public MQHelper(string topic,string secretKey,string accessKey)
    {
        this.topic = topic;
        this.secretKey = secretKey;
        this.accessKey = accessKey;
    }

    /// <summary>
    /// Publishes one message to the topic.
    /// Original author's note (translated): the Key and Tag in the URL and the POST
    /// Content-Type are not restricted in any way; just make sure Key and Tag are
    /// consistent and unique.
    /// </summary>
    /// <param name="tag">Message tag, sent as the "tag" query parameter. Pass the raw value:
    /// it is URL-encoded here, so pre-encoded values would be encoded twice.</param>
    /// <param name="key">Message key, sent as the "key" query parameter (raw value, see above).</param>
    /// <param name="body">Message body, posted as UTF-8 text. Must not be null.</param>
    /// <returns>true when the server answers 201 Created; otherwise false.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="body"/> is null.</exception>
    public async Task<bool> Pub(string tag, string key, string body)
    {
        if (body == null)
        {
            throw new ArgumentNullException(nameof(body));
        }

        var time = UnixTimeMillisecondsText();
        // Signed string: topic, producer id, MD5 of the body, timestamp - one per line.
        var signString = Sign(topic + "\nPID_" + topic + "\n" + MD5Encrypt(body) + "\n" + time, secretKey);
        var url = URL + "/message/?topic=" + Escape(topic) + "&time=" + time
            + "&tag=" + Escape(tag) + "&key=" + Escape(key);

        using (var request = CreateRequest(HttpMethod.Post, url, signString, "ProducerID", "PID_"))
        {
            request.Headers.Connection.Add("keep-alive");
            request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/html"));
            request.Content = new StringContent(body, Encoding.UTF8);
            using (var res = await httpClient.SendAsync(request).ConfigureAwait(false))
            {
                return res.StatusCode == System.Net.HttpStatusCode.Created;
            }
        }
    }

    /// <summary>
    /// Fire-and-forget poll kept for existing callers; prefer <see cref="SubscribeAsync"/>.
    /// Because an exception escaping an async void method cannot be observed by the caller
    /// and would terminate the process, transport, timeout and JSON errors are written to
    /// standard error instead of being thrown.
    /// </summary>
    /// <param name="tag">Accepted for compatibility; it is not sent to the server.</param>
    public async void Subscribe(string tag = "*")
    {
        try
        {
            await SubscribeAsync(tag).ConfigureAwait(false);
        }
        catch (Exception ex) when (ex is HttpRequestException || ex is TaskCanceledException || ex is JsonException)
        {
            Console.Error.WriteLine("Subscribe failed: " + ex.Message);
        }
    }

    /// <summary>
    /// Polls the topic once (requesting up to 32 messages), prints the status code and the
    /// response text to the console, and deletes every message contained in the response.
    /// </summary>
    /// <param name="tag">Accepted for parity with <see cref="Subscribe"/>; it is not sent to the server.</param>
    /// <returns>A task that completes when the poll and all resulting deletes have finished.</returns>
    public async Task SubscribeAsync(string tag = "*")
    {
        var time = UnixTimeMillisecondsText();
        // Signed string: topic, consumer id, timestamp - one per line.
        var signString = Sign(topic + "\nCID_" + topic + "\n" + time, secretKey);
        var url = URL + "/message/?topic=" + Escape(topic) + "&time=" + time + "&num=32";

        string msg;
        using (var request = CreateRequest(HttpMethod.Get, url, signString, "ConsumerID", "CID_"))
        {
            request.Headers.Connection.Add("keep-alive");
            request.Headers.Add("Accept-Charset", "utf-8");
            using (var res = await httpClient.SendAsync(request).ConfigureAwait(false))
            {
                Console.WriteLine(res.StatusCode);
                if (res.StatusCode != System.Net.HttpStatusCode.OK)
                {
                    return;
                }
                msg = await res.Content.ReadAsStringAsync().ConfigureAwait(false);
            }
        }

        Console.WriteLine(msg);
        // Heuristic kept from the original: responses of 10 characters or fewer are
        // treated as "no messages" and are not parsed.
        if (msg == null || msg.Length <= 10)
        {
            return;
        }

        MQMessage[] mqMsgs = JsonConvert.DeserializeObject<MQMessage[]>(msg);
        if (mqMsgs == null)
        {
            return;
        }

        // Deletes run concurrently, as they effectively did before, but are now awaited so
        // failures surface to the caller instead of being lost.
        await Task.WhenAll(mqMsgs
            .Where(m => m != null && m.msgHandle != null)
            .Select(m => Delete(m.msgHandle))).ConfigureAwait(false);
    }

    /// <summary>
    /// Deletes one received message and prints the outcome to the console.
    /// </summary>
    /// <param name="msgHandle">Handle of the message, as returned by the poll request.</param>
    async Task Delete(string msgHandle)
    {
        var time = UnixTimeMillisecondsText();
        // Signed string: topic, consumer id, message handle, timestamp - one per line.
        var signString = Sign(topic + "\nCID_" + topic + "\n" + msgHandle + "\n" + time, secretKey);
        var url = URL + "/message/?topic=" + Escape(topic) + "&time=" + time + "&msgHandle=" + Escape(msgHandle);

        using (var request = CreateRequest(HttpMethod.Delete, url, signString, "ConsumerID", "CID_"))
        {
            request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/html"));
            using (var res = await httpClient.SendAsync(request).ConfigureAwait(false))
            {
                if (res.StatusCode == System.Net.HttpStatusCode.NoContent)
                {
                    Console.WriteLine("消息删除成功,无需返回内容");
                }
                else
                {
                    Console.WriteLine(res.StatusCode);
                }
            }
        }
    }

    /// <summary>Builds a request carrying the three authentication headers used by every call.</summary>
    HttpRequestMessage CreateRequest(HttpMethod method, string url, string signature, string idHeader, string idPrefix)
    {
        var request = new HttpRequestMessage(method, url);
        request.Headers.Add("AccessKey", accessKey);
        request.Headers.Add("Signature", signature);
        request.Headers.Add(idHeader, idPrefix + topic);
        return request;
    }

    /// <summary>Current time as milliseconds since 1970-01-01 UTC, formatted culture-independently.</summary>
    static string UnixTimeMillisecondsText()
    {
        return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
            .ToString(System.Globalization.CultureInfo.InvariantCulture);
    }

    /// <summary>URL-encodes a query-string value; null is treated as an empty value.</summary>
    static string Escape(string value)
    {
        return Uri.EscapeDataString(value ?? string.Empty);
    }

    /// <summary>
    /// Returns the MD5 digest of the UTF-8 bytes of <paramref name="strText"/> as 32 lowercase
    /// hex characters. Despite the name this is a hash, not an encryption.
    /// </summary>
    string MD5Encrypt(string strText)
    {
        using (var md5 = MD5.Create())
        {
            var result = md5.ComputeHash(Encoding.UTF8.GetBytes(strText));
            return BitConverter.ToString(result).Replace("-", "").ToLowerInvariant();
        }
    }

    /// <summary>
    /// Computes the HMAC-SHA1 of <paramref name="signatureString"/> keyed with
    /// <paramref name="secretKey"/> (both UTF-8 encoded).
    /// </summary>
    /// <param name="signatureString">Text to sign.</param>
    /// <param name="secretKey">Signing key.</param>
    /// <param name="isRaw">true: Base64 of the raw digest bytes; false: Base64 of the
    /// lowercase hex representation of the digest.</param>
    /// <returns>The Base64-encoded signature.</returns>
    string Sign(string signatureString, string secretKey, bool isRaw = true)
    {
        var enc = Encoding.UTF8;
        using (var hmac = new HMACSHA1(enc.GetBytes(secretKey)))
        {
            byte[] digest = hmac.ComputeHash(enc.GetBytes(signatureString));
            if (isRaw)
            {
                return Convert.ToBase64String(digest);
            }

            string hex = BitConverter.ToString(digest).Replace("-", "").ToLowerInvariant();
            return Convert.ToBase64String(enc.GetBytes(hex));
        }
    }
}
/// <summary>
/// One message entry of a poll response. MQHelper.Subscribe deserializes the response
/// text into an array of this type with Json.NET, so the field names are what gets
/// matched against the JSON property names.
/// </summary>
public class MQMessage
{
// Presumably the message payload as text - TODO confirm against the service docs.
public string body;
// Presumably the time the message was produced; kept as a string, format not visible here.
public string bornTime;
// Handle that MQHelper.Subscribe passes to Delete to remove the message after receiving it.
public string msgHandle;
// Presumably the server-assigned message identifier - TODO confirm.
public string msgId;
// Presumably how many times this message has been redelivered - TODO confirm.
public long reconsumeTimes;
// Presumably the tag the message was published with - TODO confirm.
public string tag;
}
使用:
using MQWebCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
/// <summary>
/// Console demo for MQHelper: publishes one timestamped test message, then polls the
/// topic once per second until the process is terminated.
/// </summary>
public class Program
{
    /// <summary>Entry point; never returns because the polling loop is endless.</summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void Main(string[] args)
    {
        Console.OutputEncoding = System.Text.Encoding.UTF8;
        //Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);

        // Arguments are (topic, secretKey, accessKey). The values below are placeholders
        // from the article; real credentials should come from configuration, not source.
        MQHelper mqHelper = new MQHelper("Test", "3412qsd's12", "3412341212");

        // Blocking on the task here is the synchronous Main's way of waiting for the publish.
        var res = mqHelper.Pub("testTag", "testKey", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "阿特斯地方").GetAwaiter().GetResult();

        // Fully qualified: this file does not import System.Diagnostics.
        System.Diagnostics.Debug.WriteLine(res);

        // Poll forever. The unreachable Console.Read() that followed this loop was removed.
        while (true)
        {
            mqHelper.Subscribe();
            Thread.Sleep(1000);
        }
    }
}
}
相关文章推荐
- 基于HTTP协议的轻量级开源简单消息队列服务:HTTPSQS
- RabbitMQ .NET消息队列使用入门(三)【MVC实现RPC例子】
- Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)
- HTTPSQS(HTTP Simple Queue Service)是一款基于 HTTP GET/POST 协议的轻量级开源简单消息队列服务
- 浅谈使用java实现阿里云消息队列简单封装
- RabbitMQ .NET消息队列使用入门(一)【简单示例】
- 第二篇:spring+activeMQ实现消息队列简单demo
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”-- 一个看似简单的复杂问题
- 【进程通信】消息队列简单例子
- 阿里云消息队列的C#使用http接口发送消息实例
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”-- 一个看似简单的复杂问题
- Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)
- 使用java实现阿里云消息队列简单封装
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”-- 一个看似简单的复杂问题
- RabbitMQ .NET消息队列使用入门(一)【简单示例】
- Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)
- [MQ]ActiveMQ消息收发简单例子
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”-- 一个看似简单的复杂问题
- c# 消息队列 的简单例子
- MSMQ(Microsoft Message Queue,微软消息队列) Asp.Net 简单示例