CoAP 服务中的 Observe 和 Multicast

CoAP 是一种专门为物联网设计的轻量级应用层协议,它基于 UDP,支持观察(Observe)和组播(Multicast)等特性。其中 ObserveCoAP 的一个重要特性,它允许客户端注册对资源的观察,一旦资源发生变化,服务器就会主动通知客户端。这种机制非常适合物联网场景,例如传感器数据的实时监控、设备状态的实时更新等。其在传输层是基于 UDP 的单播来实现。而 Multicast 则可以实现一次性向多个设备发送数据,在传输层一般是基于 UDP 的组播来提高了数据传输的效率。

UDP 单播+组播的代码实现

C 实现 UDP 单播+组播的代码

由于 Go 的实现封装了很多细节,而且在直接用 Go 调用 CoAP 库的时候出现了一些问题,所以先写了一个 C 语言的实现,然后再仔细研究 Go 的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <unistd.h>

#define PORT 5683
#define MULTICAST_ADDR "224.0.1.187"
#define BUFFER_SIZE 1024

int main() {
int sockfd;
struct sockaddr_in addr;
struct ip_mreq mreq;
char buffer[BUFFER_SIZE];
ssize_t n;
struct sockaddr_in sender_addr;
socklen_t sender_len = sizeof(sender_addr);

// 创建 UDP 套接字
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}

// 绑定套接字到本地地址 0.0.0.0:5683
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(PORT);
if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
perror("bind failed");
close(sockfd);
exit(EXIT_FAILURE);
}

// 加入组播组 224.0.1.187
mreq.imr_multiaddr.s_addr = inet_addr(MULTICAST_ADDR);
mreq.imr_interface.s_addr = INADDR_ANY;
if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
perror("setsockopt IP_ADD_MEMBERSHIP failed");
close(sockfd);
exit(EXIT_FAILURE);
}

printf("Listening on port %d and receiving multicast messages...\n", PORT);

// 接收数据
while (1) {
n = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&sender_addr, &sender_len);
if (n < 0) {
perror("recvfrom failed");
close(sockfd);
exit(EXIT_FAILURE);
}
buffer[n] = '\0'; // 确保字符串以 null 结尾
printf("Received message: %s\n", buffer);
}

close(sockfd);
return 0;
}

上面的代码是一个简单的 UDP 组播+单播接收程序,它创建了一个 UDP 套接字,绑定到本地地址 0.0.0.0:5683,并加入了组播组 224.0.1.187。然后通过 recvfrom 函数接收数据,打印接收到的消息。

运行程序

1
2
3
$ gcc -o udp_multicast_receive udp_multicast_receive.c
$ ./udp_multicast_receive
Listening on port 5683 and receiving multicast messages...

Go 实现 UDP 单播+组播的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main

import (
"fmt"
"net"
"os"
"os/signal"
"sync"
"syscall"
)

const BUF_SIZE int = 8192

func main() {
// group addr
gaddr, _ := net.ResolveUDPAddr("udp", "224.0.1.187:5683")
multicastListener, err := net.ListenMulticastUDP("udp", nil, gaddr)
if err != nil {
checkError(err)
}

var wg sync.WaitGroup

// 捕获 Ctrl+C 信号
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// 启动协程接收消息
wg.Add(1)
go func() {
defer wg.Done()
recvMsg("multicast - ", multicastListener)
}()

// 等待信号
<-sigChan
fmt.Println("Received interrupt signal, shutting down...")

// 关闭监听器
multicastListener.Close()

// 等待所有协程完成
wg.Wait()
}

func recvMsg(prefix string, conn *net.UDPConn) {
message := make([]byte, BUF_SIZE)

for {
n, src, err := conn.ReadFromUDP(message)
if err != nil {
if opErr, ok := err.(*net.OpError); ok && opErr.Err.Error() == "use of closed network connection" {
fmt.Println("Connection closed, exiting...")
return
}
fmt.Println("Read error:", err)
continue
}
fmt.Println(prefix, src, ": ", string(message[:n]))
}
}

func checkError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}

上面的代码是一个简单的 Go 版本的 UDP 组播+单播接收程序,它创建了一个 UDP 套接字,绑定到组播地址 224.0.1.187:5683,然后通过 ReadFromUDP 函数接收数据,打印接收到的消息。此时已经可以收到发送到组播地址的消息和发送到该主机的单播消息。测试程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"net"
"os"
"time"
)

func main() {
conn, err := net.Dial("udp", "224.0.1.187:5683")
if err != nil {
checkError(err)
}

for i := 0; i < 10; i++ {
conn.Write([]byte("hello, multicast!"))
time.Sleep(1 * time.Second)
}
}

func checkError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"net"
"os"
"time"
)

func main() {
conn, err := net.Dial("udp", "127.0.0.1:5683")
if err != nil {
checkError(err)
}

for i := 0; i < 10; i++ {
conn.Write([]byte("hello, world!"))
time.Sleep(1 * time.Second)
}
}

func checkError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}

CoAP 服务中的 Observe 和 Multicast

找了一个 CoAP 的库 github.com/plgd-dev/go-coap 来实现 CoAP 服务中的 ObserveMulticast

直接用 go-coap 库实现 ObserveMulticast 的代码会有一些问题,其中 Observe 的主要是通过库的 ListenAndServe 接口,封装了监听和启动服务的细节,其 example 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
log.Fatal(coap.ListenAndServe("udp", ":5688",
mux.HandlerFunc(func(w mux.ResponseWriter, r *mux.Message) {
log.Printf("Got message path=%v: %+v from %v", getPath(r.Options()), r, w.Conn().RemoteAddr())
obs, err := r.Options().Observe()
switch {
case r.Code() == codes.GET && err == nil && obs == 0:
go periodicTransmitter(w.Conn(), r.Token())
case r.Code() == codes.GET:
err := sendResponse(w.Conn(), r.Token(), time.Now(), -1)
if err != nil {
log.Printf("Error on transmitter: %v", err)
}
}
})))
}

Multicast 则是需要通过该库的 NewListenUDP 接口创建一个 UDP 连接,然后通过 Serve 接口来启动服务,其 example 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func main() {
m := mux.NewRouter()
m.Handle("/oic/res", mux.HandlerFunc(handleMcast))
multicastAddr := "224.0.1.187:5683"

l, err := net.NewListenUDP("udp4", multicastAddr)
if err != nil {
log.Println(err)
return
}

ifaces, err := gonet.Interfaces()
if err != nil {
log.Println(err)
return
}

a, err := gonet.ResolveUDPAddr("udp", multicastAddr)
if err != nil {
log.Println(err)
return
}

for i := range ifaces {
iface := ifaces[i]
err := l.JoinGroup(&iface, a)
if err != nil {
log.Printf("cannot JoinGroup(%v, %v): %v", iface, a, err)
}
}
err = l.SetMulticastLoopback(true)
if err != nil {
log.Println(err)
return
}

defer l.Close()
s := udp.NewServer(options.WithMux(m))
defer s.Stop()
log.Fatal(s.Serve(l))
}

Go 实现 CoAP 服务中的 Observe 和 Multicast

上面的两个例子分别实现了 ObserveMulticast 的功能,下面将两个例子结合起来,实现 CoAP 服务中的 ObserveMulticast 的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package main

import (
"bytes"
"fmt"
"log"
"net"
"time"

"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/codes"
"github.com/plgd-dev/go-coap/v3/mux"
coapnet "github.com/plgd-dev/go-coap/v3/net"
"github.com/plgd-dev/go-coap/v3/options"
"github.com/plgd-dev/go-coap/v3/udp"
)

func handleMcast(w mux.ResponseWriter, r *mux.Message) {
path, err := r.Options().Path()
if err != nil {
log.Printf("cannot get path: %v", err)
return
}

log.Printf("Got mcast message: path=%q: from %v", path, w.Conn().RemoteAddr())
w.SetResponse(codes.Content, message.TextPlain, bytes.NewReader([]byte("mcast response")))
}

func getPath(opts message.Options) string {
path, err := opts.Path()
if err != nil {
log.Printf("cannot get path: %v", err)
return ""
}
return path
}

func sendResponse(cc mux.Conn, token []byte, subded time.Time, obs int64) error {
m := cc.AcquireMessage(cc.Context())
defer cc.ReleaseMessage(m)
m.SetCode(codes.Content)
m.SetToken(token)
m.SetBody(bytes.NewReader([]byte(fmt.Sprintf("Been running for %v", time.Since(subded)))))
m.SetContentFormat(message.TextPlain)
if obs >= 0 {
m.SetObserve(uint32(obs))
}
return cc.WriteMessage(m)
}

func periodicTransmitter(cc mux.Conn, token []byte) {
subded := time.Now()

for obs := int64(2); ; obs++ {
err := sendResponse(cc, token, subded, obs)
if err != nil {
log.Printf("Error on transmitter, stopping: %v", err)
return
}
time.Sleep(time.Second)
}
}

func main() {
m := mux.NewRouter()
m.Handle("/oic/res", mux.HandlerFunc(handleMcast))
m.DefaultHandle(mux.HandlerFunc(func(w mux.ResponseWriter, r *mux.Message) {
log.Printf("Got message path=%v: %+v from %v", getPath(r.Options()), r, w.Conn().RemoteAddr())
obs, err := r.Options().Observe()
switch {
case r.Code() == codes.GET && err == nil && obs == 0:
go periodicTransmitter(w.Conn(), r.Token())
case r.Code() == codes.GET:
err := sendResponse(w.Conn(), r.Token(), time.Now(), -1)
if err != nil {
log.Printf("Error on transmitter: %v", err)
}
}
}))

multicastAddr := "224.0.1.187:5683"
gaddr, _ := net.ResolveUDPAddr("udp", multicastAddr)
multicastListener, err := net.ListenMulticastUDP("udp", nil, gaddr)
if err != nil {
log.Fatalln("Error listening:", err)
}
l := coapnet.NewUDPConn("udp", multicastListener)
l.SetMulticastLoopback(true)
defer l.Close()

s := udp.NewServer(options.WithMux(m))
defer s.Stop()
log.Fatal(s.Serve(l))
}

CoAPMulticastAndObserve