本文将指导您创建一个简单的并发gRPC聊天服务器应用程序。我们将使用跨平台、开源且模块化的.NET Core框架来构建聊天服务器应用程序。文章将涵盖以下主题:
- A brief introduction to gRPC
- 设置gRPC环境并定义服务契约
- 实现聊天服务及处理客户端请求
- 通过异步编程处理多个客户端的并发连接
- 向同一房间内所有已连接的客户端广播聊天消息
通过本教程,您将掌握如何利用gRPC构建聊天服务器。
什么是gRPC?
gRPC是Google远程过程调用的缩写,最初由Google开发,现由云原生计算基金会(CNCF)维护。gRPC使您能够像调用本地函数一样轻松地连接、调用、操作和调试分布式异构应用程序。
gRPC采用HTTP/2作为传输协议,采用契约优先的API开发方法,使用Protocol Buffers(Protobuf)作为接口定义语言及其底层消息交换格式。它支持四种API类型(一元RPC、服务器端流式RPC、客户端端流式RPC和双向流式RPC)。更多关于gRPC的信息,请阅读此处。
开始入门
在开始编写代码之前,需要安装.NET Core,并确保已准备好以下前提条件:
- Visual Studio Code、Visual Studio或JetBrains Rider IDE
- .NET Core
- gRPC .NET
- Protobuf
步骤1:从Visual Studio或命令行创建gRPC项目
- 您可以使用以下命令创建新项目。如果成功,它应该会在您指定的目录中以名称’ChatServer‘创建。
dotnet new grpc -n ChatServerApp
-
使用您选择的编辑器打开项目。我正在使用Mac版的Visual Studio。
步骤2:在Proto文件中定义Protobuf消息
Protobuf合约:
- 在protos文件夹内创建名为server.proto的.proto文件。该proto文件用于定义服务的结构,包括消息类型和服务支持的方法。
syntax = "proto3";
option csharp_namespace = "ChatServerApp.Protos";
package chat;
service ChatServer {
// 客户端和服务器之间的双向通信流
rpc HandleCommunication(stream ClientMessage) returns (stream ServerMessage);
}
// 客户端消息:
message ClientMessage {
oneof content {
ClientMessageLogin login = 1;
ClientMessageChat chat = 2;
}
}
message ClientMessageLogin {
string chat_room_id = 1;
string user_name = 2;
}
message ClientMessageChat {
string text = 1;
}
// 服务器消息
message ServerMessage {
oneof content {
ServerMessageLoginSuccess login_success = 1;
ServerMessageLoginFailure login_failure = 2;
ServerMessageUserJoined user_joined = 3;
ServerMessageChat chat = 4;
}
}
message ServerMessageLoginFailure {
string reason = 1;
}
message ServerMessageLoginSuccess {
}
message ServerMessageUserJoined {
string user_name = 1;
}
message ServerMessageChat {
string text = 1;
string user_name = 2;
}
- `
ChatServer
` 定义了我们聊天应用的主要服务,包含一个名为 `HandleCommunication
` 的RPC方法。该方法用于客户端与服务器之间的双向流通信。它接收一个 `ClientMessage
` 流作为输入,并返回一个 `ServerMessage
` 流作为输出。
service ChatServer {
// 客户端与服务器之间的双向通信流
rpc HandleCommunication(stream ClientMessage) returns (stream ServerMessage);
}
- `
ClientMessageLogin
`,由客户端发送,包含两个字段:`chat_room_id` 和 `user_name`。此消息类型用于从客户端向服务器发送登录信息。`chat_room_id` 字段指定客户端希望加入的聊天室,而 `user_name` 字段指定客户端在聊天室中使用的用户名
message ClientMessageLogin {
string chat_room_id = 1;
string user_name = 2;
}
- `
ClientMessageChat
` 用于从客户端向服务器发送聊天消息。它包含一个 `text` 字段.
message ClientMessageChat {
string text = 1;
}
- `
ClientMessage
` 定义了客户端可以发送给服务器的不同类型的消息。它包含一个 `oneof` 字段,意味着同时只能设置其中一个字段。如果使用 `oneof
`,生成的C#代码将包含一个枚举,指示哪些字段已被设置。字段名为 `login
` 和 `chat
`,分别对应于 `ClientMessageLogin
` 和 `ClientMessageChat
` 消息。
message ClientMessage {
oneof content {
ClientMessageLogin login = 1;
ClientMessageChat chat = 2;
}
}
ServerMessageLoginFailure
定义了服务器发送给客户端的消息,用于指示客户端登录聊天室失败。其中,“reason”字段指明了失败的原因。
message ServerMessageLoginFailure {
string reason = 1;
}
-
ServerMessageLoginSuccess
定义了 服务器发送给客户端的消息,表示客户端已成功登录聊天室。该消息不含任何字段,仅表明登录成功。当客户端发送ClientMessageLogin
消息时,服务器将根据登录是否成功,相应地回复ServerMessageLoginSuccess
消息或ServerMessageLoginFailure
消息。若登录成功,客户端便可开始发送ClientMessageChat
消息,启动聊天信息交流。
message ServerMessageLoginSuccess {
}
- 消息
ServerMessageUserJoined
定义了服务器发送给客户端的消息,当有新用户加入聊天室时。
message ServerMessageUserJoined {
string user_name = 1;
}
- 消息
ServerMessageChat
定义了服务器发送的消息,指示已收到新的聊天信息。其中,“text”字段指定了聊天信息的内容,而“user_name”字段则指定了发送该消息的用户名。
message ServerMessageChat {
string text = 1;
string user_name = 2;
}
- 消息
ServerMessage
定义了服务器向客户端发送的不同类型的消息。它包含一个oneof
字段名为content,具有多个选项。字段名称包括”login_success
,” “login_failure
,” “user_joined
,” 和 “chat
“,分别对应于ServerMessageLoginSuccess
、ServerMessageLoginFailure
、ServerMessageUserJoined
和ServerMessageChat
消息。
message ServerMessage {
oneof content {
ServerMessageLoginSuccess login_success = 1;
ServerMessageLoginFailure login_failure = 2;
ServerMessageUserJoined user_joined = 3;
ServerMessageChat chat = 4;
}
}
步骤3:添加ChatService
类
添加一个继承自ChatServerBase
的ChatService
类(该类由gRPC代码生成器protoc从server.proto文件生成)。然后重写HandleCommunication
方法。HandleCommunication
方法的实现将负责处理客户端与服务器之间的通信。
public class ChatService : ChatServerBase
{
private readonly ILogger<ChatService> _logger;
public ChatService(ILogger<ChatService> logger)
{
_logger = logger;
}
public override Task HandleCommunication(IAsyncStreamReader<ClientMessage> requestStream, IServerStreamWriter<ServerMessage> responseStream, ServerCallContext context)
{
return base.HandleCommunication(requestStream, responseStream, context);
}
}
步骤4:配置gRPC
在program.cs文件中:
using ChatServer.Services;
using Microsoft.AspNetCore.Server.Kestrel.Core;
var builder = WebApplication.CreateBuilder(args);
/*
// 在macOS上成功运行gRPC需要额外的配置。
// 有关如何在macOS上配置Kestrel和gRPC客户端的说明,
// 请访问 https://go.microsoft.com/fwlink/?linkid=2099682
To avoid missing ALPN support issue on Mac. To work around this issue, configure Kestrel and the gRPC client to use HTTP/2 without TLS.
You should only do this during development. Not using TLS will result in gRPC messages being sent without encryption.
https://learn.microsoft.com/en-us/aspnet/core/grpc/troubleshoot?view=aspnetcore-7.0
*/
builder.WebHost.ConfigureKestrel(options =>
{
// 设置一个不使用TLS的HTTP/2端点。
options.ListenLocalhost(50051, o => o.Protocols =
HttpProtocols.Http2);
});
// 向容器添加服务。
builder.Services.AddGrpc();
builder.Services.AddSingleton();
var app = builder.Build();
// 配置HTTP请求管道。
app.MapGrpcService();
app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
Console.WriteLine($"gRPC server about to listening on port:50051");
app.Run();
注意:ASP.NET Core gRPC模板和示例默认使用TLS。但出于开发目的,我们配置了Kestrel和gRPC客户端使用HTTP/2不带TLS。
第5步:创建ChatRoomService
并实现HandleCommunication
中所需的各种方法
ChatRoomService
类负责管理聊天室和客户端,以及处理客户端之间的消息发送。它使用ConcurrentDictionary
存储聊天室,并为每个房间维护一个ChatClient
对象列表。AddClientToChatRoom
方法将新客户端添加到聊天室中,而BroadcastClientJoinedRoomMessage
方法在新客户端加入时向房间内的所有客户端发送消息。BroadcastMessageToChatRoom
方法向房间内的所有客户端发送消息,但发送者除外。
ChatClient
类包含一个用于向客户端写入消息的StreamWriter
对象,以及用于识别客户端的UserName属性。
using System;
using ChatServer;
using Grpc.Core;
using System.Collections.Concurrent;
namespace ChatServer.Services
{
public class ChatRoomService
{
private static readonly ConcurrentDictionary> _chatRooms = new ConcurrentDictionary>();
/// 从客户端读取单条消息。
///
///
///
public async Task ReadMessageWithTimeoutAsync(IAsyncStreamReader requestStream, TimeSpan timeout)
{
CancellationTokenSource cancellationTokenSource = new();
cancellationTokenSource.CancelAfter(timeout);
try
{
bool moveNext = await requestStream.MoveNext(cancellationTokenSource.Token);
if (moveNext == false)
{
throw new Exception("connection dropped exception");
}
return requestStream.Current;
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
throw new TimeoutException();
}
}
///
///
///
///
public async Task AddClientToChatRoom(string chatRoomId, ChatClient chatClient)
{
if (!_chatRooms.ContainsKey(chatRoomId))
{
_chatRooms[chatRoomId] = new List { chatClient };
}
else
{
var existingUser = _chatRooms[chatRoomId].FirstOrDefault(c => c.UserName == chatClient.UserName);
if (existingUser != null)
{
// 聊天室中已存在具有相同用户名的用户
throw new InvalidOperationException("User with the same name already exists in the chat room");
}
_chatRooms[chatRoomId].Add(chatClient);
}
await Task.CompletedTask;
}
/// 广播客户端加入房间的消息。
///
///
///
///
public async Task BroadcastClientJoinedRoomMessage(string userName, string chatRoomId)
{
if (_chatRooms.ContainsKey(chatRoomId))
{
var message = new ServerMessage { UserJoined = new ServerMessageUserJoined { UserName = userName } };
var tasks = new List();
foreach (var stream in _chatRooms[chatRoomId])
{
if (stream != null && stream != default)
{
tasks.Add(stream.StreamWriter.WriteAsync(message));
}
}
await Task.WhenAll(tasks);
}
}
///
///
///
///
///
public async Task BroadcastMessageToChatRoom(string chatRoomId, string senderName, string text)
{
if (_chatRooms.ContainsKey(chatRoomId))
{
var message = new ServerMessage { Chat = new ServerMessageChat { UserName = senderName, Text = text } };
var tasks = new List();
var streamList = _chatRooms[chatRoomId];
foreach (var stream in _chatRooms[chatRoomId])
{
//此senderName可以是每个用户的唯一标识Id。
if (stream != null && stream != default && stream.UserName != senderName)
{
tasks.Add(stream.StreamWriter.WriteAsync(message));
}
}
await Task.WhenAll(tasks);
}
}
}
public class ChatClient
{
public IServerStreamWriter StreamWriter { get; set; }
public string UserName { get; set; }
}
}
第6步:最后,实现步骤3中的gRPC HandleCommunication
方法
该方法 HandleCommunication
从客户端接收一个 requestStream
并向客户端发送一个 responseStream
。方法从客户端读取一条消息,提取用户名和 chatRoomId
,并处理两种情况:登录情况和聊天情况。
- 在登录场景中,该方法会验证用户名和
chatRoomId
是否有效,并据此向客户端发送响应消息。若登录成功,客户端将被加入到聊天室,并向聊天室内的所有客户端广播一条消息。 - 而在聊天场景中,该方法会将消息广播给聊天室内的所有客户端。
using System;
using ChatServer;
using Grpc.Core;
namespace ChatServer.Services
{
public class ChatService : ChatServer.ChatServerBase
{
private readonly ILogger _logger;
private readonly ChatRoomService _chatRoomService;
public ChatService(ChatRoomService chatRoomService, ILogger logger)
{
_chatRoomService = chatRoomService;
_logger = logger;
}
public override async Task HandleCommunication(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context)
{
var userName = string.Empty;
var chatRoomId = string.Empty;
while (true)
{
// 从客户端读取消息。
var clientMessage = await _chatRoomService.ReadMessageWithTimeoutAsync(requestStream, Timeout.InfiniteTimeSpan);
switch (clientMessage.ContentCase)
{
case ClientMessage.ContentOneofCase.Login:
var loginMessage = clientMessage.Login;
// 从客户端消息中获取用户名和聊天室ID。
chatRoomId = loginMessage.ChatRoomId;
userName = loginMessage.UserName;
if (string.IsNullOrEmpty(userName) || string.IsNullOrEmpty(chatRoomId))
{
// 发送登录失败消息。
var failureMessage = new ServerMessage
{
LoginFailure = new ServerMessageLoginFailure { Reason = "Invalid username" }
};
await responseStream.WriteAsync(failureMessage);
return;
}
// 向客户端发送登录成功消息。
var successMessage = new ServerMessage { LoginSuccess = new ServerMessageLoginSuccess() };
await responseStream.WriteAsync(successMessage);
// 将客户端添加到聊天室。
await _chatRoomService.AddClientToChatRoom(chatRoomId, new ChatClient
{
StreamWriter = responseStream,
UserName = userName
});
break;
case ClientMessage.ContentOneofCase.Chat:
var chatMessage = clientMessage.Chat;
if (userName is not null && chatRoomId is not null)
{
// 向聊天室广播消息。
await _chatRoomService.BroadcastMessageToChatRoom(chatRoomId, userName, chatMessage.Text);
}
break;
}
}
}
}
}
完整的项目目录:
至此,第一部分结束。在接下来的第二部分中,我将创建 一个客户端项目,包含客户端实现,以完善这个聊天应用。
Source:
https://dzone.com/articles/create-a-concurrent-grpc-chat-messaging-in-net-7