Skip to content

Commit 0b57d2a

Browse files
committed
Add temporary Postgres consumer within Kafka to save message history
1 parent fa84495 commit 0b57d2a

File tree

7 files changed

+93
-5
lines changed

7 files changed

+93
-5
lines changed

backend/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ create_migration:
44
migrate create -ext=sql -dir=postgres/migrations -seq init
55

66
migrate_up:
7-
migrate -path=postgres/migrations -database "postgresql://${PSQL_USER}:${PSQL_PASSWORD}@localhost:${PSQL_PORT}/${PSQL_DBNAME}?sslmode=disable" -verbose up
7+
migrate -path=postgres/migrations -database "postgresql://${PSQL_USER}:${PSQL_PASSWORD}@${PSQL_HOST}:${PSQL_PORT}/${PSQL_DBNAME}?sslmode=disable" -verbose up
88

99
migrate_down:
1010
migrate -path=postgres/migrations -database "postgresql://${PSQL_USER}:${PSQL_PASSWORD}@${PSQL_HOST}:${PSQL_PORT}/${PSQL_DBNAME}?sslmode=${PSQL_SSLMODE}" -verbose down

backend/kafka/consumer.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,21 @@ import (
66
"errors"
77
"fmt"
88
"log"
9+
"os"
910
"sync"
1011

1112
"github.com/IBM/sarama"
1213
"github.com/Leo7Deng/ChatApp/cassandra"
1314
"github.com/Leo7Deng/ChatApp/models"
1415
"github.com/Leo7Deng/ChatApp/websockets"
1516
"github.com/gocql/gocql"
17+
"github.com/google/uuid"
18+
"github.com/jackc/pgx/v5/pgxpool"
1619
)
1720

1821
var hub *websockets.Hub
1922
var cassandraSession *gocql.Session
23+
var pool *pgxpool.Pool
2024

2125
func WebsocketConsumer(ctx context.Context, websocketHub *websockets.Hub) {
2226
hub = websocketHub
@@ -32,6 +36,14 @@ func CassandraConsumer(ctx context.Context, session *gocql.Session) {
3236
runConsumer(ctx, groupID, handler)
3337
}
3438

39+
func PostgresConsumer(ctx context.Context, postgresPool *pgxpool.Pool) {
40+
pool = postgresPool
41+
groupID := "postgres-group"
42+
handler := &PostgresConsumerHandler{}
43+
runConsumer(ctx, groupID, handler)
44+
}
45+
46+
3547
func runConsumer(ctx context.Context, groupID string, handler sarama.ConsumerGroupHandler) {
3648
config := sarama.NewConfig()
3749
config.Consumer.Return.Errors = true
@@ -74,6 +86,7 @@ func runConsumer(ctx context.Context, groupID string, handler sarama.ConsumerGro
7486

7587
type WebsocketConsumerHandler struct {hub *websockets.Hub}
7688
type CassandraConsumerHandler struct {}
89+
type PostgresConsumerHandler struct {}
7790

7891
// Setup is run at the beginning of a new session, before ConsumeClaim
7992
func (consumer *WebsocketConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
@@ -82,6 +95,9 @@ func (consumer *WebsocketConsumerHandler) Setup(sarama.ConsumerGroupSession) err
8295
func (consumer *CassandraConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
8396
return nil
8497
}
98+
func (consumer *PostgresConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
99+
return nil
100+
}
85101

86102
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
87103
func (consumer *WebsocketConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
@@ -90,6 +106,9 @@ func (consumer *WebsocketConsumerHandler) Cleanup(sarama.ConsumerGroupSession) e
90106
func (consumer *CassandraConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
91107
return nil
92108
}
109+
func (consumer *PostgresConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
110+
return nil
111+
}
93112

94113
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
95114
// Once the Messages() channel is closed, the Handler must finish its processing
@@ -159,4 +178,57 @@ func (consumer *CassandraConsumerHandler) ConsumeClaim(session sarama.ConsumerGr
159178
return nil
160179
}
161180
}
181+
}
182+
183+
func (consumer *PostgresConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
184+
for {
185+
select {
186+
case message, ok := <-claim.Messages():
187+
if !ok {
188+
log.Printf("message channel was closed")
189+
return nil
190+
}
191+
partition, offset := message.Partition, message.Offset
192+
var websocketMessage models.WebsocketMessage
193+
err := json.Unmarshal(message.Value, &websocketMessage)
194+
if err != nil {
195+
fmt.Printf("Failed to unmarshal message: %v\n", err)
196+
}
197+
log.Printf("Postgres consumer: %s | Partition: %d | Offset: %d\n", message.Value, partition, offset)
198+
type InsertMessage struct {
199+
MessageID string `json:"message_id"`
200+
Message models.Message `json:"message"`
201+
}
202+
insertMessage := InsertMessage{
203+
MessageID: uuid.New().String(),
204+
Message: models.Message{
205+
CircleID: websocketMessage.Message.CircleID,
206+
AuthorID: websocketMessage.Message.AuthorID,
207+
Content: websocketMessage.Message.Content,
208+
CreatedAt: websocketMessage.Message.CreatedAt,
209+
},
210+
}
211+
conn, err := pool.Acquire(session.Context())
212+
if err != nil {
213+
fmt.Fprintf(os.Stderr, "Unable to acquire a connection from the pool: %v\n", err)
214+
return err
215+
}
216+
defer conn.Release()
217+
218+
_, err = conn.Exec(
219+
session.Context(),
220+
"INSERT INTO messages (message_id, circle_id, author_id, content) VALUES ($1, $2, $3, $4)",
221+
insertMessage.MessageID,
222+
insertMessage.Message.CircleID,
223+
insertMessage.Message.AuthorID,
224+
insertMessage.Message.Content,
225+
)
226+
if err != nil {
227+
fmt.Fprintf(os.Stderr, "Error inserting into messages: %v\n", err)
228+
return err
229+
}
230+
case <-session.Context().Done():
231+
return nil
232+
}
233+
}
162234
}

backend/kafka/kafka.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,26 @@ package kafka
33
import (
44
"context"
55
"sync"
6+
67
"github.com/Leo7Deng/ChatApp/websockets"
78
"github.com/gocql/gocql"
9+
"github.com/jackc/pgx/v5/pgxpool"
810
)
911

1012
// InitKafka starts the Kafka producer and consumer as goroutines
11-
func InitKafka(ctx context.Context, wg *sync.WaitGroup, hub *websockets.Hub, cassandraSession *gocql.Session) {
13+
func InitKafka(ctx context.Context, wg *sync.WaitGroup, hub *websockets.Hub, cassandraSession *gocql.Session, pool *pgxpool.Pool) {
1214
wg.Add(2)
1315

1416
go func() {
1517
defer wg.Done()
1618
WebsocketConsumer(ctx, hub)
1719
}()
1820

21+
go func() {
22+
defer wg.Done()
23+
PostgresConsumer(ctx, pool)
24+
}()
25+
1926
go func() {
2027
defer wg.Done()
2128
CassandraConsumer(ctx, cassandraSession)

backend/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ func main() {
5555

5656
postgres.ConnectPSQL()
5757
defer postgres.ClosePSQL()
58+
pool := postgres.GetPool()
59+
defer pool.Close()
5860

5961
cassandraSession := cassandra.CassandraSession()
6062
defer cassandraSession.Close()
@@ -63,7 +65,7 @@ func main() {
6365

6466
ctx, cancel := context.WithCancel(context.Background())
6567
var wg sync.WaitGroup
66-
kafka.InitKafka(ctx, &wg, hub, cassandraSession)
68+
kafka.InitKafka(ctx, &wg, hub, cassandraSession, pool)
6769

6870
go hub.Run()
6971
fmt.Println("Websocket server started", hub)

backend/postgres/client.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@ import (
1111
var pool *pgxpool.Pool
1212

1313
func ConnectPSQL() {
14-
db_url := fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=%s",
14+
db_url := fmt.Sprintf("postgres://%s:%s@postgres:%s/%s?sslmode=%s",
1515
os.Getenv("PSQL_USER"),
1616
// os.Getenv("PSQL_PASSWORD"),
1717
url.QueryEscape(os.Getenv("PSQL_PASSWORD")),
18-
os.Getenv("PSQL_HOST"),
1918
os.Getenv("PSQL_PORT"),
2019
os.Getenv("PSQL_DBNAME"),
2120
os.Getenv("PSQL_SSLMODE"),
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP TABLE IF EXISTS messages;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CREATE TABLE messages (
2+
message_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
3+
circle_id INT NOT NULL,
4+
author_id INT NOT NULL,
5+
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
6+
content TEXT NOT NULL
7+
);

0 commit comments

Comments
 (0)