1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
package sfive
import (
"encoding/binary"
"errors"
"fmt"
"strings"
"time"
)
// ErrNotFound is a sentinel error whose message is "not found".
// NOTE(review): presumably returned by lookups that find no matching entry;
// the call sites are outside this section.
var ErrNotFound = errors.New("not found")
// Bucket names.
const (
	hubInfoBn       = "HubInfo"
	dataUpdatesBn   = "DataUpdates"
	sourcesFwdBn    = "SourcesFwd"
	sourcesRevBn    = "SourcesRev"
	clientDataBn    = "ClientData"
	userAgentsFwdBn = "UserAgentsFwd"
	userAgentsRevBn = "UserAgentsRev"
)

// Well-known keys.
const hubUUIDKey = "HubUUID"
// streamIdDb identifies a stream by content id, format and quality.
// It is stored in sourcesRevBn (as the StreamId field of sourceDb).
// The json struct tags give every field a one-letter key.
type streamIdDb struct {
	ContentId string `json:"c"` // content identifier
	Format    string `json:"f"` // stream format
	Quality   string `json:"q"` // stream quality
}
// sourceDb is the database form of a source: the hostname, the stream it
// carries and a list of tags. The json struct tags give every field a
// one-letter key.
type sourceDb struct {
	Hostname string     `json:"h"` // host name of the source
	StreamId streamIdDb `json:"s"` // content id / format / quality of the stream
	Tags     []string   `json:"t"` // tags attached to the source
}
// NewSourceDb builds the database representation of the source described by
// value.SourceId: its hostname, its stream id (content id, format, quality)
// and its tags. The Tags slice is assigned as-is, not copied.
func NewSourceDb(value StatisticsData) sourceDb {
	var db sourceDb

	db.Hostname = value.SourceId.Hostname
	db.Tags = value.SourceId.Tags

	db.StreamId.ContentId = value.SourceId.StreamId.ContentId
	db.StreamId.Format = value.SourceId.StreamId.Format
	db.StreamId.Quality = value.SourceId.StreamId.Quality

	return db
}
// String renders the source as "hostname/contentId/format/quality/tags",
// where the tags are joined with commas.
func (s sourceDb) String() string {
	stream := s.StreamId
	tags := strings.Join(s.Tags, ",")
	return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, stream.ContentId, stream.Format, stream.Quality, tags)
}
// CopyFromSourceDb overwrites the hostname, stream id (content id, format,
// quality) and tags of s with the values held in the database record v.
// The Tags slice is assigned as-is, not copied.
func (s *SourceId) CopyFromSourceDb(v sourceDb) {
	stream := v.StreamId

	s.Hostname, s.Tags = v.Hostname, v.Tags

	s.StreamId.ContentId = stream.ContentId
	s.StreamId.Format = stream.Format
	s.StreamId.Quality = stream.Quality
}
// clientDataDb is the per-client record stored in clientDataBn.
type clientDataDb struct {
	Ip          string `json:"ip"` // client IP address
	UserAgentId int    `json:"ua"` // NOTE(review): presumably the numeric id of the client's user agent — confirm
	BytesSent   uint   `json:"bs"` // bytes sent
}
// dataUpdateDb is the record stored in dataUpdatesBn.
//
// In the database, StatisticsData/DataUpdate is flattened compared to the
// JSON DTOs. The two SourceHub* fields are omitted from the JSON encoding
// when they hold their zero value.
type dataUpdateDb struct {
	SourceHubUuid         string `json:"h,omitempty"`  // UUID of the source hub
	SourceHubDataUpdateId int    `json:"hi,omitempty"` // id of this update at the source hub
	SourceId              int    `json:"si"`           // numeric source id (NewDataUpdateDb initializes it to -1)
	StartTime             int64  `json:"st"`           // unix timestamp in milliseconds
	Duration              int64  `json:"du"`           // duration in milliseconds
	ClientCount           uint   `json:"cc"`           // client count
	BytesReceived         uint   `json:"br"`           // bytes received
	BytesSent             uint   `json:"bs"`           // bytes sent
}
// NewDataUpdateDb flattens v into its database record. The start time is
// stored as a unix timestamp in milliseconds, and SourceId is initialized to
// -1.
func NewDataUpdateDb(v StatisticsData) dataUpdateDb {
	// Whole seconds scaled to milliseconds, plus the sub-second part
	// truncated to milliseconds.
	startMs := int64(v.StartTime.Unix()*1000) + int64(v.StartTime.Nanosecond()/1000000)

	return dataUpdateDb{
		SourceHubUuid:         v.SourceHubUuid,
		SourceHubDataUpdateId: v.SourceHubDataUpdateId,
		SourceId:              -1,
		StartTime:             startMs,
		Duration:              v.Duration,
		ClientCount:           v.Data.ClientCount,
		BytesReceived:         v.Data.BytesReceived,
		BytesSent:             v.Data.BytesSent,
	}
}
// CopyFromDataUpdateDb fills s from the database record v.
//
// When v carries no source hub UUID, hubId and vId are used as the source
// hub UUID and data update id; otherwise the values stored in v are kept.
// The millisecond start time of v is converted back into a time.Time.
func (s *StatisticsData) CopyFromDataUpdateDb(v dataUpdateDb, vId int, hubId string) {
	hubUuid, updateId := v.SourceHubUuid, v.SourceHubDataUpdateId
	if hubUuid == "" {
		// No source hub recorded: fall back to the supplied hub and id.
		hubUuid, updateId = hubId, vId
	}
	s.SourceHubUuid = hubUuid
	s.SourceHubDataUpdateId = updateId

	// Split the millisecond timestamp into whole seconds and the remaining
	// milliseconds, the latter expressed in nanoseconds.
	seconds := v.StartTime / 1000
	nanos := (v.StartTime % 1000) * 1000000
	s.StartTime = time.Unix(seconds, nanos)
	s.Duration = v.Duration

	s.Data.ClientCount = v.ClientCount
	s.Data.BytesReceived = v.BytesReceived
	s.Data.BytesSent = v.BytesSent
}
// itob encodes v as an 8-byte big-endian slice. v is converted to uint64
// first, so negative values are stored in two's-complement form.
func itob(v int) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(v))
	return buf[:]
}
// btoi decodes the first 8 bytes of b as a big-endian uint64 and returns it
// converted to int. It is the inverse of itob. b must hold at least 8 bytes;
// binary.BigEndian.Uint64 panics on shorter input.
func btoi(b []byte) int {
	raw := binary.BigEndian.Uint64(b)
	return int(raw)
}
|