Broadcasting and multicasting millions of clients using TCP

Sudeep Dasgupta
7 min readMar 22, 2020

TCP is unicast then how can we broadcast in an optimal way. This blog is all about broadcasting in TCP

In socket programming generally, two protocols are widely used that is TCP and UDP. Above these protocols, more protocols are invented like WebSocket, HTTP, STP, RTMP etc. TCP is generally used where data loss cannot be accepted as TCP made threeway handshake where it sends acknowledgement on every packet it receives but this is way to slow than UDP. UDP is generally used in video streaming where data loss is not a big problem but speed is highly required. UDP does not send an acknowledgement but is much faster than TCP.

TCP is generally unicast that means the server needs to send the response to each client individually whereas UDP has multicast and broadcast support. Multicast means server transmits data to a group and client connected to the same group gets the data. Broadcast means n number of clients receiving same data from the server. Below is a sample image to show you a relation of on to one, one to many, one to all.

Now why TCP broadcasting is necessary and even multicasting. Well, suppose you created a WebSocket server or any simple plain TCP server and millions of clients are connected to a single channel for like live streaming of a channel going on and people connected to that channels are writing comments now web-browser does not support UDP yet, WebRTC is based on UDP but is a caller callee protocol. So we will talk about browser connected to a WebSocket server and pushing messages concurrently and the messages need to be shown back to the clients in the UI of other clients who are writing comments or whatever in that case what server does is it creates a hashmap of the array and put all clients socket object inside the array and can map the array to a hashmap using a key as channel name and whenever a message is received it iterates over the array and send the message to all socket that is in the array. Given below a simple example. It will be in Golang but you can correlate with other languages.

// Host TCP server in localhost with port 8100

server, err := net.Listen("tcp", "127.0.0.1:8100")

// create array of tcp socket client
var listOfClients []*net.TCPConn

// waiting for new connections
for {
conn, err := server.Accept()

tcp := conn.(*net.TCPConn)

tcp.SetNoDelay(true)
tcp.SetKeepAlive(true)
tcp.SetKeepAlive(true)
tcp.SetLinger(1)
tcp.SetReadBuffer(10000)
tcp.SetWriteBuffer(10000)
tcp.SetDeadline(time.Now().Add(1000000 * time.Second))
tcp.SetReadDeadline(time.Now().Add(1000000 * time.Second))
tcp.SetWriteDeadline(time.Now().Add(1000000 * time.Second))

// appending the client socket to array/slice
listOfClients = append(listOfClients, tcp)

// calling handleRequest using goroutines to run in seperate thread
go HandleRequest(*tcp)
}

func HandleRequest(conn net.TCPConn){

// waiting for new messages

for {

// reading the message from socket buffer

message, err := bufio.NewReader(conn).ReadString('\n')

if err != nil {
log.Printf("Error: %+v", err.Error())
return
}

// broadcasting to all clients
go broadCast(message)
}
}


func broadCast(message string){

// iterating over the listOfClients

for i := range listOfClients{
// sending message to all clients in the socket
listOfClients[i].Write(message)
}
}

###################################################################################

// Client Socket example

// Client A connect to this socket

// connecting to localhost with port 8100
conn, _ := net.Dial("tcp", "127.0.0.1:8100")

// writing message to socket

conn.Write([]byte("Message from client A"))

for {
// reading messages from server
message, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
log.Printf("Error: %+v", err.Error())
return
}

fmt.Println(message)

}

// Same code for another client
// Client B connect to this socket

conn, _ := net.Dial("tcp", "127.0.0.1:8100")

conn.Write([]byte("Message from client B"))

for {

message, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
log.Printf("Error: %+v", err.Error())
return
}

fmt.Println(message)

}

In this example 2 client “Client A and Client B” both of the connected to localhost with port 8100 and send messages to the server. The server receives the message and the send back the message to all clients connected to the server iterating over the socket client array. This is a legacy code used by many developers and you will find such an example in any language over the internet. Now, what is the issue with this example, when the number of client increases the last client socket that is in the array will get the message very lately as it will iterate over the array again and again whenever a new message is received. You may use concurrency for sending all messages in separate threads or creating a batch and then iterate over the socket array and send the batch message after some interval. Still, this is not a better solution.

So then I came out with a solution this might not be the best solution but I benchmarked it and neither memory utilization is high with n number of client neither CPU and all clients receives message concurrently at the same time without iterating over the socket array over and over again. If you have a better solution please share with me so that I can also learn new stuff from you guys. Below is the code snippet for my solution of the broadcast. I have used a file, but an array can also be used to keep everything in memory, it is up to you but make sure to remove messages from array to avoid memory leaks.

// setting the log file where all the messages will be written

var socketFilePath = "/path/of/the/socket.log"

// opening a file in append mode

FileDescriptor, err := os.OpenFile(socketFilePath,
os.O_APPEND|os.O_WRONLY, 0700)

// checking for errors while opening file
if err != nil {
go log.Write(err.Error())
return
}

// Host TCP server in localhost with port 8100

server, err := net.Listen("tcp", "127.0.0.1:8100")


// waiting for new connections
for {
conn, err := server.Accept()

tcp := conn.(*net.TCPConn)

tcp.SetNoDelay(true)
tcp.SetKeepAlive(true)
tcp.SetKeepAlive(true)
tcp.SetLinger(1)
tcp.SetReadBuffer(10000)
tcp.SetWriteBuffer(10000)
tcp.SetDeadline(time.Now().Add(1000000 * time.Second))
tcp.SetReadDeadline(time.Now().Add(1000000 * time.Second))
tcp.SetWriteDeadline(time.Now().Add(1000000 * time.Second))

// appending the client socket to array/slice
listOfClients = append(listOfClients, tcp)

// subscribe to the channel file
go SubscribeChannel(*tcp)

// calling handleRequest using goroutines to run in seperate thread
go HandleRequest(*tcp)
}

func SubscribeChannel(conn net.TCPConn){

// opening file
file, err := os.Open(socketFilePath)

// defering the file to close, it will be closed only when loop is exited
defer file.Close()

if err != nil {
go log.Write(err.Error())
return
}

// creating a file reader
reader := bufio.NewReader(file)

var line string

var err error

var lastModifiedDate int64

for{

// getting file status
fileStat, err := os.Stat(socketFilePath)

if err != nil {
go log.Write(err.Error())
break
}

// checking for last modified time
if fileStat.ModTime() == lastModifiedDate{
thread.Sleep(1 * time.Nanosecond)
continue
}

// reading lines
line, err = reader.ReadString('\n')

if err != nil {
if err == io.EOF {
continue
}else{
break
}
}

//updating the last modified time
lastModifiedDate = fileStat.ModTime()

// setting a variable to count number of retry
var totalRetry = 0

RETRY:

// if retry is already 3 then exit loop
if totalRetry > 3{
log.Println("Socket disconnected...")
break
}
// writing to clients
_, socketError := conn.Write([]byte(line))

// if failed then retry again till count = 3
if socketError != nil{
totalRetry += 1
goto RETRY
}

}

}

func HandleRequest(conn net.TCPConn){

// waiting for new messages

var callbackchannels = make(chan bool)

for {

// reading the message from socket buffer

message, err := bufio.NewReader(conn).ReadString('\n')

if err != nil {
log.Printf("Error: %+v", err.Error())
return
}

// broadcasting to all clients
go WriteData(message, callbackchannels)

<-callbackchannels
}
}


func WriteData(message string, callback chan bool){

mtx.Lock()

_, err := FileDescriptor.Write(message)

mtx.Unlock()

if (err != nil && err != io.EOF ){
go log.Write(err.Error())
callback <- false
return
}

callback <- true

}

###################################################################################

// Client Socket example

// Client A connect to this socket

// connecting to localhost with port 8100
conn, _ := net.Dial("tcp", "127.0.0.1:8100")

// writing message to socket

conn.Write([]byte("Message from client A"))

for {
// reading messages from server
message, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
log.Printf("Error: %+v", err.Error())
return
}

fmt.Println(message)

}

// Same code for another client
// Client B connect to this socket

conn, _ := net.Dial("tcp", "127.0.0.1:8100")

conn.Write([]byte("Message from client B"))

for {

message, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
log.Printf("Error: %+v", err.Error())
return
}

fmt.Println(message)

}

This is not the exact solution I use as here you cant track of how much messages the client has already received as in this solution if the client reconnects again it will start downloading all messages from the file secondly you need to set retention mechanism as the file will become larger day by day. Thirdly I prefer byte buffer to send and receive messages where I set first 4 or 8 bytes as the total size of the packet and then add some more header flags in the next byte array then at the end the actual message as it is fast and easy to parse if you JSON it takes a lot of time to serialize and deserialize messages. To read the file I rather use
totalByteLen, errPacket := file.ReadAt(restPacket, cursor) where I update the cursor and keep track of the total bytes sends to the clients and whenever the client has connected again I have the track of the last byte the subscriber received.
This byte buffer technique is also used in many stock exchanges for faster delivery, it is also used in WebSocket implementation. I have written a blog where I am given a solution to implement WebSocket from scratch rather than using a library. And this file descriptor solution is also used by many log streaming applications where all messages received by a server are written into a file and then the subscriber receives all the messages that are written in the file. This is way faster than the previous solution where you are iterating over the socket array to send message to all client. Modifying this can also be used as multicast where you create a group using hashmap and put all open subscriber file descriptor that to hashmap and send message to that hashmap as it contains the unique group. I will suggest using byte buffer as using byte buffer message transmission is fast secondly you can send video streams, audio streams etc for realtime communication. You can read the byte buffer library that I have written in Golang create byte buffer in easy steps, check out the other blog written by me. If you have more suggestions and better implementations comment below and suggest solutions. Till then happy coding :)

--

--

Sudeep Dasgupta

Machine Learning | Big Data | Video Streaming | Real-Time Low Latency Apps | Product Designer | Programmer | Open Source Contributor