zoukankan      html  css  js  c++  java
  • Biztalk Server zip unzip pipeline component Development

     

    Biztalk Server zip unzip pipeline component Development

    最近有个B2B的项目涉及和其他合作伙伴(partner)作数据传输,我们这边使用的开发平台(platform)Biztalk Server 2006,数据传输管道(channel)采用window server 2003MSMQ,但是由于MSMQ本身存在单个消息有4M限制的问题,虽软Biztalk Server 2006自带的MSMQ Adapter已经对大消息(Large Data)的支持,提供了[Support segmentation] if true , message larger than 4095KB(approximately 4MB) will be segmented,说明当发送单个报文实例(Instance)超过4MB的时候可以在发送至MSMQ对列的时候进行分割成几个小的消息发送,这些被分割的消息之间通过Message. CorrelationId进行关联具体的做法可以参考《MSMQ消息大于4MB限制的解决办法》采用对消息进行分割的做法会对接收消息需要作特定的判断,相对于对报文压缩来得比较简单;

    下面介绍一下如何通过对Biztalk Pipeline的二次开发实现对报文进行压缩/解压得实现;

    功能描述:

    1/Biztalk 流程(Orchestration)出来的消息在发送端口通过加载pipeline组件实现将消息以zip的方式进行压缩(zip可以对Xml,txt文档的压缩比达到101

    2/将接收的zip文件(支持包含多个文件批处理(batch))进行压缩后进入Biztalk流程(Orchestration)处理;

                        

    具体实现:

    要实现对Biztalk Pipeline的开发对如下接口[Microsoft.BizTalk.Component.Interop.IComponent,IBaseComponent, IPersistPropertyBag, IComponentUI]做实现,好在现在网上提供pipeline component wizradhttp://www.gotdotnet.com/Workspaces/Workspace.aspx?id=1d4f7d6b-7d27-4f05-a8ee-48cfcd5abf4a 可以下载到pipeline开发向导

    实现对文件进行压缩/解压需要的[ICSharpCode.SharpZipLib.dll]目前版本0.85相当稳定;下载地址:http://www.icsharpcode.net/OpenSource/SharpZipLib/ 具体方法请查看版本

    通过安装pipeline component wizrad之后就可以在vs.net中创建你的pipeline component组件了。CategoryTypes表示该组件可以加载到pipline的什么位置CategoryTypes.CATID_Any表示任何位置都可以放;

        [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]

        [System.Runtime.InteropServices.Guid("62656b9b-7d69-407d-b71f-d3c0415c82af")]

        [ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]

        public class UnzipDisassemblerComponent : Microsoft.BizTalk.Component.Interop.IDisassemblerComponent, IBaseComponent, IPersistPropertyBag, IComponentUI

        {

     

            private System.Resources.ResourceManager resourceManager = new System.Resources.ResourceManager("Execution.BizTalk.Common.Pipelines.UnzipDisassemblerPipeline", Assembly.GetExecutingAssembly());

     

            #region IBaseComponent members

    下面是对Biztalk消息进行解压/压缩的代码实现。

           

      1/// <summary>解压
      2
      3        /// called by the messaging engine when a new message arrives
      4
      5        /// </summary>
      6
      7        /// <param name="pc">the pipeline context</param>
      8
      9        /// <param name="inmsg">the actual message</param>

     10
     11        public void Disassemble(Microsoft.BizTalk.Component.Interop.IPipelineContext pc, Microsoft.BizTalk.Message.Interop.IBaseMessage inmsg)
     12
     13        {
     14
     15 
     16
     17            Stream strmZipFile;
     18
     19 
     20
     21            IBaseMessagePart msgPart;
     22
     23            msgPart = inmsg.BodyPart;
     24
     25            strmZipFile = msgPart.GetOriginalDataStream();
     26
     27 
     28
     29            ZipInputStream oZipStream = new ZipInputStream(strmZipFile);
     30
     31            if (!string.IsNullOrEmpty(mPassword))
     32
     33                oZipStream.Password = mPassword;
     34
     35 
     36
     37            try
     38
     39            {
     40
     41 
     42
     43                ZipEntry sEntry = oZipStream.GetNextEntry();
     44
     45                while (sEntry != null )
     46
     47                {
     48
     49                    if (sEntry.IsDirectory)
     50
     51                    {
     52
     53                        sEntry = oZipStream.GetNextEntry();
     54
     55                        continue;
     56
     57                    }

     58
     59 
     60
     61                    MemoryStream strmMem = new MemoryStream();
     62
     63                    byte[] buffer = new byte[4096];
     64
     65                    int count = 0;
     66
     67                    while ((count = oZipStream.Read(buffer, 0, buffer.Length)) != 0)
     68
     69                        strmMem.Write(buffer, 0, count);
     70
     71 
     72
     73                    strmMem.Seek(0, SeekOrigin.Begin);
     74
     75                    msgPart.Data = strmMem;
     76
     77 
     78
     79                    IBaseMessage outMsg;
     80
     81 
     82
     83                    outMsg = pc.GetMessageFactory().CreateMessage();
     84
     85                    outMsg.AddPart("Body", pc.GetMessageFactory().CreateMessagePart(), true);
     86
     87                    outMsg.BodyPart.Data = strmMem;
     88
     89 
     90
     91                    for (int iProp = 0; iProp < inmsg.Context.CountProperties; iProp++)
     92
     93                    {
     94
     95                        string strName;
     96
     97                        string strNSpace;
     98
     99 
    100
    101                        object val = inmsg.Context.ReadAt(iProp, out strName, out strNSpace);
    102
    103                        // If the property has been promoted, respect the settings
    104
    105                        if (inmsg.Context.IsPromoted(strName, strNSpace))
    106
    107                            outMsg.Context.Promote(strName, strNSpace, val);
    108
    109                        else
    110
    111                            outMsg.Context.Write(strName, strNSpace, val);
    112
    113                    }

    114
    115 
    116
    117                    if (this.Namespace != null && this.RootElementName != null)
    118
    119                    {
    120
    121                        string messageType = string.Format("{0}#{1}"this.Namespace, this.RootElementName);
    122
    123                        outMsg.Context.Promote("MessageType""http://schemas.microsoft.com/BizTalk/2003/system-properties", messageType);
    124
    125                    }

    126
    127                    _msgs.Enqueue(outMsg);
    128
    129                    sEntry = oZipStream.GetNextEntry();
    130
    131                }

    132
    133            }

    134
    135            catch (Exception ex)
    136
    137            {
    138
    139                Debug.Write(ex.Message + Environment.NewLine + ex.StackTrace);
    140
    141                throw;
    142
    143            }

    144
    145 
    146
    147        }

    148
    149 
    150
    151//压缩消息
    152
    153/// <summary>
    154
    155        /// Implements IComponent.Execute method.
    156
    157        /// </summary>
    158
    159        /// <param name="pc">Pipeline context</param>
    160
    161        /// <param name="inmsg">Input message</param>
    162
    163        /// <returns>Original input message</returns>
    164
    165        /// <remarks>
    166
    167        /// IComponent.Execute method is used to initiate
    168
    169        /// the processing of the message in this pipeline component.
    170
    171        /// </remarks>

    172
    173        public Microsoft.BizTalk.Message.Interop.IBaseMessage Execute(
    174
    175            Microsoft.BizTalk.Component.Interop.IPipelineContext pc, 
    176
    177            Microsoft.BizTalk.Message.Interop.IBaseMessage inmsg)
    178
    179        {
    180
    181            // 
    182
    183            // TODO: implement component logic
    184
    185            
    186
    187            //string[] filenames = Directory.GetFiles(args[0]);
    188
    189            byte[] buffer = new byte[4096];
    190
    191            string fileName = string.Empty;
    192
    193            Stream strmZipFile=new MemoryStream();
    194
    195            Stream strmOriginFile;
    196
    197            Stream strmZip; ;
    198
    199            IBaseMessagePart msgPart;
    200
    201            msgPart = inmsg.BodyPart;
    202
    203            strmOriginFile = msgPart.GetOriginalDataStream();
    204
    205            try
    206
    207            {
    208
    209 
    210
    211                using (ZipOutputStream s = new ZipOutputStream(strmZipFile))
    212
    213                {
    214
    215 
    216
    217 
    218
    219                    s.SetLevel(9); // 0 - store only to 9 - means best compression
    220
    221                    //fileName = Guid.NewGuid().ToString() + "." + this.extension;
    222
    223                    fileName = inmsg.Context.Read("ReceivedFileName""http://schemas.microsoft.com/BizTalk/2003/" + messageContext.InboundTransportType.ToLower() + "-properties").ToString();
    224
    225                    s.Password = this.password;
    226
    227                    ZipEntry entry = new ZipEntry(fileName);
    228
    229                    s.PutNextEntry(entry);
    230
    231                    StreamUtils.Copy(strmOriginFile, s, buffer);
    232
    233                    s.Finish();
    234
    235                    strmZip = new MemoryStream();
    236
    237                    strmZipFile.Seek(0, SeekOrigin.Begin);
    238
    239                    StreamUtils.Copy(strmZipFile, strmZip, buffer);
    240
    241                    strmZip.Seek(0, SeekOrigin.Begin);
    242
    243                    s.Close();
    244
    245 
    246
    247 
    248
    249                }

    250
    251 
    252
    253                msgPart.Data = strmZip;
    254
    255                pc.ResourceTracker.AddResource(strmZip);
    256
    257                // this way, it's a passthrough pipeline component
    258
    259 
    260
    261                return inmsg;
    262
    263            }

    264
    265            catch (Exception e)
    266
    267            {
    268
    269                Debug.WriteLine("[zipAssember] " + e.Message);
    270
    271                throw e;
    272
    273            }

    274
    275        }

    276
  • 相关阅读:
    jsp页面a标签URL转码问题
    函数的真面目实例
    野指针和内存操作实例
    redhat安装VMware tools的方法
    线索化二叉树实例
    遍历二叉树实例
    创建二叉树实例
    树的存储结构实例
    树的定义实例
    HBase基础和伪分布式安装配置
  • 原文地址:https://www.cnblogs.com/neozhu/p/672885.html
Copyright © 2011-2022 走看看