blob: 876f71b95ffe52394170387eda32eaa993ecc84c [file] [log] [blame]
/*
* Copyright (c) 1998, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
// Contributors:
// Oracle - initial API and implementation from Oracle TopLink
package org.eclipse.persistence.sessions.coordination;
import java.net.*;
import java.io.IOException;
import org.eclipse.persistence.exceptions.DiscoveryException;
import org.eclipse.persistence.internal.sessions.coordination.*;
import org.eclipse.persistence.sessions.SessionProfiler;
/**
* <p>
* <b>Purpose</b>: Detects new members of a logical EclipseLink cluster.
* <p>
* <b>Description</b>: Each RemoteCommandManager has its own DiscoveryManager,
* which handles the detection of other remote command services as they become available.
* The DiscoveryManager is an active object (in that it implements Runnable) and is
* launched as a separate unit of execution when it is started using startDiscovery().
* <p>
* Discovery is done through the use of a multicast. Each discovery manager
* joins the multicast group and announces itself to the group. As it receives
* service announcements from other discovery managers it notifies the RCM to
* establish connections to and from the new service.
*
* @see RemoteCommandManager
* @see java.net.MulticastSocket
* @author Steven Vo
* @since OracleAS TopLink 10<i>g</i> (9.0.4)
*/
public class DiscoveryManager implements Runnable {
    /** Default value constants */
    public static final String DEFAULT_MULTICAST_GROUP = "226.10.12.64";
    public static final int DEFAULT_MULTICAST_PORT = 3121;
    public static final int DEFAULT_PACKET_TIME_TO_LIVE = 2;
    public static final int DEFAULT_ANNOUNCEMENT_DELAY = 1000;

    /** Defines the IP address of the multicast group */
    protected String multicastGroupAddress = DEFAULT_MULTICAST_GROUP;

    /** Defines the port the multicast socket will be using */
    protected int multicastPort = DEFAULT_MULTICAST_PORT;

    /**
     * The multicast socket used for discovery. It is {@code null} while discovery is stopped.
     * Volatile because it is assigned by {@link #createCommunicationSocket()} (called from
     * {@link #run()}) and cleared by {@link #stopDiscovery()}, which may execute on
     * different threads.
     */
    protected volatile MulticastSocket communicationSocket;

    /**
     * Number of hops in the life of the data packet.
     * Default is 2, a hub and an interface card to prevent the data packets from leaving the local network.
     */
    protected int packetTimeToLive = DEFAULT_PACKET_TIME_TO_LIVE;

    /**
     * Indicates to the listening thread that it should stop. Volatile so that a write made
     * by {@link #stopListening()} is visible to the loop in {@link #startListening()}.
     */
    protected volatile boolean stopListening = false;

    /** Delay time in millis between initialization and when the announcement is sent */
    protected int announcementDelay = DEFAULT_ANNOUNCEMENT_DELAY;

    /** The remote command manager responsible for this service */
    protected RemoteCommandManager rcm;

    /**
     * Create a discovery manager that uses the default multicast group, port and
     * announcement delay.
     *
     * @param mgr the remote command manager this discovery manager reports to
     */
    public DiscoveryManager(RemoteCommandManager mgr) {
        this.rcm = mgr;
    }

    /**
     * Create a discovery manager for the given multicast group and port.
     *
     * @param address the host address of the multicast group
     * @param port the multicast port
     * @param mgr the remote command manager this discovery manager reports to
     */
    public DiscoveryManager(String address, int port, RemoteCommandManager mgr) {
        this(mgr);
        this.multicastGroupAddress = address;
        this.multicastPort = port;
    }

    /**
     * Create a discovery manager for the given multicast group, port and announcement delay.
     *
     * @param address the host address of the multicast group
     * @param port the multicast port
     * @param delay the time in millis to wait before the announcement is sent
     * @param mgr the remote command manager this discovery manager reports to
     */
    public DiscoveryManager(String address, int port, int delay, RemoteCommandManager mgr) {
        this(address, port, mgr);
        this.announcementDelay = delay;
    }

    /**
     * INTERNAL:
     * Send out an announcement that we are here. Any failure (including a missing
     * communication socket) is mapped to a DiscoveryException and passed to the RCM's
     * exception handling.
     */
    public void announceSession() {
        rcm.logDebug("sending_announcement", null);
        ServiceAnnouncement outMsg = new ServiceAnnouncement(rcm.getServiceId());
        byte[] outBytes = outMsg.toBytes();
        try {
            // Create a packet to send and send it out to everyone listening
            DatagramPacket sendPacket = new DatagramPacket(outBytes, outBytes.length, InetAddress.getByName(multicastGroupAddress), multicastPort);
            getCommunicationSocket().send(sendPacket);
            rcm.logInfo("announcement_sent", null);
        } catch (Exception ex) {
            // We got an exception. Map it to an RCM exception and call the handler
            DiscoveryException discoveryEx = DiscoveryException.errorSendingAnnouncement(ex);
            rcm.handleException(discoveryEx);
        }
    }

    /**
     * ADVANCED:
     * Announce the local service and join the cluster. The manager is only launched when
     * discovery is currently stopped; otherwise only the profile status is updated.
     */
    public void startDiscovery() {
        this.rcm.getCommandProcessor().updateProfile(SessionProfiler.RcmStatus, "Started");
        // Only start if we are currently stopped
        if (this.isDiscoveryStopped()) {
            this.rcm.getServerPlatform().launchContainerRunnable(this);
        }
    }

    /**
     * ADVANCED:
     * Stop accepting announcements from other services becoming available.
     * Note that this does not remove the local service from the cluster.
     */
    public void stopDiscovery() {
        this.rcm.getCommandProcessor().updateProfile(SessionProfiler.RcmStatus, "Stopped");
        stopListening();
        try {
            // Put in a sleep to give the listener thread a chance to stop
            Thread.sleep(500);
        } catch (InterruptedException exception) {
            // Do not lose the interrupt; the caller may need to observe it
            Thread.currentThread().interrupt();
        }
        this.communicationSocket = null;
    }

    /**
     * ADVANCED:
     * Return true if discovery has been stopped at the time this method is called.
     * If false is returned then it is undefined whether discovery is started or
     * stopped. It may be started, or it may be in the process of starting or
     * stopping.
     */
    public boolean isDiscoveryStopped() {
        return (communicationSocket == null);
    }

    /**
     * INTERNAL:
     * Create the multicast socket and join the multicast group. Does nothing (apart from
     * logging) when a socket already exists. The socket is only stored once it has been
     * fully configured; if configuration fails it is closed again so that it does not leak
     * and so that {@link #isDiscoveryStopped()} keeps returning true.
     */
    public void createCommunicationSocket() {
        Object[] args = { multicastGroupAddress, String.valueOf(multicastPort) };
        rcm.logDebug("initializing_discovery_resources", args);
        if (communicationSocket != null) {
            return;
        }
        MulticastSocket socket = null;
        try {
            socket = new MulticastSocket(multicastPort);
            socket.setTimeToLive(getPacketTimeToLive());
            socket.joinGroup(InetAddress.getByName(multicastGroupAddress));
            communicationSocket = socket;
        } catch (IOException ex) {
            // Either we couldn't create the socket or we couldn't join the group.
            // Release a half-initialized socket before reporting the problem.
            if (socket != null) {
                socket.close();
            }
            DiscoveryException discoveryEx = DiscoveryException.errorJoiningMulticastGroup(ex);
            rcm.handleException(discoveryEx);
        }
    }

    /**
     * INTERNAL:
     * Return the socket that will be used for the multicast, or {@code null} if none
     * has been created or discovery has been stopped.
     */
    public MulticastSocket getCommunicationSocket() {
        return communicationSocket;
    }

    /**
     * INTERNAL:
     * This is the main execution method of discovery. It will create a socket to
     * listen on, create a local connection for this service and announce that we are
     * open for business.
     */
    @Override
    public void run() {
        // Initialize the communication socket
        createCommunicationSocket();
        // Create the local connection from which we will receive commands
        rcm.getTransportManager().createLocalConnection();
        // Announce to all other discovery managers that this service is up. The
        // delay allows time for posting of connections to the name service
        try {
            Thread.sleep(announcementDelay);
        } catch (InterruptedException exception) {
            // Keep going as before, but preserve the interrupt status for the owner of this thread
            Thread.currentThread().interrupt();
        }
        announceSession();
        // Listen for other sessions that are joining
        startListening();
    }

    /**
     * INTERNAL:
     * This method puts us into the listening mode loop. This thread blocks, waiting
     * on announcements that we receive from other discovery managers. The loop ends when
     * {@link #stopListening()} has been called, when the socket has been closed, or when
     * no communication socket is available.
     */
    public void startListening() {
        byte[] recvBuf = new byte[128];
        // Only stop when we get the directive to stop
        stopListening = false;
        rcm.logInfo("discovery_manager_active", null);
        while (!stopListening) {
            // Read the socket once per iteration: stopDiscovery() may clear the field concurrently
            MulticastSocket socket = getCommunicationSocket();
            if (socket == null) {
                // Nothing to listen on (never created, or already released)
                rcm.logInfo("discovery_manager_stopped", null);
                return;
            }
            DatagramPacket recvPacket = new DatagramPacket(recvBuf, recvBuf.length);
            // Block waiting for a message
            try {
                socket.receive(recvPacket);
            } catch (IOException exception) {
                if (stopListening || socket.isClosed()) {
                    // The socket was closed on purpose; a closed socket can never receive again
                    rcm.logInfo("discovery_manager_stopped", null);
                    return;
                }
                // Exception was caused by something else (e.g. network error, etc.)
                rcm.handleException(DiscoveryException.errorReceivingAnnouncement(exception));
                // No packet was received, so there is nothing to unmarshall this time around
                continue;
            }
            // We received a message, unmarshall it into an announcement
            ServiceAnnouncement inMsg;
            try {
                inMsg = new ServiceAnnouncement(recvPacket.getData());
            } catch (Exception ex) {
                // Log a warning that we couldn't process the announcement
                Object[] args = { ex };
                rcm.logWarning("received_corrupt_announcement", args);
                continue;
            }
            // If the msg is not from ourselves, and is announcing a service on
            // the same channel as we are on then we should do something about it
            ServiceId remoteId = inMsg.getServiceId();
            boolean fromOurselves = rcm.getServiceId().getId().equals(remoteId.getId());
            if (!fromOurselves && rcm.getServiceId().getChannel().equalsIgnoreCase(remoteId.getChannel())) {
                receivedAnnouncement(remoteId);
            }
        }
    }

    /**
     * INTERNAL:
     * Signal this instance to stop listening. Closing the socket makes a blocked
     * receive call in {@link #startListening()} fail with an IOException, which ends the loop.
     */
    public void stopListening() {
        this.stopListening = true;
        // Single read so that a concurrent reset of the field cannot cause a NullPointerException
        MulticastSocket socket = getCommunicationSocket();
        if (socket != null) {
            socket.close();
        }
    }

    /**
     * INTERNAL:
     * Process the announcement that indicates that a new service is online
     */
    public void receivedAnnouncement(ServiceId serviceId) {
        Object[] args = { serviceId };
        rcm.logInfo("announcement_received", args);
        // Notify the RCM that a new service has been detected
        rcm.newServiceDiscovered(serviceId);
    }

    /**
     * PUBLIC:
     * Set the amount of time in millis that the service should wait between the time
     * that this Remote Service is available and a session announcement is sent out
     * to other discovery managers. This may be needed to give some systems more time
     * to post their connections into the naming service. Takes effect the next time
     * listening is started.
     */
    public void setAnnouncementDelay(int millisecondsToDelay) {
        announcementDelay = millisecondsToDelay;
    }

    /**
     * PUBLIC:
     * Return the amount of time in millis that the service should wait between the time
     * that this Remote Service is available and a session announcement is sent out
     * to other discovery managers.
     */
    public int getAnnouncementDelay() {
        return announcementDelay;
    }

    /**
     * PUBLIC:
     * Return the host address of the multicast group.
     */
    public String getMulticastGroupAddress() {
        return (multicastGroupAddress);
    }

    /**
     * PUBLIC:
     * Set the host address of the multicast group. Takes effect the next time
     * listening is started.
     */
    public void setMulticastGroupAddress(String address) {
        this.multicastGroupAddress = address;
    }

    /**
     * PUBLIC:
     * Set the multicast port used for discovery. Takes effect the next time
     * listening is started.
     */
    public void setMulticastPort(int port) {
        this.multicastPort = port;
    }

    /**
     * PUBLIC:
     * Return the multicast port used for discovery.
     */
    public int getMulticastPort() {
        return (multicastPort);
    }

    /**
     * INTERNAL:
     * Copy the configuration (multicast group address, port, announcement delay, packet
     * time-to-live and owning RCM) of the given discovery manager into this one. Runtime
     * state such as the socket and the stop flag is not copied.
     * Subclass overrides this method if necessary.
     */
    protected void shallowCopy(DiscoveryManager dmgr) {
        this.multicastGroupAddress = dmgr.multicastGroupAddress;
        this.multicastPort = dmgr.multicastPort;
        this.announcementDelay = dmgr.announcementDelay;
        // The time-to-live is configuration as well and must survive a copy
        this.packetTimeToLive = dmgr.packetTimeToLive;
        this.rcm = dmgr.rcm;
    }

    /**
     * PUBLIC:
     * Returns the number of hops the data packets of the session announcement will take before expiring.
     * The default is 2, a hub and an interface card to prevent the data packets from leaving the local network.
     */
    public int getPacketTimeToLive() {
        return this.packetTimeToLive;
    }

    /**
     * PUBLIC:
     * Set the number of hops the data packets of the session announcement will take before expiring.
     * The default is 2, a hub and an interface card to prevent the data packets from leaving the local network.
     *
     * Note that if sessions are hosted on different LANs that are part of a WAN, the announcement sent by one session
     * may not reach other sessions. In this case, consult your network administrator for the right time-to-live value
     * or test your network by increasing the value until sessions receive announcements sent by others.
     */
    public void setPacketTimeToLive(int newPacketTimeToLive) {
        this.packetTimeToLive = newPacketTimeToLive;
    }
}