.net core实现redisClient

通信协议

要想自行实现redisClient,则必须先要了解Redis的socket能信协议。新版统一请求协议在 Redis 1.2 版本中引入, 并最终在 Redis 2.0 版本成为 Redis 服务器通信的标准方式。在这个协议中, 所有发送至 Redis 服务器的参数都是二进制安全(binary safe)的。

以下是这个协议的一般形式:

1
2
3
4
5
6
*<参数数量> CR LF
$<参数 1 的字节数量> CR LF
<参数 1 的数据> CR LF
...
$<参数 N 的字节数量> CR LF
<参数 N 的数据> CR LF

注:命令本身也作为协议的其中一个参数来发送。举个例子, 以下是一个命令协议的打印版本:

复制代码
1 *3 
2 $3 
3 SET 
4 $5 
5 mykey 
6 $7 
7 myvalue
复制代码

这个命令的实际协议值如下:

1 "*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"

稍后看到, 这种格式除了用作命令请求协议之外, 也用在命令的回复协议中: 这种只有一个参数的回复格式被称为批量回复(Bulk Reply)。统一协议请求原本是用在回复协议中, 用于将列表的多个项返回给客户端的, 这种回复格式被称为多条批量回复(Multi Bulk Reply)。一个多条批量回复以 *<argc>\r\n 为前缀, 后跟多条不同的批量回复, 其中 argc 为这些批量回复的数量。

Redis 命令会返回多种不同类型的回复。一个状态回复(或者单行回复,single line reply)是一段以 "+" 开始、 "\r\n" 结尾的单行字符串。通过检查服务器发回数据的第一个字节, 可以确定这个回复是什么类型:

  • 状态回复(status reply)的第一个字节是 "+"
  • 错误回复(error reply)的第一个字节是 "-"
  • 整数回复(integer reply)的第一个字节是 ":"
  • 批量回复(bulk reply)的第一个字节是 "$"
  • 多条批量回复(multi bulk reply)的第一个字节是 "*"

.net Core Socket

说起socket,就不得不说IOCP了,这个方案本身就是为了解决多连接、高并发而设计的;但是话又说回来,任何方案都有局限性,不可能解决所有问题;这里不去讨论用在这里是否合适,反正本人就是想这么试一把:用一个简单的ioc模式实现SAEA.Socket,并为此设定各种场景,反过来优化SAEA.Socket本身。下面是一段服务器接收连接的代码:

复制代码
1 private void ProcessAccept(SocketAsyncEventArgs args)  
2  {  
3 if (args == null)  
4  {  
5 args = new SocketAsyncEventArgs();  
6 args.Completed += ProcessAccepted;  
7  }  
8 else  
9  { 
10 args.AcceptSocket = null; 
11  } 
12 if (!_listener.AcceptAsync(args)) 
13  { 
14  ProcessAccepted(_listener, args); 
15  } 
16 }
复制代码

 项目结构

在网上找到redis的命令文档后,本人觉的准备工作差不多了,可以初步定一下项目结构:

  Core:定义的是redisclient相关最基本的业务

  Interface:定义的是一些需要抽象出来的接口

  Model:定义的是redis的数据模型及其请求、回复的类型枚举

  Net:这里就是将继承实现SAEA.Socket而来的RedisConnection通信基础

命令解码器

通过前面的准备工作了解到redisClient的关键在于命令的编解码,至于高大上算法或redis官方算法的实现,本人没有去详细了解,一冲动就自行实现了自定义版的解码器。

复制代码
1 public string Coder(RequestType commandName, params string[] @params)  
2  {  
3  _autoResetEvent.WaitOne();  
4 _commandName = commandName;  
5 var sb = new StringBuilder();  
6 sb.AppendLine("*" + @params.Length);  
7 foreach (var param in @params)  
8  {  
9 sb.AppendLine("$" + param.Length); 
10  sb.AppendLine(param); 
11  } 
12 return sb.ToString(); 
13 }
复制代码
复制代码
1 public ResponseData Decoder()  
2  {  
3 var result = new ResponseData();  
4  
5 string command = null;  
6  
7 string error = null;  
8  
9 var len = 0;  
10  
11 switch (_commandName)  
12  {  
13 case RequestType.PING:  
14 command = BlockDequeue();  
15 if (GetStatus(command, out error))  
16  {  17 result.Type = ResponseType.OK;  
18 result.Data = "PONG";  
19  }  
20 else  
21  {  
22 result.Type = ResponseType.Error;  
23 result.Data = error;  
24  }  
25 break;  
26 case RequestType.AUTH:  
27 case RequestType.SELECT:  
28 case RequestType.SLAVEOF:  
29 case RequestType.SET:  
30 case RequestType.DEL:  
31 case RequestType.HSET:  
32 case RequestType.HDEL:  
33 case RequestType.LSET:  
34 command = BlockDequeue();  
35 if (GetStatus(command, out error))  
36  {  
37 result.Type = ResponseType.OK;  
38 result.Data = "OK";  
39  }  
40 else  
41  {  
42 result.Type = ResponseType.Error;  
43 result.Data = error;  44  }  
45 break;  
46 case RequestType.TYPE:  
47 command = BlockDequeue();  
48 if (GetStatusString(command, out string msg))  
49  {  
50 result.Type = ResponseType.OK;  
51  }  
52 else  
53  {  
54 result.Type = ResponseType.Error;  
55  }  
56 result.Data = msg;  
57 break;  
58 case RequestType.GET:  
59 case RequestType.GETSET:  
60 case RequestType.HGET:  
61 case RequestType.LPOP:  
62 case RequestType.RPOP:  
63 case RequestType.SRANDMEMBER:  
64 case RequestType.SPOP:  
65 len = GetWordsNum(BlockDequeue(), out error);  
66 if (len == -1)  
67  {  
68 result.Type = ResponseType.Empty;  
69 result.Data = error;  
70  }  
71 else  
72  {  
73 result.Type = ResponseType.String;  
74 result.Data += BlockDequeue();  
75  }  
76 break;  
77 case RequestType.KEYS:  
78 case RequestType.HKEYS:  
79 case RequestType.LRANGE:  
80 case RequestType.SMEMBERS:  
81 result.Type = ResponseType.Lines;  
82 var sb = new StringBuilder();  
83 var rn = GetRowNum(BlockDequeue(), out error);  
84 if (!string.IsNullOrEmpty(error))  
85  {  
86 result.Type = ResponseType.Error;  
87 result.Data = error;  
88 break;  
89  }  
90 //再尝试读取一次,发现有回车行出现  
91 if (rn == -1) rn = GetRowNum(BlockDequeue(), out error);  
92 if (!string.IsNullOrEmpty(error))  
93  {  
94 result.Type = ResponseType.Error;  
95 result.Data = error;  
96 break;  
97  }  
98 if (rn > 0)  
99  { 
100 for (int i = 0; i < rn; i++) 
101  { 
102 len = GetWordsNum(BlockDequeue(), out error); 
103  sb.AppendLine(BlockDequeue()); 
104  } 
105  } 
106 result.Data = sb.ToString(); 
107 break; 
108 case RequestType.HGETALL: 
109 case RequestType.ZRANGE: 
110 case RequestType.ZREVRANGE: 
111 result.Type = ResponseType.KeyValues; 
112 sb = new StringBuilder(); 
113 rn = GetRowNum(BlockDequeue(), out error); 
114 if (!string.IsNullOrEmpty(error)) 
115  { 
116 result.Type = ResponseType.Error; 
117 result.Data = error; 
118 break; 
119  } 
120 if (rn > 0) 
121  { 
122 for (int i = 0; i < rn; i++) 
123  { 
124 len = GetWordsNum(BlockDequeue(), out error); 
125  sb.AppendLine(BlockDequeue()); 
126  } 
127  } 
128 result.Data = sb.ToString(); 
129 break; 
130 case RequestType.DBSIZE: 
131 case RequestType.EXISTS: 
132 case RequestType.EXPIRE: 
133 case RequestType.PERSIST: 
134 case RequestType.SETNX: 
135 case RequestType.HEXISTS: 
136 case RequestType.HLEN: 
137 case RequestType.LLEN: 
138 case RequestType.LPUSH: 
139 case RequestType.RPUSH: 
140 case RequestType.LREM: 
141 case RequestType.SADD: 
142 case RequestType.SCARD: 
143 case RequestType.SISMEMBER: 
144 case RequestType.SREM: 
145 case RequestType.ZADD: 
146 case RequestType.ZCARD: 
147 case RequestType.ZCOUNT: 
148 case RequestType.ZREM: 
149 case RequestType.PUBLISH: 
150 var val = GetValue(BlockDequeue(), out error); 
151 if (!string.IsNullOrEmpty(error)) 
152  { 
153 result.Type = ResponseType.Error; 
154 result.Data = error; 
155 break; 
156  } 
157 if (val == 0) 
158  { 
159 result.Type = ResponseType.Empty; 
160  } 
161 else 
162  { 
163 result.Type = ResponseType.OK; 
164  } 
165 result.Data = val.ToString(); 
166 break; 
167 case RequestType.INFO: 
168 var rnum = GetWordsNum(BlockDequeue(), out error); 
169 if (!string.IsNullOrEmpty(error)) 
170  { 
171 result.Type = ResponseType.Error; 
172 result.Data = error; 
173 break; 
174  } 
175 var info = ""; 
176 while (info.Length < rnum) 
177  { 
178 info += BlockDequeue(); 
179  } 
180 result.Type = ResponseType.String; 
181 result.Data = info; 
182 break; 
183 case RequestType.SUBSCRIBE: 
184 var r = ""; 
185 while (IsSubed) 
186  { 
187 r = BlockDequeue(); 
188 if (r == "message\r\n") 
189  { 
190 result.Type = ResponseType.Sub; 
191  BlockDequeue(); 
192 result.Data = BlockDequeue(); 
193  BlockDequeue(); 
194 result.Data += BlockDequeue(); 
195 break; 
196  } 
197  } 
198 break; 
199 case RequestType.UNSUBSCRIBE: 
200 var rNum = GetRowNum(BlockDequeue(), out error); 
201 var wNum = GetWordsNum(BlockDequeue(), out error); 
202  BlockDequeue(); 
203 wNum = GetWordsNum(BlockDequeue(), out error); 
204 var channel = BlockDequeue(); 
205 var vNum = GetValue(BlockDequeue(), out error); 
206 IsSubed = false; 
207 break; 
208  } 
209  _autoResetEvent.Set(); 
210 return result; 
211 }
复制代码

命令的封装与测试

有了socket、redisCoder之后,现在就可以按照官方的redis命令来进行.net core的封装了。本人将这些操作封装到RedisClient、RedisDataBase两个类中,然后又想到连接复用的问题,简单实现了一个连接池RedisClientFactory的类。这样一来就可以好好的来实验一把,看看之前的设想最终能不能实现了:

 View Code

经过上面的代码测试,使用redis-cli工具进行monitor命令监控发现——搞定了!另外源码本人已发到github上面了,SAEA.RedisSocket的详细可查看:https://github.com/yswenli/SAEA/tree/master/Src/SAEA.RedisSocket

 

 

转载请标明本文来源:http://www.cnblogs.com/yswenli/p/8608661.html 
     更多内容欢迎star作者的github:https://github.com/yswenli/SAEA
     如果发现本文有什么问题和任何建议,也随时欢迎交流~