  • HLS Protocol Parsing in AOSP

    [Date: 2018-04] [Status: Open]
    [Keywords: streaming media, stream, HLS, AOSP, source code analysis, HttpLiveSource, LiveSession, PlaylistFetcher]

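    1. Introduction

    This article follows up on my HLS overview and is a companion piece to my earlier GenericSource walkthrough in the NuPlayer source analysis series. It focuses on how the NuPlayer implementation maps onto the HLS protocol. If you are not familiar with NuPlayer, it is worth getting a basic understanding of it first.

    The focus is HTTPLiveSource in NuPlayer. Conceptually, HTTPLiveSource is responsible for parsing the m3u8 playlist, processing data as the HLS protocol requires, downloading audio and video data over HTTP, and handing it to the appropriate demuxer.

    2. The HTTPLiveSource interface

    HTTPLiveSource derives from NuPlayer::Source. Its main interface is declared as follows: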

    // code from ~/frameworks/av/media/libmediaplayerservice/nuplayer/HTTPLiveSource.h
    struct NuPlayer::HTTPLiveSource : public NuPlayer::Source {
        HTTPLiveSource(const sp<AMessage> &notify, const sp<IMediaHTTPService> &httpService,
                const char *url, const KeyedVector<String8, String8> *headers);
    
        virtual void prepareAsync();
        virtual void start();
    
        virtual status_t dequeueAccessUnit(bool audio, sp<ABuffer> *accessUnit);
        virtual sp<AMessage> getFormat(bool audio);
    
        virtual status_t feedMoreTSData();
        // The functions below query stream information and select tracks
        virtual status_t getDuration(int64_t *durationUs);
        virtual size_t getTrackCount() const;
        virtual sp<AMessage> getTrackInfo(size_t trackIndex) const;
        virtual ssize_t getSelectedTrack(media_track_type /* type */) const;
        virtual status_t selectTrack(size_t trackIndex, bool select, int64_t timeUs);
        virtual status_t seekTo(int64_t seekTimeUs);
    
    protected:
        virtual ~HTTPLiveSource();
        virtual void onMessageReceived(const sp<AMessage> &msg);
    
    private:
        sp<IMediaHTTPService> mHTTPService;
        AString mURL;
        KeyedVector<String8, String8> mExtraHeaders;
        uint32_t mFlags;
        status_t mFinalResult;
        off64_t mOffset;
        sp<ALooper> mLiveLooper;
        sp<LiveSession> mLiveSession;
        int32_t mFetchSubtitleDataGeneration;
        int32_t mFetchMetaDataGeneration;
        bool mHasMetadata;
        bool mMetadataSelected;
    
        void onSessionNotify(const sp<AMessage> &msg);
        void pollForRawData(const sp<AMessage> &msg, int32_t currentGeneration,
                LiveSession::StreamType fetchType, int32_t pushWhat);
    };
    

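    There is not much to explain here: the interface largely mirrors NuPlayer::Source, with some of the methods overridden.

    3. Where NuPlayer refers to HTTPLiveSource

    Only one piece of NuPlayer code mentions HTTPLiveSource: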

    static bool IsHTTPLiveURL(const char *url) {
        if (!strncasecmp("http://", url, 7)
                || !strncasecmp("https://", url, 8)
                || !strncasecmp("file://", url, 7)) {
            size_t len = strlen(url);
            if (len >= 5 && !strcasecmp(".m3u8", &url[len - 5])) {
                return true;
            }
    
            if (strstr(url,"m3u8")) {
                return true;
            }
        }
    
        return false;
    }
    
    void NuPlayer::setDataSourceAsync(
            const sp<IMediaHTTPService> &httpService,
            const char *url,
            const KeyedVector<String8, String8> *headers) {
    
        sp<AMessage> msg = new AMessage(kWhatSetDataSource, this);
        size_t len = strlen(url);
    
        sp<AMessage> notify = new AMessage(kWhatSourceNotify, this);
    
        sp<Source> source;
        if (IsHTTPLiveURL(url)) { // choose the source type based on the URL
            source = new HTTPLiveSource(notify, httpService, url, headers);
        } else if (!strncasecmp(url, "rtsp://", 7)) {
            source = new RTSPSource(
                    notify, httpService, url, headers, mUIDValid, mUID);
        } else if ((!strncasecmp(url, "http://", 7)
                    || !strncasecmp(url, "https://", 8))
                        && ((len >= 4 && !strcasecmp(".sdp", &url[len - 4]))
                        || strstr(url, ".sdp?"))) {
            source = new RTSPSource(
                    notify, httpService, url, headers, mUIDValid, mUID, true);
        } else {
            sp<GenericSource> genericSource =
                    new GenericSource(notify, mUIDValid, mUID);
            // Don't set FLAG_SECURE on mSourceFlags here for widevine.
            // The correct flags will be updated in Source::kWhatFlagsChanged
            // handler when  GenericSource is prepared.
    
            status_t err = genericSource->setDataSource(httpService, url, headers);
    
            if (err == OK) {
                source = genericSource;
            } else {
                ALOGE("Failed to set data source!");
            }
        }
        msg->setObject("source", source);
        msg->post();
    }
    

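    The logic is fairly simple: the protocol is inferred from the URL and the matching source (here HTTPLiveSource) is created. After that, everything goes through the generic NuPlayer::Source interface.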
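
One detail of the URL check is easy to miss: because IsHTTPLiveURL falls back to strstr(url, "m3u8"), any http, https or file URL that merely contains "m3u8" somewhere is treated as HLS, not only URLs ending in ".m3u8". The sketch below copies the same checks into a standalone function so this can be tried outside AOSP. The name isHttpLiveUrl and the example URLs are made up for illustration.

```cpp
#include <string.h>
#include <strings.h>  // strncasecmp, strcasecmp (POSIX)

// Standalone copy of the checks in IsHTTPLiveURL from the listing above.
// The function name is hypothetical; the logic is unchanged.
static bool isHttpLiveUrl(const char *url) {
    if (!strncasecmp("http://", url, 7)
            || !strncasecmp("https://", url, 8)
            || !strncasecmp("file://", url, 7)) {
        size_t len = strlen(url);
        // Case-insensitive ".m3u8" suffix.
        if (len >= 5 && !strcasecmp(".m3u8", &url[len - 5])) {
            return true;
        }
        // Case-sensitive "m3u8" anywhere in the URL (path, query string, ...).
        if (strstr(url, "m3u8")) {
            return true;
        }
    }
    return false;
}
```

With this copy, "http://example.com/m3u8/clip.mp4" is classified as HLS because of the substring match, while "rtsp://example.com/a.m3u8" is not, because the scheme check fails first.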

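    4. Implementation of selected HTTPLiveSource methods

    This section covers a few core methods of HTTPLiveSource: the constructor, prepareAsync, start and seekTo.

    4.1 Constructor and destructor

    The code is as follows: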

    NuPlayer::HTTPLiveSource::HTTPLiveSource(
            const sp<AMessage> &notify,
            const sp<IMediaHTTPService> &httpService,
            const char *url,
            const KeyedVector<String8, String8> *headers)
        : Source(notify),
          mHTTPService(httpService), // HTTP communication module
          mURL(url), mFlags(0), mFinalResult(OK),
          mOffset(0), mFetchSubtitleDataGeneration(0),
          mFetchMetaDataGeneration(0),
          mHasMetadata(false), mMetadataSelected(false) {
        // ... additional handling of the headers
    }
    
    NuPlayer::HTTPLiveSource::~HTTPLiveSource() {
        if (mLiveSession != NULL) {
            mLiveSession->disconnect();
    
            mLiveLooper->unregisterHandler(mLiveSession->id());
            mLiveLooper->unregisterHandler(id());
            mLiveLooper->stop();
    
            mLiveSession.clear();
            mLiveLooper.clear();
        }
    }
    

    4.2 prepareAsync / start

    Neither function is complicated:

    void NuPlayer::HTTPLiveSource::prepareAsync() {
        if (mLiveLooper == NULL) {
            mLiveLooper = new ALooper;
            mLiveLooper->setName("http live");
            mLiveLooper->start();
    
            mLiveLooper->registerHandler(this);
        }
    
        sp<AMessage> notify = new AMessage(kWhatSessionNotify, this);
        // Create the LiveSession and start connecting
        mLiveSession = new LiveSession(
                notify,
                (mFlags & kFlagIncognito) ? LiveSession::kFlagIncognito : 0,
                mHTTPService);
    
        mLiveLooper->registerHandler(mLiveSession);
    
        mLiveSession->connectAsync(
                mURL.c_str(), mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders);
    }
    void NuPlayer::HTTPLiveSource::start() {}
    

    4.3 seekTo

    Seeking is just as simple: it is delegated directly to LiveSession.

    status_t NuPlayer::HTTPLiveSource::seekTo(int64_t seekTimeUs) {
        return mLiveSession->seekTo(seekTimeUs);
    }
    

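    After these three functions it is clear that HTTPLiveSource itself contains no HLS protocol parsing and no demuxing of media data. The next step is therefore to look at LiveSession.

    5 Analysis of the LiveSession implementation

    This section explains how LiveSession parses the HLS protocol, how it obtains the audio and video formats, and how audio and video packets are read in for the demuxer.
    LiveSession also includes bandwidth estimation and uses the estimate to switch between HLS variants automatically.
    As section 4 showed, HTTPLiveSource starts things off with nothing more than a call to LiveSession::connectAsync. What happens after that?
    Start with the implementation of LiveSession::connectAsync: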

    void LiveSession::connectAsync(
            const char *url, const KeyedVector<String8, String8> *headers) {
        sp<AMessage> msg = new AMessage(kWhatConnect, this);
        msg->setString("url", url);
    
        if (headers != NULL) {
            msg->setPointer(
                    "headers",
                    new KeyedVector<String8, String8>(*headers));
        }
        msg->post();
    }
    

    All this does is post a kWhatConnect message, which is handled as follows:

    //void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
        switch (msg->what()) {
            case kWhatConnect:
            {
                onConnect(msg);
                break;
            }
            // ...
    
    void LiveSession::onConnect(const sp<AMessage> &msg) {
        CHECK(msg->findString("url", &mMasterURL));
        // ...
    
        // create looper for fetchers
        if (mFetcherLooper == NULL) {
            mFetcherLooper = new ALooper();
    
            mFetcherLooper->setName("Fetcher");
            mFetcherLooper->start(false, false);
        }
    
        // Create the fetcher for the master playlist and start requesting data
        addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
    }
    

    This shows where the HLS master playlist is fetched and parsed: PlaylistFetcher. The relevant code is:

    void PlaylistFetcher::fetchPlaylistAsync() { // just posts a message
        (new AMessage(kWhatFetchPlaylist, this))->post();
    }
    void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
       case kWhatFetchPlaylist:
        {
            bool unchanged;
            // The core step: fetch the m3u8 content via HTTPDownloader::fetchPlaylist
            sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
                    mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
    
            sp<AMessage> notify = mNotify->dup();
            notify->setInt32("what", kWhatPlaylistFetched);
            notify->setObject("playlist", playlist);
            notify->post();
            break;
        }
    }
    

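    The last piece still hidden is HTTPDownloader::fetchPlaylist. As one would guess, it downloads the m3u8 file at the given URL through the HTTP module, parses it with M3UParser and returns the result. Its implementation: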

    ssize_t HTTPDownloader::fetchFile(
            const char *url, sp<ABuffer> *out, String8 *actualUrl) {
        ssize_t err = fetchBlock(url, out, 0, -1, 0, actualUrl, true /* reconnect */);
    
        // close off the connection after use
        mHTTPDataSource->disconnect();
    
        return err;
    }
    
    sp<M3UParser> HTTPDownloader::fetchPlaylist(
            const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
        *unchanged = false;
    
        sp<ABuffer> buffer;
        String8 actualUrl;
        // HTTP transfer; the downloaded data is placed in buffer
        ssize_t err = fetchFile(url, &buffer, &actualUrl);

        // close off the connection after use
        mHTTPDataSource->disconnect();

        if (err <= 0) {return NULL;}

        // Parse the text according to the HLS playlist syntax
        sp<M3UParser> playlist =
            new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
    
        if (playlist->initCheck() != OK) return NULL;
    
        return playlist;
    }
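
To make the parsing step less abstract, here is a minimal sketch of what parsing a master playlist involves. It is not the AOSP M3UParser: the Variant struct and both function names are hypothetical, and only the EXT-X-STREAM-INF tag and its BANDWIDTH attribute are handled. Each EXT-X-STREAM-INF line carries an attribute list and is followed by the URI of the variant it describes.

```cpp
#include <cstddef>
#include <cstdlib>
#include <sstream>
#include <string>
#include <vector>

struct Variant {          // hypothetical type, not the AOSP playlist item
    std::string uri;
    long bandwidth = 0;   // BANDWIDTH attribute, bits per second
};

// Returns the raw value of `key` in an attribute list such as
//   BANDWIDTH=1280000,CODECS="avc1.4d401e,mp4a.40.2"
// Commas inside double quotes do not split attributes.
static std::string findAttribute(const std::string &attrs, const std::string &key) {
    size_t pos = 0;
    while (pos < attrs.size()) {
        size_t end = pos;
        bool quoted = false;
        while (end < attrs.size() && (quoted || attrs[end] != ',')) {
            if (attrs[end] == '"') quoted = !quoted;
            ++end;
        }
        const std::string item = attrs.substr(pos, end - pos);
        const size_t eq = item.find('=');
        if (eq != std::string::npos && item.substr(0, eq) == key) {
            return item.substr(eq + 1);
        }
        pos = end + 1;
    }
    return "";
}

// Collects (uri, bandwidth) pairs from the text of a master playlist.
static std::vector<Variant> parseMasterPlaylist(const std::string &text) {
    static const std::string kTag = "#EXT-X-STREAM-INF:";
    std::vector<Variant> variants;
    std::istringstream in(text);
    std::string line;
    Variant pending;
    bool expectUri = false;
    while (std::getline(in, line)) {
        if (!line.empty() && line.back() == '\r') line.pop_back();  // tolerate CRLF
        if (line.compare(0, kTag.size(), kTag) == 0) {
            pending = Variant();
            const std::string value = findAttribute(line.substr(kTag.size()), "BANDWIDTH");
            pending.bandwidth = std::strtol(value.c_str(), nullptr, 10);
            expectUri = true;
        } else if (expectUri && !line.empty() && line[0] != '#') {
            pending.uri = line;  // the first non-tag line after the tag is the variant URI
            variants.push_back(pending);
            expectUri = false;
        }
    }
    return variants;
}
```

The per-variant "bandwidth" value that LiveSession reads from the parsed playlist later in this article corresponds to the BANDWIDTH attribute extracted here.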
    

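    With HTTPDownloader::fetchPlaylist we have the parsed m3u8 content, so handling of kWhatFetchPlaylist can continue: the fetcher posts the notify message it was registered with, which delivers kWhatPlaylistFetched to LiveSession. It is handled as follows: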

    void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
    	case PlaylistFetcher::kWhatPlaylistFetched: {
            onMasterPlaylistFetched(msg);
            break;
        }
    }
    
    void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
        // Check that the fetcher is still valid, then remove it because it has finished
        AString uri;
        CHECK(msg->findString("uri", &uri));
        ssize_t index = mFetcherInfos.indexOfKey(uri);
        if (index < 0) {
            ALOGW("fetcher for master playlist is gone.");
            return;
        }
    
        // no longer useful, remove
        mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
        mFetcherInfos.removeItemsAt(index);
    
        // mPlaylist is an sp<M3UParser> member
        CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
        if (mPlaylist == NULL) { // parsing the m3u8 failed, report the error
            postPrepared(ERROR_IO);
            return;
        }

        // Collect the variant attributes used as input for bandwidth selection
        size_t initialBandwidth = 0;
        size_t initialBandwidthIndex = 0;
        int32_t maxWidth = 0;
        int32_t maxHeight = 0;
    
        if (mPlaylist->isVariantPlaylist()) {
            Vector<BandwidthItem> itemsWithVideo;
            for (size_t i = 0; i < mPlaylist->size(); ++i) {
                BandwidthItem item;
                item.mPlaylistIndex = i;
                item.mLastFailureUs = -1ll;
    
                sp<AMessage> meta;
                AString uri;
                mPlaylist->itemAt(i, &uri, &meta);
    
                CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
    
                int32_t width, height;
                if (meta->findInt32("width", &width)) {
                    maxWidth = max(maxWidth, width);
                }
                if (meta->findInt32("height", &height)) {
                    maxHeight = max(maxHeight, height);
                }
    
                mBandwidthItems.push(item);
                if (mPlaylist->hasType(i, "video")) {
                    itemsWithVideo.push(item);
                }
            }
    
            CHECK_GT(mBandwidthItems.size(), 0u);
            initialBandwidth = mBandwidthItems[0].mBandwidth;
    
            mBandwidthItems.sort(SortByBandwidth);
    
            for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
                if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
                    initialBandwidthIndex = i;
                    break;
                }
            }
        }
    
        mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
        mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
        // Pick the initial variant (by default the first one listed, the simplest choice)
        mPlaylist->pickRandomMediaItems();
        changeConfiguration(
                0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
    }
    void LiveSession::changeConfiguration(
            int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
        cancelBandwidthSwitch();
    
        CHECK(!mReconfigurationInProgress);
        mReconfigurationInProgress = true;
        if (bandwidthIndex >= 0) {
            mOrigBandwidthIndex = mCurBandwidthIndex;
            mCurBandwidthIndex = bandwidthIndex;
        }
        CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
        const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
    
        uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
        uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
    
        AString URIs[kMaxStreams];
        for (size_t i = 0; i < kMaxStreams; ++i) {
            if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
                streamMask |= indexToType(i);
            }
        }
    
        // Step 1: stop fetchers that are no longer needed; pause the ones that will be reused
        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
            // a fetcher already marked mToBeRemoved cannot be reused
            if (mFetcherInfos[i].mToBeRemoved)
                continue;
    
            const AString &uri = mFetcherInfos.keyAt(i);
            sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
    
            bool discardFetcher = true, delayRemoval = false;
            for (size_t j = 0; j < kMaxStreams; ++j) {
                StreamType type = indexToType(j);
                if ((streamMask & type) && uri == URIs[j]) {
                    resumeMask |= type;
                    streamMask &= ~type;
                    discardFetcher = false;
                }
            }
            // Delay fetcher removal
            if (discardFetcher && timeUs < 0ll && !pickTrack
                    && (fetcher->getStreamTypeMask() & streamMask)) {
                discardFetcher = false;
                delayRemoval = true;
            }
    
            if (discardFetcher) {
                ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
                fetcher->stopAsync();
            } else {
                float threshold = 0.0f; // default to pause after current block (47Kbytes)
                bool disconnect = false;
                if (timeUs >= 0ll) {
                    // seeking, no need to finish fetching
                    disconnect = true;
                } else if (delayRemoval) {
                    // adapting, abort if remaining of current segment is over threshold
                    threshold = getAbortThreshold(
                            mOrigBandwidthIndex, mCurBandwidthIndex);
                }
                fetcher->pauseAsync(threshold, disconnect);
            }
        }
    
        sp<AMessage> msg;
        if (timeUs < 0ll) {
            // skip onChangeConfiguration2 (decoder destruction) if not seeking.
            msg = new AMessage(kWhatChangeConfiguration3, this);
        } else {
            msg = new AMessage(kWhatChangeConfiguration2, this);
        }
        msg->setInt32("streamMask", streamMask);
        msg->setInt32("resumeMask", resumeMask);
        msg->setInt32("pickTrack", pickTrack);
        msg->setInt64("timeUs", timeUs);
        for (size_t i = 0; i < kMaxStreams; ++i) {
            if ((streamMask | resumeMask) & indexToType(i)) {
                msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
            }
        }
    
        mContinuationCounter = mFetcherInfos.size();
        mContinuation = msg;
    
        if (mContinuationCounter == 0) {
            msg->post();
        }
    }
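
The choice of the initial variant in onMasterPlaylistFetched above is worth isolating: the bandwidth of the first variant in the playlist is remembered, the items are sorted, and the sorted list is then searched for that bandwidth. A minimal standalone sketch of this bookkeeping follows. The struct is cut down to two fields, the function name is made up, and ascending order is an assumption, since SortByBandwidth itself is not shown in the listing.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

struct BandwidthItem {        // reduced to the two fields used here
    size_t playlistIndex;     // position in the master playlist
    unsigned long bandwidth;  // bits per second
};

// Sorts the variants by bandwidth (ascending is assumed) and returns the
// post-sort index of the variant that was listed first in the playlist.
static size_t sortAndFindInitial(std::vector<BandwidthItem> &items) {
    if (items.empty()) {
        return 0;
    }
    const unsigned long initialBandwidth = items[0].bandwidth;
    std::stable_sort(items.begin(), items.end(),
            [](const BandwidthItem &a, const BandwidthItem &b) {
                return a.bandwidth < b.bandwidth;
            });
    // Match by bandwidth value, as the listing does. If two variants share a
    // bandwidth, the first of them in sorted order wins.
    for (size_t i = 0; i < items.size(); ++i) {
        if (items[i].bandwidth == initialBandwidth) {
            return i;
        }
    }
    return 0;
}
```

For a playlist that lists 2560000, 1280000 and 5000000 bps in that order, the returned index is 1: playback starts on the first listed variant, which sits in the middle of the sorted list.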
    

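    So far this is only parameter checking and fetcher housekeeping. What follows is one of two messages, kWhatChangeConfiguration2 or kWhatChangeConfiguration3. The former additionally handles decoder teardown and is only used when seeking. If no fetchers exist yet the message is posted immediately; otherwise it is stored in mContinuation for later. The two handlers are shown in turn.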

    void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
        mContinuation.clear();
    
        // All fetchers are either suspended or have been removed now.
    
        // When seeking, first clear the previously buffered data
        int64_t timeUs;
        CHECK(msg->findInt64("timeUs", &timeUs));
    
        if (timeUs >= 0) {
            mLastSeekTimeUs = timeUs;
            mLastDequeuedTimeUs = timeUs;
    
            for (size_t i = 0; i < mPacketSources.size(); i++) {
                sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
                sp<MetaData> format = packetSource->getFormat();
                packetSource->setFormat(format);
            }
    
            for (size_t i = 0; i < kMaxStreams; ++i) {
                mStreams[i].reset();
            }
    
            mDiscontinuityOffsetTimesUs.clear();
            mDiscontinuityAbsStartTimesUs.clear();
    
            if (mSeekReplyID != NULL) {
                CHECK(mSeekReply != NULL);
                mSeekReply->setInt32("err", OK);
                mSeekReply->postReply(mSeekReplyID);
                mSeekReplyID.clear();
                mSeekReply.clear();
            }
    
            // After a seek, reset the buffering state; this poll loop is what drives the whole HLS playback
            restartPollBuffering();
        }
    
        uint32_t streamMask, resumeMask;
        CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
        CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
        streamMask |= resumeMask;
    
        AString URIs[kMaxStreams];
        for (size_t i = 0; i < kMaxStreams; ++i) {
            if (streamMask & indexToType(i)) {
                const AString &uriKey = mStreams[i].uriKey();
                CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
                ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
            }
        }
    
        uint32_t changedMask = 0;
        for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
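            // A seek can arrive while a bandwidth switch is in progress, so the stream URL may change.
            // In that case the switch is cancelled and the seek position is applied to the new stream.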
            if ((mStreamMask & streamMask & indexToType(i))
                    && !mStreams[i].mUri.empty()
                    && !(URIs[i] == mStreams[i].mUri)) {
                sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
                if (source->getLatestDequeuedMeta() != NULL) {
                    source->queueDiscontinuity(
                            ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
                }
            }
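            // Determine whether the decoder for this stream needs to be shut down
            if ((mStreamMask & ~streamMask & indexToType(i))) {
                changedMask |= indexToType(i);
            }
        }

        if (changedMask == 0) {
            // Neither the audio nor the video decoder changed, so continue directly
            onChangeConfiguration3(msg);
            return;
        }
        // Notify NuPlayer that the affected decoders must be shut down; once done it posts back kWhatChangeConfiguration3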
        sp<AMessage> notify = mNotify->dup();
        notify->setInt32("what", kWhatStreamsChanged);
        notify->setInt32("changedMask", changedMask);
    
        msg->setWhat(kWhatChangeConfiguration3);
        msg->setTarget(this);
    
        notify->setMessage("reply", msg);
        notify->post();
    }
    
    void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
        mContinuation.clear();
        // All remaining fetchers are still suspended, the player has shutdown
        // any decoders that needed it.
    
        uint32_t streamMask, resumeMask;
        CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
        CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
    
        mNewStreamMask = streamMask | resumeMask;
    
        int64_t timeUs;
        int32_t pickTrack;
        bool switching = false;
        CHECK(msg->findInt64("timeUs", &timeUs));
        CHECK(msg->findInt32("pickTrack", &pickTrack));
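        // A negative timeUs means this is not a seek (bandwidth switch or track selection)
        if (timeUs < 0ll) {
            if (!pickTrack) {
                // Determine whether streams need to be swapped over to new fetchers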
                mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
                switching = (mSwapMask != 0);
            }
            mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
        } else {
            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
        }
    
        for (size_t i = 0; i < kMaxStreams; ++i) {
            if (streamMask & indexToType(i)) {
                if (switching) {
                    CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
                } else {
                    CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
                }
            }
        }
    
        // Of all existing fetchers:
        // * Resume fetchers that are still needed and assign them original packet sources.
        // * Mark otherwise unneeded fetchers for removal.
        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
            const AString &uri = mFetcherInfos.keyAt(i);
            if (!resumeFetcher(uri, resumeMask, timeUs))
                mFetcherInfos.editValueAt(i).mToBeRemoved = true;
        }
    
        // At this point streamMask only contains the streams that need a new fetcher
        if (streamMask != 0) {
            ALOGV("creating new fetchers for mask 0x%08x", streamMask);
        }
    
        // Find out when the original fetchers have buffered up to and start the new fetchers
        // at a later timestamp.
        for (size_t i = 0; i < kMaxStreams; i++) {
            if (!(indexToType(i) & streamMask)) {
                continue;
            }
    
            AString uri;
            uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
    
            sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
            CHECK(fetcher != NULL);
    
            HLSTime startTime;
            SeekMode seekMode = kSeekModeExactPosition;
            sp<AnotherPacketSource> sources[kNumSources];
    
            if (i == kSubtitleIndex || (!pickTrack && !switching)) {
                startTime = latestMediaSegmentStartTime();
            }
    
            for (size_t j = i; j < kMaxStreams; ++j) {
                const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
                if ((streamMask & indexToType(j)) && uri == streamUri) {
                    sources[j] = mPacketSources.valueFor(indexToType(j));
    
                    if (timeUs >= 0) {
                        startTime.mTimeUs = timeUs;
                    } else {
                        int32_t type;
                        sp<AMessage> meta;
                        if (!switching) {
                            // selecting, or adapting but no swap required
                            meta = sources[j]->getLatestDequeuedMeta();
                        } else {
                            // adapting and swap required
                            meta = sources[j]->getLatestEnqueuedMeta();
                            if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
                                // switching up
                                meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
                            }
                        }
    
                        if ((j == kAudioIndex || j == kVideoIndex)
                                && meta != NULL && !meta->findInt32("discontinuity", &type)) {
                            HLSTime tmpTime(meta);
                            if (startTime < tmpTime) {
                                startTime = tmpTime;
                            }
                        }
    
                        if (!switching) {
                            // selecting, or adapting but no swap required
                            sources[j]->clear();
                            if (j == kSubtitleIndex) {
                                break;
                            }
                            sources[j]->queueDiscontinuity(
                                    ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
                        } else {
                            // switching, queue discontinuities after resume
                            sources[j] = mPacketSources2.valueFor(indexToType(j));
                            sources[j]->clear();
                            // the new fetcher might be providing streams that used to be
                            // provided by two different fetchers,  if one of the fetcher
                            // paused in the middle while the other somehow paused in next
                            // seg, we have to start from next seg.
                            if (seekMode < mStreams[j].mSeekMode) {
                                seekMode = mStreams[j].mSeekMode;
                            }
                        }
                    }
    
                    streamMask &= ~indexToType(j);
                }
            }
    
            fetcher->startAsync(
                    sources[kAudioIndex],
                    sources[kVideoIndex],
                    sources[kSubtitleIndex],
                    getMetadataSource(sources, mNewStreamMask, switching),
                    startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
                    startTime.getSegmentTimeUs(),
                    startTime.mSeq,
                    seekMode);
        }
    
        // All fetchers have now been started, the configuration change
        // has completed.
    
        mReconfigurationInProgress = false;
        if (switching) {
            mSwitchInProgress = true;
        } else {
            mStreamMask = mNewStreamMask;
            if (mOrigBandwidthIndex != mCurBandwidthIndex)
                mOrigBandwidthIndex = mCurBandwidthIndex;
        }
    
        if (mDisconnectReplyID != NULL) {
            finishDisconnect(); // part of the shutdown path
        }
    }
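
The mask arithmetic in changeConfiguration and onChangeConfiguration3 is easier to follow in isolation. The sketch below reproduces only the bit logic: which streams an existing fetcher can keep serving (resumeMask), which need a new fetcher (streamMask), and which must be swapped during a bandwidth switch. The bit values, the Masks struct and both function names are assumptions for illustration. The listing only shows that indexToType(i) yields one bit per stream.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Assumed bit layout: one bit per stream index.
enum StreamType : uint32_t { kAudio = 1u << 0, kVideo = 1u << 1, kSubtitles = 1u << 2 };
constexpr size_t kMaxStreams = 3;
static uint32_t indexToType(size_t i) { return 1u << i; }

struct Masks {            // hypothetical helper type
    uint32_t streamMask;  // streams that need a new fetcher
    uint32_t resumeMask;  // streams an existing fetcher keeps serving
};

// wanted[i] is the playlist URI required for stream i ("" if the stream is absent).
// running holds the URIs of the fetchers that already exist.
static Masks partitionStreams(const std::array<std::string, kMaxStreams> &wanted,
                              const std::vector<std::string> &running) {
    uint32_t streamMask = 0;
    uint32_t resumeMask = 0;
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (!wanted[i].empty()) {
            streamMask |= indexToType(i);
        }
    }
    for (const std::string &uri : running) {
        for (size_t j = 0; j < kMaxStreams; ++j) {
            const uint32_t type = indexToType(j);
            if ((streamMask & type) && uri == wanted[j]) {
                resumeMask |= type;   // an existing fetcher already serves this stream
                streamMask &= ~type;  // so no new fetcher is needed for it
            }
        }
    }
    return Masks{streamMask, resumeMask};
}

// Streams present both before and after the change, but not resumable:
// these are moved to a new fetcher (mSwapMask in the listing).
static uint32_t swapMask(uint32_t newStreamMask, uint32_t oldStreamMask, uint32_t resumeMask) {
    return newStreamMask & oldStreamMask & ~resumeMask;
}
```

For example, if audio keeps its URI while video moves to a different variant playlist, audio ends up in resumeMask, video in streamMask, and the swap mask contains only the video bit.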
    

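    To summarise, that is essentially the whole of connectAsync: starting from the URL, download the HLS master playlist, parse it, set up the fetchers and decoders, and finally start downloading and parsing segment data. One function is still missing: restartPollBuffering, which drives the periodic check of the buffer status. Its implementation: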

    void LiveSession::schedulePollBuffering() {
        sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
        msg->setInt32("generation", mPollBufferingGeneration);
        msg->post(1000000ll); // poll again after 1 second (the delay is given in microseconds)
    }
    void LiveSession::cancelPollBuffering() {
        ++mPollBufferingGeneration;
        mPrevBufferPercentage = -1;
    }
    
    void LiveSession::restartPollBuffering() {
        cancelPollBuffering(); // invalidate any poll that is still pending
        onPollBuffering(); // check the buffers now and re-arm the poll
    }
    
    void LiveSession::onPollBuffering() {
        bool underflow, ready, down, up;
        if (checkBuffering(underflow, ready, down, up)) {
            if (mInPreparationPhase) {
                // Switching down is supported while still preparing
                if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
                    postPrepared(OK);
                }
            }
    
            if (!mInPreparationPhase) {
                if (ready) {
                    stopBufferingIfNecessary();
                } else if (underflow) {
                    startBufferingIfNecessary();
                }
                switchBandwidthIfNeeded(up, down);
            }
        }
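        // Re-arm the poll; the next run is triggered by a delayed message rather than a direct recursive call
        schedulePollBuffering();
    }

    // Excerpt from the message handler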
    case kWhatPollBuffering:
    {
        int32_t generation;
        CHECK(msg->findInt32("generation", &generation));
        if (generation == mPollBufferingGeneration) {
            onPollBuffering();
        }
        break;
    }
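
The mPollBufferingGeneration counter above is a common way to cancel delayed messages that cannot be pulled back out of a queue: cancelling just bumps the counter, and a message carrying an older generation is ignored on delivery. The sketch below models this with a plain queue in place of ALooper. The Poller class and all of its member names are hypothetical.

```cpp
#include <cstddef>
#include <queue>

// Minimal model of generation-based cancellation. Each "message" is just the
// generation value that was current when it was scheduled.
class Poller {
public:
    void schedule() { mPending.push(mGeneration); }  // cf. schedulePollBuffering
    void cancel() { ++mGeneration; }                 // cf. cancelPollBuffering
    void restart() { cancel(); onPoll(); }           // cf. restartPollBuffering

    // Delivers the oldest pending message. Returns true if it was acted upon,
    // false if the queue was empty or the message was stale.
    bool deliverOne() {
        if (mPending.empty()) {
            return false;
        }
        const int generation = mPending.front();
        mPending.pop();
        if (generation != mGeneration) {
            return false;  // scheduled before the last cancel(): drop it
        }
        onPoll();
        return true;
    }

    int pollCount() const { return mPollCount; }
    size_t pendingCount() const { return mPending.size(); }

private:
    void onPoll() {
        ++mPollCount;  // stands in for the buffering checks
        schedule();    // re-arm, as onPollBuffering does
    }

    std::queue<int> mPending;
    int mGeneration = 0;
    int mPollCount = 0;
};
```

Calling restart() twice leaves two messages queued, but only the one from the latest generation keeps the loop alive. Without the generation check each restart would add another parallel poll loop.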
    

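    As this shows, the poll only evaluates data that has already been downloaded. It contains no download control logic, so the actual downloading must live in PlaylistFetcher, which interested readers can explore on their own. LiveSession also contains the bandwidth estimation and switching logic, which is worth consulting if needed.
    seekTo is simpler. It ends up in onSeek: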

    void LiveSession::onSeek(const sp<AMessage> &msg) {
        int64_t timeUs;
        CHECK(msg->findInt64("timeUs", &timeUs));
        changeConfiguration(timeUs); // much the same as the startup case
    }
    

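    6 Summary

    Based on the AOSP 7 source code, this article gives a brief walkthrough of how HTTPLiveSource handles HLS. I wrote it mainly to deepen my own understanding. It does not go into the details of protocol parsing or of HLS variant switching, so treat it as a reference only.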


  • Original article: https://www.cnblogs.com/tocy/p/HLS-impl-in-android-source.html