Fix code issues

This commit is contained in:
gion
2020-05-04 19:46:02 +02:00
parent 0b974d09ca
commit 6e22e9222b
11 changed files with 287 additions and 218 deletions

View File

@@ -84,10 +84,10 @@ namespace Emby.Server.Implementations.Session
_logger = loggerFactory.CreateLogger(GetType().Name);
_json = json;
_httpServer = httpServer;
httpServer.WebSocketConnected += _serverManager_WebSocketConnected;
httpServer.WebSocketConnected += OnServerManagerWebSocketConnected;
}
void _serverManager_WebSocketConnected(object sender, GenericEventArgs<IWebSocketConnection> e)
void OnServerManagerWebSocketConnected(object sender, GenericEventArgs<IWebSocketConnection> e)
{
var session = GetSession(e.Argument.QueryString, e.Argument.RemoteEndPoint);
@@ -121,7 +121,7 @@ namespace Emby.Server.Implementations.Session
public void Dispose()
{
_httpServer.WebSocketConnected -= _serverManager_WebSocketConnected;
_httpServer.WebSocketConnected -= OnServerManagerWebSocketConnected;
StopKeepAlive();
}
@@ -149,7 +149,7 @@ namespace Emby.Server.Implementations.Session
private void OnWebSocketClosed(object sender, EventArgs e)
{
var webSocket = (IWebSocketConnection) sender;
_logger.LogDebug("WebSockets {0} closed.", webSocket);
_logger.LogDebug("WebSocket {0} is closed.", webSocket);
RemoveWebSocket(webSocket);
}
@@ -157,7 +157,7 @@ namespace Emby.Server.Implementations.Session
/// Adds a WebSocket to the KeepAlive watchlist.
/// </summary>
/// <param name="webSocket">The WebSocket to monitor.</param>
private async Task KeepAliveWebSocket(IWebSocketConnection webSocket)
private void KeepAliveWebSocket(IWebSocketConnection webSocket)
{
lock (_webSocketsLock)
{
@@ -175,11 +175,11 @@ namespace Emby.Server.Implementations.Session
// Notify WebSocket about timeout
try
{
await SendForceKeepAlive(webSocket);
SendForceKeepAlive(webSocket).Wait();
}
catch (WebSocketException exception)
{
_logger.LogWarning(exception, "Error sending ForceKeepAlive message to WebSocket {0}.", webSocket);
_logger.LogWarning(exception, "Cannot send ForceKeepAlive message to WebSocket {0}.", webSocket);
}
}
@@ -213,7 +213,8 @@ namespace Emby.Server.Implementations.Session
{
_keepAliveCancellationToken = new CancellationTokenSource();
// Start KeepAlive watcher
KeepAliveSockets(
var task = RepeatAsyncCallbackEvery(
KeepAliveSockets,
TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor),
_keepAliveCancellationToken.Token);
}
@@ -245,73 +246,58 @@ namespace Emby.Server.Implementations.Session
}
/// <summary>
/// Checks status of KeepAlive of WebSockets once every the specified interval time.
/// Checks status of KeepAlive of WebSockets.
/// </summary>
/// <param name="interval">The interval.</param>
/// <param name="cancellationToken">The cancellation token.</param>
private async Task KeepAliveSockets(TimeSpan interval, CancellationToken cancellationToken)
private async Task KeepAliveSockets()
{
while (true)
IEnumerable<IWebSocketConnection> inactive;
IEnumerable<IWebSocketConnection> lost;
lock (_webSocketsLock)
{
_logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count());
_logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count);
IEnumerable<IWebSocketConnection> inactive;
IEnumerable<IWebSocketConnection> lost;
lock (_webSocketsLock)
inactive = _webSockets.Where(i =>
{
inactive = _webSockets.Where(i =>
{
var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds;
return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout);
});
lost = _webSockets.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout);
}
var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds;
return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout);
});
lost = _webSockets.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout);
}
if (inactive.Any())
{
_logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count());
}
if (inactive.Any())
{
_logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count());
}
foreach (var webSocket in inactive)
{
try
{
await SendForceKeepAlive(webSocket);
}
catch (WebSocketException exception)
{
_logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket.");
lost = lost.Append(webSocket);
}
}
lock (_webSocketsLock)
{
if (lost.Any())
{
_logger.LogInformation("Lost {0} WebSockets.", lost.Count());
foreach (var webSocket in lost.ToList())
{
// TODO: handle session relative to the lost webSocket
RemoveWebSocket(webSocket);
}
}
if (!_webSockets.Any())
{
StopKeepAlive();
}
}
// Wait for next interval
Task task = Task.Delay(interval, cancellationToken);
foreach (var webSocket in inactive)
{
try
{
await task;
await SendForceKeepAlive(webSocket);
}
catch (TaskCanceledException)
catch (WebSocketException exception)
{
return;
_logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket.");
lost = lost.Append(webSocket);
}
}
lock (_webSocketsLock)
{
if (lost.Any())
{
_logger.LogInformation("Lost {0} WebSockets.", lost.Count());
foreach (var webSocket in lost.ToList())
{
// TODO: handle session relative to the lost webSocket
RemoveWebSocket(webSocket);
}
}
if (!_webSockets.Any())
{
StopKeepAlive();
}
}
}
@@ -329,5 +315,30 @@ namespace Emby.Server.Implementations.Session
Data = WebSocketLostTimeout
}, CancellationToken.None);
}
/// <summary>
/// Runs a given async callback once every specified interval time, until cancelled.
/// </summary>
/// <param name="callback">The async callback.</param>
/// <param name="interval">The interval time.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>Task.</returns>
private async Task RepeatAsyncCallbackEvery(Func<Task> callback, TimeSpan interval, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await callback();
Task task = Task.Delay(interval, cancellationToken);
try
{
await task;
}
catch (TaskCanceledException)
{
return;
}
}
}
}
}