340 lines
10 KiB
Go
340 lines
10 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"image/jpeg"
|
|
"io"
|
|
"log"
|
|
"log/slog"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.zio.sh/astra/bsky2tg/bsky"
|
|
tgbotapi "github.com/OvyFlash/telegram-bot-api"
|
|
|
|
"github.com/bluesky-social/jetstream/pkg/client"
|
|
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
|
|
"github.com/bluesky-social/jetstream/pkg/models"
|
|
|
|
vidio "git.zio.sh/astra/Vidio"
|
|
)
|
|
|
|
const (
|
|
serverAddr = "wss://jetstream2.us-west.bsky.network/subscribe"
|
|
// serverAddr = "wss://stream.zio.blue/subscribe"
|
|
postFormat = "%s\n—\n<a href=\"https://bsky.app/profile/%s/post/%s\">🦋 @%s</a>"
|
|
quotePostFormat = "<blockquote>%s</blockquote>\n<a href=\"https://bsky.app/profile/%s/post/%s\">➡️ @%s</a>\n—\n<a href=\"https://bsky.app/profile/%s/post/%s\">🦋 @%s</a>"
|
|
)
|
|
|
|
type handler struct {
|
|
seenSeqs map[int64]struct{}
|
|
tg *tgbotapi.BotAPI
|
|
bsky *bsky.BSky
|
|
}
|
|
|
|
var (
|
|
post = flag.String("post", "", "URL to a BlueSky post")
|
|
delete = flag.Bool("delete", false, "true/false to delete post")
|
|
)
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
var handle = os.Getenv("BSKY_HANDLE")
|
|
var password = os.Getenv("BSKY_PASSWORD")
|
|
bskyClient := bsky.NewBSky()
|
|
err := bskyClient.Auth([]string{handle, password})
|
|
if err != nil {
|
|
log.Fatal(err, ". please set BSKY_HANDLE and BSKY_PASSWORD env variables")
|
|
}
|
|
|
|
h := &handler{
|
|
seenSeqs: make(map[int64]struct{}),
|
|
bsky: bskyClient,
|
|
}
|
|
|
|
endpoint := "https://api.telegram.org/bot%s/%s"
|
|
if os.Getenv("TG_API_ENDPOINT") != "" {
|
|
endpoint = os.Getenv("TG_API_ENDPOINT")
|
|
}
|
|
bot, err := tgbotapi.NewBotAPIWithAPIEndpoint(os.Getenv("TG_TOKEN"), endpoint)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
h.tg = bot
|
|
|
|
if os.Getenv("TG_CHANNEL_ID") == "" {
|
|
log.Fatal("TG_CHANNEL_ID is not set")
|
|
}
|
|
|
|
if *post != "" {
|
|
r := regexp.MustCompile(`^https:\/\/.*?\/profile\/(.*?)\/post\/(.*?)$`)
|
|
s := r.FindStringSubmatch(*post)
|
|
handle := s[1]
|
|
if s[1][0:4] != "did:" {
|
|
handle, _ = bskyClient.ResolveHandle(s[1])
|
|
}
|
|
|
|
if handle != bskyClient.Bluesky.Cfg.DID {
|
|
log.Fatal("Unable to send posts from other accounts")
|
|
}
|
|
|
|
if *delete {
|
|
r, e := h.bsky.Bluesky.GetTelegramData(s[2])
|
|
if e == "" {
|
|
log.Printf("Found post %s in channel %d, deleting", s[2], r.ChannelID)
|
|
m := tgbotapi.NewDeleteMessages(r.ChannelID, r.MessageID)
|
|
h.tg.Send(m)
|
|
h.bsky.Bluesky.DeleteRecord([]string{s[2], s[1], "blue.zio.bsky2tg.post"})
|
|
} else {
|
|
log.Printf("Unable to find post %s on PDS", s[2])
|
|
}
|
|
return
|
|
}
|
|
|
|
postJSON := bskyClient.Bluesky.FetchPost(handle, s[2])
|
|
p, _ := json.Marshal(postJSON.Record)
|
|
h.ProcessPost(&models.Event{
|
|
Did: postJSON.Author.Did,
|
|
TimeUS: postJSON.Record.CreatedAt.Unix(),
|
|
Kind: "",
|
|
Commit: &models.Commit{
|
|
CID: postJSON.Cid,
|
|
Operation: "create",
|
|
RKey: strings.Split(postJSON.URI, "/")[4],
|
|
Collection: "app.bsky.feed.post",
|
|
Record: p,
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
ctx := context.Background()
|
|
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
|
Level: slog.LevelDebug.Level(),
|
|
})))
|
|
|
|
logger := slog.Default()
|
|
config := client.DefaultClientConfig()
|
|
config.WebsocketURL = serverAddr
|
|
config.WantedCollections = []string{"app.bsky.feed.post"}
|
|
config.WantedDids = []string{bskyClient.Bluesky.Cfg.DID}
|
|
config.Compress = true
|
|
|
|
scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent)
|
|
|
|
c, err := client.NewClient(config, logger, scheduler)
|
|
if err != nil {
|
|
log.Fatalf("failed to create client: %v", err)
|
|
}
|
|
|
|
cursor := time.Now().UnixMicro()
|
|
restartCount := 0
|
|
loop:
|
|
if err := c.ConnectAndRead(ctx, &cursor); err != nil {
|
|
log.Printf("failed to connect: %v. retrying", err)
|
|
if restartCount >= 3 {
|
|
log.Print("restart count limit hit, setting cursor to now\n")
|
|
restartCount = 0
|
|
cursor = time.Now().UnixMicro()
|
|
} else {
|
|
restartCount += 1
|
|
cursor = int64(bskyClient.Bluesky.Cfg.Cursor)
|
|
}
|
|
goto loop
|
|
}
|
|
}
|
|
|
|
func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
|
|
if event.Commit == nil {
|
|
return nil
|
|
}
|
|
|
|
switch event.Commit.Operation {
|
|
case models.CommitOperationCreate, models.CommitOperationUpdate:
|
|
h.bsky.Bluesky.Cfg.Cursor = event.TimeUS + 1 // +1 to not show same post
|
|
bsky.PersistAuthSession(h.bsky.Bluesky.Cfg)
|
|
h.ProcessPost(event)
|
|
case models.CommitOperationDelete:
|
|
h.bsky.Bluesky.Cfg.Cursor = event.TimeUS + 1 // +1 to not show same post
|
|
bsky.PersistAuthSession(h.bsky.Bluesky.Cfg)
|
|
r, e := h.bsky.Bluesky.GetTelegramData(event.Commit.RKey)
|
|
if e == "" {
|
|
m := tgbotapi.NewDeleteMessages(r.ChannelID, r.MessageID)
|
|
h.tg.Send(m)
|
|
h.bsky.Bluesky.DeleteRecord([]string{event.Commit.RKey, event.Did, "blue.zio.bsky2tg.post"})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *handler) ProcessPost(event *models.Event) error {
|
|
ps, _ := h.bsky.ParsePost(event.Commit.Record)
|
|
po := ps.GetEmbeds()
|
|
cid, _ := strconv.ParseInt(os.Getenv("TG_CHANNEL_ID"), 10, 64)
|
|
|
|
if ps.IsReply() { //|| ps.IsQuotePost() {
|
|
// don't want to post replies to channel
|
|
return nil
|
|
}
|
|
|
|
var captionText string
|
|
if ps.IsQuotePost() {
|
|
if ps.Embed.Record.Type == "app.bsky.embed.record" {
|
|
handle, _ := h.bsky.GetHandleFromDID(strings.Split(ps.Embed.Record.Record.URI, "/")[2])
|
|
captionText = fmt.Sprintf(
|
|
quotePostFormat,
|
|
ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()),
|
|
strings.Split(ps.Embed.Record.Record.URI, "/")[2],
|
|
strings.Split(ps.Embed.Record.Record.URI, "/")[4],
|
|
handle,
|
|
event.Did,
|
|
event.Commit.RKey,
|
|
h.bsky.Bluesky.Cfg.Handle)
|
|
} else {
|
|
handle, _ := h.bsky.GetHandleFromDID(strings.Split(ps.Embed.Record.URI, "/")[2])
|
|
captionText = fmt.Sprintf(
|
|
quotePostFormat,
|
|
ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()),
|
|
strings.Split(ps.Embed.Record.URI, "/")[2],
|
|
strings.Split(ps.Embed.Record.URI, "/")[4],
|
|
handle,
|
|
event.Did,
|
|
event.Commit.RKey,
|
|
h.bsky.Bluesky.Cfg.Handle)
|
|
}
|
|
}
|
|
|
|
if captionText == "" {
|
|
if ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()) != "" {
|
|
captionText = fmt.Sprintf(postFormat, ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()), h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle)
|
|
} else {
|
|
captionText = fmt.Sprintf("<a href=\"https://bsky.app/profile/%s/post/%s\">🦋 @%s</a>", h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle)
|
|
}
|
|
}
|
|
|
|
// post has media
|
|
if len((*po)) != 0 {
|
|
mediaGroup := []tgbotapi.InputMedia{}
|
|
if (*po)[0].Type == "external" {
|
|
tenorGif := tgbotapi.NewInputMediaVideo(tgbotapi.FileURL((*po)[0].URI)) // is most likely gif from Tenor
|
|
tenorGif.Caption = captionText
|
|
tenorGif.ParseMode = tgbotapi.ModeHTML
|
|
mediaGroup = append(mediaGroup, &tenorGif)
|
|
} else {
|
|
for _, media := range *po {
|
|
switch media.Type {
|
|
case "image":
|
|
mediaAdd := tgbotapi.NewInputMediaPhoto(tgbotapi.FileURL(buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI)))
|
|
if len(mediaGroup) == 0 {
|
|
mediaAdd.Caption = captionText
|
|
mediaAdd.ParseMode = tgbotapi.ModeHTML
|
|
}
|
|
mediaGroup = append(mediaGroup, &mediaAdd)
|
|
case "video":
|
|
log.Printf("Fetching video: %s\n", buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI))
|
|
resp, _ := http.Get(buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI))
|
|
defer resp.Body.Close()
|
|
f, _ := os.Create(media.URI + ".mp4")
|
|
defer f.Close()
|
|
io.Copy(f, resp.Body)
|
|
f.Seek(0, 0)
|
|
mediaAdd := tgbotapi.NewInputMediaVideo(tgbotapi.FileReader{Name: "video.mp4", Reader: f})
|
|
metadata, err := getVideoMetadata(f.Name())
|
|
if err != nil {
|
|
log.Printf("Unable to read video metadata: %s\n", buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI))
|
|
break
|
|
}
|
|
mediaAdd.SupportsStreaming = true
|
|
mediaAdd.Height = metadata.Height()
|
|
mediaAdd.Width = metadata.Width()
|
|
mediaAdd.Duration = int(metadata.Duration())
|
|
|
|
frames, _ := metadata.ReadFrames(0)
|
|
var buf bytes.Buffer
|
|
jpeg.Encode(&buf, frames[0], &jpeg.Options{Quality: 90})
|
|
mediaAdd.Thumb = tgbotapi.FileBytes{Name: "thumb.jpg", Bytes: buf.Bytes()}
|
|
if len(mediaGroup) == 0 {
|
|
mediaAdd.Caption = captionText
|
|
mediaAdd.ParseMode = tgbotapi.ModeHTML
|
|
}
|
|
os.Remove(media.URI + ".mp4")
|
|
mediaGroup = append(mediaGroup, &mediaAdd)
|
|
}
|
|
}
|
|
}
|
|
if len(mediaGroup) == 0 {
|
|
log.Print("No mediaGroup to send, see previous error")
|
|
} else {
|
|
resp, _ := h.tg.SendMediaGroup(tgbotapi.NewMediaGroup(cid, mediaGroup))
|
|
uri, cid := getLink(event)
|
|
var messageIDs []int
|
|
for _, msgID := range resp {
|
|
messageIDs = append(messageIDs, msgID.MessageID)
|
|
}
|
|
h.bsky.Bluesky.CommitTelegramResponse(&bsky.TelegramRecord{
|
|
ChannelID: resp[0].Chat.ID,
|
|
MessageID: messageIDs,
|
|
Link: &bsky.Link{
|
|
Cid: cid,
|
|
URI: uri,
|
|
},
|
|
}, event.Commit.RKey)
|
|
}
|
|
} else {
|
|
m := tgbotapi.MessageConfig{}
|
|
if captionText == "" {
|
|
m = tgbotapi.NewMessage(cid, fmt.Sprintf(postFormat, ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()), h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle))
|
|
} else {
|
|
m = tgbotapi.NewMessage(cid, captionText)
|
|
}
|
|
m.ParseMode = tgbotapi.ModeHTML
|
|
if ps.IsQuotePost() {
|
|
m.LinkPreviewOptions = tgbotapi.LinkPreviewOptions{
|
|
IsDisabled: false,
|
|
URL: fmt.Sprintf("https://bsky.app/profile/%s/post/%s",
|
|
strings.Split(ps.Embed.Record.URI, "/")[2],
|
|
strings.Split(ps.Embed.Record.URI, "/")[4]),
|
|
PreferSmallMedia: false,
|
|
PreferLargeMedia: true,
|
|
ShowAboveText: true,
|
|
}
|
|
} else {
|
|
m.LinkPreviewOptions = tgbotapi.LinkPreviewOptions{IsDisabled: true}
|
|
}
|
|
resp, _ := h.tg.Send(m)
|
|
uri, cid := getLink(event)
|
|
h.bsky.Bluesky.CommitTelegramResponse(&bsky.TelegramRecord{
|
|
ChannelID: resp.Chat.ID,
|
|
MessageID: []int{resp.MessageID},
|
|
Link: &bsky.Link{
|
|
Cid: cid,
|
|
URI: uri,
|
|
},
|
|
}, event.Commit.RKey)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func buildBlobURL(server string, did string, cid string) string {
|
|
return server + "/xrpc/com.atproto.sync.getBlob?did=" + url.QueryEscape(did) + "&cid=" + url.QueryEscape(cid)
|
|
}
|
|
|
|
func getLink(event *models.Event) (string, string) {
|
|
return fmt.Sprintf("at://%s/%s/%s", event.Did, event.Commit.Collection, event.Commit.RKey), event.Commit.CID
|
|
}
|
|
|
|
func getVideoMetadata(fname string) (video *vidio.Video, err error) {
|
|
video, err = vidio.NewVideo(fname)
|
|
return
|
|
}
|