blob: c58c998467c5e45ad5b7ee56946deaa1f6239e8e [file] [log] [blame]
/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 3 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
more details.
You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
**********/
// "liveMedia"
// Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved.
// A subclass of "ServerMediaSession" that can be used to create a (unicast) RTSP servers that acts as a 'proxy' for
// another (unicast or multicast) RTSP/RTP stream.
// Implementation
#include "liveMedia.hh"
#include "RTSPCommon.hh"
#include "GroupsockHelper.hh" // for "our_random()"
#ifndef MILLION
#define MILLION 1000000
#endif
// A "OnDemandServerMediaSubsession" subclass, used to implement a unicast RTSP server that's proxying another RTSP stream:
class ProxyServerMediaSubsession: public OnDemandServerMediaSubsession {
public:
ProxyServerMediaSubsession(MediaSubsession& mediaSubsession,
portNumBits initialPortNum, Boolean multiplexRTCPWithRTP);
virtual ~ProxyServerMediaSubsession();
char const* codecName() const { return fCodecName; }
char const* url() const { return ((ProxyServerMediaSession*)fParentSession)->url(); }
private: // redefined virtual functions
virtual FramedSource* createNewStreamSource(unsigned clientSessionId,
unsigned& estBitrate);
virtual void closeStreamSource(FramedSource *inputSource);
virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,
unsigned char rtpPayloadTypeIfDynamic,
FramedSource* inputSource);
virtual Groupsock* createGroupsock(struct in_addr const& addr, Port port);
virtual RTCPInstance* createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */
unsigned char const* cname, RTPSink* sink);
private:
static void subsessionByeHandler(void* clientData);
void subsessionByeHandler();
int verbosityLevel() const { return ((ProxyServerMediaSession*)fParentSession)->fVerbosityLevel; }
private:
friend class ProxyRTSPClient;
MediaSubsession& fClientMediaSubsession; // the 'client' media subsession object that corresponds to this 'server' media subsession
char const* fCodecName; // copied from "fClientMediaSubsession" once it's been set up
ProxyServerMediaSubsession* fNext; // used when we're part of a queue
Boolean fHaveSetupStream;
};
////////// ProxyServerMediaSession implementation //////////
UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSession& psms) { // used for debugging
return env << "ProxyServerMediaSession[" << psms.url() << "]";
}
ProxyRTSPClient*
defaultCreateNewProxyRTSPClientFunc(ProxyServerMediaSession& ourServerMediaSession,
char const* rtspURL,
char const* username, char const* password,
portNumBits tunnelOverHTTPPortNum, int verbosityLevel,
int socketNumToServer) {
return new ProxyRTSPClient(ourServerMediaSession, rtspURL, username, password,
tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer);
}
ProxyServerMediaSession* ProxyServerMediaSession
::createNew(UsageEnvironment& env, GenericMediaServer* ourMediaServer,
char const* inputStreamURL, char const* streamName,
char const* username, char const* password,
portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer,
MediaTranscodingTable* transcodingTable) {
return new ProxyServerMediaSession(env, ourMediaServer, inputStreamURL, streamName, username, password,
tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer,
transcodingTable);
}
ProxyServerMediaSession
::ProxyServerMediaSession(UsageEnvironment& env, GenericMediaServer* ourMediaServer,
char const* inputStreamURL, char const* streamName,
char const* username, char const* password,
portNumBits tunnelOverHTTPPortNum, int verbosityLevel,
int socketNumToServer,
MediaTranscodingTable* transcodingTable,
createNewProxyRTSPClientFunc* ourCreateNewProxyRTSPClientFunc,
portNumBits initialPortNum, Boolean multiplexRTCPWithRTP)
: ServerMediaSession(env, streamName, NULL, NULL, False, NULL),
describeCompletedFlag(0), fOurMediaServer(ourMediaServer), fClientMediaSession(NULL),
fVerbosityLevel(verbosityLevel),
fPresentationTimeSessionNormalizer(new PresentationTimeSessionNormalizer(envir())),
fCreateNewProxyRTSPClientFunc(ourCreateNewProxyRTSPClientFunc),
fTranscodingTable(transcodingTable),
fInitialPortNum(initialPortNum), fMultiplexRTCPWithRTP(multiplexRTCPWithRTP) {
// Open a RTSP connection to the input stream, and send a "DESCRIBE" command.
// We'll use the SDP description in the response to set ourselves up.
fProxyRTSPClient
= (*fCreateNewProxyRTSPClientFunc)(*this, inputStreamURL, username, password,
tunnelOverHTTPPortNum,
verbosityLevel > 0 ? verbosityLevel-1 : verbosityLevel,
socketNumToServer);
fProxyRTSPClient->sendDESCRIBE();
}
ProxyServerMediaSession::~ProxyServerMediaSession() {
if (fVerbosityLevel > 0) {
envir() << *this << "::~ProxyServerMediaSession()\n";
}
// Begin by sending a "TEARDOWN" command (without checking for a response):
if (fProxyRTSPClient != NULL && fClientMediaSession != NULL) {
fProxyRTSPClient->sendTeardownCommand(*fClientMediaSession, NULL, fProxyRTSPClient->auth());
}
// Then delete our state:
Medium::close(fClientMediaSession);
Medium::close(fProxyRTSPClient);
Medium::close(fPresentationTimeSessionNormalizer);
}
char const* ProxyServerMediaSession::url() const {
return fProxyRTSPClient == NULL ? NULL : fProxyRTSPClient->url();
}
Groupsock* ProxyServerMediaSession::createGroupsock(struct in_addr const& addr, Port port) {
// Default implementation; may be redefined by subclasses:
return new Groupsock(envir(), addr, port, 255);
}
RTCPInstance* ProxyServerMediaSession
::createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */
unsigned char const* cname, RTPSink* sink) {
// Default implementation; may be redefined by subclasses:
return RTCPInstance::createNew(envir(), RTCPgs, totSessionBW, cname, sink, NULL/*we're a server*/);
}
Boolean ProxyServerMediaSession::allowProxyingForSubsession(MediaSubsession const& /*mss*/) {
// Default implementation
return True;
}
void ProxyServerMediaSession::continueAfterDESCRIBE(char const* sdpDescription) {
describeCompletedFlag = 1;
// Create a (client) "MediaSession" object from the stream's SDP description ("resultString"), then iterate through its
// "MediaSubsession" objects, to set up corresponding "ServerMediaSubsession" objects that we'll use to serve the stream's tracks.
do {
fClientMediaSession = MediaSession::createNew(envir(), sdpDescription);
if (fClientMediaSession == NULL) break;
MediaSubsessionIterator iter(*fClientMediaSession);
for (MediaSubsession* mss = iter.next(); mss != NULL; mss = iter.next()) {
if (!allowProxyingForSubsession(*mss)) continue;
ServerMediaSubsession* smss
= new ProxyServerMediaSubsession(*mss, fInitialPortNum, fMultiplexRTCPWithRTP);
addSubsession(smss);
if (fVerbosityLevel > 0) {
envir() << *this << " added new \"ProxyServerMediaSubsession\" for "
<< mss->protocolName() << "/" << mss->mediumName() << "/" << mss->codecName() << " track\n";
}
}
} while (0);
}
void ProxyServerMediaSession::resetDESCRIBEState() {
// Delete all of our "ProxyServerMediaSubsession"s; they'll get set up again once we get a response to the new "DESCRIBE".
if (fOurMediaServer != NULL) {
// First, close any client connections that may have already been set up:
fOurMediaServer->closeAllClientSessionsForServerMediaSession(this);
}
deleteAllSubsessions();
// Finally, delete the client "MediaSession" object that we had set up after receiving the response to the previous "DESCRIBE":
Medium::close(fClientMediaSession); fClientMediaSession = NULL;
}
///////// RTSP 'response handlers' //////////
static void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) {
char const* res;
if (resultCode == 0) {
// The "DESCRIBE" command succeeded, so "resultString" should be the stream's SDP description.
res = resultString;
} else {
// The "DESCRIBE" command failed.
res = NULL;
}
((ProxyRTSPClient*)rtspClient)->continueAfterDESCRIBE(res);
delete[] resultString;
}
static void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) {
((ProxyRTSPClient*)rtspClient)->continueAfterSETUP(resultCode);
delete[] resultString;
}
static void continueAfterPLAY(RTSPClient* rtspClient, int resultCode, char* resultString) {
((ProxyRTSPClient*)rtspClient)->continueAfterPLAY(resultCode);
delete[] resultString;
}
static void continueAfterOPTIONS(RTSPClient* rtspClient, int resultCode, char* resultString) {
Boolean serverSupportsGetParameter = False;
if (resultCode == 0) {
// Note whether the server told us that it supports the "GET_PARAMETER" command:
serverSupportsGetParameter = RTSPOptionIsSupported("GET_PARAMETER", resultString);
}
((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, serverSupportsGetParameter);
delete[] resultString;
}
#ifdef SEND_GET_PARAMETER_IF_SUPPORTED
static void continueAfterGET_PARAMETER(RTSPClient* rtspClient, int resultCode, char* resultString) {
((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, True);
delete[] resultString;
}
#endif
////////// "ProxyRTSPClient" implementation /////////
UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyRTSPClient& proxyRTSPClient) { // used for debugging
return env << "ProxyRTSPClient[" << proxyRTSPClient.url() << "]";
}
ProxyRTSPClient::ProxyRTSPClient(ProxyServerMediaSession& ourServerMediaSession, char const* rtspURL,
char const* username, char const* password,
portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer)
: RTSPClient(ourServerMediaSession.envir(), rtspURL, verbosityLevel, "ProxyRTSPClient",
tunnelOverHTTPPortNum == (portNumBits)(~0) ? 0 : tunnelOverHTTPPortNum, socketNumToServer),
fOurServerMediaSession(ourServerMediaSession), fOurURL(strDup(rtspURL)), fStreamRTPOverTCP(tunnelOverHTTPPortNum != 0),
fSetupQueueHead(NULL), fSetupQueueTail(NULL), fNumSetupsDone(0), fNextDESCRIBEDelay(1),
fServerSupportsGetParameter(False), fLastCommandWasPLAY(False), fDoneDESCRIBE(False),
fLivenessCommandTask(NULL), fDESCRIBECommandTask(NULL), fSubsessionTimerTask(NULL), fResetTask(NULL) {
if (username != NULL && password != NULL) {
fOurAuthenticator = new Authenticator(username, password);
} else {
fOurAuthenticator = NULL;
}
}
void ProxyRTSPClient::reset() {
envir().taskScheduler().unscheduleDelayedTask(fLivenessCommandTask);
envir().taskScheduler().unscheduleDelayedTask(fDESCRIBECommandTask);
envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask);
envir().taskScheduler().unscheduleDelayedTask(fResetTask);
fSetupQueueHead = fSetupQueueTail = NULL;
fNumSetupsDone = 0;
fNextDESCRIBEDelay = 1;
fLastCommandWasPLAY = False;
fDoneDESCRIBE = False;
RTSPClient::reset();
}
ProxyRTSPClient::~ProxyRTSPClient() {
reset();
delete fOurAuthenticator;
delete[] fOurURL;
}
int ProxyRTSPClient::connectToServer(int socketNum, portNumBits remotePortNum) {
int res;
res = RTSPClient::connectToServer(socketNum, remotePortNum);
if (res == 0 && fDoneDESCRIBE && fStreamRTPOverTCP) {
if (fVerbosityLevel > 0) {
envir() << "ProxyRTSPClient::connectToServer calling scheduleReset()\n";
}
scheduleReset();
}
return res;
}
void ProxyRTSPClient::continueAfterDESCRIBE(char const* sdpDescription) {
if (sdpDescription != NULL) {
fOurServerMediaSession.continueAfterDESCRIBE(sdpDescription);
// Unlike most RTSP streams, there might be a long delay between this "DESCRIBE" command (to the downstream server) and the
// subsequent "SETUP"/"PLAY" - which doesn't occur until the first time that a client requests the stream.
// To prevent the proxied connection (between us and the downstream server) from timing out, we send periodic 'liveness'
// ("OPTIONS" or "GET_PARAMETER") commands. (The usual RTCP liveness mechanism wouldn't work here, because RTCP packets
// don't get sent until after the "PLAY" command.)
scheduleLivenessCommand();
} else {
// The "DESCRIBE" command failed, most likely because the server or the stream is not yet running.
// Reschedule another "DESCRIBE" command to take place later:
scheduleDESCRIBECommand();
}
fDoneDESCRIBE = True;
}
void ProxyRTSPClient::continueAfterLivenessCommand(int resultCode, Boolean serverSupportsGetParameter) {
if (resultCode != 0) {
// The periodic 'liveness' command failed, suggesting that the back-end stream is no longer alive.
// We handle this by resetting our connection state with this server. Any current clients will be closed, but
// subsequent clients will cause new RTSP "SETUP"s and "PLAY"s to get done, restarting the stream.
// Then continue by sending more "DESCRIBE" commands, to try to restore the stream.
fServerSupportsGetParameter = False; // until we learn otherwise, in response to a future "OPTIONS" command
if (resultCode < 0) {
// The 'liveness' command failed without getting a response from the server (otherwise "resultCode" would have been > 0).
// This suggests that the RTSP connection itself has failed. Print this error code, in case it's useful for debugging:
if (fVerbosityLevel > 0) {
envir() << *this << ": lost connection to server ('errno': " << -resultCode << "). Scheduling reset...\n";
}
}
scheduleReset();
return;
}
fServerSupportsGetParameter = serverSupportsGetParameter;
// Schedule the next 'liveness' command (i.e., to tell the back-end server that we're still alive):
scheduleLivenessCommand();
}
#define SUBSESSION_TIMEOUT_SECONDS 5 // how many seconds to wait for the last track's "SETUP" to be done (note below)
void ProxyRTSPClient::continueAfterSETUP(int resultCode) {
if (resultCode != 0) {
// The "SETUP" command failed, so arrange to reset the state. (We don't do this now, because it deletes the
// "ProxyServerMediaSubsession", and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".)
scheduleReset();
return;
}
if (fVerbosityLevel > 0) {
envir() << *this << "::continueAfterSETUP(): head codec: " << fSetupQueueHead->codecName()
<< "; numSubsessions " << fSetupQueueHead->fParentSession->numSubsessions() << "\n\tqueue:";
for (ProxyServerMediaSubsession* p = fSetupQueueHead; p != NULL; p = p->fNext) {
envir() << "\t" << p->codecName();
}
envir() << "\n";
}
envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); // in case it had been set
// Dequeue the first "ProxyServerMediaSubsession" from our 'SETUP queue'. It will be the one for which this "SETUP" was done:
ProxyServerMediaSubsession* smss = fSetupQueueHead; // Assert: != NULL
fSetupQueueHead = fSetupQueueHead->fNext;
if (fSetupQueueHead == NULL) fSetupQueueTail = NULL;
if (fSetupQueueHead != NULL) {
// There are still entries in the queue, for tracks for which we have still to do a "SETUP".
// "SETUP" the first of these now:
sendSetupCommand(fSetupQueueHead->fClientMediaSubsession, ::continueAfterSETUP,
False, fStreamRTPOverTCP, False, fOurAuthenticator);
++fNumSetupsDone;
fSetupQueueHead->fHaveSetupStream = True;
} else {
if (fNumSetupsDone >= smss->fParentSession->numSubsessions()) {
// We've now finished setting up each of our subsessions (i.e., 'tracks').
// Continue by sending a "PLAY" command (an 'aggregate' "PLAY" command, on the whole session):
sendPlayCommand(smss->fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator);
// the "-1.0f" "start" parameter causes the "PLAY" to be sent without a "Range:" header, in case we'd already done
// a "PLAY" before (as a result of a 'subsession timeout' (note below))
fLastCommandWasPLAY = True;
} else {
// Some of this session's subsessions (i.e., 'tracks') remain to be "SETUP". They might get "SETUP" very soon, but it's
// also possible - if the remote client chose to play only some of the session's tracks - that they might not.
// To allow for this possibility, we set a timer. If the timer expires without the remaining subsessions getting "SETUP",
// then we send a "PLAY" command anyway:
fSubsessionTimerTask
= envir().taskScheduler().scheduleDelayedTask(SUBSESSION_TIMEOUT_SECONDS*MILLION, (TaskFunc*)subsessionTimeout, this);
}
}
}
void ProxyRTSPClient::continueAfterPLAY(int resultCode) {
if (resultCode != 0) {
// The "PLAY" command failed, so arrange to reset the state. (We don't do this now, because it deletes the
// "ProxyServerMediaSubsession", and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".)
scheduleReset();
return;
}
}
void ProxyRTSPClient::scheduleLivenessCommand() {
// Delay a random time before sending another 'liveness' command.
unsigned delayMax = sessionTimeoutParameter(); // if the server specified a maximum time between 'liveness' probes, then use that
if (delayMax == 0) {
delayMax = 60;
}
// Choose a random time from [delayMax/2,delayMax-1) seconds:
unsigned const us_1stPart = delayMax*500000;
unsigned uSecondsToDelay;
if (us_1stPart <= 1000000) {
uSecondsToDelay = us_1stPart;
} else {
unsigned const us_2ndPart = us_1stPart-1000000;
uSecondsToDelay = us_1stPart + (us_2ndPart*our_random())%us_2ndPart;
}
fLivenessCommandTask = envir().taskScheduler().scheduleDelayedTask(uSecondsToDelay, sendLivenessCommand, this);
}
void ProxyRTSPClient::sendLivenessCommand(void* clientData) {
ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData;
rtspClient->fLivenessCommandTask = NULL;
// Note. By default, we do not send "GET_PARAMETER" as our 'liveness notification' command, even if the server previously
// indicated (in its response to our earlier "OPTIONS" command) that it supported "GET_PARAMETER". This is because
// "GET_PARAMETER" crashes some camera servers (even though they claimed to support "GET_PARAMETER").
#ifdef SEND_GET_PARAMETER_IF_SUPPORTED
MediaSession* sess = rtspClient->fOurServerMediaSession.fClientMediaSession;
if (rtspClient->fServerSupportsGetParameter && rtspClient->fNumSetupsDone > 0 && sess != NULL) {
rtspClient->sendGetParameterCommand(*sess, ::continueAfterGET_PARAMETER, "", rtspClient->auth());
} else {
#endif
rtspClient->sendOptionsCommand(::continueAfterOPTIONS, rtspClient->auth());
#ifdef SEND_GET_PARAMETER_IF_SUPPORTED
}
#endif
}
void ProxyRTSPClient::scheduleReset() {
if (fVerbosityLevel > 0) {
envir() << "ProxyRTSPClient::scheduleReset\n";
}
envir().taskScheduler().rescheduleDelayedTask(fResetTask, 0, doReset, this);
}
void ProxyRTSPClient::doReset() {
fResetTask = NULL;
if (fVerbosityLevel > 0) {
envir() << *this << "::doReset\n";
}
reset();
fOurServerMediaSession.resetDESCRIBEState();
setBaseURL(fOurURL); // because we'll be sending an initial "DESCRIBE" all over again
sendDESCRIBE();
}
void ProxyRTSPClient::doReset(void* clientData) {
ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData;
rtspClient->doReset();
}
void ProxyRTSPClient::scheduleDESCRIBECommand() {
// Delay 1s, 2s, 4s, 8s ... 256s until sending the next "DESCRIBE". Then, keep delaying a random time from [256..511] seconds:
unsigned secondsToDelay;
if (fNextDESCRIBEDelay <= 256) {
secondsToDelay = fNextDESCRIBEDelay;
fNextDESCRIBEDelay *= 2;
} else {
secondsToDelay = 256 + (our_random()&0xFF); // [256..511] seconds
}
if (fVerbosityLevel > 0) {
envir() << *this << ": RTSP \"DESCRIBE\" command failed; trying again in " << secondsToDelay << " seconds\n";
}
fDESCRIBECommandTask = envir().taskScheduler().scheduleDelayedTask(secondsToDelay*MILLION, sendDESCRIBE, this);
}
void ProxyRTSPClient::sendDESCRIBE(void* clientData) {
ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData;
if (rtspClient != NULL) {
rtspClient->fDESCRIBECommandTask = NULL;
rtspClient->sendDESCRIBE();
}
}
void ProxyRTSPClient::sendDESCRIBE() {
sendDescribeCommand(::continueAfterDESCRIBE, auth());
}
void ProxyRTSPClient::subsessionTimeout(void* clientData) {
((ProxyRTSPClient*)clientData)->handleSubsessionTimeout();
}
void ProxyRTSPClient::handleSubsessionTimeout() {
fSubsessionTimerTask = NULL;
// We still have one or more subsessions ('tracks') left to "SETUP". But we can't wait any longer for them. Send a "PLAY" now:
MediaSession* sess = fOurServerMediaSession.fClientMediaSession;
if (sess != NULL) sendPlayCommand(*sess, ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator);
fLastCommandWasPLAY = True;
}
//////// "ProxyServerMediaSubsession" implementation //////////
ProxyServerMediaSubsession
::ProxyServerMediaSubsession(MediaSubsession& mediaSubsession,
portNumBits initialPortNum, Boolean multiplexRTCPWithRTP)
: OnDemandServerMediaSubsession(mediaSubsession.parentSession().envir(), True/*reuseFirstSource*/,
initialPortNum, multiplexRTCPWithRTP),
fClientMediaSubsession(mediaSubsession), fCodecName(strDup(mediaSubsession.codecName())),
fNext(NULL), fHaveSetupStream(False) {
}
UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSubsession& psmss) { // used for debugging
return env << "ProxyServerMediaSubsession[" << psmss.url() << "," << psmss.codecName() << "]";
}
ProxyServerMediaSubsession::~ProxyServerMediaSubsession() {
if (verbosityLevel() > 0) {
envir() << *this << "::~ProxyServerMediaSubsession()\n";
}
delete[] (char*)fCodecName;
}
FramedSource* ProxyServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) {
ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;
if (verbosityLevel() > 0) {
envir() << *this << "::createNewStreamSource(session id " << clientSessionId << ")\n";
}
// If we haven't yet created a data source from our 'media subsession' object, initiate() it to do so:
if (fClientMediaSubsession.readSource() == NULL) {
if (sms->fTranscodingTable == NULL || !sms->fTranscodingTable->weWillTranscode("audio", "MPA-ROBUST")) fClientMediaSubsession.receiveRawMP3ADUs(); // hack for proxying MPA-ROBUST streams
if (sms->fTranscodingTable == NULL || !sms->fTranscodingTable->weWillTranscode("video", "JPEG")) fClientMediaSubsession.receiveRawJPEGFrames(); // hack for proxying JPEG/RTP streams.
fClientMediaSubsession.initiate();
if (verbosityLevel() > 0) {
envir() << "\tInitiated: " << *this << "\n";
}
if (fClientMediaSubsession.readSource() != NULL) {
// First, check whether we have defined a 'transcoder' filter to be used with this codec:
if (sms->fTranscodingTable != NULL) {
char* outputCodecName;
FramedFilter* transcoder
= sms->fTranscodingTable->lookupTranscoder(fClientMediaSubsession, outputCodecName);
if (transcoder != NULL) {
fClientMediaSubsession.addFilter(transcoder);
delete[] (char*)fCodecName; fCodecName = outputCodecName;
}
}
// Then, add to the front of all data sources a filter that will 'normalize' their frames'
// presentation times, before the frames get re-transmitted by our server:
FramedFilter* normalizerFilter = sms->fPresentationTimeSessionNormalizer
->createNewPresentationTimeSubsessionNormalizer(fClientMediaSubsession.readSource(),
fClientMediaSubsession.rtpSource(),
fCodecName);
fClientMediaSubsession.addFilter(normalizerFilter);
// Some data sources require a 'framer' object to be added, before they can be fed into
// a "RTPSink". Adjust for this now:
if (strcmp(fCodecName, "H264") == 0) {
fClientMediaSubsession.addFilter(H264VideoStreamDiscreteFramer
::createNew(envir(), fClientMediaSubsession.readSource()));
} else if (strcmp(fCodecName, "H265") == 0) {
fClientMediaSubsession.addFilter(H265VideoStreamDiscreteFramer
::createNew(envir(), fClientMediaSubsession.readSource()));
} else if (strcmp(fCodecName, "MP4V-ES") == 0) {
fClientMediaSubsession.addFilter(MPEG4VideoStreamDiscreteFramer
::createNew(envir(), fClientMediaSubsession.readSource(),
True/* leave PTs unmodified*/));
} else if (strcmp(fCodecName, "MPV") == 0) {
fClientMediaSubsession.addFilter(MPEG1or2VideoStreamDiscreteFramer
::createNew(envir(), fClientMediaSubsession.readSource(),
False, 5.0, True/* leave PTs unmodified*/));
} else if (strcmp(fCodecName, "DV") == 0) {
fClientMediaSubsession.addFilter(DVVideoStreamFramer
::createNew(envir(), fClientMediaSubsession.readSource(),
False, True/* leave PTs unmodified*/));
}
}
if (fClientMediaSubsession.rtcpInstance() != NULL) {
fClientMediaSubsession.rtcpInstance()->setByeHandler(subsessionByeHandler, this);
}
}
ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
if (clientSessionId != 0) {
// We're being called as a result of implementing a RTSP "SETUP".
if (!fHaveSetupStream) {
// This is our first "SETUP". Send RTSP "SETUP" and later "PLAY" commands to the proxied server, to start streaming:
// (Before sending "SETUP", enqueue ourselves on the "RTSPClient"s 'SETUP queue', so we'll be able to get the correct
// "ProxyServerMediaSubsession" to handle the response. (Note that responses come back in the same order as requests.))
Boolean queueWasEmpty = proxyRTSPClient->fSetupQueueHead == NULL;
if (queueWasEmpty) {
proxyRTSPClient->fSetupQueueHead = this;
proxyRTSPClient->fSetupQueueTail = this;
} else {
// Add ourself to the "RTSPClient"s 'SETUP queue' (if we're not already on it):
ProxyServerMediaSubsession* psms;
for (psms = proxyRTSPClient->fSetupQueueHead; psms != NULL; psms = psms->fNext) {
if (psms == this) break;
}
if (psms == NULL) {
proxyRTSPClient->fSetupQueueTail->fNext = this;
proxyRTSPClient->fSetupQueueTail = this;
}
}
// Hack: If there's already a pending "SETUP" request, don't send this track's "SETUP" right away, because
// the server might not properly handle 'pipelined' requests. Instead, wait until after previous "SETUP" responses come back.
if (queueWasEmpty) {
proxyRTSPClient->sendSetupCommand(fClientMediaSubsession, ::continueAfterSETUP,
False, proxyRTSPClient->fStreamRTPOverTCP, False, proxyRTSPClient->auth());
++proxyRTSPClient->fNumSetupsDone;
fHaveSetupStream = True;
}
} else {
// This is a "SETUP" from a new client. We know that there are no other currently active clients (otherwise we wouldn't
// have been called here), so we know that the substream was previously "PAUSE"d. Send "PLAY" downstream once again,
// to resume the stream:
if (!proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PLAY"; not one for each subsession
proxyRTSPClient->sendPlayCommand(fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f/*resume from previous point*/,
-1.0f, 1.0f, proxyRTSPClient->auth());
proxyRTSPClient->fLastCommandWasPLAY = True;
}
}
}
estBitrate = fClientMediaSubsession.bandwidth();
if (estBitrate == 0) estBitrate = 50; // kbps, estimate
return fClientMediaSubsession.readSource();
}
void ProxyServerMediaSubsession::closeStreamSource(FramedSource* inputSource) {
if (verbosityLevel() > 0) {
envir() << *this << "::closeStreamSource()\n";
}
// Because there's only one input source for this 'subsession' (regardless of how many downstream clients are proxying it),
// we don't close the input source here. (Instead, we wait until *this* object gets deleted.)
// However, because (as evidenced by this function having been called) we no longer have any clients accessing the stream,
// then we "PAUSE" the downstream proxied stream, until a new client arrives:
if (fHaveSetupStream) {
ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;
ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
if (proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PAUSE"; not one for each subsession
if (fParentSession->referenceCount() > 1) {
// There are other client(s) still streaming other subsessions of this stream.
// Therefore, we don't send a "PAUSE" for the whole stream, but only for the sub-stream:
proxyRTSPClient->sendPauseCommand(fClientMediaSubsession, NULL, proxyRTSPClient->auth());
} else {
// Normal case: There are no other client still streaming (parts of) this stream.
// Send a "PAUSE" for the whole stream.
proxyRTSPClient->sendPauseCommand(fClientMediaSubsession.parentSession(), NULL, proxyRTSPClient->auth());
proxyRTSPClient->fLastCommandWasPLAY = False;
}
}
}
}
RTPSink* ProxyServerMediaSubsession
::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource) {
if (verbosityLevel() > 0) {
envir() << *this << "::createNewRTPSink()\n";
}
// Create (and return) the appropriate "RTPSink" object for our codec:
// (Note: The configuration string might not be correct if a transcoder is used. FIX!) #####
RTPSink* newSink;
if (strcmp(fCodecName, "AC3") == 0 || strcmp(fCodecName, "EAC3") == 0) {
newSink = AC3AudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
fClientMediaSubsession.rtpTimestampFrequency());
#if 0 // This code does not work; do *not* enable it:
} else if (strcmp(fCodecName, "AMR") == 0 || strcmp(fCodecName, "AMR-WB") == 0) {
Boolean isWideband = strcmp(fCodecName, "AMR-WB") == 0;
newSink = AMRAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
isWideband, fClientMediaSubsession.numChannels());
#endif
} else if (strcmp(fCodecName, "DV") == 0) {
newSink = DVVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
} else if (strcmp(fCodecName, "GSM") == 0) {
newSink = GSMAudioRTPSink::createNew(envir(), rtpGroupsock);
} else if (strcmp(fCodecName, "H263-1998") == 0 || strcmp(fCodecName, "H263-2000") == 0) {
newSink = H263plusVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
fClientMediaSubsession.rtpTimestampFrequency());
} else if (strcmp(fCodecName, "H264") == 0) {
newSink = H264VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
fClientMediaSubsession.fmtp_spropparametersets());
} else if (strcmp(fCodecName, "H265") == 0) {
newSink = H265VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
fClientMediaSubsession.fmtp_spropvps(),
fClientMediaSubsession.fmtp_spropsps(),
fClientMediaSubsession.fmtp_sproppps());
} else if (strcmp(fCodecName, "JPEG") == 0) {
newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, 26, 90000, "video", "JPEG",
1/*numChannels*/, False/*allowMultipleFramesPerPacket*/, False/*doNormalMBitRule*/);
} else if (strcmp(fCodecName, "MP4A-LATM") == 0) {
newSink = MPEG4LATMAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
fClientMediaSubsession.rtpTimestampFrequency(),
fClientMediaSubsession.fmtp_config(),
fClientMediaSubsession.numChannels());
} else if (strcmp(fCodecName, "MP4V-ES") == 0) {
newSink = MPEG4ESVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
fClientMediaSubsession.rtpTimestampFrequency(),
fClientMediaSubsession.attrVal_unsigned("profile-level-id"),
fClientMediaSubsession.fmtp_config());
} else if (strcmp(fCodecName, "MPA") == 0) {
newSink = MPEG1or2AudioRTPSink::createNew(envir(), rtpGroupsock);
} else if (strcmp(fCodecName, "MPA-ROBUST") == 0) {
newSink = MP3ADURTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
} else if (strcmp(fCodecName, "MPEG4-GENERIC") == 0) {
newSink = MPEG4GenericRTPSink::createNew(envir(), rtpGroupsock,
rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(),
fClientMediaSubsession.mediumName(),
fClientMediaSubsession.attrVal_str("mode"),
fClientMediaSubsession.fmtp_config(), fClientMediaSubsession.numChannels());
} else if (strcmp(fCodecName, "MPV") == 0) {
newSink = MPEG1or2VideoRTPSink::createNew(envir(), rtpGroupsock);
} else if (strcmp(fCodecName, "OPUS") == 0) {
newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
48000, "audio", "OPUS", 2, False/*only 1 Opus 'packet' in each RTP packet*/);
} else if (strcmp(fCodecName, "T140") == 0) {
newSink = T140TextRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
} else if (strcmp(fCodecName, "THEORA") == 0) {
newSink = TheoraVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
fClientMediaSubsession.fmtp_config());
} else if (strcmp(fCodecName, "VORBIS") == 0) {
newSink = VorbisAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
fClientMediaSubsession.rtpTimestampFrequency(), fClientMediaSubsession.numChannels(),
fClientMediaSubsession.fmtp_config());
} else if (strcmp(fCodecName, "VP8") == 0) {
newSink = VP8VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
} else if (strcmp(fCodecName, "VP9") == 0) {
newSink = VP9VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
} else if (strcmp(fCodecName, "AMR") == 0 || strcmp(fCodecName, "AMR-WB") == 0) {
// Proxying of these codecs is currently *not* supported, because the data received by the "RTPSource" object is not in a
// form that can be fed directly into a corresponding "RTPSink" object.
if (verbosityLevel() > 0) {
envir() << "\treturns NULL (because we currently don't support the proxying of \""
<< fClientMediaSubsession.mediumName() << "/" << fCodecName << "\" streams)\n";
}
return NULL;
} else if (strcmp(fCodecName, "QCELP") == 0 ||
strcmp(fCodecName, "H261") == 0 ||
strcmp(fCodecName, "H263-1998") == 0 || strcmp(fCodecName, "H263-2000") == 0 ||
strcmp(fCodecName, "X-QT") == 0 || strcmp(fCodecName, "X-QUICKTIME") == 0) {
// This codec requires a specialized RTP payload format; however, we don't yet have an appropriate "RTPSink" subclass for it:
if (verbosityLevel() > 0) {
envir() << "\treturns NULL (because we don't have a \"RTPSink\" subclass for this RTP payload format)\n";
}
return NULL;
} else {
// This codec is assumed to have a simple RTP payload format that can be implemented just with a "SimpleRTPSink":
Boolean allowMultipleFramesPerPacket = True; // by default
Boolean doNormalMBitRule = True; // by default
// Some codecs change the above default parameters:
if (strcmp(fCodecName, "MP2T") == 0) {
doNormalMBitRule = False; // no RTP 'M' bit
}
newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock,
rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(),
fClientMediaSubsession.mediumName(), fCodecName,
fClientMediaSubsession.numChannels(), allowMultipleFramesPerPacket, doNormalMBitRule);
}
// Because our relayed frames' presentation times are inaccurate until the input frames have been RTCP-synchronized,
// we temporarily disable RTCP "SR" reports for this "RTPSink" object:
newSink->enableRTCPReports() = False;
// Also tell our "PresentationTimeSubsessionNormalizer" object about the "RTPSink", so it can enable RTCP "SR" reports later:
PresentationTimeSubsessionNormalizer* ssNormalizer;
if (strcmp(fCodecName, "H264") == 0 ||
strcmp(fCodecName, "H265") == 0 ||
strcmp(fCodecName, "MP4V-ES") == 0 ||
strcmp(fCodecName, "MPV") == 0 ||
strcmp(fCodecName, "DV") == 0) {
// There was a separate 'framer' object in front of the "PresentationTimeSubsessionNormalizer", so go back one object to get it:
ssNormalizer = (PresentationTimeSubsessionNormalizer*)(((FramedFilter*)inputSource)->inputSource());
} else {
ssNormalizer = (PresentationTimeSubsessionNormalizer*)inputSource;
}
ssNormalizer->setRTPSink(newSink);
return newSink;
}
Groupsock* ProxyServerMediaSubsession::createGroupsock(struct in_addr const& addr, Port port) {
ProxyServerMediaSession* parentSession = (ProxyServerMediaSession*)fParentSession;
return parentSession->createGroupsock(addr, port);
}
RTCPInstance* ProxyServerMediaSubsession
::createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */
unsigned char const* cname, RTPSink* sink) {
ProxyServerMediaSession* parentSession = (ProxyServerMediaSession*)fParentSession;
return parentSession->createRTCP(RTCPgs, totSessionBW, cname, sink);
}
void ProxyServerMediaSubsession::subsessionByeHandler(void* clientData) {
((ProxyServerMediaSubsession*)clientData)->subsessionByeHandler();
}
void ProxyServerMediaSubsession::subsessionByeHandler() {
if (verbosityLevel() > 0) {
envir() << *this << ": received RTCP \"BYE\". (The back-end stream has ended.)\n";
}
// This "BYE" signals that our input source has (effectively) closed, so pass this onto the front-end clients:
fHaveSetupStream = False; // hack to stop "PAUSE" getting sent by:
if (fClientMediaSubsession.readSource() != NULL) {
fClientMediaSubsession.readSource()->handleClosure();
}
// And then treat this as if we had lost connection to the back-end server,
// and can reestablish streaming from it only by sending another "DESCRIBE":
ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;
ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
proxyRTSPClient->scheduleReset();
}
////////// PresentationTimeSessionNormalizer and PresentationTimeSubsessionNormalizer implementations //////////
// PresentationTimeSessionNormalizer:
PresentationTimeSessionNormalizer::PresentationTimeSessionNormalizer(UsageEnvironment& env)
: Medium(env),
fSubsessionNormalizers(NULL), fMasterSSNormalizer(NULL) {
}
PresentationTimeSessionNormalizer::~PresentationTimeSessionNormalizer() {
while (fSubsessionNormalizers != NULL) {
Medium::close(fSubsessionNormalizers);
}
}
PresentationTimeSubsessionNormalizer* PresentationTimeSessionNormalizer
::createNewPresentationTimeSubsessionNormalizer(FramedSource* inputSource, RTPSource* rtpSource,
char const* codecName) {
fSubsessionNormalizers
= new PresentationTimeSubsessionNormalizer(*this, inputSource, rtpSource, codecName, fSubsessionNormalizers);
return fSubsessionNormalizers;
}
void PresentationTimeSessionNormalizer
::normalizePresentationTime(PresentationTimeSubsessionNormalizer* ssNormalizer,
struct timeval& toPT, struct timeval const& fromPT) {
Boolean const hasBeenSynced = ssNormalizer->fRTPSource->hasBeenSynchronizedUsingRTCP();
if (!hasBeenSynced) {
// If "fromPT" has not yet been RTCP-synchronized, then it was generated by our own receiving code, and thus
// is already aligned with 'wall-clock' time. Just copy it 'as is' to "toPT":
toPT = fromPT;
} else {
if (fMasterSSNormalizer == NULL) {
// Make "ssNormalizer" the 'master' subsession - meaning that its presentation time is adjusted to align with 'wall clock'
// time, and the presentation times of other subsessions (if any) are adjusted to retain their relative separation with
// those of the master:
fMasterSSNormalizer = ssNormalizer;
struct timeval timeNow;
gettimeofday(&timeNow, NULL);
// Compute: fPTAdjustment = timeNow - fromPT
fPTAdjustment.tv_sec = timeNow.tv_sec - fromPT.tv_sec;
fPTAdjustment.tv_usec = timeNow.tv_usec - fromPT.tv_usec;
// Note: It's OK if one or both of these fields underflows; the result still works out OK later.
}
// Compute a normalized presentation time: toPT = fromPT + fPTAdjustment
toPT.tv_sec = fromPT.tv_sec + fPTAdjustment.tv_sec - 1;
toPT.tv_usec = fromPT.tv_usec + fPTAdjustment.tv_usec + MILLION;
while (toPT.tv_usec > MILLION) { ++toPT.tv_sec; toPT.tv_usec -= MILLION; }
// Because "ssNormalizer"s relayed presentation times are accurate from now on, enable RTCP "SR" reports for its "RTPSink":
RTPSink* const rtpSink = ssNormalizer->fRTPSink;
if (rtpSink != NULL) { // sanity check; should always be true
rtpSink->enableRTCPReports() = True;
}
}
}
void PresentationTimeSessionNormalizer
::removePresentationTimeSubsessionNormalizer(PresentationTimeSubsessionNormalizer* ssNormalizer) {
// Unlink "ssNormalizer" from the linked list (starting with "fSubsessionNormalizers"):
if (fSubsessionNormalizers == ssNormalizer) {
fSubsessionNormalizers = fSubsessionNormalizers->fNext;
} else {
PresentationTimeSubsessionNormalizer** ssPtrPtr = &(fSubsessionNormalizers->fNext);
while (*ssPtrPtr != ssNormalizer) ssPtrPtr = &((*ssPtrPtr)->fNext);
*ssPtrPtr = (*ssPtrPtr)->fNext;
}
}
// PresentationTimeSubsessionNormalizer:
PresentationTimeSubsessionNormalizer
::PresentationTimeSubsessionNormalizer(PresentationTimeSessionNormalizer& parent, FramedSource* inputSource, RTPSource* rtpSource,
char const* codecName, PresentationTimeSubsessionNormalizer* next)
: FramedFilter(parent.envir(), inputSource),
fParent(parent), fRTPSource(rtpSource), fRTPSink(NULL), fCodecName(codecName), fNext(next) {
}
PresentationTimeSubsessionNormalizer::~PresentationTimeSubsessionNormalizer() {
fParent.removePresentationTimeSubsessionNormalizer(this);
}
void PresentationTimeSubsessionNormalizer::afterGettingFrame(void* clientData, unsigned frameSize,
unsigned numTruncatedBytes,
struct timeval presentationTime,
unsigned durationInMicroseconds) {
((PresentationTimeSubsessionNormalizer*)clientData)
->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
}
void PresentationTimeSubsessionNormalizer::afterGettingFrame(unsigned frameSize,
unsigned numTruncatedBytes,
struct timeval presentationTime,
unsigned durationInMicroseconds) {
// This filter is implemented by passing all frames through unchanged, except that "fPresentationTime" is changed:
fFrameSize = frameSize;
fNumTruncatedBytes = numTruncatedBytes;
fDurationInMicroseconds = durationInMicroseconds;
fParent.normalizePresentationTime(this, fPresentationTime, presentationTime);
// Hack for JPEG/RTP proxying. Because we're proxying JPEG by just copying the raw JPEG/RTP payloads, without interpreting them,
// we need to also 'copy' the RTP 'M' (marker) bit from the "RTPSource" to the "RTPSink":
if (fRTPSource->curPacketMarkerBit() && strcmp(fCodecName, "JPEG") == 0) ((SimpleRTPSink*)fRTPSink)->setMBitOnNextPacket();
// Complete delivery:
FramedSource::afterGetting(this);
}
void PresentationTimeSubsessionNormalizer::doGetNextFrame() {
fInputSource->getNextFrame(fTo, fMaxSize, afterGettingFrame, this, FramedSource::handleClosure, this);
}