前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RPC编程

RPC编程

作者头像
小蜜蜂
修改2019-07-16 17:22:47
1.3K0
修改2019-07-16 17:22:47
举报
文章被收录于专栏:明丰随笔明丰随笔

什么是RPC?

RPC全称Remote Procedure Call,即远程方法调用。它的表现形式是:

一个RPC请求从客户端初始化,并发送到远程服务器,执行远程服务器上的方法,远程服务器处理完成之后会把响应返回给客户端,然后客户端的应用程序继续执行。RPC的意义是实现一种调用执行远程方法的协议,让你在本地调用远程的方法,而对你来说这个调用是透明的,就向调用本地方法一样。通过RPC能解耦服务,当server需要对方法的实现进行修改时,client完全感知不到,不用做任何变更。

RPC的使用场景探讨

假设我们有两个数据中心,一个是US,一个是CN。我们的需求是产生的数据需要在US和CN之间进行同步。我们最初的方案是在US环境搭建服务,并在US数据库生成数据。然后再从US和CN的数据库层面进行数据的同步,US和CN背后通过V**的方式连接。这个方案是可行的,但是突然有一天V**坏了,不能再继续使用了,US和CN的数据不能同步了。我们怎么办?从对现有代码修改的成本角度来考虑,我们希望代码改动尽可能小,业务逻辑也能更多确保其正确性。我们的解决方案就是使用RPC,现在的问题是CN的数据没有办法同步,如果我们在CN环境使用相同的代码搭建CN服务,并直接连接CN的数据库。US的服务调用data层方法时候,能够同时调用US本地方法并RPC调用CN服务的data层方法,这样US和CN就会同时产生数据了,数据也就一致了,我们的需求也就解决了。

使用WebService实现RPC

创建一个web服务:RpcWebService.asmx,并部署在远程服务器上。这个服务会对来自客户端的调用,使用反射技术进行还原并调用本地的方法,并把本地的执行结果返回给客户端。在客户端里面需要做一个AOP代理层,从原来的只有调用本地方法变成调用远程方法。我们实现依赖了这些package:

Castle.Core: AOP类库

Newtonsoft.Json: 序列化类库

客户端和服务器之间的关系,请看下图:

具体的代码改动:

1. 服务端添加WebService,处理客户端的请求。

2. 客户端添加AOP代理层。

3. 客户端代码修改成RPC的调用形式。

1. 服务端添加WebService,用来处理来自客户端的请求:

代码语言:javascript
复制
/// <summary>
/// Summary description for RpcWebService
/// </summary>
[WebService(Namespace = "http://tempuri.org/")]
[WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)]
[System.ComponentModel.ToolboxItem(false)]
// To allow this Web Service to be called from script, using ASP.NET AJAX, uncomment the following line. 
// [System.Web.Script.Services.ScriptService]
public class RpcWebService : System.Web.Services.WebService
{
    [WebMethod]
    public RpcServiceResult CallMethod(RpcServiceContext callContext)
    {
        RpcServiceResult rpcResult = new RpcServiceResult();

        try
        {
            // validate interface/class type
            Type type = Type.GetType(callContext.MethodInstanceTypeName);
            if (type == null)
            {
                throw new InvalidOperationException("Failed to locate type:" + callContext.MethodInstanceTypeName);
            }

            // extract arguments information
            var arguments = callContext.Arguments
                .Select(args =>
                    new
                    {
                        TypeName = args.TypeName,
                        Type = Type.GetType(args.TypeName),
                        Value = RpcJsonSerializer.Deserialize(args.Value, Type.GetType(args.TypeName))
                    });

            // validate arguments
            if (arguments.Any())
            {
                var badTypes = arguments.Where(t => t.Type == null).Select(t => t.TypeName);
                if (badTypes.Any())
                {
                    var msg = string.Join(",", badTypes.ToArray());
                    throw new InvalidOperationException("Failed to locate parameter types:" + badTypes);
                }
            }

            // validate method
            Type[] argsTypes = arguments.Select(t => t.Type).ToArray();
            MethodInfo method = type.GetMethod(callContext.MethodName, argsTypes);
            if (method == null)
            {
                throw new InvalidOperationException(string.Format("Failed to find method named:{0}, with parameters:{1}", callContext.MethodName, string.Join(",", argsTypes.Select(t => t.FullName).ToArray())));
            }

            // invoke method on local instance
            var instance = Activator.CreateInstance(type);
            if (instance == null)
            {
                throw new InvalidOperationException("Failed to create instance for type:" + type.AssemblyQualifiedName);
            }

            var methodResult = method.Invoke(instance, arguments.Select(args => args.Value).ToArray());
            rpcResult.Result = RpcJsonSerializer.Serialize(methodResult);
            rpcResult.ResultTypeName = method.ReturnType.FullName;
            rpcResult.ResultFormat = RpcResultFormat.Json;
            rpcResult.IsSuccess = true;
        }
        catch (Exception ex)
        {
            rpcResult.IsSuccess = false;
            rpcResult.ErrorMessage = ex.Message;
        }

        return rpcResult;
    }
}

public class RpcServiceResult
{
    public bool IsSuccess { get; set; }

    /// <summary>
    /// Serialized result of the method call.
    /// </summary>
    public string Result { get; set; }

    /// <summary>
    /// Name of the type of method return value.
    /// </summary>
    public string ResultTypeName { get; set; }

    /// <summary>
    /// In which the result is formatted.
    /// </summary>
    public RpcResultFormat ResultFormat { get; set; }

    public string ErrorMessage { get; set; }
}

public class RpcServiceContext
{
    /// <summary>
    /// Name of the type where the method is obtained.
    /// </summary>
    public string MethodInstanceTypeName { get; set; }

    /// <summary>
    /// Name of the method
    /// </summary>
    public string MethodName { get; set; }

    /// <summary>
    /// List of method argument
    /// </summary>
    public List<RpcServiceArgument> Arguments { get; set; }

    public RpcServiceContext()
    {
        this.Arguments = new List<RpcServiceArgument>();
    }
}

public class RpcServiceArgument
{
    /// <summary>
    /// Type of the argument
    /// </summary>
    public string TypeName { get; set; }

    /// <summary>
    /// Serialized value of argument
    /// </summary>
    public string Value { get; set; }
}

public enum RpcResultFormat
{
    /// <summary>
    /// Default & preferred
    /// </summary>
    Json
}

2. 客户端AOP代理层代码:

代码语言:javascript
复制
public static class ProxyFactory
{
    private static readonly ProxyGenerator _proxyGenerator = new ProxyGenerator();
    private static ConcurrentDictionary<string, object> _objectCache = new ConcurrentDictionary<string, object>(StringComparer.InvariantCultureIgnoreCase);

    public static T CreateProxy<T>(T target) where T : class
    {
        if (target == null)
        {
            throw new ArgumentNullException("target");
        }

        var proxy = _objectCache.GetOrAdd(typeof(T).AssemblyQualifiedName, (type) =>
        {
            var intercepter = new RemoteInvokeIntercepter();
            return _proxyGenerator.CreateInterfaceProxyWithTarget<T>(target, intercepter);
        }) as T;

        if (proxy == null)
        {
            throw new InvalidOperationException(string.Format("Failed to create proxy for Type: {0}", typeof(T).AssemblyQualifiedName));
        }

        return proxy;
    }
}

internal class RemoteInvokeIntercepter : IInterceptor
{
    private const int MAX_SERVICE_RETRY = 3;

    public RemoteInvokeIntercepter()
    {
    }

    public void Intercept(IInvocation invocation)
    {
        RpcServiceContext callContext = new RpcServiceContext()
        {
            MethodInstanceTypeName = invocation.InvocationTarget.GetType().AssemblyQualifiedName,
            MethodName = invocation.Method.Name,
        };

        callContext.Arguments = invocation.MethodInvocationTarget
            .GetParameters()
            .OrderBy(p => p.Position)
            .Select(p => new RpcServiceArgument()
            {
                TypeName = invocation.Arguments[p.Position] != null
                            ? invocation.Arguments[p.Position].GetType().AssemblyQualifiedName
                            : p.ParameterType.AssemblyQualifiedName,
                Value = RpcJsonSerializer.Serialize(invocation.Arguments[p.Position])
            })
            .ToArray();

        RpcWebServiceSoapClient proxy = new RpcWebServiceSoapClient();

        var returnValueResponse = CallWebServiceWithRetry(proxy.CallMethod, callContext);
        if (!returnValueResponse.IsSuccess)
        {
            throw new InvalidOperationException(string.Format("Failed to call service:{0}, remote error:{1}. Check the error log in rpc service for more details.", invocation.Method.Name, returnValueResponse.ErrorMessage));
        }

        if (invocation.Method.ReturnType.FullName.Equals("System.Void"))
        {
            return;
        }

        if (returnValueResponse.ResultFormat == RpcResultFormat.Json)
        {
            invocation.ReturnValue = RpcJsonSerializer.Deserialize(returnValueResponse.Result, invocation.Method.ReturnType);
        }
        else
        {
            throw new NotSupportedException(string.Format("RpcResultFormat '{0}' is not supported", returnValueResponse.ResultFormat));
        }
    }

    private RpcServiceResult CallWebServiceWithRetry(Func<RpcServiceContext, RpcServiceResult> service, RpcServiceContext args, int retryCountdown = MAX_SERVICE_RETRY)
    {
        try
        {
            return service.Invoke(args);
        }
        catch (Exception exception)
        {
            if (retryCountdown > 0)
            {
                return CallWebServiceWithRetry(service, args, --retryCountdown);
            }
            else
            {
                throw new Exception(string.Format("Failed to call service:{0}, with {1} retries.", service.Method.Name, MAX_SERVICE_RETRY - retryCountdown), exception);
            }
        }
    }
}

public static class RpcJsonSerializer
{
    /// <summary>
    /// Specificly for circular references.
    /// </summary>
    private static readonly JsonSerializer _jsonSerializer = JsonSerializer.Create(
        new JsonSerializerSettings()
        {
            PreserveReferencesHandling = PreserveReferencesHandling.Objects
        }
    );
    public static string Serialize(object obj)
    {
        StringBuilder sb = new StringBuilder();
        using (StringWriter sw = new StringWriter(sb))
        {
            _jsonSerializer.Serialize(sw, obj);
        }
        return sb.ToString();
    }
    public static object Deserialize(string jsonText, Type objectType)
    {
        var reader = new JsonTextReader(new StringReader(jsonText));
        return _jsonSerializer.Deserialize(reader, objectType);
    }
}  

3. 客户端代码修改成RPC的调用形式:

代码语言:javascript
复制
public class MySvc : IMySvc
{
    private static readonly IMySvc instance = new MySvc();
    public static IMySvc RPCInstance
    {
        get
        {
            return ProxyFactory.CreateProxy(instance);
        }
    }
    public string GetString(int num)
    {
        return num.ToString();
    }
}

public class HomeController : Controller
{
    private IMySvc _mySvc;
    public HomeController()
    {
        _mySvc = MySvc.RPCInstance;
    }
    public ActionResult Index()
    {
        ViewBag.Hello = _mySvc.GetString(123);
        return View();
    }
}  

进过这一系列的操作,我们的代码就具备了调用远程服务的能力,而且调用起来就像调用本地代码一样简单!

本文参与?腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-05-30,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 明丰随笔 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com