zoukankan      html  css  js  c++  java
  • 【Canal源码分析】client工作过程

    client的工作过程,需要我们自己去编写对应的逻辑,我们目前只能从example写的例子来看。目前examle中提供了两个例子,一个是单机的,一个是集群的cluster,我们后续如果需要进行开发的话,其实也是开发我们自己的client,以及client的一些逻辑。我们主要看下集群的client是如何实现和消费的,又是怎么和server进行数据交互的。

    我们来看看具体的代码:

    protected void process() {
        int batchSize = 5 * 1024;
        while (running) {
            try {
                MDC.put("destination", destination);
                connector.connect();
                connector.subscribe();
                waiting = false;
                while (running) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // try {
                        // Thread.sleep(1000);
                        // } catch (InterruptedException e) {
                        // }
                    } else {
                        printSummary(message, batchId, size);
                        printEntry(message.getEntries());
                    }
    
                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
            } catch (Exception e) {
                logger.error("process error!", e);
            } finally {
                connector.disconnect();
                MDC.remove("destination");
            }
        }
    }
    

    这个的这样的过程是这样的

    • 连接,connector.connect()
    • 订阅,connector.subscribe
    • 获取数据,connector.getWithoutAck()
    • 业务处理
    • 提交确认,connector.ack()
    • 回滚,connector.rollback()
    • 断开连接,connector.disconnect()

    我们具体来看下。

    一、建立连接

    CanalConnector主要有两个实现,一个是SimpleCanalConnector,一个是ClusterCanalConnector,我们主要看下ClusterCanalConnector,这也是我们要用的一个模式。

    我们用的时候,通过一个工厂类生成我们需要的Connector,这里的工厂类是CanalConnectors,里面包含了生成ClusterCanalConnector的方法。

    public static CanalConnector newClusterConnector(String zkServers, String destination, String username,
                                                     String password) {
        ClusterCanalConnector canalConnector = new ClusterCanalConnector(username,
            password,
            destination,
            new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
        canalConnector.setSoTimeout(30 * 1000);
        return canalConnector;
    }
    

    用到的参数有zk的地址,canal的名称,数据库的账号密码。里面有个ClusterNodeAccessStrategy是用来选择client的策略,这个ClusterNodeAccessStrategy的构造方法里面有些东西需要我们关注下。

    1.1 ClusterNodeAccessStrategy

    public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
        this.zkClient = zkClient;
        childListener = new IZkChildListener() {
    
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                initClusters(currentChilds);
            }
    
        };
    
        dataListener = new IZkDataListener() {
    
            public void handleDataDeleted(String dataPath) throws Exception {
                runningAddress = null;
            }
    
            public void handleDataChange(String dataPath, Object data) throws Exception {
                initRunning(data);
            }
    
        };
    
        String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
        this.zkClient.subscribeChildChanges(clusterPath, childListener);
        initClusters(this.zkClient.getChildren(clusterPath));
    
        String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
        this.zkClient.subscribeDataChanges(runningPath, dataListener);
        initRunning(this.zkClient.readData(runningPath, true));
    }
    

    这边起了两个监听器,都是监听server端的活动服务器的。一个是获取所有的server列表,一个是获取活动的server服务器,都是从zk的对应节点上去取的。

    1.2 连接connect

    获取到CanalConnector之后,就是真正的连接了。在ClusterCanalConnector中,我们可以看到,其实他底层用的也是SimpleCanalConnector,只不过加了一个选择的策略。

    public void connect() throws CanalClientException {
        if (connected) {
            return;
        }
    
        if (runningMonitor != null) {
            if (!runningMonitor.isStart()) {
                runningMonitor.start();
            }
        } else {
            waitClientRunning();
            if (!running) {
                return;
            }
            doConnect();
            if (filter != null) { // 如果存在条件,说明是自动切换,基于上一次的条件订阅一次
                subscribe(filter);
            }
            if (rollbackOnConnect) {
                rollback();
            }
        }
    
        connected = true;
    }
    

    如果是集群模式的客户端,那么这边的runningMonitor不为空,因为他进行了初始化。我们主要看下runningMonitor.start()里面的操作。

    public void start() {
        super.start();
    
        String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
        zkClient.subscribeDataChanges(path, dataListener);
        initRunning();
    }
    

    这边监听的路径是:/otter/canal/destinations/{destination}/{clientId}/running。如果有任何的变化,或节点的删除,那么执行dataListener里面的操作。

    dataListener = new IZkDataListener() {
    
        public void handleDataChange(String dataPath, Object data) throws Exception {
            MDC.put("destination", destination);
            ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
            if (!isMine(runningData.getAddress())) {
                mutex.set(false);
            }
    
            if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
                release = true;
                releaseRunning();// 彻底释放mainstem
            }
    
            activeData = (ClientRunningData) runningData;
        }
    
        public void handleDataDeleted(String dataPath) throws Exception {
            MDC.put("destination", destination);
            mutex.set(false);
            // 触发一下退出,可能是人为干预的释放操作或者网络闪断引起的session expired timeout
            processActiveExit();
            if (!release && activeData != null && isMine(activeData.getAddress())) {
                // 如果上一次active的状态就是本机,则即时触发一下active抢占
                initRunning();
            } else {
                // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
                delayExector.schedule(new Runnable() {
    
                    public void run() {
                        initRunning();
                    }
                }, delayTime, TimeUnit.SECONDS);
            }
        }
    
    };
    

    这里的注释比较清楚,基本上如果数据发生了变化,那么进行节点释放后,将运行节点置为活动节点。如果发生了数据删除,那么直接触发退出,如果上一次的active状态是本机,那么触发一下active抢占,否则等待delayTime,默认5s后重试。下面我们主要看下initRunning。

    1.3 initRunning

    这块主要是创建运行节点的临时节点。节点路径是/otter/canal/destinations/{destination}/{clientId},节点内容是ClientRunningData的json序列化结果。连接的代码:

    public InetSocketAddress processActiveEnter() {
        InetSocketAddress address = doConnect();
        mutex.set(true);
        if (filter != null) { // 如果存在条件,说明是自动切换,基于上一次的条件订阅一次
            subscribe(filter);
        }
    
        if (rollbackOnConnect) {
            rollback();
        }
    
        return address;
    }
    

    这块有几段逻辑,我们慢慢看下。

    1.3.1 doConnect

    这里是client直接连上了server,通过socket连接,也就是server暴露的socket端口。

    private InetSocketAddress doConnect() throws CanalClientException {
        try {
            channel = SocketChannel.open();
            channel.socket().setSoTimeout(soTimeout);
            SocketAddress address = getAddress();
            if (address == null) {
                address = getNextAddress();
            }
            channel.connect(address);
            readableChannel = Channels.newChannel(channel.socket().getInputStream());
            writableChannel = Channels.newChannel(channel.socket().getOutputStream());
            Packet p = Packet.parseFrom(readNextPacket());
            if (p.getVersion() != 1) {
                throw new CanalClientException("unsupported version at this client.");
            }
    
            if (p.getType() != PacketType.HANDSHAKE) {
                throw new CanalClientException("expect handshake but found other type.");
            }
            //
            Handshake handshake = Handshake.parseFrom(p.getBody());
            supportedCompressions.addAll(handshake.getSupportedCompressionsList());
            //
            ClientAuth ca = ClientAuth.newBuilder()
                .setUsername(username != null ? username : "")
                .setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
                .setNetReadTimeout(soTimeout)
                .setNetWriteTimeout(soTimeout)
                .build();
            writeWithHeader(Packet.newBuilder()
                .setType(PacketType.CLIENTAUTHENTICATION)
                .setBody(ca.toByteString())
                .build()
                .toByteArray());
            //
            Packet ack = Packet.parseFrom(readNextPacket());
            if (ack.getType() != PacketType.ACK) {
                throw new CanalClientException("unexpected packet type when ack is expected");
            }
    
            Ack ackBody = Ack.parseFrom(ack.getBody());
            if (ackBody.getErrorCode() > 0) {
                throw new CanalClientException("something goes wrong when doing authentication: "
                                           + ackBody.getErrorMessage());
            }
    
            connected = true;
            return new InetSocketAddress(channel.socket().getLocalAddress(), channel.socket().getLocalPort());
        } catch (IOException e) {
            throw new CanalClientException(e);
        }
    }
    

    这边采用NIO编程,建立和server的socket连接后,发送了握手包和认证包,当收到ack包后,认为连接成功。认证包的服务端处理在ClientAuthenticationHandler类中,握手处理在HandshakeInitializationHandler类。

    server接收到认证的消息后,会做如下的处理:

    public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        final Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
        switch (packet.getVersion()) {
            case SUPPORTED_VERSION:
            default:
                final ClientAuth clientAuth = ClientAuth.parseFrom(packet.getBody());
                // 如果存在订阅信息
                if (StringUtils.isNotEmpty(clientAuth.getDestination())
                    && StringUtils.isNotEmpty(clientAuth.getClientId())) {
                    ClientIdentity clientIdentity = new ClientIdentity(clientAuth.getDestination(),
                        Short.valueOf(clientAuth.getClientId()),
                        clientAuth.getFilter());
                    try {
                        MDC.put("destination", clientIdentity.getDestination());
                        embeddedServer.subscribe(clientIdentity);
                        ctx.setAttachment(clientIdentity);// 设置状态数据
                        // 尝试启动,如果已经启动,忽略
                        if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                            if (!runningMonitor.isStart()) {
                                runningMonitor.start();
                            }
                        }
                    } finally {
                        MDC.remove("destination");
                    }
                }
    
                NettyUtils.ack(ctx.getChannel(), new ChannelFutureListener() {
    
                    public void operationComplete(ChannelFuture future) throws Exception {
                        //忽略
                    }
    
                });
                break;
        }
    }
    

    主要的逻辑在subscribe里面。如果metaManager没有启动,那么需要进行启动。启动时,会从zk节点下面拉取一些数据,包括客户端的消费位点情况等等。然后就是订阅,订阅是新建一个zk节点,路径为/otter/canal/destinations/{destination}/{clientId}。然后还有一些过滤器,也需要写到zk中。之后就是获取一下本client的位点信息,如果原来zk中包含,那么直接从内存中获取,否则取eventStore的第一条数据。

    1.3.2 subscribe

    发送订阅消息给server,通过socket的方式。这边是判断,如果filter不为空,才发送订阅消息。服务端的处理过程是这样的:

    case SUBSCRIPTION:
        Sub sub = Sub.parseFrom(packet.getBody());
        if (StringUtils.isNotEmpty(sub.getDestination()) && StringUtils.isNotEmpty(sub.getClientId())) {
            clientIdentity = new ClientIdentity(sub.getDestination(),
                                Short.valueOf(sub.getClientId()),
                                sub.getFilter());
            MDC.put("destination", clientIdentity.getDestination());
    
            // 尝试启动,如果已经启动,忽略
            if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                if (!runningMonitor.isStart()) {
                    runningMonitor.start();
                }
            }
    
            embeddedServer.subscribe(clientIdentity);
            ctx.setAttachment(clientIdentity);// 设置状态数据
            NettyUtils.ack(ctx.getChannel(), null);
        } else {
            NettyUtils.error(401,
                MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage(),
                ctx.getChannel(),
                null);
    }
    break;
    

    类似于connect的过程,不过这边带上了filter的参数。这边启动了server以及他的监听器。

    1.3.3 rollback

    这里的回滚是指回滚server端记录的本client的位点信息。

    public void rollback() throws CanalClientException {
        waitClientRunning();
        rollback(0);// 0代笔未设置
    }
    

    这里发送了rollback的指令。服务端是这么处理的:

    case CLIENTROLLBACK:
        ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
        MDC.put("destination", rollback.getDestination());
        if (StringUtils.isNotEmpty(rollback.getDestination())
            && StringUtils.isNotEmpty(rollback.getClientId())) {
            clientIdentity = new ClientIdentity(rollback.getDestination(),
                Short.valueOf(rollback.getClientId()));
            if (rollback.getBatchId() == 0L) {
                embeddedServer.rollback(clientIdentity);// 回滚所有批次
            } else {
                embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
            }
        } else {
            NettyUtils.error(401,
                MessageFormatter.format("destination or clientId is null", rollback.toString())
                    .getMessage(),
                ctx.getChannel(),
                null);
        }
        break;
    

    这里的batchId传入的是0,也就是要回滚所有的批次。我们来看下这个回滚的动作:

    @Override
    public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        // 因为存在第一次链接时自动rollback的情况,所以需要忽略未订阅
        boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
        if (!hasSubscribe) {
            return;
        }
    
        synchronized (canalInstance) {
            // 清除batch信息
            canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
            // rollback eventStore中的状态信息
            canalInstance.getEventStore().rollback();
            logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
        }
    }
    

    这里回滚的,其实是eventStore中的指针,把get的指针设置为之前ack的指针。

    二、订阅数据

    当client连接server完成后,就需要进行binlog数据的订阅。

    public void subscribe() throws CanalClientException {
        subscribe(""); // 传递空字符即可
    }
    
    public void subscribe(String filter) throws CanalClientException {
        int times = 0;
        while (times < retryTimes) {
            try {
                currentConnector.subscribe(filter);
                this.filter = filter;
                return;
            } catch (Throwable t) {
                if (retryTimes == -1 && t.getCause() instanceof InterruptedException) {
                    logger.info("block waiting interrupted by other thread.");
                    return;
                } else {
                    logger.warn(String.format(
                            "something goes wrong when subscribing from server: %s",
                            currentConnector != null ? currentConnector.getAddress() : "null"),
                            t);
                    times++;
                    restart();
                    logger.info("restart the connector for next round retry.");
                }
    
            }
        }
    
        throw new CanalClientException("failed to subscribe after " + times + " times retry.");
    }
    

    订阅这块的内容不再赘述,在上面的connect过程中有提到。这边还有一个失败重试的机制,当异常不是中断异常的情况下,会重试重启client connector,直到达到了阈值retryTimes。

    三、获取数据

    在建立连接和进行数据订阅之后,就可以开始进行binlog数据的获取了。主要的方法是getWithOutAck这个方法,这种是需要client自己进行数据ack的,保证了只有数据真正的被消费,而且进行了业务逻辑处理之后,才会ack。当然,如果有了异常,也会进行一定次数的重试和重启。

    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        try {
            ...//忽略
            writeWithHeader(Packet.newBuilder()
                .setType(PacketType.GET)
                .setBody(Get.newBuilder()
                .setAutoAck(false)
                .setDestination(clientIdentity.getDestination())
                .setClientId(String.valueOf(clientIdentity.getClientId()))
                    .setFetchSize(size)
                    .setTimeout(time)
                    .setUnit(unit.ordinal())
                    .build()
                    .toByteString())
                .build()
                .toByteArray());
            return receiveMessages();
        } catch (IOException e) {
            throw new CanalClientException(e);
        }
    }
    

    我们可以看到,其实是发送了一个GET命令给server端,然后传递了一个参数batchSize,还有超时时间,而且不是自动提交的。服务端的处理是这样的:

    embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
    

    也是调用的这个方法:

    @Override
    public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                    throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
    
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) {
            // 获取到流式数据中的最后一批获取的位置
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
    
            Events<Event> events = null;
            if (positionRanges != null) { // 存在流数据
                events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
            } else {// ack后第一次获取
                Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
                if (start == null) { // 第一次,还没有过ack记录,则获取当前store中的第一条
                    start = canalInstance.getEventStore().getFirstPosition();
                }
    
                events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
            }
    
            if (CollectionUtils.isEmpty(events.getEvents())) {
                logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
                        clientIdentity.getClientId(), batchSize);
                return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能
            } else {
                // 记录到流式信息
                Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
                List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {
    
                    public Entry apply(Event input) {
                        return input.getEntry();
                    }
                });
                if (logger.isInfoEnabled()) {
                    logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
                            clientIdentity.getClientId(),
                            batchSize,
                            entrys.size(),
                            batchId,
                            events.getPositionRange());
                }
                return new Message(batchId, entrys);
            }
    
        }
    }
    

    最主要的逻辑在这里:

    • 判断canalInstance是否已经启动:checkStart
    • 判断订阅列表中是否包含当前的client:checkSubscribe
    • 根据client信息从metaManager中获取最后消费的批次:getLastestBatch,这块在运行起来后,是从内存中取的,但是在instance启动时,是从zk中拉取的,是从/otter/canal/destinations/{destination}/{clientId}/mark下面获取的,后续也会定时(1s)刷新到这里面
    • 如果能获取到消费的批次,直接从eventStore的队列中获取数据。
    • 如果positionRanges为空,那么从metaManager中获取指针。如果指针也没有,说明原来没有ack过数据,需要从store中第一条开始获取。这个过程其实就是找start,也就是上一次ack的位置。
    • 调用getEvent,获取数据。根据传入的参数不同,调用不同的方法去获取数据,但是最终都是调用的goGet方法。这个doGet方法不是很复杂,主要是根据参数从store队列中获取数据,然后把指针进行新的设置。
    • 如果没有取到binlog数据,那么直接返回,批次号为-1。
    • 如果取到了数据,记录一下流式数据后返回。

    结果封装在Messages中,最终改为Message,包含批次号和binlog列表。

    四、业务处理

    拿到message后,需要进行判断batchId,如果batchId=-1或者binlog大小为0,说明没有拿到数据。否则在message基础上进行逻辑处理。

    Message的内容,后续我们再进行讨论。

    五、提交确认

    connector.ack(batchId); // 提交确认
    

    提交批次id,底层发送CLIENTACK命令到server。server调用CanalServerWithEmbedded的ack方法来进行提交。

    public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
    
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        PositionRange<LogPosition> positionRanges = null;
        positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // 更新位置
        if (positionRanges == null) { // 说明是重复的ack/rollback
            throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
                clientIdentity.getClientId(),
                batchId));
        }
    
        // 更新cursor
        if (positionRanges.getAck() != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
            if (logger.isInfoEnabled()) {
                logger.info("ack successfully, clientId:{} batchId:{} position:{}",
                        clientIdentity.getClientId(),
                        batchId,
                        positionRanges);
            }
        }
    
        // 可定时清理数据
        canalInstance.getEventStore().ack(positionRanges.getEnd());
    
    }
    

    首先更新metaManager中的batch,然后更新ack指针,同时清理store中到ack指针位置的数据。

    六、回滚

    如果有失败的情况,需要进行回滚。发送CLIENTROLLBACK命令给server端,进行数据回滚。回滚单个批次时的处理逻辑是这样的:

    @Override
    public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    
        // 因为存在第一次链接时自动rollback的情况,所以需要忽略未订阅
        boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
        if (!hasSubscribe) {
            return;
        }
        synchronized (canalInstance) {
            // 清除batch信息
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity,
                batchId);
            if (positionRanges == null) { // 说明是重复的ack/rollback
                throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
                    clientIdentity.getClientId(),
                    batchId));
            }
    
            // lastRollbackPostions.put(clientIdentity,
            // positionRanges.getEnd());// 记录一下最后rollback的位置
            // TODO 后续rollback到指定的batchId位置
            canalInstance.getEventStore().rollback();// rollback
                                                     // eventStore中的状态信息
            logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
                clientIdentity.getClientId(),
                batchId,
                positionRanges);
        }
    }
    

    这里的rollback到指定的batchId,其实是假的。他的rollback也是全量回滚到ack的指针位置。

    七、断开连接

    在发生异常情况时,client会断开与server的连接,也就是disconnect方法。

    public void disconnect() throws CanalClientException {
        if (rollbackOnDisConnect && channel.isConnected()) {
            rollback();
        }
    
        connected = false;
        if (runningMonitor != null) {
            if (runningMonitor.isStart()) {
                runningMonitor.stop();
            }
        } else {
            doDisconnnect();
        }
    }
    

    判断是否在断开连接的时候回滚参数(默认false)和当前socket通道是否连接中,进行回滚。

    否则调用runningMonitor.stop方法进行停止。主要的过程是这样的:

    • 取消监听/otter/canal/destinations/{destination}/{clientId}/running/节点变化信息
    • 删除上面这个节点
    • 关闭socket的读通道
    • 关闭socket的写通道
    • 关闭socket channel
  • 相关阅读:
    python变量和作用域
    模块
    装饰器
    转git取消commit
    RTP
    ffmpeg编译
    win7开启wifi
    LIVE555
    【FFmpeg】ffplay播放rtsp视频流花屏问题
    Windows下编译SDL
  • 原文地址:https://www.cnblogs.com/f-zhao/p/9105843.html
Copyright © 2011-2022 走看看