bsky2tg/main.go
astravexton 1690279d5c
All checks were successful
/ build (push) Successful in 1m25s
add support for multiple message IDs
2025-07-11 14:15:09 +01:00

344 lines
10 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"image/jpeg"
"io"
"log"
"log/slog"
"net/http"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"
"git.zio.sh/astra/bsky2tg/bsky"
tgbotapi "github.com/OvyFlash/telegram-bot-api"
"github.com/bluesky-social/jetstream/pkg/client"
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
"github.com/bluesky-social/jetstream/pkg/models"
vidio "git.zio.sh/astra/Vidio"
)
const (
serverAddr = "wss://jetstream2.us-west.bsky.network/subscribe"
// serverAddr = "wss://stream.zio.blue/subscribe"
postFormat = "%s\n—\n<a href=\"https://bsky.app/profile/%s/post/%s\">🦋 @%s</a>"
quotePostFormat = "<blockquote>%s</blockquote>\n<a href=\"https://bsky.app/profile/%s/post/%s\">➡️ @%s</a>\n—\n<a href=\"https://bsky.app/profile/%s/post/%s\">🦋 @%s</a>"
)
type handler struct {
seenSeqs map[int64]struct{}
tg *tgbotapi.BotAPI
bsky *bsky.BSky
}
var (
post = flag.String("post", "", "URL to a BlueSky post")
delete = flag.Bool("delete", false, "true/false to delete post")
)
func main() {
flag.Parse()
var handle = os.Getenv("BSKY_HANDLE")
var password = os.Getenv("BSKY_PASSWORD")
bskyClient := bsky.NewBSky()
err := bskyClient.Auth([]string{handle, password})
if err != nil {
log.Fatal(err, ". please set BSKY_HANDLE and BSKY_PASSWORD env variables")
}
h := &handler{
seenSeqs: make(map[int64]struct{}),
bsky: bskyClient,
}
endpoint := "https://api.telegram.org/bot%s/%s"
if os.Getenv("TG_API_ENDPOINT") != "" {
endpoint = os.Getenv("TG_API_ENDPOINT")
}
bot, err := tgbotapi.NewBotAPIWithAPIEndpoint(os.Getenv("TG_TOKEN"), endpoint)
if err != nil {
panic(err)
}
h.tg = bot
if os.Getenv("TG_CHANNEL_ID") == "" {
log.Fatal("TG_CHANNEL_ID is not set")
}
if *post != "" {
r := regexp.MustCompile(`^https:\/\/.*?\/profile\/(.*?)\/post\/(.*?)$`)
s := r.FindStringSubmatch(*post)
handle := s[1]
if s[1][0:4] != "did:" {
handle, _ = bskyClient.ResolveHandle(s[1])
}
if handle != bskyClient.Bluesky.Cfg.DID {
log.Fatal("Unable to send posts from other accounts")
}
if *delete {
r, e := h.bsky.Bluesky.GetTelegramData(s[2])
if e == "" {
log.Printf("Found post %s in channel %d, deleting", s[2], r.ChannelID)
for _, msgID := range r.MessageID {
m := tgbotapi.NewDeleteMessage(r.ChannelID, msgID)
h.tg.Send(m)
}
h.bsky.Bluesky.DeleteRecord([]string{s[2], s[1], "blue.zio.bsky2tg.post"})
} else {
log.Printf("Unable to find post %s on PDS", s[2])
}
return
}
postJSON := bskyClient.Bluesky.FetchPost(handle, s[2])
p, _ := json.Marshal(postJSON.Record)
h.ProcessPost(&models.Event{
Did: postJSON.Author.Did,
TimeUS: postJSON.Record.CreatedAt.Unix(),
Kind: "",
Commit: &models.Commit{
CID: postJSON.Cid,
Operation: "create",
RKey: strings.Split(postJSON.URI, "/")[4],
Collection: "app.bsky.feed.post",
Record: p,
},
})
return
}
ctx := context.Background()
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug.Level(),
})))
logger := slog.Default()
config := client.DefaultClientConfig()
config.WebsocketURL = serverAddr
config.WantedCollections = []string{"app.bsky.feed.post"}
config.WantedDids = []string{bskyClient.Bluesky.Cfg.DID}
config.Compress = true
scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent)
c, err := client.NewClient(config, logger, scheduler)
if err != nil {
log.Fatalf("failed to create client: %v", err)
}
cursor := time.Now().UnixMicro()
restartCount := 0
loop:
if err := c.ConnectAndRead(ctx, &cursor); err != nil {
log.Printf("failed to connect: %v. retrying", err)
if restartCount >= 3 {
log.Print("restart count limit hit, setting cursor to now\n")
restartCount = 0
cursor = time.Now().UnixMicro()
} else {
restartCount += 1
cursor = int64(bskyClient.Bluesky.Cfg.Cursor)
}
goto loop
}
}
func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
if event.Commit == nil {
return nil
}
switch event.Commit.Operation {
case models.CommitOperationCreate, models.CommitOperationUpdate:
h.bsky.Bluesky.Cfg.Cursor = event.TimeUS + 1 // +1 to not show same post
bsky.PersistAuthSession(h.bsky.Bluesky.Cfg)
h.ProcessPost(event)
case models.CommitOperationDelete:
h.bsky.Bluesky.Cfg.Cursor = event.TimeUS + 1 // +1 to not show same post
bsky.PersistAuthSession(h.bsky.Bluesky.Cfg)
r, e := h.bsky.Bluesky.GetTelegramData(event.Commit.RKey)
if e == "" {
for _, msgID := range r.MessageID {
m := tgbotapi.NewDeleteMessage(r.ChannelID, msgID)
h.tg.Send(m)
}
h.bsky.Bluesky.DeleteRecord([]string{event.Commit.RKey, event.Did, "blue.zio.bsky2tg.post"})
}
}
return nil
}
func (h *handler) ProcessPost(event *models.Event) error {
ps, _ := h.bsky.ParsePost(event.Commit.Record)
po := ps.GetEmbeds()
cid, _ := strconv.ParseInt(os.Getenv("TG_CHANNEL_ID"), 10, 64)
if ps.IsReply() { //|| ps.IsQuotePost() {
// don't want to post replies to channel
return nil
}
var captionText string
if ps.IsQuotePost() {
if ps.Embed.Record.Type == "app.bsky.embed.record" {
handle, _ := h.bsky.GetHandleFromDID(strings.Split(ps.Embed.Record.Record.URI, "/")[2])
captionText = fmt.Sprintf(
quotePostFormat,
ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()),
strings.Split(ps.Embed.Record.Record.URI, "/")[2],
strings.Split(ps.Embed.Record.Record.URI, "/")[4],
handle,
event.Did,
event.Commit.RKey,
h.bsky.Bluesky.Cfg.Handle)
} else {
handle, _ := h.bsky.GetHandleFromDID(strings.Split(ps.Embed.Record.URI, "/")[2])
captionText = fmt.Sprintf(
quotePostFormat,
ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()),
strings.Split(ps.Embed.Record.URI, "/")[2],
strings.Split(ps.Embed.Record.URI, "/")[4],
handle,
event.Did,
event.Commit.RKey,
h.bsky.Bluesky.Cfg.Handle)
}
}
if captionText == "" {
if ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()) != "" {
captionText = fmt.Sprintf(postFormat, ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()), h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle)
} else {
captionText = fmt.Sprintf("<a href=\"https://bsky.app/profile/%s/post/%s\">🦋 @%s</a>", h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle)
}
}
// post has media
if len((*po)) != 0 {
mediaGroup := []tgbotapi.InputMedia{}
if (*po)[0].Type == "external" {
tenorGif := tgbotapi.NewInputMediaVideo(tgbotapi.FileURL((*po)[0].URI)) // is most likely gif from Tenor
tenorGif.Caption = captionText
tenorGif.ParseMode = tgbotapi.ModeHTML
mediaGroup = append(mediaGroup, &tenorGif)
} else {
for _, media := range *po {
switch media.Type {
case "image":
mediaAdd := tgbotapi.NewInputMediaPhoto(tgbotapi.FileURL(buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI)))
if len(mediaGroup) == 0 {
mediaAdd.Caption = captionText
mediaAdd.ParseMode = tgbotapi.ModeHTML
}
mediaGroup = append(mediaGroup, &mediaAdd)
case "video":
log.Printf("Fetching video: %s\n", buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI))
resp, _ := http.Get(buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI))
defer resp.Body.Close()
f, _ := os.Create(media.URI + ".mp4")
defer f.Close()
io.Copy(f, resp.Body)
f.Seek(0, 0)
mediaAdd := tgbotapi.NewInputMediaVideo(tgbotapi.FileReader{Name: "video.mp4", Reader: f})
metadata, err := getVideoMetadata(f.Name())
if err != nil {
log.Printf("Unable to read video metadata: %s\n", buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI))
break
}
mediaAdd.SupportsStreaming = true
mediaAdd.Height = metadata.Height()
mediaAdd.Width = metadata.Width()
mediaAdd.Duration = int(metadata.Duration())
frames, _ := metadata.ReadFrames(0)
var buf bytes.Buffer
jpeg.Encode(&buf, frames[0], &jpeg.Options{Quality: 90})
mediaAdd.Thumb = tgbotapi.FileBytes{Name: "thumb.jpg", Bytes: buf.Bytes()}
if len(mediaGroup) == 0 {
mediaAdd.Caption = captionText
mediaAdd.ParseMode = tgbotapi.ModeHTML
}
os.Remove(media.URI + ".mp4")
mediaGroup = append(mediaGroup, &mediaAdd)
}
}
}
if len(mediaGroup) == 0 {
log.Print("No mediaGroup to send, see previous error")
} else {
resp, _ := h.tg.SendMediaGroup(tgbotapi.NewMediaGroup(cid, mediaGroup))
uri, cid := getLink(event)
var messageIDs []int
for _, msgID := range resp {
messageIDs = append(messageIDs, msgID.MessageID)
}
h.bsky.Bluesky.CommitTelegramResponse(&bsky.TelegramRecord{
ChannelID: resp[0].Chat.ID,
MessageID: messageIDs,
Link: &bsky.Link{
Cid: cid,
URI: uri,
},
}, event.Commit.RKey)
}
} else {
m := tgbotapi.MessageConfig{}
if captionText == "" {
m = tgbotapi.NewMessage(cid, fmt.Sprintf(postFormat, ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()), h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle))
} else {
m = tgbotapi.NewMessage(cid, captionText)
}
m.ParseMode = tgbotapi.ModeHTML
if ps.IsQuotePost() {
m.LinkPreviewOptions = tgbotapi.LinkPreviewOptions{
IsDisabled: false,
URL: fmt.Sprintf("https://bsky.app/profile/%s/post/%s",
strings.Split(ps.Embed.Record.URI, "/")[2],
strings.Split(ps.Embed.Record.URI, "/")[4]),
PreferSmallMedia: false,
PreferLargeMedia: true,
ShowAboveText: true,
}
} else {
m.LinkPreviewOptions = tgbotapi.LinkPreviewOptions{IsDisabled: true}
}
resp, _ := h.tg.Send(m)
uri, cid := getLink(event)
h.bsky.Bluesky.CommitTelegramResponse(&bsky.TelegramRecord{
ChannelID: resp.Chat.ID,
MessageID: []int{resp.MessageID},
Link: &bsky.Link{
Cid: cid,
URI: uri,
},
}, event.Commit.RKey)
}
return nil
}
func buildBlobURL(server string, did string, cid string) string {
return server + "/xrpc/com.atproto.sync.getBlob?did=" + url.QueryEscape(did) + "&cid=" + url.QueryEscape(cid)
}
func getLink(event *models.Event) (string, string) {
return fmt.Sprintf("at://%s/%s/%s", event.Did, event.Commit.Collection, event.Commit.RKey), event.Commit.CID
}
func getVideoMetadata(fname string) (video *vidio.Video, err error) {
video, err = vidio.NewVideo(fname)
return
}