Periodic session timeout sweep

This commit is contained in:
Anton Tananaev
2026-05-15 06:42:08 -07:00
parent e3c06c8423
commit 950e787885
3 changed files with 81 additions and 32 deletions

View File

@@ -50,6 +50,7 @@ public class ScheduleManager implements LifecycleObject {
TaskDeleteTemporary.class,
TaskReports.class,
TaskDeviceInactivityCheck.class,
TaskSessionTimeout.class,
TaskWebSocketKeepalive.class)
.forEachOrdered(taskClass -> {
var task = injector.getInstance(taskClass);

View File

@@ -0,0 +1,47 @@
/*
* Copyright 2026 Anton Tananaev (anton@traccar.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.traccar.schedule;
import jakarta.inject.Inject;
import org.traccar.config.Config;
import org.traccar.config.Keys;
import org.traccar.session.ConnectionManager;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TaskSessionTimeout implements ScheduleTask {
private final ConnectionManager connectionManager;
private final long period;
@Inject
public TaskSessionTimeout(Config config, ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
period = Math.max(config.getLong(Keys.STATUS_TIMEOUT) / 10, 1);
}
@Override
public void schedule(ScheduledExecutorService executor) {
executor.scheduleAtFixedRate(this, period, period, TimeUnit.SECONDS);
}
@Override
public void run() {
connectionManager.sweepIdleSessions();
}
}

View File

@@ -16,8 +16,6 @@
package org.traccar.session;
import io.netty.channel.Channel;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.traccar.Protocol;
@@ -54,7 +52,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Singleton
@@ -77,7 +74,6 @@ public class ConnectionManager implements BroadcastInterface {
private final CacheManager cacheManager;
private final Storage storage;
private final NotificationManager notificationManager;
private final Timer timer;
private final BroadcastService broadcastService;
private final DeviceLookupService deviceLookupService;
@@ -85,18 +81,15 @@ public class ConnectionManager implements BroadcastInterface {
private final Map<Long, Set<Long>> userDevices = new HashMap<>();
private final Map<Long, Set<Long>> deviceUsers = new HashMap<>();
private final Map<Long, Timeout> timeouts = new ConcurrentHashMap<>();
@Inject
public ConnectionManager(
Config config, CacheManager cacheManager, Storage storage,
NotificationManager notificationManager, Timer timer, BroadcastService broadcastService,
NotificationManager notificationManager, BroadcastService broadcastService,
DeviceLookupService deviceLookupService) {
this.config = config;
this.cacheManager = cacheManager;
this.storage = storage;
this.notificationManager = notificationManager;
this.timer = timer;
this.broadcastService = broadcastService;
this.deviceLookupService = deviceLookupService;
deviceTimeout = config.getLong(Keys.STATUS_TIMEOUT);
@@ -105,6 +98,16 @@ public class ConnectionManager implements BroadcastInterface {
broadcastService.registerListener(this);
}
public void sweepIdleSessions() {
long cutoff = System.currentTimeMillis() - deviceTimeout * 1000;
for (DeviceSession session : sessionsByDeviceId.values()) {
if (session.getLastUpdate() < cutoff) {
deviceUnknown(session.getDeviceId());
}
}
unknownByEndpoint.values().removeIf(entry -> entry.timestamp() < cutoff);
}
public DeviceSession getDeviceSession(long deviceId) {
return sessionsByDeviceId.get(deviceId);
}
@@ -114,19 +117,28 @@ public class ConnectionManager implements BroadcastInterface {
String... uniqueIds) throws Exception {
ConnectionKey connectionKey = new ConnectionKey(channel, remoteAddress);
Map<String, DeviceSession> endpointSessions = sessionsByEndpoint.computeIfAbsent(
connectionKey, k -> new ConcurrentHashMap<>());
Map<String, DeviceSession> endpointSessions = sessionsByEndpoint.get(connectionKey);
uniqueIds = Arrays.stream(uniqueIds).filter(Objects::nonNull).toArray(String[]::new);
if (uniqueIds.length > 0) {
for (String uniqueId : uniqueIds) {
DeviceSession deviceSession = endpointSessions.get(uniqueId);
if (deviceSession != null) {
return deviceSession;
if (endpointSessions != null) {
for (String uniqueId : uniqueIds) {
DeviceSession deviceSession = endpointSessions.get(uniqueId);
if (deviceSession != null) {
deviceSession.setLastUpdate(System.currentTimeMillis());
return deviceSession;
}
}
}
} else {
return endpointSessions.values().stream().findAny().orElse(null);
if (endpointSessions != null) {
DeviceSession deviceSession = endpointSessions.values().stream().findAny().orElse(null);
if (deviceSession != null) {
deviceSession.setLastUpdate(System.currentTimeMillis());
}
return deviceSession;
}
return null;
}
Device device = deviceLookupService.lookup(uniqueIds);
@@ -154,11 +166,14 @@ public class ConnectionManager implements BroadcastInterface {
DeviceSession deviceSession = new DeviceSession(
device.getId(), device.getUniqueId(), device.getModel(), protocol, channel, remoteAddress);
endpointSessions.put(device.getUniqueId(), deviceSession);
sessionsByEndpoint
.computeIfAbsent(connectionKey, k -> new ConcurrentHashMap<>())
.put(device.getUniqueId(), deviceSession);
sessionsByDeviceId.put(device.getId(), deviceSession);
if (oldSession == null) {
cacheManager.addDevice(device.getId(), connectionKey);
cacheManager.addDevice(device.getId(), connectionKey);
if (oldSession != null) {
cacheManager.removeDevice(device.getId(), oldSession.getConnectionKey());
}
return deviceSession;
@@ -256,19 +271,6 @@ public class ConnectionManager implements BroadcastInterface {
device.setLastUpdate(time);
}
Timeout timeout = timeouts.remove(deviceId);
if (timeout != null) {
timeout.cancel();
}
if (status.equals(Device.STATUS_ONLINE)) {
timeouts.put(deviceId, timer.newTimeout(timeout1 -> {
if (!timeout1.isCancelled()) {
deviceUnknown(deviceId);
}
}, deviceTimeout, TimeUnit.SECONDS));
}
try {
storage.updateObject(device, new Request(
new Columns.Include("status", "lastUpdate"),
@@ -293,7 +295,6 @@ public class ConnectionManager implements BroadcastInterface {
if (local) {
broadcastService.updateDevice(true, device);
} else if (Device.STATUS_ONLINE.equals(device.getStatus())) {
timeouts.remove(device.getId());
removeDeviceSession(device.getId());
}
for (long userId : deviceUsers.getOrDefault(device.getId(), Collections.emptySet())) {