我的项目采用的是基于事件的订阅-发布模式,主要是实现客户端与服务端的即时数据交互,客户端通过订阅事件以待接收数据,服务端通过发布服务将即时数据发布至服务,服务通知订阅了事件的客户端。问题是这样的:软件在网络情况较差的情况下,客户端与服务端的数据交换在一段时间后便停止,这个时候自动如何重新与服务器建立通信。  当服务端与客户端通讯中止的时候,客户端已经故障的情况下,服务端还是对客户端之前保存在服务端的引用进行数据发布,我的问题是如何在这种情况下服务端主动删除无效的客户端引用。(我已尝试删除,但有时候可以,有时候很困难而且几乎不能删除。)  另一个问题是,采用这种模式,不同订阅者(客户端)在接受数据的时候会互相影响,也就是说一个客户端如果接受数据受阻,其他客户端接受也收到影响。不知道该怎么解决。这些问题困扰我一周了,请各位达人帮帮忙,下面我贴出我的代码。发布-订阅类public abstract class SubscriptionManager<T> where T : class
 {
  //定义字典保存订阅者列表
  static Dictionary<string, List<T>> m_TransientStore;  static SubscriptionManager()
  {
   m_TransientStore = new Dictionary<string, List<T>>();
   string[] methods = GetOperations();
   Action<string> insert = delegate(string methodName)
   {
    m_TransientStore.Add(methodName, new List<T>());
   };
   Array.ForEach(methods, insert);
  }
  //获取事件操作
  static string[] GetOperations()
  {
   MethodInfo[] methods = typeof(T).GetMethods(BindingFlags.Public | BindingFlags.FlattenHierarchy | BindingFlags.Instance);
   List<string> operations = new List<string>(methods.Length);   Action<MethodInfo> add = delegate(MethodInfo method)
   {
    Debug.Assert(!operations.Contains(method.Name));
    operations.Add(method.Name);
   };
   Array.ForEach(methods, add);
   return operations.ToArray();
  }  //获取订阅了某一事件的订阅者列表
  internal static T[] GetTransientList(string eventOperation)
  {
   lock (typeof(SubscriptionManager<T>))
   {
    List<T> list = m_TransientStore[eventOperation];
    return list.ToArray();
   }
  }
  //将订阅者添加的字典中
  static void AddTransient(T subscriber, string eventOperation)
  {
   lock (typeof(SubscriptionManager<T>))
   {
    List<T> list = m_TransientStore[eventOperation];
    if (list.Contains(subscriber))
    {
     return;
    }
    list.Add(subscriber);
   }
  }
  //删除订阅了eventOperation的订阅者
  static void RemoveTransient(T subscriber, string eventOperation)
  {
   lock (typeof(SubscriptionManager<T>))
   {
    List<T> list = m_TransientStore[eventOperation];
    list.Remove(subscriber);
   }
  }
  //订阅方法
  public void Subscribe(string eventOperation)
  {
   lock (typeof(SubscriptionManager<T>))
   {
    T subscriber = OperationContext.Current.GetCallbackChannel<T>();
    if (String.IsNullOrEmpty(eventOperation) == false)
    {
     AddTransient(subscriber, eventOperation);
    }
    else
    {
     string[] methods = GetOperations();
     Action<string> addTransient = delegate(string methodName)
     {
      AddTransient(subscriber, methodName);
     };
     Array.ForEach(methods, addTransient);
    }
   }
  }
  //取消订阅方法
  public void Unsubscribe(string eventOperation)
  {
   lock (typeof(SubscriptionManager<T>))
   {
    T subscriber = OperationContext.Current.GetCallbackChannel<T>();
    if (String.IsNullOrEmpty(eventOperation) == false)
    {
     RemoveTransient(subscriber, eventOperation);
    }
    else
    {
     string[] methods = GetOperations();
     Action<string> removeTransient = delegate(string methodName)
     {
      RemoveTransient(subscriber, methodName);
     };
     Array.ForEach(methods, removeTransient);
    }
   }
  } 
 }
 发布服务类public abstract class PublishService<T> where T : class
 {
  //发布者通过调用该方法事件
  protected static void FireEvent(params object[] args)
  {
   StackFrame stackFrame = new StackFrame(1);
   string methodName = stackFrame.GetMethod().Name;   PublishTransient(methodName, args);
  }
  //发布者通过调用该方法发布特定事件
  static void FireEvent(string methodName, params object[] args)
  {
   PublishTransient(methodName, args);
  }  static void PublishTransient(string methodName, params object[] args)
  {
   //获取订阅者列表
   T[] subscribers = SubscriptionManager<T>.GetTransientList(methodName);
   Publish(subscribers, false, methodName, args);
  }  static void Publish(T[] subscribers, bool closeSubscribers, string methodName, params object[] args)
  {
   
   WaitCallback fire = delegate(object subscriber)
   {
    Invoke(subscriber as T, methodName, args);
    if (closeSubscribers)
    {
     using (subscriber as IDisposable)
     { }
    }
   };
   Action<T> queueUp = delegate(T subscriber)
   {
    ThreadPool.QueueUserWorkItem(fire, subscriber);
   };
   Array.ForEach(subscribers, queueUp);
  }
  static void Invoke(T subscriber, string methodName, object[] args)
  {
   Debug.Assert(subscriber != null);
   Type type = typeof(T);
   MethodInfo methodInfo = type.GetMethod(methodName);
   try
   {
    methodInfo.Invoke(subscriber, args);
   }
   catch (Exception e)
   {
    //如果调用发生异常,我尝试在这删除引用,但每次都很困难,有时候能删除,有时候删除很久都不行
    Trace.WriteLine(e.Message);
   }
  }
 }
服务端配置文件:<pre lang="x-c#"><?xml version="1.0"?>
<configuration>
<system.serviceModel>
 <bindings>
  <netTcpBinding>
  <binding name="BindingBehaviorConfiguration" receiveTimeout="00:10:00" maxBufferSize="65536"
     maxReceivedMessageSize="65536" transferMode="Buffered" maxBufferPoolSize="65536"
     closeTimeout="00:10:00" sendTimeout="00:10:00" openTimeout="00:10:00">
   <readerQuotas maxArrayLength="65536" maxStringContentLength="65536" maxBytesPerRead="65536"/>
<reliableSession enabled="true"/>
   <security mode="None">
   <transport clientCredentialType="Windows"/>
   <message clientCredentialType="Windows"/>
   </security>
  </binding>
  </netTcpBinding>
 </bindings>
 <services>
  <service behaviorConfiguration="serviceBehavior" name="HK.Globex.Services.MySubscriptionService">
  <endpoint address="MySubscriptionService" binding="netTcpBinding" bindingConfiguration="BindingBehaviorConfiguration" contract="HK.Globex.Contracts.IMySubscriptionService">
   <identity>
   <dns value="localhost"/>
   </identity>
  </endpoint>
  <endpoint address="mex" binding="mexTcpBinding" bindingConfiguration="" contract="IMetadataExchange">
   <identity>
   <dns value="localhost"/>
   </identity>
  </endpoint>
  <host>
   <baseAddresses>
   <add baseAddress="net.tcp://192.168.1.2:9000"/>
   </baseAddresses>
  </host>
  </service>
  <service behaviorConfiguration="serviceBehavior" name="HK.Globex.Services.MyPublishService">
  <endpoint address="MyPublishService" binding="netTcpBinding" bindingConfiguration="BindingBehaviorConfiguration" contract="HK.Globex.Contracts.IMyEvents">
   <identity>
   <dns value="localhost"/>
   </identity>
  </endpoint>
  <endpoint address="mex" binding="mexTcpBinding" bindingConfiguration="" contract="IMetadataExchange">
   <identity>
   <dns value="localhost"/>
   </identity>
  </endpoint>
  <host>
   <baseAddresses>
   <add baseAddress="net.tcp://192.168.1.2:9001"/>
   </baseAddresses>
  </host>
  </service>
 </services>
 <behaviors>
  <serviceBehaviors>
  <behavior name="serviceBehavior">
   <serviceMetadata/>
   <serviceDebug includeExceptionDetailInFaults="true"/>
   <serviceThrottling maxConcurrentCalls="1000" maxConcurrentInstances="1000" maxConcurrentSessions="1000"/>
  </behavior>
  </serviceBehaviors>
 </behaviors>
 </system.serviceModel>
</configuration>发布者配置文件:<?xml version="1.0"?>
<configuration>
 <system.serviceModel>
  <bindings>
  <netTcpBinding>
  <binding name="NetTcpBinding_MyEventsContract" closeTimeout="00:10:00"
     openTimeout="00:10:00" receiveTimeout="00:10:00" sendTimeout="00:10:00"
     transactionFlow="false" transferMode="Buffered" transactionProtocol="OleTransactions"
     hostNameComparisonMode="StrongWildcard" listenBacklog="10"
     maxBufferPoolSize="65536" maxBufferSize="65536" maxConnections="10"
     maxReceivedMessageSize="65536">
   <reliableSession ordered="false" inactivityTimeout="00:10:00"
      enabled="true" />
   <readerQuotas maxDepth="32" maxStringContentLength="65536" maxArrayLength="65536"
    maxBytesPerRead="65536" maxNameTableCharCount="65536" />
   <security mode="None">
   <transport clientCredentialType="Windows" protectionLevel="EncryptAndSign" />
   <message clientCredentialType="Windows" />
   </security>
  </binding>
  </netTcpBinding>
 </bindings>
 <client>
  <endpoint address="net.tcp://192.168.1.2:9001/MyPublishService"
    binding="netTcpBinding" bindingConfiguration="NetTcpBinding_MyEventsContract"
    contract="MyEventsContract" name="NetTcpBinding_MyEventsContract">
  <identity>
   <dns value="localhost" />
  </identity>
  </endpoint>
 </client>
 </system.serviceModel>
</configuration>客户端配置文件:<?xml version="1.0"?>
<configuration>
 <system.serviceModel>
  <bindings>
  <netTcpBinding>
   <binding name="NetTcpBinding_MySubscriptionContract" closeTimeout="00:10:00"
    openTimeout="00:10:00" receiveTimeout="00:10:00" sendTimeout="00:10:00"
    transactionFlow="false" transferMode="Buffered" transactionProtocol="OleTransactions"
    hostNameComparisonMode="StrongWildcard" listenBacklog="10"
    maxBufferPoolSize="65536" maxBufferSize="65536" maxConnections="10"
    maxReceivedMessageSize="2147483647">
   <readerQuotas maxDepth="32" maxStringContentLength="2147483647" maxArrayLength="2147483647"
    maxBytesPerRead="4096" maxNameTableCharCount="2147483647" />
   <reliableSession ordered="false" inactivityTimeout="00:10:00"
      enabled="true" />
   <security mode="None">
    <transport clientCredentialType="Windows" protectionLevel="EncryptAndSign" />
    <message clientCredentialType="Windows" />
   </security>
   </binding>
   </netTcpBinding>
  </bindings>
  <client>
  <endpoint address="net.tcp://192.168.1.2:9000/MySubscriptionService"
   binding="netTcpBinding" bindingConfiguration="NetTcpBinding_MySubscriptionContract"
   contract="MySubscriptionContract" name="NetTcpBinding_MySubscriptionContract">
   <identity>
   <dns value="localhost" />
   </identity>
  </endpoint>
  </client>
 </system.serviceModel>
</configuration>

解决方案 »

  1.   

    太长了 看不下去
    你服务实例设置成peercall试试?
      

  2.   

    PerCall与PerSession都试过了,这是服务实例激活模式吧,应该和这没多大关系。
      

  3.   

    给你发个帖子,希望能找到一点启发C# 断点续传原理与实现在了解HTTP断点续传的原理之前,让我们先来了解一下HTTP协议,HTTP协议是一种基于tcp的简单协议,分为请求和回复两种。请求协议是由客户机(浏览器)向服务器(WEB SERVER)提交请求时发送报文的协议。回复协议是由服务器(web server),向客户机(浏览器)回复报文时的协议。请求和回复协议都由头和体组成。头和体之间以一行空行为分隔。   以下是一个请求报文与相应的回复报文的例子: 
    GET /image/index_r4_c1.jpg HTTP/1.1 
    Accept: */* 
    Referer: http://192.168.3.120:8080 
    Accept-Language: zh-cn 
    Accept-Encoding: gzip, deflate 
    User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0; .NET CLR 1.0.3705) 
    Host: 192.168.3.120:8080 
    Connection: Keep-AliveHTTP/1.1 200 OK 
    Server: Microsoft-IIS/5.0 
    Date: Tue, 24 Jun 2003 05:39:40 GMT 
    Content-Type: image/jpeg 
    Accept-Ranges: bytes 
    Last-Modified: Thu, 23 May 2002 03:05:40 GMT 
    ETag: "bec48eb862c21:934" 
    Content-Length: 2827….  下面我们就来说说"断点续传",顾名思义,断点续传就是在上一次下载时断开的位置开始继续下载。 
    在HTTP协议中,可以在请求报文头中加入Range段,来表示客户机希望从何处继续下载。   比如说从第1024字节开始下载,请求报文如下:GET /image/index_r4_c1.jpg HTTP/1.1 
    Accept: */* 
    Referer: http://192.168.3.120:8080 
    Accept-Language: zh-cn 
    Accept-Encoding: gzip, deflate 
    User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0; .NET CLR 1.0.3705) 
    Host: 192.168.3.120:8080 
    Range:bytes=1024- 
    Connection: Keep-Alive 
      .NET中的相关类   明白了上面的原理,那么,我们来看看.NET FRAMEWORK中为我们提供了哪些类可以来做这些事。   完成HTTP请求System.Net.HttpWebRequest   HttpWebRequest 类对 WebRequest 中定义的属性和方法提供支持,也对使用户能够直接与使用 HTTP 的服务器交互的附加属性和方法提供支持。   HttpWebRequest 将发送到 Internet 资源的公共 HTTP 标头值公开为属性,由方法或系统设置。下表包含完整列表。可以将 Headers 属性中的其他标头设置为名称/值对。但是注意,某些公共标头被视为受限制的,它们或者直接由 API公开,或者受到系统保护,不能被更改。Range也属于被保护之列,不过,.NET为开发者提供了更方便的操作,就是 AddRange方法,向请求添加从请求数据的开始处或结束处的特定范围的字节范围标头   完成文件访问System.IO.FileStream   FileStream 对象支持使用Seek方法对文件进行随机访问, Seek 允许将读取/写入位置移动到文件中的任意位置。这是通过字节偏移参考点参数完成的。字节偏移量是相对于查找参考点而言的,该参考点可以是基础文件的开始、当前位置或结尾,分别由SeekOrigin类的三个属性表示。   代码实现   了解了.NET提供的相关的类,那么,我们就可以方便的实现了。   代码如下: 
    static void Main(string[] args) 
    {string StrFileName="c:\\aa.zip"; //根据实际情况设置 
    string StrUrl="http://www.xxxx.cn/xxxxx.zip"; //根据实际情况设置//打开上次下载的文件或新建文件 
    long lStartPos =0; 
    System.IO.FileStream fs; 
    if (System.IO.File.Exists(StrFileName)) 

    fs= System.IO.File.OpenWrite(StrFileName); 
    lStartPos=fs.Length; 
    fs.Seek(lStartPos,System.IO.SeekOrigin.Current); //移动文件流中的当前指针 

    else 

    fs = new System.IO.FileStream(StrFileName,System.IO.FileMode.Create); 
    lStartPos =0; 
    }//打开网络连接 
    try 

    System.Net.HttpWebRequest request =(System.Net.HttpWebRequest)System.Net.HttpWebRequest.Create(StrUrl); 
    if ( lStartPos>0) 
    request.AddRange((int)lStartPos); //设置Range值//向服务器请求,获得服务器回应数据流 
    System.IO.Stream ns= request.GetResponse().GetResponseStream();byte[] nbytes = new byte[512]; 
    int nReadSize=0; 
    nReadSize=ns.Read(nbytes,0,512); 
    while( nReadSize >0) 

    fs.Write(nbytes,0,nReadSize); 
    nReadSize=ns.Read(nbytes,0,512); 

    fs.Close(); 
    ns.Close(); 
    Console.WriteLine("下载完成"); 

    catch(Exception ex) 

    fs.Close(); 
    Console.WriteLine("下载过程中出现错误:"+ex.ToString()); 
    } }
      

  4.   

     参考了一些英文资料,有老外建议在服务回调客户端的时候采取异步的方法,但是我试了好像不起作用,不知道是我用错了还是怎么的。我主要就是通过修改回调契约,增加了BeginXXx与EndXXx方法:[OperationContract(IsOneWay=true,AsyncPattern=true)]
    IAsyncResult BeginDisplayPrice(string pName,string buyPrice,string sellPrice,AsyncCallback callback,object state);
    void EndDisplayPrice(IAsyncResult result);这样做好像程序不能正常运行,对于我的程序,服务回调客户端该如何实现,谢谢你们!  static void Invoke(T subscriber, string methodName, object[] args)
      {
       Debug.Assert(subscriber != null);
       Type type = typeof(T);   //获取回调方法名字
       MethodInfo methodInfo = type.GetMethod(methodName);
       try
       {    //通过反射调用回调方法
        methodInfo.Invoke(subscriber, args);
       }
       catch (Exception e)
       {
        Trace.WriteLine(e.Message);
       }
      }
      

  5.   

     参考了一些英文资料,有老外建议在服务回调客户端的时候采取异步的方法,但是我试了好像不起作用,不知道是我用错了还是怎么的。我主要就是通过修改回调契约,增加了BeginXXx与EndXXx方法:[OperationContract(IsOneWay=true,AsyncPattern=true)]
    IAsyncResult BeginDisplayPrice(string pName,string buyPrice,string sellPrice,AsyncCallback callback,object state);
    void EndDisplayPrice(IAsyncResult result);这样做好像程序不能正常运行,对于我的程序,服务回调客户端该如何实现,谢谢你们!  static void Invoke(T subscriber, string methodName, object[] args)
      {
       Debug.Assert(subscriber != null);
       Type type = typeof(T);   //获取回调方法名字
       MethodInfo methodInfo = type.GetMethod(methodName);
       try
       {    //通过反射调用回调方法
        methodInfo.Invoke(subscriber, args);
       }
       catch (Exception e)
       {
        Trace.WriteLine(e.Message);
       }
      }
      

  6.   

     参考了一些英文资料,有老外建议在服务回调客户端的时候采取异步的方法,但是我试了好像不起作用,不知道是我用错了还是怎么的。我主要就是通过修改回调契约,增加了BeginXXx与EndXXx方法:[OperationContract(IsOneWay=true,AsyncPattern=true)]
    IAsyncResult BeginDisplayPrice(string pName,string buyPrice,string sellPrice,AsyncCallback callback,object state);
    void EndDisplayPrice(IAsyncResult result);这样做好像程序不能正常运行,对于我的程序,服务回调客户端该如何实现,谢谢你们!  static void Invoke(T subscriber, string methodName, object[] args)
      {
       Debug.Assert(subscriber != null);
       Type type = typeof(T);   //获取回调方法名字
       MethodInfo methodInfo = type.GetMethod(methodName);
       try
       {    //通过反射调用回调方法
        methodInfo.Invoke(subscriber, args);
       }
       catch (Exception e)
       {
        Trace.WriteLine(e.Message);
       }
      }
      

  7.   

    让你的服务接口实现IDispose接口,服务方式为PerSession,其它两种方式肯定不行,必须为PerSession。这时订阅者列表中的每个对象就应该是一个单独的服务对象,当服务端与客户端通讯中止的时候,该服务对象就会出错而调用Dispose方法,你可以在Dispose方法中主动删除无效的客户端引用。
      

  8.   


    使用PerSession模式就不可能互相影响,每个用户单独一个服务对象,互相独立。
      

  9.   

    发布者服务类:
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
        public class MyPublishService:PublishService<IMyEvents>,IMyEvents
        {
            public void DisplayPrice(string pName, string buyPrice, string sellPrice)
            {
                FireEvent(pName, buyPrice, sellPrice);
            }
        }
    发布者通过生成代理调用DisplayPrice,从而调用上面发的发布者抽象类中的回调方法。
    能不能帮我理理,对在服务端实现异步回调能不能给点提示,谢了。
      

  10.   

    代理类通过什么方式获得的?订阅类的契约是什么?客户端订阅时的代码?贴的代码大部分都不是关键点。至于阻塞,ConcurrencyMode默认是Single,改成Multiple就行了。
      

  11.   

    代理类通过svcutil.exe 方式生成的。
    订阅类的契约:
        [ServiceContract(Name = "SubscriptionContract", Namespace = "http://HK.Globex.Contracts/Client/2010/04")]
        public interface ISubscriptionService
        {
            [OperationContract]
            void Subscribe(string eventOperation);
            [OperationContract]
            void Unsubscribe(string evetnOperation);
        }
    客户端订阅时代码: MySubscriptionContractClient m_proxy = null; //工具生成的代理类
            public MySubscriptionProxyInvoke()
            {
                InstanceContext context = new InstanceContext(this);
                m_proxy = new MySubscriptionContractClient(context);//实例化
            }        public void Subscribe(string Operation)
            {
                m_proxy.Subscribe(Operation);  //客户端订阅操作
            }
            //服务端回调客户端的方法
            public void DisplayPrice(string pName, string buyPrice, string sellPrice)
            {
               
            }
      

  12.   

    关于你说的ConcurrencyMode.Multiple试过了,也不行。
      

  13.   

    你贴了很多代码,但是都没有使用[code]标签贴出来,看得人眼睛很累,而且还缺少注释,函数互相调用的地方又那么多,不像VS中可以直接“转到定义”。说真的,看你的代码太累啊。你还是简单说明下你是如何注册事件的,我感觉你并没有注册事件,事件的注册是通过定义event对象,然后使用+=运算符添加的,取消注册注册是通过-=运算符来执行的。
      

  14.   

    现在好像看懂些了,原来你的思路和我不一样啊。你是以方法为唯一键,保存方法对应的订阅对象到List<T>中,然后你的取消订阅就是从List<T>中移出订阅者。我的思路是,你给订阅者定义一个事件变量,当订阅者要订阅事件时,你将该订阅者的事件执行+=操作,添加该事件,取消事件则执行-=操作。当一个客户连接到服务器后,自动全局保存这个连接对象,那个就是订阅者,当客户放弃连接后,在Dispose方法中,从全局移除保存的这个对象。
      

  15.   

    删除不了是由于lock造成的,可以通过缩小所有lock的范围来解决。
    造成阻塞有很多原因,不知道你服务端的host是什么,如果是iis7的tcp服务采用了ConcurrencyMode.Multiple应该不会有这个问题;如果是win form或者console app可能不能设置成为STA的;可以试一下把callback的operation设置为OneWay;还有就是多个阻塞的客户端是什么客户端,多个独立的app还是多个控件?
      

  16.   

    谢谢你的建议,我用了qldsrx所说的订阅发布模式,通过利用Mutex来缩小和同步删除引用代码,解决了客户端无效引用的删除问题,同事通过修改回调契约为异步回调并在客户端回调的时候采用callback.BeginDisplayPrice(...)。解决了客户端阻塞问题。