[时间:2018-04] [状态:Open]
[关键词:流媒体,stream,HLS, AOSP, 源码分析,HttpLiveSource, LiveSession,PlaylistFetcher]
1. 引言
本文作为HLS综述的后续文章,也是我之前对Nuplayer源码分析中GenericSource源码解析的姊妹篇。当然本文侧重于结合HLS原理来分析NuPlayer中相关实现逻辑。如果你对NuPlayer不是很了解,建议先简单了解下。
本文重点关注的是NuPlayer中的HttpLiveSource,从概念上来讲该HttpLiveSource主要负责对m3u8进行解析,并按照HLS协议规定进行数据处理,通过HTTP下载音视频数据,交给特定的demuxer处理。
2. HttpLiveSource接口
HttpLiveSource继承自NuPlayer::Source,其主要接口定义如下:
// code from ~/frameworks/av/media/libmediaplayerservice/nuplayer/HttpLiveSource.h
struct NuPlayer::HTTPLiveSource : public NuPlayer::Source {
HTTPLiveSource(const sp<AMessage> ¬ify, const sp<IMediaHTTPService> &httpService,
const char *url, const KeyedVector<String8, String8> *headers);
virtual void prepareAsync();
virtual void start();
virtual status_t dequeueAccessUnit(bool audio, sp<ABuffer> *accessUnit);
virtual sp<AMessage> getFormat(bool audio);
virtual status_t feedMoreTSData();
// 以下函数是获得节目信息及选择节目的
virtual status_t getDuration(int64_t *durationUs);
virtual size_t getTrackCount() const;
virtual sp<AMessage> getTrackInfo(size_t trackIndex) const;
virtual ssize_t getSelectedTrack(media_track_type /* type */) const;
virtual status_t selectTrack(size_t trackIndex, bool select, int64_t timeUs);
virtual status_t seekTo(int64_t seekTimeUs);
protected:
virtual ~HTTPLiveSource();
virtual void onMessageReceived(const sp<AMessage> &msg);
private:
sp<IMediaHTTPService> mHTTPService;
AString mURL;
KeyedVector<String8, String8> mExtraHeaders;
uint32_t mFlags;
status_t mFinalResult;
off64_t mOffset;
sp<ALooper> mLiveLooper;
sp<LiveSession> mLiveSession;
int32_t mFetchSubtitleDataGeneration;
int32_t mFetchMetaDataGeneration;
bool mHasMetadata;
bool mMetadataSelected;
void onSessionNotify(const sp<AMessage> &msg);
void pollForRawData(const sp<AMessage> &msg, int32_t currentGeneration,
LiveSession::StreamType fetchType, int32_t pushWhat);
};
这里不做过多解释,基本上和NuPlayer::Source接口类似,只是重写了部分实现。
3. NuPlayer中的关于HttpLiveSource的部分
仅有一段提到HttpLiveSource,代码如下:
static bool IsHTTPLiveURL(const char *url) {
if (!strncasecmp("http://", url, 7)
|| !strncasecmp("https://", url, 8)
|| !strncasecmp("file://", url, 7)) {
size_t len = strlen(url);
if (len >= 5 && !strcasecmp(".m3u8", &url[len - 5])) {
return true;
}
if (strstr(url,"m3u8")) {
return true;
}
}
return false;
}
void NuPlayer::setDataSourceAsync(
const sp<IMediaHTTPService> &httpService,
const char *url,
const KeyedVector<String8, String8> *headers) {
sp<AMessage> msg = new AMessage(kWhatSetDataSource, this);
size_t len = strlen(url);
sp<AMessage> notify = new AMessage(kWhatSourceNotify, this);
sp<Source> source;
if (IsHTTPLiveURL(url)) { // 通过URL判断协议类型
source = new HTTPLiveSource(notify, httpService, url, headers);
} else if (!strncasecmp(url, "rtsp://", 7)) {
source = new RTSPSource(
notify, httpService, url, headers, mUIDValid, mUID);
} else if ((!strncasecmp(url, "http://", 7)
|| !strncasecmp(url, "https://", 8))
&& ((len >= 4 && !strcasecmp(".sdp", &url[len - 4]))
|| strstr(url, ".sdp?"))) {
source = new RTSPSource(
notify, httpService, url, headers, mUIDValid, mUID, true);
} else {
sp<GenericSource> genericSource =
new GenericSource(notify, mUIDValid, mUID);
// Don't set FLAG_SECURE on mSourceFlags here for widevine.
// The correct flags will be updated in Source::kWhatFlagsChanged
// handler when GenericSource is prepared.
status_t err = genericSource->setDataSource(httpService, url, headers);
if (err == OK) {
source = genericSource;
} else {
ALOGE("Failed to set data source!");
}
}
msg->setObject("source", source);
msg->post();
}
代码逻辑相对简单,直接根据URL判断协议类型,创建对应的HttpLiveSource。然后就是使用NuPlayer::Source的通用接口。
4. HttpLiveSource的部分接口实现
这部分主要介绍HttpLiveSource的几个核心接口,比如构造函数、prepareAsync、start、seekTo等。
4.1 构造及析构函数
代码如下:
NuPlayer::HTTPLiveSource::HTTPLiveSource(
const sp<AMessage> ¬ify,
const sp<IMediaHTTPService> &httpService,
const char *url,
const KeyedVector<String8, String8> *headers)
: Source(notify),
mHTTPService(httpService), // HTTP通信模块
mURL(url), mFlags(0), mFinalResult(OK),
mOffset(0),mFetchSubtitleDataGeneration(0),
mFetchMetaDataGeneration(0),
mHasMetadata(false), mMetadataSelected(false) {
// ...额外对headers的处理逻辑
}
NuPlayer::HTTPLiveSource::~HTTPLiveSource() {
if (mLiveSession != NULL) {
mLiveSession->disconnect();
mLiveLooper->unregisterHandler(mLiveSession->id());
mLiveLooper->unregisterHandler(id());
mLiveLooper->stop();
mLiveSession.clear();
mLiveLooper.clear();
}
}
4.2 prepareAsync / start函数
这两个函数实现代码都不复杂。代码如下:
void NuPlayer::HTTPLiveSource::prepareAsync() {
if (mLiveLooper == NULL) {
mLiveLooper = new ALooper;
mLiveLooper->setName("http live");
mLiveLooper->start();
mLiveLooper->registerHandler(this);
}
sp<AMessage> notify = new AMessage(kWhatSessionNotify, this);
// 创建LiveSession并启动连接
mLiveSession = new LiveSession(
notify,
(mFlags & kFlagIncognito) ? LiveSession::kFlagIncognito : 0,
mHTTPService);
mLiveLooper->registerHandler(mLiveSession);
mLiveSession->connectAsync(
mURL.c_str(), mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders);
}
void NuPlayer::HTTPLiveSource::start() {}
4.3 seekTo函数
seek的实现代码也很简单,直接让LiveSession完成,代码如下:
status_t NuPlayer::HTTPLiveSource::seekTo(int64_t seekTimeUs) {
return mLiveSession->seekTo(seekTimeUs);
}
看完上面的三个主要函数,貌似HttpLiveSource中并没有关于HLS协议解析及多媒体数据demux的处理。好吧,还是分析下LiveSession的代码吧。
5 LiveSession类的实现分析
这部分主要目的是解释清楚LiveSession中如何对HLS协议进行解析,如何获得音视频格式,以及如何完成demuxer读入音视频数据包的功能?
当然LiveSession中其实包含带宽估计功能,根据带宽估计自动执行HLS的切换。
从第4节可以知道,HttpLiveSource仅调用了LiveSession.connectAsync接口,剩下的处理逻辑如何呢?
首先我们看一下LiveSession::connectAsync接口的实现代码:
void LiveSession::connectAsync(
const char *url, const KeyedVector<String8, String8> *headers) {
sp<AMessage> msg = new AMessage(kWhatConnect, this);
msg->setString("url", url);
if (headers != NULL) {
msg->setPointer(
"headers",
new KeyedVector<String8, String8>(*headers));
}
msg->post();
}
这里主要逻辑是发送kWhatConnect消息,其处理函数如下:
//void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case kWhatConnect:
{
onConnect(msg);
break;
}\ ...
void LiveSession::onConnect(const sp<AMessage> &msg) {
CHECK(msg->findString("url", &mMasterURL));
// ...
// create looper for fetchers
if (mFetcherLooper == NULL) {
mFetcherLooper = new ALooper();
mFetcherLooper->setName("Fetcher");
mFetcherLooper->start(false, false);
}
// 创建 master playerlist fetcher并开始请求数据
addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
}
到这里发现,HLS协议中master playlist的解析位置,PlaylistFetcher。我们看一下相关的代码:
void PlaylistFetcher::fetchPlaylistAsync() { // 发消息
(new AMessage(kWhatFetchPlaylist, this))->post();
}
void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
case kWhatFetchPlaylist:
{
bool unchanged;
// 最核心的处理逻辑,通过HTTPDownloader::fetchPlaylist获取m3u8的内容
sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatPlaylistFetched);
notify->setObject("playlist", playlist);
notify->post();
break;
}
}
到这里,大家应该都看到最后一个隐藏的函数是HTTPDownloader::fetchPlaylist,猜测一下其基本功能就是通过HTTP模块下载给定url的m3u8文件,并通过M3UParser解析之,并将其返回。下面是实现代码:
ssize_t HTTPDownloader::fetchFile(
const char *url, sp<ABuffer> *out, String8 *actualUrl) {
ssize_t err = fetchBlock(url, out, 0, -1, 0, actualUrl, true /* reconnect */);
// close off the connection after use
mHTTPDataSource->disconnect();
return err;
}
sp<M3UParser> HTTPDownloader::fetchPlaylist(
const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
*unchanged = false;
sp<ABuffer> buffer;
String8 actualUrl;
// HTTP协议通信,数据放到buffer里面
ssize_t err = fetchFile(url, &buffer, &actualUrl);
// close off the connection after use
mHTTPDataSource->disconnect();
if (err <= 0) {return NULL;}
// 字符串解析及HLS协议解析
sp<M3UParser> playlist =
new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
if (playlist->initCheck() != OK) return NULL;
return playlist;
}
通过HTTPDownloader::fetchPlaylist,我们已经拿到了m3u8中具体内容,可以继续处理kWhatFetchPlaylist消息了,之后注册的消息向LiveSession发送kWhatPlaylistFetched消息。其处理函数如下:
void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
case PlaylistFetcher::kWhatPlaylistFetched: {
onMasterPlaylistFetched(msg);
break;
}
}
void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
// 检查fetcher的有效性,并删除已完成的fetcher
AString uri;
ssize_t index = mFetcherInfos.indexOfKey(uri);
if (index < 0) {
ALOGW("fetcher for master playlist is gone.");
return;
}
// no longer useful, remove
mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
mFetcherInfos.removeItemsAt(index);
// sp<M3UParser> mPlaylist;
CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
if (mPlaylist == NULL) { // 解析m3u8失败,发送错误消息
postPrepared(ERROR_IO);
return;
}
// 将variant相关属性放到带宽估计的参数里面
size_t initialBandwidth = 0;
size_t initialBandwidthIndex = 0;
int32_t maxWidth = 0;
int32_t maxHeight = 0;
if (mPlaylist->isVariantPlaylist()) {
Vector<BandwidthItem> itemsWithVideo;
for (size_t i = 0; i < mPlaylist->size(); ++i) {
BandwidthItem item;
item.mPlaylistIndex = i;
item.mLastFailureUs = -1ll;
sp<AMessage> meta;
AString uri;
mPlaylist->itemAt(i, &uri, &meta);
CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
int32_t width, height;
if (meta->findInt32("width", &width)) {
maxWidth = max(maxWidth, width);
}
if (meta->findInt32("height", &height)) {
maxHeight = max(maxHeight, height);
}
mBandwidthItems.push(item);
if (mPlaylist->hasType(i, "video")) {
itemsWithVideo.push(item);
}
}
CHECK_GT(mBandwidthItems.size(), 0u);
initialBandwidth = mBandwidthItems[0].mBandwidth;
mBandwidthItems.sort(SortByBandwidth);
for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
initialBandwidthIndex = i;
break;
}
}
}
mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
// 选择一个variant(默认使用第一个,最简单的实现)
mPlaylist->pickRandomMediaItems();
changeConfiguration(
0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
}
void LiveSession::changeConfiguration(
int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
cancelBandwidthSwitch();
CHECK(!mReconfigurationInProgress);
mReconfigurationInProgress = true;
if (bandwidthIndex >= 0) {
mOrigBandwidthIndex = mCurBandwidthIndex;
mCurBandwidthIndex = bandwidthIndex;
}
CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
AString URIs[kMaxStreams];
for (size_t i = 0; i < kMaxStreams; ++i) {
if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
streamMask |= indexToType(i);
}
}
// Step 1, 停止不再需要的fetcher,将其暂停以便后续重用
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
// 设置mToBeRemoved之后不能重用了
if (mFetcherInfos[i].mToBeRemoved)
continue;
const AString &uri = mFetcherInfos.keyAt(i);
sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
bool discardFetcher = true, delayRemoval = false;
for (size_t j = 0; j < kMaxStreams; ++j) {
StreamType type = indexToType(j);
if ((streamMask & type) && uri == URIs[j]) {
resumeMask |= type;
streamMask &= ~type;
discardFetcher = false;
}
}
// Delay fetcher removal
if (discardFetcher && timeUs < 0ll && !pickTrack
&& (fetcher->getStreamTypeMask() & streamMask)) {
discardFetcher = false;
delayRemoval = true;
}
if (discardFetcher) {
ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
fetcher->stopAsync();
} else {
float threshold = 0.0f; // default to pause after current block (47Kbytes)
bool disconnect = false;
if (timeUs >= 0ll) {
// seeking, no need to finish fetching
disconnect = true;
} else if (delayRemoval) {
// adapting, abort if remaining of current segment is over threshold
threshold = getAbortThreshold(
mOrigBandwidthIndex, mCurBandwidthIndex);
}
fetcher->pauseAsync(threshold, disconnect);
}
}
sp<AMessage> msg;
if (timeUs < 0ll) {
// skip onChangeConfiguration2 (decoder destruction) if not seeking.
msg = new AMessage(kWhatChangeConfiguration3, this);
} else {
msg = new AMessage(kWhatChangeConfiguration2, this);
}
msg->setInt32("streamMask", streamMask);
msg->setInt32("resumeMask", resumeMask);
msg->setInt32("pickTrack", pickTrack);
msg->setInt64("timeUs", timeUs);
for (size_t i = 0; i < kMaxStreams; ++i) {
if ((streamMask | resumeMask) & indexToType(i)) {
msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
}
}
mContinuationCounter = mFetcherInfos.size();
mContinuation = msg;
if (mContinuationCounter == 0) {
msg->post();
}
}
上面仅仅做了些参数检查以及fetcher资源整理,之后是有两个消息kWhatChangeConfiguration3和kWhatChangeConfiguration2,后者比前者多了一个decoder销毁的处理。接下来我们依次看一下这两个函数的实现吧。
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
mContinuation.clear();
// All fetchers are either suspended or have been removed now.
// 对于seek的情况,先清除之前暂存的数据
int64_t timeUs;
CHECK(msg->findInt64("timeUs", &timeUs));
if (timeUs >= 0) {
mLastSeekTimeUs = timeUs;
mLastDequeuedTimeUs = timeUs;
for (size_t i = 0; i < mPacketSources.size(); i++) {
sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
sp<MetaData> format = packetSource->getFormat();
packetSource->setFormat(format);
}
for (size_t i = 0; i < kMaxStreams; ++i) {
mStreams[i].reset();
}
mDiscontinuityOffsetTimesUs.clear();
mDiscontinuityAbsStartTimesUs.clear();
if (mSeekReplyID != NULL) {
CHECK(mSeekReply != NULL);
mSeekReply->setInt32("err", OK);
mSeekReply->postReply(mSeekReplyID);
mSeekReplyID.clear();
mSeekReply.clear();
}
// seek之后重置下缓冲状态,这将是整个HLS流播放的驱动源
restartPollBuffering();
}
uint32_t streamMask, resumeMask;
CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
streamMask |= resumeMask;
AString URIs[kMaxStreams];
for (size_t i = 0; i < kMaxStreams; ++i) {
if (streamMask & indexToType(i)) {
const AString &uriKey = mStreams[i].uriKey();
CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
}
}
uint32_t changedMask = 0;
for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
// 在码流切换的情况下,发生seek后流的URL可能变化
// 这种情况下,取消码流切换,并将seekPos应用到新流上
if ((mStreamMask & streamMask & indexToType(i))
&& !mStreams[i].mUri.empty()
&& !(URIs[i] == mStreams[i].mUri)) {
sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
if (source->getLatestDequeuedMeta() != NULL) {
source->queueDiscontinuity(
ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
}
}
// 判断解码器是否需要关闭
if ((mStreamMask & ~streamMask & indexToType(i))) {
changedMask |= indexToType(i);
}
}
if (changedMask == 0) {
// 音视频解码器都没有变化的话,可以直接处理
onChangeConfiguration3(msg);
return;
}
// 给NuPlayer发送通知,需要关闭对应解码器,结束之后回发kWhatChangeConfiguration3消息
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatStreamsChanged);
notify->setInt32("changedMask", changedMask);
msg->setWhat(kWhatChangeConfiguration3);
msg->setTarget(this);
notify->setMessage("reply", msg);
notify->post();
}
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
mContinuation.clear();
// All remaining fetchers are still suspended, the player has shutdown
// any decoders that needed it.
uint32_t streamMask, resumeMask;
CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
mNewStreamMask = streamMask | resumeMask;
int64_t timeUs;
int32_t pickTrack;
bool switching = false;
CHECK(msg->findInt64("timeUs", &timeUs));
CHECK(msg->findInt32("pickTrack", &pickTrack));
// timeUs负值表示刚启动
if (timeUs < 0ll) {
if (!pickTrack) {
// 判断是否需要切换
mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask;
switching = (mSwapMask != 0);
}
mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
} else {
mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
}
for (size_t i = 0; i < kMaxStreams; ++i) {
if (streamMask & indexToType(i)) {
if (switching) {
CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
} else {
CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
}
}
}
// Of all existing fetchers:
// * Resume fetchers that are still needed and assign them original packet sources.
// * Mark otherwise unneeded fetchers for removal.
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
const AString &uri = mFetcherInfos.keyAt(i);
if (!resumeFetcher(uri, resumeMask, timeUs))
mFetcherInfos.editValueAt(i).mToBeRemoved = true;
}
// 到此,streamMask中仅包含需要新建的fetcher
if (streamMask != 0) {
ALOGV("creating new fetchers for mask 0x%08x", streamMask);
}
// Find out when the original fetchers have buffered up to and start the new fetchers
// at a later timestamp.
for (size_t i = 0; i < kMaxStreams; i++) {
if (!(indexToType(i) & streamMask)) {
continue;
}
AString uri;
uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
CHECK(fetcher != NULL);
HLSTime startTime;
SeekMode seekMode = kSeekModeExactPosition;
sp<AnotherPacketSource> sources[kNumSources];
if (i == kSubtitleIndex || (!pickTrack && !switching)) {
startTime = latestMediaSegmentStartTime();
}
for (size_t j = i; j < kMaxStreams; ++j) {
const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
if ((streamMask & indexToType(j)) && uri == streamUri) {
sources[j] = mPacketSources.valueFor(indexToType(j));
if (timeUs >= 0) {
startTime.mTimeUs = timeUs;
} else {
int32_t type;
sp<AMessage> meta;
if (!switching) {
// selecting, or adapting but no swap required
meta = sources[j]->getLatestDequeuedMeta();
} else {
// adapting and swap required
meta = sources[j]->getLatestEnqueuedMeta();
if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
// switching up
meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
}
}
if ((j == kAudioIndex || j == kVideoIndex)
&& meta != NULL && !meta->findInt32("discontinuity", &type)) {
HLSTime tmpTime(meta);
if (startTime < tmpTime) {
startTime = tmpTime;
}
}
if (!switching) {
// selecting, or adapting but no swap required
sources[j]->clear();
if (j == kSubtitleIndex) {
break;
}
sources[j]->queueDiscontinuity(
ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
} else {
// switching, queue discontinuities after resume
sources[j] = mPacketSources2.valueFor(indexToType(j));
sources[j]->clear();
// the new fetcher might be providing streams that used to be
// provided by two different fetchers, if one of the fetcher
// paused in the middle while the other somehow paused in next
// seg, we have to start from next seg.
if (seekMode < mStreams[j].mSeekMode) {
seekMode = mStreams[j].mSeekMode;
}
}
}
streamMask &= ~indexToType(j);
}
}
fetcher->startAsync(
sources[kAudioIndex],
sources[kVideoIndex],
sources[kSubtitleIndex],
getMetadataSource(sources, mNewStreamMask, switching),
startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
startTime.getSegmentTimeUs(),
startTime.mSeq,
seekMode);
}
// All fetchers have now been started, the configuration change
// has completed.
mReconfigurationInProgress = false;
if (switching) {
mSwitchInProgress = true;
} else {
mStreamMask = mNewStreamMask;
if (mOrigBandwidthIndex != mCurBandwidthIndex)
mOrigBandwidthIndex = mCurBandwidthIndex;
}
if (mDisconnectReplyID != NULL) {
finishDisconnect();// 这个属于响应退出逻辑
}
}
总结一下,connectAsync的完整处理逻辑基本上都在上面了,从URL开始,下载HLS的master playlist,然后解析并初始化fetcher及decoder,最后开始下载segment数据,并解析之。上面还少了一个函数,restartPollBuffering,这是我们读取缓冲区状态的驱动。下面是其实现:
void LiveSession::schedulePollBuffering() {
sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
msg->setInt32("generation", mPollBufferingGeneration);
msg->post(1000000ll);// 10ms下载一次
}
void LiveSession::cancelPollBuffering() {
++mPollBufferingGeneration;
mPrevBufferPercentage = -1;
}
void LiveSession::restartPollBuffering() {
cancelPollBuffering(); // 取消之前的数据下载
onPollBuffering(); // 重新下载数据
}
void LiveSession::onPollBuffering() {
bool underflow, ready, down, up;
if (checkBuffering(underflow, ready, down, up)) {
if (mInPreparationPhase) {
// 支持在preparing时下切
if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
postPrepared(OK);
}
}
if (!mInPreparationPhase) {
if (ready) {
stopBufferingIfNecessary();
} else if (underflow) {
startBufferingIfNecessary();
}
switchBandwidthIfNeeded(up, down);
}
}
// 递归执行此函数
schedulePollBuffering();
}
// 消息处理例程部分代码
case kWhatPollBuffering:
{
int32_t generation;
CHECK(msg->findInt32("generation", &generation));
if (generation == mPollBufferingGeneration) {
onPollBuffering();
}
break;
}
从这里发现,其实这就是对已下载数据的判断,并没有下载控制逻辑,所以实际下载代码应该在PlaylistFetcher中。有兴趣的可以去看一下。当然LiveSession中也有带宽估计及切换的逻辑,需要的话可以参考下。
seekTo的实现会简单一点,最终通过onSeek实现,代码如下:
void LiveSession::onSeek(const sp<AMessage> &msg) {
int64_t timeUs;
CHECK(msg->findInt64("timeUs", &timeUs));
changeConfiguration(timeUs); // 这跟启动时的情况差不多
}
6 小结
本文参考AOSP 7的源代码,简单梳理了下HttpLiveSource对HLS的解析处理逻辑,整理本文的目的仅仅是为了加深这方面的理解。当然本文没有很细节的协议解析以及HLS variant切换的逻辑。所以,仅供参考。
6.1 参考文献
- AOSP N源码
- Android 源码分析之基于NuPlayer的HLS流媒体协议