| /********** |
| This library is free software; you can redistribute it and/or modify it under |
| the terms of the GNU Lesser General Public License as published by the |
| Free Software Foundation; either version 3 of the License, or (at your |
| option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.) |
| |
| This library is distributed in the hope that it will be useful, but WITHOUT |
| ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for |
| more details. |
| |
| You should have received a copy of the GNU Lesser General Public License |
| along with this library; if not, write to the Free Software Foundation, Inc., |
| 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| **********/ |
| // "liveMedia" |
| // Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved. |
| // An class that can be used to create (possibly multiple) 'replicas' of an incoming stream. |
| // Implementation. |
| |
| #include "StreamReplicator.hh" |
| |
| ////////// Definition of "StreamReplica": The class that implements each stream replica ////////// |
| |
| class StreamReplica: public FramedSource { |
| protected: |
| friend class StreamReplicator; |
| StreamReplica(StreamReplicator& ourReplicator); // called only by "StreamReplicator::createStreamReplica()" |
| virtual ~StreamReplica(); |
| |
| private: // redefined virtual functions: |
| virtual void doGetNextFrame(); |
| virtual void doStopGettingFrames(); |
| |
| private: |
| static void copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica); |
| |
| private: |
| StreamReplicator& fOurReplicator; |
| int fFrameIndex; // 0 or 1, depending upon which frame we're currently requesting; could also be -1 if we've stopped playing |
| |
| // Replicas that are currently awaiting data are kept in a (singly-linked) list: |
| StreamReplica* fNext; |
| }; |
| |
| |
| ////////// StreamReplicator implementation ////////// |
| |
| StreamReplicator* StreamReplicator::createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) { |
| return new StreamReplicator(env, inputSource, deleteWhenLastReplicaDies); |
| } |
| |
| StreamReplicator::StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) |
| : Medium(env), |
| fInputSource(inputSource), fDeleteWhenLastReplicaDies(deleteWhenLastReplicaDies), fInputSourceHasClosed(False), |
| fNumReplicas(0), fNumActiveReplicas(0), fNumDeliveriesMadeSoFar(0), |
| fFrameIndex(0), fMasterReplica(NULL), fReplicasAwaitingCurrentFrame(NULL), fReplicasAwaitingNextFrame(NULL) { |
| } |
| |
| StreamReplicator::~StreamReplicator() { |
| Medium::close(fInputSource); |
| } |
| |
| FramedSource* StreamReplicator::createStreamReplica() { |
| ++fNumReplicas; |
| return new StreamReplica(*this); |
| } |
| |
| void StreamReplicator::getNextFrame(StreamReplica* replica) { |
| if (fInputSourceHasClosed) { // handle closure instead |
| replica->handleClosure(); |
| return; |
| } |
| |
| if (replica->fFrameIndex == -1) { |
| // This replica had stopped playing (or had just been created), but is now actively reading. Note this: |
| replica->fFrameIndex = fFrameIndex; |
| ++fNumActiveReplicas; |
| } |
| |
| if (fMasterReplica == NULL) { |
| // This is the first replica to request the next unread frame. Make it the 'master' replica - meaning that we read the frame |
| // into its buffer, and then copy from this into the other replicas' buffers. |
| fMasterReplica = replica; |
| |
| // Arrange to read the next frame into this replica's buffer: |
| if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize, |
| afterGettingFrame, this, onSourceClosure, this); |
| } else if (replica->fFrameIndex != fFrameIndex) { |
| // This replica is already asking for the next frame (because it has already received the current frame). Enqueue it: |
| replica->fNext = fReplicasAwaitingNextFrame; |
| fReplicasAwaitingNextFrame = replica; |
| } else { |
| // This replica is asking for the current frame. Enqueue it: |
| replica->fNext = fReplicasAwaitingCurrentFrame; |
| fReplicasAwaitingCurrentFrame = replica; |
| |
| if (fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) { |
| // The current frame has already arrived, so deliver it to this replica now: |
| deliverReceivedFrame(); |
| } |
| } |
| } |
| |
| void StreamReplicator::deactivateStreamReplica(StreamReplica* replicaBeingDeactivated) { |
| if (replicaBeingDeactivated->fFrameIndex == -1) return; // this replica has already been deactivated (or was never activated at all) |
| |
| // Assert: fNumActiveReplicas > 0 |
| if (fNumActiveReplicas == 0) fprintf(stderr, "StreamReplicator::deactivateStreamReplica() Internal Error!\n"); // should not happen |
| --fNumActiveReplicas; |
| |
| // Forget about any frame delivery that might have just been made to this replica: |
| if (replicaBeingDeactivated->fFrameIndex != fFrameIndex && fNumDeliveriesMadeSoFar > 0) --fNumDeliveriesMadeSoFar; |
| |
| replicaBeingDeactivated->fFrameIndex = -1; |
| |
| // Check whether the replica being deactivated is the 'master' replica, or is enqueued awaiting a frame: |
| if (replicaBeingDeactivated == fMasterReplica) { |
| // We need to replace the 'master replica', if we can: |
| if (fReplicasAwaitingCurrentFrame == NULL) { |
| // There's currently no replacement 'master replica' |
| fMasterReplica = NULL; |
| } else { |
| // There's another replica that we can use as a replacement 'master replica': |
| fMasterReplica = fReplicasAwaitingCurrentFrame; |
| fReplicasAwaitingCurrentFrame = fReplicasAwaitingCurrentFrame->fNext; |
| fMasterReplica->fNext = NULL; |
| } |
| |
| // Check whether the read into the old master replica's buffer is still pending, or has completed: |
| if (fInputSource != NULL) { |
| if (fInputSource->isCurrentlyAwaitingData()) { |
| // We have a pending read into the old master replica's buffer. |
| // We need to stop it, and retry the read with a new master (if available) |
| fInputSource->stopGettingFrames(); |
| |
| if (fMasterReplica != NULL) { |
| fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize, |
| afterGettingFrame, this, onSourceClosure, this); |
| } |
| } else { |
| // The read into the old master replica's buffer has already completed. Copy the data to the new master replica (if any): |
| if (fMasterReplica != NULL) { |
| StreamReplica::copyReceivedFrame(fMasterReplica, replicaBeingDeactivated); |
| } else { |
| // We don't have a new master replica, so we can't copy the received frame to any new replica that might ask for it. |
| // Fortunately this should be a very rare occurrence. |
| } |
| } |
| } |
| } else { |
| // The replica that's being removed was not our 'master replica', but make sure it's not on either of our queues: |
| if (fReplicasAwaitingCurrentFrame != NULL) { |
| if (replicaBeingDeactivated == fReplicasAwaitingCurrentFrame) { |
| fReplicasAwaitingCurrentFrame = replicaBeingDeactivated->fNext; |
| replicaBeingDeactivated->fNext = NULL; |
| } |
| else { |
| for (StreamReplica* r1 = fReplicasAwaitingCurrentFrame; r1->fNext != NULL; r1 = r1->fNext) { |
| if (r1->fNext == replicaBeingDeactivated) { |
| r1->fNext = replicaBeingDeactivated->fNext; |
| replicaBeingDeactivated->fNext = NULL; |
| break; |
| } |
| } |
| } |
| } |
| if (fReplicasAwaitingNextFrame != NULL) { |
| if (replicaBeingDeactivated == fReplicasAwaitingNextFrame) { |
| fReplicasAwaitingNextFrame = replicaBeingDeactivated->fNext; |
| replicaBeingDeactivated->fNext = NULL; |
| } |
| else { |
| for (StreamReplica* r2 = fReplicasAwaitingNextFrame; r2->fNext != NULL; r2 = r2->fNext) { |
| if (r2->fNext == replicaBeingDeactivated) { |
| r2->fNext = replicaBeingDeactivated->fNext; |
| replicaBeingDeactivated->fNext = NULL; |
| break; |
| } |
| } |
| } |
| } |
| |
| // Check for the possibility that - now that a replica has been deactivated - all other |
| // replicas have received the current frame, and so now we need to complete delivery to |
| // the master replica: |
| if (fMasterReplica != NULL && fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) deliverReceivedFrame(); |
| } |
| |
| if (fNumActiveReplicas == 0 && fInputSource != NULL) fInputSource->stopGettingFrames(); // tell our source to stop too |
| } |
| |
| void StreamReplicator::removeStreamReplica(StreamReplica* replicaBeingRemoved) { |
| // First, handle the replica that's being removed the same way that we would if it were merely being deactivated: |
| deactivateStreamReplica(replicaBeingRemoved); |
| |
| // Assert: fNumReplicas > 0 |
| if (fNumReplicas == 0) fprintf(stderr, "StreamReplicator::removeStreamReplica() Internal Error!\n"); // should not happen |
| --fNumReplicas; |
| |
| // If this was the last replica, then delete ourselves (if we were set up to do so): |
| if (fNumReplicas == 0 && fDeleteWhenLastReplicaDies) { |
| Medium::close(this); |
| return; |
| } |
| } |
| |
| void StreamReplicator::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes, |
| struct timeval presentationTime, unsigned durationInMicroseconds) { |
| ((StreamReplicator*)clientData)->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds); |
| } |
| |
| void StreamReplicator::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes, |
| struct timeval presentationTime, unsigned durationInMicroseconds) { |
| // The frame was read into our master replica's buffer. Update the master replica's state, but don't complete delivery to it |
| // just yet. We do that later, after we're sure that we've delivered it to all other replicas. |
| fMasterReplica->fFrameSize = frameSize; |
| fMasterReplica->fNumTruncatedBytes = numTruncatedBytes; |
| fMasterReplica->fPresentationTime = presentationTime; |
| fMasterReplica->fDurationInMicroseconds = durationInMicroseconds; |
| |
| deliverReceivedFrame(); |
| } |
| |
| void StreamReplicator::onSourceClosure(void* clientData) { |
| ((StreamReplicator*)clientData)->onSourceClosure(); |
| } |
| |
| void StreamReplicator::onSourceClosure() { |
| fInputSourceHasClosed = True; |
| |
| // Signal the closure to each replica that is currently awaiting a frame: |
| StreamReplica* replica; |
| while ((replica = fReplicasAwaitingCurrentFrame) != NULL) { |
| fReplicasAwaitingCurrentFrame = replica->fNext; |
| replica->fNext = NULL; |
| replica->handleClosure(); |
| } |
| while ((replica = fReplicasAwaitingNextFrame) != NULL) { |
| fReplicasAwaitingNextFrame = replica->fNext; |
| replica->fNext = NULL; |
| replica->handleClosure(); |
| } |
| if ((replica = fMasterReplica) != NULL) { |
| fMasterReplica = NULL; |
| replica->handleClosure(); |
| } |
| } |
| |
| void StreamReplicator::deliverReceivedFrame() { |
| // The 'master replica' has received its copy of the current frame. |
| // Copy it (and complete delivery) to any other replica that has requested this frame. |
| // Then, if no more requests for this frame are expected, complete delivery to the 'master replica' itself. |
| StreamReplica* replica; |
| while ((replica = fReplicasAwaitingCurrentFrame) != NULL) { |
| fReplicasAwaitingCurrentFrame = replica->fNext; |
| replica->fNext = NULL; |
| |
| // Assert: fMasterReplica != NULL |
| if (fMasterReplica == NULL) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 1!\n"); // shouldn't happen |
| StreamReplica::copyReceivedFrame(replica, fMasterReplica); |
| replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame |
| ++fNumDeliveriesMadeSoFar; |
| |
| // Assert: fNumDeliveriesMadeSoFar < fNumActiveReplicas; // because we still have the 'master replica' to deliver to |
| if (!(fNumDeliveriesMadeSoFar < fNumActiveReplicas)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 2(%d,%d)!\n", fNumDeliveriesMadeSoFar, fNumActiveReplicas); // should not happen |
| |
| // Complete delivery to this replica: |
| FramedSource::afterGetting(replica); |
| } |
| |
| if (fNumDeliveriesMadeSoFar == fNumActiveReplicas - 1 && fMasterReplica != NULL) { |
| // No more requests for this frame are expected, so complete delivery to the 'master replica': |
| replica = fMasterReplica; |
| fMasterReplica = NULL; |
| replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame |
| fFrameIndex = 1 - fFrameIndex; // toggle it (0<->1) for the next frame |
| fNumDeliveriesMadeSoFar = 0; // reset for the next frame |
| |
| if (fReplicasAwaitingNextFrame != NULL) { |
| // One of the other replicas has already requested the next frame, so make it the next 'master replica': |
| fMasterReplica = fReplicasAwaitingNextFrame; |
| fReplicasAwaitingNextFrame = fReplicasAwaitingNextFrame->fNext; |
| fMasterReplica->fNext = NULL; |
| |
| // Arrange to read the next frame into this replica's buffer: |
| if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize, |
| afterGettingFrame, this, onSourceClosure, this); |
| } |
| |
| // Move any other replicas that had already requested the next frame to the 'requesting current frame' list: |
| // Assert: fReplicasAwaitingCurrentFrame == NULL; |
| if (!(fReplicasAwaitingCurrentFrame == NULL)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 3!\n"); // should not happen |
| fReplicasAwaitingCurrentFrame = fReplicasAwaitingNextFrame; |
| fReplicasAwaitingNextFrame = NULL; |
| |
| // Complete delivery to the 'master' replica (thereby completing all deliveries for this frame): |
| FramedSource::afterGetting(replica); |
| } |
| } |
| |
| |
| ////////// StreamReplica implementation ////////// |
| |
| StreamReplica::StreamReplica(StreamReplicator& ourReplicator) |
| : FramedSource(ourReplicator.envir()), |
| fOurReplicator(ourReplicator), |
| fFrameIndex(-1/*we haven't started playing yet*/), fNext(NULL) { |
| } |
| |
| StreamReplica::~StreamReplica() { |
| fOurReplicator.removeStreamReplica(this); |
| } |
| |
| void StreamReplica::doGetNextFrame() { |
| fOurReplicator.getNextFrame(this); |
| } |
| |
| void StreamReplica::doStopGettingFrames() { |
| fOurReplicator.deactivateStreamReplica(this); |
| } |
| |
| void StreamReplica::copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica) { |
| // First, figure out how much data to copy. ("toReplica" might have a smaller buffer than "fromReplica".) |
| unsigned numNewBytesToTruncate |
| = toReplica->fMaxSize < fromReplica->fFrameSize ? fromReplica->fFrameSize - toReplica->fMaxSize : 0; |
| toReplica->fFrameSize = fromReplica->fFrameSize - numNewBytesToTruncate; |
| toReplica->fNumTruncatedBytes = fromReplica->fNumTruncatedBytes + numNewBytesToTruncate; |
| |
| memmove(toReplica->fTo, fromReplica->fTo, toReplica->fFrameSize); |
| toReplica->fPresentationTime = fromReplica->fPresentationTime; |
| toReplica->fDurationInMicroseconds = fromReplica->fDurationInMicroseconds; |
| } |