--- /dev/null
+TODO: sending and receiving JSON objects (using messageConverters and JSON with STOMP protocol)
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
+
+
+ <!-- Full-featured broker, TCP connection management. -->
+ <dependency>
+ <groupId>org.projectreactor</groupId>
+ <artifactId>reactor-net</artifactId>
+ <version>1.1.6.RELEASE</version>
+ </dependency>
</dependencies>
<build>
import java.time.LocalDateTime;
import org.springframework.messaging.handler.annotation.MessageMapping;
+import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
@Controller
// connecting to this URL -> http://172.17.0.3/spring-stomp-server/portfolio
// sending data to /app/greeting
- // The data sent to /app/greeting will retrieved by this method.
+ // The data sent to /app/greeting will be retrieved by this method.
@MessageMapping("/greeting")
+ @SendTo("/topic/greeting")
public String handle(String greeting) {
// STOMP clients subscribed to /topic/greeting will receive the returned data from this method.
// Destination is selected based on a convention but can be overridden via @SendTo
- return "[" + LocalDateTime.now() + ": " + greeting;
+ // I will be using @SendTo. In my case, it is not required (because it is the same as the destination selected
+ // based on the convention) but I will be using it just for fun.
+ return "[" + LocalDateTime.now() + "]: " + greeting;
}
}
--- /dev/null
+package de.spring.stomp.controllers;
+
+import java.security.Principal;
+import java.time.LocalDateTime;
+
+import org.springframework.messaging.handler.annotation.MessageMapping;
+import org.springframework.messaging.simp.annotation.SendToUser;
+import org.springframework.stereotype.Controller;
+
+@Controller
+public class UserMessageTradeController {
+
+ @MessageMapping("/trade")
+ @SendToUser(destinations="/topic/position-updates", broadcast=false /* No idea what is this for */)
+ public String executeTrade(String trade, Principal principal) {
+
+ return "[" + LocalDateTime.now() + "]: " + trade;
+ }
+}
--- /dev/null
+package de.spring.stomp.handlers;
+
+import java.security.Principal;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
+
+/**
+ * In some cases it may be useful to assign an identity to a WebSocket session even when
+ * the user has not been formally authenticated. For example, a mobile app might assign some
+ * identity to anonymous users, perhaps based on geographical location. The do that currently,
+ * an application can sub-class DefaultHandshakeHandler and override the determineUser method.
+ * The custom handshake handler can then be plugged in (see examples in
+ * Section 25.2.4, “Deployment Considerations”)
+ */
+public class CustomHandshakeHandler extends DefaultHandshakeHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CustomHandshakeHandler.class);
+
+ @Override
+ protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
+ Map<String, Object> attributes) {
+
+ Principal principal = request.getPrincipal();
+ LOGGER.info("CustomHandshakeHandler: " + principal.getName());
+
+ return super.determineUser(request, wsHandler, attributes);
+ }
+
+}
+++ /dev/null
-package de.spring.stomp.handlers;
-
-import org.springframework.web.socket.TextMessage;
-import org.springframework.web.socket.WebSocketSession;
-import org.springframework.web.socket.handler.TextWebSocketHandler;
-
-public class MyCustomHandler extends TextWebSocketHandler {
-
- @Override
- public void handleTextMessage(WebSocketSession session, TextMessage message) {
-
- }
-}
--- /dev/null
+package de.spring.stomp.interceptors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.simp.stomp.StompCommand;
+import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
+import org.springframework.messaging.support.ChannelInterceptorAdapter;
+
+public class CustomChannelInterceptor extends ChannelInterceptorAdapter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CustomChannelInterceptor.class);
+
+ @Override
+ public Message<?> preSend(Message<?> message, MessageChannel channel) {
+ StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
+ StompCommand command = accessor.getCommand();
+
+ LOGGER.info("CustomChannelInterceptor preSend, StompCommand: " + command);
+ LOGGER.info("CustomChannelInterceptor preSend, login: " + accessor.getLogin());
+ LOGGER.info("CustomChannelInterceptor preSend, heartBeat: " + accessor.getHeartbeat());
+ LOGGER.info("CustomChannelInterceptor preSend, destination: " + accessor.getDestination());
+ LOGGER.info("CustomChannelInterceptor preSend, host: " + accessor.getHost());
+
+ return message;
+ }
+
+ @Override
+ public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
+ StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
+ StompCommand command = accessor.getCommand();
+
+ LOGGER.info("CustomChannelInterceptor postSend, StompCommand: " + command);
+ LOGGER.info("CustomChannelInterceptor postSend, login: " + accessor.getLogin());
+ LOGGER.info("CustomChannelInterceptor postSend, heartBeat: " + accessor.getHeartbeat());
+ LOGGER.info("CustomChannelInterceptor postSend, destination: " + accessor.getDestination());
+ LOGGER.info("CustomChannelInterceptor postSend, host: " + accessor.getHost());
+
+ }
+}
--- /dev/null
+package de.spring.stomp.interceptors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
+
+public class CustomHttpHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CustomHttpHandshakeInterceptor.class);
+
+
+ @Override
+ public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
+ WebSocketHandler wsHandler, Exception ex) {
+ super.afterHandshake(request, response, wsHandler, ex);
+
+ LOGGER.info("Request URI:" + request.getURI());
+ LOGGER.info("Request remote address:" + request.getRemoteAddress());
+ LOGGER.info("Request local address:" + request.getLocalAddress());
+ LOGGER.info("Request headers:" + request.getHeaders());
+
+ LOGGER.info("Response headers:" + response.getHeaders());
+ }
+}
--- /dev/null
+package de.spring.stomp.listeners;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationListener;
+import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
+
+public class BrokerAvailabilityListener implements ApplicationListener<BrokerAvailabilityEvent> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BrokerAvailabilityListener.class);
+
+ @Override
+ public void onApplicationEvent(BrokerAvailabilityEvent event) {
+
+ LOGGER.info("BrokerAvailabilityEvent timestamp: " + event.getTimestamp());
+ LOGGER.info("BrokerAvailabilityEvent brokerAvailable: " + event.isBrokerAvailable());
+ LOGGER.info("BrokerAvailabilityEvent: " + event.toString());
+ }
+
+}
--- /dev/null
+package de.spring.stomp.listeners;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationListener;
+import org.springframework.web.socket.messaging.SessionConnectEvent;
+
+public class SessionConnectListener implements ApplicationListener<SessionConnectEvent> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SessionConnectListener.class);
+
+ @Override
+ public void onApplicationEvent(SessionConnectEvent event) {
+ LOGGER.info("SessionConnectEvent timestamp: " + event.getTimestamp());
+ LOGGER.info("SessionConnectEvent user: " + event.getUser());
+ LOGGER.info("SessionConnectEvent: " + event.toString());
+ }
+
+}
--- /dev/null
+package de.spring.stomp.listeners;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationListener;
+import org.springframework.web.socket.messaging.SessionConnectedEvent;
+
+public class SessionConnectedListener implements ApplicationListener<SessionConnectedEvent> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SessionConnectedListener.class);
+
+ @Override
+ public void onApplicationEvent(SessionConnectedEvent event) {
+ LOGGER.info("SessionConnectedEvent timestamp: " + event.getTimestamp());
+ LOGGER.info("SessionConnectedEvent user: " + event.getUser());
+ LOGGER.info("SessionConnectedEvent: " + event.toString());
+ }
+
+}
--- /dev/null
+package de.spring.stomp.listeners;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationListener;
+import org.springframework.web.socket.messaging.SessionDisconnectEvent;
+
+public class SessionDisconnectListener implements ApplicationListener<SessionDisconnectEvent> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SessionUnsubscribeListener.class);
+
+ @Override
+ public void onApplicationEvent(SessionDisconnectEvent event) {
+ LOGGER.info("SessionDisconnectEvent timestamp: " + event.getTimestamp());
+ LOGGER.info("SessionDisconnectEvent user: " + event.getUser());
+ LOGGER.info("SessionDisconnectEvent sessionId: " + event.getSessionId());
+ LOGGER.info("SessionDisconnectEvent close status: " + event.getCloseStatus());
+ LOGGER.info("SessionDisconnectEvent: " + event.toString());
+ }
+
+}
--- /dev/null
+package de.spring.stomp.listeners;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationListener;
+import org.springframework.web.socket.messaging.SessionSubscribeEvent;
+
+public class SessionSubscribeListener implements ApplicationListener<SessionSubscribeEvent> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SessionSubscribeListener.class);
+
+ @Override
+ public void onApplicationEvent(SessionSubscribeEvent event) {
+ LOGGER.info("SessionSubscribeEvent timestamp: " + event.getTimestamp());
+ LOGGER.info("SessionSubscribeEvent user: " + event.getUser());
+ LOGGER.info("SessionSubscribeEvent: " + event.toString());
+ }
+
+}
--- /dev/null
+package de.spring.stomp.listeners;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationListener;
+import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;
+
+public class SessionUnsubscribeListener implements ApplicationListener<SessionUnsubscribeEvent> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SessionUnsubscribeListener.class);
+
+ @Override
+ public void onApplicationEvent(SessionUnsubscribeEvent event) {
+ LOGGER.info("SessionUnsubscribeEvent timestamp: " + event.getTimestamp());
+ LOGGER.info("SessionUnsubscribeEvent user: " + event.getUser());
+ LOGGER.info("SessionUnsubscribeEvent: " + event.toString());
+ }
+
+}
--- /dev/null
+package de.spring.stomp.services;
+
+public interface RestGreetingService {
+
+ void doGreetings(String greeting);
+}
--- /dev/null
+package de.spring.stomp.services;
+
+public interface UserTradeService {
+
+ void doTrade(String user);
+}
--- /dev/null
+package de.spring.stomp.services.impl;
+
+import java.time.LocalDateTime;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
+import org.springframework.messaging.simp.SimpMessagingTemplate;
+import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
+import org.springframework.stereotype.Service;
+
+import de.spring.stomp.services.RestGreetingService;
+
+@Service("restGreetingService")
+public class RestGreetingServiceImpl
+ implements RestGreetingService, ApplicationListener<BrokerAvailabilityEvent> {
+ private final SimpMessagingTemplate template;
+
+ private volatile boolean isBrokerAvailable = true;
+
+ @Autowired
+ public RestGreetingServiceImpl(SimpMessagingTemplate template) {
+ this.template = template;
+ }
+
+ @Override
+ public void doGreetings(String greeting) {
+ String text = "[" + LocalDateTime.now() + "]:" + greeting;
+
+ if (isBrokerAvailable) {
+ // STOMP clients subscribed to /topic/greeting will receive the data sent by the convertAndSend method.
+ template.convertAndSend("/topic/greeting", text);
+ }
+ }
+
+ @Override
+ public void onApplicationEvent(BrokerAvailabilityEvent event) {
+ // Components using the SimpMessagingTemplate should subscribe to this event
+ // and avoid sending messages at times when the broker is not available.
+ // In any case they should be prepared to handle MessageDeliveryException
+ // when sending a message.
+ isBrokerAvailable = event.isBrokerAvailable();
+ }
+}
--- /dev/null
+package de.spring.stomp.services.impl;
+
+import java.time.LocalDateTime;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
+import org.springframework.messaging.simp.SimpMessagingTemplate;
+import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
+import org.springframework.stereotype.Service;
+
+import de.spring.stomp.services.UserTradeService;
+
+@Service("userTradeService")
+public class UserTradeServiceImpl
+ implements UserTradeService, ApplicationListener<BrokerAvailabilityEvent> {
+ private final SimpMessagingTemplate template;
+
+ private volatile boolean isBrokerAvailable = true;
+
+ @Autowired
+ public UserTradeServiceImpl(SimpMessagingTemplate template) {
+ this.template = template;
+ }
+
+ @Override
+ public void doTrade(String user) {
+ String text = "[" + LocalDateTime.now() + "]:" + user;
+
+ if (isBrokerAvailable) {
+ // STOMP clients subscribed to /topic/position-updates will receive the data sent by the convertAndSend method.
+ template.convertAndSendToUser(user, "/topic/position-updates", text);
+ }
+ }
+
+ @Override
+ public void onApplicationEvent(BrokerAvailabilityEvent event) {
+ // Components using the SimpMessagingTemplate should subscribe to this event
+ // and avoid sending messages at times when the broker is not available.
+ // In any case they should be prepared to handle MessageDeliveryException
+ // when sending a message.
+ isBrokerAvailable = event.isBrokerAvailable();
+ }
+
+}
package de.spring.webservices.rest.controller;
-import java.time.LocalDateTime;
-
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+import de.spring.stomp.services.RestGreetingService;
+
@RestController
public class RestGreetingController {
-
- private SimpMessagingTemplate template;
+ private final RestGreetingService restGreetingService;
@Autowired
- public RestGreetingController(SimpMessagingTemplate template) {
- this.template = template;
+ public RestGreetingController(RestGreetingService restGreetingService) {
+ this.restGreetingService = restGreetingService;
}
// Sending data to /topic/greeting from REST service.
@RequestMapping(path="/greetings", method=RequestMethod.POST)
public void handle(@RequestBody String greeting) {
- String text = "[" + LocalDateTime.now() + "]:" + greeting;
// STOMP clients subscribed to /topic/greeting will receive the data sent by the convertAndSend method.
- this.template.convertAndSend("/topic/greeting", text);
+ restGreetingService.doGreetings(greeting);
}
}
--- /dev/null
+package de.spring.webservices.rest.controller;
+
+import java.time.LocalDateTime;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+
+import de.spring.stomp.services.UserTradeService;
+
+public class UserTradeController {
+ private final UserTradeService userTradeService;
+
+ @Autowired
+ public UserTradeController(UserTradeService userTradeService) {
+ this.userTradeService = userTradeService;
+ }
+
+ // Sending data to /topic/greeting from REST service.
+ // POST http://localhost:8080/spring-stomp-server/trade
+
+ @RequestMapping(path="/trade", method=RequestMethod.POST)
+ public void handle(@RequestBody String user) {
+ userTradeService.doTrade(user);
+ }
+}
<property name="maxBinaryMessageBufferSize" value="8192"/>
</bean>
+ <!--
+ Note that even though the STOMP CONNECT frame has "login" and "passcode" headers that can be used for
+ authentication, Spring’s STOMP WebSocket support ignores them and currently expects users to have been
+ authenticated already via HTTP.
+
+ In some cases it may be useful to assign an identity to a WebSocket session even when the user has not
+ been formally authenticated. For example, a mobile app might assign some identity to anonymous users,
+ perhaps based on geographical location. The do that currently, an application can sub-class DefaultHandshakeHandler
+ and override the determineUser method. The custom handshake handler can then be plugged in
+ (see examples in Section 25.2.4, “Deployment Considerations”).
+ -->
+ <bean id="customHandshakeHandler" class="de.spring.stomp.handlers.CustomHandshakeHandler"/>
+
+ <!-- Interceptors -->
+ <bean id="customChannelInterceptor" class="de.spring.stomp.interceptors.CustomChannelInterceptor"/>
+ <bean id="customHttpHandshakeInterceptor" class="de.spring.stomp.interceptors.CustomHttpHandshakeInterceptor"/>
+
+ <!-- Listeners -->
+ <bean id="brokerAvailabilityListener" class="de.spring.stomp.listeners.BrokerAvailabilityListener"/>
+ <bean id="sessionConnectedListener" class="de.spring.stomp.listeners.SessionConnectedListener"/>
+ <bean id="sessionConnectListener" class="de.spring.stomp.listeners.SessionConnectListener"/>
+ <bean id="sessionDisconnectListener" class="de.spring.stomp.listeners.SessionDisconnectListener"/>
+ <bean id="sessionSubscribeListener" class="de.spring.stomp.listeners.SessionSubscribeListener"/>
+ <bean id="sessionUnsubscribeListener" class="de.spring.stomp.listeners.SessionUnsubscribeListener"/>
+
<!-- STOMP -->
+ <!-- Simple broker -->
<websocket:message-broker application-destination-prefix="/app">
+ <websocket:transport send-timeout="15000" send-buffer-size="524288" message-size="131072" />
<websocket:stomp-endpoint path="/portfolio" allowed-origins="*">
+ <websocket:handshake-handler ref="customHandshakeHandler" />
+ <websocket:handshake-interceptors>
+ <ref bean="customHttpHandshakeInterceptor"/>
+ </websocket:handshake-interceptors>
+ <websocket:sockjs/>
+ </websocket:stomp-endpoint>
+ <!--
+ In memory broker.
+
+ hearbeat: Configure the value for the heartbeat settings. The first number represents how often the server will
+ write or send a heartbeat. The second is how often the client should write. 0 means no heartbeats.
+ By default this is set to "0, 0" unless the scheduler attribute is also set in which case the
+ default becomes "10000,10000" (in milliseconds).
+
+ Matching the value used by client.
+ -->
+ <websocket:simple-broker prefix="/topic, /queue"
+ heartbeat="0,20000" />
+
+ <websocket:client-inbound-channel>
+ <websocket:executor core-pool-size="100" max-pool-size="200" keep-alive-seconds="600"/>
+ <websocket:interceptors>
+ <ref bean="customChannelInterceptor"/>
+ </websocket:interceptors>
+ </websocket:client-inbound-channel>
+ <websocket:client-outbound-channel>
+ <websocket:executor core-pool-size="101" max-pool-size="201" keep-alive-seconds="601"/>
+ <websocket:interceptors>
+ <ref bean="customChannelInterceptor"/>
+ </websocket:interceptors>
+ </websocket:client-outbound-channel>
+ </websocket:message-broker>
+
+ <!-- Full-featured broker -->
+ <websocket:message-broker application-destination-prefix="/app">
+ <websocket:transport send-timeout="15000" send-buffer-size="524288" message-size="131072" />
+ <websocket:stomp-endpoint path="/fullportfolio" allowed-origins="*">
+ <websocket:handshake-handler ref="customHandshakeHandler" />
+ <websocket:handshake-interceptors>
+ <ref bean="customHttpHandshakeInterceptor"/>
+ </websocket:handshake-interceptors>
<websocket:sockjs/>
</websocket:stomp-endpoint>
<!--
- Full-featured broker, see: http://docs.spring.io/spring/docs/current/spring-framework-reference/html/websocket.html#websocket-stomp-handle-broker-relay
- <websocket:stomp-broker-relay prefix="/topic,/queue" />
+ Full-featured broker
-->
- <websocket:simple-broker prefix="/topic, /queue"/>
+ <websocket:stomp-broker-relay prefix="/topic, /queue"
+ relay-host="" relay-port="" client-login="" client-passcode="" system-login="" system-passcode=""
+ heartbeat-send-interval="" heartbeat-receive-interval="" auto-startup="true" virtual-host=""/>
+
+ <websocket:client-inbound-channel>
+ <websocket:executor core-pool-size="100" max-pool-size="200" keep-alive-seconds="600"/>
+ <websocket:interceptors>
+ <ref bean="customChannelInterceptor"/>
+ </websocket:interceptors>
+ </websocket:client-inbound-channel>
+ <websocket:client-outbound-channel>
+ <websocket:executor core-pool-size="101" max-pool-size="201" keep-alive-seconds="601"/>
+ <websocket:interceptors>
+ <ref bean="customChannelInterceptor"/>
+ </websocket:interceptors>
+ </websocket:client-outbound-channel>
</websocket:message-broker>
</beans>