/********** This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. (See .) This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA **********/ // "liveMedia" // Copyright (c) 1996-2016 Live Networks, Inc. All rights reserved. // A subclass of "ServerMediaSession" that can be used to create a (unicast) RTSP servers that acts as a 'proxy' for // another (unicast or multicast) RTSP/RTP stream. // Implementation #include "liveMedia.hh" #include "RTSPCommon.hh" #include "GroupsockHelper.hh" // for "our_random()" #ifndef MILLION #define MILLION 1000000 #endif // A "OnDemandServerMediaSubsession" subclass, used to implement a unicast RTSP server that's proxying another RTSP stream: class ProxyServerMediaSubsession: public OnDemandServerMediaSubsession { public: ProxyServerMediaSubsession(MediaSubsession& mediaSubsession, portNumBits initialPortNum, Boolean multiplexRTCPWithRTP); virtual ~ProxyServerMediaSubsession(); char const* codecName() const { return fCodecName; } char const* url() const { return ((ProxyServerMediaSession*)fParentSession)->url(); } private: // redefined virtual functions virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate); virtual void closeStreamSource(FramedSource *inputSource); virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource); virtual Groupsock* createGroupsock(struct in_addr const& addr, Port port); virtual RTCPInstance* createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */ unsigned char const* cname, RTPSink* sink); private: static void subsessionByeHandler(void* clientData); void subsessionByeHandler(); int verbosityLevel() const { return ((ProxyServerMediaSession*)fParentSession)->fVerbosityLevel; } private: friend class ProxyRTSPClient; MediaSubsession& fClientMediaSubsession; // the 'client' media subsession object that corresponds to this 'server' media subsession char const* fCodecName; // copied from "fClientMediaSubsession" once it's been set up ProxyServerMediaSubsession* fNext; // used when we're part of a queue Boolean fHaveSetupStream; }; ////////// ProxyServerMediaSession implementation ////////// UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSession& psms) { // used for debugging return env << "ProxyServerMediaSession[" << psms.url() << "]"; } ProxyRTSPClient* defaultCreateNewProxyRTSPClientFunc(ProxyServerMediaSession& ourServerMediaSession, char const* rtspURL, char const* username, char const* password, portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer) { return new ProxyRTSPClient(ourServerMediaSession, rtspURL, username, password, tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer); } ProxyServerMediaSession* ProxyServerMediaSession ::createNew(UsageEnvironment& env, GenericMediaServer* ourMediaServer, char const* inputStreamURL, char const* streamName, char const* username, char const* password, portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer, MediaTranscodingTable* transcodingTable) { return new ProxyServerMediaSession(env, ourMediaServer, inputStreamURL, streamName, username, password, tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer, transcodingTable); } ProxyServerMediaSession ::ProxyServerMediaSession(UsageEnvironment& env, GenericMediaServer* ourMediaServer, char const* inputStreamURL, char const* streamName, char const* username, char const* password, portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer, MediaTranscodingTable* transcodingTable, createNewProxyRTSPClientFunc* ourCreateNewProxyRTSPClientFunc, portNumBits initialPortNum, Boolean multiplexRTCPWithRTP) : ServerMediaSession(env, streamName, NULL, NULL, False, NULL), describeCompletedFlag(0), fOurMediaServer(ourMediaServer), fClientMediaSession(NULL), fVerbosityLevel(verbosityLevel), fPresentationTimeSessionNormalizer(new PresentationTimeSessionNormalizer(envir())), fCreateNewProxyRTSPClientFunc(ourCreateNewProxyRTSPClientFunc), fTranscodingTable(transcodingTable), fInitialPortNum(initialPortNum), fMultiplexRTCPWithRTP(multiplexRTCPWithRTP) { // Open a RTSP connection to the input stream, and send a "DESCRIBE" command. // We'll use the SDP description in the response to set ourselves up. fProxyRTSPClient = (*fCreateNewProxyRTSPClientFunc)(*this, inputStreamURL, username, password, tunnelOverHTTPPortNum, verbosityLevel > 0 ? verbosityLevel-1 : verbosityLevel, socketNumToServer); ProxyRTSPClient::sendDESCRIBE(fProxyRTSPClient); } ProxyServerMediaSession::~ProxyServerMediaSession() { if (fVerbosityLevel > 0) { envir() << *this << "::~ProxyServerMediaSession()\n"; } // Begin by sending a "TEARDOWN" command (without checking for a response): if (fProxyRTSPClient != NULL && fClientMediaSession != NULL) { fProxyRTSPClient->sendTeardownCommand(*fClientMediaSession, NULL, fProxyRTSPClient->auth()); } // Then delete our state: Medium::close(fTranscodingTable); Medium::close(fClientMediaSession); Medium::close(fProxyRTSPClient); Medium::close(fPresentationTimeSessionNormalizer); } char const* ProxyServerMediaSession::url() const { return fProxyRTSPClient == NULL ? NULL : fProxyRTSPClient->url(); } Groupsock* ProxyServerMediaSession::createGroupsock(struct in_addr const& addr, Port port) { // Default implementation; may be redefined by subclasses: return new Groupsock(envir(), addr, port, 255); } RTCPInstance* ProxyServerMediaSession ::createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */ unsigned char const* cname, RTPSink* sink) { // Default implementation; may be redefined by subclasses: return RTCPInstance::createNew(envir(), RTCPgs, totSessionBW, cname, sink, NULL/*we're a server*/); } Boolean ProxyServerMediaSession::allowProxyingForSubsession(MediaSubsession const& /*mss*/) { // Default implementation return True; } void ProxyServerMediaSession::continueAfterDESCRIBE(char const* sdpDescription) { describeCompletedFlag = 1; // Create a (client) "MediaSession" object from the stream's SDP description ("resultString"), then iterate through its // "MediaSubsession" objects, to set up corresponding "ServerMediaSubsession" objects that we'll use to serve the stream's tracks. do { fClientMediaSession = MediaSession::createNew(envir(), sdpDescription); if (fClientMediaSession == NULL) break; MediaSubsessionIterator iter(*fClientMediaSession); for (MediaSubsession* mss = iter.next(); mss != NULL; mss = iter.next()) { if (!allowProxyingForSubsession(*mss)) continue; ServerMediaSubsession* smss = new ProxyServerMediaSubsession(*mss, fInitialPortNum, fMultiplexRTCPWithRTP); addSubsession(smss); if (fVerbosityLevel > 0) { envir() << *this << " added new \"ProxyServerMediaSubsession\" for " << mss->protocolName() << "/" << mss->mediumName() << "/" << mss->codecName() << " track\n"; } } } while (0); } void ProxyServerMediaSession::resetDESCRIBEState() { // Delete all of our "ProxyServerMediaSubsession"s; they'll get set up again once we get a response to the new "DESCRIBE". if (fOurMediaServer != NULL) { // First, close any client connections that may have already been set up: fOurMediaServer->closeAllClientSessionsForServerMediaSession(this); } deleteAllSubsessions(); // Finally, delete the client "MediaSession" object that we had set up after receiving the response to the previous "DESCRIBE": Medium::close(fClientMediaSession); fClientMediaSession = NULL; } ///////// RTSP 'response handlers' ////////// static void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) { char const* res; if (resultCode == 0) { // The "DESCRIBE" command succeeded, so "resultString" should be the stream's SDP description. res = resultString; } else { // The "DESCRIBE" command failed. res = NULL; } ((ProxyRTSPClient*)rtspClient)->continueAfterDESCRIBE(res); delete[] resultString; } static void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) { ((ProxyRTSPClient*)rtspClient)->continueAfterSETUP(resultCode); delete[] resultString; } static void continueAfterPLAY(RTSPClient* rtspClient, int resultCode, char* resultString) { ((ProxyRTSPClient*)rtspClient)->continueAfterPLAY(resultCode); delete[] resultString; } static void continueAfterOPTIONS(RTSPClient* rtspClient, int resultCode, char* resultString) { Boolean serverSupportsGetParameter = False; if (resultCode == 0) { // Note whether the server told us that it supports the "GET_PARAMETER" command: serverSupportsGetParameter = RTSPOptionIsSupported("GET_PARAMETER", resultString); } ((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, serverSupportsGetParameter); delete[] resultString; } #ifdef SEND_GET_PARAMETER_IF_SUPPORTED static void continueAfterGET_PARAMETER(RTSPClient* rtspClient, int resultCode, char* resultString) { ((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, True); delete[] resultString; } #endif ////////// "ProxyRTSPClient" implementation ///////// UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyRTSPClient& proxyRTSPClient) { // used for debugging return env << "ProxyRTSPClient[" << proxyRTSPClient.url() << "]"; } ProxyRTSPClient::ProxyRTSPClient(ProxyServerMediaSession& ourServerMediaSession, char const* rtspURL, char const* username, char const* password, portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer) : RTSPClient(ourServerMediaSession.envir(), rtspURL, verbosityLevel, "ProxyRTSPClient", tunnelOverHTTPPortNum == (portNumBits)(~0) ? 0 : tunnelOverHTTPPortNum, socketNumToServer), fOurServerMediaSession(ourServerMediaSession), fOurURL(strDup(rtspURL)), fStreamRTPOverTCP(tunnelOverHTTPPortNum != 0), fSetupQueueHead(NULL), fSetupQueueTail(NULL), fNumSetupsDone(0), fNextDESCRIBEDelay(1), fServerSupportsGetParameter(False), fLastCommandWasPLAY(False), fResetOnNextLivenessTest(False), fLivenessCommandTask(NULL), fDESCRIBECommandTask(NULL), fSubsessionTimerTask(NULL) { if (username != NULL && password != NULL) { fOurAuthenticator = new Authenticator(username, password); } else { fOurAuthenticator = NULL; } } void ProxyRTSPClient::reset() { envir().taskScheduler().unscheduleDelayedTask(fLivenessCommandTask); fLivenessCommandTask = NULL; envir().taskScheduler().unscheduleDelayedTask(fDESCRIBECommandTask); fDESCRIBECommandTask = NULL; envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); fSubsessionTimerTask = NULL; fSetupQueueHead = fSetupQueueTail = NULL; fNumSetupsDone = 0; fNextDESCRIBEDelay = 1; fLastCommandWasPLAY = False; RTSPClient::reset(); } ProxyRTSPClient::~ProxyRTSPClient() { reset(); delete fOurAuthenticator; delete[] fOurURL; } void ProxyRTSPClient::continueAfterDESCRIBE(char const* sdpDescription) { if (sdpDescription != NULL) { fOurServerMediaSession.continueAfterDESCRIBE(sdpDescription); // Unlike most RTSP streams, there might be a long delay between this "DESCRIBE" command (to the downstream server) and the // subsequent "SETUP"/"PLAY" - which doesn't occur until the first time that a client requests the stream. // To prevent the proxied connection (between us and the downstream server) from timing out, we send periodic 'liveness' // ("OPTIONS" or "GET_PARAMETER") commands. (The usual RTCP liveness mechanism wouldn't work here, because RTCP packets // don't get sent until after the "PLAY" command.) scheduleLivenessCommand(); } else { // The "DESCRIBE" command failed, most likely because the server or the stream is not yet running. // Reschedule another "DESCRIBE" command to take place later: scheduleDESCRIBECommand(); } } void ProxyRTSPClient::continueAfterLivenessCommand(int resultCode, Boolean serverSupportsGetParameter) { if (fResetOnNextLivenessTest) { // Hack: We've arranged to reset the connection with the server (regardless of "resultCode"): fResetOnNextLivenessTest = False; resultCode = 2; // arbitrary > 0 } if (resultCode != 0) { // The periodic 'liveness' command failed, suggesting that the back-end stream is no longer alive. // We handle this by resetting our connection state with this server. Any current clients will be closed, but // subsequent clients will cause new RTSP "SETUP"s and "PLAY"s to get done, restarting the stream. // Then continue by sending more "DESCRIBE" commands, to try to restore the stream. fServerSupportsGetParameter = False; // until we learn otherwise, in response to a future "OPTIONS" command if (resultCode < 0) { // The 'liveness' command failed without getting a response from the server (otherwise "resultCode" would have been > 0). // This suggests that the RTSP connection itself has failed. Print this error code, in case it's useful for debugging: if (fVerbosityLevel > 0) { envir() << *this << ": lost connection to server ('errno': " << -resultCode << "). Resetting...\n"; } } reset(); fOurServerMediaSession.resetDESCRIBEState(); setBaseURL(fOurURL); // because we'll be sending an initial "DESCRIBE" all over again sendDESCRIBE(this); return; } fServerSupportsGetParameter = serverSupportsGetParameter; // Schedule the next 'liveness' command (i.e., to tell the back-end server that we're still alive): scheduleLivenessCommand(); } #define SUBSESSION_TIMEOUT_SECONDS 5 // how many seconds to wait for the last track's "SETUP" to be done (note below) void ProxyRTSPClient::continueAfterSETUP(int resultCode) { if (resultCode != 0) { // The "SETUP" command failed, so arrange to reset the state after the next RTSP 'liveness' // command. (We don't do this now, because it deletes the "ProxyServerMediaSubsession", // and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".) fResetOnNextLivenessTest = True; envir().taskScheduler().rescheduleDelayedTask(fLivenessCommandTask, 0, sendLivenessCommand, this); return; } if (fVerbosityLevel > 0) { envir() << *this << "::continueAfterSETUP(): head codec: " << fSetupQueueHead->codecName() << "; numSubsessions " << fSetupQueueHead->fParentSession->numSubsessions() << "\n\tqueue:"; for (ProxyServerMediaSubsession* p = fSetupQueueHead; p != NULL; p = p->fNext) { envir() << "\t" << p->codecName(); } envir() << "\n"; } envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); // in case it had been set // Dequeue the first "ProxyServerMediaSubsession" from our 'SETUP queue'. It will be the one for which this "SETUP" was done: ProxyServerMediaSubsession* smss = fSetupQueueHead; // Assert: != NULL fSetupQueueHead = fSetupQueueHead->fNext; if (fSetupQueueHead == NULL) fSetupQueueTail = NULL; if (fSetupQueueHead != NULL) { // There are still entries in the queue, for tracks for which we have still to do a "SETUP". // "SETUP" the first of these now: sendSetupCommand(fSetupQueueHead->fClientMediaSubsession, ::continueAfterSETUP, False, fStreamRTPOverTCP, False, fOurAuthenticator); ++fNumSetupsDone; fSetupQueueHead->fHaveSetupStream = True; } else { if (fNumSetupsDone >= smss->fParentSession->numSubsessions()) { // We've now finished setting up each of our subsessions (i.e., 'tracks'). // Continue by sending a "PLAY" command (an 'aggregate' "PLAY" command, on the whole session): sendPlayCommand(smss->fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator); // the "-1.0f" "start" parameter causes the "PLAY" to be sent without a "Range:" header, in case we'd already done // a "PLAY" before (as a result of a 'subsession timeout' (note below)) fLastCommandWasPLAY = True; } else { // Some of this session's subsessions (i.e., 'tracks') remain to be "SETUP". They might get "SETUP" very soon, but it's // also possible - if the remote client chose to play only some of the session's tracks - that they might not. // To allow for this possibility, we set a timer. If the timer expires without the remaining subsessions getting "SETUP", // then we send a "PLAY" command anyway: fSubsessionTimerTask = envir().taskScheduler().scheduleDelayedTask(SUBSESSION_TIMEOUT_SECONDS*MILLION, (TaskFunc*)subsessionTimeout, this); } } } void ProxyRTSPClient::continueAfterPLAY(int resultCode) { if (resultCode != 0) { // The "PLAY" command failed, so arrange to reset the state after the next RTSP 'liveness' // command. (We don't do this now, because it deletes the "ProxyServerMediaSubsession", // and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".) fResetOnNextLivenessTest = True; envir().taskScheduler().rescheduleDelayedTask(fLivenessCommandTask, 0, sendLivenessCommand, this); return; } } void ProxyRTSPClient::scheduleLivenessCommand() { // Delay a random time before sending another 'liveness' command. unsigned delayMax = sessionTimeoutParameter(); // if the server specified a maximum time between 'liveness' probes, then use that if (delayMax == 0) { delayMax = 60; } // Choose a random time from [delayMax/2,delayMax-1) seconds: unsigned const us_1stPart = delayMax*500000; unsigned uSecondsToDelay; if (us_1stPart <= 1000000) { uSecondsToDelay = us_1stPart; } else { unsigned const us_2ndPart = us_1stPart-1000000; uSecondsToDelay = us_1stPart + (us_2ndPart*our_random())%us_2ndPart; } fLivenessCommandTask = envir().taskScheduler().scheduleDelayedTask(uSecondsToDelay, sendLivenessCommand, this); } void ProxyRTSPClient::sendLivenessCommand(void* clientData) { ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData; // Note. By default, we do not send "GET_PARAMETER" as our 'liveness notification' command, even if the server previously // indicated (in its response to our earlier "OPTIONS" command) that it supported "GET_PARAMETER". This is because // "GET_PARAMETER" crashes some camera servers (even though they claimed to support "GET_PARAMETER"). #ifdef SEND_GET_PARAMETER_IF_SUPPORTED MediaSession* sess = rtspClient->fOurServerMediaSession.fClientMediaSession; if (rtspClient->fServerSupportsGetParameter && rtspClient->fNumSetupsDone > 0 && sess != NULL) { rtspClient->sendGetParameterCommand(*sess, ::continueAfterGET_PARAMETER, "", rtspClient->auth()); } else { #endif rtspClient->sendOptionsCommand(::continueAfterOPTIONS, rtspClient->auth()); #ifdef SEND_GET_PARAMETER_IF_SUPPORTED } #endif } void ProxyRTSPClient::scheduleDESCRIBECommand() { // Delay 1s, 2s, 4s, 8s ... 256s until sending the next "DESCRIBE". Then, keep delaying a random time from [256..511] seconds: unsigned secondsToDelay; if (fNextDESCRIBEDelay <= 256) { secondsToDelay = fNextDESCRIBEDelay; fNextDESCRIBEDelay *= 2; } else { secondsToDelay = 256 + (our_random()&0xFF); // [256..511] seconds } if (fVerbosityLevel > 0) { envir() << *this << ": RTSP \"DESCRIBE\" command failed; trying again in " << secondsToDelay << " seconds\n"; } fDESCRIBECommandTask = envir().taskScheduler().scheduleDelayedTask(secondsToDelay*MILLION, sendDESCRIBE, this); } void ProxyRTSPClient::sendDESCRIBE(void* clientData) { ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData; if (rtspClient != NULL) rtspClient->sendDescribeCommand(::continueAfterDESCRIBE, rtspClient->auth()); } void ProxyRTSPClient::subsessionTimeout(void* clientData) { ((ProxyRTSPClient*)clientData)->handleSubsessionTimeout(); } void ProxyRTSPClient::handleSubsessionTimeout() { // We still have one or more subsessions ('tracks') left to "SETUP". But we can't wait any longer for them. Send a "PLAY" now: MediaSession* sess = fOurServerMediaSession.fClientMediaSession; if (sess != NULL) sendPlayCommand(*sess, ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator); fLastCommandWasPLAY = True; } //////// "ProxyServerMediaSubsession" implementation ////////// ProxyServerMediaSubsession ::ProxyServerMediaSubsession(MediaSubsession& mediaSubsession, portNumBits initialPortNum, Boolean multiplexRTCPWithRTP) : OnDemandServerMediaSubsession(mediaSubsession.parentSession().envir(), True/*reuseFirstSource*/, initialPortNum, multiplexRTCPWithRTP), fClientMediaSubsession(mediaSubsession), fCodecName(strDup(mediaSubsession.codecName())), fNext(NULL), fHaveSetupStream(False) { } UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSubsession& psmss) { // used for debugging return env << "ProxyServerMediaSubsession[" << psmss.url() << "," << psmss.codecName() << "]"; } ProxyServerMediaSubsession::~ProxyServerMediaSubsession() { if (verbosityLevel() > 0) { envir() << *this << "::~ProxyServerMediaSubsession()\n"; } delete[] (char*)fCodecName; } FramedSource* ProxyServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) { ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession; if (verbosityLevel() > 0) { envir() << *this << "::createNewStreamSource(session id " << clientSessionId << ")\n"; } // If we haven't yet created a data source from our 'media subsession' object, initiate() it to do so: if (fClientMediaSubsession.readSource() == NULL) { if (sms->fTranscodingTable == NULL || !sms->fTranscodingTable->weWillTranscode("audio", "MPA-ROBUST")) fClientMediaSubsession.receiveRawMP3ADUs(); // hack for proxying MPA-ROBUST streams if (sms->fTranscodingTable == NULL || !sms->fTranscodingTable->weWillTranscode("video", "JPEG")) fClientMediaSubsession.receiveRawJPEGFrames(); // hack for proxying JPEG/RTP streams. fClientMediaSubsession.initiate(); if (verbosityLevel() > 0) { envir() << "\tInitiated: " << *this << "\n"; } if (fClientMediaSubsession.readSource() != NULL) { // First, check whether we have defined a 'transcoder' filter to be used with this codec: if (sms->fTranscodingTable != NULL) { char* outputCodecName; FramedFilter* transcoder = sms->fTranscodingTable->lookupTranscoder(fClientMediaSubsession, outputCodecName); if (transcoder != NULL) { fClientMediaSubsession.addFilter(transcoder); delete[] (char*)fCodecName; fCodecName = outputCodecName; } } // Then, add to the front of all data sources a filter that will 'normalize' their frames' // presentation times, before the frames get re-transmitted by our server: FramedFilter* normalizerFilter = sms->fPresentationTimeSessionNormalizer ->createNewPresentationTimeSubsessionNormalizer(fClientMediaSubsession.readSource(), fClientMediaSubsession.rtpSource(), fCodecName); fClientMediaSubsession.addFilter(normalizerFilter); // Some data sources require a 'framer' object to be added, before they can be fed into // a "RTPSink". Adjust for this now: if (strcmp(fCodecName, "H264") == 0) { fClientMediaSubsession.addFilter(H264VideoStreamDiscreteFramer ::createNew(envir(), fClientMediaSubsession.readSource())); } else if (strcmp(fCodecName, "H265") == 0) { fClientMediaSubsession.addFilter(H265VideoStreamDiscreteFramer ::createNew(envir(), fClientMediaSubsession.readSource())); } else if (strcmp(fCodecName, "MP4V-ES") == 0) { fClientMediaSubsession.addFilter(MPEG4VideoStreamDiscreteFramer ::createNew(envir(), fClientMediaSubsession.readSource(), True/* leave PTs unmodified*/)); } else if (strcmp(fCodecName, "MPV") == 0) { fClientMediaSubsession.addFilter(MPEG1or2VideoStreamDiscreteFramer ::createNew(envir(), fClientMediaSubsession.readSource(), False, 5.0, True/* leave PTs unmodified*/)); } else if (strcmp(fCodecName, "DV") == 0) { fClientMediaSubsession.addFilter(DVVideoStreamFramer ::createNew(envir(), fClientMediaSubsession.readSource(), False, True/* leave PTs unmodified*/)); } } if (fClientMediaSubsession.rtcpInstance() != NULL) { fClientMediaSubsession.rtcpInstance()->setByeHandler(subsessionByeHandler, this); } } ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient; if (clientSessionId != 0) { // We're being called as a result of implementing a RTSP "SETUP". if (!fHaveSetupStream) { // This is our first "SETUP". Send RTSP "SETUP" and later "PLAY" commands to the proxied server, to start streaming: // (Before sending "SETUP", enqueue ourselves on the "RTSPClient"s 'SETUP queue', so we'll be able to get the correct // "ProxyServerMediaSubsession" to handle the response. (Note that responses come back in the same order as requests.)) Boolean queueWasEmpty = proxyRTSPClient->fSetupQueueHead == NULL; if (queueWasEmpty) { proxyRTSPClient->fSetupQueueHead = this; proxyRTSPClient->fSetupQueueTail = this; } else { // Add ourself to the "RTSPClient"s 'SETUP queue' (if we're not already on it): ProxyServerMediaSubsession* psms; for (psms = proxyRTSPClient->fSetupQueueHead; psms != NULL; psms = psms->fNext) { if (psms == this) break; } if (psms == NULL) { proxyRTSPClient->fSetupQueueTail->fNext = this; proxyRTSPClient->fSetupQueueTail = this; } } // Hack: If there's already a pending "SETUP" request, don't send this track's "SETUP" right away, because // the server might not properly handle 'pipelined' requests. Instead, wait until after previous "SETUP" responses come back. if (queueWasEmpty) { proxyRTSPClient->sendSetupCommand(fClientMediaSubsession, ::continueAfterSETUP, False, proxyRTSPClient->fStreamRTPOverTCP, False, proxyRTSPClient->auth()); ++proxyRTSPClient->fNumSetupsDone; fHaveSetupStream = True; } } else { // This is a "SETUP" from a new client. We know that there are no other currently active clients (otherwise we wouldn't // have been called here), so we know that the substream was previously "PAUSE"d. Send "PLAY" downstream once again, // to resume the stream: if (!proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PLAY"; not one for each subsession proxyRTSPClient->sendPlayCommand(fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f/*resume from previous point*/, -1.0f, 1.0f, proxyRTSPClient->auth()); proxyRTSPClient->fLastCommandWasPLAY = True; } } } estBitrate = fClientMediaSubsession.bandwidth(); if (estBitrate == 0) estBitrate = 50; // kbps, estimate return fClientMediaSubsession.readSource(); } void ProxyServerMediaSubsession::closeStreamSource(FramedSource* inputSource) { if (verbosityLevel() > 0) { envir() << *this << "::closeStreamSource()\n"; } // Because there's only one input source for this 'subsession' (regardless of how many downstream clients are proxying it), // we don't close the input source here. (Instead, we wait until *this* object gets deleted.) // However, because (as evidenced by this function having been called) we no longer have any clients accessing the stream, // then we "PAUSE" the downstream proxied stream, until a new client arrives: if (fHaveSetupStream) { ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession; ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient; if (proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PAUSE"; not one for each subsession if (fParentSession->referenceCount() > 1) { // There are other client(s) still streaming other subsessions of this stream. // Therefore, we don't send a "PAUSE" for the whole stream, but only for the sub-stream: proxyRTSPClient->sendPauseCommand(fClientMediaSubsession, NULL, proxyRTSPClient->auth()); } else { // Normal case: There are no other client still streaming (parts of) this stream. // Send a "PAUSE" for the whole stream. proxyRTSPClient->sendPauseCommand(fClientMediaSubsession.parentSession(), NULL, proxyRTSPClient->auth()); proxyRTSPClient->fLastCommandWasPLAY = False; } } } } RTPSink* ProxyServerMediaSubsession ::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource) { if (verbosityLevel() > 0) { envir() << *this << "::createNewRTPSink()\n"; } // Create (and return) the appropriate "RTPSink" object for our codec: // (Note: The configuration string might not be correct if a transcoder is used. FIX!) ##### RTPSink* newSink; if (strcmp(fCodecName, "AC3") == 0 || strcmp(fCodecName, "EAC3") == 0) { newSink = AC3AudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency()); #if 0 // This code does not work; do *not* enable it: } else if (strcmp(fCodecName, "AMR") == 0 || strcmp(fCodecName, "AMR-WB") == 0) { Boolean isWideband = strcmp(fCodecName, "AMR-WB") == 0; newSink = AMRAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, isWideband, fClientMediaSubsession.numChannels()); #endif } else if (strcmp(fCodecName, "DV") == 0) { newSink = DVVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); } else if (strcmp(fCodecName, "GSM") == 0) { newSink = GSMAudioRTPSink::createNew(envir(), rtpGroupsock); } else if (strcmp(fCodecName, "H263-1998") == 0 || strcmp(fCodecName, "H263-2000") == 0) { newSink = H263plusVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency()); } else if (strcmp(fCodecName, "H264") == 0) { newSink = H264VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, fClientMediaSubsession.fmtp_spropparametersets()); } else if (strcmp(fCodecName, "H265") == 0) { newSink = H265VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, fClientMediaSubsession.fmtp_spropvps(), fClientMediaSubsession.fmtp_spropsps(), fClientMediaSubsession.fmtp_sproppps()); } else if (strcmp(fCodecName, "JPEG") == 0) { newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, 26, 90000, "video", "JPEG", 1/*numChannels*/, False/*allowMultipleFramesPerPacket*/, False/*doNormalMBitRule*/); } else if (strcmp(fCodecName, "MP4A-LATM") == 0) { newSink = MPEG4LATMAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(), fClientMediaSubsession.fmtp_config(), fClientMediaSubsession.numChannels()); } else if (strcmp(fCodecName, "MP4V-ES") == 0) { newSink = MPEG4ESVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(), fClientMediaSubsession.attrVal_unsigned("profile-level-id"), fClientMediaSubsession.fmtp_config()); } else if (strcmp(fCodecName, "MPA") == 0) { newSink = MPEG1or2AudioRTPSink::createNew(envir(), rtpGroupsock); } else if (strcmp(fCodecName, "MPA-ROBUST") == 0) { newSink = MP3ADURTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); } else if (strcmp(fCodecName, "MPEG4-GENERIC") == 0) { newSink = MPEG4GenericRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(), fClientMediaSubsession.mediumName(), fClientMediaSubsession.attrVal_str("mode"), fClientMediaSubsession.fmtp_config(), fClientMediaSubsession.numChannels()); } else if (strcmp(fCodecName, "MPV") == 0) { newSink = MPEG1or2VideoRTPSink::createNew(envir(), rtpGroupsock); } else if (strcmp(fCodecName, "OPUS") == 0) { newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, 48000, "audio", "OPUS", 2, False/*only 1 Opus 'packet' in each RTP packet*/); } else if (strcmp(fCodecName, "T140") == 0) { newSink = T140TextRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); } else if (strcmp(fCodecName, "THEORA") == 0) { newSink = TheoraVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, fClientMediaSubsession.fmtp_config()); } else if (strcmp(fCodecName, "VORBIS") == 0) { newSink = VorbisAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(), fClientMediaSubsession.numChannels(), fClientMediaSubsession.fmtp_config()); } else if (strcmp(fCodecName, "VP8") == 0) { newSink = VP8VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); } else if (strcmp(fCodecName, "VP9") == 0) { newSink = VP9VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); } else if (strcmp(fCodecName, "AMR") == 0 || strcmp(fCodecName, "AMR-WB") == 0) { // Proxying of these codecs is currently *not* supported, because the data received by the "RTPSource" object is not in a // form that can be fed directly into a corresponding "RTPSink" object. if (verbosityLevel() > 0) { envir() << "\treturns NULL (because we currently don't support the proxying of \"" << fClientMediaSubsession.mediumName() << "/" << fCodecName << "\" streams)\n"; } return NULL; } else if (strcmp(fCodecName, "QCELP") == 0 || strcmp(fCodecName, "H261") == 0 || strcmp(fCodecName, "H263-1998") == 0 || strcmp(fCodecName, "H263-2000") == 0 || strcmp(fCodecName, "X-QT") == 0 || strcmp(fCodecName, "X-QUICKTIME") == 0) { // This codec requires a specialized RTP payload format; however, we don't yet have an appropriate "RTPSink" subclass for it: if (verbosityLevel() > 0) { envir() << "\treturns NULL (because we don't have a \"RTPSink\" subclass for this RTP payload format)\n"; } return NULL; } else { // This codec is assumed to have a simple RTP payload format that can be implemented just with a "SimpleRTPSink": Boolean allowMultipleFramesPerPacket = True; // by default Boolean doNormalMBitRule = True; // by default // Some codecs change the above default parameters: if (strcmp(fCodecName, "MP2T") == 0) { doNormalMBitRule = False; // no RTP 'M' bit } newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(), fClientMediaSubsession.mediumName(), fCodecName, fClientMediaSubsession.numChannels(), allowMultipleFramesPerPacket, doNormalMBitRule); } // Because our relayed frames' presentation times are inaccurate until the input frames have been RTCP-synchronized, // we temporarily disable RTCP "SR" reports for this "RTPSink" object: newSink->enableRTCPReports() = False; // Also tell our "PresentationTimeSubsessionNormalizer" object about the "RTPSink", so it can enable RTCP "SR" reports later: PresentationTimeSubsessionNormalizer* ssNormalizer; if (strcmp(fCodecName, "H264") == 0 || strcmp(fCodecName, "H265") == 0 || strcmp(fCodecName, "MP4V-ES") == 0 || strcmp(fCodecName, "MPV") == 0 || strcmp(fCodecName, "DV") == 0) { // There was a separate 'framer' object in front of the "PresentationTimeSubsessionNormalizer", so go back one object to get it: ssNormalizer = (PresentationTimeSubsessionNormalizer*)(((FramedFilter*)inputSource)->inputSource()); } else { ssNormalizer = (PresentationTimeSubsessionNormalizer*)inputSource; } ssNormalizer->setRTPSink(newSink); return newSink; } Groupsock* ProxyServerMediaSubsession::createGroupsock(struct in_addr const& addr, Port port) { ProxyServerMediaSession* parentSession = (ProxyServerMediaSession*)fParentSession; return parentSession->createGroupsock(addr, port); } RTCPInstance* ProxyServerMediaSubsession ::createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */ unsigned char const* cname, RTPSink* sink) { ProxyServerMediaSession* parentSession = (ProxyServerMediaSession*)fParentSession; return parentSession->createRTCP(RTCPgs, totSessionBW, cname, sink); } void ProxyServerMediaSubsession::subsessionByeHandler(void* clientData) { ((ProxyServerMediaSubsession*)clientData)->subsessionByeHandler(); } void ProxyServerMediaSubsession::subsessionByeHandler() { if (verbosityLevel() > 0) { envir() << *this << ": received RTCP \"BYE\". (The back-end stream has ended.)\n"; } // This "BYE" signals that our input source has (effectively) closed, so pass this onto the front-end clients: fHaveSetupStream = False; // hack to stop "PAUSE" getting sent by: if (fClientMediaSubsession.readSource() != NULL) { fClientMediaSubsession.readSource()->handleClosure(); } // And then treat this as if we had lost connection to the back-end server, // and can reestablish streaming from it only by sending another "DESCRIBE": ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession; ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient; proxyRTSPClient->continueAfterLivenessCommand(1/*hack*/, proxyRTSPClient->fServerSupportsGetParameter); } ////////// PresentationTimeSessionNormalizer and PresentationTimeSubsessionNormalizer implementations ////////// // PresentationTimeSessionNormalizer: PresentationTimeSessionNormalizer::PresentationTimeSessionNormalizer(UsageEnvironment& env) : Medium(env), fSubsessionNormalizers(NULL), fMasterSSNormalizer(NULL) { } PresentationTimeSessionNormalizer::~PresentationTimeSessionNormalizer() { while (fSubsessionNormalizers != NULL) { Medium::close(fSubsessionNormalizers); } } PresentationTimeSubsessionNormalizer* PresentationTimeSessionNormalizer ::createNewPresentationTimeSubsessionNormalizer(FramedSource* inputSource, RTPSource* rtpSource, char const* codecName) { fSubsessionNormalizers = new PresentationTimeSubsessionNormalizer(*this, inputSource, rtpSource, codecName, fSubsessionNormalizers); return fSubsessionNormalizers; } void PresentationTimeSessionNormalizer ::normalizePresentationTime(PresentationTimeSubsessionNormalizer* ssNormalizer, struct timeval& toPT, struct timeval const& fromPT) { Boolean const hasBeenSynced = ssNormalizer->fRTPSource->hasBeenSynchronizedUsingRTCP(); if (!hasBeenSynced) { // If "fromPT" has not yet been RTCP-synchronized, then it was generated by our own receiving code, and thus // is already aligned with 'wall-clock' time. Just copy it 'as is' to "toPT": toPT = fromPT; } else { if (fMasterSSNormalizer == NULL) { // Make "ssNormalizer" the 'master' subsession - meaning that its presentation time is adjusted to align with 'wall clock' // time, and the presentation times of other subsessions (if any) are adjusted to retain their relative separation with // those of the master: fMasterSSNormalizer = ssNormalizer; struct timeval timeNow; gettimeofday(&timeNow, NULL); // Compute: fPTAdjustment = timeNow - fromPT fPTAdjustment.tv_sec = timeNow.tv_sec - fromPT.tv_sec; fPTAdjustment.tv_usec = timeNow.tv_usec - fromPT.tv_usec; // Note: It's OK if one or both of these fields underflows; the result still works out OK later. } // Compute a normalized presentation time: toPT = fromPT + fPTAdjustment toPT.tv_sec = fromPT.tv_sec + fPTAdjustment.tv_sec - 1; toPT.tv_usec = fromPT.tv_usec + fPTAdjustment.tv_usec + MILLION; while (toPT.tv_usec > MILLION) { ++toPT.tv_sec; toPT.tv_usec -= MILLION; } // Because "ssNormalizer"s relayed presentation times are accurate from now on, enable RTCP "SR" reports for its "RTPSink": RTPSink* const rtpSink = ssNormalizer->fRTPSink; if (rtpSink != NULL) { // sanity check; should always be true rtpSink->enableRTCPReports() = True; } } } void PresentationTimeSessionNormalizer ::removePresentationTimeSubsessionNormalizer(PresentationTimeSubsessionNormalizer* ssNormalizer) { // Unlink "ssNormalizer" from the linked list (starting with "fSubsessionNormalizers"): if (fSubsessionNormalizers == ssNormalizer) { fSubsessionNormalizers = fSubsessionNormalizers->fNext; } else { PresentationTimeSubsessionNormalizer** ssPtrPtr = &(fSubsessionNormalizers->fNext); while (*ssPtrPtr != ssNormalizer) ssPtrPtr = &((*ssPtrPtr)->fNext); *ssPtrPtr = (*ssPtrPtr)->fNext; } } // PresentationTimeSubsessionNormalizer: PresentationTimeSubsessionNormalizer ::PresentationTimeSubsessionNormalizer(PresentationTimeSessionNormalizer& parent, FramedSource* inputSource, RTPSource* rtpSource, char const* codecName, PresentationTimeSubsessionNormalizer* next) : FramedFilter(parent.envir(), inputSource), fParent(parent), fRTPSource(rtpSource), fRTPSink(NULL), fCodecName(codecName), fNext(next) { } PresentationTimeSubsessionNormalizer::~PresentationTimeSubsessionNormalizer() { fParent.removePresentationTimeSubsessionNormalizer(this); } void PresentationTimeSubsessionNormalizer::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds) { ((PresentationTimeSubsessionNormalizer*)clientData) ->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds); } void PresentationTimeSubsessionNormalizer::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds) { // This filter is implemented by passing all frames through unchanged, except that "fPresentationTime" is changed: fFrameSize = frameSize; fNumTruncatedBytes = numTruncatedBytes; fDurationInMicroseconds = durationInMicroseconds; fParent.normalizePresentationTime(this, fPresentationTime, presentationTime); // Hack for JPEG/RTP proxying. Because we're proxying JPEG by just copying the raw JPEG/RTP payloads, without interpreting them, // we need to also 'copy' the RTP 'M' (marker) bit from the "RTPSource" to the "RTPSink": if (fRTPSource->curPacketMarkerBit() && strcmp(fCodecName, "JPEG") == 0) ((SimpleRTPSink*)fRTPSink)->setMBitOnNextPacket(); // Complete delivery: FramedSource::afterGetting(this); } void PresentationTimeSubsessionNormalizer::doGetNextFrame() { fInputSource->getNextFrame(fTo, fMaxSize, afterGettingFrame, this, FramedSource::handleClosure, this); }